fix(flow): fix call df func bug&sqlness test #4165

Merged
merged 16 commits into from
Jun 24, 2024
30 changes: 28 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions src/flow/src/adapter.rs
@@ -321,7 +321,7 @@ impl FlownodeManager {
schema
.get_name(*i)
.clone()
.unwrap_or_else(|| format!("Col_{i}"))
.unwrap_or_else(|| format!("col_{i}"))
})
.collect_vec()
})
@@ -344,7 +344,7 @@ impl FlownodeManager {
.get(idx)
.cloned()
.flatten()
.unwrap_or(format!("Col_{}", idx));
.unwrap_or(format!("col_{}", idx));
let ret = ColumnSchema::new(name, typ.scalar_type, typ.nullable);
if schema.typ().time_index == Some(idx) {
ret.with_time_index(true)
2 changes: 2 additions & 0 deletions src/flow/src/adapter/table_source.rs
@@ -148,6 +148,8 @@ impl TableSource {
column_types,
keys,
time_index,
// by default, a table schema's columns are all non-auto
auto_columns: vec![],
},
names: col_names,
},
26 changes: 22 additions & 4 deletions src/flow/src/adapter/worker.rs
@@ -206,10 +206,15 @@ impl WorkerHandle {

impl Drop for WorkerHandle {
fn drop(&mut self) {
if let Err(err) = self.shutdown_blocking() {
common_telemetry::error!("Fail to shutdown worker: {:?}", err)
let ret = futures::executor::block_on(async { self.shutdown().await });
if let Err(ret) = ret {
common_telemetry::error!(
ret;
"While dropping Worker Handle, failed to shutdown worker, worker might be in inconsistent state."
);
} else {
info!("Flow Worker shutdown due to Worker Handle dropped.")
}
info!("Flow Worker shutdown due to Worker Handle dropped.")
}
}

@@ -496,6 +501,19 @@ mod test {
use crate::plan::Plan;
use crate::repr::{RelationType, Row};

#[test]
fn drop_handle() {
let (tx, rx) = oneshot::channel();
let worker_thread_handle = std::thread::spawn(move || {
let (handle, mut worker) = create_worker();
tx.send(handle).unwrap();
worker.run();
});
let handle = rx.blocking_recv().unwrap();
drop(handle);
worker_thread_handle.join().unwrap();
}

#[tokio::test]
pub async fn test_simple_get_with_worker_and_handle() {
let (tx, rx) = oneshot::channel();
@@ -532,7 +550,7 @@ mod test {
tx.send((Row::empty(), 0, 0)).unwrap();
handle.run_available(0).await.unwrap();
assert_eq!(sink_rx.recv().await.unwrap().0, Row::empty());
handle.shutdown().await.unwrap();
drop(handle);
worker_thread_handle.join().unwrap();
}
}
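
The Drop change above replaces the old `shutdown_blocking` call with an explicit `futures::executor::block_on` over the async `shutdown`, and moves the info log into an `else` branch so it no longer fires after a failed shutdown. Below is a minimal sketch of this drop-blocks-on-async-shutdown pattern, assuming the `tokio` and `futures` crates; `Handle` here is a simplified stand-in, not the real `WorkerHandle` API:

```rust
use tokio::sync::oneshot;

struct Handle {
    shutdown_tx: Option<oneshot::Sender<()>>,
}

impl Handle {
    /// Async shutdown: signal the worker to stop, consuming the sender.
    async fn shutdown(&mut self) -> Result<(), &'static str> {
        self.shutdown_tx
            .take()
            .ok_or("already shut down")?
            .send(())
            .map_err(|_| "worker side already gone")
    }
}

impl Drop for Handle {
    fn drop(&mut self) {
        if self.shutdown_tx.is_none() {
            return; // shutdown already ran explicitly; nothing left to do
        }
        // Block this synchronous drop on the async shutdown, mirroring the
        // `futures::executor::block_on(async { self.shutdown().await })` in the diff.
        if let Err(err) = futures::executor::block_on(self.shutdown()) {
            eprintln!("failed to shut down worker: {err}");
        } else {
            eprintln!("worker shut down because its handle was dropped");
        }
    }
}

fn main() {
    let (tx, rx) = oneshot::channel::<()>();
    // A stand-in worker thread that runs until it receives the shutdown signal.
    let worker = std::thread::spawn(move || {
        let _ = futures::executor::block_on(rx);
    });
    drop(Handle { shutdown_tx: Some(tx) }); // Drop sends the signal ...
    worker.join().unwrap(); // ... so the join completes instead of hanging.
}
```

Blocking in `Drop` is what lets the new `drop_handle` test simply drop the handle and then join the worker thread.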
76 changes: 61 additions & 15 deletions src/flow/src/expr/scalar.rs
@@ -21,10 +21,10 @@ use bytes::BytesMut;
use common_error::ext::BoxedError;
use common_recordbatch::DfRecordBatch;
use datafusion_physical_expr::PhysicalExpr;
use datatypes::arrow_array;
use datatypes::data_type::DataType;
use datatypes::prelude::ConcreteDataType;
use datatypes::value::Value;
use datatypes::{arrow_array, value};
use prost::Message;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
@@ -155,8 +155,10 @@ pub enum ScalarExpr {
exprs: Vec<ScalarExpr>,
},
CallDf {
// TODO(discord9): support shuffle
/// invariant: the input args set inside this [`DfScalarFunction`] is
/// always col(0) to col(n-1) where n is the length of `exprs`
df_scalar_fn: DfScalarFunction,
exprs: Vec<ScalarExpr>,
},
/// Conditionally evaluated expressions.
///
@@ -189,8 +191,27 @@ impl DfScalarFunction {
})
}

pub fn try_from_raw_fn(raw_fn: RawDfScalarFn) -> Result<Self, Error> {
Ok(Self {
fn_impl: raw_fn.get_fn_impl()?,
df_schema: Arc::new(raw_fn.input_schema.to_df_schema()?),
raw_fn,
})
}

/// eval a list of expressions using input values
fn eval_args(values: &[Value], exprs: &[ScalarExpr]) -> Result<Vec<Value>, EvalError> {
exprs
.iter()
.map(|expr| expr.eval(values))
.collect::<Result<_, _>>()
}

// TODO(discord9): add RecordBatch support
pub fn eval(&self, values: &[Value]) -> Result<Value, EvalError> {
pub fn eval(&self, values: &[Value], exprs: &[ScalarExpr]) -> Result<Value, EvalError> {
// first eval exprs to construct values to feed to datafusion
let values: Vec<_> = Self::eval_args(values, exprs)?;

if values.is_empty() {
return InvalidArgumentSnafu {
reason: "values is empty".to_string(),
@@ -259,16 +280,18 @@ impl<'de> serde::de::Deserialize<'de> for DfScalarFunction {
D: serde::de::Deserializer<'de>,
{
let raw_fn = RawDfScalarFn::deserialize(deserializer)?;
let fn_impl = raw_fn.get_fn_impl().map_err(serde::de::Error::custom)?;
DfScalarFunction::new(raw_fn, fn_impl).map_err(serde::de::Error::custom)
DfScalarFunction::try_from_raw_fn(raw_fn).map_err(serde::de::Error::custom)
}
}

#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct RawDfScalarFn {
f: bytes::BytesMut,
input_schema: RelationDesc,
extensions: FunctionExtensions,
/// The raw bytes of the encoded datafusion scalar function
pub(crate) f: bytes::BytesMut,
/// The input schema of the function
pub(crate) input_schema: RelationDesc,
/// Extensions containing the mapping from function reference to function name
pub(crate) extensions: FunctionExtensions,
}

impl RawDfScalarFn {
@@ -354,7 +377,7 @@ impl ScalarExpr {
Ok(ColumnType::new_nullable(func.signature().output))
}
ScalarExpr::If { then, .. } => then.typ(context),
ScalarExpr::CallDf { df_scalar_fn } => {
ScalarExpr::CallDf { df_scalar_fn, .. } => {
let arrow_typ = df_scalar_fn
.fn_impl
// TODO(discord9): get scheme from args instead?
@@ -445,7 +468,10 @@ impl ScalarExpr {
}
.fail(),
},
ScalarExpr::CallDf { df_scalar_fn } => df_scalar_fn.eval(values),
ScalarExpr::CallDf {
df_scalar_fn,
exprs,
} => df_scalar_fn.eval(values, exprs),
}
}

@@ -614,7 +640,15 @@ impl ScalarExpr {
f(then)?;
f(els)
}
_ => Ok(()),
ScalarExpr::CallDf {
df_scalar_fn: _,
exprs,
} => {
for expr in exprs {
f(expr)?;
}
Ok(())
}
}
}

@@ -650,7 +684,15 @@ impl ScalarExpr {
f(then)?;
f(els)
}
_ => Ok(()),
ScalarExpr::CallDf {
df_scalar_fn: _,
exprs,
} => {
for expr in exprs {
f(expr)?;
}
Ok(())
}
}
}
}
@@ -852,11 +894,15 @@ mod test {
.unwrap();
let extensions = FunctionExtensions::from_iter(vec![(0, "abs")]);
let raw_fn = RawDfScalarFn::from_proto(&raw_scalar_func, input_schema, extensions).unwrap();
let fn_impl = raw_fn.get_fn_impl().unwrap();
let df_func = DfScalarFunction::new(raw_fn, fn_impl).unwrap();
let df_func = DfScalarFunction::try_from_raw_fn(raw_fn).unwrap();
let as_str = serde_json::to_string(&df_func).unwrap();
let from_str: DfScalarFunction = serde_json::from_str(&as_str).unwrap();
assert_eq!(df_func, from_str);
assert_eq!(df_func.eval(&[Value::Null]).unwrap(), Value::Int64(1));
assert_eq!(
df_func
.eval(&[Value::Null], &[ScalarExpr::Column(0)])
.unwrap(),
Value::Int64(1)
);
}
}
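
Taken together, the scalar.rs changes make `ScalarExpr::CallDf` carry its argument expressions, and `eval` now runs those expressions against the input row before handing the results to the wrapped DataFusion function, which per the new invariant only ever sees `col(0)` to `col(n-1)`. A minimal sketch of that two-stage evaluation, with simplified `Value` and `Expr` stand-ins rather than the real flow types:

```rust
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Int64(i64),
}

enum Expr {
    Column(usize),
    Literal(Value),
}

impl Expr {
    fn eval(&self, row: &[Value]) -> Value {
        match self {
            Expr::Column(i) => row[*i].clone(),
            Expr::Literal(v) => v.clone(),
        }
    }
}

/// Mirror of `eval_args` in the diff: evaluate every argument expression on
/// the flow side first; the results are what the DataFusion function then
/// receives as col(0)..col(n-1).
fn eval_args(row: &[Value], exprs: &[Expr]) -> Vec<Value> {
    exprs.iter().map(|e| e.eval(row)).collect()
}

fn main() {
    let row = vec![Value::Int64(-7), Value::Int64(3)];
    // Nested expressions never reach DataFusion; only their values do.
    let args = eval_args(&row, &[Expr::Column(1), Expr::Literal(Value::Int64(1))]);
    assert_eq!(args, vec![Value::Int64(3), Value::Int64(1)]);
}
```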
29 changes: 27 additions & 2 deletions src/flow/src/repr/relation.rs
@@ -21,7 +21,9 @@ use itertools::Itertools;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};

use crate::adapter::error::{DatafusionSnafu, InvalidQuerySnafu, Result, UnexpectedSnafu};
use crate::adapter::error::{
DatafusionSnafu, InternalSnafu, InvalidQuerySnafu, Result, UnexpectedSnafu,
};
use crate::expr::{MapFilterProject, SafeMfpPlan, ScalarExpr};

/// a set of column indices that are "keys" for the collection.
@@ -93,13 +95,19 @@ pub struct RelationType {
///
/// A collection can contain multiple sets of keys, although it is common to
/// have either zero or one sets of key indices.
#[serde(default)]
pub keys: Vec<Key>,
/// optionally indicate the column that is TIME INDEX
pub time_index: Option<usize>,
/// mark all the columns that are added automatically by flow, but are not present in the original sql
pub auto_columns: Vec<usize>,
}

impl RelationType {
pub fn with_autos(mut self, auto_cols: &[usize]) -> Self {
self.auto_columns = auto_cols.to_vec();
self
}

/// Try to apply an mfp on the current types; this returns a new RelationType
/// with the new types, and also tries to preserve key & time index information
/// if the old key & time index columns are preserved in the given mfp
@@ -155,10 +163,16 @@ impl RelationType {
let time_index = self
.time_index
.and_then(|old| old_to_new_col.get(&old).cloned());
let auto_columns = self
.auto_columns
.iter()
.filter_map(|old| old_to_new_col.get(old).cloned())
.collect_vec();
Ok(Self {
column_types: mfp_out_types,
keys,
time_index,
auto_columns,
})
}
/// Constructs a `RelationType` representing the relation with no columns and
@@ -175,6 +189,7 @@ impl RelationType {
column_types,
keys: Vec::new(),
time_index: None,
auto_columns: vec![],
}
}

@@ -340,6 +355,16 @@ pub struct RelationDesc {
}

impl RelationDesc {
pub fn len(&self) -> Result<usize> {
ensure!(
self.typ.column_types.len() == self.names.len(),
InternalSnafu {
reason: "Expect typ and names field to be of same length"
}
);
Ok(self.names.len())
}

pub fn to_df_schema(&self) -> Result<DFSchema> {
let fields: Vec<_> = self
.iter()
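
The new `auto_columns` field marks columns that flow adds on its own (the tumble `window_start`/`window_end` columns in the sqlness tests below are the motivating case), and `apply_mfp` now remaps those indices alongside keys and the time index. A small sketch of that remapping, with a hand-built old-to-new map standing in for the one `apply_mfp` derives from the MFP:

```rust
use std::collections::BTreeMap;

/// Remap auto-added column indices through a projection: indices that survive
/// are renumbered, indices that were projected away are dropped.
fn remap_auto_columns(auto: &[usize], old_to_new: &BTreeMap<usize, usize>) -> Vec<usize> {
    auto.iter()
        .filter_map(|old| old_to_new.get(old).copied())
        .collect()
}

fn main() {
    // Columns 1 and 2 were auto-added; the projection keeps columns 0 and 2,
    // renumbering old column 2 to new column 1.
    let old_to_new = BTreeMap::from([(0, 0), (2, 1)]);
    assert_eq!(remap_auto_columns(&[1, 2], &old_to_new), vec![1]);
}
```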
300 changes: 293 additions & 7 deletions src/flow/src/transform/aggr.rs

Large diffs are not rendered by default.

101 changes: 61 additions & 40 deletions src/flow/src/transform/expr.rs
@@ -34,7 +34,7 @@ use crate::expr::{
BinaryFunc, DfScalarFunction, RawDfScalarFn, ScalarExpr, TypedExpr, UnaryFunc,
UnmaterializableFunc, VariadicFunc,
};
use crate::repr::{ColumnType, RelationDesc};
use crate::repr::{ColumnType, RelationDesc, RelationType};
use crate::transform::literal::{from_substrait_literal, from_substrait_type};
use crate::transform::{substrait_proto, FunctionExtensions};
// TODO(discord9): found proper place for this
@@ -99,20 +99,69 @@ pub(crate) fn from_scalar_fn_to_df_fn_impl(
Ok(phy_expr)
}

/// Return an [`Expression`] (wrapped in a [`FunctionArgument`]) that references the i-th column of the input relation
pub(crate) fn proto_col(i: usize) -> substrait_proto::proto::FunctionArgument {
use substrait_proto::proto::expression;
let expr = Expression {
rex_type: Some(expression::RexType::Selection(Box::new(
expression::FieldReference {
reference_type: Some(expression::field_reference::ReferenceType::DirectReference(
expression::ReferenceSegment {
reference_type: Some(
expression::reference_segment::ReferenceType::StructField(Box::new(
expression::reference_segment::StructField {
field: i as i32,
child: None,
},
)),
),
},
)),
root_type: None,
},
))),
};
substrait_proto::proto::FunctionArgument {
arg_type: Some(substrait_proto::proto::function_argument::ArgType::Value(
expr,
)),
}
}

/// rewrite ScalarFunction's arguments to Columns 0..n so nested exprs are still handled by us instead of datafusion
fn rewrite_scalar_function(f: &ScalarFunction) -> ScalarFunction {
let mut f_rewrite = f.clone();
for (idx, raw_expr) in f_rewrite.arguments.iter_mut().enumerate() {
*raw_expr = proto_col(idx);
}
f_rewrite
}

impl TypedExpr {
pub fn from_substrait_to_datafusion_scalar_func(
f: &ScalarFunction,
input_schema: &RelationDesc,
arg_exprs_typed: Vec<TypedExpr>,
extensions: &FunctionExtensions,
) -> Result<TypedExpr, Error> {
let phy_expr = from_scalar_fn_to_df_fn_impl(f, input_schema, extensions)?;
let raw_fn = RawDfScalarFn::from_proto(f, input_schema.clone(), extensions.clone())?;
let expr = DfScalarFunction::new(raw_fn, phy_expr)?;
let expr = ScalarExpr::CallDf { df_scalar_fn: expr };
let (arg_exprs, arg_types): (Vec<_>, Vec<_>) =
arg_exprs_typed.into_iter().map(|e| (e.expr, e.typ)).unzip();

let f_rewrite = rewrite_scalar_function(f);

let input_schema = RelationType::new(arg_types).into_unnamed();
let raw_fn =
RawDfScalarFn::from_proto(&f_rewrite, input_schema.clone(), extensions.clone())?;

let df_func = DfScalarFunction::try_from_raw_fn(raw_fn)?;
let expr = ScalarExpr::CallDf {
df_scalar_fn: df_func,
exprs: arg_exprs,
};
// df already knows its own schema, so it is not provided here
let ret_type = expr.typ(&[])?;
Ok(TypedExpr::new(expr, ret_type))
}

/// Convert ScalarFunction into Flow's ScalarExpr
pub fn from_substrait_scalar_func(
f: &ScalarFunction,
@@ -242,7 +291,7 @@ impl TypedExpr {
} else {
let try_as_df = Self::from_substrait_to_datafusion_scalar_func(
f,
input_schema,
arg_typed_exprs,
extensions,
)?;
Ok(try_as_df)
@@ -395,6 +444,7 @@ mod test {
use crate::plan::{Plan, TypedPlan};
use crate::repr::{self, ColumnType, RelationType};
use crate::transform::test::{create_test_ctx, create_test_query_engine, sql_to_substrait};

/// test if `WHERE` condition can be converted to Flow's ScalarExpr in mfp's filter
#[tokio::test]
async fn test_where_and() {
@@ -608,39 +658,10 @@ mod test {
)),
}
}
fn col(i: usize) -> substrait_proto::proto::FunctionArgument {
use substrait_proto::proto::expression;
let expr = Expression {
rex_type: Some(expression::RexType::Selection(Box::new(
expression::FieldReference {
reference_type: Some(
expression::field_reference::ReferenceType::DirectReference(
expression::ReferenceSegment {
reference_type: Some(
expression::reference_segment::ReferenceType::StructField(
Box::new(expression::reference_segment::StructField {
field: i as i32,
child: None,
}),
),
),
},
),
),
root_type: None,
},
))),
};
substrait_proto::proto::FunctionArgument {
arg_type: Some(substrait_proto::proto::function_argument::ArgType::Value(
expr,
)),
}
}

let f = substrait_proto::proto::expression::ScalarFunction {
function_reference: 0,
arguments: vec![col(0)],
arguments: vec![proto_col(0)],
options: vec![],
output_type: None,
..Default::default()
@@ -663,7 +684,7 @@ mod test {

let f = substrait_proto::proto::expression::ScalarFunction {
function_reference: 0,
arguments: vec![col(0), col(1)],
arguments: vec![proto_col(0), proto_col(1)],
options: vec![],
output_type: None,
..Default::default()
@@ -690,7 +711,7 @@ mod test {

let f = substrait_proto::proto::expression::ScalarFunction {
function_reference: 0,
arguments: vec![col(0), lit("1 second"), lit("2021-07-01 00:00:00")],
arguments: vec![proto_col(0), lit("1 second"), lit("2021-07-01 00:00:00")],
options: vec![],
output_type: None,
..Default::default()
@@ -718,7 +739,7 @@ mod test {

let f = substrait_proto::proto::expression::ScalarFunction {
function_reference: 0,
arguments: vec![col(0), lit("1 second")],
arguments: vec![proto_col(0), lit("1 second")],
options: vec![],
output_type: None,
..Default::default()
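
`proto_col` and `rewrite_scalar_function` are what establish the invariant noted in scalar.rs: before the substrait function reaches DataFusion, every argument is replaced with a direct reference to column i, so nested expressions stay on the flow side and are fed in via `eval_args`. A sketch of the rewrite, with a simplified `Arg` enum standing in for substrait's `FunctionArgument`:

```rust
#[derive(Debug, Clone, PartialEq)]
enum Arg {
    Column(usize),
    // A nested expression that flow evaluates itself, e.g. "number + 1".
    Nested(&'static str),
}

/// Replace each argument with a reference to column i, as
/// `rewrite_scalar_function` does with `proto_col(idx)`.
fn rewrite_args(mut args: Vec<Arg>) -> Vec<Arg> {
    for (i, arg) in args.iter_mut().enumerate() {
        *arg = Arg::Column(i);
    }
    args
}

fn main() {
    let rewritten = rewrite_args(vec![Arg::Nested("abs(number)"), Arg::Nested("number + 1")]);
    assert_eq!(rewritten, vec![Arg::Column(0), Arg::Column(1)]);
}
```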
274 changes: 149 additions & 125 deletions src/flow/src/transform/plan.rs
@@ -12,14 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, BTreeSet};
use std::collections::{BTreeMap, BTreeSet, HashSet};

use itertools::Itertools;
use snafu::OptionExt;
use substrait::substrait_proto_df::proto::{FilterRel, ReadRel};
use substrait_proto::proto::expression::MaskExpression;
use substrait_proto::proto::read_rel::ReadType;
use substrait_proto::proto::rel::RelType;
use substrait_proto::proto::{plan_rel, Plan as SubPlan, Rel};
use substrait_proto::proto::{plan_rel, Plan as SubPlan, ProjectRel, Rel};

use crate::adapter::error::{
Error, InternalSnafu, InvalidQuerySnafu, NotImplementedSnafu, PlanSnafu, UnexpectedSnafu,
@@ -63,141 +64,164 @@ impl TypedPlan {
}
}

/// Convert Substrait Rel into Flow's TypedPlan
/// TODO(discord9): SELECT DISTINCT (does it get compiled into something else?)
pub fn from_substrait_rel(
pub fn from_substrait_project(
ctx: &mut FlownodeContext,
rel: &Rel,
p: &ProjectRel,
extensions: &FunctionExtensions,
) -> Result<TypedPlan, Error> {
match &rel.rel_type {
Some(RelType::Project(p)) => {
let input = if let Some(input) = p.input.as_ref() {
TypedPlan::from_substrait_rel(ctx, input, extensions)?
} else {
return not_impl_err!("Projection without an input is not supported");
};
let input = if let Some(input) = p.input.as_ref() {
TypedPlan::from_substrait_rel(ctx, input, extensions)?
} else {
return not_impl_err!("Projection without an input is not supported");
};

// because this `input.schema` is incorrect for the pre-expand substrait plan, we have to use the schema from before expanding the multi-value
// function to correctly transform it, and later rewrite it
let schema_before_expand = {
let input_schema = input.schema.clone();
let auto_columns: HashSet<usize> =
HashSet::from_iter(input_schema.typ().auto_columns.clone());
let not_auto_added_columns = (0..input_schema.len()?)
.filter(|i| !auto_columns.contains(i))
.collect_vec();
let mfp = MapFilterProject::new(input_schema.len()?)
.project(not_auto_added_columns)?
.into_safe();

let mut exprs: Vec<TypedExpr> = vec![];
for e in &p.expressions {
let expr = TypedExpr::from_substrait_rex(e, &input.schema, extensions)?;
exprs.push(expr);
input_schema.apply_mfp(&mfp)?
};

let mut exprs: Vec<TypedExpr> = Vec::with_capacity(p.expressions.len());
for e in &p.expressions {
let expr = TypedExpr::from_substrait_rex(e, &schema_before_expand, extensions)?;
exprs.push(expr);
}
let is_literal = exprs.iter().all(|expr| expr.expr.is_literal());
if is_literal {
let (literals, lit_types): (Vec<_>, Vec<_>) = exprs
.into_iter()
.map(|TypedExpr { expr, typ }| (expr, typ))
.unzip();
let typ = RelationType::new(lit_types);
let row = literals
.into_iter()
.map(|lit| lit.as_literal().expect("A literal"))
.collect_vec();
let row = repr::Row::new(row);
let plan = Plan::Constant {
rows: vec![(row, repr::Timestamp::MIN, 1)],
};
Ok(TypedPlan {
schema: typ.into_unnamed(),
plan,
})
} else {
match input.plan.clone() {
Plan::Reduce { key_val_plan, .. } => {
rewrite_projection_after_reduce(key_val_plan, &input.schema, &mut exprs)?;
}
let is_literal = exprs.iter().all(|expr| expr.expr.is_literal());
if is_literal {
let (literals, lit_types): (Vec<_>, Vec<_>) = exprs
.into_iter()
.map(|TypedExpr { expr, typ }| (expr, typ))
.unzip();
let typ = RelationType::new(lit_types);
let row = literals
.into_iter()
.map(|lit| lit.as_literal().expect("A literal"))
.collect_vec();
let row = repr::Row::new(row);
let plan = Plan::Constant {
rows: vec![(row, repr::Timestamp::MIN, 1)],
};
Ok(TypedPlan {
schema: typ.into_unnamed(),
plan,
})
} else {
match input.plan.clone() {
Plan::Reduce { key_val_plan, .. } => {
rewrite_projection_after_reduce(
key_val_plan,
&input.schema,
&mut exprs,
)?;
}
Plan::Mfp { input, mfp: _ } => {
if let Plan::Reduce { key_val_plan, .. } = input.plan {
rewrite_projection_after_reduce(
key_val_plan,
&input.schema,
&mut exprs,
)?;
}
}
_ => (),
Plan::Mfp { input, mfp: _ } => {
if let Plan::Reduce { key_val_plan, .. } = input.plan {
rewrite_projection_after_reduce(key_val_plan, &input.schema, &mut exprs)?;
}
input.projection(exprs)
}
_ => (),
}
Some(RelType::Filter(filter)) => {
let input = if let Some(input) = filter.input.as_ref() {
TypedPlan::from_substrait_rel(ctx, input, extensions)?
} else {
return not_impl_err!("Filter without an input is not supported");
};
input.projection(exprs)
}
}

let expr = if let Some(condition) = filter.condition.as_ref() {
TypedExpr::from_substrait_rex(condition, &input.schema, extensions)?
} else {
return not_impl_err!("Filter without a condition is not valid");
};
input.filter(expr)
}
Some(RelType::Read(read)) => {
if let Some(ReadType::NamedTable(nt)) = &read.as_ref().read_type {
let query_ctx = ctx.query_context.clone().context(UnexpectedSnafu {
reason: "Query context not found",
})?;
let table_reference = match nt.names.len() {
1 => [
query_ctx.current_catalog().to_string(),
query_ctx.current_schema().to_string(),
nt.names[0].clone(),
],
2 => [
query_ctx.current_catalog().to_string(),
nt.names[0].clone(),
nt.names[1].clone(),
],
3 => [
nt.names[0].clone(),
nt.names[1].clone(),
nt.names[2].clone(),
],
_ => InvalidQuerySnafu {
reason: "Expect table to have name",
}
.fail()?,
};
let table = ctx.table(&table_reference)?;
let get_table = Plan::Get {
id: crate::expr::Id::Global(table.0),
};
let get_table = TypedPlan {
schema: table.1,
plan: get_table,
};
pub fn from_substrait_filter(
ctx: &mut FlownodeContext,
filter: &FilterRel,
extensions: &FunctionExtensions,
) -> Result<TypedPlan, Error> {
let input = if let Some(input) = filter.input.as_ref() {
TypedPlan::from_substrait_rel(ctx, input, extensions)?
} else {
return not_impl_err!("Filter without an input is not supported");
};

if let Some(MaskExpression {
select: Some(projection),
..
}) = &read.projection
{
let column_indices: Vec<usize> = projection
.struct_items
.iter()
.map(|item| item.field as usize)
.collect();
let input_arity = get_table.schema.typ().column_types.len();
let mfp =
MapFilterProject::new(input_arity).project(column_indices.clone())?;
get_table.mfp(mfp.into_safe())
} else {
Ok(get_table)
}
} else {
not_impl_err!("Only NamedTable reads are supported")
let expr = if let Some(condition) = filter.condition.as_ref() {
TypedExpr::from_substrait_rex(condition, &input.schema, extensions)?
} else {
return not_impl_err!("Filter without a condition is not valid");
};
input.filter(expr)
}

pub fn from_substrait_read(
ctx: &mut FlownodeContext,
read: &ReadRel,
_extensions: &FunctionExtensions,
) -> Result<TypedPlan, Error> {
if let Some(ReadType::NamedTable(nt)) = &read.read_type {
let query_ctx = ctx.query_context.clone().context(UnexpectedSnafu {
reason: "Query context not found",
})?;
let table_reference = match nt.names.len() {
1 => [
query_ctx.current_catalog().to_string(),
query_ctx.current_schema().to_string(),
nt.names[0].clone(),
],
2 => [
query_ctx.current_catalog().to_string(),
nt.names[0].clone(),
nt.names[1].clone(),
],
3 => [
nt.names[0].clone(),
nt.names[1].clone(),
nt.names[2].clone(),
],
_ => InvalidQuerySnafu {
reason: "Expect table to have name",
}
.fail()?,
};
let table = ctx.table(&table_reference)?;
let get_table = Plan::Get {
id: crate::expr::Id::Global(table.0),
};
let get_table = TypedPlan {
schema: table.1,
plan: get_table,
};

if let Some(MaskExpression {
select: Some(projection),
..
}) = &read.projection
{
let column_indices: Vec<usize> = projection
.struct_items
.iter()
.map(|item| item.field as usize)
.collect();
let input_arity = get_table.schema.typ().column_types.len();
let mfp = MapFilterProject::new(input_arity).project(column_indices.clone())?;
get_table.mfp(mfp.into_safe())
} else {
Ok(get_table)
}
Some(RelType::Aggregate(agg)) => {
TypedPlan::from_substrait_agg_rel(ctx, agg, extensions)
}
} else {
not_impl_err!("Only NamedTable reads are supported")
}
}

/// Convert Substrait Rel into Flow's TypedPlan
/// TODO(discord9): SELECT DISTINCT (does it get compiled into something else?)
pub fn from_substrait_rel(
ctx: &mut FlownodeContext,
rel: &Rel,
extensions: &FunctionExtensions,
) -> Result<TypedPlan, Error> {
match &rel.rel_type {
Some(RelType::Project(p)) => Self::from_substrait_project(ctx, p.as_ref(), extensions),
Some(RelType::Filter(filter)) => Self::from_substrait_filter(ctx, filter, extensions),
Some(RelType::Read(read)) => Self::from_substrait_read(ctx, read, extensions),
Some(RelType::Aggregate(agg)) => Self::from_substrait_agg_rel(ctx, agg, extensions),
_ => not_impl_err!("Unsupported relation type: {:?}", rel.rel_type),
}
}
61 changes: 61 additions & 0 deletions tests/cases/standalone/flow/basic.result
@@ -0,0 +1,61 @@
CREATE TABLE numbers_input (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);

Affected Rows: 0

CREATE FLOW test_numbers
SINK TO out_num_cnt
AS
SELECT sum(number) FROM numbers_input GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00');

Affected Rows: 0

INSERT INTO numbers_input
VALUES
(20, "2021-07-01 00:00:00.200"),
(22, "2021-07-01 00:00:00.600");

Affected Rows: 2

-- SQLNESS SLEEP 3s
SELECT col_0, window_start, window_end FROM out_num_cnt;

+-------+---------------------+---------------------+
| col_0 | window_start | window_end |
+-------+---------------------+---------------------+
| 42 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 |
+-------+---------------------+---------------------+

INSERT INTO numbers_input
VALUES
(23,"2021-07-01 00:00:01.000"),
(24,"2021-07-01 00:00:01.500");

Affected Rows: 2

-- SQLNESS SLEEP 2s
SELECT col_0, window_start, window_end FROM out_num_cnt;

+-------+---------------------+---------------------+
| col_0 | window_start | window_end |
+-------+---------------------+---------------------+
| 42 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 |
| 47 | 2021-07-01T00:00:01 | 2021-07-01T00:00:02 |
+-------+---------------------+---------------------+

DROP FLOW test_numbers;

Affected Rows: 0

DROP TABLE numbers_input;

Affected Rows: 0

DROP TABLE out_num_cnt;

Affected Rows: 0
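
The expected rows above follow from the tumble arithmetic: with a 1 second window anchored at 2021-07-01 00:00:00, the first two inserts (00:00:00.200 and 00:00:00.600) share the [00:00:00, 00:00:01) window, so 20 + 22 = 42, while the later pair shares [00:00:01, 00:00:02), so 23 + 24 = 47. A sketch of the window computation; the floor-to-origin formula is an assumption about tumble's semantics, not lifted from the flow source:

```rust
/// Compute the half-open tumble window [start, end) containing `ts_ms`,
/// with all times in milliseconds relative to some shared epoch.
fn window_bounds(ts_ms: i64, size_ms: i64, origin_ms: i64) -> (i64, i64) {
    let start = origin_ms + ((ts_ms - origin_ms) / size_ms) * size_ms;
    (start, start + size_ms)
}

fn main() {
    let origin = 0; // stands in for 2021-07-01 00:00:00
    // 00:00:00.200 and 00:00:00.600 land in the same [0s, 1s) window: 20 + 22 = 42.
    assert_eq!(window_bounds(200, 1_000, origin), (0, 1_000));
    assert_eq!(window_bounds(600, 1_000, origin), (0, 1_000));
    // 00:00:01.000 and 00:00:01.500 land in [1s, 2s): 23 + 24 = 47.
    assert_eq!(window_bounds(1_000, 1_000, origin), (1_000, 2_000));
    assert_eq!(window_bounds(1_500, 1_000, origin), (1_000, 2_000));
}
```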

31 changes: 31 additions & 0 deletions tests/cases/standalone/flow/basic.sql
@@ -0,0 +1,31 @@
CREATE TABLE numbers_input (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);

CREATE FLOW test_numbers
SINK TO out_num_cnt
AS
SELECT sum(number) FROM numbers_input GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00');

INSERT INTO numbers_input
VALUES
(20, "2021-07-01 00:00:00.200"),
(22, "2021-07-01 00:00:00.600");

-- SQLNESS SLEEP 3s
SELECT col_0, window_start, window_end FROM out_num_cnt;

INSERT INTO numbers_input
VALUES
(23,"2021-07-01 00:00:01.000"),
(24,"2021-07-01 00:00:01.500");

-- SQLNESS SLEEP 2s
SELECT col_0, window_start, window_end FROM out_num_cnt;

DROP FLOW test_numbers;
DROP TABLE numbers_input;
DROP TABLE out_num_cnt;
126 changes: 126 additions & 0 deletions tests/cases/standalone/flow/df_func.result
@@ -0,0 +1,126 @@
CREATE TABLE numbers_input_df_func (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);

Affected Rows: 0

-- call `sum(abs(number))` where `abs` is a DataFusion function and `sum` is a flow function
CREATE FLOW test_numbers_df_func
SINK TO out_num_cnt_df_func
AS
SELECT sum(abs(number)) FROM numbers_input_df_func GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00');

Affected Rows: 0

INSERT INTO numbers_input_df_func
VALUES
(-20, "2021-07-01 00:00:00.200"),
(22, "2021-07-01 00:00:00.600");

Affected Rows: 2

-- sleep a little bit longer to make sure that the table is created and the data is inserted
-- SQLNESS SLEEP 3s
SELECT col_0, window_start, window_end FROM out_num_cnt_df_func;

+-------+---------------------+---------------------+
| col_0 | window_start | window_end |
+-------+---------------------+---------------------+
| 42 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 |
+-------+---------------------+---------------------+

INSERT INTO numbers_input_df_func
VALUES
(23,"2021-07-01 00:00:01.000"),
(-24,"2021-07-01 00:00:01.500");

Affected Rows: 2

-- SQLNESS SLEEP 2s
SELECT col_0, window_start, window_end FROM out_num_cnt_df_func;

+-------+---------------------+---------------------+
| col_0 | window_start | window_end |
+-------+---------------------+---------------------+
| 42 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 |
| 47 | 2021-07-01T00:00:01 | 2021-07-01T00:00:02 |
+-------+---------------------+---------------------+

DROP FLOW test_numbers_df_func;

Affected Rows: 0

DROP TABLE numbers_input_df_func;

Affected Rows: 0

DROP TABLE out_num_cnt_df_func;

Affected Rows: 0

CREATE TABLE numbers_input_df_func (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);

Affected Rows: 0

-- call `abs(sum(number))` to make sure that calling the `abs` function (implemented by datafusion) on the `sum` function (implemented by flow) works
CREATE FLOW test_numbers_df_func
SINK TO out_num_cnt_df_func
AS
SELECT abs(sum(number)) FROM numbers_input_df_func GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00');

Affected Rows: 0

INSERT INTO numbers_input_df_func
VALUES
(-20, "2021-07-01 00:00:00.200"),
(22, "2021-07-01 00:00:00.600");

Affected Rows: 2

-- sleep a little bit longer to make sure that the table is created and the data is inserted
-- SQLNESS SLEEP 3s
SELECT col_0, window_start, window_end FROM out_num_cnt_df_func;

+-------+---------------------+---------------------+
| col_0 | window_start | window_end |
+-------+---------------------+---------------------+
| 2 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 |
+-------+---------------------+---------------------+

INSERT INTO numbers_input_df_func
VALUES
(23,"2021-07-01 00:00:01.000"),
(-24,"2021-07-01 00:00:01.500");

Affected Rows: 2

-- SQLNESS SLEEP 2s
SELECT col_0, window_start, window_end FROM out_num_cnt_df_func;

+-------+---------------------+---------------------+
| col_0 | window_start | window_end |
+-------+---------------------+---------------------+
| 2 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 |
| 1 | 2021-07-01T00:00:01 | 2021-07-01T00:00:02 |
+-------+---------------------+---------------------+

DROP FLOW test_numbers_df_func;

Affected Rows: 0

DROP TABLE numbers_input_df_func;

Affected Rows: 0

DROP TABLE out_num_cnt_df_func;

Affected Rows: 0

67 changes: 67 additions & 0 deletions tests/cases/standalone/flow/df_func.sql
@@ -0,0 +1,67 @@
CREATE TABLE numbers_input_df_func (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);

-- call `sum(abs(number))` where `abs` is a DataFusion function and `sum` is a flow function
CREATE FLOW test_numbers_df_func
SINK TO out_num_cnt_df_func
AS
SELECT sum(abs(number)) FROM numbers_input_df_func GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00');

INSERT INTO numbers_input_df_func
VALUES
(-20, "2021-07-01 00:00:00.200"),
(22, "2021-07-01 00:00:00.600");

-- sleep a little bit longer to make sure that the table is created and the data is inserted
-- SQLNESS SLEEP 3s
SELECT col_0, window_start, window_end FROM out_num_cnt_df_func;

INSERT INTO numbers_input_df_func
VALUES
(23,"2021-07-01 00:00:01.000"),
(-24,"2021-07-01 00:00:01.500");

-- SQLNESS SLEEP 2s
SELECT col_0, window_start, window_end FROM out_num_cnt_df_func;

DROP FLOW test_numbers_df_func;
DROP TABLE numbers_input_df_func;
DROP TABLE out_num_cnt_df_func;

CREATE TABLE numbers_input_df_func (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);

-- call `abs(sum(number))` to make sure that calling the `abs` function (implemented by datafusion) on the `sum` function (implemented by flow) works
CREATE FLOW test_numbers_df_func
SINK TO out_num_cnt_df_func
AS
SELECT abs(sum(number)) FROM numbers_input_df_func GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00');

INSERT INTO numbers_input_df_func
VALUES
(-20, "2021-07-01 00:00:00.200"),
(22, "2021-07-01 00:00:00.600");

-- sleep a little bit longer to make sure that the table is created and the data is inserted
-- SQLNESS SLEEP 3s
SELECT col_0, window_start, window_end FROM out_num_cnt_df_func;

INSERT INTO numbers_input_df_func
VALUES
(23,"2021-07-01 00:00:01.000"),
(-24,"2021-07-01 00:00:01.500");

-- SQLNESS SLEEP 2s
SELECT col_0, window_start, window_end FROM out_num_cnt_df_func;

DROP FLOW test_numbers_df_func;
DROP TABLE numbers_input_df_func;
DROP TABLE out_num_cnt_df_func;
3 changes: 2 additions & 1 deletion tests/runner/Cargo.toml
@@ -17,7 +17,8 @@ common-recordbatch.workspace = true
common-time.workspace = true
serde.workspace = true
serde_json.workspace = true
sqlness = { version = "0.5" }
# sqlness 0.6.0 has a bug causing `cargo sqlness` to fail (see https://github.com/CeresDB/sqlness/issues/68), which is fixed in 0.6.1
sqlness = "0.6.1"
tempfile.workspace = true
tinytemplate = "1.2"
tokio.workspace = true