diff --git a/Cargo.lock b/Cargo.lock index 449a91d4..ecb2df47 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -664,6 +664,7 @@ dependencies = [ "pretty_assertions", "proptest", "rand", + "thiserror", "tokio", ] @@ -1190,6 +1191,26 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "thiserror" +version = "1.0.63" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c0342370b38b6a11b6cc11d6a805569958d54cfa061a29969c3b5ce2ea405724" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.63" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4558b58466b9ad7ca0f102865eccc95938dca1a74a856f2b57b6629050da261" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "tiny-keccak" version = "2.0.2" diff --git a/Cargo.toml b/Cargo.toml index b621e5ed..73265b98 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ bytes = "1.7.1" futures = "0.3.30" itertools = "0.13.0" rand = "0.8.5" +thiserror = "1.0.63" [profile.release-with-debug] inherits = "release" diff --git a/src/dataset.rs b/src/dataset.rs index d2ebddff..243b152a 100644 --- a/src/dataset.rs +++ b/src/dataset.rs @@ -6,6 +6,7 @@ use std::{ use futures::{Stream, StreamExt}; use itertools::Either; +use thiserror::Error; use crate::{ manifest::mk_manifests_table, structure::mk_structure_table, AddNodeError, @@ -127,7 +128,7 @@ impl Dataset { self.change_set.add_group(path, id); Ok(()) } else { - Err(AddNodeError::AlreadyExists) + Err(AddNodeError::AlreadyExists(path)) } } @@ -144,7 +145,7 @@ impl Dataset { self.change_set.add_array(path, id, metadata); Ok(()) } else { - Err(AddNodeError::AlreadyExists) + Err(AddNodeError::AlreadyExists(path)) } } @@ -157,12 +158,12 @@ impl Dataset { metadata: ZarrArrayMetadata, ) -> Result<(), UpdateNodeError> { match self.get_node(&path).await { - None => Err(UpdateNodeError::NotFound), + None => Err(UpdateNodeError::NotFound(path)), Some(NodeStructure { node_data: NodeData::Array(..), .. }) => { self.change_set.update_array(path, metadata); Ok(()) } - Some(_) => Err(UpdateNodeError::NotAnArray), + Some(_) => Err(UpdateNodeError::NotAnArray(path)), } } @@ -173,7 +174,7 @@ impl Dataset { atts: Option, ) -> Result<(), UpdateNodeError> { match self.get_node(&path).await { - None => Err(UpdateNodeError::NotFound), + None => Err(UpdateNodeError::NotFound(path)), Some(_) => { self.change_set.update_user_attributes(path, atts); Ok(()) @@ -191,12 +192,12 @@ impl Dataset { data: Option, ) -> Result<(), UpdateNodeError> { match self.get_node(&path).await { - None => Err(UpdateNodeError::NotFound), + None => Err(UpdateNodeError::NotFound(path)), Some(NodeStructure { node_data: NodeData::Array(..), .. }) => { self.change_set.set_chunk(path, coord, data); Ok(()) } - Some(_) => Err(UpdateNodeError::NotAnArray), + Some(_) => Err(UpdateNodeError::NotAnArray(path)), } } @@ -553,16 +554,14 @@ impl Dataset { let new_manifest_id = ObjectId::random(); self.storage .write_manifests(new_manifest_id.clone(), Arc::new(new_manifest)) - .await - .map_err(FlushError::StorageError)?; + .await?; let all_nodes = self.updated_nodes(&new_manifest_id, ®ion_tracker).await; let new_structure = mk_structure_table(all_nodes); let new_structure_id = ObjectId::random(); self.storage .write_structure(new_structure_id.clone(), Arc::new(new_structure)) - .await - .map_err(FlushError::StorageError)?; + .await?; self.structure_id = Some(new_structure_id.clone()); self.change_set = ChangeSet::default(); @@ -587,10 +586,12 @@ impl TableRegionTracker { } } -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Error)] pub enum FlushError { + #[error("no changes made to the dataset")] NoChangesToFlush, - StorageError(StorageError), + #[error("error contacting storage")] + StorageError(#[from] StorageError), } #[cfg(test)] @@ -636,10 +637,7 @@ mod tests { .await, ); let manifest_id = ObjectId::random(); - storage - .write_manifests(manifest_id.clone(), manifest) - .await - .map_err(|err| format!("{err:#?}"))?; + storage.write_manifests(manifest_id.clone(), manifest).await?; let zarr_meta1 = ZarrArrayMetadata { shape: vec![2, 2, 2], @@ -685,10 +683,7 @@ mod tests { let structure = Arc::new(mk_structure_table(nodes.clone())); let structure_id = ObjectId::random(); - storage - .write_structure(structure_id.clone(), structure) - .await - .map_err(|err| format!("{err:#?}"))?; + storage.write_structure(structure_id.clone(), structure).await?; let mut ds = Dataset::update(Arc::new(storage), structure_id); // retrieve the old array node @@ -696,9 +691,7 @@ mod tests { assert_eq!(nodes.get(1), node.as_ref()); // add a new array and retrieve its node - ds.add_group("/group".to_string().into()) - .await - .map_err(|err| format!("{err:#?}"))?; + ds.add_group("/group".to_string().into()).await?; let zarr_meta2 = ZarrArrayMetadata { shape: vec![3], @@ -712,9 +705,7 @@ mod tests { }; let new_array_path: PathBuf = "/group/array2".to_string().into(); - ds.add_array(new_array_path.clone(), zarr_meta2.clone()) - .await - .map_err(|err| format!("{err:#?}"))?; + ds.add_array(new_array_path.clone(), zarr_meta2.clone()).await?; let node = ds.get_node(&new_array_path).await; assert_eq!( @@ -729,8 +720,7 @@ mod tests { // set user attributes for the new array and retrieve them ds.set_user_attributes(new_array_path.clone(), Some("{n:42}".to_string())) - .await - .map_err(|err| format!("{err:#?}"))?; + .await?; let node = ds.get_node(&new_array_path).await; assert_eq!( node, @@ -750,8 +740,7 @@ mod tests { ArrayIndices(vec![0]), Some(ChunkPayload::Inline(vec![0, 0, 0, 7])), ) - .await - .map_err(|err| format!("{err:#?}"))?; + .await?; let chunk = ds.get_chunk_ref(&new_array_path, &ArrayIndices(vec![0])).await; assert_eq!(chunk, Some(ChunkPayload::Inline(vec![0, 0, 0, 7]))); @@ -762,8 +751,7 @@ mod tests { // update old array use attriutes and check them ds.set_user_attributes(array1_path.clone(), Some("{updated: true}".to_string())) - .await - .map_err(|err| format!("{err:#?}"))?; + .await?; let node = ds.get_node(&array1_path).await.unwrap(); assert_eq!( node.user_attributes, @@ -772,9 +760,7 @@ mod tests { // update old array zarr metadata and check it let new_zarr_meta1 = ZarrArrayMetadata { shape: vec![2, 2, 3], ..zarr_meta1 }; - ds.update_array(array1_path.clone(), new_zarr_meta1) - .await - .map_err(|err| format!("{err:#?}"))?; + ds.update_array(array1_path.clone(), new_zarr_meta1).await?; let node = ds.get_node(&array1_path).await; if let Some(NodeStructure { node_data: NodeData::Array(ZarrArrayMetadata { shape, .. }, _), @@ -792,8 +778,7 @@ mod tests { ArrayIndices(vec![0, 0, 0]), Some(ChunkPayload::Inline(vec![0, 0, 0, 99])), ) - .await - .map_err(|err| format!("{err:#?}"))?; + .await?; let chunk = ds.get_chunk_ref(&array1_path, &ArrayIndices(vec![0, 0, 0])).await; assert_eq!(chunk, Some(ChunkPayload::Inline(vec![0, 0, 0, 99]))); @@ -888,8 +873,8 @@ mod tests { let mut ds = Dataset::create(Arc::clone(&storage)); // add a new array and retrieve its node - ds.add_group("/".into()).await.map_err(|err| format!("{err:#?}"))?; - let structure_id = ds.flush().await.map_err(|err| format!("{err:#?}"))?; + ds.add_group("/".into()).await?; + let structure_id = ds.flush().await?; assert_eq!(Some(structure_id), ds.structure_id); assert_eq!( @@ -901,8 +886,8 @@ mod tests { node_data: NodeData::Group }) ); - ds.add_group("/group".into()).await.map_err(|err| format!("{err:#?}"))?; - let _structure_id = ds.flush().await.map_err(|err| format!("{err:#?}"))?; + ds.add_group("/group".into()).await?; + let _structure_id = ds.flush().await?; assert_eq!( ds.get_node(&"/".into()).await, Some(NodeStructure { @@ -933,9 +918,7 @@ mod tests { }; let new_array_path: PathBuf = "/group/array1".to_string().into(); - ds.add_array(new_array_path.clone(), zarr_meta.clone()) - .await - .map_err(|err| format!("{err:#?}"))?; + ds.add_array(new_array_path.clone(), zarr_meta.clone()).await?; // we set a chunk in a new array ds.set_chunk( @@ -943,10 +926,9 @@ mod tests { ArrayIndices(vec![0, 0, 0]), Some(ChunkPayload::Inline(b"hello".into())), ) - .await - .map_err(|err| format!("{err:#?}"))?; + .await?; - let _structure_id = ds.flush().await.map_err(|err| format!("{err:#?}"))?; + let _structure_id = ds.flush().await?; assert_eq!( ds.get_node(&"/".into()).await, Some(NodeStructure { @@ -985,8 +967,7 @@ mod tests { ArrayIndices(vec![0, 0, 0]), Some(ChunkPayload::Inline(b"bye".into())), ) - .await - .map_err(|err| format!("{err:#?}"))?; + .await?; // we add a new chunk in an existing array ds.set_chunk( @@ -994,11 +975,9 @@ mod tests { ArrayIndices(vec![0, 0, 1]), Some(ChunkPayload::Inline(b"new chunk".into())), ) - .await - .map_err(|err| format!("{err:#?}"))?; + .await?; - let previous_structure_id = - ds.flush().await.map_err(|err| format!("{err:#?}"))?; + let previous_structure_id = ds.flush().await?; assert_eq!( ds.get_chunk_ref(&new_array_path, &ArrayIndices(vec![0, 0, 0])).await, Some(ChunkPayload::Inline(b"bye".into())) @@ -1009,22 +988,17 @@ mod tests { ); // we delete a chunk - ds.set_chunk(new_array_path.clone(), ArrayIndices(vec![0, 0, 1]), None) - .await - .map_err(|err| format!("{err:#?}"))?; + ds.set_chunk(new_array_path.clone(), ArrayIndices(vec![0, 0, 1]), None).await?; let new_meta = ZarrArrayMetadata { shape: vec![1, 1, 1], ..zarr_meta }; // we change zarr metadata - ds.update_array(new_array_path.clone(), new_meta.clone()) - .await - .map_err(|err| format!("{err:#?}"))?; + ds.update_array(new_array_path.clone(), new_meta.clone()).await?; // we change user attributes metadata ds.set_user_attributes(new_array_path.clone(), Some("{foo:42}".to_string())) - .await - .map_err(|err| format!("{err:#?}"))?; + .await?; - let structure_id = ds.flush().await.map_err(|err| format!("{err:#?}"))?; + let structure_id = ds.flush().await?; let ds = Dataset::update(Arc::clone(&storage), structure_id); assert_eq!( diff --git a/src/lib.rs b/src/lib.rs index 161a7f7b..d6cc10ae 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -34,6 +34,7 @@ use std::{ sync::Arc, }; use structure::StructureTable; +use thiserror::Error; #[derive(Debug, Clone)] pub enum IcechunkFormatError { @@ -531,20 +532,25 @@ pub struct ChunkInfo { pub struct AttributesTable(); // FIXME: implement std::error::Error for these -#[derive(Clone, Debug, PartialEq, Eq)] +#[derive(Clone, Debug, PartialEq, Eq, Error)] pub enum AddNodeError { - AlreadyExists, + #[error("node already exists at `{0}`")] + AlreadyExists(Path), } -#[derive(Clone, Debug, PartialEq, Eq)] +#[derive(Clone, Debug, PartialEq, Eq, Error)] pub enum UpdateNodeError { - NotFound, - NotAnArray, + #[error("node not found at `{0}`")] + NotFound(Path), + #[error("there is not an array at `{0}`")] + NotAnArray(Path), } -#[derive(Clone, Debug, PartialEq, Eq)] +#[derive(Clone, Debug, PartialEq, Eq, Error)] pub enum StorageError { - NotFound, + #[error("object not found `{0:?}`")] + NotFound(ObjectId), + #[error("synchronization error on the Storage instance")] Deadlock, } diff --git a/src/storage.rs b/src/storage.rs index 781b0d8e..c2e318aa 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -1,5 +1,6 @@ use std::{ collections::HashMap, + ops::Range, sync::{Arc, RwLock}, }; @@ -7,7 +8,8 @@ use async_trait::async_trait; use bytes::Bytes; use crate::{ - AttributesTable, ManifestsTable, ObjectId, Storage, StorageError, StructureTable, + AttributesTable, ChunkOffset, ManifestsTable, ObjectId, Storage, StorageError, + StructureTable, }; #[derive(Default)] @@ -40,7 +42,7 @@ impl Storage for InMemoryStorage { .or(Err(StorageError::Deadlock))? .get(id) .cloned() - .ok_or(StorageError::NotFound) + .ok_or(StorageError::NotFound(id.clone())) } async fn fetch_attributes( @@ -52,7 +54,7 @@ impl Storage for InMemoryStorage { .or(Err(StorageError::Deadlock))? .get(id) .cloned() - .ok_or(StorageError::NotFound) + .ok_or(StorageError::NotFound(id.clone())) } async fn fetch_manifests( @@ -64,7 +66,7 @@ impl Storage for InMemoryStorage { .or(Err(StorageError::Deadlock))? .get(id) .cloned() - .ok_or(StorageError::NotFound) + .ok_or(StorageError::NotFound(id.clone())) } async fn write_structure( @@ -105,19 +107,33 @@ impl Storage for InMemoryStorage { async fn fetch_chunk( &self, - _id: &ObjectId, - _range: &Option>, + id: &ObjectId, + range: &Option>, ) -> Result, StorageError> { // avoid unused warning - let _x = &self.chunk_files; - todo!() + let chunk = self + .chunk_files + .read() + .or(Err(StorageError::Deadlock))? + .get(id) + .cloned() + .ok_or(StorageError::NotFound(id.clone()))?; + if let Some(range) = range { + Ok(Arc::new(chunk.slice((range.start as usize)..(range.end as usize)))) + } else { + Ok(Arc::clone(&chunk)) + } } async fn write_chunk( &self, - _id: ObjectId, - _bytes: bytes::Bytes, + id: ObjectId, + bytes: bytes::Bytes, ) -> Result<(), StorageError> { - todo!() + self.chunk_files + .write() + .or(Err(StorageError::Deadlock))? + .insert(id, Arc::new(bytes)); + Ok(()) } }