From a6c83a7feac5132b8b03541a4ce27227fbe9ada1 Mon Sep 17 00:00:00 2001 From: Goksel Kabadayi <45314116+gokselk@users.noreply.github.com> Date: Wed, 15 Jan 2025 17:04:38 +0300 Subject: [PATCH] Refactor constraint based ordering satisfaction logic --- .../src/equivalence/properties.rs | 64 +++++++++---------- 1 file changed, 29 insertions(+), 35 deletions(-) diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index 27b183b4d7db5..67bbb5dda2838 100755 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -35,7 +35,7 @@ use crate::{ use arrow_schema::{SchemaRef, SortOptions}; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::{ - internal_err, plan_err, Constraint, Constraints, JoinSide, JoinType, Result, + internal_err, plan_err, Constraint, Constraints, HashMap, JoinSide, JoinType, Result, }; use datafusion_expr::interval_arithmetic::Interval; use datafusion_expr::sort_properties::{ExprProperties, SortProperties}; @@ -600,45 +600,39 @@ impl EquivalenceProperties { return false; } - // Find orderings that contain all required indices - let prefix_candidates = self - .oeq_class - .iter() - .filter(|&ordering| { - if indices.len() > ordering.len() { - return false; - } + // Check each ordering directly without collecting candidates first + self.oeq_class.iter().any(|ordering| { + if indices.len() > ordering.len() { + return false; + } - // Check if all constraint indices appear in this ordering - indices.iter().all(|&idx| { - ordering.iter().enumerate().any(|(pos, req)| { - // Only handle Column expressions - let Some(col) = req.expr.as_any().downcast_ref::() else { - return false; - }; - - // Column index must match - if col.index() != idx { - return false; - } + // Build a map of column positions in the ordering + let mut col_positions = HashMap::with_capacity(ordering.len()); + for (pos, req) in ordering.iter().enumerate() { + if let Some(col) = req.expr.as_any().downcast_ref::() { + col_positions.insert( + col.index(), + (pos, col.nullable(&self.schema).unwrap_or(true)), + ); + } + } + // Check if all constraint indices appear in valid positions + if !indices.iter().all(|&idx| { + col_positions + .get(&idx) + .map(|&(pos, nullable)| { // For unique constraints, verify column is not nullable if it's first/last - if check_null && (pos == 0 || pos == ordering.len() - 1) { - return !col.nullable(&self.schema).unwrap_or(true); - } - - true + !check_null + || (pos != 0 && pos != ordering.len() - 1) + || !nullable }) - }) - }) - .collect::>(); - - if prefix_candidates.is_empty() { - return false; - } + .unwrap_or(false) + }) { + return false; + } - // Check if any candidate ordering matches requirements prefix - prefix_candidates.iter().any(|&ordering| { + // Check if this ordering matches requirements prefix let prefix_len = indices.len(); ordering.len() >= prefix_len && normalized_reqs[..prefix_len].iter().zip(ordering).all(