From 2d7b6a73d3a0957a49f3d574c071e0eaaaed2286 Mon Sep 17 00:00:00 2001
From: Eduard Karacharov
Date: Sat, 20 Jul 2024 09:43:15 +0300
Subject: [PATCH] rfc: optional skipping partial aggregation

---
 datafusion/common/src/config.rs               |   9 +
 datafusion/expr/src/groups_accumulator.rs     |  20 +-
 datafusion/functions-aggregate/src/count.rs   |  44 +++++
 .../aggregate/groups_accumulator/prim_op.rs   |  42 +++-
 .../physical-plan/src/aggregates/mod.rs       | 181 ++++++++++++++++++
 .../physical-plan/src/aggregates/row_hash.rs  | 164 +++++++++++++++
 .../test_files/aggregate_skip_partial.slt     | 177 +++++++++++++++++
 7 files changed, 635 insertions(+), 2 deletions(-)
 create mode 100644 datafusion/sqllogictest/test_files/aggregate_skip_partial.slt

diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index 3cbe14cb558eb..509287b060cfd 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -321,6 +321,15 @@ config_namespace! {
         /// Should DataFusion keep the columns used for partition_by in the output RecordBatches
         pub keep_partition_by_columns: bool, default = false
+
+        /// Aggregation rate (number of distinct groups / number of input rows)
+        /// threshold for skipping partial aggregation. If the observed rate reaches
+        /// this threshold, aggregation will be skipped for further input rows
+        pub skip_partial_aggregation_probe_rate_threshold: f64, default = 0.8
+
+        /// Number of input rows a partial aggregation partition should process
+        /// before checking the aggregation rate and trying to switch to skipping-aggregation mode
+        pub skip_partial_aggregation_probe_rows_threshold: usize, default = 100_000
     }
 }

diff --git a/datafusion/expr/src/groups_accumulator.rs b/datafusion/expr/src/groups_accumulator.rs
index 2ffbfb266e9ca..d9bf1457efad8 100644
--- a/datafusion/expr/src/groups_accumulator.rs
+++ b/datafusion/expr/src/groups_accumulator.rs
@@ -18,7 +18,7 @@
 //! Vectorized [`GroupsAccumulator`]
 
 use arrow_array::{ArrayRef, BooleanArray};
-use datafusion_common::Result;
+use datafusion_common::{not_impl_err, Result};
 
 /// Describes how many rows should be emitted during grouping.
 #[derive(Debug, Clone, Copy)]
@@ -158,6 +158,24 @@ pub trait GroupsAccumulator: Send {
         total_num_groups: usize,
     ) -> Result<()>;
 
+    /// Converts an input batch directly to intermediate aggregate state,
+    /// without grouping (each input row is considered as a separate
+    /// group).
+    fn convert_to_state(
+        &self,
+        _values: &[ArrayRef],
+        _opt_filter: Option<&BooleanArray>,
+    ) -> Result<Vec<ArrayRef>> {
+        not_impl_err!("Input batch conversion to state not implemented")
+    }
+
+    /// Returns `true` if the groups accumulator supports converting
+    /// an input batch to intermediate aggregate state (i.e. the
+    /// `convert_to_state` method is implemented).
+    fn convert_to_state_supported(&self) -> bool {
+        false
+    }
+
     /// Amount of memory used to store the state of this accumulator,
     /// in bytes. This function is called once per batch, so it should
     /// be `O(n)` to compute, not `O(num_groups)`

diff --git a/datafusion/functions-aggregate/src/count.rs b/datafusion/functions-aggregate/src/count.rs
index 0ead22e90a163..079816dc44d85 100644
--- a/datafusion/functions-aggregate/src/count.rs
+++ b/datafusion/functions-aggregate/src/count.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use ahash::RandomState;
+use arrow::array::Int64Builder;
 use std::collections::HashSet;
 use std::ops::BitAnd;
 use std::{fmt::Debug, sync::Arc};
@@ -425,6 +426,45 @@ impl GroupsAccumulator for CountGroupsAccumulator {
         Ok(Arc::new(array))
     }
 
+    fn convert_to_state(
+        &self,
+        values: &[ArrayRef],
+        opt_filter: Option<&BooleanArray>,
+    ) -> Result<Vec<ArrayRef>> {
+        let values = &values[0];
+
+        let state_array = match (values.logical_nulls(), opt_filter) {
+            (Some(nulls), None) => {
+                let mut builder = Int64Builder::with_capacity(values.len());
+                nulls
+                    .into_iter()
+                    .for_each(|is_valid| builder.append_value(is_valid as i64));
+                builder.finish()
+            }
+            (Some(nulls), Some(filter)) => {
+                let mut builder = Int64Builder::with_capacity(values.len());
+                nulls.into_iter().zip(filter.iter()).for_each(
+                    |(is_valid, filter_value)| {
+                        builder.append_value(
+                            (is_valid && filter_value.is_some_and(|val| val)) as i64,
+                        )
+                    },
+                );
+                builder.finish()
+            }
+            (None, Some(filter)) => {
+                let mut builder = Int64Builder::with_capacity(values.len());
+                filter.into_iter().for_each(|filter_value| {
+                    builder.append_value(filter_value.is_some_and(|val| val) as i64)
+                });
+                builder.finish()
+            }
+            (None, None) => Int64Array::from_value(1, values.len()),
+        };
+
+        Ok(vec![Arc::new(state_array)])
+    }
+
     // return arrays for counts
     fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
         let counts = emit_to.take_needed(&mut self.counts);
@@ -432,6 +472,10 @@ impl GroupsAccumulator for CountGroupsAccumulator {
         Ok(vec![Arc::new(counts) as ArrayRef])
     }
 
+    fn convert_to_state_supported(&self) -> bool {
+        true
+    }
+
     fn size(&self) -> usize {
         self.counts.capacity() * std::mem::size_of::<i64>()
     }

diff --git a/datafusion/physical-expr-common/src/aggregate/groups_accumulator/prim_op.rs b/datafusion/physical-expr-common/src/aggregate/groups_accumulator/prim_op.rs
index debb36852b224..e766d9facffd1 100644
--- a/datafusion/physical-expr-common/src/aggregate/groups_accumulator/prim_op.rs
+++ b/datafusion/physical-expr-common/src/aggregate/groups_accumulator/prim_op.rs
@@ -17,7 +17,7 @@
 
 use std::sync::Arc;
 
-use arrow::array::{ArrayRef, AsArray, BooleanArray, PrimitiveArray};
+use arrow::array::{ArrayRef, AsArray, BooleanArray, PrimitiveArray, PrimitiveBuilder};
 use arrow::datatypes::ArrowPrimitiveType;
 use arrow::datatypes::DataType;
 use datafusion_common::Result;
@@ -111,6 +111,42 @@ where
         Ok(())
     }
 
+    fn convert_to_state(
+        &self,
+        values: &[ArrayRef],
+        opt_filter: Option<&BooleanArray>,
+    ) -> Result<Vec<ArrayRef>> {
+        let values = values[0].as_primitive::<T>();
+        let mut state = PrimitiveBuilder::<T>::with_capacity(values.len())
+            .with_data_type(self.data_type.clone());
+
+        match opt_filter {
+            Some(filter) => {
+                values
+                    .iter()
+                    .zip(filter.iter())
+                    .for_each(|(val, filter_val)| match (val, filter_val) {
+                        (Some(val), Some(true)) => {
+                            let mut state_val = self.starting_value;
+                            (self.prim_fn)(&mut state_val, val);
+                            state.append_value(state_val);
+                        }
+                        (_, _) => state.append_null(),
+                    })
+            }
+            None => values.iter().for_each(|val| match val {
+                Some(val) => {
+                    let mut state_val = self.starting_value;
+                    (self.prim_fn)(&mut state_val, val);
+                    state.append_value(state_val);
+                }
+                None => state.append_null(),
+            }),
+        };
+
+        Ok(vec![Arc::new(state.finish())])
+    }
+
     fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
         let values = emit_to.take_needed(&mut self.values);
         let nulls = self.null_state.build(emit_to);
@@ -134,6 +170,10 @@ where
         self.update_batch(values, group_indices, opt_filter, total_num_groups)
     }
 
+    fn convert_to_state_supported(&self) -> bool {
+        true
+    }
+
     fn size(&self) -> usize {
         self.values.capacity() * std::mem::size_of::<T::Native>() + self.null_state.size()
     }

diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs
index e7cd5cb2725be..831e1c6d03cf7 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -2442,4 +2442,185 @@ mod tests {
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_skip_aggregation_after_first_batch() -> Result<()> {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("key", DataType::Int32, true),
+            Field::new("val", DataType::Int32, true),
+        ]));
+
+        let group_by =
+            PhysicalGroupBy::new_single(vec![(col("key", &schema)?, "key".to_string())]);
+
+        let aggr_expr: Vec<Arc<dyn AggregateExpr>> = vec![create_aggregate_expr(
+            &count_udaf(),
+            &[col("val", &schema)?],
+            &[datafusion_expr::col("val")],
+            &[],
+            &[],
+            &schema,
+            "COUNT(val)",
+            false,
+            false,
+            false,
+        )?];
+
+        let input_data = vec![
+            RecordBatch::try_new(
+                Arc::clone(&schema),
+                vec![
+                    Arc::new(Int32Array::from(vec![1, 2, 3])),
+                    Arc::new(Int32Array::from(vec![0, 0, 0])),
+                ],
+            )
+            .unwrap(),
+            RecordBatch::try_new(
+                Arc::clone(&schema),
+                vec![
+                    Arc::new(Int32Array::from(vec![2, 3, 4])),
+                    Arc::new(Int32Array::from(vec![0, 0, 0])),
+                ],
+            )
+            .unwrap(),
+        ];
+
+        let input = Arc::new(MemoryExec::try_new(
+            &[input_data],
+            Arc::clone(&schema),
+            None,
+        )?);
+        let aggregate_exec = Arc::new(AggregateExec::try_new(
+            AggregateMode::Partial,
+            group_by,
+            aggr_expr,
+            vec![None],
+            Arc::clone(&input) as Arc<dyn ExecutionPlan>,
+            schema,
+        )?);
+
+        let mut session_config = SessionConfig::default();
+        session_config = session_config.set(
+            "datafusion.execution.skip_partial_aggregation_probe_rows_threshold",
+            ScalarValue::Int64(Some(2)),
+        );
+        session_config = session_config.set(
+            "datafusion.execution.skip_partial_aggregation_probe_rate_threshold",
+            ScalarValue::Float64(Some(0.1)),
+        );
+
+        let ctx = TaskContext::default().with_session_config(session_config);
+        let output = collect(aggregate_exec.execute(0, Arc::new(ctx))?).await?;
+
+        let expected = [
+            "+-----+-------------------+",
+            "| key | COUNT(val)[count] |",
+            "+-----+-------------------+",
+            "| 1   | 1                 |",
+            "| 2   | 1                 |",
+            "| 3   | 1                 |",
+            "| 2   | 1                 |",
+            "| 3   | 1                 |",
+            "| 4   | 1                 |",
+            "+-----+-------------------+",
+        ];
+        assert_batches_eq!(expected, &output);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_skip_aggregation_after_threshold() -> Result<()> {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("key", DataType::Int32, true),
+            Field::new("val", DataType::Int32, true),
+        ]));
+
+        let group_by =
+            PhysicalGroupBy::new_single(vec![(col("key", &schema)?, "key".to_string())]);
+
+        let aggr_expr: Vec<Arc<dyn AggregateExpr>> = vec![create_aggregate_expr(
+            &count_udaf(),
+            &[col("val", &schema)?],
+            &[datafusion_expr::col("val")],
+            &[],
+            &[],
+            &schema,
+            "COUNT(val)",
+            false,
+            false,
+            false,
+        )?];
+
+        let input_data = vec![
+            RecordBatch::try_new(
+                Arc::clone(&schema),
+                vec![
+                    Arc::new(Int32Array::from(vec![1, 2, 3])),
+                    Arc::new(Int32Array::from(vec![0, 0, 0])),
+                ],
+            )
+            .unwrap(),
+            RecordBatch::try_new(
+                Arc::clone(&schema),
+                vec![
+                    Arc::new(Int32Array::from(vec![2, 3, 4])),
+                    Arc::new(Int32Array::from(vec![0, 0, 0])),
+                ],
+            )
+            .unwrap(),
+            RecordBatch::try_new(
+                Arc::clone(&schema),
+                vec![
+                    Arc::new(Int32Array::from(vec![2, 3, 4])),
+                    Arc::new(Int32Array::from(vec![0, 0, 0])),
+                ],
+            )
+            .unwrap(),
+        ];
+
+        let input = Arc::new(MemoryExec::try_new(
+            &[input_data],
+            Arc::clone(&schema),
+            None,
+        )?);
+        let aggregate_exec = Arc::new(AggregateExec::try_new(
+            AggregateMode::Partial,
+            group_by,
+            aggr_expr,
+            vec![None],
+            Arc::clone(&input) as Arc<dyn ExecutionPlan>,
+            schema,
+        )?);
+
+        let mut session_config = SessionConfig::default();
+        session_config = session_config.set(
+            "datafusion.execution.skip_partial_aggregation_probe_rows_threshold",
+            ScalarValue::Int64(Some(5)),
+        );
+        session_config = session_config.set(
+            "datafusion.execution.skip_partial_aggregation_probe_rate_threshold",
+            ScalarValue::Float64(Some(0.1)),
+        );
+
+        let ctx = TaskContext::default().with_session_config(session_config);
+        let output = collect(aggregate_exec.execute(0, Arc::new(ctx))?).await?;
+
+        let expected = [
+            "+-----+-------------------+",
+            "| key | COUNT(val)[count] |",
+            "+-----+-------------------+",
+            "| 1   | 1                 |",
+            "| 2   | 2                 |",
+            "| 3   | 2                 |",
+            "| 4   | 1                 |",
+            "| 2   | 1                 |",
+            "| 3   | 1                 |",
+            "| 4   | 1                 |",
+            "+-----+-------------------+",
+        ];
+        assert_batches_eq!(expected, &output);
+
+        Ok(())
+    }
 }

diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs
index 167ca72407503..8262b54fd6b71 100644
--- a/datafusion/physical-plan/src/aggregates/row_hash.rs
+++ b/datafusion/physical-plan/src/aggregates/row_hash.rs
@@ -62,6 +62,7 @@ pub(crate) enum ExecutionState {
     /// When producing output, the remaining rows to output are stored
     /// here and are sliced off as needed in batch_size chunks
     ProducingOutput(RecordBatch),
+    SkippingAggregation,
     Done,
 }
 
@@ -90,6 +91,60 @@ struct SpillState {
     merging_group_by: PhysicalGroupBy,
 }
 
+/// Probe checking whether aggregation should be skipped, based on the
+/// observed rate of distinct groups per input row
+struct SkipAggregationProbe {
+    /// Number of processed input rows
+    input_rows: usize,
+    /// Number of distinct groups observed for `input_rows`
+    num_groups: usize,
+
+    /// Aggregation rate check is performed only after this number of input rows
+    probe_rows_threshold: usize,
+    /// Aggregation rate (`num_groups` / `input_rows`) at which aggregation is skipped
+    probe_rate_threshold: f64,
+
+    /// Flag indicating that further input may skip aggregation
+    should_skip: bool,
+    /// Flag indicating that the probe decision is final
+    is_locked: bool,
+}
+
+impl SkipAggregationProbe {
+    fn new(probe_rows_threshold: usize, probe_rate_threshold: f64) -> Self {
+        Self {
+            input_rows: 0,
+            num_groups: 0,
+            probe_rows_threshold,
+            probe_rate_threshold,
+            should_skip: false,
+            is_locked: false,
+        }
+    }
+
+    fn update_state(&mut self, input_rows: usize, num_groups: usize) {
+        if self.is_locked {
+            return;
+        }
+        self.input_rows += input_rows;
+        self.num_groups = num_groups;
+        if self.input_rows >= self.probe_rows_threshold {
+            self.should_skip = self.num_groups as f64 / self.input_rows as f64
+                >= self.probe_rate_threshold;
+            self.is_locked = true;
+        }
+    }
+
+    fn should_skip(&self) -> bool {
+        self.should_skip
+    }
+
+    fn forbid_skipping(&mut self) {
+        self.should_skip = false;
+        self.is_locked = true;
+    }
+}
+
 /// HashTable based Grouping Aggregator
 ///
 /// # Design Goals
@@ -275,6 +330,10 @@ pub(crate) struct GroupedHashAggregateStream {
     /// the `GroupedHashAggregateStream` operation immediately switches to
     /// output mode and emits all groups.
     group_values_soft_limit: Option<usize>,
+
+    /// Optional probe for skipping partial aggregation, present when
+    /// the stream is eligible to switch to skipping-aggregation mode
+    skip_aggregation_probe: Option<SkipAggregationProbe>,
 }
 
@@ -365,6 +424,29 @@ impl GroupedHashAggregateStream {
             merging_group_by: PhysicalGroupBy::new_single(agg_group_by.expr.clone()),
         };
 
+        let skip_aggregation_probe = if agg.mode == AggregateMode::Partial
+            && matches!(group_ordering, GroupOrdering::None)
+            && accumulators
+                .iter()
+                .all(|acc| acc.convert_to_state_supported())
+            && agg_group_by.is_single()
+        {
+            Some(SkipAggregationProbe::new(
+                context
+                    .session_config()
+                    .options()
+                    .execution
+                    .skip_partial_aggregation_probe_rows_threshold,
+                context
+                    .session_config()
+                    .options()
+                    .execution
+                    .skip_partial_aggregation_probe_rate_threshold,
+            ))
+        } else {
+            None
+        };
+
         Ok(GroupedHashAggregateStream {
             schema: agg_schema,
             input,
@@ -384,6 +466,7 @@ impl GroupedHashAggregateStream {
             runtime: context.runtime_env(),
             spill_state,
             group_values_soft_limit: agg.limit,
+            skip_aggregation_probe,
         })
     }
 }
@@ -434,12 +517,16 @@ impl Stream for GroupedHashAggregateStream {
                     // new batch to aggregate
                     Some(Ok(batch)) => {
                         let timer = elapsed_compute.timer();
+                        let input_rows = batch.num_rows();
+
                         // Make sure we have enough capacity for `batch`, otherwise spill
                         extract_ok!(self.spill_previous_if_necessary(&batch));
 
                         // Do the grouping
                         extract_ok!(self.group_aggregate_batch(batch));
 
+                        self.update_skip_aggregation_probe(input_rows);
+
                         // If we can begin emitting rows, do so,
                         // otherwise keep consuming input
                         assert!(!self.input_done);
@@ -463,6 +550,8 @@ impl Stream for GroupedHashAggregateStream {
 
                         extract_ok!(self.emit_early_if_necessary());
 
+                        extract_ok!(self.switch_to_skip_aggregation());
+
                         timer.done();
                     }
                     Some(Err(e)) => {
@@ -476,6 +565,26 @@ impl Stream for GroupedHashAggregateStream {
                     }
                 }
 
+                ExecutionState::SkippingAggregation => {
+                    match ready!(self.input.poll_next_unpin(cx)) {
+                        Some(Ok(batch)) => {
+                            let _timer = elapsed_compute.timer();
+                            let states = self.transform_to_states(batch)?;
+                            return Poll::Ready(Some(Ok(
+                                states.record_output(&self.baseline_metrics)
+                            )));
+                        }
+                        Some(Err(e)) => {
+                            // inner had error, return to caller
+                            return Poll::Ready(Some(Err(e)));
+                        }
+                        None => {
+                            // inner is done, switching to `Done` state
+                            self.exec_state = ExecutionState::Done;
+                        }
+                    }
+                }
+
                 ExecutionState::ProducingOutput(batch) => {
                     // slice off a part of the batch, if needed
                     let output_batch;
@@ -484,6 +593,12 @@ impl Stream for GroupedHashAggregateStream {
                         (
                             if self.input_done {
                                 ExecutionState::Done
+                            } else if self
+                                .skip_aggregation_probe
+                                .as_ref()
+                                .is_some_and(|probe| probe.should_skip())
+                            {
+                                ExecutionState::SkippingAggregation
                             } else {
                                 ExecutionState::ReadingInput
                             },
@@ -797,4 +912,53 @@ impl GroupedHashAggregateStream {
         timer.done();
         Ok(())
     }
+
+    /// Updates the skip-aggregation probe; spilling to disk forbids skipping
+    fn update_skip_aggregation_probe(&mut self, input_rows: usize) {
+        if let Some(probe) = self.skip_aggregation_probe.as_mut() {
+            if !self.spill_state.spills.is_empty() {
+                probe.forbid_skipping();
+            } else {
+                probe.update_state(input_rows, self.group_values.len());
+            }
+        }
+    }
+
+    /// Emits all currently accumulated groups if the probe allows skipping aggregation
+    fn switch_to_skip_aggregation(&mut self) -> Result<()> {
+        if self
+            .skip_aggregation_probe
+            .as_ref()
+            .is_some_and(|probe| probe.should_skip())
+        {
+            let batch = self.emit(EmitTo::All, false)?;
+            self.exec_state = ExecutionState::ProducingOutput(batch);
+        }
+
+        Ok(())
+    }
+
+    /// Transforms an input batch to intermediate aggregate state, without grouping it
+    fn transform_to_states(&self, batch: RecordBatch) -> Result<RecordBatch> {
+        let group_values = evaluate_group_by(&self.group_by, &batch)?;
+        let input_values = evaluate_many(&self.aggregate_arguments, &batch)?;
+        let filter_values = evaluate_optional(&self.filter_expressions, &batch)?;
+
+        let mut output = group_values.first().unwrap().clone();
+
+        let iter = self
+            .accumulators
+            .iter()
+            .zip(input_values.iter())
+            .zip(filter_values.iter());
+
+        for ((acc, values), opt_filter) in iter {
+            let opt_filter = opt_filter.as_ref().map(|filter| filter.as_boolean());
+            output.extend(acc.convert_to_state(values, opt_filter)?);
+        }
+
+        let states_batch = RecordBatch::try_new(self.schema(), output)?;
+
+        Ok(states_batch)
+    }
 }

diff --git a/datafusion/sqllogictest/test_files/aggregate_skip_partial.slt b/datafusion/sqllogictest/test_files/aggregate_skip_partial.slt
new file mode 100644
index 0000000000000..a9d25d086e4b6
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/aggregate_skip_partial.slt
@@ -0,0 +1,177 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# The main goal of these tests is to verify correctness of transforming
+# input values to state by accumulators supporting `convert_to_state`.
+
+
+# Setup test data table
+statement ok
+CREATE EXTERNAL TABLE aggregate_test_100 (
+  c1  VARCHAR NOT NULL,
+  c2  TINYINT NOT NULL,
+  c3  SMALLINT NOT NULL,
+  c4  SMALLINT,
+  c5  INT,
+  c6  BIGINT NOT NULL,
+  c7  SMALLINT NOT NULL,
+  c8  INT NOT NULL,
+  c9  INT UNSIGNED NOT NULL,
+  c10 BIGINT UNSIGNED NOT NULL,
+  c11 FLOAT NOT NULL,
+  c12 DOUBLE NOT NULL,
+  c13 VARCHAR NOT NULL
+)
+STORED AS CSV
+LOCATION '../../testing/data/csv/aggregate_test_100.csv'
+OPTIONS ('format.has_header' 'true');
+
+# Create table for nullable aggregations test
+statement ok
+CREATE TABLE aggregate_test_100_null (
+  c2  TINYINT NOT NULL,
+  c3  SMALLINT,
+  c11 FLOAT
+);
+
+# Prepare settings to always skip aggregation after a couple of batches
+statement ok
+set datafusion.execution.skip_partial_aggregation_probe_rows_threshold = 10;
+
+statement ok
+set datafusion.execution.skip_partial_aggregation_probe_rate_threshold = 0.0;
+
+statement ok
+set datafusion.execution.target_partitions = 2;
+
+statement ok
+set datafusion.execution.batch_size = 4;
+
+# Insert into the nullable table with the batch_size specified above
+# to prevent the creation of a single in-memory batch
+statement ok
+INSERT INTO aggregate_test_100_null
+SELECT
+  c2,
+  CASE WHEN c1 = 'e' THEN NULL ELSE c3 END as c3,
+  CASE WHEN c1 = 'a' THEN NULL ELSE c11 END as c11
+FROM aggregate_test_100;
+
+# Test count varchar / int / float
+query IIII
+SELECT c2, count(c1), count(c5), count(c11) FROM aggregate_test_100 GROUP BY c2 ORDER BY c2;
+----
+1 22 22 22
+2 22 22 22
+3 19 19 19
+4 23 23 23
+5 14 14 14
+
+# Test min / max for int / float
+query IIIRR
+SELECT c2, min(c5), max(c5), min(c11), max(c11) FROM aggregate_test_100 GROUP BY c2 ORDER BY c2;
+----
+1 -1991133944 2143473091 0.064453244 0.89651865
+2 -2138770630 2053379412 0.055064857 0.8315913
+3 -2141999138 2030965207 0.034291923 0.9488028
+4 -1885422396 2064155045 0.028003037 0.7459874
+5 -2117946883 2025611582 0.12559289 0.87989986
+
+# Test sum for int / float
+query IIR
+SELECT c2, sum(c5), sum(c11) FROM aggregate_test_100 GROUP BY c2 ORDER BY c2;
+----
+1 -438598674 12.153253793716
+2 -8259865364 9.577824473381
+3 1956035476 9.590891361237
+4 16155718643 9.531112968922
+5 6449337880 7.074412226677
+
+# Enable Postgres dialect for filtered aggregates tests
+statement ok
+set datafusion.sql_parser.dialect = 'Postgres';
+
+# Test count with filter
+query III
+SELECT
+  c2,
+  count(c3) FILTER (WHERE c3 > 0),
+  count(c3) FILTER (WHERE c11 > 10)
+FROM aggregate_test_100 GROUP BY c2 ORDER BY c2;
+----
+1 13 0
+2 13 0
+3 13 0
+4 13 0
+5 5 0
+
+# Test min / max with filter
+query III
+SELECT
+  c2,
+  min(c3) FILTER (WHERE c3 > 0),
+  max(c3) FILTER (WHERE c3 < 0)
+FROM aggregate_test_100 GROUP BY c2 ORDER BY c2;
+----
+1 12 -5
+2 1 -29
+3 13 -2
+4 3 -38
+5 36 -5
+
+# Test sum with filter
+query II
+SELECT
+  c2,
+  sum(c3) FILTER (WHERE c1 != 'e' AND c3 > 0)
+FROM aggregate_test_100 GROUP BY c2 ORDER BY c2;
+----
+1 612
+2 565
+3 466
+4 417
+5 284
+
+# Test count with nullable fields
+query III
+SELECT c2, count(c3), count(c11) FROM aggregate_test_100_null GROUP BY c2 ORDER BY c2;
+----
+1 19 17
+2 17 19
+3 15 13
+4 16 19
+5 12 11
+
+# Test min / max with nullable fields
+query IIIRR
+SELECT c2, min(c3), max(c3), min(c11), max(c11) FROM aggregate_test_100_null GROUP BY c2 ORDER BY c2;
+----
+1 -99 125 0.064453244 0.89651865
+2 -117 122 0.09683716 0.8315913
+3 -101 123 0.034291923 0.94669616
+4 -117 123 0.028003037 0.7085086
+5 -101 118 0.12559289 0.87989986
+
+# Test sum for smallint / float over the non-null table
+query IIR
+SELECT c2, sum(c3), sum(c11) FROM aggregate_test_100 GROUP BY c2 ORDER BY c2;
+----
+1 367 12.153253793716
+2 184 9.577824473381
+3 395 9.590891361237
+4 29 9.531112968922
+5 -194 7.074412226677
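
Note for reviewers: the probe semantics introduced above can be exercised in
isolation. Below is a minimal, self-contained sketch of the decision logic --
it mirrors `SkipAggregationProbe` from row_hash.rs, stripped of spilling and
Arrow types; the batch sizes and group counts in `main` are made-up numbers
chosen for illustration, not values taken from this patch:

    /// Simplified copy of the probe: tracks input rows vs distinct groups
    /// and locks its decision once the rows threshold is reached.
    struct SkipAggregationProbe {
        input_rows: usize,
        num_groups: usize,
        probe_rows_threshold: usize,
        probe_rate_threshold: f64,
        should_skip: bool,
        is_locked: bool,
    }

    impl SkipAggregationProbe {
        fn new(probe_rows_threshold: usize, probe_rate_threshold: f64) -> Self {
            Self {
                input_rows: 0,
                num_groups: 0,
                probe_rows_threshold,
                probe_rate_threshold,
                should_skip: false,
                is_locked: false,
            }
        }

        /// Called once per input batch; `num_groups` is the total number of
        /// distinct groups seen so far, not a per-batch count.
        fn update_state(&mut self, input_rows: usize, num_groups: usize) {
            if self.is_locked {
                return;
            }
            self.input_rows += input_rows;
            self.num_groups = num_groups;
            if self.input_rows >= self.probe_rows_threshold {
                self.should_skip = self.num_groups as f64 / self.input_rows as f64
                    >= self.probe_rate_threshold;
                self.is_locked = true;
            }
        }
    }

    fn main() {
        // Defaults from config.rs: probe after 100_000 rows, skip at rate >= 0.8
        let mut probe = SkipAggregationProbe::new(100_000, 0.8);

        // First batch is below the rows threshold -- no decision yet
        probe.update_state(60_000, 55_000);
        assert!(!probe.should_skip && !probe.is_locked);

        // Second batch crosses it: 105_000 groups / 120_000 rows = 0.875 >= 0.8
        probe.update_state(60_000, 105_000);
        assert!(probe.should_skip && probe.is_locked);

        // Once locked, further updates are no-ops
        probe.update_state(1_000_000, 1);
        assert!(probe.should_skip);
    }

Note that the probe is evaluated once and then locked: a stream that looked
high-cardinality during the first `probe_rows_threshold` rows keeps skipping
aggregation even if later input turns out to be highly repetitive.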