Skip to content

Commit

Permalink
perf: use rkyv to rework the serialization of Tuple
Browse files Browse the repository at this point in the history
  • Loading branch information
KKould committed Dec 10, 2024
1 parent cc095a0 commit 4264dd0
Show file tree
Hide file tree
Showing 15 changed files with 487 additions and 412 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ harness = false
[dependencies]
ahash = { version = "0.8" }
bincode = { version = "1" }
bytes = { version = "1" }
chrono = { version = "0.4" }
comfy-table = { version = "7" }
csv = { version = "1" }
Expand All @@ -47,8 +46,9 @@ paste = { version = "1" }
parking_lot = { version = "0.12", features = ["arc_lock"] }
petgraph = { version = "0.6" }
regex = { version = "1" }
rkyv = { version = "0.8" }
rocksdb = { version = "0.22" }
rust_decimal = { version = "1" }
rust_decimal = { version = "1", features = ["rkyv"] }
serde = { version = "1", features = ["derive", "rc"] }
fnck_sql_serde_macros = { version = "0.1.0", path = "fnck_sql_serde_macros" }
siphasher = { version = "1", features = ["serde"] }
Expand Down
6 changes: 6 additions & 0 deletions src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,12 @@ pub enum DatabaseError {
PrimaryKeyNotFound,
#[error("primaryKey only allows single or multiple values")]
PrimaryKeyTooManyLayers,
#[error("rkyv: {0}")]
Rkyv(
#[source]
#[from]
rkyv::rancor::Error,
),
#[error("rocksdb: {0}")]
RocksDB(
#[source]
Expand Down
115 changes: 2 additions & 113 deletions src/serdes/data_value.rs
Original file line number Diff line number Diff line change
@@ -1,118 +1,7 @@
use crate::errors::DatabaseError;
use crate::serdes::{ReferenceSerialization, ReferenceTables};
use crate::storage::{TableCache, Transaction};
use crate::types::value::DataValue;
use crate::types::LogicalType;
use std::io::{Read, Write};
use crate::implement_serialization_by_bincode;

impl DataValue {
    // FIXME: redundant code
    /// Encodes `self` into `writer` using a compact, hand-rolled wire format.
    ///
    /// Layout:
    /// - 1 byte null marker: `0` = null (nothing else follows), `1` = present.
    /// - For `DataValue::Tuple`:
    ///   - 1 byte variant tag: `0` = `None`, `1` = `Some`.
    ///   - For `Some`: element count as little-endian `u32`, then each element
    ///     encoded recursively (each with its *own* logical type, not `ty`),
    ///     then 1 byte `is_upper` flag (`1` = true).
    /// - For all other values:
    ///   - variable-length types (`ty.raw_len()` is `None`): little-endian `u32`
    ///     byte length, then the raw bytes;
    ///   - fixed-length types: the raw bytes only (length is implied by `ty`).
    ///
    /// `ty` is only consulted for the fixed/variable length decision; the null
    /// marker and tuple handling ignore it.
    pub(crate) fn inner_encode<W: Write>(
        &self,
        writer: &mut W,
        ty: &LogicalType,
    ) -> Result<(), DatabaseError> {
        // Null marker byte first, so decode can bail out early.
        writer.write_all(&[if self.is_null() { 0u8 } else { 1u8 }])?;

        if self.is_null() {
            return Ok(());
        }
        if let DataValue::Tuple(values) = self {
            match values {
                // Absent tuple payload: tag byte 0, nothing else.
                None => writer.write_all(&[0u8])?,
                Some((values, is_upper)) => {
                    writer.write_all(&[1u8])?;
                    // Element count, then each element with its own type.
                    writer.write_all(&(values.len() as u32).to_le_bytes())?;
                    for value in values.iter() {
                        value.inner_encode(writer, &value.logical_type())?
                    }
                    // Trailing bound flag (used for range bounds).
                    writer.write_all(&[if *is_upper { 1u8 } else { 0u8 }])?;
                }
            }

            return Ok(());
        }
        if ty.raw_len().is_none() {
            // Variable-length value: buffer it to learn the byte length,
            // then emit a u32 length prefix followed by the payload.
            let mut bytes = Vec::new();
            writer.write_all(&(self.to_raw(&mut bytes)? as u32).to_le_bytes())?;
            writer.write_all(&bytes)?;
        } else {
            // Fixed-length value: the type determines the size, so the raw
            // bytes go out with no prefix; the written length is ignored.
            let _ = self.to_raw(writer)?;
        }

        Ok(())
    }

    /// Decodes a value previously written by [`DataValue::inner_encode`],
    /// driven by the expected logical type `ty` (which must match the type
    /// used at encode time for the byte stream to parse correctly).
    pub(crate) fn inner_decode<R: Read>(
        reader: &mut R,
        ty: &LogicalType,
    ) -> Result<Self, DatabaseError> {
        // Null marker: 0 means the value was null.
        let mut bytes = [0u8; 1];
        reader.read_exact(&mut bytes)?;
        if bytes[0] == 0 {
            return Ok(DataValue::none(ty));
        }
        if let LogicalType::Tuple(types) = ty {
            // Tuple variant tag written by inner_encode: 0 = None, 1 = Some.
            let mut bytes = [0u8; 1];
            reader.read_exact(&mut bytes)?;
            let values = match bytes[0] {
                0 => None,
                1 => {
                    let mut bytes = [0u8; 4];
                    reader.read_exact(&mut bytes)?;
                    let len = u32::from_le_bytes(bytes) as usize;
                    let mut vec = Vec::with_capacity(len);

                    // NOTE(review): the encoded element count `len` is read
                    // but the loop is bounded by `types.len()`; if the two
                    // ever differ the stream desynchronizes — presumably they
                    // are always equal, but worth confirming.
                    for ty in types.iter() {
                        vec.push(Self::inner_decode(reader, ty)?);
                    }
                    // Trailing is_upper flag byte.
                    let mut bytes = [0u8];
                    reader.read_exact(&mut bytes)?;
                    Some((vec, bytes[0] == 1))
                }
                // Any other tag byte means a corrupt or incompatible stream.
                _ => unreachable!(),
            };

            return Ok(DataValue::Tuple(values));
        }
        // Non-tuple: variable-length types carry a u32 length prefix,
        // fixed-length types get their size from the type itself.
        let value_len = match ty.raw_len() {
            None => {
                let mut bytes = [0u8; 4];
                reader.read_exact(&mut bytes)?;
                u32::from_le_bytes(bytes) as usize
            }
            Some(len) => len,
        };
        let mut buf = vec![0u8; value_len];
        reader.read_exact(&mut buf)?;

        Ok(DataValue::from_raw(&buf, ty))
    }
}

impl ReferenceSerialization for DataValue {
fn encode<W: Write>(
&self,
writer: &mut W,
is_direct: bool,
reference_tables: &mut ReferenceTables,
) -> Result<(), DatabaseError> {
let ty = self.logical_type();
ty.encode(writer, is_direct, reference_tables)?;

self.inner_encode(writer, &ty)
}

fn decode<T: Transaction, R: Read>(
reader: &mut R,
drive: Option<(&T, &TableCache)>,
reference_tables: &ReferenceTables,
) -> Result<Self, DatabaseError> {
let logical_type = LogicalType::decode(reader, drive, reference_tables)?;

Self::inner_decode(reader, &logical_type)
}
}
implement_serialization_by_bincode!(DataValue);

#[cfg(test)]
pub(crate) mod test {
Expand Down
29 changes: 6 additions & 23 deletions src/serdes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,37 +33,20 @@ macro_rules! implement_serialization_by_bincode {
fn encode<W: std::io::Write>(
&self,
writer: &mut W,
is_direct: bool,
reference_tables: &mut $crate::serdes::ReferenceTables,
_: bool,
_: &mut $crate::serdes::ReferenceTables,
) -> Result<(), $crate::errors::DatabaseError> {
let bytes = bincode::serialize(self)?;
$crate::serdes::ReferenceSerialization::encode(
&bytes.len(),
writer,
is_direct,
reference_tables,
)?;
std::io::Write::write_all(writer, &bytes)?;
bincode::serialize_into(writer, self)?;

Ok(())
}

fn decode<T: $crate::storage::Transaction, R: std::io::Read>(
reader: &mut R,
drive: Option<(&T, &$crate::storage::TableCache)>,
reference_tables: &$crate::serdes::ReferenceTables,
_: Option<(&T, &$crate::storage::TableCache)>,
_: &$crate::serdes::ReferenceTables,
) -> Result<Self, $crate::errors::DatabaseError> {
let mut buf = vec![
0u8;
<usize as $crate::serdes::ReferenceSerialization>::decode(
reader,
drive,
reference_tables
)?
];
std::io::Read::read_exact(reader, &mut buf)?;

Ok(bincode::deserialize::<Self>(&buf)?)
Ok(bincode::deserialize_from(reader)?)
}
}
};
Expand Down
47 changes: 24 additions & 23 deletions src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,20 @@ use crate::errors::DatabaseError;
use crate::expression::range_detacher::Range;
use crate::optimizer::core::statistics_meta::{StatisticMetaLoader, StatisticsMeta};
use crate::serdes::ReferenceTables;
use crate::storage::table_codec::TableCodec;
use crate::storage::table_codec::{Bytes, TableCodec};
use crate::types::index::{Index, IndexId, IndexMetaRef, IndexType};
use crate::types::tuple::{Tuple, TupleId};
use crate::types::value::DataValue;
use crate::types::{ColumnId, LogicalType};
use crate::utils::lru::SharedLruCache;
use bytes::Bytes;
use itertools::Itertools;
use std::collections::Bound;
use std::io::Cursor;
use std::mem;
use std::ops::SubAssign;
use std::sync::Arc;
use std::vec::IntoIter;
use rkyv::util::AlignedVec;
use ulid::Generator;

pub(crate) type StatisticsMetaCache = SharedLruCache<(TableName, IndexId), StatisticsMeta>;
Expand Down Expand Up @@ -64,8 +64,9 @@ pub trait Transaction: Sized {
let pk_indices = table.primary_keys_indices();
let table_types = table.types();
if columns.is_empty() {
let (i, column) = &table.primary_keys()[0];
columns.push((*i, column.clone()));
for (i, column) in table.primary_keys() {
columns.push((*i, column.clone()));
}
}
let mut tuple_columns = Vec::with_capacity(columns.len());
let mut projections = Vec::with_capacity(columns.len());
Expand Down Expand Up @@ -167,7 +168,7 @@ pub trait Transaction: Sized {

if matches!(index.ty, IndexType::Unique) {
if let Some(bytes) = self.get(&key)? {
return if bytes != value {
return if bytes.as_slice() != value.as_slice() {
Err(DatabaseError::DuplicateUniqueValue)
} else {
Ok(())
Expand Down Expand Up @@ -602,9 +603,9 @@ pub trait Transaction: Sized {
Ok(())
}

fn get(&self, key: &[u8]) -> Result<Option<Bytes>, DatabaseError>;
fn get(&self, key: &[u8]) -> Result<Option<AlignedVec>, DatabaseError>;

fn set(&mut self, key: Bytes, value: Bytes) -> Result<(), DatabaseError>;
fn set(&mut self, key: Bytes, value: AlignedVec) -> Result<(), DatabaseError>;

fn remove(&mut self, key: &[u8]) -> Result<(), DatabaseError>;

Expand All @@ -620,7 +621,7 @@ pub trait Transaction: Sized {
trait IndexImpl<T: Transaction> {
fn index_lookup(
&self,
bytes: &Bytes,
bytes: &AlignedVec,
pk_indices: &PrimaryKeyIndices,
params: &IndexImplParams<T>,
) -> Result<Tuple, DatabaseError>;
Expand Down Expand Up @@ -693,15 +694,15 @@ impl<T: Transaction> IndexImplParams<'_, T> {
) -> Result<Option<Tuple>, DatabaseError> {
let key = TableCodec::encode_tuple_key(self.table_name, tuple_id)?;

Ok(self.tx.get(&key)?.map(|bytes| {
self.tx.get(&key)?.map(|bytes| {
TableCodec::decode_tuple(
&self.table_types,
pk_indices,
&self.projections,
&self.tuple_schema_ref,
&bytes,
)
}))
}).transpose()
}
}

Expand All @@ -713,7 +714,7 @@ enum IndexResult<'a, T: Transaction + 'a> {
impl<T: Transaction> IndexImpl<T> for IndexImplEnum {
fn index_lookup(
&self,
bytes: &Bytes,
bytes: &AlignedVec,
pk_indices: &PrimaryKeyIndices,
params: &IndexImplParams<T>,
) -> Result<Tuple, DatabaseError> {
Expand Down Expand Up @@ -756,17 +757,17 @@ impl<T: Transaction> IndexImpl<T> for IndexImplEnum {
impl<T: Transaction> IndexImpl<T> for PrimaryKeyIndexImpl {
fn index_lookup(
&self,
bytes: &Bytes,
bytes: &AlignedVec,
pk_indices: &PrimaryKeyIndices,
params: &IndexImplParams<T>,
) -> Result<Tuple, DatabaseError> {
Ok(TableCodec::decode_tuple(
TableCodec::decode_tuple(
&params.table_types,
pk_indices,
&params.projections,
&params.tuple_schema_ref,
bytes,
))
)
}

fn eq_to_res<'a>(
Expand All @@ -786,7 +787,7 @@ impl<T: Transaction> IndexImpl<T> for PrimaryKeyIndexImpl {
&params.tuple_schema_ref,
&bytes,
)
});
}).transpose()?;
Ok(IndexResult::Tuple(tuple))
}

Expand All @@ -800,11 +801,11 @@ impl<T: Transaction> IndexImpl<T> for PrimaryKeyIndexImpl {
}

fn secondary_index_lookup<T: Transaction>(
bytes: &Bytes,
bytes: &AlignedVec,
pk_indices: &PrimaryKeyIndices,
params: &IndexImplParams<T>,
) -> Result<Tuple, DatabaseError> {
let tuple_id = TableCodec::decode_index(bytes, &params.index_meta.pk_ty)?;
let tuple_id = TableCodec::decode_index(bytes)?;
params
.get_tuple_by_id(pk_indices, &tuple_id)?
.ok_or(DatabaseError::TupleIdNotFound(tuple_id))
Expand All @@ -813,7 +814,7 @@ fn secondary_index_lookup<T: Transaction>(
impl<T: Transaction> IndexImpl<T> for UniqueIndexImpl {
fn index_lookup(
&self,
bytes: &Bytes,
bytes: &AlignedVec,
pk_indices: &PrimaryKeyIndices,
params: &IndexImplParams<T>,
) -> Result<Tuple, DatabaseError> {
Expand All @@ -829,7 +830,7 @@ impl<T: Transaction> IndexImpl<T> for UniqueIndexImpl {
let Some(bytes) = params.tx.get(&self.bound_key(params, value)?)? else {
return Ok(IndexResult::Tuple(None));
};
let tuple_id = TableCodec::decode_index(&bytes, &params.index_meta.pk_ty)?;
let tuple_id = TableCodec::decode_index(&bytes)?;
let tuple = params
.get_tuple_by_id(pk_indices, &tuple_id)?
.ok_or(DatabaseError::TupleIdNotFound(tuple_id))?;
Expand All @@ -850,7 +851,7 @@ impl<T: Transaction> IndexImpl<T> for UniqueIndexImpl {
impl<T: Transaction> IndexImpl<T> for NormalIndexImpl {
fn index_lookup(
&self,
bytes: &Bytes,
bytes: &AlignedVec,
pk_indices: &PrimaryKeyIndices,
params: &IndexImplParams<T>,
) -> Result<Tuple, DatabaseError> {
Expand Down Expand Up @@ -886,7 +887,7 @@ impl<T: Transaction> IndexImpl<T> for NormalIndexImpl {
impl<T: Transaction> IndexImpl<T> for CompositeIndexImpl {
fn index_lookup(
&self,
bytes: &Bytes,
bytes: &AlignedVec,
pk_indices: &PrimaryKeyIndices,
params: &IndexImplParams<T>,
) -> Result<Tuple, DatabaseError> {
Expand Down Expand Up @@ -950,7 +951,7 @@ impl<'a, T: Transaction + 'a> Iter for TupleIter<'a, T> {
&self.projections,
&self.tuple_columns,
&value,
);
)?;

if let Some(num) = self.limit.as_mut() {
num.sub_assign(1);
Expand Down Expand Up @@ -1100,7 +1101,7 @@ impl<T: Transaction> Iter for IndexIter<'_, T> {
}

pub trait InnerIter {
fn try_next(&mut self) -> Result<Option<(Bytes, Bytes)>, DatabaseError>;
fn try_next(&mut self) -> Result<Option<(AlignedVec, AlignedVec)>, DatabaseError>;
}

pub trait Iter {
Expand Down
Loading

0 comments on commit 4264dd0

Please sign in to comment.