From 2cda63f64ef5104ada1a95ecc56d7ca293e7ab26 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Tue, 27 Feb 2024 09:31:20 +0100 Subject: [PATCH] refactor: `SchemaProvider::table` can fail --- datafusion-cli/src/catalog.rs | 43 ++++++++++-------- .../core/src/catalog/information_schema.rs | 45 +++++++++++++------ datafusion/core/src/catalog/listing_schema.rs | 10 +++-- datafusion/core/src/catalog/schema.rs | 12 +++-- datafusion/core/src/execution/context/mod.rs | 6 +-- datafusion/core/src/physical_planner.rs | 4 +- datafusion/core/tests/sql/create_drop.rs | 2 +- 7 files changed, 78 insertions(+), 44 deletions(-) diff --git a/datafusion-cli/src/catalog.rs b/datafusion-cli/src/catalog.rs index f664d40df5dba..67184b8257b87 100644 --- a/datafusion-cli/src/catalog.rs +++ b/datafusion-cli/src/catalog.rs @@ -19,6 +19,7 @@ use crate::object_storage::get_object_store; use async_trait::async_trait; use datafusion::catalog::schema::SchemaProvider; use datafusion::catalog::{CatalogProvider, CatalogProviderList}; +use datafusion::common::{plan_datafusion_err, DataFusionError}; use datafusion::datasource::listing::{ ListingTable, ListingTableConfig, ListingTableUrl, }; @@ -145,16 +146,21 @@ impl SchemaProvider for DynamicFileSchemaProvider { self.inner.register_table(name, table) } - async fn table(&self, name: &str) -> Option> { - let inner_table = self.inner.table(name).await; + async fn table(&self, name: &str) -> Result>> { + let inner_table = self.inner.table(name).await?; if inner_table.is_some() { - return inner_table; + return Ok(inner_table); } // if the inner schema provider didn't have a table by // that name, try to treat it as a listing table - let state = self.state.upgrade()?.read().clone(); - let table_url = ListingTableUrl::parse(name).ok()?; + let state = self + .state + .upgrade() + .ok_or_else(|| plan_datafusion_err!("locking error"))? + .read() + .clone(); + let table_url = ListingTableUrl::parse(name)?; let url: &Url = table_url.as_ref(); // If the store is already registered for this URL then `get_store` @@ -169,18 +175,20 @@ impl SchemaProvider for DynamicFileSchemaProvider { let mut options = HashMap::new(); let store = get_object_store(&state, &mut options, table_url.scheme(), url) - .await - .unwrap(); + .await?; state.runtime_env().register_object_store(url, store); } } - let config = ListingTableConfig::new(table_url) - .infer(&state) - .await - .ok()?; + let config = match ListingTableConfig::new(table_url).infer(&state).await { + Ok(cfg) => cfg, + Err(_) => { + // treat as non-existing + return Ok(None); + } + }; - Some(Arc::new(ListingTable::try_new(config).ok()?)) + Ok(Some(Arc::new(ListingTable::try_new(config)?))) } fn deregister_table(&self, name: &str) -> Result>> { @@ -227,7 +235,7 @@ mod tests { let (ctx, schema) = setup_context(); // That's a non registered table so expecting None here - let table = schema.table(&location).await; + let table = schema.table(&location).await.unwrap(); assert!(table.is_none()); // It should still create an object store for the location in the SessionState @@ -251,7 +259,7 @@ mod tests { let (ctx, schema) = setup_context(); - let table = schema.table(&location).await; + let table = schema.table(&location).await.unwrap(); assert!(table.is_none()); let store = ctx @@ -273,7 +281,7 @@ mod tests { let (ctx, schema) = setup_context(); - let table = schema.table(&location).await; + let table = schema.table(&location).await.unwrap(); assert!(table.is_none()); let store = ctx @@ -289,13 +297,10 @@ mod tests { } #[tokio::test] - #[should_panic] async fn query_invalid_location_test() { let location = "ts://file.parquet"; let (_ctx, schema) = setup_context(); - // This will panic, we cannot prevent that because `schema.table` - // returns an Option - schema.table(location).await; + assert!(schema.table(location).await.is_err()); } } diff --git a/datafusion/core/src/catalog/information_schema.rs b/datafusion/core/src/catalog/information_schema.rs index 80ce3b1ae4198..cd8f7649534ff 100644 --- a/datafusion/core/src/catalog/information_schema.rs +++ b/datafusion/core/src/catalog/information_schema.rs @@ -20,6 +20,7 @@ //! [Information Schema]: https://en.wikipedia.org/wiki/Information_schema use async_trait::async_trait; +use datafusion_common::DataFusionError; use std::{any::Any, sync::Arc}; use arrow::{ @@ -78,7 +79,10 @@ struct InformationSchemaConfig { impl InformationSchemaConfig { /// Construct the `information_schema.tables` virtual table - async fn make_tables(&self, builder: &mut InformationSchemaTablesBuilder) { + async fn make_tables( + &self, + builder: &mut InformationSchemaTablesBuilder, + ) -> Result<(), DataFusionError> { // create a mem table with the names of tables for catalog_name in self.catalog_list.catalog_names() { @@ -89,7 +93,7 @@ impl InformationSchemaConfig { // schema name may not exist in the catalog, so we need to check if let Some(schema) = catalog.schema(&schema_name) { for table_name in schema.table_names() { - if let Some(table) = schema.table(&table_name).await { + if let Some(table) = schema.table(&table_name).await? { builder.add_table( &catalog_name, &schema_name, @@ -124,6 +128,8 @@ impl InformationSchemaConfig { TableType::View, ); } + + Ok(()) } async fn make_schemata(&self, builder: &mut InformationSchemataBuilder) { @@ -141,7 +147,10 @@ impl InformationSchemaConfig { } } - async fn make_views(&self, builder: &mut InformationSchemaViewBuilder) { + async fn make_views( + &self, + builder: &mut InformationSchemaViewBuilder, + ) -> Result<(), DataFusionError> { for catalog_name in self.catalog_list.catalog_names() { let catalog = self.catalog_list.catalog(&catalog_name).unwrap(); @@ -150,7 +159,7 @@ impl InformationSchemaConfig { // schema name may not exist in the catalog, so we need to check if let Some(schema) = catalog.schema(&schema_name) { for table_name in schema.table_names() { - if let Some(table) = schema.table(&table_name).await { + if let Some(table) = schema.table(&table_name).await? { builder.add_view( &catalog_name, &schema_name, @@ -163,10 +172,15 @@ impl InformationSchemaConfig { } } } + + Ok(()) } /// Construct the `information_schema.columns` virtual table - async fn make_columns(&self, builder: &mut InformationSchemaColumnsBuilder) { + async fn make_columns( + &self, + builder: &mut InformationSchemaColumnsBuilder, + ) -> Result<(), DataFusionError> { for catalog_name in self.catalog_list.catalog_names() { let catalog = self.catalog_list.catalog(&catalog_name).unwrap(); @@ -175,7 +189,7 @@ impl InformationSchemaConfig { // schema name may not exist in the catalog, so we need to check if let Some(schema) = catalog.schema(&schema_name) { for table_name in schema.table_names() { - if let Some(table) = schema.table(&table_name).await { + if let Some(table) = schema.table(&table_name).await? { for (field_position, field) in table.schema().fields().iter().enumerate() { @@ -193,6 +207,8 @@ impl InformationSchemaConfig { } } } + + Ok(()) } /// Construct the `information_schema.df_settings` virtual table @@ -223,7 +239,10 @@ impl SchemaProvider for InformationSchemaProvider { ] } - async fn table(&self, name: &str) -> Option> { + async fn table( + &self, + name: &str, + ) -> Result>, DataFusionError> { let config = self.config.clone(); let table: Arc = if name.eq_ignore_ascii_case("tables") { Arc::new(InformationSchemaTables::new(config)) @@ -236,12 +255,12 @@ impl SchemaProvider for InformationSchemaProvider { } else if name.eq_ignore_ascii_case("schemata") { Arc::new(InformationSchemata::new(config)) } else { - return None; + return Ok(None); }; - Some(Arc::new( + Ok(Some(Arc::new( StreamingTable::try_new(table.schema().clone(), vec![table]).unwrap(), - )) + ))) } fn table_exist(&self, name: &str) -> bool { @@ -292,7 +311,7 @@ impl PartitionStream for InformationSchemaTables { self.schema.clone(), // TODO: Stream this futures::stream::once(async move { - config.make_tables(&mut builder).await; + config.make_tables(&mut builder).await?; Ok(builder.finish()) }), )) @@ -383,7 +402,7 @@ impl PartitionStream for InformationSchemaViews { self.schema.clone(), // TODO: Stream this futures::stream::once(async move { - config.make_views(&mut builder).await; + config.make_views(&mut builder).await?; Ok(builder.finish()) }), )) @@ -497,7 +516,7 @@ impl PartitionStream for InformationSchemaColumns { self.schema.clone(), // TODO: Stream this futures::stream::once(async move { - config.make_columns(&mut builder).await; + config.make_columns(&mut builder).await?; Ok(builder.finish()) }), )) diff --git a/datafusion/core/src/catalog/listing_schema.rs b/datafusion/core/src/catalog/listing_schema.rs index c3c6826895421..f64b43062d2f2 100644 --- a/datafusion/core/src/catalog/listing_schema.rs +++ b/datafusion/core/src/catalog/listing_schema.rs @@ -175,12 +175,16 @@ impl SchemaProvider for ListingSchemaProvider { .collect() } - async fn table(&self, name: &str) -> Option> { - self.tables + async fn table( + &self, + name: &str, + ) -> Result>, DataFusionError> { + Ok(self + .tables .lock() .expect("Can't lock tables") .get(name) - .cloned() + .cloned()) } fn register_table( diff --git a/datafusion/core/src/catalog/schema.rs b/datafusion/core/src/catalog/schema.rs index 1e9a86b496112..49f8350ecc5b1 100644 --- a/datafusion/core/src/catalog/schema.rs +++ b/datafusion/core/src/catalog/schema.rs @@ -49,7 +49,10 @@ pub trait SchemaProvider: Sync + Send { /// Retrieves a specific table from the schema by name, if it exists, /// otherwise returns `None`. - async fn table(&self, name: &str) -> Option>; + async fn table( + &self, + name: &str, + ) -> Result>, DataFusionError>; /// If supported by the implementation, adds a new table named `name` to /// this schema. @@ -111,8 +114,11 @@ impl SchemaProvider for MemorySchemaProvider { .collect() } - async fn table(&self, name: &str) -> Option> { - self.tables.get(name).map(|table| table.value().clone()) + async fn table( + &self, + name: &str, + ) -> Result>, DataFusionError> { + Ok(self.tables.get(name).map(|table| table.value().clone())) } fn register_table( diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index b130070141b25..ffc4a4f717d7e 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -783,7 +783,7 @@ impl SessionContext { }; if let Some(schema) = maybe_schema { - if let Some(table_provider) = schema.table(&table).await { + if let Some(table_provider) = schema.table(&table).await? { if table_provider.table_type() == table_type { schema.deregister_table(&table)?; return Ok(true); @@ -1115,7 +1115,7 @@ impl SessionContext { let table_ref = table_ref.into(); let table = table_ref.table().to_string(); let schema = self.state.read().schema_for_ref(table_ref)?; - match schema.table(&table).await { + match schema.table(&table).await? { Some(ref provider) => Ok(Arc::clone(provider)), _ => plan_err!("No table named '{table}'"), } @@ -1714,7 +1714,7 @@ impl SessionState { let resolved = self.resolve_table_ref(&reference); if let Entry::Vacant(v) = provider.tables.entry(resolved.to_string()) { if let Ok(schema) = self.schema_for_ref(resolved) { - if let Some(table) = schema.table(table).await { + if let Some(table) = schema.table(table).await? { v.insert(provider_as_source(table)); } } diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 23ac7e08cad8f..83ba773464f6e 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -624,7 +624,7 @@ impl DefaultPhysicalPlanner { }) => { let name = table_name.table(); let schema = session_state.schema_for_ref(table_name)?; - if let Some(provider) = schema.table(name).await { + if let Some(provider) = schema.table(name).await? { let input_exec = self.create_initial_plan(input, session_state).await?; provider.insert_into(session_state, input_exec, false).await } else { @@ -641,7 +641,7 @@ impl DefaultPhysicalPlanner { }) => { let name = table_name.table(); let schema = session_state.schema_for_ref(table_name)?; - if let Some(provider) = schema.table(name).await { + if let Some(provider) = schema.table(name).await? { let input_exec = self.create_initial_plan(input, session_state).await?; provider.insert_into(session_state, input_exec, true).await } else { diff --git a/datafusion/core/tests/sql/create_drop.rs b/datafusion/core/tests/sql/create_drop.rs index b1434dddee50f..2174009b85573 100644 --- a/datafusion/core/tests/sql/create_drop.rs +++ b/datafusion/core/tests/sql/create_drop.rs @@ -63,7 +63,7 @@ async fn create_external_table_with_ddl() -> Result<()> { let exists = schema.table_exist("dt"); assert!(exists, "Table should have been created!"); - let table_schema = schema.table("dt").await.unwrap().schema(); + let table_schema = schema.table("dt").await.unwrap().unwrap().schema(); assert_eq!(3, table_schema.fields().len());