feat: ResourceExhausted for memory limit in GroupedHashAggregateStream #4371

Conversation
```diff
@@ -2295,7 +2295,7 @@ impl ScalarValue {
     /// Estimate size if bytes including `Self`. For values with internal containers such as `String`
     /// includes the allocated size (`capacity`) rather than the current length (`len`)
     pub fn size(&self) -> usize {
-        std::mem::size_of_val(&self)
+        std::mem::size_of_val(self)
```
The tests panicked due to an integer underflow. Apart from the vector/hashmap size calculations being wrong, this one was also kinda tricky: the size of `&ScalarValue` is 8 bytes, not 48 🤦.
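For illustration only (not code from this PR): a minimal sketch of the pitfall, using a hypothetical 48-byte `Value` struct as a stand-in for `ScalarValue`.

```rust
use std::mem::{size_of, size_of_val};

struct Value {
    _data: [u8; 48], // stand-in for a ~48-byte enum such as ScalarValue
}

impl Value {
    fn size(&self) -> usize {
        // `self` is already a `&Value`, so `&self` is a `&&Value`:
        // `size_of_val(&self)` measures the reference (8 bytes on 64-bit
        // targets), while `size_of_val(self)` measures the value (48 bytes).
        let wrong = size_of_val(&self);
        let right = size_of_val(self);
        assert_eq!(wrong, size_of::<&Value>());
        assert_eq!(right, size_of::<Value>());
        right
    }
}

fn main() {
    println!("{}", Value { _data: [0; 48] }.size()); // prints 48
}
```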
This looks great to me -- thank you @crepererum.
I have some minor style suggestions, but nothing that would prevent this PR from being merged.
For other reviewers, I found the changes easier to see with a whitespace-blind diff: https://github.com/apache/arrow-datafusion/pull/4371/files?w=1
I'll plan to merge this PR tomorrow unless anyone else wants more time to review.
```rust
        })
    };

    let stream = futures::stream::unfold(inner, |mut this| async move {
```
```diff
@@ -257,12 +313,32 @@ fn group_aggregate_batch(
                         accumulator_set,
                         indices: vec![row as u32], // 1.3
                     };
+                    // NOTE: do NOT include the `GroupState` struct size in here because this is captured by
+                    // `group_states` (see allocation down below)
+                    allocated += group_state
```
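As an aside (illustrative code, not from the PR): the NOTE above is about avoiding double counting; the struct's inline size is already covered when the containing `Vec` grows, so only the heap the entry owns is added per entry. A minimal sketch of that split:

```rust
use std::mem::size_of;

// Hypothetical, simplified stand-in for the PR's GroupState.
struct GroupState {
    indices: Vec<u32>,
}

fn main() {
    let mut group_states: Vec<GroupState> = Vec::new();
    let mut allocated = 0usize;

    let cap_before = group_states.capacity();
    let state = GroupState { indices: vec![1, 2, 3] };

    // Per-entry accounting: only the heap owned by the entry...
    allocated += state.indices.capacity() * size_of::<u32>();
    group_states.push(state);

    // ...because the struct itself is covered by the container's growth.
    allocated += (group_states.capacity() - cap_before) * size_of::<GroupState>();

    println!("tracked {allocated} bytes");
}
```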
Figuring out how to encapsulate some of this accounting (so it isn't inlined into the code) would make it easier to maintain, I think. But I don't think that is required.
I think as long as the `Allocator` API in Rust isn't stable, this will be a bit of a mess. Once it is stable, we could have very elegant memory accounting.
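Until then, one rough, hypothetical shape for pulling the before/after bookkeeping out of the operator code (names and structure are assumptions, not what the PR does):

```rust
/// Hypothetical helper: runs a mutation and records how much a caller-supplied
/// size estimate grew, so the bookkeeping is not repeated at every call site.
struct Accounting {
    allocated: usize,
}

impl Accounting {
    fn new() -> Self {
        Self { allocated: 0 }
    }

    fn track<T>(
        &mut self,
        item: &mut T,
        size_of: impl Fn(&T) -> usize,
        mutate: impl FnOnce(&mut T),
    ) {
        let before = size_of(item);
        mutate(item);
        self.allocated += size_of(item).saturating_sub(before);
    }
}

fn main() {
    let mut acc = Accounting::new();
    let mut buf: Vec<u64> = Vec::new();
    acc.track(
        &mut buf,
        |v| v.capacity() * std::mem::size_of::<u64>(),
        |v| v.extend(0..1024),
    );
    println!("tracked growth: {} bytes", acc.allocated);
}
```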
```diff
@@ -326,10 +402,20 @@ fn group_aggregate_batch(
                     )
                 })
                 .try_for_each(|(accumulator, values)| match mode {
-                    AggregateMode::Partial => accumulator.update_batch(&values),
+                    AggregateMode::Partial => {
+                        let size_pre = accumulator.size();
```
👍
You might also consider pulling the size accounting to before/after the `match` to avoid the duplication.
done
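For readers following along, a self-contained illustration of that suggestion (generic types, not the PR's code): measure once around the whole `match` instead of repeating the bookkeeping in every arm.

```rust
enum Mode {
    Append,
    Extend,
}

// Size accounting pulled to before/after the match, so each arm stays a one-liner.
fn apply(buf: &mut Vec<u8>, mode: Mode, allocated: &mut usize) {
    let size_pre = buf.capacity();
    match mode {
        Mode::Append => buf.push(1),
        Mode::Extend => buf.extend_from_slice(&[1; 1024]),
    }
    *allocated += buf.capacity().saturating_sub(size_pre);
}

fn main() {
    let mut allocated = 0;
    let mut buf = Vec::new();
    apply(&mut buf, Mode::Extend, &mut allocated);
    println!("tracked {allocated} additional bytes of capacity");
}
```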
```rust
            baseline_metrics,
        )?))
    }
    self.execute_typed(partition, context)
```
👍
Thanks again @crepererum

Benchmark runs are scheduled for baseline = fe8aee6 and contender = be1d376. be1d376 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Add size_of_ref lint

This addresses #9995, which is likely raising a valid point about `std::mem::size_of_val()`: it's [very easy to use double-references as the argument](apache/datafusion#4371 (comment)), which the function will happily accept and give back the size of _the reference_, not the size of the value _behind_ the reference. In the worst case, if that value happens to match the programmer's expectation, this seems to work, while in fact everything will go horribly wrong, e.g. on a different platform.

The size of a `&T` is independent of what `T` is, and people might want to use `std::mem::size_of_val()` to actually get the size of _any_ reference (e.g. via `&&()`). I would rather suggest that this is always bad practice (see the [reference layout rules](https://doc.rust-lang.org/reference/type-layout.html#pointers-and-references-layout) and [`usize::BITS`](https://doc.rust-lang.org/stable/std/primitive.usize.html#associatedconstant.BITS)), so I put this lint into `correctness`. Since the problem is usually easily fixed by removing the extra `&`, I went light on suggesting code.

changelog: New lint: [`size_of_ref`] [#10098](#10098)
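A small illustration (mine, not from the lint's test suite) of the pattern the lint flags, and of the clearer spelling when the size of a reference really is what's wanted:

```rust
use std::mem::{size_of, size_of_val};

fn main() {
    let v = vec![0u8; 100];

    // Flagged pattern: the double reference makes size_of_val measure the
    // reference itself (pointer-sized), regardless of what `v` contains.
    assert_eq!(size_of_val(&&v), size_of::<&Vec<u8>>());

    // If a reference's size is genuinely what's wanted, size_of::<&T>()
    // states the intent directly (references to sized types are pointer-sized).
    assert_eq!(size_of::<&()>(), size_of::<usize>());
}
```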
Which issue does this PR close?
For #3940.
Update: This does NOT close the issue. I forgot the no-group version (`AggregateStream`). Will do that in a follow-up PR. It's a rather simple change though.

Rationale for this change
Ensure that users don't run out of memory while performing group-by operations. This is especially important for servers or multi-tenant systems.
What changes are included in this PR?
This is similar to #4202. It includes an additional type, `StreamType`, so we can double-check our test setup (namely: is the stream that we request actually the stream version we want?); see the sketch below.
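A rough, hypothetical sketch of that idea (the variant names and the `execute_typed` helper are assumptions for illustration, not the PR's actual definitions):

```rust
// A typed handle lets a test assert *which* stream implementation the
// operator produced, before it is converted into a generic record batch stream.
enum StreamType {
    AggregateStream,
    GroupedHashAggregateStream,
}

fn execute_typed(has_group_by: bool) -> StreamType {
    if has_group_by {
        StreamType::GroupedHashAggregateStream
    } else {
        StreamType::AggregateStream
    }
}

fn main() {
    // In a test: request a grouped aggregation and check we really got the
    // memory-tracked grouped stream.
    assert!(matches!(
        execute_typed(true),
        StreamType::GroupedHashAggregateStream
    ));
    println!("got the expected stream type");
}
```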
Are these changes tested?

Extended `test_oom`. Also, here are the perf results:

I think the regressions (<3%) are within the safety margin of such a crucial feature (and also within what a laptop could reliably reproduce).
Are there any user-facing changes?
The V1 group-by op can now emit a `ResourceExhausted` error if it runs out of memory. Note that the error is kinda nested/wrapped due to #4172.
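Because of that wrapping, a caller that wants to detect the out-of-memory case may have to look through the error chain. A generic, hedged sketch of that idea (the `Wrapped` type and the message matching are placeholders, not DataFusion's actual error types):

```rust
use std::error::Error;
use std::fmt;

// Placeholder error type standing in for the layers that wrap the underlying error.
#[derive(Debug)]
struct Wrapped {
    msg: &'static str,
    source: Option<Box<dyn Error + 'static>>,
}

impl fmt::Display for Wrapped {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.msg)
    }
}

impl Error for Wrapped {
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        self.source.as_deref()
    }
}

/// Walk the source chain and report whether any layer looks like resource
/// exhaustion. Matching on the message is a placeholder; real code would
/// downcast to the concrete error type instead.
fn is_resource_exhausted(err: &(dyn Error + 'static)) -> bool {
    let mut current = Some(err);
    while let Some(e) = current {
        if e.to_string().contains("Resources exhausted") {
            return true;
        }
        current = e.source();
    }
    false
}

fn main() {
    let inner: Box<dyn Error + 'static> = Box::new(Wrapped {
        msg: "Resources exhausted: could not grow memory reservation",
        source: None,
    });
    let outer = Wrapped {
        msg: "Arrow error: external error",
        source: Some(inner),
    };
    println!("{}", is_resource_exhausted(&outer)); // true
}
```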