From 38e057fc1a28b7ca62f7d32ba9ce007d9ff714bd Mon Sep 17 00:00:00 2001 From: berkaysynnada Date: Sun, 23 Feb 2025 21:11:24 +0300 Subject: [PATCH] clean-up --- .../src/enforce_sorting/sort_pushdown.rs | 23 +- datafusion/physical-plan/src/windows/mod.rs | 228 +++++++----------- 2 files changed, 96 insertions(+), 155 deletions(-) diff --git a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs index 2abe259a5f55..adc1d35025f9 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs @@ -183,9 +183,6 @@ fn pushdown_requirement_to_children( let required_input_ordering = plan.required_input_ordering(); let request_child = required_input_ordering[0].clone().unwrap_or_default(); let child_plan = plan.children().swap_remove(0); - let avoid_pushdown = !plan - .equivalence_properties() - .ordering_satisfy_requirement(parent_required); match determine_children_requirement(parent_required, &request_child, child_plan) { @@ -194,21 +191,17 @@ fn pushdown_requirement_to_children( .then(|| LexRequirement::new(request_child.to_vec())); Ok(Some(vec![req])) } - RequirementsCompatibility::Compatible(mut adjusted) => { + RequirementsCompatibility::Compatible(adjusted) => { + // If parent requirements are more specific than output ordering of the window plan, + // then we can deduce that parent expects an ordering from the columns constructed + // by the window functions. If that's the case, we block the pushdown of sort operation. 
+ let avoid_pushdown = !plan + .equivalence_properties() + .ordering_satisfy_requirement(parent_required); if avoid_pushdown { return Ok(None); } - adjusted = adjusted.map(|adj| { - let mut new = adj.to_vec(); - new.retain(|req| { - req.expr - .as_any() - .downcast_ref::() - .map(|col| col.index() < child_plan.schema().fields().len()) - .unwrap_or(true) - }); - LexRequirement::new(new) - }); + Ok(Some(vec![adjusted])) } RequirementsCompatibility::NonCompatible => Ok(None), diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs index 1679652b0c81..8b7bb7271305 100644 --- a/datafusion/physical-plan/src/windows/mod.rs +++ b/datafusion/physical-plan/src/windows/mod.rs @@ -346,213 +346,161 @@ pub(crate) fn window_equivalence_properties( let mut window_eq_properties = EquivalenceProperties::new(Arc::clone(schema)) .extend(input.equivalence_properties().clone()); - let schema_len = schema.fields.len(); - let window_expr_indices = - ((schema_len - window_exprs.len())..schema_len).collect::>(); + let window_schema_len = schema.fields.len(); + let input_schema_len = window_schema_len - window_exprs.len(); + let window_expr_indices = (input_schema_len..window_schema_len).collect::>(); + for (i, expr) in window_exprs.iter().enumerate() { - let mut partition_by_order = vec![]; - for order in expr.partition_by() { + let partitioning_exprs = expr.partition_by(); + let no_partitioning = partitioning_exprs.is_empty(); + // In this part, we collect the columns defining partitioning, and construct every + // `SortOptions` variation for them. Then, we check whether each one is satisfied by + // the existing ordering provided by the input plan. 
+ let mut partition_by_orders = vec![]; + for pb_order in partitioning_exprs { let all_orders_at_current_level = - all_possible_sort_options(Arc::clone(order)); - partition_by_order.push(all_orders_at_current_level); + all_possible_sort_options(Arc::clone(pb_order)); + partition_by_orders.push(all_orders_at_current_level); } let all_orders_cartesian = - partition_by_order.into_iter().multi_cartesian_product(); - let mut all_lexs = all_orders_cartesian + partition_by_orders.into_iter().multi_cartesian_product(); + let mut all_satisfied_lexs = all_orders_cartesian .into_iter() .map(LexOrdering::new) .collect::>(); - if !expr.partition_by().is_empty() - && all_lexs - .iter() - .all(|lex| !window_eq_properties.ordering_satisfy(lex)) - { + all_satisfied_lexs.retain(|lex| window_eq_properties.ordering_satisfy(lex)); + // If there is a partitioning, and none of its possible orderings is satisfied by the input + // plan's orderings, then we cannot further introduce new orderings for the window plan. + if !no_partitioning && all_satisfied_lexs.is_empty() { return window_eq_properties; } + let window_col = Column::new(expr.name(), i + input_schema_len); if let Some(udf_window_expr) = expr.as_any().downcast_ref::() { udf_window_expr.add_equal_orderings(&mut window_eq_properties); } else if let Some(aggregate_udf_window_expr) = expr.as_any().downcast_ref::() { + // Plain window frames -- frames having the unbounded starting point let window_expr_index = window_expr_indices[i]; - if aggregate_udf_window_expr - .get_window_frame() - .end_bound - .is_unbounded() - && aggregate_udf_window_expr.partition_by().is_empty() - { - window_eq_properties = window_eq_properties.with_constants(iter::once( - ConstExpr::new(Arc::new(Column::new( - expr.name(), - i + input.schema().fields().len(), - ))), - )) - } else if aggregate_udf_window_expr + let whole_table = aggregate_udf_window_expr .get_window_frame() .end_bound - .is_unbounded() - { - all_lexs.retain(|lex| 
window_eq_properties.ordering_satisfy(lex)); - let mut new_lexs = vec![]; - for lex in all_lexs.iter() { - let existing = lex.clone().to_vec(); - let new_partial_consts = all_possible_sort_options(Arc::new( - Column::new(expr.name(), i + input.schema().fields().len()), - )); - let new_with_partial_consts = new_partial_consts - .into_iter() - .map(|partial| { - let mut existing = existing.clone(); + .is_unbounded(); + + if whole_table && no_partitioning { + // Window function has a constant result across the table + window_eq_properties = window_eq_properties + .with_constants(iter::once(ConstExpr::new(Arc::new(window_col)))) + } else if whole_table { + // Window function results add a partial constantness to equivalences + let new_lexs = all_satisfied_lexs + .into_iter() + .flat_map(|lex| { + let orderings = lex.to_vec(); + let new_partial_consts = + all_possible_sort_options(Arc::new(window_col.clone())); + + new_partial_consts.into_iter().map(move |partial| { + let mut existing = orderings.clone(); existing.push(partial); - existing + LexOrdering::new(existing) }) - .collect::>(); - for new in new_with_partial_consts { - new_lexs.push(LexOrdering::new(new)); - } - } + }) + .collect::>(); + window_eq_properties.add_new_orderings(new_lexs); } else { + // Set monotonicity utilization aggregate_udf_window_expr .add_equal_orderings(&mut window_eq_properties, window_expr_index); } } else if let Some(sliding_expr) = expr.as_any().downcast_ref::() { - if sliding_expr.partition_by().is_empty() - && sliding_expr.get_window_frame().end_bound.is_unbounded() - && sliding_expr - .get_aggregate_expr() - .set_monotonicity() - .ne(&SetMonotonicity::NotMonotonic) + // Sliding window frames -- frames retracting the starting point + let unbounded_end = sliding_expr.get_window_frame().end_bound.is_unbounded(); + let set_monotonicity = sliding_expr.get_aggregate_expr().set_monotonicity(); + + if no_partitioning + && unbounded_end + && 
set_monotonicity.ne(&SetMonotonicity::NotMonotonic) { - let new_ordering = if sliding_expr - .get_aggregate_expr() - .set_monotonicity() - .eq(&SetMonotonicity::Increasing) - { + // Reverse set-monotonic cases with no partitioning + let new_ordering = if set_monotonicity.eq(&SetMonotonicity::Increasing) { vec![LexOrdering::new(vec![PhysicalSortExpr::new( - Arc::new(Column::new( - expr.name(), - i + input.schema().fields().len(), - )), + Arc::new(window_col), SortOptions::new(true, true), )])] } else { vec![LexOrdering::new(vec![PhysicalSortExpr::new( - Arc::new(Column::new( - expr.name(), - i + input.schema().fields().len(), - )), + Arc::new(window_col), SortOptions::new(false, true), )])] }; window_eq_properties.add_new_orderings(new_ordering); - } else if sliding_expr.get_window_frame().end_bound.is_unbounded() - && sliding_expr - .get_aggregate_expr() - .set_monotonicity() - .ne(&SetMonotonicity::NotMonotonic) + } else if unbounded_end && set_monotonicity.ne(&SetMonotonicity::NotMonotonic) { - all_lexs.retain(|lex| window_eq_properties.ordering_satisfy(lex)); - - if sliding_expr - .get_aggregate_expr() - .set_monotonicity() - .eq(&SetMonotonicity::Increasing) - { - for lex in all_lexs.iter() { - let mut existing = lex.clone().to_vec(); + // Reverse set-monotonic cases + if set_monotonicity.eq(&SetMonotonicity::Increasing) { + for lex in all_satisfied_lexs.into_iter() { + let mut existing = lex.to_vec(); existing.push(PhysicalSortExpr::new( - Arc::new(Column::new( - expr.name(), - i + input.schema().fields().len(), - )), + Arc::new(window_col.clone()), SortOptions::new(true, true), )); window_eq_properties.add_new_ordering(LexOrdering::new(existing)); } } else { - for lex in all_lexs.iter() { - let mut existing = lex.clone().to_vec(); + for lex in all_satisfied_lexs.into_iter() { + let mut existing = lex.to_vec(); existing.push(PhysicalSortExpr::new( - Arc::new(Column::new( - expr.name(), - i + input.schema().fields().len(), - )), + Arc::new(window_col.clone()), 
SortOptions::new(false, true), )); window_eq_properties.add_new_ordering(LexOrdering::new(existing)); } } - } else if sliding_expr.get_window_frame().is_causal() - && sliding_expr - .get_aggregate_expr() - .set_monotonicity() - .ne(&SetMonotonicity::NotMonotonic) + } + // Sliding window cases: + // If we ensure that the elements entering the frame are greater than the leaving ones + // (assuming increasing set-monotonicity), the window function result is increasing. + // However, we need to check if the frame is causal. If it is not so, we cannot utilize + // set-monotonicity since the set shrinks as the frame is getting closer to the end. + else if sliding_expr.get_window_frame().is_causal() + && set_monotonicity.ne(&SetMonotonicity::NotMonotonic) { - let pb_expr = sliding_expr.get_aggregate_expr().expressions().to_vec(); - let pb_all_lex_combinations = pb_expr + let window_fn_args = + sliding_expr.get_aggregate_expr().expressions().to_vec(); + let args_all_lex_combinations = window_fn_args .into_iter() - .map(|pb| { - vec![ - PhysicalSortExpr::new( - Arc::clone(&pb), - SortOptions::new(false, false), - ), - PhysicalSortExpr::new( - Arc::clone(&pb), - SortOptions::new(false, true), - ), - PhysicalSortExpr::new( - Arc::clone(&pb), - SortOptions::new(true, false), - ), - PhysicalSortExpr::new(pb, SortOptions::new(true, true)), - ] - }) + .map(all_possible_sort_options) .collect::>(); - let mut perm = pb_all_lex_combinations + let mut args_all_lexs = args_all_lex_combinations .into_iter() .multi_cartesian_product(); + let mut asc = false; - if perm.any(|order| { - if window_eq_properties - .ordering_satisfy(&LexOrdering::new(order.clone())) - { - asc = !order.first().unwrap().options.descending; - true - } else { - false + if args_all_lexs.any(|order| { + if let Some(f) = order.first() { + asc = !f.options.descending; } + window_eq_properties.ordering_satisfy(&LexOrdering::new(order)) }) { - if sliding_expr - .get_aggregate_expr() - .set_monotonicity() - 
.eq(&SetMonotonicity::Increasing) - && (asc || sliding_expr.partition_by().is_empty()) + if set_monotonicity.eq(&SetMonotonicity::Increasing) + && (asc || no_partitioning) { let new_ordering = LexOrdering::new(vec![PhysicalSortExpr::new( - Arc::new(Column::new( - expr.name(), - i + input.schema().fields().len(), - )), + Arc::new(window_col), SortOptions::new(false, false), )]); window_eq_properties.add_new_ordering(new_ordering); - } else if sliding_expr - .get_aggregate_expr() - .set_monotonicity() - .eq(&SetMonotonicity::Decreasing) - && (!asc || sliding_expr.partition_by().is_empty()) + } else if set_monotonicity.eq(&SetMonotonicity::Decreasing) + && (!asc || no_partitioning) { let new_ordering = LexOrdering::new(vec![PhysicalSortExpr::new( - Arc::new(Column::new( - expr.name(), - i + input.schema().fields().len(), - )), + Arc::new(window_col), SortOptions::new(true, false), )]); window_eq_properties.add_new_ordering(new_ordering);