diff --git a/datafusion/common/src/functional_dependencies.rs b/datafusion/common/src/functional_dependencies.rs index 90f4e6e7e3d1..ed9a68c19536 100644 --- a/datafusion/common/src/functional_dependencies.rs +++ b/datafusion/common/src/functional_dependencies.rs @@ -23,11 +23,8 @@ use std::fmt::{Display, Formatter}; use std::ops::Deref; use std::vec::IntoIter; -use crate::error::_plan_err; use crate::utils::{merge_and_order_indices, set_difference}; -use crate::{DFSchema, DFSchemaRef, DataFusionError, JoinType, Result}; - -use sqlparser::ast::TableConstraint; +use crate::{DFSchema, JoinType}; /// This object defines a constraint on a table. #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)] @@ -60,74 +57,6 @@ impl Constraints { Self { inner: constraints } } - /// Convert each `TableConstraint` to corresponding `Constraint` - pub fn new_from_table_constraints( - constraints: &[TableConstraint], - df_schema: &DFSchemaRef, - ) -> Result { - let constraints = constraints - .iter() - .map(|c: &TableConstraint| match c { - TableConstraint::Unique { name, columns, .. } => { - let field_names = df_schema.field_names(); - // Get unique constraint indices in the schema: - let indices = columns - .iter() - .map(|u| { - let idx = field_names - .iter() - .position(|item| *item == u.value) - .ok_or_else(|| { - let name = name - .as_ref() - .map(|name| format!("with name '{name}' ")) - .unwrap_or("".to_string()); - DataFusionError::Execution( - format!("Column for unique constraint {}not found in schema: {}", name,u.value) - ) - })?; - Ok(idx) - }) - .collect::>>()?; - Ok(Constraint::Unique(indices)) - } - TableConstraint::PrimaryKey { columns, .. } => { - let field_names = df_schema.field_names(); - // Get primary key indices in the schema: - let indices = columns - .iter() - .map(|pk| { - let idx = field_names - .iter() - .position(|item| *item == pk.value) - .ok_or_else(|| { - DataFusionError::Execution(format!( - "Column for primary key not found in schema: {}", - pk.value - )) - })?; - Ok(idx) - }) - .collect::>>()?; - Ok(Constraint::PrimaryKey(indices)) - } - TableConstraint::ForeignKey { .. } => { - _plan_err!("Foreign key constraints are not currently supported") - } - TableConstraint::Check { .. } => { - _plan_err!("Check constraints are not currently supported") - } - TableConstraint::Index { .. } => { - _plan_err!("Indexes are not currently supported") - } - TableConstraint::FulltextOrSpatial { .. } => { - _plan_err!("Indexes are not currently supported") - } - }) - .collect::>>()?; - Ok(Constraints::new_unverified(constraints)) - } - /// Check whether constraints is empty pub fn is_empty(&self) -> bool { self.inner.is_empty() diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index edb4316db1e0..4109f1371187 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -30,10 +30,11 @@ use crate::planner::{ use crate::utils::normalize_ident; use arrow_schema::{DataType, Fields}; +use datafusion_common::error::_plan_err; use datafusion_common::parsers::CompressionTypeVariant; use datafusion_common::{ exec_err, not_impl_err, plan_datafusion_err, plan_err, schema_err, - unqualified_field_not_found, Column, Constraints, DFSchema, DFSchemaRef, + unqualified_field_not_found, Column, Constraint, Constraints, DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue, SchemaError, SchemaReference, TableReference, ToDFSchema, }; @@ -427,7 +428,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { plan }; - let constraints = Constraints::new_from_table_constraints( + let constraints = Self::new_constraint_from_table_constraints( &all_constraints, plan.schema(), )?; @@ -452,7 +453,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { schema, }; let plan = LogicalPlan::EmptyRelation(plan); - let constraints = Constraints::new_from_table_constraints( + let constraints = Self::new_constraint_from_table_constraints( &all_constraints, plan.schema(), )?; @@ -1242,7 +1243,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { let name = self.object_name_to_table_reference(name)?; let constraints = - Constraints::new_from_table_constraints(&all_constraints, &df_schema)?; + Self::new_constraint_from_table_constraints(&all_constraints, &df_schema)?; Ok(LogicalPlan::Ddl(DdlStatement::CreateExternalTable( PlanCreateExternalTable { schema: df_schema, @@ -1262,6 +1263,74 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { ))) } + /// Convert each `TableConstraint` to corresponding `Constraint` + fn new_constraint_from_table_constraints( + constraints: &[TableConstraint], + df_schema: &DFSchemaRef, + ) -> Result { + let constraints = constraints + .iter() + .map(|c: &TableConstraint| match c { + TableConstraint::Unique { name, columns, .. } => { + let field_names = df_schema.field_names(); + // Get unique constraint indices in the schema: + let indices = columns + .iter() + .map(|u| { + let idx = field_names + .iter() + .position(|item| *item == u.value) + .ok_or_else(|| { + let name = name + .as_ref() + .map(|name| format!("with name '{name}' ")) + .unwrap_or("".to_string()); + DataFusionError::Execution( + format!("Column for unique constraint {}not found in schema: {}", name,u.value) + ) + })?; + Ok(idx) + }) + .collect::>>()?; + Ok(Constraint::Unique(indices)) + } + TableConstraint::PrimaryKey { columns, .. } => { + let field_names = df_schema.field_names(); + // Get primary key indices in the schema: + let indices = columns + .iter() + .map(|pk| { + let idx = field_names + .iter() + .position(|item| *item == pk.value) + .ok_or_else(|| { + DataFusionError::Execution(format!( + "Column for primary key not found in schema: {}", + pk.value + )) + })?; + Ok(idx) + }) + .collect::>>()?; + Ok(Constraint::PrimaryKey(indices)) + } + TableConstraint::ForeignKey { .. } => { + _plan_err!("Foreign key constraints are not currently supported") + } + TableConstraint::Check { .. } => { + _plan_err!("Check constraints are not currently supported") + } + TableConstraint::Index { .. } => { + _plan_err!("Indexes are not currently supported") + } + TableConstraint::FulltextOrSpatial { .. } => { + _plan_err!("Indexes are not currently supported") + } + }) + .collect::>>()?; + Ok(Constraints::new_unverified(constraints)) + } + fn parse_options_map( &self, options: Vec<(String, Value)>,