Skip to content

Commit

Permalink
Zarr facade learns how to get and set
Browse files Browse the repository at this point in the history
Also:
* Dataset learns how to `get_chunk` and `set_chunk`
* Dataset has `get_array` and `get_group` to complement `get_node`
* Facade learns `get_partial_value`
* Facade learns `exists`
* Better zarr key parsing
  • Loading branch information
paraseba committed Aug 20, 2024
1 parent 33fefe9 commit e701dae
Show file tree
Hide file tree
Showing 6 changed files with 274 additions and 152 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ thiserror = "1.0.63"
serde_json = "1.0.125"
serde = { version = "1.0.208", features = ["derive"] }
serde_with = "3.9.0"
tokio = { version = "1.39.2", features = ["rt-multi-thread", "macros"] }

[profile.release-with-debug]
inherits = "release"
Expand All @@ -28,4 +29,3 @@ debug = true
[dev-dependencies]
pretty_assertions = "1.4.0"
proptest = "1.0.0"
tokio = { version = "1.39.2", features = ["rt-multi-thread", "macros"] }
4 changes: 2 additions & 2 deletions examples/low_level_dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ ds.flush().await?;
println!();
println!();
println!("## Adding an inline chunk");
ds.set_chunk(
ds.set_chunk_ref(
array1_path.clone(),
ArrayIndices(vec![0]),
Some(ChunkPayload::Inline("hello".into())),
Expand Down Expand Up @@ -210,7 +210,7 @@ let mut ds = Dataset::update(Arc::clone(&storage), ObjectId.from("{v2_id:?}"));
print_nodes(&ds).await;

println!("## Adding a new inline chunk");
ds.set_chunk(
ds.set_chunk_ref(
array1_path.clone(),
ArrayIndices(vec![1]),
Some(icechunk::ChunkPayload::Inline("bye".into())),
Expand Down
90 changes: 70 additions & 20 deletions src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,12 @@ impl ChangeSet {
self.updated_attributes.get(path)
}

fn set_chunk(&mut self, path: Path, coord: ArrayIndices, data: Option<ChunkPayload>) {
fn set_chunk_ref(
&mut self,
path: Path,
coord: ArrayIndices,
data: Option<ChunkPayload>,
) {
self.set_chunks
.entry(path)
.and_modify(|h| {
Expand Down Expand Up @@ -258,21 +263,18 @@ impl Dataset {
// Record the write, reference, or delete of a chunk
//
// Caller has to write the chunk before calling this.
pub async fn set_chunk(
pub async fn set_chunk_ref(
&mut self,
path: Path,
coord: ArrayIndices,
data: Option<ChunkPayload>,
) -> Result<(), UpdateNodeError> {
match self
.get_node(&path)
.await
.map_err(|_| UpdateNodeError::NotFound(path.clone()))?
{
NodeStructure { node_data: NodeData::Array(..), .. } => {
self.change_set.set_chunk(path, coord, data);
match self.get_node(&path).await {
Ok(NodeStructure { node_data: NodeData::Array(..), .. }) => {
self.change_set.set_chunk_ref(path, coord, data);
Ok(())
}
Err(GetNodeError::NotFound(path)) => Err(UpdateNodeError::NotFound(path)),
_ => Err(UpdateNodeError::NotAnArray(path)),
}
}
Expand Down Expand Up @@ -317,6 +319,26 @@ impl Dataset {
}
}

/// Fetch the node at `path`, succeeding only when it is an array.
///
/// A group at `path` is reported as `GetNodeError::NotFound`, the same
/// error used when no node exists there at all; storage and lookup
/// failures from `get_node` are propagated unchanged.
pub async fn get_array(&self, path: &Path) -> Result<NodeStructure, GetNodeError> {
    let node = self.get_node(path).await?;
    if matches!(node.node_data, NodeData::Array(..)) {
        Ok(node)
    } else {
        Err(GetNodeError::NotFound(path.clone()))
    }
}

/// Fetch the node at `path`, succeeding only when it is a group.
///
/// An array at `path` is reported as `GetNodeError::NotFound`, the same
/// error used when no node exists there at all; storage and lookup
/// failures from `get_node` are propagated unchanged.
pub async fn get_group(&self, path: &Path) -> Result<NodeStructure, GetNodeError> {
    let node = self.get_node(path).await?;
    if matches!(node.node_data, NodeData::Group) {
        Ok(node)
    } else {
        Err(GetNodeError::NotFound(path.clone()))
    }
}

async fn get_existing_node(
&self,
path: &Path,
Expand All @@ -325,6 +347,7 @@ impl Dataset {
let structure_id =
self.structure_id.as_ref().ok_or(GetNodeError::NotFound(path.clone()))?;
let structure = self.storage.fetch_structure(structure_id).await?;

let session_atts = self
.change_set
.get_user_attributes(path)
Expand Down Expand Up @@ -429,6 +452,32 @@ impl Dataset {
}
}

pub async fn set_chunk(
&mut self,
path: &Path,
coord: &ArrayIndices,
data: Bytes,
) -> Result<(), UpdateNodeError> {
// TODO: support inline chunks
match self.get_array(path).await {
Ok(_) => {
let new_id = ObjectId::random();
self.storage
.write_chunk(new_id.clone(), data.clone())
.await
.map_err(UpdateNodeError::StorageError)?;
let payload = ChunkPayload::Ref(ChunkRef {
id: new_id,
offset: 0,
length: data.len() as u64,
});
self.change_set.set_chunk_ref(path.clone(), coord.clone(), Some(payload));
Ok(())
}
Err(_) => Err(UpdateNodeError::NotFound(path.clone())),
}
}

async fn get_old_chunk(
&self,
manifests: &[ManifestRef],
Expand Down Expand Up @@ -885,7 +934,7 @@ mod tests {
);

// set a chunk for the new array and retrieve it
ds.set_chunk(
ds.set_chunk_ref(
new_array_path.clone(),
ArrayIndices(vec![0]),
Some(ChunkPayload::Inline("foo".into())),
Expand Down Expand Up @@ -928,7 +977,7 @@ mod tests {
}

// set old array chunk and check them
ds.set_chunk(
ds.set_chunk_ref(
array1_path.clone(),
ArrayIndices(vec![0, 0, 0]),
Some(ChunkPayload::Inline("bac".into())),
Expand Down Expand Up @@ -977,25 +1026,25 @@ mod tests {
change_set.add_array("foo/baz".into(), 2, zarr_meta);
assert_eq!(None, change_set.new_arrays_chunk_iterator().next());

change_set.set_chunk("foo/bar".into(), ArrayIndices(vec![0, 1]), None);
change_set.set_chunk_ref("foo/bar".into(), ArrayIndices(vec![0, 1]), None);
assert_eq!(None, change_set.new_arrays_chunk_iterator().next());

change_set.set_chunk(
change_set.set_chunk_ref(
"foo/bar".into(),
ArrayIndices(vec![1, 0]),
Some(ChunkPayload::Inline("bar1".into())),
);
change_set.set_chunk(
change_set.set_chunk_ref(
"foo/bar".into(),
ArrayIndices(vec![1, 1]),
Some(ChunkPayload::Inline("bar2".into())),
);
change_set.set_chunk(
change_set.set_chunk_ref(
"foo/baz".into(),
ArrayIndices(vec![0]),
Some(ChunkPayload::Inline("baz1".into())),
);
change_set.set_chunk(
change_set.set_chunk_ref(
"foo/baz".into(),
ArrayIndices(vec![1]),
Some(ChunkPayload::Inline("baz2".into())),
Expand Down Expand Up @@ -1093,7 +1142,7 @@ mod tests {
let _structure_id = ds.flush().await?;

// we set a chunk in a new array
ds.set_chunk(
ds.set_chunk_ref(
new_array_path.clone(),
ArrayIndices(vec![0, 0, 0]),
Some(ChunkPayload::Inline("hello".into())),
Expand Down Expand Up @@ -1134,15 +1183,15 @@ mod tests {
);

// we modify a chunk in an existing array
ds.set_chunk(
ds.set_chunk_ref(
new_array_path.clone(),
ArrayIndices(vec![0, 0, 0]),
Some(ChunkPayload::Inline("bye".into())),
)
.await?;

// we add a new chunk in an existing array
ds.set_chunk(
ds.set_chunk_ref(
new_array_path.clone(),
ArrayIndices(vec![0, 0, 1]),
Some(ChunkPayload::Inline("new chunk".into())),
Expand All @@ -1160,7 +1209,8 @@ mod tests {
);

// we delete a chunk
ds.set_chunk(new_array_path.clone(), ArrayIndices(vec![0, 0, 1]), None).await?;
ds.set_chunk_ref(new_array_path.clone(), ArrayIndices(vec![0, 0, 1]), None)
.await?;

let new_meta = ZarrArrayMetadata { shape: vec![1, 1, 1], ..zarr_meta };
// we change zarr metadata
Expand Down
9 changes: 5 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -678,7 +678,7 @@ pub enum AddNodeError {
AlreadyExists(Path),
}

#[derive(Clone, Debug, PartialEq, Eq, Error)]
#[derive(Debug, Error)]
pub enum DeleteNodeError {
#[error("node not found at `{0}`")]
NotFound(Path),
Expand All @@ -688,20 +688,21 @@ pub enum DeleteNodeError {
NotAGroup(Path),
}

#[derive(Clone, Debug, PartialEq, Eq, Error)]
#[derive(Debug, Error)]
pub enum UpdateNodeError {
#[error("node not found at `{0}`")]
NotFound(Path),
#[error("there is not an array at `{0}`")]
NotAnArray(Path),
// TODO: Don't we need a NotAGroup here?
#[error("error contacting storage")]
StorageError(#[from] StorageError),
}

#[derive(Debug, Error)]
pub enum GetNodeError {
#[error("node not found at `{0}`")]
NotFound(Path),
#[error("storage error when searching for node")]
#[error("error contacting storage")]
StorageError(#[from] StorageError),
}

Expand Down
7 changes: 6 additions & 1 deletion src/storage.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use base64::{engine::general_purpose::URL_SAFE as BASE64_URL_SAFE, Engine as _};
use std::{
collections::HashMap,
collections::{HashMap, HashSet},
fs::create_dir_all,
ops::Range,
sync::{Arc, RwLock},
Expand Down Expand Up @@ -219,6 +219,11 @@ impl InMemoryStorage {
chunk_files: Arc::new(RwLock::new(HashMap::new())),
}
}

/// Snapshot of the ids of every chunk currently held in memory.
///
/// Intended for tests.
pub fn chunk_ids(&self) -> HashSet<ObjectId> {
    let chunks = self.chunk_files.read().unwrap();
    chunks.keys().cloned().collect()
}
}

#[async_trait]
Expand Down
Loading

0 comments on commit e701dae

Please sign in to comment.