Skip to content

Commit

Permalink
Grouped Aggregate in row format (#2375)
Browse files Browse the repository at this point in the history
* first move: re-group aggregates functionalities in core/physical_p/aggregates

* basic accumulators

* main updating procedure

* output as record batch

* aggregate with row state

* make row non-optional

* address comments, add docs, part fix #2455

* Apply suggestions from code review

Co-authored-by: Andrew Lamb <[email protected]>

Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
yjshen and alamb authored May 7, 2022
1 parent 32cf354 commit 6786203
Show file tree
Hide file tree
Showing 21 changed files with 1,581 additions and 50 deletions.
7 changes: 2 additions & 5 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@ force_hash_collisions = []
jit = ["datafusion-jit"]
pyarrow = ["pyo3", "arrow/pyarrow", "datafusion-common/pyarrow"]
regex_expressions = ["datafusion-physical-expr/regex_expressions"]
# Used to enable row format experiment
row = ["datafusion-row"]
# Used to enable scheduler
scheduler = ["rayon"]
simd = ["arrow/simd"]
Expand All @@ -66,7 +64,7 @@ datafusion-data-access = { path = "../../data-access", version = "1.0.0" }
datafusion-expr = { path = "../expr", version = "7.0.0" }
datafusion-jit = { path = "../jit", version = "7.0.0", optional = true }
datafusion-physical-expr = { path = "../physical-expr", version = "7.0.0" }
datafusion-row = { path = "../row", version = "7.0.0", optional = true }
datafusion-row = { path = "../row", version = "7.0.0" }
futures = "0.3"
hashbrown = { version = "0.12", features = ["raw"] }
lazy_static = { version = "^1.4.0" }
Expand Down Expand Up @@ -134,8 +132,7 @@ name = "sql_planner"
[[bench]]
harness = false
name = "jit"
required-features = ["row", "jit"]
required-features = ["jit"]

[[test]]
name = "row"
required-features = ["row"]
10 changes: 10 additions & 0 deletions datafusion/core/benches/aggregate_query_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,16 @@ fn criterion_benchmark(c: &mut Criterion) {
)
})
});

c.bench_function("aggregate_query_group_by_u64_multiple_keys", |b| {
b.iter(|| {
query(
ctx.clone(),
"SELECT u64_wide, utf8, MIN(f64), AVG(f64), COUNT(f64) \
FROM t GROUP BY u64_wide, utf8",
)
})
});
}

criterion_group!(benches, criterion_benchmark);
Expand Down
1 change: 0 additions & 1 deletion datafusion/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,6 @@ pub use datafusion_data_access;
pub use datafusion_expr as logical_expr;
pub use datafusion_physical_expr as physical_expr;

#[cfg(feature = "row")]
pub use datafusion_row as row;

#[cfg(feature = "jit")]
Expand Down
25 changes: 3 additions & 22 deletions datafusion/core/src/physical_plan/aggregates/hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ use futures::{
};

use crate::error::Result;
use crate::physical_plan::aggregates::{AccumulatorItem, AggregateMode};
use crate::physical_plan::aggregates::{
evaluate, evaluate_many, AccumulatorItem, AggregateMode,
};
use crate::physical_plan::hash_utils::create_hashes;
use crate::physical_plan::metrics::{BaselineMetrics, RecordOutput};
use crate::physical_plan::{aggregates, AggregateExpr, PhysicalExpr};
Expand Down Expand Up @@ -380,27 +382,6 @@ impl std::fmt::Debug for Accumulators {
}
}

/// Evaluates expressions against a record batch.
fn evaluate(
expr: &[Arc<dyn PhysicalExpr>],
batch: &RecordBatch,
) -> Result<Vec<ArrayRef>> {
expr.iter()
.map(|expr| expr.evaluate(batch))
.map(|r| r.map(|v| v.into_array(batch.num_rows())))
.collect::<Result<Vec<_>>>()
}

/// Evaluates expressions against a record batch.
fn evaluate_many(
expr: &[Vec<Arc<dyn PhysicalExpr>>],
batch: &RecordBatch,
) -> Result<Vec<Vec<ArrayRef>>> {
expr.iter()
.map(|expr| evaluate(expr, batch))
.collect::<Result<Vec<_>>>()
}

/// Create a RecordBatch with all group keys and accumulator' states or values.
fn create_batch_from_map(
mode: &AggregateMode,
Expand Down
67 changes: 67 additions & 0 deletions datafusion/core/src/physical_plan/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use crate::physical_plan::{
};
use arrow::array::ArrayRef;
use arrow::datatypes::{Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use datafusion_expr::Accumulator;
use datafusion_physical_expr::expressions::Column;
Expand All @@ -40,9 +41,13 @@ use std::sync::Arc;

mod hash;
mod no_grouping;
mod row_hash;

use crate::physical_plan::aggregates::row_hash::GroupedHashAggregateStreamV2;
pub use datafusion_expr::AggregateFunction;
use datafusion_physical_expr::aggregate::row_accumulator::RowAccumulator;
pub use datafusion_physical_expr::expressions::create_aggregate_expr;
use datafusion_row::{row_supported, RowType};

/// Hash aggregate modes
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
Expand Down Expand Up @@ -142,6 +147,12 @@ impl AggregateExec {
pub fn input_schema(&self) -> SchemaRef {
self.input_schema.clone()
}

fn row_aggregate_supported(&self) -> bool {
let group_schema = group_schema(&self.schema, self.group_expr.len());
row_supported(&group_schema, RowType::Compact)
&& accumulator_v2_supported(&self.aggr_expr)
}
}

impl ExecutionPlan for AggregateExec {
Expand Down Expand Up @@ -212,6 +223,15 @@ impl ExecutionPlan for AggregateExec {
input,
baseline_metrics,
)?))
} else if self.row_aggregate_supported() {
Ok(Box::pin(GroupedHashAggregateStreamV2::new(
self.mode,
self.schema.clone(),
group_expr,
self.aggr_expr.clone(),
input,
baseline_metrics,
)?))
} else {
Ok(Box::pin(GroupedHashAggregateStream::new(
self.mode,
Expand Down Expand Up @@ -315,6 +335,11 @@ fn create_schema(
Ok(Schema::new(fields))
}

/// Returns a schema holding only the grouping columns, i.e. the first
/// `group_count` fields of `schema` (group keys come first in the
/// aggregate output schema).
///
/// Panics if `group_count` exceeds the number of fields in `schema`.
fn group_schema(schema: &Schema, group_count: usize) -> SchemaRef {
    let key_fields = schema.fields()[..group_count].to_vec();
    Arc::new(Schema::new(key_fields))
}

/// returns physical expressions to evaluate against a batch
/// The expressions are different depending on `mode`:
/// * Partial: AggregateExpr::expressions
Expand Down Expand Up @@ -362,6 +387,7 @@ fn merge_expressions(
}

pub(crate) type AccumulatorItem = Box<dyn Accumulator>;
pub(crate) type AccumulatorItemV2 = Box<dyn RowAccumulator>;

fn create_accumulators(
aggr_expr: &[Arc<dyn AggregateExpr>],
Expand All @@ -372,6 +398,26 @@ fn create_accumulators(
.collect::<datafusion_common::Result<Vec<_>>>()
}

/// Returns `true` iff every aggregate expression supports the row-based
/// (`RowAccumulator`) implementation; a single unsupported expression
/// forces the fallback to the `Accumulator`-based path.
fn accumulator_v2_supported(aggr_expr: &[Arc<dyn AggregateExpr>]) -> bool {
    !aggr_expr
        .iter()
        .any(|expr| !expr.row_accumulator_supported())
}

/// Creates one `RowAccumulator` per aggregate expression.
///
/// Each accumulator is handed the starting offset of its slice of the shared
/// row-state buffer; the offset advances by the number of state fields the
/// previous expression occupies.
///
/// # Errors
/// Returns an error if an expression cannot report its state fields or
/// cannot build a row accumulator.
fn create_accumulators_v2(
    aggr_expr: &[Arc<dyn AggregateExpr>],
) -> datafusion_common::Result<Vec<AccumulatorItemV2>> {
    let mut state_index = 0;
    aggr_expr
        .iter()
        .map(|expr| {
            let result = expr.create_row_accumulator(state_index);
            // Propagate a state_fields() failure instead of panicking
            // (previously `.unwrap()`); this function already returns Result.
            state_index += expr.state_fields()?.len();
            result
        })
        .collect::<datafusion_common::Result<Vec<_>>>()
}

/// returns a vector of ArrayRefs, where each entry corresponds to either the
/// final value (mode = Final) or states (mode = Partial)
fn finalize_aggregation(
Expand Down Expand Up @@ -402,6 +448,27 @@ fn finalize_aggregation(
}
}

/// Evaluates expressions against a record batch.
fn evaluate(
expr: &[Arc<dyn PhysicalExpr>],
batch: &RecordBatch,
) -> Result<Vec<ArrayRef>> {
expr.iter()
.map(|expr| expr.evaluate(batch))
.map(|r| r.map(|v| v.into_array(batch.num_rows())))
.collect::<Result<Vec<_>>>()
}

/// Evaluates expressions against a record batch.
fn evaluate_many(
expr: &[Vec<Arc<dyn PhysicalExpr>>],
batch: &RecordBatch,
) -> Result<Vec<Vec<ArrayRef>>> {
expr.iter()
.map(|expr| evaluate(expr, batch))
.collect::<Result<Vec<_>>>()
}

#[cfg(test)]
mod tests {
use crate::execution::context::TaskContext;
Expand Down
Loading

0 comments on commit 6786203

Please sign in to comment.