Skip to content

Commit

Permalink
[Minor]: Add data based sort expression test (apache#12992)
Browse files Browse the repository at this point in the history
* Initial commit

* Fix formatting, minor changes

* Minor changes

* Move test to fuzz tests

* Add comment to test
  • Loading branch information
akurmustafa authored Oct 21, 2024
1 parent b42d9b8 commit 69a4648
Show file tree
Hide file tree
Showing 3 changed files with 204 additions and 17 deletions.
68 changes: 66 additions & 2 deletions datafusion/core/tests/fuzz_cases/equivalence/ordering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@
// under the License.

use crate::fuzz_cases::equivalence::utils::{
create_random_schema, generate_table_for_eq_properties, is_table_same_after_sort,
TestScalarUDF,
convert_to_orderings, create_random_schema, create_test_schema_2,
generate_table_for_eq_properties, generate_table_for_orderings,
is_table_same_after_sort, TestScalarUDF,
};
use arrow_schema::SortOptions;
use datafusion_common::{DFSchema, Result};
Expand Down Expand Up @@ -158,3 +159,66 @@ fn test_ordering_satisfy_with_equivalence_complex_random() -> Result<()> {

Ok(())
}

// This test checks given a table is ordered with `[a ASC, b ASC, c ASC, d ASC]` and `[a ASC, c ASC, b ASC, d ASC]`
// whether the table is also ordered with `[a ASC, b ASC, d ASC]` and `[a ASC, c ASC, d ASC]`
// Since these orderings cannot be deduced, these orderings shouldn't be satisfied by the table generated.
// For background see discussion: https://github.com/apache/datafusion/issues/12700#issuecomment-2411134296
#[test]
fn test_ordering_satisfy_on_data() -> Result<()> {
let schema = create_test_schema_2()?;
let col_a = &col("a", &schema)?;
let col_b = &col("b", &schema)?;
let col_c = &col("c", &schema)?;
let col_d = &col("d", &schema)?;

let option_asc = SortOptions {
descending: false,
nulls_first: false,
};

let orderings = vec![
// [a ASC, b ASC, c ASC, d ASC]
vec![
(col_a, option_asc),
(col_b, option_asc),
(col_c, option_asc),
(col_d, option_asc),
],
// [a ASC, c ASC, b ASC, d ASC]
vec![
(col_a, option_asc),
(col_c, option_asc),
(col_b, option_asc),
(col_d, option_asc),
],
];
let orderings = convert_to_orderings(&orderings);

let batch = generate_table_for_orderings(orderings, schema, 1000, 10)?;

// [a ASC, c ASC, d ASC] cannot be deduced
let ordering = vec![
(col_a, option_asc),
(col_c, option_asc),
(col_d, option_asc),
];
let ordering = convert_to_orderings(&[ordering])[0].clone();
assert!(!is_table_same_after_sort(ordering, batch.clone())?);

// [a ASC, b ASC, d ASC] cannot be deduced
let ordering = vec![
(col_a, option_asc),
(col_b, option_asc),
(col_d, option_asc),
];
let ordering = convert_to_orderings(&[ordering])[0].clone();
assert!(!is_table_same_after_sort(ordering, batch.clone())?);

// [a ASC, b ASC] can be deduced
let ordering = vec![(col_a, option_asc), (col_b, option_asc)];
let ordering = convert_to_orderings(&[ordering])[0].clone();
assert!(is_table_same_after_sort(ordering, batch.clone())?);

Ok(())
}
122 changes: 118 additions & 4 deletions datafusion/core/tests/fuzz_cases/equivalence/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,29 @@
// specific language governing permissions and limitations
// under the License.
//
// use datafusion_physical_expr::expressions::{col, Column};
use datafusion::physical_plan::expressions::col;
use datafusion::physical_plan::expressions::Column;
use datafusion_physical_expr::{ConstExpr, EquivalenceProperties, PhysicalSortExpr};
use std::any::Any;
use std::cmp::Ordering;
use std::sync::Arc;

use arrow::compute::{lexsort_to_indices, SortColumn};
use arrow::datatypes::{DataType, Field, Schema};
use arrow_array::{ArrayRef, Float32Array, Float64Array, RecordBatch, UInt32Array};
use arrow_array::{
ArrayRef, Float32Array, Float64Array, PrimitiveArray, RecordBatch, UInt32Array,
};
use arrow_schema::{SchemaRef, SortOptions};
use datafusion_common::utils::{
compare_rows, get_record_batch_at_indices, get_row_at_idx,
};
use datafusion_common::{exec_err, plan_datafusion_err, DataFusionError, Result};

use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};
use datafusion_physical_expr::equivalence::{EquivalenceClass, ProjectionMapping};
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexOrderingRef};

use itertools::izip;
use rand::prelude::*;

Expand Down Expand Up @@ -67,7 +73,7 @@ pub fn output_schema(
}

// Generate a schema which consists of 6 columns (a, b, c, d, e, f)
fn create_test_schema_2() -> Result<SchemaRef> {
pub fn create_test_schema_2() -> Result<SchemaRef> {
let a = Field::new("a", DataType::Float64, true);
let b = Field::new("b", DataType::Float64, true);
let c = Field::new("c", DataType::Float64, true);
Expand Down Expand Up @@ -374,6 +380,114 @@ pub fn generate_table_for_eq_properties(
Ok(RecordBatch::try_from_iter(res)?)
}

// Generate a table that satisfies the given orderings;
pub fn generate_table_for_orderings(
mut orderings: Vec<LexOrdering>,
schema: SchemaRef,
n_elem: usize,
n_distinct: usize,
) -> Result<RecordBatch> {
let mut rng = StdRng::seed_from_u64(23);

assert!(!orderings.is_empty());
// Sort the inner vectors by their lengths (longest first)
orderings.sort_by_key(|v| std::cmp::Reverse(v.len()));

let arrays = schema
.fields
.iter()
.map(|field| {
(
field.name(),
generate_random_f64_array(n_elem, n_distinct, &mut rng),
)
})
.collect::<Vec<_>>();
let batch = RecordBatch::try_from_iter(arrays)?;

// Sort batch according to first ordering expression
let sort_columns = get_sort_columns(&batch, &orderings[0])?;
let sort_indices = lexsort_to_indices(&sort_columns, None)?;
let mut batch = get_record_batch_at_indices(&batch, &sort_indices)?;

// prune out rows that is invalid according to remaining orderings.
for ordering in orderings.iter().skip(1) {
let sort_columns = get_sort_columns(&batch, ordering)?;

// Collect sort options and values into separate vectors.
let (sort_options, sort_col_values): (Vec<_>, Vec<_>) = sort_columns
.into_iter()
.map(|sort_col| (sort_col.options.unwrap(), sort_col.values))
.unzip();

let mut cur_idx = 0;
let mut keep_indices = vec![cur_idx as u32];
for next_idx in 1..batch.num_rows() {
let cur_row = get_row_at_idx(&sort_col_values, cur_idx)?;
let next_row = get_row_at_idx(&sort_col_values, next_idx)?;

if compare_rows(&cur_row, &next_row, &sort_options)? != Ordering::Greater {
// next row satisfies ordering relation given, compared to the current row.
keep_indices.push(next_idx as u32);
cur_idx = next_idx;
}
}
// Only keep valid rows, that satisfies given ordering relation.
batch = get_record_batch_at_indices(
&batch,
&PrimitiveArray::from_iter_values(keep_indices),
)?;
}

Ok(batch)
}

// Convert each tuple to PhysicalSortExpr
pub fn convert_to_sort_exprs(
in_data: &[(&Arc<dyn PhysicalExpr>, SortOptions)],
) -> Vec<PhysicalSortExpr> {
in_data
.iter()
.map(|(expr, options)| PhysicalSortExpr {
expr: Arc::clone(*expr),
options: *options,
})
.collect()
}

// Convert each inner tuple to PhysicalSortExpr
pub fn convert_to_orderings(
orderings: &[Vec<(&Arc<dyn PhysicalExpr>, SortOptions)>],
) -> Vec<Vec<PhysicalSortExpr>> {
orderings
.iter()
.map(|sort_exprs| convert_to_sort_exprs(sort_exprs))
.collect()
}

// Utility function to generate random f64 array
fn generate_random_f64_array(
n_elems: usize,
n_distinct: usize,
rng: &mut StdRng,
) -> ArrayRef {
let values: Vec<f64> = (0..n_elems)
.map(|_| rng.gen_range(0..n_distinct) as f64 / 2.0)
.collect();
Arc::new(Float64Array::from_iter_values(values))
}

// Helper function to get sort columns from a batch
fn get_sort_columns(
batch: &RecordBatch,
ordering: LexOrderingRef,
) -> Result<Vec<SortColumn>> {
ordering
.iter()
.map(|expr| expr.evaluate_to_sort_column(batch))
.collect::<Result<Vec<_>>>()
}

#[derive(Debug, Clone)]
pub struct TestScalarUDF {
pub(crate) signature: Signature,
Expand Down
31 changes: 20 additions & 11 deletions datafusion/physical-expr/src/equivalence/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ pub fn add_offset_to_expr(

#[cfg(test)]
mod tests {

use super::*;
use crate::expressions::col;
use crate::PhysicalSortExpr;
Expand Down Expand Up @@ -385,14 +386,6 @@ mod tests {
let schema = eq_properties.schema();
let mut schema_vec = vec![None; schema.fields.len()];

// Utility closure to generate random array
let mut generate_random_array = |num_elems: usize, max_val: usize| -> ArrayRef {
let values: Vec<f64> = (0..num_elems)
.map(|_| rng.gen_range(0..max_val) as f64 / 2.0)
.collect();
Arc::new(Float64Array::from_iter_values(values))
};

// Fill constant columns
for constant in &eq_properties.constants {
let col = constant.expr().as_any().downcast_ref::<Column>().unwrap();
Expand All @@ -409,7 +402,7 @@ mod tests {
.map(|PhysicalSortExpr { expr, options }| {
let col = expr.as_any().downcast_ref::<Column>().unwrap();
let (idx, _field) = schema.column_with_name(col.name()).unwrap();
let arr = generate_random_array(n_elem, n_distinct);
let arr = generate_random_f64_array(n_elem, n_distinct, &mut rng);
(
SortColumn {
values: arr,
Expand All @@ -430,7 +423,9 @@ mod tests {
for eq_group in eq_properties.eq_group.iter() {
let representative_array =
get_representative_arr(eq_group, &schema_vec, Arc::clone(schema))
.unwrap_or_else(|| generate_random_array(n_elem, n_distinct));
.unwrap_or_else(|| {
generate_random_f64_array(n_elem, n_distinct, &mut rng)
});

for expr in eq_group.iter() {
let col = expr.as_any().downcast_ref::<Column>().unwrap();
Expand All @@ -446,11 +441,25 @@ mod tests {
(
field.name(),
// Generate random values for columns that do not occur in any of the groups (equivalence, ordering equivalence, constants)
elem.unwrap_or_else(|| generate_random_array(n_elem, n_distinct)),
elem.unwrap_or_else(|| {
generate_random_f64_array(n_elem, n_distinct, &mut rng)
}),
)
})
.collect();

Ok(RecordBatch::try_from_iter(res)?)
}

// Utility function to generate random f64 array
fn generate_random_f64_array(
n_elems: usize,
n_distinct: usize,
rng: &mut StdRng,
) -> ArrayRef {
let values: Vec<f64> = (0..n_elems)
.map(|_| rng.gen_range(0..n_distinct) as f64 / 2.0)
.collect();
Arc::new(Float64Array::from_iter_values(values))
}
}

0 comments on commit 69a4648

Please sign in to comment.