create group of n_task before submitting all the tasks -- seems to work for daks2
lbesnard committed Jun 14, 2024
1 parent 799a736 commit 7252697
Showing 4 changed files with 106 additions and 44 deletions.
10 changes: 6 additions & 4 deletions aodn_cloud_optimised/bin/srs_l3s_1d_dn_to_zarr.py
@@ -23,17 +23,19 @@ def main():
)
)
)
# cloud_optimised_creation(
# nc_obj_ls[0], dataset_config=dataset_config, handler_reprocess_arg=True
# )
# Important: with Zarr creation, if we want a cluster to "append" data to a Zarr store that does not exist yet,
# the first file must be converted manually; otherwise all the tasks will think that the dataset doesn't exist
cloud_optimised_creation(
nc_obj_ls[0], dataset_config=dataset_config, handler_reprocess_arg=True
)

# for i in range(10):
# cloud_optimised_creation(
# nc_obj_ls[i+1], dataset_config=dataset_config,
# )

cloud_optimised_creation_loop(
nc_obj_ls[0:150],
nc_obj_ls[1:20],
dataset_config=dataset_config,
)

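The comment above about converting the first file manually reflects how xarray's Zarr writer behaves: an append only succeeds against a store that already exists. A minimal sketch of the write-then-append pattern (the file names, store path, and the `append_dim` name are illustrative assumptions, not taken from this repository):

```python
import xarray as xr

files = ["file_000.nc", "file_001.nc", "file_002.nc"]   # hypothetical input files
store = "s3://example-bucket/srs_l3s_1d_dn.zarr"        # hypothetical target store

# First file: create the store so it exists before any parallel task touches it.
xr.open_dataset(files[0]).to_zarr(store, mode="w", consolidated=True)

# Remaining files: append along the time dimension (assumed to be "TIME" here).
for f in files[1:]:
    xr.open_dataset(f).to_zarr(store, mode="a", append_dim="TIME", consolidated=True)
```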
2 changes: 1 addition & 1 deletion aodn_cloud_optimised/config/dataset/srs_l3s_1d_dn.json
@@ -1,5 +1,5 @@
{
"dataset_name": "srs_l3s_1d_dn",
"dataset_name": "srs_l3s_1d_dn_5",
"logger_name": "srs_l3s_1d_dn",
"cloud_optimised_format": "zarr",
"metadata_uuid": "",
101 changes: 74 additions & 27 deletions aodn_cloud_optimised/lib/CommonHandler.py
@@ -403,8 +403,15 @@ def cloud_optimised_creation_loop(
compute_purchase_option="spot_with_fallback",
)
elif dataset_config.get("cloud_optimised_format") == "zarr":
# cluster = Cluster(
# n_workers=[1,5],
# scheduler_vm_types="c6gn.medium", # t3.small",
# worker_vm_types="c6gn.4xlarge",
# allow_ingress_from="me",
# compute_purchase_option="spot_with_fallback",
# )
cluster = Cluster(
n_workers=1,
n_workers=1,  # haven't managed to use more than one worker successfully without corrupting the Zarr dataset, even when using the Dask distributed lock
scheduler_vm_types="c6gn.medium", # t3.small",
worker_vm_types="c6gn.2xlarge",
allow_ingress_from="me",
@@ -443,6 +450,19 @@ def task(f, i):
except Exception as e:
logger.error(f"{i}/{len(obj_ls)} issue with {f}: {e}")

# Number of tasks to submit at once
n_tasks = 4

# Function to submit tasks in batches
def submit_tasks_in_batches(client, task, obj_ls, n_tasks):
results = []
for i in range(0, len(obj_ls), n_tasks):
batch = obj_ls[i : i + n_tasks]
futures = [client.submit(task, f, i + j) for j, f in enumerate(batch)]
wait(futures) # Wait for the batch to complete
results.extend(client.gather(futures))
return results

client.amm.start()

def wait_for_no_workers(client):
@@ -459,33 +479,60 @@ def wait_for_no_workers(client):
wait(futures)

elif dataset_config.get("cloud_optimised_format") == "zarr":

# TODO: because memory leaks grow over time, it could make sense to define the cluster in this if/elif
# section and recreate it every 50-100 files.
# TODO: the parallelisation has to work like this for now, but eventually the handler class should take
# a list of NetCDF files and do the Dask processing with open_mfdataset and to_zarr. The open question is
# how to download the input data without saturating the disk, especially with to_zarr(compute=False).
# TODO: I tried various things to get multiple workers writing to Zarr. The key requirement is a proper lock on the
# Zarr dataset to avoid corruption when multiple threads write at the same time. I tried a lock
# which seems to work for one worker, but it is not shared amongst workers as the documentation claims.
# I also tried retrieving the scheduler client and passing it as an argument to the task function, but it is not
# possible to serialise a Client() object with pickle or dill and pass it as a parameter, nor to use it
# as a global variable.
# I then tried a custom lock implemented as a Zarr lock file on S3. Realistically, that should have
# worked; not sure why it didn't. It may be worth trying again, as too many things may have changed at once.
# In any case, the first NetCDF file must be converted outside of the loop so that
# the subsequent parallel tasks don't think the dataset is empty.
# TODO: the code seems to work fine in parallel rather than sequentially; however, if too many tasks are submitted
# at once (even ~20), everything becomes very slow and hangs.
submit_tasks_in_batches(client, task, obj_ls, n_tasks)

# Parallel Execution with List Comprehension
# futures = [client.submit(task, f, i) for i, f in enumerate(obj_ls, start=1)]

# Wait for all futures to complete
# wait(futures)

# Submit tasks to the Dask cluster sequentially
for i, f in enumerate(obj_ls, start=1):
client.amm.start()
future = client.submit(task, f, i)
result = future.result() # Sequential Execution with future.result()
wait(future)

trim_future = client.submit(trim_memory)
wait(trim_future)

restart_cluster_every_n = 100
if i % 10 == restart_cluster_every_n:
# Scale down to zero workers
logger.info(
f"Restarting workers after {i} iterations to avoid memory leaks"
)
cluster.scale(0)

# Wait for the workers to be removed
wait_for_no_workers(client)

desired_n_workers = 1
# Scale back up to the desired number of workers
cluster.scale(desired_n_workers)

# Wait for the workers to be ready
client.wait_for_workers(desired_n_workers)
# for i, f in enumerate(obj_ls, start=1):
# client.amm.start()
# future = client.submit(task, f, i)
# result = future.result() # Sequential Execution with future.result()
# wait(future)
#
# trim_future = client.submit(trim_memory)
# wait(trim_future)
#
# restart_cluster_every_n = 100
# if i % 10 == restart_cluster_every_n:
# # Scale down to zero workers
# logger.info(
# f"Restarting workers after {i} iterations to avoid memory leaks"
# )
# cluster.scale(0)
#
# # Wait for the workers to be removed
# wait_for_no_workers(client)
#
# desired_n_workers = 1
# # Scale back up to the desired number of workers
# cluster.scale(desired_n_workers)
#
# # Wait for the workers to be ready
# client.wait_for_workers(desired_n_workers)

time_spent_processing = timeit.default_timer() - start_whole_processing
logger.info(f"Whole dataset completed in {time_spent_processing}s")
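The batched submission added above can be read as a standalone pattern: submit n_tasks futures, wait for them, gather the results, then move on to the next batch, so the number of in-flight Zarr writers stays bounded. A self-contained sketch under those assumptions, with a placeholder task body standing in for cloud_optimised_creation and the scheduler-managed lock named "zarr_lock" as in GenericZarrHandler.py below:

```python
from dask.distributed import Client, Lock, wait


def process_one(path: str, index: int) -> str:
    # Placeholder for the real per-file conversion; the Zarr write is
    # serialised behind a named lock managed by the scheduler.
    with Lock("zarr_lock"):
        pass  # e.g. convert `path` and append it to the Zarr store
    return path


def submit_in_batches(client: Client, paths: list[str], n_tasks: int = 4) -> list[str]:
    results = []
    for start in range(0, len(paths), n_tasks):
        batch = paths[start : start + n_tasks]
        futures = [client.submit(process_one, p, start + j) for j, p in enumerate(batch)]
        wait(futures)  # block until the whole batch has finished
        results.extend(client.gather(futures))
    return results


if __name__ == "__main__":
    client = Client()  # local cluster for illustration; the repository uses a Coiled cluster
    print(submit_in_batches(client, [f"file_{i:03d}.nc" for i in range(10)]))
```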
37 changes: 25 additions & 12 deletions aodn_cloud_optimised/lib/GenericZarrHandler.py
@@ -9,6 +9,8 @@
import s3fs
import xarray as xr
import zarr
from zarr.sync import ThreadSynchronizer

import time

from dask.diagnostics import ProgressBar
@@ -82,7 +84,7 @@ def acquire_dask_lock(self):
if self.dask_client:
lock = Lock(name=lock_name, client=self.dask_client)
lock.acquire(
blocking=True
blocking=False
) # https://docs.python.org/3/library/threading.html#threading.Lock.acquire
# When invoked with the blocking argument set to True (the default), block until the lock is unlocked, then set it to locked and return True.

@@ -98,12 +100,12 @@ def release_lock(self):
is not held, it will log an info message indicating that no lock is held.
"""
if self.lock:
if self.lock.locked():
self.lock.release()
self.logger.info("Lock released.")
self.lock = None
else:
self.logger.info("No lock is held.")
# if self.lock.locked():
self.lock.release()
self.logger.info("Lock released.")
self.lock = None
else:
self.logger.info("No lock is held.")

def check_file_already_processed(self) -> bool:
"""
@@ -352,6 +354,8 @@ def publish_cloud_optimised(self, ds):
compute=self.compute,
consolidated=True,
)
self.logger.info(f"{self.filename}: Written data to new Zarr dataset")

# write_job = write_job.persist()
# distributed.progress(write_job, notebook=False)
self.logger.info(
@@ -382,11 +386,19 @@ def to_cloud_optimised(self):
ds = self.preprocess()

# Attempt to acquire the zarr lock
self.acquire_dask_lock()
# self.acquire_dask_lock()
# Critical section - perform operations protected by the lock
self.publish_cloud_optimised(ds)
try:
lock = Lock(name="zarr_lock", client=get_client())
self.logger.info(f"Get lock from Client {lock}")
with lock:
self.publish_cloud_optimised(ds)
except:
self.logger.info("No existing Dask client to set up a lock")
self.publish_cloud_optimised(ds)

# Release the lock
self.release_lock()
# self.release_lock()

self.push_metadata_aws_registry()

@@ -402,8 +414,9 @@

if "ds" in locals():
self.postprocess(ds)
finally:
self.release_lock()
# finally:
# self.release_lock()
# pass

@staticmethod
def filter_rechunk_dimensions(dimensions):
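The new `from zarr.sync import ThreadSynchronizer` import is not exercised in these hunks. For reference, a synchronizer is normally passed to the Zarr write itself, and it only coordinates threads inside a single process, so it does not replace the distributed Lock used above to coordinate Dask workers. A sketch under those assumptions (the input file, store path, and append dimension are hypothetical):

```python
import xarray as xr
from zarr.sync import ThreadSynchronizer

ds = xr.open_dataset("input.nc")            # hypothetical input file
ds.to_zarr(
    "s3://example-bucket/dataset.zarr",     # hypothetical target store
    mode="a",
    append_dim="TIME",
    synchronizer=ThreadSynchronizer(),      # guards concurrent threads in this process only
    consolidated=True,
)
```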
