diff --git a/Cargo.lock b/Cargo.lock index d665869..6f72bc0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -22,6 +22,18 @@ dependencies = [ "memchr", ] +[[package]] +name = "ambassador" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05b8741165d4c4a8e6e8dcf8a2d09a1b0f94d85722fb57caed8babdd421a9837" +dependencies = [ + "itertools", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "anyhow" version = "1.0.60" @@ -215,6 +227,12 @@ dependencies = [ "crypto-common", ] +[[package]] +name = "either" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fcaabb2fef8c910e7f4c7ce9f67a1283a1715879a7c230ca9d6d1ae31f16d91" + [[package]] name = "env_logger" version = "0.7.1" @@ -263,6 +281,7 @@ dependencies = [ name = "fixity_store" version = "0.0.1" dependencies = [ + "ambassador", "anyhow", "async-trait", "fixity_types", @@ -345,6 +364,15 @@ dependencies = [ "quick-error", ] +[[package]] +name = "itertools" +version = "0.10.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.1" diff --git a/Cargo.toml b/Cargo.toml index 3771ff4..a4ba10d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -85,3 +85,5 @@ members = [ # "git_fixi", ] +[workspace.dependencies] +ambassador = "0.3.5" diff --git a/core/src/lib.rs b/core/src/lib.rs index c5a8eb6..c6e24a9 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -200,8 +200,8 @@ where self.meta .set_head( "local", - &*self.repo, - &*self.branch, + &self.repo, + &self.branch, &self.replica_id, log_head.clone(), ) diff --git a/fixity_store/Cargo.toml b/fixity_store/Cargo.toml index 085d8fa..d072e17 100644 --- a/fixity_store/Cargo.toml +++ b/fixity_store/Cargo.toml @@ -7,6 +7,8 @@ license-file = "LICENSE" [features] default = ["rkyv", "json"] +# Provide various test helpers or test 
focused implementations. +test = [] json = ["serde_json", "serde"] [dependencies] @@ -16,6 +18,7 @@ thiserror = "1.0" anyhow = "1.0" multihash = "0.16" multibase = "0.9" +ambassador.workspace = true # Feature: rkyv rkyv = { version = "0.7", optional = true } # Feature: json diff --git a/fixity_store/src/container.rs b/fixity_store/src/container.rs index a684f8e..273dfd8 100644 --- a/fixity_store/src/container.rs +++ b/fixity_store/src/container.rs @@ -1,32 +1,74 @@ use crate::{ + contentid::NewContentId, deser::{Deserialize, Serialize}, + deser_store::DeserStore, store::StoreError, + type_desc::{TypeDescription, ValueDesc}, Store, }; use async_trait::async_trait; #[async_trait] -pub trait NewContainer<'s, S>: Sized + Send + 's -where - S: Store, -{ - async fn open(store: &'s S, cid: &S::Cid) -> Result; - async fn save(&mut self, store: &'s S) -> Result; - async fn save_with_cids( +pub trait NewContainer: Sized + Send + TypeDescription { + /// A description of the [de]serialized type(s) that this container manages. + /// + /// Used to determine / validate Fixity repository types. + /// + /// This is in contrast to the `Container: TypeDescription` bound for `Self`, + /// which describes the `Container` itself - which may or may not be what is written + /// to stores. + fn deser_type_desc() -> ValueDesc; + fn new_container>(store: &S) -> Self; + async fn open>(store: &S, cid: &Cid) -> Result; + async fn save>(&mut self, store: &S) -> Result; + async fn save_with_cids>( &mut self, store: &S, - cids_buf: &mut Vec, + cids_buf: &mut Vec, ) -> Result<(), StoreError>; + async fn merge>( + &mut self, + store: &S, + other: &Cid, + ) -> Result<(), StoreError>; + async fn diff>( + &mut self, + store: &S, + other: &Cid, + ) -> Result; + // TODO: Method to report contained Cids and/or Containers to allow correct syncing of a + // Container and all the cids within it. 
} #[async_trait] -pub trait ContainerRef<'s, S>: NewContainer<'s, S> -where - S: Store, -{ - type Ref: TryInto; - async fn open_ref(store: &'s S, cid: &S::Cid) -> Result; +pub trait ContainerRef: NewContainer { + type Ref: ContainerRefInto; + type DiffRef: ContainerRefInto; + async fn open_ref>( + store: &S, + cid: &Cid, + ) -> Result; + async fn diff_ref>( + &mut self, + store: &S, + other: &Cid, + ) -> Result; +} +// NIT: Infallible conversions were making `TryInto` awkward for `Ref` and `DiffRef` on +// `ContainerRef`, so this trait fills that role without the infallible issues. +// I must be misunderstanding how to deal with Infallible `TryInto`'s easily, while +// also putting bounds on the associated `TryInto::Error` type. +// +// Or perhaps it's just awkward because associated type bounds don't exist yet. +pub trait ContainerRefInto { + type Error: Into; + fn container_ref_into(self) -> Result; +} +impl ContainerRefInto for Owned { + type Error = StoreError; + fn container_ref_into(self) -> Result { + Ok(self) + } } - #[async_trait] pub trait Container<'s, S>: Sized + Send + 's where diff --git a/fixity_store/src/content_store.rs b/fixity_store/src/content_store.rs index 7002dde..432d1f3 100644 --- a/fixity_store/src/content_store.rs +++ b/fixity_store/src/content_store.rs @@ -12,10 +12,20 @@ pub enum ContentStoreError { } #[async_trait] pub trait ContentStore: Send + Sync { + // NIT: The conversion around the generic byte types is .. annoying. + // A single type (Into> for example) doesn't cover common cases. + // So we either add a lot of conversions on the type, and hope they align.. + // or some types just end up needlessly converting. Which is unfortunate. + // + // Not sure the ideal solution. 
type Bytes: AsRef<[u8]> + Into>; async fn exists(&self, cid: &Cid) -> Result; async fn read_unchecked(&self, cid: &Cid) -> Result; - async fn write_unchecked(&self, cid: &Cid, bytes: Vec) -> Result<(), ContentStoreError>; + async fn write_unchecked(&self, cid: &Cid, bytes: B) -> Result<(), ContentStoreError> + where + B: AsRef<[u8]> + Into> + Send; + // TODO: Allow the caller to own the buf, for mutation of buf. + // async fn read_unchecked_vec(&self, cid: &Cid) -> Result, ContentStoreError>; } #[async_trait] impl ContentStore for Arc @@ -30,7 +40,17 @@ where async fn read_unchecked(&self, cid: &Cid) -> Result { self.deref().read_unchecked(cid).await } - async fn write_unchecked(&self, cid: &Cid, bytes: Vec) -> Result<(), ContentStoreError> { + async fn write_unchecked(&self, cid: &Cid, bytes: B) -> Result<(), ContentStoreError> + where + B: AsRef<[u8]> + Into> + Send, + { self.deref().write_unchecked(cid, bytes).await } } +#[async_trait] +pub trait ContentStoreV2: Send + Sync { + async fn exists(&self, cid: &Cid) -> Result; + // NIT: This return type will probably need to change to work with mmap. 
+ async fn read_unchecked(&self, cid: &Cid) -> Result, ContentStoreError>; + async fn write_unchecked(&self, cid: &Cid, bytes: Vec) -> Result<(), ContentStoreError>; +} diff --git a/fixity_store/src/contentid.rs b/fixity_store/src/contentid.rs index bc66a0f..be3d6a2 100644 --- a/fixity_store/src/contentid.rs +++ b/fixity_store/src/contentid.rs @@ -1,5 +1,9 @@ pub mod multihash_256; +use crate::{ + deser::{Deserialize, Serialize}, + type_desc::TypeDescription, +}; use multibase::Base; use multihash::MultihashDigest; use std::{ @@ -11,17 +15,24 @@ use thiserror::Error; pub const CID_LENGTH: usize = 34; -pub trait NewContentId: Clone + Sized + Send + Sync + Eq + Ord + Hash + Debug + Display { - type Hash: AsRef<[u8]>; +pub trait NewContentId: + Clone + Sized + Send + Sync + Eq + Ord + Hash + Debug + Display + 'static + TypeDescription +{ + type Hash<'a>: AsRef<[u8]>; /// Hash the given bytes and producing a content identifier. - fn hash>(buf: B) -> Self; + fn hash(buf: &[u8]) -> Self; /// Construct a content identifier from the given hash. - fn from_hash>(hash: H) -> Result; - fn as_hash(&self) -> &Self::Hash; - fn len(&self) -> usize { + fn from_hash(hash: Vec) -> Result; + fn as_hash(&self) -> Self::Hash<'_>; + fn size(&self) -> usize { self.as_hash().as_ref().len() } } +pub trait ContentIdDeser: NewContentId + Serialize + Deserialize {} +impl ContentIdDeser for T where + T: NewContentId + Serialize + Deserialize +{ +} #[derive(Error, Debug)] pub enum FromHashError { #[error("invalid length")] @@ -146,3 +157,50 @@ impl From for multihash::Code { } } } + +#[cfg(any(test, feature = "test"))] +pub mod test { + use super::{FromHashError, NewContentId}; + use multihash::MultihashDigest; + + // TODO: macro these impls. 
+ + impl NewContentId for i32 { + type Hash<'a> = [u8; 4]; + fn hash(buf: &[u8]) -> Self { + let mhash = multihash::Code::Blake2s128.digest(buf.as_ref()); + let digest = &mhash.digest()[0..4]; + Self::from_be_bytes( + digest + .try_into() + .expect("Blake2s128 truncated to 4 bytes fits into a [u8; 4]"), + ) + } + fn from_hash(hash: Vec) -> Result { + let hash = Self::Hash::try_from(hash).map_err(|_| FromHashError::Length)?; + Ok(Self::from_be_bytes(hash)) + } + fn as_hash(&self) -> Self::Hash<'static> { + self.to_be_bytes() + } + } + impl NewContentId for i64 { + type Hash<'a> = [u8; 8]; + fn hash(buf: &[u8]) -> Self { + let mhash = multihash::Code::Blake2s128.digest(buf.as_ref()); + let digest = &mhash.digest()[0..8]; + Self::from_be_bytes( + digest + .try_into() + .expect("Blake2s128 truncated to 8 bytes fits into a [u8; 8]"), + ) + } + fn from_hash(hash: Vec) -> Result { + let hash = Self::Hash::try_from(hash).map_err(|_| FromHashError::Length)?; + Ok(Self::from_be_bytes(hash)) + } + fn as_hash(&self) -> Self::Hash<'static> { + self.to_be_bytes() + } + } +} diff --git a/fixity_store/src/contentid/multihash_256.rs b/fixity_store/src/contentid/multihash_256.rs index 12e8ded..91f8aad 100644 --- a/fixity_store/src/contentid/multihash_256.rs +++ b/fixity_store/src/contentid/multihash_256.rs @@ -1,9 +1,14 @@ +use crate::type_desc::{TypeDescription, ValueDesc}; + use super::{FromHashError, NewContentId}; use multibase::Base; use multihash::MultihashDigest; #[cfg(feature = "serde")] use serde_big_array::BigArray; -use std::fmt::{Debug, Display}; +use std::{ + any::TypeId, + fmt::{Debug, Display}, +}; const MULTIHASH_256_LEN: usize = 34; const MULTIBASE_ENCODE: Base = Base::Base58Btc; @@ -17,8 +22,8 @@ pub struct Multihash256( #[cfg_attr(feature = "rkyv", serde(with = "BigArray"))] [u8; MULTIHASH_256_LEN], ); impl NewContentId for Multihash256 { - type Hash = [u8; MULTIHASH_256_LEN]; - fn hash>(buf: B) -> Self { + type Hash<'a> = &'a [u8; MULTIHASH_256_LEN]; + fn 
hash(buf: &[u8]) -> Self { let hash = multihash::Code::Blake3_256.digest(buf.as_ref()).to_bytes(); match Self::from_hash(hash) { Ok(cid) => cid, @@ -27,17 +32,31 @@ impl NewContentId for Multihash256 { }, } } - fn from_hash>(hash: H) -> Result { + fn from_hash(hash: Vec) -> Result { hash.try_into() .map_or(Err(FromHashError::Length), |hash| Ok(Self(hash))) } - fn as_hash(&self) -> &Self::Hash { + fn as_hash(&self) -> Self::Hash<'_> { &self.0 } - fn len(&self) -> usize { + fn size(&self) -> usize { self.0.len() } } +impl TypeDescription for Multihash256 { + fn type_desc() -> ValueDesc { + // TODO: use the inner TypeDescription impls .. + ValueDesc::Struct { + name: "Multihash256", + type_id: TypeId::of::(), + values: vec![ValueDesc::Array { + value: Box::new(ValueDesc::Number(TypeId::of::())), + type_id: TypeId::of::<::Hash<'_>>(), + len: MULTIHASH_256_LEN, + }], + } + } +} impl Debug for Multihash256 { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { // PERF: Can we fork multibase to make a non-allocating display? 
I would think diff --git a/fixity_store/src/deser_store.rs b/fixity_store/src/deser_store.rs index a3057b9..4ad4c5b 100644 --- a/fixity_store/src/deser_store.rs +++ b/fixity_store/src/deser_store.rs @@ -1,3 +1,6 @@ +pub mod deser_store_v3; +//pub mod deser_store_v4; + use crate::{ content_store::ContentStore, contentid::NewContentId, @@ -41,14 +44,21 @@ where where T: Serialize + Send + Sync, { - todo!() + let buf = t.serialize().unwrap(); + let cid = ::hash(buf.as_ref()); + self.write_unchecked(&cid, buf.into()).await.unwrap(); + Ok(cid) } async fn put_with_cids(&self, t: &T, cids_buf: &mut Vec) -> Result<(), StoreError> where T: Serialize + Send + Sync, { - todo!() + let buf = t.serialize().unwrap(); + let cid = ::hash(buf.as_ref()); + self.write_unchecked(&cid, buf.into()).await.unwrap(); + cids_buf.push(cid); + Ok(()) } } #[derive(Clone, PartialEq, Eq)] diff --git a/fixity_store/src/deser_store/deser_store_v3.rs b/fixity_store/src/deser_store/deser_store_v3.rs new file mode 100644 index 0000000..dc5acb8 --- /dev/null +++ b/fixity_store/src/deser_store/deser_store_v3.rs @@ -0,0 +1,120 @@ +use crate::{ + content_store::{ContentStore, ContentStoreError}, + contentid::NewContentId, + deser::DeserError, + store::StoreError, +}; +use async_trait::async_trait; +use std::{ + fmt::{Debug, Display}, + marker::PhantomData, + sync::Arc, +}; + +pub struct Owned; + +pub trait DeserializeRef: Sized { + type Ref<'a>; +} + +#[async_trait] +pub trait DeserStoreV3: ContentStore { + type DeserRepr: Deserialize + where + T: DeserializeRef; + async fn get(&self, cid: &Cid) -> Result, StoreError> + where + T: DeserializeRef, + Self::DeserRepr: Deserialize; + // async fn put(&self, t: &T) -> Result + // where + // Self: Serialize, + // T: Send + Sync; + // async fn put_with_cids(&self, t: &T, cids_buf: &mut Vec) -> Result<(), StoreError> + // where + // Self: Serialize, + // T: Send + Sync; +} +// NIT: I wanted to remove the generic params such that any implementation of `Deserialize` 
would +// only have a single output type to further remove ambiguity. However i was getting type bound +// overflows and so i backed off that decision. Maybe it can be revisited in the future, but +// hopefully it's mostly unnecessary. +pub trait Deserialize +where + T: DeserializeRef, +{ + fn deserialize_owned(buf: &[u8]) -> Result; + fn deserialize_ref(buf: &[u8]) -> Result, DeserError>; +} +pub trait Serialize { + fn serialize(&self, t: &T) -> Result, DeserError>; +} + +pub struct DeserStoreImpl { + _d: PhantomData, + store: Store, +} +impl From for DeserStoreImpl { + fn from(store: S) -> Self { + Self { + _d: PhantomData, + store, + } + } +} +#[async_trait] +impl ContentStore for DeserStoreImpl +where + Cid: NewContentId, + S: ContentStore, + D: Send + Sync, +{ + type Bytes = S::Bytes; + async fn exists(&self, cid: &Cid) -> Result { + self.store.exists(cid).await + } + async fn read_unchecked(&self, cid: &Cid) -> Result { + self.store.read_unchecked(cid).await + } + async fn write_unchecked(&self, cid: &Cid, bytes: B) -> Result<(), ContentStoreError> + where + B: AsRef<[u8]> + Into> + Send, + { + self.store.write_unchecked(cid, bytes).await + } +} +#[async_trait] +impl DeserStoreV3 for DeserStoreImpl +where + Cid: NewContentId, + S: ContentStore, +{ + type DeserRepr, Repr> = DeserReprImpl; + // type DeserRepr, Repr> = DeserReprImpl; + async fn get(&self, cid: &Cid) -> Result, StoreError> + where + T: DeserializeRef, + Self::DeserRepr: Deserialize, + { + todo!() + } +} +pub struct DeserJson; +#[derive(Clone, PartialEq, Eq)] +pub struct DeserReprImpl { + buf: Arc<[u8]>, + _d: PhantomData, + _t: PhantomData, + _r: PhantomData, +} +impl Deserialize for DeserReprImpl +where + T: DeserializeRef, +{ + fn deserialize_owned(buf: &[u8]) -> Result { + todo!() + } + fn deserialize_ref(buf: &[u8]) -> Result, DeserError> { + todo!() + } +} diff --git a/fixity_store/src/deser_store/deser_store_v4.rs b/fixity_store/src/deser_store/deser_store_v4.rs new file mode 100644 index 
0000000..d311aa2 --- /dev/null +++ b/fixity_store/src/deser_store/deser_store_v4.rs @@ -0,0 +1,50 @@ +use crate::{ + content_store::ContentStore, + contentid::NewContentId, + deser::{DeserError, Serialize}, + store::StoreError, +}; +use async_trait::async_trait; +use std::{marker::PhantomData, sync::Arc}; + +pub trait Deserialize: Sized { + type Ref<'a>; + fn deserialize_owned(buf: &[u8]) -> Result; + fn deserialize_ref(buf: &[u8]) -> Result, DeserError>; +} + +/// An extension trait for [`ContentStore`]. +#[async_trait] +pub trait DeserExt: ContentStore { + async fn get_unchecked(&self, cid: &Cid) -> Result, StoreError> + where + T: Deserialize; + async fn get_owned_unchecked(&self, cid: &Cid) -> Result + where + T: Deserialize; + async fn put(&self, t: &T) -> Result + where + T: Serialize + Send + Sync; + async fn put_with_cids(&self, t: &T, cids_buf: &mut Vec) -> Result<(), StoreError> + where + T: Serialize + Send + Sync; +} +#[derive(Clone, PartialEq, Eq)] +pub struct DeserBuf { + buf: B, + _t: PhantomData, +} +impl DeserBuf +where + B: AsRef<[u8]>, + T: Deserialize, +{ + pub fn buf_to_owned(&self) -> Result { + let value = T::deserialize_owned(self.buf.as_ref()).unwrap(); + Ok(value) + } + pub fn buf_to_ref(&self) -> Result, StoreError> { + let value = T::deserialize_ref(self.buf.as_ref()).unwrap(); + Ok(value) + } +} diff --git a/fixity_store/src/lib.rs b/fixity_store/src/lib.rs index f00d5f4..ead0d34 100644 --- a/fixity_store/src/lib.rs +++ b/fixity_store/src/lib.rs @@ -8,10 +8,12 @@ pub mod store; pub use contentid::ContentHasher; pub mod replicaid; pub use meta::Meta; +pub mod type_desc; pub use storage::{ContentStorage, MutStorage}; pub use store::Store; pub mod content_store; pub mod deser_store; +pub mod meta_store; pub mod mut_store; pub mod stores; diff --git a/fixity_store/src/meta_store.rs b/fixity_store/src/meta_store.rs new file mode 100644 index 0000000..8c90926 --- /dev/null +++ b/fixity_store/src/meta_store.rs @@ -0,0 +1,288 @@ +use 
std::fmt::Display; + +use crate::{ + contentid::NewContentId, + mut_store::{MutStore, MutStoreError}, + replicaid::NewReplicaId, + storage::StorageError, +}; +use async_trait::async_trait; +use multibase::Base; +use thiserror::Error; + +#[async_trait] +pub trait MetaStore: Send + Sync { + /// List all Replicas under a specific Remote. + async fn replicas(&self, remote: &str) -> Result, MetaStoreError>; + /// List the heads for the provided Replicas. + async fn heads( + &self, + remote: &str, + replicas: &[Rid], + ) -> Result, MetaStoreError>; + async fn set_head( + &self, + remote: &str, + rid: &Rid, + head: Cid, + ) -> Result<(), MetaStoreError>; + // async fn set_remote_config( + // &self, + // remote: &str, + // config: RemoteConfig, + // ) -> Result<(), MetaStoreError>; +} +async fn get_cid_from_path( + ms: &MS, + remote: &str, + repo: &str, + branch: &str, + rid: &Rid, + path: &str, +) -> Result> { + let cid_value = ms.get(&path).await.map_err(|err| match err { + MutStoreError::NotFound => MetaStoreError::NotFound, + err => MetaStoreError::Storage { + remote: Some(String::from(remote)), + repo: Some(String::from(repo)), + branch: Some(String::from(branch)), + rid: Some(rid.clone()), + cid: None, + err, + }, + })?; + let encoded_cid = + std::str::from_utf8(cid_value.as_ref()).map_err(|err| MetaStoreError::Cid { + remote: Some(String::from(remote)), + repo: Some(String::from(repo)), + branch: Some(String::from(branch)), + rid: Some(rid.clone()), + message: format!("verifying cid utf8: {}", err), + })?; + let (_, head_bytes) = multibase::decode(encoded_cid).map_err(|err| MetaStoreError::Cid { + remote: Some(String::from(remote)), + repo: Some(String::from(repo)), + branch: Some(String::from(branch)), + rid: Some(rid.clone()), + message: format!("decoding head cid: {}", err), + })?; + Cid::from_hash(head_bytes).map_err(|_| MetaStoreError::Cid { + remote: Some(String::from(remote)), + repo: Some(String::from(repo)), + branch: Some(String::from(branch)), + rid: 
Some(rid.clone()), + message: String::from("creating cid from head bytes"), + }) +} +#[derive(Error, Debug)] +pub enum MetaStoreError { + #[error("resource not found")] + NotFound, + #[error("invalid replica id: {message}")] + Rid { + remote: Option, + repo: Option, + branch: Option, + // TODO: convert to rid error. + message: String, + }, + #[error("cid: {message}")] + Cid { + remote: Option, + repo: Option, + branch: Option, + rid: Option, + // TODO: convert to cid error. + message: String, + }, + #[error("storage {}/{}/{}, : {err}", + .remote.clone().unwrap_or(String::from("")), + .repo.clone().unwrap_or(String::from("")), + .branch.clone().unwrap_or(String::from("")), + // .rid.unwrap_or(String::from("")), + )] + Storage { + remote: Option, + repo: Option, + branch: Option, + rid: Option, + cid: Option, + err: MutStoreError, + }, + #[error("{}/{}/{}, {}: {message}", + .remote.clone().unwrap_or(String::from("")), + // .repo.unwrap_or(String::from("")), + DisplayOption(.repo), + .branch.clone().unwrap_or(String::from("")), + // .rid.map(|rid| format!("{}")) + DisplayOption(.repo) + )] + Other { + remote: Option, + repo: Option, + branch: Option, + rid: Option, + cid: Option, + message: String, + }, +} +#[derive(Debug)] +struct DisplayOption<'a, T: Display>(pub &'a Option); +impl Display for DisplayOption<'_, T> +where + T: Display, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + if let Some(t) = self.0.as_ref() { + write!(f, "{}", t) + } else { + Ok(()) + } + } +} +#[derive(Debug)] +pub struct Log { + pub remote: String, + pub repo: String, + pub replica: Rid, + pub head: Cid, + pub message: String, +} +const MUT_CID_RID_ENCODING: Base = Base::Base32HexLower; +// NIT: This should probably be a wrapper type, rather than a blanket +// impl. As the blanket impl is focused on implicitly being filesystem +// and this seems wrong to assume in all cases. +// +// A simple `MetaOverMut(pub T)` wrapper struct would help make this +// explicit. 
+// +// IMPORTANT: This impl is not using any kind of escaping, so `/` +// breaks the whole thing at the moment. POC impl, beware. +#[async_trait] +impl MetaStore for T +where + T: MutStore, + Rid: NewReplicaId, + Cid: NewContentId, +{ + async fn replicas(&self, remote: &str) -> Result, MetaStoreError> { + let remote_path = format!("{remote}/"); + let paths = self.list::<_, &str>(&remote_path, None).await.unwrap(); + let mut items = Vec::new(); + for path in paths { + let encoded_rid = path.strip_prefix(&paths).unwrap(); + let (_, rid_bytes) = multibase::decode(&encoded_rid).unwrap(); + let rid = Rid::from_buf(rid_bytes).unwrap(); + items.push(rid); + } + Ok(items) + } + async fn heads(&self, remote: &str) -> Result, MetaStoreError> { + let remote_path = format!("{remote}/"); + let paths = self.list::<_, &str>(&remote_path, None).await.unwrap(); + let mut items = Vec::new(); + for path in paths { + let encoded_rid = path.strip_prefix(&paths).unwrap(); + let (_, rid_bytes) = + multibase::decode(&encoded_rid).map_err(|err| MetaStoreError::Rid { + remote: Some(String::from(remote)), + repo: Some(String::from(repo)), + branch: Some(String::from(branch)), + message: format!("decoding rid: {}", err), + })?; + let rid = Rid::from_buf(rid_bytes).map_err(|_| MetaStoreError::Rid { + remote: Some(String::from(remote)), + repo: Some(String::from(repo)), + branch: Some(String::from(branch)), + message: String::from("creating rid"), + })?; + let head = get_cid_from_path(self, remote, repo, branch, &rid, &path).await?; + items.push((rid, head)); + } + Ok(items) + } + async fn head( + &self, + remote: &str, + repo: &str, + branch: &str, + rid: &Rid, + ) -> Result> { + let replica = multibase::encode(MUT_CID_RID_ENCODING, rid.as_buf()); + let path = format!("{remote}/{repo}/{branch}/{replica}"); + get_cid_from_path(self, remote, repo, branch, rid, &path).await + } + async fn set_head( + &self, + remote: &str, + repo: &str, + branch: &str, + rid: &Rid, + head: Cid, + ) -> Result<(), 
MetaStoreError> { + let replica = multibase::encode(MUT_CID_RID_ENCODING, rid.as_buf()); + let head = multibase::encode(MUT_CID_RID_ENCODING, head.as_hash()); + let path = format!("{remote}/{repo}/{branch}/{replica}"); + self.put(path, head) + .await + .map_err(|err| MetaStoreError::Storage { + remote: Some(String::from(remote)), + repo: Some(String::from(repo)), + branch: Some(String::from(branch)), + rid: Some(rid.clone()), + cid: None, + err, + })?; + Ok(()) + } +} +async fn list_dirs( + storage: &S, + base_path: String, +) -> Result, MutStoreError> { + let paths = storage.list::<_, &str>(&base_path, Some("/")).await?; + let dirs = paths + .into_iter() + .filter_map(|path| { + // silently dropping items in the repo that may not be great, but we can't + // fail either since users could make the repo dirty. So in general, ignore. + let mut dir = path.strip_prefix(&base_path)?.to_owned(); + // each dir ends in a delim, so drop it. + let _ = dir.pop(); + Some(dir) + }) + .collect::>(); + Ok(dirs) +} +#[cfg(test)] +pub mod meta_mut_storage { + use super::*; + use crate::{contentid::Cid, replicaid::Rid, storage::Memory}; + use rstest::*; + + #[rstest] + #[case::test_storage(Memory::<_>::default())] + #[tokio::test] + async fn basic>(#[case] s: S) { + s.set_head("remote", "repo", "branch", &1, 10) + .await + .unwrap(); + s.set_head("remote", "repo", "branch", &2, 20) + .await + .unwrap(); + assert_eq!(s.head("remote", "repo", "branch", &1).await.unwrap(), 10,); + assert_eq!(s.head("remote", "repo", "branch", &2).await.unwrap(), 20,); + assert_eq!( + s.heads("remote", "repo", "branch").await.unwrap(), + vec![(10, 1), (20, 20)], + ); + s.set_head("remote", "repo", "branch-foo", &2, 20) + .await + .unwrap(); + assert_eq!( + s.branches("remote", "repo").await.unwrap(), + vec!["branch", "branch-foo"] + ); + assert_eq!(s.repos("remote").await.unwrap(), vec!["repo"]); + } +} diff --git a/fixity_store/src/replicaid.rs b/fixity_store/src/replicaid.rs index 98ece9c..ebfcd5f 
100644 --- a/fixity_store/src/replicaid.rs +++ b/fixity_store/src/replicaid.rs @@ -1,6 +1,40 @@ -use std::fmt::{Debug, Display}; +use crate::{ + contentid::{ContentId, NewContentId}, + deser::{Deserialize, Serialize}, + type_desc::{TypeDescription, ValueDesc}, +}; +use std::{ + any::TypeId, + fmt::{Debug, Display}, + hash::Hash, +}; +use thiserror::Error; -use crate::contentid::ContentId; +pub trait NewNewReplicaId { + type Rid: NewReplicaId; + fn new(&mut self) -> Self::Rid; +} +pub trait NewReplicaId: + Clone + Sized + Send + Sync + Eq + Ord + Hash + Debug + Display + 'static + TypeDescription +{ + type Buf<'a>: AsRef<[u8]>; + /// Construct a replica identifier from the given buffer. + fn from_buf(buf: Vec) -> Result; + fn as_buf(&self) -> Self::Buf<'_>; + fn len(&self) -> usize { + self.as_buf().as_ref().len() + } +} +pub trait ReplicaIdDeser: NewReplicaId + Serialize + Deserialize {} +impl ReplicaIdDeser for T where + T: NewReplicaId + Serialize + Deserialize +{ +} +#[derive(Error, Debug)] +pub enum FromBufError { + #[error("invalid length")] + Length, +} // TODO: Remove bounds, impl methods manually - so ReplicaId doesn't impl ContentId, // since they have no direct relation. @@ -25,6 +59,33 @@ impl ContentId for Rid { self.0.len() } } +impl NewReplicaId for Rid { + type Buf<'a> = &'a [u8; N]; + fn from_buf(buf: Vec) -> Result { + let inner = <[u8; N]>::try_from(buf).map_err(|_| FromBufError::Length)?; + Ok(Self(inner)) + } + fn as_buf(&self) -> Self::Buf<'_> { + &self.0 + } + fn len(&self) -> usize { + self.0.len() + } +} +impl TypeDescription for Rid { + fn type_desc() -> ValueDesc { + // TODO: use the inner TypeDescription impls .. 
+ ValueDesc::Struct { + name: "Rid", + type_id: TypeId::of::(), + values: vec![ValueDesc::Array { + value: Box::new(ValueDesc::Number(TypeId::of::())), + type_id: TypeId::of::<::Buf<'_>>(), + len: N, + }], + } + } +} impl Default for Rid where [u8; N]: Default, @@ -66,6 +127,49 @@ impl PartialEq<[u8; N]> for Rid { &self.0 == other } } +#[cfg(any(test, feature = "test"))] +mod test_rids { + //! Test focused `ReplicaId` implementations over integers and conversions from integers + //! for `Rid`. + //! + //! ## Endian + //! Note that all integer representations use Big Endian to ensure stable representations + //! and thus Content IDs when written to test stores. + use super::{FromBufError, NewReplicaId, Rid}; + + // TODO: macro these impls. + + impl NewReplicaId for i32 { + type Buf<'a> = [u8; 4]; + fn from_buf(buf: Vec) -> Result { + let buf = Self::Buf::try_from(buf).map_err(|_| FromBufError::Length)?; + Ok(Self::from_be_bytes(buf)) + } + fn as_buf(&self) -> Self::Buf<'static> { + self.to_be_bytes() + } + } + impl NewReplicaId for i64 { + type Buf<'a> = [u8; 8]; + fn from_buf(buf: Vec) -> Result { + let buf = Self::Buf::try_from(buf).map_err(|_| FromBufError::Length)?; + Ok(Self::from_be_bytes(buf)) + } + fn as_buf(&self) -> Self::Buf<'static> { + self.to_be_bytes() + } + } + impl From for Rid<4> { + fn from(i: i32) -> Self { + Self::from(i.to_be_bytes()) + } + } + impl From for Rid<8> { + fn from(i: i64) -> Self { + Self::from(i.to_be_bytes()) + } + } +} #[cfg(feature = "rkyv")] mod rkyv_impls { use super::*; diff --git a/fixity_store/src/store.rs b/fixity_store/src/store.rs index cd117f1..1e3b3a3 100644 --- a/fixity_store/src/store.rs +++ b/fixity_store/src/store.rs @@ -49,6 +49,12 @@ pub enum StoreError { NotFound, #[error("resource not modified")] NotModified, + // TODO: move to merge error type. + #[error("type cannot be merged")] + UnmergableType, + // TODO: move to diff error type. 
+ #[error("type cannot be diff'd")] + UndiffableType, #[error("storage: {0}")] Storage(StorageError), } @@ -111,9 +117,10 @@ impl StoreImpl { #[async_trait] impl Store for StoreImpl where - // FIXME: ... What? CID_LENGTH stopped working in the const Param here - // as of beta-2022-09-20. Need to report this if it's still around - // whenever i clean this code up.. something is fishy. + // FIXME: ... What? CID_LENGTH stopped working in the + // const Param here as of beta-2022-09-20. Need to + // report this if it's still around whenever i clean + // this code up.. something is fishy. S: ContentStorage>, D: Send + Sync, H: ContentHasher>, diff --git a/fixity_store/src/stores/memory.rs b/fixity_store/src/stores/memory.rs index 3912a0f..3729441 100644 --- a/fixity_store/src/stores/memory.rs +++ b/fixity_store/src/stores/memory.rs @@ -18,6 +18,12 @@ pub struct Memory { bytes: Mutex>>, mut_: Mutex>>, } +#[cfg(any(test, feature = "test"))] +impl Memory { + pub fn test() -> Self { + Self::default() + } +} #[async_trait] impl ContentStore for Memory where @@ -32,9 +38,12 @@ where let buf = lock.get(cid).unwrap(); Ok(Arc::clone(&buf)) } - async fn write_unchecked(&self, cid: &Cid, bytes: Vec) -> Result<(), ContentStoreError> { + async fn write_unchecked(&self, cid: &Cid, bytes: B) -> Result<(), ContentStoreError> + where + B: AsRef<[u8]> + Into> + Send, + { let mut lock = self.bytes.lock().unwrap(); - let _ = lock.insert(cid.clone(), Arc::from(bytes)); + let _ = lock.insert(cid.clone(), bytes.into()); Ok(()) } } diff --git a/fixity_store/src/type_desc.rs b/fixity_store/src/type_desc.rs new file mode 100644 index 0000000..01ae690 --- /dev/null +++ b/fixity_store/src/type_desc.rs @@ -0,0 +1,137 @@ +use std::{ + any::TypeId, + collections::{BTreeMap, BTreeSet}, + fmt::Display, +}; + +pub trait TypeDescription { + fn type_desc() -> ValueDesc; +} +// TODO: a more concise Debug impl. 
+#[derive(Debug)] +pub enum ValueDesc { + Number(TypeId), + String(TypeId), + Ptr(Box), + Tuple { + type_id: TypeId, + values: Vec, + }, + Vec { + value: Box, + type_id: TypeId, + }, + Array { + value: Box, + type_id: TypeId, + len: usize, + }, + Struct { + name: &'static str, + type_id: TypeId, + values: Vec, + }, +} +impl ValueDesc { + pub fn of() -> ValueDesc { + T::type_desc() + } + /// An equality check for the inner type values of a given type. + pub fn type_eq(&self, _other: &Self) -> bool { + todo!() + } +} +// NIT: might need a prettier impl than Debug for Display. Depends on what +// the Debug display ends up being. +impl Display for ValueDesc { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self) + } +} +impl TypeDescription for u32 { + fn type_desc() -> ValueDesc { + ValueDesc::Number(TypeId::of::()) + } +} +impl TypeDescription for i32 { + fn type_desc() -> ValueDesc { + ValueDesc::Number(TypeId::of::()) + } +} +impl TypeDescription for u64 { + fn type_desc() -> ValueDesc { + ValueDesc::Number(TypeId::of::()) + } +} +impl TypeDescription for i64 { + fn type_desc() -> ValueDesc { + ValueDesc::Number(TypeId::of::()) + } +} +impl TypeDescription for String { + fn type_desc() -> ValueDesc { + ValueDesc::String(TypeId::of::()) + } +} +// TODO: Make Generic over tuple length +impl TypeDescription for (T1, T2) +where + // NIT: Why is static needed? :confused: + T1: TypeDescription + 'static, + T2: TypeDescription + 'static, +{ + fn type_desc() -> ValueDesc { + ValueDesc::Tuple { + type_id: TypeId::of::(), + values: vec![T1::type_desc(), T2::type_desc()], + } + } +} +impl TypeDescription for Vec +where + T: TypeDescription, +{ + fn type_desc() -> ValueDesc { + todo!() + } +} +impl TypeDescription for Option +where + // NIT: Why is static needed? 
:confused: + T: TypeDescription + 'static, +{ + fn type_desc() -> ValueDesc { + ValueDesc::Struct { + name: "Option", + type_id: TypeId::of::(), + values: vec![ValueDesc::of::()], + } + } +} +impl TypeDescription for BTreeMap +where + // NIT: Why is static needed? :confused: + K: TypeDescription + 'static, + V: TypeDescription + 'static, +{ + fn type_desc() -> ValueDesc { + ValueDesc::Struct { + name: "BTreeMap", + type_id: TypeId::of::(), + values: vec![ValueDesc::of::(), ValueDesc::of::()], + } + } +} +impl TypeDescription for BTreeSet +where + // NIT: Why is static needed? :confused: + T: TypeDescription + 'static, +{ + fn type_desc() -> ValueDesc { + ValueDesc::Struct { + name: "BTreeSet", + type_id: TypeId::of::(), + values: vec![ValueDesc::of::()], + } + } +} diff --git a/flake.lock b/flake.lock index 798a581..e0f97fe 100644 --- a/flake.lock +++ b/flake.lock @@ -75,11 +75,11 @@ "nixpkgs": "nixpkgs_2" }, "locked": { - "lastModified": 1668912095, - "narHash": "sha256-3oGeJgeA4+8Wj2XZnbZKJYHoZziyJl36ZGU49I2VC8U=", + "lastModified": 1670034122, + "narHash": "sha256-EqmuOKucPWtMvCZtHraHr3Q3bgVszq1x2PoZtQkUuEk=", "owner": "oxalica", "repo": "rust-overlay", - "rev": "5200195aa2a0cef1becc2ba6ff61cba65e1f90fe", + "rev": "a0d5773275ecd4f141d792d3a0376277c0fc0b65", "type": "github" }, "original": { diff --git a/flake.nix b/flake.nix index 9bbd2b4..1ae2f14 100644 --- a/flake.nix +++ b/flake.nix @@ -24,7 +24,7 @@ gcc rust-analyzer # using a hardcoded rustfmt version to support nightly rustfmt features. 
- rust-bin.nightly."2022-11-20".rustfmt + rust-bin.nightly."2022-12-01".rustfmt rust-toolchain ]; }; diff --git a/structs/Cargo.toml b/structs/Cargo.toml index bb94898..74c4ac8 100644 --- a/structs/Cargo.toml +++ b/structs/Cargo.toml @@ -23,5 +23,6 @@ serde = { version = "1.0", features = ["derive"], optional = true } [dev-dependencies] fixity_core = { path = "../core" } +fixity_store = { path = "../fixity_store", features = ["test"] } tokio = { version = "1.17", features = ["test-util", "macros"] } rstest = "0.12" diff --git a/structs/src/gcounter/gcounter_v2.rs b/structs/src/gcounter/gcounter_v2.rs index c5c0543..45394b3 100644 --- a/structs/src/gcounter/gcounter_v2.rs +++ b/structs/src/gcounter/gcounter_v2.rs @@ -1,57 +1,195 @@ use super::GCounterInt; use async_trait::async_trait; use fixity_store::{ - container::NewContainer, - deser::Rkyv, - replicaid::Rid, + container::{ContainerRef, ContainerRefInto, NewContainer}, + contentid::NewContentId, + deser::{Deserialize, Rkyv, Serialize}, + deser_store::DeserStore, + replicaid::NewReplicaId, store::{Repr, StoreError}, - Store, + type_desc::{TypeDescription, ValueDesc}, }; +use std::any::TypeId; -pub struct GCounter(Vec<(Rid, GCounterInt)>); +type IVec = Vec<(Rid, GCounterInt)>; +#[derive(Debug)] +pub struct GCounter(IVec); +impl GCounter { + pub fn new() -> Self { + Self(IVec::default()) + } +} +impl GCounter { + pub fn inc(&mut self, rid: Rid) { + let self_ = &mut self.0; + let idx_result = self_.binary_search_by_key(&&rid, |(rid, _)| rid); + match idx_result { + Ok(idx) => { + let (_, count) = self_ + .get_mut(idx) + .expect("index returned by `binary_search`"); + *count += 1; + }, + Err(idx) => self_.insert(idx, (rid, 1)), + } + debug_assert!(self_.windows(2).all(|w| w[0] <= w[1])); + } + pub fn value(&self) -> GCounterInt { + // TODO: cache the result. 
+ self.0.iter().map(|(_, i)| i).sum() + } + pub fn get(&self, rid: &Rid) -> Option { + let i = self.0.binary_search_by_key(&rid, |(rid, _)| rid).ok()?; + let (_, count) = self.0.get(i).expect("index returned by `binary_search`"); + Some(*count) + } +} +impl TypeDescription for GCounter +where + Rid: NewReplicaId, +{ + fn type_desc() -> ValueDesc { + ValueDesc::Struct { + name: "GCounter", + type_id: TypeId::of::(), + values: vec![ValueDesc::of::>()], + } + } +} #[async_trait] -impl<'s, const N: usize, S> NewContainer<'s, S> for GCounter +impl NewContainer for GCounter where - S: Store, + Cid: NewContentId, + Rid: NewReplicaId, + IVec: Serialize + Deserialize, { - type Ref = GCounterRef; - async fn open( - store: &'s S, - cid: &::Cid, - ) -> Result { - todo!() + fn deser_type_desc() -> ValueDesc { + Self::type_desc() } - async fn open_ref( - store: &'s S, - cid: &::Cid, - ) -> Result { - todo!() + fn new_container>(_: &S) -> Self { + Self::new() + } + async fn open>(store: &S, cid: &Cid) -> Result { + let repr = store.get::>(cid).await?; + let inner = repr.repr_to_owned()?; + Ok(Self(inner)) } - async fn save( + async fn save>(&mut self, store: &S) -> Result { + store.put::>(&self.0).await + } + async fn save_with_cids>( &mut self, - store: &'s S, - ) -> Result<::Cid, fixity_store::store::StoreError> { - todo!() + store: &S, + cids_buf: &mut Vec, + ) -> Result<(), StoreError> { + store.put_with_cids::>(&self.0, cids_buf).await } - async fn save_with_cids( + async fn merge>( &mut self, store: &S, - cids_buf: &mut Vec<::Cid>, - ) -> Result<(), fixity_store::store::StoreError> { + other: &Cid, + ) -> Result<(), StoreError> { + let other = { + let repr = store.get::>(other).await?; + repr.repr_to_owned()? + }; + let mut start_idx = 0; + for (other_rid, other_value) in other { + if start_idx >= self.0.len() { + self.0.push((other_rid, other_value)); + continue; + } + // Assume both are sorted, nearby debug_assert helps validate. 
+ let idx = self.0[start_idx..].binary_search_by_key(&&other_rid, |(rid, _)| rid); + let idx = match idx { + Ok(idx) => { + let (_, self_value) = &mut self.0[idx]; + if other_value > *self_value { + *self_value = other_value; + } + idx + }, + Err(idx) => { + self.0.insert(idx, (other_rid, other_value)); + idx + }, + }; + start_idx = idx + 1; + } + debug_assert!(self.0.windows(2).all(|w| w[0] <= w[1])); + Ok(()) + } + async fn diff>( + &mut self, + _store: &S, + _other: &Cid, + ) -> Result { todo!() } } - +#[async_trait] +impl ContainerRef for GCounter +where + Cid: NewContentId, + Rid: NewReplicaId, + IVec: Serialize + Deserialize, +{ + type Ref = GCounterRef; + type DiffRef = GCounter; + async fn open_ref>( + _store: &S, + _cid: &Cid, + ) -> Result { + todo!() + } + async fn diff_ref>( + &mut self, + _store: &S, + _other: &Cid, + ) -> Result { + todo!() + } +} +impl Default for GCounter { + fn default() -> Self { + Self::new() + } +} // TODO: Convert Vec back to BTree for faster lookups? This was made a Vec // due to difficulties in looking up `ArchivedRid`. // Once `ArchivedRid` and `Rid` are unified into a single Rkyv-friendly type, // in theory we can go back to a Rid. 
-pub struct GCounterRef(Repr, GCounterInt)>, D>); - -impl TryInto> for GCounterRef { +pub struct GCounterRef(Repr, D>); +impl ContainerRefInto> for GCounterRef { type Error = StoreError; - fn try_into(self) -> Result, Self::Error> { + fn container_ref_into(self) -> Result, Self::Error> { todo!() } } +#[cfg(test)] +pub mod test { + use fixity_store::stores::memory::Memory; + + use super::*; + #[tokio::test] + async fn poc() { + let store = Memory::test(); + let mut a = GCounter::default(); + a.inc(1); + assert_eq!(a.value(), 1); + a.inc(1); + a.inc(0); + assert_eq!(a.value(), 3); + let mut b = GCounter::default(); + b.inc(1); + b.inc(1); + let b_cid = b.save(&store).await.unwrap(); + a.merge(&store, &b_cid).await.unwrap(); + assert_eq!(a.value(), 3); + b.inc(1); + let b_cid = b.save(&store).await.unwrap(); + a.merge(&store, &b_cid).await.unwrap(); + assert_eq!(a.value(), 4); + } +} diff --git a/structs/src/lib.rs b/structs/src/lib.rs index 213ef86..23658e1 100644 --- a/structs/src/lib.rs +++ b/structs/src/lib.rs @@ -2,6 +2,7 @@ pub mod appendlog; pub mod gcounter; pub mod prolly_tree; pub mod ptr; +pub mod replicalog; /* pub mod vclock { use crate::gcounter::GCounter; diff --git a/structs/src/replicalog.rs b/structs/src/replicalog.rs new file mode 100644 index 0000000..845186b --- /dev/null +++ b/structs/src/replicalog.rs @@ -0,0 +1,213 @@ +use std::{ + any::TypeId, + collections::{BTreeMap, BTreeSet}, +}; + +use async_trait::async_trait; +use fixity_store::{ + container::NewContainer, + contentid::NewContentId, + deser::{Deserialize, Rkyv, Serialize}, + deser_store::DeserStore, + replicaid::NewReplicaId, + store::StoreError, + type_desc::{TypeDescription, ValueDesc}, +}; + +/// An append only log of all actions for an individual Replica on a Repo. The HEAD of a repo for a +/// Replica. non-CRDT. 
+#[derive(Debug)] +pub struct ReplicaLog { + entry: Option>, +} +impl ReplicaLog { + pub fn new() -> Self { + Self::default() + } + pub fn set_commit(&mut self, cid: Cid) { + match self.entry.as_mut() { + Some(entry) => { + entry.branches.content = cid; + }, + None => { + self.entry = Some(LogEntry { + previous_entry: None, + branches: Branches { + active: "main".to_string(), + content: cid, + inactive: Default::default(), + }, + identity: Default::default(), + }) + }, + } + } +} +impl Default for ReplicaLog { + fn default() -> Self { + Self { + entry: Default::default(), + } + } +} +impl TypeDescription for ReplicaLog +where + Rid: NewReplicaId, + Cid: NewContentId, +{ + fn type_desc() -> ValueDesc { + ValueDesc::Struct { + name: "ReplicaLog", + type_id: TypeId::of::(), + values: vec![ValueDesc::of::(), ValueDesc::of::()], + } + } +} +// // TODO: Placeholder for signature chain. Need to mock up +// // replica sig and identity sig. +// #[cfg_attr( +// feature = "rkyv", +// derive(rkyv::Serialize, rkyv::Deserialize, rkyv::Archive) +// )] +// #[derive(Debug)] +// pub struct SignedLogEntry { +// pub entry: LogEntry, +// pub replica_sig: (), +// } +#[cfg_attr( + feature = "rkyv", + derive(rkyv::Serialize, rkyv::Deserialize, rkyv::Archive) +)] +#[derive(Debug)] +pub struct LogEntry { + pub previous_entry: Option, + /// A map of `BranchName: HEAD`s to track the various branches that this Replica tracks. + pub branches: Branches, + // /// An [`Identity`] pointer for this Replica. + // TODO: Move to a sub container, as this data doesn't need to be stored in with active data. + // pub identity: Option, + pub identity: Option>, +} +#[cfg_attr( + feature = "rkyv", + derive(rkyv::Serialize, rkyv::Deserialize, rkyv::Archive) +)] +#[derive(Debug)] +pub struct Branches { + /// The name of the active branch. + pub active: String, + /// The content id of the active branch. + pub content: Cid, + /// A map of `BranchName: HEAD`s to track the various branches that this Replica tracks. 
+ // TODO: Move to a sub container, as this data doesn't need to be stored in with active data. + // pub branches: Option, + pub inactive: BTreeMap, +} +impl TypeDescription for Branches +where + Cid: NewContentId, +{ + fn type_desc() -> ValueDesc { + ValueDesc::Struct { + name: "Branches", + type_id: TypeId::of::(), + values: vec![ + ValueDesc::of::(), + ValueDesc::of::(), + ValueDesc::of::>(), + ], + } + } +} +#[cfg_attr( + feature = "rkyv", + derive(rkyv::Serialize, rkyv::Deserialize, rkyv::Archive) +)] +#[derive(Debug)] +pub struct Identity { + pub claimed_replicas: BTreeSet, + // pub metadata: CrdtMap +} +impl TypeDescription for Identity +where + Rid: NewReplicaId, +{ + fn type_desc() -> ValueDesc { + ValueDesc::Struct { + name: "Identity", + type_id: TypeId::of::(), + values: vec![ValueDesc::of::>()], + } + } +} +#[async_trait] +impl NewContainer for ReplicaLog +where + Rid: NewReplicaId + Serialize + Deserialize, + Cid: NewContentId + Serialize + Deserialize, + LogEntry: Serialize + Deserialize, +{ + fn deser_type_desc() -> ValueDesc { + Self::type_desc() + } + fn new_container>(_: &S) -> Self { + Self::default() + } + async fn open>(store: &S, cid: &Cid) -> Result { + let repr = store.get::>(cid).await?; + let entry = repr.repr_to_owned()?; + Ok(Self { entry: Some(entry) }) + } + async fn save>(&mut self, store: &S) -> Result { + // TODO: standardized error, not initialized or something? + let entry = self.entry.as_mut().unwrap(); + let cid = store.put(&*entry).await?; + entry.previous_entry = Some(cid.clone()); + Ok(cid) + } + async fn save_with_cids>( + &mut self, + store: &S, + cids_buf: &mut Vec, + ) -> Result<(), StoreError> { + // TODO: standardized error, not initialized or something? 
+ let entry = self.entry.as_mut().unwrap(); + store.put_with_cids(entry, cids_buf).await?; + // TODO: add standardized error for cid missing from buf, store did not write to cid buf + let cid = cids_buf.last().cloned().unwrap(); + entry.previous_entry = Some(cid); + Ok(()) + } + async fn merge>( + &mut self, + _store: &S, + _other: &Cid, + ) -> Result<(), StoreError> { + Err(StoreError::UnmergableType) + } + async fn diff>( + &mut self, + _store: &S, + _other: &Cid, + ) -> Result { + Err(StoreError::UndiffableType) + } +} +#[cfg(test)] +pub mod test { + use fixity_store::stores::memory::Memory; + + use super::*; + #[tokio::test] + async fn poc() { + let store = Memory::default(); + let mut rl = ReplicaLog::::default(); + rl.set_commit(1); + dbg!(&rl); + let cid = rl.save(&store).await.unwrap(); + dbg!(cid, &rl); + rl.set_commit(2); + let cid = rl.save(&store).await.unwrap(); + dbg!(cid, &rl); + } +}