Skip to content

Commit

Permalink
clean-up
Browse files Browse the repository at this point in the history
  • Loading branch information
berkaysynnada committed Feb 23, 2025
1 parent f5e9789 commit 38e057f
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 155 deletions.
23 changes: 8 additions & 15 deletions datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,6 @@ fn pushdown_requirement_to_children(
let required_input_ordering = plan.required_input_ordering();
let request_child = required_input_ordering[0].clone().unwrap_or_default();
let child_plan = plan.children().swap_remove(0);
let avoid_pushdown = !plan
.equivalence_properties()
.ordering_satisfy_requirement(parent_required);

match determine_children_requirement(parent_required, &request_child, child_plan)
{
Expand All @@ -194,21 +191,17 @@ fn pushdown_requirement_to_children(
.then(|| LexRequirement::new(request_child.to_vec()));
Ok(Some(vec![req]))
}
RequirementsCompatibility::Compatible(mut adjusted) => {
RequirementsCompatibility::Compatible(adjusted) => {
// If parent requirements are more specific than output ordering of the window plan,
// then we can deduce that parent expects an ordering from the columns constructed
// by the window functions. If that's the case, we block the pushdown of sort operation.
let avoid_pushdown = !plan
.equivalence_properties()
.ordering_satisfy_requirement(parent_required);
if avoid_pushdown {
return Ok(None);
}
adjusted = adjusted.map(|adj| {
let mut new = adj.to_vec();
new.retain(|req| {
req.expr
.as_any()
.downcast_ref::<Column>()
.map(|col| col.index() < child_plan.schema().fields().len())
.unwrap_or(true)
});
LexRequirement::new(new)
});

Ok(Some(vec![adjusted]))
}
RequirementsCompatibility::NonCompatible => Ok(None),
Expand Down
228 changes: 88 additions & 140 deletions datafusion/physical-plan/src/windows/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,213 +346,161 @@ pub(crate) fn window_equivalence_properties(
let mut window_eq_properties = EquivalenceProperties::new(Arc::clone(schema))
.extend(input.equivalence_properties().clone());

let schema_len = schema.fields.len();
let window_expr_indices =
((schema_len - window_exprs.len())..schema_len).collect::<Vec<_>>();
let window_schema_len = schema.fields.len();
let input_schema_len = window_schema_len - window_exprs.len();
let window_expr_indices = (input_schema_len..window_schema_len).collect::<Vec<_>>();

for (i, expr) in window_exprs.iter().enumerate() {
let mut partition_by_order = vec![];
for order in expr.partition_by() {
let partitioning_exprs = expr.partition_by();
let no_partitioning = partitioning_exprs.is_empty();
// In this part, we collect the columns defining the partitioning, and construct every
// possible `SortOptions` variation for them. Then, we check whether each variation is
// satisfied by the existing ordering provided by the input plan.
let mut partition_by_orders = vec![];
for pb_order in partitioning_exprs {
let all_orders_at_current_level =
all_possible_sort_options(Arc::clone(order));
partition_by_order.push(all_orders_at_current_level);
all_possible_sort_options(Arc::clone(pb_order));
partition_by_orders.push(all_orders_at_current_level);
}
let all_orders_cartesian =
partition_by_order.into_iter().multi_cartesian_product();
let mut all_lexs = all_orders_cartesian
partition_by_orders.into_iter().multi_cartesian_product();
let mut all_satisfied_lexs = all_orders_cartesian
.into_iter()
.map(LexOrdering::new)
.collect::<Vec<_>>();
if !expr.partition_by().is_empty()
&& all_lexs
.iter()
.all(|lex| !window_eq_properties.ordering_satisfy(lex))
{
all_satisfied_lexs.retain(|lex| window_eq_properties.ordering_satisfy(lex));
// If there is a partitioning, and none of its possible orderings is satisfied by the input
// plan's orderings, then we cannot introduce any further orderings for the window plan.
if !no_partitioning && all_satisfied_lexs.is_empty() {
return window_eq_properties;
}

let window_col = Column::new(expr.name(), i + input_schema_len);
if let Some(udf_window_expr) = expr.as_any().downcast_ref::<StandardWindowExpr>()
{
udf_window_expr.add_equal_orderings(&mut window_eq_properties);
} else if let Some(aggregate_udf_window_expr) =
expr.as_any().downcast_ref::<PlainAggregateWindowExpr>()
{
// Plain window frames -- frames having the unbounded starting point
let window_expr_index = window_expr_indices[i];
if aggregate_udf_window_expr
.get_window_frame()
.end_bound
.is_unbounded()
&& aggregate_udf_window_expr.partition_by().is_empty()
{
window_eq_properties = window_eq_properties.with_constants(iter::once(
ConstExpr::new(Arc::new(Column::new(
expr.name(),
i + input.schema().fields().len(),
))),
))
} else if aggregate_udf_window_expr
let whole_table = aggregate_udf_window_expr
.get_window_frame()
.end_bound
.is_unbounded()
{
all_lexs.retain(|lex| window_eq_properties.ordering_satisfy(lex));
let mut new_lexs = vec![];
for lex in all_lexs.iter() {
let existing = lex.clone().to_vec();
let new_partial_consts = all_possible_sort_options(Arc::new(
Column::new(expr.name(), i + input.schema().fields().len()),
));
let new_with_partial_consts = new_partial_consts
.into_iter()
.map(|partial| {
let mut existing = existing.clone();
.is_unbounded();

if whole_table && no_partitioning {
// Window function has a constant result across the table
window_eq_properties = window_eq_properties
.with_constants(iter::once(ConstExpr::new(Arc::new(window_col))))
} else if whole_table {
// Window function results add a partial constantness to equivalences
let new_lexs = all_satisfied_lexs
.into_iter()
.flat_map(|lex| {
let orderings = lex.to_vec();
let new_partial_consts =
all_possible_sort_options(Arc::new(window_col.clone()));

new_partial_consts.into_iter().map(move |partial| {
let mut existing = orderings.clone();
existing.push(partial);
existing
LexOrdering::new(existing)
})
.collect::<Vec<_>>();
for new in new_with_partial_consts {
new_lexs.push(LexOrdering::new(new));
}
}
})
.collect::<Vec<_>>();

window_eq_properties.add_new_orderings(new_lexs);
} else {
// Set monotonicity utilization
aggregate_udf_window_expr
.add_equal_orderings(&mut window_eq_properties, window_expr_index);
}
} else if let Some(sliding_expr) =
expr.as_any().downcast_ref::<SlidingAggregateWindowExpr>()
{
if sliding_expr.partition_by().is_empty()
&& sliding_expr.get_window_frame().end_bound.is_unbounded()
&& sliding_expr
.get_aggregate_expr()
.set_monotonicity()
.ne(&SetMonotonicity::NotMonotonic)
// Sliding window frames -- frames retracting the starting point
let unbounded_end = sliding_expr.get_window_frame().end_bound.is_unbounded();
let set_monotonicity = sliding_expr.get_aggregate_expr().set_monotonicity();

if no_partitioning
&& unbounded_end
&& set_monotonicity.ne(&SetMonotonicity::NotMonotonic)
{
let new_ordering = if sliding_expr
.get_aggregate_expr()
.set_monotonicity()
.eq(&SetMonotonicity::Increasing)
{
// Reverse set-monotonic cases with no partitioning
let new_ordering = if set_monotonicity.eq(&SetMonotonicity::Increasing) {
vec![LexOrdering::new(vec![PhysicalSortExpr::new(
Arc::new(Column::new(
expr.name(),
i + input.schema().fields().len(),
)),
Arc::new(window_col),
SortOptions::new(true, true),
)])]
} else {
vec![LexOrdering::new(vec![PhysicalSortExpr::new(
Arc::new(Column::new(
expr.name(),
i + input.schema().fields().len(),
)),
Arc::new(window_col),
SortOptions::new(false, true),
)])]
};
window_eq_properties.add_new_orderings(new_ordering);
} else if sliding_expr.get_window_frame().end_bound.is_unbounded()
&& sliding_expr
.get_aggregate_expr()
.set_monotonicity()
.ne(&SetMonotonicity::NotMonotonic)
} else if unbounded_end && set_monotonicity.ne(&SetMonotonicity::NotMonotonic)
{
all_lexs.retain(|lex| window_eq_properties.ordering_satisfy(lex));

if sliding_expr
.get_aggregate_expr()
.set_monotonicity()
.eq(&SetMonotonicity::Increasing)
{
for lex in all_lexs.iter() {
let mut existing = lex.clone().to_vec();
// Reverse set-monotonic cases
if set_monotonicity.eq(&SetMonotonicity::Increasing) {
for lex in all_satisfied_lexs.into_iter() {
let mut existing = lex.to_vec();
existing.push(PhysicalSortExpr::new(
Arc::new(Column::new(
expr.name(),
i + input.schema().fields().len(),
)),
Arc::new(window_col.clone()),
SortOptions::new(true, true),
));
window_eq_properties.add_new_ordering(LexOrdering::new(existing));
}
} else {
for lex in all_lexs.iter() {
let mut existing = lex.clone().to_vec();
for lex in all_satisfied_lexs.into_iter() {
let mut existing = lex.to_vec();
existing.push(PhysicalSortExpr::new(
Arc::new(Column::new(
expr.name(),
i + input.schema().fields().len(),
)),
Arc::new(window_col.clone()),
SortOptions::new(false, true),
));
window_eq_properties.add_new_ordering(LexOrdering::new(existing));
}
}
} else if sliding_expr.get_window_frame().is_causal()
&& sliding_expr
.get_aggregate_expr()
.set_monotonicity()
.ne(&SetMonotonicity::NotMonotonic)
}
// Sliding window cases:
// If we ensure that the elements entering the frame are greater than the ones leaving it
// (assuming increasing set-monotonicity), the window function result is increasing.
// However, we need to check whether the frame is causal. If it is not, we cannot utilize
// set-monotonicity, since the set shrinks as the frame gets closer to the end.
else if sliding_expr.get_window_frame().is_causal()
&& set_monotonicity.ne(&SetMonotonicity::NotMonotonic)
{
let pb_expr = sliding_expr.get_aggregate_expr().expressions().to_vec();
let pb_all_lex_combinations = pb_expr
let window_fn_args =
sliding_expr.get_aggregate_expr().expressions().to_vec();
let args_all_lex_combinations = window_fn_args
.into_iter()
.map(|pb| {
vec![
PhysicalSortExpr::new(
Arc::clone(&pb),
SortOptions::new(false, false),
),
PhysicalSortExpr::new(
Arc::clone(&pb),
SortOptions::new(false, true),
),
PhysicalSortExpr::new(
Arc::clone(&pb),
SortOptions::new(true, false),
),
PhysicalSortExpr::new(pb, SortOptions::new(true, true)),
]
})
.map(all_possible_sort_options)
.collect::<Vec<_>>();
let mut perm = pb_all_lex_combinations
let mut args_all_lexs = args_all_lex_combinations
.into_iter()
.multi_cartesian_product();

let mut asc = false;
if perm.any(|order| {
if window_eq_properties
.ordering_satisfy(&LexOrdering::new(order.clone()))
{
asc = !order.first().unwrap().options.descending;
true
} else {
false
if args_all_lexs.any(|order| {
if let Some(f) = order.first() {
asc = !f.options.descending;
}
window_eq_properties.ordering_satisfy(&LexOrdering::new(order))
}) {
if sliding_expr
.get_aggregate_expr()
.set_monotonicity()
.eq(&SetMonotonicity::Increasing)
&& (asc || sliding_expr.partition_by().is_empty())
if set_monotonicity.eq(&SetMonotonicity::Increasing)
&& (asc || no_partitioning)
{
let new_ordering = LexOrdering::new(vec![PhysicalSortExpr::new(
Arc::new(Column::new(
expr.name(),
i + input.schema().fields().len(),
)),
Arc::new(window_col),
SortOptions::new(false, false),
)]);
window_eq_properties.add_new_ordering(new_ordering);
} else if sliding_expr
.get_aggregate_expr()
.set_monotonicity()
.eq(&SetMonotonicity::Decreasing)
&& (!asc || sliding_expr.partition_by().is_empty())
} else if set_monotonicity.eq(&SetMonotonicity::Decreasing)
&& (!asc || no_partitioning)
{
let new_ordering = LexOrdering::new(vec![PhysicalSortExpr::new(
Arc::new(Column::new(
expr.name(),
i + input.schema().fields().len(),
)),
Arc::new(window_col),
SortOptions::new(true, false),
)]);
window_eq_properties.add_new_ordering(new_ordering);
Expand Down

0 comments on commit 38e057f

Please sign in to comment.