Commit

Row accumulator: support updating with Scalar values (#6003)
* support updating RowAccumulators using Scalar values (see the trait sketch below the file summary)

* fix GROUP BY with COUNT over multiple expressions

* refine hot path, avoid Vec creation

* fix UT

* resolve review comments

* remove redundant null check
mingmwang authored Apr 20, 2023
1 parent 9798fbc commit 10b0eff
Showing 9 changed files with 330 additions and 41 deletions.
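
The `RowAccumulator` trait definition itself lives in one of the changed files that is not expanded on this page. Judging from the implementations shown further down (average, count, min/max), the two new scalar update methods presumably have the following shape; this is a sketch inferred from the diff, not the exact committed trait definition:

fn update_scalar_values(
    &mut self,
    values: &[ScalarValue],
    accessor: &mut RowAccessor,
) -> Result<()>;

fn update_scalar(
    &mut self,
    value: &ScalarValue,
    accessor: &mut RowAccessor,
) -> Result<()>;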
2 changes: 1 addition & 1 deletion datafusion/common/src/scalar.rs
@@ -1443,7 +1443,7 @@ impl std::hash::Hash for ScalarValue {
/// return a reference to the values array and the index into it for a
/// dictionary array
#[inline]
fn get_dict_value<K: ArrowDictionaryKeyType>(
pub fn get_dict_value<K: ArrowDictionaryKeyType>(
array: &dyn Array,
index: usize,
) -> (&ArrayRef, Option<usize>) {
146 changes: 114 additions & 32 deletions datafusion/core/src/physical_plan/aggregates/row_hash.rs
@@ -31,16 +31,15 @@ use futures::stream::{Stream, StreamExt};

use crate::execution::context::TaskContext;
use crate::execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt};
use crate::execution::memory_pool::{MemoryConsumer, MemoryReservation};
use crate::physical_plan::aggregates::{
evaluate_group_by, evaluate_many, evaluate_optional, group_schema, AccumulatorItem,
AggregateMode, PhysicalGroupBy, RowAccumulatorItem,
};
use crate::physical_plan::metrics::{BaselineMetrics, RecordOutput};
use crate::physical_plan::{aggregates, AggregateExpr, PhysicalExpr};
use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream};

use crate::execution::memory_pool::{MemoryConsumer, MemoryReservation};
use arrow::array::{new_null_array, Array, ArrayRef, PrimitiveArray, UInt32Builder};
use arrow::array::*;
use arrow::compute::{cast, filter};
use arrow::datatypes::{DataType, Schema, UInt32Type};
use arrow::{compute, datatypes::SchemaRef, record_batch::RecordBatch};
@@ -53,6 +52,7 @@ use datafusion_row::layout::RowLayout;
use datafusion_row::reader::{read_row, RowReader};
use datafusion_row::MutableRecordBatch;
use hashbrown::raw::RawTable;
use itertools::izip;

/// Grouping aggregate with row-format aggregation states inside.
///
@@ -409,7 +409,7 @@ impl GroupedHashAggregateStream {

// Update the accumulator results, according to row_aggr_state.
#[allow(clippy::too_many_arguments)]
fn update_accumulators(
fn update_accumulators_using_batch(
&mut self,
groups_with_rows: &[usize],
offsets: &[usize],
@@ -490,6 +490,55 @@ impl GroupedHashAggregateStream {
Ok(())
}

// Update the accumulator results, according to row_aggr_state.
fn update_accumulators_using_scalar(
&mut self,
groups_with_rows: &[usize],
row_values: &[Vec<ArrayRef>],
row_filter_values: &[Option<ArrayRef>],
) -> Result<()> {
let filter_bool_array = row_filter_values
.iter()
.map(|filter_opt| match filter_opt {
Some(f) => Ok(Some(as_boolean_array(f)?)),
None => Ok(None),
})
.collect::<Result<Vec<_>>>()?;

for group_idx in groups_with_rows {
let group_state = &mut self.aggr_state.group_states[*group_idx];
let mut state_accessor =
RowAccessor::new_from_layout(self.row_aggr_layout.clone());
state_accessor.point_to(0, group_state.aggregation_buffer.as_mut_slice());
for idx in &group_state.indices {
for (accumulator, values_array, filter_array) in izip!(
self.row_accumulators.iter_mut(),
row_values.iter(),
filter_bool_array.iter()
) {
if values_array.len() == 1 {
let scalar_value =
col_to_scalar(&values_array[0], filter_array, *idx as usize)?;
accumulator.update_scalar(&scalar_value, &mut state_accessor)?;
} else {
let scalar_values = values_array
.iter()
.map(|array| {
col_to_scalar(array, filter_array, *idx as usize)
})
.collect::<Result<Vec<_>>>()?;
accumulator
.update_scalar_values(&scalar_values, &mut state_accessor)?;
}
}
}
// clear the group indices in this group
group_state.indices.clear();
}

Ok(())
}

/// Perform group-by aggregation for the given [`RecordBatch`].
///
/// If successful, this returns the additional number of bytes that were allocated during this process.
@@ -515,35 +564,50 @@ impl GroupedHashAggregateStream {
for group_values in &group_by_values {
let groups_with_rows =
self.update_group_state(group_values, &mut allocated)?;

// Collect all indices + offsets based on keys in this vec
let mut batch_indices: UInt32Builder = UInt32Builder::with_capacity(0);
let mut offsets = vec![0];
let mut offset_so_far = 0;
for &group_idx in groups_with_rows.iter() {
let indices = &self.aggr_state.group_states[group_idx].indices;
batch_indices.append_slice(indices);
offset_so_far += indices.len();
offsets.push(offset_so_far);
// Decide the accumulator update mode: use scalar values to update the accumulators when all of the following conditions are met:
// 1) The aggregation mode is Partial or Single
// 2) There are no normal (non-row) aggregation expressions
// 3) The number of affected groups is high (many entries in `aggr_state` have rows to update), i.e. the usual high-cardinality case
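// (Illustrative note, not part of the original commit: with a 1024-row batch,
// condition 3 selects the scalar path only when this batch touched at least
// 102 distinct groups.)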
if matches!(self.mode, AggregateMode::Partial | AggregateMode::Single)
&& normal_aggr_input_values.is_empty()
&& normal_filter_values.is_empty()
&& groups_with_rows.len() >= batch.num_rows() / 10
{
self.update_accumulators_using_scalar(
&groups_with_rows,
&row_aggr_input_values,
&row_filter_values,
)?;
} else {
// Collect all indices + offsets based on keys in this vec
let mut batch_indices: UInt32Builder = UInt32Builder::with_capacity(0);
let mut offsets = vec![0];
let mut offset_so_far = 0;
for &group_idx in groups_with_rows.iter() {
let indices = &self.aggr_state.group_states[group_idx].indices;
batch_indices.append_slice(indices);
offset_so_far += indices.len();
offsets.push(offset_so_far);
}
let batch_indices = batch_indices.finish();

let row_values = get_at_indices(&row_aggr_input_values, &batch_indices)?;
let normal_values =
get_at_indices(&normal_aggr_input_values, &batch_indices)?;
let row_filter_values =
get_optional_filters(&row_filter_values, &batch_indices);
let normal_filter_values =
get_optional_filters(&normal_filter_values, &batch_indices);
self.update_accumulators_using_batch(
&groups_with_rows,
&offsets,
&row_values,
&normal_values,
&row_filter_values,
&normal_filter_values,
&mut allocated,
)?;
}
let batch_indices = batch_indices.finish();

let row_values = get_at_indices(&row_aggr_input_values, &batch_indices)?;
let normal_values =
get_at_indices(&normal_aggr_input_values, &batch_indices)?;
let row_filter_values =
get_optional_filters(&row_filter_values, &batch_indices);
let normal_filter_values =
get_optional_filters(&normal_filter_values, &batch_indices);
self.update_accumulators(
&groups_with_rows,
&offsets,
&row_values,
&normal_values,
&row_filter_values,
&normal_filter_values,
&mut allocated,
)?;
}
allocated += self
.row_converter
@@ -791,3 +855,21 @@ fn slice_and_maybe_filter(
};
Ok(filtered_arrays)
}

/// This method is similar to `ScalarValue::try_from_array` except for the Null handling.
/// For a null slot it returns [ScalarValue::Null] instead of a typed null such as `ScalarValue::Int32(None)`.
fn col_to_scalar(
array: &ArrayRef,
filter: &Option<&BooleanArray>,
row_index: usize,
) -> Result<ScalarValue> {
if array.is_null(row_index) {
return Ok(ScalarValue::Null);
}
if let Some(filter) = filter {
if !filter.value(row_index) {
return Ok(ScalarValue::Null);
}
}
ScalarValue::try_from_array(array, row_index)
}
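
For contrast with `ScalarValue::try_from_array`, a minimal usage sketch (not part of this commit; it assumes arrow's `Int32Array`, DataFusion's public `ScalarValue` API, the usual `std::sync::Arc` import, and a surrounding function that returns `Result`):

// Index 1 holds a null: try_from_array yields a typed null,
// while col_to_scalar yields the untyped ScalarValue::Null.
let array: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None]));
assert_eq!(ScalarValue::try_from_array(&array, 1)?, ScalarValue::Int32(None));
assert_eq!(col_to_scalar(&array, &None, 1)?, ScalarValue::Null);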
51 changes: 51 additions & 0 deletions datafusion/core/tests/sql/aggregates.rs
@@ -543,6 +543,57 @@ async fn count_multi_expr() -> Result<()> {
Ok(())
}

#[tokio::test]
async fn count_multi_expr_group_by() -> Result<()> {
let schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::Int32, true),
Field::new("c2", DataType::Int32, true),
Field::new("c3", DataType::Int32, true),
]));

let data = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from(vec![
Some(0),
None,
Some(1),
Some(2),
None,
])),
Arc::new(Int32Array::from(vec![
Some(1),
Some(1),
Some(0),
None,
None,
])),
Arc::new(Int32Array::from(vec![
Some(10),
Some(10),
Some(10),
Some(10),
Some(10),
])),
],
)?;

let ctx = SessionContext::new();
ctx.register_batch("test", data)?;
let sql = "SELECT c3, count(c1, c2) FROM test group by c3";
let actual = execute_to_batches(&ctx, sql).await;

let expected = vec![
"+----+------------------------+",
"| c3 | COUNT(test.c1,test.c2) |",
"+----+------------------------+",
"| 10 | 2 |",
"+----+------------------------+",
];
assert_batches_sorted_eq!(expected, &actual);
Ok(())
}

#[tokio::test]
async fn simple_avg() -> Result<()> {
let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
23 changes: 19 additions & 4 deletions datafusion/physical-expr/src/aggregate/average.rs
@@ -299,8 +299,24 @@ impl RowAccumulator for AvgRowAccumulator {
self.state_index() + 1,
accessor,
&sum::sum_batch(values, &self.sum_datatype)?,
)?;
Ok(())
)
}

fn update_scalar_values(
&mut self,
values: &[ScalarValue],
accessor: &mut RowAccessor,
) -> Result<()> {
let value = &values[0];
sum::update_avg_to_row(self.state_index(), accessor, value)
}

fn update_scalar(
&mut self,
value: &ScalarValue,
accessor: &mut RowAccessor,
) -> Result<()> {
sum::update_avg_to_row(self.state_index(), accessor, value)
}

fn merge_batch(
Expand All @@ -315,8 +331,7 @@ impl RowAccumulator for AvgRowAccumulator {

// sum
let difference = sum::sum_batch(&states[1], &self.sum_datatype)?;
sum::add_to_row(self.state_index() + 1, accessor, &difference)?;
Ok(())
sum::add_to_row(self.state_index() + 1, accessor, &difference)
}

fn evaluate(&self, accessor: &RowAccessor) -> Result<ScalarValue> {
25 changes: 25 additions & 0 deletions datafusion/physical-expr/src/aggregate/count.rs
@@ -242,6 +242,31 @@ impl RowAccumulator for CountRowAccumulator {
Ok(())
}

fn update_scalar_values(
&mut self,
values: &[ScalarValue],
accessor: &mut RowAccessor,
) -> Result<()> {
if !values.iter().any(|s| matches!(s, ScalarValue::Null)) {
accessor.add_u64(self.state_index, 1)
}
Ok(())
}

fn update_scalar(
&mut self,
value: &ScalarValue,
accessor: &mut RowAccessor,
) -> Result<()> {
match value {
ScalarValue::Null => {
// do not update the accumulator
}
_ => accessor.add_u64(self.state_index, 1),
}
Ok(())
}

fn merge_batch(
&mut self,
states: &[ArrayRef],
40 changes: 38 additions & 2 deletions datafusion/physical-expr/src/aggregate/min_max.rs
@@ -565,6 +565,9 @@ macro_rules! min_max_v2 {
ScalarValue::Decimal128(rhs, ..) => {
typed_min_max_v2!($INDEX, $ACC, rhs, i128, $OP)
}
ScalarValue::Null => {
// do nothing
}
e => {
return Err(DataFusionError::Internal(format!(
"MIN/MAX is not expected to receive scalars of incompatible types {:?}",
@@ -709,8 +712,24 @@ impl RowAccumulator for MaxRowAccumulator {
) -> Result<()> {
let values = &values[0];
let delta = &max_batch(values)?;
max_row(self.index, accessor, delta)?;
Ok(())
max_row(self.index, accessor, delta)
}

fn update_scalar_values(
&mut self,
values: &[ScalarValue],
accessor: &mut RowAccessor,
) -> Result<()> {
let value = &values[0];
max_row(self.index, accessor, value)
}

fn update_scalar(
&mut self,
value: &ScalarValue,
accessor: &mut RowAccessor,
) -> Result<()> {
max_row(self.index, accessor, value)
}

fn merge_batch(
@@ -956,6 +975,23 @@ impl RowAccumulator for MinRowAccumulator {
Ok(())
}

fn update_scalar_values(
&mut self,
values: &[ScalarValue],
accessor: &mut RowAccessor,
) -> Result<()> {
let value = &values[0];
min_row(self.index, accessor, value)
}

fn update_scalar(
&mut self,
value: &ScalarValue,
accessor: &mut RowAccessor,
) -> Result<()> {
min_row(self.index, accessor, value)
}

fn merge_batch(
&mut self,
states: &[ArrayRef],
