Skip to content

Commit

Permalink
Upgrade to DF 43
Browse files Browse the repository at this point in the history
  • Loading branch information
phillipleblanc committed Nov 12, 2024
1 parent 914bd08 commit 5af0df8
Show file tree
Hide file tree
Showing 7 changed files with 41 additions and 9 deletions.
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ members = [
# connectorx = { path = "../connector-x/connectorx" }
# datafusion = { path = "../arrow-datafusion/datafusion/core" }
# Pending next Datafusion release with `unnest` unparsing support
datafusion = { git = "https://github.com/spiceai/datafusion.git", rev = "06969ee5af853f0c071a98683dc2b9fde71b81a9" }
datafusion = { git = "https://github.com/spiceai/datafusion.git", rev = "0bad328656a07fd5ab899186462b09e119e21f90" }

[workspace.package]
version = "0.1.6"
Expand All @@ -25,6 +25,6 @@ readme = "README.md"
async-trait = "0.1.77"
async-stream = "0.3.5"
futures = "0.3.30"
datafusion = "42"
datafusion-substrait = "42"
datafusion = "43"
datafusion-substrait = "43"
arrow-json = "53"
3 changes: 1 addition & 2 deletions datafusion-federation/src/analyzer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::{
optimize::Optimizer, FederatedTableProviderAdaptor, FederatedTableSource, FederationProviderRef,
};

#[derive(Default)]
#[derive(Default, Debug)]
pub struct FederationAnalyzerRule {
optimizer: Optimizer,
}
Expand All @@ -24,7 +24,6 @@ impl AnalyzerRule for FederationAnalyzerRule {
// TableScans from the same FederationProvider.
// There 'largest sub-trees' are passed to their respective FederationProvider.optimizer.
fn analyze(&self, plan: LogicalPlan, config: &ConfigOptions) -> Result<LogicalPlan> {

if !contains_federated_table(&plan)? {
return Ok(plan);
}
Expand Down
1 change: 1 addition & 0 deletions datafusion-federation/src/optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use datafusion::{
prelude::SessionConfig,
};

#[derive(Debug)]
pub(crate) struct Optimizer {
config: SessionState,
push_down_filter: PushDownFilter,
Expand Down
9 changes: 7 additions & 2 deletions datafusion-federation/src/plan_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ impl FederatedPlanNode {
}
}

impl PartialOrd for FederatedPlanNode {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
self.plan.partial_cmp(&other.plan)
}
}

impl Debug for FederatedPlanNode {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
UserDefinedLogicalNodeCore::fmt_for_explain(self, f)
Expand Down Expand Up @@ -67,8 +73,7 @@ impl UserDefinedLogicalNodeCore for FederatedPlanNode {
}
}

#[derive(Default)]

#[derive(Default, Debug)]
pub struct FederatedQueryPlanner {}

impl FederatedQueryPlanner {
Expand Down
14 changes: 12 additions & 2 deletions datafusion-federation/src/table_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ use datafusion::{
common::Constraints,
datasource::TableProvider,
error::{DataFusionError, Result},
logical_expr::{Expr, LogicalPlan, TableProviderFilterPushDown, TableSource, TableType},
logical_expr::{
dml::InsertOp, Expr, LogicalPlan, TableProviderFilterPushDown, TableSource, TableType,
},
physical_plan::ExecutionPlan,
};

Expand All @@ -20,6 +22,14 @@ pub struct FederatedTableProviderAdaptor {
pub table_provider: Option<Arc<dyn TableProvider>>,
}

impl std::fmt::Debug for FederatedTableProviderAdaptor {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("FederatedTableProviderAdaptor")
.field("table_provider", &self.table_provider)
.finish()
}
}

impl FederatedTableProviderAdaptor {
pub fn new(source: Arc<dyn FederatedTableSource>) -> Self {
Self {
Expand Down Expand Up @@ -124,7 +134,7 @@ impl TableProvider for FederatedTableProviderAdaptor {
&self,
_state: &dyn Session,
input: Arc<dyn ExecutionPlan>,
overwrite: bool,
overwrite: InsertOp,
) -> Result<Arc<dyn ExecutionPlan>> {
if let Some(table_provider) = &self.table_provider {
return table_provider.insert_into(_state, input, overwrite).await;
Expand Down
6 changes: 6 additions & 0 deletions sources/sql/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ struct SQLFederationAnalyzerRule {
planner: Arc<dyn FederationPlanner>,
}

impl std::fmt::Debug for SQLFederationAnalyzerRule {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SQLFederationAnalyzerRule").finish()
}
}

impl SQLFederationAnalyzerRule {
pub fn new(executor: Arc<dyn SQLExecutor>) -> Self {
Self {
Expand Down
11 changes: 11 additions & 0 deletions sources/sql/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use datafusion_federation::{

use crate::SQLFederationProvider;

#[derive(Debug)]
pub struct SQLSchemaProvider {
// provider: Arc<SQLFederationProvider>,
tables: Vec<Arc<SQLTableSource>>,
Expand Down Expand Up @@ -74,6 +75,7 @@ impl SchemaProvider for SQLSchemaProvider {
}
}

#[derive(Debug)]
pub struct MultiSchemaProvider {
children: Vec<Arc<dyn SchemaProvider>>,
}
Expand Down Expand Up @@ -114,6 +116,15 @@ pub struct SQLTableSource {
schema: SchemaRef,
}

impl std::fmt::Debug for SQLTableSource {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SQLTableSource")
.field("table_name", &self.table_name)
.field("schema", &self.schema)
.finish()
}
}

impl SQLTableSource {
// creates a SQLTableSource and infers the table schema
pub async fn new(provider: Arc<SQLFederationProvider>, table_name: String) -> Result<Self> {
Expand Down

0 comments on commit 5af0df8

Please sign in to comment.