From b2f630965ec567d0f48c2ebf81c6b42a2fd3e7b3 Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Thu, 18 Apr 2024 09:14:48 +0200 Subject: [PATCH 1/2] Deprecate `TreeNode::transform_down_mut()` and `TreeNode::transform_up_mut()` methods --- .../examples/function_factory.rs | 2 +- datafusion-examples/examples/rewrite_expr.rs | 6 ++-- datafusion/common/src/tree_node.rs | 28 +++++++-------- .../physical_optimizer/coalesce_batches.rs | 2 +- .../combine_partial_final_agg.rs | 4 +-- .../physical_optimizer/convert_first_last.rs | 2 +- .../enforce_distribution.rs | 12 +++---- .../src/physical_optimizer/enforce_sorting.rs | 20 ++++++----- .../src/physical_optimizer/join_selection.rs | 8 ++--- .../limited_distinct_aggregation.rs | 4 +-- .../physical_optimizer/output_requirements.rs | 2 +- .../physical_optimizer/pipeline_checker.rs | 2 +- .../physical_optimizer/projection_pushdown.rs | 7 ++-- .../core/src/physical_optimizer/pruning.rs | 2 +- .../replace_with_order_preserving_variants.rs | 2 +- .../core/src/physical_optimizer/test_utils.rs | 2 +- .../physical_optimizer/topk_aggregation.rs | 4 +-- .../user_defined_scalar_functions.rs | 2 +- datafusion/expr/src/expr.rs | 2 +- datafusion/expr/src/expr_rewriter/mod.rs | 16 ++++----- datafusion/expr/src/expr_rewriter/order_by.rs | 2 +- datafusion/expr/src/logical_plan/plan.rs | 8 ++--- datafusion/expr/src/logical_plan/tree_node.rs | 36 ++++--------------- .../src/analyzer/count_wildcard_rule.rs | 4 +-- .../src/analyzer/function_rewrite.rs | 4 +-- .../src/analyzer/inline_table_scan.rs | 4 +-- datafusion/optimizer/src/decorrelate.rs | 6 ++-- datafusion/optimizer/src/push_down_filter.rs | 2 +- .../optimizer/src/scalar_subquery_to_join.rs | 4 +-- .../physical-expr/src/equivalence/class.rs | 4 +-- .../physical-expr/src/equivalence/mod.rs | 2 +- .../src/equivalence/projection.rs | 2 +- .../src/equivalence/properties.rs | 2 +- .../physical-expr/src/expressions/case.rs | 4 +-- datafusion/physical-expr/src/utils/mod.rs | 6 ++-- .../src/joins/stream_join_utils.rs | 2 +- datafusion/physical-plan/src/joins/utils.rs | 2 +- .../physical-plan/src/recursive_query.rs | 4 +-- datafusion/sql/src/select.rs | 2 +- datafusion/sql/src/unparser/utils.rs | 2 +- datafusion/sql/src/utils.rs | 6 ++-- 41 files changed, 108 insertions(+), 129 deletions(-) diff --git a/datafusion-examples/examples/function_factory.rs b/datafusion-examples/examples/function_factory.rs index a7c8558c6da8..810361fa9c21 100644 --- a/datafusion-examples/examples/function_factory.rs +++ b/datafusion-examples/examples/function_factory.rs @@ -164,7 +164,7 @@ impl ScalarUDFImpl for ScalarFunctionWrapper { impl ScalarFunctionWrapper { // replaces placeholders such as $1 with actual arguments (args[0] fn replacement(expr: &Expr, args: &[Expr]) -> Result { - let result = expr.clone().transform(&|e| { + let result = expr.clone().transform(&mut |e| { let r = match e { Expr::Placeholder(placeholder) => { let placeholder_position = diff --git a/datafusion-examples/examples/rewrite_expr.rs b/datafusion-examples/examples/rewrite_expr.rs index dcebbb55fb66..b8abc93916ab 100644 --- a/datafusion-examples/examples/rewrite_expr.rs +++ b/datafusion-examples/examples/rewrite_expr.rs @@ -91,7 +91,7 @@ impl AnalyzerRule for MyAnalyzerRule { impl MyAnalyzerRule { fn analyze_plan(plan: LogicalPlan) -> Result { - plan.transform(&|plan| { + plan.transform(&mut |plan| { Ok(match plan { LogicalPlan::Filter(filter) => { let predicate = Self::analyze_expr(filter.predicate.clone())?; @@ -107,7 +107,7 @@ impl MyAnalyzerRule { } fn analyze_expr(expr: Expr) -> Result { - expr.transform(&|expr| { + expr.transform(&mut |expr| { // closure is invoked for all sub expressions Ok(match expr { Expr::Literal(ScalarValue::Int64(i)) => { @@ -163,7 +163,7 @@ impl OptimizerRule for MyOptimizerRule { /// use rewrite_expr to modify the expression tree. fn my_rewrite(expr: Expr) -> Result { - expr.transform(&|expr| { + expr.transform(&mut |expr| { // closure is invoked for all sub expressions Ok(match expr { Expr::Between(Between { diff --git a/datafusion/common/src/tree_node.rs b/datafusion/common/src/tree_node.rs index dff22d495958..1e347b63a53d 100644 --- a/datafusion/common/src/tree_node.rs +++ b/datafusion/common/src/tree_node.rs @@ -145,9 +145,9 @@ pub trait TreeNode: Sized { /// Convenience utility for writing optimizer rules: Recursively apply the /// given function `f` to the tree in a bottom-up (post-order) fashion. When /// `f` does not apply to a given node, it is left unchanged. - fn transform Result>>( + fn transform Result>>( self, - f: &F, + f: &mut F, ) -> Result> { self.transform_up(f) } @@ -155,9 +155,9 @@ pub trait TreeNode: Sized { /// Convenience utility for writing optimizer rules: Recursively apply the /// given function `f` to a node and then to its children (pre-order traversal). /// When `f` does not apply to a given node, it is left unchanged. - fn transform_down Result>>( + fn transform_down Result>>( self, - f: &F, + f: &mut F, ) -> Result> { handle_transform_recursion_down!(f(self), |c| c.transform_down(f)) } @@ -165,20 +165,21 @@ pub trait TreeNode: Sized { /// Convenience utility for writing optimizer rules: Recursively apply the /// given mutable function `f` to a node and then to its children (pre-order /// traversal). When `f` does not apply to a given node, it is left unchanged. + #[deprecated(since = "38.0.0", note = "Use `transform_down` instead")] fn transform_down_mut Result>>( self, f: &mut F, ) -> Result> { - handle_transform_recursion_down!(f(self), |c| c.transform_down_mut(f)) + self.transform_down(f) } /// Convenience utility for writing optimizer rules: Recursively apply the /// given function `f` to all children of a node, and then to the node itself /// (post-order traversal). When `f` does not apply to a given node, it is /// left unchanged. - fn transform_up Result>>( + fn transform_up Result>>( self, - f: &F, + f: &mut F, ) -> Result> { handle_transform_recursion_up!(self, |c| c.transform_up(f), f) } @@ -187,11 +188,12 @@ pub trait TreeNode: Sized { /// given mutable function `f` to all children of a node, and then to the /// node itself (post-order traversal). When `f` does not apply to a given /// node, it is left unchanged. + #[deprecated(since = "38.0.0", note = "Use `transform_up` instead")] fn transform_up_mut Result>>( self, f: &mut F, ) -> Result> { - handle_transform_recursion_up!(self, |c| c.transform_up_mut(f), f) + self.transform_up(f) } /// Transforms the tree using `f_down` while traversing the tree top-down @@ -200,8 +202,8 @@ pub trait TreeNode: Sized { /// /// Use this method if you want to start the `f_up` process right where `f_down` jumps. /// This can make the whole process faster by reducing the number of `f_up` steps. - /// If you don't need this, it's just like using `transform_down_mut` followed by - /// `transform_up_mut` on the same tree. + /// If you don't need this, it's just like using `transform_down` followed by + /// `transform_up` on the same tree. /// /// Consider the following tree structure: /// ```text @@ -439,9 +441,7 @@ impl TreeNodeRecursion { /// This struct is used by tree transformation APIs such as /// - [`TreeNode::rewrite`], /// - [`TreeNode::transform_down`], -/// - [`TreeNode::transform_down_mut`], /// - [`TreeNode::transform_up`], -/// - [`TreeNode::transform_up_mut`], /// - [`TreeNode::transform_down_up`] /// /// to control the transformation and return the transformed result. @@ -1466,7 +1466,7 @@ mod tests { #[test] fn $NAME() -> Result<()> { let tree = test_tree(); - assert_eq!(tree.transform_down_mut(&mut $F)?, $EXPECTED_TREE); + assert_eq!(tree.transform_down(&mut $F)?, $EXPECTED_TREE); Ok(()) } @@ -1478,7 +1478,7 @@ mod tests { #[test] fn $NAME() -> Result<()> { let tree = test_tree(); - assert_eq!(tree.transform_up_mut(&mut $F)?, $EXPECTED_TREE); + assert_eq!(tree.transform_up(&mut $F)?, $EXPECTED_TREE); Ok(()) } diff --git a/datafusion/core/src/physical_optimizer/coalesce_batches.rs b/datafusion/core/src/physical_optimizer/coalesce_batches.rs index 7c0082037da0..b0b84aa6947b 100644 --- a/datafusion/core/src/physical_optimizer/coalesce_batches.rs +++ b/datafusion/core/src/physical_optimizer/coalesce_batches.rs @@ -54,7 +54,7 @@ impl PhysicalOptimizerRule for CoalesceBatches { } let target_batch_size = config.execution.batch_size; - plan.transform_up(&|plan| { + plan.transform_up(&mut |plan| { let plan_any = plan.as_any(); // The goal here is to detect operators that could produce small batches and only // wrap those ones with a CoalesceBatchesExec operator. An alternate approach here diff --git a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs index 1cba8f025895..4582d7293873 100644 --- a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs +++ b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs @@ -51,7 +51,7 @@ impl PhysicalOptimizerRule for CombinePartialFinalAggregate { plan: Arc, _config: &ConfigOptions, ) -> Result> { - plan.transform_down(&|plan| { + plan.transform_down(&mut |plan| { let transformed = plan.as_any() .downcast_ref::() @@ -179,7 +179,7 @@ fn normalize_group_exprs(group_exprs: GroupExprsRef) -> GroupExprs { fn discard_column_index(group_expr: Arc) -> Arc { group_expr .clone() - .transform(&|expr| { + .transform(&mut |expr| { let normalized_form: Option> = match expr.as_any().downcast_ref::() { Some(column) => Some(Arc::new(Column::new(column.name(), 0))), diff --git a/datafusion/core/src/physical_optimizer/convert_first_last.rs b/datafusion/core/src/physical_optimizer/convert_first_last.rs index 4102313d3126..c9cec4fe38d6 100644 --- a/datafusion/core/src/physical_optimizer/convert_first_last.rs +++ b/datafusion/core/src/physical_optimizer/convert_first_last.rs @@ -60,7 +60,7 @@ impl PhysicalOptimizerRule for OptimizeAggregateOrder { plan: Arc, _config: &ConfigOptions, ) -> Result> { - plan.transform_up(&get_common_requirement_of_aggregate_input) + plan.transform_up(&mut get_common_requirement_of_aggregate_input) .data() } diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index c9c54a46bd1c..98a1ffae7fd2 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -197,12 +197,12 @@ impl PhysicalOptimizerRule for EnforceDistribution { // Run a top-down process to adjust input key ordering recursively let plan_requirements = PlanWithKeyRequirements::new_default(plan); let adjusted = plan_requirements - .transform_down(&adjust_input_keys_ordering) + .transform_down(&mut adjust_input_keys_ordering) .data()?; adjusted.plan } else { // Run a bottom-up process - plan.transform_up(&|plan| { + plan.transform_up(&mut |plan| { Ok(Transformed::yes(reorder_join_keys_to_inputs(plan)?)) }) .data()? @@ -211,7 +211,7 @@ impl PhysicalOptimizerRule for EnforceDistribution { let distribution_context = DistributionContext::new_default(adjusted); // Distribution enforcement needs to be applied bottom-up. let distribution_context = distribution_context - .transform_up(&|distribution_context| { + .transform_up(&mut |distribution_context| { ensure_distribution(distribution_context, config) }) .data()?; @@ -1768,14 +1768,14 @@ pub(crate) mod tests { let plan_requirements = PlanWithKeyRequirements::new_default($PLAN.clone()); let adjusted = plan_requirements - .transform_down(&adjust_input_keys_ordering) + .transform_down(&mut adjust_input_keys_ordering) .data() .and_then(check_integrity)?; // TODO: End state payloads will be checked here. adjusted.plan } else { // Run reorder_join_keys_to_inputs rule - $PLAN.clone().transform_up(&|plan| { + $PLAN.clone().transform_up(&mut |plan| { Ok(Transformed::yes(reorder_join_keys_to_inputs(plan)?)) }) .data()? @@ -1783,7 +1783,7 @@ pub(crate) mod tests { // Then run ensure_distribution rule DistributionContext::new_default(adjusted) - .transform_up(&|distribution_context| { + .transform_up(&mut |distribution_context| { ensure_distribution(distribution_context, &config) }) .data() diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs index 5bf21c3dfab5..c6a8890e27c3 100644 --- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs @@ -160,12 +160,12 @@ impl PhysicalOptimizerRule for EnforceSorting { let plan_requirements = PlanWithCorrespondingSort::new_default(plan); // Execute a bottom-up traversal to enforce sorting requirements, // remove unnecessary sorts, and optimize sort-sensitive operators: - let adjusted = plan_requirements.transform_up(&ensure_sorting)?.data; + let adjusted = plan_requirements.transform_up(&mut ensure_sorting)?.data; let new_plan = if config.optimizer.repartition_sorts { let plan_with_coalesce_partitions = PlanWithCorrespondingCoalescePartitions::new_default(adjusted.plan); let parallel = plan_with_coalesce_partitions - .transform_up(¶llelize_sorts) + .transform_up(&mut parallelize_sorts) .data()?; parallel.plan } else { @@ -174,7 +174,7 @@ impl PhysicalOptimizerRule for EnforceSorting { let plan_with_pipeline_fixer = OrderPreservationContext::new_default(new_plan); let updated_plan = plan_with_pipeline_fixer - .transform_up(&|plan_with_pipeline_fixer| { + .transform_up(&mut |plan_with_pipeline_fixer| { replace_with_order_preserving_variants( plan_with_pipeline_fixer, false, @@ -188,11 +188,13 @@ impl PhysicalOptimizerRule for EnforceSorting { // missed by the bottom-up traversal: let mut sort_pushdown = SortPushDown::new_default(updated_plan.plan); assign_initial_requirements(&mut sort_pushdown); - let adjusted = sort_pushdown.transform_down(&pushdown_sorts)?.data; + let adjusted = sort_pushdown.transform_down(&mut pushdown_sorts)?.data; adjusted .plan - .transform_up(&|plan| Ok(Transformed::yes(replace_with_partial_sort(plan)?))) + .transform_up(&mut |plan| { + Ok(Transformed::yes(replace_with_partial_sort(plan)?)) + }) .data() } @@ -681,7 +683,7 @@ mod tests { { let plan_requirements = PlanWithCorrespondingSort::new_default($PLAN.clone()); let adjusted = plan_requirements - .transform_up(&ensure_sorting) + .transform_up(&mut ensure_sorting) .data() .and_then(check_integrity)?; // TODO: End state payloads will be checked here. @@ -690,7 +692,7 @@ mod tests { let plan_with_coalesce_partitions = PlanWithCorrespondingCoalescePartitions::new_default(adjusted.plan); let parallel = plan_with_coalesce_partitions - .transform_up(¶llelize_sorts) + .transform_up(&mut parallelize_sorts) .data() .and_then(check_integrity)?; // TODO: End state payloads will be checked here. @@ -701,7 +703,7 @@ mod tests { let plan_with_pipeline_fixer = OrderPreservationContext::new_default(new_plan); let updated_plan = plan_with_pipeline_fixer - .transform_up(&|plan_with_pipeline_fixer| { + .transform_up(&mut |plan_with_pipeline_fixer| { replace_with_order_preserving_variants( plan_with_pipeline_fixer, false, @@ -716,7 +718,7 @@ mod tests { let mut sort_pushdown = SortPushDown::new_default(updated_plan.plan); assign_initial_requirements(&mut sort_pushdown); sort_pushdown - .transform_down(&pushdown_sorts) + .transform_down(&mut pushdown_sorts) .data() .and_then(check_integrity)?; // TODO: End state payloads will be checked here. diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs b/datafusion/core/src/physical_optimizer/join_selection.rs index 72174b0e6e2f..fa08a210cc6a 100644 --- a/datafusion/core/src/physical_optimizer/join_selection.rs +++ b/datafusion/core/src/physical_optimizer/join_selection.rs @@ -263,7 +263,7 @@ impl PhysicalOptimizerRule for JoinSelection { Box::new(hash_join_swap_subrule), ]; let new_plan = plan - .transform_up(&|p| apply_subrules(p, &subrules, config)) + .transform_up(&mut |p| apply_subrules(p, &subrules, config)) .data()?; // Next, we apply another subrule that tries to optimize joins using any // statistics their inputs might have. @@ -280,7 +280,7 @@ impl PhysicalOptimizerRule for JoinSelection { let collect_threshold_byte_size = config.hash_join_single_partition_threshold; let collect_threshold_num_rows = config.hash_join_single_partition_threshold_rows; new_plan - .transform_up(&|plan| { + .transform_up(&mut |plan| { statistical_join_selection_subrule( plan, collect_threshold_byte_size, @@ -842,13 +842,13 @@ mod tests_statistical { Box::new(hash_join_swap_subrule), ]; let new_plan = plan - .transform_up(&|p| apply_subrules(p, &subrules, &ConfigOptions::new())) + .transform_up(&mut |p| apply_subrules(p, &subrules, &ConfigOptions::new())) .data()?; // TODO: End state payloads will be checked here. let config = ConfigOptions::new().optimizer; let collect_left_threshold = config.hash_join_single_partition_threshold; let collect_threshold_num_rows = config.hash_join_single_partition_threshold_rows; - let _ = new_plan.transform_up(&|plan| { + let _ = new_plan.transform_up(&mut |plan| { statistical_join_selection_subrule( plan, collect_left_threshold, diff --git a/datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs b/datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs index 9509d4e4c828..db4ea3396eed 100644 --- a/datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs +++ b/datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs @@ -138,7 +138,7 @@ impl LimitedDistinctAggregation { rewrite_applicable = false; Ok(Transformed::no(plan)) }; - let child = child.clone().transform_down_mut(&mut closure).data().ok()?; + let child = child.clone().transform_down(&mut closure).data().ok()?; if is_global_limit { return Some(Arc::new(GlobalLimitExec::new( child, @@ -163,7 +163,7 @@ impl PhysicalOptimizerRule for LimitedDistinctAggregation { config: &ConfigOptions, ) -> Result> { if config.optimizer.enable_distinct_aggregation_soft_limit { - plan.transform_down(&|plan| { + plan.transform_down(&mut |plan| { Ok( if let Some(plan) = LimitedDistinctAggregation::transform_limit(plan.clone()) diff --git a/datafusion/core/src/physical_optimizer/output_requirements.rs b/datafusion/core/src/physical_optimizer/output_requirements.rs index 829d523c990c..7be536559d8c 100644 --- a/datafusion/core/src/physical_optimizer/output_requirements.rs +++ b/datafusion/core/src/physical_optimizer/output_requirements.rs @@ -198,7 +198,7 @@ impl PhysicalOptimizerRule for OutputRequirements { match self.mode { RuleMode::Add => require_top_ordering(plan), RuleMode::Remove => plan - .transform_up(&|plan| { + .transform_up(&mut |plan| { if let Some(sort_req) = plan.as_any().downcast_ref::() { diff --git a/datafusion/core/src/physical_optimizer/pipeline_checker.rs b/datafusion/core/src/physical_optimizer/pipeline_checker.rs index 1dc8bc5042bf..f853327bb395 100644 --- a/datafusion/core/src/physical_optimizer/pipeline_checker.rs +++ b/datafusion/core/src/physical_optimizer/pipeline_checker.rs @@ -50,7 +50,7 @@ impl PhysicalOptimizerRule for PipelineChecker { plan: Arc, config: &ConfigOptions, ) -> Result> { - plan.transform_up(&|p| check_finiteness_requirements(p, &config.optimizer)) + plan.transform_up(&mut |p| check_finiteness_requirements(p, &config.optimizer)) .data() } diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs b/datafusion/core/src/physical_optimizer/projection_pushdown.rs index ed445e6d48b8..e0da9b1dbacc 100644 --- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs @@ -75,7 +75,8 @@ impl PhysicalOptimizerRule for ProjectionPushdown { plan: Arc, _config: &ConfigOptions, ) -> Result> { - plan.transform_down(&remove_unnecessary_projections).data() + plan.transform_down(&mut remove_unnecessary_projections) + .data() } fn name(&self) -> &str { @@ -977,7 +978,7 @@ fn update_expr( let new_expr = expr .clone() - .transform_up_mut(&mut |expr: Arc| { + .transform_up(&mut |expr: Arc| { if state == RewriteState::RewrittenInvalid { return Ok(Transformed::no(expr)); } @@ -1120,7 +1121,7 @@ fn new_columns_for_join_on( // Rewrite all columns in `on` (*on) .clone() - .transform(&|expr| { + .transform(&mut |expr| { if let Some(column) = expr.as_any().downcast_ref::() { // Find the column in the projection expressions let new_column = projection_exprs diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs index d8a3814d77e1..6b0aba541613 100644 --- a/datafusion/core/src/physical_optimizer/pruning.rs +++ b/datafusion/core/src/physical_optimizer/pruning.rs @@ -1175,7 +1175,7 @@ fn rewrite_column_expr( column_old: &phys_expr::Column, column_new: &phys_expr::Column, ) -> Result> { - e.transform(&|expr| { + e.transform(&mut |expr| { if let Some(column) = expr.as_any().downcast_ref::() { if column == column_old { return Ok(Transformed::yes(Arc::new(column_new.clone()))); diff --git a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs index ad19215fbf67..6db431c0aa58 100644 --- a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs +++ b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs @@ -395,7 +395,7 @@ mod tests { // Run the rule top-down let config = SessionConfig::new().with_prefer_existing_sort($PREFER_EXISTING_SORT); let plan_with_pipeline_fixer = OrderPreservationContext::new_default(physical_plan); - let parallel = plan_with_pipeline_fixer.transform_up(&|plan_with_pipeline_fixer| replace_with_order_preserving_variants(plan_with_pipeline_fixer, false, false, config.options())).data().and_then(check_integrity)?; + let parallel = plan_with_pipeline_fixer.transform_up(&mut |plan_with_pipeline_fixer| replace_with_order_preserving_variants(plan_with_pipeline_fixer, false, false, config.options())).data().and_then(check_integrity)?; let optimized_physical_plan = parallel.plan; // Get string representation of the plan diff --git a/datafusion/core/src/physical_optimizer/test_utils.rs b/datafusion/core/src/physical_optimizer/test_utils.rs index 2e6e3af5dfe2..6c75c3490eb4 100644 --- a/datafusion/core/src/physical_optimizer/test_utils.rs +++ b/datafusion/core/src/physical_optimizer/test_utils.rs @@ -380,7 +380,7 @@ pub fn sort_exec( /// replaced with direct plan equality checks. pub fn check_integrity(context: PlanContext) -> Result> { context - .transform_up(&|node| { + .transform_up(&mut |node| { let children_plans = node.plan.children(); assert_eq!(node.children.len(), children_plans.len()); for (child_plan, child_node) in diff --git a/datafusion/core/src/physical_optimizer/topk_aggregation.rs b/datafusion/core/src/physical_optimizer/topk_aggregation.rs index c47e5e25d143..f7f0285fcb93 100644 --- a/datafusion/core/src/physical_optimizer/topk_aggregation.rs +++ b/datafusion/core/src/physical_optimizer/topk_aggregation.rs @@ -120,7 +120,7 @@ impl TopKAggregation { } Ok(Transformed::no(plan)) }; - let child = child.clone().transform_down_mut(&mut closure).data().ok()?; + let child = child.clone().transform_down(&mut closure).data().ok()?; let sort = SortExec::new(sort.expr().to_vec(), child) .with_fetch(sort.fetch()) .with_preserve_partitioning(sort.preserve_partitioning()); @@ -141,7 +141,7 @@ impl PhysicalOptimizerRule for TopKAggregation { config: &ConfigOptions, ) -> Result> { if config.optimizer.enable_topk_aggregation { - plan.transform_down(&|plan| { + plan.transform_down(&mut |plan| { Ok( if let Some(plan) = TopKAggregation::transform_sort(plan.clone()) { Transformed::yes(plan) diff --git a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs index 86be887198ae..efbe390c0933 100644 --- a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs +++ b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs @@ -817,7 +817,7 @@ impl ScalarUDFImpl for ScalarFunctionWrapper { impl ScalarFunctionWrapper { // replaces placeholders with actual arguments fn replacement(expr: &Expr, args: &[Expr]) -> Result { - let result = expr.clone().transform(&|e| { + let result = expr.clone().transform(&mut |e| { let r = match e { Expr::Placeholder(placeholder) => { let placeholder_position = diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs index cffb58dadd8e..f8c3d2b622fb 100644 --- a/datafusion/expr/src/expr.rs +++ b/datafusion/expr/src/expr.rs @@ -1250,7 +1250,7 @@ impl Expr { /// For example, gicen an expression like ` = $0` will infer `$0` to /// have type `int32`. pub fn infer_placeholder_types(self, schema: &DFSchema) -> Result { - self.transform(&|mut expr| { + self.transform(&mut |mut expr| { // Default to assuming the arguments are the same type if let Expr::BinaryExpr(BinaryExpr { left, op: _, right }) = &mut expr { rewrite_placeholder(left.as_mut(), right.as_ref(), schema)?; diff --git a/datafusion/expr/src/expr_rewriter/mod.rs b/datafusion/expr/src/expr_rewriter/mod.rs index c11619fc0ea2..96e048fa583b 100644 --- a/datafusion/expr/src/expr_rewriter/mod.rs +++ b/datafusion/expr/src/expr_rewriter/mod.rs @@ -62,7 +62,7 @@ pub trait FunctionRewrite { /// Recursively call [`Column::normalize_with_schemas`] on all [`Column`] expressions /// in the `expr` expression tree. pub fn normalize_col(expr: Expr, plan: &LogicalPlan) -> Result { - expr.transform(&|expr| { + expr.transform(&mut |expr| { Ok({ if let Expr::Column(c) = expr { let col = LogicalPlanBuilder::normalize(plan, c)?; @@ -91,7 +91,7 @@ pub fn normalize_col_with_schemas_and_ambiguity_check( return Ok(Expr::Unnest(Unnest { expr: Box::new(e) })); } - expr.transform(&|expr| { + expr.transform(&mut |expr| { Ok({ if let Expr::Column(c) = expr { let col = @@ -119,7 +119,7 @@ pub fn normalize_cols( /// Recursively replace all [`Column`] expressions in a given expression tree with /// `Column` expressions provided by the hash map argument. pub fn replace_col(expr: Expr, replace_map: &HashMap<&Column, &Column>) -> Result { - expr.transform(&|expr| { + expr.transform(&mut |expr| { Ok({ if let Expr::Column(c) = &expr { match replace_map.get(c) { @@ -140,7 +140,7 @@ pub fn replace_col(expr: Expr, replace_map: &HashMap<&Column, &Column>) -> Resul /// For example, if there were expressions like `foo.bar` this would /// rewrite it to just `bar`. pub fn unnormalize_col(expr: Expr) -> Expr { - expr.transform(&|expr| { + expr.transform(&mut |expr| { Ok({ if let Expr::Column(c) = expr { let col = Column { @@ -190,7 +190,7 @@ pub fn unnormalize_cols(exprs: impl IntoIterator) -> Vec { /// Recursively remove all the ['OuterReferenceColumn'] and return the inside Column /// in the expression tree. pub fn strip_outer_reference(expr: Expr) -> Expr { - expr.transform(&|expr| { + expr.transform(&mut |expr| { Ok({ if let Expr::OuterReferenceColumn(_, col) = expr { Transformed::yes(Expr::Column(col)) @@ -318,7 +318,7 @@ mod test { #[test] fn rewriter_rewrite() { // rewrites all "foo" string literals to "bar" - let transformer = |expr: Expr| -> Result> { + let mut transformer = |expr: Expr| -> Result> { match expr { Expr::Literal(ScalarValue::Utf8(Some(utf8_val))) => { let utf8_val = if utf8_val == "foo" { @@ -336,7 +336,7 @@ mod test { // rewrites "foo" --> "bar" let rewritten = col("state") .eq(lit("foo")) - .transform(&transformer) + .transform(&mut transformer) .data() .unwrap(); assert_eq!(rewritten, col("state").eq(lit("bar"))); @@ -344,7 +344,7 @@ mod test { // doesn't rewrite let rewritten = col("state") .eq(lit("baz")) - .transform(&transformer) + .transform(&mut transformer) .data() .unwrap(); assert_eq!(rewritten, col("state").eq(lit("baz"))); diff --git a/datafusion/expr/src/expr_rewriter/order_by.rs b/datafusion/expr/src/expr_rewriter/order_by.rs index 2fb522b979b0..dff179f3db00 100644 --- a/datafusion/expr/src/expr_rewriter/order_by.rs +++ b/datafusion/expr/src/expr_rewriter/order_by.rs @@ -84,7 +84,7 @@ fn rewrite_in_terms_of_projection( ) -> Result { // assumption is that each item in exprs, such as "b + c" is // available as an output column named "b + c" - expr.transform(&|expr| { + expr.transform(&mut |expr| { // search for unnormalized names first such as "c1" (such as aliases) if let Some(found) = proj_exprs.iter().find(|a| (**a) == expr) { let col = Expr::Column( diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index dbff5046013b..c5dd1f04641d 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -554,7 +554,7 @@ impl LogicalPlan { // AND lineitem.l_quantity < Decimal128(Some(2400),15,2) let predicate = predicate - .transform_down(&|expr| { + .transform_down(&mut |expr| { match expr { Expr::Exists { .. } | Expr::ScalarSubquery(_) @@ -1017,10 +1017,10 @@ impl LogicalPlan { self, param_values: &ParamValues, ) -> Result { - self.transform_up_with_subqueries(&|plan| { + self.transform_up_with_subqueries(&mut |plan| { let schema = plan.schema().clone(); plan.map_expressions(|e| { - e.infer_placeholder_types(&schema)?.transform_up(&|e| { + e.infer_placeholder_types(&schema)?.transform_up(&mut |e| { if let Expr::Placeholder(Placeholder { id, .. }) = e { let value = param_values.get_placeholders_with_values(&id)?; Ok(Transformed::yes(Expr::Literal(value))) @@ -3170,7 +3170,7 @@ digraph { // after transformation, because plan is not the same anymore, // the parent plan is built again with call to LogicalPlan::with_new_inputs -> with_new_exprs let plan = plan - .transform(&|plan| match plan { + .transform(&mut |plan| match plan { LogicalPlan::TableScan(table) => { let filter = Filter::try_new( external_filter.clone(), diff --git a/datafusion/expr/src/logical_plan/tree_node.rs b/datafusion/expr/src/logical_plan/tree_node.rs index 48f047c070dd..22a369579950 100644 --- a/datafusion/expr/src/logical_plan/tree_node.rs +++ b/datafusion/expr/src/logical_plan/tree_node.rs @@ -797,9 +797,9 @@ impl LogicalPlan { /// Similarly to [`Self::transform`], rewrites this node and its inputs using `f`, /// including subqueries that may appear in expressions such as `IN (SELECT /// ...)`. - pub fn transform_with_subqueries Result>>( + pub fn transform_with_subqueries Result>>( self, - f: &F, + f: &mut F, ) -> Result> { self.transform_up_with_subqueries(f) } @@ -807,43 +807,21 @@ impl LogicalPlan { /// Similarly to [`Self::transform_down`], rewrites this node and its inputs using `f`, /// including subqueries that may appear in expressions such as `IN (SELECT /// ...)`. - pub fn transform_down_with_subqueries Result>>( - self, - f: &F, - ) -> Result> { - handle_transform_recursion_down!(f(self), |c| c.transform_down_with_subqueries(f)) - } - - /// Similarly to [`Self::transform_down_mut`], rewrites this node and its inputs using `f`, - /// including subqueries that may appear in expressions such as `IN (SELECT - /// ...)`. - pub fn transform_down_mut_with_subqueries< - F: FnMut(Self) -> Result>, - >( + pub fn transform_down_with_subqueries Result>>( self, f: &mut F, ) -> Result> { - handle_transform_recursion_down!(f(self), |c| c - .transform_down_mut_with_subqueries(f)) + handle_transform_recursion_down!(f(self), |c| c.transform_down_with_subqueries(f)) } /// Similarly to [`Self::transform_up`], rewrites this node and its inputs using `f`, /// including subqueries that may appear in expressions such as `IN (SELECT /// ...)`. - pub fn transform_up_with_subqueries Result>>( - self, - f: &F, - ) -> Result> { - handle_transform_recursion_up!(self, |c| c.transform_up_with_subqueries(f), f) - } - - pub fn transform_up_mut_with_subqueries< - F: FnMut(Self) -> Result>, - >( + pub fn transform_up_with_subqueries Result>>( self, f: &mut F, ) -> Result> { - handle_transform_recursion_up!(self, |c| c.transform_up_mut_with_subqueries(f), f) + handle_transform_recursion_up!(self, |c| c.transform_up_with_subqueries(f), f) } /// Similarly to [`Self::transform_down`], rewrites this node and its inputs using `f`, @@ -895,7 +873,7 @@ impl LogicalPlan { mut f: F, ) -> Result> { self.map_expressions(|expr| { - expr.transform_down_mut(&mut |expr| match expr { + expr.transform_down(&mut |expr| match expr { Expr::Exists(Exists { subquery, negated }) => { f(LogicalPlan::Subquery(subquery))?.map_data(|s| match s { LogicalPlan::Subquery(subquery) => { diff --git a/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs b/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs index 080ec074d3c3..e3e9f959e042 100644 --- a/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs +++ b/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs @@ -41,7 +41,7 @@ impl CountWildcardRule { impl AnalyzerRule for CountWildcardRule { fn analyze(&self, plan: LogicalPlan, _: &ConfigOptions) -> Result { - plan.transform_down_with_subqueries(&analyze_internal) + plan.transform_down_with_subqueries(&mut analyze_internal) .data() } @@ -78,7 +78,7 @@ fn analyze_internal(plan: LogicalPlan) -> Result> { let name_preserver = NamePreserver::new(&plan); plan.map_expressions(|expr| { let original_name = name_preserver.save(&expr)?; - let transformed_expr = expr.transform_up(&|expr| match expr { + let transformed_expr = expr.transform_up(&mut |expr| match expr { Expr::WindowFunction(mut window_function) if is_count_star_window_aggregate(&window_function) => { diff --git a/datafusion/optimizer/src/analyzer/function_rewrite.rs b/datafusion/optimizer/src/analyzer/function_rewrite.rs index 4dd3222a32cf..38d018d1627d 100644 --- a/datafusion/optimizer/src/analyzer/function_rewrite.rs +++ b/datafusion/optimizer/src/analyzer/function_rewrite.rs @@ -64,7 +64,7 @@ impl ApplyFunctionRewrites { let original_name = name_preserver.save(&expr)?; // recursively transform the expression, applying the rewrites at each step - let transformed_expr = expr.transform_up(&|expr| { + let transformed_expr = expr.transform_up(&mut |expr| { let mut result = Transformed::no(expr); for rewriter in self.function_rewrites.iter() { result = result.transform_data(|expr| { @@ -85,7 +85,7 @@ impl AnalyzerRule for ApplyFunctionRewrites { } fn analyze(&self, plan: LogicalPlan, options: &ConfigOptions) -> Result { - plan.transform_up_with_subqueries(&|plan| self.rewrite_plan(plan, options)) + plan.transform_up_with_subqueries(&mut |plan| self.rewrite_plan(plan, options)) .map(|res| res.data) } } diff --git a/datafusion/optimizer/src/analyzer/inline_table_scan.rs b/datafusion/optimizer/src/analyzer/inline_table_scan.rs index cc5f870a9c73..a3efafe6e314 100644 --- a/datafusion/optimizer/src/analyzer/inline_table_scan.rs +++ b/datafusion/optimizer/src/analyzer/inline_table_scan.rs @@ -38,7 +38,7 @@ impl InlineTableScan { impl AnalyzerRule for InlineTableScan { fn analyze(&self, plan: LogicalPlan, _: &ConfigOptions) -> Result { - plan.transform_up(&analyze_internal).data() + plan.transform_up(&mut analyze_internal).data() } fn name(&self) -> &str { @@ -49,7 +49,7 @@ impl AnalyzerRule for InlineTableScan { fn analyze_internal(plan: LogicalPlan) -> Result> { // rewrite any subqueries in the plan first let transformed_plan = - plan.map_subqueries(|plan| plan.transform_up(&analyze_internal))?; + plan.map_subqueries(|plan| plan.transform_up(&mut analyze_internal))?; let transformed_plan = transformed_plan.transform_data(|plan| { match plan { diff --git a/datafusion/optimizer/src/decorrelate.rs b/datafusion/optimizer/src/decorrelate.rs index 7eda45fb563c..884f001fce32 100644 --- a/datafusion/optimizer/src/decorrelate.rs +++ b/datafusion/optimizer/src/decorrelate.rs @@ -381,7 +381,7 @@ fn agg_exprs_evaluation_result_on_empty_batch( for e in agg_expr.iter() { let result_expr = e .clone() - .transform_up(&|expr| { + .transform_up(&mut |expr| { let new_expr = match expr { Expr::AggregateFunction(expr::AggregateFunction { func_def, .. @@ -429,7 +429,7 @@ fn proj_exprs_evaluation_result_on_empty_batch( for expr in proj_expr.iter() { let result_expr = expr .clone() - .transform_up(&|expr| { + .transform_up(&mut |expr| { if let Expr::Column(Column { name, .. }) = &expr { if let Some(result_expr) = input_expr_result_map_for_count_bug.get(name) @@ -468,7 +468,7 @@ fn filter_exprs_evaluation_result_on_empty_batch( ) -> Result> { let result_expr = filter_expr .clone() - .transform_up(&|expr| { + .transform_up(&mut |expr| { if let Expr::Column(Column { name, .. }) = &expr { if let Some(result_expr) = input_expr_result_map_for_count_bug.get(name) { Ok(Transformed::yes(result_expr.clone())) diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 2b123e3559f5..0caaad1d02d6 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -993,7 +993,7 @@ pub fn replace_cols_by_name( e: Expr, replace_map: &HashMap, ) -> Result { - e.transform_up(&|expr| { + e.transform_up(&mut |expr| { Ok(if let Expr::Column(c) = &expr { match replace_map.get(&c.flat_name()) { Some(new_c) => Transformed::yes(new_c.clone()), diff --git a/datafusion/optimizer/src/scalar_subquery_to_join.rs b/datafusion/optimizer/src/scalar_subquery_to_join.rs index a8999f9c1d3c..b6ca7f23b448 100644 --- a/datafusion/optimizer/src/scalar_subquery_to_join.rs +++ b/datafusion/optimizer/src/scalar_subquery_to_join.rs @@ -95,7 +95,7 @@ impl OptimizerRule for ScalarSubqueryToJoin { if !expr_check_map.is_empty() { rewrite_expr = rewrite_expr .clone() - .transform_up(&|expr| { + .transform_up(&mut |expr| { if let Expr::Column(col) = &expr { if let Some(map_expr) = expr_check_map.get(&col.name) @@ -152,7 +152,7 @@ impl OptimizerRule for ScalarSubqueryToJoin { { let new_expr = rewrite_expr .clone() - .transform_up(&|expr| { + .transform_up(&mut |expr| { if let Expr::Column(col) = &expr { if let Some(map_expr) = expr_check_map.get(&col.name) diff --git a/datafusion/physical-expr/src/equivalence/class.rs b/datafusion/physical-expr/src/equivalence/class.rs index 58519c61cf1f..4cc0a3e3d304 100644 --- a/datafusion/physical-expr/src/equivalence/class.rs +++ b/datafusion/physical-expr/src/equivalence/class.rs @@ -262,7 +262,7 @@ impl EquivalenceGroup { /// class it matches with (if any). pub fn normalize_expr(&self, expr: Arc) -> Arc { expr.clone() - .transform(&|expr| { + .transform(&mut |expr| { for cls in self.iter() { if cls.contains(&expr) { return Ok(Transformed::yes(cls.canonical_expr().unwrap())); @@ -452,7 +452,7 @@ impl EquivalenceGroup { // Rewrite rhs to point to the right side of the join: let new_rhs = rhs .clone() - .transform(&|expr| { + .transform(&mut |expr| { if let Some(column) = expr.as_any().downcast_ref::() { diff --git a/datafusion/physical-expr/src/equivalence/mod.rs b/datafusion/physical-expr/src/equivalence/mod.rs index fd8123c45b06..60aa242351bd 100644 --- a/datafusion/physical-expr/src/equivalence/mod.rs +++ b/datafusion/physical-expr/src/equivalence/mod.rs @@ -97,7 +97,7 @@ pub fn add_offset_to_expr( expr: Arc, offset: usize, ) -> Arc { - expr.transform_down(&|e| match e.as_any().downcast_ref::() { + expr.transform_down(&mut |e| match e.as_any().downcast_ref::() { Some(col) => Ok(Transformed::yes(Arc::new(Column::new( col.name(), offset + col.index(), diff --git a/datafusion/physical-expr/src/equivalence/projection.rs b/datafusion/physical-expr/src/equivalence/projection.rs index 92772e4623be..e5086b4f644a 100644 --- a/datafusion/physical-expr/src/equivalence/projection.rs +++ b/datafusion/physical-expr/src/equivalence/projection.rs @@ -59,7 +59,7 @@ impl ProjectionMapping { let target_expr = Arc::new(Column::new(name, expr_idx)) as _; expression .clone() - .transform_down(&|e| match e.as_any().downcast_ref::() { + .transform_down(&mut |e| match e.as_any().downcast_ref::() { Some(col) => { // Sometimes, an expression and its name in the input_schema // doesn't match. This can cause problems, so we make sure diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index 58ef5ec797b8..2d5247100225 100644 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -857,7 +857,7 @@ impl EquivalenceProperties { /// the given expression. pub fn get_expr_ordering(&self, expr: Arc) -> ExprOrdering { ExprOrdering::new_default(expr.clone()) - .transform_up(&|expr| Ok(update_ordering(expr, self))) + .transform_up(&mut |expr| Ok(update_ordering(expr, self))) .data() // Guaranteed to always return `Ok`. .unwrap() diff --git a/datafusion/physical-expr/src/expressions/case.rs b/datafusion/physical-expr/src/expressions/case.rs index 609349509b86..ed3b3f84ab8b 100644 --- a/datafusion/physical-expr/src/expressions/case.rs +++ b/datafusion/physical-expr/src/expressions/case.rs @@ -958,7 +958,7 @@ mod tests { let expr2 = expr .clone() - .transform(&|e| { + .transform(&mut |e| { let transformed = match e.as_any().downcast_ref::() { Some(lit_value) => match lit_value.value() { @@ -980,7 +980,7 @@ mod tests { let expr3 = expr .clone() - .transform_down(&|e| { + .transform_down(&mut |e| { let transformed = match e.as_any().downcast_ref::() { Some(lit_value) => match lit_value.value() { diff --git a/datafusion/physical-expr/src/utils/mod.rs b/datafusion/physical-expr/src/utils/mod.rs index a0d6436586a2..44994ec922bb 100644 --- a/datafusion/physical-expr/src/utils/mod.rs +++ b/datafusion/physical-expr/src/utils/mod.rs @@ -194,9 +194,7 @@ where constructor, }; // Use the builder to transform the expression tree node into a DAG. - let root = init - .transform_up_mut(&mut |node| builder.mutate(node)) - .data()?; + let root = init.transform_up(&mut |node| builder.mutate(node)).data()?; // Return a tuple containing the root node index and the DAG. Ok((root.data.unwrap(), builder.graph)) } @@ -224,7 +222,7 @@ pub fn reassign_predicate_columns( schema: &SchemaRef, ignore_not_found: bool, ) -> Result> { - pred.transform_down(&|expr| { + pred.transform_down(&mut |expr| { let expr_any = expr.as_any(); if let Some(column) = expr_any.downcast_ref::() { diff --git a/datafusion/physical-plan/src/joins/stream_join_utils.rs b/datafusion/physical-plan/src/joins/stream_join_utils.rs index 9824c723d9d1..7c7dd01e1b90 100644 --- a/datafusion/physical-plan/src/joins/stream_join_utils.rs +++ b/datafusion/physical-plan/src/joins/stream_join_utils.rs @@ -285,7 +285,7 @@ pub fn convert_sort_expr_with_filter_schema( // Since we are sure that one to one column mapping includes all columns, we convert // the sort expression into a filter expression. let converted_filter_expr = expr - .transform_up(&|p| { + .transform_up(&mut |p| { convert_filter_columns(p.as_ref(), &column_map).map(|transformed| { match transformed { Some(transformed) => Transformed::yes(transformed), diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index a3d20b97d1ab..f6ab027915f0 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -478,7 +478,7 @@ fn replace_on_columns_of_right_ordering( let new_expr = item .expr .clone() - .transform(&|e| { + .transform(&mut |e| { if e.eq(right_col) { Ok(Transformed::yes(left_col.clone())) } else { diff --git a/datafusion/physical-plan/src/recursive_query.rs b/datafusion/physical-plan/src/recursive_query.rs index ba7d1a54548a..67440242fe8e 100644 --- a/datafusion/physical-plan/src/recursive_query.rs +++ b/datafusion/physical-plan/src/recursive_query.rs @@ -325,7 +325,7 @@ fn assign_work_table( work_table: Arc, ) -> Result> { let mut work_table_refs = 0; - plan.transform_down_mut(&mut |plan| { + plan.transform_down(&mut |plan| { if let Some(exec) = plan.as_any().downcast_ref::() { if work_table_refs > 0 { not_impl_err!( @@ -353,7 +353,7 @@ fn assign_work_table( /// However, if the data of the left table is derived from the work table, it will become outdated /// as the work table changes. When the next iteration executes this plan again, we must clear the left table. fn reset_plan_states(plan: Arc) -> Result> { - plan.transform_up(&|plan| { + plan.transform_up(&mut |plan| { // WorkTableExec's states have already been updated correctly. if plan.as_any().is::() { Ok(Transformed::no(plan)) diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index 30eacdb44c4a..d3353774dcab 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -293,7 +293,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { data: transformed_expr, transformed, tnr: _, - } = expr.transform_up_mut(&mut |expr: Expr| { + } = expr.transform_up(&mut |expr: Expr| { if let Expr::Unnest(Unnest { expr: ref arg }) = expr { let column_name = expr.display_name()?; unnest_columns.push(column_name.clone()); diff --git a/datafusion/sql/src/unparser/utils.rs b/datafusion/sql/src/unparser/utils.rs index 9d098c494599..4d62e6b2b40d 100644 --- a/datafusion/sql/src/unparser/utils.rs +++ b/datafusion/sql/src/unparser/utils.rs @@ -60,7 +60,7 @@ pub(crate) fn find_agg_node_within_select( /// into an actual aggregate expression COUNT(*) as identified in the aggregate node. pub(crate) fn unproject_agg_exprs(expr: &Expr, agg: &Aggregate) -> Result { expr.clone() - .transform(&|sub_expr| { + .transform(&mut |sub_expr| { if let Expr::Column(c) = sub_expr { // find the column in the agg schmea if let Ok(n) = agg.schema.index_of_column(&c) { diff --git a/datafusion/sql/src/utils.rs b/datafusion/sql/src/utils.rs index d2f1982d5418..d59549232f4b 100644 --- a/datafusion/sql/src/utils.rs +++ b/datafusion/sql/src/utils.rs @@ -34,7 +34,7 @@ use sqlparser::ast::Ident; /// Make a best-effort attempt at resolving all columns in the expression tree pub(crate) fn resolve_columns(expr: &Expr, plan: &LogicalPlan) -> Result { expr.clone() - .transform_up(&|nested_expr| { + .transform_up(&mut |nested_expr| { match nested_expr { Expr::Column(col) => { let (qualifier, field) = @@ -72,7 +72,7 @@ pub(crate) fn rebase_expr( plan: &LogicalPlan, ) -> Result { expr.clone() - .transform_down(&|nested_expr| { + .transform_down(&mut |nested_expr| { if base_exprs.contains(&nested_expr) { Ok(Transformed::yes(expr_as_column_expr(&nested_expr, plan)?)) } else { @@ -178,7 +178,7 @@ pub(crate) fn resolve_aliases_to_exprs( aliases: &HashMap, ) -> Result { expr.clone() - .transform_up(&|nested_expr| match nested_expr { + .transform_up(&mut |nested_expr| match nested_expr { Expr::Column(c) if c.relation.is_none() => { if let Some(aliased_expr) = aliases.get(&c.name) { Ok(Transformed::yes(aliased_expr.clone())) From d781ca714fe1ec6be25aa20f6bb15fdfc1419641 Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Fri, 19 Apr 2024 11:35:13 +0200 Subject: [PATCH 2/2] Refactor `TreeNode` and `LogicalPlan` apply, transform, transform_up, transform_down and transform_down_up APIs to accept owned closures --- .../examples/function_factory.rs | 2 +- datafusion-examples/examples/rewrite_expr.rs | 6 +- datafusion/common/src/tree_node.rs | 87 +++++++++------ .../core/src/datasource/listing/helpers.rs | 2 +- .../physical_optimizer/coalesce_batches.rs | 2 +- .../combine_partial_final_agg.rs | 4 +- .../physical_optimizer/convert_first_last.rs | 2 +- .../enforce_distribution.rs | 12 +- .../src/physical_optimizer/enforce_sorting.rs | 20 ++-- .../src/physical_optimizer/join_selection.rs | 8 +- .../limited_distinct_aggregation.rs | 6 +- .../physical_optimizer/output_requirements.rs | 2 +- .../physical_optimizer/pipeline_checker.rs | 2 +- .../physical_optimizer/projection_pushdown.rs | 9 +- .../core/src/physical_optimizer/pruning.rs | 2 +- .../replace_with_order_preserving_variants.rs | 2 +- .../core/src/physical_optimizer/test_utils.rs | 2 +- .../physical_optimizer/topk_aggregation.rs | 6 +- .../user_defined_scalar_functions.rs | 2 +- datafusion/expr/src/expr.rs | 2 +- datafusion/expr/src/expr_rewriter/mod.rs | 16 +-- datafusion/expr/src/expr_rewriter/order_by.rs | 2 +- datafusion/expr/src/logical_plan/plan.rs | 14 +-- datafusion/expr/src/logical_plan/tree_node.rs | 105 ++++++++++++------ datafusion/expr/src/utils.rs | 10 +- .../src/analyzer/count_wildcard_rule.rs | 5 +- .../src/analyzer/function_rewrite.rs | 4 +- .../src/analyzer/inline_table_scan.rs | 4 +- datafusion/optimizer/src/analyzer/mod.rs | 4 +- datafusion/optimizer/src/analyzer/subquery.rs | 2 +- datafusion/optimizer/src/decorrelate.rs | 6 +- .../optimizer/src/optimize_projections.rs | 2 +- datafusion/optimizer/src/plan_signature.rs | 2 +- datafusion/optimizer/src/push_down_filter.rs | 6 +- .../optimizer/src/scalar_subquery_to_join.rs | 4 +- datafusion/optimizer/src/utils.rs | 2 +- .../physical-expr/src/equivalence/class.rs | 4 +- .../physical-expr/src/equivalence/mod.rs | 2 +- .../src/equivalence/projection.rs | 2 +- .../src/equivalence/properties.rs | 2 +- .../physical-expr/src/expressions/case.rs | 4 +- datafusion/physical-expr/src/utils/mod.rs | 6 +- .../src/joins/stream_join_utils.rs | 2 +- datafusion/physical-plan/src/joins/utils.rs | 2 +- .../physical-plan/src/recursive_query.rs | 4 +- datafusion/sql/src/cte.rs | 2 +- datafusion/sql/src/select.rs | 2 +- datafusion/sql/src/unparser/utils.rs | 2 +- datafusion/sql/src/utils.rs | 6 +- 49 files changed, 229 insertions(+), 179 deletions(-) diff --git a/datafusion-examples/examples/function_factory.rs b/datafusion-examples/examples/function_factory.rs index 810361fa9c21..3973e50474ba 100644 --- a/datafusion-examples/examples/function_factory.rs +++ b/datafusion-examples/examples/function_factory.rs @@ -164,7 +164,7 @@ impl ScalarUDFImpl for ScalarFunctionWrapper { impl ScalarFunctionWrapper { // replaces placeholders such as $1 with actual arguments (args[0] fn replacement(expr: &Expr, args: &[Expr]) -> Result { - let result = expr.clone().transform(&mut |e| { + let result = expr.clone().transform(|e| { let r = match e { Expr::Placeholder(placeholder) => { let placeholder_position = diff --git a/datafusion-examples/examples/rewrite_expr.rs b/datafusion-examples/examples/rewrite_expr.rs index b8abc93916ab..9b94a71a501c 100644 --- a/datafusion-examples/examples/rewrite_expr.rs +++ b/datafusion-examples/examples/rewrite_expr.rs @@ -91,7 +91,7 @@ impl AnalyzerRule for MyAnalyzerRule { impl MyAnalyzerRule { fn analyze_plan(plan: LogicalPlan) -> Result { - plan.transform(&mut |plan| { + plan.transform(|plan| { Ok(match plan { LogicalPlan::Filter(filter) => { let predicate = Self::analyze_expr(filter.predicate.clone())?; @@ -107,7 +107,7 @@ impl MyAnalyzerRule { } fn analyze_expr(expr: Expr) -> Result { - expr.transform(&mut |expr| { + expr.transform(|expr| { // closure is invoked for all sub expressions Ok(match expr { Expr::Literal(ScalarValue::Int64(i)) => { @@ -163,7 +163,7 @@ impl OptimizerRule for MyOptimizerRule { /// use rewrite_expr to modify the expression tree. fn my_rewrite(expr: Expr) -> Result { - expr.transform(&mut |expr| { + expr.transform(|expr| { // closure is invoked for all sub expressions Ok(match expr { Expr::Between(Between { diff --git a/datafusion/common/src/tree_node.rs b/datafusion/common/src/tree_node.rs index 1e347b63a53d..f41d264d35c0 100644 --- a/datafusion/common/src/tree_node.rs +++ b/datafusion/common/src/tree_node.rs @@ -31,18 +31,6 @@ macro_rules! handle_transform_recursion { }}; } -macro_rules! handle_transform_recursion_down { - ($F_DOWN:expr, $F_CHILD:expr) => {{ - $F_DOWN?.transform_children(|n| n.map_children($F_CHILD)) - }}; -} - -macro_rules! handle_transform_recursion_up { - ($SELF:expr, $F_CHILD:expr, $F_UP:expr) => {{ - $SELF.map_children($F_CHILD)?.transform_parent(|n| $F_UP(n)) - }}; -} - /// Defines a visitable and rewriteable tree node. This trait is implemented /// for plans ([`ExecutionPlan`] and [`LogicalPlan`]) as well as expression /// trees ([`PhysicalExpr`], [`Expr`]) in DataFusion. @@ -137,9 +125,16 @@ pub trait TreeNode: Sized { /// or run a check on the tree. fn apply Result>( &self, - f: &mut F, + mut f: F, ) -> Result { - f(self)?.visit_children(|| self.apply_children(|c| c.apply(f))) + fn apply_impl Result>( + node: &N, + f: &mut F, + ) -> Result { + f(node)?.visit_children(|| node.apply_children(|c| apply_impl(c, f))) + } + + apply_impl(self, &mut f) } /// Convenience utility for writing optimizer rules: Recursively apply the @@ -147,7 +142,7 @@ pub trait TreeNode: Sized { /// `f` does not apply to a given node, it is left unchanged. fn transform Result>>( self, - f: &mut F, + f: F, ) -> Result> { self.transform_up(f) } @@ -157,9 +152,16 @@ pub trait TreeNode: Sized { /// When `f` does not apply to a given node, it is left unchanged. fn transform_down Result>>( self, - f: &mut F, + mut f: F, ) -> Result> { - handle_transform_recursion_down!(f(self), |c| c.transform_down(f)) + fn transform_down_impl Result>>( + node: N, + f: &mut F, + ) -> Result> { + f(node)?.transform_children(|n| n.map_children(|c| transform_down_impl(c, f))) + } + + transform_down_impl(self, &mut f) } /// Convenience utility for writing optimizer rules: Recursively apply the @@ -179,9 +181,17 @@ pub trait TreeNode: Sized { /// left unchanged. fn transform_up Result>>( self, - f: &mut F, + mut f: F, ) -> Result> { - handle_transform_recursion_up!(self, |c| c.transform_up(f), f) + fn transform_up_impl Result>>( + node: N, + f: &mut F, + ) -> Result> { + node.map_children(|c| transform_up_impl(c, f))? + .transform_parent(f) + } + + transform_up_impl(self, &mut f) } /// Convenience utility for writing optimizer rules: Recursively apply the @@ -290,14 +300,26 @@ pub trait TreeNode: Sized { FU: FnMut(Self) -> Result>, >( self, - f_down: &mut FD, - f_up: &mut FU, + mut f_down: FD, + mut f_up: FU, ) -> Result> { - handle_transform_recursion!( - f_down(self), - |c| c.transform_down_up(f_down, f_up), - f_up - ) + fn transform_down_up_impl< + N: TreeNode, + FD: FnMut(N) -> Result>, + FU: FnMut(N) -> Result>, + >( + node: N, + f_down: &mut FD, + f_up: &mut FU, + ) -> Result> { + handle_transform_recursion!( + f_down(node), + |c| transform_down_up_impl(c, f_down, f_up), + f_up + ) + } + + transform_down_up_impl(self, &mut f_down, &mut f_up) } /// Returns true if `f` returns true for node in the tree. @@ -305,7 +327,7 @@ pub trait TreeNode: Sized { /// Stops recursion as soon as a matching node is found fn exists bool>(&self, mut f: F) -> bool { let mut found = false; - self.apply(&mut |n| { + self.apply(|n| { Ok(if f(n) { found = true; TreeNodeRecursion::Stop @@ -1362,7 +1384,7 @@ mod tests { fn $NAME() -> Result<()> { let tree = test_tree(); let mut visits = vec![]; - tree.apply(&mut |node| { + tree.apply(|node| { visits.push(format!("f_down({})", node.data)); $F(node) })?; @@ -1451,10 +1473,7 @@ mod tests { #[test] fn $NAME() -> Result<()> { let tree = test_tree(); - assert_eq!( - tree.transform_down_up(&mut $F_DOWN, &mut $F_UP,)?, - $EXPECTED_TREE - ); + assert_eq!(tree.transform_down_up($F_DOWN, $F_UP,)?, $EXPECTED_TREE); Ok(()) } @@ -1466,7 +1485,7 @@ mod tests { #[test] fn $NAME() -> Result<()> { let tree = test_tree(); - assert_eq!(tree.transform_down(&mut $F)?, $EXPECTED_TREE); + assert_eq!(tree.transform_down($F)?, $EXPECTED_TREE); Ok(()) } @@ -1478,7 +1497,7 @@ mod tests { #[test] fn $NAME() -> Result<()> { let tree = test_tree(); - assert_eq!(tree.transform_up(&mut $F)?, $EXPECTED_TREE); + assert_eq!(tree.transform_up($F)?, $EXPECTED_TREE); Ok(()) } diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs index f97d465c442b..22ce1ba50268 100644 --- a/datafusion/core/src/datasource/listing/helpers.rs +++ b/datafusion/core/src/datasource/listing/helpers.rs @@ -50,7 +50,7 @@ use object_store::{ObjectMeta, ObjectStore}; /// was performed pub fn expr_applicable_for_cols(col_names: &[String], expr: &Expr) -> bool { let mut is_applicable = true; - expr.apply(&mut |expr| { + expr.apply(|expr| { match expr { Expr::Column(Column { ref name, .. }) => { is_applicable &= col_names.contains(name); diff --git a/datafusion/core/src/physical_optimizer/coalesce_batches.rs b/datafusion/core/src/physical_optimizer/coalesce_batches.rs index b0b84aa6947b..0ba467f7b620 100644 --- a/datafusion/core/src/physical_optimizer/coalesce_batches.rs +++ b/datafusion/core/src/physical_optimizer/coalesce_batches.rs @@ -54,7 +54,7 @@ impl PhysicalOptimizerRule for CoalesceBatches { } let target_batch_size = config.execution.batch_size; - plan.transform_up(&mut |plan| { + plan.transform_up(|plan| { let plan_any = plan.as_any(); // The goal here is to detect operators that could produce small batches and only // wrap those ones with a CoalesceBatchesExec operator. An alternate approach here diff --git a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs index 4582d7293873..3d8f89d56919 100644 --- a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs +++ b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs @@ -51,7 +51,7 @@ impl PhysicalOptimizerRule for CombinePartialFinalAggregate { plan: Arc, _config: &ConfigOptions, ) -> Result> { - plan.transform_down(&mut |plan| { + plan.transform_down(|plan| { let transformed = plan.as_any() .downcast_ref::() @@ -179,7 +179,7 @@ fn normalize_group_exprs(group_exprs: GroupExprsRef) -> GroupExprs { fn discard_column_index(group_expr: Arc) -> Arc { group_expr .clone() - .transform(&mut |expr| { + .transform(|expr| { let normalized_form: Option> = match expr.as_any().downcast_ref::() { Some(column) => Some(Arc::new(Column::new(column.name(), 0))), diff --git a/datafusion/core/src/physical_optimizer/convert_first_last.rs b/datafusion/core/src/physical_optimizer/convert_first_last.rs index c9cec4fe38d6..14860eecf189 100644 --- a/datafusion/core/src/physical_optimizer/convert_first_last.rs +++ b/datafusion/core/src/physical_optimizer/convert_first_last.rs @@ -60,7 +60,7 @@ impl PhysicalOptimizerRule for OptimizeAggregateOrder { plan: Arc, _config: &ConfigOptions, ) -> Result> { - plan.transform_up(&mut get_common_requirement_of_aggregate_input) + plan.transform_up(get_common_requirement_of_aggregate_input) .data() } diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index 98a1ffae7fd2..1432a814ba3c 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -197,12 +197,12 @@ impl PhysicalOptimizerRule for EnforceDistribution { // Run a top-down process to adjust input key ordering recursively let plan_requirements = PlanWithKeyRequirements::new_default(plan); let adjusted = plan_requirements - .transform_down(&mut adjust_input_keys_ordering) + .transform_down(adjust_input_keys_ordering) .data()?; adjusted.plan } else { // Run a bottom-up process - plan.transform_up(&mut |plan| { + plan.transform_up(|plan| { Ok(Transformed::yes(reorder_join_keys_to_inputs(plan)?)) }) .data()? @@ -211,7 +211,7 @@ impl PhysicalOptimizerRule for EnforceDistribution { let distribution_context = DistributionContext::new_default(adjusted); // Distribution enforcement needs to be applied bottom-up. let distribution_context = distribution_context - .transform_up(&mut |distribution_context| { + .transform_up(|distribution_context| { ensure_distribution(distribution_context, config) }) .data()?; @@ -1768,14 +1768,14 @@ pub(crate) mod tests { let plan_requirements = PlanWithKeyRequirements::new_default($PLAN.clone()); let adjusted = plan_requirements - .transform_down(&mut adjust_input_keys_ordering) + .transform_down(adjust_input_keys_ordering) .data() .and_then(check_integrity)?; // TODO: End state payloads will be checked here. adjusted.plan } else { // Run reorder_join_keys_to_inputs rule - $PLAN.clone().transform_up(&mut |plan| { + $PLAN.clone().transform_up(|plan| { Ok(Transformed::yes(reorder_join_keys_to_inputs(plan)?)) }) .data()? @@ -1783,7 +1783,7 @@ pub(crate) mod tests { // Then run ensure_distribution rule DistributionContext::new_default(adjusted) - .transform_up(&mut |distribution_context| { + .transform_up(|distribution_context| { ensure_distribution(distribution_context, &config) }) .data() diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs index c6a8890e27c3..2dced0de6adb 100644 --- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs @@ -160,12 +160,12 @@ impl PhysicalOptimizerRule for EnforceSorting { let plan_requirements = PlanWithCorrespondingSort::new_default(plan); // Execute a bottom-up traversal to enforce sorting requirements, // remove unnecessary sorts, and optimize sort-sensitive operators: - let adjusted = plan_requirements.transform_up(&mut ensure_sorting)?.data; + let adjusted = plan_requirements.transform_up(ensure_sorting)?.data; let new_plan = if config.optimizer.repartition_sorts { let plan_with_coalesce_partitions = PlanWithCorrespondingCoalescePartitions::new_default(adjusted.plan); let parallel = plan_with_coalesce_partitions - .transform_up(&mut parallelize_sorts) + .transform_up(parallelize_sorts) .data()?; parallel.plan } else { @@ -174,7 +174,7 @@ impl PhysicalOptimizerRule for EnforceSorting { let plan_with_pipeline_fixer = OrderPreservationContext::new_default(new_plan); let updated_plan = plan_with_pipeline_fixer - .transform_up(&mut |plan_with_pipeline_fixer| { + .transform_up(|plan_with_pipeline_fixer| { replace_with_order_preserving_variants( plan_with_pipeline_fixer, false, @@ -188,13 +188,11 @@ impl PhysicalOptimizerRule for EnforceSorting { // missed by the bottom-up traversal: let mut sort_pushdown = SortPushDown::new_default(updated_plan.plan); assign_initial_requirements(&mut sort_pushdown); - let adjusted = sort_pushdown.transform_down(&mut pushdown_sorts)?.data; + let adjusted = sort_pushdown.transform_down(pushdown_sorts)?.data; adjusted .plan - .transform_up(&mut |plan| { - Ok(Transformed::yes(replace_with_partial_sort(plan)?)) - }) + .transform_up(|plan| Ok(Transformed::yes(replace_with_partial_sort(plan)?))) .data() } @@ -683,7 +681,7 @@ mod tests { { let plan_requirements = PlanWithCorrespondingSort::new_default($PLAN.clone()); let adjusted = plan_requirements - .transform_up(&mut ensure_sorting) + .transform_up(ensure_sorting) .data() .and_then(check_integrity)?; // TODO: End state payloads will be checked here. @@ -692,7 +690,7 @@ mod tests { let plan_with_coalesce_partitions = PlanWithCorrespondingCoalescePartitions::new_default(adjusted.plan); let parallel = plan_with_coalesce_partitions - .transform_up(&mut parallelize_sorts) + .transform_up(parallelize_sorts) .data() .and_then(check_integrity)?; // TODO: End state payloads will be checked here. @@ -703,7 +701,7 @@ mod tests { let plan_with_pipeline_fixer = OrderPreservationContext::new_default(new_plan); let updated_plan = plan_with_pipeline_fixer - .transform_up(&mut |plan_with_pipeline_fixer| { + .transform_up(|plan_with_pipeline_fixer| { replace_with_order_preserving_variants( plan_with_pipeline_fixer, false, @@ -718,7 +716,7 @@ mod tests { let mut sort_pushdown = SortPushDown::new_default(updated_plan.plan); assign_initial_requirements(&mut sort_pushdown); sort_pushdown - .transform_down(&mut pushdown_sorts) + .transform_down(pushdown_sorts) .data() .and_then(check_integrity)?; // TODO: End state payloads will be checked here. diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs b/datafusion/core/src/physical_optimizer/join_selection.rs index fa08a210cc6a..2872340e56eb 100644 --- a/datafusion/core/src/physical_optimizer/join_selection.rs +++ b/datafusion/core/src/physical_optimizer/join_selection.rs @@ -263,7 +263,7 @@ impl PhysicalOptimizerRule for JoinSelection { Box::new(hash_join_swap_subrule), ]; let new_plan = plan - .transform_up(&mut |p| apply_subrules(p, &subrules, config)) + .transform_up(|p| apply_subrules(p, &subrules, config)) .data()?; // Next, we apply another subrule that tries to optimize joins using any // statistics their inputs might have. @@ -280,7 +280,7 @@ impl PhysicalOptimizerRule for JoinSelection { let collect_threshold_byte_size = config.hash_join_single_partition_threshold; let collect_threshold_num_rows = config.hash_join_single_partition_threshold_rows; new_plan - .transform_up(&mut |plan| { + .transform_up(|plan| { statistical_join_selection_subrule( plan, collect_threshold_byte_size, @@ -842,13 +842,13 @@ mod tests_statistical { Box::new(hash_join_swap_subrule), ]; let new_plan = plan - .transform_up(&mut |p| apply_subrules(p, &subrules, &ConfigOptions::new())) + .transform_up(|p| apply_subrules(p, &subrules, &ConfigOptions::new())) .data()?; // TODO: End state payloads will be checked here. let config = ConfigOptions::new().optimizer; let collect_left_threshold = config.hash_join_single_partition_threshold; let collect_threshold_num_rows = config.hash_join_single_partition_threshold_rows; - let _ = new_plan.transform_up(&mut |plan| { + let _ = new_plan.transform_up(|plan| { statistical_join_selection_subrule( plan, collect_left_threshold, diff --git a/datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs b/datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs index db4ea3396eed..dbdcfed2ae2f 100644 --- a/datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs +++ b/datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs @@ -107,7 +107,7 @@ impl LimitedDistinctAggregation { let mut found_match_aggr = false; let mut rewrite_applicable = true; - let mut closure = |plan: Arc| { + let closure = |plan: Arc| { if !rewrite_applicable { return Ok(Transformed::no(plan)); } @@ -138,7 +138,7 @@ impl LimitedDistinctAggregation { rewrite_applicable = false; Ok(Transformed::no(plan)) }; - let child = child.clone().transform_down(&mut closure).data().ok()?; + let child = child.clone().transform_down(closure).data().ok()?; if is_global_limit { return Some(Arc::new(GlobalLimitExec::new( child, @@ -163,7 +163,7 @@ impl PhysicalOptimizerRule for LimitedDistinctAggregation { config: &ConfigOptions, ) -> Result> { if config.optimizer.enable_distinct_aggregation_soft_limit { - plan.transform_down(&mut |plan| { + plan.transform_down(|plan| { Ok( if let Some(plan) = LimitedDistinctAggregation::transform_limit(plan.clone()) diff --git a/datafusion/core/src/physical_optimizer/output_requirements.rs b/datafusion/core/src/physical_optimizer/output_requirements.rs index 7be536559d8c..5bf86e88d646 100644 --- a/datafusion/core/src/physical_optimizer/output_requirements.rs +++ b/datafusion/core/src/physical_optimizer/output_requirements.rs @@ -198,7 +198,7 @@ impl PhysicalOptimizerRule for OutputRequirements { match self.mode { RuleMode::Add => require_top_ordering(plan), RuleMode::Remove => plan - .transform_up(&mut |plan| { + .transform_up(|plan| { if let Some(sort_req) = plan.as_any().downcast_ref::() { diff --git a/datafusion/core/src/physical_optimizer/pipeline_checker.rs b/datafusion/core/src/physical_optimizer/pipeline_checker.rs index f853327bb395..5c6a0ab8ea7f 100644 --- a/datafusion/core/src/physical_optimizer/pipeline_checker.rs +++ b/datafusion/core/src/physical_optimizer/pipeline_checker.rs @@ -50,7 +50,7 @@ impl PhysicalOptimizerRule for PipelineChecker { plan: Arc, config: &ConfigOptions, ) -> Result> { - plan.transform_up(&mut |p| check_finiteness_requirements(p, &config.optimizer)) + plan.transform_up(|p| check_finiteness_requirements(p, &config.optimizer)) .data() } diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs b/datafusion/core/src/physical_optimizer/projection_pushdown.rs index e0da9b1dbacc..fc5600ce9aa1 100644 --- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs @@ -75,8 +75,7 @@ impl PhysicalOptimizerRule for ProjectionPushdown { plan: Arc, _config: &ConfigOptions, ) -> Result> { - plan.transform_down(&mut remove_unnecessary_projections) - .data() + plan.transform_down(remove_unnecessary_projections).data() } fn name(&self) -> &str { @@ -274,7 +273,7 @@ fn try_unifying_projections( // Collect the column references usage in the outer projection. projection.expr().iter().for_each(|(expr, _)| { - expr.apply(&mut |expr| { + expr.apply(|expr| { Ok({ if let Some(column) = expr.as_any().downcast_ref::() { *column_ref_map.entry(column.clone()).or_default() += 1; @@ -978,7 +977,7 @@ fn update_expr( let new_expr = expr .clone() - .transform_up(&mut |expr: Arc| { + .transform_up(|expr: Arc| { if state == RewriteState::RewrittenInvalid { return Ok(Transformed::no(expr)); } @@ -1121,7 +1120,7 @@ fn new_columns_for_join_on( // Rewrite all columns in `on` (*on) .clone() - .transform(&mut |expr| { + .transform(|expr| { if let Some(column) = expr.as_any().downcast_ref::() { // Find the column in the projection expressions let new_column = projection_exprs diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs index 6b0aba541613..a8002ae4de20 100644 --- a/datafusion/core/src/physical_optimizer/pruning.rs +++ b/datafusion/core/src/physical_optimizer/pruning.rs @@ -1175,7 +1175,7 @@ fn rewrite_column_expr( column_old: &phys_expr::Column, column_new: &phys_expr::Column, ) -> Result> { - e.transform(&mut |expr| { + e.transform(|expr| { if let Some(column) = expr.as_any().downcast_ref::() { if column == column_old { return Ok(Transformed::yes(Arc::new(column_new.clone()))); diff --git a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs index 6db431c0aa58..b438e40ece02 100644 --- a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs +++ b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs @@ -395,7 +395,7 @@ mod tests { // Run the rule top-down let config = SessionConfig::new().with_prefer_existing_sort($PREFER_EXISTING_SORT); let plan_with_pipeline_fixer = OrderPreservationContext::new_default(physical_plan); - let parallel = plan_with_pipeline_fixer.transform_up(&mut |plan_with_pipeline_fixer| replace_with_order_preserving_variants(plan_with_pipeline_fixer, false, false, config.options())).data().and_then(check_integrity)?; + let parallel = plan_with_pipeline_fixer.transform_up(|plan_with_pipeline_fixer| replace_with_order_preserving_variants(plan_with_pipeline_fixer, false, false, config.options())).data().and_then(check_integrity)?; let optimized_physical_plan = parallel.plan; // Get string representation of the plan diff --git a/datafusion/core/src/physical_optimizer/test_utils.rs b/datafusion/core/src/physical_optimizer/test_utils.rs index 6c75c3490eb4..7bc1eeb7c4a5 100644 --- a/datafusion/core/src/physical_optimizer/test_utils.rs +++ b/datafusion/core/src/physical_optimizer/test_utils.rs @@ -380,7 +380,7 @@ pub fn sort_exec( /// replaced with direct plan equality checks. pub fn check_integrity(context: PlanContext) -> Result> { context - .transform_up(&mut |node| { + .transform_up(|node| { let children_plans = node.plan.children(); assert_eq!(node.children.len(), children_plans.len()); for (child_plan, child_node) in diff --git a/datafusion/core/src/physical_optimizer/topk_aggregation.rs b/datafusion/core/src/physical_optimizer/topk_aggregation.rs index f7f0285fcb93..95f7067cbe1b 100644 --- a/datafusion/core/src/physical_optimizer/topk_aggregation.rs +++ b/datafusion/core/src/physical_optimizer/topk_aggregation.rs @@ -102,7 +102,7 @@ impl TopKAggregation { }; let mut cardinality_preserved = true; - let mut closure = |plan: Arc| { + let closure = |plan: Arc| { if !cardinality_preserved { return Ok(Transformed::no(plan)); } @@ -120,7 +120,7 @@ impl TopKAggregation { } Ok(Transformed::no(plan)) }; - let child = child.clone().transform_down(&mut closure).data().ok()?; + let child = child.clone().transform_down(closure).data().ok()?; let sort = SortExec::new(sort.expr().to_vec(), child) .with_fetch(sort.fetch()) .with_preserve_partitioning(sort.preserve_partitioning()); @@ -141,7 +141,7 @@ impl PhysicalOptimizerRule for TopKAggregation { config: &ConfigOptions, ) -> Result> { if config.optimizer.enable_topk_aggregation { - plan.transform_down(&mut |plan| { + plan.transform_down(|plan| { Ok( if let Some(plan) = TopKAggregation::transform_sort(plan.clone()) { Transformed::yes(plan) diff --git a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs index efbe390c0933..e31a1081621a 100644 --- a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs +++ b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs @@ -817,7 +817,7 @@ impl ScalarUDFImpl for ScalarFunctionWrapper { impl ScalarFunctionWrapper { // replaces placeholders with actual arguments fn replacement(expr: &Expr, args: &[Expr]) -> Result { - let result = expr.clone().transform(&mut |e| { + let result = expr.clone().transform(|e| { let r = match e { Expr::Placeholder(placeholder) => { let placeholder_position = diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs index f8c3d2b622fb..50eac105f5fe 100644 --- a/datafusion/expr/src/expr.rs +++ b/datafusion/expr/src/expr.rs @@ -1250,7 +1250,7 @@ impl Expr { /// For example, gicen an expression like ` = $0` will infer `$0` to /// have type `int32`. pub fn infer_placeholder_types(self, schema: &DFSchema) -> Result { - self.transform(&mut |mut expr| { + self.transform(|mut expr| { // Default to assuming the arguments are the same type if let Expr::BinaryExpr(BinaryExpr { left, op: _, right }) = &mut expr { rewrite_placeholder(left.as_mut(), right.as_ref(), schema)?; diff --git a/datafusion/expr/src/expr_rewriter/mod.rs b/datafusion/expr/src/expr_rewriter/mod.rs index 96e048fa583b..f5779df812f1 100644 --- a/datafusion/expr/src/expr_rewriter/mod.rs +++ b/datafusion/expr/src/expr_rewriter/mod.rs @@ -62,7 +62,7 @@ pub trait FunctionRewrite { /// Recursively call [`Column::normalize_with_schemas`] on all [`Column`] expressions /// in the `expr` expression tree. pub fn normalize_col(expr: Expr, plan: &LogicalPlan) -> Result { - expr.transform(&mut |expr| { + expr.transform(|expr| { Ok({ if let Expr::Column(c) = expr { let col = LogicalPlanBuilder::normalize(plan, c)?; @@ -91,7 +91,7 @@ pub fn normalize_col_with_schemas_and_ambiguity_check( return Ok(Expr::Unnest(Unnest { expr: Box::new(e) })); } - expr.transform(&mut |expr| { + expr.transform(|expr| { Ok({ if let Expr::Column(c) = expr { let col = @@ -119,7 +119,7 @@ pub fn normalize_cols( /// Recursively replace all [`Column`] expressions in a given expression tree with /// `Column` expressions provided by the hash map argument. pub fn replace_col(expr: Expr, replace_map: &HashMap<&Column, &Column>) -> Result { - expr.transform(&mut |expr| { + expr.transform(|expr| { Ok({ if let Expr::Column(c) = &expr { match replace_map.get(c) { @@ -140,7 +140,7 @@ pub fn replace_col(expr: Expr, replace_map: &HashMap<&Column, &Column>) -> Resul /// For example, if there were expressions like `foo.bar` this would /// rewrite it to just `bar`. pub fn unnormalize_col(expr: Expr) -> Expr { - expr.transform(&mut |expr| { + expr.transform(|expr| { Ok({ if let Expr::Column(c) = expr { let col = Column { @@ -190,7 +190,7 @@ pub fn unnormalize_cols(exprs: impl IntoIterator) -> Vec { /// Recursively remove all the ['OuterReferenceColumn'] and return the inside Column /// in the expression tree. pub fn strip_outer_reference(expr: Expr) -> Expr { - expr.transform(&mut |expr| { + expr.transform(|expr| { Ok({ if let Expr::OuterReferenceColumn(_, col) = expr { Transformed::yes(Expr::Column(col)) @@ -318,7 +318,7 @@ mod test { #[test] fn rewriter_rewrite() { // rewrites all "foo" string literals to "bar" - let mut transformer = |expr: Expr| -> Result> { + let transformer = |expr: Expr| -> Result> { match expr { Expr::Literal(ScalarValue::Utf8(Some(utf8_val))) => { let utf8_val = if utf8_val == "foo" { @@ -336,7 +336,7 @@ mod test { // rewrites "foo" --> "bar" let rewritten = col("state") .eq(lit("foo")) - .transform(&mut transformer) + .transform(transformer) .data() .unwrap(); assert_eq!(rewritten, col("state").eq(lit("bar"))); @@ -344,7 +344,7 @@ mod test { // doesn't rewrite let rewritten = col("state") .eq(lit("baz")) - .transform(&mut transformer) + .transform(transformer) .data() .unwrap(); assert_eq!(rewritten, col("state").eq(lit("baz"))); diff --git a/datafusion/expr/src/expr_rewriter/order_by.rs b/datafusion/expr/src/expr_rewriter/order_by.rs index dff179f3db00..eb38fee7cad0 100644 --- a/datafusion/expr/src/expr_rewriter/order_by.rs +++ b/datafusion/expr/src/expr_rewriter/order_by.rs @@ -84,7 +84,7 @@ fn rewrite_in_terms_of_projection( ) -> Result { // assumption is that each item in exprs, such as "b + c" is // available as an output column named "b + c" - expr.transform(&mut |expr| { + expr.transform(|expr| { // search for unnormalized names first such as "c1" (such as aliases) if let Some(found) = proj_exprs.iter().find(|a| (**a) == expr) { let col = Expr::Column( diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index c5dd1f04641d..97f5e22287ea 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -358,7 +358,7 @@ impl LogicalPlan { pub fn using_columns(&self) -> Result>, DataFusionError> { let mut using_columns: Vec> = vec![]; - self.apply_with_subqueries(&mut |plan| { + self.apply_with_subqueries(|plan| { if let LogicalPlan::Join(Join { join_constraint: JoinConstraint::Using, on, @@ -554,7 +554,7 @@ impl LogicalPlan { // AND lineitem.l_quantity < Decimal128(Some(2400),15,2) let predicate = predicate - .transform_down(&mut |expr| { + .transform_down(|expr| { match expr { Expr::Exists { .. } | Expr::ScalarSubquery(_) @@ -1017,10 +1017,10 @@ impl LogicalPlan { self, param_values: &ParamValues, ) -> Result { - self.transform_up_with_subqueries(&mut |plan| { + self.transform_up_with_subqueries(|plan| { let schema = plan.schema().clone(); plan.map_expressions(|e| { - e.infer_placeholder_types(&schema)?.transform_up(&mut |e| { + e.infer_placeholder_types(&schema)?.transform_up(|e| { if let Expr::Placeholder(Placeholder { id, .. }) = e { let value = param_values.get_placeholders_with_values(&id)?; Ok(Transformed::yes(Expr::Literal(value))) @@ -1039,9 +1039,9 @@ impl LogicalPlan { ) -> Result>, DataFusionError> { let mut param_types: HashMap> = HashMap::new(); - self.apply_with_subqueries(&mut |plan| { + self.apply_with_subqueries(|plan| { plan.apply_expressions(|expr| { - expr.apply(&mut |expr| { + expr.apply(|expr| { if let Expr::Placeholder(Placeholder { id, data_type }) = expr { let prev = param_types.get(id); match (prev, data_type) { @@ -3170,7 +3170,7 @@ digraph { // after transformation, because plan is not the same anymore, // the parent plan is built again with call to LogicalPlan::with_new_inputs -> with_new_exprs let plan = plan - .transform(&mut |plan| match plan { + .transform(|plan| match plan { LogicalPlan::TableScan(table) => { let filter = Filter::try_new( external_filter.clone(), diff --git a/datafusion/expr/src/logical_plan/tree_node.rs b/datafusion/expr/src/logical_plan/tree_node.rs index 22a369579950..f5db5a2704bc 100644 --- a/datafusion/expr/src/logical_plan/tree_node.rs +++ b/datafusion/expr/src/logical_plan/tree_node.rs @@ -431,23 +431,6 @@ macro_rules! handle_transform_recursion { }}; } -macro_rules! handle_transform_recursion_down { - ($F_DOWN:expr, $F_CHILD:expr) => {{ - $F_DOWN? - .transform_children(|n| n.map_subqueries($F_CHILD))? - .transform_sibling(|n| n.map_children($F_CHILD)) - }}; -} - -macro_rules! handle_transform_recursion_up { - ($SELF:expr, $F_CHILD:expr, $F_UP:expr) => {{ - $SELF - .map_subqueries($F_CHILD)? - .transform_sibling(|n| n.map_children($F_CHILD))? - .transform_parent(|n| $F_UP(n)) - }}; -} - impl LogicalPlan { /// Calls `f` on all expressions in the current `LogicalPlan` node. /// @@ -787,11 +770,24 @@ impl LogicalPlan { /// ...)`. pub fn apply_with_subqueries Result>( &self, - f: &mut F, + mut f: F, ) -> Result { - f(self)? - .visit_children(|| self.apply_subqueries(|c| c.apply_with_subqueries(f)))? - .visit_sibling(|| self.apply_children(|c| c.apply_with_subqueries(f))) + fn apply_with_subqueries_impl< + F: FnMut(&LogicalPlan) -> Result, + >( + node: &LogicalPlan, + f: &mut F, + ) -> Result { + f(node)? + .visit_children(|| { + node.apply_subqueries(|c| apply_with_subqueries_impl(c, f)) + })? + .visit_sibling(|| { + node.apply_children(|c| apply_with_subqueries_impl(c, f)) + }) + } + + apply_with_subqueries_impl(self, &mut f) } /// Similarly to [`Self::transform`], rewrites this node and its inputs using `f`, @@ -799,7 +795,7 @@ impl LogicalPlan { /// ...)`. pub fn transform_with_subqueries Result>>( self, - f: &mut F, + f: F, ) -> Result> { self.transform_up_with_subqueries(f) } @@ -809,9 +805,24 @@ impl LogicalPlan { /// ...)`. pub fn transform_down_with_subqueries Result>>( self, - f: &mut F, + mut f: F, ) -> Result> { - handle_transform_recursion_down!(f(self), |c| c.transform_down_with_subqueries(f)) + fn transform_down_with_subqueries_impl< + F: FnMut(LogicalPlan) -> Result>, + >( + node: LogicalPlan, + f: &mut F, + ) -> Result> { + f(node)? + .transform_children(|n| { + n.map_subqueries(|c| transform_down_with_subqueries_impl(c, f)) + })? + .transform_sibling(|n| { + n.map_children(|c| transform_down_with_subqueries_impl(c, f)) + }) + } + + transform_down_with_subqueries_impl(self, &mut f) } /// Similarly to [`Self::transform_up`], rewrites this node and its inputs using `f`, @@ -819,9 +830,22 @@ impl LogicalPlan { /// ...)`. pub fn transform_up_with_subqueries Result>>( self, - f: &mut F, + mut f: F, ) -> Result> { - handle_transform_recursion_up!(self, |c| c.transform_up_with_subqueries(f), f) + fn transform_up_with_subqueries_impl< + F: FnMut(LogicalPlan) -> Result>, + >( + node: LogicalPlan, + f: &mut F, + ) -> Result> { + node.map_subqueries(|c| transform_up_with_subqueries_impl(c, f))? + .transform_sibling(|n| { + n.map_children(|c| transform_up_with_subqueries_impl(c, f)) + })? + .transform_parent(f) + } + + transform_up_with_subqueries_impl(self, &mut f) } /// Similarly to [`Self::transform_down`], rewrites this node and its inputs using `f`, @@ -832,14 +856,25 @@ impl LogicalPlan { FU: FnMut(Self) -> Result>, >( self, - f_down: &mut FD, - f_up: &mut FU, + mut f_down: FD, + mut f_up: FU, ) -> Result> { - handle_transform_recursion!( - f_down(self), - |c| c.transform_down_up_with_subqueries(f_down, f_up), - f_up - ) + fn transform_down_up_with_subqueries_impl< + FD: FnMut(LogicalPlan) -> Result>, + FU: FnMut(LogicalPlan) -> Result>, + >( + node: LogicalPlan, + f_down: &mut FD, + f_up: &mut FU, + ) -> Result> { + handle_transform_recursion!( + f_down(node), + |c| transform_down_up_with_subqueries_impl(c, f_down, f_up), + f_up + ) + } + + transform_down_up_with_subqueries_impl(self, &mut f_down, &mut f_up) } /// Similarly to [`Self::apply`], calls `f` on this node and its inputs @@ -850,7 +885,7 @@ impl LogicalPlan { mut f: F, ) -> Result { self.apply_expressions(|expr| { - expr.apply(&mut |expr| match expr { + expr.apply(|expr| match expr { Expr::Exists(Exists { subquery, .. }) | Expr::InSubquery(InSubquery { subquery, .. }) | Expr::ScalarSubquery(subquery) => { @@ -873,7 +908,7 @@ impl LogicalPlan { mut f: F, ) -> Result> { self.map_expressions(|expr| { - expr.transform_down(&mut |expr| match expr { + expr.transform_down(|expr| match expr { Expr::Exists(Exists { subquery, negated }) => { f(LogicalPlan::Subquery(subquery))?.map_data(|s| match s { LogicalPlan::Subquery(subquery) => { diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index 8c6b98f17933..8da93c244c07 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -264,7 +264,7 @@ pub fn grouping_set_to_exprlist(group_expr: &[Expr]) -> Result> { /// Recursively walk an expression tree, collecting the unique set of columns /// referenced in the expression pub fn expr_to_columns(expr: &Expr, accum: &mut HashSet) -> Result<()> { - expr.apply(&mut |expr| { + expr.apply(|expr| { match expr { Expr::Column(qc) => { accum.insert(qc.clone()); @@ -661,7 +661,7 @@ where F: Fn(&Expr) -> bool, { let mut exprs = vec![]; - expr.apply(&mut |expr| { + expr.apply(|expr| { if test_fn(expr) { if !(exprs.contains(expr)) { exprs.push(expr.clone()) @@ -683,7 +683,7 @@ where F: FnMut(&Expr) -> Result<(), E>, { let mut err = Ok(()); - expr.apply(&mut |expr| { + expr.apply(|expr| { if let Err(e) = f(expr) { // save the error for later (it may not be a DataFusionError err = Err(e); @@ -839,7 +839,7 @@ pub fn find_column_exprs(exprs: &[Expr]) -> Vec { pub(crate) fn find_columns_referenced_by_expr(e: &Expr) -> Vec { let mut exprs = vec![]; - e.apply(&mut |expr| { + e.apply(|expr| { if let Expr::Column(c) = expr { exprs.push(c.clone()) } @@ -868,7 +868,7 @@ pub(crate) fn find_column_indexes_referenced_by_expr( schema: &DFSchemaRef, ) -> Vec { let mut indexes = vec![]; - e.apply(&mut |expr| { + e.apply(|expr| { match expr { Expr::Column(qc) => { if let Ok(idx) = schema.index_of_column(qc) { diff --git a/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs b/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs index e3e9f959e042..99d89a2426e0 100644 --- a/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs +++ b/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs @@ -41,8 +41,7 @@ impl CountWildcardRule { impl AnalyzerRule for CountWildcardRule { fn analyze(&self, plan: LogicalPlan, _: &ConfigOptions) -> Result { - plan.transform_down_with_subqueries(&mut analyze_internal) - .data() + plan.transform_down_with_subqueries(analyze_internal).data() } fn name(&self) -> &str { @@ -78,7 +77,7 @@ fn analyze_internal(plan: LogicalPlan) -> Result> { let name_preserver = NamePreserver::new(&plan); plan.map_expressions(|expr| { let original_name = name_preserver.save(&expr)?; - let transformed_expr = expr.transform_up(&mut |expr| match expr { + let transformed_expr = expr.transform_up(|expr| match expr { Expr::WindowFunction(mut window_function) if is_count_star_window_aggregate(&window_function) => { diff --git a/datafusion/optimizer/src/analyzer/function_rewrite.rs b/datafusion/optimizer/src/analyzer/function_rewrite.rs index 38d018d1627d..098c934bf7e1 100644 --- a/datafusion/optimizer/src/analyzer/function_rewrite.rs +++ b/datafusion/optimizer/src/analyzer/function_rewrite.rs @@ -64,7 +64,7 @@ impl ApplyFunctionRewrites { let original_name = name_preserver.save(&expr)?; // recursively transform the expression, applying the rewrites at each step - let transformed_expr = expr.transform_up(&mut |expr| { + let transformed_expr = expr.transform_up(|expr| { let mut result = Transformed::no(expr); for rewriter in self.function_rewrites.iter() { result = result.transform_data(|expr| { @@ -85,7 +85,7 @@ impl AnalyzerRule for ApplyFunctionRewrites { } fn analyze(&self, plan: LogicalPlan, options: &ConfigOptions) -> Result { - plan.transform_up_with_subqueries(&mut |plan| self.rewrite_plan(plan, options)) + plan.transform_up_with_subqueries(|plan| self.rewrite_plan(plan, options)) .map(|res| res.data) } } diff --git a/datafusion/optimizer/src/analyzer/inline_table_scan.rs b/datafusion/optimizer/src/analyzer/inline_table_scan.rs index a3efafe6e314..db1ce18e86f5 100644 --- a/datafusion/optimizer/src/analyzer/inline_table_scan.rs +++ b/datafusion/optimizer/src/analyzer/inline_table_scan.rs @@ -38,7 +38,7 @@ impl InlineTableScan { impl AnalyzerRule for InlineTableScan { fn analyze(&self, plan: LogicalPlan, _: &ConfigOptions) -> Result { - plan.transform_up(&mut analyze_internal).data() + plan.transform_up(analyze_internal).data() } fn name(&self) -> &str { @@ -49,7 +49,7 @@ impl AnalyzerRule for InlineTableScan { fn analyze_internal(plan: LogicalPlan) -> Result> { // rewrite any subqueries in the plan first let transformed_plan = - plan.map_subqueries(|plan| plan.transform_up(&mut analyze_internal))?; + plan.map_subqueries(|plan| plan.transform_up(analyze_internal))?; let transformed_plan = transformed_plan.transform_data(|plan| { match plan { diff --git a/datafusion/optimizer/src/analyzer/mod.rs b/datafusion/optimizer/src/analyzer/mod.rs index d0b83d24299b..fb0eb14da659 100644 --- a/datafusion/optimizer/src/analyzer/mod.rs +++ b/datafusion/optimizer/src/analyzer/mod.rs @@ -155,10 +155,10 @@ impl Analyzer { /// Do necessary check and fail the invalid plan fn check_plan(plan: &LogicalPlan) -> Result<()> { - plan.apply_with_subqueries(&mut |plan: &LogicalPlan| { + plan.apply_with_subqueries(|plan: &LogicalPlan| { plan.apply_expressions(|expr| { // recursively look for subqueries - expr.apply(&mut |expr| { + expr.apply(|expr| { match expr { Expr::Exists(Exists { subquery, .. }) | Expr::InSubquery(InSubquery { subquery, .. }) diff --git a/datafusion/optimizer/src/analyzer/subquery.rs b/datafusion/optimizer/src/analyzer/subquery.rs index 002885266e2f..b46516017ae9 100644 --- a/datafusion/optimizer/src/analyzer/subquery.rs +++ b/datafusion/optimizer/src/analyzer/subquery.rs @@ -276,7 +276,7 @@ fn strip_inner_query(inner_plan: &LogicalPlan) -> &LogicalPlan { fn get_correlated_expressions(inner_plan: &LogicalPlan) -> Result> { let mut exprs = vec![]; - inner_plan.apply_with_subqueries(&mut |plan| { + inner_plan.apply_with_subqueries(|plan| { if let LogicalPlan::Filter(Filter { predicate, .. }) = plan { let (correlated, _): (Vec<_>, Vec<_>) = split_conjunction(predicate) .into_iter() diff --git a/datafusion/optimizer/src/decorrelate.rs b/datafusion/optimizer/src/decorrelate.rs index 884f001fce32..a6abec9efd8c 100644 --- a/datafusion/optimizer/src/decorrelate.rs +++ b/datafusion/optimizer/src/decorrelate.rs @@ -381,7 +381,7 @@ fn agg_exprs_evaluation_result_on_empty_batch( for e in agg_expr.iter() { let result_expr = e .clone() - .transform_up(&mut |expr| { + .transform_up(|expr| { let new_expr = match expr { Expr::AggregateFunction(expr::AggregateFunction { func_def, .. @@ -429,7 +429,7 @@ fn proj_exprs_evaluation_result_on_empty_batch( for expr in proj_expr.iter() { let result_expr = expr .clone() - .transform_up(&mut |expr| { + .transform_up(|expr| { if let Expr::Column(Column { name, .. }) = &expr { if let Some(result_expr) = input_expr_result_map_for_count_bug.get(name) @@ -468,7 +468,7 @@ fn filter_exprs_evaluation_result_on_empty_batch( ) -> Result> { let result_expr = filter_expr .clone() - .transform_up(&mut |expr| { + .transform_up(|expr| { if let Expr::Column(Column { name, .. }) = &expr { if let Some(result_expr) = input_expr_result_map_for_count_bug.get(name) { Ok(Transformed::yes(result_expr.clone())) diff --git a/datafusion/optimizer/src/optimize_projections.rs b/datafusion/optimizer/src/optimize_projections.rs index b54fb248a7c7..69dbc9307004 100644 --- a/datafusion/optimizer/src/optimize_projections.rs +++ b/datafusion/optimizer/src/optimize_projections.rs @@ -613,7 +613,7 @@ fn rewrite_expr(expr: &Expr, input: &Projection) -> Result> { /// columns are collected. fn outer_columns(expr: &Expr, columns: &mut HashSet) { // inspect_expr_pre doesn't handle subquery references, so find them explicitly - expr.apply(&mut |expr| { + expr.apply(|expr| { match expr { Expr::OuterReferenceColumn(_, col) => { columns.insert(col.clone()); diff --git a/datafusion/optimizer/src/plan_signature.rs b/datafusion/optimizer/src/plan_signature.rs index a8e323ff429f..d642e2c26e47 100644 --- a/datafusion/optimizer/src/plan_signature.rs +++ b/datafusion/optimizer/src/plan_signature.rs @@ -73,7 +73,7 @@ impl LogicalPlanSignature { /// Get total number of [`LogicalPlan`]s in the plan. fn get_node_number(plan: &LogicalPlan) -> NonZeroUsize { let mut node_number = 0; - plan.apply_with_subqueries(&mut |_plan| { + plan.apply_with_subqueries(|_plan| { node_number += 1; Ok(TreeNodeRecursion::Continue) }) diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 0caaad1d02d6..950932f479c9 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -218,7 +218,7 @@ fn can_pushdown_join_predicate(predicate: &Expr, schema: &DFSchema) -> Result Result { let mut is_evaluate = true; - predicate.apply(&mut |expr| match expr { + predicate.apply(|expr| match expr { Expr::Column(_) | Expr::Literal(_) | Expr::Placeholder(_) @@ -993,7 +993,7 @@ pub fn replace_cols_by_name( e: Expr, replace_map: &HashMap, ) -> Result { - e.transform_up(&mut |expr| { + e.transform_up(|expr| { Ok(if let Expr::Column(c) = &expr { match replace_map.get(&c.flat_name()) { Some(new_c) => Transformed::yes(new_c.clone()), @@ -1009,7 +1009,7 @@ pub fn replace_cols_by_name( /// check whether the expression uses the columns in `check_map`. fn contain(e: &Expr, check_map: &HashMap) -> bool { let mut is_contain = false; - e.apply(&mut |expr| { + e.apply(|expr| { Ok(if let Expr::Column(c) = &expr { match check_map.get(&c.flat_name()) { Some(_) => { diff --git a/datafusion/optimizer/src/scalar_subquery_to_join.rs b/datafusion/optimizer/src/scalar_subquery_to_join.rs index b6ca7f23b448..f9f602297f90 100644 --- a/datafusion/optimizer/src/scalar_subquery_to_join.rs +++ b/datafusion/optimizer/src/scalar_subquery_to_join.rs @@ -95,7 +95,7 @@ impl OptimizerRule for ScalarSubqueryToJoin { if !expr_check_map.is_empty() { rewrite_expr = rewrite_expr .clone() - .transform_up(&mut |expr| { + .transform_up(|expr| { if let Expr::Column(col) = &expr { if let Some(map_expr) = expr_check_map.get(&col.name) @@ -152,7 +152,7 @@ impl OptimizerRule for ScalarSubqueryToJoin { { let new_expr = rewrite_expr .clone() - .transform_up(&mut |expr| { + .transform_up(|expr| { if let Expr::Column(col) = &expr { if let Some(map_expr) = expr_check_map.get(&col.name) diff --git a/datafusion/optimizer/src/utils.rs b/datafusion/optimizer/src/utils.rs index f0605018e6f3..15ddd9b4a1bf 100644 --- a/datafusion/optimizer/src/utils.rs +++ b/datafusion/optimizer/src/utils.rs @@ -100,7 +100,7 @@ pub fn log_plan(description: &str, plan: &LogicalPlan) { /// check whether the expression is volatile predicates pub(crate) fn is_volatile_expression(e: &Expr) -> Result { let mut is_volatile_expr = false; - e.apply(&mut |expr| { + e.apply(|expr| { Ok(if is_volatile(expr)? { is_volatile_expr = true; TreeNodeRecursion::Stop diff --git a/datafusion/physical-expr/src/equivalence/class.rs b/datafusion/physical-expr/src/equivalence/class.rs index 4cc0a3e3d304..f8d2b1cb6922 100644 --- a/datafusion/physical-expr/src/equivalence/class.rs +++ b/datafusion/physical-expr/src/equivalence/class.rs @@ -262,7 +262,7 @@ impl EquivalenceGroup { /// class it matches with (if any). pub fn normalize_expr(&self, expr: Arc) -> Arc { expr.clone() - .transform(&mut |expr| { + .transform(|expr| { for cls in self.iter() { if cls.contains(&expr) { return Ok(Transformed::yes(cls.canonical_expr().unwrap())); @@ -452,7 +452,7 @@ impl EquivalenceGroup { // Rewrite rhs to point to the right side of the join: let new_rhs = rhs .clone() - .transform(&mut |expr| { + .transform(|expr| { if let Some(column) = expr.as_any().downcast_ref::() { diff --git a/datafusion/physical-expr/src/equivalence/mod.rs b/datafusion/physical-expr/src/equivalence/mod.rs index 60aa242351bd..f78d69d672a6 100644 --- a/datafusion/physical-expr/src/equivalence/mod.rs +++ b/datafusion/physical-expr/src/equivalence/mod.rs @@ -97,7 +97,7 @@ pub fn add_offset_to_expr( expr: Arc, offset: usize, ) -> Arc { - expr.transform_down(&mut |e| match e.as_any().downcast_ref::() { + expr.transform_down(|e| match e.as_any().downcast_ref::() { Some(col) => Ok(Transformed::yes(Arc::new(Column::new( col.name(), offset + col.index(), diff --git a/datafusion/physical-expr/src/equivalence/projection.rs b/datafusion/physical-expr/src/equivalence/projection.rs index e5086b4f644a..8c747ab8a244 100644 --- a/datafusion/physical-expr/src/equivalence/projection.rs +++ b/datafusion/physical-expr/src/equivalence/projection.rs @@ -59,7 +59,7 @@ impl ProjectionMapping { let target_expr = Arc::new(Column::new(name, expr_idx)) as _; expression .clone() - .transform_down(&mut |e| match e.as_any().downcast_ref::() { + .transform_down(|e| match e.as_any().downcast_ref::() { Some(col) => { // Sometimes, an expression and its name in the input_schema // doesn't match. This can cause problems, so we make sure diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index 2d5247100225..f98347ada088 100644 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -857,7 +857,7 @@ impl EquivalenceProperties { /// the given expression. pub fn get_expr_ordering(&self, expr: Arc) -> ExprOrdering { ExprOrdering::new_default(expr.clone()) - .transform_up(&mut |expr| Ok(update_ordering(expr, self))) + .transform_up(|expr| Ok(update_ordering(expr, self))) .data() // Guaranteed to always return `Ok`. .unwrap() diff --git a/datafusion/physical-expr/src/expressions/case.rs b/datafusion/physical-expr/src/expressions/case.rs index ed3b3f84ab8b..e376d3e7bbac 100644 --- a/datafusion/physical-expr/src/expressions/case.rs +++ b/datafusion/physical-expr/src/expressions/case.rs @@ -958,7 +958,7 @@ mod tests { let expr2 = expr .clone() - .transform(&mut |e| { + .transform(|e| { let transformed = match e.as_any().downcast_ref::() { Some(lit_value) => match lit_value.value() { @@ -980,7 +980,7 @@ mod tests { let expr3 = expr .clone() - .transform_down(&mut |e| { + .transform_down(|e| { let transformed = match e.as_any().downcast_ref::() { Some(lit_value) => match lit_value.value() { diff --git a/datafusion/physical-expr/src/utils/mod.rs b/datafusion/physical-expr/src/utils/mod.rs index 44994ec922bb..2232f6de4452 100644 --- a/datafusion/physical-expr/src/utils/mod.rs +++ b/datafusion/physical-expr/src/utils/mod.rs @@ -194,7 +194,7 @@ where constructor, }; // Use the builder to transform the expression tree node into a DAG. - let root = init.transform_up(&mut |node| builder.mutate(node)).data()?; + let root = init.transform_up(|node| builder.mutate(node)).data()?; // Return a tuple containing the root node index and the DAG. Ok((root.data.unwrap(), builder.graph)) } @@ -202,7 +202,7 @@ where /// Recursively extract referenced [`Column`]s within a [`PhysicalExpr`]. pub fn collect_columns(expr: &Arc) -> HashSet { let mut columns = HashSet::::new(); - expr.apply(&mut |expr| { + expr.apply(|expr| { if let Some(column) = expr.as_any().downcast_ref::() { if !columns.iter().any(|c| c.eq(column)) { columns.insert(column.clone()); @@ -222,7 +222,7 @@ pub fn reassign_predicate_columns( schema: &SchemaRef, ignore_not_found: bool, ) -> Result> { - pred.transform_down(&mut |expr| { + pred.transform_down(|expr| { let expr_any = expr.as_any(); if let Some(column) = expr_any.downcast_ref::() { diff --git a/datafusion/physical-plan/src/joins/stream_join_utils.rs b/datafusion/physical-plan/src/joins/stream_join_utils.rs index 7c7dd01e1b90..ef3fda37377d 100644 --- a/datafusion/physical-plan/src/joins/stream_join_utils.rs +++ b/datafusion/physical-plan/src/joins/stream_join_utils.rs @@ -285,7 +285,7 @@ pub fn convert_sort_expr_with_filter_schema( // Since we are sure that one to one column mapping includes all columns, we convert // the sort expression into a filter expression. let converted_filter_expr = expr - .transform_up(&mut |p| { + .transform_up(|p| { convert_filter_columns(p.as_ref(), &column_map).map(|transformed| { match transformed { Some(transformed) => Transformed::yes(transformed), diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index f6ab027915f0..acf9ed4d7ec8 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -478,7 +478,7 @@ fn replace_on_columns_of_right_ordering( let new_expr = item .expr .clone() - .transform(&mut |e| { + .transform(|e| { if e.eq(right_col) { Ok(Transformed::yes(left_col.clone())) } else { diff --git a/datafusion/physical-plan/src/recursive_query.rs b/datafusion/physical-plan/src/recursive_query.rs index 67440242fe8e..ed897d78f0c8 100644 --- a/datafusion/physical-plan/src/recursive_query.rs +++ b/datafusion/physical-plan/src/recursive_query.rs @@ -325,7 +325,7 @@ fn assign_work_table( work_table: Arc, ) -> Result> { let mut work_table_refs = 0; - plan.transform_down(&mut |plan| { + plan.transform_down(|plan| { if let Some(exec) = plan.as_any().downcast_ref::() { if work_table_refs > 0 { not_impl_err!( @@ -353,7 +353,7 @@ fn assign_work_table( /// However, if the data of the left table is derived from the work table, it will become outdated /// as the work table changes. When the next iteration executes this plan again, we must clear the left table. fn reset_plan_states(plan: Arc) -> Result> { - plan.transform_up(&mut |plan| { + plan.transform_up(|plan| { // WorkTableExec's states have already been updated correctly. if plan.as_any().is::() { Ok(Transformed::no(plan)) diff --git a/datafusion/sql/src/cte.rs b/datafusion/sql/src/cte.rs index 5b1f81e820a2..4f7b9bb6d11d 100644 --- a/datafusion/sql/src/cte.rs +++ b/datafusion/sql/src/cte.rs @@ -197,7 +197,7 @@ fn has_work_table_reference( work_table_source: &Arc, ) -> bool { let mut has_reference = false; - plan.apply(&mut |node| { + plan.apply(|node| { if let LogicalPlan::TableScan(scan) = node { if Arc::ptr_eq(&scan.source, work_table_source) { has_reference = true; diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index d3353774dcab..c9c6863ac149 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -293,7 +293,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { data: transformed_expr, transformed, tnr: _, - } = expr.transform_up(&mut |expr: Expr| { + } = expr.transform_up(|expr: Expr| { if let Expr::Unnest(Unnest { expr: ref arg }) = expr { let column_name = expr.display_name()?; unnest_columns.push(column_name.clone()); diff --git a/datafusion/sql/src/unparser/utils.rs b/datafusion/sql/src/unparser/utils.rs index 4d62e6b2b40d..c1b02c330fae 100644 --- a/datafusion/sql/src/unparser/utils.rs +++ b/datafusion/sql/src/unparser/utils.rs @@ -60,7 +60,7 @@ pub(crate) fn find_agg_node_within_select( /// into an actual aggregate expression COUNT(*) as identified in the aggregate node. pub(crate) fn unproject_agg_exprs(expr: &Expr, agg: &Aggregate) -> Result { expr.clone() - .transform(&mut |sub_expr| { + .transform(|sub_expr| { if let Expr::Column(c) = sub_expr { // find the column in the agg schmea if let Ok(n) = agg.schema.index_of_column(&c) { diff --git a/datafusion/sql/src/utils.rs b/datafusion/sql/src/utils.rs index d59549232f4b..89e0b9c1e9cb 100644 --- a/datafusion/sql/src/utils.rs +++ b/datafusion/sql/src/utils.rs @@ -34,7 +34,7 @@ use sqlparser::ast::Ident; /// Make a best-effort attempt at resolving all columns in the expression tree pub(crate) fn resolve_columns(expr: &Expr, plan: &LogicalPlan) -> Result { expr.clone() - .transform_up(&mut |nested_expr| { + .transform_up(|nested_expr| { match nested_expr { Expr::Column(col) => { let (qualifier, field) = @@ -72,7 +72,7 @@ pub(crate) fn rebase_expr( plan: &LogicalPlan, ) -> Result { expr.clone() - .transform_down(&mut |nested_expr| { + .transform_down(|nested_expr| { if base_exprs.contains(&nested_expr) { Ok(Transformed::yes(expr_as_column_expr(&nested_expr, plan)?)) } else { @@ -178,7 +178,7 @@ pub(crate) fn resolve_aliases_to_exprs( aliases: &HashMap, ) -> Result { expr.clone() - .transform_up(&mut |nested_expr| match nested_expr { + .transform_up(|nested_expr| match nested_expr { Expr::Column(c) if c.relation.is_none() => { if let Some(aliased_expr) = aliases.get(&c.name) { Ok(Transformed::yes(aliased_expr.clone()))