Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Grouped Aggregate in row format #2375

Merged
merged 9 commits into from
May 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@ force_hash_collisions = []
jit = ["datafusion-jit"]
pyarrow = ["pyo3", "arrow/pyarrow", "datafusion-common/pyarrow"]
regex_expressions = ["datafusion-physical-expr/regex_expressions"]
# Used to enable row format experiment
row = ["datafusion-row"]
# Used to enable scheduler
scheduler = ["rayon"]
simd = ["arrow/simd"]
Expand All @@ -66,7 +64,7 @@ datafusion-data-access = { path = "../../data-access", version = "1.0.0" }
datafusion-expr = { path = "../expr", version = "7.0.0" }
datafusion-jit = { path = "../jit", version = "7.0.0", optional = true }
datafusion-physical-expr = { path = "../physical-expr", version = "7.0.0" }
datafusion-row = { path = "../row", version = "7.0.0", optional = true }
datafusion-row = { path = "../row", version = "7.0.0" }
futures = "0.3"
hashbrown = { version = "0.12", features = ["raw"] }
lazy_static = { version = "^1.4.0" }
Expand Down Expand Up @@ -134,8 +132,7 @@ name = "sql_planner"
[[bench]]
harness = false
name = "jit"
required-features = ["row", "jit"]
required-features = ["jit"]

[[test]]
name = "row"
required-features = ["row"]
10 changes: 10 additions & 0 deletions datafusion/core/benches/aggregate_query_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,16 @@ fn criterion_benchmark(c: &mut Criterion) {
)
})
});

c.bench_function("aggregate_query_group_by_u64_multiple_keys", |b| {
b.iter(|| {
query(
ctx.clone(),
"SELECT u64_wide, utf8, MIN(f64), AVG(f64), COUNT(f64) \
FROM t GROUP BY u64_wide, utf8",
)
})
});
}

criterion_group!(benches, criterion_benchmark);
Expand Down
1 change: 0 additions & 1 deletion datafusion/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,6 @@ pub use datafusion_data_access;
pub use datafusion_expr as logical_expr;
pub use datafusion_physical_expr as physical_expr;

#[cfg(feature = "row")]
pub use datafusion_row as row;

#[cfg(feature = "jit")]
Expand Down
25 changes: 3 additions & 22 deletions datafusion/core/src/physical_plan/aggregates/hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ use futures::{
};

use crate::error::Result;
use crate::physical_plan::aggregates::{AccumulatorItem, AggregateMode};
use crate::physical_plan::aggregates::{
evaluate, evaluate_many, AccumulatorItem, AggregateMode,
};
use crate::physical_plan::hash_utils::create_hashes;
use crate::physical_plan::metrics::{BaselineMetrics, RecordOutput};
use crate::physical_plan::{aggregates, AggregateExpr, PhysicalExpr};
Expand Down Expand Up @@ -380,27 +382,6 @@ impl std::fmt::Debug for Accumulators {
}
}

/// Evaluates each physical expression against `batch`, materializing the
/// results as arrays of length `batch.num_rows()`.
fn evaluate(
    expr: &[Arc<dyn PhysicalExpr>],
    batch: &RecordBatch,
) -> Result<Vec<ArrayRef>> {
    let num_rows = batch.num_rows();
    let mut arrays = Vec::with_capacity(expr.len());
    for e in expr {
        // Scalar results are expanded to a full-length array.
        arrays.push(e.evaluate(batch)?.into_array(num_rows));
    }
    Ok(arrays)
}

/// Evaluates each group of expressions against `batch`, returning one
/// `Vec<ArrayRef>` per group (see [`evaluate`]).
fn evaluate_many(
    expr: &[Vec<Arc<dyn PhysicalExpr>>],
    batch: &RecordBatch,
) -> Result<Vec<Vec<ArrayRef>>> {
    let mut results = Vec::with_capacity(expr.len());
    for group in expr {
        results.push(evaluate(group, batch)?);
    }
    Ok(results)
}

/// Create a RecordBatch with all group keys and accumulators' states or values.
fn create_batch_from_map(
mode: &AggregateMode,
Expand Down
67 changes: 67 additions & 0 deletions datafusion/core/src/physical_plan/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use crate::physical_plan::{
};
use arrow::array::ArrayRef;
use arrow::datatypes::{Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use datafusion_expr::Accumulator;
use datafusion_physical_expr::expressions::Column;
Expand All @@ -40,9 +41,13 @@ use std::sync::Arc;

mod hash;
mod no_grouping;
mod row_hash;

use crate::physical_plan::aggregates::row_hash::GroupedHashAggregateStreamV2;
pub use datafusion_expr::AggregateFunction;
use datafusion_physical_expr::aggregate::row_accumulator::RowAccumulator;
pub use datafusion_physical_expr::expressions::create_aggregate_expr;
use datafusion_row::{row_supported, RowType};

/// Hash aggregate modes
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
Expand Down Expand Up @@ -142,6 +147,12 @@ impl AggregateExec {
pub fn input_schema(&self) -> SchemaRef {
self.input_schema.clone()
}

/// Returns `true` when this aggregation can run on the row-based path:
/// the group-key schema must be representable in compact row format AND
/// every aggregate expression must provide a row accumulator.
fn row_aggregate_supported(&self) -> bool {
    let grouping = group_schema(&self.schema, self.group_expr.len());
    if !row_supported(&grouping, RowType::Compact) {
        return false;
    }
    accumulator_v2_supported(&self.aggr_expr)
}
}

impl ExecutionPlan for AggregateExec {
Expand Down Expand Up @@ -212,6 +223,15 @@ impl ExecutionPlan for AggregateExec {
input,
baseline_metrics,
)?))
} else if self.row_aggregate_supported() {
Ok(Box::pin(GroupedHashAggregateStreamV2::new(
self.mode,
self.schema.clone(),
group_expr,
self.aggr_expr.clone(),
input,
baseline_metrics,
)?))
} else {
Ok(Box::pin(GroupedHashAggregateStream::new(
self.mode,
Expand Down Expand Up @@ -315,6 +335,11 @@ fn create_schema(
Ok(Schema::new(fields))
}

/// Projects `schema` down to its first `group_count` fields — the group-by
/// key columns, which precede the aggregate columns in the output schema.
fn group_schema(schema: &Schema, group_count: usize) -> SchemaRef {
    let key_fields = schema
        .fields()
        .iter()
        .take(group_count)
        .cloned()
        .collect();
    Arc::new(Schema::new(key_fields))
}

/// returns physical expressions to evaluate against a batch
/// The expressions are different depending on `mode`:
/// * Partial: AggregateExpr::expressions
Expand Down Expand Up @@ -362,6 +387,7 @@ fn merge_expressions(
}

pub(crate) type AccumulatorItem = Box<dyn Accumulator>;
pub(crate) type AccumulatorItemV2 = Box<dyn RowAccumulator>;

fn create_accumulators(
aggr_expr: &[Arc<dyn AggregateExpr>],
Expand All @@ -372,6 +398,26 @@ fn create_accumulators(
.collect::<datafusion_common::Result<Vec<_>>>()
}

/// Returns `true` only if every aggregate expression can supply a
/// row-based accumulator (`RowAccumulator`).
fn accumulator_v2_supported(aggr_expr: &[Arc<dyn AggregateExpr>]) -> bool {
    for expr in aggr_expr {
        if !expr.row_accumulator_supported() {
            return false;
        }
    }
    true
}

/// Creates one row-based accumulator per aggregate expression.
///
/// Each accumulator is handed the starting index of its state fields within
/// the combined row layout; `state_index` advances by the number of state
/// fields the expression declares.
///
/// # Errors
/// Returns an error if an accumulator cannot be created or if an
/// expression's state fields cannot be determined.
fn create_accumulators_v2(
    aggr_expr: &[Arc<dyn AggregateExpr>],
) -> datafusion_common::Result<Vec<AccumulatorItemV2>> {
    let mut state_index = 0;
    aggr_expr
        .iter()
        .map(|expr| {
            let result = expr.create_row_accumulator(state_index);
            // Propagate the error instead of panicking (`unwrap`) — this
            // function already returns a Result.
            state_index += expr.state_fields()?.len();
            result
        })
        .collect::<datafusion_common::Result<Vec<_>>>()
}

/// returns a vector of ArrayRefs, where each entry corresponds to either the
/// final value (mode = Final) or states (mode = Partial)
fn finalize_aggregation(
Expand Down Expand Up @@ -402,6 +448,27 @@ fn finalize_aggregation(
}
}

/// Evaluates each physical expression against `batch`, returning one
/// array (of `batch.num_rows()` elements) per expression.
fn evaluate(
    expr: &[Arc<dyn PhysicalExpr>],
    batch: &RecordBatch,
) -> Result<Vec<ArrayRef>> {
    let rows = batch.num_rows();
    expr.iter()
        .map(|e| Ok(e.evaluate(batch)?.into_array(rows)))
        .collect()
}

/// Evaluates each group of expressions against `batch`; one `Vec<ArrayRef>`
/// per group (see [`evaluate`]).
fn evaluate_many(
    expr: &[Vec<Arc<dyn PhysicalExpr>>],
    batch: &RecordBatch,
) -> Result<Vec<Vec<ArrayRef>>> {
    let mut evaluated = Vec::with_capacity(expr.len());
    for group in expr {
        evaluated.push(evaluate(group, batch)?);
    }
    Ok(evaluated)
}

#[cfg(test)]
mod tests {
use crate::execution::context::TaskContext;
Expand Down
Loading