diff --git a/be/src/exec/connector_scan_node.cpp b/be/src/exec/connector_scan_node.cpp index 9b8720aca3841..82b6b0a914e80 100644 --- a/be/src/exec/connector_scan_node.cpp +++ b/be/src/exec/connector_scan_node.cpp @@ -74,6 +74,17 @@ Status ConnectorScanNode::init(const TPlanNode& tnode, RuntimeState* state) { } _estimate_scan_row_bytes(); + if (tnode.__isset.lake_scan_node) { + if (tnode.lake_scan_node.__isset.enable_topn_filter_back_pressure && + tnode.lake_scan_node.enable_topn_filter_back_pressure) { + _enable_topn_filter_back_pressure = true; + _back_pressure_max_rounds = tnode.lake_scan_node.back_pressure_max_rounds; + _back_pressure_num_rows = tnode.lake_scan_node.back_pressure_num_rows; + _back_pressure_throttle_time = tnode.lake_scan_node.back_pressure_throttle_time; + _back_pressure_throttle_time_upper_bound = tnode.lake_scan_node.back_pressure_throttle_time_upper_bound; + } + } + return Status::OK(); } diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp index 75e4ed115c274..8c5747e8a1175 100644 --- a/be/src/exec/olap_scan_node.cpp +++ b/be/src/exec/olap_scan_node.cpp @@ -109,6 +109,15 @@ Status OlapScanNode::init(const TPlanNode& tnode, RuntimeState* state) { } } + if (tnode.olap_scan_node.__isset.enable_topn_filter_back_pressure && + tnode.olap_scan_node.enable_topn_filter_back_pressure) { + _enable_topn_filter_back_pressure = true; + _back_pressure_max_rounds = tnode.olap_scan_node.back_pressure_max_rounds; + _back_pressure_num_rows = tnode.olap_scan_node.back_pressure_num_rows; + _back_pressure_throttle_time = tnode.olap_scan_node.back_pressure_throttle_time; + _back_pressure_throttle_time_upper_bound = tnode.olap_scan_node.back_pressure_throttle_time_upper_bound; + } + _estimate_scan_and_output_row_bytes(); return Status::OK(); diff --git a/be/src/exec/pipeline/operator.h b/be/src/exec/pipeline/operator.h index c2e6cea2769f0..54176a2c362ba 100644 --- a/be/src/exec/pipeline/operator.h +++ b/be/src/exec/pipeline/operator.h @@ -182,7 +182,7 @@ class Operator { Status eval_conjuncts(const std::vector& conjuncts, Chunk* chunk, FilterPtr* filter = nullptr); // equal to ExecNode::eval_join_runtime_filters, is used to apply bloom-filters to Operators. - void eval_runtime_bloom_filters(Chunk* chunk); + virtual void eval_runtime_bloom_filters(Chunk* chunk); // Pseudo plan_node_id for final sink, such as result_sink, table_sink static const int32_t s_pseudo_plan_node_id_for_final_sink; @@ -276,6 +276,8 @@ class Operator { void set_observer(PipelineObserver* observer) { _observer = observer; } PipelineObserver* observer() const { return _observer; } + void _init_rf_counters(bool init_bloom); + protected: OperatorFactory* _factory; const int32_t _id; @@ -334,7 +336,6 @@ class Operator { PipelineObserver* _observer = nullptr; private: - void _init_rf_counters(bool init_bloom); void _init_conjuct_counters(); std::shared_ptr _mem_tracker; diff --git a/be/src/exec/pipeline/scan/scan_operator.cpp b/be/src/exec/pipeline/scan/scan_operator.cpp index 5c62844853296..109292a10a93e 100644 --- a/be/src/exec/pipeline/scan/scan_operator.cpp +++ b/be/src/exec/pipeline/scan/scan_operator.cpp @@ -90,6 +90,19 @@ Status ScanOperator::prepare(RuntimeState* state) { _prepare_chunk_source_timer = ADD_TIMER(_unique_metrics, "PrepareChunkSourceTime"); _submit_io_task_timer = ADD_TIMER(_unique_metrics, "SubmitTaskTime"); + if (_scan_node->is_enable_topn_filter_back_pressure()) { + if (auto* runtime_filters = runtime_bloom_filters(); runtime_filters != nullptr) { + auto has_topn_filters = + std::any_of(runtime_filters->descriptors().begin(), runtime_filters->descriptors().end(), + [](const auto& e) { return e.second->is_topn_filter(); }); + if (has_topn_filters) { + _topn_filter_back_pressure = std::make_unique( + 0.1, _scan_node->get_back_pressure_throttle_time_upper_bound(), + _scan_node->get_back_pressure_max_rounds(), _scan_node->get_back_pressure_throttle_time(), + _scan_node->get_back_pressure_num_rows()); + } + } + } RETURN_IF_ERROR(do_prepare(state)); return Status::OK(); } @@ -139,6 +152,10 @@ bool ScanOperator::has_output() const { return true; } + if (!_morsel_queue->empty() && _topn_filter_back_pressure && _topn_filter_back_pressure->should_throttle()) { + return false; + } + // Try to buffer enough chunks for exec thread, to reduce scheduling overhead. // It's like the Linux Block-Scheduler's Unplug algorithm, so we just name it unplug. // The default threshould of unpluging is BufferCapacity/DOP/4, and its range is [1, 16] @@ -275,6 +292,7 @@ StatusOr ScanOperator::pull_chunk(RuntimeState* state) { begin_pull_chunk(res); // for query cache mechanism, we should emit EOS chunk when we receive the last chunk. auto [owner_id, is_eos] = _should_emit_eos(res); + evaluate_topn_runtime_filters(res.get()); eval_runtime_bloom_filters(res.get()); res->owner_info().set_owner_id(owner_id, is_eos); } diff --git a/be/src/exec/pipeline/scan/scan_operator.h b/be/src/exec/pipeline/scan/scan_operator.h index bc2e8907110ae..515352e8fac52 100644 --- a/be/src/exec/pipeline/scan/scan_operator.h +++ b/be/src/exec/pipeline/scan/scan_operator.h @@ -14,9 +14,11 @@ #pragma once +#include "exec/exec_node.h" #include "exec/pipeline/pipeline_fwd.h" #include "exec/pipeline/scan/balanced_chunk_buffer.h" #include "exec/pipeline/source_operator.h" +#include "exec/pipeline/topn_runtime_filter_back_pressure.h" #include "exec/query_cache/cache_operator.h" #include "exec/query_cache/lane_arbiter.h" #include "exec/workgroup/work_group_fwd.h" @@ -171,6 +173,57 @@ class ScanOperator : public SourceOperator { return _scan_status; } + void evaluate_topn_runtime_filters(Chunk* chunk) { + if (chunk == nullptr || chunk->is_empty() || !_topn_filter_back_pressure) { + return; + } + if (auto* topn_runtime_filters = runtime_bloom_filters()) { + auto input_num_rows = chunk->num_rows(); + _init_topn_runtime_filter_counters(); + topn_runtime_filters->evaluate(chunk, _topn_filter_eval_context); + _topn_filter_back_pressure->inc_num_rows(chunk->num_rows()); + if (_topn_filter_eval_context.selectivity.empty()) { + _topn_filter_back_pressure->update_selectivity(1.0); + } else { + double selectivity = _topn_filter_eval_context.selectivity.begin()->first; + if (input_num_rows > 1024) { + _topn_filter_back_pressure->update_selectivity(selectivity); + } + } + } + } + + void _init_topn_runtime_filter_counters() { + if (_topn_filter_eval_context.join_runtime_filter_timer == nullptr) { + _topn_filter_eval_context.mode = RuntimeBloomFilterEvalContext::Mode::M_ONLY_TOPN; + _topn_filter_eval_context.join_runtime_filter_timer = ADD_TIMER(_common_metrics, "TopnRuntimeFilterTime"); + _topn_filter_eval_context.join_runtime_filter_hash_timer = + ADD_TIMER(_common_metrics, "TopnRuntimeFilterHashTime"); + _topn_filter_eval_context.join_runtime_filter_input_counter = + ADD_COUNTER(_common_metrics, "TopnRuntimeFilterInputRows", TUnit::UNIT); + _topn_filter_eval_context.join_runtime_filter_output_counter = + ADD_COUNTER(_common_metrics, "TopnRuntimeFilterOutputRows", TUnit::UNIT); + _topn_filter_eval_context.join_runtime_filter_eval_counter = + ADD_COUNTER(_common_metrics, "TopnRuntimeFilterEvaluate", TUnit::UNIT); + _topn_filter_eval_context.driver_sequence = _runtime_filter_probe_sequence; + } + } + + void eval_runtime_bloom_filters(Chunk* chunk) override { + if (chunk == nullptr || chunk->is_empty()) { + return; + } + + if (auto* bloom_filters = runtime_bloom_filters()) { + _init_rf_counters(true); + if (_topn_filter_back_pressure) { + _bloom_filter_eval_context.mode = RuntimeBloomFilterEvalContext::Mode::M_WITHOUT_TOPN; + } + bloom_filters->evaluate(chunk, _bloom_filter_eval_context); + } + ExecNode::eval_filter_null_values(chunk, filter_null_value_columns()); + } + protected: ScanNode* _scan_node = nullptr; const int32_t _dop; @@ -234,6 +287,9 @@ class ScanOperator : public SourceOperator { RuntimeProfile::Counter* _prepare_chunk_source_timer = nullptr; RuntimeProfile::Counter* _submit_io_task_timer = nullptr; + RuntimeBloomFilterEvalContext _topn_filter_eval_context; + std::unique_ptr _topn_filter_back_pressure = nullptr; + DECLARE_RACE_DETECTOR(race_pull_chunk) }; @@ -261,7 +317,6 @@ class ScanOperatorFactory : public SourceOperatorFactory { protected: ScanNode* const _scan_node; - std::shared_ptr _scan_task_group; }; diff --git a/be/src/exec/pipeline/topn_runtime_filter_back_pressure.h b/be/src/exec/pipeline/topn_runtime_filter_back_pressure.h new file mode 100644 index 0000000000000..b51391865f66f --- /dev/null +++ b/be/src/exec/pipeline/topn_runtime_filter_back_pressure.h @@ -0,0 +1,108 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once +#include +#include +#include +#include +#include + +namespace starrocks::pipeline { +using std::chrono::milliseconds; +using std::chrono::steady_clock; +class TopnRfBackPressure { + enum Phase { PH_UNTHROTTLE, PH_THROTTLE, PH_PASS_THROUGH }; + + template + class ScaleGenerator { + public: + ScaleGenerator(T initial_value, T delta, double factor, std::function next_cb) + : initial_value(initial_value), delta(delta), factor(factor), next_cb(next_cb), value(initial_value) {} + + T limit() { return value; } + void next() { + value += delta; + value *= factor; + } + bool has_next() { return next_cb(value); } + + private: + const T initial_value; + const T delta; + const double factor; + const std::function next_cb; + T value; + }; + +public: + void update_selectivity(double selectivity) { _current_selectivity = selectivity; } + void inc_num_rows(size_t num_rows) { _current_num_rows += num_rows; } + + bool should_throttle() { + if (_phase == PH_PASS_THROUGH) { + return false; + } else if (!_round_limiter.has_next() || !_throttle_time_limiter.has_next() || !_num_rows_limiter.has_next() || + _current_selectivity <= _selectivity_lower_bound || + _current_total_throttle_time >= _throttle_time_upper_bound) { + _phase = PH_PASS_THROUGH; + return false; + } + + if (_phase == PH_UNTHROTTLE) { + if (_current_num_rows <= _num_rows_limiter.limit()) { + return false; + } + _phase = PH_THROTTLE; + _current_throttle_deadline = duration_cast(steady_clock::now().time_since_epoch()).count() + + _throttle_time_limiter.limit(); + return true; + } else { + auto now = duration_cast(steady_clock::now().time_since_epoch()).count(); + if (now < _current_throttle_deadline) { + return true; + } + _phase = PH_UNTHROTTLE; + _current_num_rows = 0; + _current_total_throttle_time += _throttle_time_limiter.limit(); + _round_limiter.next(); + _throttle_time_limiter.next(); + _num_rows_limiter.next(); + return false; + } + } + + TopnRfBackPressure(double selectivity_lower_bound, int64_t throttle_time_upper_bound, int max_rounds, + int64_t throttle_time, size_t num_rows) + : _selectivity_lower_bound(selectivity_lower_bound), + _throttle_time_upper_bound(throttle_time_upper_bound), + _round_limiter(0, 1, 1.0, [max_rounds](int r) { return r < max_rounds; }), + _throttle_time_limiter(throttle_time, 0, 1.0, [](int64_t) { return true; }), + _num_rows_limiter(num_rows, 0, 2.0, [](size_t n) { return n < std::numeric_limits::max() / 2; }) { + } + +private: + const double _selectivity_lower_bound; + const int64_t _throttle_time_upper_bound; + Phase _phase{PH_UNTHROTTLE}; + ScaleGenerator _round_limiter; + ScaleGenerator _throttle_time_limiter; + ScaleGenerator _num_rows_limiter; + int64_t _current_throttle_deadline{-1}; + int64_t _current_total_throttle_time{0}; + size_t _current_num_rows{0}; + double _current_selectivity{1.0}; +}; + +} // namespace starrocks::pipeline diff --git a/be/src/exec/scan_node.h b/be/src/exec/scan_node.h index a3853449908f6..29b4ea2ec4b7e 100644 --- a/be/src/exec/scan_node.h +++ b/be/src/exec/scan_node.h @@ -131,6 +131,21 @@ class ScanNode : public ExecNode { const std::vector& column_access_paths() const { return _column_access_paths; } + bool is_enable_topn_filter_back_pressure() const { return this->_enable_topn_filter_back_pressure; } + void set_enable_topn_filter_back_pressure(bool value) { this->_enable_topn_filter_back_pressure = value; } + int get_back_pressure_max_rounds() const { return this->_back_pressure_max_rounds; } + void set_back_pressure_max_rounds(int value) { this->_back_pressure_max_rounds = value; } + size_t get_back_pressure_num_rows() const { return this->_back_pressure_num_rows; } + void set_back_pressure_num_rows(size_t value) { this->_back_pressure_num_rows = value; } + int64_t get_back_pressure_throttle_time() const { return this->_back_pressure_throttle_time; } + void set_back_pressure_throttle_time(int64_t value) { this->_back_pressure_throttle_time = value; } + int64_t get_back_pressure_throttle_time_upper_bound() const { + return this->_back_pressure_throttle_time_upper_bound; + } + void set_back_pressure_throttle_time_upper_bound(int64_t value) { + this->_back_pressure_throttle_time_upper_bound = value; + } + protected: RuntimeProfile::Counter* _bytes_read_counter = nullptr; // # bytes read from the scanner // # rows/tuples read from the scanner (including those discarded by eval_conjucts()) @@ -150,6 +165,12 @@ class ScanNode : public ExecNode { int32_t _io_tasks_per_scan_operator = config::io_tasks_per_scan_operator; std::vector _column_access_paths; + + bool _enable_topn_filter_back_pressure = false; + int _back_pressure_max_rounds = 5; + size_t _back_pressure_num_rows = 10240; + int64_t _back_pressure_throttle_time = 500; + int64_t _back_pressure_throttle_time_upper_bound = 5000; }; } // namespace starrocks diff --git a/be/src/exprs/runtime_filter_bank.cpp b/be/src/exprs/runtime_filter_bank.cpp index 66cb8149b15c2..a804e949a61a9 100644 --- a/be/src/exprs/runtime_filter_bank.cpp +++ b/be/src/exprs/runtime_filter_bank.cpp @@ -391,9 +391,14 @@ void RuntimeFilterProbeCollector::close(RuntimeState* state) { // do_evaluate is reentrant, can be called concurrently by multiple operators that shared the same // RuntimeFilterProbeCollector. void RuntimeFilterProbeCollector::do_evaluate(Chunk* chunk, RuntimeBloomFilterEvalContext& eval_context) { - if ((eval_context.input_chunk_nums++ & 31) == 0) { + if (eval_context.mode == RuntimeBloomFilterEvalContext::Mode::M_ONLY_TOPN) { update_selectivity(chunk, eval_context); return; + } else { + if ((eval_context.input_chunk_nums++ & 31) == 0) { + update_selectivity(chunk, eval_context); + return; + } } auto& seletivity_map = eval_context.selectivity; @@ -409,7 +414,8 @@ void RuntimeFilterProbeCollector::do_evaluate(Chunk* chunk, RuntimeBloomFilterEv for (auto& kv : seletivity_map) { RuntimeFilterProbeDescriptor* rf_desc = kv.second; const RuntimeFilter* filter = rf_desc->runtime_filter(eval_context.driver_sequence); - if (filter == nullptr || filter->always_true()) { + bool skip_topn = eval_context.mode == RuntimeBloomFilterEvalContext::Mode::M_WITHOUT_TOPN; + if ((skip_topn && rf_desc->is_topn_filter()) || filter == nullptr || filter->always_true()) { continue; } auto* ctx = rf_desc->probe_expr_ctx(); @@ -586,9 +592,18 @@ void RuntimeFilterProbeCollector::update_selectivity(Chunk* chunk, RuntimeBloomF for (auto& kv : _descriptors) { RuntimeFilterProbeDescriptor* rf_desc = kv.second; const RuntimeFilter* filter = rf_desc->runtime_filter(eval_context.driver_sequence); - if (filter == nullptr || filter->always_true()) { + bool should_use = + eval_context.mode == RuntimeBloomFilterEvalContext::Mode::M_ONLY_TOPN && rf_desc->is_topn_filter(); + if (filter == nullptr || (!should_use && filter->always_true())) { continue; } + if (eval_context.mode == RuntimeBloomFilterEvalContext::Mode::M_WITHOUT_TOPN && rf_desc->is_topn_filter()) { + continue; + } else if (eval_context.mode == RuntimeBloomFilterEvalContext::Mode::M_ONLY_TOPN && + !rf_desc->is_topn_filter()) { + continue; + } + auto& selection = eval_context.running_context.use_merged_selection ? eval_context.running_context.merged_selection : eval_context.running_context.selection; @@ -630,6 +645,8 @@ void RuntimeFilterProbeCollector::update_selectivity(Chunk* chunk, RuntimeBloomF dest[j] = src[j] & dest[j]; } } + } else if (rf_desc->is_topn_filter() && eval_context.mode == RuntimeBloomFilterEvalContext::Mode::M_ONLY_TOPN) { + seletivity_map.emplace(selectivity, rf_desc); } } if (!seletivity_map.empty()) { diff --git a/be/src/exprs/runtime_filter_bank.h b/be/src/exprs/runtime_filter_bank.h index 84ce68e8b748d..f36257190610a 100644 --- a/be/src/exprs/runtime_filter_bank.h +++ b/be/src/exprs/runtime_filter_bank.h @@ -216,6 +216,7 @@ class RuntimeFilterProbeDescriptor : public WithLayoutMixin { int64_t _ready_timestamp = 0; int8_t _join_mode; bool _is_topn_filter = false; + bool _skip_wait = false; // Indicates that the runtime filter was built from the colocate group execution build side. bool _is_group_colocate_rf = false; @@ -231,7 +232,12 @@ class RuntimeFilterProbeDescriptor : public WithLayoutMixin { // into RuntimeBloomFilterEvalContext and make do_evaluate function can be called concurrently. struct RuntimeBloomFilterEvalContext { RuntimeBloomFilterEvalContext() = default; - + enum Mode { + M_ALL, + M_WITHOUT_TOPN, + M_ONLY_TOPN, + }; + Mode mode = Mode::M_ALL; std::map selectivity; size_t input_chunk_nums = 0; int run_filter_nums = 0; diff --git a/fe/fe-core/src/main/java/com/starrocks/planner/OlapScanNode.java b/fe/fe-core/src/main/java/com/starrocks/planner/OlapScanNode.java index 455f266ac645d..fb2636b1ca1dd 100644 --- a/fe/fe-core/src/main/java/com/starrocks/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/com/starrocks/planner/OlapScanNode.java @@ -196,6 +196,12 @@ public class OlapScanNode extends ScanNode { // Set just once per query. private boolean alreadyFoundSomeLivingCn = false; + boolean enableTopnFilterBackPressure = false; + long backPressureThrottleTimeUpperBound = -1; + int backPressureMaxRounds = -1; + long backPressureThrottleTime = -1; + long backPressureNumRows = -1; + // Constructs node to scan given data files of table 'tbl'. // Constructs node to scan given data files of table 'tbl'. public OlapScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName) { @@ -1000,13 +1006,19 @@ protected void toThrift(TPlanNode msg) { } assignOrderByHints(keyColumnNames); - if (olapTable.isCloudNativeTableOrMaterializedView()) { msg.node_type = TPlanNodeType.LAKE_SCAN_NODE; msg.lake_scan_node = new TLakeScanNode(desc.getId().asInt(), keyColumnNames, keyColumnTypes, isPreAggregation); msg.lake_scan_node.setSort_key_column_names(keyColumnNames); msg.lake_scan_node.setRollup_name(olapTable.getIndexNameById(selectedIndexId)); + if (enableTopnFilterBackPressure) { + msg.lake_scan_node.setEnable_topn_filter_back_pressure(true); + msg.lake_scan_node.setBack_pressure_max_rounds(backPressureMaxRounds); + msg.lake_scan_node.setBack_pressure_num_rows(backPressureNumRows); + msg.lake_scan_node.setBack_pressure_throttle_time(backPressureThrottleTime); + msg.lake_scan_node.setBack_pressure_throttle_time_upper_bound(backPressureThrottleTimeUpperBound); + } if (!conjuncts.isEmpty()) { msg.lake_scan_node.setSql_predicates(getExplainString(conjuncts)); } @@ -1045,6 +1057,13 @@ protected void toThrift(TPlanNode msg) { msg.olap_scan_node.setSchema_id(schemaId); msg.olap_scan_node.setSort_key_column_names(keyColumnNames); msg.olap_scan_node.setRollup_name(olapTable.getIndexNameById(selectedIndexId)); + if (enableTopnFilterBackPressure) { + msg.olap_scan_node.setEnable_topn_filter_back_pressure(true); + msg.olap_scan_node.setBack_pressure_max_rounds(backPressureMaxRounds); + msg.olap_scan_node.setBack_pressure_num_rows(backPressureNumRows); + msg.olap_scan_node.setBack_pressure_throttle_time(backPressureThrottleTime); + msg.olap_scan_node.setBack_pressure_throttle_time_upper_bound(backPressureThrottleTimeUpperBound); + } if (!conjuncts.isEmpty()) { msg.olap_scan_node.setSql_predicates(getExplainString(conjuncts)); } @@ -1516,4 +1535,27 @@ public void clearScanNodeForThriftBuild() { public boolean isRunningAsConnectorOperator() { return false; } + + @Override + public boolean pushDownRuntimeFilters(RuntimeFilterPushDownContext context, Expr probeExpr, + List partitionByExprs) { + boolean accept = super.pushDownRuntimeFilters(context, probeExpr, partitionByExprs); + if (accept && context.getDescription().runtimeFilterType() + .equals(RuntimeFilterDescription.RuntimeFilterType.TOPN_FILTER)) { + boolean toManyData = this.getCardinality() != -1 && this.cardinality > 50000000; + int backPressureMode = Optional.ofNullable(ConnectContext.get()) + .map(ctx -> ctx.getSessionVariable().getTopnFilterBackPressureMode()) + .orElse(0); + if ((backPressureMode == 1 && toManyData) || backPressureMode == 2) { + this.enableTopnFilterBackPressure = true; + this.backPressureMaxRounds = ConnectContext.get().getSessionVariable().getBackPressureMaxRounds(); + this.backPressureThrottleTimeUpperBound = + ConnectContext.get().getSessionVariable().getBackPressureThrottleTimeUpperBound(); + this.backPressureNumRows = 10 * context.getDescription().getTopN(); + this.backPressureThrottleTime = this.backPressureThrottleTimeUpperBound / + Math.max(this.backPressureMaxRounds, 1); + } + } + return accept; + } } diff --git a/fe/fe-core/src/main/java/com/starrocks/planner/RuntimeFilterDescription.java b/fe/fe-core/src/main/java/com/starrocks/planner/RuntimeFilterDescription.java index 282f355ad5de4..fac3ddb7c1316 100644 --- a/fe/fe-core/src/main/java/com/starrocks/planner/RuntimeFilterDescription.java +++ b/fe/fe-core/src/main/java/com/starrocks/planner/RuntimeFilterDescription.java @@ -75,6 +75,8 @@ public enum RuntimeFilterType { private boolean onlyLocal; + private long topn; + // ExecGroupInfo. used for check build colocate runtime filter private boolean isBuildFromColocateGroup = false; private int execGroupId = -1; @@ -149,6 +151,14 @@ public void setSortInfo(SortInfo sortInfo) { this.sortInfo = sortInfo; } + public void setTopN(long value) { + this.topn = value; + } + + public long getTopN() { + return this.topn; + } + public boolean canProbeUse(PlanNode node, RuntimeFilterPushDownContext rfPushCtx) { if (!canAcceptFilter(node, rfPushCtx)) { return false; diff --git a/fe/fe-core/src/main/java/com/starrocks/planner/SortNode.java b/fe/fe-core/src/main/java/com/starrocks/planner/SortNode.java index 721f419758076..ed553f45ded48 100644 --- a/fe/fe-core/src/main/java/com/starrocks/planner/SortNode.java +++ b/fe/fe-core/src/main/java/com/starrocks/planner/SortNode.java @@ -181,6 +181,7 @@ public void buildRuntimeFilters(IdGenerator generator, Descript rf.setSortInfo(getSortInfo()); rf.setBuildExpr(orderBy); rf.setRuntimeFilterType(RuntimeFilterDescription.RuntimeFilterType.TOPN_FILTER); + rf.setTopN(offset < 0 ? limit : offset + limit); RuntimeFilterPushDownContext rfPushDownCtx = new RuntimeFilterPushDownContext(rf, descTbl, execGroupSets); for (PlanNode child : children) { if (child.pushDownRuntimeFilters(rfPushDownCtx, orderBy, Lists.newArrayList())) { diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java b/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java index 1dc09971c1123..9b7cbf664ed28 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java @@ -855,6 +855,11 @@ public static MaterializedViewRewriteMode parse(String str) { public static final String ENABLE_SCAN_PREDICATE_EXPR_REUSE = "enable_scan_predicate_expr_reuse"; + // 0 for disable, 1 for too many data; 2 for force + public static final String TOPN_FILTER_BACK_PRESSURE_MODE = "topn_filter_back_pressure_mode"; + public static final String BACK_PRESSURE_MAX_ROUNDS = "back_pressure_back_rounds"; + public static final String BACK_PRESSURE_THROTTLE_TIME_UPPER_BOUND = "back_pressure_throttle_time_upper_bound"; + public static final List DEPRECATED_VARIABLES = ImmutableList.builder() .add(CODEGEN_LEVEL) .add(MAX_EXECUTION_TIME) @@ -1679,6 +1684,13 @@ public static MaterializedViewRewriteMode parse(String str) { @VarAttr(name = ENABLE_SCAN_PREDICATE_EXPR_REUSE, flag = VariableMgr.INVISIBLE) private boolean enableScanPredicateExprReuse = true; + @VarAttr(name = TOPN_FILTER_BACK_PRESSURE_MODE) + private int topnFilterBackPressureMode = 0; + @VarAttr(name = BACK_PRESSURE_MAX_ROUNDS) + private int backPressureMaxRounds = 3; + @VarAttr(name = BACK_PRESSURE_THROTTLE_TIME_UPPER_BOUND) + private long backPressureThrottleTimeUpperBound = 300; + public int getCboPruneJsonSubfieldDepth() { return cboPruneJsonSubfieldDepth; } @@ -4575,6 +4587,30 @@ public boolean isEnableRewriteUnnestBitmapToArray() { return enableRewriteUnnestBitmapToArray; } + public int getTopnFilterBackPressureMode() { + return topnFilterBackPressureMode; + } + + public void setTopnFilterBackPressureMode(int value) { + this.topnFilterBackPressureMode = value; + } + + public int getBackPressureMaxRounds() { + return this.backPressureMaxRounds; + } + + public void setBackPressureMaxRounds(int value) { + this.backPressureMaxRounds = value; + } + + public long getBackPressureThrottleTimeUpperBound() { + return this.backPressureThrottleTimeUpperBound; + } + + public void setBackPressureThrottleTimeUpperBound(long value) { + this.backPressureThrottleTimeUpperBound = value; + } + // Serialize to thrift object // used for rest api public TQueryOptions toThrift() { diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index bf3a645f85b2d..06f76fa1eba00 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -619,6 +619,13 @@ struct TOlapScanNode { 40: optional TVectorSearchOptions vector_search_options 41: optional TTableSampleOptions sample_options; + + //back pressure + 50: optional bool enable_topn_filter_back_pressure + 51: optional i32 back_pressure_max_rounds + 52: optional i64 back_pressure_throttle_time + 53: optional i64 back_pressure_throttle_time_upper_bound + 54: optional i64 back_pressure_num_rows } struct TJDBCScanNode { @@ -652,6 +659,13 @@ struct TLakeScanNode { 32: optional bool output_chunk_by_bucket 33: optional bool output_asc_hint 34: optional bool partition_order_hint + + //back pressure + 38: optional bool enable_topn_filter_back_pressure + 39: optional i32 back_pressure_max_rounds + 40: optional i64 back_pressure_throttle_time + 41: optional i64 back_pressure_throttle_time_upper_bound + 42: optional i64 back_pressure_num_rows } struct TEqJoinCondition { diff --git a/test/sql/test_topn_filter_throttle_scan/R/test_topn_filter_throttle_scan b/test/sql/test_topn_filter_throttle_scan/R/test_topn_filter_throttle_scan new file mode 100644 index 0000000000000..7104955aedb64 --- /dev/null +++ b/test/sql/test_topn_filter_throttle_scan/R/test_topn_filter_throttle_scan @@ -0,0 +1,72 @@ +-- name: test_topn_filter_throttle_scan +DROP TABLE if exists t0; +-- result: +-- !result +CREATE TABLE if not exists t0 +( +c0 INT NOT NULL, +c1 INT NOT NULL +) ENGINE=OLAP +DUPLICATE KEY(`c0` ) +COMMENT "OLAP" +DISTRIBUTED BY HASH(`c0` ) BUCKETS 1 +PROPERTIES( +"replication_num" = "1", +"in_memory" = "false", +"storage_format" = "default" +); +-- result: +-- !result +DROP TABLE if exists t1; +-- result: +-- !result +CREATE TABLE if not exists t1 +( +c0 INT NOT NULL, +c1 INT NOT NULL +) ENGINE=OLAP +DUPLICATE KEY(`c0` ) +COMMENT "OLAP" +DISTRIBUTED BY HASH(`c0` ) BUCKETS 1 +PROPERTIES( +"replication_num" = "1", +"in_memory" = "false", +"storage_format" = "default" +); +-- result: +-- !result +insert into t0 select c0, 0 from Table(generate_series(1,10000)) ta(c0), Table(generate_series(1,1)); +-- result: +-- !result +insert into t0 select t0.* from t0, Table(generate_series(1,10)); +-- result: +-- !result +insert into t1 select c0, c1 from Table(generate_series(1,10000)) ta(c0), Table(generate_series(1,10)) tb(c1); +-- result: +-- !result +set topn_filter_back_pressure_mode='2'; +-- result: +-- !result +set enable_profile='true'; +-- result: +-- !result +[UC]select t0.c1, repeat('x', t0.c1+t1.c1) as a, repeat('y',t1.c0 + t1.c0) as b from t0 join[broadcast] t1 on t0.c0 = t1.c0 order by 1 limit 10; +-- result: +0 xxxxxxx yy +0 xxxxxxxxx yy +0 xxxxxx yy +0 xxx yy +0 xxxxx yy +0 xx yy +0 x yy +0 xxxx yy +0 xxxxxxxx yy +0 xxxxxxxxxx yy +-- !result +create table profile_table properties("replication_num"="1") as select line from table(unnest(split(get_query_profile(last_query_id()),"\n"))) t(line); +-- result: +-- !result +select assert_true(count(1)>0) from profile_table where line like '%TopnRuntimeFilter%'; +-- result: +1 +-- !result \ No newline at end of file diff --git a/test/sql/test_topn_filter_throttle_scan/T/test_topn_filter_throttle_scan b/test/sql/test_topn_filter_throttle_scan/T/test_topn_filter_throttle_scan new file mode 100644 index 0000000000000..7f2c53590910b --- /dev/null +++ b/test/sql/test_topn_filter_throttle_scan/T/test_topn_filter_throttle_scan @@ -0,0 +1,39 @@ +-- name: test_topn_filter_throttle_scan + DROP TABLE if exists t0; + + CREATE TABLE if not exists t0 + ( + c0 INT NOT NULL, + c1 INT NOT NULL + ) ENGINE=OLAP + DUPLICATE KEY(`c0` ) + COMMENT "OLAP" + DISTRIBUTED BY HASH(`c0` ) BUCKETS 1 + PROPERTIES( + "replication_num" = "1", + "in_memory" = "false", + "storage_format" = "default" + ); + DROP TABLE if exists t1; + + CREATE TABLE if not exists t1 + ( + c0 INT NOT NULL, + c1 INT NOT NULL + ) ENGINE=OLAP + DUPLICATE KEY(`c0` ) + COMMENT "OLAP" + DISTRIBUTED BY HASH(`c0` ) BUCKETS 1 + PROPERTIES( + "replication_num" = "1", + "in_memory" = "false", + "storage_format" = "default" + ); +insert into t0 select c0, 0 from Table(generate_series(1,10000)) ta(c0), Table(generate_series(1,1)); +insert into t0 select t0.* from t0, Table(generate_series(1,10)); +insert into t1 select c0, c1 from Table(generate_series(1,10000)) ta(c0), Table(generate_series(1,10)) tb(c1); +set topn_filter_back_pressure_mode='2'; +set enable_profile='true'; +[UC]select t0.c1, repeat('x', t0.c1+t1.c1) as a, repeat('y',t1.c0 + t1.c0) as b from t0 join[broadcast] t1 on t0.c0 = t1.c0 order by 1 limit 10; +create table profile_table properties("replication_num"="1") as select line from table(unnest(split(get_query_profile(last_query_id()),"\n"))) t(line); +select assert_true(count(1)>0) from profile_table where line like '%TopnRuntimeFilter%';