Add support for Plan Properties caching #1

Closed
wants to merge 6 commits into from
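
This PR replaces the per-call `ExecutionPlan` property methods (`schema`, `output_partitioning`, `output_ordering`, `equivalence_properties`) with a single `PlanPropertiesCache` that each operator computes once at construction time and exposes through a new `cache()` trait method. Below is a minimal sketch of the pattern, based only on the API surface visible in this diff (`PlanPropertiesCache::new_default`, `with_partitioning`, `with_exec_mode`, `ExecutionMode`); `MyExec` and the `main` driver are hypothetical illustrations, not part of the PR.

```rust
use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema, SchemaRef};
use datafusion_physical_plan::{ExecutionMode, Partitioning, PlanPropertiesCache};

/// Hypothetical operator following the caching pattern this PR applies to
/// `CustomExec`, `ArrowExec`, and `AvroExec` below.
struct MyExec {
    cache: PlanPropertiesCache,
}

impl MyExec {
    fn new(schema: SchemaRef) -> Self {
        // Start from schema-keyed defaults, then fill in the real
        // properties exactly once via the builder-style `with_cache`.
        let cache = PlanPropertiesCache::new_default(schema);
        Self { cache }.with_cache()
    }

    fn with_cache(mut self) -> Self {
        self.cache = self
            .cache
            .with_partitioning(Partitioning::UnknownPartitioning(1))
            .with_exec_mode(ExecutionMode::Bounded);
        self
    }

    // In the PR this is the `ExecutionPlan::cache` implementation; the
    // trait's accessors presumably read schema, partitioning, ordering,
    // and execution mode out of this precomputed struct.
    fn cache(&self) -> &PlanPropertiesCache {
        &self.cache
    }
}

fn main() {
    let schema: SchemaRef =
        Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
    let exec = MyExec::new(schema);
    let _props = exec.cache();
}
```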
16 changes: 7 additions & 9 deletions datafusion-cli/src/exec.rs
@@ -33,15 +33,14 @@ use crate::{

use datafusion::common::plan_datafusion_err;
use datafusion::datasource::listing::ListingTableUrl;
use datafusion::datasource::physical_plan::is_plan_streaming;
use datafusion::error::{DataFusionError, Result};
use datafusion::logical_expr::dml::CopyTo;
use datafusion::logical_expr::{CreateExternalTable, DdlStatement, LogicalPlan};
use datafusion::physical_plan::{collect, execute_stream};
use datafusion::prelude::SessionContext;
use datafusion::sql::{parser::DFParser, sqlparser::dialect::dialect_from_str};
use datafusion::sql::parser::{DFParser, Statement};
use datafusion::sql::sqlparser::dialect::dialect_from_str;

use datafusion::logical_expr::dml::CopyTo;
use datafusion::sql::parser::Statement;
use rustyline::error::ReadlineError;
use rustyline::Editor;
use tokio::signal;
@@ -231,7 +230,7 @@ async fn exec_and_print(
let df = ctx.execute_logical_plan(plan).await?;
let physical_plan = df.create_physical_plan().await?;

if is_plan_streaming(&physical_plan)? {
if physical_plan.execution_mode().is_unbounded() {
let stream = execute_stream(physical_plan, task_ctx.clone())?;
print_options.print_stream(stream, now).await?;
} else {
@@ -305,10 +304,9 @@ mod tests {
use std::str::FromStr;

use super::*;
use datafusion::common::plan_err;
use datafusion_common::{
file_options::StatementOptions, FileType, FileTypeWriterOptions,
};

use datafusion::common::{plan_err, FileType, FileTypeWriterOptions};
use datafusion_common::file_options::StatementOptions;

async fn create_external_table_test(location: &str, sql: &str) -> Result<()> {
let ctx = SessionContext::new();
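Caller-side, the change is visible in `exec.rs` above: the free function `is_plan_streaming` is gone, and the CLI now asks the plan itself via `physical_plan.execution_mode().is_unbounded()`. Presumably `execution_mode()` is one of the accessors that reads from the new `PlanPropertiesCache`.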
30 changes: 17 additions & 13 deletions datafusion-examples/examples/custom_datasource.rs
@@ -28,11 +28,10 @@ use datafusion::dataframe::DataFrame;
use datafusion::datasource::{provider_as_source, TableProvider, TableType};
use datafusion::error::Result;
use datafusion::execution::context::{SessionState, TaskContext};
use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::memory::MemoryStream;
use datafusion::physical_plan::{
project_schema, DisplayAs, DisplayFormatType, ExecutionPlan,
SendableRecordBatchStream,
project_schema, DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan,
Partitioning, PlanPropertiesCache, SendableRecordBatchStream,
};
use datafusion::prelude::*;
use datafusion_expr::{Expr, LogicalPlanBuilder};
@@ -190,6 +189,7 @@ impl TableProvider for CustomDataSource {
struct CustomExec {
db: CustomDataSource,
projected_schema: SchemaRef,
cache: PlanPropertiesCache,
}

impl CustomExec {
@@ -199,10 +199,22 @@ impl CustomExec {
db: CustomDataSource,
) -> Self {
let projected_schema = project_schema(&schema, projections).unwrap();
let cache = PlanPropertiesCache::new_default(projected_schema.clone());
Self {
db,
projected_schema,
cache,
}
.with_cache()
}

fn with_cache(mut self) -> Self {
self.cache = self
.cache
.with_partitioning(Partitioning::UnknownPartitioning(1))
.with_exec_mode(ExecutionMode::Bounded);

self
}
}

@@ -217,16 +229,8 @@ impl ExecutionPlan for CustomExec {
self
}

fn schema(&self) -> SchemaRef {
self.projected_schema.clone()
}

fn output_partitioning(&self) -> datafusion::physical_plan::Partitioning {
datafusion::physical_plan::Partitioning::UnknownPartitioning(1)
}

fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
None
fn cache(&self) -> &PlanPropertiesCache {
&self.cache
}

fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
16 changes: 7 additions & 9 deletions datafusion/core/src/dataframe/mod.rs
@@ -28,10 +28,8 @@ use crate::arrow::record_batch::RecordBatch;
use crate::arrow::util::pretty;
use crate::datasource::{provider_as_source, MemTable, TableProvider};
use crate::error::Result;
use crate::execution::{
context::{SessionState, TaskContext},
FunctionRegistry,
};
use crate::execution::context::{SessionState, TaskContext};
use crate::execution::FunctionRegistry;
use crate::logical_expr::utils::find_window_exprs;
use crate::logical_expr::{
col, Expr, JoinType, LogicalPlan, LogicalPlanBuilder, Partitioning, TableType,
@@ -40,6 +38,7 @@ use crate::physical_plan::{
collect, collect_partitioned, execute_stream, execute_stream_partitioned,
ExecutionPlan, SendableRecordBatchStream,
};
use crate::prelude::SessionContext;

use arrow::array::{Array, ArrayRef, Int64Array, StringArray};
use arrow::compute::{cast, concat};
@@ -58,7 +57,6 @@ use datafusion_expr::{
TableProviderFilterPushDown, UNNAMED_TABLE,
};

use crate::prelude::SessionContext;
use async_trait::async_trait;

/// Contains options that control how data is
@@ -2902,7 +2900,7 @@ mod tests {
// For non-partition aware union, the output partitioning count should be the combination of all output partitions count
assert!(matches!(
physical_plan.output_partitioning(),
Partitioning::UnknownPartitioning(partition_count) if partition_count == default_partition_count * 2));
Partitioning::UnknownPartitioning(partition_count) if *partition_count == default_partition_count * 2));
Ok(())
}

@@ -2951,7 +2949,7 @@
];
assert_eq!(
out_partitioning,
Partitioning::Hash(left_exprs, default_partition_count)
&Partitioning::Hash(left_exprs, default_partition_count)
);
}
JoinType::Right | JoinType::RightSemi | JoinType::RightAnti => {
@@ -2961,13 +2959,13 @@
];
assert_eq!(
out_partitioning,
Partitioning::Hash(right_exprs, default_partition_count)
&Partitioning::Hash(right_exprs, default_partition_count)
);
}
JoinType::Full => {
assert!(matches!(
out_partitioning,
Partitioning::UnknownPartitioning(partition_count) if partition_count == default_partition_count));
&Partitioning::UnknownPartitioning(partition_count) if partition_count == default_partition_count));
}
}
}
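The test updates above also hint that `output_partitioning()` now returns a reference into the cache rather than an owned value: the `assert_eq!` right-hand sides gain a `&` (`&Partitioning::Hash(...)`), and the `matches!` patterns bind the partition count by reference, hence the added `*partition_count` dereference.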
62 changes: 38 additions & 24 deletions datafusion/core/src/datasource/physical_plan/arrow_file.rs
@@ -20,6 +20,8 @@
use std::any::Any;
use std::sync::Arc;

use super::FileGroupPartitioner;
use crate::datasource::listing::PartitionedFile;
use crate::datasource::physical_plan::{
FileMeta, FileOpenFuture, FileOpener, FileScanConfig,
};
@@ -34,14 +36,13 @@ use arrow_schema::SchemaRef;
use datafusion_common::config::ConfigOptions;
use datafusion_common::Statistics;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalSortExpr};
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
use datafusion_physical_plan::{ExecutionMode, PlanPropertiesCache};

use futures::StreamExt;
use itertools::Itertools;
use object_store::{GetOptions, GetRange, GetResultPayload, ObjectStore};

use super::FileGroupPartitioner;

/// Execution plan for scanning Arrow data source
#[derive(Debug, Clone)]
#[allow(dead_code)]
@@ -52,26 +53,56 @@ pub struct ArrowExec {
projected_output_ordering: Vec<LexOrdering>,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
cache: PlanPropertiesCache,
}

impl ArrowExec {
/// Create a new Arrow reader execution plan provided base configurations
pub fn new(base_config: FileScanConfig) -> Self {
let (projected_schema, projected_statistics, projected_output_ordering) =
base_config.project();

let cache = PlanPropertiesCache::new_default(projected_schema.clone());
Self {
base_config,
projected_schema,
projected_statistics,
projected_output_ordering,
metrics: ExecutionPlanMetricsSet::new(),
cache,
}
.with_cache()
}
/// Ref to the base configs
pub fn base_config(&self) -> &FileScanConfig {
&self.base_config
}

fn output_partitioning_helper(&self) -> Partitioning {
Partitioning::UnknownPartitioning(self.base_config.file_groups.len())
}

fn with_cache(mut self) -> Self {
// Equivalence Properties
let eq_properties = EquivalenceProperties::new_with_orderings(
self.schema(),
&self.projected_output_ordering,
);

self.cache = PlanPropertiesCache::new(
eq_properties,
self.output_partitioning_helper(), // Output Partitioning
ExecutionMode::Bounded, // Execution Mode
);
self
}

fn with_file_groups(mut self, file_groups: Vec<Vec<PartitionedFile>>) -> Self {
self.base_config.file_groups = file_groups;
// Changing file groups may invalidate output partitioning. Update it also
let output_partitioning = self.output_partitioning_helper();
self.cache = self.cache.with_partitioning(output_partitioning);
self
}
}

impl DisplayAs for ArrowExec {
@@ -90,25 +121,8 @@ impl ExecutionPlan for ArrowExec {
self
}

fn schema(&self) -> SchemaRef {
self.projected_schema.clone()
}

fn output_partitioning(&self) -> Partitioning {
Partitioning::UnknownPartitioning(self.base_config.file_groups.len())
}

fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
self.projected_output_ordering
.first()
.map(|ordering| ordering.as_slice())
}

fn equivalence_properties(&self) -> EquivalenceProperties {
EquivalenceProperties::new_with_orderings(
self.schema(),
&self.projected_output_ordering,
)
fn cache(&self) -> &PlanPropertiesCache {
&self.cache
}

fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
@@ -138,7 +152,7 @@ impl ExecutionPlan for ArrowExec {

if let Some(repartitioned_file_groups) = repartitioned_file_groups_option {
let mut new_plan = self.clone();
new_plan.base_config.file_groups = repartitioned_file_groups;
new_plan = new_plan.with_file_groups(repartitioned_file_groups);
return Ok(Some(Arc::new(new_plan)));
}
Ok(None)
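One subtlety in `ArrowExec`: because its output partitioning is derived from `base_config.file_groups`, `repartitioned` no longer mutates that field directly. It goes through the new `with_file_groups`, which recomputes the partitioning via `output_partitioning_helper` and writes it back into the cache, so the cached properties cannot go stale when the file groups change.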
48 changes: 25 additions & 23 deletions datafusion/core/src/datasource/physical_plan/avro.rs
@@ -22,11 +22,10 @@ use std::sync::Arc;

use super::FileScanConfig;
use crate::error::Result;
use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
Statistics,
DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning,
PlanPropertiesCache, SendableRecordBatchStream, Statistics,
};

use arrow::datatypes::SchemaRef;
@@ -43,26 +42,45 @@ pub struct AvroExec {
projected_output_ordering: Vec<LexOrdering>,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
cache: PlanPropertiesCache,
}

impl AvroExec {
/// Create a new Avro reader execution plan provided base configurations
pub fn new(base_config: FileScanConfig) -> Self {
let (projected_schema, projected_statistics, projected_output_ordering) =
base_config.project();

let cache = PlanPropertiesCache::new_default(projected_schema.clone());
Self {
base_config,
projected_schema,
projected_statistics,
projected_output_ordering,
metrics: ExecutionPlanMetricsSet::new(),
cache,
}
.with_cache()
}
/// Ref to the base configs
pub fn base_config(&self) -> &FileScanConfig {
&self.base_config
}

fn with_cache(mut self) -> Self {
// Equivalence Properties
let eq_properties = EquivalenceProperties::new_with_orderings(
self.schema(),
&self.projected_output_ordering,
);
let n_partitions = self.base_config.file_groups.len();

self.cache = PlanPropertiesCache::new(
eq_properties,
Partitioning::UnknownPartitioning(n_partitions), // Output Partitioning
ExecutionMode::Bounded, // Execution Mode
);
self
}
}

impl DisplayAs for AvroExec {
@@ -81,25 +99,8 @@ impl ExecutionPlan for AvroExec {
self
}

fn schema(&self) -> SchemaRef {
self.projected_schema.clone()
}

fn output_partitioning(&self) -> Partitioning {
Partitioning::UnknownPartitioning(self.base_config.file_groups.len())
}

fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
self.projected_output_ordering
.first()
.map(|ordering| ordering.as_slice())
}

fn equivalence_properties(&self) -> EquivalenceProperties {
EquivalenceProperties::new_with_orderings(
self.schema(),
&self.projected_output_ordering,
)
fn cache(&self) -> &PlanPropertiesCache {
&self.cache
}

fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
@@ -163,6 +164,7 @@ mod private {
use crate::datasource::avro_to_arrow::Reader as AvroReader;
use crate::datasource::physical_plan::file_stream::{FileOpenFuture, FileOpener};
use crate::datasource::physical_plan::FileMeta;

use bytes::Buf;
use futures::StreamExt;
use object_store::{GetResultPayload, ObjectStore};