From bb3488b6fc682e3cfb6f5bde4b81028dcda65191 Mon Sep 17 00:00:00 2001 From: al8n Date: Wed, 18 Sep 2024 00:20:24 +0800 Subject: [PATCH] WIP --- .github/workflows/ci.yml | 2 + Cargo.toml | 2 + src/lib.rs | 38 +++--- src/swmr/wal/tests.rs | 3 + src/swmr/wal/tests/constructor.rs | 2 +- src/swmr/wal/tests/get.rs | 2 +- src/swmr/wal/tests/insert.rs | 2 +- src/swmr/wal/tests/insert_batch.rs | 3 + src/swmr/wal/tests/iter.rs | 2 +- src/tests.rs | 204 ++++++++++++++++++++++++++--- src/unsync.rs | 1 + src/unsync/tests.rs | 3 + src/unsync/tests/insert_batch.rs | 3 + src/utils.rs | 8 +- src/wal/sealed.rs | 161 ++++++++++++++++------- 15 files changed, 343 insertions(+), 93 deletions(-) create mode 100644 src/swmr/wal/tests/insert_batch.rs create mode 100644 src/unsync/tests/insert_batch.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8059f4ee..20d039e4 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -240,10 +240,12 @@ jobs: - unsync_iters - unsync_get - unsync_constructor + - unsync_insert_batch - swmr_insert - swmr_iters - swmr_get - swmr_constructor + - swmr_insert_batch - swmr_generic_insert - swmr_generic_iters - swmr_generic_get diff --git a/Cargo.toml b/Cargo.toml index 55daf990..e13216da 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -68,10 +68,12 @@ unexpected_cfgs = { level = "warn", check-cfg = [ 'cfg(all_tests)', 'cfg(test_unsync_constructor)', 'cfg(test_unsync_insert)', + 'cfg(test_unsync_insert_batch)', 'cfg(test_unsync_iters)', 'cfg(test_unsync_get)', 'cfg(test_swmr_constructor)', 'cfg(test_swmr_insert)', + 'cfg(test_swmr_insert_batch)', 'cfg(test_swmr_iters)', 'cfg(test_swmr_get)', 'cfg(test_swmr_generic_constructor)', diff --git a/src/lib.rs b/src/lib.rs index f3ecc463..807c26f4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -43,24 +43,26 @@ const MAGIC_TEXT_SIZE: usize = MAGIC_TEXT.len(); const MAGIC_VERSION_SIZE: usize = mem::size_of::(); const HEADER_SIZE: usize = MAGIC_TEXT_SIZE + MAGIC_VERSION_SIZE; -// 
#[cfg(all( -// test, -// any( -// all_tests, -// test_unsync_constructor, -// test_unsync_insert, -// test_unsync_get, -// test_unsync_iters, -// test_swmr_constructor, -// test_swmr_insert, -// test_swmr_get, -// test_swmr_iters, -// test_swmr_generic_constructor, -// test_swmr_generic_insert, -// test_swmr_generic_get, -// test_swmr_generic_iters, -// ) -// ))] +#[cfg(all( + test, + any( + all_tests, + test_unsync_constructor, + test_unsync_insert, + test_unsync_insert_batch, + test_unsync_get, + test_unsync_iters, + test_swmr_constructor, + test_swmr_insert, + test_swmr_insert_batch, + test_swmr_get, + test_swmr_iters, + test_swmr_generic_constructor, + test_swmr_generic_insert, + test_swmr_generic_get, + test_swmr_generic_iters, + ) +))] #[cfg(test)] #[macro_use] mod tests; diff --git a/src/swmr/wal/tests.rs b/src/swmr/wal/tests.rs index ed30fd4a..fb79892e 100644 --- a/src/swmr/wal/tests.rs +++ b/src/swmr/wal/tests.rs @@ -16,4 +16,7 @@ mod iter; #[cfg(all(test, any(test_swmr_get, all_tests)))] mod get; +#[cfg(all(test, any(test_swmr_insert_batch, all_tests)))] +mod insert_batch; + const MB: u32 = 1024 * 1024; diff --git a/src/swmr/wal/tests/constructor.rs b/src/swmr/wal/tests/constructor.rs index 03e864a3..c80352ea 100644 --- a/src/swmr/wal/tests/constructor.rs +++ b/src/swmr/wal/tests/constructor.rs @@ -1,3 +1,3 @@ use super::*; -common_unittests!(unsync::constructor::OrderWal); +common_unittests!(swmr::constructor::OrderWal); diff --git a/src/swmr/wal/tests/get.rs b/src/swmr/wal/tests/get.rs index c9eefcf8..0165171d 100644 --- a/src/swmr/wal/tests/get.rs +++ b/src/swmr/wal/tests/get.rs @@ -1,3 +1,3 @@ use super::*; -common_unittests!(unsync::get::OrderWal); +common_unittests!(swmr::get::OrderWal); diff --git a/src/swmr/wal/tests/insert.rs b/src/swmr/wal/tests/insert.rs index 6aacd454..217084d8 100644 --- a/src/swmr/wal/tests/insert.rs +++ b/src/swmr/wal/tests/insert.rs @@ -1,3 +1,3 @@ use super::*; -common_unittests!(unsync::insert::OrderWal); 
+common_unittests!(swmr::insert::OrderWal); diff --git a/src/swmr/wal/tests/insert_batch.rs b/src/swmr/wal/tests/insert_batch.rs new file mode 100644 index 00000000..a158ebb7 --- /dev/null +++ b/src/swmr/wal/tests/insert_batch.rs @@ -0,0 +1,3 @@ +use super::*; + +common_unittests!(swmr::insert_batch::OrderWal); diff --git a/src/swmr/wal/tests/iter.rs b/src/swmr/wal/tests/iter.rs index f20d78e8..e42d2ed6 100644 --- a/src/swmr/wal/tests/iter.rs +++ b/src/swmr/wal/tests/iter.rs @@ -1,3 +1,3 @@ use super::*; -common_unittests!(unsync::iters::OrderWal); +common_unittests!(swmr::iters::OrderWal); diff --git a/src/tests.rs b/src/tests.rs index 37fc0d60..c614ad6f 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -145,6 +145,169 @@ macro_rules! common_unittests { } } }; + ($prefix:ident::insert_batch::$wal:ident) => { + paste::paste! { + #[test] + fn test_insert_batch_inmemory() { + insert_batch(&mut OrderWal::new(Builder::new().with_capacity(MB)).unwrap()); + } + + #[test] + fn test_insert_batch_map_anon() { + insert_batch(&mut OrderWal::map_anon(Builder::new().with_capacity(MB)).unwrap()); + } + + #[test] + #[cfg_attr(miri, ignore)] + fn test_insert_batch_map_file() { + let dir = tempdir().unwrap(); + let path = dir.path().join(concat!( + "test_", + stringify!($prefix), + "_insert_batch_map_file" + )); + let mut map = unsafe { + OrderWal::map_mut( + &path, + Builder::new(), + OpenOptions::new() + .create_new(Some(MB)) + .write(true) + .read(true), + ) + .unwrap() + }; + + insert_batch(&mut map); + + let map = unsafe { OrderWal::map(&path, Builder::new()).unwrap() }; + + for i in 0..100u32 { + assert_eq!(map.get(&i.to_be_bytes()).unwrap(), i.to_be_bytes()); + } + } + + #[test] + fn test_insert_batch_with_key_builder_inmemory() { + insert_batch_with_key_builder(&mut OrderWal::new(Builder::new().with_capacity(MB)).unwrap()); + } + + #[test] + fn test_insert_batch_with_key_builder_map_anon() { + insert_batch_with_key_builder(&mut OrderWal::map_anon(Builder::new().with_capacity(MB)).unwrap()); + } + + #[test] + 
#[cfg_attr(miri, ignore)] + fn test_insert_batch_with_key_builder_map_file() { + let dir = tempdir().unwrap(); + let path = dir.path().join(concat!( + "test_", + stringify!($prefix), + "_insert_batch_with_key_builder_map_file" + )); + let mut map = unsafe { + OrderWal::map_mut( + &path, + Builder::new(), + OpenOptions::new() + .create_new(Some(MB)) + .write(true) + .read(true), + ) + .unwrap() + }; + + insert_batch_with_key_builder(&mut map); + + let map = unsafe { OrderWal::map(&path, Builder::new()).unwrap() }; + + for i in 0..100u32 { + assert_eq!(map.get(&i.to_be_bytes()).unwrap(), i.to_be_bytes()); + } + } + + #[test] + fn test_insert_batch_with_value_builder_inmemory() { + insert_batch_with_value_builder(&mut OrderWal::new(Builder::new().with_capacity(MB)).unwrap()); + } + + #[test] + fn test_insert_batch_with_value_builder_map_anon() { + insert_batch_with_value_builder(&mut OrderWal::map_anon(Builder::new().with_capacity(MB)).unwrap()); + } + + #[test] + #[cfg_attr(miri, ignore)] + fn test_insert_batch_with_value_builder_map_file() { + let dir = tempdir().unwrap(); + let path = dir.path().join(concat!( + "test_", + stringify!($prefix), + "_insert_batch_with_value_builder_map_file" + )); + let mut map = unsafe { + OrderWal::map_mut( + &path, + Builder::new(), + OpenOptions::new() + .create_new(Some(MB)) + .write(true) + .read(true), + ) + .unwrap() + }; + + insert_batch_with_value_builder(&mut map); + + let map = unsafe { OrderWal::map(&path, Builder::new()).unwrap() }; + + for i in 0..100u32 { + assert_eq!(map.get(&i.to_be_bytes()).unwrap(), i.to_be_bytes()); + } + } + + #[test] + fn test_insert_batch_with_builders_inmemory() { + insert_batch_with_builders(&mut OrderWal::new(Builder::new().with_capacity(MB)).unwrap()); + } + + #[test] + fn test_insert_batch_with_builders_map_anon() { + insert_batch_with_builders(&mut OrderWal::map_anon(Builder::new().with_capacity(MB)).unwrap()); + } + + #[test] + #[cfg_attr(miri, ignore)] + fn test_insert_batch_with_builders_map_file() { + let dir = 
tempdir().unwrap(); + let path = dir.path().join(concat!( + "test_", + stringify!($prefix), + "_insert_batch_with_builders_map_file" + )); + let mut map = unsafe { + OrderWal::map_mut( + &path, + Builder::new(), + OpenOptions::new() + .create_new(Some(MB)) + .write(true) + .read(true), + ) + .unwrap() + }; + + insert_batch_with_builders(&mut map); + + let map = unsafe { OrderWal::map(&path, Builder::new()).unwrap() }; + + for i in 0..100u32 { + assert_eq!(map.get(&i.to_be_bytes()).unwrap(), i.to_be_bytes()); + } + } + } + }; ($prefix:ident::iters::$wal:ident) => { paste::paste! { #[test] @@ -1054,86 +1217,93 @@ pub(crate) fn get_or_insert_with_value_builder>(wal: &mut } pub(crate) fn insert_batch>(wal: &mut W) { + const N: u32 = 100; + let mut batch = vec![]; - for i in 0..100u32 { + for i in 0..N { batch.push(Entry::new(i.to_be_bytes(), i.to_be_bytes())); } wal.insert_batch(&mut batch).unwrap(); - for i in 0..100u32 { + for i in 0..N { assert_eq!(wal.get(&i.to_be_bytes()).unwrap(), i.to_be_bytes()); } let wal = wal.reader(); - for i in 0..100u32 { + for i in 0..N { assert_eq!(wal.get(&i.to_be_bytes()).unwrap(), i.to_be_bytes()); } } pub(crate) fn insert_batch_with_key_builder>(wal: &mut W) { + const N: u32 = 100; + let mut batch = vec![]; - for i in 0..100u32 { + for i in 0..N { batch.push(EntryWithKeyBuilder::new( - KeyBuilder::new(4, move |buf| buf.put_u32_le(i)), + KeyBuilder::new(4, move |buf| buf.put_u32_be(i)), i.to_be_bytes(), )); } wal.insert_batch_with_key_builder(&mut batch).unwrap(); - for i in 0..100u32 { + for i in 0..N { assert_eq!(wal.get(&i.to_be_bytes()).unwrap(), i.to_be_bytes()); } let wal = wal.reader(); - for i in 0..100u32 { + for i in 0..N { assert_eq!(wal.get(&i.to_be_bytes()).unwrap(), i.to_be_bytes()); } } pub(crate) fn insert_batch_with_value_builder>(wal: &mut W) { - let mut batch = vec![]; + const N: u32 = 100; - for i in 0..100u32 { + let mut batch = vec![]; + for i in 0..N { batch.push(EntryWithValueBuilder::new( i.to_be_bytes(), - 
ValueBuilder::new(4, move |buf| buf.put_u32_le(i)), + ValueBuilder::new(4, move |buf| buf.put_u32_be(i)), )); } wal.insert_batch_with_value_builder(&mut batch).unwrap(); - for i in 0..100u32 { + for i in 0..N { assert_eq!(wal.get(&i.to_be_bytes()).unwrap(), i.to_be_bytes()); } let wal = wal.reader(); - for i in 0..100u32 { + for i in 0..N { assert_eq!(wal.get(&i.to_be_bytes()).unwrap(), i.to_be_bytes()); } } pub(crate) fn insert_batch_with_builders>(wal: &mut W) { + const N: u32 = 100; + let mut batch = vec![]; - for i in 0..100u32 { + for i in 0..N { batch.push(EntryWithBuilders::new( - KeyBuilder::new(4, move |buf| buf.put_u32_le(i)), - ValueBuilder::new(4, move |buf| buf.put_u32_le(i)), + KeyBuilder::new(4, move |buf| buf.put_u32_be(i)), + ValueBuilder::new(4, move |buf| buf.put_u32_be(i)), )); } wal.insert_batch_with_builders(&mut batch).unwrap(); - for i in 0..100u32 { + for i in 0..N { assert_eq!(wal.get(&i.to_be_bytes()).unwrap(), i.to_be_bytes()); } let wal = wal.reader(); - for i in 0..100u32 { + for i in 0..N { assert_eq!(wal.get(&i.to_be_bytes()).unwrap(), i.to_be_bytes()); } } diff --git a/src/unsync.rs b/src/unsync.rs index 3c9dba2d..d5535580 100644 --- a/src/unsync.rs +++ b/src/unsync.rs @@ -24,6 +24,7 @@ use c::*; all_tests, test_unsync_constructor, test_unsync_insert, + test_unsync_insert_batch, test_unsync_get, test_unsync_iters, ) diff --git a/src/unsync/tests.rs b/src/unsync/tests.rs index 5cd48486..b0145a05 100644 --- a/src/unsync/tests.rs +++ b/src/unsync/tests.rs @@ -16,4 +16,7 @@ mod iter; #[cfg(all(test, any(test_unsync_get, all_tests)))] mod get; +#[cfg(all(test, any(test_unsync_insert_batch, all_tests)))] +mod insert_batch; + const MB: u32 = 1024 * 1024; diff --git a/src/unsync/tests/insert_batch.rs b/src/unsync/tests/insert_batch.rs new file mode 100644 index 00000000..8aaa30b0 --- /dev/null +++ b/src/unsync/tests/insert_batch.rs @@ -0,0 +1,3 @@ +use super::*; + +common_unittests!(unsync::insert_batch::OrderWal); diff --git a/src/utils.rs 
b/src/utils.rs index 5258794d..dd141cf7 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -4,8 +4,8 @@ use super::*; /// Merge two `u32` into a `u64`. /// -/// high 32 bits: `a` -/// low 32 bits: `b` +/// - high 32 bits: `a` +/// - low 32 bits: `b` #[inline] pub(crate) const fn merge_lengths(a: u32, b: u32) -> u64 { (a as u64) << 32 | b as u64 @@ -13,8 +13,8 @@ pub(crate) const fn merge_lengths(a: u32, b: u32) -> u64 { /// Split a `u64` into two `u32`. /// -/// high 32 bits: the first `u32` -/// low 32 bits: the second `u32` +/// - high 32 bits: the first `u32` +/// - low 32 bits: the second `u32` #[inline] pub(crate) const fn split_lengths(len: u64) -> (u32, u32) { ((len >> 32) as u32, len as u32) diff --git a/src/wal/sealed.rs b/src/wal/sealed.rs index e90a9aa9..7724d1ab 100644 --- a/src/wal/sealed.rs +++ b/src/wal/sealed.rs @@ -81,11 +81,11 @@ pub trait Sealed: Constructor { num_entries += 1; } - let cap = self.options().capacity() as u64; + let cap = self.allocator().remaining() as u64; if batch_encoded_size > cap { return Err(Either::Right(Error::insufficient_space( batch_encoded_size, - self.allocator().remaining() as u32, + cap as u32, ))); } @@ -96,8 +96,7 @@ pub trait Sealed: Constructor { STATUS_SIZE as u64 + batch_meta_size as u64 + batch_encoded_size + CHECKSUM_SIZE as u64; if total_size > cap { return Err(Either::Right(Error::insufficient_space( - total_size, - self.allocator().remaining() as u32, + total_size, cap as u32, ))); } @@ -140,9 +139,9 @@ pub trait Sealed: Constructor { ent.pointer = Some(Pointer::new(klen, vlen, ptr, cmp.cheap_clone())); } - if cursor as u64 != total_size { + if (cursor + CHECKSUM_SIZE) as u64 != total_size { return Err(Either::Right(Error::batch_size_mismatch( - total_size as u32, + total_size as u32 - CHECKSUM_SIZE as u32, cursor as u32, ))); } @@ -185,11 +184,11 @@ pub trait Sealed: Constructor { num_entries += 1; } - let cap = self.options().capacity() as u64; + let cap = self.allocator().remaining() as u64; if 
batch_encoded_size > cap { return Err(Either::Right(Error::insufficient_space( batch_encoded_size, - self.allocator().remaining() as u32, + cap as u32, ))); } @@ -200,8 +199,7 @@ pub trait Sealed: Constructor { STATUS_SIZE as u64 + batch_meta_size as u64 + batch_encoded_size + CHECKSUM_SIZE as u64; if total_size > cap { return Err(Either::Right(Error::insufficient_space( - total_size, - self.allocator().remaining() as u32, + total_size, cap as u32, ))); } @@ -244,9 +242,9 @@ pub trait Sealed: Constructor { ent.pointer = Some(Pointer::new(klen, vlen, ptr, cmp.cheap_clone())); } - if cursor as u64 != total_size { + if (cursor + CHECKSUM_SIZE) as u64 != total_size { return Err(Either::Right(Error::batch_size_mismatch( - total_size as u32, + total_size as u32 - CHECKSUM_SIZE as u32, cursor as u32, ))); } @@ -279,18 +277,32 @@ pub trait Sealed: Constructor { { let mut batch_encoded_size = 0u64; + let mut num_entries = 0u32; for ent in batch.iter_mut() { let klen = ent.kb.size() as usize; let vlen = ent.vb.size() as usize; self.check_batch_entry(klen, vlen).map_err(Either::Right)?; - batch_encoded_size += klen as u64 + vlen as u64; + let merged_len = merge_lengths(klen as u32, vlen as u32); + batch_encoded_size += klen as u64 + vlen as u64 + encoded_u64_varint_len(merged_len) as u64; + num_entries += 1; } - let total_size = STATUS_SIZE as u64 + batch_encoded_size + CHECKSUM_SIZE as u64; - if total_size > self.options().capacity() as u64 { + let cap = self.allocator().remaining() as u64; + if batch_encoded_size > cap { return Err(Among::Right(Error::insufficient_space( - total_size, - self.allocator().remaining() as u32, + batch_encoded_size, + cap as u32, + ))); + } + + // safe to cast batch_encoded_size to u32 here, we already checked it's less than capacity (less than u32::MAX). 
+ let batch_meta = merge_lengths(num_entries, batch_encoded_size as u32); + let batch_meta_size = encoded_u64_varint_len(batch_meta); + let total_size = + STATUS_SIZE as u64 + batch_meta_size as u64 + batch_encoded_size + CHECKSUM_SIZE as u64; + if total_size > cap { + return Err(Among::Right(Error::insufficient_space( + total_size, cap as u32, ))); } @@ -305,17 +317,27 @@ pub trait Sealed: Constructor { let mut cks = self.hasher().build_checksumer(); let flag = Flags::BATCHING; buf.put_u8_unchecked(flag.bits); + buf.put_u64_varint_unchecked(batch_meta); let cmp = self.comparator(); - let mut cursor = 1; + let mut cursor = 1 + batch_meta_size; for ent in batch.iter_mut() { - let ptr = buf.as_mut_ptr().add(cursor as usize); let klen = ent.kb.size() as usize; - buf.set_len(cursor as usize + klen); + let vlen = ent.vb.size() as usize; + let merged_kv_len = merge_lengths(klen as u32, vlen as u32); + let merged_kv_len_size = encoded_u64_varint_len(merged_kv_len); + + let remaining = buf.remaining(); + if remaining < merged_kv_len_size + klen + vlen { + return Err(Among::Right(Error::larger_batch_size(total_size as u32))); + } + + let ent_len_size = buf.put_u64_varint_unchecked(merged_kv_len); + let ptr = buf.as_mut_ptr().add(cursor as usize + ent_len_size); + buf.set_len(cursor as usize + ent_len_size + klen); let f = ent.kb.builder(); f(&mut VacantBuffer::new(klen, NonNull::new_unchecked(ptr))).map_err(Among::Left)?; - let vlen = ent.vb.size() as usize; - cursor += klen as u64; + cursor += ent_len_size + klen; buf.set_len(cursor as usize + vlen); let f = ent.vb.builder(); f(&mut VacantBuffer::new( @@ -323,10 +345,17 @@ pub trait Sealed: Constructor { NonNull::new_unchecked(ptr.add(klen)), )) .map_err(Among::Middle)?; - cursor += vlen as u64; + cursor += vlen; ent.pointer = Some(Pointer::new(klen, vlen, ptr, cmp.cheap_clone())); } + if (cursor + CHECKSUM_SIZE) as u64 != total_size { + return Err(Among::Right(Error::batch_size_mismatch( + total_size as u32 - 
CHECKSUM_SIZE as u32, + cursor as u32, + ))); + } + cks.update(&[committed_flag.bits]); cks.update(&buf[1..]); buf.put_u64_le_unchecked(cks.digest()); @@ -350,18 +379,29 @@ pub trait Sealed: Constructor { C: Comparator + CheapClone, S: BuildChecksumer, { - let batch_encoded_size = batch.iter_mut().fold(0u64, |acc, ent| { - acc + ent.key.borrow().len() as u64 + ent.value.borrow().len() as u64 - }); - let total_size = STATUS_SIZE as u64 + batch_encoded_size + CHECKSUM_SIZE as u64; - if total_size > self.options().capacity() as u64 { - return Err(Error::insufficient_space( - total_size, - self.allocator().remaining() as u32, - )); + let mut batch_encoded_size = 0u64; + + let mut num_entries = 0u32; + for ent in batch.iter_mut() { + let klen = ent.key.borrow().len(); + let vlen = ent.value.borrow().len(); + self.check_batch_entry(klen, vlen)?; + let merged_len = merge_lengths(klen as u32, vlen as u32); + batch_encoded_size += klen as u64 + vlen as u64 + encoded_u64_varint_len(merged_len) as u64; + + num_entries += 1; } + // safe to cast batch_encoded_size to u32 here, we already checked it's less than capacity (less than u32::MAX). 
+ let batch_meta = merge_lengths(num_entries, batch_encoded_size as u32); + let batch_meta_size = encoded_u64_varint_len(batch_meta); let allocator = self.allocator(); + let remaining = allocator.remaining() as u64; + let total_size = + STATUS_SIZE as u64 + batch_meta_size as u64 + batch_encoded_size + CHECKSUM_SIZE as u64; + if total_size > remaining { + return Err(Error::insufficient_space(total_size, remaining as u32)); + } let mut buf = allocator .alloc_bytes(total_size as u32) @@ -372,25 +412,43 @@ pub trait Sealed: Constructor { let mut cks = self.hasher().build_checksumer(); let flag = Flags::BATCHING; buf.put_u8_unchecked(flag.bits); + buf.put_u64_varint_unchecked(batch_meta); let cmp = self.comparator(); - let mut cursor = 1; - batch.iter_mut().for_each(|ent| { - let ptr = buf.as_ptr().add(cursor as usize); + let mut cursor = 1 + batch_meta_size; + + for ent in batch.iter_mut() { let key = ent.key.borrow(); let value = ent.value.borrow(); let klen = key.len(); let vlen = value.len(); - cursor += klen as u64; + let merged_kv_len = merge_lengths(klen as u32, vlen as u32); + let merged_kv_len_size = encoded_u64_varint_len(merged_kv_len); + + let remaining = buf.remaining(); + if remaining < merged_kv_len_size + klen + vlen { + return Err(Error::larger_batch_size(total_size as u32)); + } + + let ent_len_size = buf.put_u64_varint_unchecked(merged_kv_len); + let ptr = buf.as_mut_ptr().add(cursor as usize + ent_len_size); + cursor += ent_len_size + klen; buf.put_slice_unchecked(key); - cursor += vlen as u64; + cursor += vlen; buf.put_slice_unchecked(value); - ent.pointer = Some(Pointer::new(klen, vlen, ptr, cmp.cheap_clone())); - }); + } + + if (cursor + CHECKSUM_SIZE) as u64 != total_size { + return Err(Error::batch_size_mismatch( + total_size as u32 - CHECKSUM_SIZE as u32, + cursor as u32, + )); + } cks.update(&[committed_flag.bits]); cks.update(&buf[1..]); - buf.put_u64_le_unchecked(cks.digest()); + let checksum = cks.digest(); + 
buf.put_u64_le_unchecked(checksum); // commit the entry buf[0] = committed_flag.bits; @@ -624,34 +682,37 @@ pub trait Constructor: Sized { } let mut sub_cursor = 0; - + batch_data_buf = &batch_data_buf[1 + readed..]; for _ in 0..num_entries { - let (kvlen, ent_len) = - dbutils::leb128::decode_u64_varint(batch_data_buf).map_err(|e| { - #[cfg(feature = "tracing")] - tracing::error!(err=%e); + let (kvlen, ent_len) = decode_u64_varint(batch_data_buf).map_err(|e| { + #[cfg(feature = "tracing")] + tracing::error!(err=%e); - Error::corrupted(e) - })?; + Error::corrupted(e) + })?; let (klen, vlen) = split_lengths(ent_len); let klen = klen as usize; let vlen = vlen as usize; - set.insert(Pointer::new( + let ptr = Pointer::new( klen, vlen, arena.get_pointer(cursor + STATUS_SIZE + readed + sub_cursor + kvlen), cmp.cheap_clone(), - )); + ); + set.insert(ptr); + let ent_len = kvlen + klen + vlen; sub_cursor += kvlen + klen + vlen; - batch_data_buf = &batch_data_buf[sub_cursor..]; + batch_data_buf = &batch_data_buf[ent_len..]; } debug_assert_eq!( sub_cursor, encoded_data_len as usize, "expected encoded batch data size is not equal to the actual size" ); + + cursor += cks_offset + CHECKSUM_SIZE; } } }