From d5f5a77680b860079f21f5bc18176b4a10b1c88f Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Wed, 17 Jul 2024 16:15:07 +0200 Subject: [PATCH] Extract catalog API to separate crate This moves `CatalogProvider`, `TableProvider`, `SchemaProvider` to a new `datafusion-catalog` crate. The circular dependency between core `SessionState` and implementations is broken up by introducing `CatalogSession` dyn trait. Implementations of `TableProvider` that reside under core current have access to `CatalogSession` by downcasting. This is supposed to be an intermediate step. --- Cargo.toml | 2 + datafusion-cli/Cargo.lock | 13 + datafusion-cli/src/functions.rs | 3 +- .../examples/advanced_parquet_index.rs | 6 +- datafusion-examples/examples/catalog.rs | 6 +- .../examples/custom_datasource.rs | 5 +- datafusion-examples/examples/parquet_index.rs | 4 +- datafusion-examples/examples/simple_udtf.rs | 6 +- datafusion/catalog/Cargo.toml | 21 ++ datafusion/catalog/src/catalog.rs | 153 +++++++++ datafusion/catalog/src/lib.rs | 26 ++ .../src/catalog => catalog/src}/schema.rs | 7 +- datafusion/catalog/src/session.rs | 110 +++++++ datafusion/catalog/src/table.rs | 293 ++++++++++++++++++ datafusion/core/Cargo.toml | 1 + .../core/src/catalog/information_schema.rs | 4 +- datafusion/core/src/catalog/listing_schema.rs | 4 +- datafusion/core/src/catalog/memory.rs | 8 +- datafusion/core/src/catalog/mod.rs | 136 +------- datafusion/core/src/dataframe/mod.rs | 3 +- .../core/src/datasource/cte_worktable.rs | 4 +- datafusion/core/src/datasource/empty.rs | 4 +- .../core/src/datasource/listing/table.rs | 27 +- .../src/datasource/listing_table_factory.rs | 18 +- datafusion/core/src/datasource/memory.rs | 5 +- datafusion/core/src/datasource/mod.rs | 2 +- datafusion/core/src/datasource/provider.rs | 276 +---------------- datafusion/core/src/datasource/stream.rs | 12 +- datafusion/core/src/datasource/streaming.rs | 11 +- datafusion/core/src/datasource/view.rs | 4 +- datafusion/core/src/execution/context/mod.rs | 8 +- .../core/src/execution/session_state.rs | 58 +++- datafusion/core/src/lib.rs | 5 + datafusion/core/src/test_util/mod.rs | 12 +- .../core/tests/custom_sources_cases/mod.rs | 5 +- .../provider_filter_pushdown.rs | 8 +- .../tests/custom_sources_cases/statistics.rs | 5 +- datafusion/core/tests/memory_limit/mod.rs | 4 +- .../user_defined_table_functions.rs | 6 +- .../tests/cases/roundtrip_logical_plan.rs | 3 +- datafusion/sqllogictest/src/test_context.rs | 7 +- .../custom-table-providers.md | 2 +- 42 files changed, 791 insertions(+), 506 deletions(-) create mode 100644 datafusion/catalog/Cargo.toml create mode 100644 datafusion/catalog/src/catalog.rs create mode 100644 datafusion/catalog/src/lib.rs rename datafusion/{core/src/catalog => catalog/src}/schema.rs (95%) create mode 100644 datafusion/catalog/src/session.rs create mode 100644 datafusion/catalog/src/table.rs diff --git a/Cargo.toml b/Cargo.toml index f61ed7e58fe37..65f3e51e3d3aa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,7 @@ exclude = ["datafusion-cli", "dev/depcheck"] members = [ "datafusion/common", "datafusion/common-runtime", + "datafusion/catalog", "datafusion/core", "datafusion/expr", "datafusion/execution", @@ -87,6 +88,7 @@ chrono = { version = "0.4.34", default-features = false } ctor = "0.2.0" dashmap = "6.0.1" datafusion = { path = "datafusion/core", version = "40.0.0", default-features = false } +datafusion-catalog = { path = "datafusion/catalog", version = "40.0.0" } datafusion-common = { path = "datafusion/common", version = "40.0.0", default-features = false } datafusion-common-runtime = { path = "datafusion/common-runtime", version = "40.0.0" } datafusion-execution = { path = "datafusion/execution", version = "40.0.0" } diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index e48c6b081e1a5..f3f61bb22c628 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -1144,6 +1144,7 @@ dependencies = [ "bzip2", "chrono", "dashmap", + "datafusion-catalog", "datafusion-common", "datafusion-common-runtime", "datafusion-execution", @@ -1182,6 +1183,18 @@ dependencies = [ "zstd 0.13.2", ] +[[package]] +name = "datafusion-catalog" +version = "40.0.0" +dependencies = [ + "arrow-schema", + "async-trait", + "datafusion-common", + "datafusion-execution", + "datafusion-expr", + "datafusion-physical-plan", +] + [[package]] name = "datafusion-cli" version = "40.0.0" diff --git a/datafusion-cli/src/functions.rs b/datafusion-cli/src/functions.rs index 806e2bb39cd4a..679c8c7d2ca83 100644 --- a/datafusion-cli/src/functions.rs +++ b/datafusion-cli/src/functions.rs @@ -22,6 +22,7 @@ use arrow::record_batch::RecordBatch; use arrow::util::pretty::pretty_format_batches; use async_trait::async_trait; +use datafusion::catalog_api::CatalogSession; use datafusion::common::{plan_err, Column}; use datafusion::datasource::function::TableFunctionImpl; use datafusion::datasource::TableProvider; @@ -234,7 +235,7 @@ impl TableProvider for ParquetMetadataTable { async fn scan( &self, - _state: &SessionState, + _state: &dyn CatalogSession, projection: Option<&Vec>, _filters: &[Expr], _limit: Option, diff --git a/datafusion-examples/examples/advanced_parquet_index.rs b/datafusion-examples/examples/advanced_parquet_index.rs index 9bf71e52c3de8..3421f73c1583f 100644 --- a/datafusion-examples/examples/advanced_parquet_index.rs +++ b/datafusion-examples/examples/advanced_parquet_index.rs @@ -19,6 +19,7 @@ use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray}; use arrow_schema::SchemaRef; use async_trait::async_trait; use bytes::Bytes; +use datafusion::catalog_api::CatalogSession; use datafusion::datasource::listing::PartitionedFile; use datafusion::datasource::physical_plan::parquet::{ ParquetAccessPlan, ParquetExecBuilder, @@ -27,7 +28,6 @@ use datafusion::datasource::physical_plan::{ parquet::ParquetFileReaderFactory, FileMeta, FileScanConfig, }; use datafusion::datasource::TableProvider; -use datafusion::execution::context::SessionState; use datafusion::execution::object_store::ObjectStoreUrl; use datafusion::parquet::arrow::arrow_reader::{ ArrowReaderOptions, ParquetRecordBatchReaderBuilder, RowSelection, RowSelector, @@ -271,7 +271,7 @@ impl IndexTableProvider { /// to a single predicate like `a = 1 AND b = 2` suitable for execution fn filters_to_predicate( &self, - state: &SessionState, + state: &dyn CatalogSession, filters: &[Expr], ) -> Result> { let df_schema = DFSchema::try_from(self.schema())?; @@ -463,7 +463,7 @@ impl TableProvider for IndexTableProvider { async fn scan( &self, - state: &SessionState, + state: &dyn CatalogSession, projection: Option<&Vec>, filters: &[Expr], limit: Option, diff --git a/datafusion-examples/examples/catalog.rs b/datafusion-examples/examples/catalog.rs index b9188e1cd5e01..c65d645decdb7 100644 --- a/datafusion-examples/examples/catalog.rs +++ b/datafusion-examples/examples/catalog.rs @@ -19,10 +19,8 @@ use async_trait::async_trait; use datafusion::{ arrow::util::pretty, - catalog::{ - schema::SchemaProvider, - {CatalogProvider, CatalogProviderList}, - }, + catalog::CatalogProviderList, + catalog_api::{CatalogProvider, SchemaProvider}, datasource::{ file_format::{csv::CsvFormat, FileFormat}, listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl}, diff --git a/datafusion-examples/examples/custom_datasource.rs b/datafusion-examples/examples/custom_datasource.rs index cfb49b0231596..09db40d8d18b5 100644 --- a/datafusion-examples/examples/custom_datasource.rs +++ b/datafusion-examples/examples/custom_datasource.rs @@ -26,7 +26,7 @@ use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion::arrow::record_batch::RecordBatch; use datafusion::datasource::{provider_as_source, TableProvider, TableType}; use datafusion::error::Result; -use datafusion::execution::context::{SessionState, TaskContext}; +use datafusion::execution::context::TaskContext; use datafusion::physical_plan::memory::MemoryStream; use datafusion::physical_plan::{ project_schema, DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, @@ -37,6 +37,7 @@ use datafusion_expr::LogicalPlanBuilder; use datafusion_physical_expr::EquivalenceProperties; use async_trait::async_trait; +use datafusion::catalog_api::CatalogSession; use tokio::time::timeout; /// This example demonstrates executing a simple query against a custom datasource @@ -175,7 +176,7 @@ impl TableProvider for CustomDataSource { async fn scan( &self, - _state: &SessionState, + _state: &dyn CatalogSession, projection: Option<&Vec>, // filters and limit can be used here to inject some push-down operations if needed _filters: &[Expr], diff --git a/datafusion-examples/examples/parquet_index.rs b/datafusion-examples/examples/parquet_index.rs index 668eda047444a..8aa50df86c609 100644 --- a/datafusion-examples/examples/parquet_index.rs +++ b/datafusion-examples/examples/parquet_index.rs @@ -23,13 +23,13 @@ use arrow::datatypes::Int32Type; use arrow::util::pretty::pretty_format_batches; use arrow_schema::SchemaRef; use async_trait::async_trait; +use datafusion::catalog_api::CatalogSession; use datafusion::datasource::listing::PartitionedFile; use datafusion::datasource::physical_plan::{ parquet::StatisticsConverter, {FileScanConfig, ParquetExec}, }; use datafusion::datasource::TableProvider; -use datafusion::execution::context::SessionState; use datafusion::execution::object_store::ObjectStoreUrl; use datafusion::parquet::arrow::{ arrow_reader::ParquetRecordBatchReaderBuilder, ArrowWriter, @@ -222,7 +222,7 @@ impl TableProvider for IndexTableProvider { async fn scan( &self, - state: &SessionState, + state: &dyn CatalogSession, projection: Option<&Vec>, filters: &[Expr], limit: Option, diff --git a/datafusion-examples/examples/simple_udtf.rs b/datafusion-examples/examples/simple_udtf.rs index c68c21fab169d..0212da331d50e 100644 --- a/datafusion-examples/examples/simple_udtf.rs +++ b/datafusion-examples/examples/simple_udtf.rs @@ -20,10 +20,11 @@ use arrow::csv::ReaderBuilder; use async_trait::async_trait; use datafusion::arrow::datatypes::SchemaRef; use datafusion::arrow::record_batch::RecordBatch; +use datafusion::catalog_api::CatalogSession; use datafusion::datasource::function::TableFunctionImpl; use datafusion::datasource::TableProvider; use datafusion::error::Result; -use datafusion::execution::context::{ExecutionProps, SessionState}; +use datafusion::execution::context::ExecutionProps; use datafusion::physical_plan::memory::MemoryExec; use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::SessionContext; @@ -35,7 +36,6 @@ use std::fs::File; use std::io::Seek; use std::path::Path; use std::sync::Arc; - // To define your own table function, you only need to do the following 3 things: // 1. Implement your own [`TableProvider`] // 2. Implement your own [`TableFunctionImpl`] and return your [`TableProvider`] @@ -95,7 +95,7 @@ impl TableProvider for LocalCsvTable { async fn scan( &self, - _state: &SessionState, + _state: &dyn CatalogSession, projection: Option<&Vec>, _filters: &[Expr], _limit: Option, diff --git a/datafusion/catalog/Cargo.toml b/datafusion/catalog/Cargo.toml new file mode 100644 index 0000000000000..85b8feaa9dd54 --- /dev/null +++ b/datafusion/catalog/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "datafusion-catalog" +authors.workspace = true +edition.workspace = true +homepage.workspace = true +license.workspace = true +readme.workspace = true +repository.workspace = true +rust-version.workspace = true +version.workspace = true + +[dependencies] +arrow-schema = { workspace = true } +async-trait = "0.1.41" +datafusion-expr = { workspace = true } +datafusion-common = { workspace = true } +datafusion-execution = { workspace = true } +datafusion-physical-plan = { workspace = true } + +[lints] +workspace = true diff --git a/datafusion/catalog/src/catalog.rs b/datafusion/catalog/src/catalog.rs new file mode 100644 index 0000000000000..716d0b41364b5 --- /dev/null +++ b/datafusion/catalog/src/catalog.rs @@ -0,0 +1,153 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::any::Any; +use std::sync::Arc; + +pub use crate::schema::SchemaProvider; +use datafusion_common::not_impl_err; +use datafusion_common::Result; + +/// Represents a catalog, comprising a number of named schemas. +/// +/// # Catalog Overview +/// +/// To plan and execute queries, DataFusion needs a "Catalog" that provides +/// metadata such as which schemas and tables exist, their columns and data +/// types, and how to access the data. +/// +/// The Catalog API consists: +/// * [`CatalogProviderList`]: a collection of `CatalogProvider`s +/// * [`CatalogProvider`]: a collection of `SchemaProvider`s (sometimes called a "database" in other systems) +/// * [`SchemaProvider`]: a collection of `TableProvider`s (often called a "schema" in other systems) +/// * [`TableProvider]`: individual tables +/// +/// # Implementing Catalogs +/// +/// To implement a catalog, you implement at least one of the [`CatalogProviderList`], +/// [`CatalogProvider`] and [`SchemaProvider`] traits and register them +/// appropriately the [`SessionContext`]. +/// +/// [`SessionContext`]: crate::execution::context::SessionContext +/// +/// DataFusion comes with a simple in-memory catalog implementation, +/// [`MemoryCatalogProvider`], that is used by default and has no persistence. +/// DataFusion does not include more complex Catalog implementations because +/// catalog management is a key design choice for most data systems, and thus +/// it is unlikely that any general-purpose catalog implementation will work +/// well across many use cases. +/// +/// # Implementing "Remote" catalogs +/// +/// Sometimes catalog information is stored remotely and requires a network call +/// to retrieve. For example, the [Delta Lake] table format stores table +/// metadata in files on S3 that must be first downloaded to discover what +/// schemas and tables exist. +/// +/// [Delta Lake]: https://delta.io/ +/// +/// The [`CatalogProvider`] can support this use case, but it takes some care. +/// The planning APIs in DataFusion are not `async` and thus network IO can not +/// be performed "lazily" / "on demand" during query planning. The rationale for +/// this design is that using remote procedure calls for all catalog accesses +/// required for query planning would likely result in multiple network calls +/// per plan, resulting in very poor planning performance. +/// +/// To implement [`CatalogProvider`] and [`SchemaProvider`] for remote catalogs, +/// you need to provide an in memory snapshot of the required metadata. Most +/// systems typically either already have this information cached locally or can +/// batch access to the remote catalog to retrieve multiple schemas and tables +/// in a single network call. +/// +/// Note that [`SchemaProvider::table`] is an `async` function in order to +/// simplify implementing simple [`SchemaProvider`]s. For many table formats it +/// is easy to list all available tables but there is additional non trivial +/// access required to read table details (e.g. statistics). +/// +/// The pattern that DataFusion itself uses to plan SQL queries is to walk over +/// the query to [find all table references], +/// performing required remote catalog in parallel, and then plans the query +/// using that snapshot. +/// +/// [find all table references]: resolve_table_references +/// +/// # Example Catalog Implementations +/// +/// Here are some examples of how to implement custom catalogs: +/// +/// * [`datafusion-cli`]: [`DynamicFileCatalogProvider`] catalog provider +/// that treats files and directories on a filesystem as tables. +/// +/// * The [`catalog.rs`]: a simple directory based catalog. +/// +/// * [delta-rs]: [`UnityCatalogProvider`] implementation that can +/// read from Delta Lake tables +/// +/// [`datafusion-cli`]: https://datafusion.apache.org/user-guide/cli/index.html +/// [`DynamicFileCatalogProvider`]: https://github.com/apache/datafusion/blob/31b9b48b08592b7d293f46e75707aad7dadd7cbc/datafusion-cli/src/catalog.rs#L75 +/// [`catalog.rs`]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/catalog.rs +/// [delta-rs]: https://github.com/delta-io/delta-rs +/// [`UnityCatalogProvider`]: https://github.com/delta-io/delta-rs/blob/951436ecec476ce65b5ed3b58b50fb0846ca7b91/crates/deltalake-core/src/data_catalog/unity/datafusion.rs#L111-L123 +/// +/// [`TableProvider]: crate::datasource::TableProvider + +pub trait CatalogProvider: Sync + Send { + /// Returns the catalog provider as [`Any`] + /// so that it can be downcast to a specific implementation. + fn as_any(&self) -> &dyn Any; + + /// Retrieves the list of available schema names in this catalog. + fn schema_names(&self) -> Vec; + + /// Retrieves a specific schema from the catalog by name, provided it exists. + fn schema(&self, name: &str) -> Option>; + + /// Adds a new schema to this catalog. + /// + /// If a schema of the same name existed before, it is replaced in + /// the catalog and returned. + /// + /// By default returns a "Not Implemented" error + fn register_schema( + &self, + name: &str, + schema: Arc, + ) -> Result>> { + // use variables to avoid unused variable warnings + let _ = name; + let _ = schema; + not_impl_err!("Registering new schemas is not supported") + } + + /// Removes a schema from this catalog. Implementations of this method should return + /// errors if the schema exists but cannot be dropped. For example, in DataFusion's + /// default in-memory catalog, [`MemoryCatalogProvider`], a non-empty schema + /// will only be successfully dropped when `cascade` is true. + /// This is equivalent to how DROP SCHEMA works in PostgreSQL. + /// + /// Implementations of this method should return None if schema with `name` + /// does not exist. + /// + /// By default returns a "Not Implemented" error + fn deregister_schema( + &self, + _name: &str, + _cascade: bool, + ) -> Result>> { + not_impl_err!("Deregistering new schemas is not supported") + } +} diff --git a/datafusion/catalog/src/lib.rs b/datafusion/catalog/src/lib.rs new file mode 100644 index 0000000000000..fe76b5dc9c649 --- /dev/null +++ b/datafusion/catalog/src/lib.rs @@ -0,0 +1,26 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod catalog; +mod schema; +mod session; +mod table; + +pub use catalog::*; +pub use schema::*; +pub use session::*; +pub use table::*; diff --git a/datafusion/core/src/catalog/schema.rs b/datafusion/catalog/src/schema.rs similarity index 95% rename from datafusion/core/src/catalog/schema.rs rename to datafusion/catalog/src/schema.rs index 7d76b3fa4f197..21bca9fa828dc 100644 --- a/datafusion/core/src/catalog/schema.rs +++ b/datafusion/catalog/src/schema.rs @@ -23,11 +23,8 @@ use datafusion_common::{exec_err, DataFusionError}; use std::any::Any; use std::sync::Arc; -use crate::datasource::TableProvider; -use crate::error::Result; - -// backwards compatibility -pub use super::MemorySchemaProvider; +use crate::table::TableProvider; +use datafusion_common::Result; /// Represents a schema, comprising a number of named tables. /// diff --git a/datafusion/catalog/src/session.rs b/datafusion/catalog/src/session.rs new file mode 100644 index 0000000000000..3a84ff3d26f73 --- /dev/null +++ b/datafusion/catalog/src/session.rs @@ -0,0 +1,110 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use async_trait::async_trait; +use datafusion_common::config::ConfigOptions; +use datafusion_common::{DFSchema, Result}; +use datafusion_execution::config::SessionConfig; +use datafusion_execution::runtime_env::RuntimeEnv; +use datafusion_execution::TaskContext; +use datafusion_expr::execution_props::ExecutionProps; +use datafusion_expr::{AggregateUDF, Expr, LogicalPlan, ScalarUDF, WindowUDF}; +use datafusion_physical_plan::{ExecutionPlan, PhysicalExpr}; +use std::any::Any; +use std::collections::HashMap; +use std::sync::Arc; + +#[async_trait] +pub trait CatalogSession: Send + Sync { + /// Return the session ID + fn session_id(&self) -> &str; + + /// Return the [`SessionConfig`] + fn config(&self) -> &SessionConfig; + + /// return the configuration options + fn config_options(&self) -> &ConfigOptions { + self.config().options() + } + + /// Creates a physical [`ExecutionPlan`] plan from a [`LogicalPlan`]. + /// + /// Note: this will optimize the provided plan first. + /// + /// This function will error for [`LogicalPlan`]s such as catalog DDL like + /// `CREATE TABLE`, which do not have corresponding physical plans and must + /// be handled by another layer, typically [`SessionContext`]. + /// + /// [`SessionContext`]: crate::execution::context::SessionContext + async fn create_physical_plan( + &self, + logical_plan: &LogicalPlan, + ) -> Result>; + + /// Create a [`PhysicalExpr`] from an [`Expr`] after applying type + /// coercion, and function rewrites. + /// + /// Note: The expression is not [simplified] or otherwise optimized: `a = 1 + /// + 2` will not be simplified to `a = 3` as this is a more involved process. + /// See the [expr_api] example for how to simplify expressions. + /// + /// # See Also: + /// * [`SessionContext::create_physical_expr`] for a higher-level API + /// * [`create_physical_expr`] for a lower-level API + /// + /// [simplified]: datafusion_optimizer::simplify_expressions + /// [expr_api]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/expr_api.rs + /// [`SessionContext::create_physical_expr`]: crate::execution::context::SessionContext::create_physical_expr + fn create_physical_expr( + &self, + expr: Expr, + df_schema: &DFSchema, + ) -> Result>; + + /// Return reference to scalar_functions + fn scalar_functions(&self) -> &HashMap>; + + /// Return reference to aggregate_functions + fn aggregate_functions(&self) -> &HashMap>; + + /// Return reference to window functions + fn window_functions(&self) -> &HashMap>; + + /// Return the runtime env + fn runtime_env(&self) -> &Arc; + + /// Return the execution properties + fn execution_props(&self) -> &ExecutionProps; + + fn as_any(&self) -> &dyn Any; +} + +/// Create a new task context instance from CatalogSession +impl From<&dyn CatalogSession> for TaskContext { + fn from(state: &dyn CatalogSession) -> Self { + let task_id = None; + TaskContext::new( + task_id, + state.session_id().to_string(), + state.config().clone(), + state.scalar_functions().clone(), + state.aggregate_functions().clone(), + state.window_functions().clone(), + state.runtime_env().clone(), + ) + } +} diff --git a/datafusion/catalog/src/table.rs b/datafusion/catalog/src/table.rs new file mode 100644 index 0000000000000..834f11a9e7320 --- /dev/null +++ b/datafusion/catalog/src/table.rs @@ -0,0 +1,293 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::any::Any; +use std::sync::Arc; + +use crate::session::CatalogSession; +use arrow_schema::SchemaRef; +use async_trait::async_trait; +use datafusion_common::Result; +use datafusion_common::{not_impl_err, Constraints, Statistics}; +use datafusion_expr::{ + CreateExternalTable, Expr, LogicalPlan, TableProviderFilterPushDown, TableType, +}; +use datafusion_physical_plan::ExecutionPlan; + +/// Source table +#[async_trait] +pub trait TableProvider: Sync + Send { + /// Returns the table provider as [`Any`](std::any::Any) so that it can be + /// downcast to a specific implementation. + fn as_any(&self) -> &dyn Any; + + /// Get a reference to the schema for this table + fn schema(&self) -> SchemaRef; + + /// Get a reference to the constraints of the table. + /// Returns: + /// - `None` for tables that do not support constraints. + /// - `Some(&Constraints)` for tables supporting constraints. + /// Therefore, a `Some(&Constraints::empty())` return value indicates that + /// this table supports constraints, but there are no constraints. + fn constraints(&self) -> Option<&Constraints> { + None + } + + /// Get the type of this table for metadata/catalog purposes. + fn table_type(&self) -> TableType; + + /// Get the create statement used to create this table, if available. + fn get_table_definition(&self) -> Option<&str> { + None + } + + /// Get the [`LogicalPlan`] of this table, if available + fn get_logical_plan(&self) -> Option<&LogicalPlan> { + None + } + + /// Get the default value for a column, if available. + fn get_column_default(&self, _column: &str) -> Option<&Expr> { + None + } + + /// Create an [`ExecutionPlan`] for scanning the table with optionally + /// specified `projection`, `filter` and `limit`, described below. + /// + /// The `ExecutionPlan` is responsible scanning the datasource's + /// partitions in a streaming, parallelized fashion. + /// + /// # Projection + /// + /// If specified, only a subset of columns should be returned, in the order + /// specified. The projection is a set of indexes of the fields in + /// [`Self::schema`]. + /// + /// DataFusion provides the projection to scan only the columns actually + /// used in the query to improve performance, an optimization called + /// "Projection Pushdown". Some datasources, such as Parquet, can use this + /// information to go significantly faster when only a subset of columns is + /// required. + /// + /// # Filters + /// + /// A list of boolean filter [`Expr`]s to evaluate *during* the scan, in the + /// manner specified by [`Self::supports_filters_pushdown`]. Only rows for + /// which *all* of the `Expr`s evaluate to `true` must be returned (aka the + /// expressions are `AND`ed together). + /// + /// To enable filter pushdown you must override + /// [`Self::supports_filters_pushdown`] as the default implementation does + /// not and `filters` will be empty. + /// + /// DataFusion pushes filtering into the scans whenever possible + /// ("Filter Pushdown"), and depending on the format and the + /// implementation of the format, evaluating the predicate during the scan + /// can increase performance significantly. + /// + /// ## Note: Some columns may appear *only* in Filters + /// + /// In certain cases, a query may only use a certain column in a Filter that + /// has been completely pushed down to the scan. In this case, the + /// projection will not contain all the columns found in the filter + /// expressions. + /// + /// For example, given the query `SELECT t.a FROM t WHERE t.b > 5`, + /// + /// ```text + /// ┌────────────────────┐ + /// │ Projection(t.a) │ + /// └────────────────────┘ + /// ▲ + /// │ + /// │ + /// ┌────────────────────┐ Filter ┌────────────────────┐ Projection ┌────────────────────┐ + /// │ Filter(t.b > 5) │────Pushdown──▶ │ Projection(t.a) │ ───Pushdown───▶ │ Projection(t.a) │ + /// └────────────────────┘ └────────────────────┘ └────────────────────┘ + /// ▲ ▲ ▲ + /// │ │ │ + /// │ │ ┌────────────────────┐ + /// ┌────────────────────┐ ┌────────────────────┐ │ Scan │ + /// │ Scan │ │ Scan │ │ filter=(t.b > 5) │ + /// └────────────────────┘ │ filter=(t.b > 5) │ │ projection=(t.a) │ + /// └────────────────────┘ └────────────────────┘ + /// + /// Initial Plan If `TableProviderFilterPushDown` Projection pushdown notes that + /// returns true, filter pushdown the scan only needs t.a + /// pushes the filter into the scan + /// BUT internally evaluating the + /// predicate still requires t.b + /// ``` + /// + /// # Limit + /// + /// If `limit` is specified, must only produce *at least* this many rows, + /// (though it may return more). Like Projection Pushdown and Filter + /// Pushdown, DataFusion pushes `LIMIT`s as far down in the plan as + /// possible, called "Limit Pushdown" as some sources can use this + /// information to improve their performance. Note that if there are any + /// Inexact filters pushed down, the LIMIT cannot be pushed down. This is + /// because inexact filters do not guarantee that every filtered row is + /// removed, so applying the limit could lead to too few rows being available + /// to return as a final result. + async fn scan( + &self, + state: &dyn CatalogSession, + projection: Option<&Vec>, + filters: &[Expr], + limit: Option, + ) -> Result>; + + /// Specify if DataFusion should provide filter expressions to the + /// TableProvider to apply *during* the scan. + /// + /// Some TableProviders can evaluate filters more efficiently than the + /// `Filter` operator in DataFusion, for example by using an index. + /// + /// # Parameters and Return Value + /// + /// The return `Vec` must have one element for each element of the `filters` + /// argument. The value of each element indicates if the TableProvider can + /// apply the corresponding filter during the scan. The position in the return + /// value corresponds to the expression in the `filters` parameter. + /// + /// If the length of the resulting `Vec` does not match the `filters` input + /// an error will be thrown. + /// + /// Each element in the resulting `Vec` is one of the following: + /// * [`Exact`] or [`Inexact`]: The TableProvider can apply the filter + /// during scan + /// * [`Unsupported`]: The TableProvider cannot apply the filter during scan + /// + /// By default, this function returns [`Unsupported`] for all filters, + /// meaning no filters will be provided to [`Self::scan`]. + /// + /// [`Unsupported`]: TableProviderFilterPushDown::Unsupported + /// [`Exact`]: TableProviderFilterPushDown::Exact + /// [`Inexact`]: TableProviderFilterPushDown::Inexact + /// # Example + /// + /// ```rust-TODO-fix-this-example + /// # use std::any::Any; + /// # use std::sync::Arc; + /// # use arrow_schema::SchemaRef; + /// # use async_trait::async_trait; + /// # use datafusion::catalog::table::TableProvider; + /// # use datafusion::error::{Result, DataFusionError}; + /// # use datafusion::execution::context::SessionState; + /// # use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType}; + /// # use datafusion_physical_plan::ExecutionPlan; + /// // Define a struct that implements the TableProvider trait + /// struct TestDataSource {} + /// + /// #[async_trait] + /// impl TableProvider for TestDataSource { + /// # fn as_any(&self) -> &dyn Any { todo!() } + /// # fn schema(&self) -> SchemaRef { todo!() } + /// # fn table_type(&self) -> TableType { todo!() } + /// # async fn scan(&self, s: &dyn CatalogSession, p: Option<&Vec>, f: &[Expr], l: Option) -> Result> { + /// todo!() + /// # } + /// // Override the supports_filters_pushdown to evaluate which expressions + /// // to accept as pushdown predicates. + /// fn supports_filters_pushdown(&self, filters: &[&Expr]) -> Result> { + /// // Process each filter + /// let support: Vec<_> = filters.iter().map(|expr| { + /// match expr { + /// // This example only supports a between expr with a single column named "c1". + /// Expr::Between(between_expr) => { + /// between_expr.expr + /// .try_into_col() + /// .map(|column| { + /// if column.name == "c1" { + /// TableProviderFilterPushDown::Exact + /// } else { + /// TableProviderFilterPushDown::Unsupported + /// } + /// }) + /// // If there is no column in the expr set the filter to unsupported. + /// .unwrap_or(TableProviderFilterPushDown::Unsupported) + /// } + /// _ => { + /// // For all other cases return Unsupported. + /// TableProviderFilterPushDown::Unsupported + /// } + /// } + /// }).collect(); + /// Ok(support) + /// } + /// } + /// ``` + fn supports_filters_pushdown( + &self, + filters: &[&Expr], + ) -> Result> { + Ok(vec![ + TableProviderFilterPushDown::Unsupported; + filters.len() + ]) + } + + /// Get statistics for this table, if available + fn statistics(&self) -> Option { + None + } + + /// Return an [`ExecutionPlan`] to insert data into this table, if + /// supported. + /// + /// The returned plan should return a single row in a UInt64 + /// column called "count" such as the following + /// + /// ```text + /// +-------+, + /// | count |, + /// +-------+, + /// | 6 |, + /// +-------+, + /// ``` + /// + /// # See Also + /// + /// See [`DataSinkExec`] for the common pattern of inserting a + /// streams of `RecordBatch`es as files to an ObjectStore. + /// + /// [`DataSinkExec`]: crate::physical_plan::insert::DataSinkExec + async fn insert_into( + &self, + _state: &dyn CatalogSession, + _input: Arc, + _overwrite: bool, + ) -> Result> { + not_impl_err!("Insert into not implemented for this table") + } +} + +/// A factory which creates [`TableProvider`]s at runtime given a URL. +/// +/// For example, this can be used to create a table "on the fly" +/// from a directory of files only when that name is referenced. +#[async_trait] +pub trait TableProviderFactory: Sync + Send { + /// Create a TableProvider with the given url + async fn create( + &self, + state: &dyn CatalogSession, + cmd: &CreateExternalTable, + ) -> Result>; +} diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 532ca8fde9e73..f529d0d909250 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -96,6 +96,7 @@ bytes = { workspace = true } bzip2 = { version = "0.4.3", optional = true } chrono = { workspace = true } dashmap = { workspace = true } +datafusion-catalog = { workspace = true } datafusion-common = { workspace = true, features = ["object_store"] } datafusion-common-runtime = { workspace = true } datafusion-execution = { workspace = true } diff --git a/datafusion/core/src/catalog/information_schema.rs b/datafusion/core/src/catalog/information_schema.rs index c953de6d16d38..bdb8d9e8cb23e 100644 --- a/datafusion/core/src/catalog/information_schema.rs +++ b/datafusion/core/src/catalog/information_schema.rs @@ -29,8 +29,8 @@ use arrow::{ record_batch::RecordBatch, }; +use crate::catalog_api::{SchemaProvider, TableProvider}; use crate::datasource::streaming::StreamingTable; -use crate::datasource::TableProvider; use crate::execution::context::TaskContext; use crate::logical_expr::TableType; use crate::physical_plan::stream::RecordBatchStreamAdapter; @@ -40,7 +40,7 @@ use crate::{ physical_plan::streaming::PartitionStream, }; -use super::{schema::SchemaProvider, CatalogProviderList}; +use super::CatalogProviderList; pub(crate) const INFORMATION_SCHEMA: &str = "information_schema"; pub(crate) const TABLES: &str = "tables"; diff --git a/datafusion/core/src/catalog/listing_schema.rs b/datafusion/core/src/catalog/listing_schema.rs index 373fe788c7218..9503a53326fbc 100644 --- a/datafusion/core/src/catalog/listing_schema.rs +++ b/datafusion/core/src/catalog/listing_schema.rs @@ -22,9 +22,7 @@ use std::collections::{HashMap, HashSet}; use std::path::Path; use std::sync::{Arc, Mutex}; -use crate::catalog::schema::SchemaProvider; -use crate::datasource::provider::TableProviderFactory; -use crate::datasource::TableProvider; +use crate::catalog_api::{SchemaProvider, TableProvider, TableProviderFactory}; use crate::execution::context::SessionState; use datafusion_common::{Constraints, DFSchema, DataFusionError, TableReference}; diff --git a/datafusion/core/src/catalog/memory.rs b/datafusion/core/src/catalog/memory.rs index 3af823913a291..c2e7039352139 100644 --- a/datafusion/core/src/catalog/memory.rs +++ b/datafusion/core/src/catalog/memory.rs @@ -18,9 +18,8 @@ //! [`MemoryCatalogProvider`], [`MemoryCatalogProviderList`]: In-memory //! implementations of [`CatalogProviderList`] and [`CatalogProvider`]. -use crate::catalog::schema::SchemaProvider; -use crate::catalog::{CatalogProvider, CatalogProviderList}; -use crate::datasource::TableProvider; +use crate::catalog::CatalogProviderList; +use crate::catalog_api::{CatalogProvider, SchemaProvider, TableProvider}; use async_trait::async_trait; use dashmap::DashMap; use datafusion_common::{exec_err, DataFusionError}; @@ -201,11 +200,10 @@ impl SchemaProvider for MemorySchemaProvider { #[cfg(test)] mod test { use super::*; - use crate::catalog::schema::{MemorySchemaProvider, SchemaProvider}; + use crate::catalog::memory::MemorySchemaProvider; use crate::catalog::CatalogProvider; use crate::datasource::empty::EmptyTable; use crate::datasource::listing::{ListingTable, ListingTableConfig, ListingTableUrl}; - use crate::datasource::TableProvider; use crate::prelude::SessionContext; use arrow_schema::Schema; use datafusion_common::assert_batches_eq; diff --git a/datafusion/core/src/catalog/mod.rs b/datafusion/core/src/catalog/mod.rs index 531adc4b210cf..2f8a32bf464b4 100644 --- a/datafusion/core/src/catalog/mod.rs +++ b/datafusion/core/src/catalog/mod.rs @@ -29,17 +29,15 @@ pub mod information_schema; pub mod listing_schema; -mod memory; -pub mod schema; +pub mod memory; +pub use crate::catalog_api::{CatalogProvider, SchemaProvider}; pub use memory::{ MemoryCatalogProvider, MemoryCatalogProviderList, MemorySchemaProvider, }; -pub use schema::SchemaProvider; pub use datafusion_sql::{ResolvedTableReference, TableReference}; -use datafusion_common::{not_impl_err, Result}; use std::any::Any; use std::collections::BTreeSet; use std::ops::ControlFlow; @@ -73,136 +71,6 @@ pub trait CatalogProviderList: Sync + Send { #[deprecated(since = "35.0.0", note = "use [`CatalogProviderList`] instead")] pub trait CatalogList: CatalogProviderList {} -/// Represents a catalog, comprising a number of named schemas. -/// -/// # Catalog Overview -/// -/// To plan and execute queries, DataFusion needs a "Catalog" that provides -/// metadata such as which schemas and tables exist, their columns and data -/// types, and how to access the data. -/// -/// The Catalog API consists: -/// * [`CatalogProviderList`]: a collection of `CatalogProvider`s -/// * [`CatalogProvider`]: a collection of `SchemaProvider`s (sometimes called a "database" in other systems) -/// * [`SchemaProvider`]: a collection of `TableProvider`s (often called a "schema" in other systems) -/// * [`TableProvider]`: individual tables -/// -/// # Implementing Catalogs -/// -/// To implement a catalog, you implement at least one of the [`CatalogProviderList`], -/// [`CatalogProvider`] and [`SchemaProvider`] traits and register them -/// appropriately the [`SessionContext`]. -/// -/// [`SessionContext`]: crate::execution::context::SessionContext -/// -/// DataFusion comes with a simple in-memory catalog implementation, -/// [`MemoryCatalogProvider`], that is used by default and has no persistence. -/// DataFusion does not include more complex Catalog implementations because -/// catalog management is a key design choice for most data systems, and thus -/// it is unlikely that any general-purpose catalog implementation will work -/// well across many use cases. -/// -/// # Implementing "Remote" catalogs -/// -/// Sometimes catalog information is stored remotely and requires a network call -/// to retrieve. For example, the [Delta Lake] table format stores table -/// metadata in files on S3 that must be first downloaded to discover what -/// schemas and tables exist. -/// -/// [Delta Lake]: https://delta.io/ -/// -/// The [`CatalogProvider`] can support this use case, but it takes some care. -/// The planning APIs in DataFusion are not `async` and thus network IO can not -/// be performed "lazily" / "on demand" during query planning. The rationale for -/// this design is that using remote procedure calls for all catalog accesses -/// required for query planning would likely result in multiple network calls -/// per plan, resulting in very poor planning performance. -/// -/// To implement [`CatalogProvider`] and [`SchemaProvider`] for remote catalogs, -/// you need to provide an in memory snapshot of the required metadata. Most -/// systems typically either already have this information cached locally or can -/// batch access to the remote catalog to retrieve multiple schemas and tables -/// in a single network call. -/// -/// Note that [`SchemaProvider::table`] is an `async` function in order to -/// simplify implementing simple [`SchemaProvider`]s. For many table formats it -/// is easy to list all available tables but there is additional non trivial -/// access required to read table details (e.g. statistics). -/// -/// The pattern that DataFusion itself uses to plan SQL queries is to walk over -/// the query to [find all table references], -/// performing required remote catalog in parallel, and then plans the query -/// using that snapshot. -/// -/// [find all table references]: resolve_table_references -/// -/// # Example Catalog Implementations -/// -/// Here are some examples of how to implement custom catalogs: -/// -/// * [`datafusion-cli`]: [`DynamicFileCatalogProvider`] catalog provider -/// that treats files and directories on a filesystem as tables. -/// -/// * The [`catalog.rs`]: a simple directory based catalog. -/// -/// * [delta-rs]: [`UnityCatalogProvider`] implementation that can -/// read from Delta Lake tables -/// -/// [`datafusion-cli`]: https://datafusion.apache.org/user-guide/cli/index.html -/// [`DynamicFileCatalogProvider`]: https://github.com/apache/datafusion/blob/31b9b48b08592b7d293f46e75707aad7dadd7cbc/datafusion-cli/src/catalog.rs#L75 -/// [`catalog.rs`]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/catalog.rs -/// [delta-rs]: https://github.com/delta-io/delta-rs -/// [`UnityCatalogProvider`]: https://github.com/delta-io/delta-rs/blob/951436ecec476ce65b5ed3b58b50fb0846ca7b91/crates/deltalake-core/src/data_catalog/unity/datafusion.rs#L111-L123 -/// -/// [`TableProvider]: crate::datasource::TableProvider - -pub trait CatalogProvider: Sync + Send { - /// Returns the catalog provider as [`Any`] - /// so that it can be downcast to a specific implementation. - fn as_any(&self) -> &dyn Any; - - /// Retrieves the list of available schema names in this catalog. - fn schema_names(&self) -> Vec; - - /// Retrieves a specific schema from the catalog by name, provided it exists. - fn schema(&self, name: &str) -> Option>; - - /// Adds a new schema to this catalog. - /// - /// If a schema of the same name existed before, it is replaced in - /// the catalog and returned. - /// - /// By default returns a "Not Implemented" error - fn register_schema( - &self, - name: &str, - schema: Arc, - ) -> Result>> { - // use variables to avoid unused variable warnings - let _ = name; - let _ = schema; - not_impl_err!("Registering new schemas is not supported") - } - - /// Removes a schema from this catalog. Implementations of this method should return - /// errors if the schema exists but cannot be dropped. For example, in DataFusion's - /// default in-memory catalog, [`MemoryCatalogProvider`], a non-empty schema - /// will only be successfully dropped when `cascade` is true. - /// This is equivalent to how DROP SCHEMA works in PostgreSQL. - /// - /// Implementations of this method should return None if schema with `name` - /// does not exist. - /// - /// By default returns a "Not Implemented" error - fn deregister_schema( - &self, - _name: &str, - _cascade: bool, - ) -> Result>> { - not_impl_err!("Deregistering new schemas is not supported") - } -} - /// Collects all tables and views referenced in the SQL statement. CTEs are collected separately. /// This can be used to determine which tables need to be in the catalog for a query to be planned. /// diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index c55b7c752765d..a8aa80f81200d 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -58,6 +58,7 @@ use datafusion_expr::{ use datafusion_functions_aggregate::expr_fn::{avg, count, median, stddev, sum}; use async_trait::async_trait; +use datafusion_catalog::CatalogSession; /// Contains options that control how data is /// written out from a DataFrame @@ -1657,7 +1658,7 @@ impl TableProvider for DataFrameTableProvider { async fn scan( &self, - state: &SessionState, + state: &dyn CatalogSession, projection: Option<&Vec>, filters: &[Expr], limit: Option, diff --git a/datafusion/core/src/datasource/cte_worktable.rs b/datafusion/core/src/datasource/cte_worktable.rs index afc4536f068e2..33fdaff6a2b4d 100644 --- a/datafusion/core/src/datasource/cte_worktable.rs +++ b/datafusion/core/src/datasource/cte_worktable.rs @@ -22,6 +22,7 @@ use std::sync::Arc; use arrow::datatypes::SchemaRef; use async_trait::async_trait; +use datafusion_catalog::CatalogSession; use datafusion_physical_plan::work_table::WorkTableExec; use crate::{ @@ -31,7 +32,6 @@ use crate::{ }; use crate::datasource::{TableProvider, TableType}; -use crate::execution::context::SessionState; /// The temporary working table where the previous iteration of a recursive query is stored /// Naming is based on PostgreSQL's implementation. @@ -77,7 +77,7 @@ impl TableProvider for CteWorkTable { async fn scan( &self, - _state: &SessionState, + _state: &dyn CatalogSession, _projection: Option<&Vec>, _filters: &[Expr], _limit: Option, diff --git a/datafusion/core/src/datasource/empty.rs b/datafusion/core/src/datasource/empty.rs index 5100987520ee1..becfa7f66566c 100644 --- a/datafusion/core/src/datasource/empty.rs +++ b/datafusion/core/src/datasource/empty.rs @@ -22,11 +22,11 @@ use std::sync::Arc; use arrow::datatypes::*; use async_trait::async_trait; +use datafusion_catalog::CatalogSession; use datafusion_common::project_schema; use crate::datasource::{TableProvider, TableType}; use crate::error::Result; -use crate::execution::context::SessionState; use crate::logical_expr::Expr; use crate::physical_plan::{empty::EmptyExec, ExecutionPlan}; @@ -69,7 +69,7 @@ impl TableProvider for EmptyTable { async fn scan( &self, - _state: &SessionState, + _state: &dyn CatalogSession, projection: Option<&Vec>, _filters: &[Expr], _limit: Option, diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index ea4d396a14cb3..2567df78aa131 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -24,9 +24,8 @@ use std::{any::Any, sync::Arc}; use super::helpers::{expr_applicable_for_cols, pruned_partition_list, split_files}; use super::PartitionedFile; -use crate::datasource::{ - create_ordering, get_statistics_with_limit, TableProvider, TableType, -}; +use crate::catalog_api::TableProvider; +use crate::datasource::{create_ordering, get_statistics_with_limit, TableType}; use crate::datasource::{ file_format::{file_compression_type::FileCompressionType, FileFormat}, listing::ListingTableUrl, @@ -52,6 +51,7 @@ use datafusion_physical_expr::{ }; use async_trait::async_trait; +use datafusion_catalog::CatalogSession; use futures::{future, stream, StreamExt, TryStreamExt}; use itertools::Itertools; use object_store::ObjectStore; @@ -736,13 +736,16 @@ impl TableProvider for ListingTable { async fn scan( &self, - state: &SessionState, + state: &dyn CatalogSession, projection: Option<&Vec>, filters: &[Expr], limit: Option, ) -> Result> { - let (mut partitioned_file_lists, statistics) = - self.list_files_for_scan(state, filters, limit).await?; + // TODO remove downcast_ref from here? + let session_state = state.as_any().downcast_ref::().unwrap(); + let (mut partitioned_file_lists, statistics) = self + .list_files_for_scan(session_state, filters, limit) + .await?; // if no files need to be read, return an `EmptyExec` if partitioned_file_lists.is_empty() { @@ -805,7 +808,7 @@ impl TableProvider for ListingTable { self.options .format .create_physical_plan( - state, + session_state, FileScanConfig::new(object_store_url, Arc::clone(&self.file_schema)) .with_file_groups(partitioned_file_lists) .with_statistics(statistics) @@ -852,7 +855,7 @@ impl TableProvider for ListingTable { async fn insert_into( &self, - state: &SessionState, + state: &dyn CatalogSession, input: Arc, overwrite: bool, ) -> Result> { @@ -878,8 +881,10 @@ impl TableProvider for ListingTable { // Get the object store for the table path. let store = state.runtime_env().object_store(table_path)?; + // TODO remove downcast_ref from here? + let session_state = state.as_any().downcast_ref::().unwrap(); let file_list_stream = pruned_partition_list( - state, + session_state, store.as_ref(), table_path, &[], @@ -890,7 +895,7 @@ impl TableProvider for ListingTable { let file_groups = file_list_stream.try_collect::>().await?; let keep_partition_by_columns = - state.config().options().execution.keep_partition_by_columns; + state.config_options().execution.keep_partition_by_columns; // Sink related option, apart from format let config = FileSinkConfig { @@ -926,7 +931,7 @@ impl TableProvider for ListingTable { self.options() .format - .create_writer_physical_plan(input, state, config, order_requirements) + .create_writer_physical_plan(input, session_state, config, order_requirements) .await } diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index 1d4d08481895b..3119f67291517 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -20,11 +20,10 @@ use std::path::Path; use std::sync::Arc; +use crate::catalog_api::{TableProvider, TableProviderFactory}; use crate::datasource::listing::{ ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl, }; -use crate::datasource::provider::TableProviderFactory; -use crate::datasource::TableProvider; use crate::execution::context::SessionState; use arrow::datatypes::{DataType, SchemaRef}; @@ -33,6 +32,7 @@ use datafusion_common::{config_datafusion_err, Result}; use datafusion_expr::CreateExternalTable; use async_trait::async_trait; +use datafusion_catalog::CatalogSession; /// A `TableProviderFactory` capable of creating new `ListingTable`s #[derive(Debug, Default)] @@ -49,16 +49,18 @@ impl ListingTableFactory { impl TableProviderFactory for ListingTableFactory { async fn create( &self, - state: &SessionState, + state: &dyn CatalogSession, cmd: &CreateExternalTable, ) -> Result> { - let file_format = state + // TODO remove downcast_ref from here. Should file format factory be an extension to session state? + let session_state = state.as_any().downcast_ref::().unwrap(); + let file_format = session_state .get_file_format_factory(cmd.file_type.as_str()) .ok_or(config_datafusion_err!( "Unable to create table with format {}! Could not find FileFormat.", cmd.file_type ))? - .create(state, &cmd.options)?; + .create(session_state, &cmd.options)?; let file_extension = get_extension(cmd.location.as_str()); @@ -114,10 +116,12 @@ impl TableProviderFactory for ListingTableFactory { .with_table_partition_cols(table_partition_cols) .with_file_sort_order(cmd.order_exprs.clone()); - options.validate_partitions(state, &table_path).await?; + options + .validate_partitions(session_state, &table_path) + .await?; let resolved_schema = match provided_schema { - None => options.infer_schema(state, &table_path).await?, + None => options.infer_schema(session_state, &table_path).await?, Some(s) => s, }; let config = ListingTableConfig::new(table_path) diff --git a/datafusion/core/src/datasource/memory.rs b/datafusion/core/src/datasource/memory.rs index aab42285a0b2f..c2215aaf1fc9a 100644 --- a/datafusion/core/src/datasource/memory.rs +++ b/datafusion/core/src/datasource/memory.rs @@ -42,6 +42,7 @@ use datafusion_execution::TaskContext; use datafusion_physical_plan::metrics::MetricsSet; use async_trait::async_trait; +use datafusion_catalog::CatalogSession; use futures::StreamExt; use log::debug; use parking_lot::Mutex; @@ -206,7 +207,7 @@ impl TableProvider for MemTable { async fn scan( &self, - state: &SessionState, + state: &dyn CatalogSession, projection: Option<&Vec>, _filters: &[Expr], _limit: Option, @@ -258,7 +259,7 @@ impl TableProvider for MemTable { /// * A plan that returns the number of rows written. async fn insert_into( &self, - _state: &SessionState, + _state: &dyn CatalogSession, input: Arc, overwrite: bool, ) -> Result> { diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs index c28788eed4582..015e2d3f03c00 100644 --- a/datafusion/core/src/datasource/mod.rs +++ b/datafusion/core/src/datasource/mod.rs @@ -43,8 +43,8 @@ pub use self::default_table_source::{ provider_as_source, source_as_provider, DefaultTableSource, }; pub use self::memory::MemTable; -pub use self::provider::TableProvider; pub use self::view::ViewTable; +pub use crate::catalog_api::TableProvider; pub use crate::logical_expr::TableType; pub use statistics::get_statistics_with_limit; diff --git a/datafusion/core/src/datasource/provider.rs b/datafusion/core/src/datasource/provider.rs index 7c58aded31081..ce0f760358de9 100644 --- a/datafusion/core/src/datasource/provider.rs +++ b/datafusion/core/src/datasource/provider.rs @@ -17,285 +17,17 @@ //! Data source traits -use std::any::Any; use std::sync::Arc; use async_trait::async_trait; -use datafusion_common::{not_impl_err, Constraints, Statistics}; -use datafusion_expr::{CreateExternalTable, LogicalPlan}; +use datafusion_catalog::CatalogSession; +use datafusion_expr::CreateExternalTable; pub use datafusion_expr::{TableProviderFilterPushDown, TableType}; -use crate::arrow::datatypes::SchemaRef; +use crate::catalog_api::{TableProvider, TableProviderFactory}; use crate::datasource::listing_table_factory::ListingTableFactory; use crate::datasource::stream::StreamTableFactory; use crate::error::Result; -use crate::execution::context::SessionState; -use crate::logical_expr::Expr; -use crate::physical_plan::ExecutionPlan; - -/// Source table -#[async_trait] -pub trait TableProvider: Sync + Send { - /// Returns the table provider as [`Any`](std::any::Any) so that it can be - /// downcast to a specific implementation. - fn as_any(&self) -> &dyn Any; - - /// Get a reference to the schema for this table - fn schema(&self) -> SchemaRef; - - /// Get a reference to the constraints of the table. - /// Returns: - /// - `None` for tables that do not support constraints. - /// - `Some(&Constraints)` for tables supporting constraints. - /// Therefore, a `Some(&Constraints::empty())` return value indicates that - /// this table supports constraints, but there are no constraints. - fn constraints(&self) -> Option<&Constraints> { - None - } - - /// Get the type of this table for metadata/catalog purposes. - fn table_type(&self) -> TableType; - - /// Get the create statement used to create this table, if available. - fn get_table_definition(&self) -> Option<&str> { - None - } - - /// Get the [`LogicalPlan`] of this table, if available - fn get_logical_plan(&self) -> Option<&LogicalPlan> { - None - } - - /// Get the default value for a column, if available. - fn get_column_default(&self, _column: &str) -> Option<&Expr> { - None - } - - /// Create an [`ExecutionPlan`] for scanning the table with optionally - /// specified `projection`, `filter` and `limit`, described below. - /// - /// The `ExecutionPlan` is responsible scanning the datasource's - /// partitions in a streaming, parallelized fashion. - /// - /// # Projection - /// - /// If specified, only a subset of columns should be returned, in the order - /// specified. The projection is a set of indexes of the fields in - /// [`Self::schema`]. - /// - /// DataFusion provides the projection to scan only the columns actually - /// used in the query to improve performance, an optimization called - /// "Projection Pushdown". Some datasources, such as Parquet, can use this - /// information to go significantly faster when only a subset of columns is - /// required. - /// - /// # Filters - /// - /// A list of boolean filter [`Expr`]s to evaluate *during* the scan, in the - /// manner specified by [`Self::supports_filters_pushdown`]. Only rows for - /// which *all* of the `Expr`s evaluate to `true` must be returned (aka the - /// expressions are `AND`ed together). - /// - /// To enable filter pushdown you must override - /// [`Self::supports_filters_pushdown`] as the default implementation does - /// not and `filters` will be empty. - /// - /// DataFusion pushes filtering into the scans whenever possible - /// ("Filter Pushdown"), and depending on the format and the - /// implementation of the format, evaluating the predicate during the scan - /// can increase performance significantly. - /// - /// ## Note: Some columns may appear *only* in Filters - /// - /// In certain cases, a query may only use a certain column in a Filter that - /// has been completely pushed down to the scan. In this case, the - /// projection will not contain all the columns found in the filter - /// expressions. - /// - /// For example, given the query `SELECT t.a FROM t WHERE t.b > 5`, - /// - /// ```text - /// ┌────────────────────┐ - /// │ Projection(t.a) │ - /// └────────────────────┘ - /// ▲ - /// │ - /// │ - /// ┌────────────────────┐ Filter ┌────────────────────┐ Projection ┌────────────────────┐ - /// │ Filter(t.b > 5) │────Pushdown──▶ │ Projection(t.a) │ ───Pushdown───▶ │ Projection(t.a) │ - /// └────────────────────┘ └────────────────────┘ └────────────────────┘ - /// ▲ ▲ ▲ - /// │ │ │ - /// │ │ ┌────────────────────┐ - /// ┌────────────────────┐ ┌────────────────────┐ │ Scan │ - /// │ Scan │ │ Scan │ │ filter=(t.b > 5) │ - /// └────────────────────┘ │ filter=(t.b > 5) │ │ projection=(t.a) │ - /// └────────────────────┘ └────────────────────┘ - /// - /// Initial Plan If `TableProviderFilterPushDown` Projection pushdown notes that - /// returns true, filter pushdown the scan only needs t.a - /// pushes the filter into the scan - /// BUT internally evaluating the - /// predicate still requires t.b - /// ``` - /// - /// # Limit - /// - /// If `limit` is specified, must only produce *at least* this many rows, - /// (though it may return more). Like Projection Pushdown and Filter - /// Pushdown, DataFusion pushes `LIMIT`s as far down in the plan as - /// possible, called "Limit Pushdown" as some sources can use this - /// information to improve their performance. Note that if there are any - /// Inexact filters pushed down, the LIMIT cannot be pushed down. This is - /// because inexact filters do not guarantee that every filtered row is - /// removed, so applying the limit could lead to too few rows being available - /// to return as a final result. - async fn scan( - &self, - state: &SessionState, - projection: Option<&Vec>, - filters: &[Expr], - limit: Option, - ) -> Result>; - - /// Specify if DataFusion should provide filter expressions to the - /// TableProvider to apply *during* the scan. - /// - /// Some TableProviders can evaluate filters more efficiently than the - /// `Filter` operator in DataFusion, for example by using an index. - /// - /// # Parameters and Return Value - /// - /// The return `Vec` must have one element for each element of the `filters` - /// argument. The value of each element indicates if the TableProvider can - /// apply the corresponding filter during the scan. The position in the return - /// value corresponds to the expression in the `filters` parameter. - /// - /// If the length of the resulting `Vec` does not match the `filters` input - /// an error will be thrown. - /// - /// Each element in the resulting `Vec` is one of the following: - /// * [`Exact`] or [`Inexact`]: The TableProvider can apply the filter - /// during scan - /// * [`Unsupported`]: The TableProvider cannot apply the filter during scan - /// - /// By default, this function returns [`Unsupported`] for all filters, - /// meaning no filters will be provided to [`Self::scan`]. - /// - /// [`Unsupported`]: TableProviderFilterPushDown::Unsupported - /// [`Exact`]: TableProviderFilterPushDown::Exact - /// [`Inexact`]: TableProviderFilterPushDown::Inexact - /// # Example - /// - /// ```rust - /// # use std::any::Any; - /// # use std::sync::Arc; - /// # use arrow_schema::SchemaRef; - /// # use async_trait::async_trait; - /// # use datafusion::datasource::TableProvider; - /// # use datafusion::error::{Result, DataFusionError}; - /// # use datafusion::execution::context::SessionState; - /// # use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType}; - /// # use datafusion_physical_plan::ExecutionPlan; - /// // Define a struct that implements the TableProvider trait - /// struct TestDataSource {} - /// - /// #[async_trait] - /// impl TableProvider for TestDataSource { - /// # fn as_any(&self) -> &dyn Any { todo!() } - /// # fn schema(&self) -> SchemaRef { todo!() } - /// # fn table_type(&self) -> TableType { todo!() } - /// # async fn scan(&self, s: &SessionState, p: Option<&Vec>, f: &[Expr], l: Option) -> Result> { - /// todo!() - /// # } - /// // Override the supports_filters_pushdown to evaluate which expressions - /// // to accept as pushdown predicates. - /// fn supports_filters_pushdown(&self, filters: &[&Expr]) -> Result> { - /// // Process each filter - /// let support: Vec<_> = filters.iter().map(|expr| { - /// match expr { - /// // This example only supports a between expr with a single column named "c1". - /// Expr::Between(between_expr) => { - /// between_expr.expr - /// .try_into_col() - /// .map(|column| { - /// if column.name == "c1" { - /// TableProviderFilterPushDown::Exact - /// } else { - /// TableProviderFilterPushDown::Unsupported - /// } - /// }) - /// // If there is no column in the expr set the filter to unsupported. - /// .unwrap_or(TableProviderFilterPushDown::Unsupported) - /// } - /// _ => { - /// // For all other cases return Unsupported. - /// TableProviderFilterPushDown::Unsupported - /// } - /// } - /// }).collect(); - /// Ok(support) - /// } - /// } - /// ``` - fn supports_filters_pushdown( - &self, - filters: &[&Expr], - ) -> Result> { - Ok(vec![ - TableProviderFilterPushDown::Unsupported; - filters.len() - ]) - } - - /// Get statistics for this table, if available - fn statistics(&self) -> Option { - None - } - - /// Return an [`ExecutionPlan`] to insert data into this table, if - /// supported. - /// - /// The returned plan should return a single row in a UInt64 - /// column called "count" such as the following - /// - /// ```text - /// +-------+, - /// | count |, - /// +-------+, - /// | 6 |, - /// +-------+, - /// ``` - /// - /// # See Also - /// - /// See [`DataSinkExec`] for the common pattern of inserting a - /// streams of `RecordBatch`es as files to an ObjectStore. - /// - /// [`DataSinkExec`]: crate::physical_plan::insert::DataSinkExec - async fn insert_into( - &self, - _state: &SessionState, - _input: Arc, - _overwrite: bool, - ) -> Result> { - not_impl_err!("Insert into not implemented for this table") - } -} - -/// A factory which creates [`TableProvider`]s at runtime given a URL. -/// -/// For example, this can be used to create a table "on the fly" -/// from a directory of files only when that name is referenced. -#[async_trait] -pub trait TableProviderFactory: Sync + Send { - /// Create a TableProvider with the given url - async fn create( - &self, - state: &SessionState, - cmd: &CreateExternalTable, - ) -> Result>; -} /// The default [`TableProviderFactory`] /// @@ -318,7 +50,7 @@ impl DefaultTableFactory { impl TableProviderFactory for DefaultTableFactory { async fn create( &self, - state: &SessionState, + state: &dyn CatalogSession, cmd: &CreateExternalTable, ) -> Result> { let mut unbounded = cmd.unbounded; diff --git a/datafusion/core/src/datasource/stream.rs b/datafusion/core/src/datasource/stream.rs index 9cfdb7bb1168f..bb6f9ae0e71a8 100644 --- a/datafusion/core/src/datasource/stream.rs +++ b/datafusion/core/src/datasource/stream.rs @@ -25,9 +25,8 @@ use std::path::PathBuf; use std::str::FromStr; use std::sync::Arc; -use crate::datasource::provider::TableProviderFactory; -use crate::datasource::{create_ordering, TableProvider}; -use crate::execution::context::SessionState; +use crate::catalog_api::{TableProvider, TableProviderFactory}; +use crate::datasource::create_ordering; use arrow_array::{RecordBatch, RecordBatchReader, RecordBatchWriter}; use arrow_schema::SchemaRef; @@ -42,6 +41,7 @@ use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec}; use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan}; use async_trait::async_trait; +use datafusion_catalog::CatalogSession; use futures::StreamExt; /// A [`TableProviderFactory`] for [`StreamTable`] @@ -52,7 +52,7 @@ pub struct StreamTableFactory {} impl TableProviderFactory for StreamTableFactory { async fn create( &self, - state: &SessionState, + state: &dyn CatalogSession, cmd: &CreateExternalTable, ) -> Result> { let schema: SchemaRef = Arc::new(cmd.schema.as_ref().into()); @@ -322,7 +322,7 @@ impl TableProvider for StreamTable { async fn scan( &self, - _state: &SessionState, + _state: &dyn CatalogSession, projection: Option<&Vec>, _filters: &[Expr], limit: Option, @@ -347,7 +347,7 @@ impl TableProvider for StreamTable { async fn insert_into( &self, - _state: &SessionState, + _state: &dyn CatalogSession, input: Arc, _overwrite: bool, ) -> Result> { diff --git a/datafusion/core/src/datasource/streaming.rs b/datafusion/core/src/datasource/streaming.rs index 0ba6f85ec3e2b..6276f46acc24c 100644 --- a/datafusion/core/src/datasource/streaming.rs +++ b/datafusion/core/src/datasource/streaming.rs @@ -23,14 +23,13 @@ use std::sync::Arc; use arrow::datatypes::SchemaRef; use async_trait::async_trait; -use datafusion_common::{plan_err, Result}; -use datafusion_expr::{Expr, TableType}; -use log::debug; - use crate::datasource::TableProvider; -use crate::execution::context::SessionState; use crate::physical_plan::streaming::{PartitionStream, StreamingTableExec}; use crate::physical_plan::ExecutionPlan; +use datafusion_catalog::CatalogSession; +use datafusion_common::{plan_err, Result}; +use datafusion_expr::{Expr, TableType}; +use log::debug; /// A [`TableProvider`] that streams a set of [`PartitionStream`] pub struct StreamingTable { @@ -85,7 +84,7 @@ impl TableProvider for StreamingTable { async fn scan( &self, - _state: &SessionState, + _state: &dyn CatalogSession, projection: Option<&Vec>, _filters: &[Expr], limit: Option, diff --git a/datafusion/core/src/datasource/view.rs b/datafusion/core/src/datasource/view.rs index 3f024a6b4cb71..42352353d35dc 100644 --- a/datafusion/core/src/datasource/view.rs +++ b/datafusion/core/src/datasource/view.rs @@ -21,6 +21,7 @@ use std::{any::Any, sync::Arc}; use arrow::datatypes::SchemaRef; use async_trait::async_trait; +use datafusion_catalog::CatalogSession; use datafusion_common::Column; use datafusion_expr::{LogicalPlanBuilder, TableProviderFilterPushDown}; @@ -31,7 +32,6 @@ use crate::{ }; use crate::datasource::{TableProvider, TableType}; -use crate::execution::context::SessionState; /// An implementation of `TableProvider` that uses another logical plan. pub struct ViewTable { @@ -103,7 +103,7 @@ impl TableProvider for ViewTable { async fn scan( &self, - state: &SessionState, + state: &dyn CatalogSession, projection: Option<&Vec>, filters: &[Expr], limit: Option, diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 640a9b14a65f1..45acc87dd737c 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -24,15 +24,15 @@ use std::sync::{Arc, Weak}; use super::options::ReadOptions; use crate::{ catalog::listing_schema::ListingSchemaProvider, - catalog::schema::MemorySchemaProvider, + catalog::memory::MemorySchemaProvider, catalog::{CatalogProvider, CatalogProviderList, MemoryCatalogProvider}, + catalog_api::{TableProvider, TableProviderFactory}, dataframe::DataFrame, datasource::{ function::{TableFunction, TableFunctionImpl}, listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl}, - provider::TableProviderFactory, }, - datasource::{provider_as_source, MemTable, TableProvider, ViewTable}, + datasource::{provider_as_source, MemTable, ViewTable}, error::{DataFusionError, Result}, execution::{options::ArrowReadOptions, runtime_env::RuntimeEnv, FunctionRegistry}, logical_expr::AggregateUDF, @@ -1578,7 +1578,7 @@ mod tests { use datafusion_common_runtime::SpawnedTask; - use crate::catalog::schema::SchemaProvider; + use crate::catalog_api::SchemaProvider; use crate::execution::session_state::SessionStateBuilder; use crate::physical_planner::PhysicalPlanner; use async_trait::async_trait; diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index 03ce8d3b5892a..1473f4d2b0723 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -19,11 +19,13 @@ use crate::catalog::information_schema::{InformationSchemaProvider, INFORMATION_SCHEMA}; use crate::catalog::listing_schema::ListingSchemaProvider; -use crate::catalog::schema::{MemorySchemaProvider, SchemaProvider}; +use crate::catalog::memory::MemorySchemaProvider; use crate::catalog::{ CatalogProvider, CatalogProviderList, MemoryCatalogProvider, MemoryCatalogProviderList, }; +use crate::catalog_api::SchemaProvider; +use crate::catalog_api::TableProviderFactory; use crate::datasource::cte_worktable::CteWorkTable; use crate::datasource::file_format::arrow::ArrowFormatFactory; use crate::datasource::file_format::avro::AvroFormatFactory; @@ -33,7 +35,7 @@ use crate::datasource::file_format::json::JsonFormatFactory; use crate::datasource::file_format::parquet::ParquetFormatFactory; use crate::datasource::file_format::{format_as_file_type, FileFormatFactory}; use crate::datasource::function::{TableFunction, TableFunctionImpl}; -use crate::datasource::provider::{DefaultTableFactory, TableProviderFactory}; +use crate::datasource::provider::DefaultTableFactory; use crate::datasource::provider_as_source; use crate::execution::context::{EmptySerializerRegistry, FunctionFactory, QueryPlanner}; #[cfg(feature = "array_expressions")] @@ -45,6 +47,7 @@ use crate::{functions, functions_aggregate}; use arrow_schema::{DataType, SchemaRef}; use async_trait::async_trait; use chrono::{DateTime, Utc}; +use datafusion_catalog::CatalogSession; use datafusion_common::alias::AliasGenerator; use datafusion_common::config::{ConfigExtension, ConfigOptions, TableOptions}; use datafusion_common::display::{PlanType, StringifiedPlan, ToStringifiedPlan}; @@ -81,6 +84,7 @@ use itertools::Itertools; use log::{debug, info}; use sqlparser::ast::Expr as SQLExpr; use sqlparser::dialect::dialect_from_str; +use std::any::Any; use std::collections::hash_map::Entry; use std::collections::{HashMap, HashSet}; use std::fmt::Debug; @@ -194,6 +198,56 @@ impl Debug for SessionState { } } +#[async_trait] +impl CatalogSession for SessionState { + fn session_id(&self) -> &str { + self.session_id() + } + + fn config(&self) -> &SessionConfig { + self.config() + } + + async fn create_physical_plan( + &self, + logical_plan: &LogicalPlan, + ) -> datafusion_common::Result> { + self.create_physical_plan(logical_plan).await + } + + fn create_physical_expr( + &self, + expr: Expr, + df_schema: &DFSchema, + ) -> datafusion_common::Result> { + self.create_physical_expr(expr, df_schema) + } + + fn scalar_functions(&self) -> &HashMap> { + self.scalar_functions() + } + + fn aggregate_functions(&self) -> &HashMap> { + self.aggregate_functions() + } + + fn window_functions(&self) -> &HashMap> { + self.window_functions() + } + + fn runtime_env(&self) -> &Arc { + self.runtime_env() + } + + fn execution_props(&self) -> &ExecutionProps { + self.execution_props() + } + + fn as_any(&self) -> &dyn Any { + self + } +} + impl SessionState { /// Returns new [`SessionState`] using the provided /// [`SessionConfig`] and [`RuntimeEnv`]. diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs index 81c1c4629a3ad..52ad932d6b02e 100644 --- a/datafusion/core/src/lib.rs +++ b/datafusion/core/src/lib.rs @@ -535,6 +535,11 @@ pub use common::config; // NB datafusion execution is re-exported in the `execution` module +/// re-export of [`datafusion_catalog`] crate +pub mod catalog_api { + pub use datafusion_catalog::*; +} + /// re-export of [`datafusion_expr`] crate pub mod logical_expr { pub use datafusion_expr::*; diff --git a/datafusion/core/src/test_util/mod.rs b/datafusion/core/src/test_util/mod.rs index ba0509f3f51ac..defa93708da97 100644 --- a/datafusion/core/src/test_util/mod.rs +++ b/datafusion/core/src/test_util/mod.rs @@ -29,12 +29,12 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; +use crate::catalog_api::{TableProvider, TableProviderFactory}; use crate::dataframe::DataFrame; -use crate::datasource::provider::TableProviderFactory; use crate::datasource::stream::{FileStreamProvider, StreamConfig, StreamTable}; -use crate::datasource::{empty::EmptyTable, provider_as_source, TableProvider}; +use crate::datasource::{empty::EmptyTable, provider_as_source}; use crate::error::Result; -use crate::execution::context::{SessionState, TaskContext}; +use crate::execution::context::TaskContext; use crate::logical_expr::{LogicalPlanBuilder, UNNAMED_TABLE}; use crate::physical_plan::{ DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, @@ -49,9 +49,9 @@ use datafusion_expr::{CreateExternalTable, Expr, TableType}; use datafusion_physical_expr::EquivalenceProperties; use async_trait::async_trait; +use datafusion_catalog::CatalogSession; use futures::Stream; use tempfile::TempDir; - // backwards compatibility #[cfg(feature = "parquet")] pub use datafusion_common::test_util::parquet_test_data; @@ -177,7 +177,7 @@ pub struct TestTableFactory {} impl TableProviderFactory for TestTableFactory { async fn create( &self, - _: &SessionState, + _: &dyn CatalogSession, cmd: &CreateExternalTable, ) -> Result> { Ok(Arc::new(TestTableProvider { @@ -213,7 +213,7 @@ impl TableProvider for TestTableProvider { async fn scan( &self, - _state: &SessionState, + _state: &dyn CatalogSession, _projection: Option<&Vec>, _filters: &[Expr], _limit: Option, diff --git a/datafusion/core/tests/custom_sources_cases/mod.rs b/datafusion/core/tests/custom_sources_cases/mod.rs index eebc946ccb68c..99369e0521d7f 100644 --- a/datafusion/core/tests/custom_sources_cases/mod.rs +++ b/datafusion/core/tests/custom_sources_cases/mod.rs @@ -26,7 +26,7 @@ use arrow::datatypes::{DataType, Field, Int32Type, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use datafusion::datasource::{TableProvider, TableType}; use datafusion::error::Result; -use datafusion::execution::context::{SessionContext, SessionState, TaskContext}; +use datafusion::execution::context::{SessionContext, TaskContext}; use datafusion::logical_expr::{ col, Expr, LogicalPlan, LogicalPlanBuilder, TableScan, UNNAMED_TABLE, }; @@ -43,6 +43,7 @@ use datafusion_physical_plan::placeholder_row::PlaceholderRowExec; use datafusion_physical_plan::{ExecutionMode, PlanProperties}; use async_trait::async_trait; +use datafusion_catalog::CatalogSession; use futures::stream::Stream; mod provider_filter_pushdown; @@ -212,7 +213,7 @@ impl TableProvider for CustomTableProvider { async fn scan( &self, - _state: &SessionState, + _state: &dyn CatalogSession, projection: Option<&Vec>, _filters: &[Expr], _limit: Option, diff --git a/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs b/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs index b5506b7c12f61..9760ce7a3ae3d 100644 --- a/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs +++ b/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs @@ -21,9 +21,10 @@ use std::sync::Arc; use arrow::array::{Int32Builder, Int64Array}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; -use datafusion::datasource::provider::{TableProvider, TableType}; +use datafusion::catalog_api::TableProvider; +use datafusion::datasource::provider::TableType; use datafusion::error::Result; -use datafusion::execution::context::{SessionState, TaskContext}; +use datafusion::execution::context::TaskContext; use datafusion::logical_expr::TableProviderFilterPushDown; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::physical_plan::{ @@ -39,6 +40,7 @@ use datafusion_functions_aggregate::expr_fn::count; use datafusion_physical_expr::EquivalenceProperties; use async_trait::async_trait; +use datafusion_catalog::CatalogSession; fn create_batch(value: i32, num_rows: usize) -> Result { let mut builder = Int32Builder::with_capacity(num_rows); @@ -162,7 +164,7 @@ impl TableProvider for CustomProvider { async fn scan( &self, - _state: &SessionState, + _state: &dyn CatalogSession, projection: Option<&Vec>, filters: &[Expr], _: Option, diff --git a/datafusion/core/tests/custom_sources_cases/statistics.rs b/datafusion/core/tests/custom_sources_cases/statistics.rs index 2d42b03bfed87..119f0b0f6e634 100644 --- a/datafusion/core/tests/custom_sources_cases/statistics.rs +++ b/datafusion/core/tests/custom_sources_cases/statistics.rs @@ -20,7 +20,7 @@ use std::{any::Any, sync::Arc}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; -use datafusion::execution::context::{SessionState, TaskContext}; +use datafusion::execution::context::TaskContext; use datafusion::{ datasource::{TableProvider, TableType}, error::Result, @@ -36,6 +36,7 @@ use datafusion_common::{project_schema, stats::Precision}; use datafusion_physical_expr::EquivalenceProperties; use async_trait::async_trait; +use datafusion_catalog::CatalogSession; /// This is a testing structure for statistics /// It will act both as a table provider and execution plan @@ -89,7 +90,7 @@ impl TableProvider for StatisticsValidation { async fn scan( &self, - _state: &SessionState, + _state: &dyn CatalogSession, projection: Option<&Vec>, filters: &[Expr], // limit is ignored because it is not mandatory for a `TableProvider` to honor it diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs index 1d151f9fd3683..90051e68f46e7 100644 --- a/datafusion/core/tests/memory_limit/mod.rs +++ b/datafusion/core/tests/memory_limit/mod.rs @@ -35,7 +35,6 @@ use tokio::fs::File; use datafusion::datasource::streaming::StreamingTable; use datafusion::datasource::{MemTable, TableProvider}; -use datafusion::execution::context::SessionState; use datafusion::execution::disk_manager::DiskManagerConfig; use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use datafusion::execution::session_state::SessionStateBuilder; @@ -45,6 +44,7 @@ use datafusion::physical_plan::{ExecutionPlan, SendableRecordBatchStream}; use datafusion_common::{assert_contains, Result}; use datafusion::prelude::{SessionConfig, SessionContext}; +use datafusion_catalog::CatalogSession; use datafusion_execution::TaskContext; use test_utils::AccessLogGenerator; @@ -771,7 +771,7 @@ impl TableProvider for SortedTableProvider { async fn scan( &self, - _state: &SessionState, + _state: &dyn CatalogSession, projection: Option<&Vec>, _filters: &[Expr], _limit: Option, diff --git a/datafusion/core/tests/user_defined/user_defined_table_functions.rs b/datafusion/core/tests/user_defined/user_defined_table_functions.rs index 1e8d30cab6385..2d1691a66a060 100644 --- a/datafusion/core/tests/user_defined/user_defined_table_functions.rs +++ b/datafusion/core/tests/user_defined/user_defined_table_functions.rs @@ -24,11 +24,11 @@ use datafusion::arrow::record_batch::RecordBatch; use datafusion::datasource::function::TableFunctionImpl; use datafusion::datasource::TableProvider; use datafusion::error::Result; -use datafusion::execution::context::SessionState; use datafusion::execution::TaskContext; use datafusion::physical_plan::memory::MemoryExec; use datafusion::physical_plan::{collect, ExecutionPlan}; use datafusion::prelude::SessionContext; +use datafusion_catalog::CatalogSession; use datafusion_common::{assert_batches_eq, DFSchema, ScalarValue}; use datafusion_expr::{EmptyRelation, Expr, LogicalPlan, Projection, TableType}; use std::fs::File; @@ -127,7 +127,7 @@ impl TableProvider for SimpleCsvTable { async fn scan( &self, - state: &SessionState, + state: &dyn CatalogSession, projection: Option<&Vec>, _filters: &[Expr], _limit: Option, @@ -161,7 +161,7 @@ impl TableProvider for SimpleCsvTable { } impl SimpleCsvTable { - async fn interpreter_expr(&self, state: &SessionState) -> Result { + async fn interpreter_expr(&self, state: &dyn CatalogSession) -> Result { use datafusion::logical_expr::expr_rewriter::normalize_col; use datafusion::logical_expr::utils::columnize_expr; let plan = LogicalPlan::EmptyRelation(EmptyRelation { diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs index 0117502f400d2..07790faf1c889 100644 --- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs @@ -30,12 +30,11 @@ use arrow::datatypes::{ }; use prost::Message; +use datafusion::catalog_api::{TableProvider, TableProviderFactory}; use datafusion::datasource::file_format::arrow::ArrowFormatFactory; use datafusion::datasource::file_format::csv::CsvFormatFactory; use datafusion::datasource::file_format::format_as_file_type; use datafusion::datasource::file_format::parquet::ParquetFormatFactory; -use datafusion::datasource::provider::TableProviderFactory; -use datafusion::datasource::TableProvider; use datafusion::execution::session_state::SessionStateBuilder; use datafusion::execution::FunctionRegistry; use datafusion::functions_aggregate::count::count_udaf; diff --git a/datafusion/sqllogictest/src/test_context.rs b/datafusion/sqllogictest/src/test_context.rs index dd27727e3ad5d..3a2e6d6d9fd0d 100644 --- a/datafusion/sqllogictest/src/test_context.rs +++ b/datafusion/sqllogictest/src/test_context.rs @@ -27,12 +27,12 @@ use arrow::array::{ }; use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit}; use arrow::record_batch::RecordBatch; -use datafusion::execution::context::SessionState; use datafusion::logical_expr::{create_udf, ColumnarValue, Expr, ScalarUDF, Volatility}; use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::SessionConfig; use datafusion::{ - catalog::{schema::MemorySchemaProvider, CatalogProvider, MemoryCatalogProvider}, + catalog::{memory::MemoryCatalogProvider, memory::MemorySchemaProvider}, + catalog_api::CatalogProvider, datasource::{MemTable, TableProvider, TableType}, prelude::{CsvReadOptions, SessionContext}, }; @@ -40,6 +40,7 @@ use datafusion_common::cast::as_float64_array; use datafusion_common::DataFusionError; use async_trait::async_trait; +use datafusion::catalog_api::CatalogSession; use log::info; use tempfile::TempDir; @@ -221,7 +222,7 @@ pub async fn register_temp_table(ctx: &SessionContext) { async fn scan( &self, - _state: &SessionState, + _state: &dyn CatalogSession, _: Option<&Vec>, _: &[Expr], _: Option, diff --git a/docs/source/library-user-guide/custom-table-providers.md b/docs/source/library-user-guide/custom-table-providers.md index f53ac6cfae976..4d77996ed3e7e 100644 --- a/docs/source/library-user-guide/custom-table-providers.md +++ b/docs/source/library-user-guide/custom-table-providers.md @@ -112,7 +112,7 @@ impl CustomDataSource { impl TableProvider for CustomDataSource { async fn scan( &self, - _state: &SessionState, + _state: &dyn CatalogSession, projection: Option<&Vec>, // filters and limit can be used here to inject some push-down operations if needed _filters: &[Expr],