From 4238f4f20ed92ff4e16d84876335880049b9974e Mon Sep 17 00:00:00 2001 From: al8n Date: Tue, 17 Sep 2024 13:59:12 +0800 Subject: [PATCH] Finish batch insertion for `Wal` --- src/lib.rs | 8 +- src/wal.rs | 58 +++++++++++-- src/wal/sealed.rs | 206 +++++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 260 insertions(+), 12 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index e22724cf..4a23386f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -75,7 +75,9 @@ pub mod utils; use utils::*; mod wal; -pub use wal::{Builder, Wal}; +pub use wal::{ + Batch, BatchWithBuilders, BatchWithKeyBuilder, BatchWithValueBuilder, ImmutableWal, Wal, +}; mod options; pub use options::Options; @@ -304,13 +306,13 @@ impl EntryWithBuilders { } } - /// Returns the key. + /// Returns the value builder. #[inline] pub const fn value_builder(&self) -> &ValueBuilder { &self.vb } - /// Returns the value. + /// Returns the key builder. #[inline] pub const fn key_builder(&self) -> &KeyBuilder { &self.kb diff --git a/src/wal.rs b/src/wal.rs index 54a5af5e..2e16b4d9 100644 --- a/src/wal.rs +++ b/src/wal.rs @@ -50,7 +50,7 @@ where /// the key in place. pub trait BatchWithKeyBuilder { /// The key builder type. - type KeyBuilder: FnOnce(&mut VacantBuffer<'_>) -> Result<(), Self::Error>; + type KeyBuilder: Fn(&mut VacantBuffer<'_>) -> Result<(), Self::Error>; /// The error for the key builder. type Error; @@ -74,7 +74,7 @@ pub trait BatchWithKeyBuilder { impl BatchWithKeyBuilder for T where - KB: FnOnce(&mut VacantBuffer<'_>) -> Result<(), E>, + KB: Fn(&mut VacantBuffer<'_>) -> Result<(), E>, V: Borrow<[u8]>, C: Comparator, for<'a> &'a mut T: IntoIterator>, @@ -96,7 +96,7 @@ where /// the value in place. pub trait BatchWithValueBuilder { /// The value builder type. - type ValueBuilder: FnOnce(&mut VacantBuffer<'_>) -> Result<(), Self::Error>; + type ValueBuilder: Fn(&mut VacantBuffer<'_>) -> Result<(), Self::Error>; /// The error for the value builder. type Error; @@ -120,7 +120,7 @@ pub trait BatchWithValueBuilder { impl BatchWithValueBuilder for T where - VB: FnOnce(&mut VacantBuffer<'_>) -> Result<(), E>, + VB: Fn(&mut VacantBuffer<'_>) -> Result<(), E>, K: Borrow<[u8]>, C: Comparator, for<'a> &'a mut T: IntoIterator>, @@ -142,13 +142,13 @@ where /// the key and value in place. pub trait BatchWithBuilders { /// The value builder type. - type ValueBuilder: FnOnce(&mut VacantBuffer<'_>) -> Result<(), Self::ValueError>; + type ValueBuilder: Fn(&mut VacantBuffer<'_>) -> Result<(), Self::ValueError>; /// The error for the value builder. type ValueError; /// The value builder type. - type KeyBuilder: FnOnce(&mut VacantBuffer<'_>) -> Result<(), Self::KeyError>; + type KeyBuilder: Fn(&mut VacantBuffer<'_>) -> Result<(), Self::KeyError>; /// The error for the value builder. type KeyError; @@ -169,8 +169,8 @@ pub trait BatchWithBuilders { impl BatchWithBuilders for T where - VB: FnOnce(&mut VacantBuffer<'_>) -> Result<(), VE>, - KB: FnOnce(&mut VacantBuffer<'_>) -> Result<(), KE>, + VB: Fn(&mut VacantBuffer<'_>) -> Result<(), VE>, + KB: Fn(&mut VacantBuffer<'_>) -> Result<(), KE>, C: Comparator, for<'a> &'a mut T: IntoIterator>, { @@ -687,6 +687,48 @@ pub trait Wal: sealed::Sealed + ImmutableWal { .map(|ptr| self.insert_pointer(ptr)) } + /// Inserts a batch of key-value pairs into the WAL. + fn insert_batch_with_key_builder>( + &mut self, + batch: &mut B, + ) -> Result<(), Either> + where + C: Comparator + CheapClone, + S: BuildChecksumer, + { + self + .insert_batch_with_key_builder_in(batch) + .map(|_| self.insert_pointers(batch.iter_mut().map(|ent| ent.pointer.take().unwrap()))) + } + + /// Inserts a batch of key-value pairs into the WAL. + fn insert_batch_with_value_builder>( + &mut self, + batch: &mut B, + ) -> Result<(), Either> + where + C: Comparator + CheapClone, + S: BuildChecksumer, + { + self + .insert_batch_with_value_builder_in(batch) + .map(|_| self.insert_pointers(batch.iter_mut().map(|ent| ent.pointer.take().unwrap()))) + } + + /// Inserts a batch of key-value pairs into the WAL. + fn insert_batch_with_builders>( + &mut self, + batch: &mut B, + ) -> Result<(), Among> + where + C: Comparator + CheapClone, + S: BuildChecksumer, + { + self + .insert_batch_with_builders_in(batch) + .map(|_| self.insert_pointers(batch.iter_mut().map(|ent| ent.pointer.take().unwrap()))) + } + /// Inserts a batch of key-value pairs into the WAL. fn insert_batch>(&mut self, batch: &mut B) -> Result<(), Error> where diff --git a/src/wal/sealed.rs b/src/wal/sealed.rs index 0fe01286..e8478ad3 100644 --- a/src/wal/sealed.rs +++ b/src/wal/sealed.rs @@ -44,7 +44,211 @@ pub trait Sealed: Constructor { where C: Comparator; - /// Inserts a batch of key-value pairs into the WAL. + fn insert_batch_with_key_builder_in>( + &mut self, + batch: &mut B, + ) -> Result<(), Either> + where + C: Comparator + CheapClone, + S: BuildChecksumer, + { + let batch_encoded_size = batch.iter_mut().fold(0u64, |acc, ent| { + acc + ent.kb.size() as u64 + ent.value.borrow().len() as u64 + }); + let total_size = STATUS_SIZE as u64 + batch_encoded_size + CHECKSUM_SIZE as u64; + if total_size > self.options().capacity() as u64 { + return Err(Either::Right(Error::insufficient_space( + total_size, + self.allocator().remaining() as u32, + ))); + } + + let allocator = self.allocator(); + + let mut buf = allocator + .alloc_bytes(total_size as u32) + .map_err(|e| Either::Right(Error::from_insufficient_space(e)))?; + + unsafe { + let committed_flag = Flags::BATCHING | Flags::COMMITTED; + let mut cks = self.hasher().build_checksumer(); + let flag = Flags::BATCHING; + buf.put_u8_unchecked(flag.bits); + let cmp = self.comparator(); + let mut cursor = 1; + + for ent in batch.iter_mut() { + let ptr = buf.as_mut_ptr().add(cursor as usize); + let klen = ent.kb.size() as usize; + buf.set_len(cursor as usize + klen); + let f = ent.kb.builder(); + f(&mut VacantBuffer::new(klen, NonNull::new_unchecked(ptr))).map_err(Either::Left)?; + let value = ent.value.borrow(); + let vlen = value.len(); + cursor += klen as u64; + cursor += vlen as u64; + buf.put_slice_unchecked(value); + + ent.pointer = Some(Pointer::new(klen, vlen, ptr, cmp.cheap_clone())); + } + + cks.update(&[committed_flag.bits]); + cks.update(&buf[1..]); + buf.put_u64_le_unchecked(cks.digest()); + + // commit the entry + buf[0] = committed_flag.bits; + let buf_cap = buf.capacity(); + + if self.options().sync_on_write() && allocator.is_ondisk() { + allocator + .flush_range(buf.offset(), buf_cap) + .map_err(|e| Either::Right(e.into()))?; + } + buf.detach(); + Ok(()) + } + } + + fn insert_batch_with_value_builder_in>( + &mut self, + batch: &mut B, + ) -> Result<(), Either> + where + C: Comparator + CheapClone, + S: BuildChecksumer, + { + let batch_encoded_size = batch.iter_mut().fold(0u64, |acc, ent| { + acc + ent.key.borrow().len() as u64 + ent.vb.size() as u64 + }); + let total_size = STATUS_SIZE as u64 + batch_encoded_size + CHECKSUM_SIZE as u64; + if total_size > self.options().capacity() as u64 { + return Err(Either::Right(Error::insufficient_space( + total_size, + self.allocator().remaining() as u32, + ))); + } + + let allocator = self.allocator(); + + let mut buf = allocator + .alloc_bytes(total_size as u32) + .map_err(|e| Either::Right(Error::from_insufficient_space(e)))?; + + unsafe { + let committed_flag = Flags::BATCHING | Flags::COMMITTED; + let mut cks = self.hasher().build_checksumer(); + let flag = Flags::BATCHING; + buf.put_u8_unchecked(flag.bits); + let cmp = self.comparator(); + let mut cursor = 1; + + for ent in batch.iter_mut() { + let ptr = buf.as_mut_ptr().add(cursor as usize); + let key = ent.key.borrow(); + let klen = key.len(); + let vlen = ent.vb.size() as usize; + cursor += klen as u64; + buf.put_slice_unchecked(key); + buf.set_len(cursor as usize + vlen); + let f = ent.vb.builder(); + f(&mut VacantBuffer::new( + klen, + NonNull::new_unchecked(ptr.add(klen)), + )) + .map_err(Either::Left)?; + cursor += vlen as u64; + ent.pointer = Some(Pointer::new(klen, vlen, ptr, cmp.cheap_clone())); + } + + cks.update(&[committed_flag.bits]); + cks.update(&buf[1..]); + buf.put_u64_le_unchecked(cks.digest()); + + // commit the entry + buf[0] = committed_flag.bits; + let buf_cap = buf.capacity(); + + if self.options().sync_on_write() && allocator.is_ondisk() { + allocator + .flush_range(buf.offset(), buf_cap) + .map_err(|e| Either::Right(e.into()))?; + } + buf.detach(); + Ok(()) + } + } + + fn insert_batch_with_builders_in>( + &mut self, + batch: &mut B, + ) -> Result<(), Among> + where + C: Comparator + CheapClone, + S: BuildChecksumer, + { + let batch_encoded_size = batch.iter_mut().fold(0u64, |acc, ent| { + acc + ent.kb.size() as u64 + ent.vb.size() as u64 + }); + let total_size = STATUS_SIZE as u64 + batch_encoded_size + CHECKSUM_SIZE as u64; + if total_size > self.options().capacity() as u64 { + return Err(Among::Right(Error::insufficient_space( + total_size, + self.allocator().remaining() as u32, + ))); + } + + let allocator = self.allocator(); + + let mut buf = allocator + .alloc_bytes(total_size as u32) + .map_err(|e| Either::Right(Error::from_insufficient_space(e)))?; + + unsafe { + let committed_flag = Flags::BATCHING | Flags::COMMITTED; + let mut cks = self.hasher().build_checksumer(); + let flag = Flags::BATCHING; + buf.put_u8_unchecked(flag.bits); + let cmp = self.comparator(); + let mut cursor = 1; + + for ent in batch.iter_mut() { + let ptr = buf.as_mut_ptr().add(cursor as usize); + let klen = ent.kb.size() as usize; + buf.set_len(cursor as usize + klen); + let f = ent.kb.builder(); + f(&mut VacantBuffer::new(klen, NonNull::new_unchecked(ptr))).map_err(Among::Left)?; + let vlen = ent.vb.size() as usize; + cursor += klen as u64; + buf.set_len(cursor as usize + vlen); + let f = ent.vb.builder(); + f(&mut VacantBuffer::new( + klen, + NonNull::new_unchecked(ptr.add(klen)), + )) + .map_err(Among::Middle)?; + cursor += vlen as u64; + ent.pointer = Some(Pointer::new(klen, vlen, ptr, cmp.cheap_clone())); + } + + cks.update(&[committed_flag.bits]); + cks.update(&buf[1..]); + buf.put_u64_le_unchecked(cks.digest()); + + // commit the entry + buf[0] = committed_flag.bits; + let buf_cap = buf.capacity(); + + if self.options().sync_on_write() && allocator.is_ondisk() { + allocator + .flush_range(buf.offset(), buf_cap) + .map_err(|e| Among::Right(e.into()))?; + } + buf.detach(); + Ok(()) + } + } + fn insert_batch_in>(&mut self, batch: &mut B) -> Result<(), Error> where C: Comparator + CheapClone,