diff --git a/Cargo.toml b/Cargo.toml
index e6eed785..4304106c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -20,6 +20,7 @@ thiserror = "1.0.63"
 serde_json = "1.0.125"
 serde = { version = "1.0.208", features = ["derive"] }
 serde_with = "3.9.0"
+tokio = { version = "1.39.2", features = ["rt-multi-thread", "macros"] }
 
 [profile.release-with-debug]
 inherits = "release"
@@ -28,4 +29,3 @@ debug = true
 [dev-dependencies]
 pretty_assertions = "1.4.0"
 proptest = "1.0.0"
-tokio = { version = "1.39.2", features = ["rt-multi-thread", "macros"] }
diff --git a/examples/low_level_dataset.rs b/examples/low_level_dataset.rs
index 070afcfe..8c5d8309 100644
--- a/examples/low_level_dataset.rs
+++ b/examples/low_level_dataset.rs
@@ -154,7 +154,7 @@ ds.flush().await?;
     println!();
     println!();
     println!("## Adding an inline chunk");
-    ds.set_chunk(
+    ds.set_chunk_ref(
         array1_path.clone(),
         ArrayIndices(vec![0]),
         Some(ChunkPayload::Inline("hello".into())),
@@ -210,7 +210,7 @@ let mut ds = Dataset::update(Arc::clone(&storage), ObjectId.from("{v2_id:?}"));
     print_nodes(&ds).await;
 
     println!("## Adding a new inline chunk");
-    ds.set_chunk(
+    ds.set_chunk_ref(
         array1_path.clone(),
         ArrayIndices(vec![1]),
         Some(icechunk::ChunkPayload::Inline("bye".into())),
diff --git a/src/dataset.rs b/src/dataset.rs
index 9e55e8fb..2ce3ec58 100644
--- a/src/dataset.rs
+++ b/src/dataset.rs
@@ -90,7 +90,12 @@ impl ChangeSet {
         self.updated_attributes.get(path)
     }
 
-    fn set_chunk(&mut self, path: Path, coord: ArrayIndices, data: Option<ChunkPayload>) {
+    fn set_chunk_ref(
+        &mut self,
+        path: Path,
+        coord: ArrayIndices,
+        data: Option<ChunkPayload>,
+    ) {
         self.set_chunks
             .entry(path)
             .and_modify(|h| {
@@ -258,21 +263,18 @@ impl Dataset {
     // Record the write, referencing or delete of a chunk
     //
     // Caller has to write the chunk before calling this.
-    pub async fn set_chunk(
+    pub async fn set_chunk_ref(
         &mut self,
         path: Path,
         coord: ArrayIndices,
         data: Option<ChunkPayload>,
     ) -> Result<(), UpdateNodeError> {
-        match self
-            .get_node(&path)
-            .await
-            .map_err(|_| UpdateNodeError::NotFound(path.clone()))?
-        {
-            NodeStructure { node_data: NodeData::Array(..), .. } => {
-                self.change_set.set_chunk(path, coord, data);
+        match self.get_node(&path).await {
+            Ok(NodeStructure { node_data: NodeData::Array(..), .. }) => {
+                self.change_set.set_chunk_ref(path, coord, data);
                 Ok(())
             }
+            Err(GetNodeError::NotFound(path)) => Err(UpdateNodeError::NotFound(path)),
             _ => Err(UpdateNodeError::NotAnArray(path)),
         }
     }
@@ -317,6 +319,26 @@ impl Dataset {
         }
     }
 
+    pub async fn get_array(&self, path: &Path) -> Result<NodeStructure, GetNodeError> {
+        match self.get_node(path).await {
+            res @ Ok(NodeStructure { node_data: NodeData::Array(..), .. }) => res,
+            Ok(NodeStructure { node_data: NodeData::Group, .. }) => {
+                Err(GetNodeError::NotFound(path.clone()))
+            }
+            other => other,
+        }
+    }
+
+    pub async fn get_group(&self, path: &Path) -> Result<NodeStructure, GetNodeError> {
+        match self.get_node(path).await {
+            res @ Ok(NodeStructure { node_data: NodeData::Group, .. }) => res,
+            Ok(NodeStructure { node_data: NodeData::Array(..), .. }) => {
+                Err(GetNodeError::NotFound(path.clone()))
+            }
+            other => other,
+        }
+    }
+
     async fn get_existing_node(
         &self,
         path: &Path,
@@ -325,6 +347,7 @@ impl Dataset {
         let structure_id =
             self.structure_id.as_ref().ok_or(GetNodeError::NotFound(path.clone()))?;
         let structure = self.storage.fetch_structure(structure_id).await?;
+
         let session_atts = self
             .change_set
            .get_user_attributes(path)
@@ -429,6 +452,32 @@ impl Dataset {
         }
     }
 
+    pub async fn set_chunk(
+        &mut self,
+        path: &Path,
+        coord: &ArrayIndices,
+        data: Bytes,
+    ) -> Result<(), UpdateNodeError> {
+        // TODO: support inline chunks
+        match self.get_array(path).await {
+            Ok(_) => {
+                let new_id = ObjectId::random();
+                self.storage
+                    .write_chunk(new_id.clone(), data.clone())
+                    .await
+                    .map_err(UpdateNodeError::StorageError)?;
+                let payload = ChunkPayload::Ref(ChunkRef {
+                    id: new_id,
+                    offset: 0,
+                    length: data.len() as u64,
+                });
+                self.change_set.set_chunk_ref(path.clone(), coord.clone(), Some(payload));
+                Ok(())
+            }
+            Err(_) => Err(UpdateNodeError::NotFound(path.clone())),
+        }
+    }
+
     async fn get_old_chunk(
         &self,
         manifests: &[ManifestRef],
@@ -885,7 +934,7 @@ mod tests {
         );
 
         // set a chunk for the new array and retrieve it
-        ds.set_chunk(
+        ds.set_chunk_ref(
             new_array_path.clone(),
             ArrayIndices(vec![0]),
             Some(ChunkPayload::Inline("foo".into())),
@@ -928,7 +977,7 @@ mod tests {
         }
 
         // set old array chunk and check them
-        ds.set_chunk(
+        ds.set_chunk_ref(
             array1_path.clone(),
             ArrayIndices(vec![0, 0, 0]),
             Some(ChunkPayload::Inline("bac".into())),
@@ -977,25 +1026,25 @@ mod tests {
         change_set.add_array("foo/baz".into(), 2, zarr_meta);
         assert_eq!(None, change_set.new_arrays_chunk_iterator().next());
 
-        change_set.set_chunk("foo/bar".into(), ArrayIndices(vec![0, 1]), None);
+        change_set.set_chunk_ref("foo/bar".into(), ArrayIndices(vec![0, 1]), None);
         assert_eq!(None, change_set.new_arrays_chunk_iterator().next());
 
-        change_set.set_chunk(
+        change_set.set_chunk_ref(
             "foo/bar".into(),
             ArrayIndices(vec![1, 0]),
             Some(ChunkPayload::Inline("bar1".into())),
         );
-        change_set.set_chunk(
+        change_set.set_chunk_ref(
             "foo/bar".into(),
             ArrayIndices(vec![1, 1]),
             Some(ChunkPayload::Inline("bar2".into())),
         );
-        change_set.set_chunk(
+        change_set.set_chunk_ref(
             "foo/baz".into(),
             ArrayIndices(vec![0]),
             Some(ChunkPayload::Inline("baz1".into())),
         );
-        change_set.set_chunk(
+        change_set.set_chunk_ref(
             "foo/baz".into(),
             ArrayIndices(vec![1]),
             Some(ChunkPayload::Inline("baz2".into())),
@@ -1093,7 +1142,7 @@ mod tests {
         let _structure_id = ds.flush().await?;
 
         // we set a chunk in a new array
-        ds.set_chunk(
+        ds.set_chunk_ref(
             new_array_path.clone(),
             ArrayIndices(vec![0, 0, 0]),
             Some(ChunkPayload::Inline("hello".into())),
@@ -1134,7 +1183,7 @@ mod tests {
         );
 
         // we modify a chunk in an existing array
-        ds.set_chunk(
+        ds.set_chunk_ref(
             new_array_path.clone(),
             ArrayIndices(vec![0, 0, 0]),
             Some(ChunkPayload::Inline("bye".into())),
@@ -1142,7 +1191,7 @@ mod tests {
         .await?;
 
         // we add a new chunk in an existing array
-        ds.set_chunk(
+        ds.set_chunk_ref(
             new_array_path.clone(),
             ArrayIndices(vec![0, 0, 1]),
             Some(ChunkPayload::Inline("new chunk".into())),
@@ -1160,7 +1209,8 @@ mod tests {
         );
 
         // we delete a chunk
-        ds.set_chunk(new_array_path.clone(), ArrayIndices(vec![0, 0, 1]), None).await?;
+        ds.set_chunk_ref(new_array_path.clone(), ArrayIndices(vec![0, 0, 1]), None)
+            .await?;
 
         let new_meta = ZarrArrayMetadata { shape: vec![1, 1, 1], ..zarr_meta };
         // we change zarr metadata
diff --git a/src/lib.rs b/src/lib.rs
index 89893f2b..8a276727 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -678,7 +678,7 @@ pub enum AddNodeError {
     AlreadyExists(Path),
 }
 
-#[derive(Clone, Debug, PartialEq, Eq, Error)]
+#[derive(Debug, Error)]
 pub enum DeleteNodeError {
     #[error("node not found at `{0}`")]
     NotFound(Path),
@@ -688,20 +688,21 @@ pub enum DeleteNodeError {
     NotAGroup(Path),
 }
 
-#[derive(Clone, Debug, PartialEq, Eq, Error)]
+#[derive(Debug, Error)]
 pub enum UpdateNodeError {
     #[error("node not found at `{0}`")]
     NotFound(Path),
     #[error("there is not an array at `{0}`")]
     NotAnArray(Path),
-    // TODO: Don't we need a NotAGroup here?
+    #[error("error contacting storage")]
+    StorageError(#[from] StorageError),
 }
 
 #[derive(Debug, Error)]
 pub enum GetNodeError {
     #[error("node not found at `{0}`")]
     NotFound(Path),
-    #[error("storage error when searching for node")]
+    #[error("error contacting storage")]
     StorageError(#[from] StorageError),
 }
 
diff --git a/src/storage.rs b/src/storage.rs
index 130773d8..a55eb29d 100644
--- a/src/storage.rs
+++ b/src/storage.rs
@@ -1,6 +1,6 @@
 use base64::{engine::general_purpose::URL_SAFE as BASE64_URL_SAFE, Engine as _};
 use std::{
-    collections::HashMap,
+    collections::{HashMap, HashSet},
     fs::create_dir_all,
     ops::Range,
     sync::{Arc, RwLock},
@@ -219,6 +219,11 @@ impl InMemoryStorage {
             chunk_files: Arc::new(RwLock::new(HashMap::new())),
         }
     }
+
+    /// Intended for tests
+    pub fn chunk_ids(&self) -> HashSet<ObjectId> {
+        self.chunk_files.read().unwrap().keys().cloned().collect()
+    }
 }
 
 #[async_trait]
diff --git a/src/zarr.rs b/src/zarr.rs
index 73ececaf..dfa4b42b 100644
--- a/src/zarr.rs
+++ b/src/zarr.rs
@@ -4,14 +4,15 @@ use bytes::Bytes;
 use futures::Stream;
 use serde::{Deserialize, Serialize};
 use thiserror::Error;
+use tokio::{spawn, sync::RwLock};
 
 use crate::{
-    ArrayIndices, ChunkOffset, Dataset, NodeData, Path, UserAttributes,
-    UserAttributesStructure, ZarrArrayMetadata,
+    AddNodeError, ArrayIndices, ChunkOffset, Dataset, NodeData, Path, UpdateNodeError,
+    UserAttributes, UserAttributesStructure, ZarrArrayMetadata,
 };
 
 pub struct Store {
-    pub dataset: Arc<Dataset>,
+    pub dataset: Arc<RwLock<Dataset>>,
 }
 
 type ByteRange = (Option<ChunkOffset>, Option<ChunkOffset>);
@@ -25,24 +26,31 @@ pub enum KeyNotFoundError {
     NodeNotFound { path: Path },
 }
 
-#[derive(Debug, Clone, PartialEq, Eq, Error)]
+#[derive(Debug, Error)]
 pub enum StoreError {
     #[error("invalid zarr key format `{key}`")]
     InvalidKey { key: String },
     #[error("object not found: `{0}`")]
     NotFound(#[from] KeyNotFoundError),
+    #[error("cannot update object: `{0}`")]
+    CannotUpdate(#[from] UpdateNodeError),
+    #[error("bad metadata: `{0}`")]
+    BadMetadata(#[from] serde_json::Error),
+    #[error("add node error: `{0}`")]
+    AddNode(#[from] AddNodeError),
 }
 
 impl Store {
-    pub fn new(dataset: Arc<Dataset>) -> Self {
-        Store { dataset }
+    pub fn new(dataset: Dataset) -> Self {
+        Store { dataset: Arc::new(RwLock::new(dataset)) }
     }
 
     pub async fn empty(&self) -> StoreResult<bool> {
-        Ok(self.dataset.list_nodes().await.next().is_none())
+        let res = self.dataset.read().await.list_nodes().await.next().is_none();
+        Ok(res)
     }
 
-    pub async fn clear(&self) -> StoreResult<()> {
+    pub async fn clear(&mut self) -> StoreResult<()> {
         todo!()
     }
 
@@ -58,26 +66,57 @@ impl Store {
     // TODO: prototype argument
     pub async fn get_partial_values(
-        &self,
-        _key_ranges: impl IntoIterator,
-    ) -> StoreResult {
-        todo!()
+        // TODO: calling this argument self gives a compiler error for some reason
+        this: Arc<Self>,
+        key_ranges: impl IntoIterator<Item = (String, ByteRange)>,
+    ) -> StoreResult<Vec<StoreResult<Bytes>>> {
+        let mut tasks = Vec::new();
+        for (key, range) in key_ranges {
+            let this = Arc::clone(&this);
+            tasks.push(spawn(async move { this.get(&key, &range).await }));
+        }
+        let mut outputs = Vec::with_capacity(tasks.len());
+        for task in tasks {
+            outputs.push(task.await.unwrap());
+        }
+        Ok(outputs)
     }
 
     // TODO: prototype argument
-    pub async fn exists(&self, _key: &str) -> StoreResult<bool> {
-        todo!()
+    pub async fn exists(&self, key: &str) -> StoreResult<bool> {
+        match self.get(key, &(None, None)).await {
+            Ok(_) => Ok(true),
+            Err(StoreError::NotFound(_)) => Ok(false),
+            Err(other_error) => Err(other_error),
+        }
     }
 
     pub fn supports_writes(&self) -> StoreResult<bool> {
         Ok(true)
     }
 
-    pub async fn set(&self, _key: &str, _value: Bytes) -> StoreResult<()> {
-        todo!()
+    pub async fn set(&mut self, key: &str, value: Bytes) -> StoreResult<()> {
+        match Key::parse(key)? {
+            Key::Metadata { node_path } => {
+                if let Ok(array_meta) = serde_json::from_slice(value.as_ref()) {
+                    self.set_array_meta(node_path, array_meta).await
+                } else {
+                    match serde_json::from_slice(value.as_ref()) {
+                        Ok(group_meta) => {
+                            self.set_group_meta(node_path, group_meta).await
+                        }
+                        Err(err) => Err(StoreError::BadMetadata(err)),
+                    }
+                }
+            }
+            Key::Chunk { ref node_path, ref coords } => {
+                self.dataset.write().await.set_chunk(node_path, coords, value).await?;
+                Ok(())
+            }
+        }
     }
 
-    pub async fn delete(&self, _key: &str) -> StoreResult<()> {
+    pub async fn delete(&mut self, _key: &str) -> StoreResult<()> {
         todo!()
     }
 
@@ -86,7 +125,7 @@ impl Store {
     }
     pub async fn set_partial_values(
-        self,
+        &mut self,
         _key_start_values: impl IntoIterator,
     ) -> StoreResult<()> {
         unimplemented!()
     }
@@ -120,13 +159,16 @@ impl Store {
         path: Path,
         coords: ArrayIndices,
     ) -> StoreResult<Bytes> {
-        self.dataset.get_chunk(&path, &coords).await.ok_or(StoreError::NotFound(
-            KeyNotFoundError::ChunkNotFound { key: key.to_string(), path, coords },
-        ))
+        let chunk = self.dataset.read().await.get_chunk(&path, &coords).await;
+        chunk.ok_or(StoreError::NotFound(KeyNotFoundError::ChunkNotFound {
+            key: key.to_string(),
+            path,
+            coords,
+        }))
     }
 
     async fn get_metadata(&self, _key: &str, path: &Path) -> StoreResult<Bytes> {
-        let node = self.dataset.get_node(path).await.map_err(|_| {
+        let node = self.dataset.read().await.get_node(path).await.map_err(|_| {
             StoreError::NotFound(KeyNotFoundError::NodeNotFound { path: path.clone() })
         })?;
         let user_attributes = match node.user_attributes {
@@ -141,6 +183,42 @@ impl Store {
             }
         }
     }
+
+    async fn set_array_meta(
+        &self,
+        path: Path,
+        array_meta: ArrayMetadata,
+    ) -> Result<(), StoreError> {
+        if self.dataset.read().await.get_array(&path).await.is_ok() {
+            let mut ds = self.dataset.write().await;
+            // TODO: we don't necessarily need to update both
+            ds.set_user_attributes(path.clone(), array_meta.attributes).await?;
+            ds.update_array(path, array_meta.zarr_metadata).await?;
+            Ok(())
+        } else {
+            let mut ds = self.dataset.write().await;
+            ds.add_array(path.clone(), array_meta.zarr_metadata).await?;
+            ds.set_user_attributes(path, array_meta.attributes).await?;
+            Ok(())
+        }
+    }
+
+    async fn set_group_meta(
+        &self,
+        path: Path,
+        group_meta: GroupMetadata,
+    ) -> Result<(), StoreError> {
+        if self.dataset.read().await.get_group(&path).await.is_ok() {
+            let mut ds = self.dataset.write().await;
+            ds.set_user_attributes(path, group_meta.attributes).await?;
+            Ok(())
+        } else {
+            let mut ds = self.dataset.write().await;
+            ds.add_group(path.clone()).await?;
+            ds.set_user_attributes(path, group_meta.attributes).await?;
+            Ok(())
+        }
+    }
 }
 
 #[derive(Debug, Clone, PartialEq, Eq)]
@@ -152,10 +230,16 @@ enum Key {
 impl Key {
     const ROOT_KEY: &'static str = "zarr.json";
     const METADATA_SUFFIX: &'static str = "/zarr.json";
-    const CHUNK_COORD_INFIX: &'static str = "c";
+    const CHUNK_COORD_INFIX: &'static str = "/c";
 
     fn parse(key: &str) -> Result<Self, StoreError> {
         fn parse_chunk(key: &str) -> Result<Key, StoreError> {
+            if key == "c" {
+                return Ok(Key::Chunk {
+                    node_path: "/".into(),
+                    coords: ArrayIndices(vec![]),
+                });
+            }
             if let Some((path, coords)) = key.rsplit_once(Key::CHUNK_COORD_INFIX) {
                 if coords.is_empty() {
                     Ok(Key::Chunk {
@@ -237,140 +321,122 @@ impl GroupMetadata {
 #[cfg(test)]
 mod tests {
-    use std::{collections::HashMap, iter, num::NonZeroU64};
-    use crate::{
-        storage::InMemoryStorage, ChunkKeyEncoding, ChunkShape, Codec, DataType,
-        FillValue, Storage, StorageTransformer,
-    };
+    use crate::{storage::InMemoryStorage, Storage};
 
     use super::*;
     use pretty_assertions::assert_eq;
 
     #[test]
     fn test_parse_key() {
-        assert_eq!(Key::parse("zarr.json"), Ok(Key::Metadata { node_path: "/".into() }));
-        assert_eq!(
+        assert!(matches!(
+            Key::parse("zarr.json"),
+            Ok(Key::Metadata { node_path}) if node_path.to_str() == Some("/")
+        ));
+        assert!(matches!(
             Key::parse("a/zarr.json"),
-            Ok(Key::Metadata { node_path: "/a".into() })
-        );
-        assert_eq!(
+            Ok(Key::Metadata { node_path }) if node_path.to_str() == Some("/a")
+        ));
+        assert!(matches!(
             Key::parse("a/b/c/zarr.json"),
-            Ok(Key::Metadata { node_path: "/a/b/c".into() })
-        );
-        assert_eq!(
+            Ok(Key::Metadata { node_path }) if node_path.to_str() == Some("/a/b/c")
+        ));
+        assert!(matches!(
             Key::parse("foo/c"),
-            Ok(Key::Chunk { node_path: "/foo".into(), coords: ArrayIndices(vec![]) })
-        );
-        assert_eq!(
+            Ok(Key::Chunk { node_path, coords }) if node_path.to_str() == Some("/foo") && coords == ArrayIndices(vec![])
+        ));
+        assert!(matches!(
             Key::parse("foo/bar/c"),
-            Ok(Key::Chunk { node_path: "/foo/bar".into(), coords: ArrayIndices(vec![]) })
-        );
-        assert_eq!(
+            Ok(Key::Chunk { node_path, coords}) if node_path.to_str() == Some("/foo/bar") && coords == ArrayIndices(vec![])
+        ));
+        assert!(matches!(
             Key::parse("foo/c/1/2/3"),
             Ok(Key::Chunk {
-                node_path: "/foo".into(),
-                coords: ArrayIndices(vec![1, 2, 3])
-            })
-        );
-        assert_eq!(
+                node_path,
+                coords,
+            }) if node_path.to_str() == Some("/foo") && coords == ArrayIndices(vec![1,2,3])
+        ));
+        assert!(matches!(
            Key::parse("foo/bar/baz/c/1/2/3"),
             Ok(Key::Chunk {
-                node_path: "/foo/bar/baz".into(),
-                coords: ArrayIndices(vec![1, 2, 3])
-            })
-        );
-        assert_eq!(
+                node_path,
+                coords,
+            }) if node_path.to_str() == Some("/foo/bar/baz") && coords == ArrayIndices(vec![1,2,3])
+        ));
+        assert!(matches!(
             Key::parse("c"),
-            Ok(Key::Chunk { node_path: "/".into(), coords: ArrayIndices(vec![]) })
-        );
+            Ok(Key::Chunk { node_path, coords}) if node_path.to_str() == Some("/") && coords == ArrayIndices(vec![])
+        ));
     }
 
     #[tokio::test]
-    async fn test_metadata_get() -> Result<(), Box<dyn std::error::Error>> {
+    async fn test_metadata_set_and_get() -> Result<(), Box<dyn std::error::Error>> {
         // TODO: turn this test into pure Store operations once we support writes through Zarr
         let storage: Arc<dyn Storage + Send + Sync> = Arc::new(InMemoryStorage::new());
-        let ds = Arc::new(Dataset::create(Arc::clone(&storage)));
-        let store = Store { dataset: Arc::clone(&ds) };
+        let ds = Dataset::create(Arc::clone(&storage));
+        let mut store = Store::new(ds);
 
-        assert_eq!(
+        assert!(matches!(
             store.get("zarr.json", &(None, None)).await,
-            Err(StoreError::NotFound(KeyNotFoundError::NodeNotFound {
-                path: "/".into()
-            }))
-        );
-
-        let mut ds = Dataset::create(Arc::clone(&storage));
-        ds.add_group("/".into()).await?;
-        let store = Store { dataset: Arc::clone(&Arc::new(ds)) };
+            Err(StoreError::NotFound(KeyNotFoundError::NodeNotFound {path})) if path.to_str() == Some("/")
+        ));
+
+        store
+            .set(
+                "zarr.json",
+                Bytes::copy_from_slice(br#"{"zarr_format":3, "node_type":"group"}"#),
+            )
+            .await?;
         assert_eq!(
-            store.get("zarr.json", &(None, None)).await,
-            Ok(Bytes::copy_from_slice(
+            store.get("zarr.json", &(None, None)).await.unwrap(),
+            Bytes::copy_from_slice(
                 br#"{"zarr_format":3,"node_type":"group","attributes":null}"#
-            ))
+            )
         );
 
-        let mut ds = Dataset::create(Arc::clone(&storage));
-        ds.add_group("/a/b".into()).await?;
-        ds.set_user_attributes(
-            "/a/b".into(),
-            Some(UserAttributes::try_new(br#"{"foo": 42}"#).unwrap()),
-        )
-        .await?;
-        let store = Store { dataset: Arc::clone(&Arc::new(ds)) };
+        store.set("a/b/zarr.json", Bytes::copy_from_slice(br#"{"zarr_format":3, "node_type":"group", "attributes": {"spam":"ham", "eggs":42}}"#)).await?;
         assert_eq!(
-            store.get("a/b/zarr.json", &(None, None)).await,
-            Ok(Bytes::copy_from_slice(
-                br#"{"zarr_format":3,"node_type":"group","attributes":{"foo":42}}"#
-            ))
+            store.get("a/b/zarr.json", &(None, None)).await.unwrap(),
+            Bytes::copy_from_slice(
+                br#"{"zarr_format":3,"node_type":"group","attributes":{"eggs":42,"spam":"ham"}}"#
+            )
        );
 
-        let zarr_meta = ZarrArrayMetadata {
-            shape: vec![2, 2, 2],
-            data_type: DataType::Int32,
-            chunk_shape: ChunkShape(vec![
-                NonZeroU64::new(1).unwrap(),
-                NonZeroU64::new(1).unwrap(),
-                NonZeroU64::new(1).unwrap(),
-            ]),
-            chunk_key_encoding: ChunkKeyEncoding::Slash,
-            fill_value: FillValue::Int32(0),
-            codecs: vec![Codec {
-                name: "mycodec".to_string(),
-                configuration: Some(HashMap::from_iter(iter::once((
-                    "foo".to_string(),
-                    serde_json::Value::from(42),
-                )))),
-            }],
-            storage_transformers: Some(vec![StorageTransformer {
-                name: "mytransformer".to_string(),
-                configuration: Some(HashMap::from_iter(iter::once((
-                    "bar".to_string(),
-                    serde_json::Value::from(43),
-                )))),
-            }]),
-            dimension_names: Some(vec![
-                Some("x".to_string()),
-                Some("y".to_string()),
-                Some("t".to_string()),
-            ]),
-        };
-        let mut ds = Dataset::create(Arc::clone(&storage));
-        ds.add_group("/a/b".into()).await?;
-        ds.add_array("/a/b/array".into(), zarr_meta.clone()).await?;
-        ds.set_user_attributes(
-            "/a/b/array".into(),
-            Some(UserAttributes::try_new(br#"{"foo": 42}"#).unwrap()),
-        )
-        .await?;
-        let store = Store { dataset: Arc::clone(&Arc::new(ds)) };
+        let zarr_meta = Bytes::copy_from_slice(br#"{"zarr_format":3,"node_type":"array","attributes":{"foo":42},"shape":[2,2,2],"data_type":"int32","chunk_grid":{"name":"regular","configuration":{"chunk_shape":[1,1,1]}},"chunk_key_encoding":{"name":"default","configuration":{"separator":"/"}},"fill_value":0,"codecs":[{"name":"mycodec","configuration":{"foo":42}}],"storage_transformers":[{"name":"mytransformer","configuration":{"bar":43}}],"dimension_names":["x","y","t"]}"#);
+        store.set("a/b/array/zarr.json", zarr_meta.clone()).await?;
         assert_eq!(
-            store.get("a/b/array/zarr.json", &(None, None)).await,
-            Ok(Bytes::copy_from_slice(
-                br#"{"zarr_format":3,"node_type":"array","attributes":{"foo":42},"shape":[2,2,2],"data_type":"int32","chunk_grid":{"name":"regular","configuration":{"chunk_shape":[1,1,1]}},"chunk_key_encoding":{"name":"default","configuration":{"separator":"/"}},"fill_value":0,"codecs":[{"name":"mycodec","configuration":{"foo":42}}],"storage_transformers":[{"name":"mytransformer","configuration":{"bar":43}}],"dimension_names":["x","y","t"]}"#
-            ))
+            store.get("a/b/array/zarr.json", &(None, None)).await.unwrap(),
+            zarr_meta.clone()
         );
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_chunk_set_and_get() -> Result<(), Box<dyn std::error::Error>> {
+        // TODO: turn this test into pure Store operations once we support writes through Zarr
+        let in_mem_storage = Arc::new(InMemoryStorage::new());
+        let storage =
+            Arc::clone(&(in_mem_storage.clone() as Arc<dyn Storage + Send + Sync>));
+        let ds = Dataset::create(Arc::clone(&storage));
+        let mut store = Store::new(ds);
+
+        store
+            .set(
+                "zarr.json",
+                Bytes::copy_from_slice(br#"{"zarr_format":3, "node_type":"group"}"#),
+            )
+            .await?;
+        let zarr_meta = Bytes::copy_from_slice(br#"{"zarr_format":3,"node_type":"array","attributes":{"foo":42},"shape":[2,2,2],"data_type":"int32","chunk_grid":{"name":"regular","configuration":{"chunk_shape":[1,1,1]}},"chunk_key_encoding":{"name":"default","configuration":{"separator":"/"}},"fill_value":0,"codecs":[{"name":"mycodec","configuration":{"foo":42}}],"storage_transformers":[{"name":"mytransformer","configuration":{"bar":43}}],"dimension_names":["x","y","t"]}"#);
+        store.set("array/zarr.json", zarr_meta.clone()).await?;
+
+        let data = Bytes::copy_from_slice(b"hello");
+        store.set("array/c/0/1/0", data.clone()).await?;
+        assert_eq!(store.get("array/c/0/1/0", &(None, None)).await.unwrap(), data);
+
+        let chunk_id = in_mem_storage.chunk_ids().iter().next().cloned().unwrap();
+        assert_eq!(in_mem_storage.fetch_chunk(&chunk_id, &None).await?, data);
+
+        Ok(())
+    }
 }
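For context on how the renamed and the new write paths fit together, a minimal sketch (not part of the diff; it assumes the crate types above — Dataset, Path, ArrayIndices, ChunkPayload, UpdateNodeError — are in scope, and the "/array" path, indices, and payloads are illustrative only):

use bytes::Bytes;

// Sketch: `set_chunk_ref` records an already-available payload in the change set,
// while the new `set_chunk` writes the raw bytes to storage first and then records
// a `ChunkPayload::Ref` pointing at the new chunk object.
async fn example(ds: &mut Dataset) -> Result<(), UpdateNodeError> {
    let path: Path = "/array".into();

    // The caller already has a payload (here an inline chunk): only record the reference.
    ds.set_chunk_ref(
        path.clone(),
        ArrayIndices(vec![0, 0, 0]),
        Some(ChunkPayload::Inline("hello".into())),
    )
    .await?;

    // Raw bytes: the dataset writes the chunk and records the resulting ref
    // (this is the path Store::set takes for `.../c/...` keys).
    ds.set_chunk(&path, &ArrayIndices(vec![0, 0, 1]), Bytes::copy_from_slice(b"bytes"))
        .await?;

    Ok(())
}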