From f1cd9590ade6bc68c73721bf16b1930d23da3088 Mon Sep 17 00:00:00 2001 From: Akosh Farkash Date: Fri, 10 May 2024 10:22:43 +0100 Subject: [PATCH 1/4] STORAGE: KVRead::iter --- fendermint/storage/src/im.rs | 62 ++++++++++++++++++++++++++++++++++- fendermint/storage/src/lib.rs | 6 ++++ 2 files changed, 67 insertions(+), 1 deletion(-) diff --git a/fendermint/storage/src/im.rs b/fendermint/storage/src/im.rs index ee0fdd0e3..17df9eea5 100644 --- a/fendermint/storage/src/im.rs +++ b/fendermint/storage/src/im.rs @@ -2,13 +2,15 @@ // SPDX-License-Identifier: Apache-2.0, MIT use std::{ hash::Hash, + marker::PhantomData, mem, sync::{Arc, Mutex, MutexGuard}, thread, }; use crate::{ - Decode, Encode, KVRead, KVReadable, KVResult, KVStore, KVTransaction, KVWritable, KVWrite, + Decode, Encode, KVError, KVRead, KVReadable, KVResult, KVStore, KVTransaction, KVWritable, + KVWrite, }; /// Read-only mode. @@ -16,6 +18,7 @@ pub struct Read; /// Read-write mode. pub struct Write; +/// Immutable data multimap. type IDataMap = im::HashMap< ::Namespace, im::HashMap<::Repr, Arc<::Repr>>, @@ -152,6 +155,21 @@ where } Ok(None) } + + fn iter(&self, ns: &S::Namespace) -> impl Iterator> + where + S: Decode + Decode, + ::Repr: Ord + 'static, + { + if let Some(m) = self.data.get(ns) { + let mut items = m.iter().map(|(k, v)| (k, v.as_ref())).collect::>(); + items.sort_by(|a, b| a.0.cmp(b.0)); + + KVIter::::new(items) + } else { + KVIter::empty() + } + } } impl<'a, S: KVStore> KVWrite for Transaction<'a, S, Write> @@ -183,6 +201,48 @@ where } } +struct KVIter<'a, S: KVStore, K, V> { + items: Vec<(&'a S::Repr, &'a S::Repr)>, + next: usize, + phantom_v: PhantomData, + phantom_k: PhantomData, +} + +impl<'a, S, K, V> KVIter<'a, S, K, V> +where + S: KVStore, +{ + pub fn new(items: Vec<(&'a S::Repr, &'a S::Repr)>) -> Self { + KVIter { + items, + next: 0, + phantom_v: PhantomData, + phantom_k: PhantomData, + } + } + + pub fn empty() -> Self { + Self::new(vec![]) + } +} + +impl<'a, S, K, V> Iterator for KVIter<'a, S, K, V> +where + S: KVStore + Decode + Decode, +{ + type Item = Result<(K, V), KVError>; + + fn next(&mut self) -> Option { + if let Some((k, v)) = self.items.get(self.next) { + self.next += 1; + let kv = S::from_repr(k).and_then(|k| S::from_repr(v).map(|v| (k, v))); + Some(kv) + } else { + None + } + } +} + #[cfg(all(feature = "inmem", test))] mod tests { use std::borrow::Cow; diff --git a/fendermint/storage/src/lib.rs b/fendermint/storage/src/lib.rs index bb028edfd..f09870c24 100644 --- a/fendermint/storage/src/lib.rs +++ b/fendermint/storage/src/lib.rs @@ -66,6 +66,12 @@ pub trait KVRead { fn get(&self, ns: &S::Namespace, k: &K) -> KVResult> where S: Encode + Decode; + + fn iter(&self, ns: &S::Namespace) -> impl Iterator> + where + S: Decode + Decode, + S::Repr: Ord, + ::Repr: 'static; } /// Operations available on a write transaction. From 7c0ff43239895af8fd221d81a1aafad82bfcd794 Mon Sep 17 00:00:00 2001 From: Akosh Farkash Date: Fri, 10 May 2024 10:35:27 +0100 Subject: [PATCH 2/4] ROCKSDB: Impl KVRead::iter --- fendermint/rocksdb/src/kvstore.rs | 44 +++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/fendermint/rocksdb/src/kvstore.rs b/fendermint/rocksdb/src/kvstore.rs index 65280c09a..e9b4fe781 100644 --- a/fendermint/rocksdb/src/kvstore.rs +++ b/fendermint/rocksdb/src/kvstore.rs @@ -138,6 +138,28 @@ where } }) } + + fn iter(&self, ns: &S::Namespace) -> impl Iterator> + where + S: Decode + Decode, + ::Repr: Ord + 'static, + { + self.cache + .with_cf_handle(ns.as_ref(), |cf| { + let it = self.snapshot.iterator_cf(cf, rocksdb::IteratorMode::Start); + + let it = it.map(|res| res.map_err(to_kv_error)).map(|res| { + res.and_then(|(k, v)| { + let k: K = S::from_repr(&k.to_vec())?; + let v: V = S::from_repr(&v.to_vec())?; + Ok((k, v)) + }) + }); + + Ok(it) + }) + .expect("just wrapped into ok") + } } impl<'a, S> KVRead for RocksDbWriteTx<'a> @@ -160,6 +182,28 @@ where } }) } + + fn iter(&self, ns: &S::Namespace) -> impl Iterator> + where + S: Decode + Decode, + ::Repr: Ord + 'static, + { + self.cache + .with_cf_handle(ns.as_ref(), |cf| { + let it = self.tx.iterator_cf(cf, rocksdb::IteratorMode::Start); + + let it = it.map(|res| res.map_err(to_kv_error)).map(|res| { + res.and_then(|(k, v)| { + let k: K = S::from_repr(&k.to_vec())?; + let v: V = S::from_repr(&v.to_vec())?; + Ok((k, v)) + }) + }); + + Ok(it) + }) + .expect("just wrapped into ok") + } } impl<'a, S> KVWrite for RocksDbWriteTx<'a> From 814d6e293c6cf1e59abcad877d77fb71e5da778c Mon Sep 17 00:00:00 2001 From: Akosh Farkash Date: Fri, 10 May 2024 13:22:39 +0100 Subject: [PATCH 3/4] STORAGE: Add KVCollection::iter --- fendermint/storage/src/im.rs | 2 ++ fendermint/storage/src/lib.rs | 19 +++++++++++++++++-- 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/fendermint/storage/src/im.rs b/fendermint/storage/src/im.rs index 17df9eea5..40230e80e 100644 --- a/fendermint/storage/src/im.rs +++ b/fendermint/storage/src/im.rs @@ -160,6 +160,8 @@ where where S: Decode + Decode, ::Repr: Ord + 'static, + K: 'static, + V: 'static, { if let Some(m) = self.data.get(ns) { let mut items = m.iter().map(|(k, v)| (k, v.as_ref())).collect::>(); diff --git a/fendermint/storage/src/lib.rs b/fendermint/storage/src/lib.rs index f09870c24..06e9d5ece 100644 --- a/fendermint/storage/src/lib.rs +++ b/fendermint/storage/src/lib.rs @@ -69,9 +69,10 @@ pub trait KVRead { fn iter(&self, ns: &S::Namespace) -> impl Iterator> where + K: 'static, + V: 'static, S: Decode + Decode, - S::Repr: Ord, - ::Repr: 'static; + ::Repr: Ord + 'static; } /// Operations available on a write transaction. @@ -162,4 +163,18 @@ where pub fn delete(&self, kv: &mut impl KVWrite, k: &K) -> KVResult<()> { kv.delete(&self.ns, k) } + + pub fn iter<'a, 'b>( + &'a self, + kv: &'b impl KVRead, + ) -> impl Iterator> + 'b + where + S::Repr: Ord + 'static, + S: Decode, + K: 'static, + V: 'static, + 'a: 'b, + { + kv.iter::(&self.ns) + } } From b1260f6801b74c49d4385ba462a41df47d183308 Mon Sep 17 00:00:00 2001 From: Akosh Farkash Date: Fri, 10 May 2024 14:09:24 +0100 Subject: [PATCH 4/4] STORAGE: Unit tests for iteration --- fendermint/rocksdb/src/kvstore.rs | 4 +-- fendermint/storage/src/im.rs | 2 +- fendermint/storage/src/lib.rs | 9 ++++--- fendermint/storage/src/testing.rs | 42 ++++++++++++++++++++++++++++--- 4 files changed, 47 insertions(+), 10 deletions(-) diff --git a/fendermint/rocksdb/src/kvstore.rs b/fendermint/rocksdb/src/kvstore.rs index e9b4fe781..945065fa4 100644 --- a/fendermint/rocksdb/src/kvstore.rs +++ b/fendermint/rocksdb/src/kvstore.rs @@ -139,7 +139,7 @@ where }) } - fn iter(&self, ns: &S::Namespace) -> impl Iterator> + fn iterate(&self, ns: &S::Namespace) -> impl Iterator> where S: Decode + Decode, ::Repr: Ord + 'static, @@ -183,7 +183,7 @@ where }) } - fn iter(&self, ns: &S::Namespace) -> impl Iterator> + fn iterate(&self, ns: &S::Namespace) -> impl Iterator> where S: Decode + Decode, ::Repr: Ord + 'static, diff --git a/fendermint/storage/src/im.rs b/fendermint/storage/src/im.rs index 40230e80e..8719914af 100644 --- a/fendermint/storage/src/im.rs +++ b/fendermint/storage/src/im.rs @@ -156,7 +156,7 @@ where Ok(None) } - fn iter(&self, ns: &S::Namespace) -> impl Iterator> + fn iterate(&self, ns: &S::Namespace) -> impl Iterator> where S: Decode + Decode, ::Repr: Ord + 'static, diff --git a/fendermint/storage/src/lib.rs b/fendermint/storage/src/lib.rs index 06e9d5ece..e6cd857cb 100644 --- a/fendermint/storage/src/lib.rs +++ b/fendermint/storage/src/lib.rs @@ -67,7 +67,10 @@ pub trait KVRead { where S: Encode + Decode; - fn iter(&self, ns: &S::Namespace) -> impl Iterator> + /// Iterate items in the namespace ordered by their representation. + /// + /// TODO: Add parameters for iteration direction and bounds. + fn iterate(&self, ns: &S::Namespace) -> impl Iterator> where K: 'static, V: 'static, @@ -164,7 +167,7 @@ where kv.delete(&self.ns, k) } - pub fn iter<'a, 'b>( + pub fn iterate<'a, 'b>( &'a self, kv: &'b impl KVRead, ) -> impl Iterator> + 'b @@ -175,6 +178,6 @@ where V: 'static, 'a: 'b, { - kv.iter::(&self.ns) + kv.iterate::(&self.ns) } } diff --git a/fendermint/storage/src/testing.rs b/fendermint/storage/src/testing.rs index 96f8d3adc..5cdeee48c 100644 --- a/fendermint/storage/src/testing.rs +++ b/fendermint/storage/src/testing.rs @@ -25,11 +25,14 @@ pub enum TestOpKV { Get(K), Put(K, V), Del(K), + Iter, } #[derive(Clone, Debug)] pub enum TestOpNs { + /// String-to-Int S2I(TestNamespace, TestOpKV), + /// Int-to-String I2S(TestNamespace, TestOpKV), Rollback, } @@ -48,18 +51,20 @@ impl Arbitrary for TestOpNs { match u8::arbitrary(g) % 100 { i if i < 47 => { let ns = g.choose(&["spam", "eggs"]).unwrap(); - let k = *g.choose(&["foo", "bar"]).unwrap(); + let k = *g.choose(&["foo", "bar", "baz"]).unwrap(); match u8::arbitrary(g) % 10 { i if i < 3 => S2I(ns, Get(k.to_owned())), + i if i < 4 => S2I(ns, Iter), i if i < 9 => S2I(ns, Put(k.to_owned(), Arbitrary::arbitrary(g))), _ => S2I(ns, Del(k.to_owned())), } } i if i < 94 => { let ns = g.choose(&["fizz", "buzz"]).unwrap(); - let k = u8::arbitrary(g) % 2; + let k = u8::arbitrary(g) % 3; match u8::arbitrary(g) % 10 { i if i < 3 => I2S(ns, Get(k)), + i if i < 4 => I2S(ns, Iter), i if i < 9 => { let sz = u8::arbitrary(g) % 5; let s = (0..sz).map(|_| char::arbitrary(g)).collect(); @@ -146,6 +151,7 @@ where pub fn check_writable(sut: &impl KVWritable, data: TestData) -> bool where S: KVStore + Clone + Codec + Codec, + S::Repr: Ord + 'static, { let mut model = Model::default(); // Creating a collection doesn't add much to the test but at least we exercise this path. @@ -186,6 +192,7 @@ where pub fn check_write_isolation(sut: &impl KVWritable, data: TestDataMulti<2>) -> bool where S: KVStore + Clone + Codec + Codec, + S::Repr: Ord + 'static, { let mut colls = Collections::::default(); let mut model1 = Model::default(); @@ -221,6 +228,7 @@ where pub fn check_write_isolation_concurrent(sut: &B, data1: TestData, data2: TestData) -> bool where S: KVStore + Clone + Codec + Codec, + S::Repr: Ord + 'static, B: KVWritable + Clone + Send + 'static, { let sut2 = sut.clone(); @@ -321,6 +329,7 @@ where pub fn check_read_isolation(sut: &B, data: TestData) -> bool where S: KVStore + Clone + Codec + Codec, + S::Repr: Ord + 'static, B: KVWritable + KVReadable, { let mut model = Model::default(); @@ -384,8 +393,9 @@ fn apply_both( ) -> bool where S: KVStore + Clone + Codec + Codec, - K: Hash + Eq, - V: Clone + PartialEq, + K: Clone + Hash + Eq + 'static, + V: Clone + PartialEq + 'static, + S::Repr: Ord + 'static, { match op { TestOpKV::Get(k) => { @@ -406,6 +416,30 @@ where coll.delete(tx, &k).unwrap(); model.entry(ns).or_default().remove(&k); } + TestOpKV::Iter => { + let found = coll.iterate(tx).collect::, _>>().unwrap(); + + let expected = if let Some(m) = model.get(ns) { + let mut expected = m + .iter() + .map(|(k, v)| (k.clone(), v.clone())) + .collect::>(); + + expected.sort_by(|a, b| { + let ka = S::to_repr(&a.0).unwrap(); + let kb = S::to_repr(&b.0).unwrap(); + ka.cmp(&kb) + }); + + expected + } else { + Vec::new() + }; + + if found != expected { + return false; + } + } } true }