Start of executable docs #777
base: main
@@ -76,3 +76,5 @@ ENV/

# MkDocs documentation
site*/

icechunk-local
@@ -5,23 +5,16 @@ However, because of how Icechunk works, it's not possible to use the existing [`

Instead, Icechunk provides its own specialized functions to make distributed writes with Dask and Xarray.
This page explains how to use these specialized functions.

!!! note

    Using Xarray, Dask, and Icechunk requires `icechunk>=0.1.0a5`, `dask>=2024.11.0`, and `xarray>=2024.11.0`.

Start with an icechunk store and dask arrays.

Review comment (on deleted lines 13-20, the old `distributed.Client` setup): notable change here is that the first example doesn't use a client, because it was not working without one.

```python exec="on" session="dask" source="material-block"
import icechunk
import tempfile

# initialize the icechunk store
storage = icechunk.local_filesystem_storage(tempfile.TemporaryDirectory().name)
icechunk_repo = icechunk.Repository.create(storage)
icechunk_session = icechunk_repo.writable_session("main")
```
@@ -33,15 +26,15 @@ The API follows that of [`dask.array.store`](https://docs.dask.org/en/stable/gen
support for the `compute` kwarg.

First create a dask array to write:

```python exec="on" session="dask" source="material-block"
import dask.array as da
shape = (100, 100)
dask_chunks = (20, 20)
dask_array = da.random.random(shape, chunks=dask_chunks)
```

Now create the Zarr array you will write to.

```python exec="on" session="dask" source="material-block"
import zarr

zarr_chunks = (10, 10)
@@ -60,33 +53,48 @@ write task is independent, and will not conflict. It is your responsibility to e
conflicts are avoided.

Now write

```python exec="on" session="dask" source="material-block" result="code"
import icechunk.dask

icechunk.dask.store_dask(
    icechunk_session,
    sources=[dask_array],
    targets=[zarray]
)
```

Finally commit your changes!

```python exec="on" session="dask" source="material-block"
print(icechunk_session.commit("wrote a dask array!"))
```

## Distributed

In distributed contexts where the Session and Zarr Array objects are sent across the network,
you must opt in to pickling of a writable store. This is the case when you have initialized a Dask
cluster, for example with a `distributed.Client`.
[`icechunk.dask.store_dask`](./reference.md#icechunk.dask.store_dask) takes care of the hard part of
merging Sessions, but you must opt in to pickling before creating the target Zarr array objects.

Here is an example:

```python exec="on" session="dask" source="material-block" result="code"
from distributed import Client
client = Client()

import icechunk.dask

# start a new session. The old session is read-only after committing.
icechunk_session = icechunk_repo.writable_session("main")
zarr_chunks = (10, 10)
with icechunk_session.allow_pickling():
    group = zarr.group(
        store=icechunk_session.store,
        overwrite=True
    )

    zarray = group.create_array(
        "array",
@@ -95,8 +103,13 @@ with icechunk_session.allow_pickling():
        dtype="f8",
        fill_value=float("nan"),
    )

    icechunk.dask.store_dask(
        icechunk_session,
        sources=[dask_array],
        targets=[zarray]
    )
print(icechunk_session.commit("wrote a dask array!"))
```

## Icechunk + Dask + Xarray
@@ -113,20 +126,31 @@ Notably the ``compute`` kwarg is not supported.

Now roundtrip an xarray dataset

```python exec="on" session="dask" source="material-block" result="code"
import icechunk.xarray
import xarray as xr

icechunk_session = icechunk_repo.writable_session("main")
dataset = xr.tutorial.open_dataset(
    "rasm", chunks={"time": 1}
).isel(time=slice(24))

# `to_icechunk` takes care of "allow_pickling" for you
icechunk.xarray.to_icechunk(dataset, icechunk_session, mode="w")

with icechunk_session.allow_pickling():
    roundtripped = xr.open_zarr(icechunk_session.store, consolidated=False)
    print(dataset.identical(roundtripped))
```

Review comment (on the `to_icechunk` call): had to add the mode to avoid an error.

Finally commit your changes!

```python exec="on" session="dask" source="material-block" result="code"
print(icechunk_session.commit("wrote an Xarray dataset!"))
```

```python exec="on" session="dask"
# handy when running mkdocs serve locally
client.shutdown()
```
@@ -16,38 +16,38 @@ including those executed remotely in a multi-processing or any other remote exec
Here is how you can execute such writes with Icechunk, illustrated with a `ThreadPoolExecutor`.
First read some example data, and create an Icechunk Repository.

```python exec="on" session="parallel" source="material-block"
import xarray as xr
import tempfile
from icechunk import Repository, local_filesystem_storage

ds = xr.tutorial.open_dataset("rasm").isel(time=slice(24))
repo = Repository.create(local_filesystem_storage(tempfile.TemporaryDirectory().name))
session = repo.writable_session("main")
```
We will orchestrate so that each task writes one timestep.
This is an arbitrary choice but determines what we set for the Zarr chunk size.

```python exec="on" session="parallel" source="material-block" result="code"
# one timestep per chunk along "time", whole-dimension chunks otherwise
chunks = tuple(1 if dim == "time" else ds.sizes[dim] for dim in ds.Tair.dims)
```
Initialize the dataset using [`Dataset.to_zarr`](https://docs.xarray.dev/en/stable/generated/xarray.Dataset.to_zarr.html)
and `compute=False`; this will NOT write any chunked array data, but will write all array metadata, and any
in-memory arrays (only `time` in this case).

```python exec="on" session="parallel" source="material-block"
ds.to_zarr(session.store, compute=False, encoding={"Tair": {"chunks": chunks}}, mode="w")
# this commit is optional, but may be useful in your workflow
print(session.commit("initialize store"))
```
## Multi-threading

First define a function that constitutes one "write task".

```python exec="on" session="parallel" source="material-block"
from icechunk import Session

def write_timestamp(*, itime: int, session: Session) -> None:
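    # The diff cuts the function body off here. A hedged sketch of the
    # rest, inferred from the surrounding example, not this PR's verbatim code:
    # pass a list to isel so the "time" dimension is preserved, then write
    # that single timestep into the region initialized above.
    tslice = ds.isel(time=[itime])
    tslice.to_zarr(session.store, region="auto", consolidated=False)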
@@ -59,22 +59,22 @@ def write_timestamp(*, itime: int, session: Session) -> None:

Now execute the writes.

Review comment: this example runs fine for me locally, but ends up not writing anything to the store when running on readthedocs.

<!-- ```python exec="on" session="parallel" source="material-block" result="code" -->
```python
from concurrent.futures import ThreadPoolExecutor, wait
from icechunk.distributed import merge_sessions

session = repo.writable_session("main")
with ThreadPoolExecutor() as executor:
    # submit the writes
    futures = [executor.submit(write_timestamp, itime=i, session=session) for i in range(ds.sizes["time"])]
    wait(futures)

print(session.commit("finished writes"))
```

Verify that the writes worked as expected:

```python exec="on" session="parallel" source="material-block" result="code"
ondisk = xr.open_zarr(repo.readonly_session("main").store, consolidated=False)
xr.testing.assert_identical(ds, ondisk)
```
@@ -134,7 +134,7 @@ with ProcessPoolExecutor() as executor:

# manually merge the remote sessions into the local session
session = merge_sessions(session, *sessions)
print(session.commit("finished writes"))
```
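The hunk above shows only the tail of the multi-processing example. A hedged sketch of the full pattern it implies, assuming a variant of `write_timestamp` that returns its `Session` so that per-process changes can be merged (an inference from context, not this PR's verbatim code):

```python
from concurrent.futures import ProcessPoolExecutor
from icechunk.distributed import merge_sessions

session = repo.writable_session("main")
with ProcessPoolExecutor() as executor:
    # opt in to pickling so the writable session can cross process boundaries
    with session.allow_pickling():
        futures = [
            executor.submit(write_timestamp, itime=i, session=session)
            for i in range(ds.sizes["time"])
        ]
        # each worker returns its own Session holding that worker's changes
        sessions = [f.result() for f in futures]

# manually merge the remote sessions into the local session
session = merge_sessions(session, *sessions)
print(session.commit("finished writes"))
```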
Verify that the writes worked as expected:
Review comment: we can directly install from GitHub with pip too if we know the commit ID (example below). This will require maturin in the env, which we seem to have.
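The comment's example command did not survive the page scrape; a hedged reconstruction of what such an install might look like, where the repository URL, the `#subdirectory` option, and the `<commit-sha>` placeholder are assumptions rather than text from the comment:

```bash
# hypothetical: build and install icechunk-python from a specific commit
# (building from source requires maturin in the environment)
pip install "git+https://github.com/earth-mover/icechunk.git@<commit-sha>#subdirectory=icechunk-python"
```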