diff --git a/datafusion-examples/examples/custom_datasource.rs b/datafusion-examples/examples/custom_datasource.rs index c2ea6f2b52a1..cfb49b023159 100644 --- a/datafusion-examples/examples/custom_datasource.rs +++ b/datafusion-examples/examples/custom_datasource.rs @@ -237,7 +237,7 @@ impl ExecutionPlan for CustomExec { &self.cache } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { vec![] } diff --git a/datafusion/common/src/tree_node.rs b/datafusion/common/src/tree_node.rs index f0e889e7cf74..d0dd24621d3e 100644 --- a/datafusion/common/src/tree_node.rs +++ b/datafusion/common/src/tree_node.rs @@ -123,8 +123,8 @@ pub trait TreeNode: Sized { /// TreeNodeVisitor::f_up(ChildNode2) /// TreeNodeVisitor::f_up(ParentNode) /// ``` - fn visit>( - &self, + fn visit<'n, V: TreeNodeVisitor<'n, Node = Self>>( + &'n self, visitor: &mut V, ) -> Result { visitor @@ -190,12 +190,12 @@ pub trait TreeNode: Sized { /// # See Also /// * [`Self::transform_down`] for the equivalent transformation API. /// * [`Self::visit`] for both top-down and bottom up traversal. - fn apply Result>( - &self, + fn apply<'n, F: FnMut(&'n Self) -> Result>( + &'n self, mut f: F, ) -> Result { - fn apply_impl Result>( - node: &N, + fn apply_impl<'n, N: TreeNode, F: FnMut(&'n N) -> Result>( + node: &'n N, f: &mut F, ) -> Result { f(node)?.visit_children(|| node.apply_children(|c| apply_impl(c, f))) @@ -427,8 +427,8 @@ pub trait TreeNode: Sized { /// /// Description: Apply `f` to inspect node's children (but not the node /// itself). - fn apply_children Result>( - &self, + fn apply_children<'n, F: FnMut(&'n Self) -> Result>( + &'n self, f: F, ) -> Result; @@ -466,19 +466,19 @@ pub trait TreeNode: Sized { /// /// # See Also: /// * [`TreeNode::rewrite`] to rewrite owned `TreeNode`s -pub trait TreeNodeVisitor: Sized { +pub trait TreeNodeVisitor<'n>: Sized { /// The node type which is visitable. type Node: TreeNode; /// Invoked while traversing down the tree, before any children are visited. /// Default implementation continues the recursion. - fn f_down(&mut self, _node: &Self::Node) -> Result { + fn f_down(&mut self, _node: &'n Self::Node) -> Result { Ok(TreeNodeRecursion::Continue) } /// Invoked while traversing up the tree after children are visited. Default /// implementation continues the recursion. - fn f_up(&mut self, _node: &Self::Node) -> Result { + fn f_up(&mut self, _node: &'n Self::Node) -> Result { Ok(TreeNodeRecursion::Continue) } } @@ -855,7 +855,7 @@ impl TransformedResult for Result> { /// its related `Arc` will automatically implement [`TreeNode`]. pub trait DynTreeNode { /// Returns all children of the specified `TreeNode`. - fn arc_children(&self) -> Vec>; + fn arc_children(&self) -> Vec<&Arc>; /// Constructs a new node with the specified children. fn with_new_arc_children( @@ -868,11 +868,11 @@ pub trait DynTreeNode { /// Blanket implementation for any `Arc` where `T` implements [`DynTreeNode`] /// (such as [`Arc`]). impl TreeNode for Arc { - fn apply_children Result>( - &self, + fn apply_children<'n, F: FnMut(&'n Self) -> Result>( + &'n self, f: F, ) -> Result { - self.arc_children().iter().apply_until_stop(f) + self.arc_children().into_iter().apply_until_stop(f) } fn map_children Result>>( @@ -881,7 +881,10 @@ impl TreeNode for Arc { ) -> Result> { let children = self.arc_children(); if !children.is_empty() { - let new_children = children.into_iter().map_until_stop_and_collect(f)?; + let new_children = children + .into_iter() + .cloned() + .map_until_stop_and_collect(f)?; // Propagate up `new_children.transformed` and `new_children.tnr` // along with the node containing transformed children. if new_children.transformed { @@ -913,8 +916,8 @@ pub trait ConcreteTreeNode: Sized { } impl TreeNode for T { - fn apply_children Result>( - &self, + fn apply_children<'n, F: FnMut(&'n Self) -> Result>( + &'n self, f: F, ) -> Result { self.children().iter().apply_until_stop(f) @@ -938,6 +941,7 @@ impl TreeNode for T { #[cfg(test)] mod tests { + use std::collections::HashMap; use std::fmt::Display; use crate::tree_node::{ @@ -946,7 +950,7 @@ mod tests { }; use crate::Result; - #[derive(PartialEq, Debug)] + #[derive(Debug, Eq, Hash, PartialEq)] struct TestTreeNode { children: Vec>, data: T, @@ -959,8 +963,8 @@ mod tests { } impl TreeNode for TestTreeNode { - fn apply_children Result>( - &self, + fn apply_children<'n, F: FnMut(&'n Self) -> Result>( + &'n self, f: F, ) -> Result { self.children.iter().apply_until_stop(f) @@ -1459,15 +1463,15 @@ mod tests { } } - impl TreeNodeVisitor for TestVisitor { + impl<'n, T: Display> TreeNodeVisitor<'n> for TestVisitor { type Node = TestTreeNode; - fn f_down(&mut self, node: &Self::Node) -> Result { + fn f_down(&mut self, node: &'n Self::Node) -> Result { self.visits.push(format!("f_down({})", node.data)); (*self.f_down)(node) } - fn f_up(&mut self, node: &Self::Node) -> Result { + fn f_up(&mut self, node: &'n Self::Node) -> Result { self.visits.push(format!("f_up({})", node.data)); (*self.f_up)(node) } @@ -1912,4 +1916,87 @@ mod tests { TreeNodeRecursion::Stop ) ); + + // F + // / | \ + // / | \ + // E C A + // | / \ + // C B D + // / \ | + // B D A + // | + // A + #[test] + fn test_apply_and_visit_references() -> Result<()> { + let node_a = TestTreeNode::new(vec![], "a".to_string()); + let node_b = TestTreeNode::new(vec![], "b".to_string()); + let node_d = TestTreeNode::new(vec![node_a], "d".to_string()); + let node_c = TestTreeNode::new(vec![node_b, node_d], "c".to_string()); + let node_e = TestTreeNode::new(vec![node_c], "e".to_string()); + let node_a_2 = TestTreeNode::new(vec![], "a".to_string()); + let node_b_2 = TestTreeNode::new(vec![], "b".to_string()); + let node_d_2 = TestTreeNode::new(vec![node_a_2], "d".to_string()); + let node_c_2 = TestTreeNode::new(vec![node_b_2, node_d_2], "c".to_string()); + let node_a_3 = TestTreeNode::new(vec![], "a".to_string()); + let tree = TestTreeNode::new(vec![node_e, node_c_2, node_a_3], "f".to_string()); + + let node_f_ref = &tree; + let node_e_ref = &node_f_ref.children[0]; + let node_c_ref = &node_e_ref.children[0]; + let node_b_ref = &node_c_ref.children[0]; + let node_d_ref = &node_c_ref.children[1]; + let node_a_ref = &node_d_ref.children[0]; + + let mut m: HashMap<&TestTreeNode, usize> = HashMap::new(); + tree.apply(|e| { + *m.entry(e).or_insert(0) += 1; + Ok(TreeNodeRecursion::Continue) + })?; + + let expected = HashMap::from([ + (node_f_ref, 1), + (node_e_ref, 1), + (node_c_ref, 2), + (node_d_ref, 2), + (node_b_ref, 2), + (node_a_ref, 3), + ]); + assert_eq!(m, expected); + + struct TestVisitor<'n> { + m: HashMap<&'n TestTreeNode, (usize, usize)>, + } + + impl<'n> TreeNodeVisitor<'n> for TestVisitor<'n> { + type Node = TestTreeNode; + + fn f_down(&mut self, node: &'n Self::Node) -> Result { + let (down_count, _) = self.m.entry(node).or_insert((0, 0)); + *down_count += 1; + Ok(TreeNodeRecursion::Continue) + } + + fn f_up(&mut self, node: &'n Self::Node) -> Result { + let (_, up_count) = self.m.entry(node).or_insert((0, 0)); + *up_count += 1; + Ok(TreeNodeRecursion::Continue) + } + } + + let mut visitor = TestVisitor { m: HashMap::new() }; + tree.visit(&mut visitor)?; + + let expected = HashMap::from([ + (node_f_ref, (1, 1)), + (node_e_ref, (1, 1)), + (node_c_ref, (2, 2)), + (node_d_ref, (2, 2)), + (node_b_ref, (2, 2)), + (node_a_ref, (3, 3)), + ]); + assert_eq!(visitor.m, expected); + + Ok(()) + } } diff --git a/datafusion/core/src/datasource/physical_plan/arrow_file.rs b/datafusion/core/src/datasource/physical_plan/arrow_file.rs index 1e8775731015..e536ae823232 100644 --- a/datafusion/core/src/datasource/physical_plan/arrow_file.rs +++ b/datafusion/core/src/datasource/physical_plan/arrow_file.rs @@ -134,7 +134,7 @@ impl ExecutionPlan for ArrowExec { &self.cache } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { Vec::new() } diff --git a/datafusion/core/src/datasource/physical_plan/avro.rs b/datafusion/core/src/datasource/physical_plan/avro.rs index 4bb8f28860ac..15ee0a1f7c22 100644 --- a/datafusion/core/src/datasource/physical_plan/avro.rs +++ b/datafusion/core/src/datasource/physical_plan/avro.rs @@ -111,7 +111,7 @@ impl ExecutionPlan for AvroExec { &self.cache } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { Vec::new() } diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs index 679f6c010966..a1e43b20a2da 100644 --- a/datafusion/core/src/datasource/physical_plan/csv.rs +++ b/datafusion/core/src/datasource/physical_plan/csv.rs @@ -173,7 +173,7 @@ impl ExecutionPlan for CsvExec { &self.cache } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { // this is a leaf node and has no children vec![] } diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs index 5e8ba526594c..e97554a791bd 100644 --- a/datafusion/core/src/datasource/physical_plan/json.rs +++ b/datafusion/core/src/datasource/physical_plan/json.rs @@ -138,7 +138,7 @@ impl ExecutionPlan for NdJsonExec { &self.cache } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { Vec::new() } diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs index 17cb6a66c705..efda992fcfcd 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs @@ -364,7 +364,7 @@ impl ExecutionPlan for ParquetExec { &self.cache } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { // this is a leaf node and has no children vec![] } diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index d23982cfa952..cb0dfd079169 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -2465,10 +2465,10 @@ impl<'a> BadPlanVisitor<'a> { } } -impl<'a> TreeNodeVisitor for BadPlanVisitor<'a> { +impl<'n, 'a> TreeNodeVisitor<'n> for BadPlanVisitor<'a> { type Node = LogicalPlan; - fn f_down(&mut self, node: &Self::Node) -> Result { + fn f_down(&mut self, node: &'n Self::Node) -> Result { match node { LogicalPlan::Ddl(ddl) if !self.options.allow_ddl => { plan_err!("DDL not supported: {}", ddl.name()) diff --git a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs index 1a82dac4658c..05f05d95b8db 100644 --- a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs +++ b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs @@ -123,7 +123,7 @@ fn take_optimizable(node: &dyn ExecutionPlan) -> Option> return Some(child); } } - if let [ref childrens_child] = child.children().as_slice() { + if let [childrens_child] = child.children().as_slice() { child = Arc::clone(childrens_child); } else { break; diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index 033cec53019d..9eb5aafd81a2 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -1375,8 +1375,8 @@ pub(crate) mod tests { vec![false] } - fn children(&self) -> Vec> { - vec![self.input.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.input] } // model that it requires the output ordering of its input diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs index bc435626c6a9..24306647c686 100644 --- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs @@ -567,7 +567,7 @@ fn remove_corresponding_sort_from_sub_plan( // Replace with variants that do not preserve order. if is_sort_preserving_merge(&node.plan) { node.children = node.children.swap_remove(0).children; - node.plan = node.plan.children().swap_remove(0); + node.plan = node.plan.children().swap_remove(0).clone(); } else if let Some(repartition) = node.plan.as_any().downcast_ref::() { diff --git a/datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs b/datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs index 950bb3c8eeb2..1274fbe50a5f 100644 --- a/datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs +++ b/datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs @@ -78,7 +78,7 @@ impl LimitedDistinctAggregation { let mut is_global_limit = false; if let Some(local_limit) = plan.as_any().downcast_ref::() { limit = local_limit.fetch(); - children = local_limit.children(); + children = local_limit.children().into_iter().cloned().collect(); } else if let Some(global_limit) = plan.as_any().downcast_ref::() { global_fetch = global_limit.fetch(); @@ -86,7 +86,7 @@ impl LimitedDistinctAggregation { global_skip = global_limit.skip(); // the aggregate must read at least fetch+skip number of rows limit = global_fetch.unwrap() + global_skip; - children = global_limit.children(); + children = global_limit.children().into_iter().cloned().collect(); is_global_limit = true } else { return None; diff --git a/datafusion/core/src/physical_optimizer/output_requirements.rs b/datafusion/core/src/physical_optimizer/output_requirements.rs index 5bf86e88d646..67b38dba90ca 100644 --- a/datafusion/core/src/physical_optimizer/output_requirements.rs +++ b/datafusion/core/src/physical_optimizer/output_requirements.rs @@ -157,8 +157,8 @@ impl ExecutionPlan for OutputRequirementExec { vec![true] } - fn children(&self) -> Vec> { - vec![self.input.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.input] } fn required_input_ordering(&self) -> Vec>> { @@ -273,7 +273,7 @@ fn require_top_ordering_helper( // When an operator requires an ordering, any `SortExec` below can not // be responsible for (i.e. the originator of) the global ordering. let (new_child, is_changed) = - require_top_ordering_helper(children.swap_remove(0))?; + require_top_ordering_helper(children.swap_remove(0).clone())?; Ok((plan.with_new_children(vec![new_child])?, is_changed)) } else { // Stop searching, there is no global ordering desired for the query. diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs b/datafusion/core/src/physical_optimizer/projection_pushdown.rs index a15b9d4fbc87..335632ab6efa 100644 --- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs @@ -378,7 +378,7 @@ fn try_swapping_with_coalesce_partitions( return Ok(None); } // CoalescePartitionsExec always has a single child, so zero indexing is safe. - make_with_child(projection, &projection.input().children()[0]) + make_with_child(projection, projection.input().children()[0]) .map(|e| Some(Arc::new(CoalescePartitionsExec::new(e)) as _)) } @@ -526,7 +526,7 @@ fn try_pushdown_through_union( let new_children = union .children() .into_iter() - .map(|child| make_with_child(projection, &child)) + .map(|child| make_with_child(projection, child)) .collect::>>()?; Ok(Some(Arc::new(UnionExec::new(new_children)))) @@ -813,8 +813,8 @@ fn try_swapping_with_sort_merge_join( projection_as_columns, far_right_left_col_ind, far_left_right_col_ind, - &sm_join.children()[0], - &sm_join.children()[1], + sm_join.children()[0], + sm_join.children()[1], )?; Ok(Some(Arc::new(SortMergeJoinExec::try_new( diff --git a/datafusion/core/src/physical_optimizer/sort_pushdown.rs b/datafusion/core/src/physical_optimizer/sort_pushdown.rs index c527819e7746..83531da3ca8f 100644 --- a/datafusion/core/src/physical_optimizer/sort_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/sort_pushdown.rs @@ -123,7 +123,7 @@ fn pushdown_requirement_to_children( if is_window(plan) { let required_input_ordering = plan.required_input_ordering(); let request_child = required_input_ordering[0].as_deref().unwrap_or(&[]); - let child_plan = plan.children().swap_remove(0); + let child_plan = plan.children().swap_remove(0).clone(); match determine_children_requirement(parent_required, request_child, child_plan) { RequirementsCompatibility::Satisfy => { let req = (!request_child.is_empty()).then(|| request_child.to_vec()); diff --git a/datafusion/core/src/physical_optimizer/topk_aggregation.rs b/datafusion/core/src/physical_optimizer/topk_aggregation.rs index 7c0519eda3b3..b754ee75ef3e 100644 --- a/datafusion/core/src/physical_optimizer/topk_aggregation.rs +++ b/datafusion/core/src/physical_optimizer/topk_aggregation.rs @@ -88,7 +88,7 @@ impl TopKAggregation { let sort = plan.as_any().downcast_ref::()?; let children = sort.children(); - let child = children.iter().exactly_one().ok()?; + let child = children.into_iter().exactly_one().ok()?; let order = sort.properties().output_ordering()?; let order = order.iter().exactly_one().ok()?; let limit = sort.fetch()?; diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index bc5818361b7d..5e2e546a86f6 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -2870,7 +2870,7 @@ mod tests { &self.cache } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { vec![] } diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs index b03aaabcad6b..f76e1bb60b53 100644 --- a/datafusion/core/src/test/mod.rs +++ b/datafusion/core/src/test/mod.rs @@ -404,7 +404,7 @@ impl ExecutionPlan for StatisticsExec { &self.cache } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { vec![] } diff --git a/datafusion/core/src/test_util/mod.rs b/datafusion/core/src/test_util/mod.rs index 75ef364d01fd..7aec66825de3 100644 --- a/datafusion/core/src/test_util/mod.rs +++ b/datafusion/core/src/test_util/mod.rs @@ -293,7 +293,7 @@ impl ExecutionPlan for UnboundedExec { &self.cache } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { vec![] } diff --git a/datafusion/core/src/test_util/parquet.rs b/datafusion/core/src/test_util/parquet.rs index df1d2c6f0999..ed539d29bd26 100644 --- a/datafusion/core/src/test_util/parquet.rs +++ b/datafusion/core/src/test_util/parquet.rs @@ -186,7 +186,7 @@ impl TestParquetFile { /// /// Recursively searches for ParquetExec and returns the metrics /// on the first one it finds - pub fn parquet_metrics(plan: Arc) -> Option { + pub fn parquet_metrics(plan: &Arc) -> Option { if let Some(parquet) = plan.as_any().downcast_ref::() { return parquet.metrics(); } diff --git a/datafusion/core/tests/custom_sources_cases/mod.rs b/datafusion/core/tests/custom_sources_cases/mod.rs index 31a823f3eaf9..e8ead01d2ee4 100644 --- a/datafusion/core/tests/custom_sources_cases/mod.rs +++ b/datafusion/core/tests/custom_sources_cases/mod.rs @@ -148,7 +148,7 @@ impl ExecutionPlan for CustomExecutionPlan { &self.cache } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { vec![] } @@ -303,6 +303,6 @@ fn contains_place_holder_exec(plan: Arc) -> bool { } else if plan.children().len() != 1 { false } else { - contains_place_holder_exec(Arc::clone(&plan.children()[0])) + contains_place_holder_exec(Arc::clone(plan.children()[0])) } } diff --git a/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs b/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs index 4579fe806d6f..8c9cffcf08d1 100644 --- a/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs +++ b/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs @@ -101,7 +101,7 @@ impl ExecutionPlan for CustomPlan { &self.cache } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { vec![] } diff --git a/datafusion/core/tests/custom_sources_cases/statistics.rs b/datafusion/core/tests/custom_sources_cases/statistics.rs index 85ac47dc97fc..c7be89533f1d 100644 --- a/datafusion/core/tests/custom_sources_cases/statistics.rs +++ b/datafusion/core/tests/custom_sources_cases/statistics.rs @@ -153,7 +153,7 @@ impl ExecutionPlan for StatisticsValidation { &self.cache } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { vec![] } diff --git a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs index 8df16e7944d2..1e4ef0ecb2c6 100644 --- a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs @@ -312,10 +312,10 @@ async fn verify_ordered_aggregate(frame: &DataFrame, expected_sort: bool) { } let mut visitor = Visitor { expected_sort }; - impl TreeNodeVisitor for Visitor { + impl<'n> TreeNodeVisitor<'n> for Visitor { type Node = Arc; - fn f_down(&mut self, node: &Self::Node) -> Result { + fn f_down(&mut self, node: &'n Self::Node) -> Result { if let Some(exec) = node.as_any().downcast_ref::() { if self.expected_sort { assert!(matches!( diff --git a/datafusion/core/tests/parquet/filter_pushdown.rs b/datafusion/core/tests/parquet/filter_pushdown.rs index feb928a3a474..8c7624f07813 100644 --- a/datafusion/core/tests/parquet/filter_pushdown.rs +++ b/datafusion/core/tests/parquet/filter_pushdown.rs @@ -529,7 +529,7 @@ impl<'a> TestCase<'a> { // verify expected pushdown let metrics = - TestParquetFile::parquet_metrics(exec).expect("found parquet metrics"); + TestParquetFile::parquet_metrics(&exec).expect("found parquet metrics"); let pushdown_expected = if scan_options.pushdown_filters { self.pushdown_expected diff --git a/datafusion/core/tests/user_defined/user_defined_plan.rs b/datafusion/core/tests/user_defined/user_defined_plan.rs index 54dcffe35f62..07622e48afaf 100644 --- a/datafusion/core/tests/user_defined/user_defined_plan.rs +++ b/datafusion/core/tests/user_defined/user_defined_plan.rs @@ -472,8 +472,8 @@ impl ExecutionPlan for TopKExec { vec![Distribution::SinglePartition] } - fn children(&self) -> Vec> { - vec![self.input.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.input] } fn with_new_children( diff --git a/datafusion/expr/src/logical_plan/display.rs b/datafusion/expr/src/logical_plan/display.rs index f3765fb184bb..707cff8ab5f1 100644 --- a/datafusion/expr/src/logical_plan/display.rs +++ b/datafusion/expr/src/logical_plan/display.rs @@ -58,12 +58,12 @@ impl<'a, 'b> IndentVisitor<'a, 'b> { } } -impl<'a, 'b> TreeNodeVisitor for IndentVisitor<'a, 'b> { +impl<'n, 'a, 'b> TreeNodeVisitor<'n> for IndentVisitor<'a, 'b> { type Node = LogicalPlan; fn f_down( &mut self, - plan: &LogicalPlan, + plan: &'n LogicalPlan, ) -> datafusion_common::Result { if self.indent > 0 { writeln!(self.f)?; @@ -84,7 +84,7 @@ impl<'a, 'b> TreeNodeVisitor for IndentVisitor<'a, 'b> { fn f_up( &mut self, - _plan: &LogicalPlan, + _plan: &'n LogicalPlan, ) -> datafusion_common::Result { self.indent -= 1; Ok(TreeNodeRecursion::Continue) @@ -180,12 +180,12 @@ impl<'a, 'b> GraphvizVisitor<'a, 'b> { } } -impl<'a, 'b> TreeNodeVisitor for GraphvizVisitor<'a, 'b> { +impl<'n, 'a, 'b> TreeNodeVisitor<'n> for GraphvizVisitor<'a, 'b> { type Node = LogicalPlan; fn f_down( &mut self, - plan: &LogicalPlan, + plan: &'n LogicalPlan, ) -> datafusion_common::Result { let id = self.graphviz_builder.next_id(); @@ -663,12 +663,12 @@ impl<'a, 'b> PgJsonVisitor<'a, 'b> { } } -impl<'a, 'b> TreeNodeVisitor for PgJsonVisitor<'a, 'b> { +impl<'n, 'a, 'b> TreeNodeVisitor<'n> for PgJsonVisitor<'a, 'b> { type Node = LogicalPlan; fn f_down( &mut self, - node: &LogicalPlan, + node: &'n LogicalPlan, ) -> datafusion_common::Result { let id = self.next_id; self.next_id += 1; diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 97592c05abe2..d2d133c6cbc3 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -3012,10 +3012,10 @@ digraph { strings: Vec, } - impl TreeNodeVisitor for OkVisitor { + impl<'n> TreeNodeVisitor<'n> for OkVisitor { type Node = LogicalPlan; - fn f_down(&mut self, plan: &LogicalPlan) -> Result { + fn f_down(&mut self, plan: &'n LogicalPlan) -> Result { let s = match plan { LogicalPlan::Projection { .. } => "pre_visit Projection", LogicalPlan::Filter { .. } => "pre_visit Filter", @@ -3029,7 +3029,7 @@ digraph { Ok(TreeNodeRecursion::Continue) } - fn f_up(&mut self, plan: &LogicalPlan) -> Result { + fn f_up(&mut self, plan: &'n LogicalPlan) -> Result { let s = match plan { LogicalPlan::Projection { .. } => "post_visit Projection", LogicalPlan::Filter { .. } => "post_visit Filter", @@ -3095,10 +3095,10 @@ digraph { return_false_from_post_in: OptionalCounter, } - impl TreeNodeVisitor for StoppingVisitor { + impl<'n> TreeNodeVisitor<'n> for StoppingVisitor { type Node = LogicalPlan; - fn f_down(&mut self, plan: &LogicalPlan) -> Result { + fn f_down(&mut self, plan: &'n LogicalPlan) -> Result { if self.return_false_from_pre_in.dec() { return Ok(TreeNodeRecursion::Stop); } @@ -3107,7 +3107,7 @@ digraph { Ok(TreeNodeRecursion::Continue) } - fn f_up(&mut self, plan: &LogicalPlan) -> Result { + fn f_up(&mut self, plan: &'n LogicalPlan) -> Result { if self.return_false_from_post_in.dec() { return Ok(TreeNodeRecursion::Stop); } @@ -3164,10 +3164,10 @@ digraph { return_error_from_post_in: OptionalCounter, } - impl TreeNodeVisitor for ErrorVisitor { + impl<'n> TreeNodeVisitor<'n> for ErrorVisitor { type Node = LogicalPlan; - fn f_down(&mut self, plan: &LogicalPlan) -> Result { + fn f_down(&mut self, plan: &'n LogicalPlan) -> Result { if self.return_error_from_pre_in.dec() { return not_impl_err!("Error in pre_visit"); } @@ -3175,7 +3175,7 @@ digraph { self.inner.f_down(plan) } - fn f_up(&mut self, plan: &LogicalPlan) -> Result { + fn f_up(&mut self, plan: &'n LogicalPlan) -> Result { if self.return_error_from_post_in.dec() { return not_impl_err!("Error in post_visit"); } diff --git a/datafusion/expr/src/logical_plan/tree_node.rs b/datafusion/expr/src/logical_plan/tree_node.rs index 215b2cb4d4cf..86c0cffd80a1 100644 --- a/datafusion/expr/src/logical_plan/tree_node.rs +++ b/datafusion/expr/src/logical_plan/tree_node.rs @@ -56,8 +56,8 @@ use datafusion_common::{ }; impl TreeNode for LogicalPlan { - fn apply_children Result>( - &self, + fn apply_children<'n, F: FnMut(&'n Self) -> Result>( + &'n self, f: F, ) -> Result { self.inputs().into_iter().apply_until_stop(f) @@ -71,10 +71,10 @@ impl TreeNode for LogicalPlan { /// subqueries, for example such as are in [`Expr::Exists`]. /// /// [`Expr::Exists`]: crate::Expr::Exists - fn map_children(self, mut f: F) -> Result> - where - F: FnMut(Self) -> Result>, - { + fn map_children Result>>( + self, + mut f: F, + ) -> Result> { Ok(match self { LogicalPlan::Projection(Projection { expr, @@ -379,24 +379,18 @@ pub fn unwrap_arc(plan: Arc) -> LogicalPlan { } /// Applies `f` to rewrite a `Arc` without copying, if possible -fn rewrite_arc( +fn rewrite_arc Result>>( plan: Arc, mut f: F, -) -> Result>> -where - F: FnMut(LogicalPlan) -> Result>, -{ +) -> Result>> { f(unwrap_arc(plan))?.map_data(|new_plan| Ok(Arc::new(new_plan))) } /// rewrite a `Vec` of `Arc` without copying, if possible -fn rewrite_arcs( +fn rewrite_arcs Result>>( input_plans: Vec>, mut f: F, -) -> Result>>> -where - F: FnMut(LogicalPlan) -> Result>, -{ +) -> Result>>> { input_plans .into_iter() .map_until_stop_and_collect(|plan| rewrite_arc(plan, &mut f)) @@ -407,13 +401,10 @@ where /// /// Should be removed when we have an API for in place modifications of the /// extension to avoid these copies -fn rewrite_extension_inputs( +fn rewrite_extension_inputs Result>>( extension: Extension, f: F, -) -> Result> -where - F: FnMut(LogicalPlan) -> Result>, -{ +) -> Result> { let Extension { node } = extension; node.inputs() @@ -744,7 +735,7 @@ impl LogicalPlan { /// Visits a plan similarly to [`Self::visit`], including subqueries that /// may appear in expressions such as `IN (SELECT ...)`. - pub fn visit_with_subqueries>( + pub fn visit_with_subqueries TreeNodeVisitor<'n, Node = Self>>( &self, visitor: &mut V, ) -> Result { diff --git a/datafusion/expr/src/tree_node.rs b/datafusion/expr/src/tree_node.rs index c5f1694c1138..e57d57188743 100644 --- a/datafusion/expr/src/tree_node.rs +++ b/datafusion/expr/src/tree_node.rs @@ -30,8 +30,8 @@ use datafusion_common::tree_node::{ use datafusion_common::{map_until_stop_and_collect, Result}; impl TreeNode for Expr { - fn apply_children Result>( - &self, + fn apply_children<'n, F: FnMut(&'n Self) -> Result>( + &'n self, f: F, ) -> Result { let children = match self { @@ -368,42 +368,36 @@ impl TreeNode for Expr { } } -fn transform_box(be: Box, f: &mut F) -> Result>> -where - F: FnMut(Expr) -> Result>, -{ +fn transform_box Result>>( + be: Box, + f: &mut F, +) -> Result>> { Ok(f(*be)?.update_data(Box::new)) } -fn transform_option_box( +fn transform_option_box Result>>( obe: Option>, f: &mut F, -) -> Result>>> -where - F: FnMut(Expr) -> Result>, -{ +) -> Result>>> { obe.map_or(Ok(Transformed::no(None)), |be| { Ok(transform_box(be, f)?.update_data(Some)) }) } /// &mut transform a Option<`Vec` of `Expr`s> -pub fn transform_option_vec( +pub fn transform_option_vec Result>>( ove: Option>, f: &mut F, -) -> Result>>> -where - F: FnMut(Expr) -> Result>, -{ +) -> Result>>> { ove.map_or(Ok(Transformed::no(None)), |ve| { Ok(transform_vec(ve, f)?.update_data(Some)) }) } /// &mut transform a `Vec` of `Expr`s -fn transform_vec(ve: Vec, f: &mut F) -> Result>> -where - F: FnMut(Expr) -> Result>, -{ +fn transform_vec Result>>( + ve: Vec, + f: &mut F, +) -> Result>> { ve.into_iter().map_until_stop_and_collect(f) } diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs index 3532a57f6206..174440dac316 100644 --- a/datafusion/optimizer/src/common_subexpr_eliminate.rs +++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs @@ -682,10 +682,10 @@ impl ExprIdentifierVisitor<'_> { } } -impl TreeNodeVisitor for ExprIdentifierVisitor<'_> { +impl<'n> TreeNodeVisitor<'n> for ExprIdentifierVisitor<'_> { type Node = Expr; - fn f_down(&mut self, expr: &Expr) -> Result { + fn f_down(&mut self, expr: &'n Expr) -> Result { // related to https://github.com/apache/arrow-datafusion/issues/8814 // If the expr contain volatile expression or is a short-circuit expression, skip it. // TODO: propagate is_volatile state bottom-up + consider non-volatile sub-expressions for CSE @@ -704,7 +704,7 @@ impl TreeNodeVisitor for ExprIdentifierVisitor<'_> { Ok(TreeNodeRecursion::Continue) } - fn f_up(&mut self, expr: &Expr) -> Result { + fn f_up(&mut self, expr: &'n Expr) -> Result { let Some((down_index, sub_expr_id)) = self.pop_enter_mark() else { return Ok(TreeNodeRecursion::Continue); }; diff --git a/datafusion/physical-expr-common/src/expressions/column.rs b/datafusion/physical-expr-common/src/expressions/column.rs index 2cd52d6332fb..956c33d59b20 100644 --- a/datafusion/physical-expr-common/src/expressions/column.rs +++ b/datafusion/physical-expr-common/src/expressions/column.rs @@ -92,7 +92,7 @@ impl PhysicalExpr for Column { Ok(ColumnarValue::Array(batch.column(self.index).clone())) } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { vec![] } diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs index 00b3dd725dc2..fd8d07dc966d 100644 --- a/datafusion/physical-expr-common/src/physical_expr.rs +++ b/datafusion/physical-expr-common/src/physical_expr.rs @@ -65,7 +65,7 @@ pub trait PhysicalExpr: Send + Sync + Display + Debug + PartialEq { } /// Get a list of child PhysicalExpr that provide the input for this expr. - fn children(&self) -> Vec>; + fn children(&self) -> Vec<&Arc>; /// Returns a new PhysicalExpr where all children were replaced by new exprs. fn with_new_children( diff --git a/datafusion/physical-expr-common/src/tree_node.rs b/datafusion/physical-expr-common/src/tree_node.rs index 5de6457b8e5e..d9892ce55509 100644 --- a/datafusion/physical-expr-common/src/tree_node.rs +++ b/datafusion/physical-expr-common/src/tree_node.rs @@ -26,7 +26,7 @@ use datafusion_common::tree_node::{ConcreteTreeNode, DynTreeNode}; use datafusion_common::Result; impl DynTreeNode for dyn PhysicalExpr { - fn arc_children(&self) -> Vec> { + fn arc_children(&self) -> Vec<&Arc> { self.children() } @@ -70,7 +70,12 @@ impl ExprContext { impl ExprContext { pub fn new_default(plan: Arc) -> Self { - let children = plan.children().into_iter().map(Self::new_default).collect(); + let children = plan + .children() + .into_iter() + .cloned() + .map(Self::new_default) + .collect(); Self::new(plan, Default::default(), children) } } diff --git a/datafusion/physical-expr-common/src/utils.rs b/datafusion/physical-expr-common/src/utils.rs index 601d344e4aac..487aba945aa5 100644 --- a/datafusion/physical-expr-common/src/utils.rs +++ b/datafusion/physical-expr-common/src/utils.rs @@ -35,7 +35,12 @@ impl ExprPropertiesNode { /// given physical expression. This node initializes with default properties /// and recursively applies this to all child expressions. pub fn new_unknown(expr: Arc) -> Self { - let children = expr.children().into_iter().map(Self::new_unknown).collect(); + let children = expr + .children() + .into_iter() + .cloned() + .map(Self::new_unknown) + .collect(); Self { expr, data: ExprProperties::new_unknown(), diff --git a/datafusion/physical-expr/src/equivalence/class.rs b/datafusion/physical-expr/src/equivalence/class.rs index 9ea456b0f879..b4d12e963611 100644 --- a/datafusion/physical-expr/src/equivalence/class.rs +++ b/datafusion/physical-expr/src/equivalence/class.rs @@ -374,7 +374,7 @@ impl EquivalenceGroup { } children .into_iter() - .map(|child| self.project_expr(mapping, &child)) + .map(|child| self.project_expr(mapping, child)) .collect::>>() .map(|children| expr.clone().with_new_children(children).unwrap()) } diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs index 08f7523f92f0..98df0cba9f3e 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr/src/expressions/binary.rs @@ -312,8 +312,8 @@ impl PhysicalExpr for BinaryExpr { .map(ColumnarValue::Array) } - fn children(&self) -> Vec> { - vec![self.left.clone(), self.right.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.left, &self.right] } fn with_new_children( diff --git a/datafusion/physical-expr/src/expressions/case.rs b/datafusion/physical-expr/src/expressions/case.rs index 7b10df9ac146..c56229e07a63 100644 --- a/datafusion/physical-expr/src/expressions/case.rs +++ b/datafusion/physical-expr/src/expressions/case.rs @@ -19,7 +19,7 @@ use std::borrow::Cow; use std::hash::{Hash, Hasher}; use std::{any::Any, sync::Arc}; -use crate::expressions::{try_cast, NoOp}; +use crate::expressions::try_cast; use crate::physical_expr::down_cast_any_ref; use crate::PhysicalExpr; @@ -314,20 +314,18 @@ impl PhysicalExpr for CaseExpr { } } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { let mut children = vec![]; - match &self.expr { - Some(expr) => children.push(expr.clone()), - None => children.push(Arc::new(NoOp::new())), + if let Some(expr) = &self.expr { + children.push(expr) } self.when_then_expr.iter().for_each(|(cond, value)| { - children.push(cond.clone()); - children.push(value.clone()); + children.push(cond); + children.push(value); }); - match &self.else_expr { - Some(expr) => children.push(expr.clone()), - None => children.push(Arc::new(NoOp::new())), + if let Some(else_expr) = &self.else_expr { + children.push(else_expr) } children } @@ -340,29 +338,27 @@ impl PhysicalExpr for CaseExpr { if children.len() != self.children().len() { internal_err!("CaseExpr: Wrong number of children") } else { - assert_eq!(children.len() % 2, 0); - let expr = match children[0].clone().as_any().downcast_ref::() { - Some(_) => None, - _ => Some(children[0].clone()), - }; - let else_expr = match children[children.len() - 1] - .clone() - .as_any() - .downcast_ref::() - { - Some(_) => None, - _ => Some(children[children.len() - 1].clone()), - }; - - let branches = children[1..children.len() - 1].to_vec(); - let mut when_then_expr: Vec = vec![]; - for (prev, next) in branches.into_iter().tuples() { - when_then_expr.push((prev, next)); - } + let (expr, when_then_expr, else_expr) = + match (self.expr().is_some(), self.else_expr().is_some()) { + (true, true) => ( + Some(&children[0]), + &children[1..children.len() - 1], + Some(&children[children.len() - 1]), + ), + (true, false) => { + (Some(&children[0]), &children[1..children.len()], None) + } + (false, true) => ( + None, + &children[0..children.len() - 1], + Some(&children[children.len() - 1]), + ), + (false, false) => (None, &children[0..children.len()], None), + }; Ok(Arc::new(CaseExpr::try_new( - expr, - when_then_expr, - else_expr, + expr.cloned(), + when_then_expr.iter().cloned().tuples().collect(), + else_expr.cloned(), )?)) } } diff --git a/datafusion/physical-expr/src/expressions/cast.rs b/datafusion/physical-expr/src/expressions/cast.rs index 79a44ac30cfc..4f940a792bb9 100644 --- a/datafusion/physical-expr/src/expressions/cast.rs +++ b/datafusion/physical-expr/src/expressions/cast.rs @@ -123,8 +123,8 @@ impl PhysicalExpr for CastExpr { value.cast_to(&self.cast_type, Some(&self.cast_options)) } - fn children(&self) -> Vec> { - vec![self.expr.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.expr] } fn with_new_children( diff --git a/datafusion/physical-expr/src/expressions/column.rs b/datafusion/physical-expr/src/expressions/column.rs index 634a56d1d683..f6525c7c0462 100644 --- a/datafusion/physical-expr/src/expressions/column.rs +++ b/datafusion/physical-expr/src/expressions/column.rs @@ -77,7 +77,7 @@ impl PhysicalExpr for UnKnownColumn { internal_err!("UnKnownColumn::evaluate() should not be called") } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { vec![] } diff --git a/datafusion/physical-expr/src/expressions/in_list.rs b/datafusion/physical-expr/src/expressions/in_list.rs index 9ae4c2784ccf..dd61fc802441 100644 --- a/datafusion/physical-expr/src/expressions/in_list.rs +++ b/datafusion/physical-expr/src/expressions/in_list.rs @@ -372,10 +372,10 @@ impl PhysicalExpr for InListExpr { Ok(ColumnarValue::Array(Arc::new(r))) } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { let mut children = vec![]; - children.push(self.expr.clone()); - children.extend(self.list.clone()); + children.push(&self.expr); + children.extend(&self.list); children } diff --git a/datafusion/physical-expr/src/expressions/is_not_null.rs b/datafusion/physical-expr/src/expressions/is_not_null.rs index c5c673ec28ea..1918f0891fff 100644 --- a/datafusion/physical-expr/src/expressions/is_not_null.rs +++ b/datafusion/physical-expr/src/expressions/is_not_null.rs @@ -82,8 +82,8 @@ impl PhysicalExpr for IsNotNullExpr { } } - fn children(&self) -> Vec> { - vec![self.arg.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.arg] } fn with_new_children( diff --git a/datafusion/physical-expr/src/expressions/is_null.rs b/datafusion/physical-expr/src/expressions/is_null.rs index b0f70b6f0d7a..3430efcd7635 100644 --- a/datafusion/physical-expr/src/expressions/is_null.rs +++ b/datafusion/physical-expr/src/expressions/is_null.rs @@ -83,8 +83,8 @@ impl PhysicalExpr for IsNullExpr { } } - fn children(&self) -> Vec> { - vec![self.arg.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.arg] } fn with_new_children( diff --git a/datafusion/physical-expr/src/expressions/like.rs b/datafusion/physical-expr/src/expressions/like.rs index 6e0beeb0beea..eec347db8ed8 100644 --- a/datafusion/physical-expr/src/expressions/like.rs +++ b/datafusion/physical-expr/src/expressions/like.rs @@ -112,8 +112,8 @@ impl PhysicalExpr for LikeExpr { } } - fn children(&self) -> Vec> { - vec![self.expr.clone(), self.pattern.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.expr, &self.pattern] } fn with_new_children( diff --git a/datafusion/physical-expr/src/expressions/literal.rs b/datafusion/physical-expr/src/expressions/literal.rs index 371028959ab8..fcaf229af0a8 100644 --- a/datafusion/physical-expr/src/expressions/literal.rs +++ b/datafusion/physical-expr/src/expressions/literal.rs @@ -75,7 +75,7 @@ impl PhysicalExpr for Literal { Ok(ColumnarValue::Scalar(self.value.clone())) } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { vec![] } diff --git a/datafusion/physical-expr/src/expressions/negative.rs b/datafusion/physical-expr/src/expressions/negative.rs index 62f865bd9b32..aed2675e0447 100644 --- a/datafusion/physical-expr/src/expressions/negative.rs +++ b/datafusion/physical-expr/src/expressions/negative.rs @@ -89,8 +89,8 @@ impl PhysicalExpr for NegativeExpr { } } - fn children(&self) -> Vec> { - vec![self.arg.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.arg] } fn with_new_children( diff --git a/datafusion/physical-expr/src/expressions/no_op.rs b/datafusion/physical-expr/src/expressions/no_op.rs index b558ccab154d..9148cb7c1c1d 100644 --- a/datafusion/physical-expr/src/expressions/no_op.rs +++ b/datafusion/physical-expr/src/expressions/no_op.rs @@ -68,7 +68,7 @@ impl PhysicalExpr for NoOp { internal_err!("NoOp::evaluate() should not be called") } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { vec![] } diff --git a/datafusion/physical-expr/src/expressions/not.rs b/datafusion/physical-expr/src/expressions/not.rs index 1428be71cc21..9aaab0658d39 100644 --- a/datafusion/physical-expr/src/expressions/not.rs +++ b/datafusion/physical-expr/src/expressions/not.rs @@ -89,8 +89,8 @@ impl PhysicalExpr for NotExpr { } } - fn children(&self) -> Vec> { - vec![self.arg.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.arg] } fn with_new_children( diff --git a/datafusion/physical-expr/src/expressions/try_cast.rs b/datafusion/physical-expr/src/expressions/try_cast.rs index d25a904f7d6a..d31306e239bd 100644 --- a/datafusion/physical-expr/src/expressions/try_cast.rs +++ b/datafusion/physical-expr/src/expressions/try_cast.rs @@ -97,8 +97,8 @@ impl PhysicalExpr for TryCastExpr { } } - fn children(&self) -> Vec> { - vec![self.expr.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.expr] } fn with_new_children( diff --git a/datafusion/physical-expr/src/scalar_function.rs b/datafusion/physical-expr/src/scalar_function.rs index 14631caec55e..10e29b41031d 100644 --- a/datafusion/physical-expr/src/scalar_function.rs +++ b/datafusion/physical-expr/src/scalar_function.rs @@ -143,8 +143,8 @@ impl PhysicalExpr for ScalarFunctionExpr { Ok(output) } - fn children(&self) -> Vec> { - self.args.clone() + fn children(&self) -> Vec<&Arc> { + self.args.iter().collect() } fn with_new_children( diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index cf31c2990b7d..b0e2af82e6e2 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -696,8 +696,8 @@ impl ExecutionPlan for AggregateExec { vec![self.required_input_ordering.clone()] } - fn children(&self) -> Vec> { - vec![self.input.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.input] } fn with_new_children( @@ -1641,7 +1641,7 @@ mod tests { &self.cache } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { vec![] } diff --git a/datafusion/physical-plan/src/analyze.rs b/datafusion/physical-plan/src/analyze.rs index c420581c4323..5b859804163b 100644 --- a/datafusion/physical-plan/src/analyze.rs +++ b/datafusion/physical-plan/src/analyze.rs @@ -124,8 +124,8 @@ impl ExecutionPlan for AnalyzeExec { &self.cache } - fn children(&self) -> Vec> { - vec![self.input.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.input] } /// AnalyzeExec is handled specially so this value is ignored diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index bc7c4a3d0673..804fabff71ac 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -117,8 +117,8 @@ impl ExecutionPlan for CoalesceBatchesExec { &self.cache } - fn children(&self) -> Vec> { - vec![self.input.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.input] } fn maintains_input_order(&self) -> Vec { diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs b/datafusion/physical-plan/src/coalesce_partitions.rs index 1c725ce31f14..ce67cba2cd0e 100644 --- a/datafusion/physical-plan/src/coalesce_partitions.rs +++ b/datafusion/physical-plan/src/coalesce_partitions.rs @@ -102,8 +102,8 @@ impl ExecutionPlan for CoalescePartitionsExec { &self.cache } - fn children(&self) -> Vec> { - vec![self.input.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.input] } fn benefits_from_input_partitioning(&self) -> Vec { diff --git a/datafusion/physical-plan/src/display.rs b/datafusion/physical-plan/src/display.rs index ca93ce5e7b83..ed85c80251d6 100644 --- a/datafusion/physical-plan/src/display.rs +++ b/datafusion/physical-plan/src/display.rs @@ -501,7 +501,7 @@ mod tests { unimplemented!() } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { vec![] } diff --git a/datafusion/physical-plan/src/empty.rs b/datafusion/physical-plan/src/empty.rs index 33bf1668b3c9..11af0624db15 100644 --- a/datafusion/physical-plan/src/empty.rs +++ b/datafusion/physical-plan/src/empty.rs @@ -114,7 +114,7 @@ impl ExecutionPlan for EmptyExec { &self.cache } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { vec![] } diff --git a/datafusion/physical-plan/src/explain.rs b/datafusion/physical-plan/src/explain.rs index 649946993229..4b2edbf2045d 100644 --- a/datafusion/physical-plan/src/explain.rs +++ b/datafusion/physical-plan/src/explain.rs @@ -111,7 +111,7 @@ impl ExecutionPlan for ExplainExec { &self.cache } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { // this is a leaf node and has no children vec![] } diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 6729e3b9e603..6153dbacfbff 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -242,8 +242,8 @@ impl ExecutionPlan for FilterExec { &self.cache } - fn children(&self) -> Vec> { - vec![self.input.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.input] } fn maintains_input_order(&self) -> Vec { diff --git a/datafusion/physical-plan/src/insert.rs b/datafusion/physical-plan/src/insert.rs index 259db644ae0a..fa30141a1934 100644 --- a/datafusion/physical-plan/src/insert.rs +++ b/datafusion/physical-plan/src/insert.rs @@ -248,8 +248,8 @@ impl ExecutionPlan for DataSinkExec { vec![true] } - fn children(&self) -> Vec> { - vec![self.input.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.input] } fn with_new_children( diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index 9d1de3715f54..92443d06856a 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -207,8 +207,8 @@ impl ExecutionPlan for CrossJoinExec { &self.cache } - fn children(&self) -> Vec> { - vec![self.left.clone(), self.right.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.left, &self.right] } fn metrics(&self) -> Option { diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index d3abedbe3806..e669517be400 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -683,8 +683,8 @@ impl ExecutionPlan for HashJoinExec { Self::maintains_input_order(self.join_type) } - fn children(&self) -> Vec> { - vec![self.left.clone(), self.right.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.left, &self.right] } fn with_new_children( diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index 47e262c3c8f6..18518600ef2f 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -292,8 +292,8 @@ impl ExecutionPlan for NestedLoopJoinExec { ] } - fn children(&self) -> Vec> { - vec![self.left.clone(), self.right.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.left, &self.right] } fn with_new_children( diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs index 1cc7bf4700d1..ec83fe3f2af8 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs @@ -298,8 +298,8 @@ impl ExecutionPlan for SortMergeJoinExec { Self::maintains_input_order(self.join_type) } - fn children(&self) -> Vec> { - vec![self.left.clone(), self.right.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.left, &self.right] } fn with_new_children( diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs index 9d48c2a7d408..0d902af9c6cc 100644 --- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs +++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs @@ -427,8 +427,8 @@ impl ExecutionPlan for SymmetricHashJoinExec { ] } - fn children(&self) -> Vec> { - vec![self.left.clone(), self.right.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.left, &self.right] } fn with_new_children( diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index 8d8a3e71031e..739bff2cfa23 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -215,7 +215,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { /// The returned list will be empty for leaf nodes such as scans, will contain /// a single value for unary nodes, or two values for binary nodes (such as /// joins). - fn children(&self) -> Vec>; + fn children(&self) -> Vec<&Arc>; /// Returns a new `ExecutionPlan` where all existing children were replaced /// by the `children`, in order @@ -841,7 +841,7 @@ mod tests { unimplemented!() } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { vec![] } @@ -900,7 +900,7 @@ mod tests { unimplemented!() } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { vec![] } diff --git a/datafusion/physical-plan/src/limit.rs b/datafusion/physical-plan/src/limit.rs index 4f8ff4c5606e..4c6d1b3674d5 100644 --- a/datafusion/physical-plan/src/limit.rs +++ b/datafusion/physical-plan/src/limit.rs @@ -124,8 +124,8 @@ impl ExecutionPlan for GlobalLimitExec { &self.cache } - fn children(&self) -> Vec> { - vec![self.input.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.input] } fn required_input_distribution(&self) -> Vec { @@ -334,8 +334,8 @@ impl ExecutionPlan for LocalLimitExec { &self.cache } - fn children(&self) -> Vec> { - vec![self.input.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.input] } fn benefits_from_input_partitioning(&self) -> Vec { diff --git a/datafusion/physical-plan/src/memory.rs b/datafusion/physical-plan/src/memory.rs index 883cdb540a9e..39ae8d551f4b 100644 --- a/datafusion/physical-plan/src/memory.rs +++ b/datafusion/physical-plan/src/memory.rs @@ -116,7 +116,7 @@ impl ExecutionPlan for MemoryExec { &self.cache } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { // this is a leaf node and has no children vec![] } diff --git a/datafusion/physical-plan/src/placeholder_row.rs b/datafusion/physical-plan/src/placeholder_row.rs index c94c2b0607d7..3b10cc0ac435 100644 --- a/datafusion/physical-plan/src/placeholder_row.rs +++ b/datafusion/physical-plan/src/placeholder_row.rs @@ -132,7 +132,7 @@ impl ExecutionPlan for PlaceholderRowExec { &self.cache } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { vec![] } diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index f72815c01a9e..8341549340dd 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -193,8 +193,8 @@ impl ExecutionPlan for ProjectionExec { &self.cache } - fn children(&self) -> Vec> { - vec![self.input.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.input] } fn maintains_input_order(&self) -> Vec { diff --git a/datafusion/physical-plan/src/recursive_query.rs b/datafusion/physical-plan/src/recursive_query.rs index ed897d78f0c8..9a0b66caba31 100644 --- a/datafusion/physical-plan/src/recursive_query.rs +++ b/datafusion/physical-plan/src/recursive_query.rs @@ -120,8 +120,8 @@ impl ExecutionPlan for RecursiveQueryExec { &self.cache } - fn children(&self) -> Vec> { - vec![self.static_term.clone(), self.recursive_term.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.static_term, &self.recursive_term] } // TODO: control these hints and see whether we can @@ -358,7 +358,9 @@ fn reset_plan_states(plan: Arc) -> Result() { Ok(Transformed::no(plan)) } else { - let new_plan = plan.clone().with_new_children(plan.children())?; + let new_plan = plan + .clone() + .with_new_children(plan.children().into_iter().cloned().collect())?; Ok(Transformed::yes(new_plan)) } }) diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index e31fdc6ee2c2..65f7d5070a5d 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -519,8 +519,8 @@ impl ExecutionPlan for RepartitionExec { &self.cache } - fn children(&self) -> Vec> { - vec![self.input.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.input] } fn with_new_children( diff --git a/datafusion/physical-plan/src/sorts/partial_sort.rs b/datafusion/physical-plan/src/sorts/partial_sort.rs index d24bc5a670e5..ad5d485cffc9 100644 --- a/datafusion/physical-plan/src/sorts/partial_sort.rs +++ b/datafusion/physical-plan/src/sorts/partial_sort.rs @@ -250,8 +250,8 @@ impl ExecutionPlan for PartialSortExec { vec![false] } - fn children(&self) -> Vec> { - vec![self.input.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.input] } fn with_new_children( diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index c684748bb29a..2a4862534590 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -903,8 +903,8 @@ impl ExecutionPlan for SortExec { } } - fn children(&self) -> Vec> { - vec![self.input.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.input] } fn benefits_from_input_partitioning(&self) -> Vec { diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index 88c6c312b94b..8a349bd22abf 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -173,8 +173,8 @@ impl ExecutionPlan for SortPreservingMergeExec { vec![true] } - fn children(&self) -> Vec> { - vec![self.input.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.input] } fn with_new_children( diff --git a/datafusion/physical-plan/src/streaming.rs b/datafusion/physical-plan/src/streaming.rs index d174e3b8b6ca..ff57adde4e2e 100644 --- a/datafusion/physical-plan/src/streaming.rs +++ b/datafusion/physical-plan/src/streaming.rs @@ -217,7 +217,7 @@ impl ExecutionPlan for StreamingTableExec { &self.cache } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { vec![] } diff --git a/datafusion/physical-plan/src/test/exec.rs b/datafusion/physical-plan/src/test/exec.rs index b4f1eac0a655..d5ad9292b49d 100644 --- a/datafusion/physical-plan/src/test/exec.rs +++ b/datafusion/physical-plan/src/test/exec.rs @@ -185,7 +185,7 @@ impl ExecutionPlan for MockExec { &self.cache } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { vec![] } @@ -343,7 +343,7 @@ impl ExecutionPlan for BarrierExec { &self.cache } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { unimplemented!() } @@ -452,7 +452,7 @@ impl ExecutionPlan for ErrorExec { &self.cache } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { unimplemented!() } @@ -535,7 +535,7 @@ impl ExecutionPlan for StatisticsExec { &self.cache } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { vec![] } @@ -627,7 +627,7 @@ impl ExecutionPlan for BlockingExec { &self.cache } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { // this is a leaf node and has no children vec![] } @@ -768,7 +768,7 @@ impl ExecutionPlan for PanicExec { &self.cache } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { // this is a leaf node and has no children vec![] } diff --git a/datafusion/physical-plan/src/tree_node.rs b/datafusion/physical-plan/src/tree_node.rs index d3b51062c25c..1570778be69b 100644 --- a/datafusion/physical-plan/src/tree_node.rs +++ b/datafusion/physical-plan/src/tree_node.rs @@ -26,7 +26,7 @@ use datafusion_common::tree_node::{ConcreteTreeNode, DynTreeNode}; use datafusion_common::Result; impl DynTreeNode for dyn ExecutionPlan { - fn arc_children(&self) -> Vec> { + fn arc_children(&self) -> Vec<&Arc> { self.children() } @@ -71,7 +71,12 @@ impl PlanContext { impl PlanContext { pub fn new_default(plan: Arc) -> Self { - let children = plan.children().into_iter().map(Self::new_default).collect(); + let children = plan + .children() + .into_iter() + .cloned() + .map(Self::new_default) + .collect(); Self::new(plan, Default::default(), children) } } diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index 1354644788ea..dc7d270bae25 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -196,8 +196,8 @@ impl ExecutionPlan for UnionExec { &self.cache } - fn children(&self) -> Vec> { - self.inputs.clone() + fn children(&self) -> Vec<&Arc> { + self.inputs.iter().collect() } fn maintains_input_order(&self) -> Vec { @@ -387,8 +387,8 @@ impl ExecutionPlan for InterleaveExec { &self.cache } - fn children(&self) -> Vec> { - self.inputs.clone() + fn children(&self) -> Vec<&Arc> { + self.inputs.iter().collect() } fn maintains_input_order(&self) -> Vec { diff --git a/datafusion/physical-plan/src/unnest.rs b/datafusion/physical-plan/src/unnest.rs index a8151fe0220b..0685903cb76c 100644 --- a/datafusion/physical-plan/src/unnest.rs +++ b/datafusion/physical-plan/src/unnest.rs @@ -138,8 +138,8 @@ impl ExecutionPlan for UnnestExec { &self.cache } - fn children(&self) -> Vec> { - vec![self.input.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.input] } fn with_new_children( diff --git a/datafusion/physical-plan/src/values.rs b/datafusion/physical-plan/src/values.rs index 2aa893fd2916..4d385812d4a8 100644 --- a/datafusion/physical-plan/src/values.rs +++ b/datafusion/physical-plan/src/values.rs @@ -167,7 +167,7 @@ impl ExecutionPlan for ValuesExec { &self.cache } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { vec![] } diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index cff91283eb6e..48f1bee59bbf 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -250,8 +250,8 @@ impl ExecutionPlan for BoundedWindowAggExec { &self.cache } - fn children(&self) -> Vec> { - vec![self.input.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.input] } fn required_input_ordering(&self) -> Vec>> { diff --git a/datafusion/physical-plan/src/windows/window_agg_exec.rs b/datafusion/physical-plan/src/windows/window_agg_exec.rs index 1507902c22ea..eb01da2ec094 100644 --- a/datafusion/physical-plan/src/windows/window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/window_agg_exec.rs @@ -184,8 +184,8 @@ impl ExecutionPlan for WindowAggExec { &self.cache } - fn children(&self) -> Vec> { - vec![self.input.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.input] } fn maintains_input_order(&self) -> Vec { diff --git a/datafusion/physical-plan/src/work_table.rs b/datafusion/physical-plan/src/work_table.rs index b3c9043d4fdc..003957947fec 100644 --- a/datafusion/physical-plan/src/work_table.rs +++ b/datafusion/physical-plan/src/work_table.rs @@ -169,7 +169,7 @@ impl ExecutionPlan for WorkTableExec { &self.cache } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { vec![] } diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 0515ed5006aa..4c581f99b32a 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -1972,6 +1972,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { let inputs: Vec = plan_clone .children() .into_iter() + .cloned() .map(|i| { protobuf::PhysicalPlanNode::try_from_physical_plan( i,