From a72f6b8c01ba91aa4071e9ea87405518f80fd62e Mon Sep 17 00:00:00 2001 From: PengFei Li Date: Tue, 4 Mar 2025 14:50:43 +0800 Subject: [PATCH 1/3] Support to execute tablet_writer_open asynchronously Signed-off-by: PengFei Li --- be/src/common/config.h | 9 ++++ be/src/http/action/update_config_action.cpp | 6 +++ be/src/runtime/load_channel.cpp | 59 ++++++++++++--------- be/src/runtime/load_channel.h | 14 ++++- be/src/runtime/load_channel_mgr.cpp | 52 +++++++++++++++--- be/src/runtime/load_channel_mgr.h | 6 +++ be/src/util/starrocks_metrics.h | 1 + be/test/runtime/load_channel_test.cpp | 28 ++++++---- 8 files changed, 133 insertions(+), 42 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index eb713e2b2f63dc..d7787fafb91187 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -404,6 +404,15 @@ CONF_Int32(arrow_flight_port, "-1"); CONF_Int64(load_data_reserve_hours, "4"); // log error log will be removed after this time CONF_mInt64(load_error_log_reserve_hours, "48"); +// Whether to execute load channel RPC requests asynchronously, that is, +// to run RPCs in a separate thread pool instead of within BRPC workers +CONF_mBool(enable_load_channel_rpc_async, "true"); +// Maximum threads in load channel RPC thread pool. Default: -1 (auto-set to CPU cores), +// aligning with brpc workers' default (brpc_num_threads) to keep compatible after +// switching from sync to async mode +CONF_mInt32(load_channel_rpc_thread_pool_num, "-1"); +// The queue size for Load channel rpc thread pool +CONF_Int32(load_channel_rpc_thread_pool_queue_size, "102400"); CONF_mInt32(number_tablet_writer_threads, "16"); CONF_mInt64(max_queueing_memtable_per_tablet, "2"); // when memory limit exceed and memtable last update time exceed this time, memtable will be flushed diff --git a/be/src/http/action/update_config_action.cpp b/be/src/http/action/update_config_action.cpp index 4c3ab9a607c8c0..40896a1803e7ed 100644 --- a/be/src/http/action/update_config_action.cpp +++ b/be/src/http/action/update_config_action.cpp @@ -54,6 +54,7 @@ #include "http/http_status.h" #include "runtime/batch_write/batch_write_mgr.h" #include "runtime/batch_write/txn_state_cache.h" +#include "runtime/load_channel_mgr.h" #include "storage/compaction_manager.h" #include "storage/lake/compaction_scheduler.h" #include "storage/lake/load_spill_block_manager.h" @@ -305,6 +306,11 @@ Status UpdateConfigAction::update_config(const std::string& name, const std::str auto thread_pool = ExecEnv::GetInstance()->agent_server()->get_thread_pool(TTaskType::CREATE); return thread_pool->update_max_threads(config::create_tablet_worker_count); }); + _config_callback.emplace("load_channel_rpc_thread_pool_num", [&]() -> Status { + LOG(INFO) << "set load_channel_rpc_thread_pool_num:" << config::load_channel_rpc_thread_pool_num; + return ExecEnv::GetInstance()->load_channel_mgr()->async_rpc_pool()->update_max_threads( + config::load_channel_rpc_thread_pool_num); + }); _config_callback.emplace("number_tablet_writer_threads", [&]() -> Status { LOG(INFO) << "set number_tablet_writer_threads:" << config::number_tablet_writer_threads; bthreads::ThreadPoolExecutor* executor = static_cast( diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp index 85461659aa15bd..87ae1e8ced5a8f 100644 --- a/be/src/runtime/load_channel.cpp +++ b/be/src/runtime/load_channel.cpp @@ -88,6 +88,8 @@ LoadChannel::LoadChannel(LoadChannelMgr* mgr, LakeTabletManager* lake_tablet_mgr _profile_report_count = ADD_COUNTER(_profile, "ProfileReportCount", TUnit::UNIT); _profile_report_timer = ADD_TIMER(_profile, "ProfileReportTime"); _profile_serialized_size = ADD_COUNTER(_profile, "ProfileSerializedSize", TUnit::BYTES); + _open_request_count = ADD_COUNTER(_profile, "OpenRequestCount", TUnit::UNIT); + _open_request_pending_timer = ADD_TIMER(_profile, "OpenRequestPendingTime"); } LoadChannel::~LoadChannel() { @@ -109,11 +111,16 @@ void LoadChannel::set_profile_config(const PLoadChannelProfileConfig& config) { } } -void LoadChannel::open(brpc::Controller* cntl, const PTabletWriterOpenRequest& request, - PTabletWriterOpenResult* response, google::protobuf::Closure* done) { +void LoadChannel::open(const LoadChannelOpenRequest& open_request) { + int64_t start_time_ns = MonotonicNanos(); + COUNTER_UPDATE(_open_request_count, 1); + COUNTER_UPDATE(_open_request_pending_timer, (start_time_ns - open_request.receive_rpc_time_ns)); + brpc::Controller* cntl = open_request.cntl; + const PTabletWriterOpenRequest& request = *open_request.request; + PTabletWriterOpenResult* response = open_request.response; _span->AddEvent("open_index", {{"index_id", request.index_id()}}); auto scoped = trace::Scope(_span); - ClosureGuard done_guard(done); + ClosureGuard done_guard(open_request.done); _last_updated_time.store(time(nullptr), std::memory_order_relaxed); bool is_lake_tablet = request.has_is_lake_tablet() && request.is_lake_tablet(); @@ -154,6 +161,8 @@ void LoadChannel::open(brpc::Controller* cntl, const PTabletWriterOpenRequest& r if (config::enable_load_colocate_mv) { response->set_is_repeated_chunk(true); } + int64_t cost_ms = (MonotonicNanos() - start_time_ns) / 1000000; + _check_and_log_timeout_rpc("tablet writer open", cost_ms, request.timeout_ms()); } void LoadChannel::_add_chunk(Chunk* chunk, const MonotonicStopWatch* watch, const PTabletWriterAddChunkRequest& request, @@ -243,27 +252,7 @@ void LoadChannel::add_chunks(const PTabletWriterAddChunksRequest& req, PTabletWr StarRocksMetrics::instance()->load_channel_add_chunks_duration_us.increment(total_time_us); report_profile(response, config::pipeline_print_profile); - - // log profile if rpc timeout - if (total_time_us > timeout_ms * 1000) { - // update profile - auto channels = _get_all_channels(); - for (auto& channel : channels) { - channel->update_profile(); - } - - std::stringstream ss; - _root_profile->pretty_print(&ss); - if (timeout_ms > config::load_rpc_slow_log_frequency_threshold_seconds) { - LOG(WARNING) << "tablet writer add chunk timeout. txn_id=" << _txn_id << ", cost=" << total_time_us / 1000 - << "ms, timeout=" << timeout_ms << "ms, profile=" << ss.str(); - } else { - // reduce slow log print frequency if the log job is small batch and high frequency - LOG_EVERY_N(WARNING, 10) << "tablet writer add chunk timeout. txn_id=" << _txn_id - << ", cost=" << total_time_us / 1000 << "ms, timeout=" << timeout_ms - << "ms, profile=" << ss.str(); - } - } + _check_and_log_timeout_rpc("tablet writer add chunk", total_time_us / 1000, timeout_ms); } void LoadChannel::add_segment(brpc::Controller* cntl, const PTabletWriterAddSegmentRequest* request, @@ -486,4 +475,26 @@ Status LoadChannel::_update_and_serialize_profile(std::string* result, bool prin VLOG(2) << "report profile, load_id: " << _load_id << ", txn_id: " << _txn_id << ", size: " << len; return Status::OK(); } + +void LoadChannel::_check_and_log_timeout_rpc(const std::string& rpc_name, int64_t cost_ms, int64_t timeout_ms) { + if (cost_ms <= timeout_ms) { + return; + } + // update profile + auto channels = _get_all_channels(); + for (auto& channel : channels) { + channel->update_profile(); + } + + std::stringstream ss; + _root_profile->pretty_print(&ss); + if (timeout_ms > config::load_rpc_slow_log_frequency_threshold_seconds * 1000) { + LOG(WARNING) << rpc_name << " timeout. txn_id=" << _txn_id << ", cost=" << cost_ms + << "ms, timeout=" << timeout_ms << "ms, profile=" << ss.str(); + } else { + // reduce slow log print frequency if the log job is small batch and high frequency + LOG_EVERY_N(WARNING, 10) << rpc_name << " timeout. txn_id=" << _txn_id << ", cost=" << cost_ms + << "ms, timeout=" << timeout_ms << "ms, profile=" << ss.str(); + } +} } // namespace starrocks diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h index 882645a60d81cd..ff595903170000 100644 --- a/be/src/runtime/load_channel.h +++ b/be/src/runtime/load_channel.h @@ -71,6 +71,14 @@ namespace lake { class TabletManager; } +struct LoadChannelOpenRequest { + brpc::Controller* cntl; + const PTabletWriterOpenRequest* request; + PTabletWriterOpenResult* response; + google::protobuf::Closure* done; + int64_t receive_rpc_time_ns; +}; + // A LoadChannel manages tablets channels for all indexes // corresponding to a certain load job class LoadChannel { @@ -88,8 +96,7 @@ class LoadChannel { // Open a new load channel if it does not exist. // NOTE: This method may be called multiple times, and each time with a different |request|. - void open(brpc::Controller* cntl, const PTabletWriterOpenRequest& request, PTabletWriterOpenResult* response, - google::protobuf::Closure* done); + void open(const LoadChannelOpenRequest& open_request); void add_chunk(const PTabletWriterAddChunkRequest& request, PTabletWriterAddBatchResult* response); @@ -132,6 +139,7 @@ class LoadChannel { bool _should_enable_profile(); std::vector> _get_all_channels(); Status _update_and_serialize_profile(std::string* serialized_profile, bool print_profile); + void _check_and_log_timeout_rpc(const std::string& rpc_name, int64_t cost_ms, int64_t timeout_ms); LoadChannelMgr* _load_mgr; LakeTabletManager* _lake_tablet_mgr; @@ -178,6 +186,8 @@ class LoadChannel { RuntimeProfile::Counter* _profile_report_count = nullptr; RuntimeProfile::Counter* _profile_report_timer = nullptr; RuntimeProfile::Counter* _profile_serialized_size = nullptr; + RuntimeProfile::Counter* _open_request_count = nullptr; + RuntimeProfile::Counter* _open_request_pending_timer = nullptr; }; inline std::ostream& operator<<(std::ostream& os, const LoadChannel& load_channel) { diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp index e68eac67bd627d..0cf84f0e661acd 100644 --- a/be/src/runtime/load_channel_mgr.cpp +++ b/be/src/runtime/load_channel_mgr.cpp @@ -85,23 +85,60 @@ LoadChannelMgr::~LoadChannelMgr() { } void LoadChannelMgr::close() { - std::lock_guard l(_lock); - for (auto iter = _load_channels.begin(); iter != _load_channels.end();) { - iter->second->cancel(); - iter->second->abort(); - iter = _load_channels.erase(iter); + { + std::lock_guard l(_lock); + for (auto iter = _load_channels.begin(); iter != _load_channels.end();) { + iter->second->cancel(); + iter->second->abort(); + iter = _load_channels.erase(iter); + } + } + if (_async_rpc_pool) { + _async_rpc_pool->shutdown(); } } Status LoadChannelMgr::init(MemTracker* mem_tracker) { _mem_tracker = mem_tracker; RETURN_IF_ERROR(_start_bg_worker()); + int num_threads = config::load_channel_rpc_thread_pool_num; + if (num_threads <= 0) { + num_threads = CpuInfo::num_cores(); + } + RETURN_IF_ERROR(ThreadPoolBuilder("load_channel_mgr") + .set_min_threads(5) + .set_max_threads(num_threads) + .set_max_queue_size(config::load_channel_rpc_thread_pool_queue_size) + .set_idle_timeout(MonoDelta::FromMilliseconds(10000)) + .build(&_async_rpc_pool)); + REGISTER_THREAD_POOL_METRICS(load_channel_mgr, _async_rpc_pool.get()); return Status::OK(); } void LoadChannelMgr::open(brpc::Controller* cntl, const PTabletWriterOpenRequest& request, PTabletWriterOpenResult* response, google::protobuf::Closure* done) { - ClosureGuard done_guard(done); + LoadChannelOpenRequest open_request; + open_request.cntl = cntl; + open_request.request = &request; + open_request.response = response; + open_request.done = done; + open_request.receive_rpc_time_ns = MonotonicNanos(); + if (!config::enable_load_channel_rpc_async) { + _open(open_request); + return; + } + auto task = [=]() { this->_open(open_request); }; + Status status = _async_rpc_pool->submit_func(std::move(task)); + if (!status.ok()) { + ClosureGuard closure_guard(done); + status.to_protobuf(response->mutable_status()); + } +} + +void LoadChannelMgr::_open(LoadChannelOpenRequest open_request) { + ClosureGuard done_guard(open_request.done); + const PTabletWriterOpenRequest& request = *open_request.request; + PTabletWriterOpenResult* response = open_request.response; if (!request.encryption_meta().empty()) { Status st = KeyCache::instance().refresh_keys(request.encryption_meta()); if (!st.ok()) { @@ -143,7 +180,8 @@ void LoadChannelMgr::open(brpc::Controller* cntl, const PTabletWriterOpenRequest return; } } - channel->open(cntl, request, response, done_guard.release()); + done_guard.release(); + channel->open(open_request); } void LoadChannelMgr::add_chunk(const PTabletWriterAddChunkRequest& request, PTabletWriterAddBatchResult* response) { diff --git a/be/src/runtime/load_channel_mgr.h b/be/src/runtime/load_channel_mgr.h index a4e5c3972d5a2b..624bb1bb00fc03 100644 --- a/be/src/runtime/load_channel_mgr.h +++ b/be/src/runtime/load_channel_mgr.h @@ -93,9 +93,12 @@ class LoadChannelMgr { void close(); + ThreadPool* async_rpc_pool() { return _async_rpc_pool.get(); } + private: static void* load_channel_clean_bg_worker(void* arg); + void _open(LoadChannelOpenRequest open_request); Status _start_bg_worker(); std::shared_ptr _find_load_channel(const UniqueId& load_id); std::shared_ptr _find_load_channel(int64_t txn_id); @@ -111,6 +114,9 @@ class LoadChannelMgr { // thread to clean timeout load channels bthread_t _load_channels_clean_thread; + + // Thread pool used to handle rpc request asynchronously + std::unique_ptr _async_rpc_pool; }; } // namespace starrocks diff --git a/be/src/util/starrocks_metrics.h b/be/src/util/starrocks_metrics.h index e7ed2097105be8..96b0c92b28b6b7 100644 --- a/be/src/util/starrocks_metrics.h +++ b/be/src/util/starrocks_metrics.h @@ -205,6 +205,7 @@ class StarRocksMetrics { METRIC_DEFINE_INT_COUNTER(load_bytes_total, MetricUnit::BYTES); // Metrics for LoadChannel + METRICS_DEFINE_THREAD_POOL(load_channel_mgr); // The number that LoadChannel#add_chunks is accessed METRIC_DEFINE_INT_COUNTER(load_channel_add_chunks_total, MetricUnit::OPERATIONS); // The number that LoadChannel#add_chunks eos is accessed diff --git a/be/test/runtime/load_channel_test.cpp b/be/test/runtime/load_channel_test.cpp index bb1a0c202ad761..f007d5986456dc 100644 --- a/be/test/runtime/load_channel_test.cpp +++ b/be/test/runtime/load_channel_test.cpp @@ -249,11 +249,21 @@ class LoadChannelTestForLakeTablet : public testing::Test { std::shared_ptr _load_channel; }; +LoadChannelOpenRequest create_open_request(const PTabletWriterOpenRequest* request, PTabletWriterOpenResult* response) { + LoadChannelOpenRequest open_request; + open_request.cntl = nullptr; + open_request.request = request; + open_request.response = response; + open_request.done = nullptr; + open_request.receive_rpc_time_ns = MonotonicNanos(); + return open_request; +} + TEST_F(LoadChannelTestForLakeTablet, test_simple_write) { PTabletWriterOpenRequest open_request = _open_request; PTabletWriterOpenResult open_response; open_request.set_num_senders(1); - _load_channel->open(nullptr, open_request, &open_response, nullptr); + _load_channel->open(create_open_request(&open_request, &open_response)); ASSERT_EQ(TStatusCode::OK, open_response.status().status_code()); constexpr int kChunkSize = 128; @@ -325,7 +335,7 @@ TEST_F(LoadChannelTestForLakeTablet, test_simple_write) { TEST_F(LoadChannelTestForLakeTablet, test_write_concurrently) { PTabletWriterOpenRequest open_request = _open_request; PTabletWriterOpenResult open_response; - _load_channel->open(nullptr, open_request, &open_response, nullptr); + _load_channel->open(create_open_request(&open_request, &open_response)); ASSERT_EQ(TStatusCode::OK, open_response.status().status_code()); constexpr int kChunkSize = 128; @@ -398,7 +408,7 @@ TEST_F(LoadChannelTestForLakeTablet, test_abort) { PTabletWriterOpenRequest open_request = _open_request; PTabletWriterOpenResult open_response; open_request.set_num_senders(1); - _load_channel->open(nullptr, open_request, &open_response, nullptr); + _load_channel->open(create_open_request(&open_request, &open_response)); ASSERT_EQ(TStatusCode::OK, open_response.status().status_code()); constexpr int kChunkSize = 128; @@ -469,7 +479,7 @@ TEST_F(LoadChannelTestForLakeTablet, test_incremental_open) { PTabletWriterOpenResult open_response; open_request.set_num_senders(1); open_request.set_is_incremental(true); - _load_channel->open(nullptr, open_request, &open_response, nullptr); + _load_channel->open(create_open_request(&open_request, &open_response)); ASSERT_EQ(TStatusCode::OK, open_response.status().status_code()) << open_response.status().error_msgs(0); ch = _load_channel->get_tablets_channel(TabletsChannelKey(open_request.id(), 0, kIndexId)); ASSERT_NE(nullptr, ch.get()); @@ -482,7 +492,7 @@ TEST_F(LoadChannelTestForLakeTablet, test_incremental_open) { PTabletWriterOpenResult open_response; open_request.set_is_incremental(true); // open again with incremental info. - _load_channel->open(nullptr, open_request, &open_response, nullptr); + _load_channel->open(create_open_request(&open_request, &open_response)); EXPECT_EQ(TStatusCode::OK, open_response.status().status_code()) << open_response.status().error_msgs(0); auto ch2 = _load_channel->get_tablets_channel(TabletsChannelKey(open_request.id(), 0, kIndexId)); @@ -497,7 +507,7 @@ TEST_F(LoadChannelTestForLakeTablet, test_add_segment) { PTabletWriterOpenRequest open_request = _open_request; PTabletWriterOpenResult open_response; open_request.set_num_senders(1); - _load_channel->open(nullptr, open_request, &open_response, nullptr); + _load_channel->open(create_open_request(&open_request, &open_response)); ASSERT_EQ(TStatusCode::OK, open_response.status().status_code()); } @@ -597,7 +607,7 @@ TEST_F(LoadChannelTestForLakeTablet, test_final_profile) { open_request.mutable_load_channel_profile_config()->CopyFrom(profile_config); PTabletWriterOpenResult open_response; - _load_channel->open(nullptr, open_request, &open_response, nullptr); + _load_channel->open(create_open_request(&open_request, &open_response)); ASSERT_EQ(TStatusCode::OK, open_response.status().status_code()); constexpr int kChunkSize = 128; @@ -676,7 +686,7 @@ TEST_F(LoadChannelTestForLakeTablet, test_slow_log_profile) { open_request.mutable_load_channel_profile_config()->CopyFrom(profile_config); PTabletWriterOpenResult open_response; - _load_channel->open(nullptr, open_request, &open_response, nullptr); + _load_channel->open(create_open_request(&open_request, &open_response)); ASSERT_EQ(TStatusCode::OK, open_response.status().status_code()); constexpr int kChunkSize = 128; @@ -746,7 +756,7 @@ TEST_F(LoadChannelTestForLakeTablet, test_load_diagnose) { open_request.set_num_senders(1); PTabletWriterOpenResult open_response; - _load_channel->open(nullptr, open_request, &open_response, nullptr); + _load_channel->open(create_open_request(&open_request, &open_response)); ASSERT_EQ(TStatusCode::OK, open_response.status().status_code()); constexpr int kChunkSize = 128; From f659f51f4de65d08bb12815e4359c6a879a2bd81 Mon Sep 17 00:00:00 2001 From: PengFei Li Date: Wed, 5 Mar 2025 15:01:31 +0800 Subject: [PATCH 2/3] Add tests Signed-off-by: PengFei Li --- be/src/runtime/load_channel_mgr.h | 6 + be/test/CMakeLists.txt | 1 + be/test/runtime/load_channel_mgr_test.cpp | 232 ++++++++++++++++++++++ be/test/util/starrocks_metrics_test.cpp | 1 + 4 files changed, 240 insertions(+) create mode 100644 be/test/runtime/load_channel_mgr_test.cpp diff --git a/be/src/runtime/load_channel_mgr.h b/be/src/runtime/load_channel_mgr.h index 624bb1bb00fc03..a0dca7e8bbb609 100644 --- a/be/src/runtime/load_channel_mgr.h +++ b/be/src/runtime/load_channel_mgr.h @@ -95,6 +95,12 @@ class LoadChannelMgr { ThreadPool* async_rpc_pool() { return _async_rpc_pool.get(); } + std::shared_ptr TEST_get_load_channel(UniqueId load_id) { + std::lock_guard l(_lock); + auto it = _load_channels.find(load_id); + return it != _load_channels.end() ? it->second : nullptr; + } + private: static void* load_channel_clean_bg_worker(void* arg); diff --git a/be/test/CMakeLists.txt b/be/test/CMakeLists.txt index 30a56703794ff2..1db2365281dd8a 100644 --- a/be/test/CMakeLists.txt +++ b/be/test/CMakeLists.txt @@ -383,6 +383,7 @@ set(EXEC_FILES ./runtime/lake_tablets_channel_test.cpp ./runtime/large_int_value_test.cpp ./runtime/load_channel_test.cpp + ./runtime/load_channel_mgr_test.cpp ./runtime/memory/mem_chunk_allocator_test.cpp ./runtime/memory/system_allocator_test.cpp ./runtime/memory/memory_resource_test.cpp diff --git a/be/test/runtime/load_channel_mgr_test.cpp b/be/test/runtime/load_channel_mgr_test.cpp new file mode 100644 index 00000000000000..a581aa6db370c7 --- /dev/null +++ b/be/test/runtime/load_channel_mgr_test.cpp @@ -0,0 +1,232 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "runtime/load_channel_mgr.h" + +#include +#include + +#include "storage/chunk_helper.h" +#include "storage/storage_engine.h" +#include "storage/tablet_manager.h" +#include "storage/tablet_schema.h" +#include "testutil/assert.h" +#include "util/await.h" + +namespace starrocks { + +class LoadChannelMgrTest : public testing::Test { +public: + LoadChannelMgrTest() = default; + ~LoadChannelMgrTest() override = default; + +protected: + void SetUp() override { + _mem_tracker = std::make_unique(-1); + _load_channel_mgr = std::make_unique(); + _node_id = 100; + _db_id = 100; + _table_id = 101; + _partition_id = 10; + _index_id = 1; + _tablet_id = rand(); + _tablet = create_tablet(_tablet_id, rand()); + _schema = std::make_shared(ChunkHelper::convert_schema(_tablet->tablet_schema())); + } + void TearDown() override { + if (_tablet) { + _tablet.reset(); + ASSERT_OK(StorageEngine::instance()->tablet_manager()->drop_tablet(_tablet_id)); + } + if (_load_channel_mgr) { + _load_channel_mgr->close(); + } + } + + TabletSharedPtr create_tablet(int64_t tablet_id, int32_t schema_hash) { + TCreateTabletReq request; + request.tablet_id = tablet_id; + request.__set_version(1); + request.tablet_schema.schema_hash = schema_hash; + request.tablet_schema.short_key_column_count = 1; + request.tablet_schema.keys_type = TKeysType::DUP_KEYS; + request.tablet_schema.storage_type = TStorageType::COLUMN; + + TColumn c0; + c0.column_name = "c0"; + c0.__set_is_key(true); + c0.column_type.type = TPrimitiveType::INT; + request.tablet_schema.columns.push_back(c0); + + TColumn c1; + c1.column_name = "c1"; + c1.__set_is_key(false); + c1.column_type.type = TPrimitiveType::INT; + request.tablet_schema.columns.push_back(c1); + + auto st = StorageEngine::instance()->create_tablet(request); + CHECK(st.ok()) << st.to_string(); + return StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, false); + } + + PTabletWriterOpenRequest create_open_request(PUniqueId load_id, int64_t txn_id) { + PTabletWriterOpenRequest request; + request.mutable_id()->CopyFrom(load_id); + request.set_index_id(_index_id); + request.set_txn_id(txn_id); + request.set_is_lake_tablet(false); + request.set_is_replicated_storage(true); + request.set_node_id(_node_id); + request.set_write_quorum(WriteQuorumTypePB::MAJORITY); + request.set_miss_auto_increment_column(false); + request.set_table_id(_table_id); + request.set_is_incremental(false); + request.set_num_senders(1); + request.set_sender_id(0); + request.set_need_gen_rollup(false); + request.set_load_channel_timeout_s(10); + request.set_is_vectorized(true); + request.set_timeout_ms(10000); + + request.set_immutable_tablet_size(0); + auto tablet = request.add_tablets(); + tablet->set_partition_id(_partition_id); + tablet->set_tablet_id(_tablet_id); + auto replica = tablet->add_replicas(); + replica->set_host("127.0.0.1"); + replica->set_port(8060); + replica->set_node_id(_node_id); + + auto schema = request.mutable_schema(); + schema->set_db_id(_db_id); + schema->set_table_id(_table_id); + schema->set_version(1); + auto index = schema->add_indexes(); + index->set_id(_index_id); + index->set_schema_hash(0); + for (int i = 0, sz = _tablet->tablet_schema()->num_columns(); i < sz; i++) { + auto slot = request.mutable_schema()->add_slot_descs(); + auto& column = _tablet->tablet_schema()->column(i); + slot->set_id(i); + slot->set_byte_offset(i * sizeof(int) /*unused*/); + slot->set_col_name(std::string(column.name())); + slot->set_slot_idx(i); + slot->set_is_materialized(true); + slot->mutable_slot_type()->add_types()->mutable_scalar_type()->set_type(column.type()); + index->add_columns(std::string(column.name())); + } + auto tuple_desc = schema->mutable_tuple_desc(); + tuple_desc->set_id(1); + tuple_desc->set_byte_size(8 /*unused*/); + tuple_desc->set_num_null_bytes(0 /*unused*/); + tuple_desc->set_table_id(_table_id); + + return request; + } + + std::unique_ptr _mem_tracker; + std::unique_ptr _load_channel_mgr; + TabletSharedPtr _tablet; + + int64_t _node_id; + int64_t _db_id; + int64_t _table_id; + int64_t _partition_id; + int32_t _index_id; + int64_t _tablet_id; + std::shared_ptr _schema; + std::shared_ptr _schema_param; +}; + +class MockClosure : public ::google::protobuf::Closure { +public: + MockClosure() = default; + ~MockClosure() override = default; + + void Run() override { _run.store(true); } + + bool has_run() { return _run.load(); } + +private: + std::atomic_bool _run = false; +}; + +TEST_F(LoadChannelMgrTest, async_open_success) { + ASSERT_OK(_load_channel_mgr->init(_mem_tracker.get())); + PUniqueId load_id; + load_id.set_hi(456789); + load_id.set_lo(987654); + brpc::Controller cntl; + MockClosure closure; + PTabletWriterOpenRequest request = create_open_request(load_id, rand()); + PTabletWriterOpenResult result; + _load_channel_mgr->open(&cntl, request, &result, &closure); + ASSERT_TRUE(Awaitility().timeout(60000).until( + [&] { return _load_channel_mgr->async_rpc_pool()->total_executed_tasks() == 1; })); + ASSERT_TRUE(closure.has_run()); + ASSERT_TRUE(result.status().status_code() == TStatusCode::OK); + auto load_channel = _load_channel_mgr->TEST_get_load_channel(UniqueId(load_id)); + ASSERT_TRUE(load_channel != nullptr); +} + +TEST_F(LoadChannelMgrTest, async_open_submit_task_fail) { + ASSERT_OK(_load_channel_mgr->init(_mem_tracker.get())); + PUniqueId load_id; + load_id.set_hi(456789); + load_id.set_lo(987654); + brpc::Controller cntl; + MockClosure closure; + PTabletWriterOpenRequest request = create_open_request(load_id, rand()); + PTabletWriterOpenResult result; + + DeferOp defer([]() { + SyncPoint::GetInstance()->ClearCallBack("ThreadPool::do_submit:1"); + SyncPoint::GetInstance()->DisableProcessing(); + }); + SyncPoint::GetInstance()->EnableProcessing(); + SyncPoint::GetInstance()->SetCallBack("ThreadPool::do_submit:1", [](void* arg) { *(int64_t*)arg = 0; }); + + _load_channel_mgr->open(&cntl, request, &result, &closure); + ASSERT_TRUE(closure.has_run()); + ASSERT_TRUE(result.status().status_code() == TStatusCode::SERVICE_UNAVAILABLE); + auto load_channel = _load_channel_mgr->TEST_get_load_channel(UniqueId(load_id)); + ASSERT_TRUE(load_channel == nullptr); +} + +TEST_F(LoadChannelMgrTest, sync_open_success) { + ASSERT_OK(_load_channel_mgr->init(_mem_tracker.get())); + PUniqueId load_id; + load_id.set_hi(456789); + load_id.set_lo(987654); + brpc::Controller cntl; + MockClosure closure; + PTabletWriterOpenRequest request = create_open_request(load_id, rand()); + PTabletWriterOpenResult result; + + DeferOp defer([]() { + SyncPoint::GetInstance()->ClearCallBack("ThreadPool::do_submit:1"); + SyncPoint::GetInstance()->DisableProcessing(); + config::enable_load_channel_rpc_async = true; + }); + SyncPoint::GetInstance()->EnableProcessing(); + SyncPoint::GetInstance()->SetCallBack("ThreadPool::do_submit:1", [](void* arg) { *(int64_t*)arg = 0; }); + config::enable_load_channel_rpc_async = false; + _load_channel_mgr->open(&cntl, request, &result, &closure); + ASSERT_TRUE(closure.has_run()); + ASSERT_TRUE(result.status().status_code() == TStatusCode::OK); + auto load_channel = _load_channel_mgr->TEST_get_load_channel(UniqueId(load_id)); + ASSERT_TRUE(load_channel != nullptr); +} + +} // namespace starrocks \ No newline at end of file diff --git a/be/test/util/starrocks_metrics_test.cpp b/be/test/util/starrocks_metrics_test.cpp index d7af3270ee614e..8887e587cdc31a 100644 --- a/be/test/util/starrocks_metrics_test.cpp +++ b/be/test/util/starrocks_metrics_test.cpp @@ -342,6 +342,7 @@ TEST_F(StarRocksMetricsTest, test_metrics_register) { assert_threadpool_metrics_register("clone", instance); assert_threadpool_metrics_register("remote_snapshot", instance); assert_threadpool_metrics_register("replicate_snapshot", instance); + assert_threadpool_metrics_register("load_channel_mgr", instance); ASSERT_NE(nullptr, instance->get_metric("load_channel_add_chunks_total")); ASSERT_NE(nullptr, instance->get_metric("load_channel_add_chunks_eos_total")); ASSERT_NE(nullptr, instance->get_metric("load_channel_add_chunks_duration_us")); From 431c4cd66fa4dbb8157b8e98cb70aa0d47079743 Mon Sep 17 00:00:00 2001 From: PengFei Li Date: Wed, 5 Mar 2025 16:28:32 +0800 Subject: [PATCH 3/3] remove unused cntl Signed-off-by: PengFei Li --- be/src/runtime/load_channel.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp index 87ae1e8ced5a8f..8c9a993821cce9 100644 --- a/be/src/runtime/load_channel.cpp +++ b/be/src/runtime/load_channel.cpp @@ -115,7 +115,6 @@ void LoadChannel::open(const LoadChannelOpenRequest& open_request) { int64_t start_time_ns = MonotonicNanos(); COUNTER_UPDATE(_open_request_count, 1); COUNTER_UPDATE(_open_request_pending_timer, (start_time_ns - open_request.receive_rpc_time_ns)); - brpc::Controller* cntl = open_request.cntl; const PTabletWriterOpenRequest& request = *open_request.request; PTabletWriterOpenResult* response = open_request.response; _span->AddEvent("open_index", {{"index_id", request.index_id()}});