Remove default from some executors

mustafasrepo committed Feb 22, 2024
1 parent 93f5282 commit 4707939

Showing 5 changed files with 95 additions and 78 deletions.
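
All five executors get the same treatment: instead of building the plan with a placeholder PlanPropertiesCache::new_default(schema) and then patching it through a consuming .with_cache() builder, each constructor now computes the finished PlanPropertiesCache up front via a static with_cache helper that takes only the pieces it needs, and stores it directly in the struct literal. A minimal sketch of the pattern, using hypothetical simplified types rather than DataFusion's actual API:

// Hypothetical, simplified stand-ins for the real DataFusion types; this only
// illustrates the constructor pattern the commit switches to.
#[derive(Debug, Clone)]
struct PlanPropertiesCache {
    partition_count: usize,
}

#[derive(Debug)]
struct ExampleExec {
    file_groups: Vec<Vec<String>>,
    cache: PlanPropertiesCache,
}

impl ExampleExec {
    // New style: a static helper derives the cache from the constructor
    // inputs alone, so no placeholder default is ever observable.
    fn with_cache(file_groups: &[Vec<String>]) -> PlanPropertiesCache {
        PlanPropertiesCache {
            partition_count: file_groups.len(),
        }
    }

    fn new(file_groups: Vec<Vec<String>>) -> Self {
        let cache = Self::with_cache(&file_groups);
        Self { file_groups, cache }
    }
}

fn main() {
    let exec = ExampleExec::new(vec![vec!["a.arrow".into()], vec!["b.arrow".into()]]);
    println!("{:?}", exec.cache); // PlanPropertiesCache { partition_count: 2 }
}
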
34 changes: 19 additions & 15 deletions datafusion/core/src/datasource/physical_plan/arrow_file.rs
@@ -61,7 +61,11 @@ impl ArrowExec {
     pub fn new(base_config: FileScanConfig) -> Self {
         let (projected_schema, projected_statistics, projected_output_ordering) =
             base_config.project();
-        let cache = PlanPropertiesCache::new_default(projected_schema.clone());
+        let cache = Self::with_cache(
+            projected_schema.clone(),
+            &projected_output_ordering,
+            &base_config,
+        );
         Self {
             base_config,
             projected_schema,
@@ -70,36 +74,36 @@ impl ArrowExec {
             metrics: ExecutionPlanMetricsSet::new(),
             cache,
         }
-        .with_cache()
     }
     /// Ref to the base configs
     pub fn base_config(&self) -> &FileScanConfig {
         &self.base_config
     }
 
-    fn output_partitioning_helper(&self) -> Partitioning {
-        Partitioning::UnknownPartitioning(self.base_config.file_groups.len())
+    fn output_partitioning_helper(file_scan_config: &FileScanConfig) -> Partitioning {
+        Partitioning::UnknownPartitioning(file_scan_config.file_groups.len())
     }
 
-    fn with_cache(mut self) -> Self {
+    fn with_cache(
+        schema: SchemaRef,
+        projected_output_ordering: &[LexOrdering],
+        file_scan_config: &FileScanConfig,
+    ) -> PlanPropertiesCache {
         // Equivalence Properties
-        let eq_properties = EquivalenceProperties::new_with_orderings(
-            self.schema(),
-            &self.projected_output_ordering,
-        );
+        let eq_properties =
+            EquivalenceProperties::new_with_orderings(schema, projected_output_ordering);
 
-        self.cache = PlanPropertiesCache::new(
+        PlanPropertiesCache::new(
             eq_properties,
-            self.output_partitioning_helper(), // Output Partitioning
-            ExecutionMode::Bounded,            // Execution Mode
-        );
-        self
+            Self::output_partitioning_helper(file_scan_config), // Output Partitioning
+            ExecutionMode::Bounded,                             // Execution Mode
+        )
     }
 
     fn with_file_groups(mut self, file_groups: Vec<Vec<PartitionedFile>>) -> Self {
         self.base_config.file_groups = file_groups;
         // Changing file groups may invalidate output partitioning. Update it also
-        let output_partitioning = self.output_partitioning_helper();
+        let output_partitioning = Self::output_partitioning_helper(&self.base_config);
         self.cache = self.cache.with_partitioning(output_partitioning);
         self
     }
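
One subtlety visible above: with_file_groups still mutates the plan after construction, and since the number of file groups drives the UnknownPartitioning arity, it must refresh the cached partitioning. Extending the hypothetical ExampleExec sketch from the introduction:

// Extends the hypothetical ExampleExec above: any setter that changes an
// input of the cached properties must recompute the affected property,
// mirroring what with_file_groups does via cache.with_partitioning(...).
impl ExampleExec {
    fn with_file_groups(mut self, file_groups: Vec<Vec<String>>) -> Self {
        self.file_groups = file_groups;
        // Changing file groups may invalidate the cached partition count.
        self.cache = Self::with_cache(&self.file_groups);
        self
    }
}
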
47 changes: 25 additions & 22 deletions datafusion/physical-plan/src/aggregates/mod.rs
@@ -260,9 +260,6 @@ pub struct AggregateExec {
     /// We need the input schema of partial aggregate to be able to deserialize aggregate
     /// expressions from protobuf for final aggregate.
     pub input_schema: SchemaRef,
-    /// The mapping used to normalize expressions like Partitioning and
-    /// PhysicalSortExpr that maps input to output
-    projection_mapping: ProjectionMapping,
     /// Execution metrics
     metrics: ExecutionPlanMetricsSet,
     required_input_ordering: Option<LexRequirement>,
@@ -365,23 +362,27 @@ impl AggregateExec {
         let required_input_ordering =
             (!new_requirement.is_empty()).then_some(new_requirement);
 
-        let cache = PlanPropertiesCache::new_default(schema.clone());
-        let aggregate = AggregateExec {
+        let cache = Self::with_cache(
+            &input,
+            schema.clone(),
+            &projection_mapping,
+            &mode,
+            &input_order_mode,
+        );
+        Ok(AggregateExec {
             mode,
             group_by,
             aggr_expr,
             filter_expr,
             input,
             schema,
             input_schema,
-            projection_mapping,
             metrics: ExecutionPlanMetricsSet::new(),
             required_input_ordering,
             limit: None,
             input_order_mode,
             cache,
-        };
-        Ok(aggregate.with_cache())
+        })
     }
 
     /// Aggregation mode (full, partial)
@@ -505,26 +506,31 @@ impl AggregateExec {
         true
     }
 
-    fn with_cache(mut self) -> Self {
+    fn with_cache(
+        input: &Arc<dyn ExecutionPlan>,
+        schema: SchemaRef,
+        projection_mapping: &ProjectionMapping,
+        mode: &AggregateMode,
+        input_order_mode: &InputOrderMode,
+    ) -> PlanPropertiesCache {
         // Construct equivalence properties:
-        let eq_properties = self
-            .input
+        let eq_properties = input
             .equivalence_properties()
-            .project(&self.projection_mapping, self.schema());
+            .project(projection_mapping, schema);
 
         // Get output partitioning:
-        let mut output_partitioning = self.input.output_partitioning().clone();
-        if self.mode.is_first_stage() {
+        let mut output_partitioning = input.output_partitioning().clone();
+        if mode.is_first_stage() {
             // First stage aggregation will not change the output partitioning,
             // but needs to respect aliases (e.g. mapping in the GROUP BY
             // expression).
-            let input_eq_properties = self.input.equivalence_properties();
+            let input_eq_properties = input.equivalence_properties();
             if let Partitioning::Hash(exprs, part) = output_partitioning {
                 let normalized_exprs = exprs
                     .iter()
                     .map(|expr| {
                         input_eq_properties
-                            .project_expr(expr, &self.projection_mapping)
+                            .project_expr(expr, projection_mapping)
                             .unwrap_or_else(|| {
                                 Arc::new(UnKnownColumn::new(&expr.to_string()))
                             })
@@ -535,18 +541,15 @@ impl AggregateExec {
         }
 
         // Determine execution mode:
-        let mut exec_mode = self.input.execution_mode();
+        let mut exec_mode = input.execution_mode();
         if exec_mode == ExecutionMode::Unbounded
-            && self.input_order_mode == InputOrderMode::Linear
+            && *input_order_mode == InputOrderMode::Linear
         {
             // Cannot run without breaking the pipeline
            exec_mode = ExecutionMode::PipelineBreaking;
         }
 
-        self.cache =
-            PlanPropertiesCache::new(eq_properties, output_partitioning, exec_mode);
-
-        self
+        PlanPropertiesCache::new(eq_properties, output_partitioning, exec_mode)
     }
 
     pub fn input_order_mode(&self) -> &InputOrderMode {
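
The execution-mode logic that moved into the static helper encodes one rule worth calling out: an aggregate over an unbounded input that must be read linearly cannot emit anything until its input ends, so it is marked pipeline-breaking. A standalone sketch of just that rule, with simplified stand-in enums (the real InputOrderMode has more variants):

// Simplified stand-ins; the real DataFusion enums carry more variants.
#[derive(Debug, Clone, Copy, PartialEq)]
enum ExecutionMode {
    Bounded,
    Unbounded,
    PipelineBreaking,
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum InputOrderMode {
    Linear,
    Sorted,
}

fn aggregate_exec_mode(input: ExecutionMode, order: InputOrderMode) -> ExecutionMode {
    if input == ExecutionMode::Unbounded && order == InputOrderMode::Linear {
        // Cannot run without breaking the pipeline.
        ExecutionMode::PipelineBreaking
    } else {
        // Otherwise the aggregate inherits its input's mode.
        input
    }
}

fn main() {
    assert_eq!(
        aggregate_exec_mode(ExecutionMode::Unbounded, InputOrderMode::Linear),
        ExecutionMode::PipelineBreaking
    );
    assert_eq!(
        aggregate_exec_mode(ExecutionMode::Unbounded, InputOrderMode::Sorted),
        ExecutionMode::Unbounded
    );
}
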
21 changes: 10 additions & 11 deletions datafusion/physical-plan/src/analyze.rs
@@ -29,6 +29,7 @@ use crate::{DisplayFormatType, ExecutionPlan, Partitioning};
 use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch};
 use datafusion_common::{internal_err, DataFusionError, Result};
 use datafusion_execution::TaskContext;
+use datafusion_physical_expr::EquivalenceProperties;
 
 use futures::StreamExt;
 
@@ -55,15 +56,14 @@ impl AnalyzeExec {
         input: Arc<dyn ExecutionPlan>,
         schema: SchemaRef,
     ) -> Self {
-        let cache = PlanPropertiesCache::new_default(schema.clone());
+        let cache = Self::with_cache(&input, schema.clone());
         AnalyzeExec {
             verbose,
             show_statistics,
             input,
             schema,
             cache,
         }
-        .with_cache()
     }
 
     /// access to verbose
@@ -81,15 +81,14 @@ impl AnalyzeExec {
         &self.input
     }
 
-    fn with_cache(mut self) -> Self {
-        self.cache = self
-            .cache
-            // Output Partitioning
-            .with_partitioning(Partitioning::UnknownPartitioning(1))
-            // Execution Mode
-            .with_exec_mode(self.input.execution_mode());
-
-        self
+    fn with_cache(
+        input: &Arc<dyn ExecutionPlan>,
+        schema: SchemaRef,
+    ) -> PlanPropertiesCache {
+        let eq_properties = EquivalenceProperties::new(schema);
+        let output_partitioning = Partitioning::UnknownPartitioning(1);
+        let exec_mode = input.execution_mode();
+        PlanPropertiesCache::new(eq_properties, output_partitioning, exec_mode)
     }
 }
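
The new EquivalenceProperties import exists because the helper can no longer lean on the defaulted cache to supply equivalence properties: it now builds them explicitly from the output schema, keeps the single UnknownPartitioning(1) output partition (ANALYZE collapses its result into one summary partition), and inherits the execution mode from the input, exactly as the old builder chain did.
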
42 changes: 26 additions & 16 deletions datafusion/physical-plan/src/joins/hash_join.rs
@@ -65,6 +65,7 @@ use datafusion_execution::TaskContext;
 use datafusion_physical_expr::equivalence::join_equivalence_properties;
 use datafusion_physical_expr::PhysicalExprRef;
 
+use crate::joins::utils::JoinOnRef;
 use ahash::RandomState;
 use futures::{ready, Stream, StreamExt, TryStreamExt};
 
@@ -327,7 +328,14 @@ impl HashJoinExec {
 
         let random_state = RandomState::with_seeds(0, 0, 0, 0);
 
-        let cache = PlanPropertiesCache::new_default(Arc::new(schema.clone()));
+        let cache = Self::with_cache(
+            &left,
+            &right,
+            Arc::new(schema.clone()),
+            *join_type,
+            &on,
+            partition_mode,
+        );
 
         Ok(HashJoinExec {
             left,
@@ -343,8 +351,7 @@ impl HashJoinExec {
             column_indices,
             null_equals_null,
             cache,
-        }
-        .with_cache())
+        })
     }
 
     /// left (build) side which gets hashed
@@ -399,25 +406,29 @@ impl HashJoinExec {
         JoinSide::Right
     }
 
-    fn with_cache(mut self) -> Self {
-        let left = &self.left;
-        let right = &self.right;
-        let schema = self.schema();
+    fn with_cache(
+        left: &Arc<dyn ExecutionPlan>,
+        right: &Arc<dyn ExecutionPlan>,
+        schema: SchemaRef,
+        join_type: JoinType,
+        on: JoinOnRef,
+        mode: PartitionMode,
+    ) -> PlanPropertiesCache {
         // Calculate equivalence properties:
         let eq_properties = join_equivalence_properties(
             left.equivalence_properties().clone(),
             right.equivalence_properties().clone(),
-            &self.join_type,
+            &join_type,
             schema,
-            &Self::maintains_input_order(self.join_type),
+            &Self::maintains_input_order(join_type),
             Some(Self::probe_side()),
-            &self.on,
+            on,
         );
 
         // Get output partitioning:
         let left_columns_len = left.schema().fields.len();
-        let output_partitioning = match self.mode {
-            PartitionMode::CollectLeft => match self.join_type {
+        let output_partitioning = match mode {
+            PartitionMode::CollectLeft => match join_type {
                 JoinType::Inner | JoinType::Right => adjust_right_output_partitioning(
                     right.output_partitioning(),
                     left_columns_len,
@@ -433,7 +444,7 @@ impl HashJoinExec {
                 ),
             },
             PartitionMode::Partitioned => partitioned_join_output_partitioning(
-                self.join_type,
+                join_type,
                 left.output_partitioning(),
                 right.output_partitioning(),
                 left_columns_len,
@@ -449,7 +460,7 @@ impl HashJoinExec {
         let pipeline_breaking = left.execution_mode().is_unbounded()
             || (right.execution_mode().is_unbounded()
                 && matches!(
-                    self.join_type,
+                    join_type,
                     JoinType::Left
                         | JoinType::Full
                         | JoinType::LeftAnti
@@ -462,8 +473,7 @@ impl HashJoinExec {
             exec_mode_flatten([left, right])
         };
 
-        self.cache = PlanPropertiesCache::new(eq_properties, output_partitioning, mode);
-        self
+        PlanPropertiesCache::new(eq_properties, output_partitioning, mode)
    }
 }
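
The hunks above also preserve the join's pipeline analysis, now phrased over the borrowed inputs: the build (left) side must be fully materialized into the hash table, so an unbounded left input always breaks the pipeline, while an unbounded probe (right) side only does so for join types that can emit their build-side results only after the whole probe side has been consumed. A simplified sketch of that predicate (the diff elides some of the matched join types, so the list here is deliberately partial):

// Simplified stand-in enum; the arms elided in the original matches! are
// intentionally left out here rather than guessed at.
#[derive(Debug, Clone, Copy, PartialEq)]
enum JoinType {
    Inner,
    Left,
    Right,
    Full,
    LeftAnti,
}

fn hash_join_breaks_pipeline(
    left_unbounded: bool,
    right_unbounded: bool,
    join_type: JoinType,
) -> bool {
    // The build side is buffered in full, so unboundedness there is fatal;
    // the probe side streams unless the join type needs the entire probe
    // input before unmatched build-side rows can be resolved.
    left_unbounded
        || (right_unbounded
            && matches!(join_type, JoinType::Left | JoinType::Full | JoinType::LeftAnti))
}

fn main() {
    assert!(hash_join_breaks_pipeline(true, false, JoinType::Inner));
    assert!(!hash_join_breaks_pipeline(false, true, JoinType::Inner));
    assert!(hash_join_breaks_pipeline(false, true, JoinType::Full));
}
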
29 changes: 15 additions & 14 deletions datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
@@ -121,8 +121,8 @@ impl BoundedWindowAggExec {
                 vec![]
             }
         };
-        let cache = PlanPropertiesCache::new_default(schema.clone());
-        let window = Self {
+        let cache = Self::with_cache(&input, &schema, &window_expr);
+        Ok(Self {
             input,
             window_expr,
             schema,
@@ -131,8 +131,7 @@ impl BoundedWindowAggExec {
             input_order_mode,
             ordered_partition_by_indices,
             cache,
-        };
-        Ok(window.with_cache())
+        })
     }
 
     /// Window expressions
@@ -183,23 +182,25 @@ impl BoundedWindowAggExec {
         })
     }
 
-    fn with_cache(mut self) -> Self {
+    fn with_cache(
+        input: &Arc<dyn ExecutionPlan>,
+        schema: &SchemaRef,
+        window_expr: &[Arc<dyn WindowExpr>],
+    ) -> PlanPropertiesCache {
         // Calculate equivalence properties:
-        let eq_properties =
-            window_equivalence_properties(&self.schema, &self.input, &self.window_expr);
+        let eq_properties = window_equivalence_properties(schema, input, window_expr);
 
         // As we can have repartitioning using the partition keys, this can
         // be either one or more than one, depending on the presence of
         // repartitioning.
-        let output_partitioning = self.input.output_partitioning().clone();
+        let output_partitioning = input.output_partitioning().clone();
 
         // Construct properties cache
-        self.cache = PlanPropertiesCache::new(
-            eq_properties,               // Equivalence Properties
-            output_partitioning,         // Output Partitioning
-            self.input.execution_mode(), // Execution Mode
-        );
-        self
+        PlanPropertiesCache::new(
+            eq_properties,          // Equivalence Properties
+            output_partitioning,    // Output Partitioning
+            input.execution_mode(), // Execution Mode
+        )
     }
 }
