From 5af0df83c2cd1d3f82f293b066b401a4dfd4064b Mon Sep 17 00:00:00 2001 From: Phillip LeBlanc Date: Tue, 12 Nov 2024 17:07:02 +0900 Subject: [PATCH] Upgrade to DF 43 --- Cargo.toml | 6 +++--- datafusion-federation/src/analyzer.rs | 3 +-- datafusion-federation/src/optimize.rs | 1 + datafusion-federation/src/plan_node.rs | 9 +++++++-- datafusion-federation/src/table_provider.rs | 14 ++++++++++++-- sources/sql/src/lib.rs | 6 ++++++ sources/sql/src/schema.rs | 11 +++++++++++ 7 files changed, 41 insertions(+), 9 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3836fd7..15d64d7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ members = [ # connectorx = { path = "../connector-x/connectorx" } # datafusion = { path = "../arrow-datafusion/datafusion/core" } # Pending next Datafusion release with `unnest` unparsing support -datafusion = { git = "https://github.com/spiceai/datafusion.git", rev = "06969ee5af853f0c071a98683dc2b9fde71b81a9" } +datafusion = { git = "https://github.com/spiceai/datafusion.git", rev = "0bad328656a07fd5ab899186462b09e119e21f90" } [workspace.package] version = "0.1.6" @@ -25,6 +25,6 @@ readme = "README.md" async-trait = "0.1.77" async-stream = "0.3.5" futures = "0.3.30" -datafusion = "42" -datafusion-substrait = "42" +datafusion = "43" +datafusion-substrait = "43" arrow-json = "53" diff --git a/datafusion-federation/src/analyzer.rs b/datafusion-federation/src/analyzer.rs index be57a04..644b034 100644 --- a/datafusion-federation/src/analyzer.rs +++ b/datafusion-federation/src/analyzer.rs @@ -14,7 +14,7 @@ use crate::{ optimize::Optimizer, FederatedTableProviderAdaptor, FederatedTableSource, FederationProviderRef, }; -#[derive(Default)] +#[derive(Default, Debug)] pub struct FederationAnalyzerRule { optimizer: Optimizer, } @@ -24,7 +24,6 @@ impl AnalyzerRule for FederationAnalyzerRule { // TableScans from the same FederationProvider. // There 'largest sub-trees' are passed to their respective FederationProvider.optimizer. fn analyze(&self, plan: LogicalPlan, config: &ConfigOptions) -> Result { - if !contains_federated_table(&plan)? { return Ok(plan); } diff --git a/datafusion-federation/src/optimize.rs b/datafusion-federation/src/optimize.rs index 070581c..082ddf6 100644 --- a/datafusion-federation/src/optimize.rs +++ b/datafusion-federation/src/optimize.rs @@ -10,6 +10,7 @@ use datafusion::{ prelude::SessionConfig, }; +#[derive(Debug)] pub(crate) struct Optimizer { config: SessionState, push_down_filter: PushDownFilter, diff --git a/datafusion-federation/src/plan_node.rs b/datafusion-federation/src/plan_node.rs index c99204f..f938290 100644 --- a/datafusion-federation/src/plan_node.rs +++ b/datafusion-federation/src/plan_node.rs @@ -30,6 +30,12 @@ impl FederatedPlanNode { } } +impl PartialOrd for FederatedPlanNode { + fn partial_cmp(&self, other: &Self) -> Option { + self.plan.partial_cmp(&other.plan) + } +} + impl Debug for FederatedPlanNode { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { UserDefinedLogicalNodeCore::fmt_for_explain(self, f) @@ -67,8 +73,7 @@ impl UserDefinedLogicalNodeCore for FederatedPlanNode { } } -#[derive(Default)] - +#[derive(Default, Debug)] pub struct FederatedQueryPlanner {} impl FederatedQueryPlanner { diff --git a/datafusion-federation/src/table_provider.rs b/datafusion-federation/src/table_provider.rs index 92df798..93eb0aa 100644 --- a/datafusion-federation/src/table_provider.rs +++ b/datafusion-federation/src/table_provider.rs @@ -7,7 +7,9 @@ use datafusion::{ common::Constraints, datasource::TableProvider, error::{DataFusionError, Result}, - logical_expr::{Expr, LogicalPlan, TableProviderFilterPushDown, TableSource, TableType}, + logical_expr::{ + dml::InsertOp, Expr, LogicalPlan, TableProviderFilterPushDown, TableSource, TableType, + }, physical_plan::ExecutionPlan, }; @@ -20,6 +22,14 @@ pub struct FederatedTableProviderAdaptor { pub table_provider: Option>, } +impl std::fmt::Debug for FederatedTableProviderAdaptor { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("FederatedTableProviderAdaptor") + .field("table_provider", &self.table_provider) + .finish() + } +} + impl FederatedTableProviderAdaptor { pub fn new(source: Arc) -> Self { Self { @@ -124,7 +134,7 @@ impl TableProvider for FederatedTableProviderAdaptor { &self, _state: &dyn Session, input: Arc, - overwrite: bool, + overwrite: InsertOp, ) -> Result> { if let Some(table_provider) = &self.table_provider { return table_provider.insert_into(_state, input, overwrite).await; diff --git a/sources/sql/src/lib.rs b/sources/sql/src/lib.rs index 0cf0879..0186f1a 100644 --- a/sources/sql/src/lib.rs +++ b/sources/sql/src/lib.rs @@ -78,6 +78,12 @@ struct SQLFederationAnalyzerRule { planner: Arc, } +impl std::fmt::Debug for SQLFederationAnalyzerRule { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("SQLFederationAnalyzerRule").finish() + } +} + impl SQLFederationAnalyzerRule { pub fn new(executor: Arc) -> Self { Self { diff --git a/sources/sql/src/schema.rs b/sources/sql/src/schema.rs index aa23fd0..86c58ff 100644 --- a/sources/sql/src/schema.rs +++ b/sources/sql/src/schema.rs @@ -13,6 +13,7 @@ use datafusion_federation::{ use crate::SQLFederationProvider; +#[derive(Debug)] pub struct SQLSchemaProvider { // provider: Arc, tables: Vec>, @@ -74,6 +75,7 @@ impl SchemaProvider for SQLSchemaProvider { } } +#[derive(Debug)] pub struct MultiSchemaProvider { children: Vec>, } @@ -114,6 +116,15 @@ pub struct SQLTableSource { schema: SchemaRef, } +impl std::fmt::Debug for SQLTableSource { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("SQLTableSource") + .field("table_name", &self.table_name) + .field("schema", &self.schema) + .finish() + } +} + impl SQLTableSource { // creates a SQLTableSource and infers the table schema pub async fn new(provider: Arc, table_name: String) -> Result {