Commit: Update generic.rs

al8n committed Oct 1, 2024
1 parent 17953d8 commit a917088
Showing 1 changed file with 72 additions and 128 deletions.
200 changes: 72 additions & 128 deletions src/swmr/generic.rs
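
Note on the change: the per-entry encoding loop that each batch-insert method previously repeated is folded into the `process_batch!` macro, which now takes two closures (one writing the key bytes, one writing the value bytes), performs the length-prefix encoding and pointer bookkeeping itself, and finishes with the `insert_batch_helper` call; closure errors surface as `Among::Left` / `Among::Middle`, while capacity and allocation errors stay on the `Among::Right` side. The snippet below is only a minimal, standalone sketch of that closure-parameterized pattern, not the crate's API: `Entry` and `encode_batch` are invented names, and the real macro uses varint length prefixes and an arena-allocated buffer rather than a `Vec<u8>`.

```rust
// Illustrative only: a shared batch-encoding routine parameterized over
// key/value encoding closures, so each caller supplies just the encoders
// instead of repeating the whole loop.
struct Entry {
    key: Vec<u8>,
    value: Vec<u8>,
}

/// Encode every entry into `buf` as `klen | vlen | key | value`,
/// delegating the key and value bytes to the caller-supplied closures.
fn encode_batch<K, V, E>(
    entries: &[Entry],
    buf: &mut Vec<u8>,
    mut encode_key: K,
    mut encode_value: V,
) -> Result<(), E>
where
    K: FnMut(&Entry, &mut Vec<u8>) -> Result<(), E>,
    V: FnMut(&Entry, &mut Vec<u8>) -> Result<(), E>,
{
    for ent in entries {
        // Fixed-width length prefixes keep the sketch simple; the real
        // code uses varints and writes into a preallocated arena buffer.
        buf.extend_from_slice(&(ent.key.len() as u32).to_le_bytes());
        buf.extend_from_slice(&(ent.value.len() as u32).to_le_bytes());
        encode_key(ent, buf)?;
        encode_value(ent, buf)?;
    }
    Ok(())
}

fn main() {
    let entries = vec![Entry {
        key: b"foo".to_vec(),
        value: b"bar".to_vec(),
    }];
    let mut buf = Vec::new();

    // Each caller only decides *how* keys and values are written.
    encode_batch::<_, _, ()>(
        &entries,
        &mut buf,
        |ent, out| {
            out.extend_from_slice(&ent.key);
            Ok(())
        },
        |ent, out| {
            out.extend_from_slice(&ent.value);
            Ok(())
        },
    )
    .unwrap();

    // Two 4-byte length prefixes plus "foo" and "bar".
    assert_eq!(buf.len(), 4 + 4 + 3 + 3);
}
```

The design keeps the buffer arithmetic in one place while each caller only decides how keys and values are serialized.
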
@@ -27,8 +27,8 @@ use crate::{
merge_lengths,
pointer::GenericPointer,
wal::sealed::Constructor,
BatchEncodedEntryMeta, EntryWithKeyBuilder, EntryWithValueBuilder, Flags, KeyBuilder, Options,
ValueBuilder, CHECKSUM_SIZE, HEADER_SIZE, STATUS_SIZE,
BatchEncodedEntryMeta, EntryWithBuilders, EntryWithKeyBuilder, EntryWithValueBuilder, Flags,
KeyBuilder, Options, ValueBuilder, CHECKSUM_SIZE, HEADER_SIZE, STATUS_SIZE,
};

pub use crate::{
@@ -880,8 +880,9 @@ where
}

macro_rules! process_batch {
(@pre $this:ident($batch:ident)) => {{
let batch = $batch.load(Ordering::Acquire);
($this:ident($batch:ident, $key:expr, $value:expr)) => {{
let batch_ptr = AtomicPtr::new($batch);
let batch = batch_ptr.load(Ordering::Acquire);
(*batch)
.iter_mut()
.try_fold((0u32, 0u64), |(num_entries, size), ent| {
@@ -899,6 +900,7 @@ macro_rules! process_batch {
ent.meta = BatchEncodedEntryMeta::new(klen, vlen, merged_len, merged_len_size);
(num_entries + 1, size + ent_size)
})
.map_err(Among::Right)
})
.and_then(|(num_entries, batch_encoded_size)| {
// safe to cast batch_encoded_size to u32 here, we already checked it's less than capacity (less than u32::MAX).
@@ -909,29 +911,47 @@
let total_size =
STATUS_SIZE as u64 + batch_meta_size as u64 + batch_encoded_size + CHECKSUM_SIZE as u64;
if total_size > remaining {
return Err(Error::insufficient_space(total_size, remaining as u32));
return Err(Among::Right(Error::insufficient_space(total_size, remaining as u32)));
}

let mut buf = allocator
.alloc_bytes(total_size as u32)
.map_err(Error::from_insufficient_space)?;
.map_err(|e| Among::Right(Error::from_insufficient_space(e)))?;

let flag = Flags::BATCHING;

buf.put_u8_unchecked(flag.bits);
buf.put_u64_varint_unchecked(batch_meta);

Ok((1 + batch_meta_size, buf))
let mut cursor = 1 + batch_meta_size;

for ent in (*batch).iter_mut() {
let remaining = buf.remaining();
if remaining < ent.meta.kvlen_size + ent.meta.klen + ent.meta.vlen {
return Err(Among::Right(
Error::larger_batch_size(buf.capacity() as u32),
));
}

let ent_len_size = buf.put_u64_varint_unchecked(ent.meta.kvlen);
let ko = cursor + ent_len_size;
buf.set_len(ko + ent.meta.klen + ent.meta.vlen);
let ptr = buf.as_mut_ptr().add(ko);

$key(ptr, &ent)?;
$value(ptr.add(ent.meta.klen), &ent)?;

cursor += ent_len_size + ent.meta.klen + ent.meta.vlen;
ent.pointer = Some(GenericPointer::new(ent.meta.klen, ent.meta.vlen, ptr));
}

$this
.insert_batch_helper(&$this.core.arena, buf, cursor as usize, || {
(*batch).iter_mut().map(|ent| ent.pointer.take().unwrap())
})
.map_err(Among::Right)
})
}};
(@post $this:ident($batch:ident, $buf:ident, $cursor:ident)) => {{
$this
.insert_batch_helper(&$this.core.arena, $buf, $cursor as usize, || {
let batch = $batch.load(Ordering::Acquire);
(*batch).iter_mut().map(|ent| ent.pointer.take().unwrap())
})
.map_err(Among::Right)
}}
}

impl<K, V, S> GenericOrderWal<K, V, S>
@@ -1090,41 +1110,22 @@ where
where
B: BatchWithKeyBuilder<GenericPointer<K, V>, Value = Generic<'a, V>>,
{
let batch_ptr = AtomicPtr::new(batch);

unsafe {
let (mut cursor, mut buf) = process_batch!(@pre self(batch_ptr)).map_err(Among::Right)?;

{
let batch = batch_ptr.load(Ordering::Acquire);
for ent in (*batch).iter_mut() {
let remaining = buf.remaining();
if remaining < ent.meta.kvlen_size + ent.meta.klen + ent.meta.vlen {
return Err(Among::Right(
Error::larger_batch_size(buf.capacity() as u32),
));
}

let ent_len_size = buf.put_u64_varint_unchecked(ent.meta.kvlen);
let ko = cursor + ent_len_size;
buf.set_len(ko + ent.meta.klen + ent.meta.vlen);
let ptr = buf.as_mut_ptr().add(ko);

process_batch!(self(
batch,
|ptr, ent: &EntryWithKeyBuilder<B::KeyBuilder, Generic<'_, V>, _>| {
let f = ent.kb.builder();
f(&mut VacantBuffer::new(
ent.meta.klen,
ent.meta.vlen,
NonNull::new_unchecked(ptr),
))
.map_err(Among::Left)?;
let value_buf = slice::from_raw_parts_mut(ptr.add(ent.meta.klen), ent.meta.vlen);
ent.value.encode(value_buf).map_err(Among::Middle)?;

cursor += ent_len_size + ent.meta.klen + ent.meta.vlen;
ent.pointer = Some(GenericPointer::new(ent.meta.klen, ent.meta.vlen, ptr));
.map_err(Among::Left)
},
|ptr, ent: &EntryWithKeyBuilder<B::KeyBuilder, Generic<'_, V>, _>| {
let value_buf = slice::from_raw_parts_mut(ptr, ent.meta.vlen);
ent.value.encode(value_buf).map_err(Among::Middle)
}
}

process_batch!(@post self(batch_ptr, buf, cursor))
))
}
}

@@ -1136,41 +1137,22 @@
where
B: BatchWithValueBuilder<GenericPointer<K, V>, Key = Generic<'a, K>>,
{
let batch_ptr = AtomicPtr::new(batch);

unsafe {
let (mut cursor, mut buf) = process_batch!(@pre self(batch_ptr)).map_err(Among::Right)?;

{
let batch = batch_ptr.load(Ordering::Acquire);
for ent in (*batch).iter_mut() {
let remaining = buf.remaining();
if remaining < ent.meta.kvlen_size + ent.meta.klen + ent.meta.vlen {
return Err(Among::Right(
Error::larger_batch_size(buf.capacity() as u32),
));
}

let ent_len_size = buf.put_u64_varint_unchecked(ent.meta.kvlen);
let ko = cursor + ent_len_size;
buf.set_len(ko + ent.meta.klen + ent.meta.vlen);
let ptr = buf.as_mut_ptr().add(ko);

process_batch!(self(
batch,
|ptr, ent: &EntryWithValueBuilder<Generic<'_, K>, B::ValueBuilder, _>| {
let key_buf = slice::from_raw_parts_mut(ptr, ent.meta.klen);
ent.key.encode(key_buf).map_err(Among::Left)?;
ent.key.encode(key_buf).map_err(Among::Left)
},
|ptr, ent: &EntryWithValueBuilder<Generic<'_, K>, B::ValueBuilder, _>| {
let f = ent.vb.builder();
f(&mut VacantBuffer::new(
ent.meta.vlen,
NonNull::new_unchecked(ptr.add(ent.meta.klen)),
NonNull::new_unchecked(ptr),
))
.map_err(Among::Middle)?;

cursor += ent_len_size + ent.meta.klen + ent.meta.vlen;
ent.pointer = Some(GenericPointer::new(ent.meta.klen, ent.meta.vlen, ptr));
.map_err(Among::Middle)
}
}

process_batch!(@post self(batch_ptr, buf, cursor))
))
}
}

@@ -1182,45 +1164,26 @@
where
B: BatchWithBuilders<GenericPointer<K, V>>,
{
let batch_ptr = AtomicPtr::new(batch);

unsafe {
let (mut cursor, mut buf) = process_batch!(@pre self(batch_ptr)).map_err(Among::Right)?;

{
let batch = batch_ptr.load(Ordering::Acquire);
for ent in (*batch).iter_mut() {
let remaining = buf.remaining();
if remaining < ent.meta.kvlen_size + ent.meta.klen + ent.meta.vlen {
return Err(Among::Right(
Error::larger_batch_size(buf.capacity() as u32),
));
}

let ent_len_size = buf.put_u64_varint_unchecked(ent.meta.kvlen);
let ko = cursor + ent_len_size;
buf.set_len(ko + ent.meta.klen + ent.meta.vlen);
let ptr = buf.as_mut_ptr().add(ko);

process_batch!(self(
batch,
|ptr, ent: &EntryWithBuilders<B::KeyBuilder, B::ValueBuilder, _>| {
let f = ent.kb.builder();
f(&mut VacantBuffer::new(
ent.meta.klen,
NonNull::new_unchecked(ptr),
))
.map_err(Among::Left)?;
.map_err(Among::Left)
},
|ptr, ent: &EntryWithBuilders<B::KeyBuilder, B::ValueBuilder, _>| {
let f = ent.vb.builder();
f(&mut VacantBuffer::new(
ent.meta.vlen,
NonNull::new_unchecked(ptr.add(ent.meta.klen)),
NonNull::new_unchecked(ptr),
))
.map_err(Among::Middle)?;

cursor += ent_len_size + ent.meta.klen + ent.meta.vlen;
ent.pointer = Some(GenericPointer::new(ent.meta.klen, ent.meta.vlen, ptr));
.map_err(Among::Middle)
}
}

process_batch!(@post self(batch_ptr, buf, cursor))
))
}
}

@@ -1229,37 +1192,18 @@ where
&'a mut self,
batch: &'b mut B,
) -> Result<(), Among<K::Error, V::Error, Error>> {
let batch_ptr = AtomicPtr::new(batch);

unsafe {
let (mut cursor, mut buf) = process_batch!(@pre self(batch_ptr)).map_err(Among::Right)?;

{
let batch = batch_ptr.load(Ordering::Acquire);
for ent in (*batch).iter_mut() {
let remaining = buf.remaining();
if remaining < ent.meta.kvlen_size + ent.meta.klen + ent.meta.vlen {
return Err(Among::Right(
Error::larger_batch_size(buf.capacity() as u32),
));
}

let ent_len_size = buf.put_u64_varint_unchecked(ent.meta.kvlen);
let ko = cursor + ent_len_size;
buf.set_len(ko + ent.meta.klen + ent.meta.vlen);
let ptr = buf.as_mut_ptr().add(ko);

process_batch!(self(
batch,
|ptr, ent: &GenericEntry<'_, K, V>| {
let key_buf = slice::from_raw_parts_mut(ptr, ent.meta.klen);
ent.key.encode(key_buf).map_err(Among::Left)?;
let value_buf = slice::from_raw_parts_mut(ptr.add(ent.meta.klen), ent.meta.vlen);
ent.value.encode(value_buf).map_err(Among::Middle)?;

cursor += ent_len_size + ent.meta.klen + ent.meta.vlen;
ent.pointer = Some(GenericPointer::new(ent.meta.klen, ent.meta.vlen, ptr));
ent.key.encode(key_buf).map_err(Among::Left)
},
|ptr, ent: &GenericEntry<'_, K, V>| {
let value_buf = slice::from_raw_parts_mut(ptr, ent.meta.vlen);
ent.value.encode(value_buf).map_err(Among::Middle)
}
}

process_batch!(@post self(batch_ptr, buf, cursor))
))
}
}
