Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement missing methods in InMemStorage #14

Merged
merged 1 commit into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ bytes = "1.7.1"
futures = "0.3.30"
itertools = "0.13.0"
rand = "0.8.5"
thiserror = "1.0.63"

[profile.release-with-debug]
inherits = "release"
Expand Down
100 changes: 37 additions & 63 deletions src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::{

use futures::{Stream, StreamExt};
use itertools::Either;
use thiserror::Error;

use crate::{
manifest::mk_manifests_table, structure::mk_structure_table, AddNodeError,
Expand Down Expand Up @@ -127,7 +128,7 @@ impl Dataset {
self.change_set.add_group(path, id);
Ok(())
} else {
Err(AddNodeError::AlreadyExists)
Err(AddNodeError::AlreadyExists(path))
}
}

Expand All @@ -144,7 +145,7 @@ impl Dataset {
self.change_set.add_array(path, id, metadata);
Ok(())
} else {
Err(AddNodeError::AlreadyExists)
Err(AddNodeError::AlreadyExists(path))
}
}

Expand All @@ -157,12 +158,12 @@ impl Dataset {
metadata: ZarrArrayMetadata,
) -> Result<(), UpdateNodeError> {
match self.get_node(&path).await {
None => Err(UpdateNodeError::NotFound),
None => Err(UpdateNodeError::NotFound(path)),
Some(NodeStructure { node_data: NodeData::Array(..), .. }) => {
self.change_set.update_array(path, metadata);
Ok(())
}
Some(_) => Err(UpdateNodeError::NotAnArray),
Some(_) => Err(UpdateNodeError::NotAnArray(path)),
}
}

Expand All @@ -173,7 +174,7 @@ impl Dataset {
atts: Option<UserAttributes>,
) -> Result<(), UpdateNodeError> {
match self.get_node(&path).await {
None => Err(UpdateNodeError::NotFound),
None => Err(UpdateNodeError::NotFound(path)),
Some(_) => {
self.change_set.update_user_attributes(path, atts);
Ok(())
Expand All @@ -191,12 +192,12 @@ impl Dataset {
data: Option<ChunkPayload>,
) -> Result<(), UpdateNodeError> {
match self.get_node(&path).await {
None => Err(UpdateNodeError::NotFound),
None => Err(UpdateNodeError::NotFound(path)),
Some(NodeStructure { node_data: NodeData::Array(..), .. }) => {
self.change_set.set_chunk(path, coord, data);
Ok(())
}
Some(_) => Err(UpdateNodeError::NotAnArray),
Some(_) => Err(UpdateNodeError::NotAnArray(path)),
}
}

Expand Down Expand Up @@ -553,16 +554,14 @@ impl Dataset {
let new_manifest_id = ObjectId::random();
self.storage
.write_manifests(new_manifest_id.clone(), Arc::new(new_manifest))
.await
.map_err(FlushError::StorageError)?;
.await?;

let all_nodes = self.updated_nodes(&new_manifest_id, &region_tracker).await;
let new_structure = mk_structure_table(all_nodes);
let new_structure_id = ObjectId::random();
self.storage
.write_structure(new_structure_id.clone(), Arc::new(new_structure))
.await
.map_err(FlushError::StorageError)?;
.await?;

self.structure_id = Some(new_structure_id.clone());
self.change_set = ChangeSet::default();
Expand All @@ -587,10 +586,12 @@ impl TableRegionTracker {
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, Error)]
pub enum FlushError {
#[error("no changes made to the dataset")]
NoChangesToFlush,
StorageError(StorageError),
#[error("error contacting storage")]
StorageError(#[from] StorageError),
}
Comment on lines +591 to 595
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this come from thiserror?


#[cfg(test)]
Expand Down Expand Up @@ -636,10 +637,7 @@ mod tests {
.await,
);
let manifest_id = ObjectId::random();
storage
.write_manifests(manifest_id.clone(), manifest)
.await
.map_err(|err| format!("{err:#?}"))?;
storage.write_manifests(manifest_id.clone(), manifest).await?;

let zarr_meta1 = ZarrArrayMetadata {
shape: vec![2, 2, 2],
Expand Down Expand Up @@ -685,20 +683,15 @@ mod tests {

let structure = Arc::new(mk_structure_table(nodes.clone()));
let structure_id = ObjectId::random();
storage
.write_structure(structure_id.clone(), structure)
.await
.map_err(|err| format!("{err:#?}"))?;
storage.write_structure(structure_id.clone(), structure).await?;
let mut ds = Dataset::update(Arc::new(storage), structure_id);

// retrieve the old array node
let node = ds.get_node(&array1_path).await;
assert_eq!(nodes.get(1), node.as_ref());

// add a new array and retrieve its node
ds.add_group("/group".to_string().into())
.await
.map_err(|err| format!("{err:#?}"))?;
ds.add_group("/group".to_string().into()).await?;

let zarr_meta2 = ZarrArrayMetadata {
shape: vec![3],
Expand All @@ -712,9 +705,7 @@ mod tests {
};

let new_array_path: PathBuf = "/group/array2".to_string().into();
ds.add_array(new_array_path.clone(), zarr_meta2.clone())
.await
.map_err(|err| format!("{err:#?}"))?;
ds.add_array(new_array_path.clone(), zarr_meta2.clone()).await?;

let node = ds.get_node(&new_array_path).await;
assert_eq!(
Expand All @@ -729,8 +720,7 @@ mod tests {

// set user attributes for the new array and retrieve them
ds.set_user_attributes(new_array_path.clone(), Some("{n:42}".to_string()))
.await
.map_err(|err| format!("{err:#?}"))?;
.await?;
let node = ds.get_node(&new_array_path).await;
assert_eq!(
node,
Expand All @@ -750,8 +740,7 @@ mod tests {
ArrayIndices(vec![0]),
Some(ChunkPayload::Inline(vec![0, 0, 0, 7])),
)
.await
.map_err(|err| format!("{err:#?}"))?;
.await?;

let chunk = ds.get_chunk_ref(&new_array_path, &ArrayIndices(vec![0])).await;
assert_eq!(chunk, Some(ChunkPayload::Inline(vec![0, 0, 0, 7])));
Expand All @@ -762,8 +751,7 @@ mod tests {

// update old array user attributes and check them
ds.set_user_attributes(array1_path.clone(), Some("{updated: true}".to_string()))
.await
.map_err(|err| format!("{err:#?}"))?;
.await?;
let node = ds.get_node(&array1_path).await.unwrap();
assert_eq!(
node.user_attributes,
Expand All @@ -772,9 +760,7 @@ mod tests {

// update old array zarr metadata and check it
let new_zarr_meta1 = ZarrArrayMetadata { shape: vec![2, 2, 3], ..zarr_meta1 };
ds.update_array(array1_path.clone(), new_zarr_meta1)
.await
.map_err(|err| format!("{err:#?}"))?;
ds.update_array(array1_path.clone(), new_zarr_meta1).await?;
let node = ds.get_node(&array1_path).await;
if let Some(NodeStructure {
node_data: NodeData::Array(ZarrArrayMetadata { shape, .. }, _),
Expand All @@ -792,8 +778,7 @@ mod tests {
ArrayIndices(vec![0, 0, 0]),
Some(ChunkPayload::Inline(vec![0, 0, 0, 99])),
)
.await
.map_err(|err| format!("{err:#?}"))?;
.await?;

let chunk = ds.get_chunk_ref(&array1_path, &ArrayIndices(vec![0, 0, 0])).await;
assert_eq!(chunk, Some(ChunkPayload::Inline(vec![0, 0, 0, 99])));
Expand Down Expand Up @@ -888,8 +873,8 @@ mod tests {
let mut ds = Dataset::create(Arc::clone(&storage));

// add a new array and retrieve its node
ds.add_group("/".into()).await.map_err(|err| format!("{err:#?}"))?;
let structure_id = ds.flush().await.map_err(|err| format!("{err:#?}"))?;
ds.add_group("/".into()).await?;
let structure_id = ds.flush().await?;

assert_eq!(Some(structure_id), ds.structure_id);
assert_eq!(
Expand All @@ -901,8 +886,8 @@ mod tests {
node_data: NodeData::Group
})
);
ds.add_group("/group".into()).await.map_err(|err| format!("{err:#?}"))?;
let _structure_id = ds.flush().await.map_err(|err| format!("{err:#?}"))?;
ds.add_group("/group".into()).await?;
let _structure_id = ds.flush().await?;
assert_eq!(
ds.get_node(&"/".into()).await,
Some(NodeStructure {
Expand Down Expand Up @@ -933,20 +918,17 @@ mod tests {
};

let new_array_path: PathBuf = "/group/array1".to_string().into();
ds.add_array(new_array_path.clone(), zarr_meta.clone())
.await
.map_err(|err| format!("{err:#?}"))?;
ds.add_array(new_array_path.clone(), zarr_meta.clone()).await?;

// we set a chunk in a new array
ds.set_chunk(
new_array_path.clone(),
ArrayIndices(vec![0, 0, 0]),
Some(ChunkPayload::Inline(b"hello".into())),
)
.await
.map_err(|err| format!("{err:#?}"))?;
.await?;

let _structure_id = ds.flush().await.map_err(|err| format!("{err:#?}"))?;
let _structure_id = ds.flush().await?;
assert_eq!(
ds.get_node(&"/".into()).await,
Some(NodeStructure {
Expand Down Expand Up @@ -985,20 +967,17 @@ mod tests {
ArrayIndices(vec![0, 0, 0]),
Some(ChunkPayload::Inline(b"bye".into())),
)
.await
.map_err(|err| format!("{err:#?}"))?;
.await?;

// we add a new chunk in an existing array
ds.set_chunk(
new_array_path.clone(),
ArrayIndices(vec![0, 0, 1]),
Some(ChunkPayload::Inline(b"new chunk".into())),
)
.await
.map_err(|err| format!("{err:#?}"))?;
.await?;

let previous_structure_id =
ds.flush().await.map_err(|err| format!("{err:#?}"))?;
let previous_structure_id = ds.flush().await?;
assert_eq!(
ds.get_chunk_ref(&new_array_path, &ArrayIndices(vec![0, 0, 0])).await,
Some(ChunkPayload::Inline(b"bye".into()))
Expand All @@ -1009,22 +988,17 @@ mod tests {
);

// we delete a chunk
ds.set_chunk(new_array_path.clone(), ArrayIndices(vec![0, 0, 1]), None)
.await
.map_err(|err| format!("{err:#?}"))?;
ds.set_chunk(new_array_path.clone(), ArrayIndices(vec![0, 0, 1]), None).await?;

let new_meta = ZarrArrayMetadata { shape: vec![1, 1, 1], ..zarr_meta };
// we change zarr metadata
ds.update_array(new_array_path.clone(), new_meta.clone())
.await
.map_err(|err| format!("{err:#?}"))?;
ds.update_array(new_array_path.clone(), new_meta.clone()).await?;

// we change user attributes metadata
ds.set_user_attributes(new_array_path.clone(), Some("{foo:42}".to_string()))
.await
.map_err(|err| format!("{err:#?}"))?;
.await?;

let structure_id = ds.flush().await.map_err(|err| format!("{err:#?}"))?;
let structure_id = ds.flush().await?;
let ds = Dataset::update(Arc::clone(&storage), structure_id);

assert_eq!(
Expand Down
20 changes: 13 additions & 7 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use std::{
sync::Arc,
};
use structure::StructureTable;
use thiserror::Error;

#[derive(Debug, Clone)]
pub enum IcechunkFormatError {
Expand Down Expand Up @@ -531,20 +532,25 @@ pub struct ChunkInfo {
pub struct AttributesTable();

// FIXME: implement std::error::Error for these
#[derive(Clone, Debug, PartialEq, Eq)]
#[derive(Clone, Debug, PartialEq, Eq, Error)]
pub enum AddNodeError {
AlreadyExists,
#[error("node already exists at `{0}`")]
AlreadyExists(Path),
}

#[derive(Clone, Debug, PartialEq, Eq)]
#[derive(Clone, Debug, PartialEq, Eq, Error)]
pub enum UpdateNodeError {
NotFound,
NotAnArray,
#[error("node not found at `{0}`")]
NotFound(Path),
#[error("there is not an array at `{0}`")]
NotAnArray(Path),
}

#[derive(Clone, Debug, PartialEq, Eq)]
#[derive(Clone, Debug, PartialEq, Eq, Error)]
pub enum StorageError {
NotFound,
#[error("object not found `{0:?}`")]
NotFound(ObjectId),
#[error("synchronization error on the Storage instance")]
Deadlock,
}

Expand Down
Loading