Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Options to provide Prefect S3 bucket blocks and to not use a cluster #96

Merged
merged 4 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions aodn_cloud_optimised/bin/generic_cloud_optimised_creation.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import argparse
from importlib.resources import files

from aodn_cloud_optimised.lib import cluster_lib
from aodn_cloud_optimised.lib.CommonHandler import cloud_optimised_creation
from aodn_cloud_optimised.lib.config import (
load_variable_from_config,
Expand Down Expand Up @@ -91,9 +92,10 @@ def main():
)
parser.add_argument(
"--cluster-mode",
default="local",
type=cluster_lib.parse_cluster_mode,
default=None,
choices=["local", "remote"],
help="Cluster mode to use. Options: 'local' or 'remote'. Default is 'local'.",
help="Cluster mode to use. Options: 'local' or 'remote'. Default is None.",
)

parser.add_argument(
Expand Down
16 changes: 15 additions & 1 deletion aodn_cloud_optimised/lib/CommonHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import os
import tempfile
import timeit
from enum import Enum
from typing import List

import dask
Expand Down Expand Up @@ -75,6 +76,8 @@ def __init__(self, **kwargs):
# Cluster options
valid_clusters = ["remote", "local", None]
self.cluster_mode = kwargs.get("cluster_mode", "local")
if isinstance(self.cluster_mode, Enum):
self.cluster_mode = self.cluster_mode.value

if self.cluster_mode not in valid_clusters:
raise ValueError(
Expand All @@ -92,6 +95,11 @@ def __init__(self, **kwargs):
self.logger = get_logger(logger_name)

cloud_optimised_format = self.dataset_config.get("cloud_optimised_format")

optimised_bucket = kwargs.get("optimised_bucket", None)
if optimised_bucket:
self.optimised_bucket_name = optimised_bucket.bucket_name

self.cloud_optimised_output_path = (
PureS3Path.from_uri(f"s3://{self.optimised_bucket_name}")
.joinpath(
Expand Down Expand Up @@ -592,11 +600,17 @@ def cloud_optimised_creation(
kwargs_handler_class["dataset_config"] = dataset_config
kwargs_handler_class["clear_existing_data"] = handler_clear_existing_data_arg

kwargs_handler_class["optimised_bucket"] = kwargs.get("optimised_bucket", None)
kwargs_handler_class["source_bucket"] = kwargs.get("source_bucket", None)

# Creating an instance of the specified class with the provided arguments
start_whole_processing = timeit.default_timer()
with handler_class(**kwargs_handler_class) as handler_instance:
handler_instance.to_cloud_optimised(s3_file_uri_list)
cluster_id = handler_instance.cluster_id
if kwargs_handler_class["cluster_mode"].value:
cluster_id = handler_instance.cluster_id
else:
cluster_id = None

time_spent_processing = timeit.default_timer() - start_whole_processing
logger.info(f"Processed entire dataset in {time_spent_processing}s")
Expand Down
24 changes: 16 additions & 8 deletions aodn_cloud_optimised/lib/GenericZarrHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,10 @@ def publish_cloud_optimised_fileset_batch(self, s3_file_uri_list):
preprocess_xarray, dataset_config=self.dataset_config
)

batch_size = self.get_batch_size(client=self.client)
if self.cluster_mode:
batch_size = self.get_batch_size(client=self.client)
else:
batch_size = 1

for idx, batch_files in enumerate(
self.batch_process_fileset(s3_file_handle_list, batch_size=batch_size)
Expand Down Expand Up @@ -868,14 +871,19 @@ def to_cloud_optimised(self, s3_file_uri_list=None):
) # ensure the list is unique!

self.s3_file_uri_list = s3_file_uri_list
# creating a cluster to process multiple files at once
self.client, self.cluster = self.create_cluster()
if self.cluster_mode == "remote":
self.cluster_id = self.cluster.cluster_id
else:
self.cluster_id = self.cluster.name

if self.cluster_mode:
# creating a cluster to process multiple files at once
self.client, self.cluster = self.create_cluster()
if self.cluster_mode == "remote":
self.cluster_id = self.cluster.cluster_id
else:
self.cluster_id = self.cluster.name

self.publish_cloud_optimised_fileset_batch(s3_file_uri_list)
self.close_cluster(self.client, self.cluster)

if self.cluster_mode:
self.close_cluster(self.client, self.cluster)

@staticmethod
def filter_rechunk_dimensions(dimensions):
Expand Down
11 changes: 11 additions & 0 deletions aodn_cloud_optimised/lib/cluster_lib.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from enum import Enum


class ClusterMode(Enum):
    """Cluster deployment modes accepted by the cloud-optimised handlers.

    ``LOCAL`` and ``REMOTE`` select a cluster backend; ``NONE`` (value
    ``None``) means no cluster is created and processing runs without one.
    Values deliberately match the CLI strings ``"local"``/``"remote"`` so
    that ``ClusterMode(value)`` can parse raw argparse input directly.
    """

    LOCAL = "local"
    REMOTE = "remote"
    NONE = None


def parse_cluster_mode(value):
    """Parse a raw ``--cluster-mode`` option into a :class:`ClusterMode`.

    Intended for use as an argparse ``type=`` callable: argparse turns a
    ``ValueError`` raised here into a usage error shown to the user.

    Args:
        value: Raw option value, e.g. ``"local"``, ``"remote"`` or ``None``.

    Returns:
        ClusterMode: The matching member (``None`` maps to ``ClusterMode.NONE``).

    Raises:
        ValueError: If *value* does not match any member value. The message
            lists the accepted values, unlike the generic message produced
            by calling ``ClusterMode(value)`` directly.
    """
    try:
        return ClusterMode(value)
    except ValueError:
        accepted = [mode.value for mode in ClusterMode]
        # Re-raise with an actionable message; "from None" keeps the CLI
        # error clean of the internal enum traceback chain.
        raise ValueError(
            f"invalid cluster mode {value!r}; expected one of {accepted}"
        ) from None
3 changes: 2 additions & 1 deletion test_aodn_cloud_optimised/test_bin_generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from moto.moto_server.threaded_moto_server import ThreadedMotoServer

from aodn_cloud_optimised.bin.generic_cloud_optimised_creation import main
from aodn_cloud_optimised.lib.cluster_lib import ClusterMode

ROOT_DIR = os.path.dirname(os.path.abspath(__file__))

Expand Down Expand Up @@ -128,7 +129,7 @@ def test_main(self, mock_parse_args):
dataset_config=DATASET_CONFIG_NC_ACORN_JSON,
clear_existing_data=True,
force_previous_parquet_deletion=False,
cluster_mode="local",
cluster_mode=ClusterMode.LOCAL,
optimised_bucket_name=self.BUCKET_OPTIMISED_NAME,
root_prefix_cloud_optimised_path="testing",
bucket_raw="imos-data",
Expand Down
Loading