Skip to content

Commit

Permalink
Merge branch 'master' into master-log
Browse files Browse the repository at this point in the history
  • Loading branch information
SWJTU-ZhangLei authored Jan 24, 2025
2 parents 18f0639 + 4aaa657 commit 735d2b9
Show file tree
Hide file tree
Showing 194 changed files with 2,667 additions and 232 deletions.
2 changes: 1 addition & 1 deletion be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ DEFINE_Validator(doris_scanner_thread_pool_thread_num, [](const int config) -> b
return true;
});
DEFINE_Int32(doris_scanner_min_thread_pool_thread_num, "8");
DEFINE_Int32(remote_split_source_batch_size, "10240");
DEFINE_Int32(remote_split_source_batch_size, "1000");
DEFINE_Int32(doris_max_remote_scanner_thread_pool_thread_num, "-1");
// queue size of the olap scanner thread pool
DEFINE_Int32(doris_scanner_thread_pool_queue_size, "102400");
Expand Down
24 changes: 23 additions & 1 deletion be/src/pipeline/exec/exchange_sink_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include "runtime/exec_env.h"
#include "runtime/thread_context.h"
#include "service/backend_options.h"
#include "util/defer_op.h"
#include "util/proto_util.h"
#include "util/time.h"
#include "vec/sink/vdata_stream_sender.h"
Expand Down Expand Up @@ -442,7 +443,8 @@ void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) {
// When the receiving side reaches eof, it means the receiver has finished early.
// The remaining data in the current rpc_channel does not need to be sent,
// and the rpc_channel should be turned off immediately.
_turn_off_channel(id, lock);
Defer turn_off([&]() { _turn_off_channel(id, lock); });

std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>& broadcast_q =
_instance_to_broadcast_package_queue[id];
for (; !broadcast_q.empty(); broadcast_q.pop()) {
Expand All @@ -458,12 +460,22 @@ void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) {

std::queue<TransmitInfo, std::list<TransmitInfo>>& q = _instance_to_package_queue[id];
for (; !q.empty(); q.pop()) {
// Must update _total_queue_size here, otherwise if _total_queue_size > _queue_capacity at EOF,
// ExchangeSinkQueueDependency will be blocked and pipeline will be deadlocked
_total_queue_size--;
if (q.front().block) {
COUNTER_UPDATE(q.front().channel->_parent->memory_used_counter(),
-q.front().block->ByteSizeLong());
}
}

// Try to wake up pipeline after clearing the queue
if (_total_queue_size <= _queue_capacity) {
for (auto& [_, dep] : _queue_deps) {
dep->set_ready();
}
}

{
std::queue<TransmitInfo, std::list<TransmitInfo>> empty;
swap(empty, q);
Expand Down Expand Up @@ -575,6 +587,16 @@ void ExchangeSinkBuffer::update_profile(RuntimeProfile* profile) {
}
}

std::string ExchangeSinkBuffer::debug_each_instance_queue_size() {
fmt::memory_buffer debug_string_buffer;
for (auto& [id, m] : _instance_to_package_queue_mutex) {
std::unique_lock<std::mutex> lock(*m);
fmt::format_to(debug_string_buffer, "Instance: {}, queue size: {}\n", id,
_instance_to_package_queue[id].size());
}
return fmt::to_string(debug_string_buffer);
}

} // namespace pipeline
#include "common/compile_check_end.h"
} // namespace doris
4 changes: 4 additions & 0 deletions be/src/pipeline/exec/exchange_sink_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,8 @@ class ExchangeSinkBuffer : public HasTaskExecutionCtx {
_queue_deps[sender_ins_id] = queue_dependency;
_parents[sender_ins_id] = local_state;
}

std::string debug_each_instance_queue_size();
#ifdef BE_TEST
public:
#else
Expand Down Expand Up @@ -306,6 +308,8 @@ class ExchangeSinkBuffer : public HasTaskExecutionCtx {
void get_max_min_rpc_time(int64_t* max_time, int64_t* min_time);
int64_t get_sum_rpc_time();

// _total_queue_size is the sum of the sizes of all instance_to_package_queues.
// Any modification to instance_to_package_queue requires a corresponding modification to _total_queue_size.
std::atomic<int> _total_queue_size = 0;

// _running_sink_count is used to track how many sinks have not finished yet.
Expand Down
13 changes: 7 additions & 6 deletions be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -499,12 +499,13 @@ std::string ExchangeSinkLocalState::debug_string(int indentation_level) const {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer, "{}", Base::debug_string(indentation_level));
if (_sink_buffer) {
fmt::format_to(debug_string_buffer,
", Sink Buffer: (_is_finishing = {}, blocks in queue: {}, queue capacity: "
"{}, queue dep: {}), _reach_limit: {}, working channels: {}",
_sink_buffer->_is_failed.load(), _sink_buffer->_total_queue_size,
_sink_buffer->_queue_capacity, (void*)_queue_dependency.get(),
_reach_limit.load(), _working_channels_count.load());
fmt::format_to(
debug_string_buffer,
", Sink Buffer: (_is_finishing = {}, blocks in queue: {}, queue capacity: "
"{}, queue dep: {}), _reach_limit: {}, working channels: {} , each queue size: {}",
_sink_buffer->_is_failed.load(), _sink_buffer->_total_queue_size,
_sink_buffer->_queue_capacity, (void*)_queue_dependency.get(), _reach_limit.load(),
_working_channels_count.load(), _sink_buffer->debug_each_instance_queue_size());
}
return fmt::to_string(debug_string_buffer);
}
Expand Down
21 changes: 19 additions & 2 deletions be/src/service/backend_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,23 @@ void _ingest_binlog(StorageEngine& engine, IngestBinlogArg* arg) {

auto& request = arg->request;

MonotonicStopWatch watch;
watch.start();
int64_t total_download_bytes = 0;
int64_t total_download_files = 0;
TStatus tstatus;
std::vector<std::string> download_success_files;
Defer defer {[=, &engine, &tstatus, ingest_binlog_tstatus = arg->tstatus]() {
LOG(INFO) << "ingest binlog. result: " << apache::thrift::ThriftDebugString(tstatus);
Defer defer {[=, &engine, &tstatus, ingest_binlog_tstatus = arg->tstatus, &watch,
&total_download_bytes, &total_download_files]() {
auto elapsed_time_ms = static_cast<int64_t>(watch.elapsed_time() / 1000000);
double copy_rate = 0.0;
if (elapsed_time_ms > 0) {
copy_rate = total_download_bytes / ((double)elapsed_time_ms) / 1000;
}
LOG(INFO) << "ingest binlog elapsed " << elapsed_time_ms << " ms, download "
<< total_download_files << " files, total " << total_download_bytes
<< " bytes, avg rate " << copy_rate
<< " MB/s. result: " << apache::thrift::ThriftDebugString(tstatus);
if (tstatus.status_code != TStatusCode::OK) {
// abort txn
engine.txn_manager()->abort_txn(partition_id, txn_id, local_tablet_id,
Expand Down Expand Up @@ -269,6 +282,8 @@ void _ingest_binlog(StorageEngine& engine, IngestBinlogArg* arg) {
status.to_thrift(&tstatus);
return;
}
total_download_bytes = total_size;
total_download_files = num_segments;

// Step 5.3: get all segment files
for (int64_t segment_index = 0; segment_index < num_segments; ++segment_index) {
Expand Down Expand Up @@ -442,6 +457,8 @@ void _ingest_binlog(StorageEngine& engine, IngestBinlogArg* arg) {
status.to_thrift(&tstatus);
return;
}
total_download_bytes += total_index_size;
total_download_files += segment_index_file_urls.size();

// Step 6.3: get all segment index files
DCHECK(segment_index_file_sizes.size() == segment_index_file_names.size());
Expand Down
13 changes: 3 additions & 10 deletions be/src/vec/functions/function_case.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,16 +174,9 @@ class FunctionCase : public IFunction {
break;
}
} else {
if constexpr (when_null) {
if (!then_idx_ptr[row_idx] && when_column_ptr->get_bool(row_idx)) {
then_idx_ptr[row_idx] = i;
break;
}
} else {
if (!then_idx_ptr[row_idx]) {
then_idx_ptr[row_idx] = i;
break;
}
if (!then_idx_ptr[row_idx] && when_column_ptr->get_bool(row_idx)) {
then_idx_ptr[row_idx] = i;
break;
}
}
}
Expand Down
Loading

0 comments on commit 735d2b9

Please sign in to comment.