From c605e5f1dc360d44a17df0217b258ca78dd4e359 Mon Sep 17 00:00:00 2001 From: al8n Date: Mon, 16 Sep 2024 21:36:35 +0800 Subject: [PATCH] WIP --- .codecov.yml | 7 +- Cargo.toml | 1 + examples/zero_cost.rs | 2 +- src/error.rs | 6 +- src/lib.rs | 43 +---- src/swmr/generic.rs | 46 +++-- src/swmr/generic/entry.rs | 10 +- src/swmr/generic/tests.rs | 7 + src/swmr/generic/traits.rs | 48 ++++- src/swmr/generic/traits/container_impls.rs | 99 +++++++++++ .../traits/{impls.rs => type_impls.rs} | 58 +++++- .../traits/{impls => type_impls}/bytes.rs | 22 ++- .../traits/{impls => type_impls}/net.rs | 40 +++-- .../traits/{impls => type_impls}/string.rs | 4 +- src/swmr/wal.rs | 165 ++++-------------- src/swmr/wal/reader.rs | 23 +-- src/unsync.rs | 161 ++++------------- src/wal.rs | 159 ++++++++++++++--- src/wal/impls.rs | 165 ++++++++++++++++++ src/wal/sealed.rs | 89 +++++++++- 20 files changed, 759 insertions(+), 396 deletions(-) create mode 100644 src/swmr/generic/traits/container_impls.rs rename src/swmr/generic/traits/{impls.rs => type_impls.rs} (75%) rename src/swmr/generic/traits/{impls => type_impls}/bytes.rs (86%) rename src/swmr/generic/traits/{impls => type_impls}/net.rs (81%) rename src/swmr/generic/traits/{impls => type_impls}/string.rs (97%) create mode 100644 src/wal/impls.rs diff --git a/.codecov.yml b/.codecov.yml index 78ea96ba..9ab8e2e3 100644 --- a/.codecov.yml +++ b/.codecov.yml @@ -7,12 +7,15 @@ ignore: - "**/benches/" - "src/tests.rs" - "src/error.rs" - - "src/swmr/generic/traits/impls/" - - "src/swmr/generic/traits/impls.rs" + - "src/swmr/generic/traits/container_impls/" + - "src/swmr/generic/traits/container_impls.rs" + - "src/swmr/generic/traits/type_impls/" + - "src/swmr/generic/traits/type_impls.rs" - "src/swmr/generic/tests.rs" - "src/swmr/generic/tests/" - "src/swmr/wal/tests.rs" - "src/swmr/wal/tests/" + - "src/wal/impls.rs" - "src/unsync/tests.rs" - "src/unsync/tests/" diff --git a/Cargo.toml b/Cargo.toml index 3ecdcda3..55daf990 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,6 +34,7 @@ thiserror = "1" bytes = { version = "1", default-features = false, optional = true } smallvec = { version = "1", default-features = false, optional = true, features = ["const_generics"] } +smallvec-wrapper = { version = "0.1", optional = true, default-features = false, features = ["const_generics"] } smol_str = { version = "0.3", default-features = false, optional = true } faststr = { version = "0.2", default-features = false, optional = true } diff --git a/examples/zero_cost.rs b/examples/zero_cost.rs index e1dd6183..36de3561 100644 --- a/examples/zero_cost.rs +++ b/examples/zero_cost.rs @@ -112,7 +112,7 @@ impl Type for Person { } impl<'a> TypeRef<'a> for PersonRef<'a> { - fn from_slice(src: &'a [u8]) -> Self { + unsafe fn from_slice(src: &'a [u8]) -> Self { let (id_size, id) = decode_u64_varint(src).unwrap(); let name = std::str::from_utf8(&src[id_size..]).unwrap(); PersonRef { id, name } diff --git a/src/error.rs b/src/error.rs index 1308a198..0c04f85e 100644 --- a/src/error.rs +++ b/src/error.rs @@ -5,7 +5,7 @@ pub enum Error { #[error("insufficient space in the WAL (requested: {requested}, available: {available})")] InsufficientSpace { /// The requested size - requested: u32, + requested: u64, /// The remaining size available: u32, }, @@ -43,7 +43,7 @@ pub enum Error { impl Error { /// Create a new `Error::InsufficientSpace` instance. 
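  /// (Why the widening below: `requested` becomes `u64` while `available`
  /// stays `u32`. A batch's requested size is summed across entries in the
  /// new `insert_batch`, so it can exceed `u32::MAX` even though the arena
  /// capacity, and hence the remaining space, still fits in a `u32`.)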
- pub(crate) const fn insufficient_space(requested: u32, available: u32) -> Self { + pub(crate) const fn insufficient_space(requested: u64, available: u32) -> Self { Self::InsufficientSpace { requested, available, @@ -80,7 +80,7 @@ impl Error { rarena_allocator::Error::InsufficientSpace { requested, available, - } => Self::insufficient_space(requested, available), + } => Self::insufficient_space(requested as u64, available), _ => unreachable!(), } } diff --git a/src/lib.rs b/src/lib.rs index 882fc624..2e29ac91 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -22,15 +22,18 @@ pub use rarena_allocator::OpenOptions; #[cfg(feature = "std")] extern crate std; -pub use dbutils::{Ascend, CheapClone, Checksumer, Comparator, Crc32, Descend}; +pub use dbutils::{ + checksum::{BuildChecksumer, Checksumer, Crc32}, + Ascend, CheapClone, Comparator, Descend, +}; #[cfg(feature = "xxhash3")] #[cfg_attr(docsrs, doc(cfg(feature = "xxhash3")))] -pub use dbutils::XxHash3; +pub use dbutils::checksum::XxHash3; #[cfg(feature = "xxhash64")] #[cfg_attr(docsrs, doc(cfg(feature = "xxhash64")))] -pub use dbutils::XxHash64; +pub use dbutils::checksum::XxHash64; const STATUS_SIZE: usize = mem::size_of::(); const CHECKSUM_SIZE: usize = mem::size_of::(); @@ -88,6 +91,8 @@ bitflags::bitflags! { struct Flags: u8 { /// First bit: 1 indicates committed, 0 indicates uncommitted const COMMITTED = 0b00000001; + /// Second bit: 1 indicates batching, 0 indicates single entry + const BATCHING = 0b00000010; } } @@ -169,35 +174,3 @@ where self.as_key_slice().borrow() } } - -/// Use to avoid the mutable borrow checker, for single writer multiple readers usecase. -struct UnsafeCellChecksumer(core::cell::UnsafeCell); - -impl UnsafeCellChecksumer { - #[inline] - const fn new(checksumer: S) -> Self { - Self(core::cell::UnsafeCell::new(checksumer)) - } -} - -impl UnsafeCellChecksumer -where - S: Checksumer, -{ - #[inline] - fn update(&self, buf: &[u8]) { - // SAFETY: the checksumer will not be invoked concurrently. - unsafe { (*self.0.get()).update(buf) } - } - - #[inline] - fn reset(&self) { - // SAFETY: the checksumer will not be invoked concurrently. 
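  // (Removal rationale: with the new `BuildChecksumer` bound, each write path
  // now builds a fresh stateful checksumer via `build_checksumer()`, as seen
  // in swmr/generic.rs below, and replay uses the one-shot `checksum_one`, so
  // this interior-mutability wrapper has no remaining callers.)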
- unsafe { (*self.0.get()).reset() } - } - - #[inline] - fn digest(&self) -> u64 { - unsafe { (*self.0.get()).digest() } - } -} diff --git a/src/swmr/generic.rs b/src/swmr/generic.rs index f8c01985..4f1a073d 100644 --- a/src/swmr/generic.rs +++ b/src/swmr/generic.rs @@ -6,8 +6,8 @@ use std::{ use among::Among; use crossbeam_skiplist::SkipSet; +use dbutils::checksum::{BuildChecksumer, Checksumer, Crc32}; pub use dbutils::equivalent::*; -use dbutils::{Checksumer, Crc32}; use rarena_allocator::{ either::Either, sync::Arena, Allocator, ArenaPosition, Memory, MmapOptions, OpenOptions, }; @@ -15,8 +15,7 @@ use rarena_allocator::{ use crate::{ arena_options, check, entry_size, error::{self, Error}, - split_lengths, Flags, Options, UnsafeCellChecksumer, CHECKSUM_SIZE, HEADER_SIZE, MAGIC_TEXT, - STATUS_SIZE, + split_lengths, Flags, Options, CHECKSUM_SIZE, HEADER_SIZE, MAGIC_TEXT, STATUS_SIZE, }; mod entry; @@ -189,9 +188,11 @@ where K::Ref<'a>: KeyRef<'a, K>, { fn compare(&self, p: &Pointer) -> cmp::Ordering { - let kr: K::Ref<'_> = TypeRef::from_slice(p.as_key_slice()); - let or: K::Ref<'_> = TypeRef::from_slice(self.as_key_slice()); - KeyRef::compare(&kr, &or).reverse() + unsafe { + let kr: K::Ref<'_> = TypeRef::from_slice(p.as_key_slice()); + let or: K::Ref<'_> = TypeRef::from_slice(self.as_key_slice()); + KeyRef::compare(&kr, &or).reverse() + } } } @@ -229,7 +230,7 @@ where Q: ?Sized + Ord + Comparable>, { fn compare(&self, p: &Pointer) -> cmp::Ordering { - let kr = TypeRef::from_slice(p.as_key_slice()); + let kr = unsafe { TypeRef::from_slice(p.as_key_slice()) }; KeyRef::compare(&kr, self.key).reverse() } } @@ -268,7 +269,7 @@ where Q: ?Sized + Ord + Comparable + Comparable>, { fn compare(&self, p: &Pointer) -> cmp::Ordering { - let kr = as TypeRef<'_>>::from_slice(p.as_key_slice()); + let kr = unsafe { as TypeRef<'_>>::from_slice(p.as_key_slice()) }; KeyRef::compare(&kr, self.key).reverse() } } @@ -387,7 +388,7 @@ where for<'a> ::Ref<'a>: KeyRef<'a, K>, V: Type + 'static, { - fn replay( + fn replay( arena: Arena, opts: &Options, ro: bool, @@ -450,7 +451,7 @@ where let cks = arena.get_u64_le(cursor + cks_offset).unwrap(); - if cks != checksumer.checksum(arena.get_bytes(cursor, cks_offset)) { + if cks != checksumer.checksum_one(arena.get_bytes(cursor, cks_offset)) { return Err(Error::corrupted()); } @@ -545,7 +546,7 @@ where pub struct GenericOrderWal { core: Arc>, opts: Options, - cks: UnsafeCellChecksumer, + cks: S, ro: bool, } @@ -854,7 +855,7 @@ impl GenericOrderWal { core: Arc::new(core), ro, opts, - cks: UnsafeCellChecksumer::new(cks), + cks, } } } @@ -864,7 +865,7 @@ where K: Type + Ord + 'static, for<'a> ::Ref<'a>: KeyRef<'a, K>, V: Type + 'static, - S: Checksumer, + S: BuildChecksumer, { /// Returns a write-ahead log backed by a file backed memory map with the given options and [`Checksumer`]. /// @@ -1121,7 +1122,7 @@ where K: Type + Ord + for<'a> Comparable> + 'static, for<'a> K::Ref<'a>: KeyRef<'a, K>, V: Type + 'static, - S: Checksumer, + S: BuildChecksumer, { /// Gets or insert the key value pair. #[inline] @@ -1274,7 +1275,7 @@ where K: Type + Ord + 'static, for<'a> K::Ref<'a>: KeyRef<'a, K>, V: Type + 'static, - S: Checksumer, + S: BuildChecksumer, { /// Inserts a key-value pair into the write-ahead log. #[inline] @@ -1486,8 +1487,8 @@ where // We allocate the buffer with the exact size, so it's safe to write to the buffer. 
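          // Record layout being written here:
          //   [flags: u8][kvlen: u64 varint][key bytes][value bytes][checksum: u64 LE]
          // The flag byte starts as `Flags::empty()` and is only OR-ed with
          // COMMITTED once the payload and checksum are in place, so a crash
          // mid-write leaves the entry marked uncommitted.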
let flag = Flags::COMMITTED.bits(); - self.cks.reset(); - self.cks.update(&[flag]); + let mut cks = self.cks.build_checksumer(); + cks.update(&[flag]); buf.put_u8_unchecked(Flags::empty().bits()); let written = buf.put_u64_varint_unchecked(kvlen); @@ -1507,8 +1508,8 @@ where val.encode(value_buf).map_err(Among::Middle)?; let cks = { - self.cks.update(&buf[1..]); - self.cks.digest() + cks.update(&buf[1..]); + cks.digest() }; buf.put_u64_le_unchecked(cks); @@ -1548,3 +1549,10 @@ where fn dummy_path_builder(p: impl AsRef) -> Result { Ok(p.as_ref().to_path_buf()) } + +// #[inline] +// fn encoded_batch_len(batch: &B) -> u64 { +// let (len, encoded_len) = batch.meta(); + +// STATUS_SIZE + +// } diff --git a/src/swmr/generic/entry.rs b/src/swmr/generic/entry.rs index c40d638e..dd98c62b 100644 --- a/src/swmr/generic/entry.rs +++ b/src/swmr/generic/entry.rs @@ -1,10 +1,10 @@ -use crossbeam_skiplist::set::Entry; +use crossbeam_skiplist::set::Entry as SetEntry; use super::{Pointer, Type, TypeRef}; /// The reference to an entry in the [`GenericOrderWal`](super::GenericOrderWal). pub struct EntryRef<'a, K, V> { - ent: Entry<'a, Pointer>, + ent: SetEntry<'a, Pointer>, } impl<'a, K, V> core::fmt::Debug for EntryRef<'a, K, V> @@ -33,7 +33,7 @@ impl Clone for EntryRef<'_, K, V> { impl<'a, K, V> EntryRef<'a, K, V> { #[inline] - pub(super) fn new(ent: Entry<'a, Pointer>) -> Self { + pub(super) fn new(ent: SetEntry<'a, Pointer>) -> Self { Self { ent } } } @@ -47,13 +47,13 @@ where #[inline] pub fn key(&self) -> K::Ref<'a> { let p = self.ent.value(); - TypeRef::from_slice(p.as_key_slice()) + unsafe { TypeRef::from_slice(p.as_key_slice()) } } /// Returns the value of the entry. #[inline] pub fn value(&self) -> V::Ref<'a> { let p = self.ent.value(); - TypeRef::from_slice(p.as_value_slice()) + unsafe { TypeRef::from_slice(p.as_value_slice()) } } } diff --git a/src/swmr/generic/tests.rs b/src/swmr/generic/tests.rs index 82d640f0..321ea44d 100644 --- a/src/swmr/generic/tests.rs +++ b/src/swmr/generic/tests.rs @@ -8,6 +8,13 @@ use super::*; const MB: u32 = 1024 * 1024; +const fn __static_assertion() {} + +const _: () = { + __static_assertion::>(); + __static_assertion::>(); +}; + #[cfg(all(test, any(test_swmr_generic_constructor, all_tests)))] mod constructor; diff --git a/src/swmr/generic/traits.rs b/src/swmr/generic/traits.rs index 2b34ffd4..a968f85e 100644 --- a/src/swmr/generic/traits.rs +++ b/src/swmr/generic/traits.rs @@ -1,10 +1,30 @@ -use core::cmp; +use core::{cmp, hash::Hash}; use among::Among; -use crossbeam_skiplist::Comparable; +use crossbeam_skiplist::{Comparable, Equivalent}; -mod impls; -pub use impls::*; +mod type_impls; +pub use type_impls::*; + +mod container_impls; +pub use container_impls::*; + +/// The container for entries in the [`GenericBatch`]. +pub trait GenericBatch { + /// The key type. + type Key; + + /// The value type. + type Value; + + /// The iterator type. + type Iter<'a>: Iterator + where + Self: 'a; + + /// Returns an iterator over the keys and values. + fn iter(&self) -> Self::Iter<'_>; +} /// The type trait for limiting the types that can be used as keys and values in the [`GenericOrderWal`]. 
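///
/// For example, a fixed-width key could implement the `Type`/`TypeRef` pair
/// along these lines (an illustrative sketch, not part of this patch):
///
/// ```ignore
/// struct Id(u64);
///
/// impl Type for Id {
///   type Ref<'a> = Id;
///   type Error = ();
///
///   fn encoded_len(&self) -> usize {
///     8
///   }
///
///   fn encode(&self, buf: &mut [u8]) -> Result<(), Self::Error> {
///     buf[..8].copy_from_slice(&self.0.to_le_bytes());
///     Ok(())
///   }
/// }
///
/// impl TypeRef<'_> for Id {
///   // SAFETY contract: `src` is exactly the bytes produced by `encode`.
///   unsafe fn from_slice(src: &[u8]) -> Self {
///     Id(u64::from_le_bytes(src[..8].try_into().unwrap()))
///   }
/// }
/// ```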
/// @@ -24,6 +44,21 @@ pub trait Type { fn encode(&self, buf: &mut [u8]) -> Result<(), Self::Error>; } +impl Type for &T { + type Ref<'a> = T::Ref<'a>; + type Error = T::Error; + + #[inline] + fn encoded_len(&self) -> usize { + T::encoded_len(*self) + } + + #[inline] + fn encode(&self, buf: &mut [u8]) -> Result<(), Self::Error> { + T::encode(*self, buf) + } +} + pub(super) trait InsertAmongExt { fn encoded_len(&self) -> usize; fn encode(&self, buf: &mut [u8]) -> Result<(), T::Error>; @@ -56,7 +91,10 @@ impl InsertAmongExt for Among { pub trait TypeRef<'a> { /// Creates a reference type from a binary slice, when using it with [`GenericOrderWal`], /// you can assume that the slice is the same as the one returned by [`encode`](Type::encode). - fn from_slice(src: &'a [u8]) -> Self; + /// + /// # Safety + /// - the `src` must the same as the one returned by [`encode`](Type::encode). + unsafe fn from_slice(src: &'a [u8]) -> Self; } /// The key reference trait for comparing `K` in the [`GenericOrderWal`]. diff --git a/src/swmr/generic/traits/container_impls.rs b/src/swmr/generic/traits/container_impls.rs new file mode 100644 index 00000000..49bf8159 --- /dev/null +++ b/src/swmr/generic/traits/container_impls.rs @@ -0,0 +1,99 @@ +use core::iter::FusedIterator; + +/// An iterator over a slice of key value tuples. +pub struct EntrySliceIter<'a, K, V>(core::slice::Iter<'a, (K, V)>); + +impl<'a, K, V> Iterator for EntrySliceIter<'a, K, V> { + type Item = (&'a K, &'a V); + + fn next(&mut self) -> Option { + self.0.next().map(|(k, v)| (k, v)) + } + + fn size_hint(&self) -> (usize, Option) { + self.0.size_hint() + } +} + +impl DoubleEndedIterator for EntrySliceIter<'_, K, V> { + fn next_back(&mut self) -> Option { + self.0.next_back().map(|(k, v)| (k, v)) + } +} + +impl FusedIterator for EntrySliceIter<'_, K, V> {} + +macro_rules! impl_for_vec { + ($( $(#[cfg($cfg:meta)])? $ty:ty $(:$N:ident)? $( => $as_ref:ident)?),+ $(,)?) => { + $( + $(#[cfg($cfg)])? 
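      // (These blanket impls hand `GenericBatch` to the common owned and
      // shared containers of `(K, V)` pairs; the `smallvec`/`smallvec-wrapper`
      // variants are feature-gated, matching the new optional dependency
      // added in Cargo.toml.)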
+ const _: () = { + impl super::GenericBatch for $ty { + type Key = K; + + type Value = V; + + type Iter<'a> = EntrySliceIter<'a, Self::Key, Self::Value> where Self: 'a; + + fn iter(&self) -> Self::Iter<'_> { + EntrySliceIter(IntoIterator::into_iter(self $(.$as_ref())?)) + } + } + }; + )+ + }; +} + +impl_for_vec!( + Vec<(K, V)>, + Box<[(K, V)]>, + &[(K, V)] => as_ref, + std::sync::Arc<[(K, V)]> => as_ref, + std::rc::Rc<[(K, V)]> => as_ref, + #[cfg(feature = "smallvec-wrapper")] + ::smallvec_wrapper::OneOrMore<(K, V)>, + #[cfg(feature = "smallvec-wrapper")] + ::smallvec_wrapper::TinyVec<(K, V)>, + #[cfg(feature = "smallvec-wrapper")] + ::smallvec_wrapper::TriVec<(K, V)>, + #[cfg(feature = "smallvec-wrapper")] + ::smallvec_wrapper::SmallVec<(K, V)>, + #[cfg(feature = "smallvec-wrapper")] + ::smallvec_wrapper::MediumVec<(K, V)>, + #[cfg(feature = "smallvec-wrapper")] + ::smallvec_wrapper::LargeVec<(K, V)>, + #[cfg(feature = "smallvec-wrapper")] + ::smallvec_wrapper::XLargeVec<(K, V)>, + #[cfg(feature = "smallvec-wrapper")] + ::smallvec_wrapper::XXLargeVec<(K, V)>, + #[cfg(feature = "smallvec-wrapper")] + ::smallvec_wrapper::XXXLargeVec<(K, V)>, + #[cfg(feature = "smallvec")] + ::smallvec::SmallVec<[(K, V); N]>: N, +); + +impl super::GenericBatch for std::collections::HashMap { + type Key = K; + + type Value = V; + + type Iter<'a> = std::collections::hash_map::Iter<'a, K, V> where + Self: 'a; + + fn iter(&self) -> Self::Iter<'_> { + self.iter() + } +} + +impl super::GenericBatch for std::collections::BTreeMap { + type Key = K; + + type Value = V; + + type Iter<'a> = std::collections::btree_map::Iter<'a, K, V> where + Self: 'a; + + fn iter(&self) -> Self::Iter<'_> { + self.iter() + } +} diff --git a/src/swmr/generic/traits/impls.rs b/src/swmr/generic/traits/type_impls.rs similarity index 75% rename from src/swmr/generic/traits/impls.rs rename to src/swmr/generic/traits/type_impls.rs index fe5f0013..37813140 100644 --- a/src/swmr/generic/traits/impls.rs +++ b/src/swmr/generic/traits/type_impls.rs @@ -21,7 +21,7 @@ impl Type for () { } impl TypeRef<'_> for () { - fn from_slice(_buf: &[u8]) -> Self {} + unsafe fn from_slice(_buf: &[u8]) -> Self {} } impl Type for [u8; N] { @@ -41,7 +41,7 @@ impl Type for [u8; N] { impl TypeRef<'_> for [u8; N] { #[inline] - fn from_slice(src: &'_ [u8]) -> Self { + unsafe fn from_slice(src: &'_ [u8]) -> Self { let mut this = [0; N]; this.copy_from_slice(src); this @@ -70,7 +70,7 @@ macro_rules! 
impl_numbers { impl TypeRef<'_> for $ty { #[inline] - fn from_slice(buf: &[u8]) -> Self { + unsafe fn from_slice(buf: &[u8]) -> Self { const SIZE: usize = core::mem::size_of::<$ty>(); $ty::from_le_bytes(buf[..SIZE].try_into().unwrap()) @@ -121,7 +121,7 @@ impl Type for f32 { impl TypeRef<'_> for f32 { #[inline] - fn from_slice(buf: &[u8]) -> Self { + unsafe fn from_slice(buf: &[u8]) -> Self { const SIZE: usize = core::mem::size_of::(); f32::from_le_bytes(buf[..SIZE].try_into().unwrap()) @@ -148,9 +148,57 @@ impl Type for f64 { impl TypeRef<'_> for f64 { #[inline] - fn from_slice(buf: &[u8]) -> Self { + unsafe fn from_slice(buf: &[u8]) -> Self { const SIZE: usize = core::mem::size_of::(); f64::from_le_bytes(buf[..SIZE].try_into().unwrap()) } } + +impl Type for bool { + type Ref<'a> = Self; + + type Error = (); + + #[inline] + fn encoded_len(&self) -> usize { + 1 + } + + #[inline] + fn encode(&self, buf: &mut [u8]) -> Result<(), Self::Error> { + buf[0] = *self as u8; + Ok(()) + } +} + +impl TypeRef<'_> for bool { + #[inline] + unsafe fn from_slice(buf: &[u8]) -> Self { + buf[0] != 0 + } +} + +impl Type for char { + type Ref<'a> = Self; + + type Error = (); + + #[inline] + fn encoded_len(&self) -> usize { + self.len_utf8() + } + + #[inline] + fn encode(&self, buf: &mut [u8]) -> Result<(), Self::Error> { + self.encode_utf8(buf); + Ok(()) + } +} + +impl TypeRef<'_> for char { + #[inline] + unsafe fn from_slice(buf: &[u8]) -> Self { + core::str::from_utf8_unchecked(buf).chars().next().unwrap() + } +} diff --git a/src/swmr/generic/traits/impls/bytes.rs b/src/swmr/generic/traits/type_impls/bytes.rs similarity index 86% rename from src/swmr/generic/traits/impls/bytes.rs rename to src/swmr/generic/traits/type_impls/bytes.rs index 779c3e2e..4461ed08 100644 --- a/src/swmr/generic/traits/impls/bytes.rs +++ b/src/swmr/generic/traits/type_impls/bytes.rs @@ -69,7 +69,7 @@ macro_rules! impls { } impl<'a> TypeRef<'a> for &'a [u8] { - fn from_slice(src: &'a [u8]) -> Self { + unsafe fn from_slice(src: &'a [u8]) -> Self { src } } @@ -97,7 +97,7 @@ impl<'a> From> for &'a [u8] { } impl<'a> TypeRef<'a> for SliceRef<'a> { - fn from_slice(src: &'a [u8]) -> Self { + unsafe fn from_slice(src: &'a [u8]) -> Self { Self(src) } } @@ -171,6 +171,24 @@ impls! 
{ Arc<[u8]>, #[cfg(feature = "bytes")] ::bytes::Bytes, + #[cfg(feature = "smallvec-wrapper")] + ::smallvec_wrapper::OneOrMore, + #[cfg(feature = "smallvec-wrapper")] + ::smallvec_wrapper::TinyVec, + #[cfg(feature = "smallvec-wrapper")] + ::smallvec_wrapper::TriVec, + #[cfg(feature = "smallvec-wrapper")] + ::smallvec_wrapper::SmallVec, + #[cfg(feature = "smallvec-wrapper")] + ::smallvec_wrapper::MediumVec, + #[cfg(feature = "smallvec-wrapper")] + ::smallvec_wrapper::LargeVec, + #[cfg(feature = "smallvec-wrapper")] + ::smallvec_wrapper::XLargeVec, + #[cfg(feature = "smallvec-wrapper")] + ::smallvec_wrapper::XXLargeVec, + #[cfg(feature = "smallvec-wrapper")] + ::smallvec_wrapper::XXXLargeVec, } #[cfg(feature = "smallvec")] diff --git a/src/swmr/generic/traits/impls/net.rs b/src/swmr/generic/traits/type_impls/net.rs similarity index 81% rename from src/swmr/generic/traits/impls/net.rs rename to src/swmr/generic/traits/type_impls/net.rs index 30359db0..95c07286 100644 --- a/src/swmr/generic/traits/impls/net.rs +++ b/src/swmr/generic/traits/type_impls/net.rs @@ -22,7 +22,7 @@ impl Type for Ipv4Addr { impl TypeRef<'_> for Ipv4Addr { #[inline] - fn from_slice(buf: &[u8]) -> Self { + unsafe fn from_slice(buf: &[u8]) -> Self { let octets = <[u8; 4]>::from_slice(&buf[..4]); Ipv4Addr::from(octets) } @@ -37,9 +37,11 @@ impl KeyRef<'_, Ipv4Addr> for Ipv4Addr { } fn compare_binary(a: &[u8], b: &[u8]) -> cmp::Ordering { - let a = ::from_slice(a); - let b = ::from_slice(b); - a.cmp(&b) + unsafe { + let a = ::from_slice(a); + let b = ::from_slice(b); + a.cmp(&b) + } } } @@ -62,7 +64,7 @@ impl Type for Ipv6Addr { impl TypeRef<'_> for Ipv6Addr { #[inline] - fn from_slice(buf: &[u8]) -> Self { + unsafe fn from_slice(buf: &[u8]) -> Self { let octets = <[u8; 16]>::from_slice(&buf[..16]); Ipv6Addr::from(octets) } @@ -77,9 +79,11 @@ impl KeyRef<'_, Ipv6Addr> for Ipv6Addr { } fn compare_binary(a: &[u8], b: &[u8]) -> cmp::Ordering { - let a = ::from_slice(a); - let b = ::from_slice(b); - a.cmp(&b) + unsafe { + let a = ::from_slice(a); + let b = ::from_slice(b); + a.cmp(&b) + } } } @@ -103,7 +107,7 @@ impl Type for SocketAddrV4 { impl TypeRef<'_> for SocketAddrV4 { #[inline] - fn from_slice(buf: &[u8]) -> Self { + unsafe fn from_slice(buf: &[u8]) -> Self { let octets = <[u8; 4]>::from_slice(&buf[..4]); let port = u16::from_le_bytes(buf[4..6].try_into().unwrap()); SocketAddrV4::new(Ipv4Addr::from(octets), port) @@ -119,9 +123,11 @@ impl KeyRef<'_, SocketAddrV4> for SocketAddrV4 { } fn compare_binary(a: &[u8], b: &[u8]) -> cmp::Ordering { - let a = ::from_slice(a); - let b = ::from_slice(b); - a.cmp(&b) + unsafe { + let a = ::from_slice(a); + let b = ::from_slice(b); + a.cmp(&b) + } } } @@ -145,7 +151,7 @@ impl Type for SocketAddrV6 { impl TypeRef<'_> for SocketAddrV6 { #[inline] - fn from_slice(buf: &[u8]) -> Self { + unsafe fn from_slice(buf: &[u8]) -> Self { let octets = <[u8; 16]>::from_slice(&buf[..16]); let port = u16::from_le_bytes(buf[16..18].try_into().unwrap()); SocketAddrV6::new(Ipv6Addr::from(octets), port, 0, 0) @@ -161,8 +167,10 @@ impl KeyRef<'_, SocketAddrV6> for SocketAddrV6 { } fn compare_binary(a: &[u8], b: &[u8]) -> cmp::Ordering { - let a = ::from_slice(a); - let b = ::from_slice(b); - a.cmp(&b) + unsafe { + let a = ::from_slice(a); + let b = ::from_slice(b); + a.cmp(&b) + } } } diff --git a/src/swmr/generic/traits/impls/string.rs b/src/swmr/generic/traits/type_impls/string.rs similarity index 97% rename from src/swmr/generic/traits/impls/string.rs rename to 
src/swmr/generic/traits/type_impls/string.rs index 1d43ea80..7ab8c4ce 100644 --- a/src/swmr/generic/traits/impls/string.rs +++ b/src/swmr/generic/traits/type_impls/string.rs @@ -70,7 +70,7 @@ macro_rules! impls { } impl<'a> TypeRef<'a> for &'a str { - fn from_slice(src: &'a [u8]) -> Self { + unsafe fn from_slice(src: &'a [u8]) -> Self { core::str::from_utf8(src).unwrap() } } @@ -92,7 +92,7 @@ impl<'a> From> for &'a str { } impl<'a> TypeRef<'a> for Str<'a> { - fn from_slice(src: &'a [u8]) -> Self { + unsafe fn from_slice(src: &'a [u8]) -> Self { Self(core::str::from_utf8(src).unwrap()) } } diff --git a/src/swmr/wal.rs b/src/swmr/wal.rs index c6092321..4d4a2495 100644 --- a/src/swmr/wal.rs +++ b/src/swmr/wal.rs @@ -1,6 +1,5 @@ use super::super::*; -use among::Among; use either::Either; use error::Error; use wal::{ @@ -8,7 +7,6 @@ use wal::{ ImmutableWal, }; -use core::ptr::NonNull; use rarena_allocator::sync::Arena; use std::sync::Arc; @@ -36,7 +34,7 @@ pub struct OrderWalCore { map: SkipSet>, opts: Options, cmp: C, - cks: UnsafeCellChecksumer, + cks: S, } impl OrderWalCore { @@ -60,19 +58,13 @@ impl WalCore for OrderWalCore { type Base = SkipSet>; #[inline] - fn construct( - arena: Arena, - set: SkipSet>, - opts: Options, - cmp: C, - checksumer: S, - ) -> Self { + fn construct(arena: Arena, set: SkipSet>, opts: Options, cmp: C, cks: S) -> Self { Self { arena, map: set, cmp, opts, - cks: UnsafeCellChecksumer::new(checksumer), + cks, } } } @@ -99,7 +91,6 @@ impl WalCore for OrderWalCore { // ``` pub struct OrderWal { core: Arc>, - ro: bool, _s: PhantomData, } @@ -111,10 +102,14 @@ where type Core = OrderWalCore; #[inline] - fn from_core(core: Self::Core, ro: bool) -> Self { + fn allocator(&self) -> &Self::Allocator { + &self.core.arena + } + + #[inline] + fn from_core(core: Self::Core) -> Self { Self { core: Arc::new(core), - ro, _s: PhantomData, } } @@ -124,82 +119,23 @@ impl Sealed for OrderWal where C: Send + 'static, { - fn insert_with_in( - &mut self, - kb: KeyBuilder) -> Result<(), KE>>, - vb: ValueBuilder) -> Result<(), VE>>, - ) -> Result<(), Among> + fn hasher(&self) -> &S { + &self.core.cks + } + + fn options(&self) -> &Options { + &self.core.opts + } + + fn comparator(&self) -> &C { + &self.core.cmp + } + + fn insert_pointer(&self, ptr: Pointer) where - C: Comparator + CheapClone, - S: Checksumer, + C: Comparator, { - let (klen, kf) = kb.into_components(); - let (vlen, vf) = vb.into_components(); - let (len_size, kvlen, elen) = entry_size(klen, vlen); - let klen = klen as usize; - let vlen = vlen as usize; - let buf = self.core.arena.alloc_bytes(elen); - - match buf { - Err(e) => Err(Among::Right(Error::from_insufficient_space(e))), - Ok(mut buf) => { - unsafe { - // We allocate the buffer with the exact size, so it's safe to write to the buffer. 
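        // (The body deleted here is not lost: it moves, nearly verbatim, into
        // the default `insert_with_in` implementation on `wal::sealed::Sealed`
        // at the bottom of this patch; the concrete WAL types now supply only
        // `hasher`, `options`, `comparator`, and the `insert_pointer` hook.)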
- let flag = Flags::COMMITTED.bits(); - - self.core.cks.reset(); - self.core.cks.update(&[flag]); - - buf.put_u8_unchecked(Flags::empty().bits()); - let written = buf.put_u64_varint_unchecked(kvlen); - debug_assert_eq!( - written, len_size, - "the precalculated size should be equal to the written size" - ); - - let ko = STATUS_SIZE + written; - buf.set_len(ko + klen + vlen); - - kf(&mut VacantBuffer::new( - klen, - NonNull::new_unchecked(buf.as_mut_ptr().add(ko)), - )) - .map_err(Among::Left)?; - - let vo = ko + klen; - vf(&mut VacantBuffer::new( - vlen, - NonNull::new_unchecked(buf.as_mut_ptr().add(vo)), - )) - .map_err(Among::Middle)?; - - let cks = { - self.core.cks.update(&buf[1..]); - self.core.cks.digest() - }; - buf.put_u64_le_unchecked(cks); - - // commit the entry - buf[0] |= Flags::COMMITTED.bits(); - - if self.core.opts.sync_on_write() && self.core.arena.is_ondisk() { - self - .core - .arena - .flush_range(buf.offset(), elen as usize) - .map_err(|e| Among::Right(e.into()))?; - } - buf.detach(); - self.core.map.insert(Pointer::new( - klen, - vlen, - buf.as_ptr().add(ko), - self.core.cmp.cheap_clone(), - )); - Ok(()) - } - } - } + self.core.map.insert(ptr); } } @@ -253,15 +189,6 @@ where Self: 'a, C: Comparator; - #[inline] - unsafe fn reserved_slice(&self) -> &[u8] { - if self.core.opts.reserved() == 0 { - return &[]; - } - - &self.core.arena.reserved_slice()[HEADER_SIZE..] - } - #[inline] fn path(&self) -> Option<&std::path::Path> { self.core.arena.path().map(|p| p.as_ref().as_path()) @@ -282,6 +209,16 @@ where self.core.opts.maximum_value_size() } + #[inline] + fn remaining(&self) -> u32 { + self.core.arena.remaining() as u32 + } + + #[inline] + fn options(&self) -> &Options { + &self.core.opts + } + #[inline] fn contains_key(&self, key: &Q) -> bool where @@ -390,43 +327,11 @@ where { type Reader = OrderWalReader; - #[inline] - fn read_only(&self) -> bool { - self.ro - } - #[inline] fn reader(&self) -> Self::Reader { OrderWalReader::new(self.core.clone()) } - #[inline] - fn flush(&self) -> Result<(), Error> { - if self.ro { - return Err(error::Error::read_only()); - } - - self.core.arena.flush().map_err(Into::into) - } - - #[inline] - fn flush_async(&self) -> Result<(), Error> { - if self.ro { - return Err(error::Error::read_only()); - } - - self.core.arena.flush_async().map_err(Into::into) - } - - #[inline] - unsafe fn reserved_slice_mut(&mut self) -> &mut [u8] { - if self.core.opts.reserved() == 0 { - return &mut []; - } - - &mut self.core.arena.reserved_slice_mut()[HEADER_SIZE..] 
- } - fn get_or_insert_with_value_builder( &mut self, key: &[u8], @@ -434,7 +339,7 @@ where ) -> Result, Either> where C: Comparator + CheapClone, - S: Checksumer, + S: BuildChecksumer, { self .check( diff --git a/src/swmr/wal/reader.rs b/src/swmr/wal/reader.rs index 6164cbc7..c07d2bd9 100644 --- a/src/swmr/wal/reader.rs +++ b/src/swmr/wal/reader.rs @@ -9,7 +9,6 @@ impl OrderWalReader { pub(super) fn new(wal: Arc>) -> Self { Self(OrderWal { core: wal.clone(), - ro: true, _s: PhantomData, }) } @@ -20,10 +19,14 @@ impl Constructor for OrderWalReader { type Core = OrderWalCore; - fn from_core(core: Self::Core, _ro: bool) -> Self { + #[inline] + fn allocator(&self) -> &Self::Allocator { + self.0.allocator() + } + + fn from_core(core: Self::Core) -> Self { Self(OrderWal { core: Arc::new(core), - ro: true, _s: PhantomData, }) } @@ -58,11 +61,6 @@ impl ImmutableWal for OrderWalReader { Self: 'a, C: Comparator; - #[inline] - unsafe fn reserved_slice(&self) -> &[u8] { - self.0.reserved_slice() - } - #[inline] fn path(&self) -> Option<&std::path::Path> { self.0.path() @@ -74,13 +72,8 @@ impl ImmutableWal for OrderWalReader { } #[inline] - fn maximum_key_size(&self) -> u32 { - self.0.maximum_key_size() - } - - #[inline] - fn maximum_value_size(&self) -> u32 { - self.0.maximum_value_size() + fn options(&self) -> &Options { + ImmutableWal::options(&self.0) } #[inline] diff --git a/src/unsync.rs b/src/unsync.rs index aa372fc0..64a5d8bc 100644 --- a/src/unsync.rs +++ b/src/unsync.rs @@ -1,9 +1,8 @@ -use core::{cell::UnsafeCell, ops::RangeBounds, ptr::NonNull}; +use core::{cell::UnsafeCell, ops::RangeBounds}; use std::{collections::BTreeSet, rc::Rc}; use super::*; -use among::Among; use either::Either; use error::Error; use rarena_allocator::unsync::Arena; @@ -51,7 +50,6 @@ mod tests; // ``` pub struct OrderWal { core: Rc>>, - ro: bool, _s: PhantomData, } @@ -63,10 +61,14 @@ where type Core = OrderWalCore; #[inline] - fn from_core(core: Self::Core, ro: bool) -> Self { + fn allocator(&self) -> &Self::Allocator { + &self.core().arena + } + + #[inline] + fn from_core(core: Self::Core) -> Self { Self { core: Rc::new(UnsafeCell::new(core)), - ro, _s: PhantomData, } } @@ -93,92 +95,34 @@ impl OrderWal { fn core(&self) -> &OrderWalCore { unsafe { &*self.core.get() } } - - #[inline] - fn core_mut(&mut self) -> &mut OrderWalCore { - unsafe { &mut *self.core.get() } - } } impl Sealed for OrderWal where C: 'static, { - fn insert_with_in( - &mut self, - kb: KeyBuilder) -> Result<(), KE>>, - vb: ValueBuilder) -> Result<(), VE>>, - ) -> Result<(), Among> + #[inline] + fn hasher(&self) -> &S { + &self.core().cks + } + + #[inline] + fn options(&self) -> &Options { + &self.core().opts + } + + #[inline] + fn comparator(&self) -> &C { + &self.core().cmp + } + + #[inline] + fn insert_pointer(&self, ptr: Pointer) where - C: Comparator + CheapClone, - S: Checksumer, + C: Comparator, { - let (klen, kf) = kb.into_components(); - let (vlen, vf) = vb.into_components(); - let (len_size, kvlen, elen) = entry_size(klen, vlen); - let klen = klen as usize; - let vlen = vlen as usize; - let core = self.core_mut(); - let buf = core.arena.alloc_bytes(elen); - - match buf { - Err(e) => Err(Among::Right(Error::from_insufficient_space(e))), - Ok(mut buf) => { - unsafe { - // We allocate the buffer with the exact size, so it's safe to write to the buffer. 
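        // (Same refactor as swmr/wal.rs above: the deleted body is replaced by
        // the shared `Sealed::insert_with_in` default. The unsync flavor keeps
        // only the `insert_pointer` hook, which mutates through
        // `Rc<UnsafeCell<_>>`; that is sound here because this WAL type is
        // single-threaded by construction.)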
- let flag = Flags::COMMITTED.bits(); - - core.cks.reset(); - core.cks.update(&[flag]); - - buf.put_u8_unchecked(Flags::empty().bits()); - let written = buf.put_u64_varint_unchecked(kvlen); - debug_assert_eq!( - written, len_size, - "the precalculated size should be equal to the written size" - ); - - let ko = STATUS_SIZE + written; - buf.set_len(ko + klen + vlen); - - kf(&mut VacantBuffer::new( - klen, - NonNull::new_unchecked(buf.as_mut_ptr().add(ko)), - )) - .map_err(Among::Left)?; - - let vo = ko + klen; - vf(&mut VacantBuffer::new( - vlen, - NonNull::new_unchecked(buf.as_mut_ptr().add(vo)), - )) - .map_err(Among::Middle)?; - - let cks = { - core.cks.update(&buf[1..]); - core.cks.digest() - }; - buf.put_u64_le_unchecked(cks); - - // commit the entry - buf[0] |= Flags::COMMITTED.bits(); - - if core.opts.sync_on_write() && core.arena.is_ondisk() { - core - .arena - .flush_range(buf.offset(), elen as usize) - .map_err(|e| Among::Right(e.into()))?; - } - buf.detach(); - core.map.insert(Pointer::new( - klen, - vlen, - buf.as_ptr().add(ko), - core.cmp.cheap_clone(), - )); - Ok(()) - } - } + unsafe { + (*self.core.get()).map.insert(ptr); } } } @@ -216,13 +160,9 @@ where Self: 'a, C: Comparator; - unsafe fn reserved_slice(&self) -> &[u8] { - let core = self.core(); - if core.opts.reserved() == 0 { - return &[]; - } - - &core.arena.reserved_slice()[HEADER_SIZE..] + #[inline] + fn options(&self) -> &Options { + &self.core().opts } #[inline] @@ -252,6 +192,11 @@ where self.core().opts.maximum_value_size() } + #[inline] + fn remaining(&self) -> u32 { + self.core().arena.remaining() as u32 + } + #[inline] fn contains_key(&self, key: &Q) -> bool where @@ -360,48 +305,14 @@ where { type Reader = Self; - #[inline] - fn read_only(&self) -> bool { - self.ro - } - #[inline] fn reader(&self) -> Self::Reader { Self { core: self.core.clone(), - ro: true, _s: PhantomData, } } - #[inline] - unsafe fn reserved_slice_mut(&mut self) -> &mut [u8] { - let core = self.core_mut(); - if core.opts.reserved() == 0 { - return &mut []; - } - - &mut core.arena.reserved_slice_mut()[HEADER_SIZE..] - } - - #[inline] - fn flush(&self) -> Result<(), Error> { - if self.ro { - return Err(error::Error::read_only()); - } - - self.core().arena.flush().map_err(Into::into) - } - - #[inline] - fn flush_async(&self) -> Result<(), Error> { - if self.ro { - return Err(error::Error::read_only()); - } - - self.core().arena.flush_async().map_err(Into::into) - } - fn get_or_insert_with_value_builder( &mut self, key: &[u8], @@ -409,7 +320,7 @@ where ) -> Result, Either> where C: Comparator + CheapClone, - S: Checksumer, + S: BuildChecksumer, { self .check( diff --git a/src/wal.rs b/src/wal.rs index 0e112163..fda001a5 100644 --- a/src/wal.rs +++ b/src/wal.rs @@ -4,9 +4,23 @@ use super::*; mod builder; pub use builder::*; +use sealed::Base; + +mod impls; pub(crate) mod sealed; +/// A batch of keys and values that can be inserted into the [`Wal`]. +pub trait Batch { + /// The iterator type. + type Iter<'a>: Iterator + where + Self: 'a; + + /// Returns an iterator over the keys and values. + fn iter(&self) -> Self::Iter<'_>; +} + pub trait ImmutableWal: sealed::Constructor { /// The iterator type. type Iter<'a>: Iterator + DoubleEndedIterator @@ -58,7 +72,19 @@ pub trait ImmutableWal: sealed::Constructor { /// # Safety /// - The writer must ensure that the returned slice is not modified. /// - This method is not thread-safe, so be careful when using it. 
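  ///
  /// A typical use is stashing a small application header in the reserved
  /// area (an illustrative sketch, not from this patch; it assumes the WAL
  /// was opened with a nonzero `reserved` option):
  ///
  /// ```ignore
  /// unsafe {
  ///   wal.reserved_slice_mut()[..8].copy_from_slice(&42u64.to_le_bytes());
  ///   let stamp = u64::from_le_bytes(wal.reserved_slice()[..8].try_into().unwrap());
  ///   assert_eq!(stamp, 42);
  /// }
  /// ```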
- unsafe fn reserved_slice(&self) -> &[u8]; + unsafe fn reserved_slice<'a>(&'a self) -> &'a [u8] + where + Self::Allocator: 'a, + { + let reserved = self.options().reserved(); + if reserved == 0 { + return &[]; + } + + let allocator = self.allocator(); + let reserved_slice = allocator.reserved_slice(); + &reserved_slice[HEADER_SIZE..] + } /// Returns the path of the WAL if it is backed by a file. fn path(&self) -> Option<&std::path::Path>; @@ -67,15 +93,37 @@ pub trait ImmutableWal: sealed::Constructor { fn len(&self) -> usize; /// Returns `true` if the WAL is empty. + #[inline] fn is_empty(&self) -> bool { self.len() == 0 } /// Returns the maximum key size allowed in the WAL. - fn maximum_key_size(&self) -> u32; + #[inline] + fn maximum_key_size(&self) -> u32 { + self.options().maximum_key_size() + } /// Returns the maximum value size allowed in the WAL. - fn maximum_value_size(&self) -> u32; + #[inline] + fn maximum_value_size(&self) -> u32 { + self.options().maximum_value_size() + } + + /// Returns the remaining capacity of the WAL. + #[inline] + fn remaining(&self) -> u32 { + self.allocator().remaining() as u32 + } + + /// Returns the capacity of the WAL. + #[inline] + fn capacity(&self) -> u32 { + self.options().capacity() + } + + /// Returns the options used to create this WAL instance. + fn options(&self) -> &Options; /// Returns `true` if the WAL contains the specified key. fn contains_key(&self, key: &Q) -> bool @@ -161,8 +209,7 @@ pub trait Wal: sealed::Sealed + ImmutableWal { arena_options(opts.reserved()).with_capacity(opts.capacity()), ) .map_err(Error::from_insufficient_space)?; - >::new_in(arena, opts, cmp, cks) - .map(|core| Self::from_core(core, false)) + >::new_in(arena, opts, cmp, cks).map(Self::from_core) } /// Creates a new in-memory write-ahead log but backed by an anonymous mmap. @@ -180,8 +227,7 @@ pub trait Wal: sealed::Sealed + ImmutableWal { ::map_anon(arena_options(opts.reserved()), mmap_opts) .map_err(Into::into) .and_then(|arena| { - >::new_in(arena, opts, cmp, cks) - .map(|core| Self::from_core(core, false)) + >::new_in(arena, opts, cmp, cks).map(Self::from_core) }) } @@ -197,7 +243,7 @@ pub trait Wal: sealed::Sealed + ImmutableWal { unsafe fn map
<P>
(path: P, b: Builder) -> Result where C: Comparator + CheapClone, - S: Checksumer, + S: BuildChecksumer, P: AsRef, { >::map_with_path_builder::<_, ()>(|| Ok(path.as_ref().to_path_buf()), b) @@ -220,7 +266,7 @@ pub trait Wal: sealed::Sealed + ImmutableWal { where PB: FnOnce() -> Result, C: Comparator + CheapClone, - S: Checksumer, + S: BuildChecksumer, { let open_options = OpenOptions::default().read(true); @@ -235,7 +281,7 @@ pub trait Wal: sealed::Sealed + ImmutableWal { .map_err(|e| e.map_right(Into::into)) .and_then(|arena| { >::replay(arena, Options::new(), true, cmp, cks) - .map(|core| >::from_core(core, true)) + .map(>::from_core) .map_err(Either::Right) }) } @@ -252,7 +298,7 @@ pub trait Wal: sealed::Sealed + ImmutableWal { unsafe fn map_mut
<P>
(path: P, b: Builder, open_opts: OpenOptions) -> Result where C: Comparator + CheapClone, - S: Checksumer, + S: BuildChecksumer, P: AsRef, { >::map_mut_with_path_builder::<_, ()>( @@ -280,7 +326,7 @@ pub trait Wal: sealed::Sealed + ImmutableWal { where PB: FnOnce() -> Result, C: Comparator + CheapClone, - S: Checksumer, + S: BuildChecksumer, { let path = path_builder().map_err(Either::Left)?; @@ -297,31 +343,56 @@ pub trait Wal: sealed::Sealed + ImmutableWal { .map_err(Into::into) .and_then(|arena| { if !exist { - >::new_in(arena, opts, cmp, cks) - .map(|core| Self::from_core(core, false)) + >::new_in(arena, opts, cmp, cks).map(Self::from_core) } else { >::replay(arena, opts, false, cmp, cks) - .map(|core| Self::from_core(core, false)) + .map(Self::from_core) } }) .map_err(Either::Right) } /// Returns `true` if this WAL instance is read-only. - fn read_only(&self) -> bool; + fn read_only(&self) -> bool { + self.allocator().read_only() + } /// Returns the mutable reference to the reserved slice. /// /// # Safety /// - The caller must ensure that the there is no others accessing reserved slice for either read or write. /// - This method is not thread-safe, so be careful when using it. - unsafe fn reserved_slice_mut(&mut self) -> &mut [u8]; + unsafe fn reserved_slice_mut<'a>(&'a mut self) -> &'a mut [u8] + where + Self::Allocator: 'a, + { + let reserved = sealed::Sealed::options(self).reserved(); + if reserved == 0 { + return &mut []; + } + + let allocator = self.allocator(); + let reserved_slice = allocator.reserved_slice_mut(); + &mut reserved_slice[HEADER_SIZE..] + } /// Flushes the to disk. - fn flush(&self) -> Result<(), Error>; + fn flush(&self) -> Result<(), Error> { + if !self.read_only() { + self.allocator().flush().map_err(Into::into) + } else { + Err(Error::read_only()) + } + } /// Flushes the to disk. - fn flush_async(&self) -> Result<(), Error>; + fn flush_async(&self) -> Result<(), Error> { + if !self.read_only() { + self.allocator().flush_async().map_err(Into::into) + } else { + Err(Error::read_only()) + } + } /// Returns the read-only view for the WAL. fn reader(&self) -> Self::Reader; @@ -330,7 +401,7 @@ pub trait Wal: sealed::Sealed + ImmutableWal { fn get_or_insert(&mut self, key: &[u8], value: &[u8]) -> Result, Error> where C: Comparator + CheapClone, - S: Checksumer, + S: BuildChecksumer, { self .get_or_insert_with_value_builder::<()>( @@ -351,7 +422,7 @@ pub trait Wal: sealed::Sealed + ImmutableWal { ) -> Result, Either> where C: Comparator + CheapClone, - S: Checksumer; + S: BuildChecksumer; /// Inserts a key-value pair into the WAL. This method /// allows the caller to build the key in place. 
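  ///
  /// (Like the other insert methods, this now goes through
  /// `Sealed::insert_with_in`, which returns the entry's `Pointer` instead of
  /// linking it; the pointer is then attached to the in-memory index via
  /// `insert_pointer`, making the WAL write and the index update two explicit
  /// steps.)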
@@ -364,7 +435,7 @@ pub trait Wal: sealed::Sealed + ImmutableWal { ) -> Result<(), Either> where C: Comparator + CheapClone, - S: Checksumer, + S: BuildChecksumer, { self .check( @@ -384,6 +455,7 @@ pub trait Wal: sealed::Sealed + ImmutableWal { Ok(()) }), ) + .map(|ptr| self.insert_pointer(ptr)) .map_err(Among::into_left_right) } @@ -398,7 +470,7 @@ pub trait Wal: sealed::Sealed + ImmutableWal { ) -> Result<(), Either> where C: Comparator + CheapClone, - S: Checksumer, + S: BuildChecksumer, { self .check( @@ -418,6 +490,7 @@ pub trait Wal: sealed::Sealed + ImmutableWal { }), vb, ) + .map(|ptr| self.insert_pointer(ptr)) .map_err(Among::into_middle_right) } @@ -430,7 +503,7 @@ pub trait Wal: sealed::Sealed + ImmutableWal { ) -> Result<(), Among> where C: Comparator + CheapClone, - S: Checksumer, + S: BuildChecksumer, { self .check( @@ -442,14 +515,47 @@ pub trait Wal: sealed::Sealed + ImmutableWal { ) .map_err(Among::Right)?; - self.insert_with_in(kb, vb) + self + .insert_with_in(kb, vb) + .map(|ptr| self.insert_pointer(ptr)) + } + + /// Inserts a batch of key-value pairs into the WAL. + fn insert_batch(&mut self, batch: &B) -> Result<(), Error> + where + S: BuildChecksumer, + { + let batch_encoded_size = batch + .iter() + .fold(0u64, |acc, (k, v)| acc + k.len() as u64 + v.len() as u64); + let total_size = STATUS_SIZE as u64 + batch_encoded_size + CHECKSUM_SIZE as u64; + if total_size > >::capacity(self) as u64 { + return Err(Error::insufficient_space( + total_size, + >::remaining(self), + )); + } + + let allocator = self.allocator(); + + let mut buf = allocator + .alloc_bytes(total_size as u32) + .map_err(Error::from_insufficient_space)?; + + unsafe { + let committed_flag = Flags::BATCHING | Flags::COMMITTED; + let cks = self.hasher().build_checksumer(); + let flag = Flags::BATCHING; + buf.put_u8_unchecked(flag.bits); + } + todo!() } /// Inserts a key-value pair into the WAL. fn insert(&mut self, key: &[u8], value: &[u8]) -> Result<(), Error> where C: Comparator + CheapClone, - S: Checksumer, + S: BuildChecksumer, { self.check( key.len(), @@ -470,6 +576,7 @@ pub trait Wal: sealed::Sealed + ImmutableWal { Ok(()) }), ) + .map(|ptr| self.insert_pointer(ptr)) .map_err(Among::unwrap_right) } } diff --git a/src/wal/impls.rs b/src/wal/impls.rs new file mode 100644 index 00000000..6e12c0c5 --- /dev/null +++ b/src/wal/impls.rs @@ -0,0 +1,165 @@ +use core::{borrow::Borrow, iter::FusedIterator, marker::PhantomData}; + +/// An iterator over a slice of key value tuples. +pub struct EntrySliceIter<'a, K, V>(core::slice::Iter<'a, (K, V)>); + +impl<'a, K, V> Iterator for EntrySliceIter<'a, K, V> +where + K: Borrow<[u8]>, + V: Borrow<[u8]>, +{ + type Item = (&'a [u8], &'a [u8]); + + fn next(&mut self) -> Option { + self.0.next().map(|(k, v)| (k.borrow(), v.borrow())) + } + + fn size_hint(&self) -> (usize, Option) { + self.0.size_hint() + } +} + +impl DoubleEndedIterator for EntrySliceIter<'_, K, V> +where + K: Borrow<[u8]>, + V: Borrow<[u8]>, +{ + fn next_back(&mut self) -> Option { + self.0.next_back().map(|(k, v)| (k.borrow(), v.borrow())) + } +} + +impl FusedIterator for EntrySliceIter<'_, K, V> +where + K: Borrow<[u8]>, + V: Borrow<[u8]>, +{ +} + +macro_rules! impl_for_vec { + ($( $(#[cfg($cfg:meta)])? $ty:ty $(:$N:ident)? $( => $as_ref:ident)?),+ $(,)?) => { + $( + $(#[cfg($cfg)])? 
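      // (This macro and the map impls below mirror the `GenericBatch` impls
      // in swmr/generic/traits/container_impls.rs, but yield raw
      // `(&[u8], &[u8])` pairs via the `Borrow<[u8]>` bounds instead of typed
      // `(&K, &V)` pairs.)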
+ const _: () = { + impl super::Batch for $ty + where + K: Borrow<[u8]>, + V: Borrow<[u8]>, + { + type Iter<'a> = EntrySliceIter<'a, K, V> where Self: 'a; + + fn iter(&self) -> Self::Iter<'_> { + EntrySliceIter(IntoIterator::into_iter(self $(.$as_ref())?)) + } + } + }; + )+ + }; +} + +impl_for_vec!( + Vec<(K, V)>, + Box<[(K, V)]>, + &[(K, V)] => as_ref, + std::sync::Arc<[(K, V)]> => as_ref, + std::rc::Rc<[(K, V)]> => as_ref, + #[cfg(feature = "smallvec-wrapper")] + ::smallvec_wrapper::OneOrMore<(K, V)>, + #[cfg(feature = "smallvec-wrapper")] + ::smallvec_wrapper::TinyVec<(K, V)>, + #[cfg(feature = "smallvec-wrapper")] + ::smallvec_wrapper::TriVec<(K, V)>, + #[cfg(feature = "smallvec-wrapper")] + ::smallvec_wrapper::SmallVec<(K, V)>, + #[cfg(feature = "smallvec-wrapper")] + ::smallvec_wrapper::MediumVec<(K, V)>, + #[cfg(feature = "smallvec-wrapper")] + ::smallvec_wrapper::LargeVec<(K, V)>, + #[cfg(feature = "smallvec-wrapper")] + ::smallvec_wrapper::XLargeVec<(K, V)>, + #[cfg(feature = "smallvec-wrapper")] + ::smallvec_wrapper::XXLargeVec<(K, V)>, + #[cfg(feature = "smallvec-wrapper")] + ::smallvec_wrapper::XXXLargeVec<(K, V)>, + #[cfg(feature = "smallvec")] + ::smallvec::SmallVec<[(K, V); N]>: N, +); + +/// An iterator over a slice of key value tuples. +pub struct EntryMapIter<'a, K, V, T> { + iter: T, + _m: PhantomData<&'a (K, V)>, +} + +impl EntryMapIter<'_, K, V, T> { + /// Construct a new iterator. + #[inline] + pub const fn new(iter: T) -> Self { + Self { + iter, + _m: PhantomData, + } + } +} + +impl<'a, K, V, T> Iterator for EntryMapIter<'a, K, V, T> +where + K: Borrow<[u8]>, + V: Borrow<[u8]>, + T: Iterator, +{ + type Item = (&'a [u8], &'a [u8]); + + fn next(&mut self) -> Option { + self.iter.next().map(|(k, v)| (k.borrow(), v.borrow())) + } + + fn size_hint(&self) -> (usize, Option) { + self.iter.size_hint() + } +} + +impl<'a, K, V, T> DoubleEndedIterator for EntryMapIter<'a, K, V, T> +where + K: Borrow<[u8]>, + V: Borrow<[u8]>, + T: DoubleEndedIterator, +{ + fn next_back(&mut self) -> Option { + self.iter.next_back().map(|(k, v)| (k.borrow(), v.borrow())) + } +} + +impl<'a, K, V, T> FusedIterator for EntryMapIter<'a, K, V, T> +where + K: Borrow<[u8]>, + V: Borrow<[u8]>, + T: FusedIterator, +{ +} + +impl super::Batch for std::collections::HashMap +where + K: Borrow<[u8]>, + V: Borrow<[u8]>, +{ + type Iter<'a> = EntryMapIter<'a, K, V, std::collections::hash_map::Iter<'a, K, V>> where + Self: 'a; + + fn iter(&self) -> Self::Iter<'_> { + EntryMapIter::new(self.iter()) + } +} + +impl super::Batch for std::collections::BTreeMap +where + K: Borrow<[u8]>, + V: Borrow<[u8]>, +{ + type Iter<'a> = EntryMapIter<'a, K, V, std::collections::btree_map::Iter<'a, K, V>> where + Self: 'a; + + fn iter(&self) -> Self::Iter<'_> { + EntryMapIter::new(self.iter()) + } +} diff --git a/src/wal/sealed.rs b/src/wal/sealed.rs index ac0a7e48..7a1d2ff3 100644 --- a/src/wal/sealed.rs +++ b/src/wal/sealed.rs @@ -1,3 +1,5 @@ +use core::ptr::NonNull; + use rarena_allocator::ArenaPosition; use super::*; @@ -28,20 +30,97 @@ pub trait Sealed: Constructor { crate::check(klen, vlen, max_key_size, max_value_size, ro) } + fn hasher(&self) -> &S; + + fn options(&self) -> &Options; + + fn comparator(&self) -> &C; + + fn insert_pointer(&self, ptr: Pointer) + where + C: Comparator; + fn insert_with_in( &mut self, kb: KeyBuilder) -> Result<(), KE>>, vb: ValueBuilder) -> Result<(), VE>>, - ) -> Result<(), Among> + ) -> Result, Among> where C: Comparator + CheapClone, - S: Checksumer; + S: BuildChecksumer, + { + let (klen, kf) 
= kb.into_components(); + let (vlen, vf) = vb.into_components(); + let (len_size, kvlen, elen) = entry_size(klen, vlen); + let klen = klen as usize; + let vlen = vlen as usize; + let allocator = self.allocator(); + let is_ondisk = allocator.is_ondisk(); + let buf = allocator.alloc_bytes(elen); + let mut cks = self.hasher().build_checksumer(); + + match buf { + Err(e) => Err(Among::Right(Error::from_insufficient_space(e))), + Ok(mut buf) => { + unsafe { + // We allocate the buffer with the exact size, so it's safe to write to the buffer. + let flag = Flags::COMMITTED.bits(); + + cks.update(&[flag]); + + buf.put_u8_unchecked(Flags::empty().bits()); + let written = buf.put_u64_varint_unchecked(kvlen); + debug_assert_eq!( + written, len_size, + "the precalculated size should be equal to the written size" + ); + + let ko = STATUS_SIZE + written; + buf.set_len(ko + klen + vlen); + + kf(&mut VacantBuffer::new( + klen, + NonNull::new_unchecked(buf.as_mut_ptr().add(ko)), + )) + .map_err(Among::Left)?; + + let vo = ko + klen; + vf(&mut VacantBuffer::new( + vlen, + NonNull::new_unchecked(buf.as_mut_ptr().add(vo)), + )) + .map_err(Among::Middle)?; + + let cks = { + cks.update(&buf[1..]); + cks.digest() + }; + buf.put_u64_le_unchecked(cks); + + // commit the entry + buf[0] |= Flags::COMMITTED.bits(); + + if self.options().sync_on_write() && is_ondisk { + allocator + .flush_range(buf.offset(), elen as usize) + .map_err(|e| Among::Right(e.into()))?; + } + buf.detach(); + let cmp = self.comparator().cheap_clone(); + let ptr = buf.as_ptr().add(ko); + Ok(Pointer::new(klen, vlen, ptr, cmp)) + } + } + } + } } pub trait Constructor: Sized { type Allocator: Allocator; type Core: WalCore; + fn allocator(&self) -> &Self::Allocator; + fn new_in(arena: Self::Allocator, opts: Options, cmp: C, cks: S) -> Result { unsafe { let slice = arena.reserved_slice_mut(); @@ -64,7 +143,7 @@ pub trait Constructor: Sized { ) -> Result where C: Comparator + CheapClone, - S: Checksumer, + S: BuildChecksumer, { let slice = arena.reserved_slice(); let magic_text = &slice[0..6]; @@ -122,7 +201,7 @@ pub trait Constructor: Sized { let cks = arena.get_u64_le(cursor + cks_offset).unwrap(); - if cks != checksumer.checksum(arena.get_bytes(cursor, cks_offset)) { + if cks != checksumer.checksum_one(arena.get_bytes(cursor, cks_offset)) { return Err(Error::corrupted()); } @@ -151,5 +230,5 @@ pub trait Constructor: Sized { )) } - fn from_core(core: Self::Core, ro: bool) -> Self; + fn from_core(core: Self::Core) -> Self; }
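One loose end worth flagging in this WIP: `Wal::insert_batch` stamps the
`BATCHING` flag and then ends in `todo!()`, and `buf.put_u8_unchecked(flag.bits)`
should be `flag.bits()`, matching the bitflags accessor used everywhere else in
the patch. Below is a minimal sketch of how the body could continue, mirroring
the single-entry commit protocol from `insert_with_in`. Assumptions are flagged
inline: `put_slice_unchecked` is assumed to exist on the arena buffer alongside
the other `*_unchecked` writers, per-entry length prefixes (which `total_size`
does not yet reserve) are left out, and both the linking of per-entry
`Pointer`s into the index and the `sync_on_write` flush are omitted:

```rust
unsafe {
  // `buf` already holds the BATCHING flag byte written above.
  for (k, v) in batch.iter() {
    // Assumption: the buffer exposes put_slice_unchecked; a real version
    // also needs a kv-len prefix per entry so replay can split the batch.
    buf.put_slice_unchecked(k);
    buf.put_slice_unchecked(v);
  }

  let mut cks = self.hasher().build_checksumer();
  cks.update(&buf[1..]);
  buf.put_u64_le_unchecked(cks.digest());

  // Flip the flag byte last, as insert_with_in does: a crash before this
  // line leaves the whole batch uncommitted.
  buf[0] = (Flags::BATCHING | Flags::COMMITTED).bits(); // == committed_flag above
  buf.detach();
}
Ok(())
```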