Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
al8n committed Sep 17, 2024
1 parent 4238f4f commit c9e64f0
Show file tree
Hide file tree
Showing 8 changed files with 436 additions and 109 deletions.
11 changes: 10 additions & 1 deletion src/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,22 @@ macro_rules! builder {
impl<F> [< $name Builder >]<F> {
#[doc = "Creates a new `" [<$name Builder>] "` with the given size and builder closure."]
#[inline]
pub const fn new<E>(size: $size, f: F) -> Self
pub const fn once<E>(size: $size, f: F) -> Self
where
F: for<'a> FnOnce(&mut VacantBuffer<'a>) -> Result<(), E>,
{
Self { size, f }
}

#[doc = "Creates a new `" [<$name Builder>] "` with the given size and builder closure."]
#[inline]
pub const fn new<E>(size: $size, f: F) -> Self
where
F: for<'a> Fn(&mut VacantBuffer<'a>) -> Result<(), E>,
{
Self { size, f }
}

#[doc = "Returns the required" [< $name: snake>] "size."]
#[inline]
pub const fn size(&self) -> $size {
Expand Down
57 changes: 51 additions & 6 deletions src/error.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/// The batch error type.
#[derive(Debug, thiserror::Error)]
pub enum BatchError {
/// Returned when the expected batch encoding size does not match the actual size.
#[error("the expected batch encoding size ({expected}) does not match the actual size {actual}")]
EncodedSizeMismatch {
/// The expected size.
expected: u32,
/// The actual size.
actual: u32,
},
/// Larger encoding size than the expected batch encoding size.
#[error("larger encoding size than the expected batch encoding size {0}")]
LargerEncodedSize(u32),
}

/// The error type.
#[derive(Debug, thiserror::Error)]
pub enum Error {
Expand All @@ -13,15 +29,15 @@ pub enum Error {
#[error("the key size is {size} larger than the maximum key size {maximum_key_size}")]
KeyTooLarge {
/// The size of the key.
size: u32,
size: u64,
/// The maximum key size.
maximum_key_size: u32,
},
/// The value is too large.
#[error("the value size is {size} larger than the maximum value size {maximum_value_size}")]
ValueTooLarge {
/// The size of the value.
size: u32,
size: u64,
/// The maximum value size.
maximum_value_size: u32,
},
Expand All @@ -33,6 +49,9 @@ pub enum Error {
/// The maximum entry size.
maximum_entry_size: u64,
},
/// Returned when the expected batch encoding size does not match the actual size.
#[error(transparent)]
Batch(#[from] BatchError),
/// I/O error.
#[error("{0}")]
IO(#[from] std::io::Error),
Expand All @@ -51,15 +70,15 @@ impl Error {
}

/// Create a new `Error::KeyTooLarge` instance.
pub(crate) const fn key_too_large(size: u32, maximum_key_size: u32) -> Self {
pub(crate) const fn key_too_large(size: u64, maximum_key_size: u32) -> Self {
Self::KeyTooLarge {
size,
maximum_key_size,
}
}

/// Create a new `Error::ValueTooLarge` instance.
pub(crate) const fn value_too_large(size: u32, maximum_value_size: u32) -> Self {
pub(crate) const fn value_too_large(size: u64, maximum_value_size: u32) -> Self {
Self::ValueTooLarge {
size,
maximum_value_size,
Expand Down Expand Up @@ -87,13 +106,39 @@ impl Error {

/// Create a new corrupted error.
#[inline]
pub(crate) fn corrupted() -> Error {
pub(crate) fn corrupted<E>(e: E) -> Error
where
E: Into<Box<dyn std::error::Error + Send + Sync>>,
{
#[derive(Debug)]
struct Corrupted(Box<dyn std::error::Error + Send + Sync>);

impl std::fmt::Display for Corrupted {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "corrupted write-ahead log: {}", self.0)
}
}

impl std::error::Error for Corrupted {}

Self::IO(std::io::Error::new(
std::io::ErrorKind::InvalidData,
"corrupted write-ahead log",
Corrupted(e.into()),
))
}

/// Create a new batch size mismatch error.
#[inline]
pub(crate) const fn batch_size_mismatch(expected: u32, actual: u32) -> Self {
Self::Batch(BatchError::EncodedSizeMismatch { expected, actual })
}

/// Create a new larger batch size error.
#[inline]
pub(crate) const fn larger_batch_size(size: u32) -> Self {
Self::Batch(BatchError::LargerEncodedSize(size))
}

/// Create a read-only error.
pub(crate) const fn read_only() -> Self {
Self::ReadOnly
Expand Down
39 changes: 20 additions & 19 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,24 +43,25 @@ const MAGIC_TEXT_SIZE: usize = MAGIC_TEXT.len();
const MAGIC_VERSION_SIZE: usize = mem::size_of::<u16>();
const HEADER_SIZE: usize = MAGIC_TEXT_SIZE + MAGIC_VERSION_SIZE;

#[cfg(all(
test,
any(
all_tests,
test_unsync_constructor,
test_unsync_insert,
test_unsync_get,
test_unsync_iters,
test_swmr_constructor,
test_swmr_insert,
test_swmr_get,
test_swmr_iters,
test_swmr_generic_constructor,
test_swmr_generic_insert,
test_swmr_generic_get,
test_swmr_generic_iters,
)
))]
// #[cfg(all(
// test,
// any(
// all_tests,
// test_unsync_constructor,
// test_unsync_insert,
// test_unsync_get,
// test_unsync_iters,
// test_swmr_constructor,
// test_swmr_insert,
// test_swmr_get,
// test_swmr_iters,
// test_swmr_generic_constructor,
// test_swmr_generic_insert,
// test_swmr_generic_get,
// test_swmr_generic_iters,
// )
// ))]
#[cfg(test)]
#[macro_use]
mod tests;

Expand All @@ -76,7 +77,7 @@ use utils::*;

mod wal;
pub use wal::{
Batch, BatchWithBuilders, BatchWithKeyBuilder, BatchWithValueBuilder, ImmutableWal, Wal,
Batch, BatchWithBuilders, BatchWithKeyBuilder, BatchWithValueBuilder, Builder, ImmutableWal, Wal,
};

mod options;
Expand Down
8 changes: 4 additions & 4 deletions src/swmr/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -428,11 +428,11 @@ where
let header = arena.get_u8(cursor).unwrap();
let flag = Flags::from_bits_unchecked(header);

let (kvsize, encoded_len) = arena.get_u64_varint(cursor + STATUS_SIZE).map_err(|_e| {
let (kvsize, encoded_len) = arena.get_u64_varint(cursor + STATUS_SIZE).map_err(|e| {
#[cfg(feature = "tracing")]
tracing::error!(err=%_e);
tracing::error!(err=%e);

Error::corrupted()
Error::corrupted(e)
})?;
let (key_len, value_len) = split_lengths(encoded_len);
let key_len = key_len as usize;
Expand All @@ -452,7 +452,7 @@ where
let cks = arena.get_u64_le(cursor + cks_offset).unwrap();

if cks != checksumer.checksum_one(arena.get_bytes(cursor, cks_offset)) {
return Err(Error::corrupted());
return Err(Error::corrupted("checksum mismatch"));
}

// If the entry is not committed, we should not rewind
Expand Down
97 changes: 91 additions & 6 deletions src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -669,7 +669,7 @@ pub(crate) fn insert_with_key_builder<W: Wal<Ascend, Crc32>>(wal: &mut W) {
for i in 0..100u32 {
wal
.insert_with_key_builder::<()>(
KeyBuilder::<_>::new(4, |buf| {
KeyBuilder::<_>::once(4, |buf| {
let _ = buf.put_u32_be(i);
Ok(())
}),
Expand All @@ -693,7 +693,7 @@ pub(crate) fn insert_with_value_builder<W: Wal<Ascend, Crc32>>(wal: &mut W) {
wal
.insert_with_value_builder::<()>(
&i.to_be_bytes(),
ValueBuilder::<_>::new(4, |buf| {
ValueBuilder::<_>::once(4, |buf| {
let _ = buf.put_u32_be(i);
Ok(())
}),
Expand All @@ -715,11 +715,11 @@ pub(crate) fn insert_with_builders<W: Wal<Ascend, Crc32>>(wal: &mut W) {
for i in 0..100u32 {
wal
.insert_with_builders::<(), ()>(
KeyBuilder::<_>::new(4, |buf| {
KeyBuilder::<_>::once(4, |buf| {
let _ = buf.put_u32_be(i);
Ok(())
}),
ValueBuilder::<_>::new(4, |buf| {
ValueBuilder::<_>::once(4, |buf| {
let _ = buf.put_u32_be(i);
Ok(())
}),
Expand Down Expand Up @@ -1023,7 +1023,7 @@ pub(crate) fn get_or_insert_with_value_builder<W: Wal<Ascend, Crc32>>(wal: &mut
wal
.get_or_insert_with_value_builder::<()>(
&i.to_be_bytes(),
ValueBuilder::<_>::new(4, |buf| {
ValueBuilder::<_>::once(4, |buf| {
let _ = buf.put_u32_be(i);
Ok(())
}),
Expand All @@ -1035,7 +1035,7 @@ pub(crate) fn get_or_insert_with_value_builder<W: Wal<Ascend, Crc32>>(wal: &mut
wal
.get_or_insert_with_value_builder::<()>(
&i.to_be_bytes(),
ValueBuilder::<_>::new(4, |buf| {
ValueBuilder::<_>::once(4, |buf| {
let _ = buf.put_u32_be(i * 2);
Ok(())
}),
Expand All @@ -1053,6 +1053,91 @@ pub(crate) fn get_or_insert_with_value_builder<W: Wal<Ascend, Crc32>>(wal: &mut
}
}

pub(crate) fn insert_batch<W: Wal<Ascend, Crc32>>(wal: &mut W) {
let mut batch = vec![];

for i in 0..100u32 {
batch.push(Entry::new(i.to_be_bytes(), i.to_be_bytes()));
}

wal.insert_batch(&mut batch).unwrap();

for i in 0..100u32 {
assert_eq!(wal.get(&i.to_be_bytes()).unwrap(), i.to_be_bytes());
}

let wal = wal.reader();
for i in 0..100u32 {
assert_eq!(wal.get(&i.to_be_bytes()).unwrap(), i.to_be_bytes());
}
}

pub(crate) fn insert_batch_with_key_builder<W: Wal<Ascend, Crc32>>(wal: &mut W) {
let mut batch = vec![];

for i in 0..100u32 {
batch.push(EntryWithKeyBuilder::new(
KeyBuilder::new(4, move |buf| buf.put_u32_le(i)),
i.to_be_bytes(),
));
}

wal.insert_batch_with_key_builder(&mut batch).unwrap();

for i in 0..100u32 {
assert_eq!(wal.get(&i.to_be_bytes()).unwrap(), i.to_be_bytes());
}

let wal = wal.reader();
for i in 0..100u32 {
assert_eq!(wal.get(&i.to_be_bytes()).unwrap(), i.to_be_bytes());
}
}

pub(crate) fn insert_batch_with_value_builder<W: Wal<Ascend, Crc32>>(wal: &mut W) {
let mut batch = vec![];

for i in 0..100u32 {
batch.push(EntryWithValueBuilder::new(
i.to_be_bytes(),
ValueBuilder::new(4, move |buf| buf.put_u32_le(i)),
));
}

wal.insert_batch_with_value_builder(&mut batch).unwrap();

for i in 0..100u32 {
assert_eq!(wal.get(&i.to_be_bytes()).unwrap(), i.to_be_bytes());
}

let wal = wal.reader();
for i in 0..100u32 {
assert_eq!(wal.get(&i.to_be_bytes()).unwrap(), i.to_be_bytes());
}
}

pub(crate) fn insert_batch_with_builders<W: Wal<Ascend, Crc32>>(wal: &mut W) {
let mut batch = vec![];

for i in 0..100u32 {
batch.push(EntryWithBuilders::new(
KeyBuilder::new(4, move |buf| buf.put_u32_le(i)),
ValueBuilder::new(4, move |buf| buf.put_u32_le(i)),
));
}

wal.insert_batch_with_builders(&mut batch).unwrap();

for i in 0..100u32 {
assert_eq!(wal.get(&i.to_be_bytes()).unwrap(), i.to_be_bytes());
}

let wal = wal.reader();
for i in 0..100u32 {
assert_eq!(wal.get(&i.to_be_bytes()).unwrap(), i.to_be_bytes());
}
}

pub(crate) fn zero_reserved<W: Wal<Ascend, Crc32>>(wal: &mut W) {
unsafe {
assert_eq!(wal.reserved_slice(), &[]);
Expand Down
16 changes: 12 additions & 4 deletions src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,19 @@ pub use dbutils::leb128::*;

use super::*;

/// Merge two `u32` into a `u64`.
///
/// high 32 bits: `a`
/// low 32 bits: `b`
#[inline]
pub(crate) const fn merge_lengths(klen: u32, vlen: u32) -> u64 {
(klen as u64) << 32 | vlen as u64
pub(crate) const fn merge_lengths(a: u32, b: u32) -> u64 {
(a as u64) << 32 | b as u64
}

/// Split a `u64` into two `u32`.
///
/// high 32 bits: the first `u32`
/// low 32 bits: the second `u32`
#[inline]
pub(crate) const fn split_lengths(len: u64) -> (u32, u32) {
((len >> 32) as u32, len as u32)
Expand Down Expand Up @@ -60,11 +68,11 @@ pub(crate) const fn check(
let max_vsize = min_u64(max_value_size as u64, u32::MAX as u64);

if max_ksize < klen as u64 {
return Err(error::Error::key_too_large(klen as u32, max_key_size));
return Err(error::Error::key_too_large(klen as u64, max_key_size));
}

if max_vsize < vlen as u64 {
return Err(error::Error::value_too_large(vlen as u32, max_value_size));
return Err(error::Error::value_too_large(vlen as u64, max_value_size));
}

let (_, _, elen) = entry_size(klen as u32, vlen as u32);
Expand Down
Loading

0 comments on commit c9e64f0

Please sign in to comment.