Skip to content

Commit

Permalink
test: Add hash_agg_aggregation_strategy_with_nongrouped_single_value_…
Browse files Browse the repository at this point in the history
…columns_in_sort_key test
  • Loading branch information
srh committed Oct 24, 2024
1 parent 9e663dc commit 1d39e45
Showing 1 changed file with 46 additions and 1 deletion.
47 changes: 46 additions & 1 deletion datafusion/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1758,7 +1758,8 @@ fn tuple_err<T, R>(value: (Result<T>, Result<R>)) -> Result<(T, R)> {
#[cfg(test)]
mod tests {
use super::*;
use crate::logical_plan::{DFField, DFSchema, DFSchemaRef};
use crate::logical_plan::{and, DFField, DFSchema, DFSchemaRef};
use crate::physical_plan::OptimizerHints;
use crate::physical_plan::{csv::CsvReadOptions, expressions, Partitioning};
use crate::scalar::ScalarValue;
use crate::{
Expand Down Expand Up @@ -2041,6 +2042,50 @@ mod tests {
Ok(())
}

#[test]
fn hash_agg_aggregation_strategy_with_nongrouped_single_value_columns_in_sort_key() -> Result<()> {
let testdata = crate::test_util::arrow_test_data();
let path = format!("{}/csv/aggregate_test_100.csv", testdata);

let options = CsvReadOptions::new().schema_infer_max_records(100);

fn sort(column_name: &str) -> Expr {
col(column_name).sort(true, true)
}

// Instead of creating a mock ExecutionPlan, we have some input plan which produces the desired output_hints().
let logical_plan = LogicalPlanBuilder::scan_csv(path, options, None)?
.filter(and(col("c4").eq(lit("value_a")), col("c8").eq(lit("value_b"))))?
.sort(vec![sort("c1"), sort("c2"), sort("c3"), sort("c4"), sort("c5"), sort("c6"), sort("c7"), sort("c8")])?
.build()?;

let execution_plan = plan(&logical_plan)?;

// Note that both single_value_columns are part of the sort key... but one will not be part of the group key.
let hints: OptimizerHints = execution_plan.output_hints();
assert_eq!(hints.sort_order, Some(vec![0, 1, 2, 3, 4, 5, 6, 7]));
assert_eq!(hints.single_value_columns, vec![3, 7]);

// Now make a group_key that overlaps one single_value_column, but the single value column 7
// has column 5 and 6 ("c6" and "c7" respectively) in between.
let group_key = vec![col("c1"), col("c2"), col("c3"), col("c4"), col("c5")];
let mut ctx_state = make_ctx_state();
ctx_state.config.concurrency = 4;
let planner = DefaultPhysicalPlanner::default();
let mut physical_group_key = Vec::new();
for expr in group_key {
let phys_expr = planner.create_physical_expr(&expr, &logical_plan.schema(), &execution_plan.schema(), &ctx_state)?;
physical_group_key.push((phys_expr, "".to_owned()));
}

let mut sort_order = Vec::<usize>::new();
let is_sorted: bool = input_sorted_by_group_key(execution_plan.as_ref(), &physical_group_key, &mut sort_order);
assert!(is_sorted);
assert_eq!(sort_order, vec![0, 1, 2, 3, 4]);

Ok(())
}

#[test]
fn test_explain() {
let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
Expand Down

0 comments on commit 1d39e45

Please sign in to comment.