
Commit

Working state for both Zarr and Parquet on Dask, remotely and on a local cluster. The next commit will remove the commented-out functions.
lbesnard committed Jun 28, 2024
1 parent ac9a769 commit b6b5bff
Showing 5 changed files with 218 additions and 114 deletions.
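
In short, this change stops storing the Dask client and cluster as instance attributes: create_cluster() now returns the (client, cluster) pair and close_cluster() receives them as arguments, so handler methods stay serializable when submitted to the cluster. A minimal sketch of the resulting lifecycle, assuming a hypothetical handler subclass and file-processing callable (neither appears verbatim in the diff below):

from dask.distributed import wait

# Sketch only: SomeCloudOptimisedHandler and process_one_file are hypothetical
# stand-ins; the create_cluster()/close_cluster() signatures follow the diff below.
handler = SomeCloudOptimisedHandler(dataset_config=dataset_config)

# create_cluster() returns the pair instead of setting self.client / self.cluster,
# which keeps the handler serializable for client.submit(...) calls.
client, cluster = handler.create_cluster()
try:
    futures = [client.submit(process_one_file, uri) for uri in s3_file_uri_list]
    wait(futures)
finally:
    # close_cluster() now takes the client and cluster it should shut down.
    handler.close_cluster(client, cluster)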
41 changes: 40 additions & 1 deletion aodn_cloud_optimised/config/dataset/anmn_hourly_timeseries.json
@@ -5,7 +5,7 @@
"cluster_options" : {
"n_workers": [1, 20],
"scheduler_vm_types": "t3.small",
"worker_vm_types": "t3.small",
"worker_vm_types": "t3.large",
"allow_ingress_from": "me",
"compute_purchase_option": "spot_with_fallback",
"worker_options": {
@@ -447,6 +447,45 @@
"long_name": "std data value in the bin, after rejection of flagged data",
"cell_methods": "TIME:std"
},
"DOX1_count": {
"type": "float",
"standard_name": "mole_concentration_of_dissolved_molecular_oxygen_in_sea_water number_of_observations",
"units": "1",
"long_name": "count data value in the bin, after rejection of flagged data",
"cell_methods": "TIME:count"
},
"DOX1_max": {
"type": "float",
"units": "umol l-1",
"standard_name": "mole_concentration_of_dissolved_molecular_oxygen_in_sea_water",
"long_name": "max data value in the bin, after rejection of flagged data",
"cell_methods": "TIME:max"
},
"DOX1_min": {
"type": "float",
"units": "umol l-1",
"standard_name": "mole_concentration_of_dissolved_molecular_oxygen_in_sea_water",
"long_name": "min data value in the bin, after rejection of flagged data",
"cell_methods": "TIME:min"
},
"DOX1_std": {
"type": "float",
"units": "umol l-1",
"standard_name": "mole_concentration_of_dissolved_molecular_oxygen_in_sea_water",
"long_name": "std data value in the bin, after rejection of flagged data",
"cell_methods": "TIME:std"
},
"DOX2": {
"type": "float",
"ancillary_variables": "DOX2_min DOX2_max DOX2_std DOX2_count",
"comment": "Originally expressed in ml/l, assuming 1ml/l = 44.660umol/l and using density computed from Temperature, Salinity and Pressure with the CSIRO SeaWater library (EOS-80) v1.1.",
"long_name": "mean moles_of_oxygen_per_unit_mass_in_sea_water",
"standard_name": "moles_of_oxygen_per_unit_mass_in_sea_water",
"units": "umol kg-1",
"valid_max": 1000.0,
"valid_min": 0.0,
"cell_methods": "TIME:mean (interval: 1 hr comment: time mid point)"
},
"timestamp": {
"type": "int64"
},
145 changes: 72 additions & 73 deletions aodn_cloud_optimised/lib/CommonHandler.py
@@ -18,7 +18,8 @@
from dask.distributed import LocalCluster
from .config import load_variable_from_config, load_dataset_config
from .logging import get_logger
from aodn_cloud_optimised.lib.s3Tools import prefix_exists
from aodn_cloud_optimised.lib.s3Tools import prefix_exists, create_fileset

import s3fs


@@ -60,7 +61,7 @@ def __init__(self, **kwargs):

if self.cluster_mode not in valid_clusters:
raise ValueError(
f"Invalid cluster value: {self.cluster}. Valid values are {valid_clusters}"
f"Invalid cluster value: {self.cluster_mode}. Valid values are {valid_clusters}"
)

self.dataset_config = kwargs.get("dataset_config")
@@ -130,50 +131,55 @@ def create_cluster(self):
ValueError: If an invalid cluster_mode is specified.
"""

# TODO: if client and cluster are stored as self.client and self.cluster, they can no longer be used
# with self.client.submit, because Client and Cluster objects cannot be serialized.

local_cluster_options = self.dataset_config.get(
"local_cluster_options",
{
"n_workers": 2,
"memory_limit": "12GB",
"memory_limit": "8GB",
"threads_per_worker": 2,
},
)

if self.cluster_mode == "remote":
try:
self.logger.info("Creating a remote cluster cluster")
self.logger.info("Creating a remote cluster")
cluster_options = self.dataset_config.get("cluster_options", None)
if cluster_options is None:
self.logger.error("No cluster options provided in dataset_config")

cluster_options["name"] = f"Processing_{self.dataset_name}"

self.cluster = Cluster(**cluster_options)
self.client = Client(self.cluster)
cluster = Cluster(**cluster_options)
client = Client(cluster)
self.logger.info(
f"Coiled Cluster dask dashboard available at {self.cluster.dashboard_link}"
f"Coiled Cluster dask dashboard available at {cluster.dashboard_link}"
)

except Exception as e:
self.logger.warning(
f"Could not create a Coiled cluster: {e}. Falling back to local cluster."
)
# Create a local Dask cluster as a fallback
self.cluster = LocalCluster(**local_cluster_options)
self.client = Client(self.cluster)
cluster = LocalCluster(**local_cluster_options)
client = Client(cluster)
self.logger.info(
f"Local Cluster dask dashboard available at {self.cluster.dashboard_link}"
f"Local Cluster dask dashboard available at {cluster.dashboard_link}"
)
elif self.cluster_mode == "local":
self.logger.info("Creating a local cluster")

self.cluster = LocalCluster(**local_cluster_options)
self.client = Client(self.cluster)
cluster = LocalCluster(**local_cluster_options)
client = Client(cluster)
self.logger.info(
f"Local Cluster dask dashboard available at {self.cluster.dashboard_link}"
f"Local Cluster dask dashboard available at {cluster.dashboard_link}"
)

def close_cluster(self):
return client, cluster

def close_cluster(self, client, cluster):
"""
Close the Dask cluster and client.
@@ -191,12 +197,11 @@ def close_cluster(self):
Error: Logs a message if there is an error while closing the Dask client or cluster.
"""
try:
if self.client:
self.client.close()
self.logger.info("Dask client closed successfully.")
if self.cluster:
self.cluster.close()
self.logger.info("Dask cluster closed successfully.")
client.close()
self.logger.info("Dask client closed successfully.")

cluster.close()
self.logger.info("Dask cluster closed successfully.")
except Exception as e:
self.logger.error(f"Error while closing the cluster or client: {e}")

@@ -228,19 +233,6 @@ def batch_process_fileset(fileset, batch_size=10):
end_idx = min(start_idx + batch_size, num_files)
yield fileset[start_idx:end_idx]

@staticmethod
def create_fileset(bucket_name, object_keys):
s3_fs = s3fs.S3FileSystem(
anon=True
) # TODO: check with data team this can be anon=True

remote_files = [f"s3://{bucket_name}/{key}" for key in object_keys]

# Create a fileset by opening each file
fileset = [s3_fs.open(file) for file in remote_files]

return fileset

def validate_json(self, json_validation_path):
"""
Validate the JSON configuration of a dataset against a specified pyarrow_schema.
@@ -492,13 +484,13 @@ def _get_generic_handler_class(dataset_config):

# TODO: input_obj of the class should be a full s3 path so that object can come from anywhere
def cloud_optimised_creation_loop(
obj_ls: List[str], dataset_config: dict, **kwargs
s3_file_uri_list: List[str], dataset_config: dict, **kwargs
) -> None:
"""
Iterate through a list of file paths and create Cloud Optimised files for each file.
Args:
obj_ls (List[str]): List of file paths to process.
s3_file_uri_list (List[str]): List of file paths to process.
dataset_config (dictionary): dataset configuration. Check config/dataset_template.json for example
**kwargs: Additional keyword arguments for customization.
handler_class (class, optional): Handler class for cloud optimised creation.
Expand All @@ -519,9 +511,9 @@ def cloud_optimised_creation_loop(
# Create the kwargs_handler_class dictionary, to be used as list of arguments to call cloud_optimised_creation -> handler_class
# when values need to be overwritten
kwargs_handler_class = {
"raw_bucket_name": kwargs.get(
"raw_bucket_name", load_variable_from_config("BUCKET_RAW_DEFAULT")
),
# "raw_bucket_name": kwargs.get(
# "raw_bucket_name", load_variable_from_config("BUCKET_RAW_DEFAULT")
# ),
"optimised_bucket_name": kwargs.get(
"optimised_bucket_name",
load_variable_from_config("BUCKET_OPTIMISED_DEFAULT"),
@@ -561,51 +553,58 @@ def task(f, i, handler_clear_existing_data_arg=False):
# )
time_spent = timeit.default_timer() - start_time
logger.info(
f"{i}/{len(obj_ls)}: {f} Cloud Optimised file completed in {time_spent}s"
f"{i}/{len(s3_file_uri_list)}: {f} Cloud Optimised file completed in {time_spent}s"
)
except Exception as e:
logger.error(f"{i}/{len(obj_ls)} issue with {f}: {e}")
logger.error(f"{i}/{len(s3_file_uri_list)} issue with {f}: {e}")

start_whole_processing = timeit.default_timer()
if dataset_config.get("cloud_optimised_format") == "parquet":

kwargs_handler_class["dataset_config"] = dataset_config

# kwargs_handler_class["dataset_config"] = dataset_config
kwargs_handler_class["clear_existing_data"] = handler_clear_existing_data_arg

# Creating an instance of the specified class with the provided arguments
with handler_class(**kwargs_handler_class) as handler_instance:
handler_instance.to_cloud_optimised(s3_file_uri_list)

# TODO: get rid of the cloud_optimised_creation function; it is only used for single-file parquet. Rewrite this function.
# TODO: for parquet, the cluster has to be created here. Maybe cluster creation should live in its own class? Needs some refactoring.
# TODO: handle the clustering properly below
local_cluster_options = {
"n_workers": 2,
"memory_limit": "8GB",
"threads_per_worker": 2,
}

cluster = LocalCluster(**local_cluster_options)
client = Client(cluster)

client.amm.start() # Start Active Memory Manager
logger.info(
f"Local Cluster dask dashboard available at {cluster.dashboard_link}"
)

if handler_clear_existing_data_arg:
# if handler_clear_existing_data_arg, better to wait for this task to complete before adding new data!!
futures_init = [
client.submit(task, obj_ls[0], 1, handler_clear_existing_data_arg=True)
]
wait(futures_init)

# Parallel Execution with List Comprehension
futures = [
client.submit(task, f, i) for i, f in enumerate(obj_ls[1:], start=2)
]
wait(futures)
else:
futures = [client.submit(task, f, i) for i, f in enumerate(obj_ls, start=1)]
wait(futures)

client.close()
cluster.close()
# local_cluster_options = {
# "n_workers": 2,
# "memory_limit": "8GB",
# "threads_per_worker": 2,
# }
#
# cluster = LocalCluster(**local_cluster_options)
# client = Client(cluster)
#
# client.amm.start() # Start Active Memory Manager
# logger.info(
# f"Local Cluster dask dashboard available at {cluster.dashboard_link}"
# )
#
# if handler_clear_existing_data_arg:
# # if handler_clear_existing_data_arg, better to wait for this task to complete before adding new data!!
# futures_init = [
# client.submit(task, obj_ls[0], 1, handler_clear_existing_data_arg=True)
# ]
# wait(futures_init)
#
# # Parallel Execution with List Comprehension
# futures = [
# client.submit(task, f, i) for i, f in enumerate(obj_ls[1:], start=2)
# ]
# wait(futures)
# else:
# futures = [client.submit(task, f, i) for i, f in enumerate(obj_ls, start=1)]
# wait(futures)
#
# client.close()
# cluster.close()

elif dataset_config.get("cloud_optimised_format") == "zarr":

@@ -652,7 +651,7 @@

# Creating an instance of the specified class with the provided arguments
with handler_class(**kwargs_handler_class) as handler_instance:
handler_instance.to_cloud_optimised(obj_ls)
handler_instance.to_cloud_optimised(s3_file_uri_list)

time_spent_processing = timeit.default_timer() - start_whole_processing
logger.info(f"Whole dataset completed in {time_spent_processing}s")
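
For reference, a hedged sketch of calling the renamed entry point after this commit. The config path, bucket name and object URI below are placeholders, and the exact signature of load_dataset_config is assumed rather than shown in this diff:

from aodn_cloud_optimised.lib.CommonHandler import cloud_optimised_creation_loop
from aodn_cloud_optimised.lib.config import load_dataset_config

# Placeholder inputs for illustration only.
dataset_config = load_dataset_config(
    "aodn_cloud_optimised/config/dataset/anmn_hourly_timeseries.json"
)
s3_file_uri_list = ["s3://example-raw-bucket/IMOS/ANMN/example_hourly_timeseries.nc"]

# obj_ls was renamed to s3_file_uri_list in this commit; both the zarr and parquet
# branches now hand the list to the handler's to_cloud_optimised() method.
cloud_optimised_creation_loop(s3_file_uri_list, dataset_config)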