diff --git a/Cargo.toml b/Cargo.toml
index abd292c077..97e4732bb2 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -45,16 +45,28 @@ object_store = { version = "0.11.2" , features = ["cloud"]}
 parquet = { version = "54" }
 
 # datafusion
-datafusion = "45"
-datafusion-expr = "45"
-datafusion-common = "45"
-datafusion-ffi = "45"
-datafusion-functions = "45"
-datafusion-functions-aggregate = "45"
-datafusion-physical-expr = "45"
-datafusion-physical-plan = "45"
-datafusion-proto = "45"
-datafusion-sql = "45"
+# datafusion = { path = "/Users/andrewlamb/Software/datafusion2/datafusion/core" }
+# datafusion-expr = { path = "/Users/andrewlamb/Software/datafusion2/datafusion/expr" }
+# datafusion-common = { path = "/Users/andrewlamb/Software/datafusion2/datafusion/common" }
+# datafusion-ffi = { path = "/Users/andrewlamb/Software/datafusion2/datafusion/ffi" }
+# datafusion-functions = { path = "/Users/andrewlamb/Software/datafusion2/datafusion/functions" }
+# datafusion-functions-aggregate = { path = "/Users/andrewlamb/Software/datafusion2/datafusion/functions-aggregate" }
+# datafusion-physical-expr = { path = "/Users/andrewlamb/Software/datafusion2/datafusion/physical-expr" }
+# datafusion-physical-plan = { path = "/Users/andrewlamb/Software/datafusion2/datafusion/physical-plan" }
+# datafusion-proto = { path = "/Users/andrewlamb/Software/datafusion2/datafusion/proto" }
+# datafusion-sql = { path = "/Users/andrewlamb/Software/datafusion2/datafusion/sql" }
+
+
+datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "2fd558fdd0fa95e70d6ae5135776d164119f3536" }
+datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "2fd558fdd0fa95e70d6ae5135776d164119f3536" }
+datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", rev = "2fd558fdd0fa95e70d6ae5135776d164119f3536" }
+datafusion-ffi = { git = "https://github.com/apache/arrow-datafusion.git", rev = "2fd558fdd0fa95e70d6ae5135776d164119f3536" }
+datafusion-functions = { git = "https://github.com/apache/arrow-datafusion.git", rev = "2fd558fdd0fa95e70d6ae5135776d164119f3536" }
+datafusion-functions-aggregate = { git = "https://github.com/apache/arrow-datafusion.git", rev = "2fd558fdd0fa95e70d6ae5135776d164119f3536" }
+datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "2fd558fdd0fa95e70d6ae5135776d164119f3536" }
+datafusion-physical-plan = { git = "https://github.com/apache/arrow-datafusion.git", rev = "2fd558fdd0fa95e70d6ae5135776d164119f3536" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev = "2fd558fdd0fa95e70d6ae5135776d164119f3536" }
+datafusion-sql = { git = "https://github.com/apache/arrow-datafusion.git", rev = "2fd558fdd0fa95e70d6ae5135776d164119f3536" }
 
 # serde
 serde = { version = "1.0.194", features = ["derive"] }
@@ -77,4 +89,3 @@ async-trait = { version = "0.1" }
 futures = { version = "0.3" }
 tokio = { version = "1" }
 num_cpus = { version = "1" }
-
diff --git a/crates/core/src/delta_datafusion/expr.rs b/crates/core/src/delta_datafusion/expr.rs
index d730756d98..b5d522ede0 100644
--- a/crates/core/src/delta_datafusion/expr.rs
+++ b/crates/core/src/delta_datafusion/expr.rs
@@ -34,7 +34,9 @@ use datafusion_common::Result as DFResult;
 use datafusion_common::{config::ConfigOptions, DFSchema, Result, ScalarValue, TableReference};
 use datafusion_expr::expr::InList;
 use datafusion_expr::planner::ExprPlanner;
-use datafusion_expr::{AggregateUDF, Between, BinaryExpr, Cast, Expr, Like, TableSource};
+use datafusion_expr::{
+    AggregateUDF, Between, BinaryExpr, Cast, Expr, Like, ScalarFunctionArgs, TableSource,
+};
 // Needed for MakeParquetArray
 use datafusion_expr::{ColumnarValue, Documentation, ScalarUDF, ScalarUDFImpl, Signature};
 use datafusion_functions::core::planner::CoreFunctionPlanner;
@@ -99,13 +101,14 @@ impl ScalarUDFImpl for MakeParquetArray {
         r_type
     }
 
-    fn invoke_batch(&self, args: &[ColumnarValue], number_rows: usize) -> Result<ColumnarValue> {
+    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
+        // (&self, args: &[ColumnarValue], number_rows: usize) -> Result<ColumnarValue> {
         let mut data_type = DataType::Null;
-        for arg in args {
+        for arg in &args.args {
             data_type = arg.data_type();
         }
 
-        match self.actual.invoke_batch(args, number_rows)? {
+        match self.actual.invoke_with_args(args)? {
             ColumnarValue::Scalar(ScalarValue::List(df_array)) => {
                 let field = Arc::new(Field::new("element", data_type, true));
                 let result = Ok(ColumnarValue::Scalar(ScalarValue::List(Arc::new(
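
The invoke_batch-to-invoke_with_args change above is the whole migration story for scalar UDFs: the argument slice and explicit row count move into a single ScalarFunctionArgs value. A minimal sketch of the new call path, using only pieces that appear in the hunk (the free function is illustrative, not delta-rs code):

use datafusion_common::Result;
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDF};

// Sketch: the argument arrays now live in `args.args`, and delegation to a
// wrapped UDF hands the whole struct through unchanged.
fn forward_to_inner(inner: &ScalarUDF, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
    // Inspect the incoming argument types, as MakeParquetArray does above.
    for arg in &args.args {
        let _data_type = arg.data_type();
    }
    inner.invoke_with_args(args)
}
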
diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs
index e72112c230..4c6c7fa301 100644
--- a/crates/core/src/delta_datafusion/mod.rs
+++ b/crates/core/src/delta_datafusion/mod.rs
@@ -39,9 +39,8 @@ use async_trait::async_trait;
 use chrono::{DateTime, TimeZone, Utc};
 use datafusion::catalog::{Session, TableProviderFactory};
 use datafusion::config::TableParquetOptions;
-use datafusion::datasource::physical_plan::parquet::ParquetExecBuilder;
 use datafusion::datasource::physical_plan::{
-    wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig,
+    wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig, ParquetSource,
 };
 use datafusion::datasource::{listing::PartitionedFile, MemTable, TableProvider, TableType};
 use datafusion::execution::context::{SessionConfig, SessionContext, SessionState, TaskContext};
@@ -648,36 +647,39 @@ impl<'a> DeltaScanBuilder<'a> {
             ..Default::default()
         };
 
-        let mut exec_plan_builder = ParquetExecBuilder::new(
-            FileScanConfig::new(self.log_store.object_store_url(), file_schema)
-                .with_file_groups(
-                    // If all files were filtered out, we still need to emit at least one partition to
-                    // pass datafusion sanity checks.
-                    //
-                    // See https://github.com/apache/datafusion/issues/11322
-                    if file_groups.is_empty() {
-                        vec![vec![]]
-                    } else {
-                        file_groups.into_values().collect()
-                    },
-                )
-                .with_statistics(stats)
-                .with_projection(self.projection.cloned())
-                .with_limit(self.limit)
-                .with_table_partition_cols(table_partition_cols),
-        )
-        .with_schema_adapter_factory(Arc::new(DeltaSchemaAdapterFactory {}))
-        .with_table_parquet_options(parquet_options);
+        let mut file_source = ParquetSource::new(parquet_options)
+            .with_schema_adapter_factory(Arc::new(DeltaSchemaAdapterFactory {}));
 
         // Sometimes (i.e Merge) we want to prune files that don't make the
         // filter and read the entire contents for files that do match the
         // filter
         if let Some(predicate) = logical_filter {
             if config.enable_parquet_pushdown {
-                exec_plan_builder = exec_plan_builder.with_predicate(predicate);
+                file_source = file_source.with_predicate(Arc::clone(&file_schema), predicate);
             }
         };
 
+        let file_scan_config = FileScanConfig::new(
+            self.log_store.object_store_url(),
+            file_schema,
+            Arc::new(file_source),
+        )
+        .with_file_groups(
+            // If all files were filtered out, we still need to emit at least one partition to
+            // pass datafusion sanity checks.
+            //
+            // See https://github.com/apache/datafusion/issues/11322
+            if file_groups.is_empty() {
+                vec![vec![]]
+            } else {
+                file_groups.into_values().collect()
+            },
+        )
+        .with_statistics(stats)
+        .with_projection(self.projection.cloned())
+        .with_limit(self.limit)
+        .with_table_partition_cols(table_partition_cols);
+
         let metrics = ExecutionPlanMetricsSet::new();
         MetricBuilder::new(&metrics)
             .global_counter("files_scanned")
@@ -688,7 +690,7 @@ impl<'a> DeltaScanBuilder<'a> {
 
         Ok(DeltaScan {
             table_uri: ensure_table_uri(self.log_store.root_uri())?.as_str().into(),
-            parquet_scan: exec_plan_builder.build_arc(),
+            parquet_scan: file_scan_config.build(),
             config,
             logical_schema,
             metrics,
@@ -1955,7 +1957,7 @@ mod tests {
     use bytes::Bytes;
     use chrono::{TimeZone, Utc};
     use datafusion::assert_batches_sorted_eq;
-    use datafusion::datasource::physical_plan::ParquetExec;
+    use datafusion::datasource::source::DataSourceExec;
     use datafusion::physical_plan::empty::EmptyExec;
     use datafusion::physical_plan::{visit_execution_plan, ExecutionPlanVisitor, PhysicalExpr};
     use datafusion_expr::lit;
@@ -2655,7 +2657,7 @@
             .await
             .unwrap();
 
-        let mut visitor = ParquetPredicateVisitor::default();
+        let mut visitor = ParquetVisitor::default();
         visit_execution_plan(&scan, &mut visitor).unwrap();
 
         assert_eq!(visitor.predicate.unwrap().to_string(), "a@0 = s");
@@ -2690,7 +2692,7 @@
             .await
             .unwrap();
 
-        let mut visitor = ParquetPredicateVisitor::default();
+        let mut visitor = ParquetVisitor::default();
         visit_execution_plan(&scan, &mut visitor).unwrap();
 
         assert!(visitor.predicate.is_none());
@@ -2719,42 +2721,46 @@
             .await
             .unwrap();
 
-        let mut visitor = ParquetOptionsVisitor::default();
+        let mut visitor = ParquetVisitor::default();
         visit_execution_plan(&scan, &mut visitor).unwrap();
 
         assert_eq!(ctx.copied_table_options().parquet, visitor.options.unwrap());
     }
 
+    /// Extracts fields from the parquet scan
     #[derive(Default)]
-    struct ParquetPredicateVisitor {
+    struct ParquetVisitor {
         predicate: Option<Arc<dyn PhysicalExpr>>,
         pruning_predicate: Option<Arc<PruningPredicate>>,
+        options: Option<TableParquetOptions>,
     }
 
-    impl ExecutionPlanVisitor for ParquetPredicateVisitor {
+    impl ExecutionPlanVisitor for ParquetVisitor {
         type Error = DataFusionError;
 
         fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
-            if let Some(parquet_exec) = plan.as_any().downcast_ref::<ParquetExec>() {
-                self.predicate = parquet_exec.predicate().cloned();
-                self.pruning_predicate = parquet_exec.pruning_predicate().cloned();
-            }
-            Ok(true)
-        }
-    }
-
-    #[derive(Default)]
-    struct ParquetOptionsVisitor {
-        options: Option<TableParquetOptions>,
-    }
+            let Some(datasource_exec) = plan.as_any().downcast_ref::<DataSourceExec>() else {
+                return Ok(true);
+            };
 
-    impl ExecutionPlanVisitor for ParquetOptionsVisitor {
-        type Error = DataFusionError;
+            let Some(scan_config) = datasource_exec
+                .data_source()
+                .as_any()
+                .downcast_ref::<FileScanConfig>()
+            else {
+                return Ok(true);
+            };
 
-        fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
-            if let Some(parquet_exec) = plan.as_any().downcast_ref::<ParquetExec>() {
-                self.options = Some(parquet_exec.table_parquet_options().clone())
+            if let Some(parquet_source) = scan_config
+                .file_source
+                .as_any()
+                .downcast_ref::<ParquetSource>()
+            {
+                self.options = Some(parquet_source.table_parquet_options().clone());
+                self.predicate = parquet_source.predicate().cloned();
+                self.pruning_predicate = parquet_source.pruning_predicate().cloned();
             }
+
             Ok(true)
         }
     }
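
Condensed, the new scan path used throughout this file is: build a ParquetSource that carries the parquet options and optional pushdown predicate, hand it to FileScanConfig, and call build() to get the plan (a DataSourceExec). A sketch assembled only from calls shown in the hunks above; the function names and the ObjectStoreUrl/SchemaRef parameters are placeholders, not delta-rs code:

use std::sync::Arc;

use arrow_schema::SchemaRef;
use datafusion::config::TableParquetOptions;
use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource};
use datafusion::datasource::source::DataSourceExec;
use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_physical_expr::PhysicalExpr;

// Build: ParquetExecBuilder is gone; the ParquetSource owns the parquet-specific
// pieces, and FileScanConfig::build() yields the execution plan directly.
fn build_parquet_scan(
    url: ObjectStoreUrl,
    file_schema: SchemaRef,
    predicate: Option<Arc<dyn PhysicalExpr>>,
    options: TableParquetOptions,
) -> Arc<dyn ExecutionPlan> {
    let mut source = ParquetSource::new(options);
    if let Some(predicate) = predicate {
        source = source.with_predicate(Arc::clone(&file_schema), predicate);
    }
    FileScanConfig::new(url, file_schema, Arc::new(source)).build()
}

// Inspect: the test visitor now peels three layers to reach the parquet details.
fn parquet_options_of(plan: &dyn ExecutionPlan) -> Option<TableParquetOptions> {
    let exec = plan.as_any().downcast_ref::<DataSourceExec>()?;
    let config = exec.data_source().as_any().downcast_ref::<FileScanConfig>()?;
    let source = config.file_source.as_any().downcast_ref::<ParquetSource>()?;
    Some(source.table_parquet_options().clone())
}
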
diff --git a/crates/core/src/operations/load_cdf.rs b/crates/core/src/operations/load_cdf.rs
index 2516a17782..90d2b5d364 100644
--- a/crates/core/src/operations/load_cdf.rs
+++ b/crates/core/src/operations/load_cdf.rs
@@ -16,11 +16,10 @@ use std::time::SystemTime;
 use arrow_array::RecordBatch;
 use arrow_schema::{ArrowError, Field, Schema};
 use chrono::{DateTime, Utc};
-use datafusion::datasource::file_format::parquet::ParquetFormat;
-use datafusion::datasource::file_format::FileFormat;
-use datafusion::datasource::physical_plan::FileScanConfig;
+use datafusion::datasource::physical_plan::{FileScanConfig, FileSource, ParquetSource};
 use datafusion::execution::SessionState;
 use datafusion::prelude::SessionContext;
+use datafusion_common::config::TableParquetOptions;
 use datafusion_common::ScalarValue;
 use datafusion_physical_expr::{expressions, PhysicalExpr};
 use datafusion_physical_plan::projection::ProjectionExec;
@@ -369,38 +368,38 @@
         )?;
 
         // Create the parquet scans for each associated type of file.
-        let cdc_scan = ParquetFormat::new()
-            .create_physical_plan(
-                session_sate,
-                FileScanConfig::new(self.log_store.object_store_url(), cdc_file_schema)
-                    .with_file_groups(cdc_file_groups.into_values().collect())
-                    .with_table_partition_cols(cdc_partition_cols),
-                filters,
-            )
-            .await?;
-
-        let add_scan = ParquetFormat::new()
-            .create_physical_plan(
-                session_sate,
-                FileScanConfig::new(
-                    self.log_store.object_store_url(),
-                    add_remove_file_schema.clone(),
-                )
-                .with_file_groups(add_file_groups.into_values().collect())
-                .with_table_partition_cols(add_remove_partition_cols.clone()),
-                filters,
-            )
-            .await?;
-
-        let remove_scan = ParquetFormat::new()
-            .create_physical_plan(
-                session_sate,
-                FileScanConfig::new(self.log_store.object_store_url(), add_remove_file_schema)
-                    .with_file_groups(remove_file_groups.into_values().collect())
-                    .with_table_partition_cols(add_remove_partition_cols),
-                filters,
-            )
-            .await?;
+        let mut parquet_source = ParquetSource::new(TableParquetOptions::new());
+        if let Some(filters) = filters {
+            parquet_source =
+                parquet_source.with_predicate(Arc::clone(&cdc_file_schema), Arc::clone(filters));
+        }
+        let parquet_source: Arc<dyn FileSource> = Arc::new(parquet_source);
+        let cdc_scan: Arc<dyn ExecutionPlan> = FileScanConfig::new(
+            self.log_store.object_store_url(),
+            Arc::clone(&cdc_file_schema),
+            Arc::clone(&parquet_source),
+        )
+        .with_file_groups(cdc_file_groups.into_values().collect())
+        .with_table_partition_cols(cdc_partition_cols)
+        .build();
+
+        let add_scan: Arc<dyn ExecutionPlan> = FileScanConfig::new(
+            self.log_store.object_store_url(),
+            Arc::clone(&add_remove_file_schema),
+            Arc::clone(&parquet_source),
+        )
+        .with_file_groups(add_file_groups.into_values().collect())
+        .with_table_partition_cols(add_remove_partition_cols.clone())
+        .build();
+
+        let remove_scan: Arc<dyn ExecutionPlan> = FileScanConfig::new(
+            self.log_store.object_store_url(),
+            Arc::clone(&add_remove_file_schema),
+            parquet_source,
+        )
+        .with_file_groups(remove_file_groups.into_values().collect())
+        .with_table_partition_cols(add_remove_partition_cols)
+        .build();
 
         // The output batches are then unioned to create a single output. Coalesce partitions is only here for the time
         // being for development. I plan to parallelize the reads once the base idea is correct.
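
As the hunk shows, one ParquetSource can back all three scans: FileScanConfig takes an Arc<dyn FileSource>, so the CDC, add, and remove scans simply clone the same source. A reduced sketch under the same assumptions (the helper name and inputs are illustrative):

use std::sync::Arc;

use arrow_schema::SchemaRef;
use datafusion::datasource::physical_plan::{FileScanConfig, FileSource, ParquetSource};
use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_common::config::TableParquetOptions;

// One ParquetSource shared by several independently built scans.
fn build_scans(url: ObjectStoreUrl, schemas: &[SchemaRef]) -> Vec<Arc<dyn ExecutionPlan>> {
    let source: Arc<dyn FileSource> = Arc::new(ParquetSource::new(TableParquetOptions::new()));
    schemas
        .iter()
        .map(|schema| {
            let scan: Arc<dyn ExecutionPlan> =
                FileScanConfig::new(url.clone(), Arc::clone(schema), Arc::clone(&source)).build();
            scan
        })
        .collect()
}
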
diff --git a/crates/core/src/operations/merge/barrier.rs b/crates/core/src/operations/merge/barrier.rs
index 0df507826b..7a459dd3d7 100644
--- a/crates/core/src/operations/merge/barrier.rs
+++ b/crates/core/src/operations/merge/barrier.rs
@@ -463,6 +463,7 @@ pub(crate) fn find_node(
 
 #[cfg(test)]
 mod tests {
+    use super::BarrierSurvivorSet;
     use crate::operations::merge::MergeBarrierExec;
     use crate::operations::merge::{
         TARGET_DELETE_COLUMN, TARGET_INSERT_COLUMN, TARGET_UPDATE_COLUMN,
@@ -474,16 +475,14 @@ mod tests {
     use arrow_schema::DataType as ArrowDataType;
     use arrow_schema::Field;
     use datafusion::assert_batches_sorted_eq;
+    use datafusion::datasource::memory::MemorySourceConfig;
     use datafusion::execution::TaskContext;
     use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
-    use datafusion::physical_plan::memory::MemoryExec;
    use datafusion::physical_plan::ExecutionPlan;
     use datafusion_physical_expr::expressions::Column;
     use futures::StreamExt;
     use std::sync::Arc;
 
-    use super::BarrierSurvivorSet;
-
     #[tokio::test]
     async fn test_barrier() {
         // Validate that files without modifications are dropped and that files with changes passthrough
@@ -662,7 +661,7 @@ mod tests {
     async fn execute(input: Vec<RecordBatch>) -> (Vec<RecordBatch>, BarrierSurvivorSet) {
         let schema = get_schema();
         let repartition = Arc::new(Column::new("__delta_rs_path", 2));
-        let exec = Arc::new(MemoryExec::try_new(&[input], schema.clone(), None).unwrap());
+        let exec = MemorySourceConfig::try_new_exec(&[input], schema.clone(), None).unwrap();
 
         let task_ctx = Arc::new(TaskContext::default());
         let merge =
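
The test-only change here follows the same consolidation onto DataSourceExec: MemorySourceConfig::try_new_exec replaces MemoryExec and already returns the plan wrapped in an Arc, so the extra Arc::new disappears. A small sketch with placeholder batches (the helper is illustrative):

use std::sync::Arc;

use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;
use datafusion::datasource::memory::MemorySourceConfig;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_common::Result;

// Before: Arc::new(MemoryExec::try_new(&[batches], schema, None)?)
// After:  try_new_exec hands back the wrapped execution plan directly.
fn memory_scan(batches: Vec<RecordBatch>, schema: SchemaRef) -> Result<Arc<dyn ExecutionPlan>> {
    let exec: Arc<dyn ExecutionPlan> =
        MemorySourceConfig::try_new_exec(&[batches], schema, None)?;
    Ok(exec)
}
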
diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs
index 26829c569a..89484e991a 100644
--- a/crates/core/src/operations/merge/mod.rs
+++ b/crates/core/src/operations/merge/mod.rs
@@ -27,7 +27,6 @@
 //!     })?
 //!     .await?
 //! ````
-
 use std::collections::HashMap;
 use std::fmt::Debug;
 use std::ops::Deref;
@@ -540,18 +539,22 @@ impl MergeOperation {
             Column {
                 relation: None,
                 name,
+                spans,
             } => Column {
                 relation: Some(r),
                 name,
+                spans,
             },
             Column {
                 relation: Some(TableReference::Bare { table }),
                 name,
+                spans,
             } => {
                 if table.as_ref() == alias {
                     Column {
                         relation: Some(r),
                         name,
+                        spans,
                    }
                } else {
                    return Err(DeltaTableError::Generic(
diff --git a/crates/core/src/operations/optimize.rs b/crates/core/src/operations/optimize.rs
index 44820dd88e..768887b9c2 100644
--- a/crates/core/src/operations/optimize.rs
+++ b/crates/core/src/operations/optimize.rs
@@ -630,7 +630,7 @@ impl MergePlan {
    ) -> Result<BoxStream<'static, Result<RecordBatch, ParquetError>>, DeltaTableError> {
        use datafusion_common::Column;
        use datafusion_expr::expr::ScalarFunction;
-        use datafusion_expr::{col, Expr, ScalarUDF};
+        use datafusion_expr::{Expr, ScalarUDF};
 
        let provider = table_provider.with_files(files.files);
        let df = context.ctx.read_table(Arc::new(provider))?;
diff --git a/crates/core/tests/integration_datafusion.rs b/crates/core/tests/integration_datafusion.rs
index fe9295c36d..990339b942 100644
--- a/crates/core/tests/integration_datafusion.rs
+++ b/crates/core/tests/integration_datafusion.rs
@@ -11,7 +11,6 @@ use arrow_schema::{
     DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema, TimeUnit,
 };
 use datafusion::assert_batches_sorted_eq;
-use datafusion::datasource::physical_plan::ParquetExec;
 use datafusion::datasource::TableProvider;
 use datafusion::execution::context::{SessionContext, SessionState, TaskContext};
 use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
@@ -40,6 +39,7 @@ use serial_test::serial;
 use url::Url;
 
 mod local {
+    use datafusion::datasource::source::DataSourceExec;
     use datafusion::{common::stats::Precision, datasource::provider_as_source};
     use datafusion_expr::LogicalPlanBuilder;
     use deltalake_core::{
@@ -85,7 +85,7 @@
             &mut self,
             plan: &dyn ExecutionPlan,
         ) -> std::result::Result<bool, Self::Error> {
-            if let Some(exec) = plan.as_any().downcast_ref::<ParquetExec>() {
+            if let Some(exec) = plan.as_any().downcast_ref::<DataSourceExec>() {
                 let files = get_scanned_files(exec);
                 self.scanned_files.extend(files);
             } else if let Some(exec) = plan.as_any().downcast_ref::() {
diff --git a/crates/sql/src/planner.rs b/crates/sql/src/planner.rs
index 5c820ffba9..a9bea842cb 100644
--- a/crates/sql/src/planner.rs
+++ b/crates/sql/src/planner.rs
@@ -44,6 +44,7 @@ impl<'a, S: ContextProvider> DeltaSqlToRel<'a, S> {
                 enable_ident_normalization: self.options.enable_ident_normalization,
                 support_varchar_with_length: false,
                 enable_options_value_normalization: false,
+                collect_spans: false,
             },
         );
         planner.statement_to_plan(s)
diff --git a/crates/test/tests/data/checkpoints/_delta_log/.gitignore b/crates/test/tests/data/checkpoints/_delta_log/.gitignore
deleted file mode 100644
index 8624856880..0000000000
--- a/crates/test/tests/data/checkpoints/_delta_log/.gitignore
+++ /dev/null
@@ -1,3 +0,0 @@
-*.parquet
-_last_checkpoint
-
diff --git a/python/src/lib.rs b/python/src/lib.rs
index 743ef205ce..67525cf613 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -2288,7 +2288,7 @@ fn write_to_deltalake(
         _table: Arc::new(Mutex::new(table)),
         _config: FsConfig {
             root_url: table_uri,
-            options: options,
+            options,
         },
     };
     Ok(raw_table)