
Commit 9e8d6b7

refactor zarr + comment a lot of functions to be removed
lbesnard committed Jun 27, 2024
1 parent d5cf88b commit 9e8d6b7
Showing 5 changed files with 298 additions and 358 deletions.
16 changes: 8 additions & 8 deletions aodn_cloud_optimised/bin/acorn_gridded_qc_turq.py
@@ -25,18 +25,18 @@ def main():
         )
     )
 
     # First zarr creation - remove all previous objects
     cloud_optimised_creation_loop(
-        [nc_obj_ls[0]], dataset_config=dataset_config, clear_existing_data=True
+        nc_obj_ls,
+        dataset_config=dataset_config,
+        clear_existing_data=True,
+        cluster_mode="local",
     )
 
-    # append to zarr
-    cloud_optimised_creation_loop(nc_obj_ls[1:], dataset_config=dataset_config)
-    # rechunking
-    GenericHandler(
-        input_object_key=nc_obj_ls[0],
-        dataset_config=dataset_config,
-    ).rechunk()
+    # GenericHandler(
+    #     input_object_key=nc_obj_ls[0],
+    #     dataset_config=dataset_config,
+    # ).rechunk()
 
 
 if __name__ == "__main__":
10 changes: 10 additions & 0 deletions aodn_cloud_optimised/config/dataset/acorn_gridded_qc_turq.json
@@ -2,6 +2,16 @@
   "dataset_name": "acorn_gridded_qc_turq",
   "logger_name": "acorn_gridded_qc_turq",
   "cloud_optimised_format": "zarr",
+  "cluster_options" : {
+    "n_workers": [2, 8],
+    "scheduler_vm_types": "t3.small",
+    "worker_vm_types": "t3.medium",
+    "allow_ingress_from": "me",
+    "compute_purchase_option": "spot_with_fallback",
+    "worker_options": {
+      "nthreads": 8,
+      "memory_limit": "32GB" }
+  },
   "cluster_config" : {
     "n_workers": [0, 6],
     "scheduler_vm_types": "t3.medium"
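
The new `cluster_options` block reads like keyword arguments for a remote Dask cluster backend. As a minimal sketch only, assuming the keys map onto Coiled's `coiled.Cluster` kwargs (an assumption of this example, not something stated in the commit), a handler could consume it like this:

    import json

    from dask.distributed import Client

    # Sketch: feed the new "cluster_options" key to a remote cluster backend.
    # The Coiled mapping below is an assumption; the commit only adds the JSON.
    with open("aodn_cloud_optimised/config/dataset/acorn_gridded_qc_turq.json") as f:
        dataset_config = json.load(f)

    cluster_options = dataset_config.get("cluster_options", {})

    import coiled  # assumed remote-cluster backend

    cluster = coiled.Cluster(**cluster_options)
    client = Client(cluster)

The `[min, max]` form of `n_workers` and the `spot_with_fallback` purchase option suggest adaptive scaling on spot instances with an on-demand fallback.
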
10 changes: 10 additions & 0 deletions aodn_cloud_optimised/config/dataset/anmn_hourly_timeseries.json
@@ -2,6 +2,16 @@
   "dataset_name": "anmn_hourly_timeseries",
   "logger_name": "anmn_hourly_timeseries",
   "cloud_optimised_format": "parquet",
+  "cluster_options" : {
+    "n_workers": [1, 20],
+    "scheduler_vm_types": "t3.small",
+    "worker_vm_types": "t3.small",
+    "allow_ingress_from": "me",
+    "compute_purchase_option": "spot_with_fallback",
+    "worker_options": {
+      "nthreads": 8,
+      "memory_limit": "32GB" }
+  },
   "metadata_uuid": "7b901002-b1dc-46c3-89f2-b4951cedca48",
   "gattrs_to_variables": [
     "site_code"
32 changes: 30 additions & 2 deletions aodn_cloud_optimised/lib/CommonHandler.py
@@ -199,6 +199,34 @@ def close_cluster(self):
         except Exception as e:
             self.logger.error(f"Error while closing the cluster or client: {e}")
 
+    @staticmethod
+    def batch_process_fileset(fileset, batch_size=10):
+        """
+        Processes a list of files in batches.
+
+        This method yields successive batches of files from the input fileset.
+        Each batch contains up to `batch_size` files. Adjusting `batch_size`
+        affects memory usage and performance; values that are too large can
+        lead to out-of-memory errors, so be cautious.
+
+        Parameters
+        ----------
+        fileset : list
+            A list of files to be processed in batches.
+        batch_size : int, optional
+            The number of files to include in each batch (default is 10).
+
+        Yields
+        ------
+        list
+            A sublist of `fileset` containing up to `batch_size` files.
+        """
+        # NOTE: increasing batch_size can exhaust worker memory
+        num_files = len(fileset)
+        for start_idx in range(0, num_files, batch_size):
+            end_idx = min(start_idx + batch_size, num_files)
+            yield fileset[start_idx:end_idx]
+
     @staticmethod
     def create_fileset(bucket_name, object_keys):
         s3_fs = s3fs.S3FileSystem(
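
Since `batch_process_fileset` is a plain generator, callers can iterate over it directly and keep only one batch in memory at a time. A self-contained usage sketch (the file names are made up for illustration; the generator logic is copied from the new static method above):

    # Standalone copy of the new static method's logic, for illustration.
    def batch_process_fileset(fileset, batch_size=10):
        num_files = len(fileset)
        for start_idx in range(0, num_files, batch_size):
            end_idx = min(start_idx + batch_size, num_files)
            yield fileset[start_idx:end_idx]

    files = [f"IMOS_file_{i}.nc" for i in range(25)]
    for batch in batch_process_fileset(files, batch_size=10):
        print(len(batch))  # prints 10, 10, 5
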
@@ -514,7 +542,7 @@ def cloud_optimised_creation_loop(
             "root_prefix_cloud_optimised_path",
             load_variable_from_config("ROOT_PREFIX_CLOUD_OPTIMISED_PATH"),
         ),
-        "cluster_mode": kwargs.get("cluster_mode", None),
+        "cluster_mode": kwargs.get("cluster_mode", "local"),
     }
 
     # Filter out None values
@@ -649,7 +677,7 @@ def task(f, i):
     # TODO: write some code to check the amount of unmanaged memory and restart cluster when above a threshold
     # TODO: test if no cluster_mode is set, if the zarr code still works without a cluster. Should probably work then on a file per file basis and set run_zarr_loop_sequentially = True
 
-    # TODO: check if this is the best approach
+    # TODO: check if this is the best approach. default should be local!
     # we dont want to create a cluster for a file per file
     if kwargs.get("cluster_mode", None) is None:
         run_zarr_loop_sequentially = True
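
The updated TODO and the unchanged `kwargs.get("cluster_mode", None)` check leave a small tension: the handler kwargs now default to "local", while this loop still treats a missing `cluster_mode` as a request to process files sequentially without creating a cluster. A condensed, self-contained paraphrase of that decision (not the repository's exact code):

    def choose_execution_mode(**kwargs):
        # No explicit cluster_mode: run the zarr loop sequentially,
        # one file at a time, without creating a Dask cluster.
        cluster_mode = kwargs.get("cluster_mode", None)
        return "sequential" if cluster_mode is None else cluster_mode

    print(choose_execution_mode())                      # -> sequential
    print(choose_execution_mode(cluster_mode="local"))  # -> local
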