Skip to content

Commit

Permalink
Refactor execution plan to remove ExecutionMode and introduce `EmissionType`
Browse files Browse the repository at this point in the history

- Removed the `ExecutionMode` parameter from `PlanProperties` and related implementations across multiple files.
- Introduced `EmissionType` to better represent the output characteristics of execution plans.
- Updated functions and tests to reflect the new structure, ensuring compatibility and enhancing maintainability.
- Cleaned up imports and adjusted comments accordingly.

This refactor simplifies the execution plan properties and improves the clarity of memory handling in execution plans.
  • Loading branch information
jayzhan-synnada committed Dec 10, 2024
1 parent 7713a55 commit 3f61dcf
Show file tree
Hide file tree
Showing 14 changed files with 111 additions and 142 deletions.
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ arrow-ord = { version = "53.3.0", default-features = false }
arrow-schema = { version = "53.3.0", default-features = false }
async-trait = "0.1.73"
bigdecimal = "0.4.6"
bitflags = "2.6.0"
bytes = "1.4"
chrono = { version = "0.4.38", default-features = false }
ctor = "0.2.0"
Expand Down
4 changes: 2 additions & 2 deletions datafusion-cli/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use datafusion::config::ConfigFileType;
use datafusion::datasource::listing::ListingTableUrl;
use datafusion::error::{DataFusionError, Result};
use datafusion::logical_expr::{DdlStatement, LogicalPlan};
use datafusion::physical_plan::{collect, execute_stream, ExecutionPlanProperties};
use datafusion::physical_plan::{collect, execute_stream};
use datafusion::sql::parser::{DFParser, Statement};
use datafusion::sql::sqlparser::dialect::dialect_from_str;

Expand Down Expand Up @@ -234,7 +234,7 @@ pub(super) async fn exec_and_print(
let df = ctx.execute_logical_plan(plan).await?;
let physical_plan = df.create_physical_plan().await?;

if !physical_plan.has_finite_memory(){
if !physical_plan.has_finite_memory() {
let stream = execute_stream(physical_plan, task_ctx.clone())?;
print_options.print_stream(stream, now).await?;
} else {
Expand Down
17 changes: 17 additions & 0 deletions datafusion-examples/examples/composed_extension_codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use std::ops::Deref;
use std::sync::Arc;

use datafusion::common::Result;
use datafusion::physical_plan::execution_plan::EmissionType;
use datafusion::physical_plan::{DisplayAs, ExecutionPlan};
use datafusion::prelude::SessionContext;
use datafusion_common::{internal_err, DataFusionError};
Expand Down Expand Up @@ -127,6 +128,14 @@ impl ExecutionPlan for ParentExec {
) -> Result<datafusion::physical_plan::SendableRecordBatchStream> {
unreachable!()
}

fn emission_type(&self) -> EmissionType {
unimplemented!()
}

fn has_finite_memory(&self) -> bool {
true
}
}

/// A PhysicalExtensionCodec that can serialize and deserialize ParentExec
Expand Down Expand Up @@ -203,6 +212,14 @@ impl ExecutionPlan for ChildExec {
) -> Result<datafusion::physical_plan::SendableRecordBatchStream> {
unreachable!()
}

fn emission_type(&self) -> EmissionType {
EmissionType::Incremental
}

fn has_finite_memory(&self) -> bool {
true
}
}

/// A PhysicalExtensionCodec that can serialize and deserialize ChildExec
Expand Down
19 changes: 12 additions & 7 deletions datafusion-examples/examples/custom_datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ use datafusion::error::Result;
use datafusion::execution::context::TaskContext;
use datafusion::logical_expr::LogicalPlanBuilder;
use datafusion::physical_expr::EquivalenceProperties;
use datafusion::physical_plan::execution_plan::EmissionType;
use datafusion::physical_plan::memory::MemoryStream;
use datafusion::physical_plan::{
project_schema, DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan,
Partitioning, PlanProperties, SendableRecordBatchStream,
project_schema, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
PlanProperties, SendableRecordBatchStream,
};
use datafusion::prelude::*;

Expand Down Expand Up @@ -211,11 +212,7 @@ impl CustomExec {
/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn compute_properties(schema: SchemaRef) -> PlanProperties {
let eq_properties = EquivalenceProperties::new(schema);
PlanProperties::new(
eq_properties,
Partitioning::UnknownPartitioning(1),
ExecutionMode::Bounded,
)
PlanProperties::new(eq_properties, Partitioning::UnknownPartitioning(1))
}
}

Expand Down Expand Up @@ -279,4 +276,12 @@ impl ExecutionPlan for CustomExec {
None,
)?))
}

fn emission_type(&self) -> EmissionType {
EmissionType::Incremental
}

fn has_finite_memory(&self) -> bool {
true
}
}
6 changes: 1 addition & 5 deletions datafusion/core/src/test_util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,11 +326,7 @@ impl ExecutionPlan for UnboundedExec {
}

fn has_finite_memory(&self) -> bool {
if self.batch_produce.is_none() {
false
} else {
true
}
self.batch_produce.is_some()
}
}

Expand Down
24 changes: 21 additions & 3 deletions datafusion/ffi/src/execution_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ use abi_stable::{
std_types::{RResult, RString, RVec},
StableAbi,
};
use datafusion::error::Result;
use datafusion::{
error::DataFusionError,
execution::{SendableRecordBatchStream, TaskContext},
physical_plan::{DisplayAs, ExecutionPlan, PlanProperties},
};
use datafusion::{error::Result, physical_plan::execution_plan::EmissionType};

use crate::{
plan_properties::FFI_PlanProperties, record_batch_stream::FFI_RecordBatchStream,
Expand Down Expand Up @@ -268,11 +268,22 @@ impl ExecutionPlan for ForeignExecutionPlan {
}
}
}

fn emission_type(&self) -> EmissionType {
unimplemented!()
}

fn has_finite_memory(&self) -> bool {
true
}
}

#[cfg(test)]
mod tests {
use datafusion::{physical_plan::Partitioning, prelude::SessionContext};
use datafusion::{
physical_plan::{execution_plan::EmissionType, Partitioning},
prelude::SessionContext,
};

use super::*;

Expand All @@ -287,7 +298,6 @@ mod tests {
props: PlanProperties::new(
datafusion::physical_expr::EquivalenceProperties::new(schema),
Partitioning::UnknownPartitioning(3),
datafusion::physical_plan::ExecutionMode::Incremental,
),
}
}
Expand Down Expand Up @@ -338,6 +348,14 @@ mod tests {
fn statistics(&self) -> Result<datafusion::common::Statistics> {
unimplemented!()
}

fn emission_type(&self) -> EmissionType {
unimplemented!()
}

fn has_finite_memory(&self) -> bool {
true
}
}

#[test]
Expand Down
82 changes: 51 additions & 31 deletions datafusion/ffi/src/plan_properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use arrow::datatypes::SchemaRef;
use datafusion::{
error::{DataFusionError, Result},
physical_expr::EquivalenceProperties,
physical_plan::{ExecutionMode, PlanProperties},
physical_plan::{execution_plan::EmissionType, PlanProperties},
prelude::SessionContext,
};
use datafusion_proto::{
Expand All @@ -53,8 +53,11 @@ pub struct FFI_PlanProperties {
pub output_partitioning:
unsafe extern "C" fn(plan: &Self) -> RResult<RVec<u8>, RStr<'static>>,

/// Return the execution mode of the plan.
pub execution_mode: unsafe extern "C" fn(plan: &Self) -> FFI_ExecutionMode,
/// Return the emission type of the plan.
pub emission_type: unsafe extern "C" fn(plan: &Self) -> OptionalEmissionType,

/// Indicates whether the plan has finite memory requirements.
pub has_finite_memory: unsafe extern "C" fn(plan: &Self) -> bool,

/// The output ordering is a [`PhysicalSortExprNodeCollection`] protobuf message
/// serialized into bytes to pass across the FFI boundary.
Expand Down Expand Up @@ -98,12 +101,28 @@ unsafe extern "C" fn output_partitioning_fn_wrapper(
ROk(output_partitioning.into())
}

unsafe extern "C" fn execution_mode_fn_wrapper(
unsafe extern "C" fn emission_type_fn_wrapper(
properties: &FFI_PlanProperties,
) -> FFI_ExecutionMode {
) -> OptionalEmissionType {
let private_data = properties.private_data as *const PlanPropertiesPrivateData;
let props = &(*private_data).props;
if let Some(emission_type) = props.emission_type {
OptionalEmissionType {
is_some: true,
value: emission_type.into(),
}
} else {
OptionalEmissionType {
is_some: false,
value: FFI_EmissionType::Incremental, // Any value as a placeholder
}
}
}

unsafe extern "C" fn memory_usage_fn_wrapper(properties: &FFI_PlanProperties) -> bool {
let private_data = properties.private_data as *const PlanPropertiesPrivateData;
let props = &(*private_data).props;
props.execution_mode().into()
props.has_finite_memory
}

unsafe extern "C" fn output_ordering_fn_wrapper(
Expand Down Expand Up @@ -164,7 +183,8 @@ impl From<&PlanProperties> for FFI_PlanProperties {

FFI_PlanProperties {
output_partitioning: output_partitioning_fn_wrapper,
execution_mode: execution_mode_fn_wrapper,
emission_type: emission_type_fn_wrapper,
has_finite_memory: memory_usage_fn_wrapper,
output_ordering: output_ordering_fn_wrapper,
schema: schema_fn_wrapper,
release: release_fn_wrapper,
Expand Down Expand Up @@ -220,54 +240,55 @@ impl TryFrom<FFI_PlanProperties> for PlanProperties {
RErr(e) => Err(DataFusionError::Plan(e.to_string())),
}?;

let execution_mode: ExecutionMode =
unsafe { (ffi_props.execution_mode)(&ffi_props).into() };

let eq_properties = match orderings {
Some(ordering) => {
EquivalenceProperties::new_with_orderings(Arc::new(schema), &[ordering])
}
None => EquivalenceProperties::new(Arc::new(schema)),
};

Ok(PlanProperties::new(
eq_properties,
partitioning,
execution_mode,
))
Ok(PlanProperties::new(eq_properties, partitioning))
}
}

/// FFI safe version of [`ExecutionMode`].
/// FFI safe version of [`EmissionType`].
#[repr(C)]
#[allow(non_camel_case_types)]
#[derive(Clone, StableAbi)]
pub enum FFI_ExecutionMode {
Bounded,
Unbounded,
PipelineBreaking,
pub enum FFI_EmissionType {
Incremental,
Final,
Both,
}

impl From<ExecutionMode> for FFI_ExecutionMode {
fn from(value: ExecutionMode) -> Self {
impl From<EmissionType> for FFI_EmissionType {
fn from(value: EmissionType) -> Self {
match value {
ExecutionMode::Bounded => FFI_ExecutionMode::Bounded,
ExecutionMode::Unbounded => FFI_ExecutionMode::Unbounded,
ExecutionMode::PipelineBreaking => FFI_ExecutionMode::PipelineBreaking,
EmissionType::Incremental => FFI_EmissionType::Incremental,
EmissionType::Final => FFI_EmissionType::Final,
EmissionType::Both => FFI_EmissionType::Both,
}
}
}

impl From<FFI_ExecutionMode> for ExecutionMode {
fn from(value: FFI_ExecutionMode) -> Self {
impl From<FFI_EmissionType> for EmissionType {
fn from(value: FFI_EmissionType) -> Self {
match value {
FFI_ExecutionMode::Bounded => ExecutionMode::Bounded,
FFI_ExecutionMode::Unbounded => ExecutionMode::Unbounded,
FFI_ExecutionMode::PipelineBreaking => ExecutionMode::PipelineBreaking,
FFI_EmissionType::Incremental => EmissionType::Incremental,
FFI_EmissionType::Final => EmissionType::Final,
FFI_EmissionType::Both => EmissionType::Both,
}
}
}

#[repr(C)]
#[allow(non_camel_case_types)]
#[derive(Clone, StableAbi)]
pub struct OptionalEmissionType {
pub is_some: bool,
pub value: FFI_EmissionType, // Valid only if `is_some` is true
}

#[cfg(test)]
mod tests {
use datafusion::physical_plan::Partitioning;
Expand All @@ -283,7 +304,6 @@ mod tests {
let original_props = PlanProperties::new(
EquivalenceProperties::new(schema),
Partitioning::UnknownPartitioning(3),
ExecutionMode::Unbounded,
);

let local_props_ptr = FFI_PlanProperties::from(&original_props);
Expand Down
1 change: 0 additions & 1 deletion datafusion/physical-plan/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ arrow-buffer = { workspace = true }
arrow-ord = { workspace = true }
arrow-schema = { workspace = true }
async-trait = { workspace = true }
bitflags = { workspace = true }
chrono = { workspace = true }
datafusion-common = { workspace = true, default-features = true }
datafusion-common-runtime = { workspace = true, default-features = true }
Expand Down
2 changes: 0 additions & 2 deletions datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1742,8 +1742,6 @@ mod tests {
eq_properties,
// Output Partitioning
Partitioning::UnknownPartitioning(1),
// Execution Mode
ExecutionMode::Bounded,
)
}
}
Expand Down
Loading

0 comments on commit 3f61dcf

Please sign in to comment.