Skip to content

Commit

Permalink
Remove unwrap_arc helper
Browse files Browse the repository at this point in the history
It can now be replaced with single call `Arc::unwrap_or_clone`, with
added bonus of slightly better name.
  • Loading branch information
findepi committed Aug 26, 2024
1 parent 945902d commit 1aeba15
Show file tree
Hide file tree
Showing 17 changed files with 110 additions and 95 deletions.
37 changes: 22 additions & 15 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,8 @@ use crate::utils::{
find_valid_equijoin_key_pair, group_window_expr_by_sort_keys,
};
use crate::{
and, binary_expr, logical_plan::tree_node::unwrap_arc, DmlStatement, Expr,
ExprSchemable, Operator, RecursiveQuery, TableProviderFilterPushDown, TableSource,
WriteOp,
and, binary_expr, DmlStatement, Expr, ExprSchemable, Operator, RecursiveQuery,
TableProviderFilterPushDown, TableSource, WriteOp,
};

use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
Expand Down Expand Up @@ -376,7 +375,7 @@ impl LogicalPlanBuilder {
self,
expr: impl IntoIterator<Item = impl Into<Expr>>,
) -> Result<Self> {
project(unwrap_arc(self.plan), expr).map(Self::new)
project(Arc::unwrap_or_clone(self.plan), expr).map(Self::new)
}

/// Select the given column indices
Expand Down Expand Up @@ -429,7 +428,7 @@ impl LogicalPlanBuilder {

/// Apply an alias
pub fn alias(self, alias: impl Into<TableReference>) -> Result<Self> {
subquery_alias(unwrap_arc(self.plan), alias).map(Self::new)
subquery_alias(Arc::unwrap_or_clone(self.plan), alias).map(Self::new)
}

/// Add missing sort columns to all downstream projection
Expand Down Expand Up @@ -484,7 +483,7 @@ impl LogicalPlanBuilder {
Self::ambiguous_distinct_check(&missing_exprs, missing_cols, &expr)?;
}
expr.extend(missing_exprs);
project(unwrap_arc(input), expr)
project(Arc::unwrap_or_clone(input), expr)
}
_ => {
let is_distinct =
Expand Down Expand Up @@ -580,8 +579,11 @@ impl LogicalPlanBuilder {
let new_expr = schema.columns().into_iter().map(Expr::Column).collect();

let is_distinct = false;
let plan =
Self::add_missing_columns(unwrap_arc(self.plan), &missing_cols, is_distinct)?;
let plan = Self::add_missing_columns(
Arc::unwrap_or_clone(self.plan),
&missing_cols,
is_distinct,
)?;
let sort_plan = LogicalPlan::Sort(Sort {
expr: normalize_cols(exprs, &plan)?,
input: Arc::new(plan),
Expand All @@ -595,12 +597,12 @@ impl LogicalPlanBuilder {

/// Apply a union, preserving duplicate rows
pub fn union(self, plan: LogicalPlan) -> Result<Self> {
union(unwrap_arc(self.plan), plan).map(Self::new)
union(Arc::unwrap_or_clone(self.plan), plan).map(Self::new)
}

/// Apply a union, removing duplicate rows
pub fn union_distinct(self, plan: LogicalPlan) -> Result<Self> {
let left_plan: LogicalPlan = unwrap_arc(self.plan);
let left_plan: LogicalPlan = Arc::unwrap_or_clone(self.plan);
let right_plan: LogicalPlan = plan;

Ok(Self::new(LogicalPlan::Distinct(Distinct::All(Arc::new(
Expand Down Expand Up @@ -1064,7 +1066,7 @@ impl LogicalPlanBuilder {

/// Build the plan
pub fn build(self) -> Result<LogicalPlan> {
Ok(unwrap_arc(self.plan))
Ok(Arc::unwrap_or_clone(self.plan))
}

/// Apply a join with the expression on constraint.
Expand Down Expand Up @@ -1138,7 +1140,7 @@ impl LogicalPlanBuilder {

/// Unnest the given column.
pub fn unnest_column(self, column: impl Into<Column>) -> Result<Self> {
unnest(unwrap_arc(self.plan), vec![column.into()]).map(Self::new)
unnest(Arc::unwrap_or_clone(self.plan), vec![column.into()]).map(Self::new)
}

/// Unnest the given column given [`UnnestOptions`]
Expand All @@ -1147,8 +1149,12 @@ impl LogicalPlanBuilder {
column: impl Into<Column>,
options: UnnestOptions,
) -> Result<Self> {
unnest_with_options(unwrap_arc(self.plan), vec![column.into()], options)
.map(Self::new)
unnest_with_options(
Arc::unwrap_or_clone(self.plan),
vec![column.into()],
options,
)
.map(Self::new)
}

/// Unnest the given columns with the given [`UnnestOptions`]
Expand All @@ -1157,7 +1163,8 @@ impl LogicalPlanBuilder {
columns: Vec<Column>,
options: UnnestOptions,
) -> Result<Self> {
unnest_with_options(unwrap_arc(self.plan), columns, options).map(Self::new)
unnest_with_options(Arc::unwrap_or_clone(self.plan), columns, options)
.map(Self::new)
}
}

Expand Down
3 changes: 1 addition & 2 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ use datafusion_common::{

// backwards compatibility
use crate::display::PgJsonVisitor;
use crate::logical_plan::tree_node::unwrap_arc;
pub use datafusion_common::display::{PlanType, StringifiedPlan, ToStringifiedPlan};
pub use datafusion_common::{JoinConstraint, JoinType};

Expand Down Expand Up @@ -770,7 +769,7 @@ impl LogicalPlan {
..
}) => {
// Update schema with unnested column type.
unnest_with_options(unwrap_arc(input), exec_columns, options)
unnest_with_options(Arc::unwrap_or_clone(input), exec_columns, options)
}
}
}
Expand Down
11 changes: 1 addition & 10 deletions datafusion/expr/src/logical_plan/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,21 +379,12 @@ impl TreeNode for LogicalPlan {
}
}

/// Converts a `Arc<LogicalPlan>` without copying, if possible. Copies the plan
/// if there is a shared reference
pub fn unwrap_arc(plan: Arc<LogicalPlan>) -> LogicalPlan {
Arc::try_unwrap(plan)
// if None is returned, there is another reference to this
// LogicalPlan, so we can not own it, and must clone instead
.unwrap_or_else(|node| node.as_ref().clone())
}

/// Applies `f` to rewrite a `Arc<LogicalPlan>` without copying, if possible
fn rewrite_arc<F: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>>(
plan: Arc<LogicalPlan>,
mut f: F,
) -> Result<Transformed<Arc<LogicalPlan>>> {
f(unwrap_arc(plan))?.map_data(|new_plan| Ok(Arc::new(new_plan)))
f(Arc::unwrap_or_clone(plan))?.map_data(|new_plan| Ok(Arc::new(new_plan)))
}

/// rewrite a `Vec` of `Arc<LogicalPlan>` without copying, if possible
Expand Down
18 changes: 12 additions & 6 deletions datafusion/optimizer/src/analyzer/type_coercion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ use datafusion_expr::expr::{
};
use datafusion_expr::expr_rewriter::coerce_plan_expr_for_schema;
use datafusion_expr::expr_schema::cast_subquery;
use datafusion_expr::logical_plan::tree_node::unwrap_arc;
use datafusion_expr::logical_plan::Subquery;
use datafusion_expr::type_coercion::binary::{
comparison_coercion, get_input_types, like_coercion,
Expand Down Expand Up @@ -241,15 +240,19 @@ impl<'a> TreeNodeRewriter for TypeCoercionRewriter<'a> {
subquery,
outer_ref_columns,
}) => {
let new_plan = analyze_internal(self.schema, unwrap_arc(subquery))?.data;
let new_plan =
analyze_internal(self.schema, Arc::unwrap_or_clone(subquery))?.data;
Ok(Transformed::yes(Expr::ScalarSubquery(Subquery {
subquery: Arc::new(new_plan),
outer_ref_columns,
})))
}
Expr::Exists(Exists { subquery, negated }) => {
let new_plan =
analyze_internal(self.schema, unwrap_arc(subquery.subquery))?.data;
let new_plan = analyze_internal(
self.schema,
Arc::unwrap_or_clone(subquery.subquery),
)?
.data;
Ok(Transformed::yes(Expr::Exists(Exists {
subquery: Subquery {
subquery: Arc::new(new_plan),
Expand All @@ -263,8 +266,11 @@ impl<'a> TreeNodeRewriter for TypeCoercionRewriter<'a> {
subquery,
negated,
}) => {
let new_plan =
analyze_internal(self.schema, unwrap_arc(subquery.subquery))?.data;
let new_plan = analyze_internal(
self.schema,
Arc::unwrap_or_clone(subquery.subquery),
)?
.data;
let expr_type = expr.get_type(self.schema)?;
let subquery_type = new_plan.schema().field(0).data_type();
let common_type = comparison_coercion(&expr_type, subquery_type).ok_or(plan_datafusion_err!(
Expand Down
11 changes: 5 additions & 6 deletions datafusion/optimizer/src/common_subexpr_eliminate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ use datafusion_common::tree_node::{
};
use datafusion_common::{qualified_name, Column, DFSchema, DFSchemaRef, Result};
use datafusion_expr::expr::{Alias, ScalarFunction};
use datafusion_expr::logical_plan::tree_node::unwrap_arc;
use datafusion_expr::logical_plan::{
Aggregate, Filter, LogicalPlan, Projection, Sort, Window,
};
Expand Down Expand Up @@ -314,7 +313,7 @@ impl CommonSubexprEliminate {
schema,
..
} = projection;
let input = unwrap_arc(input);
let input = Arc::unwrap_or_clone(input);
self.try_unary_plan(expr, input, config)?
.map_data(|(new_expr, new_input)| {
Projection::try_new_with_schema(new_expr, Arc::new(new_input), schema)
Expand All @@ -327,7 +326,7 @@ impl CommonSubexprEliminate {
config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>> {
let Sort { expr, input, fetch } = sort;
let input = unwrap_arc(input);
let input = Arc::unwrap_or_clone(input);
let new_sort = self.try_unary_plan(expr, input, config)?.update_data(
|(new_expr, new_input)| {
LogicalPlan::Sort(Sort {
Expand All @@ -348,7 +347,7 @@ impl CommonSubexprEliminate {
let Filter {
predicate, input, ..
} = filter;
let input = unwrap_arc(input);
let input = Arc::unwrap_or_clone(input);
let expr = vec![predicate];
self.try_unary_plan(expr, input, config)?
.map_data(|(mut new_expr, new_input)| {
Expand Down Expand Up @@ -458,7 +457,7 @@ impl CommonSubexprEliminate {
schema,
..
} = aggregate;
let input = unwrap_arc(input);
let input = Arc::unwrap_or_clone(input);
// Extract common sub-expressions from the aggregate and grouping expressions.
self.find_common_exprs(vec![group_expr, aggr_expr], config, ExprMask::Normal)?
.map_data(|common| {
Expand Down Expand Up @@ -729,7 +728,7 @@ fn get_consecutive_window_exprs(
window_expr_list.push(window_expr);
window_schemas.push(schema);

plan = unwrap_arc(input);
plan = Arc::unwrap_or_clone(input);
}
(window_expr_list, window_schemas, plan)
}
Expand Down
9 changes: 5 additions & 4 deletions datafusion/optimizer/src/decorrelate_predicate_subquery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ use datafusion_expr::{
LogicalPlan, LogicalPlanBuilder, Operator,
};

use datafusion_expr::logical_plan::tree_node::unwrap_arc;
use log::debug;

/// Optimizer rule for rewriting predicate(IN/EXISTS) subquery to left semi/anti joins
Expand All @@ -55,8 +54,10 @@ impl DecorrelatePredicateSubquery {
mut subquery: Subquery,
config: &dyn OptimizerConfig,
) -> Result<Subquery> {
subquery.subquery =
Arc::new(self.rewrite(unwrap_arc(subquery.subquery), config)?.data);
subquery.subquery = Arc::new(
self.rewrite(Arc::unwrap_or_clone(subquery.subquery), config)?
.data,
);
Ok(subquery)
}

Expand Down Expand Up @@ -164,7 +165,7 @@ impl OptimizerRule for DecorrelatePredicateSubquery {
}

// iterate through all exists clauses in predicate, turning each into a join
let mut cur_input = unwrap_arc(input);
let mut cur_input = Arc::unwrap_or_clone(input);
for subquery in subqueries {
if let Some(plan) =
build_join(&subquery, &cur_input, config.alias_generator())?
Expand Down
27 changes: 21 additions & 6 deletions datafusion/optimizer/src/eliminate_cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use crate::join_key_set::JoinKeySet;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{internal_err, Result};
use datafusion_expr::expr::{BinaryExpr, Expr};
use datafusion_expr::logical_plan::tree_node::unwrap_arc;
use datafusion_expr::logical_plan::{
CrossJoin, Filter, Join, JoinConstraint, JoinType, LogicalPlan, Projection,
};
Expand Down Expand Up @@ -114,7 +113,7 @@ impl OptimizerRule for EliminateCrossJoin {
input, predicate, ..
} = filter;
flatten_join_inputs(
unwrap_arc(input),
Arc::unwrap_or_clone(input),
&mut possible_join_keys,
&mut all_inputs,
)?;
Expand Down Expand Up @@ -217,12 +216,28 @@ fn flatten_join_inputs(
);
}
possible_join_keys.insert_all_owned(join.on);
flatten_join_inputs(unwrap_arc(join.left), possible_join_keys, all_inputs)?;
flatten_join_inputs(unwrap_arc(join.right), possible_join_keys, all_inputs)?;
flatten_join_inputs(
Arc::unwrap_or_clone(join.left),
possible_join_keys,
all_inputs,
)?;
flatten_join_inputs(
Arc::unwrap_or_clone(join.right),
possible_join_keys,
all_inputs,
)?;
}
LogicalPlan::CrossJoin(join) => {
flatten_join_inputs(unwrap_arc(join.left), possible_join_keys, all_inputs)?;
flatten_join_inputs(unwrap_arc(join.right), possible_join_keys, all_inputs)?;
flatten_join_inputs(
Arc::unwrap_or_clone(join.left),
possible_join_keys,
all_inputs,
)?;
flatten_join_inputs(
Arc::unwrap_or_clone(join.right),
possible_join_keys,
all_inputs,
)?;
}
_ => {
all_inputs.push(plan);
Expand Down
3 changes: 1 addition & 2 deletions datafusion/optimizer/src/eliminate_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
use datafusion_common::tree_node::Transformed;
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::logical_plan::tree_node::unwrap_arc;
use datafusion_expr::{EmptyRelation, Expr, Filter, LogicalPlan};
use std::sync::Arc;

Expand Down Expand Up @@ -65,7 +64,7 @@ impl OptimizerRule for EliminateFilter {
input,
..
}) => match v {
Some(true) => Ok(Transformed::yes(unwrap_arc(input))),
Some(true) => Ok(Transformed::yes(Arc::unwrap_or_clone(input))),
Some(false) | None => Ok(Transformed::yes(LogicalPlan::EmptyRelation(
EmptyRelation {
produce_one_row: false,
Expand Down
6 changes: 4 additions & 2 deletions datafusion/optimizer/src/eliminate_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::optimizer::ApplyOrder;
use crate::{OptimizerConfig, OptimizerRule};
use datafusion_common::tree_node::Transformed;
use datafusion_common::Result;
use datafusion_expr::logical_plan::{tree_node::unwrap_arc, EmptyRelation, LogicalPlan};
use datafusion_expr::logical_plan::{EmptyRelation, LogicalPlan};
use std::sync::Arc;

/// Optimizer rule to replace `LIMIT 0` or `LIMIT` whose ancestor LIMIT's skip is
Expand Down Expand Up @@ -74,7 +74,9 @@ impl OptimizerRule for EliminateLimit {
}
} else if limit.skip == 0 {
// input also can be Limit, so we should apply again.
return Ok(self.rewrite(unwrap_arc(limit.input), _config).unwrap());
return Ok(self
.rewrite(Arc::unwrap_or_clone(limit.input), _config)
.unwrap());
}
Ok(Transformed::no(LogicalPlan::Limit(limit)))
}
Expand Down
Loading

0 comments on commit 1aeba15

Please sign in to comment.