Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

STORAGE: KVRead::iterate #919

Merged
merged 4 commits into from
May 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions fendermint/rocksdb/src/kvstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,28 @@ where
}
})
}

fn iterate<K, V>(&self, ns: &S::Namespace) -> impl Iterator<Item = KVResult<(K, V)>>
where
S: Decode<K> + Decode<V>,
<S as KVStore>::Repr: Ord + 'static,
{
self.cache
.with_cf_handle(ns.as_ref(), |cf| {
let it = self.snapshot.iterator_cf(cf, rocksdb::IteratorMode::Start);

let it = it.map(|res| res.map_err(to_kv_error)).map(|res| {
res.and_then(|(k, v)| {
let k: K = S::from_repr(&k.to_vec())?;
let v: V = S::from_repr(&v.to_vec())?;
Ok((k, v))
})
});

Ok(it)
})
.expect("just wrapped into ok")
}
}

impl<'a, S> KVRead<S> for RocksDbWriteTx<'a>
Expand All @@ -160,6 +182,28 @@ where
}
})
}

fn iterate<K, V>(&self, ns: &S::Namespace) -> impl Iterator<Item = KVResult<(K, V)>>
where
S: Decode<K> + Decode<V>,
<S as KVStore>::Repr: Ord + 'static,
{
self.cache
.with_cf_handle(ns.as_ref(), |cf| {
let it = self.tx.iterator_cf(cf, rocksdb::IteratorMode::Start);

let it = it.map(|res| res.map_err(to_kv_error)).map(|res| {
res.and_then(|(k, v)| {
let k: K = S::from_repr(&k.to_vec())?;
let v: V = S::from_repr(&v.to_vec())?;
Ok((k, v))
})
});

Ok(it)
})
.expect("just wrapped into ok")
}
}

impl<'a, S> KVWrite<S> for RocksDbWriteTx<'a>
Expand Down
64 changes: 63 additions & 1 deletion fendermint/storage/src/im.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,23 @@
// SPDX-License-Identifier: Apache-2.0, MIT
use std::{
hash::Hash,
marker::PhantomData,
mem,
sync::{Arc, Mutex, MutexGuard},
thread,
};

use crate::{
Decode, Encode, KVRead, KVReadable, KVResult, KVStore, KVTransaction, KVWritable, KVWrite,
Decode, Encode, KVError, KVRead, KVReadable, KVResult, KVStore, KVTransaction, KVWritable,
KVWrite,
};

/// Read-only mode.
pub struct Read;
/// Read-write mode.
pub struct Write;

/// Immutable data multimap.
type IDataMap<S> = im::HashMap<
<S as KVStore>::Namespace,
im::HashMap<<S as KVStore>::Repr, Arc<<S as KVStore>::Repr>>,
Expand Down Expand Up @@ -152,6 +155,23 @@ where
}
Ok(None)
}

fn iterate<K, V>(&self, ns: &S::Namespace) -> impl Iterator<Item = KVResult<(K, V)>>
where
S: Decode<K> + Decode<V>,
<S as KVStore>::Repr: Ord + 'static,
K: 'static,
V: 'static,
Comment on lines +163 to +164
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be static so K/V is guaranteed to outlive the iterator?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I can explain, but the compiler insisted that the iterator can outlive these types 🤷

{
if let Some(m) = self.data.get(ns) {
let mut items = m.iter().map(|(k, v)| (k, v.as_ref())).collect::<Vec<_>>();
items.sort_by(|a, b| a.0.cmp(b.0));

KVIter::<S, K, V>::new(items)
} else {
KVIter::empty()
}
}
}

impl<'a, S: KVStore> KVWrite<S> for Transaction<'a, S, Write>
Expand Down Expand Up @@ -183,6 +203,48 @@ where
}
}

struct KVIter<'a, S: KVStore, K, V> {
items: Vec<(&'a S::Repr, &'a S::Repr)>,
next: usize,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe curr_size ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I wanted to express here is the next index that will be returned from the Vec. The vec isn't being consumed, its size stays the same.

phantom_v: PhantomData<V>,
phantom_k: PhantomData<K>,
}

impl<'a, S, K, V> KVIter<'a, S, K, V>
where
S: KVStore,
{
pub fn new(items: Vec<(&'a S::Repr, &'a S::Repr)>) -> Self {
KVIter {
items,
next: 0,
phantom_v: PhantomData,
phantom_k: PhantomData,
}
}

pub fn empty() -> Self {
Self::new(vec![])
}
}

impl<'a, S, K, V> Iterator for KVIter<'a, S, K, V>
where
S: KVStore + Decode<K> + Decode<V>,
{
type Item = Result<(K, V), KVError>;

fn next(&mut self) -> Option<Self::Item> {
if let Some((k, v)) = self.items.get(self.next) {
self.next += 1;
let kv = S::from_repr(k).and_then(|k| S::from_repr(v).map(|v| (k, v)));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Some(kv)
} else {
None
}
}
}

#[cfg(all(feature = "inmem", test))]
mod tests {
use std::borrow::Cow;
Expand Down
24 changes: 24 additions & 0 deletions fendermint/storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,16 @@ pub trait KVRead<S: KVStore> {
fn get<K, V>(&self, ns: &S::Namespace, k: &K) -> KVResult<Option<V>>
where
S: Encode<K> + Decode<V>;

/// Iterate items in the namespace ordered by their representation.
///
/// TODO: Add parameters for iteration direction and bounds.
fn iterate<K, V>(&self, ns: &S::Namespace) -> impl Iterator<Item = KVResult<(K, V)>>
where
K: 'static,
V: 'static,
S: Decode<K> + Decode<V>,
<S as KVStore>::Repr: Ord + 'static;
}

/// Operations available on a write transaction.
Expand Down Expand Up @@ -156,4 +166,18 @@ where
pub fn delete(&self, kv: &mut impl KVWrite<S>, k: &K) -> KVResult<()> {
kv.delete(&self.ns, k)
}

pub fn iterate<'a, 'b>(
&'a self,
kv: &'b impl KVRead<S>,
) -> impl Iterator<Item = KVResult<(K, V)>> + 'b
where
S::Repr: Ord + 'static,
S: Decode<K>,
K: 'static,
V: 'static,
'a: 'b,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recall seeing this before, but its one of these lifetimes I never remember how is written

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is also from the compiler. I didn't think adding this method on KVCollection would involve more constraints. It complained that the impl Interator captures the lifetimes of both reference inputs, so it needs to outlive them, and this was the only way to express that they are the same, or compatible.

{
kv.iterate::<K, V>(&self.ns)
}
}
42 changes: 38 additions & 4 deletions fendermint/storage/src/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,14 @@ pub enum TestOpKV<K, V> {
Get(K),
Put(K, V),
Del(K),
Iter,
}

#[derive(Clone, Debug)]
pub enum TestOpNs {
/// String-to-Int
S2I(TestNamespace, TestOpKV<String, u8>),
/// Int-to-String
I2S(TestNamespace, TestOpKV<u8, String>),
Rollback,
}
Expand All @@ -48,18 +51,20 @@ impl Arbitrary for TestOpNs {
match u8::arbitrary(g) % 100 {
i if i < 47 => {
let ns = g.choose(&["spam", "eggs"]).unwrap();
let k = *g.choose(&["foo", "bar"]).unwrap();
let k = *g.choose(&["foo", "bar", "baz"]).unwrap();
match u8::arbitrary(g) % 10 {
i if i < 3 => S2I(ns, Get(k.to_owned())),
i if i < 4 => S2I(ns, Iter),
i if i < 9 => S2I(ns, Put(k.to_owned(), Arbitrary::arbitrary(g))),
_ => S2I(ns, Del(k.to_owned())),
}
}
i if i < 94 => {
let ns = g.choose(&["fizz", "buzz"]).unwrap();
let k = u8::arbitrary(g) % 2;
let k = u8::arbitrary(g) % 3;
match u8::arbitrary(g) % 10 {
i if i < 3 => I2S(ns, Get(k)),
i if i < 4 => I2S(ns, Iter),
i if i < 9 => {
let sz = u8::arbitrary(g) % 5;
let s = (0..sz).map(|_| char::arbitrary(g)).collect();
Expand Down Expand Up @@ -146,6 +151,7 @@ where
pub fn check_writable<S>(sut: &impl KVWritable<S>, data: TestData) -> bool
where
S: KVStore<Namespace = TestNamespace> + Clone + Codec<String> + Codec<u8>,
S::Repr: Ord + 'static,
{
let mut model = Model::default();
// Creating a collection doesn't add much to the test but at least we exercise this path.
Expand Down Expand Up @@ -186,6 +192,7 @@ where
pub fn check_write_isolation<S>(sut: &impl KVWritable<S>, data: TestDataMulti<2>) -> bool
where
S: KVStore<Namespace = TestNamespace> + Clone + Codec<String> + Codec<u8>,
S::Repr: Ord + 'static,
{
let mut colls = Collections::<S>::default();
let mut model1 = Model::default();
Expand Down Expand Up @@ -221,6 +228,7 @@ where
pub fn check_write_isolation_concurrent<S, B>(sut: &B, data1: TestData, data2: TestData) -> bool
where
S: KVStore<Namespace = TestNamespace> + Clone + Codec<String> + Codec<u8>,
S::Repr: Ord + 'static,
B: KVWritable<S> + Clone + Send + 'static,
{
let sut2 = sut.clone();
Expand Down Expand Up @@ -321,6 +329,7 @@ where
pub fn check_read_isolation<S, B>(sut: &B, data: TestData) -> bool
where
S: KVStore<Namespace = TestNamespace> + Clone + Codec<String> + Codec<u8>,
S::Repr: Ord + 'static,
B: KVWritable<S> + KVReadable<S>,
{
let mut model = Model::default();
Expand Down Expand Up @@ -384,8 +393,9 @@ fn apply_both<S, K, V>(
) -> bool
where
S: KVStore<Namespace = TestNamespace> + Clone + Codec<K> + Codec<V>,
K: Hash + Eq,
V: Clone + PartialEq,
K: Clone + Hash + Eq + 'static,
V: Clone + PartialEq + 'static,
S::Repr: Ord + 'static,
{
match op {
TestOpKV::Get(k) => {
Expand All @@ -406,6 +416,30 @@ where
coll.delete(tx, &k).unwrap();
model.entry(ns).or_default().remove(&k);
}
TestOpKV::Iter => {
let found = coll.iterate(tx).collect::<Result<Vec<_>, _>>().unwrap();

let expected = if let Some(m) = model.get(ns) {
let mut expected = m
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect::<Vec<_>>();

expected.sort_by(|a, b| {
let ka = S::to_repr(&a.0).unwrap();
let kb = S::to_repr(&b.0).unwrap();
ka.cmp(&kb)
});

expected
} else {
Vec::new()
};

if found != expected {
return false;
}
}
}
true
}
Loading