Skip to content

Commit

Permalink
[BugFix] Fix sequence of selection and input_chunk for streaming_agg_…
Browse files Browse the repository at this point in the history
…with_selection (#54595)

Signed-off-by: zihe.liu <[email protected]>
(cherry picked from commit efe1dde)

# Conflicts:
#	be/src/exec/aggregator.h
  • Loading branch information
ZiheLiu authored and mergify[bot] committed Jan 2, 2025
1 parent 8b431fe commit 0e097be
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 19 deletions.
49 changes: 30 additions & 19 deletions be/src/exec/aggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -853,14 +853,23 @@ Status Aggregator::evaluate_groupby_exprs(Chunk* chunk) {
}

// Convenience overload: forwards to the four-argument overload, passing the
// full row count of `input_chunk` and with selection disabled.
Status Aggregator::output_chunk_by_streaming(Chunk* input_chunk, ChunkPtr* chunk) {
    const size_t total_input_rows = input_chunk->num_rows();
    constexpr bool kUseSelection = false;
    return output_chunk_by_streaming(input_chunk, chunk, total_input_rows, kUseSelection);
}

Status Aggregator::output_chunk_by_streaming(Chunk* input_chunk, ChunkPtr* chunk, size_t num_input_rows,
bool use_selection) {
// The input chunk is already intermediate-typed, so there is no need to convert it again.
// Only when the input chunk is input-typed, we should convert it into intermediate-typed chunk.
// When is_passthrough is on, it indicates that the chunk is input-typed.
auto use_intermediate_as_input = _use_intermediate_as_input();
const auto& slots = _intermediate_tuple_desc->slots();

DCHECK(!use_selection || !_group_by_columns.empty());
// If using selection, then `_group_by_columns` has been filtered by `_streaming_selection`, and input_chunk has
// not been filtered yet. `input_chunk` is filtered by `_streaming_selection` after `evaluate_agg_fn_exprs`.
const size_t num_rows = use_selection ? _group_by_columns[0]->size() : num_input_rows;

// build group by columns
size_t num_rows = input_chunk->num_rows();
ChunkPtr result_chunk = std::make_shared<Chunk>();
for (size_t i = 0; i < _group_by_columns.size(); i++) {
DCHECK_EQ(num_rows, _group_by_columns[i]->size());
Expand All @@ -879,7 +888,23 @@ Status Aggregator::output_chunk_by_streaming(Chunk* input_chunk, ChunkPtr* chunk
DCHECK(!_group_by_columns.empty());

RETURN_IF_ERROR(evaluate_agg_fn_exprs(input_chunk));
const auto num_rows = _group_by_columns[0]->size();
if (use_selection) {
for (size_t i = 0; i < _agg_fn_ctxs.size(); i++) {
for (auto& agg_input_column : _agg_input_columns[i]) {
// AggColumn and GroupColumn may be the same SharedPtr,
// If ColumnSize and ChunkSize are not equal,
// indicating that the Filter has been executed in GroupByColumn
// e.g.: select c1, count(distinct c1) from t1 group by c1;

// At present, this kind of problem cannot be completely solved;
// a new solution needs to be designed to solve it completely.
if (agg_input_column != nullptr && agg_input_column->size() == num_input_rows) {
agg_input_column->filter(_streaming_selection);
}
}
}
}

Columns agg_result_column = _create_agg_result_columns(num_rows, true);
for (size_t i = 0; i < _agg_fn_ctxs.size(); i++) {
size_t id = _group_by_columns.size() + i;
Expand Down Expand Up @@ -953,7 +978,7 @@ Status Aggregator::convert_to_spill_format(Chunk* input_chunk, ChunkPtr* chunk)

Status Aggregator::output_chunk_by_streaming_with_selection(Chunk* input_chunk, ChunkPtr* chunk) {
// Streaming aggregate at least has one group by column
size_t chunk_size = _group_by_columns[0]->size();
const size_t num_input_rows = _group_by_columns[0]->size();
for (auto& _group_by_column : _group_by_columns) {
// Multiple group-by columns may share the same SharedPtr.
// If ColumnSize and ChunkSize are not equal,
Expand All @@ -962,26 +987,12 @@ Status Aggregator::output_chunk_by_streaming_with_selection(Chunk* input_chunk,

// At present, this kind of problem cannot be completely solved;
// a new solution needs to be designed to solve it completely.
if (_group_by_column->size() == chunk_size) {
if (_group_by_column->size() == num_input_rows) {
_group_by_column->filter(_streaming_selection);
}
}
for (size_t i = 0; i < _agg_fn_ctxs.size(); i++) {
for (auto& agg_input_column : _agg_input_columns[i]) {
// AggColumn and GroupColumn may be the same SharedPtr,
// If ColumnSize and ChunkSize are not equal,
// indicating that the Filter has been executed in GroupByColumn
// e.g.: select c1, count(distinct c1) from t1 group by c1;

// At present, the type of problem cannot be completely solved,
// and a new solution needs to be designed to solve it completely
if (agg_input_column != nullptr && agg_input_column->size() == chunk_size) {
agg_input_column->filter(_streaming_selection);
}
}
}

RETURN_IF_ERROR(output_chunk_by_streaming(input_chunk, chunk));
RETURN_IF_ERROR(output_chunk_by_streaming(input_chunk, chunk, num_input_rows, true));
return Status::OK();
}

Expand Down
5 changes: 5 additions & 0 deletions be/src/exec/aggregator.h
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,12 @@ class Aggregator : public pipeline::ContextWithDependency {
Status evaluate_agg_fn_exprs(Chunk* chunk, bool use_intermediate);
Status evaluate_agg_input_column(Chunk* chunk, std::vector<ExprContext*>& agg_expr_ctxs, int i);

// Builds the streaming output chunk for `input_chunk`, using all of its rows and no selection.
[[nodiscard]] Status output_chunk_by_streaming(Chunk* input_chunk, ChunkPtr* chunk);
// Same as above, with explicit control over selection handling.
// @param num_input_rows row count of the input before any selection filtering.
// @param use_selection  when true, the group-by columns are expected to have been filtered by
//                       `_streaming_selection` already, and the aggregate input columns are
//                       filtered after the aggregate expressions are evaluated.
[[nodiscard]] Status output_chunk_by_streaming(Chunk* input_chunk, ChunkPtr* chunk, size_t num_input_rows,
                                               bool use_selection);

// convert input chunk to spill format
[[nodiscard]] Status convert_to_spill_format(Chunk* input_chunk, ChunkPtr* chunk);
Expand Down

0 comments on commit 0e097be

Please sign in to comment.