LastValueAccumulator update_batch doesn't take the last value if the order by values are equal #14232

Closed · wants to merge 7 commits
109 changes: 105 additions & 4 deletions datafusion/functions-aggregate/src/first_last.rs
@@ -22,9 +22,10 @@ use std::fmt::Debug;
use std::mem::size_of_val;
use std::sync::Arc;

use arrow::array::{ArrayRef, AsArray, BooleanArray};
use arrow::array::{ArrayRef, AsArray, BooleanArray, UInt64Array};
use arrow::compute::{self, lexsort_to_indices, take_arrays, SortColumn};
use arrow::datatypes::{DataType, Field};
use arrow_schema::SortOptions;
use datafusion_common::utils::{compare_rows, get_row_at_idx};
use datafusion_common::{
arrow_datafusion_err, internal_err, DataFusionError, Result, ScalarValue,
@@ -542,6 +543,9 @@ impl LastValueAccumulator {
let [value, ordering_values @ ..] = values else {
return internal_err!("Empty row in LAST_VALUE");
};

let num_rows = value.len();

if self.requirement_satisfied {
// Get last entry according to the order of data:
if self.ignore_nulls {
@@ -556,7 +560,7 @@
return Ok((!value.is_empty()).then_some(value.len() - 1));
}
}
let sort_columns = ordering_values
let mut sort_columns = ordering_values
.iter()
.zip(self.ordering_req.iter())
.map(|(values, req)| {
@@ -569,6 +573,13 @@
})
.collect::<Vec<_>>();

// Order by indices for cases where the values are the same, we expect the last index
let indices: UInt64Array = (0..num_rows).map(|x| x as u64).collect();
sort_columns.push(SortColumn {
Contributor:

This part is a bit concerning in terms of performance -- won't sorting by multiple columns (due to the row indices being added, there are now no cases with a single column) be noticeably slower due to falling back to the lexicographical comparator?

If so, perhaps it should either be worked around (somehow sorting while keeping the last is_ge() value) or maybe it isn't worth it, since this will work only for single-threaded queries, while, I suppose, the primary use case is multithreaded execution, which messes up the original order of records in files due to parallel reads followed by repartitioning.

Also, I'm not sure the current behaviour of this function is incorrect and must be fixed; it looks more like a nice-to-have feature (but that is just an opinion).
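
As an illustration of the concern above (a minimal Rust sketch, not code from this PR, with made-up data): a single ORDER BY column can use arrow's specialized single-column sort kernel, while appending the row-index column forces the generic lexicographical path.

use std::sync::Arc;

use arrow::array::{Array, ArrayRef, Int64Array, UInt64Array};
use arrow::compute::{lexsort_to_indices, sort_to_indices, SortColumn};
use arrow::error::ArrowError;

fn main() -> Result<(), ArrowError> {
    // A single ORDER BY column with duplicate values.
    let order_by: ArrayRef = Arc::new(Int64Array::from(vec![1, 1, 1]));

    // One sort column: arrow can use the specialized single-column sort kernel.
    let single = sort_to_indices(order_by.as_ref(), None, None)?;

    // With a row-index tiebreaker appended, the sort goes through the generic
    // row-by-row lexicographical comparator over both columns.
    let row_idx = UInt64Array::from_iter_values(0..order_by.len() as u64);
    let multi = lexsort_to_indices(
        &[
            SortColumn { values: Arc::clone(&order_by), options: None },
            SortColumn { values: Arc::new(row_idx), options: None },
        ],
        None,
    )?;

    println!("single-column permutation: {single:?}");
    println!("two-column permutation:    {multi:?}");
    Ok(())
}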

jayzhan211 (Contributor Author), Jan 24, 2025:

Maybe we need a benchmark for this.
With the additional indices column, we are able to optimize it with select-nth logic.
Without the additional indices column, we need to iterate over all of them to find the correct one, which includes the cost of comparison.

> since this will work only for single-threaded queries, while, I suppose, the primary use case is multithreaded execution, which messes up the original order of records in files due to parallel reads followed by repartitioning.

I think even in this case, we still expect to return the correct number?

I agree that in the 2-phase case the true last value is calculated in merge_batch, but if we run it in AggregateMode::Single, we would need to get the true last value in update_batch.
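
The "iterate them all" alternative could look roughly like the following minimal sketch (an assumption about the idea being discussed, not DataFusion code; nulls and descending orderings are ignored): keep the index of the last row whose ordering key is greater than or equal to the best seen so far, so ties resolve to the later row without an extra sort column.

// Hypothetical helper, not part of DataFusion: returns the index of the last
// row holding the maximal ordering key, scanning the column once.
fn last_idx_by_ordering<K: PartialOrd>(ordering: &[K]) -> Option<usize> {
    let mut best: Option<usize> = None;
    for (idx, key) in ordering.iter().enumerate() {
        match best {
            // `>=` (is_ge) instead of `>` keeps the later index on equal keys.
            Some(b) if key >= &ordering[b] => best = Some(idx),
            None => best = Some(idx),
            _ => {}
        }
    }
    best
}

fn main() {
    // ORDER BY column with duplicates: the last of the maximal keys wins.
    assert_eq!(last_idx_by_ordering(&[1, 1, 2, 2, 2]), Some(4));
    assert_eq!(last_idx_by_ordering::<i32>(&[]), None);
}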

jayzhan211 (Contributor Author), Jan 25, 2025:

Okay, it seems an unstable sort is used in arrow-rs, so we'd probably need a separate stable sort in arrow-rs to avoid impacting the current single-column case.

An alternative solution is that users can add another column themselves to get the true last value if they need it for their scenario 🤔 Then we can keep last_value as an unstable version by default. I don't need the stable version myself; I just thought this was an easy-to-fix issue, but it turns out it isn't 😆
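
A minimal sketch of why a stable sort would make the extra index column unnecessary (an assumption for illustration, not arrow-rs code): stable sorts keep equal keys in input order, so the last input row with the maximal ORDER BY value ends up last in the sorted permutation.

fn main() {
    let a = [10, 20, 30, 40]; // values
    let b = [1, 1, 1, 1];     // ORDER BY column, all keys equal

    // Sort row indices by the ordering key; slice::sort_by_key is a stable sort.
    let mut idx: Vec<usize> = (0..b.len()).collect();
    idx.sort_by_key(|&i| b[i]);

    // Because the sort is stable, equal keys stay in input order, so the last
    // sorted index points at the last input row, which is the "last value" we want.
    let last = *idx.last().unwrap();
    assert_eq!(a[last], 40);
}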

Contributor:

> I think even in this case, we still expect to return the correct number?

I don't think we can -- it depends on the order in which the partial aggregation streams complete, which is not really predictable, so the function may produce non-deterministic output in this case.

> An alternative solution is that users can add another column themselves to get the true last value if they need it for their scenario

I'd say that this is how it works now, which seems to be good enough.

Also, maybe the initial version of this PR was sufficient? It didn't affect performance, and it guaranteed for target_partitions=1 that the function would return the value that is placed last in the source files.

jayzhan211 (Contributor Author):

The initial PR doesn't work for the single-partition case either, since get_last_idx returns the first (value, order) pair, not the last one.

jayzhan211 (Contributor Author), Jan 27, 2025:

DuckDB also returns the first one, not the "true" last one. There is no good reason or solution for finding the "true" last value.

korowa (Contributor), Jan 27, 2025:

> The initial PR doesn't work for the single-partition case either, since get_last_idx returns the first (value, order) pair, not the last one.

Yes, my bad, I lost the context for a while. Then yes, it looks like providing a unique column as an order by is the only way to make the output deterministic (which, I think, is acceptable behavior for now).

values: Arc::new(indices),
options: Some(!SortOptions::default()),
});

if self.ignore_nulls {
let indices = lexsort_to_indices(&sort_columns, None)?;
// If ignoring nulls, find the last non-null value.
@@ -607,13 +618,14 @@ impl Accumulator for LastValueAccumulator {
} else if let Some(last_idx) = self.get_last_idx(values)? {
let row = get_row_at_idx(values, last_idx)?;
let orderings = &row[1..];

// Update when there is a more recent entry
if compare_rows(
&self.orderings,
orderings,
&get_sort_options(self.ordering_req.as_ref()),
)?
.is_lt()
.is_le()
{
self.update_with_new_row(&row);
}
@@ -652,7 +664,7 @@ impl Accumulator for LastValueAccumulator {
// version in the new data:
if !self.is_set
|| self.requirement_satisfied
|| compare_rows(&self.orderings, last_ordering, &sort_options)?.is_lt()
|| compare_rows(&self.orderings, last_ordering, &sort_options)?.is_le()
Contributor:

Just a note: when the aggregation is parallelized, the "true" last value may not be calculated anyway (if there even is such a thing as a last value when scanning multiple files).

{
// Update with last value in the state. Note that we should exclude the
// is_set flag from the state. Otherwise, we will end up with a state
@@ -701,9 +713,98 @@ fn convert_to_sort_cols(arrs: &[ArrayRef], sort_exprs: &LexOrdering) -> Vec<Sort
#[cfg(test)]
mod tests {
use arrow::array::Int64Array;
use arrow_schema::Schema;
use compute::SortOptions;
use datafusion_physical_expr::{expressions::col, PhysicalSortExpr};

use super::*;

#[test]
fn test_last_value_with_order_bys() -> Result<()> {
// TODO: Move this kind of test to slt, we don't have a nice way to define the batch size for each `update_batch`
Contributor:

Reading data from a source file (not sure about memory tables) into the aggregation with batch_size = N should help. Is this not working?

jayzhan211 (Contributor Author):

Memory doesn't work (#12905 (comment)); not sure about files 🤔

korowa (Contributor), Jan 26, 2025:

Even for memory tables (not the nicest option, but it exists), it can be controlled either by setting batch_size before INSERT INTO memory_table SELECT FROM ..., or by multiple INSERT INTO memory_table VALUES statements, where each VALUES has the required batch size.

jayzhan211 (Contributor Author), Jan 27, 2025:

What I want is multiple batches that go through update_batch and merge_batch.

statement count 0
set datafusion.execution.batch_size = 2;

statement count 0
create table t(a int, b int) as values (1, 1), (2, 1), (null, 1), (3, 1), (1, 1), (2, 1), (null, 1), (3, 1);

query I
select last_value(a order by b) from t;
----
1

query TT
explain select last_value(a order by b) from t;
----
logical_plan
01)Aggregate: groupBy=[[]], aggr=[[last_value(t.a) ORDER BY [t.b ASC NULLS LAST]]]
02)--TableScan: t projection=[a, b]
physical_plan
01)AggregateExec: mode=Final, gby=[], aggr=[last_value(t.a) ORDER BY [t.b ASC NULLS LAST]]
02)--CoalescePartitionsExec
03)----AggregateExec: mode=Partial, gby=[], aggr=[last_value(t.a) ORDER BY [t.b ASC NULLS LAST]]
04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
05)--------MemoryExec: partitions=1, partition_sizes=[1]

Given that the MemoryExec is a single partition, the data goes into a single batch. Even though we have 4 partitions, update_batch is only called once. There is no trivial way to test multiple update_batch calls with different batches.

INSERT INTO table ... is the same.

Contributor:

What I meant was:

statement ok
CREATE TABLE memtest (x INT, y INT);

statement ok
INSERT INTO memtest VALUES (1, 1), (2, 1);

statement ok
INSERT INTO memtest VALUES (2, 2), (3, 1);

statement ok
INSERT INTO memtest VALUES (3, 2), (4, 1);

statement ok
set datafusion.execution.target_partitions = 1;

query II
select x, last_value(y) from memtest group by x;
----
1 1
2 2
3 2
4 1

Each insert adds a new RecordBatch to the memory table, so there will be 3 update_batch calls for the accumulator (it will result in more calls to LastValueAccumulator::update_batch though, due to splitting inputs by group value).

Likely something similar can be done with multiple partitions, making round-robin repartitioning produce multiple streams, to test the merge_batch function.

// so there is no trivial way to test this in slt for now

// test query: select last_value(a order by b) from t1, where b has same value
let schema = Schema::new(vec![
Field::new("a", DataType::Int64, true),
Field::new("b", DataType::Int64, true),
]);

// TODO: Cleanup state in `evaluate` or introduce another method?
// We don't have cleanup method to reset the state, so create a new one each time
fn create_acc(schema: &Schema, asc: bool) -> Result<LastValueAccumulator> {
LastValueAccumulator::try_new(
&DataType::Int64,
&[DataType::Int64],
LexOrdering::new(vec![PhysicalSortExpr::new(
col("b", schema)?,
if asc {
SortOptions::default()
} else {
SortOptions::default().desc()
},
)]),
false,
)
}

let mut last_accumulator = create_acc(&schema, true)?;
let values = vec![
Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef, // a
Arc::new(Int64Array::from(vec![1, 1, 1])) as ArrayRef, // b
];
last_accumulator.update_batch(&values)?;
let values = vec![
Arc::new(Int64Array::from(vec![4, 5, 6])) as ArrayRef, // a
Arc::new(Int64Array::from(vec![1, 1, 1])) as ArrayRef, // b
];
last_accumulator.update_batch(&values)?;
assert_eq!(last_accumulator.evaluate()?, ScalarValue::Int64(Some(6)));

let mut last_accumulator = create_acc(&schema, true)?;
let values = vec![
Arc::new(Int64Array::from(vec![1, 2, 3, 4, 5, 6])) as ArrayRef, // a
Arc::new(Int64Array::from(vec![1, 1, 1, 2, 2, 2])) as ArrayRef, // b
];
last_accumulator.update_batch(&values)?;
assert_eq!(last_accumulator.evaluate()?, ScalarValue::Int64(Some(6)));

let mut last_accumulator = create_acc(&schema, true)?;
let values = vec![
Arc::new(Int64Array::from(vec![7, 8, 9])) as ArrayRef, // a
Arc::new(Int64Array::from(vec![2, 2, 2])) as ArrayRef, // b
];
last_accumulator.update_batch(&values)?;
assert_eq!(last_accumulator.evaluate()?, ScalarValue::Int64(Some(9)));

let mut last_accumulator = create_acc(&schema, true)?;
let states = vec![
Arc::new(Int64Array::from(vec![1, 2, 3, 4, 5])) as ArrayRef, // a
Arc::new(Int64Array::from(vec![1, 2, 2, 1, 1])) as ArrayRef, // order by
Arc::new(BooleanArray::from(vec![true; 5])) as ArrayRef, // is set
];
last_accumulator.merge_batch(&states)?;
last_accumulator.merge_batch(&states)?;
assert_eq!(last_accumulator.evaluate()?, ScalarValue::Int64(Some(3)));

// desc
let mut last_accumulator = create_acc(&schema, false)?;
let states = vec![
Arc::new(Int64Array::from(vec![1, 2, 3, 4, 5])) as ArrayRef, // a
Arc::new(Int64Array::from(vec![1, 2, 2, 1, 1])) as ArrayRef, // order by
Arc::new(BooleanArray::from(vec![true; 5])) as ArrayRef, // is set
];
last_accumulator.merge_batch(&states)?;
let states = vec![
Arc::new(Int64Array::from(vec![7, 8, 9])) as ArrayRef, // a
Arc::new(Int64Array::from(vec![1, 1, 1])) as ArrayRef, // order by
Arc::new(BooleanArray::from(vec![true; 3])) as ArrayRef, // is set
];
last_accumulator.merge_batch(&states)?;
assert_eq!(last_accumulator.evaluate()?, ScalarValue::Int64(Some(9)));
Ok(())
}

#[test]
fn test_first_last_value_value() -> Result<()> {
let mut first_accumulator = FirstValueAccumulator::try_new(
12 changes: 11 additions & 1 deletion datafusion/sqllogictest/test_files/group_by.slt
@@ -2998,12 +2998,22 @@ physical_plan
05)--------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
06)----------MemoryExec: partitions=1, partition_sizes=[1]

query RP
select amount, ts from sales_global;
----
30 2022-01-01T06:00:00
50 2022-01-01T08:00:00
75 2022-01-01T11:30:00
200 2022-01-02T12:00:00
100 2022-01-03T10:00:00
80 2022-01-03T10:00:00

query RR
SELECT FIRST_VALUE(amount ORDER BY ts ASC) AS fv1,
LAST_VALUE(amount ORDER BY ts ASC) AS fv2
FROM sales_global
----
30 100
30 80

# Conversion in between FIRST_VALUE and LAST_VALUE to resolve
# contradictory requirements should work in multi partitions.