Skip to content

Commit

Permalink
Feat/multiple primary keys (#239)
Browse files Browse the repository at this point in the history
* feat: impl multiple primary keys

* chore: remove `ValueRef` to reduce Arc usage

* feat: `Update` supports `Set` using expressions

fix:
- primary key value position when declaring multiple primary keys(update & insert & delete)
- decimal type conversion
- the `Eq` expression of `PushPredicateIntoScan` must satisfy all index columns before it can be used
- `PrimaryKey` index supplementary composite index type
- `Tuple` deserialize may fail due to a large difference in the number of projection columns and table schema

* chore: cache `pk_ty` on `TableCatalog::add_index_meta`

* perf: use `TupleIdBuilder` for `Tuple::deserialize_from`

* chore: codefmt
  • Loading branch information
KKould authored Nov 10, 2024
1 parent 4683978 commit 104c230
Show file tree
Hide file tree
Showing 84 changed files with 1,830 additions and 1,430 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jobs:
- name: Install latest nightly
uses: actions-rs/toolchain@v1
with:
toolchain: nightly-2024-04-26
toolchain: nightly-2024-10-10
override: true
components: rustfmt, clippy

Expand Down Expand Up @@ -54,7 +54,7 @@ jobs:
- name: Install latest nightly
uses: actions-rs/toolchain@v1
with:
toolchain: nightly-2024-04-26
toolchain: nightly-2024-10-10
override: true
components: rustfmt, clippy

Expand All @@ -75,7 +75,7 @@ jobs:
- name: Install latest nightly
uses: actions-rs/toolchain@v1
with:
toolchain: nightly-2024-04-26
toolchain: nightly-2024-10-10
override: true
components: rustfmt, clippy

Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,6 @@ Cargo.lock
fncksql_data
fncksql_bench
sqlite_bench
fnck_sql_tpcc

tests/data/row_20000.csv
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ implement_from_tuple!(
```
- User-Defined Function: `features = ["macros"]`
```rust
scala_function!(TestFunction::test(LogicalType::Integer, LogicalType::Integer) -> LogicalType::Integer => |v1: ValueRef, v2: ValueRef| {
scala_function!(TestFunction::test(LogicalType::Integer, LogicalType::Integer) -> LogicalType::Integer => |v1: DataValue, v2: DataValue| {
let plus_binary_evaluator = EvaluatorFactory::binary_create(LogicalType::Integer, BinaryOperator::Plus)?;
let value = plus_binary_evaluator.binary_eval(&v1, &v2);

Expand All @@ -130,7 +130,7 @@ let fnck_sql = DataBaseBuilder::path("./data")
```
- User-Defined Table Function: `features = ["macros"]`
```rust
table_function!(MyTableFunction::test_numbers(LogicalType::Integer) -> [c1: LogicalType::Integer, c2: LogicalType::Integer] => (|v1: ValueRef| {
table_function!(MyTableFunction::test_numbers(LogicalType::Integer) -> [c1: LogicalType::Integer, c2: LogicalType::Integer] => (|v1: DataValue| {
let num = v1.i32().unwrap();

Ok(Box::new((0..num)
Expand Down
2 changes: 1 addition & 1 deletion src/binder/alter_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ impl<T: Transaction> Binder<'_, '_, T> {
column_def,
} => {
let plan = TableScanOperator::build(table_name.clone(), table);
let column = self.bind_column(column_def)?;
let column = self.bind_column(column_def, None)?;

if !is_valid_identifier(column.name()) {
return Err(DatabaseError::InvalidColumn(
Expand Down
35 changes: 22 additions & 13 deletions src/binder/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ impl<T: Transaction> Binder<'_, '_, T> {
}
let mut columns: Vec<ColumnCatalog> = columns
.iter()
.map(|col| self.bind_column(col))
.enumerate()
.map(|(i, col)| self.bind_column(col, Some(i)))
.try_collect()?;
for constraint in constraints {
match constraint {
Expand All @@ -56,15 +57,19 @@ impl<T: Transaction> Binder<'_, '_, T> {
is_primary,
..
} => {
for column_name in column_names.iter().map(|ident| ident.value.to_lowercase()) {
for (i, column_name) in column_names
.iter()
.map(|ident| ident.value.to_lowercase())
.enumerate()
{
if let Some(column) = columns
.iter_mut()
.find(|column| column.name() == column_name)
{
if *is_primary {
column.desc_mut().is_primary = true;
column.desc_mut().set_primary(Some(i));
} else {
column.desc_mut().is_unique = true;
column.desc_mut().set_unique(true);
}
}
}
Expand All @@ -73,9 +78,9 @@ impl<T: Transaction> Binder<'_, '_, T> {
}
}

if columns.iter().filter(|col| col.desc().is_primary).count() != 1 {
if columns.iter().filter(|col| col.desc().is_primary()).count() == 0 {
return Err(DatabaseError::InvalidTable(
"The primary key field must exist and have at least one".to_string(),
"the primary key field must exist and have at least one".to_string(),
));
}

Expand All @@ -89,11 +94,15 @@ impl<T: Transaction> Binder<'_, '_, T> {
))
}

pub fn bind_column(&mut self, column_def: &ColumnDef) -> Result<ColumnCatalog, DatabaseError> {
pub fn bind_column(
&mut self,
column_def: &ColumnDef,
column_index: Option<usize>,
) -> Result<ColumnCatalog, DatabaseError> {
let column_name = column_def.name.value.to_lowercase();
let mut column_desc = ColumnDesc::new(
LogicalType::try_from(column_def.data_type.clone())?,
false,
None,
false,
None,
)?;
Expand All @@ -106,12 +115,12 @@ impl<T: Transaction> Binder<'_, '_, T> {
ColumnOption::NotNull => nullable = false,
ColumnOption::Unique { is_primary, .. } => {
if *is_primary {
column_desc.is_primary = true;
column_desc.set_primary(column_index);
nullable = false;
// Skip other options when using primary key
break;
} else {
column_desc.is_unique = true;
column_desc.set_unique(true);
}
}
ColumnOption::Default(expr) => {
Expand All @@ -125,7 +134,7 @@ impl<T: Transaction> Binder<'_, '_, T> {
if expr.return_type() != column_desc.column_datatype {
expr = ScalarExpression::TypeCast {
expr: Box::new(expr),
ty: column_desc.column_datatype,
ty: column_desc.column_datatype.clone(),
}
}
column_desc.default = Some(expr);
Expand Down Expand Up @@ -184,15 +193,15 @@ mod tests {
debug_assert_eq!(op.columns[0].nullable(), false);
debug_assert_eq!(
op.columns[0].desc(),
&ColumnDesc::new(LogicalType::Integer, true, false, None)?
&ColumnDesc::new(LogicalType::Integer, Some(0), false, None)?
);
debug_assert_eq!(op.columns[1].name(), "name");
debug_assert_eq!(op.columns[1].nullable(), true);
debug_assert_eq!(
op.columns[1].desc(),
&ColumnDesc::new(
LogicalType::Varchar(Some(10), CharLengthUnits::Characters),
false,
None,
false,
None
)?
Expand Down
26 changes: 13 additions & 13 deletions src/binder/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::planner::operator::table_scan::TableScanOperator;
use crate::planner::operator::Operator;
use crate::planner::LogicalPlan;
use crate::storage::Transaction;
use itertools::Itertools;
use sqlparser::ast::{Expr, TableAlias, TableFactor, TableWithJoins};
use std::sync::Arc;

Expand All @@ -23,20 +24,19 @@ impl<T: Transaction> Binder<'_, '_, T> {
table_alias = Some(Arc::new(name.value.to_lowercase()));
alias_idents = Some(columns);
}
let source = self
let Source::Table(table) = self
.context
.source_and_bind(table_name.clone(), table_alias.as_ref(), None, false)?
.ok_or(DatabaseError::SourceNotFound)?;
let schema_buf = self.table_schema_buf.entry(table_name.clone()).or_default();
let primary_key_column = source
.columns(schema_buf)
.find(|column| column.desc().is_primary)
.cloned()
.unwrap();
let mut plan = match source {
Source::Table(table) => TableScanOperator::build(table_name.clone(), table),
Source::View(view) => LogicalPlan::clone(&view.plan),
.source_and_bind(table_name.clone(), table_alias.as_ref(), None, true)?
.ok_or(DatabaseError::TableNotFound)?
else {
unreachable!()
};
let primary_keys = table
.primary_keys()
.iter()
.map(|(_, column)| column.clone())
.collect_vec();
let mut plan = TableScanOperator::build(table_name.clone(), table);

if let Some(alias_idents) = alias_idents {
plan =
Expand All @@ -50,7 +50,7 @@ impl<T: Transaction> Binder<'_, '_, T> {
Ok(LogicalPlan::new(
Operator::Delete(DeleteOperator {
table_name,
primary_key_column,
primary_keys,
}),
vec![plan],
))
Expand Down
16 changes: 8 additions & 8 deletions src/binder/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl<'a, T: Transaction> Binder<'a, '_, T> {
}
Expr::CompoundIdentifier(idents) => self.bind_column_ref_from_identifiers(idents, None),
Expr::BinaryOp { left, right, op } => self.bind_binary_op_internal(left, right, op),
Expr::Value(v) => Ok(ScalarExpression::Constant(Arc::new(v.into()))),
Expr::Value(v) => Ok(ScalarExpression::Constant(v.into())),
Expr::Function(func) => self.bind_function(func),
Expr::Nested(expr) => self.bind_expr(expr),
Expr::UnaryOp { expr, op } => self.bind_unary_op_internal(expr, op),
Expand Down Expand Up @@ -77,7 +77,7 @@ impl<'a, T: Transaction> Binder<'a, '_, T> {
}
.cast(&logical_type)?;

Ok(ScalarExpression::Constant(Arc::new(value)))
Ok(ScalarExpression::Constant(value))
}
Expr::Between {
expr,
Expand Down Expand Up @@ -186,7 +186,7 @@ impl<'a, T: Transaction> Binder<'a, '_, T> {
if ty == &LogicalType::SqlNull {
*ty = result_ty;
} else if ty != &result_ty {
return Err(DatabaseError::Incomparable(*ty, result_ty));
return Err(DatabaseError::Incomparable(ty.clone(), result_ty));
}
}

Expand Down Expand Up @@ -333,7 +333,7 @@ impl<'a, T: Transaction> Binder<'a, '_, T> {
Ok(ScalarExpression::ColumnRef(
source
.column(&full_name.1, schema_buf)
.ok_or_else(|| DatabaseError::NotFound("column", full_name.1.to_string()))?,
.ok_or_else(|| DatabaseError::ColumnNotFound(full_name.1.to_string()))?,
))
} else {
let op =
Expand Down Expand Up @@ -373,7 +373,7 @@ impl<'a, T: Transaction> Binder<'a, '_, T> {
if let Some(parent) = self.parent {
op(&mut got_column, &parent.context, &mut self.table_schema_buf);
}
Ok(got_column.ok_or(DatabaseError::NotFound("column", full_name.1))?)
Ok(got_column.ok_or(DatabaseError::ColumnNotFound(full_name.1))?)
}
}

Expand Down Expand Up @@ -621,7 +621,7 @@ impl<'a, T: Transaction> Binder<'a, '_, T> {
}));
}

Err(DatabaseError::NotFound("function", summary.name))
Err(DatabaseError::FunctionNotFound(summary.name))
}

fn return_type(
Expand Down Expand Up @@ -672,10 +672,10 @@ impl<'a, T: Transaction> Binder<'a, '_, T> {
}

fn wildcard_expr() -> ScalarExpression {
ScalarExpression::Constant(Arc::new(DataValue::Utf8 {
ScalarExpression::Constant(DataValue::Utf8 {
value: Some("*".to_string()),
ty: Utf8Type::Variable(None),
unit: CharLengthUnits::Characters,
}))
})
}
}
6 changes: 3 additions & 3 deletions src/binder/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::planner::operator::Operator;
use crate::planner::LogicalPlan;
use crate::storage::Transaction;
use crate::types::tuple::SchemaRef;
use crate::types::value::{DataValue, ValueRef};
use crate::types::value::DataValue;
use sqlparser::ast::{Expr, Ident, ObjectName};
use std::slice;
use std::sync::Arc;
Expand Down Expand Up @@ -78,7 +78,7 @@ impl<T: Transaction> Binder<'_, '_, T> {
value.check_len(ty)?;

if value.logical_type() != *ty {
value = Arc::new(DataValue::clone(&value).cast(ty)?);
value = value.cast(ty)?;
}
row.push(value);
}
Expand Down Expand Up @@ -108,7 +108,7 @@ impl<T: Transaction> Binder<'_, '_, T> {

pub(crate) fn bind_values(
&mut self,
rows: Vec<Vec<ValueRef>>,
rows: Vec<Vec<DataValue>>,
schema_ref: SchemaRef,
) -> LogicalPlan {
LogicalPlan::new(
Expand Down
8 changes: 4 additions & 4 deletions src/binder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -569,12 +569,12 @@ pub mod test {
ColumnCatalog::new(
"c1".to_string(),
false,
ColumnDesc::new(Integer, true, false, None)?,
ColumnDesc::new(Integer, Some(0), false, None)?,
),
ColumnCatalog::new(
"c2".to_string(),
false,
ColumnDesc::new(Integer, false, true, None)?,
ColumnDesc::new(Integer, None, true, None)?,
),
],
false,
Expand All @@ -587,12 +587,12 @@ pub mod test {
ColumnCatalog::new(
"c3".to_string(),
false,
ColumnDesc::new(Integer, true, false, None)?,
ColumnDesc::new(Integer, Some(0), false, None)?,
),
ColumnCatalog::new(
"c4".to_string(),
false,
ColumnDesc::new(Integer, false, false, None)?,
ColumnDesc::new(Integer, None, false, None)?,
),
],
false,
Expand Down
4 changes: 2 additions & 2 deletions src/binder/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -697,7 +697,7 @@ impl<'a: 'b, 'b, T: Transaction> Binder<'a, 'b, T> {
if let Some(expr) = limit_expr {
let expr = self.bind_expr(expr)?;
match expr {
ScalarExpression::Constant(dv) => match dv.as_ref() {
ScalarExpression::Constant(dv) => match &dv {
DataValue::Int32(Some(v)) if *v >= 0 => limit = Some(*v as usize),
DataValue::Int64(Some(v)) if *v >= 0 => limit = Some(*v as usize),
_ => return Err(DatabaseError::InvalidType),
Expand All @@ -713,7 +713,7 @@ impl<'a: 'b, 'b, T: Transaction> Binder<'a, 'b, T> {
if let Some(expr) = offset_expr {
let expr = self.bind_expr(&expr.value)?;
match expr {
ScalarExpression::Constant(dv) => match dv.as_ref() {
ScalarExpression::Constant(dv) => match &dv {
DataValue::Int32(Some(v)) if *v > 0 => offset = Some(*v as usize),
DataValue::Int64(Some(v)) if *v > 0 => offset = Some(*v as usize),
_ => return Err(DatabaseError::InvalidType),
Expand Down
Loading

0 comments on commit 104c230

Please sign in to comment.