wip
lbesnard committed Jun 12, 2024
1 parent e183282 commit c89d3c0
Showing 6 changed files with 266 additions and 51 deletions.
38 changes: 38 additions & 0 deletions aodn_cloud_optimised/bin/srs_l3s_3d_dn_to_zarr.py
@@ -0,0 +1,38 @@
#!/usr/bin/env python3
import importlib.resources

from aodn_cloud_optimised.lib.CommonHandler import (
    cloud_optimised_creation_loop,
    cloud_optimised_creation,
)
from aodn_cloud_optimised.lib.config import (
    load_variable_from_config,
    load_dataset_config,
)
from aodn_cloud_optimised.lib.s3Tools import s3_ls


def main():
    BUCKET_RAW_DEFAULT = load_variable_from_config("BUCKET_RAW_DEFAULT")

    dataset_config = load_dataset_config(
        str(
            importlib.resources.path(
                "aodn_cloud_optimised.config.dataset", "srs_l3s_3d_dn.json"
            )
        )
    )
    nc_obj_ls = s3_ls(BUCKET_RAW_DEFAULT, "IMOS/SRS/SST/ghrsst/L3S-3d/dn/")

    # First object is processed with reprocessing enabled so the Zarr store is
    # (re)created from scratch; the remaining objects are then appended in the loop.
    cloud_optimised_creation(
        nc_obj_ls[0], dataset_config=dataset_config, handler_reprocess_arg=True
    )

    cloud_optimised_creation_loop(
        nc_obj_ls[1:],
        dataset_config=dataset_config,
    )


if __name__ == "__main__":
    main()
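Once the script has run, the dataset can be consumed straight from object storage as a Zarr store. The sketch below is illustrative only and is not part of the commit: the bucket and key are hypothetical placeholders, since the actual destination is built from the optimised-bucket configuration and the dataset name in srs_l3s_3d_dn.json.

import xarray as xr

# Hypothetical output location; the real path comes from the project configuration.
store_url = "s3://example-optimised-bucket/zarr/srs_l3s_3d_dn.zarr"

ds = xr.open_zarr(
    store_url,
    consolidated=True,
    storage_options={"anon": True},  # assumes anonymous read access
)
print(ds["sea_surface_temperature"].sizes)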
128 changes: 128 additions & 0 deletions aodn_cloud_optimised/config/dataset/srs_l3s_3d_dn.json
@@ -0,0 +1,128 @@
{
  "dataset_name": "srs_l3s_3d_dn",
  "logger_name": "srs_l3s_3d_dn",
  "cloud_optimised_format": "zarr",
  "metadata_uuid": "",
  "dimensions": {
    "time": {"name": "time", "chunk": 10, "rechunk": true},
    "latitude": {"name": "lat", "chunk": 1000},
    "longitude": {"name": "lon", "chunk": 1000}
  },
  "var_template_shape": "sea_surface_temperature",
  "vars_to_drop_no_common_dimension": ["lat", "lon"],
  "schema": {
    "lon": {"type": "float32"},
    "lat": {"type": "float32"},
    "time": {"type": "datetime64[ns]"},
    "sea_surface_temperature": {"type": "float32"},
    "sea_surface_temperature_day_night": {"type": "float32"},
    "sst_dtime": {"type": "float64"},
    "dt_analysis": {"type": "float32"},
    "wind_speed": {"type": "float32"},
    "wind_speed_dtime_from_sst": {"type": "float32"},
    "sea_ice_fraction": {"type": "float32"},
    "sea_ice_fraction_dtime_from_sst": {"type": "float32"},
    "satellite_zenith_angle": {"type": "float32"},
    "l2p_flags": {"type": "float32"},
    "quality_level": {"type": "float32"},
    "sses_bias": {"type": "float32"},
    "sses_standard_deviation": {"type": "float32"},
    "sses_count": {"type": "float32"},
    "sst_count": {"type": "float32"},
    "sst_mean": {"type": "float32"},
    "sst_standard_deviation": {"type": "float32"}
  },
  "dataset_gattrs": {
    "title": "Temperature logger"
  },
  "aws_opendata_registry": {
    "Name": "",
    "Description": "",
    "Documentation": "",
    "Contact": "",
    "ManagedBy": "",
    "UpdateFrequency": "",
    "Tags": [],
    "License": "",
    "Resources": [
      {
        "Description": "",
        "ARN": "",
        "Region": "",
        "Type": "",
        "Explore": []
      },
      {
        "Description": "",
        "ARN": "",
        "Region": "",
        "Type": ""
      },
      {
        "Description": "",
        "ARN": "",
        "Region": "",
        "Type": ""
      },
      {
        "Description": "",
        "ARN": "",
        "Region": "",
        "Type": ""
      }
    ],
    "DataAtWork": {
      "Tutorials": [
        {
          "Title": "",
          "URL": "",
          "Services": "",
          "AuthorName": "",
          "AuthorURL": ""
        },
        {
          "Title": "",
          "URL": "",
          "AuthorName": "",
          "AuthorURL": ""
        },
        {
          "Title": "",
          "URL": "",
          "AuthorName": "",
          "AuthorURL": ""
        }
      ],
      "Tools & Applications": [
        {
          "Title": "",
          "URL": "",
          "AuthorName": "",
          "AuthorURL": ""
        },
        {
          "Title": "",
          "URL": "",
          "AuthorName": "",
          "AuthorURL": ""
        }
      ],
      "Publications": [
        {
          "Title": "",
          "URL": "",
          "AuthorName": ""
        },
        {
          "Title": "",
          "URL": "",
          "AuthorName": ""
        }
      ]
    }
  }
}
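The "dimensions" block above is what drives the Zarr chunking: each entry maps a logical dimension to its NetCDF variable name and target chunk size. Below is a minimal sketch of that mapping, mirroring the self.chunks construction partially visible in the GenericZarrHandler.py diff further down; the local file path is assumed for illustration, whereas the real code goes through load_dataset_config.

import json

with open("srs_l3s_3d_dn.json") as f:  # assumed local copy of the config above
    dataset_config = json.load(f)

dims = dataset_config["dimensions"]
chunks = {
    dims["time"]["name"]: dims["time"]["chunk"],
    dims["latitude"]["name"]: dims["latitude"]["chunk"],
    dims["longitude"]["name"]: dims["longitude"]["chunk"],
}
print(chunks)  # {'time': 10, 'lat': 1000, 'lon': 1000}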
80 changes: 57 additions & 23 deletions aodn_cloud_optimised/lib/CommonHandler.py
@@ -1,5 +1,7 @@
import ctypes
import os
import tempfile
import time
import timeit
from typing import List

@@ -285,6 +287,11 @@ def cloud_optimised_creation(obj_key: str, dataset_config, **kwargs) -> None:
    handler_instance.to_cloud_optimised()


def trim_memory() -> int:
    libc = ctypes.CDLL("libc.so.6")
    return libc.malloc_trim(0)


def cloud_optimised_creation_loop(
    obj_ls: List[str], dataset_config: dict, **kwargs
) -> None:
@@ -338,17 +345,19 @@ def cloud_optimised_creation_loop(

    if dataset_config.get("cloud_optimised_format") == "parquet":
        cluster = Cluster(
            n_workers=[0, 12],
            n_workers=[1, 12],
            scheduler_vm_types="t3.small",
            worker_vm_types="t3.medium",
            allow_ingress_from="me",
            compute_purchase_option="spot_with_fallback",
        )
    elif dataset_config.get("cloud_optimised_format") == "zarr":
        cluster = Cluster(
            n_workers=1,
            scheduler_vm_types="t3.medium",
            worker_vm_types="m6i.xlarge",
            scheduler_vm_types="t3.small",
            worker_vm_types="c6i.xlarge",
            allow_ingress_from="me",
            compute_purchase_option="spot_with_fallback",
        )

    client = Client(cluster)
@@ -363,6 +372,8 @@ def cloud_optimised_creation_loop(

    start_whole_processing = timeit.default_timer()

    client.amm.start()  # Start Active Memory Manager

    # Define the task function to be executed in parallel
    def task(f, i):
        start_time = timeit.default_timer()
@@ -381,26 +392,49 @@ def task(f, i):
        except Exception as e:
            logger.error(f"{i}/{len(obj_ls)} issue with {f}: {e}")

    # Parallel Execution with List Comprehension
    futures = [client.submit(task, f, i) for i, f in enumerate(obj_ls, start=1)]

    # Wait for all futures to complete
    wait(futures)

    # #Submit tasks to the Dask cluster
    # if dataset_config.get("cloud_optimised_format") == 'parquet':
    #
    #     # Parallel Execution with List Comprehension
    #     futures = [client.submit(task, f, i) for i, f in enumerate(obj_ls, start=1)]
    #
    #     # Wait for all futures to complete
    #     wait(futures)
    #
    # elif dataset_config.get("cloud_optimised_format") == 'zarr':
    #     # Submit tasks to the Dask cluster sequentially
    #     for i, f in enumerate(obj_ls, start=1):
    #         future = client.submit(task, f, i)
    #         result = future.result()  # Sequential Execution with future.result()
    client.amm.start()

    def wait_for_no_workers(client):
        while len(client.scheduler_info()["workers"]) > 0:
            time.sleep(1)

    # Submit tasks to the Dask cluster
    if dataset_config.get("cloud_optimised_format") == "parquet":

        # Parallel Execution with List Comprehension
        futures = [client.submit(task, f, i) for i, f in enumerate(obj_ls, start=1)]

        # Wait for all futures to complete
        wait(futures)

    elif dataset_config.get("cloud_optimised_format") == "zarr":
        # Submit tasks to the Dask cluster sequentially
        for i, f in enumerate(obj_ls, start=1):
            client.amm.start()
            future = client.submit(task, f, i)
            result = future.result()  # Sequential Execution with future.result()
            wait(future)

            trim_future = client.submit(trim_memory)
            wait(trim_future)

            restart_cluster_every_n = 100
            if i % restart_cluster_every_n == 0:
                # Scale down to zero workers
                logger.info(
                    f"Restarting workers after {i} iterations to avoid memory leaks"
                )
                cluster.scale(0)

                # Wait for the workers to be removed
                wait_for_no_workers(client)

                desired_n_workers = 1
                # Scale back up to the desired number of workers
                cluster.scale(desired_n_workers)

                # Wait for the workers to be ready
                client.wait_for_workers(desired_n_workers)

    time_spent_processing = timeit.default_timer() - start_whole_processing
    logger.info(f"Whole dataset completed in {time_spent_processing}s")
69 changes: 42 additions & 27 deletions aodn_cloud_optimised/lib/GenericZarrHandler.py
@@ -9,12 +9,10 @@
import s3fs
import xarray as xr
import zarr
import time

# from dask import distributed
from dask.diagnostics import ProgressBar
from dask.distributed import worker_client
from dask.distributed import Client
from dask.distributed import Lock, get_client
from dask.distributed import Client, Lock, wait, get_client, worker_client

from rechunker import rechunk

@@ -59,21 +57,48 @@ def __init__(self, **kwargs):
self.dimensions["longitude"]["name"]: self.dimensions["longitude"]["chunk"],
}

    def acquire_dask_lock(self):
        """
        Acquire a Dask distributed lock to ensure exclusive access to a shared resource.

        This method attempts to acquire a named Dask lock to prevent concurrent access to a
        shared resource, such as writing to a Zarr dataset. If a Dask client is available,
        it will obtain the lock, blocking if necessary until the lock becomes available.
        If no Dask client is available, it will set the lock attribute to None and log a warning.
        """
        lock_name = "zarr_write_lock"

        # Attempt to get the Dask client
        try:
            dask_client = get_client()  # Get the Dask client
        except ValueError:
            dask_client = None  # Set to None if no Dask client is found

        if dask_client:
            self.lock = (
                Lock()
            )  # Create a Dask Lock object, to avoid corruption when writing zarr objects in parallel
            self.logger.info("Setting up a Cluster Dask Lock to avoid Zarr corruption")
        else:
            self.lock = None  # Set to None if no Dask cluster is found
            self.logger.warning("No cluster lock to set up")

        if dask_client:
            lock = Lock(lock_name)
            # Acquire with blocking=True (the default): block until the lock is free,
            # then take it and return True.
            # https://docs.python.org/3/library/threading.html#threading.Lock.acquire
            lock.acquire()

            self.logger.info(f"Lock '{lock_name}' acquired successfully.")
            self.lock = lock

    def release_lock(self):
        """
        Release the currently held Dask lock.

        This method releases the Dask lock previously acquired by the `acquire_dask_lock` method.
        If the lock is held, it will be released and an info message will be logged. If the lock
        is not held, it will log an info message indicating that no lock is held.
        """
        if self.lock:
            if self.lock.locked():
                self.lock.release()
                self.logger.info("Lock released.")
            else:
                self.logger.info("No lock is held.")

    def check_file_already_processed(self) -> bool:
        """
        Check whether a NetCDF file has been previously processed and integrated into an existing Zarr dataset.
@@ -258,7 +283,6 @@ def publish_cloud_optimised(self, ds):

        ds = ds.chunk(chunks=self.chunks)

        # Acquire the lock before writing to the Zarr dataset
        # first file of the dataset (overwrite)
        if self.reprocess:
            self.logger.warning(
@@ -338,21 +362,12 @@ def to_cloud_optimised(self):
        try:
            ds = self.preprocess()

            # Attempt to acquire the lock
            if self.lock:
                if self.lock.acquire(blocking=False):
                    try:
                        # Critical section - perform operations protected by the lock
                        self.publish_cloud_optimised(ds)
                    finally:
                        # Release the lock in a finally block to ensure it's always released
                        self.lock.release()
                else:
                    # The lock is already held by another process, handle accordingly
                    self.logger.warning("Lock is already held by another process")
            else:
                # No Dask cluster lock available, proceed without locking
                self.publish_cloud_optimised(ds)
            # Attempt to acquire the zarr lock
            self.acquire_dask_lock()
            # Critical section - perform operations protected by the lock
            self.publish_cloud_optimised(ds)
            # Release the lock
            self.release_lock()

            self.push_metadata_aws_registry()

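The acquire_dask_lock/release_lock pair above serialises Zarr writes across the cluster through a single named distributed lock. Below is a minimal sketch of the same idea, written with Dask's Lock as a context manager; the helper name and the write_fn callable are illustrative only, not part of the handler's API.

from dask.distributed import Lock, get_client


def write_zarr_region_safely(write_fn):
    # Run write_fn under a cluster-wide named lock when a Dask client is available,
    # otherwise fall back to an unlocked write (mirrors the "no cluster" branch above).
    try:
        get_client()
    except ValueError:
        return write_fn()

    with Lock("zarr_write_lock"):  # blocks until the named lock is free, releases on exit
        return write_fn()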