Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] Move delta write open out of brpc worker #56517

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,15 @@ CONF_Int32(arrow_flight_port, "-1");
CONF_Int64(load_data_reserve_hours, "4");
// log error log will be removed after this time
CONF_mInt64(load_error_log_reserve_hours, "48");
// Whether to execute load channel RPC requests asynchronously, that is,
// to run RPCs in a separate thread pool instead of within BRPC workers
CONF_mBool(enable_load_channel_rpc_async, "true");
// Maximum threads in load channel RPC thread pool. Default: -1 (auto-set to CPU cores),
// aligning with brpc workers' default (brpc_num_threads) to keep compatible after
// switching from sync to async mode
CONF_mInt32(load_channel_rpc_thread_pool_num, "-1");
// The queue size for Load channel rpc thread pool
CONF_Int32(load_channel_rpc_thread_pool_queue_size, "102400");
CONF_mInt32(number_tablet_writer_threads, "16");
CONF_mInt64(max_queueing_memtable_per_tablet, "2");
// when memory limit exceed and memtable last update time exceed this time, memtable will be flushed
Expand Down
6 changes: 6 additions & 0 deletions be/src/http/action/update_config_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
#include "http/http_status.h"
#include "runtime/batch_write/batch_write_mgr.h"
#include "runtime/batch_write/txn_state_cache.h"
#include "runtime/load_channel_mgr.h"
#include "storage/compaction_manager.h"
#include "storage/lake/compaction_scheduler.h"
#include "storage/lake/load_spill_block_manager.h"
Expand Down Expand Up @@ -305,6 +306,11 @@ Status UpdateConfigAction::update_config(const std::string& name, const std::str
auto thread_pool = ExecEnv::GetInstance()->agent_server()->get_thread_pool(TTaskType::CREATE);
return thread_pool->update_max_threads(config::create_tablet_worker_count);
});
_config_callback.emplace("load_channel_rpc_thread_pool_num", [&]() -> Status {
LOG(INFO) << "set load_channel_rpc_thread_pool_num:" << config::load_channel_rpc_thread_pool_num;
return ExecEnv::GetInstance()->load_channel_mgr()->async_rpc_pool()->update_max_threads(
config::load_channel_rpc_thread_pool_num);
});
_config_callback.emplace("number_tablet_writer_threads", [&]() -> Status {
LOG(INFO) << "set number_tablet_writer_threads:" << config::number_tablet_writer_threads;
bthreads::ThreadPoolExecutor* executor = static_cast<bthreads::ThreadPoolExecutor*>(
Expand Down
58 changes: 34 additions & 24 deletions be/src/runtime/load_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ LoadChannel::LoadChannel(LoadChannelMgr* mgr, LakeTabletManager* lake_tablet_mgr
_profile_report_count = ADD_COUNTER(_profile, "ProfileReportCount", TUnit::UNIT);
_profile_report_timer = ADD_TIMER(_profile, "ProfileReportTime");
_profile_serialized_size = ADD_COUNTER(_profile, "ProfileSerializedSize", TUnit::BYTES);
_open_request_count = ADD_COUNTER(_profile, "OpenRequestCount", TUnit::UNIT);
_open_request_pending_timer = ADD_TIMER(_profile, "OpenRequestPendingTime");
}

LoadChannel::~LoadChannel() {
Expand All @@ -109,11 +111,15 @@ void LoadChannel::set_profile_config(const PLoadChannelProfileConfig& config) {
}
}

void LoadChannel::open(brpc::Controller* cntl, const PTabletWriterOpenRequest& request,
PTabletWriterOpenResult* response, google::protobuf::Closure* done) {
void LoadChannel::open(const LoadChannelOpenRequest& open_request) {
int64_t start_time_ns = MonotonicNanos();
COUNTER_UPDATE(_open_request_count, 1);
COUNTER_UPDATE(_open_request_pending_timer, (start_time_ns - open_request.receive_rpc_time_ns));
const PTabletWriterOpenRequest& request = *open_request.request;
PTabletWriterOpenResult* response = open_request.response;
_span->AddEvent("open_index", {{"index_id", request.index_id()}});
auto scoped = trace::Scope(_span);
ClosureGuard done_guard(done);
ClosureGuard done_guard(open_request.done);

_last_updated_time.store(time(nullptr), std::memory_order_relaxed);
bool is_lake_tablet = request.has_is_lake_tablet() && request.is_lake_tablet();
Expand Down Expand Up @@ -154,6 +160,8 @@ void LoadChannel::open(brpc::Controller* cntl, const PTabletWriterOpenRequest& r
if (config::enable_load_colocate_mv) {
response->set_is_repeated_chunk(true);
}
int64_t cost_ms = (MonotonicNanos() - start_time_ns) / 1000000;
_check_and_log_timeout_rpc("tablet writer open", cost_ms, request.timeout_ms());
}

void LoadChannel::_add_chunk(Chunk* chunk, const MonotonicStopWatch* watch, const PTabletWriterAddChunkRequest& request,
Expand Down Expand Up @@ -243,27 +251,7 @@ void LoadChannel::add_chunks(const PTabletWriterAddChunksRequest& req, PTabletWr
StarRocksMetrics::instance()->load_channel_add_chunks_duration_us.increment(total_time_us);

report_profile(response, config::pipeline_print_profile);

// log profile if rpc timeout
if (total_time_us > timeout_ms * 1000) {
// update profile
auto channels = _get_all_channels();
for (auto& channel : channels) {
channel->update_profile();
}

std::stringstream ss;
_root_profile->pretty_print(&ss);
if (timeout_ms > config::load_rpc_slow_log_frequency_threshold_seconds) {
LOG(WARNING) << "tablet writer add chunk timeout. txn_id=" << _txn_id << ", cost=" << total_time_us / 1000
<< "ms, timeout=" << timeout_ms << "ms, profile=" << ss.str();
} else {
// reduce slow log print frequency if the log job is small batch and high frequency
LOG_EVERY_N(WARNING, 10) << "tablet writer add chunk timeout. txn_id=" << _txn_id
<< ", cost=" << total_time_us / 1000 << "ms, timeout=" << timeout_ms
<< "ms, profile=" << ss.str();
}
}
_check_and_log_timeout_rpc("tablet writer add chunk", total_time_us / 1000, timeout_ms);
}

void LoadChannel::add_segment(brpc::Controller* cntl, const PTabletWriterAddSegmentRequest* request,
Expand Down Expand Up @@ -486,4 +474,26 @@ Status LoadChannel::_update_and_serialize_profile(std::string* result, bool prin
VLOG(2) << "report profile, load_id: " << _load_id << ", txn_id: " << _txn_id << ", size: " << len;
return Status::OK();
}

void LoadChannel::_check_and_log_timeout_rpc(const std::string& rpc_name, int64_t cost_ms, int64_t timeout_ms) {
if (cost_ms <= timeout_ms) {
return;
}
// update profile
auto channels = _get_all_channels();
for (auto& channel : channels) {
channel->update_profile();
}

std::stringstream ss;
_root_profile->pretty_print(&ss);
if (timeout_ms > config::load_rpc_slow_log_frequency_threshold_seconds * 1000) {
LOG(WARNING) << rpc_name << " timeout. txn_id=" << _txn_id << ", cost=" << cost_ms
<< "ms, timeout=" << timeout_ms << "ms, profile=" << ss.str();
} else {
// reduce slow log print frequency if the log job is small batch and high frequency
LOG_EVERY_N(WARNING, 10) << rpc_name << " timeout. txn_id=" << _txn_id << ", cost=" << cost_ms
<< "ms, timeout=" << timeout_ms << "ms, profile=" << ss.str();
}
}
} // namespace starrocks
14 changes: 12 additions & 2 deletions be/src/runtime/load_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,14 @@ namespace lake {
class TabletManager;
}

struct LoadChannelOpenRequest {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is also the same as the RPC's controller, maybe name it LoadChannelOpenController or LoadChannelOpenContext

brpc::Controller* cntl;
const PTabletWriterOpenRequest* request;
PTabletWriterOpenResult* response;
google::protobuf::Closure* done;
int64_t receive_rpc_time_ns;
};

// A LoadChannel manages tablets channels for all indexes
// corresponding to a certain load job
class LoadChannel {
Expand All @@ -88,8 +96,7 @@ class LoadChannel {

// Open a new load channel if it does not exist.
// NOTE: This method may be called multiple times, and each time with a different |request|.
void open(brpc::Controller* cntl, const PTabletWriterOpenRequest& request, PTabletWriterOpenResult* response,
google::protobuf::Closure* done);
void open(const LoadChannelOpenRequest& open_request);

void add_chunk(const PTabletWriterAddChunkRequest& request, PTabletWriterAddBatchResult* response);

Expand Down Expand Up @@ -132,6 +139,7 @@ class LoadChannel {
bool _should_enable_profile();
std::vector<std::shared_ptr<TabletsChannel>> _get_all_channels();
Status _update_and_serialize_profile(std::string* serialized_profile, bool print_profile);
void _check_and_log_timeout_rpc(const std::string& rpc_name, int64_t cost_ms, int64_t timeout_ms);

LoadChannelMgr* _load_mgr;
LakeTabletManager* _lake_tablet_mgr;
Expand Down Expand Up @@ -178,6 +186,8 @@ class LoadChannel {
RuntimeProfile::Counter* _profile_report_count = nullptr;
RuntimeProfile::Counter* _profile_report_timer = nullptr;
RuntimeProfile::Counter* _profile_serialized_size = nullptr;
RuntimeProfile::Counter* _open_request_count = nullptr;
RuntimeProfile::Counter* _open_request_pending_timer = nullptr;
};

inline std::ostream& operator<<(std::ostream& os, const LoadChannel& load_channel) {
Expand Down
52 changes: 45 additions & 7 deletions be/src/runtime/load_channel_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,23 +85,60 @@ LoadChannelMgr::~LoadChannelMgr() {
}

void LoadChannelMgr::close() {
std::lock_guard l(_lock);
for (auto iter = _load_channels.begin(); iter != _load_channels.end();) {
iter->second->cancel();
iter->second->abort();
iter = _load_channels.erase(iter);
{
std::lock_guard l(_lock);
for (auto iter = _load_channels.begin(); iter != _load_channels.end();) {
iter->second->cancel();
iter->second->abort();
iter = _load_channels.erase(iter);
}
}
if (_async_rpc_pool) {
_async_rpc_pool->shutdown();
}
}

Status LoadChannelMgr::init(MemTracker* mem_tracker) {
_mem_tracker = mem_tracker;
RETURN_IF_ERROR(_start_bg_worker());
int num_threads = config::load_channel_rpc_thread_pool_num;
if (num_threads <= 0) {
num_threads = CpuInfo::num_cores();
}
RETURN_IF_ERROR(ThreadPoolBuilder("load_channel_mgr")
.set_min_threads(5)
.set_max_threads(num_threads)
.set_max_queue_size(config::load_channel_rpc_thread_pool_queue_size)
.set_idle_timeout(MonoDelta::FromMilliseconds(10000))
.build(&_async_rpc_pool));
REGISTER_THREAD_POOL_METRICS(load_channel_mgr, _async_rpc_pool.get());
return Status::OK();
}

void LoadChannelMgr::open(brpc::Controller* cntl, const PTabletWriterOpenRequest& request,
PTabletWriterOpenResult* response, google::protobuf::Closure* done) {
ClosureGuard done_guard(done);
LoadChannelOpenRequest open_request;
open_request.cntl = cntl;
open_request.request = &request;
open_request.response = response;
open_request.done = done;
open_request.receive_rpc_time_ns = MonotonicNanos();
if (!config::enable_load_channel_rpc_async) {
_open(open_request);
return;
}
auto task = [=]() { this->_open(open_request); };
Status status = _async_rpc_pool->submit_func(std::move(task));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the task can be submit successful but not executed at all when doing pool shutdown, hence lead the done never get called and resulting the brpc server exit hanging.

if (!status.ok()) {
ClosureGuard closure_guard(done);
status.to_protobuf(response->mutable_status());
}
}

void LoadChannelMgr::_open(LoadChannelOpenRequest open_request) {
ClosureGuard done_guard(open_request.done);
const PTabletWriterOpenRequest& request = *open_request.request;
PTabletWriterOpenResult* response = open_request.response;
if (!request.encryption_meta().empty()) {
Status st = KeyCache::instance().refresh_keys(request.encryption_meta());
if (!st.ok()) {
Expand Down Expand Up @@ -143,7 +180,8 @@ void LoadChannelMgr::open(brpc::Controller* cntl, const PTabletWriterOpenRequest
return;
}
}
channel->open(cntl, request, response, done_guard.release());
done_guard.release();
channel->open(open_request);
}

void LoadChannelMgr::add_chunk(const PTabletWriterAddChunkRequest& request, PTabletWriterAddBatchResult* response) {
Expand Down
12 changes: 12 additions & 0 deletions be/src/runtime/load_channel_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,18 @@ class LoadChannelMgr {

void close();

ThreadPool* async_rpc_pool() { return _async_rpc_pool.get(); }

std::shared_ptr<LoadChannel> TEST_get_load_channel(UniqueId load_id) {
std::lock_guard l(_lock);
auto it = _load_channels.find(load_id);
return it != _load_channels.end() ? it->second : nullptr;
}

private:
static void* load_channel_clean_bg_worker(void* arg);

void _open(LoadChannelOpenRequest open_request);
Status _start_bg_worker();
std::shared_ptr<LoadChannel> _find_load_channel(const UniqueId& load_id);
std::shared_ptr<LoadChannel> _find_load_channel(int64_t txn_id);
Expand All @@ -111,6 +120,9 @@ class LoadChannelMgr {

// thread to clean timeout load channels
bthread_t _load_channels_clean_thread;

// Thread pool used to handle rpc request asynchronously
std::unique_ptr<ThreadPool> _async_rpc_pool;
};

} // namespace starrocks
1 change: 1 addition & 0 deletions be/src/util/starrocks_metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ class StarRocksMetrics {
METRIC_DEFINE_INT_COUNTER(load_bytes_total, MetricUnit::BYTES);

// Metrics for LoadChannel
METRICS_DEFINE_THREAD_POOL(load_channel_mgr);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

load_channel_pool? a little confused a thread pool is named as *_mgr

// The number that LoadChannel#add_chunks is accessed
METRIC_DEFINE_INT_COUNTER(load_channel_add_chunks_total, MetricUnit::OPERATIONS);
// The number that LoadChannel#add_chunks eos is accessed
Expand Down
1 change: 1 addition & 0 deletions be/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,7 @@ set(EXEC_FILES
./runtime/lake_tablets_channel_test.cpp
./runtime/large_int_value_test.cpp
./runtime/load_channel_test.cpp
./runtime/load_channel_mgr_test.cpp
./runtime/memory/mem_chunk_allocator_test.cpp
./runtime/memory/system_allocator_test.cpp
./runtime/memory/memory_resource_test.cpp
Expand Down
Loading
Loading