chore: use builder API to create FileScanConfig
Signed-off-by: Andrew Lamb <[email protected]>
alamb authored and ion-elgreco committed Feb 25, 2025
1 parent c1bbc4c commit 25fd4a1
Showing 2 changed files with 34 additions and 56 deletions.
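
For context, this commit swaps DataFusion's FileScanConfig struct-literal construction for its builder API. Below is a minimal sketch of the pattern, using only the constructor and with_* methods that appear in the diffs; the schema, statistics, and ObjectStoreUrl::local_filesystem() inputs are illustrative placeholders, not taken from delta-rs:

use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::physical_plan::FileScanConfig;
use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion_common::Statistics;

fn example_scan_config() -> FileScanConfig {
    // Placeholder inputs; the real code derives these from the Delta log.
    let file_schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
    let stats = Statistics::new_unknown(&file_schema);

    // Start from `new` and chain only the settings that differ from the
    // defaults, instead of spelling out every struct field.
    FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema)
        .with_file_groups(vec![vec![]]) // one empty partition; see the note in the diff
        .with_statistics(stats)
        .with_projection(None)
        .with_limit(None)
}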
41 changes: 20 additions & 21 deletions crates/core/src/delta_datafusion/mod.rs
@@ -51,8 +51,8 @@ use datafusion::physical_optimizer::pruning::PruningPredicate;
 use datafusion_common::scalar::ScalarValue;
 use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
 use datafusion_common::{
-    config::ConfigOptions, Column, Constraints, DFSchema, DataFusionError,
-    Result as DataFusionResult, TableReference, ToDFSchema,
+    config::ConfigOptions, Column, DFSchema, DataFusionError, Result as DataFusionResult,
+    TableReference, ToDFSchema,
 };
 use datafusion_expr::execution_props::ExecutionProps;
 use datafusion_expr::logical_plan::CreateExternalTable;
@@ -648,25 +648,24 @@ impl<'a> DeltaScanBuilder<'a> {
             ..Default::default()
         };
 
-        let mut exec_plan_builder = ParquetExecBuilder::new(FileScanConfig {
-            object_store_url: self.log_store.object_store_url(),
-            file_schema,
-            // If all files were filtered out, we still need to emit at least one partition to
-            // pass datafusion sanity checks.
-            //
-            // See https://github.com/apache/datafusion/issues/11322
-            file_groups: if file_groups.is_empty() {
-                vec![vec![]]
-            } else {
-                file_groups.into_values().collect()
-            },
-            constraints: Constraints::default(),
-            statistics: stats,
-            projection: self.projection.cloned(),
-            limit: self.limit,
-            table_partition_cols,
-            output_ordering: vec![],
-        })
+        let mut exec_plan_builder = ParquetExecBuilder::new(
+            FileScanConfig::new(self.log_store.object_store_url(), file_schema)
+                .with_file_groups(
+                    // If all files were filtered out, we still need to emit at least one partition to
+                    // pass datafusion sanity checks.
+                    //
+                    // See https://github.com/apache/datafusion/issues/11322
+                    if file_groups.is_empty() {
+                        vec![vec![]]
+                    } else {
+                        file_groups.into_values().collect()
+                    },
+                )
+                .with_statistics(stats)
+                .with_projection(self.projection.cloned())
+                .with_limit(self.limit)
+                .with_table_partition_cols(table_partition_cols),
+        )
         .with_schema_adapter_factory(Arc::new(DeltaSchemaAdapterFactory {}))
         .with_table_parquet_options(parquet_options);
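The comment preserved in the hunk above is worth calling out: when pruning filters out every file, the scan still emits one empty file group so that the resulting plan has at least one output partition and passes DataFusion's sanity checks (apache/datafusion#11322). As a standalone sketch, with non_empty_file_groups being a hypothetical helper name rather than anything in delta-rs, the guard looks like this:

use datafusion::datasource::listing::PartitionedFile;

/// If pruning removed every file, return a single empty partition so the
/// resulting scan still has at least one output partition.
/// See https://github.com/apache/datafusion/issues/11322
fn non_empty_file_groups(groups: Vec<Vec<PartitionedFile>>) -> Vec<Vec<PartitionedFile>> {
    if groups.is_empty() {
        vec![vec![]] // one partition containing no files
    } else {
        groups
    }
}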
49 changes: 14 additions & 35 deletions crates/core/src/operations/load_cdf.rs
@@ -21,7 +21,7 @@ use datafusion::datasource::file_format::FileFormat;
 use datafusion::datasource::physical_plan::FileScanConfig;
 use datafusion::execution::SessionState;
 use datafusion::prelude::SessionContext;
-use datafusion_common::{Constraints, ScalarValue, Statistics};
+use datafusion_common::ScalarValue;
 use datafusion_physical_expr::{expressions, PhysicalExpr};
 use datafusion_physical_plan::projection::ProjectionExec;
 use datafusion_physical_plan::union::UnionExec;
@@ -377,53 +377,32 @@ impl CdfLoadBuilder {
         let cdc_scan = ParquetFormat::new()
             .create_physical_plan(
                 session_sate,
-                FileScanConfig {
-                    object_store_url: self.log_store.object_store_url(),
-                    file_schema: cdc_file_schema.clone(),
-                    file_groups: cdc_file_groups.into_values().collect(),
-                    constraints: Constraints::default(),
-                    statistics: Statistics::new_unknown(&cdc_file_schema),
-                    projection: None,
-                    limit: None,
-                    table_partition_cols: cdc_partition_cols,
-                    output_ordering: vec![],
-                },
+                FileScanConfig::new(self.log_store.object_store_url(), cdc_file_schema)
+                    .with_file_groups(cdc_file_groups.into_values().collect())
+                    .with_table_partition_cols(cdc_partition_cols),
                 filters,
             )
             .await?;
 
         let add_scan = ParquetFormat::new()
             .create_physical_plan(
                 session_sate,
-                FileScanConfig {
-                    object_store_url: self.log_store.object_store_url(),
-                    file_schema: add_remove_file_schema.clone(),
-                    file_groups: add_file_groups.into_values().collect(),
-                    constraints: Constraints::default(),
-                    statistics: Statistics::new_unknown(&add_remove_file_schema.clone()),
-                    projection: None,
-                    limit: None,
-                    table_partition_cols: add_remove_partition_cols.clone(),
-                    output_ordering: vec![],
-                },
+                FileScanConfig::new(
+                    self.log_store.object_store_url(),
+                    add_remove_file_schema.clone(),
+                )
+                .with_file_groups(add_file_groups.into_values().collect())
+                .with_table_partition_cols(add_remove_partition_cols.clone()),
                 filters,
             )
             .await?;
 
         let remove_scan = ParquetFormat::new()
             .create_physical_plan(
                 session_sate,
-                FileScanConfig {
-                    object_store_url: self.log_store.object_store_url(),
-                    file_schema: add_remove_file_schema.clone(),
-                    file_groups: remove_file_groups.into_values().collect(),
-                    constraints: Constraints::default(),
-                    statistics: Statistics::new_unknown(&add_remove_file_schema),
-                    projection: None,
-                    limit: None,
-                    table_partition_cols: add_remove_partition_cols,
-                    output_ordering: vec![],
-                },
+                FileScanConfig::new(self.log_store.object_store_url(), add_remove_file_schema)
+                    .with_file_groups(remove_file_groups.into_values().collect())
+                    .with_table_partition_cols(add_remove_partition_cols),
                 filters,
             )
             .await?;
@@ -434,7 +413,7 @@ impl CdfLoadBuilder {
             Arc::new(UnionExec::new(vec![cdc_scan, add_scan, remove_scan]));
 
         // We project the union in the order of the input_schema + cdc cols at the end
-        // This is to ensure the DeltaCdfTableProvider uses the correct schema consturction.
+        // This is to ensure the DeltaCdfTableProvider uses the correct schema construction.
         let mut fields = schema.fields().to_vec();
         for f in ADD_PARTITION_SCHEMA.clone() {
             fields.push(f.into());
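One reason the load_cdf.rs call sites shrink so much, and why the Constraints and Statistics imports can be dropped, is the assumption that the builder's constructor fills in everything the old struct literals spelled out by hand: unknown statistics, default constraints, no projection, no limit, and no output ordering. A hedged sketch of that assumption, checked against FileScanConfig's public fields (which the removed struct literals confirm exist); the schema and assertions here are illustrative, not from the commit:

use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::physical_plan::FileScanConfig;
use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion_common::Statistics;

fn builder_defaults_match_old_literal() {
    let file_schema = Arc::new(Schema::new(vec![Field::new("op", DataType::Utf8, true)]));
    let config = FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema.clone());

    // Assumed defaults, matching what the removed struct literals set by hand.
    assert_eq!(config.statistics, Statistics::new_unknown(&file_schema));
    assert_eq!(config.projection, None);
    assert_eq!(config.limit, None);
    assert!(config.output_ordering.is_empty());
}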
