chore: Update to DataFusion 46.0.0, update for API changes
Signed-off-by: Andrew Lamb <[email protected]>
alamb committed Feb 28, 2025
1 parent 94a2009 commit c9b1903
Showing 11 changed files with 131 additions and 118 deletions.
33 changes: 22 additions & 11 deletions Cargo.toml
@@ -45,16 +45,28 @@ object_store = { version = "0.11.2" , features = ["cloud"]}
parquet = { version = "54" }

# datafusion
datafusion = "45"
datafusion-expr = "45"
datafusion-common = "45"
datafusion-ffi = "45"
datafusion-functions = "45"
datafusion-functions-aggregate = "45"
datafusion-physical-expr = "45"
datafusion-physical-plan = "45"
datafusion-proto = "45"
datafusion-sql = "45"
# datafusion = { path = "/Users/andrewlamb/Software/datafusion2/datafusion/core" }
# datafusion-expr = { path = "/Users/andrewlamb/Software/datafusion2/datafusion/expr" }
# datafusion-common = { path = "/Users/andrewlamb/Software/datafusion2/datafusion/common" }
# datafusion-ffi = { path = "/Users/andrewlamb/Software/datafusion2/datafusion/ffi" }
# datafusion-functions = { path = "/Users/andrewlamb/Software/datafusion2/datafusion/functions" }
# datafusion-functions-aggregate = { path = "/Users/andrewlamb/Software/datafusion2/datafusion/functions-aggregate" }
# datafusion-physical-expr = { path = "/Users/andrewlamb/Software/datafusion2/datafusion/physical-expr" }
# datafusion-physical-plan = { path = "/Users/andrewlamb/Software/datafusion2/datafusion/physical-plan" }
# datafusion-proto = { path = "/Users/andrewlamb/Software/datafusion2/datafusion/proto" }
# datafusion-sql = { path = "/Users/andrewlamb/Software/datafusion2/datafusion/sql" }


datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "2fd558fdd0fa95e70d6ae5135776d164119f3536" }
datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "2fd558fdd0fa95e70d6ae5135776d164119f3536" }
datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", rev = "2fd558fdd0fa95e70d6ae5135776d164119f3536" }
datafusion-ffi = { git = "https://github.com/apache/arrow-datafusion.git", rev = "2fd558fdd0fa95e70d6ae5135776d164119f3536" }
datafusion-functions = { git = "https://github.com/apache/arrow-datafusion.git", rev = "2fd558fdd0fa95e70d6ae5135776d164119f3536" }
datafusion-functions-aggregate = { git = "https://github.com/apache/arrow-datafusion.git", rev = "2fd558fdd0fa95e70d6ae5135776d164119f3536" }
datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "2fd558fdd0fa95e70d6ae5135776d164119f3536" }
datafusion-physical-plan = { git = "https://github.com/apache/arrow-datafusion.git", rev = "2fd558fdd0fa95e70d6ae5135776d164119f3536" }
datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev = "2fd558fdd0fa95e70d6ae5135776d164119f3536" }
datafusion-sql = { git = "https://github.com/apache/arrow-datafusion.git", rev = "2fd558fdd0fa95e70d6ae5135776d164119f3536" }

# serde
serde = { version = "1.0.194", features = ["derive"] }
@@ -77,4 +89,3 @@ async-trait = { version = "0.1" }
futures = { version = "0.3" }
tokio = { version = "1" }
num_cpus = { version = "1" }

10 changes: 6 additions & 4 deletions crates/core/src/delta_datafusion/expr.rs
@@ -34,7 +34,9 @@ use datafusion_common::Result as DFResult;
use datafusion_common::{config::ConfigOptions, DFSchema, Result, ScalarValue, TableReference};
use datafusion_expr::expr::InList;
use datafusion_expr::planner::ExprPlanner;
use datafusion_expr::{AggregateUDF, Between, BinaryExpr, Cast, Expr, Like, TableSource};
use datafusion_expr::{
AggregateUDF, Between, BinaryExpr, Cast, Expr, Like, ScalarFunctionArgs, TableSource,
};
// Needed for MakeParquetArray
use datafusion_expr::{ColumnarValue, Documentation, ScalarUDF, ScalarUDFImpl, Signature};
use datafusion_functions::core::planner::CoreFunctionPlanner;
@@ -99,13 +101,13 @@ impl ScalarUDFImpl for MakeParquetArray {
r_type
}

fn invoke_batch(&self, args: &[ColumnarValue], number_rows: usize) -> Result<ColumnarValue> {
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
let mut data_type = DataType::Null;
for arg in args {
for arg in &args.args {
data_type = arg.data_type();
}

match self.actual.invoke_batch(args, number_rows)? {
match self.actual.invoke_with_args(args)? {
ColumnarValue::Scalar(ScalarValue::List(df_array)) => {
let field = Arc::new(Field::new("element", data_type, true));
let result = Ok(ColumnarValue::Scalar(ScalarValue::List(Arc::new(
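
The MakeParquetArray hunk above tracks DataFusion 46's ScalarUDFImpl change: invoke_batch(&[ColumnarValue], usize) is replaced by invoke_with_args(ScalarFunctionArgs), with the input columns now carried in args.args. A minimal standalone sketch of a UDF written against the new entry point follows; the Identity UDF is illustrative only and not part of this commit, and it relies only on the pieces the diff itself uses.

use std::any::Any;

use arrow_schema::DataType;
use datafusion_common::Result;
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility};

/// Illustrative UDF that returns its single argument unchanged.
#[derive(Debug)]
struct Identity {
    signature: Signature,
}

impl Identity {
    fn new() -> Self {
        Self {
            signature: Signature::any(1, Volatility::Immutable),
        }
    }
}

impl ScalarUDFImpl for Identity {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn name(&self) -> &str {
        "identity"
    }

    fn signature(&self) -> &Signature {
        &self.signature
    }

    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
        Ok(arg_types[0].clone())
    }

    // The 46-era entry point: inputs arrive bundled in `ScalarFunctionArgs`,
    // and the columns that used to be the `&[ColumnarValue]` slice are `args.args`.
    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
        let mut inputs = args.args;
        Ok(inputs.remove(0))
    }
}

MakeParquetArray in the diff does the same, additionally delegating to the wrapped array UDF via self.actual.invoke_with_args(args).
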
100 changes: 53 additions & 47 deletions crates/core/src/delta_datafusion/mod.rs
@@ -39,9 +39,8 @@ use async_trait::async_trait;
use chrono::{DateTime, TimeZone, Utc};
use datafusion::catalog::{Session, TableProviderFactory};
use datafusion::config::TableParquetOptions;
use datafusion::datasource::physical_plan::parquet::ParquetExecBuilder;
use datafusion::datasource::physical_plan::{
wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig,
wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig, ParquetSource,
};
use datafusion::datasource::{listing::PartitionedFile, MemTable, TableProvider, TableType};
use datafusion::execution::context::{SessionConfig, SessionContext, SessionState, TaskContext};
@@ -648,36 +647,39 @@ impl<'a> DeltaScanBuilder<'a> {
..Default::default()
};

let mut exec_plan_builder = ParquetExecBuilder::new(
FileScanConfig::new(self.log_store.object_store_url(), file_schema)
.with_file_groups(
// If all files were filtered out, we still need to emit at least one partition to
// pass datafusion sanity checks.
//
// See https://github.com/apache/datafusion/issues/11322
if file_groups.is_empty() {
vec![vec![]]
} else {
file_groups.into_values().collect()
},
)
.with_statistics(stats)
.with_projection(self.projection.cloned())
.with_limit(self.limit)
.with_table_partition_cols(table_partition_cols),
)
.with_schema_adapter_factory(Arc::new(DeltaSchemaAdapterFactory {}))
.with_table_parquet_options(parquet_options);
let mut file_source = ParquetSource::new(parquet_options)
.with_schema_adapter_factory(Arc::new(DeltaSchemaAdapterFactory {}));

// Sometimes (i.e Merge) we want to prune files that don't make the
// filter and read the entire contents for files that do match the
// filter
if let Some(predicate) = logical_filter {
if config.enable_parquet_pushdown {
exec_plan_builder = exec_plan_builder.with_predicate(predicate);
file_source = file_source.with_predicate(Arc::clone(&file_schema), predicate);
}
};

let file_scan_config = FileScanConfig::new(
self.log_store.object_store_url(),
file_schema,
Arc::new(file_source),
)
.with_file_groups(
// If all files were filtered out, we still need to emit at least one partition to
// pass datafusion sanity checks.
//
// See https://github.com/apache/datafusion/issues/11322
if file_groups.is_empty() {
vec![vec![]]
} else {
file_groups.into_values().collect()
},
)
.with_statistics(stats)
.with_projection(self.projection.cloned())
.with_limit(self.limit)
.with_table_partition_cols(table_partition_cols);

let metrics = ExecutionPlanMetricsSet::new();
MetricBuilder::new(&metrics)
.global_counter("files_scanned")
@@ -688,7 +690,7 @@ impl<'a> DeltaScanBuilder<'a> {

Ok(DeltaScan {
table_uri: ensure_table_uri(self.log_store.root_uri())?.as_str().into(),
parquet_scan: exec_plan_builder.build_arc(),
parquet_scan: file_scan_config.build(),
config,
logical_schema,
metrics,
@@ -1955,7 +1957,7 @@ mod tests {
use bytes::Bytes;
use chrono::{TimeZone, Utc};
use datafusion::assert_batches_sorted_eq;
use datafusion::datasource::physical_plan::ParquetExec;
use datafusion::datasource::source::DataSourceExec;
use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::{visit_execution_plan, ExecutionPlanVisitor, PhysicalExpr};
use datafusion_expr::lit;
@@ -2655,7 +2657,7 @@ mod tests {
.await
.unwrap();

let mut visitor = ParquetPredicateVisitor::default();
let mut visitor = ParquetVisitor::default();
visit_execution_plan(&scan, &mut visitor).unwrap();

assert_eq!(visitor.predicate.unwrap().to_string(), "a@0 = s");
@@ -2690,7 +2692,7 @@ mod tests {
.await
.unwrap();

let mut visitor = ParquetPredicateVisitor::default();
let mut visitor = ParquetVisitor::default();
visit_execution_plan(&scan, &mut visitor).unwrap();

assert!(visitor.predicate.is_none());
@@ -2719,42 +2721,46 @@ mod tests {
.await
.unwrap();

let mut visitor = ParquetOptionsVisitor::default();
let mut visitor = ParquetVisitor::default();
visit_execution_plan(&scan, &mut visitor).unwrap();

assert_eq!(ctx.copied_table_options().parquet, visitor.options.unwrap());
}

/// Extracts fields from the parquet scan
#[derive(Default)]
struct ParquetPredicateVisitor {
struct ParquetVisitor {
predicate: Option<Arc<dyn PhysicalExpr>>,
pruning_predicate: Option<Arc<PruningPredicate>>,
options: Option<TableParquetOptions>,
}

impl ExecutionPlanVisitor for ParquetPredicateVisitor {
impl ExecutionPlanVisitor for ParquetVisitor {
type Error = DataFusionError;

fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
if let Some(parquet_exec) = plan.as_any().downcast_ref::<ParquetExec>() {
self.predicate = parquet_exec.predicate().cloned();
self.pruning_predicate = parquet_exec.pruning_predicate().cloned();
}
Ok(true)
}
}

#[derive(Default)]
struct ParquetOptionsVisitor {
options: Option<TableParquetOptions>,
}
let Some(datasource_exec) = plan.as_any().downcast_ref::<DataSourceExec>() else {
return Ok(true);
};

impl ExecutionPlanVisitor for ParquetOptionsVisitor {
type Error = DataFusionError;
let Some(scan_config) = datasource_exec
.data_source()
.as_any()
.downcast_ref::<FileScanConfig>()
else {
return Ok(true);
};

fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
if let Some(parquet_exec) = plan.as_any().downcast_ref::<ParquetExec>() {
self.options = Some(parquet_exec.table_parquet_options().clone())
if let Some(parquet_source) = scan_config
.file_source
.as_any()
.downcast_ref::<ParquetSource>()
{
self.options = Some(parquet_source.table_parquet_options().clone());
self.predicate = parquet_source.predicate().cloned();
self.pruning_predicate = parquet_source.pruning_predicate().cloned();
}

Ok(true)
}
}
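
The DeltaScanBuilder changes earlier in this file replace the removed ParquetExecBuilder with DataFusion 46's split between ParquetSource (format-level options and pushed-down predicate) and FileScanConfig (file groups, projection, limit), whose build() produces the DataSourceExec-backed scan. A minimal sketch of that pattern follows; the helper name and signature are hypothetical, not part of the commit, and the import paths assume the re-exports the diff itself uses.

use std::sync::Arc;

use arrow_schema::SchemaRef;
use datafusion::config::TableParquetOptions;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource};
use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_physical_expr::PhysicalExpr;

/// Hypothetical helper: build a parquet scan the DataFusion 46 way.
fn build_parquet_scan(
    url: ObjectStoreUrl,
    file_schema: SchemaRef,
    file_groups: Vec<Vec<PartitionedFile>>,
    predicate: Option<Arc<dyn PhysicalExpr>>,
) -> Arc<dyn ExecutionPlan> {
    // Format-level concerns now live on the file source.
    let mut source = ParquetSource::new(TableParquetOptions::new());
    if let Some(predicate) = predicate {
        source = source.with_predicate(Arc::clone(&file_schema), predicate);
    }

    // File-level concerns (groups, projection, limit, partition columns) stay
    // on the scan config; `build()` replaces the old `ParquetExecBuilder::build_arc()`.
    FileScanConfig::new(url, file_schema, Arc::new(source))
        .with_file_groups(file_groups)
        .build()
}

load_cdf.rs below applies the same pattern, sharing one Arc'd ParquetSource across the cdc, add, and remove scans.
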
69 changes: 34 additions & 35 deletions crates/core/src/operations/load_cdf.rs
@@ -16,11 +16,10 @@ use std::time::SystemTime;
use arrow_array::RecordBatch;
use arrow_schema::{ArrowError, Field, Schema};
use chrono::{DateTime, Utc};
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::file_format::FileFormat;
use datafusion::datasource::physical_plan::FileScanConfig;
use datafusion::datasource::physical_plan::{FileScanConfig, FileSource, ParquetSource};
use datafusion::execution::SessionState;
use datafusion::prelude::SessionContext;
use datafusion_common::config::TableParquetOptions;
use datafusion_common::ScalarValue;
use datafusion_physical_expr::{expressions, PhysicalExpr};
use datafusion_physical_plan::projection::ProjectionExec;
@@ -369,38 +368,38 @@ impl CdfLoadBuilder {
)?;

// Create the parquet scans for each associated type of file.
let cdc_scan = ParquetFormat::new()
.create_physical_plan(
session_sate,
FileScanConfig::new(self.log_store.object_store_url(), cdc_file_schema)
.with_file_groups(cdc_file_groups.into_values().collect())
.with_table_partition_cols(cdc_partition_cols),
filters,
)
.await?;

let add_scan = ParquetFormat::new()
.create_physical_plan(
session_sate,
FileScanConfig::new(
self.log_store.object_store_url(),
add_remove_file_schema.clone(),
)
.with_file_groups(add_file_groups.into_values().collect())
.with_table_partition_cols(add_remove_partition_cols.clone()),
filters,
)
.await?;

let remove_scan = ParquetFormat::new()
.create_physical_plan(
session_sate,
FileScanConfig::new(self.log_store.object_store_url(), add_remove_file_schema)
.with_file_groups(remove_file_groups.into_values().collect())
.with_table_partition_cols(add_remove_partition_cols),
filters,
)
.await?;
let mut parquet_source = ParquetSource::new(TableParquetOptions::new());
if let Some(filters) = filters {
parquet_source =
parquet_source.with_predicate(Arc::clone(&cdc_file_schema), Arc::clone(filters));
}
let parquet_source: Arc<dyn FileSource> = Arc::new(parquet_source);
let cdc_scan: Arc<dyn ExecutionPlan> = FileScanConfig::new(
self.log_store.object_store_url(),
Arc::clone(&cdc_file_schema),
Arc::clone(&parquet_source),
)
.with_file_groups(cdc_file_groups.into_values().collect())
.with_table_partition_cols(cdc_partition_cols)
.build();

let add_scan: Arc<dyn ExecutionPlan> = FileScanConfig::new(
self.log_store.object_store_url(),
Arc::clone(&add_remove_file_schema),
Arc::clone(&parquet_source),
)
.with_file_groups(add_file_groups.into_values().collect())
.with_table_partition_cols(add_remove_partition_cols.clone())
.build();

let remove_scan: Arc<dyn ExecutionPlan> = FileScanConfig::new(
self.log_store.object_store_url(),
Arc::clone(&add_remove_file_schema),
parquet_source,
)
.with_file_groups(remove_file_groups.into_values().collect())
.with_table_partition_cols(add_remove_partition_cols)
.build();

// The output batches are then unioned to create a single output. Coalesce partitions is only here for the time
// being for development. I plan to parallelize the reads once the base idea is correct.
7 changes: 3 additions & 4 deletions crates/core/src/operations/merge/barrier.rs
@@ -463,6 +463,7 @@ pub(crate) fn find_node<T: 'static>(

#[cfg(test)]
mod tests {
use super::BarrierSurvivorSet;
use crate::operations::merge::MergeBarrierExec;
use crate::operations::merge::{
TARGET_DELETE_COLUMN, TARGET_INSERT_COLUMN, TARGET_UPDATE_COLUMN,
@@ -474,16 +475,14 @@ mod tests {
use arrow_schema::DataType as ArrowDataType;
use arrow_schema::Field;
use datafusion::assert_batches_sorted_eq;
use datafusion::datasource::memory::MemorySourceConfig;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_physical_expr::expressions::Column;
use futures::StreamExt;
use std::sync::Arc;

use super::BarrierSurvivorSet;

#[tokio::test]
async fn test_barrier() {
// Validate that files without modifications are dropped and that files with changes passthrough
@@ -662,7 +661,7 @@ mod tests {
async fn execute(input: Vec<RecordBatch>) -> (Vec<RecordBatch>, BarrierSurvivorSet) {
let schema = get_schema();
let repartition = Arc::new(Column::new("__delta_rs_path", 2));
let exec = Arc::new(MemoryExec::try_new(&[input], schema.clone(), None).unwrap());
let exec = MemorySourceConfig::try_new_exec(&[input], schema.clone(), None).unwrap();

let task_ctx = Arc::new(TaskContext::default());
let merge =
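
The barrier test above swaps the removed MemoryExec for MemorySourceConfig::try_new_exec, which builds the equivalent in-memory scan on top of the new DataSourceExec. A small sketch of that call, with an illustrative helper that is not part of the commit:

use std::sync::Arc;

use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;
use datafusion::datasource::memory::MemorySourceConfig;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_common::Result;

/// Illustrative helper: one partition, no projection.
fn memory_scan(batches: Vec<RecordBatch>, schema: SchemaRef) -> Result<Arc<dyn ExecutionPlan>> {
    // `try_new_exec` already returns an Arc'd plan, so no extra `Arc::new` is
    // needed, unlike the old `Arc::new(MemoryExec::try_new(..)?)` pattern.
    let exec: Arc<dyn ExecutionPlan> =
        MemorySourceConfig::try_new_exec(&[batches], schema, None)?;
    Ok(exec)
}
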
5 changes: 4 additions & 1 deletion crates/core/src/operations/merge/mod.rs
@@ -27,7 +27,6 @@
//! })?
//! .await?
//! ````
use std::collections::HashMap;
use std::fmt::Debug;
use std::ops::Deref;
@@ -540,18 +539,22 @@ impl MergeOperation {
Column {
relation: None,
name,
spans,
} => Column {
relation: Some(r),
name,
spans,
},
Column {
relation: Some(TableReference::Bare { table }),
name,
spans,
} => {
if table.as_ref() == alias {
Column {
relation: Some(r),
name,
spans,
}
} else {
return Err(DeltaTableError::Generic(
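
The MergeOperation hunk above is needed because datafusion_common::Column gained a spans field (SQL source locations) in this DataFusion release, so every site that destructures and rebuilds a Column must now thread it through. A sketch of an alternative that stays robust to such field additions, using struct-update syntax; the qualify helper is illustrative, not from the commit:

use datafusion_common::{Column, TableReference};

/// Illustrative: attach a relation to a column while keeping `name`, the new
/// `spans` field, and any future fields unchanged.
fn qualify(column: Column, relation: TableReference) -> Column {
    Column {
        relation: Some(relation),
        ..column
    }
}

The commit keeps the explicit destructuring instead, which makes the new field visible at every rewrite site.
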