Skip to content

Commit

Permalink
Finish batch insertion for Wal
Browse files Browse the repository at this point in the history
  • Loading branch information
al8n committed Sep 17, 2024
1 parent f8e21ca commit 4238f4f
Show file tree
Hide file tree
Showing 3 changed files with 260 additions and 12 deletions.
8 changes: 5 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ pub mod utils;
use utils::*;

mod wal;
pub use wal::{Builder, Wal};
pub use wal::{
Batch, BatchWithBuilders, BatchWithKeyBuilder, BatchWithValueBuilder, ImmutableWal, Wal,
};

mod options;
pub use options::Options;
Expand Down Expand Up @@ -304,13 +306,13 @@ impl<KB, VB, C> EntryWithBuilders<KB, VB, C> {
}
}

/// Returns the key.
/// Returns the value builder.
#[inline]
pub const fn value_builder(&self) -> &ValueBuilder<VB> {
&self.vb
}

/// Returns the value.
/// Returns the key builder.
#[inline]
pub const fn key_builder(&self) -> &KeyBuilder<KB> {
&self.kb
Expand Down
58 changes: 50 additions & 8 deletions src/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ where
/// the key in place.
pub trait BatchWithKeyBuilder {
/// The key builder type.
type KeyBuilder: FnOnce(&mut VacantBuffer<'_>) -> Result<(), Self::Error>;
type KeyBuilder: Fn(&mut VacantBuffer<'_>) -> Result<(), Self::Error>;

/// The error for the key builder.
type Error;
Expand All @@ -74,7 +74,7 @@ pub trait BatchWithKeyBuilder {

impl<KB, E, V, C, T> BatchWithKeyBuilder for T
where
KB: FnOnce(&mut VacantBuffer<'_>) -> Result<(), E>,
KB: Fn(&mut VacantBuffer<'_>) -> Result<(), E>,
V: Borrow<[u8]>,
C: Comparator,
for<'a> &'a mut T: IntoIterator<Item = &'a mut EntryWithKeyBuilder<KB, V, C>>,
Expand All @@ -96,7 +96,7 @@ where
/// the value in place.
pub trait BatchWithValueBuilder {
/// The value builder type.
type ValueBuilder: FnOnce(&mut VacantBuffer<'_>) -> Result<(), Self::Error>;
type ValueBuilder: Fn(&mut VacantBuffer<'_>) -> Result<(), Self::Error>;

/// The error for the value builder.
type Error;
Expand All @@ -120,7 +120,7 @@ pub trait BatchWithValueBuilder {

impl<K, VB, E, C, T> BatchWithValueBuilder for T
where
VB: FnOnce(&mut VacantBuffer<'_>) -> Result<(), E>,
VB: Fn(&mut VacantBuffer<'_>) -> Result<(), E>,
K: Borrow<[u8]>,
C: Comparator,
for<'a> &'a mut T: IntoIterator<Item = &'a mut EntryWithValueBuilder<K, VB, C>>,
Expand All @@ -142,13 +142,13 @@ where
/// the key and value in place.
pub trait BatchWithBuilders {
/// The value builder type.
type ValueBuilder: FnOnce(&mut VacantBuffer<'_>) -> Result<(), Self::ValueError>;
type ValueBuilder: Fn(&mut VacantBuffer<'_>) -> Result<(), Self::ValueError>;

/// The error for the value builder.
type ValueError;

/// The value builder type.
type KeyBuilder: FnOnce(&mut VacantBuffer<'_>) -> Result<(), Self::KeyError>;
type KeyBuilder: Fn(&mut VacantBuffer<'_>) -> Result<(), Self::KeyError>;

/// The error for the value builder.
type KeyError;
Expand All @@ -169,8 +169,8 @@ pub trait BatchWithBuilders {

impl<KB, KE, VB, VE, C, T> BatchWithBuilders for T
where
VB: FnOnce(&mut VacantBuffer<'_>) -> Result<(), VE>,
KB: FnOnce(&mut VacantBuffer<'_>) -> Result<(), KE>,
VB: Fn(&mut VacantBuffer<'_>) -> Result<(), VE>,
KB: Fn(&mut VacantBuffer<'_>) -> Result<(), KE>,
C: Comparator,
for<'a> &'a mut T: IntoIterator<Item = &'a mut EntryWithBuilders<KB, VB, C>>,
{
Expand Down Expand Up @@ -687,6 +687,48 @@ pub trait Wal<C, S>: sealed::Sealed<C, S> + ImmutableWal<C, S> {
.map(|ptr| self.insert_pointer(ptr))
}

/// Inserts a batch of key-value pairs into the WAL.
fn insert_batch_with_key_builder<B: BatchWithKeyBuilder<Comparator = C>>(
&mut self,
batch: &mut B,
) -> Result<(), Either<B::Error, Error>>
where
C: Comparator + CheapClone,
S: BuildChecksumer,
{
self
.insert_batch_with_key_builder_in(batch)
.map(|_| self.insert_pointers(batch.iter_mut().map(|ent| ent.pointer.take().unwrap())))
}

/// Inserts a batch of key-value pairs into the WAL.
fn insert_batch_with_value_builder<B: BatchWithValueBuilder<Comparator = C>>(
&mut self,
batch: &mut B,
) -> Result<(), Either<B::Error, Error>>
where
C: Comparator + CheapClone,
S: BuildChecksumer,
{
self
.insert_batch_with_value_builder_in(batch)
.map(|_| self.insert_pointers(batch.iter_mut().map(|ent| ent.pointer.take().unwrap())))
}

/// Inserts a batch of key-value pairs into the WAL.
fn insert_batch_with_builders<B: BatchWithBuilders<Comparator = C>>(
&mut self,
batch: &mut B,
) -> Result<(), Among<B::KeyError, B::ValueError, Error>>
where
C: Comparator + CheapClone,
S: BuildChecksumer,
{
self
.insert_batch_with_builders_in(batch)
.map(|_| self.insert_pointers(batch.iter_mut().map(|ent| ent.pointer.take().unwrap())))
}

/// Inserts a batch of key-value pairs into the WAL.
fn insert_batch<B: Batch<Comparator = C>>(&mut self, batch: &mut B) -> Result<(), Error>
where
Expand Down
206 changes: 205 additions & 1 deletion src/wal/sealed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,211 @@ pub trait Sealed<C, S>: Constructor<C, S> {
where
C: Comparator;

/// Inserts a batch of key-value pairs into the WAL.
fn insert_batch_with_key_builder_in<B: BatchWithKeyBuilder<Comparator = C>>(
&mut self,
batch: &mut B,
) -> Result<(), Either<B::Error, Error>>
where
C: Comparator + CheapClone,
S: BuildChecksumer,
{
let batch_encoded_size = batch.iter_mut().fold(0u64, |acc, ent| {
acc + ent.kb.size() as u64 + ent.value.borrow().len() as u64
});
let total_size = STATUS_SIZE as u64 + batch_encoded_size + CHECKSUM_SIZE as u64;
if total_size > self.options().capacity() as u64 {
return Err(Either::Right(Error::insufficient_space(
total_size,
self.allocator().remaining() as u32,
)));
}

let allocator = self.allocator();

let mut buf = allocator
.alloc_bytes(total_size as u32)
.map_err(|e| Either::Right(Error::from_insufficient_space(e)))?;

unsafe {
let committed_flag = Flags::BATCHING | Flags::COMMITTED;
let mut cks = self.hasher().build_checksumer();
let flag = Flags::BATCHING;
buf.put_u8_unchecked(flag.bits);
let cmp = self.comparator();
let mut cursor = 1;

for ent in batch.iter_mut() {
let ptr = buf.as_mut_ptr().add(cursor as usize);
let klen = ent.kb.size() as usize;
buf.set_len(cursor as usize + klen);
let f = ent.kb.builder();
f(&mut VacantBuffer::new(klen, NonNull::new_unchecked(ptr))).map_err(Either::Left)?;
let value = ent.value.borrow();
let vlen = value.len();
cursor += klen as u64;
cursor += vlen as u64;
buf.put_slice_unchecked(value);

ent.pointer = Some(Pointer::new(klen, vlen, ptr, cmp.cheap_clone()));
}

cks.update(&[committed_flag.bits]);
cks.update(&buf[1..]);
buf.put_u64_le_unchecked(cks.digest());

// commit the entry
buf[0] = committed_flag.bits;
let buf_cap = buf.capacity();

if self.options().sync_on_write() && allocator.is_ondisk() {
allocator
.flush_range(buf.offset(), buf_cap)
.map_err(|e| Either::Right(e.into()))?;
}
buf.detach();
Ok(())
}
}

fn insert_batch_with_value_builder_in<B: BatchWithValueBuilder<Comparator = C>>(
&mut self,
batch: &mut B,
) -> Result<(), Either<B::Error, Error>>
where
C: Comparator + CheapClone,
S: BuildChecksumer,
{
let batch_encoded_size = batch.iter_mut().fold(0u64, |acc, ent| {
acc + ent.key.borrow().len() as u64 + ent.vb.size() as u64
});
let total_size = STATUS_SIZE as u64 + batch_encoded_size + CHECKSUM_SIZE as u64;
if total_size > self.options().capacity() as u64 {
return Err(Either::Right(Error::insufficient_space(
total_size,
self.allocator().remaining() as u32,
)));
}

let allocator = self.allocator();

let mut buf = allocator
.alloc_bytes(total_size as u32)
.map_err(|e| Either::Right(Error::from_insufficient_space(e)))?;

unsafe {
let committed_flag = Flags::BATCHING | Flags::COMMITTED;
let mut cks = self.hasher().build_checksumer();
let flag = Flags::BATCHING;
buf.put_u8_unchecked(flag.bits);
let cmp = self.comparator();
let mut cursor = 1;

for ent in batch.iter_mut() {
let ptr = buf.as_mut_ptr().add(cursor as usize);
let key = ent.key.borrow();
let klen = key.len();
let vlen = ent.vb.size() as usize;
cursor += klen as u64;
buf.put_slice_unchecked(key);
buf.set_len(cursor as usize + vlen);
let f = ent.vb.builder();
f(&mut VacantBuffer::new(
klen,
NonNull::new_unchecked(ptr.add(klen)),
))
.map_err(Either::Left)?;
cursor += vlen as u64;
ent.pointer = Some(Pointer::new(klen, vlen, ptr, cmp.cheap_clone()));
}

cks.update(&[committed_flag.bits]);
cks.update(&buf[1..]);
buf.put_u64_le_unchecked(cks.digest());

// commit the entry
buf[0] = committed_flag.bits;
let buf_cap = buf.capacity();

if self.options().sync_on_write() && allocator.is_ondisk() {
allocator
.flush_range(buf.offset(), buf_cap)
.map_err(|e| Either::Right(e.into()))?;
}
buf.detach();
Ok(())
}
}

fn insert_batch_with_builders_in<B: BatchWithBuilders<Comparator = C>>(
&mut self,
batch: &mut B,
) -> Result<(), Among<B::KeyError, B::ValueError, Error>>
where
C: Comparator + CheapClone,
S: BuildChecksumer,
{
let batch_encoded_size = batch.iter_mut().fold(0u64, |acc, ent| {
acc + ent.kb.size() as u64 + ent.vb.size() as u64
});
let total_size = STATUS_SIZE as u64 + batch_encoded_size + CHECKSUM_SIZE as u64;
if total_size > self.options().capacity() as u64 {
return Err(Among::Right(Error::insufficient_space(
total_size,
self.allocator().remaining() as u32,
)));
}

let allocator = self.allocator();

let mut buf = allocator
.alloc_bytes(total_size as u32)
.map_err(|e| Either::Right(Error::from_insufficient_space(e)))?;

unsafe {
let committed_flag = Flags::BATCHING | Flags::COMMITTED;
let mut cks = self.hasher().build_checksumer();
let flag = Flags::BATCHING;
buf.put_u8_unchecked(flag.bits);
let cmp = self.comparator();
let mut cursor = 1;

for ent in batch.iter_mut() {
let ptr = buf.as_mut_ptr().add(cursor as usize);
let klen = ent.kb.size() as usize;
buf.set_len(cursor as usize + klen);
let f = ent.kb.builder();
f(&mut VacantBuffer::new(klen, NonNull::new_unchecked(ptr))).map_err(Among::Left)?;
let vlen = ent.vb.size() as usize;
cursor += klen as u64;
buf.set_len(cursor as usize + vlen);
let f = ent.vb.builder();
f(&mut VacantBuffer::new(
klen,
NonNull::new_unchecked(ptr.add(klen)),
))
.map_err(Among::Middle)?;
cursor += vlen as u64;
ent.pointer = Some(Pointer::new(klen, vlen, ptr, cmp.cheap_clone()));
}

cks.update(&[committed_flag.bits]);
cks.update(&buf[1..]);
buf.put_u64_le_unchecked(cks.digest());

// commit the entry
buf[0] = committed_flag.bits;
let buf_cap = buf.capacity();

if self.options().sync_on_write() && allocator.is_ondisk() {
allocator
.flush_range(buf.offset(), buf_cap)
.map_err(|e| Among::Right(e.into()))?;
}
buf.detach();
Ok(())
}
}

fn insert_batch_in<B: Batch<Comparator = C>>(&mut self, batch: &mut B) -> Result<(), Error>
where
C: Comparator + CheapClone,
Expand Down

0 comments on commit 4238f4f

Please sign in to comment.