Skip to content

Commit

Permalink
Deprecate `TreeNode::transform_down_mut()` and `TreeNode::transform_up_mut()` methods
Browse files Browse the repository at this point in the history
  • Loading branch information
peter-toth committed Apr 18, 2024
1 parent 88c98e1 commit b2f6309
Show file tree
Hide file tree
Showing 41 changed files with 108 additions and 129 deletions.
2 changes: 1 addition & 1 deletion datafusion-examples/examples/function_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ impl ScalarUDFImpl for ScalarFunctionWrapper {
impl ScalarFunctionWrapper {
// replaces placeholders such as $1 with actual arguments (args[0]
fn replacement(expr: &Expr, args: &[Expr]) -> Result<Expr> {
let result = expr.clone().transform(&|e| {
let result = expr.clone().transform(&mut |e| {
let r = match e {
Expr::Placeholder(placeholder) => {
let placeholder_position =
Expand Down
6 changes: 3 additions & 3 deletions datafusion-examples/examples/rewrite_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ impl AnalyzerRule for MyAnalyzerRule {

impl MyAnalyzerRule {
fn analyze_plan(plan: LogicalPlan) -> Result<LogicalPlan> {
plan.transform(&|plan| {
plan.transform(&mut |plan| {
Ok(match plan {
LogicalPlan::Filter(filter) => {
let predicate = Self::analyze_expr(filter.predicate.clone())?;
Expand All @@ -107,7 +107,7 @@ impl MyAnalyzerRule {
}

fn analyze_expr(expr: Expr) -> Result<Expr> {
expr.transform(&|expr| {
expr.transform(&mut |expr| {
// closure is invoked for all sub expressions
Ok(match expr {
Expr::Literal(ScalarValue::Int64(i)) => {
Expand Down Expand Up @@ -163,7 +163,7 @@ impl OptimizerRule for MyOptimizerRule {

/// use rewrite_expr to modify the expression tree.
fn my_rewrite(expr: Expr) -> Result<Expr> {
expr.transform(&|expr| {
expr.transform(&mut |expr| {
// closure is invoked for all sub expressions
Ok(match expr {
Expr::Between(Between {
Expand Down
28 changes: 14 additions & 14 deletions datafusion/common/src/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,40 +145,41 @@ pub trait TreeNode: Sized {
/// Convenience utility for writing optimizer rules: Recursively apply the
/// given function `f` to the tree in a bottom-up (post-order) fashion. When
/// `f` does not apply to a given node, it is left unchanged.
fn transform<F: Fn(Self) -> Result<Transformed<Self>>>(
fn transform<F: FnMut(Self) -> Result<Transformed<Self>>>(
self,
f: &F,
f: &mut F,
) -> Result<Transformed<Self>> {
self.transform_up(f)
}

/// Convenience utility for writing optimizer rules: Recursively apply the
/// given function `f` to a node and then to its children (pre-order traversal).
/// When `f` does not apply to a given node, it is left unchanged.
fn transform_down<F: Fn(Self) -> Result<Transformed<Self>>>(
fn transform_down<F: FnMut(Self) -> Result<Transformed<Self>>>(
self,
f: &F,
f: &mut F,
) -> Result<Transformed<Self>> {
handle_transform_recursion_down!(f(self), |c| c.transform_down(f))
}

/// Convenience utility for writing optimizer rules: Recursively apply the
/// given mutable function `f` to a node and then to its children (pre-order
/// traversal). When `f` does not apply to a given node, it is left unchanged.
#[deprecated(since = "38.0.0", note = "Use `transform_down` instead")]
fn transform_down_mut<F: FnMut(Self) -> Result<Transformed<Self>>>(
self,
f: &mut F,
) -> Result<Transformed<Self>> {
handle_transform_recursion_down!(f(self), |c| c.transform_down_mut(f))
self.transform_down(f)
}

/// Convenience utility for writing optimizer rules: Recursively apply the
/// given function `f` to all children of a node, and then to the node itself
/// (post-order traversal). When `f` does not apply to a given node, it is
/// left unchanged.
fn transform_up<F: Fn(Self) -> Result<Transformed<Self>>>(
fn transform_up<F: FnMut(Self) -> Result<Transformed<Self>>>(
self,
f: &F,
f: &mut F,
) -> Result<Transformed<Self>> {
handle_transform_recursion_up!(self, |c| c.transform_up(f), f)
}
Expand All @@ -187,11 +188,12 @@ pub trait TreeNode: Sized {
/// given mutable function `f` to all children of a node, and then to the
/// node itself (post-order traversal). When `f` does not apply to a given
/// node, it is left unchanged.
#[deprecated(since = "38.0.0", note = "Use `transform_up` instead")]
fn transform_up_mut<F: FnMut(Self) -> Result<Transformed<Self>>>(
self,
f: &mut F,
) -> Result<Transformed<Self>> {
handle_transform_recursion_up!(self, |c| c.transform_up_mut(f), f)
self.transform_up(f)
}

/// Transforms the tree using `f_down` while traversing the tree top-down
Expand All @@ -200,8 +202,8 @@ pub trait TreeNode: Sized {
///
/// Use this method if you want to start the `f_up` process right where `f_down` jumps.
/// This can make the whole process faster by reducing the number of `f_up` steps.
/// If you don't need this, it's just like using `transform_down_mut` followed by
/// `transform_up_mut` on the same tree.
/// If you don't need this, it's just like using `transform_down` followed by
/// `transform_up` on the same tree.
///
/// Consider the following tree structure:
/// ```text
Expand Down Expand Up @@ -439,9 +441,7 @@ impl TreeNodeRecursion {
/// This struct is used by tree transformation APIs such as
/// - [`TreeNode::rewrite`],
/// - [`TreeNode::transform_down`],
/// - [`TreeNode::transform_down_mut`],
/// - [`TreeNode::transform_up`],
/// - [`TreeNode::transform_up_mut`],
/// - [`TreeNode::transform_down_up`]
///
/// to control the transformation and return the transformed result.
Expand Down Expand Up @@ -1466,7 +1466,7 @@ mod tests {
#[test]
fn $NAME() -> Result<()> {
let tree = test_tree();
assert_eq!(tree.transform_down_mut(&mut $F)?, $EXPECTED_TREE);
assert_eq!(tree.transform_down(&mut $F)?, $EXPECTED_TREE);

Ok(())
}
Expand All @@ -1478,7 +1478,7 @@ mod tests {
#[test]
fn $NAME() -> Result<()> {
let tree = test_tree();
assert_eq!(tree.transform_up_mut(&mut $F)?, $EXPECTED_TREE);
assert_eq!(tree.transform_up(&mut $F)?, $EXPECTED_TREE);

Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_optimizer/coalesce_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl PhysicalOptimizerRule for CoalesceBatches {
}

let target_batch_size = config.execution.batch_size;
plan.transform_up(&|plan| {
plan.transform_up(&mut |plan| {
let plan_any = plan.as_any();
// The goal here is to detect operators that could produce small batches and only
// wrap those ones with a CoalesceBatchesExec operator. An alternate approach here
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl PhysicalOptimizerRule for CombinePartialFinalAggregate {
plan: Arc<dyn ExecutionPlan>,
_config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
plan.transform_down(&|plan| {
plan.transform_down(&mut |plan| {
let transformed =
plan.as_any()
.downcast_ref::<AggregateExec>()
Expand Down Expand Up @@ -179,7 +179,7 @@ fn normalize_group_exprs(group_exprs: GroupExprsRef) -> GroupExprs {
fn discard_column_index(group_expr: Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr> {
group_expr
.clone()
.transform(&|expr| {
.transform(&mut |expr| {
let normalized_form: Option<Arc<dyn PhysicalExpr>> =
match expr.as_any().downcast_ref::<Column>() {
Some(column) => Some(Arc::new(Column::new(column.name(), 0))),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl PhysicalOptimizerRule for OptimizeAggregateOrder {
plan: Arc<dyn ExecutionPlan>,
_config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
plan.transform_up(&get_common_requirement_of_aggregate_input)
plan.transform_up(&mut get_common_requirement_of_aggregate_input)
.data()
}

Expand Down
12 changes: 6 additions & 6 deletions datafusion/core/src/physical_optimizer/enforce_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,12 +197,12 @@ impl PhysicalOptimizerRule for EnforceDistribution {
// Run a top-down process to adjust input key ordering recursively
let plan_requirements = PlanWithKeyRequirements::new_default(plan);
let adjusted = plan_requirements
.transform_down(&adjust_input_keys_ordering)
.transform_down(&mut adjust_input_keys_ordering)
.data()?;
adjusted.plan
} else {
// Run a bottom-up process
plan.transform_up(&|plan| {
plan.transform_up(&mut |plan| {
Ok(Transformed::yes(reorder_join_keys_to_inputs(plan)?))
})
.data()?
Expand All @@ -211,7 +211,7 @@ impl PhysicalOptimizerRule for EnforceDistribution {
let distribution_context = DistributionContext::new_default(adjusted);
// Distribution enforcement needs to be applied bottom-up.
let distribution_context = distribution_context
.transform_up(&|distribution_context| {
.transform_up(&mut |distribution_context| {
ensure_distribution(distribution_context, config)
})
.data()?;
Expand Down Expand Up @@ -1768,22 +1768,22 @@ pub(crate) mod tests {
let plan_requirements =
PlanWithKeyRequirements::new_default($PLAN.clone());
let adjusted = plan_requirements
.transform_down(&adjust_input_keys_ordering)
.transform_down(&mut adjust_input_keys_ordering)
.data()
.and_then(check_integrity)?;
// TODO: End state payloads will be checked here.
adjusted.plan
} else {
// Run reorder_join_keys_to_inputs rule
$PLAN.clone().transform_up(&|plan| {
$PLAN.clone().transform_up(&mut |plan| {
Ok(Transformed::yes(reorder_join_keys_to_inputs(plan)?))
})
.data()?
};

// Then run ensure_distribution rule
DistributionContext::new_default(adjusted)
.transform_up(&|distribution_context| {
.transform_up(&mut |distribution_context| {
ensure_distribution(distribution_context, &config)
})
.data()
Expand Down
20 changes: 11 additions & 9 deletions datafusion/core/src/physical_optimizer/enforce_sorting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,12 +160,12 @@ impl PhysicalOptimizerRule for EnforceSorting {
let plan_requirements = PlanWithCorrespondingSort::new_default(plan);
// Execute a bottom-up traversal to enforce sorting requirements,
// remove unnecessary sorts, and optimize sort-sensitive operators:
let adjusted = plan_requirements.transform_up(&ensure_sorting)?.data;
let adjusted = plan_requirements.transform_up(&mut ensure_sorting)?.data;
let new_plan = if config.optimizer.repartition_sorts {
let plan_with_coalesce_partitions =
PlanWithCorrespondingCoalescePartitions::new_default(adjusted.plan);
let parallel = plan_with_coalesce_partitions
.transform_up(&parallelize_sorts)
.transform_up(&mut parallelize_sorts)
.data()?;
parallel.plan
} else {
Expand All @@ -174,7 +174,7 @@ impl PhysicalOptimizerRule for EnforceSorting {

let plan_with_pipeline_fixer = OrderPreservationContext::new_default(new_plan);
let updated_plan = plan_with_pipeline_fixer
.transform_up(&|plan_with_pipeline_fixer| {
.transform_up(&mut |plan_with_pipeline_fixer| {
replace_with_order_preserving_variants(
plan_with_pipeline_fixer,
false,
Expand All @@ -188,11 +188,13 @@ impl PhysicalOptimizerRule for EnforceSorting {
// missed by the bottom-up traversal:
let mut sort_pushdown = SortPushDown::new_default(updated_plan.plan);
assign_initial_requirements(&mut sort_pushdown);
let adjusted = sort_pushdown.transform_down(&pushdown_sorts)?.data;
let adjusted = sort_pushdown.transform_down(&mut pushdown_sorts)?.data;

adjusted
.plan
.transform_up(&|plan| Ok(Transformed::yes(replace_with_partial_sort(plan)?)))
.transform_up(&mut |plan| {
Ok(Transformed::yes(replace_with_partial_sort(plan)?))
})
.data()
}

Expand Down Expand Up @@ -681,7 +683,7 @@ mod tests {
{
let plan_requirements = PlanWithCorrespondingSort::new_default($PLAN.clone());
let adjusted = plan_requirements
.transform_up(&ensure_sorting)
.transform_up(&mut ensure_sorting)
.data()
.and_then(check_integrity)?;
// TODO: End state payloads will be checked here.
Expand All @@ -690,7 +692,7 @@ mod tests {
let plan_with_coalesce_partitions =
PlanWithCorrespondingCoalescePartitions::new_default(adjusted.plan);
let parallel = plan_with_coalesce_partitions
.transform_up(&parallelize_sorts)
.transform_up(&mut parallelize_sorts)
.data()
.and_then(check_integrity)?;
// TODO: End state payloads will be checked here.
Expand All @@ -701,7 +703,7 @@ mod tests {

let plan_with_pipeline_fixer = OrderPreservationContext::new_default(new_plan);
let updated_plan = plan_with_pipeline_fixer
.transform_up(&|plan_with_pipeline_fixer| {
.transform_up(&mut |plan_with_pipeline_fixer| {
replace_with_order_preserving_variants(
plan_with_pipeline_fixer,
false,
Expand All @@ -716,7 +718,7 @@ mod tests {
let mut sort_pushdown = SortPushDown::new_default(updated_plan.plan);
assign_initial_requirements(&mut sort_pushdown);
sort_pushdown
.transform_down(&pushdown_sorts)
.transform_down(&mut pushdown_sorts)
.data()
.and_then(check_integrity)?;
// TODO: End state payloads will be checked here.
Expand Down
8 changes: 4 additions & 4 deletions datafusion/core/src/physical_optimizer/join_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ impl PhysicalOptimizerRule for JoinSelection {
Box::new(hash_join_swap_subrule),
];
let new_plan = plan
.transform_up(&|p| apply_subrules(p, &subrules, config))
.transform_up(&mut |p| apply_subrules(p, &subrules, config))
.data()?;
// Next, we apply another subrule that tries to optimize joins using any
// statistics their inputs might have.
Expand All @@ -280,7 +280,7 @@ impl PhysicalOptimizerRule for JoinSelection {
let collect_threshold_byte_size = config.hash_join_single_partition_threshold;
let collect_threshold_num_rows = config.hash_join_single_partition_threshold_rows;
new_plan
.transform_up(&|plan| {
.transform_up(&mut |plan| {
statistical_join_selection_subrule(
plan,
collect_threshold_byte_size,
Expand Down Expand Up @@ -842,13 +842,13 @@ mod tests_statistical {
Box::new(hash_join_swap_subrule),
];
let new_plan = plan
.transform_up(&|p| apply_subrules(p, &subrules, &ConfigOptions::new()))
.transform_up(&mut |p| apply_subrules(p, &subrules, &ConfigOptions::new()))
.data()?;
// TODO: End state payloads will be checked here.
let config = ConfigOptions::new().optimizer;
let collect_left_threshold = config.hash_join_single_partition_threshold;
let collect_threshold_num_rows = config.hash_join_single_partition_threshold_rows;
let _ = new_plan.transform_up(&|plan| {
let _ = new_plan.transform_up(&mut |plan| {
statistical_join_selection_subrule(
plan,
collect_left_threshold,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ impl LimitedDistinctAggregation {
rewrite_applicable = false;
Ok(Transformed::no(plan))
};
let child = child.clone().transform_down_mut(&mut closure).data().ok()?;
let child = child.clone().transform_down(&mut closure).data().ok()?;
if is_global_limit {
return Some(Arc::new(GlobalLimitExec::new(
child,
Expand All @@ -163,7 +163,7 @@ impl PhysicalOptimizerRule for LimitedDistinctAggregation {
config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
if config.optimizer.enable_distinct_aggregation_soft_limit {
plan.transform_down(&|plan| {
plan.transform_down(&mut |plan| {
Ok(
if let Some(plan) =
LimitedDistinctAggregation::transform_limit(plan.clone())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ impl PhysicalOptimizerRule for OutputRequirements {
match self.mode {
RuleMode::Add => require_top_ordering(plan),
RuleMode::Remove => plan
.transform_up(&|plan| {
.transform_up(&mut |plan| {
if let Some(sort_req) =
plan.as_any().downcast_ref::<OutputRequirementExec>()
{
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_optimizer/pipeline_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ impl PhysicalOptimizerRule for PipelineChecker {
plan: Arc<dyn ExecutionPlan>,
config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
plan.transform_up(&|p| check_finiteness_requirements(p, &config.optimizer))
plan.transform_up(&mut |p| check_finiteness_requirements(p, &config.optimizer))
.data()
}

Expand Down
7 changes: 4 additions & 3 deletions datafusion/core/src/physical_optimizer/projection_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ impl PhysicalOptimizerRule for ProjectionPushdown {
plan: Arc<dyn ExecutionPlan>,
_config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
plan.transform_down(&remove_unnecessary_projections).data()
plan.transform_down(&mut remove_unnecessary_projections)
.data()
}

fn name(&self) -> &str {
Expand Down Expand Up @@ -977,7 +978,7 @@ fn update_expr(

let new_expr = expr
.clone()
.transform_up_mut(&mut |expr: Arc<dyn PhysicalExpr>| {
.transform_up(&mut |expr: Arc<dyn PhysicalExpr>| {
if state == RewriteState::RewrittenInvalid {
return Ok(Transformed::no(expr));
}
Expand Down Expand Up @@ -1120,7 +1121,7 @@ fn new_columns_for_join_on(
// Rewrite all columns in `on`
(*on)
.clone()
.transform(&|expr| {
.transform(&mut |expr| {
if let Some(column) = expr.as_any().downcast_ref::<Column>() {
// Find the column in the projection expressions
let new_column = projection_exprs
Expand Down
Loading

0 comments on commit b2f6309

Please sign in to comment.