Skip to content

Commit

Permalink
Merge branch 'master' into master-vault
Browse files Browse the repository at this point in the history
  • Loading branch information
SWJTU-ZhangLei authored Oct 25, 2024
2 parents 7a3acea + bf737b1 commit fcea16a
Show file tree
Hide file tree
Showing 56 changed files with 644 additions and 184 deletions.
61 changes: 26 additions & 35 deletions .github/workflows/auto-cherry-pick.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,49 +15,40 @@
# specific language governing permissions and limitations
# under the License.
#
name: Auto Cherry-Pick to Branch

on:
pull_request:
types:
- closed
branches:
- master
types: ["closed"]

permissions:
checks: write
contents: write
pull-requests: write
repository-projects: write
jobs:
cherry_pick_branch_2.1:
runs-on: ubuntu-latest
name: Cherry pick into branch-2.1
if: ${{ contains(github.event.pull_request.labels.*.name, 'dev/2.1.x') && github.event.pull_request.merged == true }}
steps:
- name: Checkout
uses: actions/checkout@v3
with:
fetch-depth: 0
- name: Cherry pick into branch-2.1
uses: carloscastrojumo/[email protected]
with:
branch: branch-2.1
labels: |
cherry-pick
reviewers: |
yiguolei
cherry_pick_branch-3.0:
auto_cherry_pick:
runs-on: ubuntu-latest
name: Cherry pick into branch-3.0
if: ${{ contains(github.event.pull_request.labels.*.name, 'dev/3.0.x') && github.event.pull_request.merged == true }}
steps:
- name: Checkout
- name: Checkout repository
uses: actions/checkout@v3

- name: Set up Python
uses: actions/setup-python@v4
with:
fetch-depth: 0
- name: Cherry pick into branch-3.0
uses: carloscastrojumo/[email protected]
with:
branch: branch-3.0
labels: |
cherry-pick
reviewers: |
dataroaring
title: '[cherry-pick] {old_title}'
body: 'Cherry picking #{old_pull_request_id} onto this branch'
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
python-version: '3.x'

- name: Install dependencies
run: |
pip install PyGithub
- name: Auto cherry-pick
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
REPO_NAME: ${{ github.repository }}
CONFLICT_LABEL: cherry-pick-conflict-in-3.0
run: |
python tools/auto-pick-script.py ${{ github.event.pull_request.number }} branch-3.0
4 changes: 2 additions & 2 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1072,8 +1072,8 @@ DEFINE_mInt32(schema_cache_sweep_time_sec, "100");
// max number of segment cache, default -1 for backward compatibility fd_number*2/5
DEFINE_Int32(segment_cache_capacity, "-1");
DEFINE_Int32(segment_cache_fd_percentage, "20");
DEFINE_mInt32(estimated_mem_per_column_reader, "1024");
DEFINE_Int32(segment_cache_memory_percentage, "2");
DEFINE_mInt32(estimated_mem_per_column_reader, "512");
DEFINE_Int32(segment_cache_memory_percentage, "5");

// enable feature binlog, default false
DEFINE_Bool(enable_feature_binlog, "false");
Expand Down
8 changes: 4 additions & 4 deletions be/src/pipeline/dependency.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,19 +111,19 @@ class Dependency : public std::enable_shared_from_this<Dependency> {
// Notify downstream pipeline tasks this dependency is ready.
void set_ready();
void set_ready_to_read() {
DCHECK(_shared_state->source_deps.size() == 1) << debug_string();
DCHECK_EQ(_shared_state->source_deps.size(), 1) << debug_string();
_shared_state->source_deps.front()->set_ready();
}
void set_block_to_read() {
DCHECK(_shared_state->source_deps.size() == 1) << debug_string();
DCHECK_EQ(_shared_state->source_deps.size(), 1) << debug_string();
_shared_state->source_deps.front()->block();
}
void set_ready_to_write() {
DCHECK(_shared_state->sink_deps.size() == 1) << debug_string();
DCHECK_EQ(_shared_state->sink_deps.size(), 1) << debug_string();
_shared_state->sink_deps.front()->set_ready();
}
void set_block_to_write() {
DCHECK(_shared_state->sink_deps.size() == 1) << debug_string();
DCHECK_EQ(_shared_state->sink_deps.size(), 1) << debug_string();
_shared_state->sink_deps.front()->block();
}

Expand Down
5 changes: 4 additions & 1 deletion be/src/pipeline/exec/aggregation_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,10 @@ AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int operator_id, const TPla
: tnode.agg_node.grouping_exprs),
_is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate),
_require_bucket_distribution(require_bucket_distribution),
_agg_fn_output_row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples) {}
_agg_fn_output_row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples),
_without_key(tnode.agg_node.grouping_exprs.empty()) {
_is_serial_operator = tnode.__isset.is_serial_operator && tnode.is_serial_operator;
}

Status AggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(DataSinkOperatorX<AggSinkLocalState>::init(tnode, state));
Expand Down
7 changes: 3 additions & 4 deletions be/src/pipeline/exec/aggregation_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,8 @@ class AggSinkOperatorX final : public DataSinkOperatorX<AggSinkLocalState> {

DataDistribution required_data_distribution() const override {
if (_probe_expr_ctxs.empty()) {
return _needs_finalize || DataSinkOperatorX<AggSinkLocalState>::_child
->ignore_data_distribution()
? DataDistribution(ExchangeType::PASSTHROUGH)
return _needs_finalize
? DataDistribution(ExchangeType::NOOP)
: DataSinkOperatorX<AggSinkLocalState>::required_data_distribution();
}
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator
Expand Down Expand Up @@ -204,8 +203,8 @@ class AggSinkOperatorX final : public DataSinkOperatorX<AggSinkLocalState> {
const std::vector<TExpr> _partition_exprs;
const bool _is_colocate;
const bool _require_bucket_distribution;

RowDescriptor _agg_fn_output_row_descriptor;
const bool _without_key;
};

} // namespace doris::pipeline
Expand Down
4 changes: 3 additions & 1 deletion be/src/pipeline/exec/aggregation_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,9 @@ AggSourceOperatorX::AggSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode,
const DescriptorTbl& descs)
: Base(pool, tnode, operator_id, descs),
_needs_finalize(tnode.agg_node.need_finalize),
_without_key(tnode.agg_node.grouping_exprs.empty()) {}
_without_key(tnode.agg_node.grouping_exprs.empty()) {
_is_serial_operator = tnode.__isset.is_serial_operator && tnode.is_serial_operator;
}

Status AggSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, bool* eos) {
auto& local_state = get_local_state(state);
Expand Down
4 changes: 3 additions & 1 deletion be/src/pipeline/exec/analytic_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,9 @@ AnalyticSinkOperatorX::AnalyticSinkOperatorX(ObjectPool* pool, int operator_id,
_require_bucket_distribution(require_bucket_distribution),
_partition_exprs(tnode.__isset.distribute_expr_lists && require_bucket_distribution
? tnode.distribute_expr_lists[0]
: tnode.analytic_node.partition_exprs) {}
: tnode.analytic_node.partition_exprs) {
_is_serial_operator = tnode.__isset.is_serial_operator && tnode.is_serial_operator;
}

Status AnalyticSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state));
Expand Down
1 change: 1 addition & 0 deletions be/src/pipeline/exec/analytic_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,7 @@ AnalyticSourceOperatorX::AnalyticSourceOperatorX(ObjectPool* pool, const TPlanNo
_has_range_window(tnode.analytic_node.window.type == TAnalyticWindowType::RANGE),
_has_window_start(tnode.analytic_node.window.__isset.window_start),
_has_window_end(tnode.analytic_node.window.__isset.window_end) {
_is_serial_operator = tnode.__isset.is_serial_operator && tnode.is_serial_operator;
_fn_scope = AnalyticFnScope::PARTITION;
if (tnode.analytic_node.__isset.window &&
tnode.analytic_node.window.type == TAnalyticWindowType::RANGE) {
Expand Down
1 change: 1 addition & 0 deletions be/src/pipeline/exec/assert_num_rows_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ AssertNumRowsOperatorX::AssertNumRowsOperatorX(ObjectPool* pool, const TPlanNode
: StreamingOperatorX<AssertNumRowsLocalState>(pool, tnode, operator_id, descs),
_desired_num_rows(tnode.assert_num_rows_node.desired_num_rows),
_subquery_string(tnode.assert_num_rows_node.subquery_string) {
_is_serial_operator = true;
if (tnode.assert_num_rows_node.__isset.assertion) {
_assertion = tnode.assert_num_rows_node.assertion;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,9 @@ DistinctStreamingAggOperatorX::DistinctStreamingAggOperatorX(ObjectPool* pool, i
? tnode.distribute_expr_lists[0]
: tnode.agg_node.grouping_exprs),
_is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate),
_require_bucket_distribution(require_bucket_distribution) {
_require_bucket_distribution(require_bucket_distribution),
_without_key(tnode.agg_node.grouping_exprs.empty()) {
_is_serial_operator = tnode.__isset.is_serial_operator && tnode.is_serial_operator;
if (tnode.agg_node.__isset.use_streaming_preaggregation) {
_is_streaming_preagg = tnode.agg_node.use_streaming_preaggregation;
if (_is_streaming_preagg) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ class DistinctStreamingAggOperatorX final
bool need_more_input_data(RuntimeState* state) const override;

DataDistribution required_data_distribution() const override {
if (_needs_finalize && _probe_expr_ctxs.empty()) {
return {ExchangeType::NOOP};
}
if (_needs_finalize || (!_probe_expr_ctxs.empty() && !_is_streaming_preagg)) {
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
Expand Down Expand Up @@ -136,6 +139,7 @@ class DistinctStreamingAggOperatorX final
/// The total size of the row from the aggregate functions.
size_t _total_size_of_aggregate_states = 0;
bool _is_streaming_preagg = false;
const bool _without_key;
};

} // namespace pipeline
Expand Down
1 change: 1 addition & 0 deletions be/src/pipeline/exec/exchange_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ class ExchangeSinkOperatorX final : public DataSinkOperatorX<ExchangeSinkLocalSt
Status serialize_block(ExchangeSinkLocalState& stete, vectorized::Block* src, PBlock* dest,
int num_receivers = 1);
DataDistribution required_data_distribution() const override;
bool is_serial_operator() const override { return true; }

private:
friend class ExchangeSinkLocalState;
Expand Down
2 changes: 2 additions & 0 deletions be/src/pipeline/exec/join_build_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ JoinBuildSinkOperatorX<LocalStateType>::JoinBuildSinkOperatorX(ObjectPool* pool,
_short_circuit_for_null_in_build_side(_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN &&
!_is_mark_join),
_runtime_filter_descs(tnode.runtime_filters) {
DataSinkOperatorX<LocalStateType>::_is_serial_operator =
tnode.__isset.is_serial_operator && tnode.is_serial_operator;
_init_join_op();
if (_is_mark_join) {
DCHECK(_join_op == TJoinOp::LEFT_ANTI_JOIN || _join_op == TJoinOp::LEFT_SEMI_JOIN ||
Expand Down
1 change: 1 addition & 0 deletions be/src/pipeline/exec/join_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ JoinProbeOperatorX<LocalStateType>::JoinProbeOperatorX(ObjectPool* pool, const T
: true)

) {
Base::_is_serial_operator = tnode.__isset.is_serial_operator && tnode.is_serial_operator;
if (tnode.__isset.hash_join_node) {
_intermediate_row_desc.reset(new RowDescriptor(
descs, tnode.hash_join_node.vintermediate_tuple_id_list,
Expand Down
4 changes: 3 additions & 1 deletion be/src/pipeline/exec/nested_loop_join_probe_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,9 @@ class NestedLoopJoinProbeOperatorX final
}

DataDistribution required_data_distribution() const override {
if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
_join_op == TJoinOp::RIGHT_OUTER_JOIN || _join_op == TJoinOp::RIGHT_ANTI_JOIN ||
_join_op == TJoinOp::RIGHT_SEMI_JOIN || _join_op == TJoinOp::FULL_OUTER_JOIN) {
return {ExchangeType::NOOP};
}
return {ExchangeType::ADAPTIVE_PASSTHROUGH};
Expand Down
9 changes: 5 additions & 4 deletions be/src/pipeline/exec/operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,9 @@ std::string PipelineXSinkLocalState<SharedStateArg>::debug_string(int indentatio

std::string OperatorXBase::debug_string(int indentation_level) const {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer, "{}{}: id={}, parallel_tasks={}",
std::string(indentation_level * 2, ' '), _op_name, node_id(), _parallel_tasks);
fmt::format_to(debug_string_buffer, "{}{}: id={}, parallel_tasks={}, _is_serial_operator={}",
std::string(indentation_level * 2, ' '), _op_name, node_id(), _parallel_tasks,
_is_serial_operator);
return fmt::to_string(debug_string_buffer);
}

Expand Down Expand Up @@ -363,8 +364,8 @@ void PipelineXLocalStateBase::reached_limit(vectorized::Block* block, bool* eos)
std::string DataSinkOperatorXBase::debug_string(int indentation_level) const {
fmt::memory_buffer debug_string_buffer;

fmt::format_to(debug_string_buffer, "{}{}: id={}", std::string(indentation_level * 2, ' '),
_name, node_id());
fmt::format_to(debug_string_buffer, "{}{}: id={}, _is_serial_operator={}",
std::string(indentation_level * 2, ' '), _name, node_id(), _is_serial_operator);
return fmt::to_string(debug_string_buffer);
}

Expand Down
6 changes: 5 additions & 1 deletion be/src/pipeline/exec/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ class OperatorBase {
return Status::OK();
}

// Operators need to be executed serially. (e.g. finalized agg without key)
[[nodiscard]] virtual bool is_serial_operator() const { return _is_serial_operator; }

[[nodiscard]] bool is_closed() const { return _is_closed; }

virtual size_t revocable_mem_size(RuntimeState* state) const { return 0; }
Expand All @@ -122,6 +125,7 @@ class OperatorBase {

bool _is_closed;
bool _followed_by_shuffled_operator = false;
bool _is_serial_operator = false;
};

class PipelineXLocalStateBase {
Expand Down Expand Up @@ -444,7 +448,7 @@ class DataSinkOperatorXBase : public OperatorBase {

Status init(const TDataSink& tsink) override;
[[nodiscard]] virtual Status init(ExchangeType type, const int num_buckets,
const bool is_shuffled_hash_join,
const bool use_global_hash_shuffle,
const std::map<int, int>& shuffle_idx_to_instance_idx) {
return Status::InternalError("init() is only implemented in local exchange!");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ Status PartitionedAggSourceOperatorX::close(RuntimeState* state) {
return _agg_source_operator->close(state);
}

bool PartitionedAggSourceOperatorX::is_serial_operator() const {
return _agg_source_operator->is_serial_operator();
}

Status PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block,
bool* eos) {
auto& local_state = get_local_state(state);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ class PartitionedAggSourceOperatorX : public OperatorX<PartitionedAggLocalState>

bool is_source() const override { return true; }

bool is_serial_operator() const override;

private:
friend class PartitionedAggLocalState;

Expand Down
4 changes: 3 additions & 1 deletion be/src/pipeline/exec/sort_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,9 @@ SortSinkOperatorX::SortSinkOperatorX(ObjectPool* pool, int operator_id, const TP
: std::vector<TExpr> {}),
_algorithm(tnode.sort_node.__isset.algorithm ? tnode.sort_node.algorithm
: TSortAlgorithm::FULL_SORT),
_reuse_mem(_algorithm != TSortAlgorithm::HEAP_SORT) {}
_reuse_mem(_algorithm != TSortAlgorithm::HEAP_SORT) {
_is_serial_operator = tnode.__isset.is_serial_operator && tnode.is_serial_operator;
}

Status SortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state));
Expand Down
3 changes: 2 additions & 1 deletion be/src/pipeline/exec/sort_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,9 @@ class SortSinkOperatorX final : public DataSinkOperatorX<SortSinkLocalState> {
} else if (_merge_by_exchange) {
// The current sort node is used for the ORDER BY
return {ExchangeType::PASSTHROUGH};
} else {
return {ExchangeType::NOOP};
}
return DataSinkOperatorX<SortSinkLocalState>::required_data_distribution();
}
bool require_shuffled_data_distribution() const override { return _is_analytic_sort; }
bool require_data_distribution() const override { return _is_colocate; }
Expand Down
4 changes: 3 additions & 1 deletion be/src/pipeline/exec/sort_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ SortSourceOperatorX::SortSourceOperatorX(ObjectPool* pool, const TPlanNode& tnod
const DescriptorTbl& descs)
: OperatorX<SortLocalState>(pool, tnode, operator_id, descs),
_merge_by_exchange(tnode.sort_node.merge_by_exchange),
_offset(tnode.sort_node.__isset.offset ? tnode.sort_node.offset : 0) {}
_offset(tnode.sort_node.__isset.offset ? tnode.sort_node.offset : 0) {
_is_serial_operator = tnode.__isset.is_serial_operator && tnode.is_serial_operator;
}

Status SortSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(Base::init(tnode, state));
Expand Down
4 changes: 3 additions & 1 deletion be/src/pipeline/exec/union_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ class UnionSourceOperatorX final : public OperatorX<UnionSourceLocalState> {
using Base = OperatorX<UnionSourceLocalState>;
UnionSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id,
const DescriptorTbl& descs)
: Base(pool, tnode, operator_id, descs), _child_size(tnode.num_children) {};
: Base(pool, tnode, operator_id, descs), _child_size(tnode.num_children) {
_is_serial_operator = tnode.__isset.is_serial_operator && tnode.is_serial_operator;
}
~UnionSourceOperatorX() override = default;
Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,17 @@ std::vector<Dependency*> LocalExchangeSinkLocalState::dependencies() const {
}

Status LocalExchangeSinkOperatorX::init(ExchangeType type, const int num_buckets,
const bool should_disable_bucket_shuffle,
const bool use_global_hash_shuffle,
const std::map<int, int>& shuffle_idx_to_instance_idx) {
_name = "LOCAL_EXCHANGE_SINK_OPERATOR (" + get_exchange_type_name(type) + ")";
_type = type;
if (_type == ExchangeType::HASH_SHUFFLE) {
_use_global_shuffle = should_disable_bucket_shuffle;
_use_global_shuffle = use_global_hash_shuffle;
// For shuffle join, if data distribution has been broken by previous operator, we
// should use a HASH_SHUFFLE local exchanger to shuffle data again. To be mentioned,
// we should use map shuffle idx to instance idx because all instances will be
// distributed to all BEs. Otherwise, we should use shuffle idx directly.
if (should_disable_bucket_shuffle) {
if (use_global_hash_shuffle) {
std::for_each(shuffle_idx_to_instance_idx.begin(), shuffle_idx_to_instance_idx.end(),
[&](const auto& item) {
DCHECK(item.first != -1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ class LocalExchangeSinkOperatorX final : public DataSinkOperatorX<LocalExchangeS
return Status::InternalError("{} should not init with TPlanNode", Base::_name);
}

Status init(ExchangeType type, const int num_buckets, const bool should_disable_bucket_shuffle,
Status init(ExchangeType type, const int num_buckets, const bool use_global_hash_shuffle,
const std::map<int, int>& shuffle_idx_to_instance_idx) override;

Status open(RuntimeState* state) override;
Expand Down
Loading

0 comments on commit fcea16a

Please sign in to comment.