Skip to content

Commit

Permalink
It cannot possibly work
Browse files Browse the repository at this point in the history
  • Loading branch information
edmondop committed Jul 24, 2024
1 parent 0e1d2c4 commit 079b993
Show file tree
Hide file tree
Showing 25 changed files with 28 additions and 627 deletions.
26 changes: 2 additions & 24 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ use crate::physical_plan::unnest::UnnestExec;
use crate::physical_plan::values::ValuesExec;
use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
use crate::physical_plan::{
aggregates, displayable, udaf, windows, AggregateExpr, ExecutionPlan,
displayable, udaf, windows, AggregateExpr, ExecutionPlan,
ExecutionPlanProperties, InputOrderMode, Partitioning, PhysicalExpr, WindowExpr,
};

Expand Down Expand Up @@ -1812,7 +1812,7 @@ pub fn create_aggregate_expr_with_name_and_maybe_filter(
e: &Expr,
name: impl Into<String>,
logical_input_schema: &DFSchema,
physical_input_schema: &Schema,
_physical_input_schema: &Schema,
execution_props: &ExecutionProps,
) -> Result<AggregateExprWithOptionalArgs> {
match e {
Expand Down Expand Up @@ -1840,28 +1840,6 @@ pub fn create_aggregate_expr_with_name_and_maybe_filter(
== NullTreatment::IgnoreNulls;

let (agg_expr, filter, order_by) = match func_def {
AggregateFunctionDefinition::BuiltIn(fun) => {
let physical_sort_exprs = match order_by {
Some(exprs) => Some(create_physical_sort_exprs(
exprs,
logical_input_schema,
execution_props,
)?),
None => None,
};
let ordering_reqs: Vec<PhysicalSortExpr> =
physical_sort_exprs.clone().unwrap_or(vec![]);
let agg_expr = aggregates::create_aggregate_expr(
fun,
*distinct,
&physical_args,
&ordering_reqs,
physical_input_schema,
name,
ignore_nulls,
)?;
(agg_expr, filter, physical_sort_exprs)
}
AggregateFunctionDefinition::UDF(fun) => {
let sort_exprs = order_by.clone().unwrap_or(vec![]);
let physical_sort_exprs = match order_by {
Expand Down
12 changes: 1 addition & 11 deletions datafusion/core/tests/fuzz_cases/window_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ use datafusion::physical_plan::{collect, InputOrderMode};
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_common::{Result, ScalarValue};
use datafusion_common_runtime::SpawnedTask;
use datafusion_expr::type_coercion::aggregates::coerce_types;
use datafusion_expr::type_coercion::functions::data_types_with_aggregate_udf;
use datafusion_expr::{
BuiltInWindowFunction, WindowFrame, WindowFrameBound, WindowFrameUnits,
Expand Down Expand Up @@ -465,16 +464,7 @@ fn get_random_function(
let fn_name = window_fn_map.keys().collect::<Vec<_>>()[rand_fn_idx];
let (window_fn, args) = window_fn_map.values().collect::<Vec<_>>()[rand_fn_idx];
let mut args = args.clone();
if let WindowFunctionDefinition::AggregateFunction(f) = window_fn {
if !args.is_empty() {
// Do type coercion on the first argument
let a = args[0].clone();
let dt = a.data_type(schema.as_ref()).unwrap();
let sig = f.signature();
let coerced = coerce_types(f, &[dt], &sig).unwrap();
args[0] = cast(a, schema, coerced[0].clone()).unwrap();
}
} else if let WindowFunctionDefinition::AggregateUDF(udf) = window_fn {
if let WindowFunctionDefinition::AggregateUDF(udf) = window_fn {
if !args.is_empty() {
// Do type coercion on the first argument
let a = args[0].clone();
Expand Down
13 changes: 2 additions & 11 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::expr_fn::binary_expr;
use crate::logical_plan::Subquery;
use crate::utils::expr_to_columns;
use crate::{
aggregate_function, built_in_window_function, udaf, ExprSchemable, Operator,
built_in_window_function, udaf, ExprSchemable, Operator,
Signature,
};
use crate::{window_frame, Volatility};
Expand Down Expand Up @@ -697,7 +697,7 @@ impl WindowFunctionDefinition {
pub fn return_type(
&self,
input_expr_types: &[DataType],
input_expr_nullable: &[bool],
_input_expr_nullable: &[bool],
) -> Result<DataType> {
match self {
WindowFunctionDefinition::BuiltInWindowFunction(fun) => {
Expand All @@ -713,7 +713,6 @@ impl WindowFunctionDefinition {
/// the signatures supported by the function `fun`.
pub fn signature(&self) -> Signature {
match self {
WindowFunctionDefinition::AggregateFunction(fun) => fun.signature(),
WindowFunctionDefinition::BuiltInWindowFunction(fun) => fun.signature(),
WindowFunctionDefinition::AggregateUDF(fun) => fun.signature().clone(),
WindowFunctionDefinition::WindowUDF(fun) => fun.signature().clone(),
Expand All @@ -725,7 +724,6 @@ impl WindowFunctionDefinition {
match self {
WindowFunctionDefinition::BuiltInWindowFunction(fun) => fun.name(),
WindowFunctionDefinition::WindowUDF(fun) => fun.name(),
WindowFunctionDefinition::AggregateFunction(fun) => fun.name(),
WindowFunctionDefinition::AggregateUDF(fun) => fun.name(),
}
}
Expand All @@ -734,9 +732,6 @@ impl WindowFunctionDefinition {
impl fmt::Display for WindowFunctionDefinition {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
WindowFunctionDefinition::AggregateFunction(fun) => {
std::fmt::Display::fmt(fun, f)
}
WindowFunctionDefinition::BuiltInWindowFunction(fun) => {
std::fmt::Display::fmt(fun, f)
}
Expand Down Expand Up @@ -798,10 +793,6 @@ pub fn find_df_window_func(name: &str) -> Option<WindowFunctionDefinition> {
Some(WindowFunctionDefinition::BuiltInWindowFunction(
built_in_function,
))
} else if let Ok(aggregate) =
aggregate_function::AggregateFunction::from_str(name.as_str())
{
Some(WindowFunctionDefinition::AggregateFunction(aggregate))
} else {
None
}
Expand Down
8 changes: 0 additions & 8 deletions datafusion/expr/src/expr_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,14 +198,7 @@ impl ExprSchemable for Expr {
.iter()
.map(|e| e.get_type(schema))
.collect::<Result<Vec<_>>>()?;
let nullability = args
.iter()
.map(|e| e.nullable(schema))
.collect::<Result<Vec<_>>>()?;
match func_def {
AggregateFunctionDefinition::BuiltIn(fun) => {
fun.return_type(&data_types, &nullability)
}
AggregateFunctionDefinition::UDF(fun) => {
let new_types = data_types_with_aggregate_udf(&data_types, fun)
.map_err(|err| {
Expand Down Expand Up @@ -338,7 +331,6 @@ impl ExprSchemable for Expr {
Expr::Cast(Cast { expr, .. }) => expr.nullable(input_schema),
Expr::AggregateFunction(AggregateFunction { func_def, .. }) => {
match func_def {
AggregateFunctionDefinition::BuiltIn(fun) => fun.nullable(),
// TODO: UDF should be able to customize nullability
AggregateFunctionDefinition::UDF(udf) if udf.name() == "count" => {
Ok(false)
Expand Down
2 changes: 0 additions & 2 deletions datafusion/expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ mod udaf;
mod udf;
mod udwf;

pub mod aggregate_function;
pub mod conditional_expressions;
pub mod execution_props;
pub mod expr;
Expand All @@ -63,7 +62,6 @@ pub mod window_frame;
pub mod window_state;

pub use accumulator::Accumulator;
pub use aggregate_function::AggregateFunction;
pub use built_in_window_function::BuiltInWindowFunction;
pub use columnar_value::ColumnarValue;
pub use expr::{
Expand Down
10 changes: 0 additions & 10 deletions datafusion/expr/src/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,16 +319,6 @@ impl TreeNode for Expr {
)?
.map_data(
|(new_args, new_filter, new_order_by)| match func_def {
AggregateFunctionDefinition::BuiltIn(fun) => {
Ok(Expr::AggregateFunction(AggregateFunction::new(
fun,
new_args,
distinct,
new_filter,
new_order_by,
null_treatment,
)))
}
AggregateFunctionDefinition::UDF(fun) => {
Ok(Expr::AggregateFunction(AggregateFunction::new_udf(
fun,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/expr/src/type_coercion/aggregates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use crate::{AggregateFunction, Signature, TypeSignature};
use crate::TypeSignature;

use arrow::datatypes::{
DataType, TimeUnit, DECIMAL128_MAX_PRECISION, DECIMAL128_MAX_SCALE,
Expand Down
7 changes: 2 additions & 5 deletions datafusion/functions-array/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,6 @@ impl ExprPlanner for FieldAccessPlanner {
}

/// Reports whether `agg_func` is the aggregate UDF whose name is `"array_agg"`.
// NOTE(review): this span appears to interleave two versions of the body (the
// `if let … false` form and the plain `let … return` form) — confirm which one
// is current before relying on it.
fn is_array_agg(agg_func: &datafusion_expr::expr::AggregateFunction) -> bool {
// Refutable form: only a `UDF` definition is inspected.
if let AggregateFunctionDefinition::UDF(udf) = &agg_func.func_def {
return udf.name() == "array_agg";
}

// Fallback of the refutable form: any other definition is not `array_agg`.
false
// Plain `let` form: the pattern must be irrefutable, so this presumably
// relies on `UDF` being the only variant of `AggregateFunctionDefinition`
// — TODO confirm against the enum definition.
let AggregateFunctionDefinition::UDF(udf) = &agg_func.func_def;
return udf.name() == "array_agg";
}
57 changes: 2 additions & 55 deletions datafusion/optimizer/src/analyzer/type_coercion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ use datafusion_expr::type_coercion::{is_datetime, is_utf8_or_large_utf8};
use datafusion_expr::utils::merge_schema;
use datafusion_expr::{
is_false, is_not_false, is_not_true, is_not_unknown, is_true, is_unknown, not,
type_coercion, AggregateFunction, AggregateUDF, Expr, ExprSchemable, LogicalPlan,
Operator, ScalarUDF, Signature, WindowFrame, WindowFrameBound, WindowFrameUnits,
AggregateUDF, Expr, ExprSchemable, LogicalPlan,
Operator, ScalarUDF, WindowFrame, WindowFrameBound, WindowFrameUnits,
};

use crate::analyzer::AnalyzerRule;
Expand Down Expand Up @@ -400,24 +400,6 @@ impl<'a> TreeNodeRewriter for TypeCoercionRewriter<'a> {
order_by,
null_treatment,
}) => match func_def {
AggregateFunctionDefinition::BuiltIn(fun) => {
let new_expr = coerce_agg_exprs_for_signature(
&fun,
args,
self.schema,
&fun.signature(),
)?;
Ok(Transformed::yes(Expr::AggregateFunction(
expr::AggregateFunction::new(
fun,
new_expr,
distinct,
filter,
order_by,
null_treatment,
),
)))
}
AggregateFunctionDefinition::UDF(fun) => {
let new_expr = coerce_arguments_for_signature_with_aggregate_udf(
args,
Expand Down Expand Up @@ -448,14 +430,6 @@ impl<'a> TreeNodeRewriter for TypeCoercionRewriter<'a> {
coerce_window_frame(window_frame, self.schema, &order_by)?;

let args = match &fun {
expr::WindowFunctionDefinition::AggregateFunction(fun) => {
coerce_agg_exprs_for_signature(
fun,
args,
self.schema,
&fun.signature(),
)?
}
expr::WindowFunctionDefinition::AggregateUDF(udf) => {
coerce_arguments_for_signature_with_aggregate_udf(
args,
Expand Down Expand Up @@ -691,33 +665,6 @@ fn coerce_arguments_for_fun(
}
}

/// Casts every expression in `input_exprs` to the type required by `agg_fun`.
///
/// The current type of each argument is resolved against `schema`, the target
/// types are obtained from `type_coercion::aggregates::coerce_types` using
/// `signature`, and each argument is then converted with `cast_to` to the
/// target type at the same position.
///
/// An empty argument list is returned unchanged. The first error raised while
/// resolving types, computing the coerced types, or casting is propagated.
fn coerce_agg_exprs_for_signature(
    agg_fun: &AggregateFunction,
    input_exprs: Vec<Expr>,
    schema: &DFSchema,
    signature: &Signature,
) -> Result<Vec<Expr>> {
    // Zero-argument aggregates have nothing to coerce.
    if input_exprs.is_empty() {
        return Ok(input_exprs);
    }

    // Resolve the present type of every argument, stopping at the first failure.
    let mut arg_types = Vec::with_capacity(input_exprs.len());
    for expr in &input_exprs {
        arg_types.push(expr.get_type(schema)?);
    }

    let target_types =
        type_coercion::aggregates::coerce_types(agg_fun, &arg_types, signature)?;

    // Cast argument `idx` to target type `idx`, consuming the inputs in order.
    let mut coerced = Vec::with_capacity(input_exprs.len());
    for (idx, expr) in input_exprs.into_iter().enumerate() {
        coerced.push(expr.cast_to(&target_types[idx], schema)?);
    }
    Ok(coerced)
}

fn coerce_case_expression(case: Case, schema: &DFSchema) -> Result<Case> {
// Given expressions like:
//
Expand Down
3 changes: 0 additions & 3 deletions datafusion/optimizer/src/decorrelate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -436,9 +436,6 @@ fn agg_exprs_evaluation_result_on_empty_batch(
Expr::AggregateFunction(expr::AggregateFunction {
func_def, ..
}) => match func_def {
AggregateFunctionDefinition::BuiltIn(_fun) => {
Transformed::yes(Expr::Literal(ScalarValue::Null))
}
AggregateFunctionDefinition::UDF(fun) => {
if fun.name() == "count" {
Transformed::yes(Expr::Literal(ScalarValue::Int64(Some(
Expand Down
69 changes: 0 additions & 69 deletions datafusion/optimizer/src/single_distinct_to_groupby.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,26 +70,6 @@ fn is_single_distinct_agg(aggr_expr: &[Expr]) -> Result<bool> {
let mut aggregate_count = 0;
for expr in aggr_expr {
if let Expr::AggregateFunction(AggregateFunction {
func_def: AggregateFunctionDefinition::BuiltIn(_fun),
distinct,
args,
filter,
order_by,
null_treatment: _,
}) = expr
{
if filter.is_some() || order_by.is_some() {
return Ok(false);
}
aggregate_count += 1;
if *distinct {
for e in args {
fields_set.insert(e);
}
} else {
return Ok(false);
}
} else if let Expr::AggregateFunction(AggregateFunction {
func_def: AggregateFunctionDefinition::UDF(fun),
distinct,
args,
Expand Down Expand Up @@ -203,55 +183,6 @@ impl OptimizerRule for SingleDistinctToGroupBy {
let outer_aggr_exprs = aggr_expr
.into_iter()
.map(|aggr_expr| match aggr_expr {
Expr::AggregateFunction(AggregateFunction {
func_def: AggregateFunctionDefinition::BuiltIn(fun),
mut args,
distinct,
..
}) => {
if distinct {
if args.len() != 1 {
return internal_err!("DISTINCT aggregate should have exactly one argument");
}
let arg = args.swap_remove(0);

if group_fields_set.insert(arg.display_name()?) {
inner_group_exprs
.push(arg.alias(SINGLE_DISTINCT_ALIAS));
}
Ok(Expr::AggregateFunction(AggregateFunction::new(
fun,
vec![col(SINGLE_DISTINCT_ALIAS)],
false, // intentional to remove distinct here
None,
None,
None,
)))
// if the aggregate function is not distinct, we need to rewrite it like two phase aggregation
} else {
index += 1;
let alias_str = format!("alias{}", index);
inner_aggr_exprs.push(
Expr::AggregateFunction(AggregateFunction::new(
fun.clone(),
args,
false,
None,
None,
None,
))
.alias(&alias_str),
);
Ok(Expr::AggregateFunction(AggregateFunction::new(
fun,
vec![col(&alias_str)],
false,
None,
None,
None,
)))
}
}
Expr::AggregateFunction(AggregateFunction {
func_def: AggregateFunctionDefinition::UDF(udf),
mut args,
Expand Down
2 changes: 0 additions & 2 deletions datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,6 @@ mod row_hash;
mod topk;
mod topk_stream;

pub use datafusion_expr::AggregateFunction;
pub use datafusion_physical_expr::expressions::create_aggregate_expr;

/// Hash aggregate modes
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
Expand Down
Loading

0 comments on commit 079b993

Please sign in to comment.