diff --git a/datafusion/core/tests/fuzz_cases/equivalence/ordering.rs b/datafusion/core/tests/fuzz_cases/equivalence/ordering.rs index b1ee24a7a373..604d1a1000c3 100644 --- a/datafusion/core/tests/fuzz_cases/equivalence/ordering.rs +++ b/datafusion/core/tests/fuzz_cases/equivalence/ordering.rs @@ -16,8 +16,9 @@ // under the License. use crate::fuzz_cases::equivalence::utils::{ - create_random_schema, generate_table_for_eq_properties, is_table_same_after_sort, - TestScalarUDF, + convert_to_orderings, create_random_schema, create_test_schema_2, + generate_table_for_eq_properties, generate_table_for_orderings, + is_table_same_after_sort, TestScalarUDF, }; use arrow_schema::SortOptions; use datafusion_common::{DFSchema, Result}; @@ -158,3 +159,66 @@ fn test_ordering_satisfy_with_equivalence_complex_random() -> Result<()> { Ok(()) } + +// This test checks given a table is ordered with `[a ASC, b ASC, c ASC, d ASC]` and `[a ASC, c ASC, b ASC, d ASC]` +// whether the table is also ordered with `[a ASC, b ASC, d ASC]` and `[a ASC, c ASC, d ASC]` +// Since these orderings cannot be deduced, these orderings shouldn't be satisfied by the table generated. +// For background see discussion: https://github.com/apache/datafusion/issues/12700#issuecomment-2411134296 +#[test] +fn test_ordering_satisfy_on_data() -> Result<()> { + let schema = create_test_schema_2()?; + let col_a = &col("a", &schema)?; + let col_b = &col("b", &schema)?; + let col_c = &col("c", &schema)?; + let col_d = &col("d", &schema)?; + + let option_asc = SortOptions { + descending: false, + nulls_first: false, + }; + + let orderings = vec![ + // [a ASC, b ASC, c ASC, d ASC] + vec![ + (col_a, option_asc), + (col_b, option_asc), + (col_c, option_asc), + (col_d, option_asc), + ], + // [a ASC, c ASC, b ASC, d ASC] + vec![ + (col_a, option_asc), + (col_c, option_asc), + (col_b, option_asc), + (col_d, option_asc), + ], + ]; + let orderings = convert_to_orderings(&orderings); + + let batch = generate_table_for_orderings(orderings, schema, 1000, 10)?; + + // [a ASC, c ASC, d ASC] cannot be deduced + let ordering = vec![ + (col_a, option_asc), + (col_c, option_asc), + (col_d, option_asc), + ]; + let ordering = convert_to_orderings(&[ordering])[0].clone(); + assert!(!is_table_same_after_sort(ordering, batch.clone())?); + + // [a ASC, b ASC, d ASC] cannot be deduced + let ordering = vec![ + (col_a, option_asc), + (col_b, option_asc), + (col_d, option_asc), + ]; + let ordering = convert_to_orderings(&[ordering])[0].clone(); + assert!(!is_table_same_after_sort(ordering, batch.clone())?); + + // [a ASC, b ASC] can be deduced + let ordering = vec![(col_a, option_asc), (col_b, option_asc)]; + let ordering = convert_to_orderings(&[ordering])[0].clone(); + assert!(is_table_same_after_sort(ordering, batch.clone())?); + + Ok(()) +} diff --git a/datafusion/core/tests/fuzz_cases/equivalence/utils.rs b/datafusion/core/tests/fuzz_cases/equivalence/utils.rs index e51dabd6437f..ce3afba81ee2 100644 --- a/datafusion/core/tests/fuzz_cases/equivalence/utils.rs +++ b/datafusion/core/tests/fuzz_cases/equivalence/utils.rs @@ -15,23 +15,29 @@ // specific language governing permissions and limitations // under the License. // -// use datafusion_physical_expr::expressions::{col, Column}; use datafusion::physical_plan::expressions::col; use datafusion::physical_plan::expressions::Column; use datafusion_physical_expr::{ConstExpr, EquivalenceProperties, PhysicalSortExpr}; use std::any::Any; +use std::cmp::Ordering; use std::sync::Arc; use arrow::compute::{lexsort_to_indices, SortColumn}; use arrow::datatypes::{DataType, Field, Schema}; -use arrow_array::{ArrayRef, Float32Array, Float64Array, RecordBatch, UInt32Array}; +use arrow_array::{ + ArrayRef, Float32Array, Float64Array, PrimitiveArray, RecordBatch, UInt32Array, +}; use arrow_schema::{SchemaRef, SortOptions}; +use datafusion_common::utils::{ + compare_rows, get_record_batch_at_indices, get_row_at_idx, +}; use datafusion_common::{exec_err, plan_datafusion_err, DataFusionError, Result}; - use datafusion_expr::sort_properties::{ExprProperties, SortProperties}; use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility}; use datafusion_physical_expr::equivalence::{EquivalenceClass, ProjectionMapping}; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; +use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexOrderingRef}; + use itertools::izip; use rand::prelude::*; @@ -67,7 +73,7 @@ pub fn output_schema( } // Generate a schema which consists of 6 columns (a, b, c, d, e, f) -fn create_test_schema_2() -> Result { +pub fn create_test_schema_2() -> Result { let a = Field::new("a", DataType::Float64, true); let b = Field::new("b", DataType::Float64, true); let c = Field::new("c", DataType::Float64, true); @@ -374,6 +380,114 @@ pub fn generate_table_for_eq_properties( Ok(RecordBatch::try_from_iter(res)?) } +// Generate a table that satisfies the given orderings; +pub fn generate_table_for_orderings( + mut orderings: Vec, + schema: SchemaRef, + n_elem: usize, + n_distinct: usize, +) -> Result { + let mut rng = StdRng::seed_from_u64(23); + + assert!(!orderings.is_empty()); + // Sort the inner vectors by their lengths (longest first) + orderings.sort_by_key(|v| std::cmp::Reverse(v.len())); + + let arrays = schema + .fields + .iter() + .map(|field| { + ( + field.name(), + generate_random_f64_array(n_elem, n_distinct, &mut rng), + ) + }) + .collect::>(); + let batch = RecordBatch::try_from_iter(arrays)?; + + // Sort batch according to first ordering expression + let sort_columns = get_sort_columns(&batch, &orderings[0])?; + let sort_indices = lexsort_to_indices(&sort_columns, None)?; + let mut batch = get_record_batch_at_indices(&batch, &sort_indices)?; + + // prune out rows that is invalid according to remaining orderings. + for ordering in orderings.iter().skip(1) { + let sort_columns = get_sort_columns(&batch, ordering)?; + + // Collect sort options and values into separate vectors. + let (sort_options, sort_col_values): (Vec<_>, Vec<_>) = sort_columns + .into_iter() + .map(|sort_col| (sort_col.options.unwrap(), sort_col.values)) + .unzip(); + + let mut cur_idx = 0; + let mut keep_indices = vec![cur_idx as u32]; + for next_idx in 1..batch.num_rows() { + let cur_row = get_row_at_idx(&sort_col_values, cur_idx)?; + let next_row = get_row_at_idx(&sort_col_values, next_idx)?; + + if compare_rows(&cur_row, &next_row, &sort_options)? != Ordering::Greater { + // next row satisfies ordering relation given, compared to the current row. + keep_indices.push(next_idx as u32); + cur_idx = next_idx; + } + } + // Only keep valid rows, that satisfies given ordering relation. + batch = get_record_batch_at_indices( + &batch, + &PrimitiveArray::from_iter_values(keep_indices), + )?; + } + + Ok(batch) +} + +// Convert each tuple to PhysicalSortExpr +pub fn convert_to_sort_exprs( + in_data: &[(&Arc, SortOptions)], +) -> Vec { + in_data + .iter() + .map(|(expr, options)| PhysicalSortExpr { + expr: Arc::clone(*expr), + options: *options, + }) + .collect() +} + +// Convert each inner tuple to PhysicalSortExpr +pub fn convert_to_orderings( + orderings: &[Vec<(&Arc, SortOptions)>], +) -> Vec> { + orderings + .iter() + .map(|sort_exprs| convert_to_sort_exprs(sort_exprs)) + .collect() +} + +// Utility function to generate random f64 array +fn generate_random_f64_array( + n_elems: usize, + n_distinct: usize, + rng: &mut StdRng, +) -> ArrayRef { + let values: Vec = (0..n_elems) + .map(|_| rng.gen_range(0..n_distinct) as f64 / 2.0) + .collect(); + Arc::new(Float64Array::from_iter_values(values)) +} + +// Helper function to get sort columns from a batch +fn get_sort_columns( + batch: &RecordBatch, + ordering: LexOrderingRef, +) -> Result> { + ordering + .iter() + .map(|expr| expr.evaluate_to_sort_column(batch)) + .collect::>>() +} + #[derive(Debug, Clone)] pub struct TestScalarUDF { pub(crate) signature: Signature, diff --git a/datafusion/physical-expr/src/equivalence/mod.rs b/datafusion/physical-expr/src/equivalence/mod.rs index 7726458a46ac..253f1196491b 100644 --- a/datafusion/physical-expr/src/equivalence/mod.rs +++ b/datafusion/physical-expr/src/equivalence/mod.rs @@ -72,6 +72,7 @@ pub fn add_offset_to_expr( #[cfg(test)] mod tests { + use super::*; use crate::expressions::col; use crate::PhysicalSortExpr; @@ -385,14 +386,6 @@ mod tests { let schema = eq_properties.schema(); let mut schema_vec = vec![None; schema.fields.len()]; - // Utility closure to generate random array - let mut generate_random_array = |num_elems: usize, max_val: usize| -> ArrayRef { - let values: Vec = (0..num_elems) - .map(|_| rng.gen_range(0..max_val) as f64 / 2.0) - .collect(); - Arc::new(Float64Array::from_iter_values(values)) - }; - // Fill constant columns for constant in &eq_properties.constants { let col = constant.expr().as_any().downcast_ref::().unwrap(); @@ -409,7 +402,7 @@ mod tests { .map(|PhysicalSortExpr { expr, options }| { let col = expr.as_any().downcast_ref::().unwrap(); let (idx, _field) = schema.column_with_name(col.name()).unwrap(); - let arr = generate_random_array(n_elem, n_distinct); + let arr = generate_random_f64_array(n_elem, n_distinct, &mut rng); ( SortColumn { values: arr, @@ -430,7 +423,9 @@ mod tests { for eq_group in eq_properties.eq_group.iter() { let representative_array = get_representative_arr(eq_group, &schema_vec, Arc::clone(schema)) - .unwrap_or_else(|| generate_random_array(n_elem, n_distinct)); + .unwrap_or_else(|| { + generate_random_f64_array(n_elem, n_distinct, &mut rng) + }); for expr in eq_group.iter() { let col = expr.as_any().downcast_ref::().unwrap(); @@ -446,11 +441,25 @@ mod tests { ( field.name(), // Generate random values for columns that do not occur in any of the groups (equivalence, ordering equivalence, constants) - elem.unwrap_or_else(|| generate_random_array(n_elem, n_distinct)), + elem.unwrap_or_else(|| { + generate_random_f64_array(n_elem, n_distinct, &mut rng) + }), ) }) .collect(); Ok(RecordBatch::try_from_iter(res)?) } + + // Utility function to generate random f64 array + fn generate_random_f64_array( + n_elems: usize, + n_distinct: usize, + rng: &mut StdRng, + ) -> ArrayRef { + let values: Vec = (0..n_elems) + .map(|_| rng.gen_range(0..n_distinct) as f64 / 2.0) + .collect(); + Arc::new(Float64Array::from_iter_values(values)) + } }