From b6e4b26119c115e2b1db16faffb2e11055c5042c Mon Sep 17 00:00:00 2001 From: craigrose Date: Tue, 12 Nov 2024 16:30:35 +1100 Subject: [PATCH 1/4] options to provide prefect s3 bucket blocks and to not use a cluster --- aodn_cloud_optimised/config/common.json | 2 +- aodn_cloud_optimised/lib/CommonHandler.py | 17 ++++++++++++- .../lib/GenericZarrHandler.py | 24 ++++++++++++------- 3 files changed, 33 insertions(+), 10 deletions(-) diff --git a/aodn_cloud_optimised/config/common.json b/aodn_cloud_optimised/config/common.json index 77286380..4efbab73 100644 --- a/aodn_cloud_optimised/config/common.json +++ b/aodn_cloud_optimised/config/common.json @@ -1,7 +1,7 @@ { "BUCKET_RAW_DEFAULT": "imos-data", "BUCKET_OPTIMISED_DEFAULT": "aodn-cloud-optimised", - "ROOT_PREFIX_CLOUD_OPTIMISED_PATH": "", + "ROOT_PREFIX_CLOUD_OPTIMISED_PATH": "prefect_testing", "BUCKET_INTEGRATION_TESTING_RAW_DEFAULT": "imos-data", "BUCKET_INTEGRATION_TESTING_OPTIMISED_DEFAULT": "imos-data-lab-optimised", "ROOT_PREFIX_CLOUD_OPTIMISED_INTEGRATION_TESTING_PATH": "cloud_optimised/integration_testing" diff --git a/aodn_cloud_optimised/lib/CommonHandler.py b/aodn_cloud_optimised/lib/CommonHandler.py index 4159b799..eae67ac1 100644 --- a/aodn_cloud_optimised/lib/CommonHandler.py +++ b/aodn_cloud_optimised/lib/CommonHandler.py @@ -2,6 +2,7 @@ import os import tempfile import timeit +from enum import Enum from typing import List import dask @@ -75,6 +76,8 @@ def __init__(self, **kwargs): # Cluster options valid_clusters = ["remote", "local", None] self.cluster_mode = kwargs.get("cluster_mode", "local") + if isinstance(self.cluster_mode, Enum): + self.cluster_mode = self.cluster_mode.value if self.cluster_mode not in valid_clusters: raise ValueError( @@ -92,6 +95,12 @@ def __init__(self, **kwargs): self.logger = get_logger(logger_name) cloud_optimised_format = self.dataset_config.get("cloud_optimised_format") + + optimised_bucket = kwargs.get("optimised_bucket", None) + if optimised_bucket: + self.optimised_bucket_name = optimised_bucket.bucket_name + + self.cloud_optimised_output_path = ( PureS3Path.from_uri(f"s3://{self.optimised_bucket_name}") .joinpath( @@ -592,11 +601,17 @@ def cloud_optimised_creation( kwargs_handler_class["dataset_config"] = dataset_config kwargs_handler_class["clear_existing_data"] = handler_clear_existing_data_arg + kwargs_handler_class["optimised_bucket"] = kwargs.get("optimised_bucket", None) + kwargs_handler_class["source_bucket"] = kwargs.get("source_bucket", None) + # Creating an instance of the specified class with the provided arguments start_whole_processing = timeit.default_timer() with handler_class(**kwargs_handler_class) as handler_instance: handler_instance.to_cloud_optimised(s3_file_uri_list) - cluster_id = handler_instance.cluster_id + if kwargs_handler_class["cluster_mode"].value: + cluster_id = handler_instance.cluster_id + else: + cluster_id = None time_spent_processing = timeit.default_timer() - start_whole_processing logger.info(f"Processed entire dataset in {time_spent_processing}s") diff --git a/aodn_cloud_optimised/lib/GenericZarrHandler.py b/aodn_cloud_optimised/lib/GenericZarrHandler.py index 0cc47cb7..de8466ee 100644 --- a/aodn_cloud_optimised/lib/GenericZarrHandler.py +++ b/aodn_cloud_optimised/lib/GenericZarrHandler.py @@ -277,7 +277,10 @@ def publish_cloud_optimised_fileset_batch(self, s3_file_uri_list): preprocess_xarray, dataset_config=self.dataset_config ) - batch_size = self.get_batch_size(client=self.client) + if self.cluster_mode: + batch_size = self.get_batch_size(client=self.client) + else: + batch_size = 1 for idx, batch_files in enumerate( self.batch_process_fileset(s3_file_handle_list, batch_size=batch_size) @@ -868,14 +871,19 @@ def to_cloud_optimised(self, s3_file_uri_list=None): ) # ensure the list is unique! self.s3_file_uri_list = s3_file_uri_list - # creating a cluster to process multiple files at once - self.client, self.cluster = self.create_cluster() - if self.cluster_mode == "remote": - self.cluster_id = self.cluster.cluster_id - else: - self.cluster_id = self.cluster.name + + if self.cluster_mode: + # creating a cluster to process multiple files at once + self.client, self.cluster = self.create_cluster() + if self.cluster_mode == "remote": + self.cluster_id = self.cluster.cluster_id + else: + self.cluster_id = self.cluster.name + self.publish_cloud_optimised_fileset_batch(s3_file_uri_list) - self.close_cluster(self.client, self.cluster) + + if self.cluster_mode: + self.close_cluster(self.client, self.cluster) @staticmethod def filter_rechunk_dimensions(dimensions): From ee8935f38e9461f21ca219a4d83d1878695e81e1 Mon Sep 17 00:00:00 2001 From: craigrose Date: Wed, 13 Nov 2024 13:36:05 +1100 Subject: [PATCH 2/4] fixed for pre-commit --- aodn_cloud_optimised/lib/CommonHandler.py | 1 - 1 file changed, 1 deletion(-) diff --git a/aodn_cloud_optimised/lib/CommonHandler.py b/aodn_cloud_optimised/lib/CommonHandler.py index eae67ac1..210f7efd 100644 --- a/aodn_cloud_optimised/lib/CommonHandler.py +++ b/aodn_cloud_optimised/lib/CommonHandler.py @@ -100,7 +100,6 @@ def __init__(self, **kwargs): if optimised_bucket: self.optimised_bucket_name = optimised_bucket.bucket_name - self.cloud_optimised_output_path = ( PureS3Path.from_uri(f"s3://{self.optimised_bucket_name}") .joinpath( From 68767bc0f4895b659e2b37b7ab89c6e1be2c47e1 Mon Sep 17 00:00:00 2001 From: craigrose Date: Wed, 13 Nov 2024 14:52:15 +1100 Subject: [PATCH 3/4] fix tests --- .../bin/generic_cloud_optimised_creation.py | 6 ++++-- aodn_cloud_optimised/lib/cluster_lib.py | 11 +++++++++++ test_aodn_cloud_optimised/test_bin_generic.py | 3 ++- 3 files changed, 17 insertions(+), 3 deletions(-) create mode 100644 aodn_cloud_optimised/lib/cluster_lib.py diff --git a/aodn_cloud_optimised/bin/generic_cloud_optimised_creation.py b/aodn_cloud_optimised/bin/generic_cloud_optimised_creation.py index 9b479a9d..46c56815 100644 --- a/aodn_cloud_optimised/bin/generic_cloud_optimised_creation.py +++ b/aodn_cloud_optimised/bin/generic_cloud_optimised_creation.py @@ -38,6 +38,7 @@ import argparse from importlib.resources import files +from aodn_cloud_optimised.lib import cluster_lib from aodn_cloud_optimised.lib.CommonHandler import cloud_optimised_creation from aodn_cloud_optimised.lib.config import ( load_variable_from_config, @@ -91,9 +92,10 @@ def main(): ) parser.add_argument( "--cluster-mode", - default="local", + type=cluster_lib.parse_cluster_mode, + default=None, choices=["local", "remote"], - help="Cluster mode to use. Options: 'local' or 'remote'. Default is 'local'.", + help="Cluster mode to use. Options: 'local' or 'remote'. Default is None.", ) parser.add_argument( diff --git a/aodn_cloud_optimised/lib/cluster_lib.py b/aodn_cloud_optimised/lib/cluster_lib.py new file mode 100644 index 00000000..85948684 --- /dev/null +++ b/aodn_cloud_optimised/lib/cluster_lib.py @@ -0,0 +1,11 @@ +from enum import Enum + + +class ClusterMode(Enum): + LOCAL = "local" + REMOTE = "remote" + NONE = None + + +def parse_cluster_mode(value): + return ClusterMode(value) diff --git a/test_aodn_cloud_optimised/test_bin_generic.py b/test_aodn_cloud_optimised/test_bin_generic.py index 0d13865c..76c1b823 100644 --- a/test_aodn_cloud_optimised/test_bin_generic.py +++ b/test_aodn_cloud_optimised/test_bin_generic.py @@ -12,6 +12,7 @@ from moto.moto_server.threaded_moto_server import ThreadedMotoServer from aodn_cloud_optimised.bin.generic_cloud_optimised_creation import main +from aodn_cloud_optimised.lib.cluster_lib import ClusterMode ROOT_DIR = os.path.dirname(os.path.abspath(__file__)) @@ -128,7 +129,7 @@ def test_main(self, mock_parse_args): dataset_config=DATASET_CONFIG_NC_ACORN_JSON, clear_existing_data=True, force_previous_parquet_deletion=False, - cluster_mode="local", + cluster_mode=ClusterMode.LOCAL, optimised_bucket_name=self.BUCKET_OPTIMISED_NAME, root_prefix_cloud_optimised_path="testing", bucket_raw="imos-data", From a4fdab8f88942983f99f3d48bf4f65180d4b328a Mon Sep 17 00:00:00 2001 From: craigrose Date: Wed, 13 Nov 2024 14:55:48 +1100 Subject: [PATCH 4/4] remove ROOT_PREFIX_CLOUD_OPTIMISED_PATH value --- aodn_cloud_optimised/config/common.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aodn_cloud_optimised/config/common.json b/aodn_cloud_optimised/config/common.json index 4efbab73..77286380 100644 --- a/aodn_cloud_optimised/config/common.json +++ b/aodn_cloud_optimised/config/common.json @@ -1,7 +1,7 @@ { "BUCKET_RAW_DEFAULT": "imos-data", "BUCKET_OPTIMISED_DEFAULT": "aodn-cloud-optimised", - "ROOT_PREFIX_CLOUD_OPTIMISED_PATH": "prefect_testing", + "ROOT_PREFIX_CLOUD_OPTIMISED_PATH": "", "BUCKET_INTEGRATION_TESTING_RAW_DEFAULT": "imos-data", "BUCKET_INTEGRATION_TESTING_OPTIMISED_DEFAULT": "imos-data-lab-optimised", "ROOT_PREFIX_CLOUD_OPTIMISED_INTEGRATION_TESTING_PATH": "cloud_optimised/integration_testing"