LastValueAccumulator update_batch doesn't take the last value if the order by values are equal #14232

Conversation
Thank you @jayzhan211, looks reasonable.
Could you please add a test (I suppose, only for the target_partitions = 1 case)?
@@ -652,7 +652,7 @@ impl Accumulator for LastValueAccumulator {
         // version in the new data:
         if !self.is_set
             || self.requirement_satisfied
-            || compare_rows(&self.orderings, last_ordering, &sort_options)?.is_lt()
+            || compare_rows(&self.orderings, last_ordering, &sort_options)?.is_le()
Just a note: when the aggregation is parallelized, then "true" last value may not be calculated anyway (if there is such a thing as a last value when scanning multiple files).
It seems the actual issue is much more complex; this change only solves it partially.
Signed-off-by: Jay Zhan <[email protected]>
Use le instead of lt in LastValueAccumulator::update_batch: doesn't take the last value if the order by values are equal
use super::*;

#[test]
fn test_last_value_with_order_bys() -> Result<()> {
    // TODO: Move this kind of test to slt, we don't have a nice way to define the batch size for each `update_batch`
Reading data from a source file (not sure about memory tables) into aggregation with batch_size = N should help. Is this not working?
Memory tables don't work (#12905 (comment)); not sure about files 🤔
Even for memory tables (not the nicest option, but it exists), it can be controlled either by setting batch_size before INSERT INTO memory_table SELECT FROM ..., or by multiple INSERT INTO memory_table VALUES statements, where each VALUES clause has the required batch size.
What I want is multiple batches that go through update_batch and merge_batch.
statement count 0
set datafusion.execution.batch_size = 2;
statement count 0
create table t(a int, b int) as values (1, 1), (2, 1), (null, 1), (3, 1), (1, 1), (2, 1), (null, 1), (3, 1);
query I
select last_value(a order by b) from t;
----
1
query TT
explain select last_value(a order by b) from t;
----
logical_plan
01)Aggregate: groupBy=[[]], aggr=[[last_value(t.a) ORDER BY [t.b ASC NULLS LAST]]]
02)--TableScan: t projection=[a, b]
physical_plan
01)AggregateExec: mode=Final, gby=[], aggr=[last_value(t.a) ORDER BY [t.b ASC NULLS LAST]]
02)--CoalescePartitionsExec
03)----AggregateExec: mode=Partial, gby=[], aggr=[last_value(t.a) ORDER BY [t.b ASC NULLS LAST]]
04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
05)--------MemoryExec: partitions=1, partition_sizes=[1]
Given that the MemoryExec is a single partition, the data goes into a single batch. Even though we have 4 partitions, update_batch is only called once. There is no trivial way to test multiple update_batch calls with different batches.
INSERT INTO table ... is the same.
What I meant was:
statement ok
CREATE TABLE memtest (x INT, y INT);
statement ok
INSERT INTO memtest VALUES (1, 1), (2, 1);
statement ok
INSERT INTO memtest VALUES (2, 2), (3, 1);
statement ok
INSERT INTO memtest VALUES (3, 2), (4, 1);
statement ok
set datafusion.execution.target_partitions = 1;
query II
select x, last_value(y) from memtest group by x;
----
1 1
2 2
3 2
4 1
Each insert will add a new RecordBatch to the memory table, so there will be 3 update_batch calls for the accumulator (it will result in more calls to LastValueAccumulator::update_batch, though, due to splitting inputs by group value).
Likely something similar can be done for multiple partitions, to make round-robin repartitioning produce multiple streams to test the merge_batch function.
@@ -569,6 +573,13 @@ impl LastValueAccumulator {
             })
             .collect::<Vec<_>>();

+        // Order by indices for cases where the values are the same, we expect the last index
+        let indices: UInt64Array = (0..num_rows).map(|x| x as u64).collect();
+        sort_columns.push(SortColumn {
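A self-contained sketch of the idea behind this hunk (illustrative names only, not the actual DataFusion/arrow-rs types): appending the row index as a trailing sort key makes the composite key unique per row, so equal order-by values keep their arrival order and the last element after sorting is the truly last row.

```rust
// Sketch: append the row index as a tiebreaker so that among equal order-by
// keys, the largest index (the last-arriving row) wins deterministically.
fn last_value_with_index_tiebreak(values: &[i32], order_keys: &[i32]) -> Option<i32> {
    let mut rows: Vec<(i32, usize)> = order_keys
        .iter()
        .copied()
        .enumerate()
        .map(|(idx, key)| (key, idx))
        .collect();
    // Sorting by (order_key, original_index) behaves stably even though the
    // underlying sort is unstable, because the composite key is unique.
    rows.sort_unstable();
    rows.last().map(|&(_, idx)| values[idx])
}

fn main() {
    // All order keys equal: the tiebreaker selects the last value in input order.
    assert_eq!(last_value_with_index_tiebreak(&[1, 2, 3, 1], &[1, 1, 1, 1]), Some(1));
    // Mixed keys: among the rows with the maximum key, the later one wins.
    assert_eq!(last_value_with_index_tiebreak(&[5, 6, 7], &[2, 1, 2]), Some(7));
}
```

The trade-off discussed below is that the extra column forces a multi-column (lexicographical) comparison even when the user ordered by a single column.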
This part is a bit concerning in terms of performance: won't sorting by multiple columns (due to the row indices being added, there are now no cases with a single column) be noticeably slower, due to falling back to the lexicographical comparator?
If so, perhaps it should either be worked around (to somehow sort leaving the last is_ge() value), or maybe it isn't worth it, since this will only work for single-threaded queries, while, I suppose, the primary use case is multithreaded execution, which messes up the original order of records in files due to parallel reads followed by repartitioning.
Also, I'm not sure the current behaviour of this function is incorrect and must be fixed; it looks more like a nice-to-have feature (but that is just an opinion).
Maybe we need a benchmark for this.
With the additional indices column, we are able to optimize it with select-nth logic.
Without the additional indices column, we need to iterate over them all to find the correct one, which includes the cost of comparison.
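To make the select-nth point concrete (a sketch under the same assumption as above, not the DataFusion implementation): once the row index is part of the key, the composite (order_key, index) is unique, so the "last" row is simply the row with the maximum composite key and can be found in a single O(n) pass with no sort at all.

```rust
// Sketch: with a unique (order_key, row_index) composite key, the last value
// is the row with the maximum key, found in one linear scan.
fn last_value_linear(values: &[i32], order_keys: &[i32]) -> Option<i32> {
    order_keys
        .iter()
        .copied()
        .enumerate()
        // Compare by (key, idx) so that equal keys resolve to the later index.
        .max_by_key(|&(idx, key)| (key, idx))
        .map(|(idx, _)| values[idx])
}

fn main() {
    // Equal keys: the later row (a=4) is selected.
    assert_eq!(last_value_linear(&[3, 4], &[1, 1]), Some(4));
    // Mixed keys: among rows with the max key, the later one wins.
    assert_eq!(last_value_linear(&[5, 6, 7], &[2, 1, 2]), Some(7));
}
```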
since this will work only for single threaded queries, while, I suppose, the primarily use-case is multithreaded execution, which messes up the original order of records in files due to parallel reads followed by repartitioning.
I think even in this case, we still expect to return the correct number?
I agree that in the 2-phase case the true last value is calculated in merge_batch, but if we run it in AggregateMode::Single, we would need to get the true last value in update_batch.
Okay, it seems an unstable sort is used in arrow-rs; we would probably need a separate stable sort in arrow-rs to avoid impacting the current single-column case.
An alternative solution is that users can add another column themselves to get the true last value if they need it for their scenario 🤔 Then we can keep last value as an unstable version by default. I don't have such a need to implement the stable version; I just thought it was an easy-to-fix issue, but it turns out it isn't 😆
I think even in this case, we still expect to return the correct number?
I don't think we can -- it depends on the order in which the partial aggregation streams complete, which is not really predictable, so the function may potentially produce non-deterministic output in this case.
Alternative solution is that the users can add another column by themselves to get the true last value if they need for their scenario
I'd say that this is how it works now, which seems to be good enough.
Also, maybe the initial version of this PR was sufficient? It didn't affect performance, and for target_partitions=1 it guaranteed that the function returns the value placed last in the source files.
The initial PR doesn't work for the single-partition case either, since get_last_idx returns the first (value, order) pair, not the last one.
DuckDB also returns the first one, not the "true" last one. There is no good reason or solution for finding the "true" last value.
The initial PR doesn't work for single partition case too since the get_last_idx return the first (value,order) pair not the last one
Yes, my bad, I lost the context for a while. Then yes, it looks like providing a unique column in the order by is the only way to make the output deterministic (which, I think, is acceptable behavior for now).
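The workaround can be sketched in the slt style used above (table and column names are invented for illustration): adding a unique column such as a row id to the ORDER BY makes the composite sort key unique, so last_value is deterministic regardless of how ties in b would otherwise be broken.

```sql
statement ok
CREATE TABLE t2 (row_id INT, a INT, b INT);

statement ok
INSERT INTO t2 VALUES (1, 3, 1), (2, 4, 1);

query I
SELECT last_value(a ORDER BY b, row_id) FROM t2;
----
4
```

Because (b, row_id) is unique per row, any correct implementation must pick a = 4 here, with or without a stable sort.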
Which issue does this PR close?
Closes #.
Rationale for this change
When computing the last value over equal order-by keys, we prefer the later one.
Consider last(a order by b) with rows (a=3, b=1) and (a=4, b=1): we would like (a=4, b=1) instead.
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?