From ff1d2df69d946a49c02f5e3d13d08cac1a989fdb Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Tue, 26 Dec 2023 10:19:38 +0100 Subject: [PATCH] - Add `TreeNode.transform_with_payload()`, `TreeNode.transform_down_with_payload()` and `TreeNode.transform_up_with_payload()` - Refactor `SortPushDown` and `PlanWithRequitements` using `TreeNode.transform_down_with_payload()` - Refactor `ExprOrdering` using `TreeNode.transform_up_with_payload()` --- datafusion/common/src/tree_node.rs | 72 ++++++ .../enforce_distribution.rs | 219 ++++++------------ .../src/physical_optimizer/enforce_sorting.rs | 64 ++++- .../src/physical_optimizer/sort_pushdown.rs | 161 +------------ datafusion/physical-expr/src/equivalence.rs | 64 +++-- .../physical-expr/src/sort_properties.rs | 80 +------ 6 files changed, 245 insertions(+), 415 deletions(-) diff --git a/datafusion/common/src/tree_node.rs b/datafusion/common/src/tree_node.rs index 5da9636ffe185..8ec106df3e90c 100644 --- a/datafusion/common/src/tree_node.rs +++ b/datafusion/common/src/tree_node.rs @@ -114,6 +114,42 @@ pub trait TreeNode: Sized { self.transform_up(op) } + /// Transforms the tree using `f_down` pre-preorder (top-down) and `f_up` post-order + /// (bottom-up) traversals. The `f_down` and `f_up` closures take payloads that they + /// propagate down and up during the transformation. + /// + /// The `f_down` closure takes `FD` type payload from its parent and returns `Vec` + /// type payload to propagate down to its children. One `FD` element is propagated + /// down to each child. + /// + /// The `f_up` closure takes `FU` type payload from its children collected into a + /// `Vec` and returns `FU` type payload to propagate up to its parent. + fn transform_with_payload( + self, + f_down: &mut FD, + payload_down: PD, + f_up: &mut FU, + ) -> Result<(Self, PU)> + where + FD: FnMut(Self, PD) -> Result<(Transformed, Vec)>, + FU: FnMut(Self, Vec) -> Result<(Transformed, PU)>, + { + let (new_node, new_payload_down) = f_down(self, payload_down)?; + let mut new_payload_down_iter = new_payload_down.into_iter(); + let mut payload_up = vec![]; + let node_with_new_children = new_node.into().map_children(|node| { + let (new_node, p) = node.transform_with_payload( + f_down, + new_payload_down_iter.next().unwrap(), + f_up, + )?; + payload_up.push(p); + Ok(new_node) + })?; + let (new_node, new_payload_up) = f_up(node_with_new_children, payload_up)?; + Ok((new_node.into(), new_payload_up)) + } + /// Convenience utils for writing optimizers rule: recursively apply the given 'op' to the node and all of its /// children(Preorder Traversal). /// When the `op` does not apply to a given node, it is left unchanged. @@ -136,6 +172,23 @@ pub trait TreeNode: Sized { after_op.map_children(|node| node.transform_down_mut(op)) } + /// Transforms the tree using `f` pre-preorder (top-down) traversal. The `f_down` + /// closure takes payloads that it propagates down during the transformation. + /// + /// The `f_down` closure takes `FD` type payload from its parent and returns `Vec` + /// type payload to propagate down to its children. One `FD` element is propagated + /// down to each child. + fn transform_down_with_payload(self, f: &mut F, payload: P) -> Result + where + F: FnMut(Self, P) -> Result<(Transformed, Vec

)>, + { + let (new_node, new_payload) = f(self, payload)?; + let mut new_payload_iter = new_payload.into_iter(); + new_node.into().map_children(|node| { + node.transform_down_with_payload(f, new_payload_iter.next().unwrap()) + }) + } + /// Convenience utils for writing optimizers rule: recursively apply the given 'op' first to all of its /// children and then itself(Postorder Traversal). /// When the `op` does not apply to a given node, it is left unchanged. @@ -162,6 +215,25 @@ pub trait TreeNode: Sized { Ok(new_node) } + /// Transforms the tree using `f_up` post-order traversal. The `f_up` closure takes + /// payloads that it propagates up during the transformation. + /// + /// The `f_up` closure takes `FU` type payload from its children collected into a + /// `Vec` and returns `FU` type payload to propagate up to its parent. + fn transform_up_with_payload(self, f: &mut F) -> Result<(Self, P)> + where + F: FnMut(Self, Vec

) -> Result<(Transformed, P)>, + { + let mut payload = vec![]; + let node_with_new_children = self.map_children(|node| { + let (new_node, p) = node.transform_up_with_payload(f)?; + payload.push(p); + Ok(new_node) + })?; + let (new_node, new_payload) = f(node_with_new_children, payload)?; + Ok((new_node.into(), new_payload)) + } + /// Transform the tree node using the given [TreeNodeRewriter] /// It performs a depth first walk of an node and its children. /// diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index 5106c0bb901e3..f51a7a996b4bd 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -198,10 +198,7 @@ impl PhysicalOptimizerRule for EnforceDistribution { let adjusted = if top_down_join_key_reordering { // Run a top-down process to adjust input key ordering recursively - let plan_requirements = PlanWithKeyRequirements::new(plan); - let adjusted = - plan_requirements.transform_down(&adjust_input_keys_ordering)?; - adjusted.plan + plan.transform_down_with_payload(&mut adjust_input_keys_ordering, None)? } else { // Run a bottom-up process plan.transform_up(&|plan| { @@ -269,11 +266,17 @@ impl PhysicalOptimizerRule for EnforceDistribution { /// 4) If the current plan is Projection, transform the requirements to the columns before the Projection and push down requirements /// 5) For other types of operators, by default, pushdown the parent requirements to children. /// +type RequiredKeyOrdering = Option>>; + fn adjust_input_keys_ordering( - requirements: PlanWithKeyRequirements, -) -> Result> { - let parent_required = requirements.required_key_ordering.clone(); - let plan_any = requirements.plan.as_any(); + plan: Arc, + required_key_ordering: RequiredKeyOrdering, +) -> Result<( + Transformed>, + Vec, +)> { + let parent_required = required_key_ordering.unwrap_or_default().clone(); + let plan_any = plan.as_any(); let transformed = if let Some(HashJoinExec { left, right, @@ -299,13 +302,14 @@ fn adjust_input_keys_ordering( *null_equals_null, )?) as Arc) }; - Some(reorder_partitioned_join_keys( - requirements.plan.clone(), + let (new_plan, request_key_ordering) = reorder_partitioned_join_keys( + plan.clone(), &parent_required, on, vec![], &join_constructor, - )?) + )?; + (Some(new_plan), request_key_ordering) } PartitionMode::CollectLeft => { let new_right_request = match join_type { @@ -323,15 +327,11 @@ fn adjust_input_keys_ordering( }; // Push down requirements to the right side - Some(PlanWithKeyRequirements { - plan: requirements.plan.clone(), - required_key_ordering: vec![], - request_key_ordering: vec![None, new_right_request], - }) + (Some(plan.clone()), vec![None, new_right_request]) } PartitionMode::Auto => { // Can not satisfy, clear the current requirements and generate new empty requirements - Some(PlanWithKeyRequirements::new(requirements.plan.clone())) + (Some(plan.clone()), vec![None; plan.children().len()]) } } } else if let Some(CrossJoinExec { left, .. }) = @@ -339,14 +339,13 @@ fn adjust_input_keys_ordering( { let left_columns_len = left.schema().fields().len(); // Push down requirements to the right side - Some(PlanWithKeyRequirements { - plan: requirements.plan.clone(), - required_key_ordering: vec![], - request_key_ordering: vec![ + ( + Some(plan.clone()), + vec![ None, shift_right_required(&parent_required, left_columns_len), ], - }) + ) } else if let Some(SortMergeJoinExec { left, right, @@ -368,26 +367,30 @@ fn adjust_input_keys_ordering( *null_equals_null, )?) as Arc) }; - Some(reorder_partitioned_join_keys( - requirements.plan.clone(), + let (new_plan, request_key_ordering) = reorder_partitioned_join_keys( + plan.clone(), &parent_required, on, sort_options.clone(), &join_constructor, - )?) + )?; + (Some(new_plan), request_key_ordering) } else if let Some(aggregate_exec) = plan_any.downcast_ref::() { if !parent_required.is_empty() { match aggregate_exec.mode() { - AggregateMode::FinalPartitioned => Some(reorder_aggregate_keys( - requirements.plan.clone(), - &parent_required, - aggregate_exec, - )?), - _ => Some(PlanWithKeyRequirements::new(requirements.plan.clone())), + AggregateMode::FinalPartitioned => { + let (new_plan, request_key_ordering) = reorder_aggregate_keys( + plan.clone(), + &parent_required, + aggregate_exec, + )?; + (Some(new_plan), request_key_ordering) + } + _ => (Some(plan.clone()), vec![None; plan.children().len()]), } } else { // Keep everything unchanged - None + (None, vec![None; plan.children().len()]) } } else if let Some(proj) = plan_any.downcast_ref::() { let expr = proj.expr(); @@ -396,33 +399,29 @@ fn adjust_input_keys_ordering( // Construct a mapping from new name to the the orginal Column let new_required = map_columns_before_projection(&parent_required, expr); if new_required.len() == parent_required.len() { - Some(PlanWithKeyRequirements { - plan: requirements.plan.clone(), - required_key_ordering: vec![], - request_key_ordering: vec![Some(new_required.clone())], - }) + (Some(plan.clone()), vec![Some(new_required.clone())]) } else { // Can not satisfy, clear the current requirements and generate new empty requirements - Some(PlanWithKeyRequirements::new(requirements.plan.clone())) + (Some(plan.clone()), vec![None; plan.children().len()]) } } else if plan_any.downcast_ref::().is_some() || plan_any.downcast_ref::().is_some() || plan_any.downcast_ref::().is_some() { - Some(PlanWithKeyRequirements::new(requirements.plan.clone())) + (Some(plan.clone()), vec![None; plan.children().len()]) } else { // By default, push down the parent requirements to children - let children_len = requirements.plan.children().len(); - Some(PlanWithKeyRequirements { - plan: requirements.plan.clone(), - required_key_ordering: vec![], - request_key_ordering: vec![Some(parent_required.clone()); children_len], - }) + let children_len = plan.children().len(); + ( + Some(plan.clone()), + vec![Some(parent_required.clone()); children_len], + ) }; + let (transformed, request_key_ordering) = transformed; Ok(if let Some(transformed) = transformed { - Transformed::Yes(transformed) + (Transformed::Yes(transformed), request_key_ordering) } else { - Transformed::No(requirements) + (Transformed::No(plan), request_key_ordering) }) } @@ -432,7 +431,7 @@ fn reorder_partitioned_join_keys( on: &[(Column, Column)], sort_options: Vec, join_constructor: &F, -) -> Result +) -> Result<(Arc, Vec)> where F: Fn((Vec<(Column, Column)>, Vec)) -> Result>, { @@ -455,27 +454,21 @@ where new_sort_options.push(sort_options[new_positions[idx]]) } - Ok(PlanWithKeyRequirements { - plan: join_constructor((new_join_on, new_sort_options))?, - required_key_ordering: vec![], - request_key_ordering: vec![Some(left_keys), Some(right_keys)], - }) + Ok(( + join_constructor((new_join_on, new_sort_options))?, + vec![Some(left_keys), Some(right_keys)], + )) } else { - Ok(PlanWithKeyRequirements { - plan: join_plan, - required_key_ordering: vec![], - request_key_ordering: vec![Some(left_keys), Some(right_keys)], - }) + Ok((join_plan, vec![Some(left_keys), Some(right_keys)])) } } else { - Ok(PlanWithKeyRequirements { - plan: join_plan, - required_key_ordering: vec![], - request_key_ordering: vec![ + Ok(( + join_plan, + vec![ Some(join_key_pairs.left_keys), Some(join_key_pairs.right_keys), ], - }) + )) } } @@ -483,7 +476,7 @@ fn reorder_aggregate_keys( agg_plan: Arc, parent_required: &[Arc], agg_exec: &AggregateExec, -) -> Result { +) -> Result<(Arc, Vec)> { let output_columns = agg_exec .group_by() .expr() @@ -501,11 +494,15 @@ fn reorder_aggregate_keys( || !agg_exec.group_by().null_expr().is_empty() || physical_exprs_equal(&output_exprs, parent_required) { - Ok(PlanWithKeyRequirements::new(agg_plan)) + let request_key_ordering = vec![None; agg_plan.children().len()]; + Ok((agg_plan, request_key_ordering)) } else { let new_positions = expected_expr_positions(&output_exprs, parent_required); match new_positions { - None => Ok(PlanWithKeyRequirements::new(agg_plan)), + None => { + let request_key_ordering = vec![None; agg_plan.children().len()]; + Ok((agg_plan, request_key_ordering)) + } Some(positions) => { let new_partial_agg = if let Some(agg_exec) = agg_exec.input().as_any().downcast_ref::() @@ -577,11 +574,13 @@ fn reorder_aggregate_keys( .push((Arc::new(Column::new(name, idx)) as _, name.clone())) } // TODO merge adjacent Projections if there are - Ok(PlanWithKeyRequirements::new(Arc::new( - ProjectionExec::try_new(proj_exprs, new_final_agg)?, - ))) + let new_plan = + Arc::new(ProjectionExec::try_new(proj_exprs, new_final_agg)?); + let request_key_ordering = vec![None; new_plan.children().len()]; + Ok((new_plan, request_key_ordering)) } else { - Ok(PlanWithKeyRequirements::new(agg_plan)) + let request_key_ordering = vec![None; agg_plan.children().len()]; + Ok((agg_plan, request_key_ordering)) } } } @@ -1476,86 +1475,6 @@ struct JoinKeyPairs { right_keys: Vec>, } -#[derive(Debug, Clone)] -struct PlanWithKeyRequirements { - plan: Arc, - /// Parent required key ordering - required_key_ordering: Vec>, - /// The request key ordering to children - request_key_ordering: Vec>>>, -} - -impl PlanWithKeyRequirements { - fn new(plan: Arc) -> Self { - let children_len = plan.children().len(); - PlanWithKeyRequirements { - plan, - required_key_ordering: vec![], - request_key_ordering: vec![None; children_len], - } - } - - fn children(&self) -> Vec { - let plan_children = self.plan.children(); - assert_eq!(plan_children.len(), self.request_key_ordering.len()); - plan_children - .into_iter() - .zip(self.request_key_ordering.clone()) - .map(|(child, required)| { - let from_parent = required.unwrap_or_default(); - let length = child.children().len(); - PlanWithKeyRequirements { - plan: child, - required_key_ordering: from_parent, - request_key_ordering: vec![None; length], - } - }) - .collect() - } -} - -impl TreeNode for PlanWithKeyRequirements { - fn apply_children(&self, op: &mut F) -> Result - where - F: FnMut(&Self) -> Result, - { - let children = self.children(); - for child in children { - match op(&child)? { - VisitRecursion::Continue => {} - VisitRecursion::Skip => return Ok(VisitRecursion::Continue), - VisitRecursion::Stop => return Ok(VisitRecursion::Stop), - } - } - - Ok(VisitRecursion::Continue) - } - - fn map_children(self, transform: F) -> Result - where - F: FnMut(Self) -> Result, - { - let children = self.children(); - if !children.is_empty() { - let new_children: Result> = - children.into_iter().map(transform).collect(); - - let children_plans = new_children? - .into_iter() - .map(|child| child.plan) - .collect::>(); - let new_plan = with_new_children_if_necessary(self.plan, children_plans)?; - Ok(PlanWithKeyRequirements { - plan: new_plan.into(), - required_key_ordering: self.required_key_ordering, - request_key_ordering: self.request_key_ordering, - }) - } else { - Ok(self) - } - } -} - /// Since almost all of these tests explicitly use `ParquetExec` they only run with the parquet feature flag on #[cfg(feature = "parquet")] #[cfg(test)] diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs index 3a7ca258d3ab7..00b31464a971d 100644 --- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs @@ -41,7 +41,7 @@ use crate::error::Result; use crate::physical_optimizer::replace_with_order_preserving_variants::{ replace_with_order_preserving_variants, OrderPreservationContext, }; -use crate::physical_optimizer::sort_pushdown::{pushdown_sorts, SortPushDown}; +use crate::physical_optimizer::sort_pushdown::pushdown_requirement_to_children; use crate::physical_optimizer::utils::{ add_sort_above, is_coalesce_partitions, is_limit, is_repartition, is_sort, is_sort_preserving_merge, is_union, is_window, @@ -326,10 +326,64 @@ impl PhysicalOptimizerRule for EnforceSorting { // Execute a top-down traversal to exploit sort push-down opportunities // missed by the bottom-up traversal: - let mut sort_pushdown = SortPushDown::new(updated_plan.plan); - sort_pushdown.assign_initial_requirements(); - let adjusted = sort_pushdown.transform_down(&pushdown_sorts)?; - Ok(adjusted.plan) + let plan = updated_plan.plan.transform_down_with_payload( + &mut |plan, required_ordering: Option>| { + let parent_required = required_ordering.as_deref().unwrap_or(&[]); + if let Some(sort_exec) = plan.as_any().downcast_ref::() { + let new_plan = if !plan + .equivalence_properties() + .ordering_satisfy_requirement(parent_required) + { + // If the current plan is a SortExec, modify it to satisfy parent requirements: + let mut new_plan = sort_exec.input().clone(); + add_sort_above(&mut new_plan, parent_required, sort_exec.fetch()); + new_plan + } else { + plan + }; + let required_ordering = new_plan + .output_ordering() + .map(PhysicalSortRequirement::from_sort_exprs) + .unwrap_or_default(); + // Since new_plan is a SortExec, we can safely get the 0th index. + let child = new_plan.children().swap_remove(0); + if let Some(adjusted) = + pushdown_requirement_to_children(&child, &required_ordering)? + { + // Can push down requirements + Ok((Transformed::Yes(child), adjusted)) + } else { + // Can not push down requirements + let required_input_ordering = new_plan.required_input_ordering(); + Ok((Transformed::Yes(new_plan), required_input_ordering)) + } + } else { + // Executors other than SortExec + if plan + .equivalence_properties() + .ordering_satisfy_requirement(parent_required) + { + // Satisfies parent requirements, immediately return. + let required_input_ordering = plan.required_input_ordering(); + return Ok((Transformed::Yes(plan), required_input_ordering)); + } + // Can not satisfy the parent requirements, check whether the requirements can be pushed down: + if let Some(adjusted) = + pushdown_requirement_to_children(&plan, parent_required)? + { + Ok((Transformed::Yes(plan), adjusted)) + } else { + // Can not push down requirements, add new SortExec: + let mut new_plan = plan; + add_sort_above(&mut new_plan, parent_required, None); + let required_input_ordering = new_plan.required_input_ordering(); + Ok((Transformed::Yes(new_plan), required_input_ordering)) + } + } + }, + None, + )?; + Ok(plan) } fn name(&self) -> &str { diff --git a/datafusion/core/src/physical_optimizer/sort_pushdown.rs b/datafusion/core/src/physical_optimizer/sort_pushdown.rs index e3ec6907fec09..d06adb82c83ed 100644 --- a/datafusion/core/src/physical_optimizer/sort_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/sort_pushdown.rs @@ -18,17 +18,15 @@ use std::sync::Arc; use crate::physical_optimizer::utils::{ - add_sort_above, is_limit, is_sort_preserving_merge, is_union, is_window, + is_limit, is_sort_preserving_merge, is_union, is_window, }; use crate::physical_plan::filter::FilterExec; use crate::physical_plan::joins::utils::calculate_join_output_ordering; use crate::physical_plan::joins::{HashJoinExec, SortMergeJoinExec}; use crate::physical_plan::projection::ProjectionExec; use crate::physical_plan::repartition::RepartitionExec; -use crate::physical_plan::sorts::sort::SortExec; -use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan}; +use crate::physical_plan::ExecutionPlan; -use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion}; use datafusion_common::{plan_err, DataFusionError, JoinSide, Result}; use datafusion_expr::JoinType; use datafusion_physical_expr::expressions::Column; @@ -36,160 +34,7 @@ use datafusion_physical_expr::{ LexRequirementRef, PhysicalSortExpr, PhysicalSortRequirement, }; -/// This is a "data class" we use within the [`EnforceSorting`] rule to push -/// down [`SortExec`] in the plan. In some cases, we can reduce the total -/// computational cost by pushing down `SortExec`s through some executors. -/// -/// [`EnforceSorting`]: crate::physical_optimizer::enforce_sorting::EnforceSorting -#[derive(Debug, Clone)] -pub(crate) struct SortPushDown { - /// Current plan - pub plan: Arc, - /// Parent required sort ordering - required_ordering: Option>, - children_nodes: Vec, -} - -impl SortPushDown { - /// Creates an empty tree with empty `required_ordering`'s. - pub fn new(plan: Arc) -> Self { - let children = plan.children(); - Self { - plan, - required_ordering: None, - children_nodes: children.into_iter().map(Self::new).collect(), - } - } - - /// Assigns the ordering requirement of the root node to the its children. - pub fn assign_initial_requirements(&mut self) { - let reqs = self.plan.required_input_ordering(); - for (child, requirement) in self.children_nodes.iter_mut().zip(reqs) { - child.required_ordering = requirement; - } - } -} - -impl TreeNode for SortPushDown { - fn apply_children(&self, op: &mut F) -> Result - where - F: FnMut(&Self) -> Result, - { - for child in &self.children_nodes { - match op(child)? { - VisitRecursion::Continue => {} - VisitRecursion::Skip => return Ok(VisitRecursion::Continue), - VisitRecursion::Stop => return Ok(VisitRecursion::Stop), - } - } - - Ok(VisitRecursion::Continue) - } - fn map_children(self, transform: F) -> Result - where - F: FnMut(Self) -> Result, - { - if self.children_nodes.is_empty() { - Ok(self) - } else { - let children_nodes = self - .children_nodes - .into_iter() - .map(transform) - .collect::>>()?; - - Ok(Self { - plan: with_new_children_if_necessary( - self.plan, - children_nodes.iter().map(|c| c.plan.clone()).collect(), - )? - .into(), - required_ordering: self.required_ordering, - children_nodes, - }) - } - } -} - -pub(crate) fn pushdown_sorts( - mut requirements: SortPushDown, -) -> Result> { - let plan = &requirements.plan; - let parent_required = requirements.required_ordering.as_deref().unwrap_or(&[]); - - if let Some(sort_exec) = plan.as_any().downcast_ref::() { - if !plan - .equivalence_properties() - .ordering_satisfy_requirement(parent_required) - { - // If the current plan is a SortExec, modify it to satisfy parent requirements: - let mut new_plan = sort_exec.input().clone(); - add_sort_above(&mut new_plan, parent_required, sort_exec.fetch()); - requirements.plan = new_plan; - }; - let mut new_node = requirements; - - let required_ordering = new_node - .plan - .output_ordering() - .map(PhysicalSortRequirement::from_sort_exprs) - .unwrap_or_default(); - // Since new_plan is a SortExec, we can safely get the 0th index. - let mut child = new_node.children_nodes.swap_remove(0); - if let Some(adjusted) = - pushdown_requirement_to_children(&child.plan, &required_ordering)? - { - for (c, o) in child.children_nodes.iter_mut().zip(adjusted) { - c.required_ordering = o; - } - // Can push down requirements - Ok(Transformed::Yes(SortPushDown { - plan: child.plan, - required_ordering: None, - children_nodes: child.children_nodes, - })) - } else { - // Can not push down requirements - let mut empty_node = SortPushDown::new(new_node.plan); - empty_node.assign_initial_requirements(); - Ok(Transformed::Yes(empty_node)) - } - } else { - // Executors other than SortExec - if plan - .equivalence_properties() - .ordering_satisfy_requirement(parent_required) - { - // Satisfies parent requirements, immediately return. - let reqs = requirements.plan.required_input_ordering(); - for (child, order) in requirements.children_nodes.iter_mut().zip(reqs) { - child.required_ordering = order; - } - return Ok(Transformed::Yes(requirements)); - } - // Can not satisfy the parent requirements, check whether the requirements can be pushed down: - if let Some(adjusted) = pushdown_requirement_to_children(plan, parent_required)? { - for (c, o) in requirements.children_nodes.iter_mut().zip(adjusted) { - c.required_ordering = o; - } - Ok(Transformed::Yes(SortPushDown { - plan: requirements.plan, - required_ordering: None, - children_nodes: requirements.children_nodes, - })) - } else { - // Can not push down requirements, add new SortExec: - let mut new_plan = requirements.plan; - add_sort_above(&mut new_plan, parent_required, None); - let mut new_empty = SortPushDown::new(new_plan); - new_empty.assign_initial_requirements(); - // Can not push down requirements - Ok(Transformed::Yes(new_empty)) - } - } -} - -fn pushdown_requirement_to_children( +pub fn pushdown_requirement_to_children( plan: &Arc, parent_required: LexRequirementRef, ) -> Result>>>> { diff --git a/datafusion/physical-expr/src/equivalence.rs b/datafusion/physical-expr/src/equivalence.rs index defd7b5786a3e..f30fb69784bdf 100644 --- a/datafusion/physical-expr/src/equivalence.rs +++ b/datafusion/physical-expr/src/equivalence.rs @@ -21,7 +21,7 @@ use std::sync::Arc; use crate::expressions::{Column, Literal}; use crate::physical_expr::deduplicate_physical_exprs; -use crate::sort_properties::{ExprOrdering, SortProperties}; +use crate::sort_properties::SortProperties; use crate::{ physical_exprs_bag_equal, physical_exprs_contains, LexOrdering, LexOrderingRef, LexRequirement, LexRequirementRef, PhysicalExpr, PhysicalSortExpr, @@ -1053,8 +1053,7 @@ impl EquivalenceProperties { /// /// Returns `true` if the specified ordering is satisfied, `false` otherwise. fn ordering_satisfy_single(&self, req: &PhysicalSortRequirement) -> bool { - let expr_ordering = self.get_expr_ordering(req.expr.clone()); - let ExprOrdering { expr, state, .. } = expr_ordering; + let (expr, state) = self.get_expr_ordering(req.expr.clone()); match state { SortProperties::Ordered(options) => { let sort_expr = PhysicalSortExpr { expr, options }; @@ -1448,7 +1447,7 @@ impl EquivalenceProperties { let ordered_exprs = search_indices .iter() .flat_map(|&idx| { - let ExprOrdering { expr, state, .. } = + let (expr, state) = eq_properties.get_expr_ordering(exprs[idx].clone()); if let SortProperties::Ordered(options) = state { Some((PhysicalSortExpr { expr, options }, idx)) @@ -1515,11 +1514,25 @@ impl EquivalenceProperties { /// /// Returns an `ExprOrdering` object containing the ordering information for /// the given expression. - pub fn get_expr_ordering(&self, expr: Arc) -> ExprOrdering { - ExprOrdering::new(expr.clone()) - .transform_up(&|expr| Ok(update_ordering(expr, self))) - // Guaranteed to always return `Ok`. - .unwrap() + pub fn get_expr_ordering( + &self, + expr: Arc, + ) -> (Arc, SortProperties) { + // The transform is designed to aid in the determination of ordering (represented + // by [`SortProperties`]) for a given [`PhysicalExpr`]. When analyzing the orderings + // of a [`PhysicalExpr`], the process begins by assigning the ordering of its leaf nodes. + // By propagating these leaf node orderings upwards in the expression tree, the overall + // ordering of the entire [`PhysicalExpr`] can be derived. + // + // This struct holds the necessary state information for each expression in the [`PhysicalExpr`]. + // It encapsulates the orderings (`state`) associated with the expression (`expr`), and + // orderings of the children expressions (`children_states`). The `state` of a parent + // expression is determined based on the states of its children expressions. + expr.transform_up_with_payload(&mut |expr, children_states| { + Ok(update_ordering(expr, children_states, self)) + }) + // Guaranteed to always return `Ok`. + .unwrap() } } @@ -1908,7 +1921,7 @@ fn updated_right_ordering_equivalence_class( } } -/// Calculates the [`SortProperties`] of a given [`ExprOrdering`] node. +/// Calculates the [`SortProperties`] of a given [`PhysicalExpr`] node. /// The node can either be a leaf node, or an intermediate node: /// - If it is a leaf node, we directly find the order of the node by looking /// at the given sort expression and equivalence properties if it is a `Column` @@ -1921,28 +1934,31 @@ fn updated_right_ordering_equivalence_class( /// sort expression emerges at that node immediately, discarding the recursive /// result coming from its children. fn update_ordering( - mut node: ExprOrdering, + expr: Arc, + children_states: Vec, eq_properties: &EquivalenceProperties, -) -> Transformed { +) -> (Transformed>, SortProperties) { // We have a Column, which is one of the two possible leaf node types: - let normalized_expr = eq_properties.eq_group.normalize_expr(node.expr.clone()); + let normalized_expr = eq_properties.eq_group.normalize_expr(expr.clone()); + + let state; if eq_properties.is_expr_constant(&normalized_expr) { - node.state = SortProperties::Singleton; + state = SortProperties::Singleton; } else if let Some(options) = eq_properties .normalized_oeq_class() .get_options(&normalized_expr) { - node.state = SortProperties::Ordered(options); - } else if !node.expr.children().is_empty() { + state = SortProperties::Ordered(options); + } else if !expr.children().is_empty() { // We have an intermediate (non-leaf) node, account for its children: - node.state = node.expr.get_ordering(&node.children_state()); - } else if node.expr.as_any().is::() { + state = expr.get_ordering(&children_states); + } else if expr.as_any().is::() { // We have a Literal, which is the other possible leaf node type: - node.state = node.expr.get_ordering(&[]); + state = expr.get_ordering(&[]); } else { - return Transformed::No(node); + return (Transformed::No(expr), Default::default()); } - Transformed::Yes(node) + (Transformed::Yes(expr), state) } #[cfg(test)] @@ -3992,12 +4008,12 @@ mod tests { .iter() .flat_map(|ordering| ordering.first().cloned()) .collect::>(); - let expr_ordering = eq_properties.get_expr_ordering(expr.clone()); + let (_, state) = eq_properties.get_expr_ordering(expr.clone()); let err_msg = format!( "expr:{:?}, expected: {:?}, actual: {:?}, leading_orderings: {leading_orderings:?}", - expr, expected, expr_ordering.state + expr, expected, state ); - assert_eq!(expr_ordering.state, expected, "{}", err_msg); + assert_eq!(state, expected, "{}", err_msg); } Ok(()) diff --git a/datafusion/physical-expr/src/sort_properties.rs b/datafusion/physical-expr/src/sort_properties.rs index caab85844fa35..e086fdc3cf576 100644 --- a/datafusion/physical-expr/src/sort_properties.rs +++ b/datafusion/physical-expr/src/sort_properties.rs @@ -15,15 +15,11 @@ // specific language governing permissions and limitations // under the License. -use std::{ops::Neg, sync::Arc}; +use std::ops::Neg; use arrow_schema::SortOptions; -use crate::PhysicalExpr; -use datafusion_common::tree_node::{TreeNode, VisitRecursion}; -use datafusion_common::Result; - -/// To propagate [`SortOptions`] across the [`PhysicalExpr`], it is insufficient +/// To propagate [`SortOptions`] across the [`crate::PhysicalExpr`], it is insufficient /// to simply use `Option`: There must be a differentiation between /// unordered columns and literal values, since literals may not break the ordering /// when they are used as a child of some binary expression when the other child has @@ -136,75 +132,3 @@ impl Neg for SortProperties { } } } - -/// The `ExprOrdering` struct is designed to aid in the determination of ordering (represented -/// by [`SortProperties`]) for a given [`PhysicalExpr`]. When analyzing the orderings -/// of a [`PhysicalExpr`], the process begins by assigning the ordering of its leaf nodes. -/// By propagating these leaf node orderings upwards in the expression tree, the overall -/// ordering of the entire [`PhysicalExpr`] can be derived. -/// -/// This struct holds the necessary state information for each expression in the [`PhysicalExpr`]. -/// It encapsulates the orderings (`state`) associated with the expression (`expr`), and -/// orderings of the children expressions (`children_states`). The [`ExprOrdering`] of a parent -/// expression is determined based on the [`ExprOrdering`] states of its children expressions. -#[derive(Debug)] -pub struct ExprOrdering { - pub expr: Arc, - pub state: SortProperties, - pub children: Vec, -} - -impl ExprOrdering { - /// Creates a new [`ExprOrdering`] with [`SortProperties::Unordered`] states - /// for `expr` and its children. - pub fn new(expr: Arc) -> Self { - let children = expr.children(); - Self { - expr, - state: Default::default(), - children: children.into_iter().map(Self::new).collect(), - } - } - - /// Get a reference to each child state. - pub fn children_state(&self) -> Vec { - self.children.iter().map(|c| c.state).collect() - } -} - -impl TreeNode for ExprOrdering { - fn apply_children(&self, op: &mut F) -> Result - where - F: FnMut(&Self) -> Result, - { - for child in &self.children { - match op(child)? { - VisitRecursion::Continue => {} - VisitRecursion::Skip => return Ok(VisitRecursion::Continue), - VisitRecursion::Stop => return Ok(VisitRecursion::Stop), - } - } - Ok(VisitRecursion::Continue) - } - - fn map_children(self, transform: F) -> Result - where - F: FnMut(Self) -> Result, - { - if self.children.is_empty() { - Ok(self) - } else { - let children = self - .children - .into_iter() - .map(transform) - .collect::>>()?; - - Ok(Self { - expr: self.expr, - state: self.state, - children, - }) - } - } -}