Support for GROUPING SETS/CUBE/ROLLUP #2716

Merged · 26 commits · Jun 13, 2022

Conversation

@thinkharderdev (Contributor) commented Jun 10, 2022:

Which issue does this PR close?

Closes #1327

TODO

  • Implement CUBE expansion
  • Implement ROLLUP expansion
  • Add SQL tests for CUBE/ROLLUP queries

Note that the SQL parser doesn't currently seem to handle GROUP BY GROUPING SETS ..., so we need to address that before we can test it explicitly.

Rationale for this change

This PR adds support for GROUPING SETS (and special cases CUBE/ROLLUP) in the physical planner and execution plan.

What changes are included in this PR?

There are three primary changes:

  1. AggregateExec now takes a Vec<Vec<(Arc<dyn PhysicalExpr>,String)>> to represent grouping sets; a normal GROUP BY is just a special case. We expect the grouping sets to be "aligned". For example, for a SQL clause like GROUP BY GROUPING SETS ((a),(b),(a,b)), AggregateExec assumes that the planner will expand that to the grouping set ((a,NULL),(NULL,b),(a,b)) (see the sketch below). We can't handle this in the execution plan because we don't have PartialEq for PhysicalExpr.
  2. In DefaultPhysicalPlanner handle expanding and aligning grouping sets. This includes expanding CUBE/ROLLUP expressions and merging and aligning GROUPING SET expressions.
  3. Handle grouping sets correctly in optimizers.

We also include serialization for grouping set expressions in datafusion-proto.
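
To make the expansion and alignment described in item 1 concrete, here is a minimal, self-contained sketch. Plain strings stand in for Arc<dyn PhysicalExpr>, and the helper names expand_cube and align are hypothetical, not functions from this PR:

```rust
// Illustrative only: strings stand in for Arc<dyn PhysicalExpr>.

/// Expand CUBE(a, b, ...) into all 2^n column subsets,
/// e.g. CUBE(a, b) -> [[], [a], [b], [a, b]].
fn expand_cube(cols: &[&str]) -> Vec<Vec<String>> {
    (0..(1usize << cols.len()))
        .map(|mask| {
            cols.iter()
                .enumerate()
                .filter(|&(i, _)| (mask & (1 << i)) != 0)
                .map(|(_, c)| c.to_string())
                .collect()
        })
        .collect()
}

/// Align each grouping set against the full column list by recording which
/// positions are absent (output NULL), mirroring how ((a),(b),(a,b)) is
/// expanded to ((a,NULL),(NULL,b),(a,b)).
fn align(all_cols: &[&str], sets: &[Vec<String>]) -> Vec<Vec<bool>> {
    sets.iter()
        .map(|set| {
            all_cols
                .iter()
                .map(|c| !set.iter().any(|s| s.as_str() == *c)) // true => NULL in this set
                .collect()
        })
        .collect()
}

fn main() {
    let sets = expand_cube(&["a", "b"]);
    let groups = align(&["a", "b"], &sets);
    // Prints [[true, true], [false, true], [true, false], [false, false]]
    println!("{:?}", groups);
}
```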

Are there any user-facing changes?

SQL statements with CUBE/ROLLUP should now be supported. GROUPING SETS should also be supported, but it seems like the SQL parser is not handling them correctly.

I don't think so.

@github-actions github-actions bot added core Core DataFusion crate logical-expr Logical plan and expressions optimizer Optimizer rules labels Jun 10, 2022
@thinkharderdev thinkharderdev marked this pull request as draft June 10, 2022 12:20
@thinkharderdev (Contributor, Author) commented:

cc @alamb @tustvold @jimexist @yjshen @andygrove

The part of this that I am least confident about is whether I broke anything in any of the optimizers :). So if someone familiar with that code can review that part, I would be very grateful.

@alamb (Contributor) commented Jun 10, 2022:

Thanks @thinkharderdev -- I'll try and find some time to review this over the weekend.

@thinkharderdev thinkharderdev marked this pull request as ready for review June 11, 2022 12:12
@codecov-commenter commented Jun 11, 2022:

Codecov Report

Merging #2716 (a2cb52d) into master (080c324) will increase coverage by 0.13%.
The diff coverage is 93.96%.

@@            Coverage Diff             @@
##           master    #2716      +/-   ##
==========================================
+ Coverage   84.72%   84.86%   +0.13%     
==========================================
  Files         270      270              
  Lines       47254    47717     +463     
==========================================
+ Hits        40036    40495     +459     
- Misses       7218     7222       +4     
Impacted Files Coverage Δ
datafusion/expr/src/expr_fn.rs 88.23% <0.00%> (-3.23%) ⬇️
datafusion/expr/src/utils.rs 90.80% <71.42%> (-0.39%) ⬇️
datafusion/proto/src/from_proto.rs 34.64% <81.25%> (+0.85%) ⬆️
...atafusion/core/src/physical_plan/aggregates/mod.rs 91.16% <84.49%> (-3.44%) ⬇️
datafusion/core/src/physical_plan/planner.rs 80.83% <96.55%> (+2.61%) ⬆️
datafusion/core/tests/dataframe.rs 98.62% <97.18%> (-1.38%) ⬇️
datafusion/core/tests/sql/aggregates.rs 99.27% <98.24%> (-0.10%) ⬇️
...ore/src/physical_optimizer/aggregate_statistics.rs 100.00% <100.00%> (ø)
...afusion/core/src/physical_optimizer/repartition.rs 100.00% <100.00%> (ø)
...tafusion/core/src/physical_plan/aggregates/hash.rs 92.95% <100.00%> (+0.04%) ⬆️
... and 17 more

Continue to review full report at Codecov.

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data

@alamb (Contributor) left a comment:

Thank you @thinkharderdev and @Tomczik76 -- this is super cool. I haven't made it all the way through yet but what I have reviewed is 👌

I found the whitespace blind diff easier to review: https://github.com/apache/arrow-datafusion/pull/2716/files?w=1

cc @andygrove @liukun4515

&input.schema(),
&grouping_set.expr,
&aggr_expr,
grouping_set.groups.iter().flatten().any(|is_null| *is_null),

Contributor:

I wonder if extracting this code to a function such as GroupingSets::contains_null() might make the code easier to read. The same comment applies to other places where GroupingSets::groups is referenced as well.

Given the size of this PR already, definitely could be done as a follow on
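
A hedged sketch of that suggested extraction; the GroupingSets struct below is a stripped-down stand-in keeping only the groups null-mask from the snippet above, not the PR's actual type:

```rust
/// Stripped-down stand-in for the group-by description discussed here; only
/// the null-mask field from the snippet above is kept.
struct GroupingSets {
    /// One Vec<bool> per grouping set; true means that column is NULLed out.
    groups: Vec<Vec<bool>>,
}

impl GroupingSets {
    /// The extraction suggested above: true if any grouping set nulls out a
    /// column, i.e. this is a real GROUPING SETS / CUBE / ROLLUP.
    fn contains_null(&self) -> bool {
        self.groups.iter().flatten().any(|is_null| *is_null)
    }
}

fn main() {
    let gs = GroupingSets {
        groups: vec![vec![false, true], vec![false, false]],
    };
    assert!(gs.contains_null());
}
```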

@@ -204,6 +204,7 @@
/// DataFusion crate version
pub const DATAFUSION_VERSION: &str = env!("CARGO_PKG_VERSION");

extern crate core;

Contributor:

Why is this necessary?

Contributor (Author):

It's not :) Not sure where it came from but removed now.

@@ -110,12 +111,15 @@ impl GroupedHashAggregateStreamV2 {
// The expressions to evaluate the batch, one vec of expressions per aggregation.
// Assume create_schema() always put group columns in front of aggr columns, we set
// col_idx_base to group expression count.
let aggregate_expressions =
aggregates::aggregate_expressions(&aggr_expr, &mode, group_expr.len())?;
let aggregate_expressions = aggregates::aggregate_expressions(

Contributor:

FYI @yjshen -- it would be really nice to try and consolidate row_hash and hash -- filed #2723 to track 👍

GroupingSet::GroupingSets(groups) => {
let mut exprs: Vec<Expr> = vec![];
for exp in groups.iter().flatten() {
if !exprs.contains(exp) {

Contributor:

This is N^2 in the number of grouping sets -- probably not an issue, I just figured I would point it out

Contributor (Author):

Yeah, this is unfortunate
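
For reference, an order-preserving de-duplication that avoids the quadratic scan could look like the sketch below. It assumes the element type implements Eq + Hash, which may not hold for Expr, so treat it as illustrative only:

```rust
use std::collections::HashSet;
use std::hash::Hash;

/// Order-preserving de-duplication in expected O(n) time, assuming the
/// element type is hashable (which Expr may not be).
fn dedup_preserving_order<T: Clone + Eq + Hash>(items: impl IntoIterator<Item = T>) -> Vec<T> {
    let mut seen = HashSet::new();
    items
        .into_iter()
        // HashSet::insert returns false for items already seen, filtering them out.
        .filter(|item| seen.insert(item.clone()))
        .collect()
}

fn main() {
    let flat = vec!["a", "b", "a", "c", "b"];
    assert_eq!(dedup_preserving_order(flat), vec!["a", "b", "c"]);
}
```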

"| e | 4 | | -16064.57142857143 |",
"| e | 5 | -86 | 32514 |",
"| e | 5 | 64 | -26526 |",
"| e | 5 | | 2994 |",

Contributor:

👍

@@ -250,6 +280,29 @@ mod tests {
Ok(())
}

#[test]
fn single_distinct_and_grouping_set() -> Result<()> {

Contributor:

Given there is special handling for CUBE and ROLLUP in this pass, I suggest test coverage for those cases too

Contributor (Author):

Yeah, I think there is actually a bug in this. I'll work on a fix.

Contributor (Author):

Ok, this optimization is a bit more complicated for grouping sets. We need to create a separate alias for each group. For the moment I have just disabled the optimization for this case.

Contributor:

I think disabling the optimization for grouping sets is a wise idea.

@alamb (Contributor) left a comment:

I think it looks great to me. Thanks again!

I had a few minor comments (e.g. some left-over printlns), but all in all I think this one is good to go.

@@ -265,7 +265,7 @@ mod tests {

use crate::error::Result;
use crate::logical_plan::Operator;
use crate::physical_plan::aggregates::AggregateExec;
use crate::physical_plan::aggregates::{AggregateExec, PhysicalGroupBy};

Contributor:

Love the new name PhysicalGroupBy

@@ -65,13 +66,60 @@ pub enum AggregateMode {
FinalPartitioned,
}

/// Represents `GROUP BY` clause in the plan (including the more general GROUPING SET)

Contributor:

Thank you -- this is super helpful

@@ -117,14 +171,16 @@ impl AggregateExec {

/// Grouping expressions
pub fn group_expr(&self) -> &[(Arc<dyn PhysicalExpr>, String)] {
&self.group_expr
// TODO Is this right?

Contributor:

I don't think so -- this seems to be used by the "use statistics instead of aggregates" optimization

/Users/alamb/Software/arrow-datafusion/datafusion/core/src/physical_optimizer/aggregate_statistics.rs
113:             && final_agg_exec.group_expr().is_empty()
121:                         && partial_agg_exec.group_expr().is_empty()
/Users/alamb/Software/arrow-datafusion/datafusion/core/src/physical_plan/aggregates/mod.rs
728:         let groups = partial_aggregate.group_expr().to_vec();

In general, it might make sense to disable / skip all such optimizations in the cases of grouping sets / cube / rollup -- that would be the conservative approach and avoid potential subtle wrong answer bugs. As the feature is used more and people have a need to optimize it more, we can revisit the optimizations and make sure they are relevant to grouping sets

Contributor (Author):

In this case it would still be correct, right? The aggregate stats are only used if there is no group by, which this would still represent correctly.

Contributor (Author):

Or maybe this should just return &PhysicalGroupBy instead? I could see how this could lead to issues elsewhere if it is used for optimizations.

Contributor:

Returning &PhysicalGroupBy sounds like a good future proof idea

@@ -62,9 +63,11 @@ fn optimize(plan: &LogicalPlan) -> Result<LogicalPlan> {
schema,
group_expr,
}) => {
if is_single_distinct_agg(plan) {
if is_single_distinct_agg(plan) && !contains_grouping_set(group_expr) {

Contributor:

I think this is a good idea -- to skip grouping sets in optimizations
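
For context, a simplified sketch of what a contains_grouping_set check can look like; the Expr and GroupingSet enums below are stand-ins for the real datafusion-expr types rather than the PR's exact code:

```rust
// Simplified stand-ins for the logical expression types; the real ones live
// in datafusion-expr, so this is only an illustration of the check.
#[derive(Debug)]
enum GroupingSet {
    Rollup(Vec<Expr>),
    Cube(Vec<Expr>),
    GroupingSets(Vec<Vec<Expr>>),
}

#[derive(Debug)]
enum Expr {
    Column(String),
    GroupingSet(GroupingSet),
}

/// True if any group expression is a GROUPING SETS / CUBE / ROLLUP, in which
/// case the single-distinct rewrite is skipped.
fn contains_grouping_set(group_expr: &[Expr]) -> bool {
    group_expr.iter().any(|e| matches!(e, Expr::GroupingSet(_)))
}

fn main() {
    let group_expr = vec![Expr::GroupingSet(GroupingSet::Cube(vec![
        Expr::Column("a".into()),
        Expr::Column("b".into()),
    ]))];
    assert!(contains_grouping_set(&group_expr));
}
```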

@@ -160,6 +166,7 @@ fn optimize_children(plan: &LogicalPlan) -> Result<LogicalPlan> {
}

fn is_single_distinct_agg(plan: &LogicalPlan) -> bool {
// false

Contributor:

Left over?

@@ -212,6 +224,9 @@ mod tests {
let optimized_plan = rule
.optimize(plan, &OptimizerConfig::new())
.expect("failed to optimize plan");

println!("{:?}", optimized_plan);

Contributor:

left over?

Comment on lines 571 to 572
let contains_dict = groups
.expr

Contributor:

I think it is a minor thing, but one might imagine keeping the fields of PhysicalGroupBy private and adding functions like fn expr() and fn is_empty() mostly as a way of additional documentation

@thinkharderdev (Contributor, Author) commented Jun 13, 2022:

Yeah, I think that is a good idea. Fixed.
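
A hedged sketch of that encapsulation, with plain strings standing in for the (Arc<dyn PhysicalExpr>, String) pairs; the field and method names follow this thread's discussion rather than the PR's exact definition:

```rust
/// Simplified stand-in for PhysicalGroupBy with private fields and the
/// accessor methods suggested above.
pub struct PhysicalGroupBy {
    expr: Vec<(String, String)>,      // distinct group expressions with output names
    null_expr: Vec<(String, String)>, // NULL placeholders for absent columns
    groups: Vec<Vec<bool>>,           // one null-mask per grouping set
}

impl PhysicalGroupBy {
    /// The distinct group expressions with their output names.
    pub fn expr(&self) -> &[(String, String)] {
        &self.expr
    }

    /// True when there is no grouping at all (a plain ungrouped aggregate).
    pub fn is_empty(&self) -> bool {
        self.expr.is_empty()
    }

    /// True when more than one grouping set is present, i.e. a real
    /// GROUPING SETS / CUBE / ROLLUP rather than a plain GROUP BY.
    pub fn is_grouping_set(&self) -> bool {
        self.groups.len() > 1
    }
}

fn main() {
    let group_by = PhysicalGroupBy {
        expr: vec![("c1".into(), "c1".into())],
        null_expr: vec![("NULL".into(), "c1".into())],
        groups: vec![vec![false], vec![true]],
    };
    assert!(!group_by.is_empty());
    assert!(group_by.is_grouping_set());
    assert_eq!(group_by.expr().len(), 1);
}
```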

Comment on lines 1113 to 1120
let mut group: Vec<bool> = Vec::with_capacity(expr_count);
for expr in all_exprs.iter() {
if expr_group.contains(expr) {
group.push(false);
} else {
group.push(true)
}
}

Contributor:

I don't think it matters, but you can probably express this in a functional style like:

Suggested change
let mut group: Vec<bool> = Vec::with_capacity(expr_count);
for expr in all_exprs.iter() {
if expr_group.contains(expr) {
group.push(false);
} else {
group.push(true)
}
}
let group: Vec<bool> = all_exprs
    .iter()
    .map(|expr| !expr_group.contains(expr))
    .collect();

Comment on lines 1624 to 1627
.aggregate(
vec![cube(vec![col("c1"), col("c2"), col("c3")])],
vec![sum(col("c2"))],
)?

Contributor:

I don't understand the need to create the aggregate on the logical plan (as the new cube expressions are planned below). Can you simply use the output of the project plan?

The same question applies to the other plans below

assert_batches_sorted_eq!(expected, &results);
Ok(())
}

Contributor:

I think the test coverage is quite good. Thank you

@alamb (Contributor) left a comment:

👍

@alamb (Contributor) commented Jun 13, 2022:

Looks great to me -- thanks again for all the work @thinkharderdev 🎉

@alamb alamb merged commit ca5339b into apache:master Jun 13, 2022

Labels: core (Core DataFusion crate), logical-expr (Logical plan and expressions), optimizer (Optimizer rules)
Linked issue: implement grouping sets, cubes, and rollups
4 participants