[Enhancement] Throttle scan to wait for topn filter #55660

Open
wants to merge 1 commit into
base: main

@satanson (Contributor) commented Feb 7, 2025

Why I'm doing:

A highly associative hash join (one whose output is many times larger than its input) is time-consuming. If there is a topn operator above it, the topn filter generated by that operator can be used to reduce the input data volume of the hash join. In our tests, however, the scan operator below the hash join always transfers data to the hash join so fast that the topn filter takes effect on the scan operator too late, so the input data volume of the hash join is not actually reduced. We therefore designed a back pressure mechanism that works as follows:

  1. The scan operator lets 10 × (limit + offset) rows, where limit and offset come from the topn operator, pass through to the hash join operator and then waits for a short time (e.g. 100 ms). We call this wait the throttle period.
  2. During the throttle period the scan operator's has_output returns false, so the scan operator transfers no data and the topn operator gets a chance to generate a topn filter.
  3. When the current throttle period ends, the scan operator uses the topn filter to filter its output data. If the topn filter is highly selective, the scan operator terminates the back pressure mechanism and simply keeps using the topn filter to filter incoming data.
  4. Otherwise, the scan operator begins another throttle period.
  5. The scan operator may go through several throttle periods. Their number is controlled by the session variable back_pressure_back_rounds, and each throttle period equals back_pressure_throttle_time_upper_bound / back_pressure_back_rounds.
  6. topn_filter_back_pressure_mode turns the back pressure mechanism on or off.
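
The steps above can be sketched as a small state machine. This is only an illustration under stated assumptions, not the code in this PR: the class ScanThrottleSketch, its method names, the caller-supplied millisecond clock, and the selectivity threshold are all hypothetical.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical sketch of the throttle rounds described above. The class,
// its methods, and the selectivity threshold are assumptions, not PR code.
class ScanThrottleSketch {
public:
    ScanThrottleSketch(int64_t limit_plus_offset, int rounds, int64_t upper_bound_ms,
                       double selective_below)
            : _row_budget(10 * limit_plus_offset),                  // step 1
              _rounds_left(rounds),                                 // step 5
              _period_ms(rounds > 0 ? upper_bound_ms / rounds : 0), // step 5
              _selective_below(selective_below) {}

    // Rows emitted by the scan since the last throttle period ended.
    void add_rows(int64_t n) { _rows += n; }
    // Fraction of rows kept by the topn filter (1.0 means nothing is filtered).
    void set_selectivity(double s) { _selectivity = s; }

    // Returns true while the scan should report has_output() == false (step 2).
    bool should_throttle(int64_t now_ms) {
        if (_phase == Phase::Done) return false;
        if (_phase == Phase::Throttled) {
            if (now_ms < _deadline_ms) return true;
            // Throttle period is over: resume scanning with the filter applied (step 3).
            _phase = Phase::Unthrottled;
            _rows = 0;
            return false;
        }
        // Unthrottled: stop for good once the filter is selective or rounds run out.
        if (_selectivity < _selective_below || _rounds_left <= 0) {
            _phase = Phase::Done;
            return false;
        }
        if (_rows <= _row_budget) return false;
        // Row budget exceeded: begin another throttle period (step 4).
        _phase = Phase::Throttled;
        _deadline_ms = now_ms + _period_ms;
        --_rounds_left;
        return true;
    }

    bool finished() const { return _phase == Phase::Done; }

private:
    enum class Phase { Unthrottled, Throttled, Done };
    int64_t _row_budget;
    int _rounds_left;
    int64_t _period_ms;
    double _selective_below;
    Phase _phase = Phase::Unthrottled;
    int64_t _rows = 0;
    int64_t _deadline_ms = 0;
    double _selectivity = 1.0;
};
```

Passing the clock value in as a parameter keeps the sketch deterministic; a real operator would presumably read a monotonic clock itself.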

Test

When the topn filter back pressure mechanism is enabled, the data volume of the left side of the hash join is reduced to 1/60.
image

When it is disabled:
image

  1. pipeline_dop=0, concurrency=20, back_pressure_back_rounds=3
+===================+==============+
| cases             | latency(sec) |
+===================+==============+
| disable opt       | 11.692       |
| enable opt(60ms)  | 5.920        |
| enable opt(100ms) | 5.853        |
| enable opt(300ms) | 5.959        |
| enable opt(600ms) | 6.279        |
+-------------------+--------------+

disable opt means the optimization is turned off;
enable opt(60ms) means the optimization is turned on with back_pressure_throttle_time_upper_bound=60, i.e. the total throttle time does not exceed 60 ms.
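
As a rough illustration of how the upper bound translates into a per-round throttle period, the helper below applies the formula back_pressure_throttle_time_upper_bound / back_pressure_back_rounds. The function name throttle_period_ms is hypothetical, and both the integer division and the clamping of the round count to at least 1 are assumptions rather than confirmed behavior of this PR.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Hypothetical helper: length of one throttle period in milliseconds.
// Assumes integer division; rounds is clamped to >= 1 so the divisor is never zero.
inline int64_t throttle_period_ms(int64_t upper_bound_ms, int64_t rounds) {
    return upper_bound_ms / std::max<int64_t>(rounds, 1);
}
```

Under these assumptions, with back_pressure_back_rounds=3 the 60 ms and 300 ms upper bounds would give periods of 20 ms and 100 ms respectively.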

  2. pipeline_dop=1, back_pressure_throttle_time_upper_bound=300, back_pressure_back_rounds=10
+=============+==================+=================+=========+
| concurrency | disable opt(sec) | enable opt(sec) | speedup |
+=============+==================+=================+=========+
| 1           | 0.991            | 0.953           | 1.0X    |
| 10          | 4.089            | 2.831           | 1.4X    |
| 20          | 7.735            | 5.034           | 1.5X    |
| 40          | 15.210           | 9.688           | 1.5X    |
| 60          | 22.760(OOM)      | 14.600          | 1.5X    |
+-------------+------------------+-----------------+---------+

What I'm doing:

Fixes #issue

What type of PR is this:

  • BugFix
  • Feature
  • Enhancement
  • Refactor
  • UT
  • Doc
  • Tool

Does this PR entail a change in behavior?

  • Yes, this PR will result in a change in behavior.
  • No, this PR will not result in a change in behavior.

If yes, please specify the type of change:

  • Interface/UI changes: syntax, type conversion, expression evaluation, display information
  • Parameter changes: default values, similar parameters but with different default values
  • Policy changes: use new policy to replace old one, functionality automatically enabled
  • Feature removed
  • Miscellaneous: upgrade & downgrade compatibility, etc.

Checklist:

  • I have added test cases for my bug fix or my new feature
  • This pr needs user documentation (for new or modified features or behaviors)
    • I have added documentation for my new feature or new function
  • This is a backport pr

Bugfix cherry-pick branch check:

  • I have checked the version labels which the pr will be auto-backported to the target branch
    • 3.4
    • 3.3
    • 3.2
    • 3.1
    • 3.0

@satanson satanson requested a review from a team as a code owner February 7, 2025 06:41
@wanpengfei-git wanpengfei-git requested a review from a team February 7, 2025 06:42
}
}
return accept;
}
}

The most risky bug in this code is:
Division by zero when calculating backPressureThrottleTime.

You can modify the code like this:

this.backPressureThrottleTime = 
    this.backPressureThrottleTimeUpperBound / Math.max(this.backPressureNumRows, 1);

This change prevents division by zero by using Math.max(this.backPressureNumRows, 1) instead of Math.min(this.backPressureNumRows, 1), ensuring the denominator is never zero.

double _current_selectivity{1.0};
};

} // namespace starrocks::pipeline

The most risky bug in this code is:
Incorrect calculation of _current_throttle_deadline in should_throttle(): the throttle time in milliseconds must be added to steady_clock::now() as a duration rather than used as a raw number.

You can modify the code like this:

#include <chrono> // provides std::chrono::steady_clock and std::chrono::milliseconds

if (_phase == PH_UNTHROTTLE) {
    if (_current_num_rows <= _num_rows_limiter.limit()) {
        return false;
    }
    _phase = PH_THROTTLE;
    _current_throttle_deadline = 
        (steady_clock::now() + milliseconds(_throttle_time_limiter.limit())).time_since_epoch().count();
    return true;
}
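
One caveat with the suggestion above is that time_since_epoch().count() returns ticks in the clock's native period, which is not necessarily milliseconds. A minimal sketch of keeping the deadline in explicit milliseconds follows; the helper names deadline_ms and deadline_passed are hypothetical, and whether _current_throttle_deadline is meant to hold milliseconds is an assumption.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>

// Hypothetical helper: deadline as milliseconds since the steady_clock epoch.
// duration_cast makes the unit explicit regardless of the clock's native tick.
inline int64_t deadline_ms(std::chrono::steady_clock::time_point now, int64_t throttle_ms) {
    using namespace std::chrono;
    auto deadline = now + milliseconds(throttle_ms);
    return duration_cast<milliseconds>(deadline.time_since_epoch()).count();
}

// Hypothetical helper: true once `now` has reached a deadline produced above.
inline bool deadline_passed(std::chrono::steady_clock::time_point now, int64_t deadline) {
    using namespace std::chrono;
    return duration_cast<milliseconds>(now.time_since_epoch()).count() >= deadline;
}
```

Storing and comparing the deadline in the same unit avoids mixing native ticks with millisecond limits.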

@@ -261,7 +316,6 @@ class ScanOperatorFactory : public SourceOperatorFactory {

protected:
ScanNode* const _scan_node;

std::shared_ptr<workgroup::ScanTaskGroup> _scan_task_group;
};


The most risky bug in this code is:
Potential segmentation fault due to a null pointer dereference when accessing runtime_bloom_filters().

You can modify the code like this:

void evaluate_topn_runtime_filters(Chunk* chunk) {
    // Nothing to do for a missing or empty chunk, or when no back pressure state is set.
    if (chunk == nullptr || chunk->is_empty() || !_topn_filter_back_pressure) {
        return;
    }
    // Check the filter collection for null before dereferencing it.
    auto* topn_runtime_filters = runtime_bloom_filters();
    if (topn_runtime_filters != nullptr) {
        auto input_num_rows = chunk->num_rows();
        _init_topn_runtime_filter_counters();
        topn_runtime_filters->evaluate(chunk, _topn_filter_eval_context);
        _topn_filter_back_pressure->inc_num_rows(input_num_rows);
        if (_topn_filter_eval_context.selectivity.empty()) {
            // No selectivity recorded: treat the filter as not filtering anything.
            _topn_filter_back_pressure->update_selectivity(1.0);
        } else {
            double selectivity = _topn_filter_eval_context.selectivity.begin()->first;
            // Only update selectivity from chunks with more than 1024 input rows.
            if (input_num_rows > 1024) {
                _topn_filter_back_pressure->update_selectivity(selectivity);
            }
        }
    }
}

@satanson satanson force-pushed the topn_filter_throttle_scan branch 2 times, most recently from 32ac329 to 0bc798d Compare February 7, 2025 13:50
@satanson satanson force-pushed the topn_filter_throttle_scan branch from 0bc798d to e7db69b Compare February 8, 2025 02:36
sonarqubecloud bot commented Feb 8, 2025

github-actions bot commented Feb 8, 2025

[Java-Extensions Incremental Coverage Report]

pass : 0 / 0 (0%)

github-actions bot commented Feb 8, 2025

[BE Incremental Coverage Report]

fail : 16 / 124 (12.90%)

file detail

| path | covered_line | new_line | coverage | not_covered_line_detail |
|------|--------------|----------|----------|-------------------------|
| 🔵 be/src/exec/olap_scan_node.cpp | 0 | 7 | 00.00% | [112, 113, 114, 115, 116, 117, 118] |
| 🔵 be/src/exec/pipeline/topn_runtime_filter_back_pressure.h | 0 | 39 | 00.00% | [29, 31, 32, 34, 35, 36, 37, 39, 50, 51, 53, 54, 56, 57, 58, 59, 60, 63, 64, 67, 68, 69, 70, 72, 73, 76, 77, 78, 79, 80, 81, 82, 86, 88, 89, 90, 91, 92, 93] |
| 🔵 be/src/exprs/runtime_filter_bank.cpp | 0 | 16 | 00.00% | [394, 398, 399, 400, 417, 418, 595, 596, 597, 600, 601, 602, 603, 604, 648, 649] |
| 🔵 be/src/exec/connector_scan_node.cpp | 1 | 8 | 12.50% | [78, 79, 80, 81, 82, 83, 84] |
| 🔵 be/src/exec/scan_node.h | 1 | 6 | 16.67% | [136, 138, 140, 142, 143] |
| 🔵 be/src/exec/pipeline/scan/scan_operator.cpp | 3 | 12 | 25.00% | [94, 95, 96, 97, 98, 99, 100, 101, 102] |
| 🔵 be/src/exec/pipeline/scan/scan_operator.h | 11 | 36 | 30.56% | [180, 181, 182, 183, 184, 185, 186, 188, 189, 190, 196, 197, 198, 199, 200, 201, 202, 203, 204, 205, 206, 207, 208, 210, 220] |

github-actions bot commented Feb 8, 2025

[FE Incremental Coverage Report]

fail : 22 / 49 (44.90%)

file detail

| path | covered_line | new_line | coverage | not_covered_line_detail |
|------|--------------|----------|----------|-------------------------|
| 🔵 com/starrocks/qe/SessionVariable.java | 4 | 12 | 33.33% | [4591, 4592, 4595, 4599, 4600, 4603, 4607, 4608] |
| 🔵 com/starrocks/planner/OlapScanNode.java | 15 | 33 | 45.45% | [1015, 1016, 1017, 1018, 1019, 1020, 1061, 1062, 1063, 1064, 1065, 1550, 1551, 1552, 1553, 1554, 1555, 1556] |
| 🔵 com/starrocks/planner/RuntimeFilterDescription.java | 2 | 3 | 66.67% | [159] |
| 🔵 com/starrocks/planner/SortNode.java | 1 | 1 | 100.00% | [] |
