From 9f4bbab03565ca2a9933760973b362c63b81bffa Mon Sep 17 00:00:00 2001
From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com>
Date: Thu, 2 Jan 2025 14:50:27 +0000
Subject: [PATCH] [BugFix] Fix sequence of selection and input_chunk for
 streaming_agg_with_selection (backport #54595) (#54619)

Co-authored-by: zihe.liu
---
 be/src/exec/aggregator.cpp | 49 +++++++++++++++++++++++---------------
 be/src/exec/aggregator.h   |  1 +
 2 files changed, 31 insertions(+), 19 deletions(-)

diff --git a/be/src/exec/aggregator.cpp b/be/src/exec/aggregator.cpp
index a14f33672f018..af69018bdef6c 100644
--- a/be/src/exec/aggregator.cpp
+++ b/be/src/exec/aggregator.cpp
@@ -938,14 +938,23 @@ Status Aggregator::evaluate_groupby_exprs(Chunk* chunk) {
 }
 
 Status Aggregator::output_chunk_by_streaming(Chunk* input_chunk, ChunkPtr* chunk) {
+    return output_chunk_by_streaming(input_chunk, chunk, input_chunk->num_rows(), false);
+}
+
+Status Aggregator::output_chunk_by_streaming(Chunk* input_chunk, ChunkPtr* chunk, size_t num_input_rows,
+                                             bool use_selection) {
     // The input chunk is already intermediate-typed, so there is no need to convert it again.
     // Only when the input chunk is input-typed, we should convert it into intermediate-typed chunk.
     // is_passthrough is on indicate that the chunk is input-typed.
     auto use_intermediate_as_input = _use_intermediate_as_input();
     const auto& slots = _intermediate_tuple_desc->slots();
 
+    DCHECK(!use_selection || !_group_by_columns.empty());
+    // If using selection, then `_group_by_columns` has been filtered by `_streaming_selection`, and input_chunk has
+    // not been filtered yet. `input_chunk` is filtered by `_streaming_selection` after `evaluate_agg_fn_exprs`.
+    const size_t num_rows = use_selection ? _group_by_columns[0]->size() : num_input_rows;
+
     // build group by columns
-    size_t num_rows = input_chunk->num_rows();
     ChunkPtr result_chunk = std::make_shared<Chunk>();
     for (size_t i = 0; i < _group_by_columns.size(); i++) {
         DCHECK_EQ(num_rows, _group_by_columns[i]->size());
@@ -964,7 +973,23 @@ Status Aggregator::output_chunk_by_streaming(Chunk* input_chunk, ChunkPtr* chunk
         DCHECK(!_group_by_columns.empty());
 
         RETURN_IF_ERROR(evaluate_agg_fn_exprs(input_chunk));
-        const auto num_rows = _group_by_columns[0]->size();
+        if (use_selection) {
+            for (size_t i = 0; i < _agg_fn_ctxs.size(); i++) {
+                for (auto& agg_input_column : _agg_input_columns[i]) {
+                    // AggColumn and GroupColumn may be the same SharedPtr,
+                    // If ColumnSize and ChunkSize are not equal,
+                    // indicating that the Filter has been executed in GroupByColumn
+                    // e.g.: select c1, count(distinct c1) from t1 group by c1;
+
+                    // At present, the type of problem cannot be completely solved,
+                    // and a new solution needs to be designed to solve it completely
+                    if (agg_input_column != nullptr && agg_input_column->size() == num_input_rows) {
+                        agg_input_column->filter(_streaming_selection);
+                    }
+                }
+            }
+        }
+
         Columns agg_result_column = _create_agg_result_columns(num_rows, true);
         for (size_t i = 0; i < _agg_fn_ctxs.size(); i++) {
             size_t id = _group_by_columns.size() + i;
@@ -1041,7 +1066,7 @@ Status Aggregator::convert_to_spill_format(Chunk* input_chunk, ChunkPtr* chunk)
 
 Status Aggregator::output_chunk_by_streaming_with_selection(Chunk* input_chunk, ChunkPtr* chunk) {
     // Streaming aggregate at least has one group by column
-    size_t chunk_size = _group_by_columns[0]->size();
+    const size_t num_input_rows = _group_by_columns[0]->size();
     for (auto& _group_by_column : _group_by_columns) {
         // Multi GroupColumn may be have the same SharedPtr
         // If ColumnSize and ChunkSize are not equal,
@@ -1050,26 +1075,12 @@ Status Aggregator::output_chunk_by_streaming_with_selection(Chunk* input_chunk,
 
         // At present, the type of problem cannot be completely solved,
         // and a new solution needs to be designed to solve it completely
-        if (_group_by_column->size() == chunk_size) {
+        if (_group_by_column->size() == num_input_rows) {
             _group_by_column->filter(_streaming_selection);
         }
     }
-    for (size_t i = 0; i < _agg_fn_ctxs.size(); i++) {
-        for (auto& agg_input_column : _agg_input_columns[i]) {
-            // AggColumn and GroupColumn may be the same SharedPtr,
-            // If ColumnSize and ChunkSize are not equal,
-            // indicating that the Filter has been executed in GroupByColumn
-            // e.g.: select c1, count(distinct c1) from t1 group by c1;
-
-            // At present, the type of problem cannot be completely solved,
-            // and a new solution needs to be designed to solve it completely
-            if (agg_input_column != nullptr && agg_input_column->size() == chunk_size) {
-                agg_input_column->filter(_streaming_selection);
-            }
-        }
-    }
-    RETURN_IF_ERROR(output_chunk_by_streaming(input_chunk, chunk));
+    RETURN_IF_ERROR(output_chunk_by_streaming(input_chunk, chunk, num_input_rows, true));
 
     return Status::OK();
 }
diff --git a/be/src/exec/aggregator.h b/be/src/exec/aggregator.h
index 555bdf6e7663e..125a92b135ade 100644
--- a/be/src/exec/aggregator.h
+++ b/be/src/exec/aggregator.h
@@ -348,6 +348,7 @@ class Aggregator : public pipeline::ContextWithDependency {
     Status evaluate_agg_input_column(Chunk* chunk, std::vector<ExprContext*>& agg_expr_ctxs, int i);
 
     Status output_chunk_by_streaming(Chunk* input_chunk, ChunkPtr* chunk);
+    Status output_chunk_by_streaming(Chunk* input_chunk, ChunkPtr* chunk, size_t num_input_rows, bool use_selection);
 
     // convert input chunk to spill format
     Status convert_to_spill_format(Chunk* input_chunk, ChunkPtr* chunk);
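
The comments in the patch describe the hazard this change guards against: when the group-by column and an aggregate-input column are the same SharedPtr (e.g. `select c1, count(distinct c1) from t1 group by c1`), `_streaming_selection` must be applied to the shared column exactly once, so the second filter is guarded by comparing the column size against the row count captured before any filtering. That is why `num_input_rows` is now passed into `output_chunk_by_streaming`, and the agg-input columns are filtered only after `evaluate_agg_fn_exprs`. The following standalone sketch uses hypothetical names (ToyColumn, filter_rows, selection) and is a minimal model of that size-guard idea, not StarRocks code:

#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <vector>

// Toy stand-ins (hypothetical, not StarRocks types): a "column" is a shared vector of ints.
using ToyColumn = std::vector<int>;
using ToyColumnPtr = std::shared_ptr<ToyColumn>;

// Keep rows whose selection flag is non-zero, mimicking Column::filter(_streaming_selection).
static void filter_rows(ToyColumn& col, const std::vector<uint8_t>& selection) {
    ToyColumn kept;
    for (size_t i = 0; i < col.size(); ++i) {
        if (selection[i] != 0) {
            kept.push_back(col[i]);
        }
    }
    col.swap(kept);
}

int main() {
    // e.g. "select c1, count(distinct c1) from t1 group by c1": the group-by column and the
    // aggregate-input column can point to the same underlying column.
    ToyColumnPtr c1 = std::make_shared<ToyColumn>(ToyColumn{1, 2, 3, 4});
    ToyColumnPtr group_by_column = c1;
    ToyColumnPtr agg_input_column = c1;

    const std::vector<uint8_t> selection = {1, 0, 1, 0};
    // Capture the row count before any filtering, like num_input_rows in the patch.
    const size_t num_input_rows = group_by_column->size();

    // Group-by columns are filtered first.
    filter_rows(*group_by_column, selection);

    // Guard: only filter the agg-input column if it still has the pre-filter row count;
    // otherwise it is the already-filtered shared column and must not be filtered again.
    if (agg_input_column->size() == num_input_rows) {
        filter_rows(*agg_input_column, selection);
    }

    // Rows {1, 3} survive exactly once; filtering twice would also have dropped row 3.
    assert(group_by_column->size() == 2 && (*group_by_column)[0] == 1 && (*group_by_column)[1] == 3);
    return 0;
}

In the patch itself, the same guard (`agg_input_column->size() == num_input_rows`) is applied after `evaluate_agg_fn_exprs`, so the aggregate-input columns are produced from the unfiltered input_chunk and are filtered by `_streaming_selection` only when they were not already filtered through a shared group-by column.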