diff --git a/src/lib.rs b/src/lib.rs index 2e29ac91..e22724cf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -174,3 +174,151 @@ where self.as_key_slice().borrow() } } + +/// An entry in the write-ahead log. +pub struct Entry { + key: K, + value: V, + pointer: Option>, +} + +impl Entry { + /// Creates a new entry. + #[inline] + pub const fn new(key: K, value: V) -> Self { + Self { + key, + value, + pointer: None, + } + } + + /// Returns the key. + #[inline] + pub const fn key(&self) -> &K { + &self.key + } + + /// Returns the value. + #[inline] + pub const fn value(&self) -> &V { + &self.value + } + + /// Consumes the entry and returns the key and value. + #[inline] + pub fn into_components(self) -> (K, V) { + (self.key, self.value) + } +} + +/// An entry in the write-ahead log. +pub struct EntryWithKeyBuilder { + kb: KeyBuilder, + value: V, + pointer: Option>, +} + +impl EntryWithKeyBuilder { + /// Creates a new entry. + #[inline] + pub const fn new(kb: KeyBuilder, value: V) -> Self { + Self { + kb, + value, + pointer: None, + } + } + + /// Returns the key. + #[inline] + pub const fn key_builder(&self) -> &KeyBuilder { + &self.kb + } + + /// Returns the value. + #[inline] + pub const fn value(&self) -> &V { + &self.value + } + + /// Consumes the entry and returns the key and value. + #[inline] + pub fn into_components(self) -> (KeyBuilder, V) { + (self.kb, self.value) + } +} + +/// An entry in the write-ahead log. +pub struct EntryWithValueBuilder { + key: K, + vb: ValueBuilder, + pointer: Option>, +} + +impl EntryWithValueBuilder { + /// Creates a new entry. + #[inline] + pub const fn new(key: K, vb: ValueBuilder) -> Self { + Self { + key, + vb, + pointer: None, + } + } + + /// Returns the key. + #[inline] + pub const fn value_builder(&self) -> &ValueBuilder { + &self.vb + } + + /// Returns the value. + #[inline] + pub const fn key(&self) -> &K { + &self.key + } + + /// Consumes the entry and returns the key and value. + #[inline] + pub fn into_components(self) -> (K, ValueBuilder) { + (self.key, self.vb) + } +} + +/// An entry in the write-ahead log. +pub struct EntryWithBuilders { + kb: KeyBuilder, + vb: ValueBuilder, + pointer: Option>, +} + +impl EntryWithBuilders { + /// Creates a new entry. + #[inline] + pub const fn new(kb: KeyBuilder, vb: ValueBuilder) -> Self { + Self { + kb, + vb, + pointer: None, + } + } + + /// Returns the key. + #[inline] + pub const fn value_builder(&self) -> &ValueBuilder { + &self.vb + } + + /// Returns the value. + #[inline] + pub const fn key_builder(&self) -> &KeyBuilder { + &self.kb + } + + /// Consumes the entry and returns the key and value. + #[inline] + pub fn into_components(self) -> (KeyBuilder, ValueBuilder) { + (self.kb, self.vb) + } +} diff --git a/src/swmr/wal.rs b/src/swmr/wal.rs index 4d4a2495..706c53fc 100644 --- a/src/swmr/wal.rs +++ b/src/swmr/wal.rs @@ -137,6 +137,15 @@ where { self.core.map.insert(ptr); } + + fn insert_pointers(&self, ptrs: impl Iterator>) + where + C: Comparator, + { + for ptr in ptrs { + self.core.map.insert(ptr); + } + } } impl OrderWal { diff --git a/src/unsync.rs b/src/unsync.rs index 64a5d8bc..3c9dba2d 100644 --- a/src/unsync.rs +++ b/src/unsync.rs @@ -125,6 +125,16 @@ where (*self.core.get()).map.insert(ptr); } } + + #[inline] + fn insert_pointers(&self, ptrs: impl Iterator>) + where + C: Comparator, + { + unsafe { + (*self.core.get()).map.extend(ptrs); + } + } } impl ImmutableWal for OrderWal diff --git a/src/wal.rs b/src/wal.rs index fda001a5..54a5af5e 100644 --- a/src/wal.rs +++ b/src/wal.rs @@ -4,23 +4,190 @@ use super::*; mod builder; pub use builder::*; -use sealed::Base; - -mod impls; pub(crate) mod sealed; /// A batch of keys and values that can be inserted into the [`Wal`]. pub trait Batch { + /// The key type. + type Key: Borrow<[u8]>; + + /// The value type. + type Value: Borrow<[u8]>; + + /// The [`Comparator`] type. + type Comparator: Comparator; + /// The iterator type. - type Iter<'a>: Iterator + type IterMut<'a>: Iterator> where Self: 'a; /// Returns an iterator over the keys and values. - fn iter(&self) -> Self::Iter<'_>; + fn iter_mut(&mut self) -> Self::IterMut<'_>; } +impl Batch for T +where + K: Borrow<[u8]>, + V: Borrow<[u8]>, + C: Comparator, + for<'a> &'a mut T: IntoIterator>, +{ + type Key = K; + type Value = V; + type Comparator = C; + + type IterMut<'a> = <&'a mut T as IntoIterator>::IntoIter where Self: 'a; + + fn iter_mut(&mut self) -> Self::IterMut<'_> { + IntoIterator::into_iter(self) + } +} + +/// A batch of keys and values that can be inserted into the [`Wal`]. +/// Comparing to [`Batch`], this trait is used to build +/// the key in place. +pub trait BatchWithKeyBuilder { + /// The key builder type. + type KeyBuilder: FnOnce(&mut VacantBuffer<'_>) -> Result<(), Self::Error>; + + /// The error for the key builder. + type Error; + + /// The value type. + type Value: Borrow<[u8]>; + + /// The [`Comparator`] type. + type Comparator: Comparator; + + /// The iterator type. + type IterMut<'a>: Iterator< + Item = &'a mut EntryWithKeyBuilder, + > + where + Self: 'a; + + /// Returns an iterator over the keys and values. + fn iter_mut(&mut self) -> Self::IterMut<'_>; +} + +impl BatchWithKeyBuilder for T +where + KB: FnOnce(&mut VacantBuffer<'_>) -> Result<(), E>, + V: Borrow<[u8]>, + C: Comparator, + for<'a> &'a mut T: IntoIterator>, +{ + type KeyBuilder = KB; + type Error = E; + type Value = V; + type Comparator = C; + + type IterMut<'a> = <&'a mut T as IntoIterator>::IntoIter where Self: 'a; + + fn iter_mut(&mut self) -> Self::IterMut<'_> { + IntoIterator::into_iter(self) + } +} + +/// A batch of keys and values that can be inserted into the [`Wal`]. +/// Comparing to [`Batch`], this trait is used to build +/// the value in place. +pub trait BatchWithValueBuilder { + /// The value builder type. + type ValueBuilder: FnOnce(&mut VacantBuffer<'_>) -> Result<(), Self::Error>; + + /// The error for the value builder. + type Error; + + /// The key type. + type Key: Borrow<[u8]>; + + /// The [`Comparator`] type. + type Comparator: Comparator; + + /// The iterator type. + type IterMut<'a>: Iterator< + Item = &'a mut EntryWithValueBuilder, + > + where + Self: 'a; + + /// Returns an iterator over the keys and values. + fn iter_mut(&mut self) -> Self::IterMut<'_>; +} + +impl BatchWithValueBuilder for T +where + VB: FnOnce(&mut VacantBuffer<'_>) -> Result<(), E>, + K: Borrow<[u8]>, + C: Comparator, + for<'a> &'a mut T: IntoIterator>, +{ + type Key = K; + type Error = E; + type ValueBuilder = VB; + type Comparator = C; + + type IterMut<'a> = <&'a mut T as IntoIterator>::IntoIter where Self: 'a; + + fn iter_mut(&mut self) -> Self::IterMut<'_> { + IntoIterator::into_iter(self) + } +} + +/// A batch of keys and values that can be inserted into the [`Wal`]. +/// Comparing to [`Batch`], this trait is used to build +/// the key and value in place. +pub trait BatchWithBuilders { + /// The value builder type. + type ValueBuilder: FnOnce(&mut VacantBuffer<'_>) -> Result<(), Self::ValueError>; + + /// The error for the value builder. + type ValueError; + + /// The value builder type. + type KeyBuilder: FnOnce(&mut VacantBuffer<'_>) -> Result<(), Self::KeyError>; + + /// The error for the value builder. + type KeyError; + + /// The [`Comparator`] type. + type Comparator: Comparator; + + /// The iterator type. + type IterMut<'a>: Iterator< + Item = &'a mut EntryWithBuilders, + > + where + Self: 'a; + + /// Returns an iterator over the keys and values. + fn iter_mut(&mut self) -> Self::IterMut<'_>; +} + +impl BatchWithBuilders for T +where + VB: FnOnce(&mut VacantBuffer<'_>) -> Result<(), VE>, + KB: FnOnce(&mut VacantBuffer<'_>) -> Result<(), KE>, + C: Comparator, + for<'a> &'a mut T: IntoIterator>, +{ + type KeyBuilder = KB; + type KeyError = KE; + type ValueBuilder = VB; + type ValueError = VE; + type Comparator = C; + + type IterMut<'a> = <&'a mut T as IntoIterator>::IntoIter where Self: 'a; + + fn iter_mut(&mut self) -> Self::IterMut<'_> { + IntoIterator::into_iter(self) + } +} + +/// An abstract layer for the immutable write-ahead log. pub trait ImmutableWal: sealed::Constructor { /// The iterator type. type Iter<'a>: Iterator + DoubleEndedIterator @@ -521,34 +688,14 @@ pub trait Wal: sealed::Sealed + ImmutableWal { } /// Inserts a batch of key-value pairs into the WAL. - fn insert_batch(&mut self, batch: &B) -> Result<(), Error> + fn insert_batch>(&mut self, batch: &mut B) -> Result<(), Error> where + C: Comparator + CheapClone, S: BuildChecksumer, { - let batch_encoded_size = batch - .iter() - .fold(0u64, |acc, (k, v)| acc + k.len() as u64 + v.len() as u64); - let total_size = STATUS_SIZE as u64 + batch_encoded_size + CHECKSUM_SIZE as u64; - if total_size > >::capacity(self) as u64 { - return Err(Error::insufficient_space( - total_size, - >::remaining(self), - )); - } - - let allocator = self.allocator(); - - let mut buf = allocator - .alloc_bytes(total_size as u32) - .map_err(Error::from_insufficient_space)?; - - unsafe { - let committed_flag = Flags::BATCHING | Flags::COMMITTED; - let cks = self.hasher().build_checksumer(); - let flag = Flags::BATCHING; - buf.put_u8_unchecked(flag.bits); - } - todo!() + self + .insert_batch_in(batch) + .map(|_| self.insert_pointers(batch.iter_mut().map(|ent| ent.pointer.take().unwrap()))) } /// Inserts a key-value pair into the WAL. diff --git a/src/wal/impls.rs b/src/wal/impls.rs deleted file mode 100644 index 6e12c0c5..00000000 --- a/src/wal/impls.rs +++ /dev/null @@ -1,165 +0,0 @@ -use core::{borrow::Borrow, iter::FusedIterator, marker::PhantomData}; - -/// An iterator over a slice of key value tuples. -pub struct EntrySliceIter<'a, K, V>(core::slice::Iter<'a, (K, V)>); - -impl<'a, K, V> Iterator for EntrySliceIter<'a, K, V> -where - K: Borrow<[u8]>, - V: Borrow<[u8]>, -{ - type Item = (&'a [u8], &'a [u8]); - - fn next(&mut self) -> Option { - self.0.next().map(|(k, v)| (k.borrow(), v.borrow())) - } - - fn size_hint(&self) -> (usize, Option) { - self.0.size_hint() - } -} - -impl DoubleEndedIterator for EntrySliceIter<'_, K, V> -where - K: Borrow<[u8]>, - V: Borrow<[u8]>, -{ - fn next_back(&mut self) -> Option { - self.0.next_back().map(|(k, v)| (k.borrow(), v.borrow())) - } -} - -impl FusedIterator for EntrySliceIter<'_, K, V> -where - K: Borrow<[u8]>, - V: Borrow<[u8]>, -{ -} - -macro_rules! impl_for_vec { - ($( $(#[cfg($cfg:meta)])? $ty:ty $(:$N:ident)? $( => $as_ref:ident)?),+ $(,)?) => { - $( - $(#[cfg($cfg)])? - const _: () = { - impl super::Batch for $ty - where - K: Borrow<[u8]>, - V: Borrow<[u8]>, - { - type Iter<'a> = EntrySliceIter<'a, K, V> where Self: 'a; - - fn iter(&self) -> Self::Iter<'_> { - EntrySliceIter(IntoIterator::into_iter(self $(.$as_ref())?)) - } - } - }; - )+ - }; -} - -impl_for_vec!( - Vec<(K, V)>, - Box<[(K, V)]>, - &[(K, V)] => as_ref, - std::sync::Arc<[(K, V)]> => as_ref, - std::rc::Rc<[(K, V)]> => as_ref, - #[cfg(feature = "smallvec-wrapper")] - ::smallvec_wrapper::OneOrMore<(K, V)>, - #[cfg(feature = "smallvec-wrapper")] - ::smallvec_wrapper::TinyVec<(K, V)>, - #[cfg(feature = "smallvec-wrapper")] - ::smallvec_wrapper::TriVec<(K, V)>, - #[cfg(feature = "smallvec-wrapper")] - ::smallvec_wrapper::SmallVec<(K, V)>, - #[cfg(feature = "smallvec-wrapper")] - ::smallvec_wrapper::MediumVec<(K, V)>, - #[cfg(feature = "smallvec-wrapper")] - ::smallvec_wrapper::LargeVec<(K, V)>, - #[cfg(feature = "smallvec-wrapper")] - ::smallvec_wrapper::XLargeVec<(K, V)>, - #[cfg(feature = "smallvec-wrapper")] - ::smallvec_wrapper::XXLargeVec<(K, V)>, - #[cfg(feature = "smallvec-wrapper")] - ::smallvec_wrapper::XXXLargeVec<(K, V)>, - #[cfg(feature = "smallvec")] - ::smallvec::SmallVec<[(K, V); N]>: N, -); - -/// An iterator over a slice of key value tuples. -pub struct EntryMapIter<'a, K, V, T> { - iter: T, - _m: PhantomData<&'a (K, V)>, -} - -impl EntryMapIter<'_, K, V, T> { - /// Construct a new iterator. - #[inline] - pub const fn new(iter: T) -> Self { - Self { - iter, - _m: PhantomData, - } - } -} - -impl<'a, K, V, T> Iterator for EntryMapIter<'a, K, V, T> -where - K: Borrow<[u8]>, - V: Borrow<[u8]>, - T: Iterator, -{ - type Item = (&'a [u8], &'a [u8]); - - fn next(&mut self) -> Option { - self.iter.next().map(|(k, v)| (k.borrow(), v.borrow())) - } - - fn size_hint(&self) -> (usize, Option) { - self.iter.size_hint() - } -} - -impl<'a, K, V, T> DoubleEndedIterator for EntryMapIter<'a, K, V, T> -where - K: Borrow<[u8]>, - V: Borrow<[u8]>, - T: DoubleEndedIterator, -{ - fn next_back(&mut self) -> Option { - self.iter.next_back().map(|(k, v)| (k.borrow(), v.borrow())) - } -} - -impl<'a, K, V, T> FusedIterator for EntryMapIter<'a, K, V, T> -where - K: Borrow<[u8]>, - V: Borrow<[u8]>, - T: FusedIterator, -{ -} - -impl super::Batch for std::collections::HashMap -where - K: Borrow<[u8]>, - V: Borrow<[u8]>, -{ - type Iter<'a> = EntryMapIter<'a, K, V, std::collections::hash_map::Iter<'a, K, V>> where - Self: 'a; - - fn iter(&self) -> Self::Iter<'_> { - EntryMapIter::new(self.iter()) - } -} - -impl super::Batch for std::collections::BTreeMap -where - K: Borrow<[u8]>, - V: Borrow<[u8]>, -{ - type Iter<'a> = EntryMapIter<'a, K, V, std::collections::btree_map::Iter<'a, K, V>> where - Self: 'a; - - fn iter(&self) -> Self::Iter<'_> { - EntryMapIter::new(self.iter()) - } -} diff --git a/src/wal/sealed.rs b/src/wal/sealed.rs index 7a1d2ff3..0fe01286 100644 --- a/src/wal/sealed.rs +++ b/src/wal/sealed.rs @@ -40,6 +40,70 @@ pub trait Sealed: Constructor { where C: Comparator; + fn insert_pointers(&self, ptrs: impl Iterator>) + where + C: Comparator; + + /// Inserts a batch of key-value pairs into the WAL. + fn insert_batch_in>(&mut self, batch: &mut B) -> Result<(), Error> + where + C: Comparator + CheapClone, + S: BuildChecksumer, + { + let batch_encoded_size = batch.iter_mut().fold(0u64, |acc, ent| { + acc + ent.key.borrow().len() as u64 + ent.value.borrow().len() as u64 + }); + let total_size = STATUS_SIZE as u64 + batch_encoded_size + CHECKSUM_SIZE as u64; + if total_size > self.options().capacity() as u64 { + return Err(Error::insufficient_space( + total_size, + self.allocator().remaining() as u32, + )); + } + + let allocator = self.allocator(); + + let mut buf = allocator + .alloc_bytes(total_size as u32) + .map_err(Error::from_insufficient_space)?; + + unsafe { + let committed_flag = Flags::BATCHING | Flags::COMMITTED; + let mut cks = self.hasher().build_checksumer(); + let flag = Flags::BATCHING; + buf.put_u8_unchecked(flag.bits); + let cmp = self.comparator(); + let mut cursor = 1; + batch.iter_mut().for_each(|ent| { + let ptr = buf.as_ptr().add(cursor as usize); + let key = ent.key.borrow(); + let value = ent.value.borrow(); + let klen = key.len(); + let vlen = value.len(); + cursor += klen as u64; + buf.put_slice_unchecked(key); + cursor += vlen as u64; + buf.put_slice_unchecked(value); + + ent.pointer = Some(Pointer::new(klen, vlen, ptr, cmp.cheap_clone())); + }); + + cks.update(&[committed_flag.bits]); + cks.update(&buf[1..]); + buf.put_u64_le_unchecked(cks.digest()); + + // commit the entry + buf[0] = committed_flag.bits; + let buf_cap = buf.capacity(); + + if self.options().sync_on_write() && allocator.is_ondisk() { + allocator.flush_range(buf.offset(), buf_cap)?; + } + buf.detach(); + Ok(()) + } + } + fn insert_with_in( &mut self, kb: KeyBuilder) -> Result<(), KE>>,