chore: upgrade to DataFusion 46.0.0 (WORK IN PROGRESS) #3261

Draft: wants to merge 2 commits into base: main
10 changes: 5 additions & 5 deletions .github/workflows/build.yml
@@ -20,7 +20,7 @@ jobs:
uses: actions-rs/toolchain@v1
with:
profile: default
-toolchain: '1.81'
+toolchain: '1.82'
Contributor Author:
DataFusion 46 requires Rust 1.82.

You can see CI failing without these changes:
https://github.com/delta-io/delta-rs/actions/runs/13591787541/job/38000195037?pr=3261

I don't know what the MSRV policy in delta-rs is, so we probably can't merge this PR until it is OK to increase the MSRV.

override: true

- name: Format
@@ -42,7 +42,7 @@ jobs:
uses: actions-rs/toolchain@v1
with:
profile: default
-toolchain: '1.81'
+toolchain: '1.82'
override: true

- name: build and lint with clippy
@@ -79,7 +79,7 @@ jobs:
uses: actions-rs/toolchain@v1
with:
profile: default
-toolchain: '1.81'
+toolchain: '1.82'
override: true

- name: Run tests
@@ -114,7 +114,7 @@ jobs:
uses: actions-rs/toolchain@v1
with:
profile: default
-toolchain: '1.81'
+toolchain: '1.82'
override: true

# Install Java and Hadoop for HDFS integration tests
@@ -160,7 +160,7 @@ jobs:
uses: actions-rs/toolchain@v1
with:
profile: default
-toolchain: '1.81'
+toolchain: '1.82'
override: true

- name: Download Lakectl
2 changes: 1 addition & 1 deletion .github/workflows/codecov.yml
@@ -20,7 +20,7 @@ jobs:
uses: actions-rs/toolchain@v1
with:
profile: default
-toolchain: '1.81'
+toolchain: '1.82'
override: true
- name: Install cargo-llvm-cov
uses: taiki-e/install-action@cargo-llvm-cov
35 changes: 23 additions & 12 deletions Cargo.toml
@@ -5,7 +5,7 @@ resolver = "2"

[workspace.package]
authors = ["Qingping Hou <[email protected]>"]
rust-version = "1.81"
rust-version = "1.82"
keywords = ["deltalake", "delta", "datalake"]
readme = "README.md"
edition = "2021"
@@ -45,16 +45,28 @@ object_store = { version = "0.11.2" , features = ["cloud"]}
parquet = { version = "54" }

# datafusion
datafusion = "45"
datafusion-expr = "45"
datafusion-common = "45"
datafusion-ffi = "45"
datafusion-functions = "45"
datafusion-functions-aggregate = "45"
datafusion-physical-expr = "45"
datafusion-physical-plan = "45"
datafusion-proto = "45"
datafusion-sql = "45"
+# datafusion = { path = "/Users/andrewlamb/Software/datafusion2/datafusion/core" }
+# datafusion-expr = { path = "/Users/andrewlamb/Software/datafusion2/datafusion/expr" }
+# datafusion-common = { path = "/Users/andrewlamb/Software/datafusion2/datafusion/common" }
+# datafusion-ffi = { path = "/Users/andrewlamb/Software/datafusion2/datafusion/ffi" }
+# datafusion-functions = { path = "/Users/andrewlamb/Software/datafusion2/datafusion/functions" }
+# datafusion-functions-aggregate = { path = "/Users/andrewlamb/Software/datafusion2/datafusion/functions-aggregate" }
+# datafusion-physical-expr = { path = "/Users/andrewlamb/Software/datafusion2/datafusion/physical-expr" }
+# datafusion-physical-plan = { path = "/Users/andrewlamb/Software/datafusion2/datafusion/physical-plan" }
+# datafusion-proto = { path = "/Users/andrewlamb/Software/datafusion2/datafusion/proto" }
+# datafusion-sql = { path = "/Users/andrewlamb/Software/datafusion2/datafusion/sql" }


+datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "2fd558fdd0fa95e70d6ae5135776d164119f3536" }
+datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "2fd558fdd0fa95e70d6ae5135776d164119f3536" }
+datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", rev = "2fd558fdd0fa95e70d6ae5135776d164119f3536" }
+datafusion-ffi = { git = "https://github.com/apache/arrow-datafusion.git", rev = "2fd558fdd0fa95e70d6ae5135776d164119f3536" }
+datafusion-functions = { git = "https://github.com/apache/arrow-datafusion.git", rev = "2fd558fdd0fa95e70d6ae5135776d164119f3536" }
+datafusion-functions-aggregate = { git = "https://github.com/apache/arrow-datafusion.git", rev = "2fd558fdd0fa95e70d6ae5135776d164119f3536" }
+datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "2fd558fdd0fa95e70d6ae5135776d164119f3536" }
+datafusion-physical-plan = { git = "https://github.com/apache/arrow-datafusion.git", rev = "2fd558fdd0fa95e70d6ae5135776d164119f3536" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev = "2fd558fdd0fa95e70d6ae5135776d164119f3536" }
+datafusion-sql = { git = "https://github.com/apache/arrow-datafusion.git", rev = "2fd558fdd0fa95e70d6ae5135776d164119f3536" }

# serde
serde = { version = "1.0.194", features = ["derive"] }
@@ -77,4 +89,3 @@ async-trait = { version = "0.1" }
futures = { version = "0.3" }
tokio = { version = "1" }
num_cpus = { version = "1" }

10 changes: 6 additions & 4 deletions crates/core/src/delta_datafusion/expr.rs
@@ -34,7 +34,9 @@ use datafusion_common::Result as DFResult;
use datafusion_common::{config::ConfigOptions, DFSchema, Result, ScalarValue, TableReference};
use datafusion_expr::expr::InList;
use datafusion_expr::planner::ExprPlanner;
-use datafusion_expr::{AggregateUDF, Between, BinaryExpr, Cast, Expr, Like, TableSource};
+use datafusion_expr::{
+    AggregateUDF, Between, BinaryExpr, Cast, Expr, Like, ScalarFunctionArgs, TableSource,
+};
// Needed for MakeParquetArray
use datafusion_expr::{ColumnarValue, Documentation, ScalarUDF, ScalarUDFImpl, Signature};
use datafusion_functions::core::planner::CoreFunctionPlanner;
@@ -99,13 +101,13 @@ impl ScalarUDFImpl for MakeParquetArray {
r_type
}

-    fn invoke_batch(&self, args: &[ColumnarValue], number_rows: usize) -> Result<ColumnarValue> {
+    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
let mut data_type = DataType::Null;
-        for arg in args {
+        for arg in &args.args {
data_type = arg.data_type();
}

-        match self.actual.invoke_batch(args, number_rows)? {
+        match self.actual.invoke_with_args(args)? {
ColumnarValue::Scalar(ScalarValue::List(df_array)) => {
let field = Arc::new(Field::new("element", data_type, true));
let result = Ok(ColumnarValue::Scalar(ScalarValue::List(Arc::new(
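For reviewers unfamiliar with the new UDF API: DataFusion 46 deprecates `invoke_batch` in favor of `invoke_with_args`, which bundles the argument columns, the row count, and the resolved return type into one `ScalarFunctionArgs` value. A minimal sketch of the new shape, assuming a trivial pass-through UDF (`PassThrough` is illustrative, not part of this PR):

```rust
use std::any::Any;

use arrow_schema::DataType;
use datafusion_common::Result;
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility};

/// Illustrative UDF that returns its first argument unchanged.
#[derive(Debug)]
struct PassThrough {
    signature: Signature,
}

impl PassThrough {
    fn new() -> Self {
        Self {
            signature: Signature::any(1, Volatility::Immutable),
        }
    }
}

impl ScalarUDFImpl for PassThrough {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn name(&self) -> &str {
        "pass_through"
    }

    fn signature(&self) -> &Signature {
        &self.signature
    }

    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
        Ok(arg_types[0].clone())
    }

    // DataFusion <= 45: fn invoke_batch(&self, args: &[ColumnarValue], number_rows: usize).
    // DataFusion 46: the same information arrives as a single struct, which is why a
    // wrapper like MakeParquetArray can forward `args` to an inner UDF in one move.
    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
        // `args.args` holds the evaluated arguments; `args.number_rows` and
        // `args.return_type` are available when needed.
        Ok(args.args[0].clone())
    }
}
```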
100 changes: 53 additions & 47 deletions crates/core/src/delta_datafusion/mod.rs
@@ -39,9 +39,8 @@ use async_trait::async_trait;
use chrono::{DateTime, TimeZone, Utc};
use datafusion::catalog::{Session, TableProviderFactory};
use datafusion::config::TableParquetOptions;
-use datafusion::datasource::physical_plan::parquet::ParquetExecBuilder;
use datafusion::datasource::physical_plan::{
-    wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig,
+    wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig, ParquetSource,
};
use datafusion::datasource::{listing::PartitionedFile, MemTable, TableProvider, TableType};
use datafusion::execution::context::{SessionConfig, SessionContext, SessionState, TaskContext};
@@ -648,36 +647,39 @@ impl<'a> DeltaScanBuilder<'a> {
..Default::default()
};

-        let mut exec_plan_builder = ParquetExecBuilder::new(
-            FileScanConfig::new(self.log_store.object_store_url(), file_schema)
-                .with_file_groups(
-                    // If all files were filtered out, we still need to emit at least one partition to
-                    // pass datafusion sanity checks.
-                    //
-                    // See https://github.com/apache/datafusion/issues/11322
-                    if file_groups.is_empty() {
-                        vec![vec![]]
-                    } else {
-                        file_groups.into_values().collect()
-                    },
-                )
-                .with_statistics(stats)
-                .with_projection(self.projection.cloned())
-                .with_limit(self.limit)
-                .with_table_partition_cols(table_partition_cols),
-        )
-        .with_schema_adapter_factory(Arc::new(DeltaSchemaAdapterFactory {}))
-        .with_table_parquet_options(parquet_options);
+        let mut file_source = ParquetSource::new(parquet_options)
+            .with_schema_adapter_factory(Arc::new(DeltaSchemaAdapterFactory {}));

// Sometimes (i.e Merge) we want to prune files that don't make the
// filter and read the entire contents for files that do match the
// filter
if let Some(predicate) = logical_filter {
if config.enable_parquet_pushdown {
-                exec_plan_builder = exec_plan_builder.with_predicate(predicate);
+                file_source = file_source.with_predicate(Arc::clone(&file_schema), predicate);
}
};

+        let file_scan_config = FileScanConfig::new(
+            self.log_store.object_store_url(),
+            file_schema,
+            Arc::new(file_source),
+        )
+        .with_file_groups(
+            // If all files were filtered out, we still need to emit at least one partition to
+            // pass datafusion sanity checks.
+            //
+            // See https://github.com/apache/datafusion/issues/11322
+            if file_groups.is_empty() {
+                vec![vec![]]
+            } else {
+                file_groups.into_values().collect()
+            },
+        )
+        .with_statistics(stats)
+        .with_projection(self.projection.cloned())
+        .with_limit(self.limit)
+        .with_table_partition_cols(table_partition_cols);

let metrics = ExecutionPlanMetricsSet::new();
MetricBuilder::new(&metrics)
.global_counter("files_scanned")
@@ -688,7 +690,7 @@ impl<'a> DeltaScanBuilder<'a> {

Ok(DeltaScan {
table_uri: ensure_table_uri(self.log_store.root_uri())?.as_str().into(),
-            parquet_scan: exec_plan_builder.build_arc(),
+            parquet_scan: file_scan_config.build(),
config,
logical_schema,
metrics,
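This block is the core of the migration: `ParquetExecBuilder` no longer exists in DataFusion 46. Format-specific concerns (parquet options, predicate, schema adapter) now live on a `ParquetSource`, `FileScanConfig::new` takes the file source up front, and `build()` returns the executable plan node (a `DataSourceExec`) directly. A self-contained sketch of the same flow, with a placeholder schema and an empty file group standing in for the real inputs:

```rust
use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::config::TableParquetOptions;
use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource};
use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion::physical_plan::ExecutionPlan;

fn build_parquet_scan() -> Arc<dyn ExecutionPlan> {
    // Placeholder schema; DeltaScanBuilder derives the real one from the table.
    let file_schema = Arc::new(Schema::new(vec![Field::new(
        "value",
        DataType::Int64,
        true,
    )]));

    // 1. Configure format-specific behavior on the ParquetSource.
    let file_source = Arc::new(ParquetSource::new(TableParquetOptions::new()));

    // 2. FileScanConfig receives the source at construction time, replacing
    //    ParquetExecBuilder; build() yields the plan node.
    let scan: Arc<dyn ExecutionPlan> = FileScanConfig::new(
        ObjectStoreUrl::local_filesystem(),
        file_schema,
        file_source,
    )
    .with_file_groups(vec![vec![]]) // one empty partition, as above
    .with_limit(Some(10))
    .build();
    scan
}
```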
@@ -1955,7 +1957,7 @@ mod tests {
use bytes::Bytes;
use chrono::{TimeZone, Utc};
use datafusion::assert_batches_sorted_eq;
-    use datafusion::datasource::physical_plan::ParquetExec;
+    use datafusion::datasource::source::DataSourceExec;
use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::{visit_execution_plan, ExecutionPlanVisitor, PhysicalExpr};
use datafusion_expr::lit;
@@ -2655,7 +2657,7 @@
.await
.unwrap();

-        let mut visitor = ParquetPredicateVisitor::default();
+        let mut visitor = ParquetVisitor::default();
visit_execution_plan(&scan, &mut visitor).unwrap();

assert_eq!(visitor.predicate.unwrap().to_string(), "a@0 = s");
@@ -2690,7 +2692,7 @@
.await
.unwrap();

-        let mut visitor = ParquetPredicateVisitor::default();
+        let mut visitor = ParquetVisitor::default();
visit_execution_plan(&scan, &mut visitor).unwrap();

assert!(visitor.predicate.is_none());
@@ -2719,42 +2721,46 @@
.await
.unwrap();

-        let mut visitor = ParquetOptionsVisitor::default();
+        let mut visitor = ParquetVisitor::default();
visit_execution_plan(&scan, &mut visitor).unwrap();

assert_eq!(ctx.copied_table_options().parquet, visitor.options.unwrap());
}

/// Extracts fields from the parquet scan
#[derive(Default)]
-    struct ParquetPredicateVisitor {
+    struct ParquetVisitor {
Contributor Author:
I had to change how the parquet scan information is found in the two visitors, and since they were mostly boilerplate copy/paste, I combined them into one at the same time.

predicate: Option<Arc<dyn PhysicalExpr>>,
pruning_predicate: Option<Arc<PruningPredicate>>,
+        options: Option<TableParquetOptions>,
}

-    impl ExecutionPlanVisitor for ParquetPredicateVisitor {
+    impl ExecutionPlanVisitor for ParquetVisitor {
type Error = DataFusionError;

fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
-            if let Some(parquet_exec) = plan.as_any().downcast_ref::<ParquetExec>() {
-                self.predicate = parquet_exec.predicate().cloned();
-                self.pruning_predicate = parquet_exec.pruning_predicate().cloned();
-            }
-            Ok(true)
-        }
-    }
-
-    #[derive(Default)]
-    struct ParquetOptionsVisitor {
-        options: Option<TableParquetOptions>,
-    }
+            let Some(datasource_exec) = plan.as_any().downcast_ref::<DataSourceExec>() else {
+                return Ok(true);
+            };

-    impl ExecutionPlanVisitor for ParquetOptionsVisitor {
-        type Error = DataFusionError;
+            let Some(scan_config) = datasource_exec
+                .data_source()
+                .as_any()
+                .downcast_ref::<FileScanConfig>()
+            else {
+                return Ok(true);
+            };

-        fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
-            if let Some(parquet_exec) = plan.as_any().downcast_ref::<ParquetExec>() {
-                self.options = Some(parquet_exec.table_parquet_options().clone())
+            if let Some(parquet_source) = scan_config
+                .file_source
+                .as_any()
+                .downcast_ref::<ParquetSource>()
+            {
+                self.options = Some(parquet_source.table_parquet_options().clone());
+                self.predicate = parquet_source.predicate().cloned();
+                self.pruning_predicate = parquet_source.pruning_predicate().cloned();
}

Ok(true)
}
}
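The two-step downcast exists because a parquet scan in DataFusion 46 is a `DataSourceExec` wrapping a `FileScanConfig` wrapping a `ParquetSource`. The same chain extracted into a small helper (a sketch only; `parquet_source_of` is not part of this PR, but every call it makes appears in the visitor above):

```rust
use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource};
use datafusion::datasource::source::DataSourceExec;
use datafusion::physical_plan::ExecutionPlan;

/// Returns the ParquetSource behind `plan`, if `plan` is a parquet scan.
fn parquet_source_of(plan: &dyn ExecutionPlan) -> Option<&ParquetSource> {
    // ExecutionPlan -> DataSourceExec
    let exec = plan.as_any().downcast_ref::<DataSourceExec>()?;
    // dyn DataSource -> FileScanConfig
    let config = exec.data_source().as_any().downcast_ref::<FileScanConfig>()?;
    // dyn FileSource -> ParquetSource
    config.file_source.as_any().downcast_ref::<ParquetSource>()
}
```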
69 changes: 34 additions & 35 deletions crates/core/src/operations/load_cdf.rs
@@ -16,11 +16,10 @@ use std::time::SystemTime;
use arrow_array::RecordBatch;
use arrow_schema::{ArrowError, Field, Schema};
use chrono::{DateTime, Utc};
-use datafusion::datasource::file_format::parquet::ParquetFormat;
-use datafusion::datasource::file_format::FileFormat;
-use datafusion::datasource::physical_plan::FileScanConfig;
+use datafusion::datasource::physical_plan::{FileScanConfig, FileSource, ParquetSource};
use datafusion::execution::SessionState;
use datafusion::prelude::SessionContext;
+use datafusion_common::config::TableParquetOptions;
use datafusion_common::ScalarValue;
use datafusion_physical_expr::{expressions, PhysicalExpr};
use datafusion_physical_plan::projection::ProjectionExec;
@@ -369,38 +368,38 @@ impl CdfLoadBuilder {
)?;

// Create the parquet scans for each associated type of file.
-        let cdc_scan = ParquetFormat::new()
-            .create_physical_plan(
-                session_sate,
-                FileScanConfig::new(self.log_store.object_store_url(), cdc_file_schema)
-                    .with_file_groups(cdc_file_groups.into_values().collect())
-                    .with_table_partition_cols(cdc_partition_cols),
-                filters,
-            )
-            .await?;
-
-        let add_scan = ParquetFormat::new()
-            .create_physical_plan(
-                session_sate,
-                FileScanConfig::new(
-                    self.log_store.object_store_url(),
-                    add_remove_file_schema.clone(),
-                )
-                .with_file_groups(add_file_groups.into_values().collect())
-                .with_table_partition_cols(add_remove_partition_cols.clone()),
-                filters,
-            )
-            .await?;
-
-        let remove_scan = ParquetFormat::new()
-            .create_physical_plan(
-                session_sate,
-                FileScanConfig::new(self.log_store.object_store_url(), add_remove_file_schema)
-                    .with_file_groups(remove_file_groups.into_values().collect())
-                    .with_table_partition_cols(add_remove_partition_cols),
-                filters,
-            )
-            .await?;
+        let mut parquet_source = ParquetSource::new(TableParquetOptions::new());
+        if let Some(filters) = filters {
+            parquet_source =
+                parquet_source.with_predicate(Arc::clone(&cdc_file_schema), Arc::clone(filters));
+        }
+        let parquet_source: Arc<dyn FileSource> = Arc::new(parquet_source);
+        let cdc_scan: Arc<dyn ExecutionPlan> = FileScanConfig::new(
+            self.log_store.object_store_url(),
+            Arc::clone(&cdc_file_schema),
+            Arc::clone(&parquet_source),
+        )
+        .with_file_groups(cdc_file_groups.into_values().collect())
+        .with_table_partition_cols(cdc_partition_cols)
+        .build();
+
+        let add_scan: Arc<dyn ExecutionPlan> = FileScanConfig::new(
+            self.log_store.object_store_url(),
+            Arc::clone(&add_remove_file_schema),
+            Arc::clone(&parquet_source),
+        )
+        .with_file_groups(add_file_groups.into_values().collect())
+        .with_table_partition_cols(add_remove_partition_cols.clone())
+        .build();
+
+        let remove_scan: Arc<dyn ExecutionPlan> = FileScanConfig::new(
+            self.log_store.object_store_url(),
+            Arc::clone(&add_remove_file_schema),
+            parquet_source,
+        )
+        .with_file_groups(remove_file_groups.into_values().collect())
+        .with_table_partition_cols(add_remove_partition_cols)
+        .build();

// The output batches are then unioned to create a single output. Coalesce partitions is only here for the time
// being for development. I plan to parallelize the reads once the base idea is correct.
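Two details of this rewrite are worth calling out: `with_predicate` now takes the file schema alongside the predicate expression, and a single configured `Arc<dyn FileSource>` can be cloned into any number of `FileScanConfig`s (the cdc/add/remove scans above). A condensed sketch of that sharing pattern, with a placeholder schema and empty file groups (not part of this PR):

```rust
use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::config::TableParquetOptions;
use datafusion::datasource::physical_plan::{FileScanConfig, FileSource, ParquetSource};
use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion::physical_plan::{ExecutionPlan, PhysicalExpr};

/// Builds three scans that share one configured parquet source.
fn build_shared_scans(predicate: Option<Arc<dyn PhysicalExpr>>) -> Vec<Arc<dyn ExecutionPlan>> {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, true)]));

    // Configure the source once: options, plus the predicate if there is one.
    let mut source = ParquetSource::new(TableParquetOptions::new());
    if let Some(predicate) = predicate {
        // DataFusion 46: the file schema travels with the predicate.
        source = source.with_predicate(Arc::clone(&schema), predicate);
    }
    let source: Arc<dyn FileSource> = Arc::new(source);

    // The same source backs each scan.
    (0..3)
        .map(|_| {
            let plan: Arc<dyn ExecutionPlan> = FileScanConfig::new(
                ObjectStoreUrl::local_filesystem(),
                Arc::clone(&schema),
                Arc::clone(&source),
            )
            .with_file_groups(vec![vec![]]) // placeholder file group
            .build();
            plan
        })
        .collect()
}
```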