diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs
index c22616883c54..f313e662da79 100644
--- a/datafusion/common/src/scalar.rs
+++ b/datafusion/common/src/scalar.rs
@@ -1443,7 +1443,7 @@ impl std::hash::Hash for ScalarValue {
 /// return a reference to the values array and the index into it for a
 /// dictionary array
 #[inline]
-fn get_dict_value<K: ArrowDictionaryKeyType>(
+pub fn get_dict_value<K: ArrowDictionaryKeyType>(
     array: &dyn Array,
     index: usize,
 ) -> (&ArrayRef, Option<usize>) {
diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
index d9e42e478de0..bf1846ae983d 100644
--- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs
+++ b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
@@ -31,6 +31,7 @@ use futures::stream::{Stream, StreamExt};
 
 use crate::execution::context::TaskContext;
 use crate::execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt};
+use crate::execution::memory_pool::{MemoryConsumer, MemoryReservation};
 use crate::physical_plan::aggregates::{
     evaluate_group_by, evaluate_many, evaluate_optional, group_schema, AccumulatorItem,
     AggregateMode, PhysicalGroupBy, RowAccumulatorItem,
@@ -38,9 +39,7 @@ use crate::physical_plan::aggregates::{
 use crate::physical_plan::metrics::{BaselineMetrics, RecordOutput};
 use crate::physical_plan::{aggregates, AggregateExpr, PhysicalExpr};
 use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
-
-use crate::execution::memory_pool::{MemoryConsumer, MemoryReservation};
-use arrow::array::{new_null_array, Array, ArrayRef, PrimitiveArray, UInt32Builder};
+use arrow::array::*;
 use arrow::compute::{cast, filter};
 use arrow::datatypes::{DataType, Schema, UInt32Type};
 use arrow::{compute, datatypes::SchemaRef, record_batch::RecordBatch};
@@ -53,6 +52,7 @@ use datafusion_row::layout::RowLayout;
 use datafusion_row::reader::{read_row, RowReader};
 use datafusion_row::MutableRecordBatch;
 use hashbrown::raw::RawTable;
+use itertools::izip;
 
 /// Grouping aggregate with row-format aggregation states inside.
 ///
@@ -409,7 +409,7 @@ impl GroupedHashAggregateStream {
 
     // Update the accumulator results, according to row_aggr_state.
     #[allow(clippy::too_many_arguments)]
-    fn update_accumulators(
+    fn update_accumulators_using_batch(
        &mut self,
         groups_with_rows: &[usize],
         offsets: &[usize],
@@ -490,6 +490,55 @@ impl GroupedHashAggregateStream {
         Ok(())
     }
 
+    // Update the accumulator results, according to row_aggr_state.
+    fn update_accumulators_using_scalar(
+        &mut self,
+        groups_with_rows: &[usize],
+        row_values: &[Vec<ArrayRef>],
+        row_filter_values: &[Option<ArrayRef>],
+    ) -> Result<()> {
+        let filter_bool_array = row_filter_values
+            .iter()
+            .map(|filter_opt| match filter_opt {
+                Some(f) => Ok(Some(as_boolean_array(f)?)),
+                None => Ok(None),
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        for group_idx in groups_with_rows {
+            let group_state = &mut self.aggr_state.group_states[*group_idx];
+            let mut state_accessor =
+                RowAccessor::new_from_layout(self.row_aggr_layout.clone());
+            state_accessor.point_to(0, group_state.aggregation_buffer.as_mut_slice());
+            for idx in &group_state.indices {
+                for (accumulator, values_array, filter_array) in izip!(
+                    self.row_accumulators.iter_mut(),
+                    row_values.iter(),
+                    filter_bool_array.iter()
+                ) {
+                    if values_array.len() == 1 {
+                        let scalar_value =
+                            col_to_scalar(&values_array[0], filter_array, *idx as usize)?;
+                        accumulator.update_scalar(&scalar_value, &mut state_accessor)?;
+                    } else {
+                        let scalar_values = values_array
+                            .iter()
+                            .map(|array| {
+                                col_to_scalar(array, filter_array, *idx as usize)
+                            })
+                            .collect::<Result<Vec<_>>>()?;
+                        accumulator
+                            .update_scalar_values(&scalar_values, &mut state_accessor)?;
+                    }
+                }
+            }
+            // clear the row indices collected for this group
+            group_state.indices.clear();
+        }
+
+        Ok(())
+    }
+
     /// Perform group-by aggregation for the given [`RecordBatch`].
     ///
     /// If successful, this returns the additional number of bytes that were allocated during this process.
@@ -515,35 +564,50 @@ impl GroupedHashAggregateStream {
         for group_values in &group_by_values {
             let groups_with_rows =
                 self.update_group_state(group_values, &mut allocated)?;
-
-            // Collect all indices + offsets based on keys in this vec
-            let mut batch_indices: UInt32Builder = UInt32Builder::with_capacity(0);
-            let mut offsets = vec![0];
-            let mut offset_so_far = 0;
-            for &group_idx in groups_with_rows.iter() {
-                let indices = &self.aggr_state.group_states[group_idx].indices;
-                batch_indices.append_slice(indices);
-                offset_so_far += indices.len();
-                offsets.push(offset_so_far);
+            // Decide the accumulator update mode: use scalar values to update the
+            // accumulators when all of the following conditions are met:
+            // 1) The aggregation mode is Partial or Single
+            // 2) There are no normal aggregation expressions
+            // 3) The number of affected groups is high (entries in `aggr_state` have
+            //    rows that need to be updated), which is usually the high-cardinality case
+            if matches!(self.mode, AggregateMode::Partial | AggregateMode::Single)
+                && normal_aggr_input_values.is_empty()
+                && normal_filter_values.is_empty()
+                && groups_with_rows.len() >= batch.num_rows() / 10
+            {
+                self.update_accumulators_using_scalar(
+                    &groups_with_rows,
+                    &row_aggr_input_values,
+                    &row_filter_values,
+                )?;
+            } else {
+                // Collect all indices + offsets based on keys in this vec
+                let mut batch_indices: UInt32Builder = UInt32Builder::with_capacity(0);
+                let mut offsets = vec![0];
+                let mut offset_so_far = 0;
+                for &group_idx in groups_with_rows.iter() {
+                    let indices = &self.aggr_state.group_states[group_idx].indices;
+                    batch_indices.append_slice(indices);
+                    offset_so_far += indices.len();
+                    offsets.push(offset_so_far);
+                }
+                let batch_indices = batch_indices.finish();
+
+                let row_values = get_at_indices(&row_aggr_input_values, &batch_indices)?;
+                let normal_values =
+                    get_at_indices(&normal_aggr_input_values, &batch_indices)?;
+                let row_filter_values =
+                    get_optional_filters(&row_filter_values, &batch_indices);
+                let normal_filter_values =
+                    get_optional_filters(&normal_filter_values, &batch_indices);
+                self.update_accumulators_using_batch(
+                    &groups_with_rows,
+                    &offsets,
+                    &row_values,
+                    &normal_values,
+                    &row_filter_values,
+                    &normal_filter_values,
+                    &mut allocated,
+                )?;
             }
-            let batch_indices = batch_indices.finish();
-
-            let row_values = get_at_indices(&row_aggr_input_values, &batch_indices)?;
-            let normal_values =
-                get_at_indices(&normal_aggr_input_values, &batch_indices)?;
-            let row_filter_values =
-                get_optional_filters(&row_filter_values, &batch_indices);
-            let normal_filter_values =
-                get_optional_filters(&normal_filter_values, &batch_indices);
-            self.update_accumulators(
-                &groups_with_rows,
-                &offsets,
-                &row_values,
-                &normal_values,
-                &row_filter_values,
-                &normal_filter_values,
-                &mut allocated,
-            )?;
         }
         allocated += self
             .row_converter
@@ -791,3 +855,21 @@ fn slice_and_maybe_filter(
     };
     Ok(filtered_arrays)
 }
+
+/// This method is similar to `ScalarValue::try_from_array` except for the null
+/// handling: it returns [ScalarValue::Null] instead of [ScalarValue::Type(None)].
+fn col_to_scalar(
+    array: &ArrayRef,
+    filter: &Option<&BooleanArray>,
+    row_index: usize,
+) -> Result<ScalarValue> {
+    if array.is_null(row_index) {
+        return Ok(ScalarValue::Null);
+    }
+    if let Some(filter) = filter {
+        if !filter.value(row_index) {
+            return Ok(ScalarValue::Null);
+        }
+    }
+    ScalarValue::try_from_array(array, row_index)
+}
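A note on the fast path above: the scalar route is taken only for high-cardinality grouping, per the `groups_with_rows.len() >= batch.num_rows() / 10` heuristic, and `col_to_scalar` folds both SQL NULLs and rows rejected by an aggregate FILTER into `ScalarValue::Null` so accumulators can skip them uniformly. Below is a minimal standalone sketch of that null/filter handling against plain `arrow` arrays; the name `to_scalar_or_null` and the use of `Option<i32>` in place of `ScalarValue` are illustrative assumptions, not part of this patch.

```rust
use arrow::array::{Array, BooleanArray, Int32Array};

// Hypothetical stand-in for `col_to_scalar`: `None` plays the role of
// `ScalarValue::Null`.
fn to_scalar_or_null(
    array: &Int32Array,
    filter: Option<&BooleanArray>,
    row: usize,
) -> Option<i32> {
    if array.is_null(row) {
        return None; // SQL NULL
    }
    if let Some(f) = filter {
        if !f.value(row) {
            return None; // row rejected by the FILTER clause
        }
    }
    Some(array.value(row))
}

fn main() {
    let col = Int32Array::from(vec![Some(7), None, Some(9)]);
    let keep = BooleanArray::from(vec![true, true, false]);
    assert_eq!(to_scalar_or_null(&col, Some(&keep), 0), Some(7)); // plain value
    assert_eq!(to_scalar_or_null(&col, Some(&keep), 1), None); // SQL NULL
    assert_eq!(to_scalar_or_null(&col, Some(&keep), 2), None); // filtered out
}
```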
diff --git a/datafusion/core/tests/sql/aggregates.rs b/datafusion/core/tests/sql/aggregates.rs
index 10f838e24963..e847ea0c0ebf 100644
--- a/datafusion/core/tests/sql/aggregates.rs
+++ b/datafusion/core/tests/sql/aggregates.rs
@@ -543,6 +543,57 @@ async fn count_multi_expr() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn count_multi_expr_group_by() -> Result<()> {
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("c1", DataType::Int32, true),
+        Field::new("c2", DataType::Int32, true),
+        Field::new("c3", DataType::Int32, true),
+    ]));
+
+    let data = RecordBatch::try_new(
+        schema.clone(),
+        vec![
+            Arc::new(Int32Array::from(vec![
+                Some(0),
+                None,
+                Some(1),
+                Some(2),
+                None,
+            ])),
+            Arc::new(Int32Array::from(vec![
+                Some(1),
+                Some(1),
+                Some(0),
+                None,
+                None,
+            ])),
+            Arc::new(Int32Array::from(vec![
+                Some(10),
+                Some(10),
+                Some(10),
+                Some(10),
+                Some(10),
+            ])),
+        ],
+    )?;
+
+    let ctx = SessionContext::new();
+    ctx.register_batch("test", data)?;
+    let sql = "SELECT c3, count(c1, c2) FROM test group by c3";
+    let actual = execute_to_batches(&ctx, sql).await;
+
+    let expected = vec![
+        "+----+------------------------+",
+        "| c3 | COUNT(test.c1,test.c2) |",
+        "+----+------------------------+",
+        "| 10 | 2                      |",
+        "+----+------------------------+",
+    ];
+    assert_batches_sorted_eq!(expected, &actual);
+    Ok(())
+}
+
 #[tokio::test]
 async fn simple_avg() -> Result<()> {
     let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
diff --git a/datafusion/physical-expr/src/aggregate/average.rs b/datafusion/physical-expr/src/aggregate/average.rs
index f898214b4b55..2fe44602d831 100644
--- a/datafusion/physical-expr/src/aggregate/average.rs
+++ b/datafusion/physical-expr/src/aggregate/average.rs
@@ -299,8 +299,24 @@ impl RowAccumulator for AvgRowAccumulator {
             self.state_index() + 1,
             accessor,
             &sum::sum_batch(values, &self.sum_datatype)?,
-        )?;
-        Ok(())
+        )
+    }
+
+    fn update_scalar_values(
+        &mut self,
+        values: &[ScalarValue],
+        accessor: &mut RowAccessor,
+    ) -> Result<()> {
+        let value = &values[0];
+        sum::update_avg_to_row(self.state_index(), accessor, value)
+    }
+
+    fn update_scalar(
+        &mut self,
+        value: &ScalarValue,
+        accessor: &mut RowAccessor,
+    ) -> Result<()> {
+        sum::update_avg_to_row(self.state_index(), accessor, value)
     }
 
     fn merge_batch(
@@ -315,8 +331,7 @@ impl RowAccumulator for AvgRowAccumulator {
 
         // sum
         let difference = sum::sum_batch(&states[1], &self.sum_datatype)?;
-        sum::add_to_row(self.state_index() + 1, accessor, &difference)?;
-        Ok(())
+        sum::add_to_row(self.state_index() + 1, accessor, &difference)
     }
 
     fn evaluate(&self, accessor: &RowAccessor) -> Result<ScalarValue> {
diff --git a/datafusion/physical-expr/src/aggregate/count.rs b/datafusion/physical-expr/src/aggregate/count.rs
index c00520516a25..15df28b4e38a 100644
--- a/datafusion/physical-expr/src/aggregate/count.rs
+++ b/datafusion/physical-expr/src/aggregate/count.rs
@@ -242,6 +242,31 @@ impl RowAccumulator for CountRowAccumulator {
         Ok(())
     }
 
+    fn update_scalar_values(
+        &mut self,
+        values: &[ScalarValue],
+        accessor: &mut RowAccessor,
+    ) -> Result<()> {
+        if !values.iter().any(|s| matches!(s, ScalarValue::Null)) {
+            accessor.add_u64(self.state_index, 1)
+        }
+        Ok(())
+    }
+
+    fn update_scalar(
+        &mut self,
+        value: &ScalarValue,
+        accessor: &mut RowAccessor,
+    ) -> Result<()> {
+        match value {
+            ScalarValue::Null => {
+                // do not update the accumulator
+            }
+            _ => accessor.add_u64(self.state_index, 1),
+        }
+        Ok(())
+    }
+
     fn merge_batch(
         &mut self,
         states: &[ArrayRef],
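For context, the `update_scalar_values` implementation above encodes the SQL rule that a multi-argument COUNT counts a row only when none of its arguments is NULL, which is exactly what the new `count_multi_expr_group_by` test asserts. A minimal model of that rule, with the hypothetical helper `count_multi` standing in for the accumulator:

```rust
// Hypothetical model of COUNT(c1, c2): a row is counted only when
// *none* of its arguments is NULL.
fn count_multi(rows: &[(Option<i32>, Option<i32>)]) -> u64 {
    rows.iter()
        .filter(|(c1, c2)| c1.is_some() && c2.is_some())
        .count() as u64
}

fn main() {
    // Same data as the test: c1 = [0, NULL, 1, 2, NULL], c2 = [1, 1, 0, NULL, NULL].
    let rows = [
        (Some(0), Some(1)),
        (None, Some(1)),
        (Some(1), Some(0)),
        (Some(2), None),
        (None, None),
    ];
    // Only rows 0 and 2 qualify, matching the expected count of 2.
    assert_eq!(count_multi(&rows), 2);
}
```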
diff --git a/datafusion/physical-expr/src/aggregate/min_max.rs b/datafusion/physical-expr/src/aggregate/min_max.rs
index e695ac400d4d..3a3d52983979 100644
--- a/datafusion/physical-expr/src/aggregate/min_max.rs
+++ b/datafusion/physical-expr/src/aggregate/min_max.rs
@@ -565,6 +565,9 @@ macro_rules! min_max_v2 {
             ScalarValue::Decimal128(rhs, ..) => {
                 typed_min_max_v2!($INDEX, $ACC, rhs, i128, $OP)
             }
+            ScalarValue::Null => {
+                // do nothing
+            }
             e => {
                 return Err(DataFusionError::Internal(format!(
                     "MIN/MAX is not expected to receive scalars of incompatible types {:?}",
@@ -709,8 +712,24 @@ impl RowAccumulator for MaxRowAccumulator {
     ) -> Result<()> {
         let values = &values[0];
         let delta = &max_batch(values)?;
-        max_row(self.index, accessor, delta)?;
-        Ok(())
+        max_row(self.index, accessor, delta)
+    }
+
+    fn update_scalar_values(
+        &mut self,
+        values: &[ScalarValue],
+        accessor: &mut RowAccessor,
+    ) -> Result<()> {
+        let value = &values[0];
+        max_row(self.index, accessor, value)
+    }
+
+    fn update_scalar(
+        &mut self,
+        value: &ScalarValue,
+        accessor: &mut RowAccessor,
+    ) -> Result<()> {
+        max_row(self.index, accessor, value)
     }
 
     fn merge_batch(
@@ -956,6 +975,23 @@ impl RowAccumulator for MinRowAccumulator {
         Ok(())
     }
 
+    fn update_scalar_values(
+        &mut self,
+        values: &[ScalarValue],
+        accessor: &mut RowAccessor,
+    ) -> Result<()> {
+        let value = &values[0];
+        min_row(self.index, accessor, value)
+    }
+
+    fn update_scalar(
+        &mut self,
+        value: &ScalarValue,
+        accessor: &mut RowAccessor,
+    ) -> Result<()> {
+        min_row(self.index, accessor, value)
+    }
+
     fn merge_batch(
         &mut self,
         states: &[ArrayRef],
diff --git a/datafusion/physical-expr/src/aggregate/row_accumulator.rs b/datafusion/physical-expr/src/aggregate/row_accumulator.rs
index 00717a113f9b..19e847b3e701 100644
--- a/datafusion/physical-expr/src/aggregate/row_accumulator.rs
+++ b/datafusion/physical-expr/src/aggregate/row_accumulator.rs
@@ -51,6 +51,20 @@ pub trait RowAccumulator: Send + Sync + Debug {
         accessor: &mut RowAccessor,
     ) -> Result<()>;
 
+    /// updates the accumulator's state from a vector of scalar values.
+    fn update_scalar_values(
+        &mut self,
+        values: &[ScalarValue],
+        accessor: &mut RowAccessor,
+    ) -> Result<()>;
+
+    /// updates the accumulator's state from a single scalar value.
+    fn update_scalar(
+        &mut self,
+        value: &ScalarValue,
+        accessor: &mut RowAccessor,
+    ) -> Result<()>;
+
     /// updates the accumulator's state from a vector of states.
     fn merge_batch(
         &mut self,
diff --git a/datafusion/physical-expr/src/aggregate/sum.rs b/datafusion/physical-expr/src/aggregate/sum.rs
index abf67933ebd9..e08726e465a2 100644
--- a/datafusion/physical-expr/src/aggregate/sum.rs
+++ b/datafusion/physical-expr/src/aggregate/sum.rs
@@ -240,12 +240,26 @@ macro_rules! sum_row {
     }};
 }
 
+macro_rules! avg_row {
+    ($INDEX:ident, $ACC:ident, $DELTA:expr, $TYPE:ident) => {{
+        paste::item! {
+            if let Some(v) = $DELTA {
+                $ACC.add_u64($INDEX, 1);
+                $ACC.[<add_ $TYPE>]($INDEX + 1, *v)
+            }
+        }
+    }};
+}
+
 pub(crate) fn add_to_row(
     index: usize,
     accessor: &mut RowAccessor,
     s: &ScalarValue,
 ) -> Result<()> {
     match s {
+        ScalarValue::Null => {
+            // do nothing
+        }
         ScalarValue::Float64(rhs) => {
             sum_row!(index, accessor, rhs, f64)
         }
@@ -270,6 +284,39 @@ pub(crate) fn add_to_row(
     Ok(())
 }
 
+pub(crate) fn update_avg_to_row(
+    index: usize,
+    accessor: &mut RowAccessor,
+    s: &ScalarValue,
+) -> Result<()> {
+    match s {
+        ScalarValue::Null => {
+            // do nothing
+        }
+        ScalarValue::Float64(rhs) => {
+            avg_row!(index, accessor, rhs, f64)
+        }
+        ScalarValue::Float32(rhs) => {
+            avg_row!(index, accessor, rhs, f32)
+        }
+        ScalarValue::UInt64(rhs) => {
+            avg_row!(index, accessor, rhs, u64)
+        }
+        ScalarValue::Int64(rhs) => {
+            avg_row!(index, accessor, rhs, i64)
+        }
+        ScalarValue::Decimal128(rhs, _, _) => {
+            avg_row!(index, accessor, rhs, i128)
+        }
+        _ => {
+            let msg =
+                format!("Row avg updater is not expected to receive a scalar {s:?}");
+            return Err(DataFusionError::Internal(msg));
+        }
+    }
+    Ok(())
+}
+
 impl Accumulator for SumAccumulator {
     fn state(&self) -> Result<Vec<ScalarValue>> {
         Ok(vec![self.sum.clone(), ScalarValue::from(self.count)])
@@ -331,8 +378,24 @@ impl RowAccumulator for SumRowAccumulator {
     ) -> Result<()> {
         let values = &values[0];
         let delta = sum_batch(values, &self.datatype)?;
-        add_to_row(self.index, accessor, &delta)?;
-        Ok(())
+        add_to_row(self.index, accessor, &delta)
+    }
+
+    fn update_scalar_values(
+        &mut self,
+        values: &[ScalarValue],
+        accessor: &mut RowAccessor,
+    ) -> Result<()> {
+        let value = &values[0];
+        add_to_row(self.index, accessor, value)
+    }
+
+    fn update_scalar(
+        &mut self,
+        value: &ScalarValue,
+        accessor: &mut RowAccessor,
+    ) -> Result<()> {
+        add_to_row(self.index, accessor, value)
     }
 
     fn merge_batch(
diff --git a/datafusion/row/src/accessor.rs b/datafusion/row/src/accessor.rs
index bba44f0e56a5..14a7ca264c9b 100644
--- a/datafusion/row/src/accessor.rs
+++ b/datafusion/row/src/accessor.rs
@@ -71,6 +71,7 @@ macro_rules! fn_add_idx {
     ($NATIVE: ident) => {
         paste::item! {
             /// add field at `idx` with `value`
+            #[inline(always)]
             pub fn [<add_ $NATIVE>](&mut self, idx: usize, value: $NATIVE) {
                 if self.is_valid_at(idx) {
                     self.[<set_ $NATIVE>](idx, value + self.[<get_ $NATIVE>](idx));
@@ -87,6 +88,7 @@ macro_rules! fn_max_min_idx {
     ($NATIVE: ident, $OP: ident) => {
         paste::item! {
             /// check max then update
+            #[inline(always)]
             pub fn [<$OP _ $NATIVE>](&mut self, idx: usize, value: $NATIVE) {
                 if self.is_valid_at(idx) {
                     let v = value.$OP(self.[<get_ $NATIVE>](idx));
@@ -103,6 +105,7 @@ macro_rules! fn_get_idx_scalar {
     ($NATIVE: ident, $SCALAR:ident) => {
         paste::item! {
+            #[inline(always)]
             pub fn [<get_ $NATIVE _scalar>](&self, idx: usize) -> ScalarValue {
                 if self.is_valid_at(idx) {
                     ScalarValue::$SCALAR(Some(self.[<get_ $NATIVE>](idx)))
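Finally, a self-contained model of the AVG scalar path added in average.rs and sum.rs: `update_avg_to_row` maintains a (count, sum) pair at adjacent row-state indices and leaves both untouched on NULL input, so the average is taken only over non-NULL values. This sketch replaces `RowAccessor` with an ordinary struct; all names here are illustrative assumptions, not DataFusion APIs.

```rust
// Hypothetical model of the (count, sum) row state updated by `avg_row!`.
#[derive(Default)]
struct AvgState {
    count: u64, // state index N   -> $ACC.add_u64($INDEX, 1)
    sum: f64,   // state index N+1 -> $ACC.add_f64($INDEX + 1, v)
}

impl AvgState {
    fn update_scalar(&mut self, v: Option<f64>) {
        // NULL input leaves the state untouched, as in `update_avg_to_row`.
        if let Some(v) = v {
            self.count += 1;
            self.sum += v;
        }
    }

    fn evaluate(&self) -> Option<f64> {
        (self.count > 0).then(|| self.sum / self.count as f64)
    }
}

fn main() {
    let mut st = AvgState::default();
    for v in [Some(1.0), None, Some(3.0)] {
        st.update_scalar(v);
    }
    assert_eq!(st.evaluate(), Some(2.0)); // NULL ignored: avg of {1.0, 3.0}
}
```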