diff --git a/datafusion/physical-expr/src/aggregate.rs b/datafusion/physical-expr/src/aggregate.rs index 2e82ab2ab263..0fe08729828c 100644 --- a/datafusion/physical-expr/src/aggregate.rs +++ b/datafusion/physical-expr/src/aggregate.rs @@ -548,8 +548,9 @@ impl AggregateFunctionExpr { } /// Returns PhysicalSortExpr based on monotonicity of the function - pub fn natural_sort_expr(&self, aggr_func_idx: usize) -> Option { - // If the aggregate expressions are set-monotonic, the output data is naturally ordered with it. + pub fn get_result_ordering(&self, aggr_func_idx: usize) -> Option { + // If the aggregate expressions are set-monotonic, the output data is + // naturally ordered with it per group or partition. let monotonicity = self.set_monotonicity(); if !monotonicity.is_monotonic() { return None; diff --git a/datafusion/physical-expr/src/window/aggregate.rs b/datafusion/physical-expr/src/window/aggregate.rs index f22c693126bc..d94cdc1490dd 100644 --- a/datafusion/physical-expr/src/window/aggregate.rs +++ b/datafusion/physical-expr/src/window/aggregate.rs @@ -74,13 +74,16 @@ impl PlainAggregateWindowExpr { eq_properties: &mut EquivalenceProperties, window_expr_index: usize, ) { - let Some(expr) = self + if let Some(expr) = self .get_aggregate_expr() - .natural_sort_expr(window_expr_index) - else { - return; - }; - add_new_ordering_expr_with_partition_by(eq_properties, expr, &self.partition_by); + .get_result_ordering(window_expr_index) + { + add_new_ordering_expr_with_partition_by( + eq_properties, + expr, + &self.partition_by, + ); + } } } diff --git a/datafusion/physical-expr/src/window/sliding_aggregate.rs b/datafusion/physical-expr/src/window/sliding_aggregate.rs index 4752b264d752..23967e78f07a 100644 --- a/datafusion/physical-expr/src/window/sliding_aggregate.rs +++ b/datafusion/physical-expr/src/window/sliding_aggregate.rs @@ -22,14 +22,11 @@ use std::ops::Range; use std::sync::Arc; use crate::aggregate::AggregateFunctionExpr; -use crate::window::standard::add_new_ordering_expr_with_partition_by; use crate::window::window_expr::AggregateWindowExpr; use crate::window::{ PartitionBatches, PartitionWindowAggStates, PlainAggregateWindowExpr, WindowExpr, }; -use crate::{ - expressions::PhysicalSortExpr, reverse_order_bys, EquivalenceProperties, PhysicalExpr, -}; +use crate::{expressions::PhysicalSortExpr, reverse_order_bys, PhysicalExpr}; use arrow::array::{Array, ArrayRef}; use arrow::datatypes::Field; @@ -70,20 +67,6 @@ impl SlidingAggregateWindowExpr { pub fn get_aggregate_expr(&self) -> &AggregateFunctionExpr { &self.aggregate } - - pub fn add_equal_orderings( - &self, - eq_properties: &mut EquivalenceProperties, - window_expr_index: usize, - ) { - let Some(expr) = self - .get_aggregate_expr() - .natural_sort_expr(window_expr_index) - else { - return; - }; - add_new_ordering_expr_with_partition_by(eq_properties, expr, &self.partition_by); - } } /// Incrementally update window function using the fact that batch is diff --git a/datafusion/physical-expr/src/window/standard.rs b/datafusion/physical-expr/src/window/standard.rs index 81038ae86e48..0c0bdf7e99cd 100644 --- a/datafusion/physical-expr/src/window/standard.rs +++ b/datafusion/physical-expr/src/window/standard.rs @@ -65,8 +65,7 @@ impl StandardWindowExpr { &self.expr } - /// Adds any equivalent orderings generated by the `self.expr` - /// to `builder`. + /// Adds any equivalent orderings generated by the `self.expr` to `builder`. /// /// If `self.expr` doesn't have an ordering, ordering equivalence properties /// are not updated. Otherwise, ordering equivalence properties are updated diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 11463b1c27d3..fc129fe241e5 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -667,7 +667,7 @@ impl AggregateExec { // if the aggregate function is set monotonic, add it into equivalence properties for (i, aggr_expr) in aggr_exprs.iter().enumerate() { let aggr_expr_index = aggr_expr_indices[i]; - if let Some(expr) = aggr_expr.natural_sort_expr(aggr_expr_index) { + if let Some(expr) = aggr_expr.get_result_ordering(aggr_expr_index) { if group_expr_mapping.map.is_empty() { eq_properties.add_new_ordering(LexOrdering::new(vec![expr])); } else if *input_order_mode != InputOrderMode::Linear { diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs index dcd2953acdab..24715c497fc3 100644 --- a/datafusion/physical-plan/src/windows/mod.rs +++ b/datafusion/physical-plan/src/windows/mod.rs @@ -354,11 +354,13 @@ pub(crate) fn window_equivalence_properties( { aggregate_udf_window_expr .add_equal_orderings(&mut window_eq_properties, window_expr_index); - } else if let Some(aggregate_udf_window_expr) = - expr.as_any().downcast_ref::() + } else if let Some(_) = expr.as_any().downcast_ref::() { - aggregate_udf_window_expr - .add_equal_orderings(&mut window_eq_properties, window_expr_index); + // TODO: SlidingAggregateWindowExpr cannot introduce a new ordering yet + // because we cannot determine whether the window's incoming elements + // are greater than its outgoing elements. However, we do have + // the necessary tools to support this, and we can extend support + // for these cases in the future. } } window_eq_properties