From e4b6f006b16b1bd5ec1d31b9c08ff84e3dc1d1a2 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Thu, 9 Jan 2025 12:34:20 -0700 Subject: [PATCH] Unpin zarr & xarray (#555) * Unpin zarr & xarray * Sync with latest python api * Update icechunk-python/python/icechunk/store.py Co-authored-by: Deepak Cherian * Add tests for other range request types * Change icechunk internal ByteRange to match python * Lint * Better byterequest impl * last -> until * Fix ranging --------- Co-authored-by: Matthew Iannucci --- icechunk-python/pyproject.toml | 4 +- icechunk-python/python/icechunk/store.py | 39 ++++++++++++++++--- icechunk-python/tests/test_zarr/test_group.py | 3 +- .../tests/test_zarr/test_store/test_core.py | 2 +- .../test_store/test_icechunk_store.py | 15 ++++++- icechunk/src/format/mod.rs | 10 +++-- icechunk/src/refs.rs | 4 +- icechunk/src/session.rs | 13 +++++-- icechunk/src/storage/object_store.rs | 22 ++--------- 9 files changed, 72 insertions(+), 40 deletions(-) diff --git a/icechunk-python/pyproject.toml b/icechunk-python/pyproject.toml index e4892fcf..ec934d6d 100644 --- a/icechunk-python/pyproject.toml +++ b/icechunk-python/pyproject.toml @@ -16,7 +16,7 @@ classifiers = [ license = { text = "Apache-2.0" } dynamic = ["version"] -dependencies = ["zarr==3.0.0rc2"] +dependencies = ["zarr>=3"] [tool.poetry] name = "icechunk" @@ -39,7 +39,7 @@ test = [ "ruff", "dask>=2024.11.0", "distributed>=2024.11.0", - "xarray@git+https://github.com/pydata/xarray.git@main", + "xarray>=2025.01.1", "hypothesis", "pandas-stubs", "boto3-stubs[s3]", diff --git a/icechunk-python/python/icechunk/store.py b/icechunk-python/python/icechunk/store.py index fe94a92b..d2788e51 100644 --- a/icechunk-python/python/icechunk/store.py +++ b/icechunk-python/python/icechunk/store.py @@ -3,7 +3,13 @@ from typing import TYPE_CHECKING, Any from icechunk._icechunk_python import PyStore -from zarr.abc.store import ByteRangeRequest, Store +from zarr.abc.store import ( + ByteRequest, + OffsetByteRequest, + 
RangeByteRequest, + Store, + SuffixByteRequest, +) from zarr.core.buffer import Buffer, BufferPrototype from zarr.core.common import BytesLike from zarr.core.sync import SyncMixin @@ -12,6 +18,20 @@ from icechunk import Session +def _byte_request_to_tuple( + byte_request: ByteRequest | None, +) -> tuple[int | None, int | None]: + match byte_request: + case None: + return (None, None) + case RangeByteRequest(start, end): + return (start, end) + case OffsetByteRequest(offset): + return (offset, None) + case SuffixByteRequest(suffix): + return (None, suffix) + + class IcechunkStore(Store, SyncMixin): _store: PyStore @@ -93,14 +113,20 @@ async def get( self, key: str, prototype: BufferPrototype, - byte_range: tuple[int | None, int | None] | None = None, + byte_range: ByteRequest | None = None, ) -> Buffer | None: """Retrieve the value associated with a given key. Parameters ---------- key : str - byte_range : tuple[int, Optional[int]], optional + byte_range : ByteRequest, optional + + ByteRequest may be one of the following. If not provided, all data associated with the key is retrieved. + + - RangeByteRequest(int, int): Request a specific range of bytes in the form (start, end). The end is exclusive. If the given range is zero-length or starts after the end of the object, an error will be returned. Additionally, if the range ends after the end of the object, the entire remainder of the object will be returned. Otherwise, the exact requested range will be returned. + - OffsetByteRequest(int): Request all bytes starting from a given byte offset. This is equivalent to bytes={int}- as an HTTP header. + - SuffixByteRequest(int): Request the last int bytes. Note that here, int is the size of the request, not the byte offset. This is equivalent to bytes=-{int} as an HTTP header. 
Returns ------- @@ -108,7 +134,7 @@ async def get( """ try: - result = await self._store.get(key, byte_range) + result = await self._store.get(key, _byte_request_to_tuple(byte_range)) except KeyError as _e: # Zarr python expects None to be returned if the key does not exist # but an IcechunkStore returns an error if the key does not exist @@ -119,7 +145,7 @@ async def get( async def get_partial_values( self, prototype: BufferPrototype, - key_ranges: Iterable[tuple[str, ByteRangeRequest]], + key_ranges: Iterable[tuple[str, ByteRequest | None]], ) -> list[Buffer | None]: """Retrieve possibly partial values from given key_ranges. @@ -134,7 +160,8 @@ async def get_partial_values( """ # NOTE: pyo3 has not implicit conversion from an Iterable to a rust iterable. So we convert it # to a list here first. Possible opportunity for optimization. - result = await self._store.get_partial_values(list(key_ranges)) + ranges = [(k[0], _byte_request_to_tuple(k[1])) for k in key_ranges] + result = await self._store.get_partial_values(list(ranges)) return [prototype.buffer.from_bytes(r) for r in result] async def exists(self, key: str) -> bool: diff --git a/icechunk-python/tests/test_zarr/test_group.py b/icechunk-python/tests/test_zarr/test_group.py index 04e0e809..8f90c643 100644 --- a/icechunk-python/tests/test_zarr/test_group.py +++ b/icechunk-python/tests/test_zarr/test_group.py @@ -18,7 +18,8 @@ from zarr.core.group import GroupMetadata from zarr.core.sync import sync from zarr.errors import ContainsArrayError, ContainsGroupError -from zarr.storage import StorePath, make_store_path +from zarr.storage import StorePath +from zarr.storage._common import make_store_path @pytest.fixture(params=["memory"]) diff --git a/icechunk-python/tests/test_zarr/test_store/test_core.py b/icechunk-python/tests/test_zarr/test_store/test_core.py index 332e8008..de465e7f 100644 --- a/icechunk-python/tests/test_zarr/test_store/test_core.py +++ b/icechunk-python/tests/test_zarr/test_store/test_core.py 
@@ -1,6 +1,6 @@ from icechunk import IcechunkStore from tests.conftest import parse_repo -from zarr.storage import make_store_path +from zarr.storage._common import make_store_path async def test_make_store_path() -> None: diff --git a/icechunk-python/tests/test_zarr/test_store/test_icechunk_store.py b/icechunk-python/tests/test_zarr/test_store/test_icechunk_store.py index b5c09610..b8e0a9c2 100644 --- a/icechunk-python/tests/test_zarr/test_store/test_icechunk_store.py +++ b/icechunk-python/tests/test_zarr/test_store/test_icechunk_store.py @@ -8,6 +8,7 @@ from icechunk import IcechunkStore, local_filesystem_storage from icechunk.repository import Repository +from zarr.abc.store import OffsetByteRequest, RangeByteRequest, SuffixByteRequest from zarr.core.buffer import Buffer, cpu, default_buffer_prototype from zarr.core.sync import collect_aiterator from zarr.testing.store import StoreTests @@ -226,15 +227,25 @@ async def test_get_partial_values( values = await store.get_partial_values( default_buffer_prototype(), [ - ("zarr.json", (0, 5)), + ("zarr.json", RangeByteRequest(0, 5)), + ("zarr.json", SuffixByteRequest(5)), + ("zarr.json", OffsetByteRequest(10)), ], ) - assert len(values) == 1 + assert len(values) == 3 data = values[0].to_bytes() assert len(data) == 5 assert data == DEFAULT_GROUP_METADATA[:5] + data = values[1].to_bytes() + assert len(data) == len(DEFAULT_GROUP_METADATA) - 5 + assert data == DEFAULT_GROUP_METADATA[:-5] + + data = values[2].to_bytes() + assert len(data) == len(DEFAULT_GROUP_METADATA) - 10 + assert data == DEFAULT_GROUP_METADATA[10:] + async def test_set(self, store: IcechunkStore) -> None: await store.set("zarr.json", self.buffer_cls.from_bytes(DEFAULT_GROUP_METADATA)) assert await store.exists("zarr.json") diff --git a/icechunk/src/format/mod.rs b/icechunk/src/format/mod.rs index 009618c7..3c90c777 100644 --- a/icechunk/src/format/mod.rs +++ b/icechunk/src/format/mod.rs @@ -150,8 +150,10 @@ pub enum ByteRange { Bounded(Range), /// All 
bytes from the given offset (included) to the end of the object From(ChunkOffset), - /// Last n bytes in the object + /// The last n bytes in the object Last(ChunkLength), + /// All bytes up to the last n bytes in the object + Until(ChunkOffset), } impl From> for ByteRange { @@ -185,7 +187,8 @@ impl ByteRange { bytes.slice(range.start as usize..range.end as usize) } ByteRange::From(from) => bytes.slice(*from as usize..), - ByteRange::Last(n) => bytes.slice(bytes.len() - *n as usize..bytes.len()), + ByteRange::Last(n) => bytes.slice(bytes.len() - *n as usize..), + ByteRange::Until(n) => bytes.slice(0usize..bytes.len() - *n as usize), } } } @@ -195,7 +198,8 @@ impl From<(Option, Option)> for ByteRange { match (start, end) { (Some(start), Some(end)) => Self::Bounded(start..end), (Some(start), None) => Self::From(start), - (None, Some(end)) => Self::Bounded(0..end), + // NOTE: This is relied upon by zarr python + (None, Some(end)) => Self::Until(end), (None, None) => Self::ALL, } } diff --git a/icechunk/src/refs.rs b/icechunk/src/refs.rs index 94efa02d..df2597fd 100644 --- a/icechunk/src/refs.rs +++ b/icechunk/src/refs.rs @@ -262,10 +262,10 @@ pub async fn list_branches( Ok(branches) } -async fn branch_history<'a, 'b>( +async fn branch_history<'a>( storage: &'a (dyn Storage + Send + Sync), storage_settings: &storage::Settings, - branch: &'b str, + branch: &str, ) -> RefResult> + 'a> { let key = branch_root(branch)?; let all = storage.ref_versions(storage_settings, key.as_str()).await?; diff --git a/icechunk/src/session.rs b/icechunk/src/session.rs index 22b3e505..1ea26fd8 100644 --- a/icechunk/src/session.rs +++ b/icechunk/src/session.rs @@ -1014,9 +1014,9 @@ async fn updated_nodes<'a>( .chain(change_set.new_nodes_iterator(manifest_id))) } -async fn get_node<'a>( +async fn get_node( asset_manager: &AssetManager, - change_set: &'a ChangeSet, + change_set: &ChangeSet, snapshot_id: &SnapshotId, path: &Path, ) -> SessionResult { @@ -1037,9 +1037,9 @@ async fn get_node<'a>( 
} } -async fn get_existing_node<'a>( +async fn get_existing_node( asset_manager: &AssetManager, - change_set: &'a ChangeSet, + change_set: &ChangeSet, snapshot_id: &SnapshotId, path: &Path, ) -> SessionResult { @@ -1121,6 +1121,11 @@ pub fn construct_valid_byte_range( let new_start = min(chunk_offset + n, chunk_offset + chunk_length - 1); new_start..chunk_offset + chunk_length } + ByteRange::Until(n) => { + let new_end = chunk_offset + chunk_length; + let new_start = new_end - n; + new_start..new_end + } ByteRange::Last(n) => { let new_end = chunk_offset + chunk_length; let new_start = new_end - n; diff --git a/icechunk/src/storage/object_store.rs b/icechunk/src/storage/object_store.rs index aca3a546..99d495be 100644 --- a/icechunk/src/storage/object_store.rs +++ b/icechunk/src/storage/object_store.rs @@ -1,7 +1,5 @@ use crate::{ - format::{ - ByteRange, ChunkId, ChunkOffset, FileTypeTag, ManifestId, ObjectId, SnapshotId, - }, + format::{ChunkId, ChunkOffset, FileTypeTag, ManifestId, ObjectId, SnapshotId}, private, }; use async_trait::async_trait; @@ -12,8 +10,8 @@ use futures::{ }; use object_store::{ local::LocalFileSystem, parse_url_opts, path::Path as ObjectPath, Attribute, - AttributeValue, Attributes, GetOptions, GetRange, ObjectMeta, ObjectStore, PutMode, - PutOptions, PutPayload, UpdateVersion, + AttributeValue, Attributes, GetOptions, ObjectMeta, ObjectStore, PutMode, PutOptions, + PutPayload, UpdateVersion, }; use serde::{Deserialize, Serialize}; use std::{ @@ -37,20 +35,6 @@ use super::{ SNAPSHOT_PREFIX, TRANSACTION_PREFIX, }; -// Get Range is object_store specific, keep it with this module -impl From<&ByteRange> for Option { - fn from(value: &ByteRange) -> Self { - match value { - ByteRange::Bounded(Range { start, end }) => { - Some(GetRange::Bounded(*start as usize..*end as usize)) - } - ByteRange::From(start) if *start == 0u64 => None, - ByteRange::From(start) => Some(GetRange::Offset(*start as usize)), - ByteRange::Last(n) => Some(GetRange::Suffix(*n 
as usize)), - } - } -} - #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] pub struct ObjectStorageConfig { pub url: String,