Skip to content

Commit

Permalink
Support for GROUPING SETS/CUBE/ROLLUP (#2716)
Browse files Browse the repository at this point in the history
* WIP

* Implement for non-row based accumulators

* Non-row aggregations

* Map logical plan and add some basic tests

* Handle grouping sets in various optimize passes.

* Implemented create_cube_expr and create_rollup_expr functions

* Clean up and ignore SingleDistinctToGroupBy when using grouping sets for now

* Handle grouping sets in SingleDistinctToGroupBy

* Add more tests and burn the boats

* Fix(ish) partitioning

* Serialization for grouping set exprs

* Fixed bug with create_cube_expr function

* Fixed bug with create_cube_expr function

* Fixed bug in row-based-aggregation

* Added unit tests (test_create_rollup_expr and test_create_cube_expr) for create_rollup_expr and create_cube_expr

* Formatting

* Tests, linter fixes and docs

* Linting

* Better encoding which avoids evaluating grouping expressions redundantly

* Remove commented code

* Apply suggestions from code review

Co-authored-by: Andrew Lamb <[email protected]>

* PR Comments: Rename PhysicalGroupingSet -> PhysicalGroupBy and clarify doc comment

* Disable single_distinct_to_groupby for grouping sets for now and add unit tests for single distinct queries.

* PR comments

* Remove old comment

* Return PhysicalGroupBy from AggregateExec::group_expr

Co-authored-by: Ryan Tomczik <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
3 people authored Jun 13, 2022
1 parent a957778 commit ca5339b
Show file tree
Hide file tree
Showing 18 changed files with 2,044 additions and 381 deletions.
26 changes: 13 additions & 13 deletions datafusion/core/src/physical_optimizer/aggregate_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ mod tests {

use crate::error::Result;
use crate::logical_plan::Operator;
use crate::physical_plan::aggregates::AggregateExec;
use crate::physical_plan::aggregates::{AggregateExec, PhysicalGroupBy};
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use crate::physical_plan::common;
use crate::physical_plan::expressions::Count;
Expand Down Expand Up @@ -407,15 +407,15 @@ mod tests {

let partial_agg = AggregateExec::try_new(
AggregateMode::Partial,
vec![],
PhysicalGroupBy::default(),
vec![agg.count_expr()],
source,
Arc::clone(&schema),
)?;

let final_agg = AggregateExec::try_new(
AggregateMode::Final,
vec![],
PhysicalGroupBy::default(),
vec![agg.count_expr()],
Arc::new(partial_agg),
Arc::clone(&schema),
Expand All @@ -435,15 +435,15 @@ mod tests {

let partial_agg = AggregateExec::try_new(
AggregateMode::Partial,
vec![],
PhysicalGroupBy::default(),
vec![agg.count_expr()],
source,
Arc::clone(&schema),
)?;

let final_agg = AggregateExec::try_new(
AggregateMode::Final,
vec![],
PhysicalGroupBy::default(),
vec![agg.count_expr()],
Arc::new(partial_agg),
Arc::clone(&schema),
Expand All @@ -462,7 +462,7 @@ mod tests {

let partial_agg = AggregateExec::try_new(
AggregateMode::Partial,
vec![],
PhysicalGroupBy::default(),
vec![agg.count_expr()],
source,
Arc::clone(&schema),
Expand All @@ -473,7 +473,7 @@ mod tests {

let final_agg = AggregateExec::try_new(
AggregateMode::Final,
vec![],
PhysicalGroupBy::default(),
vec![agg.count_expr()],
Arc::new(coalesce),
Arc::clone(&schema),
Expand All @@ -492,7 +492,7 @@ mod tests {

let partial_agg = AggregateExec::try_new(
AggregateMode::Partial,
vec![],
PhysicalGroupBy::default(),
vec![agg.count_expr()],
source,
Arc::clone(&schema),
Expand All @@ -503,7 +503,7 @@ mod tests {

let final_agg = AggregateExec::try_new(
AggregateMode::Final,
vec![],
PhysicalGroupBy::default(),
vec![agg.count_expr()],
Arc::new(coalesce),
Arc::clone(&schema),
Expand Down Expand Up @@ -533,15 +533,15 @@ mod tests {

let partial_agg = AggregateExec::try_new(
AggregateMode::Partial,
vec![],
PhysicalGroupBy::default(),
vec![agg.count_expr()],
filter,
Arc::clone(&schema),
)?;

let final_agg = AggregateExec::try_new(
AggregateMode::Final,
vec![],
PhysicalGroupBy::default(),
vec![agg.count_expr()],
Arc::new(partial_agg),
Arc::clone(&schema),
Expand Down Expand Up @@ -576,15 +576,15 @@ mod tests {

let partial_agg = AggregateExec::try_new(
AggregateMode::Partial,
vec![],
PhysicalGroupBy::default(),
vec![agg.count_expr()],
filter,
Arc::clone(&schema),
)?;

let final_agg = AggregateExec::try_new(
AggregateMode::Final,
vec![],
PhysicalGroupBy::default(),
vec![agg.count_expr()],
Arc::new(partial_agg),
Arc::clone(&schema),
Expand Down
8 changes: 5 additions & 3 deletions datafusion/core/src/physical_optimizer/repartition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,9 @@ mod tests {
use super::*;
use crate::datasource::listing::PartitionedFile;
use crate::datasource::object_store::ObjectStoreUrl;
use crate::physical_plan::aggregates::{AggregateExec, AggregateMode};
use crate::physical_plan::aggregates::{
AggregateExec, AggregateMode, PhysicalGroupBy,
};
use crate::physical_plan::expressions::{col, PhysicalSortExpr};
use crate::physical_plan::file_format::{FileScanConfig, ParquetExec};
use crate::physical_plan::filter::FilterExec;
Expand Down Expand Up @@ -305,12 +307,12 @@ mod tests {
Arc::new(
AggregateExec::try_new(
AggregateMode::Final,
vec![],
PhysicalGroupBy::default(),
vec![],
Arc::new(
AggregateExec::try_new(
AggregateMode::Partial,
vec![],
PhysicalGroupBy::default(),
vec![],
input,
schema.clone(),
Expand Down
Loading

0 comments on commit ca5339b

Please sign in to comment.