From ad2778eba82895abeac36c4a6fa138d32e6321e5 Mon Sep 17 00:00:00 2001 From: "zihe.liu" Date: Thu, 2 Jan 2025 22:21:47 +0800 Subject: [PATCH 1/2] [BugFix] Fix sequence of selection and input_chunk for streaming_agg_with_selection (#54595) Signed-off-by: zihe.liu (cherry picked from commit efe1ddecb46645d3e9b5932a08530d8257a5fc6f) # Conflicts: # be/src/exec/aggregator.h --- be/src/exec/aggregator.cpp | 49 +++++++++++++++++++++++--------------- be/src/exec/aggregator.h | 5 ++++ 2 files changed, 35 insertions(+), 19 deletions(-) diff --git a/be/src/exec/aggregator.cpp b/be/src/exec/aggregator.cpp index 3b71df448a188..fe7eeb830b28f 100644 --- a/be/src/exec/aggregator.cpp +++ b/be/src/exec/aggregator.cpp @@ -870,14 +870,23 @@ Status Aggregator::evaluate_groupby_exprs(Chunk* chunk) { } Status Aggregator::output_chunk_by_streaming(Chunk* input_chunk, ChunkPtr* chunk) { + return output_chunk_by_streaming(input_chunk, chunk, input_chunk->num_rows(), false); +} + +Status Aggregator::output_chunk_by_streaming(Chunk* input_chunk, ChunkPtr* chunk, size_t num_input_rows, + bool use_selection) { // The input chunk is already intermediate-typed, so there is no need to convert it again. // Only when the input chunk is input-typed, we should convert it into intermediate-typed chunk. // is_passthrough is on indicate that the chunk is input-typed. auto use_intermediate_as_input = _use_intermediate_as_input(); const auto& slots = _intermediate_tuple_desc->slots(); + DCHECK(!use_selection || !_group_by_columns.empty()); + // If using selection, then `_group_by_columns` has been filtered by `_streaming_selection`, and input_chunk has + // not been filtered yet. `input_chunk` is filtered by `_streaming_selection` after `evaluate_agg_fn_exprs`. + const size_t num_rows = use_selection ? _group_by_columns[0]->size() : num_input_rows; + // build group by columns - size_t num_rows = input_chunk->num_rows(); ChunkPtr result_chunk = std::make_shared(); for (size_t i = 0; i < _group_by_columns.size(); i++) { DCHECK_EQ(num_rows, _group_by_columns[i]->size()); @@ -896,7 +905,23 @@ Status Aggregator::output_chunk_by_streaming(Chunk* input_chunk, ChunkPtr* chunk DCHECK(!_group_by_columns.empty()); RETURN_IF_ERROR(evaluate_agg_fn_exprs(input_chunk)); - const auto num_rows = _group_by_columns[0]->size(); + if (use_selection) { + for (size_t i = 0; i < _agg_fn_ctxs.size(); i++) { + for (auto& agg_input_column : _agg_input_columns[i]) { + // AggColumn and GroupColumn may be the same SharedPtr, + // If ColumnSize and ChunkSize are not equal, + // indicating that the Filter has been executed in GroupByColumn + // e.g.: select c1, count(distinct c1) from t1 group by c1; + + // At present, the type of problem cannot be completely solved, + // and a new solution needs to be designed to solve it completely + if (agg_input_column != nullptr && agg_input_column->size() == num_input_rows) { + agg_input_column->filter(_streaming_selection); + } + } + } + } + Columns agg_result_column = _create_agg_result_columns(num_rows, true); for (size_t i = 0; i < _agg_fn_ctxs.size(); i++) { size_t id = _group_by_columns.size() + i; @@ -970,7 +995,7 @@ Status Aggregator::convert_to_spill_format(Chunk* input_chunk, ChunkPtr* chunk) Status Aggregator::output_chunk_by_streaming_with_selection(Chunk* input_chunk, ChunkPtr* chunk) { // Streaming aggregate at least has one group by column - size_t chunk_size = _group_by_columns[0]->size(); + const size_t num_input_rows = _group_by_columns[0]->size(); for (auto& _group_by_column : _group_by_columns) { // Multi GroupColumn may be have the same SharedPtr // If ColumnSize and ChunkSize are not equal, @@ -979,26 +1004,12 @@ Status Aggregator::output_chunk_by_streaming_with_selection(Chunk* input_chunk, // At present, the type of problem cannot be completely solved, // and a new solution needs to be designed to solve it completely - if (_group_by_column->size() == chunk_size) { + if (_group_by_column->size() == num_input_rows) { _group_by_column->filter(_streaming_selection); } } - for (size_t i = 0; i < _agg_fn_ctxs.size(); i++) { - for (auto& agg_input_column : _agg_input_columns[i]) { - // AggColumn and GroupColumn may be the same SharedPtr, - // If ColumnSize and ChunkSize are not equal, - // indicating that the Filter has been executed in GroupByColumn - // e.g.: select c1, count(distinct c1) from t1 group by c1; - - // At present, the type of problem cannot be completely solved, - // and a new solution needs to be designed to solve it completely - if (agg_input_column != nullptr && agg_input_column->size() == chunk_size) { - agg_input_column->filter(_streaming_selection); - } - } - } - RETURN_IF_ERROR(output_chunk_by_streaming(input_chunk, chunk)); + RETURN_IF_ERROR(output_chunk_by_streaming(input_chunk, chunk, num_input_rows, true)); return Status::OK(); } diff --git a/be/src/exec/aggregator.h b/be/src/exec/aggregator.h index 24157df7f8ca4..e36764eb355de 100644 --- a/be/src/exec/aggregator.h +++ b/be/src/exec/aggregator.h @@ -344,7 +344,12 @@ class Aggregator : public pipeline::ContextWithDependency { [[nodiscard]] Status evaluate_agg_fn_exprs(Chunk* chunk, bool use_intermediate); [[nodiscard]] Status evaluate_agg_input_column(Chunk* chunk, std::vector& agg_expr_ctxs, int i); +<<<<<<< HEAD [[nodiscard]] Status output_chunk_by_streaming(Chunk* input_chunk, ChunkPtr* chunk); +======= + Status output_chunk_by_streaming(Chunk* input_chunk, ChunkPtr* chunk); + Status output_chunk_by_streaming(Chunk* input_chunk, ChunkPtr* chunk, size_t num_input_rows, bool use_selection); +>>>>>>> efe1ddecb4 ([BugFix] Fix sequence of selection and input_chunk for streaming_agg_with_selection (#54595)) // convert input chunk to spill format [[nodiscard]] Status convert_to_spill_format(Chunk* input_chunk, ChunkPtr* chunk); From 174820300123cbd2c31b1f018543d9c8003ac17f Mon Sep 17 00:00:00 2001 From: "zihe.liu" Date: Tue, 7 Jan 2025 11:22:08 +0800 Subject: [PATCH 2/2] fix conflicts Signed-off-by: zihe.liu --- be/src/exec/aggregator.h | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/be/src/exec/aggregator.h b/be/src/exec/aggregator.h index e36764eb355de..5b28e3cf8d067 100644 --- a/be/src/exec/aggregator.h +++ b/be/src/exec/aggregator.h @@ -344,12 +344,9 @@ class Aggregator : public pipeline::ContextWithDependency { [[nodiscard]] Status evaluate_agg_fn_exprs(Chunk* chunk, bool use_intermediate); [[nodiscard]] Status evaluate_agg_input_column(Chunk* chunk, std::vector& agg_expr_ctxs, int i); -<<<<<<< HEAD [[nodiscard]] Status output_chunk_by_streaming(Chunk* input_chunk, ChunkPtr* chunk); -======= - Status output_chunk_by_streaming(Chunk* input_chunk, ChunkPtr* chunk); - Status output_chunk_by_streaming(Chunk* input_chunk, ChunkPtr* chunk, size_t num_input_rows, bool use_selection); ->>>>>>> efe1ddecb4 ([BugFix] Fix sequence of selection and input_chunk for streaming_agg_with_selection (#54595)) + [[nodiscard]] Status output_chunk_by_streaming(Chunk* input_chunk, ChunkPtr* chunk, size_t num_input_rows, + bool use_selection); // convert input chunk to spill format [[nodiscard]] Status convert_to_spill_format(Chunk* input_chunk, ChunkPtr* chunk);