-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Return ResourceExhausted
errors when memory limit is exceed in GroupedHashAggregateStreamV2
(Row Hash)
#4202
Conversation
Most of it is refactoring to allow us to call the async memory subsystem while polling the stream. The actual memory accounting is rather easy (since it's only ever growing except when the stream is dropped). Helps with apache#3940. (not closing yet, also need to do V1) Performance Impact: ------------------- ```text ❯ cargo bench -p datafusion --bench aggregate_query_sql -- --baseline issue3940a-pre Finished bench [optimized] target(s) in 0.08s Running benches/aggregate_query_sql.rs (target/release/deps/aggregate_query_sql-e9e315ab7a06a262) aggregate_query_no_group_by 15 12 time: [654.77 µs 655.49 µs 656.29 µs] change: [-1.6711% -1.2910% -0.8435%] (p = 0.00 < 0.05) Change within noise threshold. Found 9 outliers among 100 measurements (9.00%) 1 (1.00%) low mild 5 (5.00%) high mild 3 (3.00%) high severe aggregate_query_no_group_by_min_max_f64 time: [579.93 µs 580.59 µs 581.27 µs] change: [-3.8985% -3.2219% -2.6198%] (p = 0.00 < 0.05) Performance has improved. Found 9 outliers among 100 measurements (9.00%) 1 (1.00%) low severe 3 (3.00%) low mild 1 (1.00%) high mild 4 (4.00%) high severe aggregate_query_no_group_by_count_distinct_wide time: [2.4610 ms 2.4801 ms 2.4990 ms] change: [-2.9300% -1.8414% -0.7493%] (p = 0.00 < 0.05) Change within noise threshold. Benchmarking aggregate_query_no_group_by_count_distinct_narrow: Warming up for 3.0000 s Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 8.4s, enable flat sampling, or reduce sample count to 50. aggregate_query_no_group_by_count_distinct_narrow time: [1.6578 ms 1.6661 ms 1.6743 ms] change: [-4.5391% -3.5033% -2.5050%] (p = 0.00 < 0.05) Performance has improved. Found 7 outliers among 100 measurements (7.00%) 1 (1.00%) low severe 2 (2.00%) low mild 2 (2.00%) high mild 2 (2.00%) high severe aggregate_query_group_by time: [2.1767 ms 2.2045 ms 2.2486 ms] change: [-4.1048% -2.5858% -0.3237%] (p = 0.00 < 0.05) Change within noise threshold. Found 1 outliers among 100 measurements (1.00%) 1 (1.00%) high severe Benchmarking aggregate_query_group_by_with_filter: Warming up for 3.0000 s Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 5.5s, enable flat sampling, or reduce sample count to 60. aggregate_query_group_by_with_filter time: [1.0916 ms 1.0927 ms 1.0941 ms] change: [-0.8524% -0.4230% -0.0724%] (p = 0.02 < 0.05) Change within noise threshold. Found 9 outliers among 100 measurements (9.00%) 2 (2.00%) low severe 1 (1.00%) low mild 4 (4.00%) high mild 2 (2.00%) high severe aggregate_query_group_by_u64 15 12 time: [2.2108 ms 2.2238 ms 2.2368 ms] change: [-4.2142% -3.2743% -2.3523%] (p = 0.00 < 0.05) Performance has improved. Benchmarking aggregate_query_group_by_with_filter_u64 15 12: Warming up for 3.0000 s Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 5.5s, enable flat sampling, or reduce sample count to 60. aggregate_query_group_by_with_filter_u64 15 12 time: [1.0922 ms 1.0931 ms 1.0940 ms] change: [-0.6872% -0.3192% +0.1193%] (p = 0.12 > 0.05) No change in performance detected. Found 7 outliers among 100 measurements (7.00%) 3 (3.00%) low mild 4 (4.00%) high severe aggregate_query_group_by_u64_multiple_keys time: [14.714 ms 15.023 ms 15.344 ms] change: [-5.8337% -2.7471% +0.2798%] (p = 0.09 > 0.05) No change in performance detected. aggregate_query_approx_percentile_cont_on_u64 time: [3.7776 ms 3.8049 ms 3.8329 ms] change: [-4.4977% -3.4230% -2.3282%] (p = 0.00 < 0.05) Performance has improved. Found 2 outliers among 100 measurements (2.00%) 2 (2.00%) high mild aggregate_query_approx_percentile_cont_on_f32 time: [3.1769 ms 3.1997 ms 3.2230 ms] change: [-4.4664% -3.2597% -2.0955%] (p = 0.00 < 0.05) Performance has improved. Found 1 outliers among 100 measurements (1.00%) 1 (1.00%) high mild ``` I think the mild improvements are either flux or due to the somewhat manual memory allocation pattern.
GroupedHashAggregateStreamV2
ResourceExhausted
errors when memory limit is exceed in GroupedHashAggregateStreamV2
(Row Hash)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @crepererum I went through this PR carefully and I think it could be merged as is. Thank you for the performance results
Note to other reviewers is that the memory limits are not enabled by default so the additional accounting will not be used except if the memory manager limits are engaged
I had some small suggestions but none that I think are required
I also found this PR easier to review using whitespace blind diff https://github.com/apache/arrow-datafusion/pull/4202/files?w=1
cc @yjshen and @milenkovicm who I think has been working in this area
ALso cc @Dandandan as I know you are often interested in this type of code
/// high due to lock contention) and pre-calculating the entire allocation for a whole [`RecordBatch`] is complicated or | ||
/// expensive. | ||
/// | ||
/// The pool will try to allocate a whole block of memory and gives back overallocated memory on [drop](Self::drop). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👌
@@ -70,6 +72,16 @@ use hashbrown::raw::RawTable; | |||
/// [Compact]: datafusion_row::layout::RowType::Compact | |||
/// [WordAligned]: datafusion_row::layout::RowType::WordAligned | |||
pub(crate) struct GroupedHashAggregateStreamV2 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks very much like other stream adapters we have in DataFusion -- perhaps we can name it something more general like SendableRecordBatchStreamWrapper
or something and put it in
we can always do this as a follow on PR as well
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will do that in a follow-up, since migrating V1 will probably end up with the same helper.
// allocate more | ||
|
||
// growth factor: 2, but at least 2 elements | ||
let bump_elements = (group_state.indices.capacity() * 2).max(2); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we could somehow encapsulate the memory manager interactions into functions on GroupAggrState
rather than treating it like a struct. I don't think that is necessary .
However encapsulating might:
- Keep this code manageable for future readers
- Allow the memory allocation routines to be unit tested (like that when new groups are added that the memory allocation is incremented correctly)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tend to agree with with @alamb here, IMHO group_aggregate_batch
is too busy at the moment, and some kind of separation of concerns would help.
What if group_aggregate_batch
returns how much more memory it allocated, and accounting is done after method call? This would help with encapsulation of aggregation algorithm and make it easier to swap. I'm aware that it might not produce 100% correct results but as we discussed in #3941 it is ok to have small discrepancy for short period of time
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, this way end of the batch would be a "safe point" at which we could trigger spill
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we could somehow encapsulate the memory manager interactions into functions on
GroupAggrState
rather than treating it like a struct.
That only works if all interactions with GroupState
go throw methods, not only a few of them due to how Rust handles borrowing (= fn f(&self)
and fn f(&mut self)
borrow the whole struct, so you cannot mutable borrow any member at the same time).
What if
group_aggregate_batch
returns how much more memory it allocated, and accounting is done after method call? ... Also, this way end of the batch would be a "safe point" at which we could trigger spill
Fair, let me try that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
Let me know if this looks better. I will pull out + document all the helper structs and traits when I port V1 (I want at least a 2nd consumer so I can make sure the interface makes sense).
if group_state.indices.capacity() == group_state.indices.len() { | ||
// allocate more | ||
|
||
// growth factor: 2, but at least 2 elements |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Growth factors like this are sometimes capped at some large value (like 1G) to avoid the 2x memory overhead associated at large memory levels.
If we use 2x growth with no cap, you can get into situations like the table would fit in 36GB but the code is trying to go from 32GB to 64GB and hits the limit even when the query could complete. This could always be handled in a follow on PR -- users can always disable the memory manager and let the allocations happen and suffer OOMs if they want the current behavior
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIRC Vec
doesn't cap, so at least this ain't a regression:
timer.done(); | ||
|
||
match result { | ||
Ok(_) => continue, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMHO, this would be place to do something like:
Ok(_) => {
let new_data_size = this.aggr_state.get_current_size();
let acquired = this.memory_manager.can_grow_directly(new_data_size - data_size_before_batch, data_size_before_batch);
if !acquired {
this.aggr_state.spill();
this.memory_manager.record_free_then_acquire(data_size, 0);
}
continue;
}
we basically assume that group_aggregate_batch
can get all the memory it needs, no need to do per row interaction with memory manager.
this would decouple process and accounting
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The interaction is not per row. It's per batch. I can place the accounting here. The code you propose is basically the same that currently runs, just inlined (it's the default impl. of MemoryConsumer::try_grow
).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Apologies you're right @crepererum it is per batch.
The reason why I believe moving it out makes sense is separation of concerns, but it's up to you.
for example, at line 363
// allocate memory
// This happens AFTER we actually used the memory, but simplifies the whole accounting and we are OK with
// overshooting a bit. Also this means we either store the whole record batch or not.
memory_consumer.alloc(allocated).await?;
can this trigger spill? will the state be consistent if spill is triggered. My guess it will be not, it might be implementation specific, but hard to tell without understanding memory management implementation, and store implementation.
fn insert_accounted( | ||
&mut self, | ||
x: Self::T, | ||
hasher: impl Fn(&Self::T) -> u64, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is coupling with current implementation. for example, what if we decide to keep state in b-tree rather than hash map (we need it sorted due to spill)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, memory accounting is ALWAYS coupled to the data structures that are used.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
my bad @crepererum ignore my comment, apologies
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Long-term I would wish that Rust stabilizes the Allocator
trait so we could plug this into the data structures and measure their usage (no need to guess).
// allocate memory | ||
// This happens AFTER we actually used the memory, but simplifies the whole accounting and we are OK with | ||
// overshooting a bit. Also this means we either store the whole record batch or not. | ||
memory_consumer.alloc(allocated).await?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as i mentioned above, should this call go before return statement? if it triggers spill we internal state should be consistent.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is looking great
https://github.com/apache/arrow-datafusion/pull/4202/files?w=1 shows the diff clearly
What are your thoughts @milenkovicm ?
@@ -418,6 +487,130 @@ impl std::fmt::Debug for AggregationState { | |||
} | |||
} | |||
|
|||
/// Accounting data structure for memory usage. | |||
struct AggregationStateMemoryConsumer { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
❤️
fn push_accounted(&mut self, x: Self::T, accounting: &mut usize); | ||
} | ||
|
||
impl<T> VecAllocExt for Vec<T> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is very nice
I think @crepererum did fine job here. Not sure if he will move
Just before return statement, otherwise it is spot on. |
I'll move the alloc statement, give me a few minutes... |
done |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks great -- thank you @milenkovicm and @crepererum -- I will plan to merge this tomorrow unless I hear otherwise
Benchmark runs are scheduled for baseline = 09e1c91 and contender = f3a65c7. f3a65c7 is a master commit associated with this PR. Results will be available as each benchmark for each run completes. |
Which issue does this PR close?
Doesn't close, but works towards #3940 (need to migrate V1 as well
Rationale for this change
Ensure that users don't run out of memory while performing group-by operations. This is esp. important for servers or multi-tenant systems.
What changes are included in this PR?
BoxStream
) forGroupedHashAggregateStreamV2
so we can call into the async memory manager (I thought about NOT doing this but I think it's worth to consider because on the long run a group-by can get another splillable operation to spill)Are these changes tested?
test_oom
)Perf results:
I think the mild improvements are either flux or due to the somewhat
manual memory allocation pattern.
Are there any user-facing changes?
The V2 group-by op an now emit a
ResourceExhausted
error if it runs out of memory. Note that the error is kinda nested/wrapped due to #4172.