Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] Throttle scan to wait for topn filter #55660

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions be/src/exec/connector_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,17 @@ Status ConnectorScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
}
_estimate_scan_row_bytes();

if (tnode.__isset.lake_scan_node) {
if (tnode.lake_scan_node.__isset.enable_topn_filter_back_pressure &&
tnode.lake_scan_node.enable_topn_filter_back_pressure) {
_enable_topn_filter_back_pressure = true;
_back_pressure_max_rounds = tnode.lake_scan_node.back_pressure_max_rounds;
_back_pressure_num_rows = tnode.lake_scan_node.back_pressure_num_rows;
_back_pressure_throttle_time = tnode.lake_scan_node.back_pressure_throttle_time;
_back_pressure_throttle_time_upper_bound = tnode.lake_scan_node.back_pressure_throttle_time_upper_bound;
}
}

return Status::OK();
}

Expand Down
9 changes: 9 additions & 0 deletions be/src/exec/olap_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,15 @@ Status OlapScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
}
}

if (tnode.olap_scan_node.__isset.enable_topn_filter_back_pressure &&
tnode.olap_scan_node.enable_topn_filter_back_pressure) {
_enable_topn_filter_back_pressure = true;
_back_pressure_max_rounds = tnode.olap_scan_node.back_pressure_max_rounds;
_back_pressure_num_rows = tnode.olap_scan_node.back_pressure_num_rows;
_back_pressure_throttle_time = tnode.olap_scan_node.back_pressure_throttle_time;
_back_pressure_throttle_time_upper_bound = tnode.olap_scan_node.back_pressure_throttle_time_upper_bound;
}

_estimate_scan_and_output_row_bytes();

return Status::OK();
Expand Down
5 changes: 3 additions & 2 deletions be/src/exec/pipeline/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ class Operator {
Status eval_conjuncts(const std::vector<ExprContext*>& conjuncts, Chunk* chunk, FilterPtr* filter = nullptr);

// equal to ExecNode::eval_join_runtime_filters, is used to apply bloom-filters to Operators.
void eval_runtime_bloom_filters(Chunk* chunk);
virtual void eval_runtime_bloom_filters(Chunk* chunk);

// Pseudo plan_node_id for final sink, such as result_sink, table_sink
static const int32_t s_pseudo_plan_node_id_for_final_sink;
Expand Down Expand Up @@ -276,6 +276,8 @@ class Operator {
void set_observer(PipelineObserver* observer) { _observer = observer; }
PipelineObserver* observer() const { return _observer; }

void _init_rf_counters(bool init_bloom);

protected:
OperatorFactory* _factory;
const int32_t _id;
Expand Down Expand Up @@ -334,7 +336,6 @@ class Operator {
PipelineObserver* _observer = nullptr;

private:
void _init_rf_counters(bool init_bloom);
void _init_conjuct_counters();

std::shared_ptr<MemTracker> _mem_tracker;
Expand Down
18 changes: 18 additions & 0 deletions be/src/exec/pipeline/scan/scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,19 @@ Status ScanOperator::prepare(RuntimeState* state) {
_prepare_chunk_source_timer = ADD_TIMER(_unique_metrics, "PrepareChunkSourceTime");
_submit_io_task_timer = ADD_TIMER(_unique_metrics, "SubmitTaskTime");

if (_scan_node->is_enable_topn_filter_back_pressure()) {
if (auto* runtime_filters = runtime_bloom_filters(); runtime_filters != nullptr) {
auto has_topn_filters =
std::any_of(runtime_filters->descriptors().begin(), runtime_filters->descriptors().end(),
[](const auto& e) { return e.second->is_topn_filter(); });
if (has_topn_filters) {
_topn_filter_back_pressure = std::make_unique<TopnRfBackPressure>(
0.1, _scan_node->get_back_pressure_throttle_time_upper_bound(),
_scan_node->get_back_pressure_max_rounds(), _scan_node->get_back_pressure_throttle_time(),
_scan_node->get_back_pressure_num_rows());
}
}
}
RETURN_IF_ERROR(do_prepare(state));
return Status::OK();
}
Expand Down Expand Up @@ -139,6 +152,10 @@ bool ScanOperator::has_output() const {
return true;
}

if (!_morsel_queue->empty() && _topn_filter_back_pressure && _topn_filter_back_pressure->should_throttle()) {
return false;
}

// Try to buffer enough chunks for exec thread, to reduce scheduling overhead.
// It's like the Linux Block-Scheduler's Unplug algorithm, so we just name it unplug.
// The default threshould of unpluging is BufferCapacity/DOP/4, and its range is [1, 16]
Expand Down Expand Up @@ -275,6 +292,7 @@ StatusOr<ChunkPtr> ScanOperator::pull_chunk(RuntimeState* state) {
begin_pull_chunk(res);
// for query cache mechanism, we should emit EOS chunk when we receive the last chunk.
auto [owner_id, is_eos] = _should_emit_eos(res);
evaluate_topn_runtime_filters(res.get());
eval_runtime_bloom_filters(res.get());
res->owner_info().set_owner_id(owner_id, is_eos);
}
Expand Down
57 changes: 56 additions & 1 deletion be/src/exec/pipeline/scan/scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@

#pragma once

#include "exec/exec_node.h"
#include "exec/pipeline/pipeline_fwd.h"
#include "exec/pipeline/scan/balanced_chunk_buffer.h"
#include "exec/pipeline/source_operator.h"
#include "exec/pipeline/topn_runtime_filter_back_pressure.h"
#include "exec/query_cache/cache_operator.h"
#include "exec/query_cache/lane_arbiter.h"
#include "exec/workgroup/work_group_fwd.h"
Expand Down Expand Up @@ -171,6 +173,57 @@ class ScanOperator : public SourceOperator {
return _scan_status;
}

void evaluate_topn_runtime_filters(Chunk* chunk) {
if (chunk == nullptr || chunk->is_empty() || !_topn_filter_back_pressure) {
return;
}
if (auto* topn_runtime_filters = runtime_bloom_filters()) {
auto input_num_rows = chunk->num_rows();
_init_topn_runtime_filter_counters();
topn_runtime_filters->evaluate(chunk, _topn_filter_eval_context);
_topn_filter_back_pressure->inc_num_rows(chunk->num_rows());
if (_topn_filter_eval_context.selectivity.empty()) {
_topn_filter_back_pressure->update_selectivity(1.0);
} else {
double selectivity = _topn_filter_eval_context.selectivity.begin()->first;
if (input_num_rows > 1024) {
_topn_filter_back_pressure->update_selectivity(selectivity);
}
}
}
}

void _init_topn_runtime_filter_counters() {
if (_topn_filter_eval_context.join_runtime_filter_timer == nullptr) {
_topn_filter_eval_context.mode = RuntimeBloomFilterEvalContext::Mode::M_ONLY_TOPN;
_topn_filter_eval_context.join_runtime_filter_timer = ADD_TIMER(_common_metrics, "TopnRuntimeFilterTime");
_topn_filter_eval_context.join_runtime_filter_hash_timer =
ADD_TIMER(_common_metrics, "TopnRuntimeFilterHashTime");
_topn_filter_eval_context.join_runtime_filter_input_counter =
ADD_COUNTER(_common_metrics, "TopnRuntimeFilterInputRows", TUnit::UNIT);
_topn_filter_eval_context.join_runtime_filter_output_counter =
ADD_COUNTER(_common_metrics, "TopnRuntimeFilterOutputRows", TUnit::UNIT);
_topn_filter_eval_context.join_runtime_filter_eval_counter =
ADD_COUNTER(_common_metrics, "TopnRuntimeFilterEvaluate", TUnit::UNIT);
_topn_filter_eval_context.driver_sequence = _runtime_filter_probe_sequence;
}
}

void eval_runtime_bloom_filters(Chunk* chunk) override {
if (chunk == nullptr || chunk->is_empty()) {
return;
}

if (auto* bloom_filters = runtime_bloom_filters()) {
_init_rf_counters(true);
if (_topn_filter_back_pressure) {
_bloom_filter_eval_context.mode = RuntimeBloomFilterEvalContext::Mode::M_WITHOUT_TOPN;
}
bloom_filters->evaluate(chunk, _bloom_filter_eval_context);
}
ExecNode::eval_filter_null_values(chunk, filter_null_value_columns());
}

protected:
ScanNode* _scan_node = nullptr;
const int32_t _dop;
Expand Down Expand Up @@ -234,6 +287,9 @@ class ScanOperator : public SourceOperator {
RuntimeProfile::Counter* _prepare_chunk_source_timer = nullptr;
RuntimeProfile::Counter* _submit_io_task_timer = nullptr;

RuntimeBloomFilterEvalContext _topn_filter_eval_context;
std::unique_ptr<TopnRfBackPressure> _topn_filter_back_pressure = nullptr;

DECLARE_RACE_DETECTOR(race_pull_chunk)
};

Expand Down Expand Up @@ -261,7 +317,6 @@ class ScanOperatorFactory : public SourceOperatorFactory {

protected:
ScanNode* const _scan_node;

std::shared_ptr<workgroup::ScanTaskGroup> _scan_task_group;
};

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The most risky bug in this code is:
Potential segmentation fault due to a null pointer dereference when accessing runtime_bloom_filters().

You can modify the code like this:

void evaluate_topn_runtime_filters(Chunk* chunk) {
    if (chunk == nullptr || chunk->is_empty() || !_topn_filter_back_pressure) {
        return;
    }
    auto* topn_runtime_filters = runtime_bloom_filters();
    if (topn_runtime_filters != nullptr) {
        auto input_num_rows = chunk->num_rows();
        _init_topn_runtime_filter_counters();
        topn_runtime_filters->evaluate(chunk, _topn_filter_eval_context);
        _topn_filter_back_pressure->inc_num_rows(input_num_rows);
        if (_topn_filter_eval_context.selectivity.empty()) {
            _topn_filter_back_pressure->update_selectivity(1.0);
        } else {
            double selectivity = _topn_filter_eval_context.selectivity.begin()->first;
            if (input_num_rows > 1024) {
                _topn_filter_back_pressure->update_selectivity(selectivity);
            }
        }
    }
}

Expand Down
108 changes: 108 additions & 0 deletions be/src/exec/pipeline/topn_runtime_filter_back_pressure.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once
#include <atomic>
#include <chrono>
#include <cstdint>
#include <functional>
#include <limits>

namespace starrocks::pipeline {
using std::chrono::milliseconds;
using std::chrono::steady_clock;
class TopnRfBackPressure {
enum Phase { PH_UNTHROTTLE, PH_THROTTLE, PH_PASS_THROUGH };

template <typename T>
class ScaleGenerator {
public:
ScaleGenerator(T initial_value, T delta, double factor, std::function<bool(T)> next_cb)
: initial_value(initial_value), delta(delta), factor(factor), next_cb(next_cb), value(initial_value) {}

T limit() { return value; }
void next() {
value += delta;
value *= factor;
}
bool has_next() { return next_cb(value); }

private:
const T initial_value;
const T delta;
const double factor;
const std::function<bool(T)> next_cb;
T value;
};

public:
void update_selectivity(double selectivity) { _current_selectivity = selectivity; }
void inc_num_rows(size_t num_rows) { _current_num_rows += num_rows; }

bool should_throttle() {
if (_phase == PH_PASS_THROUGH) {
return false;
} else if (!_round_limiter.has_next() || !_throttle_time_limiter.has_next() || !_num_rows_limiter.has_next() ||
_current_selectivity <= _selectivity_lower_bound ||
_current_total_throttle_time >= _throttle_time_upper_bound) {
_phase = PH_PASS_THROUGH;
return false;
}

if (_phase == PH_UNTHROTTLE) {
if (_current_num_rows <= _num_rows_limiter.limit()) {
return false;
}
_phase = PH_THROTTLE;
_current_throttle_deadline = duration_cast<milliseconds>(steady_clock::now().time_since_epoch()).count() +
_throttle_time_limiter.limit();
return true;
} else {
auto now = duration_cast<milliseconds>(steady_clock::now().time_since_epoch()).count();
if (now < _current_throttle_deadline) {
return true;
}
_phase = PH_UNTHROTTLE;
_current_num_rows = 0;
_current_total_throttle_time += _throttle_time_limiter.limit();
_round_limiter.next();
_throttle_time_limiter.next();
_num_rows_limiter.next();
return false;
}
}

TopnRfBackPressure(double selectivity_lower_bound, int64_t throttle_time_upper_bound, int max_rounds,
int64_t throttle_time, size_t num_rows)
: _selectivity_lower_bound(selectivity_lower_bound),
_throttle_time_upper_bound(throttle_time_upper_bound),
_round_limiter(0, 1, 1.0, [max_rounds](int r) { return r < max_rounds; }),
_throttle_time_limiter(throttle_time, 0, 1.0, [](int64_t) { return true; }),
_num_rows_limiter(num_rows, 0, 2.0, [](size_t n) { return n < std::numeric_limits<size_t>::max() / 2; }) {
}

private:
const double _selectivity_lower_bound;
const int64_t _throttle_time_upper_bound;
Phase _phase{PH_UNTHROTTLE};
ScaleGenerator<int> _round_limiter;
ScaleGenerator<int64_t> _throttle_time_limiter;
ScaleGenerator<int64_t> _num_rows_limiter;
int64_t _current_throttle_deadline{-1};
int64_t _current_total_throttle_time{0};
size_t _current_num_rows{0};
double _current_selectivity{1.0};
};

} // namespace starrocks::pipeline
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The most risky bug in this code is:
Incorrect calculation of _current_throttle_deadline in should_throttle(), which results in the use of milliseconds instead of correctly casting steady_clock::now.

You can modify the code like this:

#include <chrono> // add this to the includes to use std::chrono_literals

if (_phase == PH_UNTHROTTLE) {
    if (_current_num_rows <= _num_rows_limiter.limit()) {
        return false;
    }
    _phase = PH_THROTTLE;
    _current_throttle_deadline = 
        (steady_clock::now() + milliseconds(_throttle_time_limiter.limit())).time_since_epoch().count();
    return true;
}

21 changes: 21 additions & 0 deletions be/src/exec/scan_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,21 @@ class ScanNode : public ExecNode {

const std::vector<ColumnAccessPathPtr>& column_access_paths() const { return _column_access_paths; }

bool is_enable_topn_filter_back_pressure() const { return this->_enable_topn_filter_back_pressure; }
void set_enable_topn_filter_back_pressure(bool value) { this->_enable_topn_filter_back_pressure = value; }
int get_back_pressure_max_rounds() const { return this->_back_pressure_max_rounds; }
void set_back_pressure_max_rounds(int value) { this->_back_pressure_max_rounds = value; }
size_t get_back_pressure_num_rows() const { return this->_back_pressure_num_rows; }
void set_back_pressure_num_rows(size_t value) { this->_back_pressure_num_rows = value; }
int64_t get_back_pressure_throttle_time() const { return this->_back_pressure_throttle_time; }
void set_back_pressure_throttle_time(int64_t value) { this->_back_pressure_throttle_time = value; }
int64_t get_back_pressure_throttle_time_upper_bound() const {
return this->_back_pressure_throttle_time_upper_bound;
}
void set_back_pressure_throttle_time_upper_bound(int64_t value) {
this->_back_pressure_throttle_time_upper_bound = value;
}

protected:
RuntimeProfile::Counter* _bytes_read_counter = nullptr; // # bytes read from the scanner
// # rows/tuples read from the scanner (including those discarded by eval_conjucts())
Expand All @@ -150,6 +165,12 @@ class ScanNode : public ExecNode {
int32_t _io_tasks_per_scan_operator = config::io_tasks_per_scan_operator;

std::vector<ColumnAccessPathPtr> _column_access_paths;

bool _enable_topn_filter_back_pressure = false;
int _back_pressure_max_rounds = 5;
size_t _back_pressure_num_rows = 10240;
int64_t _back_pressure_throttle_time = 500;
int64_t _back_pressure_throttle_time_upper_bound = 5000;
};

} // namespace starrocks
23 changes: 20 additions & 3 deletions be/src/exprs/runtime_filter_bank.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -391,9 +391,14 @@ void RuntimeFilterProbeCollector::close(RuntimeState* state) {
// do_evaluate is reentrant, can be called concurrently by multiple operators that shared the same
// RuntimeFilterProbeCollector.
void RuntimeFilterProbeCollector::do_evaluate(Chunk* chunk, RuntimeBloomFilterEvalContext& eval_context) {
if ((eval_context.input_chunk_nums++ & 31) == 0) {
if (eval_context.mode == RuntimeBloomFilterEvalContext::Mode::M_ONLY_TOPN) {
update_selectivity(chunk, eval_context);
return;
} else {
if ((eval_context.input_chunk_nums++ & 31) == 0) {
update_selectivity(chunk, eval_context);
return;
}
}

auto& seletivity_map = eval_context.selectivity;
Expand All @@ -409,7 +414,8 @@ void RuntimeFilterProbeCollector::do_evaluate(Chunk* chunk, RuntimeBloomFilterEv
for (auto& kv : seletivity_map) {
RuntimeFilterProbeDescriptor* rf_desc = kv.second;
const RuntimeFilter* filter = rf_desc->runtime_filter(eval_context.driver_sequence);
if (filter == nullptr || filter->always_true()) {
bool skip_topn = eval_context.mode == RuntimeBloomFilterEvalContext::Mode::M_WITHOUT_TOPN;
if ((skip_topn && rf_desc->is_topn_filter()) || filter == nullptr || filter->always_true()) {
continue;
}
auto* ctx = rf_desc->probe_expr_ctx();
Expand Down Expand Up @@ -586,9 +592,18 @@ void RuntimeFilterProbeCollector::update_selectivity(Chunk* chunk, RuntimeBloomF
for (auto& kv : _descriptors) {
RuntimeFilterProbeDescriptor* rf_desc = kv.second;
const RuntimeFilter* filter = rf_desc->runtime_filter(eval_context.driver_sequence);
if (filter == nullptr || filter->always_true()) {
bool should_use =
eval_context.mode == RuntimeBloomFilterEvalContext::Mode::M_ONLY_TOPN && rf_desc->is_topn_filter();
if (filter == nullptr || (!should_use && filter->always_true())) {
continue;
}
if (eval_context.mode == RuntimeBloomFilterEvalContext::Mode::M_WITHOUT_TOPN && rf_desc->is_topn_filter()) {
continue;
} else if (eval_context.mode == RuntimeBloomFilterEvalContext::Mode::M_ONLY_TOPN &&
!rf_desc->is_topn_filter()) {
continue;
}

auto& selection = eval_context.running_context.use_merged_selection
? eval_context.running_context.merged_selection
: eval_context.running_context.selection;
Expand Down Expand Up @@ -630,6 +645,8 @@ void RuntimeFilterProbeCollector::update_selectivity(Chunk* chunk, RuntimeBloomF
dest[j] = src[j] & dest[j];
}
}
} else if (rf_desc->is_topn_filter() && eval_context.mode == RuntimeBloomFilterEvalContext::Mode::M_ONLY_TOPN) {
seletivity_map.emplace(selectivity, rf_desc);
}
}
if (!seletivity_map.empty()) {
Expand Down
Loading
Loading