(Feat) add clustering capability - precommit (#23)
### Feature Additions:
- Feat: add clustering capability for both Zarr and Parquet
  * local dask cluster
  * remote coiled cluster
  * Driven by dataset config
- Feat: add generic_cloud_optimised_creation script
- Implemented Coiled cluster for Zarr with parallelism
- Added unit tests for Parquet and Zarr, using moto to mock AWS

### Bug Fixes:
- Fix version number in release artifacts
- Fix Parquet queries

### Refactorings:
- Substantial refactoring of the Zarr and Parquet handlers; removed many functions
- Changed to_cloud_optimised to take arguments; works for Zarr with Dask
- Cleaned all scripts to use the generic cloud-optimised creation script and added arguments

### Documentation and Miscellaneous:
- Update README
- Improve docstrings
lbesnard authored Jul 3, 2024
1 parent d0a7d21 commit 00ad862
Showing 79 changed files with 8,926 additions and 6,304 deletions.
7 changes: 4 additions & 3 deletions .github/workflows/release.yml
@@ -59,9 +59,6 @@ jobs:
source .venv/bin/activate
poetry run pytest
- name: Build package
run: poetry build

# Configure git for committing version bump
- name: Configure git for committing version bump
run: |
@@ -82,6 +79,10 @@ jobs:
run: |
git push origin HEAD:main
# build the package after bumping version
- name: Build package
run: poetry build

- name: Delete existing tag (if any)
run: |
git tag -d v${{ env.new_version }} || true
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
@@ -13,6 +13,7 @@ repos:
files: \.json$
exclude: ^aodn_cloud_optimised/config/dataset/dataset_template.json$
- id: end-of-file-fixer
exclude: '\.schema$'
- id: trailing-whitespace
- id: check-toml

135 changes: 87 additions & 48 deletions README.md
@@ -5,24 +5,44 @@
![Release](https://img.shields.io/github/v/release/aodn/aodn_cloud_optimised.svg)
[![codecov](https://codecov.io/gh/aodn/aodn_cloud_optimised/branch/main/graph/badge.svg)](https://codecov.io/gh/aodn/aodn_cloud_optimised/branch/main)

A tool designed to convert IMOS NetCDF files and CSV into Cloud Optimised formats such as Zarr and Parquet
A tool designed to convert IMOS NetCDF and CSV files into Cloud Optimised formats such as Zarr and Parquet

## Key Features

* Conversion of a dataset with YAML Configuration: Convert tabular data (CSV or NetCDF) to Parquet and gridded data to Zarr using YAML configuration files only.
* Preservation of NetCDF Metadata: Maintain NetCDF global attributes metadata
* Improve Querying of Cloud Optimised data by Geographical Bounding box and Time Slice: Create geometry polygon and time slice partitions for Parquet dataset, facilitating efficient data querying by time and geographical bounding box.
* Data Reprocessing: Easily reprocess NetCDF files into Zarr and Parquet formats
* Conversion of CSV/NetCDF to Cloud Optimised formats (Zarr/Parquet)
* JSON configuration approach, with parent and child configurations when multiple datasets are very similar (e.g. ACORN radar, GHRSST; see [config](https://github.com/aodn/aodn_cloud_optimised/tree/main/aodn_cloud_optimised/config/dataset))
* Generic handlers for most datasets ([GenericParquetHandler](https://github.com/aodn/aodn_cloud_optimised/blob/main/aodn_cloud_optimised/lib/GenericParquetHandler.py), [GenericZarrHandler](https://github.com/aodn/aodn_cloud_optimised/blob/main/aodn_cloud_optimised/lib/GenericZarrHandler.py))
* Specific handlers can be written that inherit methods from a generic handler ([Argo handler](https://github.com/aodn/aodn_cloud_optimised/blob/main/aodn_cloud_optimised/lib/ArgoHandler.py), [Mooring Timeseries Handler](https://github.com/aodn/aodn_cloud_optimised/blob/main/aodn_cloud_optimised/lib/AnmnHourlyTsHandler.py))
* Clustering capability (see the sketch after this list):
  * Local Dask cluster
  * Remote Coiled cluster
  * Driven by configuration and can easily be overridden
  * Zarr: gridded datasets are processed in batches and in parallel with xarray.open_mfdataset
  * Parquet: tabular files are processed in batches and in parallel as independent tasks, using futures
* Reprocessing:
  * Zarr: reprocessing is achieved by writing to specific regions with slices; non-contiguous regions are handled (see the region-write sketch under Zarr Features below)
  * Parquet: reprocessing is done via pyarrow's internal overwrite mechanism, but can also be forced when an input file has changed significantly
* Chunking:
  * Parquet: to facilitate querying of geospatial data, polygon and timestamp slices are created as partitions
  * Zarr: driven by the dataset configuration
* Metadata:
  * Parquet: metadata is created as a sidecar _metadata.parquet file
* Unit testing of the module: very close to integration testing; a local cluster is used to create cloud optimised files
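
A rough illustration of the two cluster modes follows. This is a sketch only, not the package's internal implementation; the worker counts and the cluster name are made-up values, and the remote mode assumes a configured Coiled account.

```python
# Sketch only: how a local vs remote (Coiled) Dask cluster could be started.
# Worker counts and the cluster name are illustrative assumptions.
from dask.distributed import Client, LocalCluster


def make_client(cluster_mode: str = "local") -> Client:
    if cluster_mode == "remote":
        import coiled  # requires a Coiled account and token

        cluster = coiled.Cluster(name="aodn-cloud-optimised", n_workers=8)
    else:
        cluster = LocalCluster(n_workers=4, threads_per_worker=2)
    return Client(cluster)
```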


# Installation
## Users
Requirements:
* python >= 3.10.14

### Automatic install of the latest wheel release
```bash
curl -s https://raw.githubusercontent.com/aodn/aodn_cloud_optimised/main/install.sh | bash
```

Otherwise, download the latest wheel from
github.com/aodn/aodn_cloud_optimised/releases/latest

## Development
Requirements:
* Mamba from miniforge3: https://github.com/conda-forge/miniforge
@@ -46,57 +66,76 @@ to update the poetry.lock file. Commit the changes to poetry.lock
# Requirements
AWS SSO to push files to S3

# Features List

## Parquet Features
| Feature | Status | Comment |
|------------------------------------------------------------------------------------------------|--------|------------------------------------------------------------------------------------|
| Process IMOS tabular NetCDF to Parquet with GenericHandler | Done | Converts NetCDF files to Parquet format using a generic handler. |
| Process CSV to Parquet with GenericHandler | Done | Converts CSV files to Parquet format using a generic handler. |
| Specific Handlers inherit all methods from GenericHandler with super() | Done | Simplifies the creation of new handlers by inheriting methods. |
| Unittests implemented | Done | Tests to ensure functionality and reliability. |
| Reprocessing of files already converted to Parquet | Done | Reprocessing of NetCDF files; original method can be slow for large datasets. |
| Metadata variable attributes in sidecar parquet file | Done | Metadata attributes available in dataset sidecars. |
| Add new variables to dataset | Done | Addition of new variables such as site_code, deployment_code, filename attributes. |
| Add timestamp variable for partition key | Done | Enhances query performance by adding a timestamp variable. |
| Remove NaN timestamp when NetCDF not CF compliant | Done | Eliminates NaN timestamps, particularly for non CF compliant data like Argo. |
| Create dataset Schema | Done | Creation of a schema for the dataset. |
| Create missing variables available in Schema | Done | Ensures dataset consistency by adding missing variables from the schema. |
| Warning when new variable from NetCDF is missing from Schema | Done | Alerts when a new variable from NetCDF is absent in the schema. |
| Creating metadata parquet sidecar | Done | |
| Create AWS OpenData Registry YAML                                                                | Done   |                                                                                      |
| Config file JSON validation against schema                                                       | Done   |                                                                                      |
| Create polygon variable to facilitate geometry queries                                           | Done   | See the query sketch below.                                                          |
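
The timestamp and polygon partitions above are what make time-slice and bounding-box queries cheap. The sketch below shows the idea with pyarrow; the bucket path, the `timestamp` partition column and the `LONGITUDE`/`LATITUDE` column names are assumptions for illustration and may not match a published dataset.

```python
# Sketch: prune partitions by timestamp, then filter rows by bounding box.
# Paths and column names are illustrative assumptions.
import pyarrow.dataset as ds
import pyarrow.compute as pc

dataset = ds.dataset(
    "s3://example-bucket/anmn_hourly_timeseries.parquet/",
    format="parquet",
    partitioning="hive",
)

table = dataset.to_table(
    filter=(pc.field("timestamp") == 1704067200)  # partition pruning
    & (pc.field("LONGITUDE") >= 150.0) & (pc.field("LONGITUDE") <= 155.0)
    & (pc.field("LATITUDE") >= -35.0) & (pc.field("LATITUDE") <= -30.0)
)
df = table.to_pandas()
```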

## Zarr Features
| Feature | Status | Comment |
|------------------------------------------------------------------------|--------|------------------------------------------------------------------------------------|
| Process IMOS Gridded NetCDF to Zarr with GenericHandler                 | Done   | Converts gridded NetCDF files to Zarr format using a generic handler.               |
| Specific Handlers inherit all methods from GenericHandler with super() | Done | Simplifies the creation of new handlers by inheriting methods. |
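
Region writes are what keep Zarr reprocessing cheap: only the chunks covering the reprocessed time indices are rewritten, as mentioned under Key Features. The sketch below illustrates the idea with xarray; the store path, dimension name (`TIME`) and input file names are assumptions, not the handler's actual code.

```python
# Sketch: overwrite one contiguous TIME region of an existing Zarr store.
# Store path, dimension name and input files are illustrative assumptions.
import xarray as xr

store = "s3://example-bucket/acorn_gridded_qc_turq.zarr"

existing = xr.open_zarr(store)
ds_new = xr.open_mfdataset(["reprocessed_1.nc", "reprocessed_2.nc"])

# Locate where the reprocessed times sit on the existing TIME axis.
start = int((existing.TIME == ds_new.TIME[0]).argmax())
stop = start + ds_new.sizes["TIME"]

# Drop variables that don't share the region dimension, then overwrite
# only that slice; the rest of the store is left untouched.
ds_new.drop_vars(
    [v for v in ds_new.variables if "TIME" not in ds_new[v].dims]
).to_zarr(store, mode="r+", region={"TIME": slice(start, stop)})
```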

# Usage

## As a standalone bash script
```bash
generic_cloud_optimised_creation -h
usage: generic_cloud_optimised_creation [-h] --paths PATHS [PATHS ...] [--filters [FILTERS ...]] [--suffix SUFFIX] --dataset-config
DATASET_CONFIG [--clear-existing-data] [--force-previous-parquet-deletion]
[--cluster-mode {local,remote}]

Process S3 paths and create cloud-optimized datasets.

options:
-h, --help show this help message and exit
--paths PATHS [PATHS ...]
List of S3 paths to process. Example: 'IMOS/ANMN/NSW' 'IMOS/ANMN/PA'
--filters [FILTERS ...]
Optional filter strings to apply on the S3 paths. Example: '_hourly-timeseries_' 'FV02'
--suffix SUFFIX Optional suffix used by s3_ls to filter S3 objects. Example: '.nc'
--dataset-config DATASET_CONFIG
Path to the dataset config JSON file. Example: 'anmn_hourly_timeseries.json'
--clear-existing-data
Flag to clear existing data. Default is False.
--force-previous-parquet-deletion
Flag to force the search of previous equivalent parquet file created. Much slower. Default is False.
--cluster-mode {local,remote}
Cluster mode to use. Options: 'local' or 'remote'. Default is 'local'.

Examples:
generic_cloud_optimised_creation --paths 'IMOS/ANMN/NSW' 'IMOS/ANMN/PA' --filters '_hourly-timeseries_' 'FV02' --dataset-config 'anmn_hourly_timeseries.json' --clear-existing-data --cluster-mode 'remote'
generic_cloud_optimised_creation --paths 'IMOS/ANMN/NSW' 'IMOS/ANMN/QLD' --dataset-config 'anmn_ctd_ts_fv01.json'
generic_cloud_optimised_creation --paths 'IMOS/ACORN/gridded_1h-avg-current-map_QC/TURQ/2024' --dataset-config 'acorn_gridded_qc_turq.json' --clear-existing-data --cluster-mode 'remote'

```
## Parquet
The GenericHandler for parquet dataset creation is designed to be used either as a standalone class or as a base class for more specialised handler implementations. Here's a basic usage example:
## As a python module
```python
# Read the content of the dataset template JSON file (with comments)
#import commentjson
#with open('aodn_cloud_optimised/config/dataset/dataset_template.json', 'r') as file:
# json_with_comments = file.read()
#dataset_config = commentjson.loads(json_with_comments)

import importlib.resources
from aodn_cloud_optimised.lib.config import load_dataset_config
from aodn_cloud_optimised.lib.CommonHandler import cloud_optimised_creation

dataset_config = load_dataset_config(str(importlib.resources.path("aodn_cloud_optimised.config.dataset", "anfog_slocum_glider.json")))

cloud_optimised_creation('object/path/netcdf_file.nc',
                         dataset_config=dataset_config
                         )
from aodn_cloud_optimised.lib.CommonHandler import cloud_optimised_creation
from aodn_cloud_optimised.lib.config import (
    load_variable_from_config,
    load_dataset_config,
)
from aodn_cloud_optimised.lib.s3Tools import s3_ls


def main():
    BUCKET_RAW_DEFAULT = load_variable_from_config("BUCKET_RAW_DEFAULT")
    nc_obj_ls = s3_ls(BUCKET_RAW_DEFAULT, "IMOS/SRS/SST/ghrsst/L3S-1d/dn/2024")

    dataset_config = load_dataset_config(
        str(
            importlib.resources.path(
                "aodn_cloud_optimised.config.dataset", "srs_l3s_1d_dn.json"
            )
        )
    )

    cloud_optimised_creation(
        nc_obj_ls,
        dataset_config=dataset_config,
        reprocess=True,
        cluster_mode='remote'
    )


if __name__ == "__main__":
    main()
```
5 changes: 5 additions & 0 deletions README_add_new_dataset.md
@@ -1,4 +1,9 @@
# Creating a dataset configuration file


This module aims to be generic enough that adding a new IMOS dataset is driven entirely through a JSON config file.
Examples of dataset configurations can be found at [config](https://github.com/aodn/aodn_cloud_optimised/tree/main/aodn_cloud_optimised/config/dataset).

For more complicated datasets, such as Argo, it's also possible to create a specific handler which inherits,
via ```super()```, all of the methods of the ```aodn_cloud_optimised.lib.GenericParquetHandler.GenericHandler``` class.
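
As a minimal sketch of that pattern (the overridden method name `preprocess_data`, its signature and the yielded values are hypothetical, chosen only to show the `super()` delegation, not taken from the actual handlers):

```python
# Hypothetical specific handler: everything generic is reused via super(),
# and only a dataset-specific tweak is layered on top.
from aodn_cloud_optimised.lib.GenericParquetHandler import GenericHandler


class MyDatasetHandler(GenericHandler):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)  # keep all generic behaviour

    def preprocess_data(self, netcdf_fp):
        # Delegate to the generic implementation, then adjust the output.
        for df, ds in super().preprocess_data(netcdf_fp):
            df["deployment_code"] = "EXAMPLE_DEPLOYMENT"  # dataset-specific column
            yield df, ds
```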

39 changes: 13 additions & 26 deletions aodn_cloud_optimised/bin/aatams_acoustic_tagging.py
@@ -1,31 +1,18 @@
#!/usr/bin/env python3
import importlib.resources

from aodn_cloud_optimised.lib.CommonHandler import cloud_optimised_creation_loop
from aodn_cloud_optimised.lib.config import (
    load_variable_from_config,
    load_dataset_config,
)
from aodn_cloud_optimised.lib.s3Tools import s3_ls
import subprocess


def main():
    BUCKET_RAW_DEFAULT = load_variable_from_config("BUCKET_RAW_DEFAULT")
    obj_ls = s3_ls(BUCKET_RAW_DEFAULT, "IMOS/AATAMS/acoustic_tagging/", suffix=".csv")

    dataset_config = load_dataset_config(
        str(
            importlib.resources.path(
                "aodn_cloud_optimised.config.dataset", "aatams_acoustic_tagging.json"
            )
        )
    )

    cloud_optimised_creation_loop(
        obj_ls,
        dataset_config=dataset_config,
    )

command = [
    "generic_cloud_optimised_creation",
    "--paths",
    "IMOS/AATAMS/acoustic_tagging/",
    "--dataset-config",
    "aatams_acoustic_tagging.json",
    "--clear-existing-data",
    "--cluster-mode",
    "remote",
]

if __name__ == "__main__":
    main()
# Run the command
subprocess.run(command, check=True)
53 changes: 14 additions & 39 deletions aodn_cloud_optimised/bin/acorn_gridded_qc_turq.py
100644 → 100755
@@ -1,43 +1,18 @@
#!/usr/bin/env python3
import importlib.resources

from aodn_cloud_optimised.lib.GenericZarrHandler import GenericHandler
from aodn_cloud_optimised.lib.CommonHandler import cloud_optimised_creation_loop

from aodn_cloud_optimised.lib.config import (
    load_variable_from_config,
    load_dataset_config,
)
from aodn_cloud_optimised.lib.s3Tools import s3_ls
import subprocess


def main():
    BUCKET_RAW_DEFAULT = load_variable_from_config("BUCKET_RAW_DEFAULT")
    nc_obj_ls = s3_ls(
        BUCKET_RAW_DEFAULT, "IMOS/ACORN/gridded_1h-avg-current-map_QC/TURQ/2023"
    )

    dataset_config = load_dataset_config(
        str(
            importlib.resources.path(
                "aodn_cloud_optimised.config.dataset", "acorn_gridded_qc_turq.json"
            )
        )
    )

    # First zarr creation
    cloud_optimised_creation_loop(
        [nc_obj_ls[0]], dataset_config=dataset_config, reprocess=True
    )

    # append to zarr
    cloud_optimised_creation_loop(nc_obj_ls[1:], dataset_config=dataset_config)
    # rechunking
    GenericHandler(
        input_object_key=nc_obj_ls[0],
        dataset_config=dataset_config,
    ).rechunk()


if __name__ == "__main__":
    main()
command = [
    "generic_cloud_optimised_creation",
    "--paths",
    "IMOS/ACORN/gridded_1h-avg-current-map_QC/TURQ/2024/01/",
    "--dataset-config",
    "acorn_gridded_qc_turq.json",
    "--clear-existing-data",
    "--cluster-mode",
    "remote",
]

# Run the command
subprocess.run(command, check=True)
36 changes: 13 additions & 23 deletions aodn_cloud_optimised/bin/anfog_to_parquet.py
@@ -1,28 +1,18 @@
#!/usr/bin/env python3
import importlib.resources

from aodn_cloud_optimised.lib.CommonHandler import cloud_optimised_creation_loop
from aodn_cloud_optimised.lib.config import (
    load_variable_from_config,
    load_dataset_config,
)
from aodn_cloud_optimised.lib.s3Tools import s3_ls
import subprocess


def main():
    BUCKET_RAW_DEFAULT = load_variable_from_config("BUCKET_RAW_DEFAULT")
    nc_obj_ls = s3_ls(BUCKET_RAW_DEFAULT, "IMOS/ANFOG/slocum_glider")

    dataset_config = load_dataset_config(
        str(
            importlib.resources.path(
                "aodn_cloud_optimised.config.dataset", "anfog_slocum_glider.json"
            )
        )
    )

    cloud_optimised_creation_loop(nc_obj_ls, dataset_config=dataset_config)

command = [
    "generic_cloud_optimised_creation",
    "--paths",
    "IMOS/ANFOG/slocum_glider",
    "--dataset-config",
    "anfog_slocum_glider.json",
    "--clear-existing-data",
    "--cluster-mode",
    "remote",
]

if __name__ == "__main__":
    main()
# Run the command
subprocess.run(command, check=True)
