diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index 9b6aad1209fc..e4feecf6c674 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -314,7 +314,7 @@ impl DFSchema { &self, qualifier: Option<&TableReference>, name: &str, - ) -> Option { + ) -> Result> { let mut matches = self .iter() .enumerate() @@ -328,8 +328,45 @@ impl DFSchema { // field to lookup is unqualified, no need to compare qualifier (None, Some(_)) | (None, None) => f.name() == name, }) - .map(|(idx, _)| idx); - matches.next() + .map(|(idx, (q, _))| (idx, q)); + let first_match = matches.next(); + match first_match { + None => Ok(None), + Some((first_index, first_qualifier)) => { + let next_match = matches.next(); + match next_match { + None => Ok(Some(first_index)), + Some((_, next_qualifier)) => { + match (first_qualifier, next_qualifier) { + (Some(q), Some(_)) => { + _schema_err!(SchemaError::DuplicateQualifiedField { + qualifier: Box::new(q.clone()), + name: name.to_string(), + }) + } + + (None, None) => { + _schema_err!(SchemaError::DuplicateUnqualifiedField { + name: name.to_string(), + }) + } + + _ => _schema_err!(SchemaError::AmbiguousReference { + field: Column { + relation: Some( + first_qualifier + .or(next_qualifier) + .unwrap() + .clone() + ), + name: name.to_string(), + }, + }), + } + } + } + } + } } /// Find the index of the column with the given qualifier and name, @@ -337,7 +374,7 @@ impl DFSchema { /// /// See [Self::index_of_column] for a version that returns an error if the /// column is not found - pub fn maybe_index_of_column(&self, col: &Column) -> Option { + pub fn maybe_index_of_column(&self, col: &Column) -> Result> { self.index_of_column_by_name(col.relation.as_ref(), &col.name) } @@ -347,14 +384,15 @@ impl DFSchema { /// See [Self::maybe_index_of_column] for a version that returns `None` if /// the column is not found pub fn index_of_column(&self, col: &Column) -> Result { - self.maybe_index_of_column(col) + self.maybe_index_of_column(col)? .ok_or_else(|| field_not_found(col.relation.clone(), &col.name, self)) } /// Check if the column is in the current schema - pub fn is_column_from_schema(&self, col: &Column) -> bool { - self.index_of_column_by_name(col.relation.as_ref(), &col.name) - .is_some() + pub fn is_column_from_schema(&self, col: &Column) -> Result { + Ok(self + .index_of_column_by_name(col.relation.as_ref(), &col.name)? + .is_some()) } /// Find the field with the given name @@ -378,7 +416,7 @@ impl DFSchema { ) -> Result<(Option<&TableReference>, &Field)> { if let Some(qualifier) = qualifier { let idx = self - .index_of_column_by_name(Some(qualifier), name) + .index_of_column_by_name(Some(qualifier), name)? .ok_or_else(|| field_not_found(Some(qualifier.clone()), name, self))?; Ok((self.field_qualifiers[idx].as_ref(), self.field(idx))) } else { @@ -490,7 +528,7 @@ impl DFSchema { name: &str, ) -> Result<&Field> { let idx = self - .index_of_column_by_name(Some(qualifier), name) + .index_of_column_by_name(Some(qualifier), name)? .ok_or_else(|| field_not_found(Some(qualifier.clone()), name, self))?; Ok(self.field(idx)) @@ -629,9 +667,9 @@ impl DFSchema { let iter1 = fields1.iter(); let iter2 = fields2.iter(); fields1.len() == fields2.len() && - // all fields have to be the same + // all fields have to be the same iter1 - .zip(iter2) + .zip(iter2) .all(|(f1, f2)| Self::field_is_logically_equal(f1, f2)) } (DataType::Union(fields1, _), DataType::Union(fields2, _)) => { @@ -668,9 +706,9 @@ impl DFSchema { let iter1 = fields1.iter(); let iter2 = fields2.iter(); fields1.len() == fields2.len() && - // all fields have to be the same + // all fields have to be the same iter1 - .zip(iter2) + .zip(iter2) .all(|(f1, f2)| Self::field_is_semantically_equal(f1, f2)) } (DataType::Union(fields1, _), DataType::Union(fields2, _)) => { @@ -1178,8 +1216,8 @@ mod tests { .to_string(), expected_help ); - assert!(schema.index_of_column_by_name(None, "y").is_none()); - assert!(schema.index_of_column_by_name(None, "t1.c0").is_none()); + assert!(schema.index_of_column_by_name(None, "y")?.is_none()); + assert!(schema.index_of_column_by_name(None, "t1.c0")?.is_none()); Ok(()) } @@ -1268,28 +1306,28 @@ mod tests { { let col = Column::from_qualified_name("t1.c0"); let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?; - assert!(schema.is_column_from_schema(&col)); + assert!(schema.is_column_from_schema(&col)?); } // qualified not exists { let col = Column::from_qualified_name("t1.c2"); let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?; - assert!(!schema.is_column_from_schema(&col)); + assert!(!schema.is_column_from_schema(&col)?); } // unqualified exists { let col = Column::from_name("c0"); let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?; - assert!(schema.is_column_from_schema(&col)); + assert!(schema.is_column_from_schema(&col)?); } // unqualified not exists { let col = Column::from_name("c2"); let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?; - assert!(!schema.is_column_from_schema(&col)); + assert!(!schema.is_column_from_schema(&col)?); } Ok(()) diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index e9f4f1f80972..6f33bb6e5013 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -25,7 +25,7 @@ use std::sync::{Arc, OnceLock}; use super::dml::CopyTo; use super::DdlStatement; -use crate::builder::{change_redundant_column, unnest_with_options}; +use crate::builder::unnest_with_options; use crate::expr::{Placeholder, Sort as SortExpr, WindowFunction}; use crate::expr_rewriter::{ create_col_from_scalar_expr, normalize_cols, normalize_sorts, NamePreserver, @@ -2193,7 +2193,7 @@ impl SubqueryAlias { alias: impl Into, ) -> Result { let alias = alias.into(); - let fields = change_redundant_column(plan.schema().fields()); + let fields = plan.schema().fields().clone(); let meta_data = plan.schema().as_ref().metadata().clone(); let schema: Schema = DFSchema::from_unqualified_fields(fields.into(), meta_data)?.into(); diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index 6f7c5d379260..139d33bdb285 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -1012,7 +1012,7 @@ pub fn check_all_columns_from_schema( schema: &DFSchema, ) -> Result { for col in columns.iter() { - let exist = schema.is_column_from_schema(col); + let exist = schema.is_column_from_schema(col)?; if !exist { return Ok(false); } diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index 1519c54dbf68..11c37764b514 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -177,7 +177,7 @@ fn optimize_projections( let all_exprs_iter = new_group_bys.iter().chain(new_aggr_expr.iter()); let schema = aggregate.input.schema(); let necessary_indices = - RequiredIndicies::new().with_exprs(schema, all_exprs_iter); + RequiredIndicies::new().with_exprs(schema, all_exprs_iter)?; let necessary_exprs = necessary_indices.get_required_exprs(schema); return optimize_projections( @@ -217,7 +217,8 @@ fn optimize_projections( // Get all the required column indices at the input, either by the // parent or window expression requirements. - let required_indices = child_reqs.with_exprs(&input_schema, &new_window_expr); + let required_indices = + child_reqs.with_exprs(&input_schema, &new_window_expr)?; return optimize_projections( Arc::unwrap_or_clone(window.input), @@ -753,7 +754,7 @@ fn rewrite_projection_given_requirements( let exprs_used = indices.get_at_indices(&expr); let required_indices = - RequiredIndicies::new().with_exprs(input.schema(), exprs_used.iter()); + RequiredIndicies::new().with_exprs(input.schema(), exprs_used.iter())?; // rewrite the children projection, and if they are changed rewrite the // projection down diff --git a/datafusion/optimizer/src/optimize_projections/required_indices.rs b/datafusion/optimizer/src/optimize_projections/required_indices.rs index 60d8ef1a8e6c..2c4a07f84bcc 100644 --- a/datafusion/optimizer/src/optimize_projections/required_indices.rs +++ b/datafusion/optimizer/src/optimize_projections/required_indices.rs @@ -96,7 +96,7 @@ impl RequiredIndicies { // Add indices of the child fields referred to by the expressions in the // parent plan.apply_expressions(|e| { - self.add_expr(schema, e); + self.add_expr(schema, e)?; Ok(TreeNodeRecursion::Continue) })?; Ok(self.compact()) @@ -111,17 +111,18 @@ impl RequiredIndicies { /// /// * `input_schema`: The input schema to analyze for index requirements. /// * `expr`: An expression for which we want to find necessary field indices. - fn add_expr(&mut self, input_schema: &DFSchemaRef, expr: &Expr) { + fn add_expr(&mut self, input_schema: &DFSchemaRef, expr: &Expr) -> Result<()> { // TODO could remove these clones (and visit the expression directly) let mut cols = expr.column_refs(); // Get outer-referenced (subquery) columns: outer_columns(expr, &mut cols); self.indices.reserve(cols.len()); for col in cols { - if let Some(idx) = input_schema.maybe_index_of_column(col) { + if let Some(idx) = input_schema.maybe_index_of_column(col)? { self.indices.push(idx); } } + Ok(()) } /// Adds the indices of the fields referred to by the given expressions @@ -132,17 +133,14 @@ impl RequiredIndicies { /// * `input_schema`: The input schema to analyze for index requirements. /// * `exprs`: the expressions for which we want to find field indices. pub fn with_exprs<'a>( - self, + mut self, schema: &DFSchemaRef, exprs: impl IntoIterator, - ) -> Self { - exprs - .into_iter() - .fold(self, |mut acc, expr| { - acc.add_expr(schema, expr); - acc - }) - .compact() + ) -> Result { + for expr in exprs { + self.add_expr(schema, expr)?; + } + Ok(self.compact()) } /// Adds all `indices` into this instance. diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index 2174dc1b85ec..e861c6819239 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -1750,7 +1750,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { .enumerate() .map(|(i, c)| { let column_index = table_schema - .index_of_column_by_name(None, &c) + .index_of_column_by_name(None, &c)? .ok_or_else(|| unqualified_field_not_found(&c, &table_schema))?; if value_indices[column_index].is_some() { return schema_err!(SchemaError::DuplicateUnqualifiedField { diff --git a/datafusion/sql/src/unparser/utils.rs b/datafusion/sql/src/unparser/utils.rs index d0f80da83d63..3aa19ef22f89 100644 --- a/datafusion/sql/src/unparser/utils.rs +++ b/datafusion/sql/src/unparser/utils.rs @@ -269,7 +269,7 @@ pub(crate) fn unproject_sort_expr( // In case of aggregation there could be columns containing aggregation functions we need to unproject if let Some(agg) = agg { - if agg.schema.is_column_from_schema(col_ref) { + if agg.schema.is_column_from_schema(col_ref)? { let new_expr = unproject_agg_exprs(sort_expr.expr, agg, None)?; sort_expr.expr = new_expr; return Ok(sort_expr); diff --git a/datafusion/sqllogictest/test_files/join.slt b/datafusion/sqllogictest/test_files/join.slt index b2b6db129ad3..368b11475784 100644 --- a/datafusion/sqllogictest/test_files/join.slt +++ b/datafusion/sqllogictest/test_files/join.slt @@ -1215,18 +1215,19 @@ statement ok create table t1(v1 int) as values(100); ## Query with Ambiguous column reference -query I +query error DataFusion error: Schema error: Schema contains duplicate qualified field name t1\.v1 select count(*) from t1 right outer join t1 on t1.v1 > 0; ----- -1 -query I +query error select t1.v1 from t1 join t1 using(v1) cross join (select struct('foo' as v1) as t1); ---- -100 +DataFusion error: Optimizer rule 'eliminate_cross_join' failed +caused by +Schema error: Schema contains duplicate qualified field name t1.v1 + statement ok drop table t1; diff --git a/datafusion/sqllogictest/test_files/select.slt b/datafusion/sqllogictest/test_files/select.slt index 9994ca3ba4ef..a31f92712c9f 100644 --- a/datafusion/sqllogictest/test_files/select.slt +++ b/datafusion/sqllogictest/test_files/select.slt @@ -1807,11 +1807,23 @@ SELECT id AS col, id+1 AS col FROM users ---- 1 2 +# a reference is ambiguous +query error DataFusion error: Schema error: Ambiguous reference to unqualified field a +select a from (select 1 as a, 2 as a) t; + +# t.a reference is ambiguous +query error DataFusion error: Schema error: Schema contains duplicate qualified field name t\.a +select t.a from (select 1 as a, 2 as a) t; + +# TODO PostgreSQL disallows self-join without giving tables distinct aliases, but some other databases, e.g. Trino, do allow this, so this could work # TODO When joining using USING, the condition columns should appear once in the output, and should be selectible using unqualified name only -query ITIT +query error SELECT * FROM users JOIN users USING (id); ---- -1 Tom 1 Tom +DataFusion error: expand_wildcard_rule +caused by +Schema error: Schema contains duplicate qualified field name users.id + statement ok create view v as select count(id) from users; diff --git a/datafusion/substrait/src/logical_plan/consumer.rs b/datafusion/substrait/src/logical_plan/consumer.rs index 1cce228527ec..7e7d4c884685 100644 --- a/datafusion/substrait/src/logical_plan/consumer.rs +++ b/datafusion/substrait/src/logical_plan/consumer.rs @@ -1270,7 +1270,7 @@ fn apply_projection(table: DataFrame, substrait_schema: DFSchema) -> Result>()?;