From cb2674ff1d0909be4d95d82a86561a2f29218445 Mon Sep 17 00:00:00 2001
From: Deepak Cherian
Date: Tue, 25 Feb 2025 09:43:49 -0700
Subject: [PATCH 1/3] Add multi-processing write example

---
 docs/docs/icechunk-python/parallel.md |  4 +++
 icechunk-python/examples/mpwrite.py   | 46 +++++++++++++++++++++++++++
 2 files changed, 50 insertions(+)
 create mode 100644 icechunk-python/examples/mpwrite.py

diff --git a/docs/docs/icechunk-python/parallel.md b/docs/docs/icechunk-python/parallel.md
index 9a830a9e..7dad4a06 100644
--- a/docs/docs/icechunk-python/parallel.md
+++ b/docs/docs/icechunk-python/parallel.md
@@ -81,6 +81,10 @@ xr.testing.assert_identical(ds, ondisk)
 
 ## Distributed writes
 
+!!! info
+
+    See a full executable example [here](https://github.com/earth-mover/icechunk/blob/main/icechunk-python/examples/mpwrite.py).
+
 Any task execution framework (e.g. `ProcessPoolExecutor`, Joblib, Lithops, Dask Distributed, Ray, etc.)
 can be used instead of the `ThreadPoolExecutor`. However, such workloads should account for
 Icechunk being a "stateful" store that records changes executed in a write session.
diff --git a/icechunk-python/examples/mpwrite.py b/icechunk-python/examples/mpwrite.py
new file mode 100644
index 00000000..a7a75f4a
--- /dev/null
+++ b/icechunk-python/examples/mpwrite.py
@@ -0,0 +1,46 @@
+import tempfile
+from concurrent.futures import ProcessPoolExecutor
+
+import xarray as xr
+from icechunk import Repository, Session, local_filesystem_storage
+from icechunk.distributed import merge_sessions
+
+
+def write_timestamp(*, itime: int, session: Session) -> Session:
+    # pass a list to isel to preserve the time dimension
+    ds = xr.tutorial.open_dataset("rasm").isel(time=[itime])
+    # region="auto" tells Xarray to infer which "region" of the output arrays to write to.
+    ds.to_zarr(session.store, region="auto", consolidated=False)
+    return session
+
+
+if __name__ == "__main__":
+    ds = xr.tutorial.open_dataset("rasm").isel(time=slice(24))
+    repo = Repository.create(local_filesystem_storage(tempfile.mkdtemp()))
+    session = repo.writable_session("main")
+
+    # chunk sizes must be an ordered tuple, one entry per dimension
+    chunks = tuple(1 if dim == "time" else ds.sizes[dim] for dim in ds.Tair.dims)
+    ds.to_zarr(
+        session.store, compute=False, encoding={"Tair": {"chunks": chunks}}, mode="w"
+    )
+    # this commit is optional, but may be useful in your workflow
+    session.commit("initialize store")
+
+    session = repo.writable_session("main")
+    with ProcessPoolExecutor() as executor:
+        # opt-in to successful pickling of a writable session
+        with session.allow_pickling():
+            # submit the writes
+            futures = [
+                executor.submit(write_timestamp, itime=i, session=session)
+                for i in range(ds.sizes["time"])
+            ]
+            # grab the Session objects from each individual write task
+            sessions = [f.result() for f in futures]
+
+    # manually merge the remote sessions into the local session
+    session = merge_sessions(session, *sessions)
+    session.commit("finished writes")
+
+    ondisk = xr.open_zarr(repo.readonly_session("main").store, consolidated=False)
+    xr.testing.assert_identical(ds, ondisk)

From 53641042b8148464cd79166d0360ea96b7e8a73e Mon Sep 17 00:00:00 2001
From: Deepak Cherian
Date: Tue, 25 Feb 2025 09:55:57 -0700
Subject: [PATCH 2/3] bugfix

---
 docs/docs/icechunk-python/parallel.md | 4 ++--
 icechunk-python/examples/mpwrite.py   | 2 ++
 2 files changed, 4 insertions(+), 2 deletions(-)

diff --git a/docs/docs/icechunk-python/parallel.md b/docs/docs/icechunk-python/parallel.md
index 7dad4a06..ad9073ab 100644
--- a/docs/docs/icechunk-python/parallel.md
+++ b/docs/docs/icechunk-python/parallel.md
@@ -127,8 +127,8 @@ with ProcessPoolExecutor() as executor:
             executor.submit(write_timestamp, itime=i, session=session)
             for i in range(ds.sizes["time"])
         ]
-    # grab the Session objects from each individual write task
-    sessions = [f.result() for f in futures]
+        # grab the Session objects from each individual write task
+        sessions = [f.result() for f in futures]
 
 # manually merge the remote sessions into the local session
 session = merge_sessions(session, *sessions)
diff --git a/icechunk-python/examples/mpwrite.py b/icechunk-python/examples/mpwrite.py
index a7a75f4a..a86f8a3a 100644
--- a/icechunk-python/examples/mpwrite.py
+++ b/icechunk-python/examples/mpwrite.py
@@ -1,3 +1,5 @@
+# An example of using multiprocessing to write to an Icechunk dataset
+
 import tempfile
 from concurrent.futures import ProcessPoolExecutor
 

From 1934b6814146e50b850cd5cd394893feb9a17ef5 Mon Sep 17 00:00:00 2001
From: Deepak Cherian
Date: Wed, 26 Feb 2025 10:44:05 -0700
Subject: [PATCH 3/3] edits

---
 docs/docs/icechunk-python/parallel.md | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/docs/docs/icechunk-python/parallel.md b/docs/docs/icechunk-python/parallel.md
index ad9073ab..8a23a713 100644
--- a/docs/docs/icechunk-python/parallel.md
+++ b/docs/docs/icechunk-python/parallel.md
@@ -83,6 +83,8 @@ xr.testing.assert_identical(ds, ondisk)
 
 !!! info
 
+    This code will not execute with a `ProcessPoolExecutor` without [some changes](https://docs.python.org/3/library/multiprocessing.html#programming-guidelines).
+    Specifically, it requires wrapping the code in an `if __name__ == "__main__":` block.
     See a full executable example [here](https://github.com/earth-mover/icechunk/blob/main/icechunk-python/examples/mpwrite.py).
 
 Any task execution framework (e.g. `ProcessPoolExecutor`, Joblib, Lithops, Dask Distributed, Ray, etc.)
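
The session-merge pattern in this patch carries over to the other frameworks the docs name. As a companion illustration — a minimal sketch, not part of the diff — here is how the same writes might be dispatched with Joblib. It assumes `write_timestamp`, `repo`, and `ds` are defined exactly as in `mpwrite.py` above, and `n_jobs=4` is an arbitrary choice:

```python
# Sketch: the mpwrite.py pattern driven by Joblib instead of ProcessPoolExecutor.
# Assumes write_timestamp, repo, and ds are defined as in mpwrite.py above;
# n_jobs=4 is an arbitrary illustrative value.
from joblib import Parallel, delayed

from icechunk.distributed import merge_sessions

session = repo.writable_session("main")
with session.allow_pickling():
    # Joblib's default loky backend pickles the session into each worker,
    # so tasks must be dispatched inside the allow_pickling() context
    sessions = Parallel(n_jobs=4)(
        delayed(write_timestamp)(itime=i, session=session)
        for i in range(ds.sizes["time"])
    )
# merge the per-task changesets into the local session, then commit once
session = merge_sessions(session, *sessions)
session.commit("finished writes")
```

As with the `ProcessPoolExecutor` version, each worker returns its own `Session` and only the parent process merges and commits, consistent with the docs' note that Icechunk is a "stateful" store recording the changes made in a write session.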