diff --git a/conanfile.py b/conanfile.py index 28516c2..38b318b 100644 --- a/conanfile.py +++ b/conanfile.py @@ -10,7 +10,7 @@ class NuRaftMesgConan(ConanFile): name = "nuraft_mesg" - version = "3.5.3" + version = "3.5.4" homepage = "https://github.com/eBay/nuraft_mesg" description = "A gRPC service for NuRAFT" @@ -127,4 +127,4 @@ def package_info(self): self.cpp_info.set_property("cmake_file_name", "NuraftMesg") self.cpp_info.set_property("cmake_target_name", "NuraftMesg::NuraftMesg") self.cpp_info.names["cmake_find_package"] = "NuraftMesg" - self.cpp_info.names["cmake_find_package_multi"] = "NuraftMesg" + self.cpp_info.names["cmake_find_package_multi"] = "NuraftMesg" \ No newline at end of file diff --git a/src/lib/nuraft_mesg_config.fbs b/src/lib/nuraft_mesg_config.fbs index 1ac9706..59ad7ed 100644 --- a/src/lib/nuraft_mesg_config.fbs +++ b/src/lib/nuraft_mesg_config.fbs @@ -25,6 +25,8 @@ table NuraftMesgConfig { raft_leader_change_timeout_ms: uint32 = 3200; raft_scheduler_thread_cnt: uint16 = 2; + + raft_append_entries_thread_cnt: uint16 = 2; } root_type NuraftMesgConfig; diff --git a/src/lib/service.hpp b/src/lib/service.hpp index 688223f..75eb7b0 100644 --- a/src/lib/service.hpp +++ b/src/lib/service.hpp @@ -30,6 +30,7 @@ class group_metrics : public sisl::MetricsGroupWrapper { sisl::MetricsGroupWrapper("RAFTGroup", to_string(group_id).c_str()) { REGISTER_COUNTER(group_steps, "Total group messages received", "raft_group", {"op", "step"}); REGISTER_COUNTER(group_sends, "Total group messages sent", "raft_group", {"op", "send"}); + REGISTER_HISTOGRAM(group_step_latency, "Latency for processing raft step", "raft_group", {"op", "step"}); register_me_to_farm(); } diff --git a/src/proto/proto_service.cpp b/src/proto/proto_service.cpp index 51cecd4..3fc6c9c 100644 --- a/src/proto/proto_service.cpp +++ b/src/proto/proto_service.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include #include @@ -9,6 +10,7 @@ #include "nuraft_mesg/nuraft_mesg.hpp" #include "lib/service.hpp" +#include "lib/nuraft_mesg_config.hpp" #include "messaging_service.grpc.pb.h" #include "utils.hpp" @@ -46,12 +48,19 @@ class proto_service : public msg_service { ::grpc::Status step(nuraft::raft_server& server, const RaftMessage& request, RaftMessage& reply); public: - using msg_service::msg_service; + template < typename... Args > + proto_service(Args&&... args) : + msg_service(std::forward< Args >(args)...), + _raft_thread_pool{NURAFT_MESG_CONFIG(raft_append_entries_thread_cnt)} {} + void associate(sisl::GrpcServer* server) override; void bind(sisl::GrpcServer* server) override; // Incomming gRPC message bool raftStep(const sisl::AsyncRpcDataPtr< Messaging, RaftGroupMsg, RaftGroupMsg >& rpc_data); + +private: + boost::asio::thread_pool _raft_thread_pool; }; void proto_service::associate(::sisl::GrpcServer* server) { @@ -127,7 +136,7 @@ bool proto_service::raftStep(const sisl::AsyncRpcDataPtr< Messaging, RaftGroupMs // Setup our response and process the request. response.set_group_id(group_id); - folly::getGlobalCPUExecutor()->add([this, rpc_data]() { + boost::asio::post(_raft_thread_pool, [this, rpc_data]() { auto gid = boost::uuids::string_generator()(rpc_data->response().group_id()); auto& request = rpc_data->request(); auto& response = rpc_data->response(); @@ -136,8 +145,11 @@ bool proto_service::raftStep(const sisl::AsyncRpcDataPtr< Messaging, RaftGroupMs if (auto it = _raft_servers.find(gid); _raft_servers.end() != it) { if (it->second.m_metrics) COUNTER_INCREMENT(*it->second.m_metrics, group_steps, 1); try { + auto const time_start = std::chrono::steady_clock::now(); rpc_data->set_status( step(*it->second.m_server->raft_server(), request.msg(), *response.mutable_msg())); + if (it->second.m_metrics) + HISTOGRAM_OBSERVE(*it->second.m_metrics, group_step_latency, get_elapsed_time_ms(time_start)); } catch (std::runtime_error& rte) { LOGE("Caught exception during step(): {}", rte.what()); rpc_data->set_status(