Merge remote-tracking branch 'origin/main' into seba/zarr
paraseba committed Aug 22, 2024
2 parents 9e71ee4 + 67c4997 commit a7c9207
Showing 9 changed files with 269 additions and 42 deletions.
36 changes: 36 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
@@ -21,11 +21,12 @@ serde_json = "1.0.125"
serde = { version = "1.0.208", features = ["derive"] }
serde_with = "3.9.0"
tokio = { version = "1.39.2", features = ["rt-multi-thread", "macros"] }
test-strategy = "0.4.0"
proptest = "1.5.0"

[profile.release-with-debug]
inherits = "release"
debug = true

[dev-dependencies]
pretty_assertions = "1.4.0"
proptest = "1.0.0"
7 changes: 7 additions & 0 deletions proptest-regressions/dataset.txt
@@ -0,0 +1,7 @@
# Seeds for failure cases proptest has generated in the past. It is
# automatically read and these particular cases re-run before any
# novel cases are generated.
#
# It is recommended to check this file in to source control so that
# everyone who runs the test benefits from these saved cases.
cc 10b7f8cfd2afb88ddee1d4abf0d6f1bb9e6c6bc87af8c2c0d5e90e49d50dede5 # shrinks to path = ""
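
The cc hash above is the saved seed; "shrinks to path = """ records that proptest minimized the failing input to the empty path, and because the file is checked in, every future run re-tries that exact case before generating new ones.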
109 changes: 106 additions & 3 deletions src/dataset.rs
@@ -31,8 +31,6 @@ impl ChangeSet {
path: Path,
node_id: NodeId,
) -> Result<(), DeleteNodeError> {
// TODO: test delete a deleted group.
// TODO: test delete a non-existent group
let was_new = self.new_groups.remove(&path).is_some();
self.updated_attributes.remove(&path);
if !was_new {
@@ -771,10 +769,11 @@ pub enum FlushError {

#[cfg(test)]
mod tests {

use std::{error::Error, num::NonZeroU64, path::PathBuf};

use crate::{
manifest::mk_manifests_table, storage::InMemoryStorage,
manifest::mk_manifests_table, storage::InMemoryStorage, strategies::*,
structure::mk_structure_table, ChunkInfo, ChunkKeyEncoding, ChunkRef, ChunkShape,
Codec, DataType, FillValue, Flags, ManifestExtents, StorageTransformer,
TableRegion,
@@ -783,6 +782,110 @@ mod tests {
use super::*;
use itertools::Itertools;
use pretty_assertions::assert_eq;
use proptest::prelude::{prop_assert, prop_assert_eq};
use test_strategy::proptest;

#[proptest(async = "tokio")]
async fn test_add_delete_group(
#[strategy(node_paths())] path: Path,
#[strategy(empty_datasets())] mut dataset: Dataset,
) {
// getting any path from an empty dataset must fail
prop_assert!(dataset.get_node(&path).await.is_err());

// adding a new group must succeed
prop_assert!(dataset.add_group(path.clone()).await.is_ok());

// Getting a group just added must succeed
let node = dataset.get_node(&path).await;
prop_assert!(node.is_ok());

// Getting the group twice must return equal nodes
prop_assert_eq!(node.unwrap(), dataset.get_node(&path).await.unwrap());

// adding an existing group fails
prop_assert_eq!(
dataset.add_group(path.clone()).await.unwrap_err(),
AddNodeError::AlreadyExists(path.clone())
);

// deleting the added group must succeed
prop_assert!(dataset.delete_group(path.clone()).await.is_ok());

// deleting twice must fail
prop_assert_eq!(
dataset.delete_group(path.clone()).await.unwrap_err(),
DeleteNodeError::NotFound(path.clone())
);

// getting a deleted group must fail
prop_assert!(dataset.get_node(&path).await.is_err());

// adding again must succeed
prop_assert!(dataset.add_group(path.clone()).await.is_ok());

// deleting again must succeed
prop_assert!(dataset.delete_group(path.clone()).await.is_ok());
}

#[proptest(async = "tokio")]
async fn test_add_delete_array(
#[strategy(node_paths())] path: Path,
#[strategy(zarr_array_metadata())] metadata: ZarrArrayMetadata,
#[strategy(empty_datasets())] mut dataset: Dataset,
) {
// adding a new array must always succeed
prop_assert!(dataset.add_array(path.clone(), metadata.clone()).await.is_ok());

// adding to the same path must fail
prop_assert!(dataset.add_array(path.clone(), metadata.clone()).await.is_err());

// first delete must succeed
prop_assert!(dataset.delete_array(path.clone()).await.is_ok());

// deleting twice must fail
prop_assert_eq!(
dataset.delete_array(path.clone()).await.unwrap_err(),
DeleteNodeError::NotFound(path.clone())
);

// adding again must succeed
prop_assert!(dataset.add_array(path.clone(), metadata.clone()).await.is_ok());

// deleting again must succeed
prop_assert!(dataset.delete_array(path.clone()).await.is_ok());
}

#[proptest(async = "tokio")]
async fn test_add_array_group_clash(
#[strategy(node_paths())] path: Path,
#[strategy(zarr_array_metadata())] metadata: ZarrArrayMetadata,
#[strategy(empty_datasets())] mut dataset: Dataset,
) {
// adding a group at an existing array node must fail
prop_assert!(dataset.add_array(path.clone(), metadata.clone()).await.is_ok());
prop_assert_eq!(
dataset.add_group(path.clone()).await.unwrap_err(),
AddNodeError::AlreadyExists(path.clone())
);
prop_assert_eq!(
dataset.delete_group(path.clone()).await.unwrap_err(),
DeleteNodeError::NotAGroup(path.clone())
);
prop_assert!(dataset.delete_array(path.clone()).await.is_ok());

// adding an array at an existing group node must fail
prop_assert!(dataset.add_group(path.clone()).await.is_ok());
prop_assert_eq!(
dataset.add_array(path.clone(), metadata.clone()).await.unwrap_err(),
AddNodeError::AlreadyExists(path.clone())
);
prop_assert_eq!(
dataset.delete_array(path.clone()).await.unwrap_err(),
DeleteNodeError::NotAnArray(path.clone())
);
prop_assert!(dataset.delete_group(path.clone()).await.is_ok());
}

#[tokio::test(flavor = "multi_thread")]
async fn test_dataset_with_updates() -> Result<(), Box<dyn Error>> {
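The tests above pull node_paths(), empty_datasets() and zarr_array_metadata() from the new src/strategies.rs module, whose contents are not rendered in this view. The #[proptest(async = "tokio")] attribute comes from the test-strategy crate and runs each generated case on a Tokio runtime, while #[strategy(...)] selects the generator for each argument. A minimal sketch of what such a module could look like, assuming proptest's regex-string and collection combinators and a hypothetical Dataset::create constructor (the regression seed above suggests the real node_paths() can also yield the empty path):

// Hypothetical sketch of src/strategies.rs -- not the committed code.
use std::sync::Arc;

use proptest::prelude::*;

use crate::{dataset::Dataset, storage::InMemoryStorage, Path};

// Node paths built from 0..4 short alphanumeric segments. Allowing
// zero segments yields the empty path recorded in
// proptest-regressions/dataset.txt.
pub fn node_paths() -> impl Strategy<Value = Path> {
    proptest::collection::vec("[a-z][a-z0-9]{0,5}", 0..4)
        .prop_map(|segments| Path::from(segments.join("/")))
}

// A fresh in-memory Dataset for every generated case.
// Dataset::create is an assumed constructor name.
pub fn empty_datasets() -> impl Strategy<Value = Dataset> {
    Just(()).prop_map(|()| Dataset::create(Arc::new(InMemoryStorage::new())))
}

A zarr_array_metadata() strategy could be assembled the same way, leaning on the #[derive(Arbitrary)] added to ChunkKeyEncoding and FillValue in src/lib.rs below (e.g. any::<ChunkKeyEncoding>()).
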
20 changes: 11 additions & 9 deletions src/lib.rs
@@ -24,6 +24,7 @@
pub mod dataset;
pub mod manifest;
pub mod storage;
pub mod strategies;
pub mod structure;
pub mod zarr;

@@ -43,6 +44,7 @@ use std::{
sync::Arc,
};
use structure::StructureTable;
use test_strategy::Arbitrary;
use thiserror::Error;

#[derive(Debug, Clone, Error)]
@@ -62,6 +64,9 @@ pub struct ArrayIndices(pub Vec<u64>);
/// The shape of an array.
/// 0 is a valid shape member
pub type ArrayShape = Vec<u64>;
// each dimension name can be null in Zarr
pub type DimensionName = Option<String>;
pub type DimensionNames = Vec<DimensionName>;

pub type Path = PathBuf;

@@ -171,7 +176,7 @@ impl Display for DataType {
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct ChunkShape(pub Vec<NonZeroU64>);

#[derive(Copy, Clone, Eq, PartialEq, Debug, Serialize, Deserialize)]
#[derive(Arbitrary, Copy, Clone, Eq, PartialEq, Debug, Serialize, Deserialize)]
pub enum ChunkKeyEncoding {
Slash,
Dot,
@@ -201,7 +206,7 @@ impl From<ChunkKeyEncoding> for u8 {
}
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[derive(Arbitrary, Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(untagged)]
pub enum FillValue {
// FIXME: test all json (de)serializations
@@ -547,8 +552,6 @@ pub struct StorageTransformer {
pub configuration: Option<HashMap<String, serde_json::Value>>,
}

pub type DimensionName = String;

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct UserAttributes {
#[serde(flatten)]
@@ -645,8 +648,7 @@ pub struct ZarrArrayMetadata {
pub fill_value: FillValue,
pub codecs: Vec<Codec>,
pub storage_transformers: Option<Vec<StorageTransformer>>,
// each dimension name can be null in Zarr
pub dimension_names: Option<Vec<Option<DimensionName>>>,
pub dimension_names: Option<DimensionNames>,
}

#[derive(Clone, Debug, Hash, PartialEq, Eq)]
@@ -716,7 +718,7 @@ pub enum AddNodeError {
AlreadyExists(Path),
}

#[derive(Debug, Error)]
#[derive(Debug, Error, PartialEq, Eq)]
pub enum DeleteNodeError {
#[error("node not found at `{0}`")]
NotFound(Path),
@@ -767,7 +769,7 @@ pub enum StorageError {
/// Different implementations can cache the files differently, or not at all.
/// Implementations are free to assume files are never overwritten.
#[async_trait]
pub trait Storage {
pub trait Storage: fmt::Debug {
async fn fetch_structure(
&self,
id: &ObjectId,
@@ -804,7 +806,7 @@ pub trait Storage {
async fn write_chunk(&self, id: ObjectId, bytes: Bytes) -> Result<(), StorageError>;
}

#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct Dataset {
storage: Arc<dyn Storage + Send + Sync>,
structure_id: Option<ObjectId>,
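Two changes in this file work together: Storage gains a fmt::Debug supertrait, which is what lets Dataset switch to #[derive(Clone, Debug)] despite holding an Arc<dyn Storage + Send + Sync> field; the property tests in turn need Dataset: Debug so proptest can print failing cases. A stripped-down illustration of the mechanism (abbreviated names, not the crate's full trait):

use std::{fmt, sync::Arc};

// The supertrait makes every dyn Storage trait object debuggable...
trait Storage: fmt::Debug {}

#[derive(Debug)]
struct Dataset {
    // ...which is what derive(Debug) requires of this field.
    storage: Arc<dyn Storage + Send + Sync>,
}

#[derive(Debug)]
struct InMemory;
impl Storage for InMemory {}

fn main() {
    let ds = Dataset { storage: Arc::new(InMemory) };
    println!("{ds:?}"); // prints: Dataset { storage: InMemory }
}
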
14 changes: 13 additions & 1 deletion src/storage.rs
@@ -1,6 +1,7 @@
use base64::{engine::general_purpose::URL_SAFE as BASE64_URL_SAFE, Engine as _};
use std::{
collections::{HashMap, HashSet},
fmt,
fs::create_dir_all,
ops::Range,
sync::{Arc, RwLock},
@@ -60,7 +61,7 @@ impl ObjectStorage {
) -> Result<ObjectStorage, StorageError> {
use object_store::aws::AmazonS3Builder;
let store = AmazonS3Builder::new()
// TODO: Generalize the auth config
// FIXME: Generalize the auth config
.with_access_key_id("minio123")
.with_secret_access_key("minio123")
.with_endpoint("http://localhost:9000")
@@ -108,6 +109,11 @@ impl ObjectStorage {
}
}

impl fmt::Debug for ObjectStorage {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
write!(f, "ObjectStorage, prefix={}, store={}", self.prefix, self.store)
}
}

#[async_trait]
impl Storage for ObjectStorage {
async fn fetch_structure(
@@ -226,6 +232,12 @@ impl InMemoryStorage {
}
}

impl fmt::Debug for InMemoryStorage {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
write!(f, "InMemoryStorage at {:p}", self)
}
}

#[async_trait]
impl Storage for InMemoryStorage {
async fn fetch_structure(
Expand Down