Storage learns how to r/w chunks

paraseba committed Aug 15, 2024
1 parent f7156cd commit 0b5405a
Showing 5 changed files with 49 additions and 18 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -9,6 +9,7 @@ publish = false
[dependencies]
arrow = { version = "52.2.0", features = ["prettyprint"] }
async-trait = "0.1.81"
+bytes = "1.7.1"
futures = "0.3.30"
itertools = "0.13.0"
rand = "0.8.5"
30 changes: 15 additions & 15 deletions src/dataset.rs
@@ -57,7 +57,7 @@ impl ChangeSet {
.or_insert(HashMap::from([(coord, data)]));
}

-fn get_chunk(
+fn get_chunk_ref(
&self,
path: &Path,
coords: &ArrayIndices,
@@ -271,7 +271,7 @@ impl Dataset {
id: *id,
path: path.clone(),
user_attributes: atts.flatten().map(UserAttributesStructure::Inline),
-// We put no manifests in new arrays, see get_chunk to understand how chunks get
+// We put no manifests in new arrays, see get_chunk_ref to understand how chunks get
// fetched for those arrays
node_data: NodeData::Array(meta.clone(), vec![]),
}
@@ -290,7 +290,7 @@ impl Dataset {
})
}

-pub async fn get_chunk(
+pub async fn get_chunk_ref(
&self,
path: &Path,
coords: &ArrayIndices,
@@ -302,7 +302,7 @@ impl Dataset {
NodeData::Array(_, manifests) => {
// check the chunks modified in this session first
// TODO: I hate that Rust forces me to clone to search in a hashmap. How to do better?
-let session_chunk = self.change_set.get_chunk(path, coords).cloned();
+let session_chunk = self.change_set.get_chunk_ref(path, coords).cloned();
// If session_chunk is not None we have to return it, because it is the update the
// user made in the current session
// If session_chunk == None, user hasn't modified the chunk in this session and we
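The rule the comments above spell out is the heart of this change: a session-local update always shadows the committed manifest, even when that update is a deletion, and the same rule drives the `filter_map` in `update_chunks` below. A minimal sketch of the merge, with simplified, hypothetical signatures (the real `ChangeSet` hands out references, hence the `.cloned()` above):

```rust
// Sketch of the session-vs-committed merge (hypothetical helper, not part
// of this commit). The outer Option records whether this session touched
// the chunk at all; the inner one is the new payload, where None means
// "deleted in this session".
fn effective_payload(
    session: Option<Option<ChunkPayload>>,
    committed: Option<ChunkPayload>,
) -> Option<ChunkPayload> {
    match session {
        Some(update) => update, // the session's write (or delete) wins
        None => committed,      // untouched: fall back to the flushed manifest
    }
}
```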
@@ -409,7 +409,7 @@ impl Dataset {
chunks: impl Iterator<Item = ChunkInfo> + 'a,
) -> impl Iterator<Item = ChunkInfo> + 'a {
chunks.filter_map(move |chunk| {
-match self.change_set.get_chunk(&path, &chunk.coord) {
+match self.change_set.get_chunk_ref(&path, &chunk.coord) {
None => Some(chunk),
Some(new_payload) => {
new_payload.clone().map(|pl| ChunkInfo { payload: pl, ..chunk })
@@ -753,11 +753,11 @@ mod tests {
.await
.map_err(|err| format!("{err:#?}"))?;

-let chunk = ds.get_chunk(&new_array_path, &ArrayIndices(vec![0])).await;
+let chunk = ds.get_chunk_ref(&new_array_path, &ArrayIndices(vec![0])).await;
assert_eq!(chunk, Some(ChunkPayload::Inline(vec![0, 0, 0, 7])));

// retrieve a non initialized chunk of the new array
-let non_chunk = ds.get_chunk(&new_array_path, &ArrayIndices(vec![1])).await;
+let non_chunk = ds.get_chunk_ref(&new_array_path, &ArrayIndices(vec![1])).await;
assert_eq!(non_chunk, None);

// update old array user attributes and check them
@@ -795,7 +795,7 @@ mod tests {
.await
.map_err(|err| format!("{err:#?}"))?;

-let chunk = ds.get_chunk(&array1_path, &ArrayIndices(vec![0, 0, 0])).await;
+let chunk = ds.get_chunk_ref(&array1_path, &ArrayIndices(vec![0, 0, 0])).await;
assert_eq!(chunk, Some(ChunkPayload::Inline(vec![0, 0, 0, 99])));

Ok(())
@@ -975,7 +975,7 @@ mod tests {
}) if path == new_array_path && meta == zarr_meta.clone() && manifests.len() == 1
));
assert_eq!(
-ds.get_chunk(&new_array_path, &ArrayIndices(vec![0, 0, 0])).await,
+ds.get_chunk_ref(&new_array_path, &ArrayIndices(vec![0, 0, 0])).await,
Some(ChunkPayload::Inline(b"hello".into()))
);

@@ -1000,11 +1000,11 @@ mod tests {
let previous_structure_id =
ds.flush().await.map_err(|err| format!("{err:#?}"))?;
assert_eq!(
-ds.get_chunk(&new_array_path, &ArrayIndices(vec![0, 0, 0])).await,
+ds.get_chunk_ref(&new_array_path, &ArrayIndices(vec![0, 0, 0])).await,
Some(ChunkPayload::Inline(b"bye".into()))
);
assert_eq!(
-ds.get_chunk(&new_array_path, &ArrayIndices(vec![0, 0, 1])).await,
+ds.get_chunk_ref(&new_array_path, &ArrayIndices(vec![0, 0, 1])).await,
Some(ChunkPayload::Inline(b"new chunk".into()))
);

@@ -1028,11 +1028,11 @@ mod tests {
let ds = Dataset::update(Arc::clone(&storage), structure_id);

assert_eq!(
-ds.get_chunk(&new_array_path, &ArrayIndices(vec![0, 0, 0])).await,
+ds.get_chunk_ref(&new_array_path, &ArrayIndices(vec![0, 0, 0])).await,
Some(ChunkPayload::Inline(b"bye".into()))
);
assert_eq!(
-ds.get_chunk(&new_array_path, &ArrayIndices(vec![0, 0, 1])).await,
+ds.get_chunk_ref(&new_array_path, &ArrayIndices(vec![0, 0, 1])).await,
None
);
assert!(matches!(
@@ -1048,11 +1048,11 @@ mod tests {
// test that the previous version is still alive
let ds = Dataset::update(Arc::clone(&storage), previous_structure_id);
assert_eq!(
-ds.get_chunk(&new_array_path, &ArrayIndices(vec![0, 0, 0])).await,
+ds.get_chunk_ref(&new_array_path, &ArrayIndices(vec![0, 0, 0])).await,
Some(ChunkPayload::Inline(b"bye".into()))
);
assert_eq!(
-ds.get_chunk(&new_array_path, &ArrayIndices(vec![0, 0, 1])).await,
+ds.get_chunk_ref(&new_array_path, &ArrayIndices(vec![0, 0, 1])).await,
Some(ChunkPayload::Inline(b"new chunk".into()))
);

10 changes: 9 additions & 1 deletion src/lib.rs
@@ -27,9 +27,11 @@ pub mod storage;
pub mod structure;

use async_trait::async_trait;
+use bytes::Bytes;
use manifest::ManifestsTable;
use std::{
-collections::HashMap, fmt::Display, num::NonZeroU64, path::PathBuf, sync::Arc,
+collections::HashMap, fmt::Display, num::NonZeroU64, ops::Range, path::PathBuf,
+sync::Arc,
};
use structure::StructureTable;

@@ -564,6 +566,11 @@ pub trait Storage {
&self,
id: &ObjectId,
) -> Result<Arc<ManifestsTable>, StorageError>; // FIXME: format flags
+    async fn fetch_chunk(
+        &self,
+        id: &ObjectId,
+        range: &Option<Range<ChunkOffset>>,
+    ) -> Result<Arc<Bytes>, StorageError>; // FIXME: format flags

async fn write_structure(
&self,
@@ -580,6 +587,7 @@
id: ObjectId,
table: Arc<ManifestsTable>,
) -> Result<(), StorageError>;
+    async fn write_chunk(&self, id: ObjectId, bytes: Bytes) -> Result<(), StorageError>;
}

pub struct Dataset {
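The trait now treats chunks as raw `Bytes` keyed by `ObjectId`, and `fetch_chunk` takes an optional byte range so callers can read a sub-slice of a chunk without fetching all of it. A hypothetical caller's view of the new pair (a sketch only: the `id` would normally come from a manifest, and `ObjectId: Clone` is assumed):

```rust
// Illustrative round-trip through the new trait methods — not code from
// this commit.
async fn chunk_roundtrip(
    storage: &dyn Storage,
    id: ObjectId, // assumed Clone; in practice recorded in a manifest
) -> Result<(), StorageError> {
    storage
        .write_chunk(id.clone(), Bytes::from_static(b"hello chunk"))
        .await?;
    // Some(range) requests a byte sub-slice; None fetches the whole chunk.
    let prefix = storage.fetch_chunk(&id, &Some(0..5)).await?;
    assert_eq!(prefix.as_ref(), &Bytes::from_static(b"hello"));
    Ok(())
}
```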
21 changes: 21 additions & 0 deletions src/storage.rs
@@ -4,6 +4,7 @@ use std::{
};

use async_trait::async_trait;
+use bytes::Bytes;

use crate::{
AttributesTable, ManifestsTable, ObjectId, Storage, StorageError, StructureTable,
@@ -14,6 +15,7 @@ pub struct InMemoryStorage {
struct_files: Arc<RwLock<HashMap<ObjectId, Arc<StructureTable>>>>,
attr_files: Arc<RwLock<HashMap<ObjectId, Arc<AttributesTable>>>>,
man_files: Arc<RwLock<HashMap<ObjectId, Arc<ManifestsTable>>>>,
+chunk_files: Arc<RwLock<HashMap<ObjectId, Arc<Bytes>>>>,
}

impl InMemoryStorage {
@@ -22,6 +24,7 @@
struct_files: Arc::new(RwLock::new(HashMap::new())),
attr_files: Arc::new(RwLock::new(HashMap::new())),
man_files: Arc::new(RwLock::new(HashMap::new())),
+chunk_files: Arc::new(RwLock::new(HashMap::new())),
}
}
}
@@ -99,4 +102,22 @@ impl Storage for InMemoryStorage {
.insert(id, Arc::clone(&table));
Ok(())
}

+    async fn fetch_chunk(
+        &self,
+        _id: &ObjectId,
+        _range: &Option<std::ops::Range<crate::ChunkOffset>>,
+    ) -> Result<Arc<Bytes>, StorageError> {
+        // avoid unused warning
+        let _x = &self.chunk_files;
+        todo!()
+    }
+
+    async fn write_chunk(
+        &self,
+        _id: ObjectId,
+        _bytes: bytes::Bytes,
+    ) -> Result<(), StorageError> {
+        todo!()
+    }
}
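Both bodies are left as `todo!()` in this commit. One way they might be completed for the in-memory store — a sketch assuming lock poisoning is simply unwrapped and that `StorageError` has (or gains) a not-found variant named `ObjectNotFound`:

```rust
// Sketch only — not this commit's code. Inside `impl Storage for
// InMemoryStorage`, the stubs could become:
async fn write_chunk(&self, id: ObjectId, bytes: Bytes) -> Result<(), StorageError> {
    self.chunk_files
        .write()
        .expect("lock poisoned") // a real impl would map this into StorageError
        .insert(id, Arc::new(bytes));
    Ok(())
}

async fn fetch_chunk(
    &self,
    id: &ObjectId,
    range: &Option<std::ops::Range<crate::ChunkOffset>>,
) -> Result<Arc<Bytes>, StorageError> {
    let chunk = self
        .chunk_files
        .read()
        .expect("lock poisoned")
        .get(id)
        .cloned() // clones the Arc, not the chunk bytes
        .ok_or(StorageError::ObjectNotFound)?; // variant name assumed
    Ok(match range {
        // Bytes::slice refcounts the same buffer instead of copying.
        Some(r) => Arc::new(chunk.slice(r.start as usize..r.end as usize)),
        None => chunk,
    })
}
```

Returning `Arc<Bytes>` keeps reads cheap even when several callers hold the same chunk, and slicing a `Bytes` shares the underlying buffer rather than copying it.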
