From 4707939374b46fe65341ea34dd170ddbe3b1e89b Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Thu, 22 Feb 2024 15:54:39 +0300 Subject: [PATCH] Remove default from some executors --- .../datasource/physical_plan/arrow_file.rs | 34 ++++++++------ .../physical-plan/src/aggregates/mod.rs | 47 ++++++++++--------- datafusion/physical-plan/src/analyze.rs | 21 ++++----- .../physical-plan/src/joins/hash_join.rs | 42 ++++++++++------- .../src/windows/bounded_window_agg_exec.rs | 29 ++++++------ 5 files changed, 95 insertions(+), 78 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/arrow_file.rs b/datafusion/core/src/datasource/physical_plan/arrow_file.rs index 1a27f9315b34b..17822d8c5a1ce 100644 --- a/datafusion/core/src/datasource/physical_plan/arrow_file.rs +++ b/datafusion/core/src/datasource/physical_plan/arrow_file.rs @@ -61,7 +61,11 @@ impl ArrowExec { pub fn new(base_config: FileScanConfig) -> Self { let (projected_schema, projected_statistics, projected_output_ordering) = base_config.project(); - let cache = PlanPropertiesCache::new_default(projected_schema.clone()); + let cache = Self::with_cache( + projected_schema.clone(), + &projected_output_ordering, + &base_config, + ); Self { base_config, projected_schema, @@ -70,36 +74,36 @@ impl ArrowExec { metrics: ExecutionPlanMetricsSet::new(), cache, } - .with_cache() } /// Ref to the base configs pub fn base_config(&self) -> &FileScanConfig { &self.base_config } - fn output_partitioning_helper(&self) -> Partitioning { - Partitioning::UnknownPartitioning(self.base_config.file_groups.len()) + fn output_partitioning_helper(file_scan_config: &FileScanConfig) -> Partitioning { + Partitioning::UnknownPartitioning(file_scan_config.file_groups.len()) } - fn with_cache(mut self) -> Self { + fn with_cache( + schema: SchemaRef, + projected_output_ordering: &[LexOrdering], + file_scan_config: &FileScanConfig, + ) -> PlanPropertiesCache { // Equivalence Properties - let eq_properties = EquivalenceProperties::new_with_orderings( - self.schema(), - &self.projected_output_ordering, - ); + let eq_properties = + EquivalenceProperties::new_with_orderings(schema, projected_output_ordering); - self.cache = PlanPropertiesCache::new( + PlanPropertiesCache::new( eq_properties, - self.output_partitioning_helper(), // Output Partitioning - ExecutionMode::Bounded, // Execution Mode - ); - self + Self::output_partitioning_helper(file_scan_config), // Output Partitioning + ExecutionMode::Bounded, // Execution Mode + ) } fn with_file_groups(mut self, file_groups: Vec>) -> Self { self.base_config.file_groups = file_groups; // Changing file groups may invalidate output partitioning. Update it also - let output_partitioning = self.output_partitioning_helper(); + let output_partitioning = Self::output_partitioning_helper(&self.base_config); self.cache = self.cache.with_partitioning(output_partitioning); self } diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index ea3fc3a737b2e..fbdda78545b6f 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -260,9 +260,6 @@ pub struct AggregateExec { /// We need the input schema of partial aggregate to be able to deserialize aggregate /// expressions from protobuf for final aggregate. pub input_schema: SchemaRef, - /// The mapping used to normalize expressions like Partitioning and - /// PhysicalSortExpr that maps input to output - projection_mapping: ProjectionMapping, /// Execution metrics metrics: ExecutionPlanMetricsSet, required_input_ordering: Option, @@ -365,8 +362,14 @@ impl AggregateExec { let required_input_ordering = (!new_requirement.is_empty()).then_some(new_requirement); - let cache = PlanPropertiesCache::new_default(schema.clone()); - let aggregate = AggregateExec { + let cache = Self::with_cache( + &input, + schema.clone(), + &projection_mapping, + &mode, + &input_order_mode, + ); + Ok(AggregateExec { mode, group_by, aggr_expr, @@ -374,14 +377,12 @@ impl AggregateExec { input, schema, input_schema, - projection_mapping, metrics: ExecutionPlanMetricsSet::new(), required_input_ordering, limit: None, input_order_mode, cache, - }; - Ok(aggregate.with_cache()) + }) } /// Aggregation mode (full, partial) @@ -505,26 +506,31 @@ impl AggregateExec { true } - fn with_cache(mut self) -> Self { + fn with_cache( + input: &Arc, + schema: SchemaRef, + projection_mapping: &ProjectionMapping, + mode: &AggregateMode, + input_order_mode: &InputOrderMode, + ) -> PlanPropertiesCache { // Construct equivalence properties: - let eq_properties = self - .input + let eq_properties = input .equivalence_properties() - .project(&self.projection_mapping, self.schema()); + .project(projection_mapping, schema); // Get output partitioning: - let mut output_partitioning = self.input.output_partitioning().clone(); - if self.mode.is_first_stage() { + let mut output_partitioning = input.output_partitioning().clone(); + if mode.is_first_stage() { // First stage aggregation will not change the output partitioning, // but needs to respect aliases (e.g. mapping in the GROUP BY // expression). - let input_eq_properties = self.input.equivalence_properties(); + let input_eq_properties = input.equivalence_properties(); if let Partitioning::Hash(exprs, part) = output_partitioning { let normalized_exprs = exprs .iter() .map(|expr| { input_eq_properties - .project_expr(expr, &self.projection_mapping) + .project_expr(expr, projection_mapping) .unwrap_or_else(|| { Arc::new(UnKnownColumn::new(&expr.to_string())) }) @@ -535,18 +541,15 @@ impl AggregateExec { } // Determine execution mode: - let mut exec_mode = self.input.execution_mode(); + let mut exec_mode = input.execution_mode(); if exec_mode == ExecutionMode::Unbounded - && self.input_order_mode == InputOrderMode::Linear + && *input_order_mode == InputOrderMode::Linear { // Cannot run without breaking the pipeline exec_mode = ExecutionMode::PipelineBreaking; } - self.cache = - PlanPropertiesCache::new(eq_properties, output_partitioning, exec_mode); - - self + PlanPropertiesCache::new(eq_properties, output_partitioning, exec_mode) } pub fn input_order_mode(&self) -> &InputOrderMode { diff --git a/datafusion/physical-plan/src/analyze.rs b/datafusion/physical-plan/src/analyze.rs index 99b3add2acd65..c62c3aca66a03 100644 --- a/datafusion/physical-plan/src/analyze.rs +++ b/datafusion/physical-plan/src/analyze.rs @@ -29,6 +29,7 @@ use crate::{DisplayFormatType, ExecutionPlan, Partitioning}; use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch}; use datafusion_common::{internal_err, DataFusionError, Result}; use datafusion_execution::TaskContext; +use datafusion_physical_expr::EquivalenceProperties; use futures::StreamExt; @@ -55,7 +56,7 @@ impl AnalyzeExec { input: Arc, schema: SchemaRef, ) -> Self { - let cache = PlanPropertiesCache::new_default(schema.clone()); + let cache = Self::with_cache(&input, schema.clone()); AnalyzeExec { verbose, show_statistics, @@ -63,7 +64,6 @@ impl AnalyzeExec { schema, cache, } - .with_cache() } /// access to verbose @@ -81,15 +81,14 @@ impl AnalyzeExec { &self.input } - fn with_cache(mut self) -> Self { - self.cache = self - .cache - // Output Partitioning - .with_partitioning(Partitioning::UnknownPartitioning(1)) - // Execution Mode - .with_exec_mode(self.input.execution_mode()); - - self + fn with_cache( + input: &Arc, + schema: SchemaRef, + ) -> PlanPropertiesCache { + let eq_properties = EquivalenceProperties::new(schema); + let output_partitioning = Partitioning::UnknownPartitioning(1); + let exec_mode = input.execution_mode(); + PlanPropertiesCache::new(eq_properties, output_partitioning, exec_mode) } } diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 3aae053151cd6..e1889626c7fed 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -65,6 +65,7 @@ use datafusion_execution::TaskContext; use datafusion_physical_expr::equivalence::join_equivalence_properties; use datafusion_physical_expr::PhysicalExprRef; +use crate::joins::utils::JoinOnRef; use ahash::RandomState; use futures::{ready, Stream, StreamExt, TryStreamExt}; @@ -327,7 +328,14 @@ impl HashJoinExec { let random_state = RandomState::with_seeds(0, 0, 0, 0); - let cache = PlanPropertiesCache::new_default(Arc::new(schema.clone())); + let cache = Self::with_cache( + &left, + &right, + Arc::new(schema.clone()), + *join_type, + &on, + partition_mode, + ); Ok(HashJoinExec { left, @@ -343,8 +351,7 @@ impl HashJoinExec { column_indices, null_equals_null, cache, - } - .with_cache()) + }) } /// left (build) side which gets hashed @@ -399,25 +406,29 @@ impl HashJoinExec { JoinSide::Right } - fn with_cache(mut self) -> Self { - let left = &self.left; - let right = &self.right; - let schema = self.schema(); + fn with_cache( + left: &Arc, + right: &Arc, + schema: SchemaRef, + join_type: JoinType, + on: JoinOnRef, + mode: PartitionMode, + ) -> PlanPropertiesCache { // Calculate equivalence properties: let eq_properties = join_equivalence_properties( left.equivalence_properties().clone(), right.equivalence_properties().clone(), - &self.join_type, + &join_type, schema, - &Self::maintains_input_order(self.join_type), + &Self::maintains_input_order(join_type), Some(Self::probe_side()), - &self.on, + on, ); // Get output partitioning: let left_columns_len = left.schema().fields.len(); - let output_partitioning = match self.mode { - PartitionMode::CollectLeft => match self.join_type { + let output_partitioning = match mode { + PartitionMode::CollectLeft => match join_type { JoinType::Inner | JoinType::Right => adjust_right_output_partitioning( right.output_partitioning(), left_columns_len, @@ -433,7 +444,7 @@ impl HashJoinExec { ), }, PartitionMode::Partitioned => partitioned_join_output_partitioning( - self.join_type, + join_type, left.output_partitioning(), right.output_partitioning(), left_columns_len, @@ -449,7 +460,7 @@ impl HashJoinExec { let pipeline_breaking = left.execution_mode().is_unbounded() || (right.execution_mode().is_unbounded() && matches!( - self.join_type, + join_type, JoinType::Left | JoinType::Full | JoinType::LeftAnti @@ -462,8 +473,7 @@ impl HashJoinExec { exec_mode_flatten([left, right]) }; - self.cache = PlanPropertiesCache::new(eq_properties, output_partitioning, mode); - self + PlanPropertiesCache::new(eq_properties, output_partitioning, mode) } } diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index cb512302cb6f6..52d2fc8f4f1b7 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -121,8 +121,8 @@ impl BoundedWindowAggExec { vec![] } }; - let cache = PlanPropertiesCache::new_default(schema.clone()); - let window = Self { + let cache = Self::with_cache(&input, &schema, &window_expr); + Ok(Self { input, window_expr, schema, @@ -131,8 +131,7 @@ impl BoundedWindowAggExec { input_order_mode, ordered_partition_by_indices, cache, - }; - Ok(window.with_cache()) + }) } /// Window expressions @@ -183,23 +182,25 @@ impl BoundedWindowAggExec { }) } - fn with_cache(mut self) -> Self { + fn with_cache( + input: &Arc, + schema: &SchemaRef, + window_expr: &[Arc], + ) -> PlanPropertiesCache { // Calculate equivalence properties: - let eq_properties = - window_equivalence_properties(&self.schema, &self.input, &self.window_expr); + let eq_properties = window_equivalence_properties(schema, input, window_expr); // As we can have repartitioning using the partition keys, this can // be either one or more than one, depending on the presence of // repartitioning. - let output_partitioning = self.input.output_partitioning().clone(); + let output_partitioning = input.output_partitioning().clone(); // Construct properties cache - self.cache = PlanPropertiesCache::new( - eq_properties, // Equivalence Properties - output_partitioning, // Output Partitioning - self.input.execution_mode(), // Execution Mode - ); - self + PlanPropertiesCache::new( + eq_properties, // Equivalence Properties + output_partitioning, // Output Partitioning + input.execution_mode(), // Execution Mode + ) } }