[BugFix] Fix sequence of selection and input_chunk for streaming_agg_with_selection (backport #54595) #54615

Merged · 2 commits · Jan 7, 2025
49 changes: 30 additions & 19 deletions be/src/exec/aggregator.cpp
@@ -870,14 +870,23 @@ Status Aggregator::evaluate_groupby_exprs(Chunk* chunk) {
 }
 
 Status Aggregator::output_chunk_by_streaming(Chunk* input_chunk, ChunkPtr* chunk) {
+    return output_chunk_by_streaming(input_chunk, chunk, input_chunk->num_rows(), false);
+}
+
+Status Aggregator::output_chunk_by_streaming(Chunk* input_chunk, ChunkPtr* chunk, size_t num_input_rows,
+                                             bool use_selection) {
     // The input chunk is already intermediate-typed, so there is no need to convert it again.
     // Only when the input chunk is input-typed should we convert it into an intermediate-typed chunk.
     // is_passthrough being on indicates that the chunk is input-typed.
     auto use_intermediate_as_input = _use_intermediate_as_input();
     const auto& slots = _intermediate_tuple_desc->slots();
 
+    DCHECK(!use_selection || !_group_by_columns.empty());
+    // When using selection, `_group_by_columns` has already been filtered by `_streaming_selection`, while
+    // `input_chunk` has not; `input_chunk` is filtered by `_streaming_selection` after `evaluate_agg_fn_exprs`.
+    const size_t num_rows = use_selection ? _group_by_columns[0]->size() : num_input_rows;
+
     // build group by columns
-    size_t num_rows = input_chunk->num_rows();
     ChunkPtr result_chunk = std::make_shared<Chunk>();
     for (size_t i = 0; i < _group_by_columns.size(); i++) {
         DCHECK_EQ(num_rows, _group_by_columns[i]->size());
@@ -896,7 +905,23 @@ Status Aggregator::output_chunk_by_streaming(Chunk* input_chunk, ChunkPtr* chunk
         DCHECK(!_group_by_columns.empty());
 
         RETURN_IF_ERROR(evaluate_agg_fn_exprs(input_chunk));
-        const auto num_rows = _group_by_columns[0]->size();
+        if (use_selection) {
+            for (size_t i = 0; i < _agg_fn_ctxs.size(); i++) {
+                for (auto& agg_input_column : _agg_input_columns[i]) {
+                    // AggColumn and GroupColumn may be the same SharedPtr;
+                    // if ColumnSize and ChunkSize are not equal, the filter has
+                    // already been executed through the GroupByColumn,
+                    // e.g.: select c1, count(distinct c1) from t1 group by c1;
+
+                    // At present this type of problem cannot be completely solved;
+                    // a new solution needs to be designed to solve it completely
+                    if (agg_input_column != nullptr && agg_input_column->size() == num_input_rows) {
+                        agg_input_column->filter(_streaming_selection);
+                    }
+                }
+            }
+        }
+
         Columns agg_result_column = _create_agg_result_columns(num_rows, true);
         for (size_t i = 0; i < _agg_fn_ctxs.size(); i++) {
             size_t id = _group_by_columns.size() + i;
@@ -970,7 +995,7 @@ Status Aggregator::convert_to_spill_format(Chunk* input_chunk, ChunkPtr* chunk)
 
 Status Aggregator::output_chunk_by_streaming_with_selection(Chunk* input_chunk, ChunkPtr* chunk) {
     // Streaming aggregation has at least one group-by column
-    size_t chunk_size = _group_by_columns[0]->size();
+    const size_t num_input_rows = _group_by_columns[0]->size();
     for (auto& _group_by_column : _group_by_columns) {
        // Multiple GroupColumns may share the same SharedPtr;
        // if ColumnSize and ChunkSize are not equal,
@@ -979,26 +1004,12 @@ Status Aggregator::output_chunk_by_streaming_with_selection(Chunk* input_chunk,
 
         // At present this type of problem cannot be completely solved;
        // a new solution needs to be designed to solve it completely
-        if (_group_by_column->size() == chunk_size) {
+        if (_group_by_column->size() == num_input_rows) {
             _group_by_column->filter(_streaming_selection);
         }
     }
-    for (size_t i = 0; i < _agg_fn_ctxs.size(); i++) {
-        for (auto& agg_input_column : _agg_input_columns[i]) {
-            // AggColumn and GroupColumn may be the same SharedPtr,
-            // If ColumnSize and ChunkSize are not equal,
-            // indicating that the Filter has been executed in GroupByColumn
-            // e.g.: select c1, count(distinct c1) from t1 group by c1;
-
-            // At present, the type of problem cannot be completely solved,
-            // and a new solution needs to be designed to solve it completely
-            if (agg_input_column != nullptr && agg_input_column->size() == chunk_size) {
-                agg_input_column->filter(_streaming_selection);
-            }
-        }
-    }
 
-    RETURN_IF_ERROR(output_chunk_by_streaming(input_chunk, chunk));
+    RETURN_IF_ERROR(output_chunk_by_streaming(input_chunk, chunk, num_input_rows, true));
     return Status::OK();
 }
 
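The size guard that appears in both filter loops above (`size() == num_input_rows`) is doing real work: when an aggregate input column and a group-by column are the same SharedPtr, the first filter pass already shrank the column, and filtering it again with a selection sized to the original row count would corrupt it. Below is a minimal, self-contained sketch of the idea; `Column`, `Selection`, and `filter` are illustrative stand-ins, not the StarRocks types.

```cpp
#include <cstdint>
#include <cstdio>
#include <memory>
#include <vector>

// Stand-ins for the real column/selection types (hypothetical, for illustration).
using Column = std::vector<int>;
using Selection = std::vector<uint8_t>; // one flag per original input row

// Keep only the rows whose selection flag is 1.
void filter(Column& col, const Selection& sel) {
    Column kept;
    for (size_t i = 0; i < sel.size(); i++) {
        if (sel[i]) kept.push_back(col[i]);
    }
    col = std::move(kept);
}

int main() {
    auto c1 = std::make_shared<Column>(Column{10, 20, 30, 40});
    const Selection sel = {1, 0, 1, 0};
    const size_t num_input_rows = c1->size(); // 4

    // As in `select c1, count(distinct c1) from t1 group by c1`, the group-by
    // column and the aggregate input column are the SAME shared pointer.
    std::vector<std::shared_ptr<Column>> group_by_columns{c1};
    std::vector<std::shared_ptr<Column>> agg_input_columns{c1};

    // First pass: filter the group-by columns (c1 shrinks from 4 rows to 2).
    for (auto& col : group_by_columns) {
        if (col->size() == num_input_rows) filter(*col, sel);
    }

    // Second pass: filter the aggregate inputs. Without the size guard, c1
    // would be filtered a second time (reading past its new end); with it,
    // the already-shrunk column is skipped.
    for (auto& col : agg_input_columns) {
        if (col != nullptr && col->size() == num_input_rows) filter(*col, sel);
    }

    std::printf("rows after selection: %zu\n", c1->size()); // prints 2
    return 0;
}
```

Here the second pass skips `c1` because its size (2) no longer matches `num_input_rows` (4); as the diff's own comments note, this size comparison is a stopgap rather than a complete solution to the aliasing problem.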
2 changes: 2 additions & 0 deletions be/src/exec/aggregator.h
@@ -345,6 +345,8 @@ class Aggregator : public pipeline::ContextWithDependency {
     [[nodiscard]] Status evaluate_agg_input_column(Chunk* chunk, std::vector<ExprContext*>& agg_expr_ctxs, int i);
 
     [[nodiscard]] Status output_chunk_by_streaming(Chunk* input_chunk, ChunkPtr* chunk);
+    [[nodiscard]] Status output_chunk_by_streaming(Chunk* input_chunk, ChunkPtr* chunk, size_t num_input_rows,
+                                                   bool use_selection);
 
     // convert input chunk to spill format
     [[nodiscard]] Status convert_to_spill_format(Chunk* input_chunk, ChunkPtr* chunk);
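Taken together, the two files implement the reordering named in the PR title: `_streaming_selection` has one flag per original input row, so the aggregate input expressions must be evaluated against the unfiltered `input_chunk` first, and only then filtered. A toy sketch of that invariant, using hypothetical stand-in names rather than the real StarRocks API:

```cpp
#include <cassert>
#include <cstdint>
#include <cstdio>
#include <vector>

using Selection = std::vector<uint8_t>; // one flag per ORIGINAL input row

// Stand-in for evaluate_agg_fn_exprs: derives an agg input column from the chunk.
std::vector<int> eval_agg_expr(const std::vector<int>& input_rows) {
    std::vector<int> out;
    for (int v : input_rows) out.push_back(v * 2);
    return out;
}

// Applying a selection requires the column to still have the original row count.
std::vector<int> apply_selection(const std::vector<int>& col, const Selection& sel) {
    assert(col.size() == sel.size()); // would fail if col were built from filtered rows
    std::vector<int> kept;
    for (size_t i = 0; i < col.size(); i++)
        if (sel[i]) kept.push_back(col[i]);
    return kept;
}

int main() {
    const std::vector<int> input_chunk = {1, 2, 3, 4};
    const Selection streaming_selection = {1, 0, 0, 1};

    // Corrected order (as in the patch): evaluate on all four input rows,
    // THEN filter the result, keeping it aligned with the group-by columns.
    auto agg_input = apply_selection(eval_agg_expr(input_chunk), streaming_selection);
    std::printf("agg input rows: %zu\n", agg_input.size()); // prints 2
    return 0;
}
```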