Skip to content

Commit

Permalink
Introduce Zarr store interface
Browse files Browse the repository at this point in the history
  • Loading branch information
paraseba committed Aug 17, 2024
1 parent 7009d23 commit 3fa48f7
Show file tree
Hide file tree
Showing 6 changed files with 323 additions and 67 deletions.
6 changes: 3 additions & 3 deletions examples/low_level_dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ ds.set_user_attributes(array1_path.clone(), Some("{{n:42}}".to_string())).await?
```
"#,
);
ds.set_user_attributes(array1_path.clone(), Some("{n:42}".to_string())).await?;
ds.set_user_attributes(array1_path.clone(), Some("{n:42}".into())).await?;
print_nodes(&ds).await;

println!("## Committing");
Expand All @@ -141,7 +141,7 @@ ds.flush().await?;
ds.set_chunk(
array1_path.clone(),
ArrayIndices(vec![0]),
Some(ChunkPayload::Inline(b"hello".into())),
Some(ChunkPayload::Inline("hello".into())),
)
.await?;
println!(
Expand Down Expand Up @@ -197,7 +197,7 @@ let mut ds = Dataset::update(Arc::clone(&storage), ObjectId.from("{v2_id:?}"));
ds.set_chunk(
array1_path.clone(),
ArrayIndices(vec![1]),
Some(icechunk::ChunkPayload::Inline(b"bye".into())),
Some(icechunk::ChunkPayload::Inline("bye".into())),
)
.await?;

Expand Down
99 changes: 53 additions & 46 deletions src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::{
sync::Arc,
};

use bytes::Bytes;
use futures::{Stream, StreamExt};
use itertools::Either;
use thiserror::Error;
Expand Down Expand Up @@ -314,6 +315,14 @@ impl Dataset {
}
}

/// Returns the bytes of the chunk located at `coords` in the array at `path`.
///
/// The chunk is looked up through `get_chunk_ref`; when that lookup yields
/// `None`, this method yields `None` as well.
///
/// # Panics
///
/// Only inline payloads are handled so far. A `Virtual` or `Ref` payload
/// reaches `todo!()` and panics.
pub async fn get_chunk(&self, path: &Path, coords: &ArrayIndices) -> Option<Bytes> {
    // `?` propagates a missing chunk reference as `None`.
    let payload = self.get_chunk_ref(path, coords).await?;
    match payload {
        ChunkPayload::Inline(data) => Some(data),
        // Fetching virtual / referenced chunks is not implemented yet.
        ChunkPayload::Virtual(_) | ChunkPayload::Ref(_) => todo!(),
    }
}

async fn get_old_chunk(
&self,
manifests: &[ManifestRef],
Expand Down Expand Up @@ -607,7 +616,7 @@ pub enum FlushError {

#[cfg(test)]
mod tests {
use std::{collections::HashSet, error::Error, num::NonZeroU64, path::PathBuf};
use std::{error::Error, num::NonZeroU64, path::PathBuf};

use crate::{
manifest::mk_manifests_table, storage::InMemoryStorage,
Expand All @@ -617,6 +626,7 @@ mod tests {
};

use super::*;
use itertools::Itertools;
use pretty_assertions::assert_eq;

#[tokio::test(flavor = "multi_thread")]
Expand All @@ -637,7 +647,7 @@ mod tests {
let chunk2 = ChunkInfo {
node: array_id,
coord: ArrayIndices(vec![0, 0, 1]),
payload: ChunkPayload::Inline(vec![0, 0, 0, 42]),
payload: ChunkPayload::Inline("hello".into()),
};

let manifest = Arc::new(
Expand Down Expand Up @@ -685,9 +695,7 @@ mod tests {
NodeStructure {
path: array1_path.clone(),
id: array_id,
user_attributes: Some(UserAttributesStructure::Inline(
"{foo:1}".to_string(),
)),
user_attributes: Some(UserAttributesStructure::Inline("{foo:1}".into())),
node_data: NodeData::Array(zarr_meta1.clone(), vec![manifest_ref]),
},
];
Expand Down Expand Up @@ -730,17 +738,14 @@ mod tests {
);

// set user attributes for the new array and retrieve them
ds.set_user_attributes(new_array_path.clone(), Some("{n:42}".to_string()))
.await?;
ds.set_user_attributes(new_array_path.clone(), Some("{n:42}".into())).await?;
let node = ds.get_node(&new_array_path).await;
assert_eq!(
node,
Some(NodeStructure {
path: "/group/array2".into(),
id: 4,
user_attributes: Some(UserAttributesStructure::Inline(
"{n:42}".to_string(),
)),
user_attributes: Some(UserAttributesStructure::Inline("{n:42}".into(),)),
node_data: NodeData::Array(zarr_meta2.clone(), vec![]),
})
);
Expand All @@ -749,24 +754,24 @@ mod tests {
ds.set_chunk(
new_array_path.clone(),
ArrayIndices(vec![0]),
Some(ChunkPayload::Inline(vec![0, 0, 0, 7])),
Some(ChunkPayload::Inline("foo".into())),
)
.await?;

let chunk = ds.get_chunk_ref(&new_array_path, &ArrayIndices(vec![0])).await;
assert_eq!(chunk, Some(ChunkPayload::Inline(vec![0, 0, 0, 7])));
assert_eq!(chunk, Some(ChunkPayload::Inline("foo".into())));

// retrieve a non initialized chunk of the new array
let non_chunk = ds.get_chunk_ref(&new_array_path, &ArrayIndices(vec![1])).await;
assert_eq!(non_chunk, None);

// update old array user attributes and check them
ds.set_user_attributes(array1_path.clone(), Some("{updated: true}".to_string()))
ds.set_user_attributes(array1_path.clone(), Some("{updated: true}".into()))
.await?;
let node = ds.get_node(&array1_path).await.unwrap();
assert_eq!(
node.user_attributes,
Some(UserAttributesStructure::Inline("{updated: true}".to_string()))
Some(UserAttributesStructure::Inline("{updated: true}".into()))
);

// update old array zarr metadata and check it
Expand All @@ -787,12 +792,12 @@ mod tests {
ds.set_chunk(
array1_path.clone(),
ArrayIndices(vec![0, 0, 0]),
Some(ChunkPayload::Inline(vec![0, 0, 0, 99])),
Some(ChunkPayload::Inline("bac".into())),
)
.await?;

let chunk = ds.get_chunk_ref(&array1_path, &ArrayIndices(vec![0, 0, 0])).await;
assert_eq!(chunk, Some(ChunkPayload::Inline(vec![0, 0, 0, 99])));
assert_eq!(chunk, Some(ChunkPayload::Inline("bac".into())));

Ok(())
}
Expand Down Expand Up @@ -831,46 +836,49 @@ mod tests {
change_set.set_chunk(
"foo/bar".into(),
ArrayIndices(vec![1, 0]),
Some(ChunkPayload::Inline(b"bar1".into())),
Some(ChunkPayload::Inline("bar1".into())),
);
change_set.set_chunk(
"foo/bar".into(),
ArrayIndices(vec![1, 1]),
Some(ChunkPayload::Inline(b"bar2".into())),
Some(ChunkPayload::Inline("bar2".into())),
);
change_set.set_chunk(
"foo/baz".into(),
ArrayIndices(vec![0]),
Some(ChunkPayload::Inline(b"baz1".into())),
Some(ChunkPayload::Inline("baz1".into())),
);
change_set.set_chunk(
"foo/baz".into(),
ArrayIndices(vec![1]),
Some(ChunkPayload::Inline(b"baz2".into())),
Some(ChunkPayload::Inline("baz2".into())),
);

{
let all_chunks: HashSet<_> = change_set.new_arrays_chunk_iterator().collect();
let expected_chunks = [
ChunkInfo {
node: 1,
coord: ArrayIndices(vec![1, 0]),
payload: ChunkPayload::Inline(b"bar1".into()),
},
ChunkInfo {
node: 1,
coord: ArrayIndices(vec![1, 1]),
payload: ChunkPayload::Inline(b"bar2".into()),
},
let all_chunks: Vec<_> = change_set
.new_arrays_chunk_iterator()
.sorted_by_key(|c| c.coord.clone())
.collect();
let expected_chunks: Vec<_> = [
ChunkInfo {
node: 2,
coord: ArrayIndices(vec![0]),
payload: ChunkPayload::Inline(b"baz1".into()),
payload: ChunkPayload::Inline("baz1".into()),
},
ChunkInfo {
node: 2,
coord: ArrayIndices(vec![1]),
payload: ChunkPayload::Inline(b"baz2".into()),
payload: ChunkPayload::Inline("baz2".into()),
},
ChunkInfo {
node: 1,
coord: ArrayIndices(vec![1, 0]),
payload: ChunkPayload::Inline("bar1".into()),
},
ChunkInfo {
node: 1,
coord: ArrayIndices(vec![1, 1]),
payload: ChunkPayload::Inline("bar2".into()),
},
]
.into();
Expand Down Expand Up @@ -938,7 +946,7 @@ mod tests {
ds.set_chunk(
new_array_path.clone(),
ArrayIndices(vec![0, 0, 0]),
Some(ChunkPayload::Inline(b"hello".into())),
Some(ChunkPayload::Inline("hello".into())),
)
.await?;

Expand Down Expand Up @@ -972,33 +980,33 @@ mod tests {
));
assert_eq!(
ds.get_chunk_ref(&new_array_path, &ArrayIndices(vec![0, 0, 0])).await,
Some(ChunkPayload::Inline(b"hello".into()))
Some(ChunkPayload::Inline("hello".into()))
);

// we modify a chunk in an existing array
ds.set_chunk(
new_array_path.clone(),
ArrayIndices(vec![0, 0, 0]),
Some(ChunkPayload::Inline(b"bye".into())),
Some(ChunkPayload::Inline("bye".into())),
)
.await?;

// we add a new chunk in an existing array
ds.set_chunk(
new_array_path.clone(),
ArrayIndices(vec![0, 0, 1]),
Some(ChunkPayload::Inline(b"new chunk".into())),
Some(ChunkPayload::Inline("new chunk".into())),
)
.await?;

let previous_structure_id = ds.flush().await?;
assert_eq!(
ds.get_chunk_ref(&new_array_path, &ArrayIndices(vec![0, 0, 0])).await,
Some(ChunkPayload::Inline(b"bye".into()))
Some(ChunkPayload::Inline("bye".into()))
);
assert_eq!(
ds.get_chunk_ref(&new_array_path, &ArrayIndices(vec![0, 0, 1])).await,
Some(ChunkPayload::Inline(b"new chunk".into()))
Some(ChunkPayload::Inline("new chunk".into()))
);

// we delete a chunk
Expand All @@ -1009,15 +1017,14 @@ mod tests {
ds.update_array(new_array_path.clone(), new_meta.clone()).await?;

// we change user attributes metadata
ds.set_user_attributes(new_array_path.clone(), Some("{foo:42}".to_string()))
.await?;
ds.set_user_attributes(new_array_path.clone(), Some("{foo:42}".into())).await?;

let structure_id = ds.flush().await?;
let ds = Dataset::update(Arc::clone(&storage), structure_id);

assert_eq!(
ds.get_chunk_ref(&new_array_path, &ArrayIndices(vec![0, 0, 0])).await,
Some(ChunkPayload::Inline(b"bye".into()))
Some(ChunkPayload::Inline("bye".into()))
);
assert_eq!(
ds.get_chunk_ref(&new_array_path, &ArrayIndices(vec![0, 0, 1])).await,
Expand All @@ -1030,18 +1037,18 @@ mod tests {
path,
user_attributes: Some(atts),
node_data: NodeData::Array(meta, manifests)
}) if path == new_array_path && meta == new_meta.clone() && manifests.len() == 1 && atts == UserAttributesStructure::Inline("{foo:42}".to_string())
}) if path == new_array_path && meta == new_meta.clone() && manifests.len() == 1 && atts == UserAttributesStructure::Inline("{foo:42}".into())
));

//test the previous version is still alive
let ds = Dataset::update(Arc::clone(&storage), previous_structure_id);
assert_eq!(
ds.get_chunk_ref(&new_array_path, &ArrayIndices(vec![0, 0, 0])).await,
Some(ChunkPayload::Inline(b"bye".into()))
Some(ChunkPayload::Inline("bye".into()))
);
assert_eq!(
ds.get_chunk_ref(&new_array_path, &ArrayIndices(vec![0, 0, 1])).await,
Some(ChunkPayload::Inline(b"new chunk".into()))
Some(ChunkPayload::Inline("new chunk".into()))
);

Ok(())
Expand Down
17 changes: 9 additions & 8 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub mod dataset;
pub mod manifest;
pub mod storage;
pub mod structure;
pub mod zarr;

use async_trait::async_trait;
use bytes::Bytes;
Expand All @@ -49,7 +50,7 @@ pub enum IcechunkFormatError {
NullFillValueError,
}

#[derive(Clone, Debug, Hash, PartialEq, Eq)]
#[derive(Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
/// An ND index to an element in an array.
pub struct ArrayIndices(pub Vec<u64>);

Expand Down Expand Up @@ -403,14 +404,14 @@ pub struct StorageTransformers(pub String); // FIXME: define

pub type DimensionName = String;

pub type UserAttributes = String; // FIXME: better definition
pub type UserAttributes = Bytes; // FIXME: better definition

/// The internal id of an array or group, unique only to a single store version
pub type NodeId = u32;

/// The id of a file in object store
/// FIXME: should this be passed by ref everywhere?
#[derive(Hash, Clone, PartialEq, Eq)]
#[derive(Hash, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct ObjectId([u8; 16]); // FIXME: this doesn't need to be this big

impl ObjectId {
Expand Down Expand Up @@ -515,28 +516,28 @@ impl NodeStructure {
}
}

#[derive(Clone, Debug, Hash, PartialEq, Eq)]
#[derive(Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct VirtualChunkRef {
location: String, // FIXME: better type
offset: u64,
length: u64,
}

#[derive(Clone, Debug, Hash, PartialEq, Eq)]
#[derive(Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct ChunkRef {
id: ObjectId, // FIXME: better type
offset: u64,
length: u64,
}

#[derive(Clone, Debug, Hash, PartialEq, Eq)]
#[derive(Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub enum ChunkPayload {
Inline(Vec<u8>), // FIXME: optimize copies
Inline(Bytes), // FIXME: optimize copies
Virtual(VirtualChunkRef),
Ref(ChunkRef),
}

#[derive(Clone, Debug, Hash, PartialEq, Eq)]
#[derive(Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct ChunkInfo {
node: NodeId,
coord: ArrayIndices,
Expand Down
Loading

0 comments on commit 3fa48f7

Please sign in to comment.