From c89d3c09d05baa31fbeb9ce2ca8ad32215311ce1 Mon Sep 17 00:00:00 2001
From: lbesnard
Date: Wed, 12 Jun 2024 16:14:15 +1000
Subject: [PATCH] wip

---
 .../bin/srs_l3s_3d_dn_to_zarr.py              |  38 ++++++
 .../config/dataset/srs_l3s_3d_dn.json         | 128 ++++++++++++++++++
 aodn_cloud_optimised/lib/CommonHandler.py     |  80 +++++++----
 .../lib/GenericZarrHandler.py                 |  69 ++++++----
 poetry.lock                                   |   1 -
 pyproject.toml                                |   1 +
 6 files changed, 266 insertions(+), 51 deletions(-)
 create mode 100755 aodn_cloud_optimised/bin/srs_l3s_3d_dn_to_zarr.py
 create mode 100644 aodn_cloud_optimised/config/dataset/srs_l3s_3d_dn.json

diff --git a/aodn_cloud_optimised/bin/srs_l3s_3d_dn_to_zarr.py b/aodn_cloud_optimised/bin/srs_l3s_3d_dn_to_zarr.py
new file mode 100755
index 0000000..3f3a09c
--- /dev/null
+++ b/aodn_cloud_optimised/bin/srs_l3s_3d_dn_to_zarr.py
@@ -0,0 +1,38 @@
+#!/usr/bin/env python3
+import importlib.resources
+
+from aodn_cloud_optimised.lib.CommonHandler import (
+    cloud_optimised_creation_loop,
+    cloud_optimised_creation,
+)
+from aodn_cloud_optimised.lib.config import (
+    load_variable_from_config,
+    load_dataset_config,
+)
+from aodn_cloud_optimised.lib.s3Tools import s3_ls
+
+
+def main():
+    BUCKET_RAW_DEFAULT = load_variable_from_config("BUCKET_RAW_DEFAULT")
+
+    dataset_config = load_dataset_config(
+        str(
+            importlib.resources.path(
+                "aodn_cloud_optimised.config.dataset", "srs_l3s_3d_dn.json"
+            )
+        )
+    )
+    nc_obj_ls = s3_ls(BUCKET_RAW_DEFAULT, "IMOS/SRS/SST/ghrsst/L3S-3d/dn/")
+
+    cloud_optimised_creation(
+        nc_obj_ls[0], dataset_config=dataset_config, handler_reprocess_arg=True
+    )
+
+    cloud_optimised_creation_loop(
+        nc_obj_ls[1:],
+        dataset_config=dataset_config,
+    )
+
+
+if __name__ == "__main__":
+    main()
diff --git a/aodn_cloud_optimised/config/dataset/srs_l3s_3d_dn.json b/aodn_cloud_optimised/config/dataset/srs_l3s_3d_dn.json
new file mode 100644
index 0000000..a8b0ae5
--- /dev/null
+++ b/aodn_cloud_optimised/config/dataset/srs_l3s_3d_dn.json
@@ -0,0 +1,128 @@
+{
+  "dataset_name": "srs_l3s_3d_dn",
+  "logger_name": "srs_l3s_3d_dn",
+  "cloud_optimised_format": "zarr",
+  "metadata_uuid": "",
+  "dimensions": {
+    "time": {"name": "time",
+             "chunk": 10,
+             "rechunk": true},
+    "latitude": {"name": "lat",
+                 "chunk": 1000},
+    "longitude": {"name": "lon",
+                  "chunk": 1000}
+  },
+  "var_template_shape": "sea_surface_temperature",
+  "vars_to_drop_no_common_dimension": ["lat", "lon"],
+  "schema": {
+    "lon": {"type": "float32"},
+    "lat": {"type": "float32"},
+    "time": {"type": "datetime64[ns]"},
+    "sea_surface_temperature": {"type": "float32"},
+    "sea_surface_temperature_day_night": {"type": "float32"},
+    "sst_dtime": {"type": "float64"},
+    "dt_analysis": {"type": "float32"},
+    "wind_speed": {"type": "float32"},
+    "wind_speed_dtime_from_sst": {"type": "float32"},
+    "sea_ice_fraction": {"type": "float32"},
+    "sea_ice_fraction_dtime_from_sst": {"type": "float32"},
+    "satellite_zenith_angle": {"type": "float32"},
+    "l2p_flags": {"type": "float32"},
+    "quality_level": {"type": "float32"},
+    "sses_bias": {"type": "float32"},
+    "sses_standard_deviation": {"type": "float32"},
+    "sses_count": {"type": "float32"},
+    "sst_count": {"type": "float32"},
+    "sst_mean": {"type": "float32"},
+    "sst_standard_deviation": {"type": "float32"}
+  },
+  "dataset_gattrs": {
+    "title": "Temperature logger"
+  },
+  "aws_opendata_registry": {
+    "Name": "",
+    "Description": "",
+    "Documentation": "",
+    "Contact": "",
+    "ManagedBy": "",
+    "UpdateFrequency": "",
+    "Tags": [],
+    "License": "",
+    "Resources": [
+      {
+        "Description": "",
+        "ARN": "",
"Region": "", + "Type": "", + "Explore": [] + }, + { + "Description": "", + "ARN": "", + "Region": "", + "Type": "" + }, + { + "Description": "", + "ARN": "", + "Region": "", + "Type": "" + }, + { + "Description": "", + "ARN": "", + "Region": "", + "Type": "" + } + ], + "DataAtWork": { + "Tutorials": [ + { + "Title": "", + "URL": "", + "Services": "", + "AuthorName": "", + "AuthorURL": "" + }, + { + "Title": "", + "URL": "", + "AuthorName": "", + "AuthorURL": "" + }, + { + "Title": "", + "URL": "", + "AuthorName": "", + "AuthorURL": "" + } + ], + "Tools & Applications": [ + { + "Title": "", + "URL": "", + "AuthorName": "", + "AuthorURL": "" + }, + { + "Title": "", + "URL": "", + "AuthorName": "", + "AuthorURL": "" + } + ], + "Publications": [ + { + "Title": "", + "URL": "", + "AuthorName": "" + }, + { + "Title": "", + "URL": "", + "AuthorName": "" + } + ] + } + } +} diff --git a/aodn_cloud_optimised/lib/CommonHandler.py b/aodn_cloud_optimised/lib/CommonHandler.py index 50e9fd5..b236dd8 100644 --- a/aodn_cloud_optimised/lib/CommonHandler.py +++ b/aodn_cloud_optimised/lib/CommonHandler.py @@ -1,5 +1,7 @@ +import ctypes import os import tempfile +import time import timeit from typing import List @@ -285,6 +287,11 @@ def cloud_optimised_creation(obj_key: str, dataset_config, **kwargs) -> None: handler_instance.to_cloud_optimised() +def trim_memory() -> int: + libc = ctypes.CDLL("libc.so.6") + return libc.malloc_trim(0) + + def cloud_optimised_creation_loop( obj_ls: List[str], dataset_config: dict, **kwargs ) -> None: @@ -338,17 +345,19 @@ def cloud_optimised_creation_loop( if dataset_config.get("cloud_optimised_format") == "parquet": cluster = Cluster( - n_workers=[0, 12], + n_workers=[1, 12], scheduler_vm_types="t3.small", worker_vm_types="t3.medium", allow_ingress_from="me", + compute_purchase_option="spot_with_fallback", ) elif dataset_config.get("cloud_optimised_format") == "zarr": cluster = Cluster( n_workers=1, - scheduler_vm_types="t3.medium", - worker_vm_types="m6i.xlarge", + scheduler_vm_types="t3.small", + worker_vm_types="c6i.xlarge", allow_ingress_from="me", + compute_purchase_option="spot_with_fallback", ) client = Client(cluster) @@ -363,6 +372,8 @@ def cloud_optimised_creation_loop( start_whole_processing = timeit.default_timer() + client.amm.start() # Start Active Memory Manager + # Define the task function to be executed in parallel def task(f, i): start_time = timeit.default_timer() @@ -381,26 +392,49 @@ def task(f, i): except Exception as e: logger.error(f"{i}/{len(obj_ls)} issue with {f}: {e}") - # Parallel Execution with List Comprehension - futures = [client.submit(task, f, i) for i, f in enumerate(obj_ls, start=1)] - - # Wait for all futures to complete - wait(futures) - - # #Submit tasks to the Dask cluster - # if dataset_config.get("cloud_optimised_format") == 'parquet': - # - # # Parallel Execution with List Comprehension - # futures = [client.submit(task, f, i) for i, f in enumerate(obj_ls, start=1)] - # - # # Wait for all futures to complete - # wait(futures) - # - # elif dataset_config.get("cloud_optimised_format") == 'zarr': - # # Submit tasks to the Dask cluster sequentially - # for i, f in enumerate(obj_ls, start=1): - # future = client.submit(task, f, i) - # result = future.result() # Sequential Execution with future.result() + client.amm.start() + + def wait_for_no_workers(client): + while len(client.scheduler_info()["workers"]) > 0: + time.sleep(1) + + # Submit tasks to the Dask cluster + if dataset_config.get("cloud_optimised_format") == "parquet": + + # 
+        # Parallel Execution with List Comprehension
+        futures = [client.submit(task, f, i) for i, f in enumerate(obj_ls, start=1)]
+
+        # Wait for all futures to complete
+        wait(futures)
+
+    elif dataset_config.get("cloud_optimised_format") == "zarr":
+        # Submit tasks to the Dask cluster sequentially
+        for i, f in enumerate(obj_ls, start=1):
+            client.amm.start()
+            future = client.submit(task, f, i)
+            result = future.result()  # Sequential Execution with future.result()
+            wait(future)
+
+            trim_future = client.submit(trim_memory)
+            wait(trim_future)
+
+            restart_cluster_every_n = 100
+            if i % restart_cluster_every_n == 0:
+                # Scale down to zero workers
+                logger.info(
+                    f"Restarting workers after {i} iterations to avoid memory leaks"
+                )
+                cluster.scale(0)
+
+                # Wait for the workers to be removed
+                wait_for_no_workers(client)
+
+                desired_n_workers = 1
+                # Scale back up to the desired number of workers
+                cluster.scale(desired_n_workers)
+
+                # Wait for the workers to be ready
+                client.wait_for_workers(desired_n_workers)
 
     time_spent_processing = timeit.default_timer() - start_whole_processing
     logger.info(f"Whole dataset completed in {time_spent_processing}s")
diff --git a/aodn_cloud_optimised/lib/GenericZarrHandler.py b/aodn_cloud_optimised/lib/GenericZarrHandler.py
index b3c0615..657c6be 100644
--- a/aodn_cloud_optimised/lib/GenericZarrHandler.py
+++ b/aodn_cloud_optimised/lib/GenericZarrHandler.py
@@ -9,12 +9,10 @@
 import s3fs
 import xarray as xr
 import zarr
+import time
 
-# from dask import distributed
 from dask.diagnostics import ProgressBar
-from dask.distributed import worker_client
-from dask.distributed import Client
-from dask.distributed import Lock, get_client
+from dask.distributed import Client, Lock, wait, get_client, worker_client
 
 from rechunker import rechunk
 
@@ -59,21 +57,48 @@ def __init__(self, **kwargs):
             self.dimensions["longitude"]["name"]: self.dimensions["longitude"]["chunk"],
         }
 
+    def acquire_dask_lock(self):
+        """
+        Acquire a Dask distributed lock to ensure exclusive access to a shared resource.
+
+        This method attempts to acquire a named Dask lock to prevent concurrent access to a
+        shared resource, such as writing to a Zarr dataset. If a Dask client is available,
+        it will obtain the lock, blocking if necessary until the lock becomes available.
+        If no Dask client is available, it will set the lock attribute to None and log a warning.
+        """
+        lock_name = "zarr_write_lock"
+        # Attempt to get the Dask client
         try:
             dask_client = get_client()  # Get the Dask client
         except ValueError:
             dask_client = None  # Set to None if no Dask client is found
-
-        if dask_client:
-            self.lock = (
-                Lock()
-            )  # Create a Dask Lock object, to avoid corruption of writting zarr objects in parallel
-            self.logger.info("Setting up a Cluster Dask Lock to avoid Zarr corruption")
-        else:
             self.lock = None  # Set to None if no Dask cluster is found
             self.logger.warning("No cluster lock to setup")
 
+        if dask_client:
+            lock = Lock(lock_name)
+            lock.acquire()  # https://docs.python.org/3/library/threading.html#threading.Lock.acquire
+            # When invoked with the blocking argument set to True (the default), block until the
+            # lock is unlocked, then set it to locked and return True.
+
+            self.logger.info(f"Lock '{lock_name}' acquired successfully.")
+            self.lock = lock
+
+    def release_lock(self):
+        """
+        Release the currently held Dask lock.
+
+        This method releases the Dask lock previously acquired by the `acquire_dask_lock` method.
+        If the lock is held, it will be released and an info message will be logged.
+        If the lock is not held, it will log an info message indicating that no lock is held.
+        """
+        if self.lock:
+            if self.lock.locked():
+                self.lock.release()
+                self.logger.info("Lock released.")
+            else:
+                self.logger.info("No lock is held.")
+
     def check_file_already_processed(self) -> bool:
         """
         Check whether a NetCDF file has been previously processed and integrated into an existing Zarr dataset.
@@ -258,7 +283,6 @@ def publish_cloud_optimised(self, ds):
 
         ds = ds.chunk(chunks=self.chunks)
 
-        # Acquire the lock before writing to the Zarr dataset
         # first file of the dataset (overwrite)
         if self.reprocess:
             self.logger.warning(
@@ -338,21 +362,12 @@ def to_cloud_optimised(self):
 
         try:
             ds = self.preprocess()
 
-            # Attempt to acquire the lock
-            if self.lock:
-                if self.lock.acquire(blocking=False):
-                    try:
-                        # Critical section - perform operations protected by the lock
-                        self.publish_cloud_optimised(ds)
-                    finally:
-                        # Release the lock in a finally block to ensure it's always released
-                        self.lock.release()
-                else:
-                    # The lock is already held by another process, handle accordingly
-                    self.logger.warning("Lock is already held by another process")
-            else:
-                # No Dask cluster lock available, proceed without locking
-                self.publish_cloud_optimised(ds)
+            # Attempt to acquire the zarr lock
+            self.acquire_dask_lock()
+            # Critical section - perform operations protected by the lock
+            self.publish_cloud_optimised(ds)
+            # Release the lock
+            self.release_lock()
 
             self.push_metadata_aws_registry()
 
diff --git a/poetry.lock b/poetry.lock
index 1982fb5..8b6d250 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -517,7 +517,6 @@ files = [
     {file = "cftime-1.6.4-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:25f043703e785de0bd7cd8222c0a53317e9aeb3dfc062588b05e6f3ebb007468"},
     {file = "cftime-1.6.4-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:f9acc272df1022f24fe7dbe9de43fa5d8271985161df14549e4d8d28c90dc9ea"},
     {file = "cftime-1.6.4-cp39-cp39-win_amd64.whl", hash = "sha256:e8467b6fbf8dbfe0be8c04d61180765fdd3b9ab0fe51313a0bbf87e63634a3d8"},
-    {file = "cftime-1.6.4.tar.gz", hash = "sha256:e325406193758a7ed67308deb52e727782a19e384e183378e7ff62098be0aedc"},
 ]
 
 [package.dependencies]
diff --git a/pyproject.toml b/pyproject.toml
index 8f743a6..db9d4ed 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -71,6 +71,7 @@ gsla_nrt_to_zarr = "aodn_cloud_optimised.bin.gsla_nrt_to_zarr:main"
 soop_xbt_nrt_to_parquet = "aodn_cloud_optimised.bin.soop_xbt_nrt_to_parquet:main"
 srs_oc_ljco_to_parquet = "aodn_cloud_optimised.bin.srs_oc_ljco_to_parquet:main"
 srs_l3s_1d_dn_to_zarr = "aodn_cloud_optimised.bin.srs_l3s_1d_dn_to_zarr:main"
+srs_l3s_3d_dn_to_zarr = "aodn_cloud_optimised.bin.srs_l3s_3d_dn_to_zarr:main"
 
 #[tool.poetry.include]
 #data = ["aodn_cloud_optimised/config/*.json", "aodn_cloud_optimised/config/dataset/*.json"]
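
Note on the new lock helpers: acquire_dask_lock() blocks on a named distributed lock and release_lock() frees it, but in to_cloud_optimised() the release only runs if publish_cloud_optimised() returns. A minimal sketch of the same pattern with the release guarded by try/finally; the handler and ds names are placeholders, not part of the patch:

    # Sketch only: the named Dask lock used in this patch, with release in a
    # finally block so a failed write cannot leave "zarr_write_lock" held.
    from dask.distributed import Lock

    def write_with_lock(handler, ds):
        lock = Lock("zarr_write_lock")  # same lock name as acquire_dask_lock()
        lock.acquire()                  # blocks until no other worker holds it
        try:
            handler.publish_cloud_optimised(ds)  # critical section: one writer at a time
        finally:
            lock.release()              # released even if the Zarr write raises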
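
On trim_memory(): glibc's allocator tends to keep freed memory mapped inside the worker process, so long sequential Zarr runs look like they leak; malloc_trim(0) asks glibc to return that memory to the OS. A sketch, assuming Linux/glibc workers; note that client.run() reaches every worker, whereas client.submit(trim_memory) as used in the loop above only lands on whichever worker picks up that one task:

    # Sketch, Linux/glibc only: release retained allocator memory on workers.
    import ctypes

    def trim_memory() -> int:
        libc = ctypes.CDLL("libc.so.6")
        return libc.malloc_trim(0)  # 1 if memory was returned to the OS, 0 otherwise

    # client.run(trim_memory)  # runs on every connected worker, not just one task slot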
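
The "dimensions" block in srs_l3s_3d_dn.json is what becomes the chunk sizes applied before the Zarr write (ds.chunk(chunks=self.chunks) in publish_cloud_optimised). A rough sketch of that mapping, with a hypothetical local granule path:

    # Sketch: the handler builds {dimension name -> chunk size} from the dataset config.
    import xarray as xr

    dimensions = {
        "time": {"name": "time", "chunk": 10, "rechunk": True},
        "latitude": {"name": "lat", "chunk": 1000},
        "longitude": {"name": "lon", "chunk": 1000},
    }
    chunks = {d["name"]: d["chunk"] for d in dimensions.values()}
    # chunks == {"time": 10, "lat": 1000, "lon": 1000}

    ds = xr.open_dataset("l3s_3d_dn_granule.nc")  # hypothetical local copy of one input file
    ds = ds.chunk(chunks=chunks)                  # mirrors publish_cloud_optimised()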