Skip to content

Commit

Permalink
Refactor execution plan properties to remove execution mode
Browse files Browse the repository at this point in the history
- Removed the `ExecutionMode` argument from `PlanProperties::new` at its call sites across the file-scan sources, test helpers, and physical plan operators.
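- Updated the affected `compute_properties` helpers to call the two-argument `PlanProperties::new(eq_properties, partitioning)`; operators that wrap an input continue to forward its `emission_type()` and `has_finite_memory()` via `with_emission_type` / `with_memory_usage`.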
- Dropped the now-unused `ExecutionMode` imports and the stale `// Execution Mode` inline comments that described the removed argument.

This refactor simplifies the execution plan properties and enhances maintainability.
  • Loading branch information
jayzhan-synnada committed Dec 10, 2024
1 parent 7e353da commit 7713a55
Show file tree
Hide file tree
Showing 47 changed files with 108 additions and 378 deletions.
3 changes: 1 addition & 2 deletions datafusion/core/src/datasource/physical_plan/arrow_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use datafusion_common::Statistics;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
use datafusion_physical_plan::execution_plan::EmissionType;
use datafusion_physical_plan::{ExecutionMode, PlanProperties};
use datafusion_physical_plan::PlanProperties;

use futures::StreamExt;
use itertools::Itertools;
Expand Down Expand Up @@ -98,7 +98,6 @@ impl ArrowExec {
PlanProperties::new(
eq_properties,
Self::output_partitioning_helper(file_scan_config), // Output Partitioning
ExecutionMode::Bounded, // Execution Mode
)
}

Expand Down
5 changes: 2 additions & 3 deletions datafusion/core/src/datasource/physical_plan/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ use super::FileScanConfig;
use crate::error::Result;
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning,
PlanProperties, SendableRecordBatchStream, Statistics,
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
SendableRecordBatchStream, Statistics,
};

use arrow::datatypes::SchemaRef;
Expand Down Expand Up @@ -82,7 +82,6 @@ impl AvroExec {
PlanProperties::new(
eq_properties,
Partitioning::UnknownPartitioning(n_partitions), // Output Partitioning
ExecutionMode::Bounded, // Execution Mode
)
}
}
Expand Down
5 changes: 2 additions & 3 deletions datafusion/core/src/datasource/physical_plan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ use crate::datasource::physical_plan::FileMeta;
use crate::error::{DataFusionError, Result};
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, ExecutionPlanProperties,
Partitioning, PlanProperties, SendableRecordBatchStream, Statistics,
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, Partitioning,
PlanProperties, SendableRecordBatchStream, Statistics,
};

use arrow::csv;
Expand Down Expand Up @@ -328,7 +328,6 @@ impl CsvExec {
PlanProperties::new(
eq_properties,
Self::output_partitioning_helper(file_scan_config), // Output Partitioning
ExecutionMode::Bounded, // Execution Mode
)
}

Expand Down
5 changes: 2 additions & 3 deletions datafusion/core/src/datasource/physical_plan/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ use crate::datasource::physical_plan::FileMeta;
use crate::error::{DataFusionError, Result};
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, ExecutionPlanProperties,
Partitioning, PlanProperties, SendableRecordBatchStream, Statistics,
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, Partitioning,
PlanProperties, SendableRecordBatchStream, Statistics,
};

use arrow::json::ReaderBuilder;
Expand Down Expand Up @@ -108,7 +108,6 @@ impl NdJsonExec {
PlanProperties::new(
eq_properties,
Self::output_partitioning_helper(file_scan_config), // Output Partitioning
ExecutionMode::Bounded, // Execution Mode
)
}

Expand Down
3 changes: 1 addition & 2 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use crate::{
physical_optimizer::pruning::PruningPredicate,
physical_plan::{
metrics::{ExecutionPlanMetricsSet, MetricBuilder, MetricsSet},
DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, PlanProperties,
DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
SendableRecordBatchStream, Statistics,
},
};
Expand Down Expand Up @@ -661,7 +661,6 @@ impl ParquetExec {
PlanProperties::new(
eq_properties,
Self::output_partitioning_helper(file_config), // Output Partitioning
ExecutionMode::Bounded, // Execution Mode
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1460,7 +1460,6 @@ pub(crate) mod tests {
PlanProperties::new(
input.equivalence_properties().clone(), // Equivalence Properties
input.output_partitioning().clone(), // Output Partitioning
input.execution_mode(), // Execution Mode
)
.with_emission_type(input.emission_type())
.with_memory_usage(input.has_finite_memory())
Expand Down
8 changes: 2 additions & 6 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2017,7 +2017,7 @@ mod tests {
use crate::datasource::file_format::options::CsvReadOptions;
use crate::datasource::MemTable;
use crate::physical_plan::{
expressions, DisplayAs, DisplayFormatType, ExecutionMode, PlanProperties,
expressions, DisplayAs, DisplayFormatType, PlanProperties,
SendableRecordBatchStream,
};
use crate::prelude::{SessionConfig, SessionContext};
Expand Down Expand Up @@ -2620,13 +2620,9 @@ mod tests {

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn compute_properties(schema: SchemaRef) -> PlanProperties {
let eq_properties = EquivalenceProperties::new(schema);
PlanProperties::new(
eq_properties,
// Output Partitioning
EquivalenceProperties::new(schema),
Partitioning::UnknownPartitioning(1),
// Execution Mode
ExecutionMode::Bounded,
)
}
}
Expand Down
10 changes: 2 additions & 8 deletions datafusion/core/src/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,7 @@ use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalSortExpr};
use datafusion_physical_plan::execution_plan::EmissionType;
use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec};
use datafusion_physical_plan::{
DisplayAs, DisplayFormatType, ExecutionMode, PlanProperties,
};
use datafusion_physical_plan::{DisplayAs, DisplayFormatType, PlanProperties};

#[cfg(feature = "compression")]
use bzip2::write::BzEncoder;
Expand Down Expand Up @@ -387,13 +385,9 @@ impl StatisticsExec {

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn compute_properties(schema: SchemaRef) -> PlanProperties {
let eq_properties = EquivalenceProperties::new(schema);
PlanProperties::new(
eq_properties,
// Output Partitioning
EquivalenceProperties::new(schema),
Partitioning::UnknownPartitioning(2),
// Execution Mode
ExecutionMode::Bounded,
)
}
}
Expand Down
15 changes: 4 additions & 11 deletions datafusion/core/src/test_util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ use crate::error::Result;
use crate::execution::context::TaskContext;
use crate::logical_expr::{LogicalPlanBuilder, UNNAMED_TABLE};
use crate::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning,
PlanProperties, RecordBatchStream, SendableRecordBatchStream,
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
RecordBatchStream, SendableRecordBatchStream,
};
use crate::prelude::{CsvReadOptions, SessionContext};

Expand Down Expand Up @@ -257,19 +257,12 @@ impl UnboundedExec {
/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn compute_properties(
schema: SchemaRef,
batch_produce: Option<usize>,
_batch_produce: Option<usize>,
n_partitions: usize,
) -> PlanProperties {
let eq_properties = EquivalenceProperties::new(schema);
let mode = if batch_produce.is_none() {
ExecutionMode::Incremental
} else {
ExecutionMode::Bounded | ExecutionMode::Incremental
};
PlanProperties::new(
eq_properties,
EquivalenceProperties::new(schema),
Partitioning::UnknownPartitioning(n_partitions),
mode,
)
}
}
Expand Down
7 changes: 2 additions & 5 deletions datafusion/core/tests/custom_sources_cases/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use datafusion_common::stats::Precision;
use datafusion_physical_expr::EquivalenceProperties;
use datafusion_physical_plan::execution_plan::EmissionType;
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
use datafusion_physical_plan::{ExecutionMode, PlanProperties};
use datafusion_physical_plan::PlanProperties;

use async_trait::async_trait;
use datafusion_catalog::Session;
Expand Down Expand Up @@ -92,12 +92,9 @@ impl CustomExecutionPlan {

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn compute_properties(schema: SchemaRef) -> PlanProperties {
let eq_properties = EquivalenceProperties::new(schema);
PlanProperties::new(
eq_properties,
// Output Partitioning
EquivalenceProperties::new(schema),
Partitioning::UnknownPartitioning(1),
ExecutionMode::Bounded,
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ use datafusion::execution::context::TaskContext;
use datafusion::logical_expr::TableProviderFilterPushDown;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning,
PlanProperties, SendableRecordBatchStream, Statistics,
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
SendableRecordBatchStream, Statistics,
};
use datafusion::prelude::*;
use datafusion::scalar::ScalarValue;
Expand Down Expand Up @@ -73,11 +73,9 @@ impl CustomPlan {

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn compute_properties(schema: SchemaRef) -> PlanProperties {
let eq_properties = EquivalenceProperties::new(schema);
PlanProperties::new(
eq_properties,
EquivalenceProperties::new(schema),
Partitioning::UnknownPartitioning(1),
ExecutionMode::Bounded,
)
}
}
Expand Down
9 changes: 3 additions & 6 deletions datafusion/core/tests/custom_sources_cases/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ use datafusion::{
error::Result,
logical_expr::Expr,
physical_plan::{
ColumnStatistics, DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan,
Partitioning, PlanProperties, SendableRecordBatchStream, Statistics,
ColumnStatistics, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
PlanProperties, SendableRecordBatchStream, Statistics,
},
prelude::SessionContext,
scalar::ScalarValue,
Expand Down Expand Up @@ -65,12 +65,9 @@ impl StatisticsValidation {

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn compute_properties(schema: SchemaRef) -> PlanProperties {
let eq_properties = EquivalenceProperties::new(schema);

PlanProperties::new(
eq_properties,
EquivalenceProperties::new(schema),
Partitioning::UnknownPartitioning(2),
ExecutionMode::Bounded,
)
}
}
Expand Down
9 changes: 3 additions & 6 deletions datafusion/core/tests/user_defined/insert_operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use datafusion_catalog::{Session, TableProvider};
use datafusion_expr::{dml::InsertOp, Expr, TableType};
use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
use datafusion_physical_plan::{
execution_plan::EmissionType, DisplayAs, ExecutionMode, ExecutionPlan, PlanProperties,
execution_plan::EmissionType, DisplayAs, ExecutionPlan, PlanProperties,
};

#[tokio::test]
Expand Down Expand Up @@ -125,11 +125,8 @@ struct TestInsertExec {
impl TestInsertExec {
fn new(op: InsertOp) -> Self {
let eq_properties = EquivalenceProperties::new(make_count_schema());
let plan_properties = PlanProperties::new(
eq_properties,
Partitioning::UnknownPartitioning(1),
ExecutionMode::Bounded,
);
let plan_properties =
PlanProperties::new(eq_properties, Partitioning::UnknownPartitioning(1));
Self {
op,
plan_properties,
Expand Down
11 changes: 3 additions & 8 deletions datafusion/core/tests/user_defined/user_defined_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,8 @@ use datafusion::{
optimizer::{OptimizerConfig, OptimizerRule},
physical_expr::EquivalenceProperties,
physical_plan::{
DisplayAs, DisplayFormatType, Distribution, ExecutionMode, ExecutionPlan,
Partitioning, PlanProperties, RecordBatchStream, SendableRecordBatchStream,
Statistics,
DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics,
},
physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner},
prelude::{SessionConfig, SessionContext},
Expand Down Expand Up @@ -498,11 +497,7 @@ impl TopKExec {
fn compute_properties(schema: SchemaRef) -> PlanProperties {
let eq_properties = EquivalenceProperties::new(schema);

PlanProperties::new(
eq_properties,
Partitioning::UnknownPartitioning(1),
ExecutionMode::Bounded,
)
PlanProperties::new(eq_properties, Partitioning::UnknownPartitioning(1))
}
}

Expand Down
1 change: 0 additions & 1 deletion datafusion/physical-optimizer/src/output_requirements.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ impl OutputRequirementExec {
PlanProperties::new(
input.equivalence_properties().clone(), // Equivalence Properties
input.output_partitioning().clone(), // Output Partitioning
input.execution_mode(), // Execution Mode
)
.with_emission_type(input.emission_type())
.with_memory_usage(input.has_finite_memory())
Expand Down
5 changes: 2 additions & 3 deletions datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
use std::any::Any;
use std::sync::Arc;

use super::{DisplayAs, ExecutionMode, ExecutionPlanProperties, PlanProperties};
use super::{DisplayAs, ExecutionPlanProperties, PlanProperties};
use crate::aggregates::{
no_grouping::AggregateStream, row_hash::GroupedHashAggregateStream,
topk_stream::GroupedTopKAggregateStream,
Expand Down Expand Up @@ -669,8 +669,7 @@ impl AggregateExec {
input.emission_type()
};

let mode = ExecutionMode::empty();
PlanProperties::new(eq_properties, output_partitioning, mode)
PlanProperties::new(eq_properties, output_partitioning)
.with_emission_type(emission_type)
.with_memory_usage(input.has_finite_memory())
}
Expand Down
3 changes: 1 addition & 2 deletions datafusion/physical-plan/src/analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,7 @@ impl AnalyzeExec {
) -> PlanProperties {
let eq_properties = EquivalenceProperties::new(schema);
let output_partitioning = Partitioning::UnknownPartitioning(1);
let exec_mode = input.execution_mode();
PlanProperties::new(eq_properties, output_partitioning, exec_mode)
PlanProperties::new(eq_properties, output_partitioning)
.with_emission_type(input.emission_type())
.with_memory_usage(input.has_finite_memory())
}
Expand Down
1 change: 0 additions & 1 deletion datafusion/physical-plan/src/coalesce_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ impl CoalesceBatchesExec {
PlanProperties::new(
input.equivalence_properties().clone(), // Equivalence Properties
input.output_partitioning().clone(), // Output Partitioning
input.execution_mode(), // Execution Mode
)
.with_emission_type(input.emission_type())
.with_memory_usage(input.has_finite_memory())
Expand Down
1 change: 0 additions & 1 deletion datafusion/physical-plan/src/coalesce_partitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ impl CoalescePartitionsExec {
PlanProperties::new(
eq_properties, // Equivalence Properties
Partitioning::UnknownPartitioning(1), // Output Partitioning
input.execution_mode(), // Execution Mode
)
.with_emission_type(input.emission_type())
.with_memory_usage(input.has_finite_memory())
Expand Down
14 changes: 3 additions & 11 deletions datafusion/physical-plan/src/empty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@
use std::any::Any;
use std::sync::Arc;

use super::{
common, DisplayAs, ExecutionMode, PlanProperties, SendableRecordBatchStream,
Statistics,
};
use super::{common, DisplayAs, PlanProperties, SendableRecordBatchStream, Statistics};
use crate::{
execution_plan::EmissionType, memory::MemoryStream, DisplayFormatType, ExecutionPlan,
Partitioning,
Expand Down Expand Up @@ -77,14 +74,9 @@ impl EmptyExec {

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn compute_properties(schema: SchemaRef, n_partitions: usize) -> PlanProperties {
let eq_properties = EquivalenceProperties::new(schema);
let output_partitioning = Self::output_partitioning_helper(n_partitions);
PlanProperties::new(
eq_properties,
// Output Partitioning
output_partitioning,
// Execution Mode
ExecutionMode::Bounded,
EquivalenceProperties::new(schema),
Self::output_partitioning_helper(n_partitions),
)
}
}
Expand Down
Loading

0 comments on commit 7713a55

Please sign in to comment.