diff --git a/Cargo.lock b/Cargo.lock index 6564ea3c..449a91d4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -358,9 +358,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.6.1" +version = "1.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a12916984aab3fa6e39d655a33e09c0071eb36d6ab3aea5c2d78551f1df6d952" +checksum = "8318a53db07bb3f8dca91a600466bdb3f2eaadeedfdbcf02e1accbad9271ba50" [[package]] name = "cc" @@ -658,6 +658,7 @@ version = "0.1.0" dependencies = [ "arrow", "async-trait", + "bytes", "futures", "itertools", "pretty_assertions", diff --git a/Cargo.toml b/Cargo.toml index f44e07ff..b621e5ed 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ publish = false [dependencies] arrow = { version = "52.2.0", features = ["prettyprint"] } async-trait = "0.1.81" +bytes = "1.7.1" futures = "0.3.30" itertools = "0.13.0" rand = "0.8.5" diff --git a/src/dataset.rs b/src/dataset.rs index 33b49896..d2ebddff 100644 --- a/src/dataset.rs +++ b/src/dataset.rs @@ -57,7 +57,7 @@ impl ChangeSet { .or_insert(HashMap::from([(coord, data)])); } - fn get_chunk( + fn get_chunk_ref( &self, path: &Path, coords: &ArrayIndices, @@ -271,7 +271,7 @@ impl Dataset { id: *id, path: path.clone(), user_attributes: atts.flatten().map(UserAttributesStructure::Inline), - // We put no manifests in new arrays, see get_chunk to understand how chunks get + // We put no manifests in new arrays, see get_chunk_ref to understand how chunks get // fetched for those arrays node_data: NodeData::Array(meta.clone(), vec![]), } @@ -290,7 +290,7 @@ impl Dataset { }) } - pub async fn get_chunk( + pub async fn get_chunk_ref( &self, path: &Path, coords: &ArrayIndices, @@ -302,7 +302,7 @@ impl Dataset { NodeData::Array(_, manifests) => { // check the chunks modified in this session first // TODO: I hate rust forces me to clone to search in a hashmap. How to do better? - let session_chunk = self.change_set.get_chunk(path, coords).cloned(); + let session_chunk = self.change_set.get_chunk_ref(path, coords).cloned(); // If session_chunk is not None we have to return it, because is the update the // user made in the current session // If session_chunk == None, user hasn't modified the chunk in this session and we @@ -409,7 +409,7 @@ impl Dataset { chunks: impl Iterator + 'a, ) -> impl Iterator + 'a { chunks.filter_map(move |chunk| { - match self.change_set.get_chunk(&path, &chunk.coord) { + match self.change_set.get_chunk_ref(&path, &chunk.coord) { None => Some(chunk), Some(new_payload) => { new_payload.clone().map(|pl| ChunkInfo { payload: pl, ..chunk }) @@ -753,11 +753,11 @@ mod tests { .await .map_err(|err| format!("{err:#?}"))?; - let chunk = ds.get_chunk(&new_array_path, &ArrayIndices(vec![0])).await; + let chunk = ds.get_chunk_ref(&new_array_path, &ArrayIndices(vec![0])).await; assert_eq!(chunk, Some(ChunkPayload::Inline(vec![0, 0, 0, 7]))); // retrieve a non initialized chunk of the new array - let non_chunk = ds.get_chunk(&new_array_path, &ArrayIndices(vec![1])).await; + let non_chunk = ds.get_chunk_ref(&new_array_path, &ArrayIndices(vec![1])).await; assert_eq!(non_chunk, None); // update old array use attriutes and check them @@ -795,7 +795,7 @@ mod tests { .await .map_err(|err| format!("{err:#?}"))?; - let chunk = ds.get_chunk(&array1_path, &ArrayIndices(vec![0, 0, 0])).await; + let chunk = ds.get_chunk_ref(&array1_path, &ArrayIndices(vec![0, 0, 0])).await; assert_eq!(chunk, Some(ChunkPayload::Inline(vec![0, 0, 0, 99]))); Ok(()) @@ -975,7 +975,7 @@ mod tests { }) if path == new_array_path && meta == zarr_meta.clone() && manifests.len() == 1 )); assert_eq!( - ds.get_chunk(&new_array_path, &ArrayIndices(vec![0, 0, 0])).await, + ds.get_chunk_ref(&new_array_path, &ArrayIndices(vec![0, 0, 0])).await, Some(ChunkPayload::Inline(b"hello".into())) ); @@ -1000,11 +1000,11 @@ mod tests { let previous_structure_id = ds.flush().await.map_err(|err| format!("{err:#?}"))?; assert_eq!( - ds.get_chunk(&new_array_path, &ArrayIndices(vec![0, 0, 0])).await, + ds.get_chunk_ref(&new_array_path, &ArrayIndices(vec![0, 0, 0])).await, Some(ChunkPayload::Inline(b"bye".into())) ); assert_eq!( - ds.get_chunk(&new_array_path, &ArrayIndices(vec![0, 0, 1])).await, + ds.get_chunk_ref(&new_array_path, &ArrayIndices(vec![0, 0, 1])).await, Some(ChunkPayload::Inline(b"new chunk".into())) ); @@ -1028,11 +1028,11 @@ mod tests { let ds = Dataset::update(Arc::clone(&storage), structure_id); assert_eq!( - ds.get_chunk(&new_array_path, &ArrayIndices(vec![0, 0, 0])).await, + ds.get_chunk_ref(&new_array_path, &ArrayIndices(vec![0, 0, 0])).await, Some(ChunkPayload::Inline(b"bye".into())) ); assert_eq!( - ds.get_chunk(&new_array_path, &ArrayIndices(vec![0, 0, 1])).await, + ds.get_chunk_ref(&new_array_path, &ArrayIndices(vec![0, 0, 1])).await, None ); assert!(matches!( @@ -1048,11 +1048,11 @@ mod tests { //test the previous version is still alive let ds = Dataset::update(Arc::clone(&storage), previous_structure_id); assert_eq!( - ds.get_chunk(&new_array_path, &ArrayIndices(vec![0, 0, 0])).await, + ds.get_chunk_ref(&new_array_path, &ArrayIndices(vec![0, 0, 0])).await, Some(ChunkPayload::Inline(b"bye".into())) ); assert_eq!( - ds.get_chunk(&new_array_path, &ArrayIndices(vec![0, 0, 1])).await, + ds.get_chunk_ref(&new_array_path, &ArrayIndices(vec![0, 0, 1])).await, Some(ChunkPayload::Inline(b"new chunk".into())) ); diff --git a/src/lib.rs b/src/lib.rs index 258c0917..161a7f7b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -27,9 +27,11 @@ pub mod storage; pub mod structure; use async_trait::async_trait; +use bytes::Bytes; use manifest::ManifestsTable; use std::{ - collections::HashMap, fmt::Display, num::NonZeroU64, path::PathBuf, sync::Arc, + collections::HashMap, fmt::Display, num::NonZeroU64, ops::Range, path::PathBuf, + sync::Arc, }; use structure::StructureTable; @@ -564,6 +566,11 @@ pub trait Storage { &self, id: &ObjectId, ) -> Result, StorageError>; // FIXME: format flags + async fn fetch_chunk( + &self, + id: &ObjectId, + range: &Option>, + ) -> Result, StorageError>; // FIXME: format flags async fn write_structure( &self, @@ -580,6 +587,7 @@ pub trait Storage { id: ObjectId, table: Arc, ) -> Result<(), StorageError>; + async fn write_chunk(&self, id: ObjectId, bytes: Bytes) -> Result<(), StorageError>; } pub struct Dataset { diff --git a/src/storage.rs b/src/storage.rs index 3d9b0a5d..781b0d8e 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -4,6 +4,7 @@ use std::{ }; use async_trait::async_trait; +use bytes::Bytes; use crate::{ AttributesTable, ManifestsTable, ObjectId, Storage, StorageError, StructureTable, @@ -14,6 +15,7 @@ pub struct InMemoryStorage { struct_files: Arc>>>, attr_files: Arc>>>, man_files: Arc>>>, + chunk_files: Arc>>>, } impl InMemoryStorage { @@ -22,6 +24,7 @@ impl InMemoryStorage { struct_files: Arc::new(RwLock::new(HashMap::new())), attr_files: Arc::new(RwLock::new(HashMap::new())), man_files: Arc::new(RwLock::new(HashMap::new())), + chunk_files: Arc::new(RwLock::new(HashMap::new())), } } } @@ -99,4 +102,22 @@ impl Storage for InMemoryStorage { .insert(id, Arc::clone(&table)); Ok(()) } + + async fn fetch_chunk( + &self, + _id: &ObjectId, + _range: &Option>, + ) -> Result, StorageError> { + // avoid unused warning + let _x = &self.chunk_files; + todo!() + } + + async fn write_chunk( + &self, + _id: ObjectId, + _bytes: bytes::Bytes, + ) -> Result<(), StorageError> { + todo!() + } }