From 9ca6c2557488c1c608182542c1be96889b64fe29 Mon Sep 17 00:00:00 2001 From: logan-keede Date: Thu, 30 Jan 2025 00:47:45 +0530 Subject: [PATCH 1/7] move information_schema to datafusion-catalog --- datafusion-cli/Cargo.lock | 6 + datafusion/catalog/Cargo.toml | 6 + .../src}/information_schema.rs | 20 +- datafusion/catalog/src/lib.rs | 243 +++++++++++++++++- .../datasource => catalog/src}/streaming.rs | 8 +- datafusion/core/src/catalog_common/mod.rs | 243 ------------------ datafusion/core/src/datasource/mod.rs | 4 +- .../core/src/execution/session_state.rs | 12 +- datafusion/core/src/lib.rs | 2 +- datafusion/core/tests/memory_limit/mod.rs | 2 +- datafusion/substrait/Cargo.toml | 1 + .../tests/cases/substrait_validations.rs | 2 +- datafusion/substrait/tests/utils.rs | 2 +- 13 files changed, 279 insertions(+), 272 deletions(-) rename datafusion/{core/src/catalog_common => catalog/src}/information_schema.rs (98%) rename datafusion/{core/src/datasource => catalog/src}/streaming.rs (93%) diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 5666406fda55..4c94ec0035fe 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -1277,6 +1277,8 @@ dependencies = [ name = "datafusion-catalog" version = "44.0.0" dependencies = [ + "arrow", + "arrow-array", "arrow-schema", "async-trait", "dashmap", @@ -1284,8 +1286,12 @@ dependencies = [ "datafusion-execution", "datafusion-expr", "datafusion-physical-plan", + "datafusion-sql", + "futures", "itertools 0.14.0", + "log", "parking_lot", + "sqlparser", ] [[package]] diff --git a/datafusion/catalog/Cargo.toml b/datafusion/catalog/Cargo.toml index bcc258c7a7f4..a5e6754fe304 100644 --- a/datafusion/catalog/Cargo.toml +++ b/datafusion/catalog/Cargo.toml @@ -28,6 +28,8 @@ rust-version.workspace = true version.workspace = true [dependencies] +arrow = { workspace = true } +arrow-array = { workspace = true } arrow-schema = { workspace = true } async-trait = { workspace = true } dashmap = { workspace = true } @@ -35,8 +37,12 @@ datafusion-common = { workspace = true } datafusion-execution = { workspace = true } datafusion-expr = { workspace = true } datafusion-physical-plan = { workspace = true } +datafusion-sql = { workspace = true } +futures = { workspace = true } itertools = { workspace = true } +log = { workspace = true } parking_lot = { workspace = true } +sqlparser = { workspace = true } [dev-dependencies] tokio = { workspace = true } diff --git a/datafusion/core/src/catalog_common/information_schema.rs b/datafusion/catalog/src/information_schema.rs similarity index 98% rename from datafusion/core/src/catalog_common/information_schema.rs rename to datafusion/catalog/src/information_schema.rs index ce3092acfdf1..853afd83645b 100644 --- a/datafusion/core/src/catalog_common/information_schema.rs +++ b/datafusion/catalog/src/information_schema.rs @@ -19,16 +19,8 @@ //! //! [Information Schema]: https://en.wikipedia.org/wiki/Information_schema -use crate::catalog::{CatalogProviderList, SchemaProvider, TableProvider}; -use crate::datasource::streaming::StreamingTable; -use crate::execution::context::TaskContext; -use crate::logical_expr::{TableType, Volatility}; -use crate::physical_plan::stream::RecordBatchStreamAdapter; -use crate::physical_plan::SendableRecordBatchStream; -use crate::{ - config::{ConfigEntry, ConfigOptions}, - physical_plan::streaming::PartitionStream, -}; +use crate::streaming::StreamingTable; +use crate::{CatalogProviderList, SchemaProvider, TableProvider}; use arrow::{ array::{StringBuilder, UInt64Builder}, datatypes::{DataType, Field, Schema, SchemaRef}, @@ -36,14 +28,20 @@ use arrow::{ }; use arrow_array::builder::{BooleanBuilder, UInt8Builder}; use async_trait::async_trait; +use datafusion_common::config::{ConfigEntry, ConfigOptions}; use datafusion_common::error::Result; use datafusion_common::DataFusionError; +use datafusion_execution::TaskContext; use datafusion_expr::{AggregateUDF, ScalarUDF, Signature, TypeSignature, WindowUDF}; +use datafusion_expr::{TableType, Volatility}; +use datafusion_physical_plan::stream::RecordBatchStreamAdapter; +use datafusion_physical_plan::streaming::PartitionStream; +use datafusion_physical_plan::SendableRecordBatchStream; use std::collections::{HashMap, HashSet}; use std::fmt::Debug; use std::{any::Any, sync::Arc}; -pub(crate) const INFORMATION_SCHEMA: &str = "information_schema"; +pub const INFORMATION_SCHEMA: &str = "information_schema"; pub(crate) const TABLES: &str = "tables"; pub(crate) const VIEWS: &str = "views"; pub(crate) const COLUMNS: &str = "columns"; diff --git a/datafusion/catalog/src/lib.rs b/datafusion/catalog/src/lib.rs index 28410eb76fab..2b789ed9dd25 100644 --- a/datafusion/catalog/src/lib.rs +++ b/datafusion/catalog/src/lib.rs @@ -18,23 +18,264 @@ //! Interfaces and default implementations of catalogs and schemas. //! //! Implementations +//! * Information schema: [`information_schema`] //! * Simple memory based catalog: [`MemoryCatalogProviderList`], [`MemoryCatalogProvider`], [`MemorySchemaProvider`] pub mod memory; +pub use datafusion_sql::{ResolvedTableReference, TableReference}; pub use memory::{ MemoryCatalogProvider, MemoryCatalogProviderList, MemorySchemaProvider, }; +use std::collections::BTreeSet; +use std::ops::ControlFlow; mod r#async; mod catalog; mod dynamic_file; +pub mod information_schema; mod schema; mod session; mod table; - pub use catalog::*; pub use dynamic_file::catalog::*; pub use r#async::*; pub use schema::*; pub use session::*; pub use table::*; +pub mod streaming; + +/// Collects all tables and views referenced in the SQL statement. CTEs are collected separately. +/// This can be used to determine which tables need to be in the catalog for a query to be planned. +/// +/// # Returns +/// +/// A `(table_refs, ctes)` tuple, the first element contains table and view references and the second +/// element contains any CTE aliases that were defined and possibly referenced. +/// +/// ## Example +/// +/// ``` +/// # use datafusion_sql::parser::DFParser; +/// # use datafusion::catalog_common::resolve_table_references; +/// let query = "SELECT a FROM foo where x IN (SELECT y FROM bar)"; +/// let statement = DFParser::parse_sql(query).unwrap().pop_back().unwrap(); +/// let (table_refs, ctes) = resolve_table_references(&statement, true).unwrap(); +/// assert_eq!(table_refs.len(), 2); +/// assert_eq!(table_refs[0].to_string(), "bar"); +/// assert_eq!(table_refs[1].to_string(), "foo"); +/// assert_eq!(ctes.len(), 0); +/// ``` +/// +/// ## Example with CTEs +/// +/// ``` +/// # use datafusion_sql::parser::DFParser; +/// # use datafusion::catalog_common::resolve_table_references; +/// let query = "with my_cte as (values (1), (2)) SELECT * from my_cte;"; +/// let statement = DFParser::parse_sql(query).unwrap().pop_back().unwrap(); +/// let (table_refs, ctes) = resolve_table_references(&statement, true).unwrap(); +/// assert_eq!(table_refs.len(), 0); +/// assert_eq!(ctes.len(), 1); +/// assert_eq!(ctes[0].to_string(), "my_cte"); +/// ``` +pub fn resolve_table_references( + statement: &datafusion_sql::parser::Statement, + enable_ident_normalization: bool, +) -> datafusion_common::Result<(Vec, Vec)> { + use datafusion_sql::parser::{ + CopyToSource, CopyToStatement, Statement as DFStatement, + }; + use datafusion_sql::planner::object_name_to_table_reference; + use information_schema::INFORMATION_SCHEMA; + use information_schema::INFORMATION_SCHEMA_TABLES; + use sqlparser::ast::*; + + struct RelationVisitor { + relations: BTreeSet, + all_ctes: BTreeSet, + ctes_in_scope: Vec, + } + + impl RelationVisitor { + /// Record the reference to `relation`, if it's not a CTE reference. + fn insert_relation(&mut self, relation: &ObjectName) { + if !self.relations.contains(relation) + && !self.ctes_in_scope.contains(relation) + { + self.relations.insert(relation.clone()); + } + } + } + + impl Visitor for RelationVisitor { + type Break = (); + + fn pre_visit_relation(&mut self, relation: &ObjectName) -> ControlFlow<()> { + self.insert_relation(relation); + ControlFlow::Continue(()) + } + + fn pre_visit_query(&mut self, q: &Query) -> ControlFlow { + if let Some(with) = &q.with { + for cte in &with.cte_tables { + // The non-recursive CTE name is not in scope when evaluating the CTE itself, so this is valid: + // `WITH t AS (SELECT * FROM t) SELECT * FROM t` + // Where the first `t` refers to a predefined table. So we are careful here + // to visit the CTE first, before putting it in scope. + if !with.recursive { + // This is a bit hackish as the CTE will be visited again as part of visiting `q`, + // but thankfully `insert_relation` is idempotent. + cte.visit(self); + } + self.ctes_in_scope + .push(ObjectName(vec![cte.alias.name.clone()])); + } + } + ControlFlow::Continue(()) + } + + fn post_visit_query(&mut self, q: &Query) -> ControlFlow { + if let Some(with) = &q.with { + for _ in &with.cte_tables { + // Unwrap: We just pushed these in `pre_visit_query` + self.all_ctes.insert(self.ctes_in_scope.pop().unwrap()); + } + } + ControlFlow::Continue(()) + } + + fn pre_visit_statement(&mut self, statement: &Statement) -> ControlFlow<()> { + if let Statement::ShowCreate { + obj_type: ShowCreateObject::Table | ShowCreateObject::View, + obj_name, + } = statement + { + self.insert_relation(obj_name) + } + + // SHOW statements will later be rewritten into a SELECT from the information_schema + let requires_information_schema = matches!( + statement, + Statement::ShowFunctions { .. } + | Statement::ShowVariable { .. } + | Statement::ShowStatus { .. } + | Statement::ShowVariables { .. } + | Statement::ShowCreate { .. } + | Statement::ShowColumns { .. } + | Statement::ShowTables { .. } + | Statement::ShowCollation { .. } + ); + if requires_information_schema { + for s in INFORMATION_SCHEMA_TABLES { + self.relations.insert(ObjectName(vec![ + Ident::new(INFORMATION_SCHEMA), + Ident::new(*s), + ])); + } + } + ControlFlow::Continue(()) + } + } + + let mut visitor = RelationVisitor { + relations: BTreeSet::new(), + all_ctes: BTreeSet::new(), + ctes_in_scope: vec![], + }; + + fn visit_statement(statement: &DFStatement, visitor: &mut RelationVisitor) { + match statement { + DFStatement::Statement(s) => { + let _ = s.as_ref().visit(visitor); + } + DFStatement::CreateExternalTable(table) => { + visitor.relations.insert(table.name.clone()); + } + DFStatement::CopyTo(CopyToStatement { source, .. }) => match source { + CopyToSource::Relation(table_name) => { + visitor.insert_relation(table_name); + } + CopyToSource::Query(query) => { + query.visit(visitor); + } + }, + DFStatement::Explain(explain) => visit_statement(&explain.statement, visitor), + } + } + + visit_statement(statement, &mut visitor); + + let table_refs = visitor + .relations + .into_iter() + .map(|x| object_name_to_table_reference(x, enable_ident_normalization)) + .collect::>()?; + let ctes = visitor + .all_ctes + .into_iter() + .map(|x| object_name_to_table_reference(x, enable_ident_normalization)) + .collect::>()?; + Ok((table_refs, ctes)) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn resolve_table_references_shadowed_cte() { + use datafusion_sql::parser::DFParser; + + // An interesting edge case where the `t` name is used both as an ordinary table reference + // and as a CTE reference. + let query = "WITH t AS (SELECT * FROM t) SELECT * FROM t"; + let statement = DFParser::parse_sql(query).unwrap().pop_back().unwrap(); + let (table_refs, ctes) = resolve_table_references(&statement, true).unwrap(); + assert_eq!(table_refs.len(), 1); + assert_eq!(ctes.len(), 1); + assert_eq!(ctes[0].to_string(), "t"); + assert_eq!(table_refs[0].to_string(), "t"); + + // UNION is a special case where the CTE is not in scope for the second branch. + let query = "(with t as (select 1) select * from t) union (select * from t)"; + let statement = DFParser::parse_sql(query).unwrap().pop_back().unwrap(); + let (table_refs, ctes) = resolve_table_references(&statement, true).unwrap(); + assert_eq!(table_refs.len(), 1); + assert_eq!(ctes.len(), 1); + assert_eq!(ctes[0].to_string(), "t"); + assert_eq!(table_refs[0].to_string(), "t"); + + // Nested CTEs are also handled. + // Here the first `u` is a CTE, but the second `u` is a table reference. + // While `t` is always a CTE. + let query = "(with t as (with u as (select 1) select * from u) select * from u cross join t)"; + let statement = DFParser::parse_sql(query).unwrap().pop_back().unwrap(); + let (table_refs, ctes) = resolve_table_references(&statement, true).unwrap(); + assert_eq!(table_refs.len(), 1); + assert_eq!(ctes.len(), 2); + assert_eq!(ctes[0].to_string(), "t"); + assert_eq!(ctes[1].to_string(), "u"); + assert_eq!(table_refs[0].to_string(), "u"); + } + + #[test] + fn resolve_table_references_recursive_cte() { + use datafusion_sql::parser::DFParser; + + let query = " + WITH RECURSIVE nodes AS ( + SELECT 1 as id + UNION ALL + SELECT id + 1 as id + FROM nodes + WHERE id < 10 + ) + SELECT * FROM nodes + "; + let statement = DFParser::parse_sql(query).unwrap().pop_back().unwrap(); + let (table_refs, ctes) = resolve_table_references(&statement, true).unwrap(); + assert_eq!(table_refs.len(), 0); + assert_eq!(ctes.len(), 1); + assert_eq!(ctes[0].to_string(), "nodes"); + } +} diff --git a/datafusion/core/src/datasource/streaming.rs b/datafusion/catalog/src/streaming.rs similarity index 93% rename from datafusion/core/src/datasource/streaming.rs rename to datafusion/catalog/src/streaming.rs index 1da3c3da9c89..654e6755d7d4 100644 --- a/datafusion/core/src/datasource/streaming.rs +++ b/datafusion/catalog/src/streaming.rs @@ -23,12 +23,12 @@ use std::sync::Arc; use arrow::datatypes::SchemaRef; use async_trait::async_trait; -use crate::datasource::TableProvider; -use crate::physical_plan::streaming::{PartitionStream, StreamingTableExec}; -use crate::physical_plan::ExecutionPlan; -use datafusion_catalog::Session; +use crate::Session; +use crate::TableProvider; use datafusion_common::{plan_err, Result}; use datafusion_expr::{Expr, TableType}; +use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec}; +use datafusion_physical_plan::ExecutionPlan; use log::debug; /// A [`TableProvider`] that streams a set of [`PartitionStream`] diff --git a/datafusion/core/src/catalog_common/mod.rs b/datafusion/core/src/catalog_common/mod.rs index 45fb6ddae1d6..213afb32405e 100644 --- a/datafusion/core/src/catalog_common/mod.rs +++ b/datafusion/core/src/catalog_common/mod.rs @@ -18,250 +18,7 @@ //! Interfaces and default implementations of catalogs and schemas. //! //! Implementations -//! * Information schema: [`information_schema`] //! * Listing schema: [`listing_schema`] -pub mod information_schema; pub mod listing_schema; pub use crate::catalog::{CatalogProvider, CatalogProviderList, SchemaProvider}; - -pub use datafusion_sql::{ResolvedTableReference, TableReference}; - -use std::collections::BTreeSet; -use std::ops::ControlFlow; - -/// Collects all tables and views referenced in the SQL statement. CTEs are collected separately. -/// This can be used to determine which tables need to be in the catalog for a query to be planned. -/// -/// # Returns -/// -/// A `(table_refs, ctes)` tuple, the first element contains table and view references and the second -/// element contains any CTE aliases that were defined and possibly referenced. -/// -/// ## Example -/// -/// ``` -/// # use datafusion_sql::parser::DFParser; -/// # use datafusion::catalog_common::resolve_table_references; -/// let query = "SELECT a FROM foo where x IN (SELECT y FROM bar)"; -/// let statement = DFParser::parse_sql(query).unwrap().pop_back().unwrap(); -/// let (table_refs, ctes) = resolve_table_references(&statement, true).unwrap(); -/// assert_eq!(table_refs.len(), 2); -/// assert_eq!(table_refs[0].to_string(), "bar"); -/// assert_eq!(table_refs[1].to_string(), "foo"); -/// assert_eq!(ctes.len(), 0); -/// ``` -/// -/// ## Example with CTEs -/// -/// ``` -/// # use datafusion_sql::parser::DFParser; -/// # use datafusion::catalog_common::resolve_table_references; -/// let query = "with my_cte as (values (1), (2)) SELECT * from my_cte;"; -/// let statement = DFParser::parse_sql(query).unwrap().pop_back().unwrap(); -/// let (table_refs, ctes) = resolve_table_references(&statement, true).unwrap(); -/// assert_eq!(table_refs.len(), 0); -/// assert_eq!(ctes.len(), 1); -/// assert_eq!(ctes[0].to_string(), "my_cte"); -/// ``` -pub fn resolve_table_references( - statement: &datafusion_sql::parser::Statement, - enable_ident_normalization: bool, -) -> datafusion_common::Result<(Vec, Vec)> { - use crate::sql::planner::object_name_to_table_reference; - use datafusion_sql::parser::{ - CopyToSource, CopyToStatement, Statement as DFStatement, - }; - use information_schema::INFORMATION_SCHEMA; - use information_schema::INFORMATION_SCHEMA_TABLES; - use sqlparser::ast::*; - - struct RelationVisitor { - relations: BTreeSet, - all_ctes: BTreeSet, - ctes_in_scope: Vec, - } - - impl RelationVisitor { - /// Record the reference to `relation`, if it's not a CTE reference. - fn insert_relation(&mut self, relation: &ObjectName) { - if !self.relations.contains(relation) - && !self.ctes_in_scope.contains(relation) - { - self.relations.insert(relation.clone()); - } - } - } - - impl Visitor for RelationVisitor { - type Break = (); - - fn pre_visit_relation(&mut self, relation: &ObjectName) -> ControlFlow<()> { - self.insert_relation(relation); - ControlFlow::Continue(()) - } - - fn pre_visit_query(&mut self, q: &Query) -> ControlFlow { - if let Some(with) = &q.with { - for cte in &with.cte_tables { - // The non-recursive CTE name is not in scope when evaluating the CTE itself, so this is valid: - // `WITH t AS (SELECT * FROM t) SELECT * FROM t` - // Where the first `t` refers to a predefined table. So we are careful here - // to visit the CTE first, before putting it in scope. - if !with.recursive { - // This is a bit hackish as the CTE will be visited again as part of visiting `q`, - // but thankfully `insert_relation` is idempotent. - cte.visit(self); - } - self.ctes_in_scope - .push(ObjectName(vec![cte.alias.name.clone()])); - } - } - ControlFlow::Continue(()) - } - - fn post_visit_query(&mut self, q: &Query) -> ControlFlow { - if let Some(with) = &q.with { - for _ in &with.cte_tables { - // Unwrap: We just pushed these in `pre_visit_query` - self.all_ctes.insert(self.ctes_in_scope.pop().unwrap()); - } - } - ControlFlow::Continue(()) - } - - fn pre_visit_statement(&mut self, statement: &Statement) -> ControlFlow<()> { - if let Statement::ShowCreate { - obj_type: ShowCreateObject::Table | ShowCreateObject::View, - obj_name, - } = statement - { - self.insert_relation(obj_name) - } - - // SHOW statements will later be rewritten into a SELECT from the information_schema - let requires_information_schema = matches!( - statement, - Statement::ShowFunctions { .. } - | Statement::ShowVariable { .. } - | Statement::ShowStatus { .. } - | Statement::ShowVariables { .. } - | Statement::ShowCreate { .. } - | Statement::ShowColumns { .. } - | Statement::ShowTables { .. } - | Statement::ShowCollation { .. } - ); - if requires_information_schema { - for s in INFORMATION_SCHEMA_TABLES { - self.relations.insert(ObjectName(vec![ - Ident::new(INFORMATION_SCHEMA), - Ident::new(*s), - ])); - } - } - ControlFlow::Continue(()) - } - } - - let mut visitor = RelationVisitor { - relations: BTreeSet::new(), - all_ctes: BTreeSet::new(), - ctes_in_scope: vec![], - }; - - fn visit_statement(statement: &DFStatement, visitor: &mut RelationVisitor) { - match statement { - DFStatement::Statement(s) => { - let _ = s.as_ref().visit(visitor); - } - DFStatement::CreateExternalTable(table) => { - visitor.relations.insert(table.name.clone()); - } - DFStatement::CopyTo(CopyToStatement { source, .. }) => match source { - CopyToSource::Relation(table_name) => { - visitor.insert_relation(table_name); - } - CopyToSource::Query(query) => { - query.visit(visitor); - } - }, - DFStatement::Explain(explain) => visit_statement(&explain.statement, visitor), - } - } - - visit_statement(statement, &mut visitor); - - let table_refs = visitor - .relations - .into_iter() - .map(|x| object_name_to_table_reference(x, enable_ident_normalization)) - .collect::>()?; - let ctes = visitor - .all_ctes - .into_iter() - .map(|x| object_name_to_table_reference(x, enable_ident_normalization)) - .collect::>()?; - Ok((table_refs, ctes)) -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn resolve_table_references_shadowed_cte() { - use datafusion_sql::parser::DFParser; - - // An interesting edge case where the `t` name is used both as an ordinary table reference - // and as a CTE reference. - let query = "WITH t AS (SELECT * FROM t) SELECT * FROM t"; - let statement = DFParser::parse_sql(query).unwrap().pop_back().unwrap(); - let (table_refs, ctes) = resolve_table_references(&statement, true).unwrap(); - assert_eq!(table_refs.len(), 1); - assert_eq!(ctes.len(), 1); - assert_eq!(ctes[0].to_string(), "t"); - assert_eq!(table_refs[0].to_string(), "t"); - - // UNION is a special case where the CTE is not in scope for the second branch. - let query = "(with t as (select 1) select * from t) union (select * from t)"; - let statement = DFParser::parse_sql(query).unwrap().pop_back().unwrap(); - let (table_refs, ctes) = resolve_table_references(&statement, true).unwrap(); - assert_eq!(table_refs.len(), 1); - assert_eq!(ctes.len(), 1); - assert_eq!(ctes[0].to_string(), "t"); - assert_eq!(table_refs[0].to_string(), "t"); - - // Nested CTEs are also handled. - // Here the first `u` is a CTE, but the second `u` is a table reference. - // While `t` is always a CTE. - let query = "(with t as (with u as (select 1) select * from u) select * from u cross join t)"; - let statement = DFParser::parse_sql(query).unwrap().pop_back().unwrap(); - let (table_refs, ctes) = resolve_table_references(&statement, true).unwrap(); - assert_eq!(table_refs.len(), 1); - assert_eq!(ctes.len(), 2); - assert_eq!(ctes[0].to_string(), "t"); - assert_eq!(ctes[1].to_string(), "u"); - assert_eq!(table_refs[0].to_string(), "u"); - } - - #[test] - fn resolve_table_references_recursive_cte() { - use datafusion_sql::parser::DFParser; - - let query = " - WITH RECURSIVE nodes AS ( - SELECT 1 as id - UNION ALL - SELECT id + 1 as id - FROM nodes - WHERE id < 10 - ) - SELECT * FROM nodes - "; - let statement = DFParser::parse_sql(query).unwrap().pop_back().unwrap(); - let (table_refs, ctes) = resolve_table_references(&statement, true).unwrap(); - assert_eq!(table_refs.len(), 0); - assert_eq!(ctes.len(), 1); - assert_eq!(ctes[0].to_string(), "nodes"); - } -} diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs index 7d3fe9ddd751..aeb7d6569de9 100644 --- a/datafusion/core/src/datasource/mod.rs +++ b/datafusion/core/src/datasource/mod.rs @@ -33,12 +33,9 @@ pub mod provider; pub mod schema_adapter; mod statistics; pub mod stream; -pub mod streaming; pub mod view; // backwards compatibility -pub use datafusion_execution::object_store; - pub use self::default_table_source::{ provider_as_source, source_as_provider, DefaultTableSource, }; @@ -46,6 +43,7 @@ pub use self::memory::MemTable; pub use self::view::ViewTable; pub use crate::catalog::TableProvider; pub use crate::logical_expr::TableType; +pub use datafusion_execution::object_store; pub use statistics::get_statistics_with_limit; use arrow_schema::{Schema, SortOptions}; diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index 6c3349625f04..f83b8528d1e8 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -24,15 +24,15 @@ use std::fmt::Debug; use std::sync::Arc; use crate::catalog::{CatalogProviderList, SchemaProvider, TableProviderFactory}; -use crate::catalog_common::information_schema::{ - InformationSchemaProvider, INFORMATION_SCHEMA, -}; use crate::datasource::cte_worktable::CteWorkTable; use crate::datasource::file_format::{format_as_file_type, FileFormatFactory}; use crate::datasource::provider_as_source; use crate::execution::context::{EmptySerializerRegistry, FunctionFactory, QueryPlanner}; use crate::execution::SessionStateDefaults; use crate::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner}; +use datafusion_catalog::information_schema::{ + InformationSchemaProvider, INFORMATION_SCHEMA, +}; use datafusion_catalog::MemoryCatalogProviderList; use arrow_schema::{DataType, SchemaRef}; @@ -529,16 +529,16 @@ impl SessionState { /// Resolve all table references in the SQL statement. Does not include CTE references. /// - /// See [`catalog::resolve_table_references`] for more information. + /// See [`datafusion_catalog::resolve_table_references`] for more information. /// - /// [`catalog::resolve_table_references`]: crate::catalog_common::resolve_table_references + /// [`datafusion_catalog::resolve_table_references`]: datafusion_catalog::resolve_table_references pub fn resolve_table_references( &self, statement: &Statement, ) -> datafusion_common::Result> { let enable_ident_normalization = self.config.options().sql_parser.enable_ident_normalization; - let (table_refs, _) = crate::catalog_common::resolve_table_references( + let (table_refs, _) = datafusion_catalog::resolve_table_references( statement, enable_ident_normalization, )?; diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs index 5d917e1673f1..780b22983393 100644 --- a/datafusion/core/src/lib.rs +++ b/datafusion/core/src/lib.rs @@ -309,7 +309,7 @@ //! //! [`ListingTable`]: crate::datasource::listing::ListingTable //! [`MemTable`]: crate::datasource::memory::MemTable -//! [`StreamingTable`]: crate::datasource::streaming::StreamingTable +//! [`StreamingTable`]: datafusion_catalog::streaming::StreamingTable //! //! ## Plan Representations //! diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs index 77e4b491da6d..212ffdaaa2a5 100644 --- a/datafusion/core/tests/memory_limit/mod.rs +++ b/datafusion/core/tests/memory_limit/mod.rs @@ -28,7 +28,6 @@ use arrow::record_batch::RecordBatch; use arrow_array::{ArrayRef, DictionaryArray}; use arrow_schema::SortOptions; use datafusion::assert_batches_eq; -use datafusion::datasource::streaming::StreamingTable; use datafusion::datasource::{MemTable, TableProvider}; use datafusion::execution::disk_manager::DiskManagerConfig; use datafusion::execution::runtime_env::RuntimeEnvBuilder; @@ -38,6 +37,7 @@ use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::physical_plan::streaming::PartitionStream; use datafusion::physical_plan::{ExecutionPlan, SendableRecordBatchStream}; use datafusion::prelude::{SessionConfig, SessionContext}; +use datafusion_catalog::streaming::StreamingTable; use datafusion_catalog::Session; use datafusion_common::{assert_contains, Result}; use datafusion_execution::memory_pool::{ diff --git a/datafusion/substrait/Cargo.toml b/datafusion/substrait/Cargo.toml index 9b180e7ad34e..0f0a4180fb19 100644 --- a/datafusion/substrait/Cargo.toml +++ b/datafusion/substrait/Cargo.toml @@ -46,6 +46,7 @@ url = { workspace = true } [dev-dependencies] datafusion = { workspace = true, features = ["nested_expressions"] } +datafusion-catalog = { workspace = true } datafusion-functions-aggregate = { workspace = true } serde_json = "1.0" tokio = { workspace = true } diff --git a/datafusion/substrait/tests/cases/substrait_validations.rs b/datafusion/substrait/tests/cases/substrait_validations.rs index 8357e0a8621d..acda091287c7 100644 --- a/datafusion/substrait/tests/cases/substrait_validations.rs +++ b/datafusion/substrait/tests/cases/substrait_validations.rs @@ -22,7 +22,7 @@ mod tests { mod schema_compatibility { use crate::utils::test::read_json; use datafusion::arrow::datatypes::{DataType, Field}; - use datafusion::catalog_common::TableReference; + use datafusion_catalog::TableReference; use datafusion::common::{DFSchema, Result}; use datafusion::datasource::empty::EmptyTable; use datafusion::prelude::SessionContext; diff --git a/datafusion/substrait/tests/utils.rs b/datafusion/substrait/tests/utils.rs index f8b2d87d317e..a909478fa930 100644 --- a/datafusion/substrait/tests/utils.rs +++ b/datafusion/substrait/tests/utils.rs @@ -17,7 +17,7 @@ #[cfg(test)] pub mod test { - use datafusion::catalog_common::TableReference; + use datafusion_catalog::TableReference; use datafusion::common::{substrait_datafusion_err, substrait_err}; use datafusion::datasource::empty::EmptyTable; use datafusion::datasource::TableProvider; From 7cd577d73282e323218f7993c27e71919d99e8c7 Mon Sep 17 00:00:00 2001 From: logan-keede Date: Thu, 30 Jan 2025 00:54:43 +0530 Subject: [PATCH 2/7] fix: formatting --- datafusion/substrait/tests/cases/substrait_validations.rs | 2 +- datafusion/substrait/tests/utils.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/substrait/tests/cases/substrait_validations.rs b/datafusion/substrait/tests/cases/substrait_validations.rs index acda091287c7..9c4440e429a5 100644 --- a/datafusion/substrait/tests/cases/substrait_validations.rs +++ b/datafusion/substrait/tests/cases/substrait_validations.rs @@ -22,10 +22,10 @@ mod tests { mod schema_compatibility { use crate::utils::test::read_json; use datafusion::arrow::datatypes::{DataType, Field}; - use datafusion_catalog::TableReference; use datafusion::common::{DFSchema, Result}; use datafusion::datasource::empty::EmptyTable; use datafusion::prelude::SessionContext; + use datafusion_catalog::TableReference; use datafusion_substrait::logical_plan::consumer::from_substrait_plan; use std::collections::HashMap; use std::sync::Arc; diff --git a/datafusion/substrait/tests/utils.rs b/datafusion/substrait/tests/utils.rs index a909478fa930..f572c20dc01c 100644 --- a/datafusion/substrait/tests/utils.rs +++ b/datafusion/substrait/tests/utils.rs @@ -17,12 +17,12 @@ #[cfg(test)] pub mod test { - use datafusion_catalog::TableReference; use datafusion::common::{substrait_datafusion_err, substrait_err}; use datafusion::datasource::empty::EmptyTable; use datafusion::datasource::TableProvider; use datafusion::error::Result; use datafusion::prelude::SessionContext; + use datafusion_catalog::TableReference; use datafusion_substrait::extensions::Extensions; use datafusion_substrait::logical_plan::consumer::{ from_substrait_named_struct, DefaultSubstraitConsumer, SubstraitConsumer, From eee402c94b3707c9908a08dde523d4d7b8a62d99 Mon Sep 17 00:00:00 2001 From: logan-keede Date: Thu, 30 Jan 2025 00:59:12 +0530 Subject: [PATCH 3/7] fix: doctests import --- datafusion/catalog/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/catalog/src/lib.rs b/datafusion/catalog/src/lib.rs index 2b789ed9dd25..63d75fa3ef0c 100644 --- a/datafusion/catalog/src/lib.rs +++ b/datafusion/catalog/src/lib.rs @@ -56,7 +56,7 @@ pub mod streaming; /// /// ``` /// # use datafusion_sql::parser::DFParser; -/// # use datafusion::catalog_common::resolve_table_references; +/// # use datafusion_catalog::resolve_table_references; /// let query = "SELECT a FROM foo where x IN (SELECT y FROM bar)"; /// let statement = DFParser::parse_sql(query).unwrap().pop_back().unwrap(); /// let (table_refs, ctes) = resolve_table_references(&statement, true).unwrap(); @@ -70,7 +70,7 @@ pub mod streaming; /// /// ``` /// # use datafusion_sql::parser::DFParser; -/// # use datafusion::catalog_common::resolve_table_references; +/// # use datafusion_catalog::resolve_table_references; /// let query = "with my_cte as (values (1), (2)) SELECT * from my_cte;"; /// let statement = DFParser::parse_sql(query).unwrap().pop_back().unwrap(); /// let (table_refs, ctes) = resolve_table_references(&statement, true).unwrap(); From 65172706420a14f60183b1e539db78b634d9f84d Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 29 Jan 2025 15:15:36 -0500 Subject: [PATCH 4/7] Remove unecessary datafuson-catalog dependency --- datafusion/substrait/Cargo.toml | 1 - datafusion/substrait/tests/cases/substrait_validations.rs | 3 +-- datafusion/substrait/tests/utils.rs | 3 +-- 3 files changed, 2 insertions(+), 5 deletions(-) diff --git a/datafusion/substrait/Cargo.toml b/datafusion/substrait/Cargo.toml index 0f0a4180fb19..9b180e7ad34e 100644 --- a/datafusion/substrait/Cargo.toml +++ b/datafusion/substrait/Cargo.toml @@ -46,7 +46,6 @@ url = { workspace = true } [dev-dependencies] datafusion = { workspace = true, features = ["nested_expressions"] } -datafusion-catalog = { workspace = true } datafusion-functions-aggregate = { workspace = true } serde_json = "1.0" tokio = { workspace = true } diff --git a/datafusion/substrait/tests/cases/substrait_validations.rs b/datafusion/substrait/tests/cases/substrait_validations.rs index 9c4440e429a5..a7d4d4aa82fc 100644 --- a/datafusion/substrait/tests/cases/substrait_validations.rs +++ b/datafusion/substrait/tests/cases/substrait_validations.rs @@ -22,10 +22,9 @@ mod tests { mod schema_compatibility { use crate::utils::test::read_json; use datafusion::arrow::datatypes::{DataType, Field}; - use datafusion::common::{DFSchema, Result}; + use datafusion::common::{DFSchema, Result, TableReference}; use datafusion::datasource::empty::EmptyTable; use datafusion::prelude::SessionContext; - use datafusion_catalog::TableReference; use datafusion_substrait::logical_plan::consumer::from_substrait_plan; use std::collections::HashMap; use std::sync::Arc; diff --git a/datafusion/substrait/tests/utils.rs b/datafusion/substrait/tests/utils.rs index f572c20dc01c..0034cc27bf6e 100644 --- a/datafusion/substrait/tests/utils.rs +++ b/datafusion/substrait/tests/utils.rs @@ -17,12 +17,11 @@ #[cfg(test)] pub mod test { - use datafusion::common::{substrait_datafusion_err, substrait_err}; + use datafusion::common::{substrait_datafusion_err, substrait_err, TableReference}; use datafusion::datasource::empty::EmptyTable; use datafusion::datasource::TableProvider; use datafusion::error::Result; use datafusion::prelude::SessionContext; - use datafusion_catalog::TableReference; use datafusion_substrait::extensions::Extensions; use datafusion_substrait::logical_plan::consumer::{ from_substrait_named_struct, DefaultSubstraitConsumer, SubstraitConsumer, From ce0a514d65880035adbb6383798d4745b7989953 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 29 Jan 2025 15:21:52 -0500 Subject: [PATCH 5/7] remove some more unecessary dependencies --- datafusion/catalog/Cargo.toml | 2 -- datafusion/catalog/src/async.rs | 2 +- datafusion/catalog/src/information_schema.rs | 2 +- datafusion/catalog/src/table.rs | 2 +- 4 files changed, 3 insertions(+), 5 deletions(-) diff --git a/datafusion/catalog/Cargo.toml b/datafusion/catalog/Cargo.toml index a5e6754fe304..749457855ca2 100644 --- a/datafusion/catalog/Cargo.toml +++ b/datafusion/catalog/Cargo.toml @@ -29,8 +29,6 @@ version.workspace = true [dependencies] arrow = { workspace = true } -arrow-array = { workspace = true } -arrow-schema = { workspace = true } async-trait = { workspace = true } dashmap = { workspace = true } datafusion-common = { workspace = true } diff --git a/datafusion/catalog/src/async.rs b/datafusion/catalog/src/async.rs index a244261b91e2..5d7a51ad7123 100644 --- a/datafusion/catalog/src/async.rs +++ b/datafusion/catalog/src/async.rs @@ -430,7 +430,7 @@ mod tests { }, }; - use arrow_schema::SchemaRef; + use arrow::datatypes::SchemaRef; use async_trait::async_trait; use datafusion_common::{error::Result, Statistics, TableReference}; use datafusion_execution::config::SessionConfig; diff --git a/datafusion/catalog/src/information_schema.rs b/datafusion/catalog/src/information_schema.rs index 853afd83645b..e68e636989f8 100644 --- a/datafusion/catalog/src/information_schema.rs +++ b/datafusion/catalog/src/information_schema.rs @@ -21,12 +21,12 @@ use crate::streaming::StreamingTable; use crate::{CatalogProviderList, SchemaProvider, TableProvider}; +use arrow::array::builder::{BooleanBuilder, UInt8Builder}; use arrow::{ array::{StringBuilder, UInt64Builder}, datatypes::{DataType, Field, Schema, SchemaRef}, record_batch::RecordBatch, }; -use arrow_array::builder::{BooleanBuilder, UInt8Builder}; use async_trait::async_trait; use datafusion_common::config::{ConfigEntry, ConfigOptions}; use datafusion_common::error::Result; diff --git a/datafusion/catalog/src/table.rs b/datafusion/catalog/src/table.rs index 3c8960495588..fc7d114b83da 100644 --- a/datafusion/catalog/src/table.rs +++ b/datafusion/catalog/src/table.rs @@ -21,7 +21,7 @@ use std::fmt::Debug; use std::sync::Arc; use crate::session::Session; -use arrow_schema::SchemaRef; +use arrow::datatypes::SchemaRef; use async_trait::async_trait; use datafusion_common::Result; use datafusion_common::{not_impl_err, Constraints, Statistics}; From 837ccd4ae3a1dad9e813c45ca015f743d6426960 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 29 Jan 2025 15:22:18 -0500 Subject: [PATCH 6/7] Update datafusion-cli/Carglo.ock --- datafusion-cli/Cargo.lock | 2 -- 1 file changed, 2 deletions(-) diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 4c94ec0035fe..84ba87b6c4bb 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -1278,8 +1278,6 @@ name = "datafusion-catalog" version = "44.0.0" dependencies = [ "arrow", - "arrow-array", - "arrow-schema", "async-trait", "dashmap", "datafusion-common", From 310201ffd0ab7b7b85ff403dd61a8f9137848741 Mon Sep 17 00:00:00 2001 From: logan-keede Date: Thu, 30 Jan 2025 02:20:07 +0530 Subject: [PATCH 7/7] fix: doctest dependency --- datafusion/catalog/src/table.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/catalog/src/table.rs b/datafusion/catalog/src/table.rs index fc7d114b83da..88d2d8bde51e 100644 --- a/datafusion/catalog/src/table.rs +++ b/datafusion/catalog/src/table.rs @@ -202,7 +202,7 @@ pub trait TableProvider: Debug + Sync + Send { /// ```rust /// # use std::any::Any; /// # use std::sync::Arc; - /// # use arrow_schema::SchemaRef; + /// # use arrow::datatypes::SchemaRef; /// # use async_trait::async_trait; /// # use datafusion_catalog::{TableProvider, Session}; /// # use datafusion_common::Result;