From 0157efbd6430958bbb6999b86fb7a83059da9efa Mon Sep 17 00:00:00 2001 From: stdpain Date: Thu, 6 Feb 2025 15:53:53 +0800 Subject: [PATCH] [BugFix] Only force preagg when streaming agg has limit Signed-off-by: stdpain --- .../aggregate_distinct_streaming_sink_operator.cpp | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/be/src/exec/pipeline/aggregate/aggregate_distinct_streaming_sink_operator.cpp b/be/src/exec/pipeline/aggregate/aggregate_distinct_streaming_sink_operator.cpp index 9483baf36107bd..211f894edcc2db 100644 --- a/be/src/exec/pipeline/aggregate/aggregate_distinct_streaming_sink_operator.cpp +++ b/be/src/exec/pipeline/aggregate/aggregate_distinct_streaming_sink_operator.cpp @@ -18,6 +18,7 @@ #include "runtime/current_thread.h" #include "simd/simd.h" + namespace starrocks::pipeline { Status AggregateDistinctStreamingSinkOperator::prepare(RuntimeState* state) { @@ -26,7 +27,11 @@ Status AggregateDistinctStreamingSinkOperator::prepare(RuntimeState* state) { if (_aggregator->streaming_preaggregation_mode() == TStreamingPreaggregationMode::LIMITED_MEM) { _limited_mem_state.limited_memory_size = config::streaming_agg_limited_memory_size; } - _aggregator->streaming_preaggregation_mode() = TStreamingPreaggregationMode::FORCE_PREAGGREGATION; + // If limit is small, streaming distinct forces pre-aggregation. After the limit is reached the operator will quickly finish. + // The limit in streaming agg is controlled by session variable: cbo_push_down_distinct_limit + if (_aggregator->limit() != -1) { + _aggregator->streaming_preaggregation_mode() = TStreamingPreaggregationMode::FORCE_PREAGGREGATION; + } _aggregator->attach_sink_observer(state, this->_observer); return _aggregator->open(state); }