Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
al8n committed Sep 16, 2024
1 parent ebb9c0d commit c605e5f
Show file tree
Hide file tree
Showing 20 changed files with 759 additions and 396 deletions.
7 changes: 5 additions & 2 deletions .codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@ ignore:
- "**/benches/"
- "src/tests.rs"
- "src/error.rs"
- "src/swmr/generic/traits/impls/"
- "src/swmr/generic/traits/impls.rs"
- "src/swmr/generic/traits/container_impls/"
- "src/swmr/generic/traits/container_impls.rs"
- "src/swmr/generic/traits/type_impls/"
- "src/swmr/generic/traits/type_impls.rs"
- "src/swmr/generic/tests.rs"
- "src/swmr/generic/tests/"
- "src/swmr/wal/tests.rs"
- "src/swmr/wal/tests/"
- "src/wal/impls.rs"
- "src/unsync/tests.rs"
- "src/unsync/tests/"

Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ thiserror = "1"

bytes = { version = "1", default-features = false, optional = true }
smallvec = { version = "1", default-features = false, optional = true, features = ["const_generics"] }
smallvec-wrapper = { version = "0.1", optional = true, default-features = false, features = ["const_generics"] }
smol_str = { version = "0.3", default-features = false, optional = true }
faststr = { version = "0.2", default-features = false, optional = true }

Expand Down
2 changes: 1 addition & 1 deletion examples/zero_cost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ impl Type for Person {
}

impl<'a> TypeRef<'a> for PersonRef<'a> {
fn from_slice(src: &'a [u8]) -> Self {
unsafe fn from_slice(src: &'a [u8]) -> Self {
let (id_size, id) = decode_u64_varint(src).unwrap();
let name = std::str::from_utf8(&src[id_size..]).unwrap();
PersonRef { id, name }
Expand Down
6 changes: 3 additions & 3 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ pub enum Error {
#[error("insufficient space in the WAL (requested: {requested}, available: {available})")]
InsufficientSpace {
/// The requested size
requested: u32,
requested: u64,
/// The remaining size
available: u32,
},
Expand Down Expand Up @@ -43,7 +43,7 @@ pub enum Error {

impl Error {
/// Create a new `Error::InsufficientSpace` instance.
pub(crate) const fn insufficient_space(requested: u32, available: u32) -> Self {
pub(crate) const fn insufficient_space(requested: u64, available: u32) -> Self {
Self::InsufficientSpace {
requested,
available,
Expand Down Expand Up @@ -80,7 +80,7 @@ impl Error {
rarena_allocator::Error::InsufficientSpace {
requested,
available,
} => Self::insufficient_space(requested, available),
} => Self::insufficient_space(requested as u64, available),
_ => unreachable!(),
}
}
Expand Down
43 changes: 8 additions & 35 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,18 @@ pub use rarena_allocator::OpenOptions;
#[cfg(feature = "std")]
extern crate std;

pub use dbutils::{Ascend, CheapClone, Checksumer, Comparator, Crc32, Descend};
pub use dbutils::{
checksum::{BuildChecksumer, Checksumer, Crc32},
Ascend, CheapClone, Comparator, Descend,
};

#[cfg(feature = "xxhash3")]
#[cfg_attr(docsrs, doc(cfg(feature = "xxhash3")))]
pub use dbutils::XxHash3;
pub use dbutils::checksum::XxHash3;

#[cfg(feature = "xxhash64")]
#[cfg_attr(docsrs, doc(cfg(feature = "xxhash64")))]
pub use dbutils::XxHash64;
pub use dbutils::checksum::XxHash64;

const STATUS_SIZE: usize = mem::size_of::<u8>();
const CHECKSUM_SIZE: usize = mem::size_of::<u64>();
Expand Down Expand Up @@ -88,6 +91,8 @@ bitflags::bitflags! {
struct Flags: u8 {
/// First bit: 1 indicates committed, 0 indicates uncommitted
const COMMITTED = 0b00000001;
/// Second bit: 1 indicates batching, 0 indicates single entry
const BATCHING = 0b00000010;
}
}

Expand Down Expand Up @@ -169,35 +174,3 @@ where
self.as_key_slice().borrow()
}
}

/// Wrapper that stores a checksumer inside an [`UnsafeCell`](core::cell::UnsafeCell) so its
/// state-changing methods can be called through a shared reference (`&self`).
///
/// Used to avoid the mutable borrow checker, for the single-writer, multiple-readers use case.
/// Every method below relies on the caller never invoking the checksumer concurrently.
struct UnsafeCellChecksumer<S>(core::cell::UnsafeCell<S>);

impl<S> UnsafeCellChecksumer<S> {
/// Wraps `checksumer` in an `UnsafeCell`.
#[inline]
const fn new(checksumer: S) -> Self {
Self(core::cell::UnsafeCell::new(checksumer))
}
}

impl<S> UnsafeCellChecksumer<S>
where
S: Checksumer,
{
/// Forwards `buf` to the wrapped checksumer's `update`.
#[inline]
fn update(&self, buf: &[u8]) {
// SAFETY: the checksumer will not be invoked concurrently.
unsafe { (*self.0.get()).update(buf) }
}

/// Forwards to the wrapped checksumer's `reset`.
#[inline]
fn reset(&self) {
// SAFETY: the checksumer will not be invoked concurrently.
unsafe { (*self.0.get()).reset() }
}

/// Forwards to the wrapped checksumer's `digest` and returns its `u64` result.
#[inline]
fn digest(&self) -> u64 {
// SAFETY: relies on the same invariant as `update` and `reset` — the checksumer
// will not be invoked concurrently.
unsafe { (*self.0.get()).digest() }
}
}
46 changes: 27 additions & 19 deletions src/swmr/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,16 @@ use std::{

use among::Among;
use crossbeam_skiplist::SkipSet;
use dbutils::checksum::{BuildChecksumer, Checksumer, Crc32};
pub use dbutils::equivalent::*;
use dbutils::{Checksumer, Crc32};
use rarena_allocator::{
either::Either, sync::Arena, Allocator, ArenaPosition, Memory, MmapOptions, OpenOptions,
};

use crate::{
arena_options, check, entry_size,
error::{self, Error},
split_lengths, Flags, Options, UnsafeCellChecksumer, CHECKSUM_SIZE, HEADER_SIZE, MAGIC_TEXT,
STATUS_SIZE,
split_lengths, Flags, Options, CHECKSUM_SIZE, HEADER_SIZE, MAGIC_TEXT, STATUS_SIZE,
};

mod entry;
Expand Down Expand Up @@ -189,9 +188,11 @@ where
K::Ref<'a>: KeyRef<'a, K>,
{
fn compare(&self, p: &Pointer<K, V>) -> cmp::Ordering {
let kr: K::Ref<'_> = TypeRef::from_slice(p.as_key_slice());
let or: K::Ref<'_> = TypeRef::from_slice(self.as_key_slice());
KeyRef::compare(&kr, &or).reverse()
unsafe {
let kr: K::Ref<'_> = TypeRef::from_slice(p.as_key_slice());
let or: K::Ref<'_> = TypeRef::from_slice(self.as_key_slice());
KeyRef::compare(&kr, &or).reverse()
}
}
}

Expand Down Expand Up @@ -229,7 +230,7 @@ where
Q: ?Sized + Ord + Comparable<K::Ref<'a>>,
{
fn compare(&self, p: &Pointer<K, V>) -> cmp::Ordering {
let kr = TypeRef::from_slice(p.as_key_slice());
let kr = unsafe { TypeRef::from_slice(p.as_key_slice()) };
KeyRef::compare(&kr, self.key).reverse()
}
}
Expand Down Expand Up @@ -268,7 +269,7 @@ where
Q: ?Sized + Ord + Comparable<K> + Comparable<K::Ref<'a>>,
{
fn compare(&self, p: &Pointer<K, V>) -> cmp::Ordering {
let kr = <K::Ref<'_> as TypeRef<'_>>::from_slice(p.as_key_slice());
let kr = unsafe { <K::Ref<'_> as TypeRef<'_>>::from_slice(p.as_key_slice()) };
KeyRef::compare(&kr, self.key).reverse()
}
}
Expand Down Expand Up @@ -387,7 +388,7 @@ where
for<'a> <K as Type>::Ref<'a>: KeyRef<'a, K>,
V: Type + 'static,
{
fn replay<S: Checksumer>(
fn replay<S: BuildChecksumer>(
arena: Arena,
opts: &Options,
ro: bool,
Expand Down Expand Up @@ -450,7 +451,7 @@ where

let cks = arena.get_u64_le(cursor + cks_offset).unwrap();

if cks != checksumer.checksum(arena.get_bytes(cursor, cks_offset)) {
if cks != checksumer.checksum_one(arena.get_bytes(cursor, cks_offset)) {
return Err(Error::corrupted());
}

Expand Down Expand Up @@ -545,7 +546,7 @@ where
pub struct GenericOrderWal<K, V, S = Crc32> {
core: Arc<GenericOrderWalCore<K, V>>,
opts: Options,
cks: UnsafeCellChecksumer<S>,
cks: S,
ro: bool,
}

Expand Down Expand Up @@ -854,7 +855,7 @@ impl<K, V, S> GenericOrderWal<K, V, S> {
core: Arc::new(core),
ro,
opts,
cks: UnsafeCellChecksumer::new(cks),
cks,
}
}
}
Expand All @@ -864,7 +865,7 @@ where
K: Type + Ord + 'static,
for<'a> <K as Type>::Ref<'a>: KeyRef<'a, K>,
V: Type + 'static,
S: Checksumer,
S: BuildChecksumer,
{
/// Returns a write-ahead log backed by a file backed memory map with the given options and [`Checksumer`].
///
Expand Down Expand Up @@ -1121,7 +1122,7 @@ where
K: Type + Ord + for<'a> Comparable<K::Ref<'a>> + 'static,
for<'a> K::Ref<'a>: KeyRef<'a, K>,
V: Type + 'static,
S: Checksumer,
S: BuildChecksumer,
{
/// Gets or inserts the key-value pair.
#[inline]
Expand Down Expand Up @@ -1274,7 +1275,7 @@ where
K: Type + Ord + 'static,
for<'a> K::Ref<'a>: KeyRef<'a, K>,
V: Type + 'static,
S: Checksumer,
S: BuildChecksumer,
{
/// Inserts a key-value pair into the write-ahead log.
#[inline]
Expand Down Expand Up @@ -1486,8 +1487,8 @@ where
// We allocate the buffer with the exact size, so it's safe to write to the buffer.
let flag = Flags::COMMITTED.bits();

self.cks.reset();
self.cks.update(&[flag]);
let mut cks = self.cks.build_checksumer();
cks.update(&[flag]);

buf.put_u8_unchecked(Flags::empty().bits());
let written = buf.put_u64_varint_unchecked(kvlen);
Expand All @@ -1507,8 +1508,8 @@ where
val.encode(value_buf).map_err(Among::Middle)?;

let cks = {
self.cks.update(&buf[1..]);
self.cks.digest()
cks.update(&buf[1..]);
cks.digest()
};
buf.put_u64_le_unchecked(cks);

Expand Down Expand Up @@ -1548,3 +1549,10 @@ where
/// Path builder that hands the given path back as an owned [`PathBuf`].
///
/// The path is copied as-is; this function always returns `Ok`.
fn dummy_path_builder(p: impl AsRef<Path>) -> Result<PathBuf, ()> {
    let borrowed: &Path = p.as_ref();
    let owned: PathBuf = borrowed.to_owned();
    Ok(owned)
}

// #[inline]
// fn encoded_batch_len<B: GenericBatch>(batch: &B) -> u64 {
// let (len, encoded_len) = batch.meta();

// STATUS_SIZE +
// }
10 changes: 5 additions & 5 deletions src/swmr/generic/entry.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use crossbeam_skiplist::set::Entry;
use crossbeam_skiplist::set::Entry as SetEntry;

use super::{Pointer, Type, TypeRef};

/// The reference to an entry in the [`GenericOrderWal`](super::GenericOrderWal).
pub struct EntryRef<'a, K, V> {
ent: Entry<'a, Pointer<K, V>>,
ent: SetEntry<'a, Pointer<K, V>>,
}

impl<'a, K, V> core::fmt::Debug for EntryRef<'a, K, V>
Expand Down Expand Up @@ -33,7 +33,7 @@ impl<K, V> Clone for EntryRef<'_, K, V> {

impl<'a, K, V> EntryRef<'a, K, V> {
#[inline]
pub(super) fn new(ent: Entry<'a, Pointer<K, V>>) -> Self {
pub(super) fn new(ent: SetEntry<'a, Pointer<K, V>>) -> Self {
Self { ent }
}
}
Expand All @@ -47,13 +47,13 @@ where
#[inline]
pub fn key(&self) -> K::Ref<'a> {
let p = self.ent.value();
TypeRef::from_slice(p.as_key_slice())
unsafe { TypeRef::from_slice(p.as_key_slice()) }
}

/// Returns the value of the entry.
#[inline]
pub fn value(&self) -> V::Ref<'a> {
let p = self.ent.value();
TypeRef::from_slice(p.as_value_slice())
unsafe { TypeRef::from_slice(p.as_value_slice()) }
}
}
7 changes: 7 additions & 0 deletions src/swmr/generic/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@ use super::*;

const MB: u32 = 1024 * 1024;

// Compile-time check helper: instantiating this function with a type `B`
// only type-checks when `B` implements `GenericBatch`. The body is
// intentionally empty.
const fn __static_assertion<B: GenericBatch>() {}

// Evaluated at compile time; the build fails if `BTreeMap<String, String>` or
// `HashMap<String, String>` does not satisfy the `GenericBatch` bound.
const _: () = {
__static_assertion::<std::collections::BTreeMap<String, String>>();
__static_assertion::<std::collections::HashMap<String, String>>();
};

#[cfg(all(test, any(test_swmr_generic_constructor, all_tests)))]
mod constructor;

Expand Down
48 changes: 43 additions & 5 deletions src/swmr/generic/traits.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,30 @@
use core::cmp;
use core::{cmp, hash::Hash};

use among::Among;
use crossbeam_skiplist::Comparable;
use crossbeam_skiplist::{Comparable, Equivalent};

mod impls;
pub use impls::*;
mod type_impls;
pub use type_impls::*;

mod container_impls;
pub use container_impls::*;

/// A batch of key-value entries that can be walked by shared reference.
///
/// Implementors name their key and value types and expose a borrowing
/// iterator that yields `(&Key, &Value)` pairs.
pub trait GenericBatch {
    /// Type of the keys held by the batch.
    type Key;

    /// Type of the values held by the batch.
    type Value;

    /// Borrowing iterator returned by [`iter`](GenericBatch::iter); its items
    /// are `(&Key, &Value)` pairs tied to the lifetime of the borrow.
    type Iter<'a>: Iterator<Item = (&'a Self::Key, &'a Self::Value)>
    where
        Self: 'a;

    /// Returns an iterator over the batch's entries as `(key, value)` reference pairs.
    fn iter(&self) -> Self::Iter<'_>;
}

/// The type trait for limiting the types that can be used as keys and values in the [`GenericOrderWal`].
///
Expand All @@ -24,6 +44,21 @@ pub trait Type {
fn encode(&self, buf: &mut [u8]) -> Result<(), Self::Error>;
}

/// Blanket implementation so that a shared reference to any [`Type`] is itself a [`Type`].
///
/// Both the reference type and the error type are those of `T`, and every
/// method simply delegates to `T`'s own implementation.
impl<T: Type> Type for &T {
    type Ref<'a> = T::Ref<'a>;
    type Error = T::Error;

    /// Encoded length of the referenced value, as reported by `T`.
    #[inline]
    fn encoded_len(&self) -> usize {
        // `self` is `&&T`; peel one layer and dispatch explicitly to `T`'s impl.
        let inner: &T = self;
        <T as Type>::encoded_len(inner)
    }

    /// Encodes the referenced value into `buf` using `T`'s encoder.
    #[inline]
    fn encode(&self, buf: &mut [u8]) -> Result<(), Self::Error> {
        // `self` is `&&T`; peel one layer and dispatch explicitly to `T`'s impl.
        let inner: &T = self;
        <T as Type>::encode(inner, buf)
    }
}

pub(super) trait InsertAmongExt<T: Type> {
fn encoded_len(&self) -> usize;
fn encode(&self, buf: &mut [u8]) -> Result<(), T::Error>;
Expand Down Expand Up @@ -56,7 +91,10 @@ impl<T: Type> InsertAmongExt<T> for Among<T, &T, &[u8]> {
pub trait TypeRef<'a> {
/// Creates a reference type from a binary slice, when using it with [`GenericOrderWal`],
/// you can assume that the slice is the same as the one returned by [`encode`](Type::encode).
fn from_slice(src: &'a [u8]) -> Self;
///
/// # Safety
/// - the `src` must be the same as the one returned by [`encode`](Type::encode).
unsafe fn from_slice(src: &'a [u8]) -> Self;
}

/// The key reference trait for comparing `K` in the [`GenericOrderWal`].
Expand Down
Loading

0 comments on commit c605e5f

Please sign in to comment.