Skip to content

Commit

Permalink
simplify terms
Browse files Browse the repository at this point in the history
  • Loading branch information
berkaysynnada committed Jan 29, 2025
1 parent 4b946b3 commit 481b5b4
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 33 deletions.
5 changes: 3 additions & 2 deletions datafusion/physical-expr/src/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -548,8 +548,9 @@ impl AggregateFunctionExpr {
}

/// Returns PhysicalSortExpr based on monotonicity of the function
pub fn natural_sort_expr(&self, aggr_func_idx: usize) -> Option<PhysicalSortExpr> {
// If the aggregate expressions are set-monotonic, the output data is naturally ordered with it.
pub fn get_result_ordering(&self, aggr_func_idx: usize) -> Option<PhysicalSortExpr> {
// If the aggregate expressions are set-monotonic, the output data is
// naturally ordered with it per group or partition.
let monotonicity = self.set_monotonicity();
if !monotonicity.is_monotonic() {
return None;
Expand Down
15 changes: 9 additions & 6 deletions datafusion/physical-expr/src/window/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,16 @@ impl PlainAggregateWindowExpr {
eq_properties: &mut EquivalenceProperties,
window_expr_index: usize,
) {
let Some(expr) = self
if let Some(expr) = self
.get_aggregate_expr()
.natural_sort_expr(window_expr_index)
else {
return;
};
add_new_ordering_expr_with_partition_by(eq_properties, expr, &self.partition_by);
.get_result_ordering(window_expr_index)
{
add_new_ordering_expr_with_partition_by(
eq_properties,
expr,
&self.partition_by,
);
}
}
}

Expand Down
19 changes: 1 addition & 18 deletions datafusion/physical-expr/src/window/sliding_aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,11 @@ use std::ops::Range;
use std::sync::Arc;

use crate::aggregate::AggregateFunctionExpr;
use crate::window::standard::add_new_ordering_expr_with_partition_by;
use crate::window::window_expr::AggregateWindowExpr;
use crate::window::{
PartitionBatches, PartitionWindowAggStates, PlainAggregateWindowExpr, WindowExpr,
};
use crate::{
expressions::PhysicalSortExpr, reverse_order_bys, EquivalenceProperties, PhysicalExpr,
};
use crate::{expressions::PhysicalSortExpr, reverse_order_bys, PhysicalExpr};

use arrow::array::{Array, ArrayRef};
use arrow::datatypes::Field;
Expand Down Expand Up @@ -70,20 +67,6 @@ impl SlidingAggregateWindowExpr {
pub fn get_aggregate_expr(&self) -> &AggregateFunctionExpr {
&self.aggregate
}

pub fn add_equal_orderings(
&self,
eq_properties: &mut EquivalenceProperties,
window_expr_index: usize,
) {
let Some(expr) = self
.get_aggregate_expr()
.natural_sort_expr(window_expr_index)
else {
return;
};
add_new_ordering_expr_with_partition_by(eq_properties, expr, &self.partition_by);
}
}

/// Incrementally update window function using the fact that batch is
Expand Down
3 changes: 1 addition & 2 deletions datafusion/physical-expr/src/window/standard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,7 @@ impl StandardWindowExpr {
&self.expr
}

/// Adds any equivalent orderings generated by the `self.expr`
/// to `builder`.
/// Adds any equivalent orderings generated by the `self.expr` to `builder`.
///
/// If `self.expr` doesn't have an ordering, ordering equivalence properties
/// are not updated. Otherwise, ordering equivalence properties are updated
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -667,7 +667,7 @@ impl AggregateExec {
// if the aggregate function is set monotonic, add it into equivalence properties
for (i, aggr_expr) in aggr_exprs.iter().enumerate() {
let aggr_expr_index = aggr_expr_indices[i];
if let Some(expr) = aggr_expr.natural_sort_expr(aggr_expr_index) {
if let Some(expr) = aggr_expr.get_result_ordering(aggr_expr_index) {
if group_expr_mapping.map.is_empty() {
eq_properties.add_new_ordering(LexOrdering::new(vec![expr]));
} else if *input_order_mode != InputOrderMode::Linear {
Expand Down
10 changes: 6 additions & 4 deletions datafusion/physical-plan/src/windows/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,11 +354,13 @@ pub(crate) fn window_equivalence_properties(
{
aggregate_udf_window_expr
.add_equal_orderings(&mut window_eq_properties, window_expr_index);
} else if let Some(aggregate_udf_window_expr) =
expr.as_any().downcast_ref::<SlidingAggregateWindowExpr>()
} else if let Some(_) = expr.as_any().downcast_ref::<SlidingAggregateWindowExpr>()
{
aggregate_udf_window_expr
.add_equal_orderings(&mut window_eq_properties, window_expr_index);
// TODO: SlidingAggregateWindowExpr cannot introduce a new ordering yet
// because we cannot determine whether the window's incoming elements
// are greater than its outgoing elements. However, we do have
// the necessary tools to support this, and we can extend support
// for these cases in the future.
}
}
window_eq_properties
Expand Down

0 comments on commit 481b5b4

Please sign in to comment.