From cf963fbeb0eafa30c6d907f0403b4a5083f94456 Mon Sep 17 00:00:00 2001 From: Jay Zhan Date: Wed, 22 Jan 2025 14:51:40 +0800 Subject: [PATCH 1/7] le instead of lt Signed-off-by: Jay Zhan --- datafusion/functions-aggregate/src/first_last.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/functions-aggregate/src/first_last.rs b/datafusion/functions-aggregate/src/first_last.rs index 8ef139ae6123..46769e495296 100644 --- a/datafusion/functions-aggregate/src/first_last.rs +++ b/datafusion/functions-aggregate/src/first_last.rs @@ -613,7 +613,7 @@ impl Accumulator for LastValueAccumulator { orderings, &get_sort_options(self.ordering_req.as_ref()), )? - .is_lt() + .is_le() { self.update_with_new_row(&row); } @@ -652,7 +652,7 @@ impl Accumulator for LastValueAccumulator { // version in the new data: if !self.is_set || self.requirement_satisfied - || compare_rows(&self.orderings, last_ordering, &sort_options)?.is_lt() + || compare_rows(&self.orderings, last_ordering, &sort_options)?.is_le() { // Update with last value in the state. Note that we should exclude the // is_set flag from the state. Otherwise, we will end up with a state From 5b81e2ea2257da27ec2cc43b99f38be0b0fb7493 Mon Sep 17 00:00:00 2001 From: Jay Zhan Date: Fri, 24 Jan 2025 09:45:51 +0800 Subject: [PATCH 2/7] fix get last index logic --- .../functions-aggregate/src/first_last.rs | 66 ++++++++++++++++++- parquet-testing | 2 +- testing | 2 +- 3 files changed, 65 insertions(+), 5 deletions(-) diff --git a/datafusion/functions-aggregate/src/first_last.rs b/datafusion/functions-aggregate/src/first_last.rs index 46769e495296..5908da4276d4 100644 --- a/datafusion/functions-aggregate/src/first_last.rs +++ b/datafusion/functions-aggregate/src/first_last.rs @@ -18,13 +18,18 @@ //! Defines the FIRST_VALUE/LAST_VALUE aggregations. use std::any::Any; +use std::cmp::Ordering; use std::fmt::Debug; use std::mem::size_of_val; +use std::num; use std::sync::Arc; -use arrow::array::{ArrayRef, AsArray, BooleanArray}; -use arrow::compute::{self, lexsort_to_indices, take_arrays, SortColumn}; +use arrow::array::{ArrayRef, AsArray, BooleanArray, Int64Array, UInt64Array}; +use arrow::compute::{ + self, lexsort_to_indices, take_arrays, LexicographicalComparator, SortColumn, +}; use arrow::datatypes::{DataType, Field}; +use arrow_schema::SortOptions; use datafusion_common::utils::{compare_rows, get_row_at_idx}; use datafusion_common::{ arrow_datafusion_err, internal_err, DataFusionError, Result, ScalarValue, @@ -542,6 +547,9 @@ impl LastValueAccumulator { let [value, ordering_values @ ..] = values else { return internal_err!("Empty row in LAST_VALUE"); }; + + let num_rows = value.len(); + if self.requirement_satisfied { // Get last entry according to the order of data: if self.ignore_nulls { @@ -556,7 +564,7 @@ impl LastValueAccumulator { return Ok((!value.is_empty()).then_some(value.len() - 1)); } } - let sort_columns = ordering_values + let mut sort_columns = ordering_values .iter() .zip(self.ordering_req.iter()) .map(|(values, req)| { @@ -569,6 +577,15 @@ impl LastValueAccumulator { }) .collect::>(); + // Order by indices for cases where the values are the same, we expect the last index + let indices: UInt64Array = (0..num_rows).into_iter().map(|x| x as u64).collect(); + sort_columns.push( + SortColumn { + values: Arc::new(indices), + options: Some(!SortOptions::default()), + } + ); + if self.ignore_nulls { let indices = lexsort_to_indices(&sort_columns, None)?; // If ignoring nulls, find the last non-null value. @@ -701,9 +718,52 @@ fn convert_to_sort_cols(arrs: &[ArrayRef], sort_exprs: &LexOrdering) -> Vec Result<()> { + // TODO: Move this kind of test to slt, we don't have a nice way to define the batch size for each `update_batch` + // so there is no trivial way to test this in slt for now + + // test query: select last_value(a order by b) from t1, where b has same value + let schema = Schema::new(vec![ + Field::new("a", DataType::Int64, true), + Field::new("b", DataType::Int64, true), + ]); + + let mut last_accumulator = LastValueAccumulator::try_new( + &DataType::Int64, + &[DataType::Int64], + LexOrdering::new(vec![PhysicalSortExpr::new( + col("b", &schema)?, + SortOptions::default(), + )]), + false, + )?; + + let values = vec![ + Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef, // a + Arc::new(Int64Array::from(vec![1, 1, 1])) as ArrayRef, // b + ]; + last_accumulator.update_batch(&values)?; + last_accumulator.update_batch(&values)?; + + assert_eq!(last_accumulator.evaluate()?, ScalarValue::Int64(Some(3))); + + let values = vec![ + Arc::new(Int64Array::from(vec![1, 2, 3, 4, 5, 6])) as ArrayRef, // a + Arc::new(Int64Array::from(vec![1, 1, 1, 2, 2, 2])) as ArrayRef, // b + ]; + last_accumulator.update_batch(&values)?; + assert_eq!(last_accumulator.evaluate()?, ScalarValue::Int64(Some(6))); + + Ok(()) + } + #[test] fn test_first_last_value_value() -> Result<()> { let mut first_accumulator = FirstValueAccumulator::try_new( diff --git a/parquet-testing b/parquet-testing index f4d7ed772a62..e45cd23f784a 160000 --- a/parquet-testing +++ b/parquet-testing @@ -1 +1 @@ -Subproject commit f4d7ed772a62a95111db50fbcad2460833e8c882 +Subproject commit e45cd23f784aab3d6bf0701f8f4e621469ed3be7 diff --git a/testing b/testing index d2a137123034..98fceecd024d 160000 --- a/testing +++ b/testing @@ -1 +1 @@ -Subproject commit d2a13712303498963395318a4eb42872e66aead7 +Subproject commit 98fceecd024dccd2f8a00e32fc144975f218acf4 From d2c3688bf74edd34ea62e183fd3bba1e6267b2c3 Mon Sep 17 00:00:00 2001 From: Jay Zhan Date: Fri, 24 Jan 2025 19:52:27 +0800 Subject: [PATCH 3/7] add test --- .../functions-aggregate/src/first_last.rs | 70 ++++++++++++++----- 1 file changed, 53 insertions(+), 17 deletions(-) diff --git a/datafusion/functions-aggregate/src/first_last.rs b/datafusion/functions-aggregate/src/first_last.rs index 5908da4276d4..1e5477315bed 100644 --- a/datafusion/functions-aggregate/src/first_last.rs +++ b/datafusion/functions-aggregate/src/first_last.rs @@ -579,12 +579,10 @@ impl LastValueAccumulator { // Order by indices for cases where the values are the same, we expect the last index let indices: UInt64Array = (0..num_rows).into_iter().map(|x| x as u64).collect(); - sort_columns.push( - SortColumn { - values: Arc::new(indices), - options: Some(!SortOptions::default()), - } - ); + sort_columns.push(SortColumn { + values: Arc::new(indices), + options: Some(!SortOptions::default()), + }); if self.ignore_nulls { let indices = lexsort_to_indices(&sort_columns, None)?; @@ -624,6 +622,7 @@ impl Accumulator for LastValueAccumulator { } else if let Some(last_idx) = self.get_last_idx(values)? { let row = get_row_at_idx(values, last_idx)?; let orderings = &row[1..]; + // Update when there is a more recent entry if compare_rows( &self.orderings, @@ -719,7 +718,7 @@ fn convert_to_sort_cols(arrs: &[ArrayRef], sort_exprs: &LexOrdering) -> Vec Result { + LastValueAccumulator::try_new( + &DataType::Int64, + &[DataType::Int64], + LexOrdering::new(vec![PhysicalSortExpr::new( + col("b", schema)?, + if asc { + SortOptions::default() + } else { + SortOptions::default().desc() + }, + )]), + false, + ) + } + let mut last_accumulator = create_acc(&schema, true)?; let values = vec![ Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef, // a Arc::new(Int64Array::from(vec![1, 1, 1])) as ArrayRef, // b ]; last_accumulator.update_batch(&values)?; last_accumulator.update_batch(&values)?; - assert_eq!(last_accumulator.evaluate()?, ScalarValue::Int64(Some(3))); + let mut last_accumulator = create_acc(&schema, true)?; let values = vec![ Arc::new(Int64Array::from(vec![1, 2, 3, 4, 5, 6])) as ArrayRef, // a Arc::new(Int64Array::from(vec![1, 1, 1, 2, 2, 2])) as ArrayRef, // b @@ -761,6 +769,34 @@ mod tests { last_accumulator.update_batch(&values)?; assert_eq!(last_accumulator.evaluate()?, ScalarValue::Int64(Some(6))); + let mut last_accumulator = create_acc(&schema, true)?; + let values = vec![ + Arc::new(Int64Array::from(vec![7, 8, 9])) as ArrayRef, // a + Arc::new(Int64Array::from(vec![2, 2, 2])) as ArrayRef, // b + ]; + last_accumulator.update_batch(&values)?; + assert_eq!(last_accumulator.evaluate()?, ScalarValue::Int64(Some(9))); + + let mut last_accumulator = create_acc(&schema, true)?; + let states = vec![ + Arc::new(Int64Array::from(vec![1, 2, 3, 4, 5])) as ArrayRef, // a + Arc::new(Int64Array::from(vec![1, 2, 2, 1, 1])) as ArrayRef, // order by + Arc::new(BooleanArray::from(vec![true; 5])) as ArrayRef, // is set + ]; + last_accumulator.merge_batch(&states)?; + last_accumulator.merge_batch(&states)?; + assert_eq!(last_accumulator.evaluate()?, ScalarValue::Int64(Some(3))); + + // desc + let mut last_accumulator = create_acc(&schema, false)?; + let states = vec![ + Arc::new(Int64Array::from(vec![1, 2, 3, 4, 5])) as ArrayRef, // a + Arc::new(Int64Array::from(vec![1, 2, 2, 1, 1])) as ArrayRef, // order by + Arc::new(BooleanArray::from(vec![true; 5])) as ArrayRef, // is set + ]; + last_accumulator.merge_batch(&states)?; + last_accumulator.merge_batch(&states)?; + assert_eq!(last_accumulator.evaluate()?, ScalarValue::Int64(Some(5))); Ok(()) } From a6b884e59053a2d991e56a125103834e7c05a04b Mon Sep 17 00:00:00 2001 From: Jay Zhan Date: Fri, 24 Jan 2025 19:55:50 +0800 Subject: [PATCH 4/7] revert submobule --- parquet-testing | 2 +- testing | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/parquet-testing b/parquet-testing index e45cd23f784a..f4d7ed772a62 160000 --- a/parquet-testing +++ b/parquet-testing @@ -1 +1 @@ -Subproject commit e45cd23f784aab3d6bf0701f8f4e621469ed3be7 +Subproject commit f4d7ed772a62a95111db50fbcad2460833e8c882 diff --git a/testing b/testing index 98fceecd024d..d2a137123034 160000 --- a/testing +++ b/testing @@ -1 +1 @@ -Subproject commit 98fceecd024dccd2f8a00e32fc144975f218acf4 +Subproject commit d2a13712303498963395318a4eb42872e66aead7 From 1cb5cc24e49d4ca8dc6a468edb0c64dc14d30be0 Mon Sep 17 00:00:00 2001 From: Jay Zhan Date: Fri, 24 Jan 2025 20:01:03 +0800 Subject: [PATCH 5/7] clippy --- datafusion/functions-aggregate/src/first_last.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/datafusion/functions-aggregate/src/first_last.rs b/datafusion/functions-aggregate/src/first_last.rs index 1e5477315bed..b475341bf2f2 100644 --- a/datafusion/functions-aggregate/src/first_last.rs +++ b/datafusion/functions-aggregate/src/first_last.rs @@ -18,16 +18,12 @@ //! Defines the FIRST_VALUE/LAST_VALUE aggregations. use std::any::Any; -use std::cmp::Ordering; use std::fmt::Debug; use std::mem::size_of_val; -use std::num; use std::sync::Arc; -use arrow::array::{ArrayRef, AsArray, BooleanArray, Int64Array, UInt64Array}; -use arrow::compute::{ - self, lexsort_to_indices, take_arrays, LexicographicalComparator, SortColumn, -}; +use arrow::array::{ArrayRef, AsArray, BooleanArray, UInt64Array}; +use arrow::compute::{self, lexsort_to_indices, take_arrays, SortColumn}; use arrow::datatypes::{DataType, Field}; use arrow_schema::SortOptions; use datafusion_common::utils::{compare_rows, get_row_at_idx}; @@ -578,7 +574,7 @@ impl LastValueAccumulator { .collect::>(); // Order by indices for cases where the values are the same, we expect the last index - let indices: UInt64Array = (0..num_rows).into_iter().map(|x| x as u64).collect(); + let indices: UInt64Array = (0..num_rows).map(|x| x as u64).collect(); sort_columns.push(SortColumn { values: Arc::new(indices), options: Some(!SortOptions::default()), @@ -758,8 +754,12 @@ mod tests { Arc::new(Int64Array::from(vec![1, 1, 1])) as ArrayRef, // b ]; last_accumulator.update_batch(&values)?; + let values = vec![ + Arc::new(Int64Array::from(vec![4, 5, 6])) as ArrayRef, // a + Arc::new(Int64Array::from(vec![1, 1, 1])) as ArrayRef, // b + ]; last_accumulator.update_batch(&values)?; - assert_eq!(last_accumulator.evaluate()?, ScalarValue::Int64(Some(3))); + assert_eq!(last_accumulator.evaluate()?, ScalarValue::Int64(Some(6))); let mut last_accumulator = create_acc(&schema, true)?; let values = vec![ From 8ae01978c8f84879bb870c587287fd5abf749520 Mon Sep 17 00:00:00 2001 From: Jay Zhan Date: Fri, 24 Jan 2025 20:11:18 +0800 Subject: [PATCH 6/7] add more test --- datafusion/functions-aggregate/src/first_last.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/datafusion/functions-aggregate/src/first_last.rs b/datafusion/functions-aggregate/src/first_last.rs index b475341bf2f2..7bd3bf716508 100644 --- a/datafusion/functions-aggregate/src/first_last.rs +++ b/datafusion/functions-aggregate/src/first_last.rs @@ -795,8 +795,13 @@ mod tests { Arc::new(BooleanArray::from(vec![true; 5])) as ArrayRef, // is set ]; last_accumulator.merge_batch(&states)?; + let states = vec![ + Arc::new(Int64Array::from(vec![7, 8, 9])) as ArrayRef, // a + Arc::new(Int64Array::from(vec![1, 1, 1])) as ArrayRef, // order by + Arc::new(BooleanArray::from(vec![true; 3])) as ArrayRef, // is set + ]; last_accumulator.merge_batch(&states)?; - assert_eq!(last_accumulator.evaluate()?, ScalarValue::Int64(Some(5))); + assert_eq!(last_accumulator.evaluate()?, ScalarValue::Int64(Some(9))); Ok(()) } From 7bdc35675caadc07cd8338e5687de059e0475643 Mon Sep 17 00:00:00 2001 From: Jay Zhan Date: Fri, 24 Jan 2025 21:04:12 +0800 Subject: [PATCH 7/7] fix test --- datafusion/sqllogictest/test_files/group_by.slt | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/datafusion/sqllogictest/test_files/group_by.slt b/datafusion/sqllogictest/test_files/group_by.slt index 056f88450c9f..0d2f581a1922 100644 --- a/datafusion/sqllogictest/test_files/group_by.slt +++ b/datafusion/sqllogictest/test_files/group_by.slt @@ -2998,12 +2998,22 @@ physical_plan 05)--------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 06)----------MemoryExec: partitions=1, partition_sizes=[1] +query RP +select amount, ts from sales_global; +---- +30 2022-01-01T06:00:00 +50 2022-01-01T08:00:00 +75 2022-01-01T11:30:00 +200 2022-01-02T12:00:00 +100 2022-01-03T10:00:00 +80 2022-01-03T10:00:00 + query RR SELECT FIRST_VALUE(amount ORDER BY ts ASC) AS fv1, LAST_VALUE(amount ORDER BY ts ASC) AS fv2 FROM sales_global ---- -30 100 +30 80 # Conversion in between FIRST_VALUE and LAST_VALUE to resolve # contradictory requirements should work in multi partitions.