diff --git a/Cargo.toml b/Cargo.toml index 1107a87019c8..0c76ff196a10 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -94,7 +94,6 @@ arrow-ord = { version = "53.3.0", default-features = false } arrow-schema = { version = "53.3.0", default-features = false } async-trait = "0.1.73" bigdecimal = "0.4.6" -bitflags = "2.6.0" bytes = "1.4" chrono = { version = "0.4.38", default-features = false } ctor = "0.2.0" diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs index 77cb187bc297..26e0c67d064f 100644 --- a/datafusion-cli/src/exec.rs +++ b/datafusion-cli/src/exec.rs @@ -38,7 +38,7 @@ use datafusion::config::ConfigFileType; use datafusion::datasource::listing::ListingTableUrl; use datafusion::error::{DataFusionError, Result}; use datafusion::logical_expr::{DdlStatement, LogicalPlan}; -use datafusion::physical_plan::{collect, execute_stream, ExecutionPlanProperties}; +use datafusion::physical_plan::{collect, execute_stream}; use datafusion::sql::parser::{DFParser, Statement}; use datafusion::sql::sqlparser::dialect::dialect_from_str; @@ -234,7 +234,7 @@ pub(super) async fn exec_and_print( let df = ctx.execute_logical_plan(plan).await?; let physical_plan = df.create_physical_plan().await?; - if !physical_plan.has_finite_memory(){ + if !physical_plan.has_finite_memory() { let stream = execute_stream(physical_plan, task_ctx.clone())?; print_options.print_stream(stream, now).await?; } else { diff --git a/datafusion-examples/examples/composed_extension_codec.rs b/datafusion-examples/examples/composed_extension_codec.rs index 5c34eccf26e1..4efba7e888b5 100644 --- a/datafusion-examples/examples/composed_extension_codec.rs +++ b/datafusion-examples/examples/composed_extension_codec.rs @@ -36,6 +36,7 @@ use std::ops::Deref; use std::sync::Arc; use datafusion::common::Result; +use datafusion::physical_plan::execution_plan::EmissionType; use datafusion::physical_plan::{DisplayAs, ExecutionPlan}; use datafusion::prelude::SessionContext; use datafusion_common::{internal_err, DataFusionError}; @@ -127,6 +128,14 @@ impl ExecutionPlan for ParentExec { ) -> Result { unreachable!() } + + fn emission_type(&self) -> EmissionType { + unimplemented!() + } + + fn has_finite_memory(&self) -> bool { + true + } } /// A PhysicalExtensionCodec that can serialize and deserialize ParentExec @@ -203,6 +212,14 @@ impl ExecutionPlan for ChildExec { ) -> Result { unreachable!() } + + fn emission_type(&self) -> EmissionType { + EmissionType::Incremental + } + + fn has_finite_memory(&self) -> bool { + true + } } /// A PhysicalExtensionCodec that can serialize and deserialize ChildExec diff --git a/datafusion-examples/examples/custom_datasource.rs b/datafusion-examples/examples/custom_datasource.rs index 90e9d2c7a632..822646e93ad3 100644 --- a/datafusion-examples/examples/custom_datasource.rs +++ b/datafusion-examples/examples/custom_datasource.rs @@ -30,10 +30,11 @@ use datafusion::error::Result; use datafusion::execution::context::TaskContext; use datafusion::logical_expr::LogicalPlanBuilder; use datafusion::physical_expr::EquivalenceProperties; +use datafusion::physical_plan::execution_plan::EmissionType; use datafusion::physical_plan::memory::MemoryStream; use datafusion::physical_plan::{ - project_schema, DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, - Partitioning, PlanProperties, SendableRecordBatchStream, + project_schema, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, + PlanProperties, SendableRecordBatchStream, }; use datafusion::prelude::*; @@ -211,11 +212,7 @@ impl CustomExec { /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. fn compute_properties(schema: SchemaRef) -> PlanProperties { let eq_properties = EquivalenceProperties::new(schema); - PlanProperties::new( - eq_properties, - Partitioning::UnknownPartitioning(1), - ExecutionMode::Bounded, - ) + PlanProperties::new(eq_properties, Partitioning::UnknownPartitioning(1)) } } @@ -279,4 +276,12 @@ impl ExecutionPlan for CustomExec { None, )?)) } + + fn emission_type(&self) -> EmissionType { + EmissionType::Incremental + } + + fn has_finite_memory(&self) -> bool { + true + } } diff --git a/datafusion/core/src/test_util/mod.rs b/datafusion/core/src/test_util/mod.rs index 9d10fc0683eb..0e437ac03ca6 100644 --- a/datafusion/core/src/test_util/mod.rs +++ b/datafusion/core/src/test_util/mod.rs @@ -326,11 +326,7 @@ impl ExecutionPlan for UnboundedExec { } fn has_finite_memory(&self) -> bool { - if self.batch_produce.is_none() { - false - } else { - true - } + self.batch_produce.is_some() } } diff --git a/datafusion/ffi/src/execution_plan.rs b/datafusion/ffi/src/execution_plan.rs index 3d793c912c8d..cd674a9a0d8a 100644 --- a/datafusion/ffi/src/execution_plan.rs +++ b/datafusion/ffi/src/execution_plan.rs @@ -21,12 +21,12 @@ use abi_stable::{ std_types::{RResult, RString, RVec}, StableAbi, }; -use datafusion::error::Result; use datafusion::{ error::DataFusionError, execution::{SendableRecordBatchStream, TaskContext}, physical_plan::{DisplayAs, ExecutionPlan, PlanProperties}, }; +use datafusion::{error::Result, physical_plan::execution_plan::EmissionType}; use crate::{ plan_properties::FFI_PlanProperties, record_batch_stream::FFI_RecordBatchStream, @@ -268,11 +268,22 @@ impl ExecutionPlan for ForeignExecutionPlan { } } } + + fn emission_type(&self) -> EmissionType { + unimplemented!() + } + + fn has_finite_memory(&self) -> bool { + true + } } #[cfg(test)] mod tests { - use datafusion::{physical_plan::Partitioning, prelude::SessionContext}; + use datafusion::{ + physical_plan::{execution_plan::EmissionType, Partitioning}, + prelude::SessionContext, + }; use super::*; @@ -287,7 +298,6 @@ mod tests { props: PlanProperties::new( datafusion::physical_expr::EquivalenceProperties::new(schema), Partitioning::UnknownPartitioning(3), - datafusion::physical_plan::ExecutionMode::Incremental, ), } } @@ -338,6 +348,14 @@ mod tests { fn statistics(&self) -> Result { unimplemented!() } + + fn emission_type(&self) -> EmissionType { + unimplemented!() + } + + fn has_finite_memory(&self) -> bool { + true + } } #[test] diff --git a/datafusion/ffi/src/plan_properties.rs b/datafusion/ffi/src/plan_properties.rs index 722681ae4a1d..00d6831a972a 100644 --- a/datafusion/ffi/src/plan_properties.rs +++ b/datafusion/ffi/src/plan_properties.rs @@ -28,7 +28,7 @@ use arrow::datatypes::SchemaRef; use datafusion::{ error::{DataFusionError, Result}, physical_expr::EquivalenceProperties, - physical_plan::{ExecutionMode, PlanProperties}, + physical_plan::{execution_plan::EmissionType, PlanProperties}, prelude::SessionContext, }; use datafusion_proto::{ @@ -53,8 +53,11 @@ pub struct FFI_PlanProperties { pub output_partitioning: unsafe extern "C" fn(plan: &Self) -> RResult, RStr<'static>>, - /// Return the execution mode of the plan. - pub execution_mode: unsafe extern "C" fn(plan: &Self) -> FFI_ExecutionMode, + /// Return the emission type of the plan. + pub emission_type: unsafe extern "C" fn(plan: &Self) -> OptionalEmissionType, + + /// Indicates whether the plan has finite memory requirements. + pub has_finite_memory: unsafe extern "C" fn(plan: &Self) -> bool, /// The output ordering is a [`PhysicalSortExprNodeCollection`] protobuf message /// serialized into bytes to pass across the FFI boundary. @@ -98,12 +101,28 @@ unsafe extern "C" fn output_partitioning_fn_wrapper( ROk(output_partitioning.into()) } -unsafe extern "C" fn execution_mode_fn_wrapper( +unsafe extern "C" fn emission_type_fn_wrapper( properties: &FFI_PlanProperties, -) -> FFI_ExecutionMode { +) -> OptionalEmissionType { + let private_data = properties.private_data as *const PlanPropertiesPrivateData; + let props = &(*private_data).props; + if let Some(emission_type) = props.emission_type { + OptionalEmissionType { + is_some: true, + value: emission_type.into(), + } + } else { + OptionalEmissionType { + is_some: false, + value: FFI_EmissionType::Incremental, // Any value as a placeholder + } + } +} + +unsafe extern "C" fn memory_usage_fn_wrapper(properties: &FFI_PlanProperties) -> bool { let private_data = properties.private_data as *const PlanPropertiesPrivateData; let props = &(*private_data).props; - props.execution_mode().into() + props.has_finite_memory } unsafe extern "C" fn output_ordering_fn_wrapper( @@ -164,7 +183,8 @@ impl From<&PlanProperties> for FFI_PlanProperties { FFI_PlanProperties { output_partitioning: output_partitioning_fn_wrapper, - execution_mode: execution_mode_fn_wrapper, + emission_type: emission_type_fn_wrapper, + has_finite_memory: memory_usage_fn_wrapper, output_ordering: output_ordering_fn_wrapper, schema: schema_fn_wrapper, release: release_fn_wrapper, @@ -220,9 +240,6 @@ impl TryFrom for PlanProperties { RErr(e) => Err(DataFusionError::Plan(e.to_string())), }?; - let execution_mode: ExecutionMode = - unsafe { (ffi_props.execution_mode)(&ffi_props).into() }; - let eq_properties = match orderings { Some(ordering) => { EquivalenceProperties::new_with_orderings(Arc::new(schema), &[ordering]) @@ -230,44 +247,48 @@ impl TryFrom for PlanProperties { None => EquivalenceProperties::new(Arc::new(schema)), }; - Ok(PlanProperties::new( - eq_properties, - partitioning, - execution_mode, - )) + Ok(PlanProperties::new(eq_properties, partitioning)) } } -/// FFI safe version of [`ExecutionMode`]. +/// FFI safe version of [`EmissionType`]. #[repr(C)] #[allow(non_camel_case_types)] #[derive(Clone, StableAbi)] -pub enum FFI_ExecutionMode { - Bounded, - Unbounded, - PipelineBreaking, +pub enum FFI_EmissionType { + Incremental, + Final, + Both, } -impl From for FFI_ExecutionMode { - fn from(value: ExecutionMode) -> Self { +impl From for FFI_EmissionType { + fn from(value: EmissionType) -> Self { match value { - ExecutionMode::Bounded => FFI_ExecutionMode::Bounded, - ExecutionMode::Unbounded => FFI_ExecutionMode::Unbounded, - ExecutionMode::PipelineBreaking => FFI_ExecutionMode::PipelineBreaking, + EmissionType::Incremental => FFI_EmissionType::Incremental, + EmissionType::Final => FFI_EmissionType::Final, + EmissionType::Both => FFI_EmissionType::Both, } } } -impl From for ExecutionMode { - fn from(value: FFI_ExecutionMode) -> Self { +impl From for EmissionType { + fn from(value: FFI_EmissionType) -> Self { match value { - FFI_ExecutionMode::Bounded => ExecutionMode::Bounded, - FFI_ExecutionMode::Unbounded => ExecutionMode::Unbounded, - FFI_ExecutionMode::PipelineBreaking => ExecutionMode::PipelineBreaking, + FFI_EmissionType::Incremental => EmissionType::Incremental, + FFI_EmissionType::Final => EmissionType::Final, + FFI_EmissionType::Both => EmissionType::Both, } } } +#[repr(C)] +#[allow(non_camel_case_types)] +#[derive(Clone, StableAbi)] +pub struct OptionalEmissionType { + pub is_some: bool, + pub value: FFI_EmissionType, // Valid only if `is_some` is true +} + #[cfg(test)] mod tests { use datafusion::physical_plan::Partitioning; @@ -283,7 +304,6 @@ mod tests { let original_props = PlanProperties::new( EquivalenceProperties::new(schema), Partitioning::UnknownPartitioning(3), - ExecutionMode::Unbounded, ); let local_props_ptr = FFI_PlanProperties::from(&original_props); diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml index 0b767c43d10c..bb0e21fdfd15 100644 --- a/datafusion/physical-plan/Cargo.toml +++ b/datafusion/physical-plan/Cargo.toml @@ -46,7 +46,6 @@ arrow-buffer = { workspace = true } arrow-ord = { workspace = true } arrow-schema = { workspace = true } async-trait = { workspace = true } -bitflags = { workspace = true } chrono = { workspace = true } datafusion-common = { workspace = true, default-features = true } datafusion-common-runtime = { workspace = true, default-features = true } diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 4d43ec1fab07..581433eb591c 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -1742,8 +1742,6 @@ mod tests { eq_properties, // Output Partitioning Partitioning::UnknownPartitioning(1), - // Execution Mode - ExecutionMode::Bounded, ) } } diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 657ae9481a71..c4303420efd3 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -51,8 +51,6 @@ use crate::sorts::sort_preserving_merge::SortPreservingMergeExec; pub use crate::stream::EmptyRecordBatchStream; use crate::stream::RecordBatchStreamAdapter; -use bitflags::bitflags; - /// Represent nodes in the DataFusion Physical Plan. /// /// Calling [`execute`] produces an `async` [`SendableRecordBatchStream`] of @@ -506,85 +504,6 @@ pub enum EmissionType { Both, } -bitflags! { - /// Describes the execution mode of the result of calling - /// [`ExecutionPlan::execute`] with respect to its size and behavior. - /// - /// The mode of the execution plan is determined by the mode of its input - /// execution plans and the details of the operator itself. For example, a - /// `FilterExec` operator will have the same execution mode as its input, but a - /// `SortExec` operator may have a different execution mode than its input, - /// depending on how the input stream is sorted. - /// - /// There are three possible execution modes: `Bounded`, `Unbounded` and - /// `PipelineBreaking`. - #[derive(Clone, Copy, PartialEq, Debug)] - pub struct ExecutionMode: u32 { - const Bounded = 0b1; - - /// Emission Type - const Incremental = 0b1000; - const Final = 0b10000; - } -} - -impl ExecutionMode { - /// Check whether the execution mode is unbounded or not. - #[inline] - pub fn is_unbounded(&self) -> bool { - !self.contains(ExecutionMode::Bounded) - } - - /// Check whether the execution is pipeline friendly. If so, operator can - /// execute safely. - // #[inline] - // pub fn pipeline_friendly(&self) -> bool { - // !self.is_pipeline_breaking() - // } - - // #[inline] - // pub fn is_pipeline_breaking(&self) -> bool { - // // self.contains(ExecutionMode::PipelineBreaking) - // self.is_unbounded() && self.is_emit_at_final() - // } - - #[inline] - pub fn is_emit_incremental(&self) -> bool { - self.contains(ExecutionMode::Incremental) - } - - #[inline] - pub fn is_emit_at_final(&self) -> bool { - self.contains(ExecutionMode::Final) - } - - #[inline] - pub fn switch_to_bounded(mut self) -> ExecutionMode { - self.insert(ExecutionMode::Bounded); - self - } - - #[inline] - pub fn switch_to_unbounded(mut self) -> ExecutionMode { - self.remove(ExecutionMode::Bounded); - self - } - - #[inline] - pub fn emit_incremental(mut self) -> ExecutionMode { - self.insert(ExecutionMode::Incremental); - self.remove(ExecutionMode::Final); - self - } - - #[inline] - pub fn emit_at_final(mut self) -> ExecutionMode { - self.insert(ExecutionMode::Final); - self.remove(ExecutionMode::Incremental); - self - } -} - pub(crate) fn emission_type_from_children( children: &[Arc], ) -> EmissionType { diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index de4b69c20c45..5ad37f0b1ac0 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -38,7 +38,7 @@ pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDi pub use crate::execution_plan::{ collect, collect_partitioned, displayable, execute_input_stream, execute_stream, execute_stream_partitioned, get_plan_string, with_new_children_if_necessary, - ExecutionMode, ExecutionPlan, ExecutionPlanProperties, PlanProperties, + ExecutionPlan, ExecutionPlanProperties, PlanProperties, }; pub use crate::metrics::Metric; pub use crate::ordering::InputOrderMode; diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index e0da2dc93f10..7e716beedad6 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -1018,12 +1018,12 @@ mod tests { use super::*; use crate::coalesce_partitions::CoalescePartitionsExec; + use crate::collect; use crate::expressions::col; use crate::memory::MemoryExec; use crate::test; use crate::test::assert_is_pending; use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec}; - use crate::{collect, ExecutionMode}; use arrow::array::*; use arrow::compute::SortOptions; @@ -1062,8 +1062,7 @@ mod tests { eq_properties.add_new_orderings(vec![LexOrdering::new(vec![ PhysicalSortExpr::new_default(Arc::new(Column::new("c1", 0))), ])]); - let mode = ExecutionMode::Final; - PlanProperties::new(eq_properties, Partitioning::UnknownPartitioning(1), mode) + PlanProperties::new(eq_properties, Partitioning::UnknownPartitioning(1)) } } diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index 647bde286eeb..02e73502f8ab 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -342,7 +342,7 @@ mod tests { use crate::stream::RecordBatchReceiverStream; use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec}; use crate::test::{self, assert_is_pending, make_partition}; - use crate::{collect, common, ExecutionMode}; + use crate::{collect, common}; use arrow::array::{ArrayRef, Int32Array, StringArray, TimestampNanosecondArray}; use arrow::compute::SortOptions; @@ -1279,8 +1279,7 @@ mod tests { .iter() .map(|expr| PhysicalSortExpr::new_default(Arc::clone(expr))) .collect::()]); - let mode = ExecutionMode::empty(); - PlanProperties::new(eq_properties, Partitioning::Hash(columns, 3), mode) + PlanProperties::new(eq_properties, Partitioning::Hash(columns, 3)) } } diff --git a/datafusion/physical-plan/src/test/exec.rs b/datafusion/physical-plan/src/test/exec.rs index 18dcb1cd092d..8428eba9db67 100644 --- a/datafusion/physical-plan/src/test/exec.rs +++ b/datafusion/physical-plan/src/test/exec.rs @@ -25,8 +25,8 @@ use std::{ }; use crate::{ - common, DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, - PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, + common, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, + RecordBatchStream, SendableRecordBatchStream, Statistics, }; use crate::{ execution_plan::EmissionType,