Skip to content

Commit

Permalink
Refactor constraint based ordering satisfaction logic
Browse files Browse the repository at this point in the history
  • Loading branch information
gokselk committed Jan 15, 2025
1 parent ab93279 commit a6c83a7
Showing 1 changed file with 29 additions and 35 deletions.
64 changes: 29 additions & 35 deletions datafusion/physical-expr/src/equivalence/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use crate::{
use arrow_schema::{SchemaRef, SortOptions};
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{
internal_err, plan_err, Constraint, Constraints, JoinSide, JoinType, Result,
internal_err, plan_err, Constraint, Constraints, HashMap, JoinSide, JoinType, Result,
};
use datafusion_expr::interval_arithmetic::Interval;
use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
Expand Down Expand Up @@ -600,45 +600,39 @@ impl EquivalenceProperties {
return false;
}

// Find orderings that contain all required indices
let prefix_candidates = self
.oeq_class
.iter()
.filter(|&ordering| {
if indices.len() > ordering.len() {
return false;
}
// Check each ordering directly without collecting candidates first
self.oeq_class.iter().any(|ordering| {
if indices.len() > ordering.len() {
return false;
}

// Check if all constraint indices appear in this ordering
indices.iter().all(|&idx| {
ordering.iter().enumerate().any(|(pos, req)| {
// Only handle Column expressions
let Some(col) = req.expr.as_any().downcast_ref::<Column>() else {
return false;
};

// Column index must match
if col.index() != idx {
return false;
}
// Build a map of column positions in the ordering
let mut col_positions = HashMap::with_capacity(ordering.len());
for (pos, req) in ordering.iter().enumerate() {
if let Some(col) = req.expr.as_any().downcast_ref::<Column>() {
col_positions.insert(
col.index(),
(pos, col.nullable(&self.schema).unwrap_or(true)),
);
}
}

// Check if all constraint indices appear in valid positions
if !indices.iter().all(|&idx| {
col_positions
.get(&idx)
.map(|&(pos, nullable)| {
// For unique constraints, verify column is not nullable if it's first/last
if check_null && (pos == 0 || pos == ordering.len() - 1) {
return !col.nullable(&self.schema).unwrap_or(true);
}

true
!check_null
|| (pos != 0 && pos != ordering.len() - 1)
|| !nullable
})
})
})
.collect::<Vec<_>>();

if prefix_candidates.is_empty() {
return false;
}
.unwrap_or(false)
}) {
return false;
}

// Check if any candidate ordering matches requirements prefix
prefix_candidates.iter().any(|&ordering| {
// Check if this ordering matches requirements prefix
let prefix_len = indices.len();
ordering.len() >= prefix_len
&& normalized_reqs[..prefix_len].iter().zip(ordering).all(
Expand Down

0 comments on commit a6c83a7

Please sign in to comment.