Skip to content

Commit

Permalink
More
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya committed Dec 29, 2023
1 parent 2da06d5 commit f725553
Show file tree
Hide file tree
Showing 10 changed files with 72 additions and 109 deletions.
8 changes: 4 additions & 4 deletions datafusion/common/src/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use crate::Result;
/// [`Expr`]: https://docs.rs/datafusion-expr/latest/datafusion_expr/expr/enum.Expr.html
pub trait TreeNode: Sized {
/// Returns all children of the TreeNode
fn children_nodes(&self) -> Vec<&Self>;
fn children_nodes(&self) -> Vec<Self>;

/// Use preorder to iterate the node on the tree so that we can
/// stop fast for some cases.
Expand Down Expand Up @@ -217,7 +217,7 @@ pub trait TreeNode: Sized {
F: FnMut(&Self) -> Result<VisitRecursion>,
{
for child in self.children_nodes() {
match op(child)? {
match op(&child)? {
VisitRecursion::Continue => {}
VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
Expand Down Expand Up @@ -355,8 +355,8 @@ pub trait DynTreeNode {
/// Blanket implementation for Arc for any tye that implements
/// [`DynTreeNode`] (such as [`Arc<dyn PhysicalExpr>`])
impl<T: DynTreeNode + ?Sized> TreeNode for Arc<T> {
fn children_nodes(&self) -> Vec<&Arc<T>> {
unimplemented!("Call arc_children instead")
fn children_nodes(&self) -> Vec<Arc<T>> {
self.arc_children()
}

fn apply_children<F>(&self, op: &mut F) -> Result<VisitRecursion>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1409,8 +1409,8 @@ impl DistributionContext {
}

impl TreeNode for DistributionContext {
fn children_nodes(&self) -> Vec<&Self> {
self.children_nodes.iter().collect()
fn children_nodes(&self) -> Vec<Self> {
self.children_nodes.iter().map(|c| c.clone()).collect()
}

fn map_children<F>(mut self, transform: F) -> Result<Self>
Expand Down Expand Up @@ -1473,8 +1473,8 @@ impl PlanWithKeyRequirements {
}

impl TreeNode for PlanWithKeyRequirements {
fn children_nodes(&self) -> Vec<&Self> {
self.children.iter().collect()
fn children_nodes(&self) -> Vec<Self> {
self.children.iter().map(|c| c.clone()).collect()
}

fn map_children<F>(mut self, transform: F) -> Result<Self>
Expand Down
25 changes: 5 additions & 20 deletions datafusion/core/src/physical_optimizer/enforce_sorting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ use crate::physical_plan::{
with_new_children_if_necessary, Distribution, ExecutionPlan, InputOrderMode,
};

use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{plan_err, DataFusionError};
use datafusion_physical_expr::{PhysicalSortExpr, PhysicalSortRequirement};
use datafusion_physical_plan::repartition::RepartitionExec;
Expand Down Expand Up @@ -145,23 +145,8 @@ impl PlanWithCorrespondingSort {
}

impl TreeNode for PlanWithCorrespondingSort {
fn children_nodes(&self) -> Vec<&Self> {
self.children_nodes.iter().collect()
}

fn apply_children<F>(&self, op: &mut F) -> Result<VisitRecursion>
where
F: FnMut(&Self) -> Result<VisitRecursion>,
{
for child in self.children_nodes() {
match op(child)? {
VisitRecursion::Continue => {}
VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
}
}

Ok(VisitRecursion::Continue)
fn children_nodes(&self) -> Vec<Self> {
self.children_nodes.iter().map(|c| c.clone()).collect()
}

fn map_children<F>(mut self, transform: F) -> Result<Self>
Expand Down Expand Up @@ -241,8 +226,8 @@ impl PlanWithCorrespondingCoalescePartitions {
}

impl TreeNode for PlanWithCorrespondingCoalescePartitions {
fn children_nodes(&self) -> Vec<&Self> {
self.children_nodes.iter().collect()
fn children_nodes(&self) -> Vec<Self> {
self.children_nodes.iter().map(|c| c.clone()).collect()
}

fn map_children<F>(mut self, transform: F) -> Result<Self>
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/physical_optimizer/pipeline_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ impl PipelineStatePropagator {
}

impl TreeNode for PipelineStatePropagator {
fn children_nodes(&self) -> Vec<&Self> {
self.children.iter().collect()
fn children_nodes(&self) -> Vec<Self> {
self.children.iter().map(|c| c.clone()).collect()
}

fn map_children<F>(mut self, transform: F) -> Result<Self>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ impl OrderPreservationContext {
}

impl TreeNode for OrderPreservationContext {
fn children_nodes(&self) -> Vec<&Self> {
self.children_nodes.iter().collect()
fn children_nodes(&self) -> Vec<Self> {
self.children_nodes.iter().map(|c| c.clone()).collect()
}

fn map_children<F>(mut self, transform: F) -> Result<Self>
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/physical_optimizer/sort_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ impl SortPushDown {
}

impl TreeNode for SortPushDown {
fn children_nodes(&self) -> Vec<&Self> {
self.children_nodes.iter().collect()
fn children_nodes(&self) -> Vec<Self> {
self.children_nodes.iter().map(|c| c.clone()).collect()
}

fn map_children<F>(mut self, transform: F) -> Result<Self>
Expand Down
114 changes: 46 additions & 68 deletions datafusion/expr/src/tree_node/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ use crate::expr::{
};
use crate::{Expr, GetFieldAccess};

use datafusion_common::tree_node::{TreeNode, VisitRecursion};
use datafusion_common::tree_node::TreeNode;
use datafusion_common::{internal_err, DataFusionError, Result};

impl TreeNode for Expr {
fn children_nodes(&self) -> Vec<&Self> {
fn children_nodes(&self) -> Vec<Self> {
match self {
Expr::Alias(Alias{expr,..})
Expr::Alias(Alias { expr, .. })
| Expr::Not(expr)
| Expr::IsNotNull(expr)
| Expr::IsTrue(expr)
Expand All @@ -44,28 +44,26 @@ impl TreeNode for Expr {
| Expr::Cast(Cast { expr, .. })
| Expr::TryCast(TryCast { expr, .. })
| Expr::Sort(Sort { expr, .. })
| Expr::InSubquery(InSubquery{ expr, .. }) => vec![expr.as_ref()],
| Expr::InSubquery(InSubquery { expr, .. }) => vec![expr.as_ref().clone()],
Expr::GetIndexedField(GetIndexedField { expr, field }) => {
let expr = expr.as_ref();
let expr = expr.as_ref().clone();
match field {
GetFieldAccess::ListIndex {key} => {
vec![key.as_ref(), expr]
},
GetFieldAccess::ListRange {start, stop} => {
vec![start.as_ref(), stop.as_ref(), expr]
GetFieldAccess::ListIndex { key } => {
vec![key.as_ref().clone(), expr]
}
GetFieldAccess::NamedStructField {name: _name} => {
GetFieldAccess::ListRange { start, stop } => {
vec![start.as_ref().clone(), stop.as_ref().clone(), expr]
}
GetFieldAccess::NamedStructField { name: _name } => {
vec![expr]
}
}
}
Expr::GroupingSet(GroupingSet::Rollup(exprs))
| Expr::GroupingSet(GroupingSet::Cube(exprs)) => exprs.iter().collect(),
Expr::ScalarFunction (ScalarFunction{ args, .. } ) => {
args.iter().collect()
}
| Expr::GroupingSet(GroupingSet::Cube(exprs)) => exprs.clone(),
Expr::ScalarFunction(ScalarFunction { args, .. }) => args.clone(),
Expr::GroupingSet(GroupingSet::GroupingSets(lists_of_exprs)) => {
lists_of_exprs.iter().map(|a| a.iter().collect::<Vec<_>>()).flatten().collect()
lists_of_exprs.clone().into_iter().flatten().collect()
}
Expr::Column(_)
// Treat OuterReferenceColumn as a leaf expression
Expand All @@ -74,93 +72,73 @@ impl TreeNode for Expr {
| Expr::Literal(_)
| Expr::Exists { .. }
| Expr::ScalarSubquery(_)
| Expr::Wildcard {..}
| Expr::Placeholder (_) => vec![],
| Expr::Wildcard { .. }
| Expr::Placeholder(_) => vec![],
Expr::BinaryExpr(BinaryExpr { left, right, .. }) => {
vec![left.as_ref(), right.as_ref()]
vec![left.as_ref().clone(), right.as_ref().clone()]
}
Expr::Like(Like { expr, pattern, .. })
| Expr::SimilarTo(Like { expr, pattern, .. }) => {
vec![expr.as_ref(), pattern.as_ref()]
vec![expr.as_ref().clone(), pattern.as_ref().clone()]
}
Expr::Between(Between {
expr, low, high, ..
}) => vec![
expr.as_ref(),
low.as_ref(),
high.as_ref(),
expr, low, high, ..
}) => vec![
expr.as_ref().clone(),
low.as_ref().clone(),
high.as_ref().clone(),
],
Expr::Case(case) => {
let mut expr_vec = vec![];
if let Some(expr) = case.expr.as_ref() {
expr_vec.push(expr.as_ref());
expr_vec.push(expr.as_ref().clone());
};
for (when, then) in case.when_then_expr.iter() {
expr_vec.push(when.as_ref());
expr_vec.push(then.as_ref());
expr_vec.push(when.as_ref().clone());
expr_vec.push(then.as_ref().clone());
}
if let Some(else_expr) = case.else_expr.as_ref() {
expr_vec.push(else_expr.as_ref());
expr_vec.push(else_expr.as_ref().clone());
}
expr_vec
}
Expr::AggregateFunction(AggregateFunction { args, filter, order_by, .. })
=> {
let mut expr_vec: Vec<&Expr> = args.iter().collect();
Expr::AggregateFunction(AggregateFunction {
args,
filter,
order_by,
..
}) => {
let mut expr_vec = args.clone();

if let Some(f) = filter {
expr_vec.push(f.as_ref());
expr_vec.push(f.as_ref().clone());
}
if let Some(o) = order_by {
for x in o {
expr_vec.push(x);
}
expr_vec.extend(o.clone());
}

expr_vec
}
Expr::WindowFunction(WindowFunction {
args,
partition_by,
order_by,
..
}) => {
let mut expr_vec: Vec<&Expr> = args.iter().collect();
for x in partition_by {
expr_vec.push(x);
}
for x in order_by {
expr_vec.push(x);
}
args,
partition_by,
order_by,
..
}) => {
let mut expr_vec = args.clone();
expr_vec.extend(partition_by.clone());
expr_vec.extend(order_by.clone());
expr_vec
}
Expr::InList(InList { expr, list, .. }) => {
let mut expr_vec = vec![];
expr_vec.push(expr.as_ref());
for x in list {
expr_vec.push(x);
}
expr_vec.push(expr.as_ref().clone());
expr_vec.extend(list.clone());
expr_vec
}
}
}

fn apply_children<F>(&self, op: &mut F) -> Result<VisitRecursion>
where
F: FnMut(&Self) -> Result<VisitRecursion>,
{
let children = self.children_nodes();
for child in children.into_iter() {
match op(child)? {
VisitRecursion::Continue => {}
VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
}
}

Ok(VisitRecursion::Continue)
}

fn map_children<F>(self, transform: F) -> Result<Self>
where
F: FnMut(Self) -> Result<Self>,
Expand Down
4 changes: 2 additions & 2 deletions datafusion/expr/src/tree_node/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ use datafusion_common::tree_node::{TreeNodeVisitor, VisitRecursion};
use datafusion_common::{tree_node::TreeNode, Result};

impl TreeNode for LogicalPlan {
fn children_nodes(&self) -> Vec<&Self> {
self.inputs()
fn children_nodes(&self) -> Vec<Self> {
self.inputs().into_iter().map(|p| p.clone()).collect()
}

fn apply<F>(&self, op: &mut F) -> Result<VisitRecursion>
Expand Down
6 changes: 3 additions & 3 deletions datafusion/physical-expr/src/sort_properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ impl Neg for SortProperties {
/// It encapsulates the orderings (`state`) associated with the expression (`expr`), and
/// orderings of the children expressions (`children_states`). The [`ExprOrdering`] of a parent
/// expression is determined based on the [`ExprOrdering`] states of its children expressions.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct ExprOrdering {
pub expr: Arc<dyn PhysicalExpr>,
pub state: SortProperties,
Expand All @@ -173,8 +173,8 @@ impl ExprOrdering {
}

impl TreeNode for ExprOrdering {
fn children_nodes(&self) -> Vec<&Self> {
self.children.iter().collect()
fn children_nodes(&self) -> Vec<Self> {
self.children.iter().map(|c| c.clone()).collect()
}

fn map_children<F>(mut self, transform: F) -> Result<Self>
Expand Down
4 changes: 2 additions & 2 deletions datafusion/physical-expr/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,8 @@ impl<T> ExprTreeNode<T> {
}

impl<T: Clone> TreeNode for ExprTreeNode<T> {
fn children_nodes(&self) -> Vec<&Self> {
self.children().iter().collect()
fn children_nodes(&self) -> Vec<Self> {
self.children().iter().map(|c| c.clone()).collect()
}

fn map_children<F>(mut self, transform: F) -> Result<Self>
Expand Down

0 comments on commit f725553

Please sign in to comment.