From 079b9937c9d9401626b836f3640375edf5a37dbd Mon Sep 17 00:00:00 2001 From: Edmondo Porcu Date: Wed, 24 Jul 2024 22:00:15 +0000 Subject: [PATCH] It cannot possibly work --- datafusion/core/src/physical_planner.rs | 26 +-- .../core/tests/fuzz_cases/window_fuzz.rs | 12 +- datafusion/expr/src/expr.rs | 13 +- datafusion/expr/src/expr_schema.rs | 8 - datafusion/expr/src/lib.rs | 2 - datafusion/expr/src/tree_node.rs | 10 -- .../expr/src/type_coercion/aggregates.rs | 2 +- datafusion/functions-array/src/planner.rs | 7 +- .../optimizer/src/analyzer/type_coercion.rs | 57 +------ datafusion/optimizer/src/decorrelate.rs | 3 - .../src/single_distinct_to_groupby.rs | 69 -------- .../physical-plan/src/aggregates/mod.rs | 2 - datafusion/physical-plan/src/windows/mod.rs | 18 -- datafusion/proto/proto/datafusion.proto | 8 - datafusion/proto/src/generated/pbjson.rs | 157 ------------------ datafusion/proto/src/generated/prost.rs | 17 +- .../proto/src/logical_plan/from_proto.rs | 37 +---- datafusion/proto/src/logical_plan/to_proto.rs | 8 +- .../proto/src/physical_plan/from_proto.rs | 9 - datafusion/proto/src/physical_plan/mod.rs | 24 +-- .../proto/src/physical_plan/to_proto.rs | 74 +-------- .../tests/cases/roundtrip_physical_plan.rs | 1 + datafusion/sql/src/expr/function.rs | 51 +----- .../substrait/src/logical_plan/consumer.rs | 8 +- .../substrait/src/logical_plan/producer.rs | 32 ---- 25 files changed, 28 insertions(+), 627 deletions(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 329d343f13fc1..e120bb79ee330 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -59,7 +59,7 @@ use crate::physical_plan::unnest::UnnestExec; use crate::physical_plan::values::ValuesExec; use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec}; use crate::physical_plan::{ - aggregates, displayable, udaf, windows, AggregateExpr, ExecutionPlan, + displayable, udaf, windows, AggregateExpr, ExecutionPlan, ExecutionPlanProperties, InputOrderMode, Partitioning, PhysicalExpr, WindowExpr, }; @@ -1812,7 +1812,7 @@ pub fn create_aggregate_expr_with_name_and_maybe_filter( e: &Expr, name: impl Into, logical_input_schema: &DFSchema, - physical_input_schema: &Schema, + _physical_input_schema: &Schema, execution_props: &ExecutionProps, ) -> Result { match e { @@ -1840,28 +1840,6 @@ pub fn create_aggregate_expr_with_name_and_maybe_filter( == NullTreatment::IgnoreNulls; let (agg_expr, filter, order_by) = match func_def { - AggregateFunctionDefinition::BuiltIn(fun) => { - let physical_sort_exprs = match order_by { - Some(exprs) => Some(create_physical_sort_exprs( - exprs, - logical_input_schema, - execution_props, - )?), - None => None, - }; - let ordering_reqs: Vec = - physical_sort_exprs.clone().unwrap_or(vec![]); - let agg_expr = aggregates::create_aggregate_expr( - fun, - *distinct, - &physical_args, - &ordering_reqs, - physical_input_schema, - name, - ignore_nulls, - )?; - (agg_expr, filter, physical_sort_exprs) - } AggregateFunctionDefinition::UDF(fun) => { let sort_exprs = order_by.clone().unwrap_or(vec![]); let physical_sort_exprs = match order_by { diff --git a/datafusion/core/tests/fuzz_cases/window_fuzz.rs b/datafusion/core/tests/fuzz_cases/window_fuzz.rs index ddfa940975d8a..028dad9d488bc 100644 --- a/datafusion/core/tests/fuzz_cases/window_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/window_fuzz.rs @@ -32,7 +32,6 @@ use datafusion::physical_plan::{collect, InputOrderMode}; use datafusion::prelude::{SessionConfig, SessionContext}; use datafusion_common::{Result, ScalarValue}; use datafusion_common_runtime::SpawnedTask; -use datafusion_expr::type_coercion::aggregates::coerce_types; use datafusion_expr::type_coercion::functions::data_types_with_aggregate_udf; use datafusion_expr::{ BuiltInWindowFunction, WindowFrame, WindowFrameBound, WindowFrameUnits, @@ -465,16 +464,7 @@ fn get_random_function( let fn_name = window_fn_map.keys().collect::>()[rand_fn_idx]; let (window_fn, args) = window_fn_map.values().collect::>()[rand_fn_idx]; let mut args = args.clone(); - if let WindowFunctionDefinition::AggregateFunction(f) = window_fn { - if !args.is_empty() { - // Do type coercion first argument - let a = args[0].clone(); - let dt = a.data_type(schema.as_ref()).unwrap(); - let sig = f.signature(); - let coerced = coerce_types(f, &[dt], &sig).unwrap(); - args[0] = cast(a, schema, coerced[0].clone()).unwrap(); - } - } else if let WindowFunctionDefinition::AggregateUDF(udf) = window_fn { + if let WindowFunctionDefinition::AggregateUDF(udf) = window_fn { if !args.is_empty() { // Do type coercion first argument let a = args[0].clone(); diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs index 2cc22ef736e14..6a0a7a9458285 100644 --- a/datafusion/expr/src/expr.rs +++ b/datafusion/expr/src/expr.rs @@ -28,7 +28,7 @@ use crate::expr_fn::binary_expr; use crate::logical_plan::Subquery; use crate::utils::expr_to_columns; use crate::{ - aggregate_function, built_in_window_function, udaf, ExprSchemable, Operator, + built_in_window_function, udaf, ExprSchemable, Operator, Signature, }; use crate::{window_frame, Volatility}; @@ -697,7 +697,7 @@ impl WindowFunctionDefinition { pub fn return_type( &self, input_expr_types: &[DataType], - input_expr_nullable: &[bool], + _input_expr_nullable: &[bool], ) -> Result { match self { WindowFunctionDefinition::BuiltInWindowFunction(fun) => { @@ -713,7 +713,6 @@ impl WindowFunctionDefinition { /// the signatures supported by the function `fun`. pub fn signature(&self) -> Signature { match self { - WindowFunctionDefinition::AggregateFunction(fun) => fun.signature(), WindowFunctionDefinition::BuiltInWindowFunction(fun) => fun.signature(), WindowFunctionDefinition::AggregateUDF(fun) => fun.signature().clone(), WindowFunctionDefinition::WindowUDF(fun) => fun.signature().clone(), @@ -725,7 +724,6 @@ impl WindowFunctionDefinition { match self { WindowFunctionDefinition::BuiltInWindowFunction(fun) => fun.name(), WindowFunctionDefinition::WindowUDF(fun) => fun.name(), - WindowFunctionDefinition::AggregateFunction(fun) => fun.name(), WindowFunctionDefinition::AggregateUDF(fun) => fun.name(), } } @@ -734,9 +732,6 @@ impl WindowFunctionDefinition { impl fmt::Display for WindowFunctionDefinition { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { - WindowFunctionDefinition::AggregateFunction(fun) => { - std::fmt::Display::fmt(fun, f) - } WindowFunctionDefinition::BuiltInWindowFunction(fun) => { std::fmt::Display::fmt(fun, f) } @@ -798,10 +793,6 @@ pub fn find_df_window_func(name: &str) -> Option { Some(WindowFunctionDefinition::BuiltInWindowFunction( built_in_function, )) - } else if let Ok(aggregate) = - aggregate_function::AggregateFunction::from_str(name.as_str()) - { - Some(WindowFunctionDefinition::AggregateFunction(aggregate)) } else { None } diff --git a/datafusion/expr/src/expr_schema.rs b/datafusion/expr/src/expr_schema.rs index 5e0571f712ee5..6344b892adb7e 100644 --- a/datafusion/expr/src/expr_schema.rs +++ b/datafusion/expr/src/expr_schema.rs @@ -198,14 +198,7 @@ impl ExprSchemable for Expr { .iter() .map(|e| e.get_type(schema)) .collect::>>()?; - let nullability = args - .iter() - .map(|e| e.nullable(schema)) - .collect::>>()?; match func_def { - AggregateFunctionDefinition::BuiltIn(fun) => { - fun.return_type(&data_types, &nullability) - } AggregateFunctionDefinition::UDF(fun) => { let new_types = data_types_with_aggregate_udf(&data_types, fun) .map_err(|err| { @@ -338,7 +331,6 @@ impl ExprSchemable for Expr { Expr::Cast(Cast { expr, .. }) => expr.nullable(input_schema), Expr::AggregateFunction(AggregateFunction { func_def, .. }) => { match func_def { - AggregateFunctionDefinition::BuiltIn(fun) => fun.nullable(), // TODO: UDF should be able to customize nullability AggregateFunctionDefinition::UDF(udf) if udf.name() == "count" => { Ok(false) diff --git a/datafusion/expr/src/lib.rs b/datafusion/expr/src/lib.rs index e1943c890e7c3..d5e3fa412bfd4 100644 --- a/datafusion/expr/src/lib.rs +++ b/datafusion/expr/src/lib.rs @@ -39,7 +39,6 @@ mod udaf; mod udf; mod udwf; -pub mod aggregate_function; pub mod conditional_expressions; pub mod execution_props; pub mod expr; @@ -63,7 +62,6 @@ pub mod window_frame; pub mod window_state; pub use accumulator::Accumulator; -pub use aggregate_function::AggregateFunction; pub use built_in_window_function::BuiltInWindowFunction; pub use columnar_value::ColumnarValue; pub use expr::{ diff --git a/datafusion/expr/src/tree_node.rs b/datafusion/expr/src/tree_node.rs index f1df8609f903c..a7b27c8048efa 100644 --- a/datafusion/expr/src/tree_node.rs +++ b/datafusion/expr/src/tree_node.rs @@ -319,16 +319,6 @@ impl TreeNode for Expr { )? .map_data( |(new_args, new_filter, new_order_by)| match func_def { - AggregateFunctionDefinition::BuiltIn(fun) => { - Ok(Expr::AggregateFunction(AggregateFunction::new( - fun, - new_args, - distinct, - new_filter, - new_order_by, - null_treatment, - ))) - } AggregateFunctionDefinition::UDF(fun) => { Ok(Expr::AggregateFunction(AggregateFunction::new_udf( fun, diff --git a/datafusion/expr/src/type_coercion/aggregates.rs b/datafusion/expr/src/type_coercion/aggregates.rs index 6ac0ac3ac95cc..2fc6ac7f0d4c4 100644 --- a/datafusion/expr/src/type_coercion/aggregates.rs +++ b/datafusion/expr/src/type_coercion/aggregates.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::{AggregateFunction, Signature, TypeSignature}; +use crate::TypeSignature; use arrow::datatypes::{ DataType, TimeUnit, DECIMAL128_MAX_PRECISION, DECIMAL128_MAX_SCALE, diff --git a/datafusion/functions-array/src/planner.rs b/datafusion/functions-array/src/planner.rs index 3f779c9f111ed..428b47bf35e7d 100644 --- a/datafusion/functions-array/src/planner.rs +++ b/datafusion/functions-array/src/planner.rs @@ -171,9 +171,6 @@ impl ExprPlanner for FieldAccessPlanner { } fn is_array_agg(agg_func: &datafusion_expr::expr::AggregateFunction) -> bool { - if let AggregateFunctionDefinition::UDF(udf) = &agg_func.func_def { - return udf.name() == "array_agg"; - } - - false + let AggregateFunctionDefinition::UDF(udf) = &agg_func.func_def; + return udf.name() == "array_agg"; } diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs index 50fb1b8193ceb..94812d0c94d8f 100644 --- a/datafusion/optimizer/src/analyzer/type_coercion.rs +++ b/datafusion/optimizer/src/analyzer/type_coercion.rs @@ -47,8 +47,8 @@ use datafusion_expr::type_coercion::{is_datetime, is_utf8_or_large_utf8}; use datafusion_expr::utils::merge_schema; use datafusion_expr::{ is_false, is_not_false, is_not_true, is_not_unknown, is_true, is_unknown, not, - type_coercion, AggregateFunction, AggregateUDF, Expr, ExprSchemable, LogicalPlan, - Operator, ScalarUDF, Signature, WindowFrame, WindowFrameBound, WindowFrameUnits, + AggregateUDF, Expr, ExprSchemable, LogicalPlan, + Operator, ScalarUDF, WindowFrame, WindowFrameBound, WindowFrameUnits, }; use crate::analyzer::AnalyzerRule; @@ -400,24 +400,6 @@ impl<'a> TreeNodeRewriter for TypeCoercionRewriter<'a> { order_by, null_treatment, }) => match func_def { - AggregateFunctionDefinition::BuiltIn(fun) => { - let new_expr = coerce_agg_exprs_for_signature( - &fun, - args, - self.schema, - &fun.signature(), - )?; - Ok(Transformed::yes(Expr::AggregateFunction( - expr::AggregateFunction::new( - fun, - new_expr, - distinct, - filter, - order_by, - null_treatment, - ), - ))) - } AggregateFunctionDefinition::UDF(fun) => { let new_expr = coerce_arguments_for_signature_with_aggregate_udf( args, @@ -448,14 +430,6 @@ impl<'a> TreeNodeRewriter for TypeCoercionRewriter<'a> { coerce_window_frame(window_frame, self.schema, &order_by)?; let args = match &fun { - expr::WindowFunctionDefinition::AggregateFunction(fun) => { - coerce_agg_exprs_for_signature( - fun, - args, - self.schema, - &fun.signature(), - )? - } expr::WindowFunctionDefinition::AggregateUDF(udf) => { coerce_arguments_for_signature_with_aggregate_udf( args, @@ -691,33 +665,6 @@ fn coerce_arguments_for_fun( } } -/// Returns the coerced exprs for each `input_exprs`. -/// Get the coerced data type from `aggregate_rule::coerce_types` and add `try_cast` if the -/// data type of `input_exprs` need to be coerced. -fn coerce_agg_exprs_for_signature( - agg_fun: &AggregateFunction, - input_exprs: Vec, - schema: &DFSchema, - signature: &Signature, -) -> Result> { - if input_exprs.is_empty() { - return Ok(input_exprs); - } - let current_types = input_exprs - .iter() - .map(|e| e.get_type(schema)) - .collect::>>()?; - - let coerced_types = - type_coercion::aggregates::coerce_types(agg_fun, ¤t_types, signature)?; - - input_exprs - .into_iter() - .enumerate() - .map(|(i, expr)| expr.cast_to(&coerced_types[i], schema)) - .collect() -} - fn coerce_case_expression(case: Case, schema: &DFSchema) -> Result { // Given expressions like: // diff --git a/datafusion/optimizer/src/decorrelate.rs b/datafusion/optimizer/src/decorrelate.rs index c998e8442548c..6dbf1641bd7cb 100644 --- a/datafusion/optimizer/src/decorrelate.rs +++ b/datafusion/optimizer/src/decorrelate.rs @@ -436,9 +436,6 @@ fn agg_exprs_evaluation_result_on_empty_batch( Expr::AggregateFunction(expr::AggregateFunction { func_def, .. }) => match func_def { - AggregateFunctionDefinition::BuiltIn(_fun) => { - Transformed::yes(Expr::Literal(ScalarValue::Null)) - } AggregateFunctionDefinition::UDF(fun) => { if fun.name() == "count" { Transformed::yes(Expr::Literal(ScalarValue::Int64(Some( diff --git a/datafusion/optimizer/src/single_distinct_to_groupby.rs b/datafusion/optimizer/src/single_distinct_to_groupby.rs index 23acae7063d15..dff06d48ce80d 100644 --- a/datafusion/optimizer/src/single_distinct_to_groupby.rs +++ b/datafusion/optimizer/src/single_distinct_to_groupby.rs @@ -70,26 +70,6 @@ fn is_single_distinct_agg(aggr_expr: &[Expr]) -> Result { let mut aggregate_count = 0; for expr in aggr_expr { if let Expr::AggregateFunction(AggregateFunction { - func_def: AggregateFunctionDefinition::BuiltIn(_fun), - distinct, - args, - filter, - order_by, - null_treatment: _, - }) = expr - { - if filter.is_some() || order_by.is_some() { - return Ok(false); - } - aggregate_count += 1; - if *distinct { - for e in args { - fields_set.insert(e); - } - } else { - return Ok(false); - } - } else if let Expr::AggregateFunction(AggregateFunction { func_def: AggregateFunctionDefinition::UDF(fun), distinct, args, @@ -203,55 +183,6 @@ impl OptimizerRule for SingleDistinctToGroupBy { let outer_aggr_exprs = aggr_expr .into_iter() .map(|aggr_expr| match aggr_expr { - Expr::AggregateFunction(AggregateFunction { - func_def: AggregateFunctionDefinition::BuiltIn(fun), - mut args, - distinct, - .. - }) => { - if distinct { - if args.len() != 1 { - return internal_err!("DISTINCT aggregate should have exactly one argument"); - } - let arg = args.swap_remove(0); - - if group_fields_set.insert(arg.display_name()?) { - inner_group_exprs - .push(arg.alias(SINGLE_DISTINCT_ALIAS)); - } - Ok(Expr::AggregateFunction(AggregateFunction::new( - fun, - vec![col(SINGLE_DISTINCT_ALIAS)], - false, // intentional to remove distinct here - None, - None, - None, - ))) - // if the aggregate function is not distinct, we need to rewrite it like two phase aggregation - } else { - index += 1; - let alias_str = format!("alias{}", index); - inner_aggr_exprs.push( - Expr::AggregateFunction(AggregateFunction::new( - fun.clone(), - args, - false, - None, - None, - None, - )) - .alias(&alias_str), - ); - Ok(Expr::AggregateFunction(AggregateFunction::new( - fun, - vec![col(&alias_str)], - false, - None, - None, - None, - ))) - } - } Expr::AggregateFunction(AggregateFunction { func_def: AggregateFunctionDefinition::UDF(udf), mut args, diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index e7cd5cb2725be..44cfdec13c9a5 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -55,8 +55,6 @@ mod row_hash; mod topk; mod topk_stream; -pub use datafusion_expr::AggregateFunction; -pub use datafusion_physical_expr::expressions::create_aggregate_expr; /// Hash aggregate modes #[derive(Debug, Copy, Clone, PartialEq, Eq)] diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs index 959796489c191..22645fbecbf7c 100644 --- a/datafusion/physical-plan/src/windows/mod.rs +++ b/datafusion/physical-plan/src/windows/mod.rs @@ -21,7 +21,6 @@ use std::borrow::Borrow; use std::sync::Arc; use crate::{ - aggregates, expressions::{ cume_dist, dense_rank, lag, lead, percent_rank, rank, Literal, NthValue, Ntile, PhysicalSortExpr, RowNumber, @@ -103,23 +102,6 @@ pub fn create_window_expr( ignore_nulls: bool, ) -> Result> { Ok(match fun { - WindowFunctionDefinition::AggregateFunction(fun) => { - let aggregate = aggregates::create_aggregate_expr( - fun, - false, - args, - &[], - input_schema, - name, - ignore_nulls, - )?; - window_expr_from_aggregate_expr( - partition_by, - order_by, - window_frame, - aggregate, - ) - } WindowFunctionDefinition::BuiltInWindowFunction(fun) => { Arc::new(BuiltInWindowExpr::new( create_built_in_window_expr(fun, args, input_schema, name, ignore_nulls)?, diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index b06aca25f8f52..8b8fd6c261263 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -310,8 +310,6 @@ message LogicalExprNode { // binary expressions BinaryExprNode binary_expr = 4; - // aggregate expressions - AggregateExprNode aggregate_expr = 5; // null checks IsNull is_null_expr = 6; @@ -465,12 +463,6 @@ message InListNode { bool negated = 3; } -message AggregateExprNode { - repeated LogicalExprNode expr = 2; - bool distinct = 3; - LogicalExprNode filter = 4; - repeated LogicalExprNode order_by = 5; -} message AggregateUDFExprNode { string fun_name = 1; diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index 6a98cfcb330cf..2d46b439e056f 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -362,149 +362,6 @@ impl<'de> serde::Deserialize<'de> for AggregateExecNode { deserializer.deserialize_struct("datafusion.AggregateExecNode", FIELDS, GeneratedVisitor) } } -impl serde::Serialize for AggregateExprNode { - #[allow(deprecated)] - fn serialize(&self, serializer: S) -> std::result::Result - where - S: serde::Serializer, - { - use serde::ser::SerializeStruct; - let mut len = 0; - if !self.expr.is_empty() { - len += 1; - } - if self.distinct { - len += 1; - } - if self.filter.is_some() { - len += 1; - } - if !self.order_by.is_empty() { - len += 1; - } - let mut struct_ser = serializer.serialize_struct("datafusion.AggregateExprNode", len)?; - if !self.expr.is_empty() { - struct_ser.serialize_field("expr", &self.expr)?; - } - if self.distinct { - struct_ser.serialize_field("distinct", &self.distinct)?; - } - if let Some(v) = self.filter.as_ref() { - struct_ser.serialize_field("filter", v)?; - } - if !self.order_by.is_empty() { - struct_ser.serialize_field("orderBy", &self.order_by)?; - } - struct_ser.end() - } -} -impl<'de> serde::Deserialize<'de> for AggregateExprNode { - #[allow(deprecated)] - fn deserialize(deserializer: D) -> std::result::Result - where - D: serde::Deserializer<'de>, - { - const FIELDS: &[&str] = &[ - "expr", - "distinct", - "filter", - "order_by", - "orderBy", - ]; - - #[allow(clippy::enum_variant_names)] - enum GeneratedField { - Expr, - Distinct, - Filter, - OrderBy, - } - impl<'de> serde::Deserialize<'de> for GeneratedField { - fn deserialize(deserializer: D) -> std::result::Result - where - D: serde::Deserializer<'de>, - { - struct GeneratedVisitor; - - impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { - type Value = GeneratedField; - - fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(formatter, "expected one of: {:?}", &FIELDS) - } - - #[allow(unused_variables)] - fn visit_str(self, value: &str) -> std::result::Result - where - E: serde::de::Error, - { - match value { - "expr" => Ok(GeneratedField::Expr), - "distinct" => Ok(GeneratedField::Distinct), - "filter" => Ok(GeneratedField::Filter), - "orderBy" | "order_by" => Ok(GeneratedField::OrderBy), - _ => Err(serde::de::Error::unknown_field(value, FIELDS)), - } - } - } - deserializer.deserialize_identifier(GeneratedVisitor) - } - } - struct GeneratedVisitor; - impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { - type Value = AggregateExprNode; - - fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - formatter.write_str("struct datafusion.AggregateExprNode") - } - - fn visit_map(self, mut map_: V) -> std::result::Result - where - V: serde::de::MapAccess<'de>, - { - let mut expr__ = None; - let mut distinct__ = None; - let mut filter__ = None; - let mut order_by__ = None; - while let Some(k) = map_.next_key()? { - match k { - GeneratedField::Expr => { - if expr__.is_some() { - return Err(serde::de::Error::duplicate_field("expr")); - } - expr__ = Some(map_.next_value()?); - } - GeneratedField::Distinct => { - if distinct__.is_some() { - return Err(serde::de::Error::duplicate_field("distinct")); - } - distinct__ = Some(map_.next_value()?); - } - GeneratedField::Filter => { - if filter__.is_some() { - return Err(serde::de::Error::duplicate_field("filter")); - } - filter__ = map_.next_value()?; - } - GeneratedField::OrderBy => { - if order_by__.is_some() { - return Err(serde::de::Error::duplicate_field("orderBy")); - } - order_by__ = Some(map_.next_value()?); - } - } - } - Ok(AggregateExprNode { - expr: expr__.unwrap_or_default(), - distinct: distinct__.unwrap_or_default(), - filter: filter__, - order_by: order_by__.unwrap_or_default(), - }) - } - } - deserializer.deserialize_struct("datafusion.AggregateExprNode", FIELDS, GeneratedVisitor) - } -} impl serde::Serialize for AggregateMode { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result @@ -9384,9 +9241,6 @@ impl serde::Serialize for LogicalExprNode { logical_expr_node::ExprType::BinaryExpr(v) => { struct_ser.serialize_field("binaryExpr", v)?; } - logical_expr_node::ExprType::AggregateExpr(v) => { - struct_ser.serialize_field("aggregateExpr", v)?; - } logical_expr_node::ExprType::IsNullExpr(v) => { struct_ser.serialize_field("isNullExpr", v)?; } @@ -9488,8 +9342,6 @@ impl<'de> serde::Deserialize<'de> for LogicalExprNode { "literal", "binary_expr", "binaryExpr", - "aggregate_expr", - "aggregateExpr", "is_null_expr", "isNullExpr", "is_not_null_expr", @@ -9543,7 +9395,6 @@ impl<'de> serde::Deserialize<'de> for LogicalExprNode { Alias, Literal, BinaryExpr, - AggregateExpr, IsNullExpr, IsNotNullExpr, NotExpr, @@ -9597,7 +9448,6 @@ impl<'de> serde::Deserialize<'de> for LogicalExprNode { "alias" => Ok(GeneratedField::Alias), "literal" => Ok(GeneratedField::Literal), "binaryExpr" | "binary_expr" => Ok(GeneratedField::BinaryExpr), - "aggregateExpr" | "aggregate_expr" => Ok(GeneratedField::AggregateExpr), "isNullExpr" | "is_null_expr" => Ok(GeneratedField::IsNullExpr), "isNotNullExpr" | "is_not_null_expr" => Ok(GeneratedField::IsNotNullExpr), "notExpr" | "not_expr" => Ok(GeneratedField::NotExpr), @@ -9674,13 +9524,6 @@ impl<'de> serde::Deserialize<'de> for LogicalExprNode { return Err(serde::de::Error::duplicate_field("binaryExpr")); } expr_type__ = map_.next_value::<::std::option::Option<_>>()?.map(logical_expr_node::ExprType::BinaryExpr) -; - } - GeneratedField::AggregateExpr => { - if expr_type__.is_some() { - return Err(serde::de::Error::duplicate_field("aggregateExpr")); - } - expr_type__ = map_.next_value::<::std::option::Option<_>>()?.map(logical_expr_node::ExprType::AggregateExpr) ; } GeneratedField::IsNullExpr => { diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index 5b48fd26bcb1c..a4cefb8c09e00 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -486,7 +486,7 @@ pub struct SubqueryAliasNode { pub struct LogicalExprNode { #[prost( oneof = "logical_expr_node::ExprType", - tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 17, 18, 19, 20, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35" + tags = "1, 2, 3, 4, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 17, 18, 19, 20, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35" )] pub expr_type: ::core::option::Option, } @@ -506,9 +506,6 @@ pub mod logical_expr_node { /// binary expressions #[prost(message, tag = "4")] BinaryExpr(super::BinaryExprNode), - /// aggregate expressions - #[prost(message, tag = "5")] - AggregateExpr(::prost::alloc::boxed::Box), /// null checks #[prost(message, tag = "6")] IsNullExpr(::prost::alloc::boxed::Box), @@ -731,18 +728,6 @@ pub struct InListNode { } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] -pub struct AggregateExprNode { - #[prost(message, repeated, tag = "2")] - pub expr: ::prost::alloc::vec::Vec, - #[prost(bool, tag = "3")] - pub distinct: bool, - #[prost(message, optional, boxed, tag = "4")] - pub filter: ::core::option::Option<::prost::alloc::boxed::Box>, - #[prost(message, repeated, tag = "5")] - pub order_by: ::prost::alloc::vec::Vec, -} -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] pub struct AggregateUdfExprNode { #[prost(string, tag = "1")] pub fun_name: ::prost::alloc::string::String, diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs index 15c574ebbd946..bb5daf3ba4bf0 100644 --- a/datafusion/proto/src/logical_plan/from_proto.rs +++ b/datafusion/proto/src/logical_plan/from_proto.rs @@ -28,7 +28,7 @@ use datafusion_expr::window_frame::{check_window_frame, regularize_window_order_ use datafusion_expr::{ expr::{self, InList, Sort, WindowFunction}, logical_plan::{PlanType, StringifiedPlan}, - AggregateFunction, Between, BinaryExpr, BuiltInWindowFunction, Case, Cast, Expr, + Between, BinaryExpr, BuiltInWindowFunction, Case, Cast, Expr, GroupingSet, GroupingSet::GroupingSets, JoinConstraint, JoinType, Like, Operator, TryCast, WindowFrame, WindowFrameBound, @@ -226,11 +226,6 @@ impl From for JoinConstraint { } } -pub fn parse_i32_to_aggregate_function(value: &i32) -> Result { - protobuf::AggregateFunction::try_from(*value) - .map(|a| a.into()) - .map_err(|_| Error::unknown("AggregateFunction", *value)) -} pub fn parse_expr( proto: &protobuf::LogicalExprNode, @@ -295,23 +290,6 @@ pub fn parse_expr( regularize_window_order_by(&window_frame, &mut order_by)?; match window_function { - window_expr_node::WindowFunction::AggrFunction(i) => { - let aggr_function = parse_i32_to_aggregate_function(i)?; - - Ok(Expr::WindowFunction(WindowFunction::new( - expr::WindowFunctionDefinition::AggregateFunction(aggr_function), - vec![parse_required_expr( - expr.expr.as_deref(), - registry, - "expr", - codec, - )?], - partition_by, - order_by, - window_frame, - None, - ))) - } window_expr_node::WindowFunction::BuiltInFunction(i) => { let built_in_function = protobuf::BuiltInWindowFunction::try_from(*i) .map_err(|_| Error::unknown("BuiltInWindowFunction", *i))? @@ -373,19 +351,6 @@ pub fn parse_expr( } } } - ExprType::AggregateExpr(expr) => { - let fun = parse_i32_to_aggregate_function(&expr.aggr_function)?; - - Ok(Expr::AggregateFunction(expr::AggregateFunction::new( - fun, - parse_exprs(&expr.expr, registry, codec)?, - expr.distinct, - parse_optional_expr(expr.filter.as_deref(), registry, codec)? - .map(Box::new), - parse_vec_expr(&expr.order_by, registry, codec)?, - None, - ))) - } ExprType::Alias(alias) => Ok(Expr::Alias(Alias::new( parse_required_expr(alias.expr.as_deref(), registry, "expr", codec)?, alias diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs index f1590760f2025..330215b6a07ba 100644 --- a/datafusion/proto/src/logical_plan/to_proto.rs +++ b/datafusion/proto/src/logical_plan/to_proto.rs @@ -25,7 +25,7 @@ use datafusion_expr::expr::{ InList, Like, Placeholder, ScalarFunction, Sort, Unnest, }; use datafusion_expr::{ - logical_plan::PlanType, logical_plan::StringifiedPlan, AggregateFunction, + logical_plan::PlanType, logical_plan::StringifiedPlan, BuiltInWindowFunction, Expr, JoinConstraint, JoinType, TryCast, WindowFrame, WindowFrameBound, WindowFrameUnits, WindowFunctionDefinition, }; @@ -311,12 +311,6 @@ pub fn serialize_expr( null_treatment: _, }) => { let (window_function, fun_definition) = match fun { - WindowFunctionDefinition::AggregateFunction(fun) => ( - protobuf::window_expr_node::WindowFunction::AggrFunction( - protobuf::AggregateFunction::from(fun).into(), - ), - None, - ), WindowFunctionDefinition::BuiltInWindowFunction(fun) => ( protobuf::window_expr_node::WindowFunction::BuiltInFunction( protobuf::BuiltInWindowFunction::from(fun).into(), diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index 5ecca51478053..bc0a19336bae4 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -145,15 +145,6 @@ pub fn parse_physical_window_expr( let fun = if let Some(window_func) = proto.window_function.as_ref() { match window_func { - protobuf::physical_window_expr_node::WindowFunction::AggrFunction(n) => { - let f = protobuf::AggregateFunction::try_from(*n).map_err(|_| { - proto_error(format!( - "Received an unknown window aggregate function: {n}" - )) - })?; - - WindowFunctionDefinition::AggregateFunction(f.into()) - } protobuf::physical_window_expr_node::WindowFunction::BuiltInFunction(n) => { let f = protobuf::BuiltInWindowFunction::try_from(*n).map_err(|_| { proto_error(format!( diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 8c9e5bbd0e959..00239044177f2 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -34,7 +34,7 @@ use datafusion::datasource::physical_plan::{AvroExec, CsvExec}; use datafusion::execution::runtime_env::RuntimeEnv; use datafusion::execution::FunctionRegistry; use datafusion::physical_expr::{PhysicalExprRef, PhysicalSortRequirement}; -use datafusion::physical_plan::aggregates::{create_aggregate_expr, AggregateMode}; +use datafusion::physical_plan::aggregates::{AggregateMode}; use datafusion::physical_plan::aggregates::{AggregateExec, PhysicalGroupBy}; use datafusion::physical_plan::analyze::AnalyzeExec; use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec; @@ -471,30 +471,10 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { ExprType::AggregateExpr(agg_node) => { let input_phy_expr: Vec> = agg_node.expr.iter() .map(|e| parse_physical_expr(e, registry, &physical_schema, extension_codec)).collect::>>()?; - let ordering_req: Vec = agg_node.ordering_req.iter() + let _ordering_req: Vec = agg_node.ordering_req.iter() .map(|e| parse_physical_sort_expr(e, registry, &physical_schema, extension_codec)).collect::>>()?; agg_node.aggregate_function.as_ref().map(|func| { match func { - AggregateFunction::AggrFunction(i) => { - let aggr_function = protobuf::AggregateFunction::try_from(*i) - .map_err( - |_| { - proto_error(format!( - "Received an unknown aggregate function: {i}" - )) - }, - )?; - - create_aggregate_expr( - &aggr_function.into(), - agg_node.distinct, - input_phy_expr.as_slice(), - &ordering_req, - &physical_schema, - name.to_string(), - agg_node.ignore_nulls, - ) - } AggregateFunction::UserDefinedAggrFunction(udaf_name) => { let agg_udf = match &agg_node.fun_definition { Some(buf) => extension_codec.try_decode_udaf(udaf_name, buf)?, diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 639b82bba5023..4c5cf30d3f4a9 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -26,8 +26,6 @@ use datafusion::physical_plan::expressions::{ BinaryExpr, CaseExpr, CastExpr, Column, CumeDist, InListExpr, IsNotNullExpr, IsNullExpr, Literal, NegativeExpr, NotExpr, NthValue, Ntile, Rank, RankType, RowNumber, TryCastExpr, WindowShift, - IsNullExpr, Literal, NegativeExpr, NotExpr, NthValue, Ntile, OrderSensitiveArrayAgg, - Rank, RankType, RowNumber, TryCastExpr, WindowShift, }; use datafusion::physical_plan::udaf::AggregateFunctionExpr; use datafusion::physical_plan::windows::{BuiltInWindowExpr, PlainAggregateWindowExpr}; @@ -62,7 +60,7 @@ pub fn serialize_physical_aggr_expr( let name = a.fun().name().to_string(); let mut buf = Vec::new(); codec.try_encode_udaf(a.fun(), &mut buf)?; - return Ok(protobuf::PhysicalExprNode { + Ok(protobuf::PhysicalExprNode { expr_type: Some(protobuf::physical_expr_node::ExprType::AggregateExpr( protobuf::PhysicalAggregateExprNode { aggregate_function: Some(physical_aggregate_expr_node::AggregateFunction::UserDefinedAggrFunction(name)), @@ -73,35 +71,15 @@ pub fn serialize_physical_aggr_expr( fun_definition: (!buf.is_empty()).then_some(buf) }, )), - }); + }) + } else { + unreachable!("No other types exists besides AggergationFunctionExpr"); } - - let AggrFn { - inner: aggr_function, - distinct, - } = aggr_expr_to_aggr_fn(aggr_expr.as_ref())?; - - Ok(protobuf::PhysicalExprNode { - expr_type: Some(protobuf::physical_expr_node::ExprType::AggregateExpr( - protobuf::PhysicalAggregateExprNode { - aggregate_function: Some( - physical_aggregate_expr_node::AggregateFunction::AggrFunction( - aggr_function as i32, - ), - ), - expr: expressions, - ordering_req, - distinct, - ignore_nulls: false, - fun_definition: None, - }, - )), - }) } fn serialize_physical_window_aggr_expr( aggr_expr: &dyn AggregateExpr, - window_frame: &WindowFrame, + _window_frame: &WindowFrame, codec: &dyn PhysicalExtensionCodec, ) -> Result<(physical_window_expr_node::WindowFunction, Option>)> { if let Some(a) = aggr_expr.as_any().downcast_ref::() { @@ -121,23 +99,9 @@ fn serialize_physical_window_aggr_expr( (!buf.is_empty()).then_some(buf), )) } else { - let AggrFn { inner, distinct } = aggr_expr_to_aggr_fn(aggr_expr)?; - if distinct { - return not_impl_err!( - "Distinct aggregate functions not supported in window expressions" + unreachable!( + "No other types exists besides AggergationFunctionExpr" ); - } - - if !window_frame.start_bound.is_unbounded() { - return Err(DataFusionError::Internal(format!( - "Unbounded start bound in WindowFrame = {window_frame}" - ))); - } - - Ok(( - physical_window_expr_node::WindowFunction::AggrFunction(inner as i32), - None, - )) } } @@ -254,30 +218,6 @@ pub fn serialize_physical_window_expr( }) } -struct AggrFn { - inner: protobuf::AggregateFunction, - distinct: bool, -} - -fn aggr_expr_to_aggr_fn(expr: &dyn AggregateExpr) -> Result { - let aggr_expr = expr.as_any(); - - // TODO: remove OrderSensitiveArrayAgg - let inner = if aggr_expr.downcast_ref::().is_some() { - protobuf::AggregateFunction::ArrayAgg - } else if aggr_expr.downcast_ref::().is_some() { - protobuf::AggregateFunction::Min - } else if aggr_expr.downcast_ref::().is_some() { - protobuf::AggregateFunction::Max - } else { - return not_impl_err!("Aggregate function not supported: {expr:?}"); - }; - - Ok(AggrFn { - inner, - distinct: false, - }) -} pub fn serialize_physical_sort_exprs( sort_exprs: I, diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index fd987f1366854..c5014ba322628 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -963,6 +963,7 @@ fn roundtrip_scalar_udf_extension_codec() -> Result<()> { "max", false, false, + false, )?; let window = Arc::new(WindowAggExec::try_new( diff --git a/datafusion/sql/src/expr/function.rs b/datafusion/sql/src/expr/function.rs index 4804752d8389f..4e8789ca162d8 100644 --- a/datafusion/sql/src/expr/function.rs +++ b/datafusion/sql/src/expr/function.rs @@ -24,7 +24,7 @@ use datafusion_common::{ use datafusion_expr::planner::PlannerResult; use datafusion_expr::window_frame::{check_window_frame, regularize_window_order_by}; use datafusion_expr::{ - expr, AggregateFunction, Expr, ExprSchemable, WindowFrame, WindowFunctionDefinition, + expr, Expr, ExprSchemable, WindowFrame, WindowFunctionDefinition, }; use datafusion_expr::{ expr::{ScalarFunction, Unnest}, @@ -35,7 +35,6 @@ use sqlparser::ast::{ FunctionArgExpr, FunctionArgumentClause, FunctionArgumentList, FunctionArguments, NullTreatment, ObjectName, OrderByExpr, WindowType, }; -use std::str::FromStr; use strum::IntoEnumIterator; /// Suggest a valid function based on an invalid input function name @@ -48,7 +47,6 @@ pub fn suggest_valid_function( // All aggregate functions and builtin window functions let mut funcs = Vec::new(); - funcs.extend(AggregateFunction::iter().map(|func| func.to_string())); funcs.extend(ctx.udaf_names()); funcs.extend(BuiltInWindowFunction::iter().map(|func| func.to_string())); funcs.extend(ctx.udwf_names()); @@ -59,7 +57,6 @@ pub fn suggest_valid_function( let mut funcs = Vec::new(); funcs.extend(ctx.udf_names()); - funcs.extend(AggregateFunction::iter().map(|func| func.to_string())); funcs.extend(ctx.udaf_names()); funcs @@ -321,29 +318,15 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { }; if let Ok(fun) = self.find_window_func(&name) { - let expr = match fun { - WindowFunctionDefinition::AggregateFunction(aggregate_fun) => { - let args = - self.function_args_to_expr(args, schema, planner_context)?; - - Expr::WindowFunction(expr::WindowFunction::new( - WindowFunctionDefinition::AggregateFunction(aggregate_fun), - args, - partition_by, - order_by, - window_frame, - null_treatment, - )) - } - _ => Expr::WindowFunction(expr::WindowFunction::new( + let expr = Expr::WindowFunction(expr::WindowFunction::new( fun, self.function_args_to_expr(args, schema, planner_context)?, partition_by, order_by, window_frame, null_treatment, - )), - }; + )); + return Ok(expr); } } else { @@ -371,32 +354,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { null_treatment, ))); } - - // next, aggregate built-ins - if let Ok(fun) = AggregateFunction::from_str(&name) { - let order_by = self.order_by_to_sort_expr( - &order_by, - schema, - planner_context, - true, - None, - )?; - let order_by = (!order_by.is_empty()).then_some(order_by); - let args = self.function_args_to_expr(args, schema, planner_context)?; - let filter: Option> = filter - .map(|e| self.sql_expr_to_logical_expr(*e, schema, planner_context)) - .transpose()? - .map(Box::new); - - return Ok(Expr::AggregateFunction(expr::AggregateFunction::new( - fun, - args, - distinct, - filter, - order_by, - null_treatment, - ))); - }; } // Could not find the relevant function, so return an error diff --git a/datafusion/substrait/src/logical_plan/consumer.rs b/datafusion/substrait/src/logical_plan/consumer.rs index 15c447114819e..0e374cc8afaac 100644 --- a/datafusion/substrait/src/logical_plan/consumer.rs +++ b/datafusion/substrait/src/logical_plan/consumer.rs @@ -30,7 +30,7 @@ use datafusion::execution::FunctionRegistry; use datafusion::logical_expr::expr::{Exists, InSubquery, Sort}; use datafusion::logical_expr::{ - aggregate_function, expr::find_df_window_func, Aggregate, BinaryExpr, Case, + expr::find_df_window_func, Aggregate, BinaryExpr, Case, EmptyRelation, Expr, ExprSchemable, LogicalPlan, Operator, Projection, Values, }; use substrait::proto::expression::subquery::set_predicate::PredicateOp; @@ -67,7 +67,6 @@ use datafusion::{ scalar::ScalarValue, }; use std::collections::{HashMap, HashSet}; -use std::str::FromStr; use std::sync::Arc; use substrait::proto::exchange_rel::ExchangeKind; use substrait::proto::expression::literal::user_defined::Val; @@ -1004,11 +1003,6 @@ pub async fn from_substrait_agg_func( Ok(Arc::new(Expr::AggregateFunction( expr::AggregateFunction::new_udf(fun, args, distinct, filter, order_by, None), ))) - } else if let Ok(fun) = aggregate_function::AggregateFunction::from_str(function_name) - { - Ok(Arc::new(Expr::AggregateFunction( - expr::AggregateFunction::new(fun, args, distinct, filter, order_by, None), - ))) } else { not_impl_err!( "Aggregate function {} is not supported: function anchor = {:?}", diff --git a/datafusion/substrait/src/logical_plan/producer.rs b/datafusion/substrait/src/logical_plan/producer.rs index 8263209ffccc7..bd6e0e00491a1 100644 --- a/datafusion/substrait/src/logical_plan/producer.rs +++ b/datafusion/substrait/src/logical_plan/producer.rs @@ -48,7 +48,6 @@ use datafusion::common::{ }; use datafusion::common::{substrait_err, DFSchemaRef}; #[allow(unused_imports)] -use datafusion::logical_expr::aggregate_function; use datafusion::logical_expr::expr::{ AggregateFunctionDefinition, Alias, BinaryExpr, Case, Cast, GroupingSet, InList, InSubquery, Sort, WindowFunction, @@ -767,37 +766,6 @@ pub fn to_substrait_agg_measure( match expr { Expr::AggregateFunction(expr::AggregateFunction { func_def, args, distinct, filter, order_by, null_treatment: _, }) => { match func_def { - AggregateFunctionDefinition::BuiltIn (fun) => { - let sorts = if let Some(order_by) = order_by { - order_by.iter().map(|expr| to_substrait_sort_field(ctx, expr, schema, extensions)).collect::>>()? - } else { - vec![] - }; - let mut arguments: Vec = vec![]; - for arg in args { - arguments.push(FunctionArgument { arg_type: Some(ArgType::Value(to_substrait_rex(ctx, arg, schema, 0, extensions)?)) }); - } - let function_anchor = extensions.register_function(fun.to_string()); - Ok(Measure { - measure: Some(AggregateFunction { - function_reference: function_anchor, - arguments, - sorts, - output_type: None, - invocation: match distinct { - true => AggregationInvocation::Distinct as i32, - false => AggregationInvocation::All as i32, - }, - phase: AggregationPhase::Unspecified as i32, - args: vec![], - options: vec![], - }), - filter: match filter { - Some(f) => Some(to_substrait_rex(ctx, f, schema, 0, extensions)?), - None => None - } - }) - } AggregateFunctionDefinition::UDF(fun) => { let sorts = if let Some(order_by) = order_by { order_by.iter().map(|expr| to_substrait_sort_field(ctx, expr, schema, extensions)).collect::>>()?