diff --git a/datafusion/common/src/functional_dependencies.rs b/datafusion/common/src/functional_dependencies.rs index 984d8ca267d5..5f262d634af3 100644 --- a/datafusion/common/src/functional_dependencies.rs +++ b/datafusion/common/src/functional_dependencies.rs @@ -60,6 +60,41 @@ impl Constraints { pub fn is_empty(&self) -> bool { self.inner.is_empty() } + + /// Projects constraints using the given projection indices. + /// Returns None if any of the constraint columns are not included in the projection. + pub fn project(&self, proj_indices: &[usize]) -> Option { + let projected = self + .inner + .iter() + .filter_map(|constraint| { + match constraint { + Constraint::PrimaryKey(indices) => { + let new_indices = + update_elements_with_matching_indices(indices, proj_indices); + // Only keep constraint if all columns are preserved + (new_indices.len() == indices.len()) + .then_some(Constraint::PrimaryKey(new_indices)) + } + Constraint::Unique(indices) => { + let new_indices = + update_elements_with_matching_indices(indices, proj_indices); + // Only keep constraint if all columns are preserved + (new_indices.len() == indices.len()) + .then_some(Constraint::Unique(new_indices)) + } + } + }) + .collect::>(); + + (!projected.is_empty()).then_some(Constraints::new_unverified(projected)) + } +} + +impl Default for Constraints { + fn default() -> Self { + Constraints::empty() + } } impl IntoIterator for Constraints { @@ -73,13 +108,13 @@ impl IntoIterator for Constraints { impl Display for Constraints { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - let pk: Vec = self.inner.iter().map(|c| format!("{:?}", c)).collect(); + let pk = self + .inner + .iter() + .map(|c| format!("{:?}", c)) + .collect::>(); let pk = pk.join(", "); - if !pk.is_empty() { - write!(f, " constraints=[{pk}]") - } else { - write!(f, "") - } + write!(f, "constraints=[{pk}]") } } @@ -599,6 +634,24 @@ mod tests { assert_eq!(iter.next(), None); } + #[test] + fn test_project_constraints() { + let constraints = Constraints::new_unverified(vec![ + Constraint::PrimaryKey(vec![1, 2]), + Constraint::Unique(vec![0, 3]), + ]); + + // Project keeping columns 1,2,3 + let projected = constraints.project(&[1, 2, 3]).unwrap(); + assert_eq!( + projected, + Constraints::new_unverified(vec![Constraint::PrimaryKey(vec![0, 1])]) + ); + + // Project keeping only column 0 - should return None as no constraints are preserved + assert!(constraints.project(&[0]).is_none()); + } + #[test] fn test_get_updated_id_keys() { let fund_dependencies = diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 06b94f804268..b2f47a08f9bc 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -935,6 +935,7 @@ impl TableProvider for ListingTable { session_state, FileScanConfig::new(object_store_url, Arc::clone(&self.file_schema)) .with_file_groups(partitioned_file_lists) + .with_constraints(self.constraints.clone()) .with_statistics(statistics) .with_projection(projection.cloned()) .with_limit(limit) diff --git a/datafusion/core/src/datasource/memory.rs b/datafusion/core/src/datasource/memory.rs index 095dab9d91ef..352cd4a5663e 100644 --- a/datafusion/core/src/datasource/memory.rs +++ b/datafusion/core/src/datasource/memory.rs @@ -132,6 +132,7 @@ impl MemTable { state: &SessionState, ) -> Result { let schema = t.schema(); + let constraints = t.constraints(); let exec = t.scan(state, None, &[], None).await?; let partition_count = 
exec.output_partitioning().partition_count(); @@ -162,7 +163,10 @@ impl MemTable { } } - let exec = MemoryExec::try_new(&data, Arc::clone(&schema), None)?; + let mut exec = MemoryExec::try_new(&data, Arc::clone(&schema), None)?; + if let Some(cons) = constraints { + exec = exec.with_constraints(cons.clone()); + } if let Some(num_partitions) = output_partitions { let exec = RepartitionExec::try_new( diff --git a/datafusion/core/src/datasource/physical_plan/arrow_file.rs b/datafusion/core/src/datasource/physical_plan/arrow_file.rs index 4e76b087abb1..54344d55bbd1 100644 --- a/datafusion/core/src/datasource/physical_plan/arrow_file.rs +++ b/datafusion/core/src/datasource/physical_plan/arrow_file.rs @@ -35,7 +35,7 @@ use arrow::buffer::Buffer; use arrow_ipc::reader::FileDecoder; use arrow_schema::SchemaRef; use datafusion_common::config::ConfigOptions; -use datafusion_common::Statistics; +use datafusion_common::{Constraints, Statistics}; use datafusion_execution::TaskContext; use datafusion_physical_expr::{EquivalenceProperties, LexOrdering}; use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType}; @@ -60,11 +60,16 @@ pub struct ArrowExec { impl ArrowExec { /// Create a new Arrow reader execution plan provided base configurations pub fn new(base_config: FileScanConfig) -> Self { - let (projected_schema, projected_statistics, projected_output_ordering) = - base_config.project(); + let ( + projected_schema, + projected_constraints, + projected_statistics, + projected_output_ordering, + ) = base_config.project(); let cache = Self::compute_properties( Arc::clone(&projected_schema), &projected_output_ordering, + projected_constraints, &base_config, ); Self { @@ -88,12 +93,14 @@ impl ArrowExec { /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. 
fn compute_properties( schema: SchemaRef, - projected_output_ordering: &[LexOrdering], + output_ordering: &[LexOrdering], + constraints: Constraints, file_scan_config: &FileScanConfig, ) -> PlanProperties { // Equivalence Properties let eq_properties = - EquivalenceProperties::new_with_orderings(schema, projected_output_ordering); + EquivalenceProperties::new_with_orderings(schema, output_ordering) + .with_constraints(constraints); PlanProperties::new( eq_properties, diff --git a/datafusion/core/src/datasource/physical_plan/avro.rs b/datafusion/core/src/datasource/physical_plan/avro.rs index fb36179c3cf6..87d8964bed6a 100644 --- a/datafusion/core/src/datasource/physical_plan/avro.rs +++ b/datafusion/core/src/datasource/physical_plan/avro.rs @@ -29,6 +29,7 @@ use crate::physical_plan::{ }; use arrow::datatypes::SchemaRef; +use datafusion_common::Constraints; use datafusion_execution::TaskContext; use datafusion_physical_expr::{EquivalenceProperties, LexOrdering}; use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType}; @@ -48,11 +49,16 @@ pub struct AvroExec { impl AvroExec { /// Create a new Avro reader execution plan provided base configurations pub fn new(base_config: FileScanConfig) -> Self { - let (projected_schema, projected_statistics, projected_output_ordering) = - base_config.project(); + let ( + projected_schema, + projected_constraints, + projected_statistics, + projected_output_ordering, + ) = base_config.project(); let cache = Self::compute_properties( Arc::clone(&projected_schema), &projected_output_ordering, + projected_constraints, &base_config, ); Self { @@ -73,10 +79,12 @@ impl AvroExec { fn compute_properties( schema: SchemaRef, orderings: &[LexOrdering], + constraints: Constraints, file_scan_config: &FileScanConfig, ) -> PlanProperties { // Equivalence Properties - let eq_properties = EquivalenceProperties::new_with_orderings(schema, orderings); + let eq_properties = EquivalenceProperties::new_with_orderings(schema, orderings) + .with_constraints(constraints); let n_partitions = file_scan_config.file_groups.len(); PlanProperties::new( diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs index a00e74cf4fcd..dd5736806eeb 100644 --- a/datafusion/core/src/datasource/physical_plan/csv.rs +++ b/datafusion/core/src/datasource/physical_plan/csv.rs @@ -40,6 +40,7 @@ use crate::physical_plan::{ use arrow::csv; use arrow::datatypes::SchemaRef; use datafusion_common::config::ConfigOptions; +use datafusion_common::Constraints; use datafusion_execution::TaskContext; use datafusion_physical_expr::{EquivalenceProperties, LexOrdering}; @@ -209,11 +210,16 @@ impl CsvExecBuilder { newlines_in_values, } = self; - let (projected_schema, projected_statistics, projected_output_ordering) = - base_config.project(); + let ( + projected_schema, + projected_constraints, + projected_statistics, + projected_output_ordering, + ) = base_config.project(); let cache = CsvExec::compute_properties( projected_schema, &projected_output_ordering, + projected_constraints, &base_config, ); @@ -320,10 +326,12 @@ impl CsvExec { fn compute_properties( schema: SchemaRef, orderings: &[LexOrdering], + constraints: Constraints, file_scan_config: &FileScanConfig, ) -> PlanProperties { // Equivalence Properties - let eq_properties = EquivalenceProperties::new_with_orderings(schema, orderings); + let eq_properties = EquivalenceProperties::new_with_orderings(schema, orderings) + .with_constraints(constraints); PlanProperties::new( 
eq_properties, diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs index a5f2bd1760b3..a1cd892336b1 100644 --- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs +++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs @@ -33,7 +33,9 @@ use arrow::datatypes::{ArrowNativeType, UInt16Type}; use arrow_array::{ArrayRef, DictionaryArray, RecordBatch, RecordBatchOptions}; use arrow_schema::{DataType, Field, Schema, SchemaRef}; use datafusion_common::stats::Precision; -use datafusion_common::{exec_err, ColumnStatistics, DataFusionError, Statistics}; +use datafusion_common::{ + exec_err, ColumnStatistics, Constraints, DataFusionError, Statistics, +}; use datafusion_physical_expr::LexOrdering; use log::warn; @@ -114,6 +116,8 @@ pub struct FileScanConfig { /// concurrently, however files *within* a partition will be read /// sequentially, one after the next. pub file_groups: Vec>, + /// Table constraints + pub constraints: Constraints, /// Estimated overall statistics of the files, taking `filters` into account. /// Defaults to [`Statistics::new_unknown`]. pub statistics: Statistics, @@ -146,6 +150,7 @@ impl FileScanConfig { object_store_url, file_schema, file_groups: vec![], + constraints: Constraints::empty(), statistics, projection: None, limit: None, @@ -154,6 +159,12 @@ impl FileScanConfig { } } + /// Set the table constraints of the files + pub fn with_constraints(mut self, constraints: Constraints) -> Self { + self.constraints = constraints; + self + } + /// Set the statistics of the files pub fn with_statistics(mut self, statistics: Statistics) -> Self { self.statistics = statistics; @@ -210,30 +221,31 @@ impl FileScanConfig { self } - /// Project the schema and the statistics on the given column indices - pub fn project(&self) -> (SchemaRef, Statistics, Vec) { + /// Project the schema, constraints, and the statistics on the given column indices + pub fn project(&self) -> (SchemaRef, Constraints, Statistics, Vec) { if self.projection.is_none() && self.table_partition_cols.is_empty() { return ( Arc::clone(&self.file_schema), + self.constraints.clone(), self.statistics.clone(), self.output_ordering.clone(), ); } - let proj_iter: Box> = match &self.projection { - Some(proj) => Box::new(proj.iter().copied()), - None => Box::new( - 0..(self.file_schema.fields().len() + self.table_partition_cols.len()), - ), + let proj_indices = if let Some(proj) = &self.projection { + proj + } else { + let len = self.file_schema.fields().len() + self.table_partition_cols.len(); + &(0..len).collect::>() }; let mut table_fields = vec![]; let mut table_cols_stats = vec![]; - for idx in proj_iter { - if idx < self.file_schema.fields().len() { - let field = self.file_schema.field(idx); + for idx in proj_indices { + if *idx < self.file_schema.fields().len() { + let field = self.file_schema.field(*idx); table_fields.push(field.clone()); - table_cols_stats.push(self.statistics.column_statistics[idx].clone()) + table_cols_stats.push(self.statistics.column_statistics[*idx].clone()) } else { let partition_idx = idx - self.file_schema.fields().len(); table_fields.push(self.table_partition_cols[partition_idx].to_owned()); @@ -254,10 +266,20 @@ impl FileScanConfig { self.file_schema.metadata().clone(), )); + let projected_constraints = self + .constraints + .project(proj_indices) + .unwrap_or_else(Constraints::empty); + let projected_output_ordering = get_projected_output_ordering(self, &projected_schema); 
- (projected_schema, table_stats, projected_output_ordering) + ( + projected_schema, + projected_constraints, + table_stats, + projected_output_ordering, + ) } #[cfg_attr(not(feature = "avro"), allow(unused))] // Only used by avro @@ -635,7 +657,7 @@ mod tests { )]), ); - let (proj_schema, proj_statistics, _) = conf.project(); + let (proj_schema, _, proj_statistics, _) = conf.project(); assert_eq!(proj_schema.fields().len(), file_schema.fields().len() + 1); assert_eq!( proj_schema.field(file_schema.fields().len()).name(), @@ -675,7 +697,7 @@ mod tests { ); // verify the proj_schema includes the last column and exactly the same the field it is defined - let (proj_schema, _proj_statistics, _) = conf.project(); + let (proj_schema, _, _, _) = conf.project(); assert_eq!(proj_schema.fields().len(), file_schema.fields().len() + 1); assert_eq!( *proj_schema.field(file_schema.fields().len()), @@ -708,7 +730,7 @@ mod tests { )]), ); - let (proj_schema, proj_statistics, _) = conf.project(); + let (proj_schema, _, proj_statistics, _) = conf.project(); assert_eq!( columns(&proj_schema), vec!["date".to_owned(), "c1".to_owned()] diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs index 4071f9c26b58..7ac062e549c4 100644 --- a/datafusion/core/src/datasource/physical_plan/json.rs +++ b/datafusion/core/src/datasource/physical_plan/json.rs @@ -39,6 +39,7 @@ use crate::physical_plan::{ use arrow::json::ReaderBuilder; use arrow::{datatypes::SchemaRef, json}; +use datafusion_common::Constraints; use datafusion_execution::TaskContext; use datafusion_physical_expr::{EquivalenceProperties, LexOrdering}; use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType}; @@ -66,11 +67,16 @@ impl NdJsonExec { base_config: FileScanConfig, file_compression_type: FileCompressionType, ) -> Self { - let (projected_schema, projected_statistics, projected_output_ordering) = - base_config.project(); + let ( + projected_schema, + projected_constraints, + projected_statistics, + projected_output_ordering, + ) = base_config.project(); let cache = Self::compute_properties( projected_schema, &projected_output_ordering, + projected_constraints, &base_config, ); Self { @@ -100,10 +106,12 @@ impl NdJsonExec { fn compute_properties( schema: SchemaRef, orderings: &[LexOrdering], + constraints: Constraints, file_scan_config: &FileScanConfig, ) -> PlanProperties { // Equivalence Properties - let eq_properties = EquivalenceProperties::new_with_orderings(schema, orderings); + let eq_properties = EquivalenceProperties::new_with_orderings(schema, orderings) + .with_constraints(constraints); PlanProperties::new( eq_properties, diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs index 3146d124d9f1..bbf0b8abbb2f 100644 --- a/datafusion/core/src/datasource/physical_plan/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/mod.rs @@ -111,7 +111,7 @@ impl Debug for FileScanConfig { impl DisplayAs for FileScanConfig { fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult { - let (schema, _, orderings) = self.project(); + let (schema, _, _, orderings) = self.project(); write!(f, "file_groups=")?; FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?; @@ -126,6 +126,10 @@ impl DisplayAs for FileScanConfig { display_orderings(f, &orderings)?; + if !self.constraints.is_empty() { + write!(f, ", {}", self.constraints)?; + } + Ok(()) } } diff --git 
a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs index 83b544a76e11..085f44191b8a 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs @@ -40,6 +40,7 @@ use crate::{ }; use arrow::datatypes::SchemaRef; +use datafusion_common::Constraints; use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalExpr}; use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType}; @@ -444,12 +445,17 @@ impl ParquetExecBuilder { }) .map(Arc::new); - let (projected_schema, projected_statistics, projected_output_ordering) = - base_config.project(); + let ( + projected_schema, + projected_constraints, + projected_statistics, + projected_output_ordering, + ) = base_config.project(); let cache = ParquetExec::compute_properties( projected_schema, &projected_output_ordering, + projected_constraints, &base_config, ); ParquetExec { @@ -653,10 +659,12 @@ impl ParquetExec { fn compute_properties( schema: SchemaRef, orderings: &[LexOrdering], + constraints: Constraints, file_config: &FileScanConfig, ) -> PlanProperties { PlanProperties::new( - EquivalenceProperties::new_with_orderings(schema, orderings), + EquivalenceProperties::new_with_orderings(schema, orderings) + .with_constraints(constraints), Self::output_partitioning_helper(file_config), // Output Partitioning EmissionType::Incremental, Boundedness::Bounded, diff --git a/datafusion/expr/src/logical_plan/ddl.rs b/datafusion/expr/src/logical_plan/ddl.rs index a433871ef20d..bf8c5ad3d701 100644 --- a/datafusion/expr/src/logical_plan/ddl.rs +++ b/datafusion/expr/src/logical_plan/ddl.rs @@ -133,14 +133,22 @@ impl DdlStatement { constraints, .. }) => { - write!(f, "CreateExternalTable: {name:?}{constraints}") + if constraints.is_empty() { + write!(f, "CreateExternalTable: {name:?}") + } else { + write!(f, "CreateExternalTable: {name:?} {constraints}") + } } DdlStatement::CreateMemoryTable(CreateMemoryTable { name, constraints, .. }) => { - write!(f, "CreateMemoryTable: {name:?}{constraints}") + if constraints.is_empty() { + write!(f, "CreateMemoryTable: {name:?}") + } else { + write!(f, "CreateMemoryTable: {name:?} {constraints}") + } } DdlStatement::CreateView(CreateView { name, .. }) => { write!(f, "CreateView: {name:?}") diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index 2c7335649b28..85440e0d3e12 100755 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -34,7 +34,9 @@ use crate::{ use arrow_schema::{SchemaRef, SortOptions}; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; -use datafusion_common::{internal_err, plan_err, JoinSide, JoinType, Result}; +use datafusion_common::{ + internal_err, plan_err, Constraint, Constraints, HashMap, JoinSide, JoinType, Result, +}; use datafusion_expr::interval_arithmetic::Interval; use datafusion_expr::sort_properties::{ExprProperties, SortProperties}; use datafusion_physical_expr_common::utils::ExprPropertiesNode; @@ -131,6 +133,8 @@ pub struct EquivalenceProperties { /// TODO: We do not need to track constants separately, they can be tracked /// inside `eq_group` as `Literal` expressions. constants: Vec, + /// Table constraints + constraints: Constraints, /// Schema associated with this object. 
schema: SchemaRef, } @@ -142,16 +146,24 @@ impl EquivalenceProperties { eq_group: EquivalenceGroup::empty(), oeq_class: OrderingEquivalenceClass::empty(), constants: vec![], + constraints: Constraints::empty(), schema, } } + /// Adds constraints to the properties. + pub fn with_constraints(mut self, constraints: Constraints) -> Self { + self.constraints = constraints; + self + } + /// Creates a new `EquivalenceProperties` object with the given orderings. pub fn new_with_orderings(schema: SchemaRef, orderings: &[LexOrdering]) -> Self { Self { eq_group: EquivalenceGroup::empty(), oeq_class: OrderingEquivalenceClass::new(orderings.to_vec()), constants: vec![], + constraints: Constraints::empty(), schema, } } @@ -181,6 +193,10 @@ impl EquivalenceProperties { &self.constants } + pub fn constraints(&self) -> &Constraints { + &self.constraints + } + /// Returns the output ordering of the properties. pub fn output_ordering(&self) -> Option { let constants = self.constants(); @@ -525,6 +541,12 @@ impl EquivalenceProperties { let mut eq_properties = self.clone(); // First, standardize the given requirement: let normalized_reqs = eq_properties.normalize_sort_requirements(reqs); + + // Check whether given ordering is satisfied by constraints first + if self.satisfied_by_constraints(&normalized_reqs) { + return true; + } + for normalized_req in normalized_reqs { // Check whether given ordering is satisfied if !eq_properties.ordering_satisfy_single(&normalized_req) { @@ -548,6 +570,82 @@ impl EquivalenceProperties { true } + /// Checks if the sort requirements are satisfied by any of the table constraints (primary key or unique). + /// Returns true if any constraint fully satisfies the requirements. + fn satisfied_by_constraints( + &self, + normalized_reqs: &[PhysicalSortRequirement], + ) -> bool { + self.constraints.iter().any(|constraint| match constraint { + Constraint::PrimaryKey(indices) | Constraint::Unique(indices) => self + .satisfied_by_constraint( + normalized_reqs, + indices, + matches!(constraint, Constraint::Unique(_)), + ), + }) + } + + /// Checks if sort requirements are satisfied by a constraint (primary key or unique). + /// Returns true if the constraint indices form a valid prefix of an existing ordering + /// that matches the requirements. For unique constraints, also verifies nullable columns. 
+ fn satisfied_by_constraint( + &self, + normalized_reqs: &[PhysicalSortRequirement], + indices: &[usize], + check_null: bool, + ) -> bool { + // Requirements must contain indices + if indices.len() > normalized_reqs.len() { + return false; + } + + // Iterate over all orderings + self.oeq_class.iter().any(|ordering| { + if indices.len() > ordering.len() { + return false; + } + + // Build a map of column positions in the ordering + let mut col_positions = HashMap::with_capacity(ordering.len()); + for (pos, req) in ordering.iter().enumerate() { + if let Some(col) = req.expr.as_any().downcast_ref::() { + col_positions.insert( + col.index(), + (pos, col.nullable(&self.schema).unwrap_or(true)), + ); + } + } + + // Check if all constraint indices appear in valid positions + if !indices.iter().all(|&idx| { + col_positions + .get(&idx) + .map(|&(pos, nullable)| { + // For unique constraints, verify column is not nullable if it's first/last + !check_null + || (pos != 0 && pos != ordering.len() - 1) + || !nullable + }) + .unwrap_or(false) + }) { + return false; + } + + // Check if this ordering matches requirements prefix + let ordering_len = ordering.len(); + normalized_reqs.len() >= ordering_len + && normalized_reqs[..ordering_len].iter().zip(ordering).all( + |(req, existing)| { + req.expr.eq(&existing.expr) + && req + .options + .map_or(true, |req_opts| req_opts == existing.options) + }, + ) + }) + } + /// Determines whether the ordering specified by the given sort requirement /// is satisfied based on the orderings within, equivalence classes, and /// constant expressions. @@ -966,21 +1064,46 @@ impl EquivalenceProperties { projected_constants } - /// Projects the equivalences within according to `projection_mapping` + /// Projects constraints according to the given projection mapping. + /// + /// This function takes a projection mapping and extracts the column indices of the target columns. + /// It then projects the constraints to only include relationships between + /// columns that exist in the projected output. + /// + /// # Arguments + /// + /// * `mapping` - A reference to `ProjectionMapping` that defines how expressions are mapped + /// in the projection operation + /// + /// # Returns + /// + /// Returns a new `Constraints` object containing only the constraints + /// that are valid for the projected columns. + fn projected_constraints(&self, mapping: &ProjectionMapping) -> Option { + let indices = mapping + .iter() + .filter_map(|(_, target)| target.as_any().downcast_ref::()) + .map(|col| col.index()) + .collect::>(); + debug_assert_eq!(mapping.map.len(), indices.len()); + self.constraints.project(&indices) + } + + /// Projects the equivalences within according to `mapping` /// and `output_schema`. 
- pub fn project( - &self, - projection_mapping: &ProjectionMapping, - output_schema: SchemaRef, - ) -> Self { - let projected_constants = self.projected_constants(projection_mapping); - let projected_eq_group = self.eq_group.project(projection_mapping); - let projected_orderings = self.projected_orderings(projection_mapping); + pub fn project(&self, mapping: &ProjectionMapping, output_schema: SchemaRef) -> Self { + let eq_group = self.eq_group.project(mapping); + let oeq_class = OrderingEquivalenceClass::new(self.projected_orderings(mapping)); + let constants = self.projected_constants(mapping); + let constraints = self + .projected_constraints(mapping) + .unwrap_or_else(Constraints::empty); Self { - eq_group: projected_eq_group, - oeq_class: OrderingEquivalenceClass::new(projected_orderings), - constants: projected_constants, schema: output_schema, + eq_group, + oeq_class, + constants, + constraints, } } @@ -2262,7 +2385,7 @@ mod tests { use arrow::datatypes::{DataType, Field, Schema}; use arrow_schema::{Fields, TimeUnit}; - use datafusion_common::ScalarValue; + use datafusion_common::{Constraint, ScalarValue}; use datafusion_expr::Operator; use datafusion_functions::string::concat; @@ -4221,4 +4344,181 @@ mod tests { Ok(()) } + + #[test] + fn test_ordering_satisfaction_with_key_constraints() -> Result<()> { + let pk_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, true), + Field::new("b", DataType::Int32, true), + Field::new("c", DataType::Int32, true), + Field::new("d", DataType::Int32, true), + ])); + + let unique_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + Field::new("c", DataType::Int32, true), + Field::new("d", DataType::Int32, true), + ])); + + // Test cases to run + let test_cases = vec![ + // (name, schema, constraint, base_ordering, satisfied_orderings, unsatisfied_orderings) + ( + "single column primary key", + &pk_schema, + vec![Constraint::PrimaryKey(vec![0])], + vec!["a"], // base ordering + vec![vec!["a", "b"], vec!["a", "c", "d"]], + vec![vec!["b", "a"], vec!["c", "a"]], + ), + ( + "single column unique", + &unique_schema, + vec![Constraint::Unique(vec![0])], + vec!["a"], // base ordering + vec![vec!["a", "b"], vec!["a", "c", "d"]], + vec![vec!["b", "a"], vec!["c", "a"]], + ), + ( + "multi-column primary key", + &pk_schema, + vec![Constraint::PrimaryKey(vec![0, 1])], + vec!["a", "b"], // base ordering + vec![vec!["a", "b", "c"], vec!["a", "b", "d"]], + vec![vec!["b", "a"], vec!["a", "c", "b"]], + ), + ( + "multi-column unique", + &unique_schema, + vec![Constraint::Unique(vec![0, 1])], + vec!["a", "b"], // base ordering + vec![vec!["a", "b", "c"], vec!["a", "b", "d"]], + vec![vec!["b", "a"], vec!["c", "a", "b"]], + ), + ( + "nullable unique", + &unique_schema, + vec![Constraint::Unique(vec![2, 3])], + vec!["c", "d"], // base ordering + vec![], + vec![vec!["c", "d", "a"]], + ), + ( + "ordering with arbitrary column unique", + &unique_schema, + vec![Constraint::Unique(vec![0, 1])], + vec!["a", "c", "b"], // base ordering + vec![vec!["a", "c", "b", "d"]], + vec![vec!["a", "b", "d"]], + ), + ( + "ordering with arbitrary column pk", + &pk_schema, + vec![Constraint::PrimaryKey(vec![0, 1])], + vec!["a", "c", "b"], // base ordering + vec![vec!["a", "c", "b", "d"]], + vec![vec!["a", "b", "d"]], + ), + ( + "ordering with arbitrary column pk complex", + &pk_schema, + vec![Constraint::PrimaryKey(vec![3, 1])], + vec!["b", "a", "d"], // base ordering + vec![vec!["b", "a", "d", "c"]], + 
vec![vec!["b", "c", "d", "a"], vec!["b", "a", "c", "d"]], + ), + ]; + + for ( + name, + schema, + constraints, + base_order, + satisfied_orders, + unsatisfied_orders, + ) in test_cases + { + let mut eq_properties = EquivalenceProperties::new(Arc::clone(schema)); + + // Convert base ordering + let base_ordering = LexOrdering::new( + base_order + .iter() + .map(|col_name| PhysicalSortExpr { + expr: col(col_name, schema).unwrap(), + options: SortOptions::default(), + }) + .collect(), + ); + + // Convert string column names to orderings + let satisfied_orderings: Vec = satisfied_orders + .iter() + .map(|cols| { + LexOrdering::new( + cols.iter() + .map(|col_name| PhysicalSortExpr { + expr: col(col_name, schema).unwrap(), + options: SortOptions::default(), + }) + .collect(), + ) + }) + .collect(); + + let unsatisfied_orderings: Vec = unsatisfied_orders + .iter() + .map(|cols| { + LexOrdering::new( + cols.iter() + .map(|col_name| PhysicalSortExpr { + expr: col(col_name, schema).unwrap(), + options: SortOptions::default(), + }) + .collect(), + ) + }) + .collect(); + + // Test that orderings are not satisfied before adding constraints + for ordering in &satisfied_orderings { + assert!( + !eq_properties.ordering_satisfy(ordering), + "{}: ordering {:?} should not be satisfied before adding constraints", + name, + ordering + ); + } + + // Add base ordering + eq_properties.add_new_ordering(base_ordering); + + // Add constraints + eq_properties = + eq_properties.with_constraints(Constraints::new_unverified(constraints)); + + // Test that expected orderings are now satisfied + for ordering in &satisfied_orderings { + assert!( + eq_properties.ordering_satisfy(ordering), + "{}: ordering {:?} should be satisfied after adding constraints", + name, + ordering + ); + } + + // Test that unsatisfied orderings remain unsatisfied + for ordering in &unsatisfied_orderings { + assert!( + !eq_properties.ordering_satisfy(ordering), + "{}: ordering {:?} should not be satisfied after adding constraints", + name, + ordering + ); + } + } + + Ok(()) + } } diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index cc8d6e74f4b9..fb29254249f6 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -39,7 +39,7 @@ use arrow::datatypes::{Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use arrow_array::{UInt16Array, UInt32Array, UInt64Array, UInt8Array}; use datafusion_common::stats::Precision; -use datafusion_common::{internal_err, not_impl_err, Result}; +use datafusion_common::{internal_err, not_impl_err, Constraint, Constraints, Result}; use datafusion_execution::TaskContext; use datafusion_expr::{Accumulator, Aggregate}; use datafusion_physical_expr::aggregate::AggregateFunctionExpr; @@ -500,7 +500,7 @@ impl AggregateExec { }; // construct a map from the input expression to the output expression of the Aggregation group by - let projection_mapping = + let group_expr_mapping = ProjectionMapping::try_new(&group_by.expr, &input.schema())?; let required_input_ordering = @@ -509,7 +509,7 @@ impl AggregateExec { let cache = Self::compute_properties( &input, Arc::clone(&schema), - &projection_mapping, + &group_expr_mapping, &mode, &input_order_mode, ); @@ -645,14 +645,33 @@ impl AggregateExec { pub fn compute_properties( input: &Arc, schema: SchemaRef, - projection_mapping: &ProjectionMapping, + group_expr_mapping: &ProjectionMapping, mode: &AggregateMode, input_order_mode: &InputOrderMode, ) -> 
PlanProperties { // Construct equivalence properties: - let eq_properties = input + let mut eq_properties = input .equivalence_properties() - .project(projection_mapping, schema); + .project(group_expr_mapping, schema); + + // Group by expression will be a distinct value after the aggregation. + // Add it into the constraint set. + let mut constraints = eq_properties.constraints().to_vec(); + let new_constraint = Constraint::Unique( + group_expr_mapping + .map + .iter() + .filter_map(|(_, target_col)| { + target_col + .as_any() + .downcast_ref::() + .map(|c| c.index()) + }) + .collect(), + ); + constraints.push(new_constraint); + eq_properties = + eq_properties.with_constraints(Constraints::new_unverified(constraints)); // Get output partitioning: let input_partitioning = input.output_partitioning().clone(); @@ -661,7 +680,7 @@ impl AggregateExec { // but needs to respect aliases (e.g. mapping in the GROUP BY // expression). let input_eq_properties = input.equivalence_properties(); - input_partitioning.project(projection_mapping, input_eq_properties) + input_partitioning.project(group_expr_mapping, input_eq_properties) } else { input_partitioning.clone() }; diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 5f0b229ce92a..6d7935553116 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -28,7 +28,7 @@ use tokio::task::JoinSet; use datafusion_common::config::ConfigOptions; pub use datafusion_common::hash_utils; pub use datafusion_common::utils::project_schema; -use datafusion_common::{exec_err, Result}; +use datafusion_common::{exec_err, Constraints, Result}; pub use datafusion_common::{internal_err, ColumnStatistics, Statistics}; use datafusion_execution::TaskContext; pub use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream}; @@ -716,6 +716,12 @@ impl PlanProperties { self } + /// Overwrite constraints with its new value. 
+ pub fn with_constraints(mut self, constraints: Constraints) -> Self { + self.eq_properties = self.eq_properties.with_constraints(constraints); + self + } + pub fn equivalence_properties(&self) -> &EquivalenceProperties { &self.eq_properties } diff --git a/datafusion/physical-plan/src/memory.rs b/datafusion/physical-plan/src/memory.rs index 5e8ee713703b..ed0bfa75d14d 100644 --- a/datafusion/physical-plan/src/memory.rs +++ b/datafusion/physical-plan/src/memory.rs @@ -34,7 +34,9 @@ use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; use arrow_array::RecordBatchOptions; use arrow_schema::Schema; -use datafusion_common::{internal_err, plan_err, project_schema, Result, ScalarValue}; +use datafusion_common::{ + internal_err, plan_err, project_schema, Constraints, Result, ScalarValue, +}; use datafusion_execution::memory_pool::MemoryReservation; use datafusion_execution::TaskContext; use datafusion_physical_expr::equivalence::ProjectionMapping; @@ -88,14 +90,25 @@ impl DisplayAs for MemoryExec { }) .unwrap_or_default(); + let constraints = self.cache.equivalence_properties().constraints(); + let constraints = if constraints.is_empty() { + String::new() + } else { + format!(", {}", constraints) + }; + if self.show_sizes { write!( f, - "MemoryExec: partitions={}, partition_sizes={partition_sizes:?}{output_ordering}", + "MemoryExec: partitions={}, partition_sizes={partition_sizes:?}{output_ordering}{constraints}", partition_sizes.len(), ) } else { - write!(f, "MemoryExec: partitions={}", partition_sizes.len(),) + write!( + f, + "MemoryExec: partitions={}{output_ordering}{constraints}", + partition_sizes.len(), + ) } } } @@ -164,8 +177,13 @@ impl MemoryExec { projection: Option>, ) -> Result { let projected_schema = project_schema(&schema, projection.as_ref())?; - let cache = - Self::compute_properties(Arc::clone(&projected_schema), &[], partitions); + let constraints = Constraints::empty(); + let cache = Self::compute_properties( + Arc::clone(&projected_schema), + &[], + constraints, + partitions, + ); Ok(Self { partitions: partitions.to_vec(), schema, @@ -255,7 +273,12 @@ impl MemoryExec { } let partitions = vec![batches]; - let cache = Self::compute_properties(Arc::clone(&schema), &[], &partitions); + let cache = Self::compute_properties( + Arc::clone(&schema), + &[], + Constraints::empty(), + &partitions, + ); Ok(Self { partitions, schema: Arc::clone(&schema), @@ -267,12 +290,22 @@ impl MemoryExec { }) } + pub fn with_constraints(mut self, constraints: Constraints) -> Self { + self.cache = self.cache.with_constraints(constraints); + self + } + /// Set `show_sizes` to determine whether to display partition sizes pub fn with_show_sizes(mut self, show_sizes: bool) -> Self { self.show_sizes = show_sizes; self } + /// Ref to constraints + pub fn constraints(&self) -> &Constraints { + self.cache.equivalence_properties().constraints() + } + /// Ref to partitions pub fn partitions(&self) -> &[Vec] { &self.partitions @@ -377,10 +410,12 @@ impl MemoryExec { fn compute_properties( schema: SchemaRef, orderings: &[LexOrdering], + constraints: Constraints, partitions: &[Vec], ) -> PlanProperties { PlanProperties::new( - EquivalenceProperties::new_with_orderings(schema, orderings), + EquivalenceProperties::new_with_orderings(schema, orderings) + .with_constraints(constraints), Partitioning::UnknownPartitioning(partitions.len()), EmissionType::Incremental, Boundedness::Bounded, diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs index 
ca8306275b11..eb75eb09482d 100644 --- a/datafusion/proto-common/src/from_proto/mod.rs +++ b/datafusion/proto-common/src/from_proto/mod.rs @@ -783,6 +783,40 @@ impl From for JoinSide { } } +impl From<&protobuf::Constraint> for Constraint { + fn from(value: &protobuf::Constraint) -> Self { + match &value.constraint_mode { + Some(protobuf::constraint::ConstraintMode::PrimaryKey(elem)) => { + Constraint::PrimaryKey( + elem.indices.iter().map(|&item| item as usize).collect(), + ) + } + Some(protobuf::constraint::ConstraintMode::Unique(elem)) => { + Constraint::Unique( + elem.indices.iter().map(|&item| item as usize).collect(), + ) + } + None => panic!("constraint_mode not set"), + } + } +} + +impl TryFrom<&protobuf::Constraints> for Constraints { + type Error = DataFusionError; + + fn try_from( + constraints: &protobuf::Constraints, + ) -> datafusion_common::Result { + Ok(Constraints::new_unverified( + constraints + .constraints + .iter() + .map(|item| item.into()) + .collect(), + )) + } +} + impl TryFrom<&protobuf::Statistics> for Statistics { type Error = DataFusionError; diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 416227f70de9..dc60f7c7ff4c 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -965,9 +965,6 @@ message PhysicalSortExprNodeCollection { } message FileScanExecConf { - // Was repeated ConfigOption options = 10; - reserved 10; - repeated FileGroup file_groups = 1; datafusion_common.Schema schema = 2; repeated uint32 projection = 4; @@ -976,6 +973,11 @@ message FileScanExecConf { repeated string table_partition_cols = 7; string object_store_url = 8; repeated PhysicalSortExprNodeCollection output_ordering = 9; + + // Was repeated ConfigOption options = 10; + reserved 10; + + datafusion_common.Constraints constraints = 11; } message ParquetScanExecNode { diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index cffb63018676..3b43acb26609 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -5529,6 +5529,9 @@ impl serde::Serialize for FileScanExecConf { if !self.output_ordering.is_empty() { len += 1; } + if self.constraints.is_some() { + len += 1; + } let mut struct_ser = serializer.serialize_struct("datafusion.FileScanExecConf", len)?; if !self.file_groups.is_empty() { struct_ser.serialize_field("fileGroups", &self.file_groups)?; @@ -5554,6 +5557,9 @@ impl serde::Serialize for FileScanExecConf { if !self.output_ordering.is_empty() { struct_ser.serialize_field("outputOrdering", &self.output_ordering)?; } + if let Some(v) = self.constraints.as_ref() { + struct_ser.serialize_field("constraints", v)?; + } struct_ser.end() } } @@ -5576,6 +5582,7 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { "objectStoreUrl", "output_ordering", "outputOrdering", + "constraints", ]; #[allow(clippy::enum_variant_names)] @@ -5588,6 +5595,7 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { TablePartitionCols, ObjectStoreUrl, OutputOrdering, + Constraints, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -5617,6 +5625,7 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { "tablePartitionCols" | "table_partition_cols" => Ok(GeneratedField::TablePartitionCols), "objectStoreUrl" | "object_store_url" => Ok(GeneratedField::ObjectStoreUrl), "outputOrdering" | "output_ordering" => Ok(GeneratedField::OutputOrdering), + 
"constraints" => Ok(GeneratedField::Constraints), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -5644,6 +5653,7 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { let mut table_partition_cols__ = None; let mut object_store_url__ = None; let mut output_ordering__ = None; + let mut constraints__ = None; while let Some(k) = map_.next_key()? { match k { GeneratedField::FileGroups => { @@ -5697,6 +5707,12 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { } output_ordering__ = Some(map_.next_value()?); } + GeneratedField::Constraints => { + if constraints__.is_some() { + return Err(serde::de::Error::duplicate_field("constraints")); + } + constraints__ = map_.next_value()?; + } } } Ok(FileScanExecConf { @@ -5708,6 +5724,7 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { table_partition_cols: table_partition_cols__.unwrap_or_default(), object_store_url: object_store_url__.unwrap_or_default(), output_ordering: output_ordering__.unwrap_or_default(), + constraints: constraints__, }) } } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index d2fda5dc8892..e0c5d7302215 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1442,6 +1442,8 @@ pub struct FileScanExecConf { pub object_store_url: ::prost::alloc::string::String, #[prost(message, repeated, tag = "9")] pub output_ordering: ::prost::alloc::vec::Vec, + #[prost(message, optional, tag = "11")] + pub constraints: ::core::option::Option, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct ParquetScanExecNode { diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index d1fe48cfec74..3b794026e0e0 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -485,6 +485,8 @@ pub fn parse_protobuf_file_scan_config( } else { Some(projection) }; + + let constraints = convert_required!(proto.constraints)?; let statistics = convert_required!(proto.statistics)?; let file_groups: Vec> = proto @@ -532,6 +534,7 @@ pub fn parse_protobuf_file_scan_config( object_store_url, file_schema, file_groups, + constraints, statistics, projection, limit: proto.limit.as_ref().map(|sl| sl.limit as usize), diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 000d0521def3..a22ff3cd4500 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -528,6 +528,7 @@ pub fn serialize_file_scan_config( physical_sort_expr_nodes: e, }) .collect::>(), + constraints: Some(conf.constraints.clone().into()), }) } diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 0a6ea6c7ff85..af6ac58e1806 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -94,7 +94,7 @@ use datafusion_common::file_options::json_writer::JsonWriterOptions; use datafusion_common::parsers::CompressionTypeVariant; use datafusion_common::stats::Precision; use datafusion_common::{ - internal_err, not_impl_err, DataFusionError, Result, UnnestOptions, + internal_err, not_impl_err, Constraints, DataFusionError, Result, UnnestOptions, }; use datafusion_expr::{ Accumulator, AccumulatorFactoryFunction, AggregateUDF, ColumnarValue, ScalarUDF, @@ -712,6 +712,7 @@ fn 
roundtrip_parquet_exec_with_pruning_predicate() -> Result<()> { "/path/to/file.parquet".to_string(), 1024, )]], + constraints: Constraints::empty(), statistics: Statistics { num_rows: Precision::Inexact(100), total_byte_size: Precision::Inexact(1024), @@ -748,6 +749,7 @@ async fn roundtrip_parquet_exec_with_table_partition_cols() -> Result<()> { let scan_config = FileScanConfig { object_store_url: ObjectStoreUrl::local_filesystem(), file_groups: vec![vec![file_group]], + constraints: Constraints::empty(), statistics: Statistics::new_unknown(&schema), file_schema: schema, projection: Some(vec![0, 1]), @@ -776,6 +778,7 @@ fn roundtrip_parquet_exec_with_custom_predicate_expr() -> Result<()> { "/path/to/file.parquet".to_string(), 1024, )]], + constraints: Constraints::empty(), statistics: Statistics { num_rows: Precision::Inexact(100), total_byte_size: Precision::Inexact(1024), diff --git a/datafusion/sqllogictest/test_files/group_by.slt b/datafusion/sqllogictest/test_files/group_by.slt index df7e21c2da44..63d285cc93de 100644 --- a/datafusion/sqllogictest/test_files/group_by.slt +++ b/datafusion/sqllogictest/test_files/group_by.slt @@ -3918,7 +3918,7 @@ physical_plan 04)------RepartitionExec: partitioning=Hash([c@0, b@1], 8), input_partitions=8 05)--------AggregateExec: mode=Partial, gby=[c@1 as c, b@0 as b], aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0]) 06)----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 -07)------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[b, c, d], output_ordering=[c@1 ASC NULLS LAST], has_header=true +07)------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[b, c, d], output_ordering=[c@1 ASC NULLS LAST], constraints=[PrimaryKey([3])], has_header=true # drop table multiple_ordered_table_with_pk statement ok @@ -3959,7 +3959,7 @@ physical_plan 04)------RepartitionExec: partitioning=Hash([c@0, b@1], 8), input_partitions=8 05)--------AggregateExec: mode=Partial, gby=[c@1 as c, b@0 as b], aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0]) 06)----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 -07)------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[b, c, d], output_ordering=[c@1 ASC NULLS LAST], has_header=true +07)------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[b, c, d], output_ordering=[c@1 ASC NULLS LAST], constraints=[PrimaryKey([3])], has_header=true statement ok set datafusion.execution.target_partitions = 1; @@ -3981,7 +3981,7 @@ physical_plan 01)AggregateExec: mode=Single, gby=[c@0 as c, sum1@1 as sum1], aggr=[], ordering_mode=PartiallySorted([0]) 02)--ProjectionExec: expr=[c@0 as c, sum(multiple_ordered_table_with_pk.d)@1 as sum1] 03)----AggregateExec: mode=Single, gby=[c@0 as c], aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=Sorted -04)------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c, d], output_ordering=[c@0 ASC NULLS LAST], has_header=true +04)------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c, d], output_ordering=[c@0 ASC NULLS LAST], constraints=[PrimaryKey([3])], has_header=true query TT EXPLAIN SELECT c, sum1, SUM(b) OVER() as sumb @@ -4001,7 +4001,7 @@ 
physical_plan 02)--WindowAggExec: wdw=[sum(multiple_ordered_table_with_pk.b) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "sum(multiple_ordered_table_with_pk.b) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false }] 03)----ProjectionExec: expr=[c@0 as c, b@1 as b, sum(multiple_ordered_table_with_pk.d)@2 as sum1] 04)------AggregateExec: mode=Single, gby=[c@1 as c, b@0 as b], aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0]) -05)--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[b, c, d], output_ordering=[c@1 ASC NULLS LAST], has_header=true +05)--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[b, c, d], output_ordering=[c@1 ASC NULLS LAST], constraints=[PrimaryKey([3])], has_header=true query TT EXPLAIN SELECT lhs.c, rhs.c, lhs.sum1, rhs.sum1 @@ -4032,10 +4032,10 @@ physical_plan 03)----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(b@1, b@1)], projection=[c@0, sum1@2, c@3, sum1@5] 04)------ProjectionExec: expr=[c@0 as c, b@1 as b, sum(multiple_ordered_table_with_pk.d)@2 as sum1] 05)--------AggregateExec: mode=Single, gby=[c@1 as c, b@0 as b], aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0]) -06)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[b, c, d], output_ordering=[c@1 ASC NULLS LAST], has_header=true +06)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[b, c, d], output_ordering=[c@1 ASC NULLS LAST], constraints=[PrimaryKey([3])], has_header=true 07)------ProjectionExec: expr=[c@0 as c, b@1 as b, sum(multiple_ordered_table_with_pk.d)@2 as sum1] 08)--------AggregateExec: mode=Single, gby=[c@1 as c, b@0 as b], aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0]) -09)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[b, c, d], output_ordering=[c@1 ASC NULLS LAST], has_header=true +09)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[b, c, d], output_ordering=[c@1 ASC NULLS LAST], constraints=[PrimaryKey([3])], has_header=true query TT EXPLAIN SELECT lhs.c, rhs.c, lhs.sum1, rhs.sum1 @@ -4064,10 +4064,10 @@ physical_plan 02)--CrossJoinExec 03)----ProjectionExec: expr=[c@0 as c, sum(multiple_ordered_table_with_pk.d)@1 as sum1] 04)------AggregateExec: mode=Single, gby=[c@0 as c], aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=Sorted -05)--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c, d], output_ordering=[c@0 ASC NULLS LAST], has_header=true +05)--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c, d], output_ordering=[c@0 ASC NULLS LAST], constraints=[PrimaryKey([3])], has_header=true 06)----ProjectionExec: expr=[c@0 as c, sum(multiple_ordered_table_with_pk.d)@1 as sum1] 07)------AggregateExec: mode=Single, gby=[c@0 as c], aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=Sorted -08)--------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c, d], output_ordering=[c@0 ASC NULLS LAST], has_header=true +08)--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c, d], output_ordering=[c@0 ASC NULLS LAST], constraints=[PrimaryKey([3])], has_header=true # we do not generate physical plan for Repartition yet (e.g Distribute By queries). query TT @@ -4106,10 +4106,10 @@ physical_plan 01)UnionExec 02)--ProjectionExec: expr=[c@0 as c, a@1 as a, sum(multiple_ordered_table_with_pk.d)@2 as sum1] 03)----AggregateExec: mode=Single, gby=[c@1 as c, a@0 as a], aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=Sorted -04)------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c, d], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], has_header=true +04)------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c, d], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], constraints=[PrimaryKey([3])], has_header=true 05)--ProjectionExec: expr=[c@0 as c, a@1 as a, sum(multiple_ordered_table_with_pk.d)@2 as sum1] 06)----AggregateExec: mode=Single, gby=[c@1 as c, a@0 as a], aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=Sorted -07)------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c, d], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], has_header=true +07)------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c, d], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], constraints=[PrimaryKey([3])], has_header=true # table scan should be simplified. 
query TT @@ -4124,7 +4124,7 @@ logical_plan physical_plan 01)ProjectionExec: expr=[c@0 as c, a@1 as a, sum(multiple_ordered_table_with_pk.d)@2 as sum1] 02)--AggregateExec: mode=Single, gby=[c@1 as c, a@0 as a], aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=Sorted -03)----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c, d], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], has_header=true +03)----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c, d], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], constraints=[PrimaryKey([3])], has_header=true # limit should be simplified query TT @@ -4143,7 +4143,7 @@ physical_plan 01)ProjectionExec: expr=[c@0 as c, a@1 as a, sum(multiple_ordered_table_with_pk.d)@2 as sum1] 02)--GlobalLimitExec: skip=0, fetch=5 03)----AggregateExec: mode=Single, gby=[c@1 as c, a@0 as a], aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=Sorted -04)------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c, d], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], has_header=true +04)------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c, d], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], constraints=[PrimaryKey([3])], has_header=true statement ok set datafusion.execution.target_partitions = 8; diff --git a/datafusion/sqllogictest/test_files/order.slt b/datafusion/sqllogictest/test_files/order.slt index 8d96fe47f6b3..0f8417169725 100644 --- a/datafusion/sqllogictest/test_files/order.slt +++ b/datafusion/sqllogictest/test_files/order.slt @@ -1306,3 +1306,57 @@ ORDER BY a DESC; ---- 0.5 0 + +# Test that when a column has a unique non-null constraint (via PRIMARY KEY) and is ordered (via WITH ORDER), +# we can eliminate sorts on derived lexicographical orderings that start with that column. 
+statement ok
+CREATE EXTERNAL TABLE table_with_ordered_pk (
+  c1 INT,
+  c2 INT,
+  c3 INT,
+  PRIMARY KEY (c1)
+)
+STORED AS CSV
+WITH ORDER (c1 ASC)
+OPTIONS ('format.has_header' 'true')
+LOCATION '../core/tests/data/aggregate_agg_multi_order.csv';
+
+query TT
+EXPLAIN SELECT c1, c2 FROM table_with_ordered_pk ORDER BY c1, c2;
+----
+logical_plan
+01)Sort: table_with_ordered_pk.c1 ASC NULLS LAST, table_with_ordered_pk.c2 ASC NULLS LAST
+02)--TableScan: table_with_ordered_pk projection=[c1, c2]
+physical_plan CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/aggregate_agg_multi_order.csv]]}, projection=[c1, c2], output_ordering=[c1@0 ASC NULLS LAST], constraints=[PrimaryKey([0])], has_header=true
+
+statement ok
+drop table table_with_ordered_pk;
+
+statement ok
+set datafusion.execution.target_partitions = 1;
+
+statement ok
+set datafusion.explain.physical_plan_only = true;
+
+# An aggregation operation can also introduce unique values (its GROUP BY keys are distinct)
+statement ok
+CREATE EXTERNAL TABLE table_with_ordered_not_null (
+  c1 INT NOT NULL,
+  c2 INT,
+  c3 INT,
+)
+STORED AS CSV
+WITH ORDER (c1)
+OPTIONS ('format.has_header' 'true')
+LOCATION '../core/tests/data/aggregate_agg_multi_order.csv';
+
+query TT
+EXPLAIN SELECT c1, SUM(c2) as sum_c2 FROM table_with_ordered_not_null GROUP BY c1 ORDER BY c1, sum_c2;
+----
+physical_plan
+01)ProjectionExec: expr=[c1@0 as c1, sum(table_with_ordered_not_null.c2)@1 as sum_c2]
+02)--AggregateExec: mode=Single, gby=[c1@0 as c1], aggr=[sum(table_with_ordered_not_null.c2)], ordering_mode=Sorted
+03)----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/aggregate_agg_multi_order.csv]]}, projection=[c1, c2], output_ordering=[c1@0 ASC NULLS LAST], has_header=true
+
+statement ok
+drop table table_with_ordered_not_null;
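
For reference, a minimal sketch (not part of the patch) of how the new APIs compose: `Constraints::project` plus the constraint-aware `EquivalenceProperties::ordering_satisfy` added above. Crate and module paths are assumed from the `use` statements visible in the hunks, and the assertions mirror the unit tests added in `functional_dependencies.rs` and `equivalence/properties.rs`.

```rust
// Sketch only; assumes the datafusion-common / datafusion-physical-expr crates from this branch.
use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema, SortOptions};
use datafusion_common::{Constraint, Constraints, Result};
use datafusion_physical_expr::expressions::col;
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalSortExpr};

fn main() -> Result<()> {
    // Table (a, b, c) with PRIMARY KEY (a) and a declared ordering on `a`,
    // analogous to `table_with_ordered_pk` in the order.slt test above.
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, true),
        Field::new("c", DataType::Int32, true),
    ]));
    let sort = |name: &str| -> Result<PhysicalSortExpr> {
        Ok(PhysicalSortExpr {
            expr: col(name, &schema)?,
            options: SortOptions::default(),
        })
    };

    let eq_properties = EquivalenceProperties::new_with_orderings(
        Arc::clone(&schema),
        &[LexOrdering::new(vec![sort("a")?])],
    )
    .with_constraints(Constraints::new_unverified(vec![Constraint::PrimaryKey(
        vec![0],
    )]));

    // Because `a` is a key, each group of equal `a` values holds at most one row,
    // so an ordering on (a, b) is already satisfied and no extra sort is needed.
    let requirement = LexOrdering::new(vec![sort("a")?, sort("b")?]);
    assert!(eq_properties.ordering_satisfy(&requirement));

    // Constraint projection: keeping columns (b, a) remaps the key index 0 -> 1,
    // while dropping column `a` entirely drops the constraint (project returns None).
    let constraints = Constraints::new_unverified(vec![Constraint::PrimaryKey(vec![0])]);
    let projected = constraints.project(&[1, 0]).unwrap();
    assert_eq!(
        projected,
        Constraints::new_unverified(vec![Constraint::PrimaryKey(vec![1])])
    );
    assert!(constraints.project(&[1, 2]).is_none());

    Ok(())
}
```

The design point the sketch exercises is the same one the new `order.slt` case relies on: a primary-key or unique constraint lets `ordering_satisfy` accept any requirement that extends a key-prefixed ordering, so the planner can drop the redundant sort on `(c1, c2)` instead of inserting a `SortExec`.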