From a45556ce9c001dffeed81d8876d71cbb47b4aade Mon Sep 17 00:00:00 2001 From: Ravi Nagarjun Akella Date: Thu, 11 Jul 2024 14:49:04 -0700 Subject: [PATCH 1/3] use boost threadpool to offload raft append entries --- conanfile.py | 89 ++++++++++++++++++++++------------ src/lib/nuraft_mesg_config.fbs | 2 + src/proto/proto_service.cpp | 14 +++++- 3 files changed, 71 insertions(+), 34 deletions(-) diff --git a/conanfile.py b/conanfile.py index 28516c2..188e8c0 100644 --- a/conanfile.py +++ b/conanfile.py @@ -8,9 +8,10 @@ required_conan_version = ">=1.60.0" + class NuRaftMesgConan(ConanFile): name = "nuraft_mesg" - version = "3.5.3" + version = "3.5.4" homepage = "https://github.com/eBay/nuraft_mesg" description = "A gRPC service for NuRAFT" @@ -21,25 +22,25 @@ class NuRaftMesgConan(ConanFile): settings = "arch", "os", "compiler", "build_type" options = { - "shared": ['True', 'False'], - "fPIC": ['True', 'False'], - "coverage": ['True', 'False'], - "sanitize": ['True', 'False'], - } + "shared": ["True", "False"], + "fPIC": ["True", "False"], + "coverage": ["True", "False"], + "sanitize": ["True", "False"], + } default_options = { - 'shared': False, - 'fPIC': True, - 'coverage': False, - 'sanitize': False, - } + "shared": False, + "fPIC": True, + "coverage": False, + "sanitize": False, + } exports_sources = ( - "LICENSE", - "CMakeLists.txt", - "cmake/*", - "include/*", - "src/*", - ) + "LICENSE", + "CMakeLists.txt", + "cmake/*", + "include/*", + "src/*", + ) def _min_cppstd(self): return 20 @@ -53,10 +54,14 @@ def configure(self): self.options.rm_safe("fPIC") if self.settings.build_type == "Debug": if self.options.coverage and self.options.sanitize: - raise ConanInvalidConfiguration("Sanitizer does not work with Code Coverage!") + raise ConanInvalidConfiguration( + "Sanitizer does not work with Code Coverage!" + ) if self.conf.get("tools.build:skip_test", default=False): if self.options.coverage or self.options.sanitize: - raise ConanInvalidConfiguration("Coverage/Sanitizer requires Testing!") + raise ConanInvalidConfiguration( + "Coverage/Sanitizer requires Testing!" + ) def build_requirements(self): if not self.conf.get("tools.build:skip_test", default=False): @@ -66,7 +71,9 @@ def build_requirements(self): def requirements(self): self.requires("boost/1.83.0", transitive_headers=True) - self.requires("sisl/[~12.2, include_prerelease=True]@oss/master", transitive_headers=True) + self.requires( + "sisl/[~12.2, include_prerelease=True]@oss/master", transitive_headers=True + ) self.requires("nuraft/2.4.0", transitive_headers=True) def layout(self): @@ -81,9 +88,9 @@ def generate(self): tc.variables["PACKAGE_VERSION"] = self.version if self.settings.build_type == "Debug": if self.options.get_safe("coverage"): - tc.variables['BUILD_COVERAGE'] = 'ON' + tc.variables["BUILD_COVERAGE"] = "ON" elif self.options.get_safe("sanitize"): - tc.variables['MEMORY_SANITIZER_ON'] = 'ON' + tc.variables["MEMORY_SANITIZER_ON"] = "ON" tc.generate() # This generates "boost-config.cmake" and "grpc-config.cmake" etc in self.generators_folder @@ -99,26 +106,44 @@ def build(self): def package(self): lib_dir = join(self.package_folder, "lib") - copy(self, "LICENSE", self.source_folder, join(self.package_folder, "licenses"), keep_path=False) - copy(self, "*.h*", join(self.source_folder, "include"), join(self.package_folder, "include"), keep_path=True) + copy( + self, + "LICENSE", + self.source_folder, + join(self.package_folder, "licenses"), + keep_path=False, + ) + copy( + self, + "*.h*", + join(self.source_folder, "include"), + join(self.package_folder, "include"), + keep_path=True, + ) copy(self, "*.lib", self.build_folder, lib_dir, keep_path=False) copy(self, "*.a", self.build_folder, lib_dir, keep_path=False) copy(self, "*.so*", self.build_folder, lib_dir, keep_path=False) copy(self, "*.dylib*", self.build_folder, lib_dir, keep_path=False) - copy(self, "*.dll*", self.build_folder, join(self.package_folder, "bin"), keep_path=False) + copy( + self, + "*.dll*", + self.build_folder, + join(self.package_folder, "bin"), + keep_path=False, + ) copy(self, "*.so*", self.build_folder, lib_dir, keep_path=False) def package_info(self): self.cpp_info.components["proto"].libs = ["nuraft_mesg", "nuraft_mesg_proto"] - self.cpp_info.components["proto"].set_property("pkg_config_name", "libnuraft_mesg_proto") - self.cpp_info.components["proto"].requires.extend([ - "nuraft::nuraft", - "boost::boost", - "sisl::sisl" - ]) + self.cpp_info.components["proto"].set_property( + "pkg_config_name", "libnuraft_mesg_proto" + ) + self.cpp_info.components["proto"].requires.extend( + ["nuraft::nuraft", "boost::boost", "sisl::sisl"] + ) for component in self.cpp_info.components.values(): - if self.options.get_safe("sanitize"): + if self.options.get_safe("sanitize"): component.sharedlinkflags.append("-fsanitize=address") component.exelinkflags.append("-fsanitize=address") component.sharedlinkflags.append("-fsanitize=undefined") diff --git a/src/lib/nuraft_mesg_config.fbs b/src/lib/nuraft_mesg_config.fbs index 1ac9706..59ad7ed 100644 --- a/src/lib/nuraft_mesg_config.fbs +++ b/src/lib/nuraft_mesg_config.fbs @@ -25,6 +25,8 @@ table NuraftMesgConfig { raft_leader_change_timeout_ms: uint32 = 3200; raft_scheduler_thread_cnt: uint16 = 2; + + raft_append_entries_thread_cnt: uint16 = 2; } root_type NuraftMesgConfig; diff --git a/src/proto/proto_service.cpp b/src/proto/proto_service.cpp index 51cecd4..2ac348e 100644 --- a/src/proto/proto_service.cpp +++ b/src/proto/proto_service.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include #include @@ -9,6 +10,7 @@ #include "nuraft_mesg/nuraft_mesg.hpp" #include "lib/service.hpp" +#include "lib/nuraft_mesg_config.hpp" #include "messaging_service.grpc.pb.h" #include "utils.hpp" @@ -46,12 +48,20 @@ class proto_service : public msg_service { ::grpc::Status step(nuraft::raft_server& server, const RaftMessage& request, RaftMessage& reply); public: - using msg_service::msg_service; + template < typename... Args > + proto_service(Args&&... args) : + msg_service(std::forward< Args >(args)...), + _raft_thread_pool{NURAFT_MESG_CONFIG(raft_append_entries_thread_cnt)} {} + + virtual ~proto_service() { _raft_thread_pool.join(); } void associate(sisl::GrpcServer* server) override; void bind(sisl::GrpcServer* server) override; // Incomming gRPC message bool raftStep(const sisl::AsyncRpcDataPtr< Messaging, RaftGroupMsg, RaftGroupMsg >& rpc_data); + +private: + boost::asio::thread_pool _raft_thread_pool; }; void proto_service::associate(::sisl::GrpcServer* server) { @@ -127,7 +137,7 @@ bool proto_service::raftStep(const sisl::AsyncRpcDataPtr< Messaging, RaftGroupMs // Setup our response and process the request. response.set_group_id(group_id); - folly::getGlobalCPUExecutor()->add([this, rpc_data]() { + boost::asio::post(_raft_thread_pool, [this, rpc_data]() { auto gid = boost::uuids::string_generator()(rpc_data->response().group_id()); auto& request = rpc_data->request(); auto& response = rpc_data->response(); From 291f805acaa76075235e6db63f50d36b904481db Mon Sep 17 00:00:00 2001 From: Ravi Nagarjun Akella Date: Mon, 15 Jul 2024 15:23:55 -0700 Subject: [PATCH 2/3] add raft step latency metrics --- src/lib/service.hpp | 1 + src/proto/proto_service.cpp | 3 +++ 2 files changed, 4 insertions(+) diff --git a/src/lib/service.hpp b/src/lib/service.hpp index 688223f..75eb7b0 100644 --- a/src/lib/service.hpp +++ b/src/lib/service.hpp @@ -30,6 +30,7 @@ class group_metrics : public sisl::MetricsGroupWrapper { sisl::MetricsGroupWrapper("RAFTGroup", to_string(group_id).c_str()) { REGISTER_COUNTER(group_steps, "Total group messages received", "raft_group", {"op", "step"}); REGISTER_COUNTER(group_sends, "Total group messages sent", "raft_group", {"op", "send"}); + REGISTER_HISTOGRAM(group_step_latency, "Latency for processing raft step", "raft_group", {"op", "step"}); register_me_to_farm(); } diff --git a/src/proto/proto_service.cpp b/src/proto/proto_service.cpp index 2ac348e..b57b936 100644 --- a/src/proto/proto_service.cpp +++ b/src/proto/proto_service.cpp @@ -146,8 +146,11 @@ bool proto_service::raftStep(const sisl::AsyncRpcDataPtr< Messaging, RaftGroupMs if (auto it = _raft_servers.find(gid); _raft_servers.end() != it) { if (it->second.m_metrics) COUNTER_INCREMENT(*it->second.m_metrics, group_steps, 1); try { + auto const time_start = std::chrono::steady_clock::now(); rpc_data->set_status( step(*it->second.m_server->raft_server(), request.msg(), *response.mutable_msg())); + if (it->second.m_metrics) + HISTOGRAM_OBSERVE(*it->second.m_metrics, group_step_latency, get_elapsed_time_ms(time_start)); } catch (std::runtime_error& rte) { LOGE("Caught exception during step(): {}", rte.what()); rpc_data->set_status( From c61ef5a43607f237826d598e2ec430949eaee71e Mon Sep 17 00:00:00 2001 From: Ravi Nagarjun Akella Date: Thu, 18 Jul 2024 14:15:53 -0700 Subject: [PATCH 3/3] boost thread pool joins itself --- conanfile.py | 89 +++++++++++++------------------------ src/proto/proto_service.cpp | 1 - 2 files changed, 32 insertions(+), 58 deletions(-) diff --git a/conanfile.py b/conanfile.py index 188e8c0..38b318b 100644 --- a/conanfile.py +++ b/conanfile.py @@ -8,7 +8,6 @@ required_conan_version = ">=1.60.0" - class NuRaftMesgConan(ConanFile): name = "nuraft_mesg" version = "3.5.4" @@ -22,25 +21,25 @@ class NuRaftMesgConan(ConanFile): settings = "arch", "os", "compiler", "build_type" options = { - "shared": ["True", "False"], - "fPIC": ["True", "False"], - "coverage": ["True", "False"], - "sanitize": ["True", "False"], - } + "shared": ['True', 'False'], + "fPIC": ['True', 'False'], + "coverage": ['True', 'False'], + "sanitize": ['True', 'False'], + } default_options = { - "shared": False, - "fPIC": True, - "coverage": False, - "sanitize": False, - } + 'shared': False, + 'fPIC': True, + 'coverage': False, + 'sanitize': False, + } exports_sources = ( - "LICENSE", - "CMakeLists.txt", - "cmake/*", - "include/*", - "src/*", - ) + "LICENSE", + "CMakeLists.txt", + "cmake/*", + "include/*", + "src/*", + ) def _min_cppstd(self): return 20 @@ -54,14 +53,10 @@ def configure(self): self.options.rm_safe("fPIC") if self.settings.build_type == "Debug": if self.options.coverage and self.options.sanitize: - raise ConanInvalidConfiguration( - "Sanitizer does not work with Code Coverage!" - ) + raise ConanInvalidConfiguration("Sanitizer does not work with Code Coverage!") if self.conf.get("tools.build:skip_test", default=False): if self.options.coverage or self.options.sanitize: - raise ConanInvalidConfiguration( - "Coverage/Sanitizer requires Testing!" - ) + raise ConanInvalidConfiguration("Coverage/Sanitizer requires Testing!") def build_requirements(self): if not self.conf.get("tools.build:skip_test", default=False): @@ -71,9 +66,7 @@ def build_requirements(self): def requirements(self): self.requires("boost/1.83.0", transitive_headers=True) - self.requires( - "sisl/[~12.2, include_prerelease=True]@oss/master", transitive_headers=True - ) + self.requires("sisl/[~12.2, include_prerelease=True]@oss/master", transitive_headers=True) self.requires("nuraft/2.4.0", transitive_headers=True) def layout(self): @@ -88,9 +81,9 @@ def generate(self): tc.variables["PACKAGE_VERSION"] = self.version if self.settings.build_type == "Debug": if self.options.get_safe("coverage"): - tc.variables["BUILD_COVERAGE"] = "ON" + tc.variables['BUILD_COVERAGE'] = 'ON' elif self.options.get_safe("sanitize"): - tc.variables["MEMORY_SANITIZER_ON"] = "ON" + tc.variables['MEMORY_SANITIZER_ON'] = 'ON' tc.generate() # This generates "boost-config.cmake" and "grpc-config.cmake" etc in self.generators_folder @@ -106,44 +99,26 @@ def build(self): def package(self): lib_dir = join(self.package_folder, "lib") - copy( - self, - "LICENSE", - self.source_folder, - join(self.package_folder, "licenses"), - keep_path=False, - ) - copy( - self, - "*.h*", - join(self.source_folder, "include"), - join(self.package_folder, "include"), - keep_path=True, - ) + copy(self, "LICENSE", self.source_folder, join(self.package_folder, "licenses"), keep_path=False) + copy(self, "*.h*", join(self.source_folder, "include"), join(self.package_folder, "include"), keep_path=True) copy(self, "*.lib", self.build_folder, lib_dir, keep_path=False) copy(self, "*.a", self.build_folder, lib_dir, keep_path=False) copy(self, "*.so*", self.build_folder, lib_dir, keep_path=False) copy(self, "*.dylib*", self.build_folder, lib_dir, keep_path=False) - copy( - self, - "*.dll*", - self.build_folder, - join(self.package_folder, "bin"), - keep_path=False, - ) + copy(self, "*.dll*", self.build_folder, join(self.package_folder, "bin"), keep_path=False) copy(self, "*.so*", self.build_folder, lib_dir, keep_path=False) def package_info(self): self.cpp_info.components["proto"].libs = ["nuraft_mesg", "nuraft_mesg_proto"] - self.cpp_info.components["proto"].set_property( - "pkg_config_name", "libnuraft_mesg_proto" - ) - self.cpp_info.components["proto"].requires.extend( - ["nuraft::nuraft", "boost::boost", "sisl::sisl"] - ) + self.cpp_info.components["proto"].set_property("pkg_config_name", "libnuraft_mesg_proto") + self.cpp_info.components["proto"].requires.extend([ + "nuraft::nuraft", + "boost::boost", + "sisl::sisl" + ]) for component in self.cpp_info.components.values(): - if self.options.get_safe("sanitize"): + if self.options.get_safe("sanitize"): component.sharedlinkflags.append("-fsanitize=address") component.exelinkflags.append("-fsanitize=address") component.sharedlinkflags.append("-fsanitize=undefined") @@ -152,4 +127,4 @@ def package_info(self): self.cpp_info.set_property("cmake_file_name", "NuraftMesg") self.cpp_info.set_property("cmake_target_name", "NuraftMesg::NuraftMesg") self.cpp_info.names["cmake_find_package"] = "NuraftMesg" - self.cpp_info.names["cmake_find_package_multi"] = "NuraftMesg" + self.cpp_info.names["cmake_find_package_multi"] = "NuraftMesg" \ No newline at end of file diff --git a/src/proto/proto_service.cpp b/src/proto/proto_service.cpp index b57b936..3fc6c9c 100644 --- a/src/proto/proto_service.cpp +++ b/src/proto/proto_service.cpp @@ -53,7 +53,6 @@ class proto_service : public msg_service { msg_service(std::forward< Args >(args)...), _raft_thread_pool{NURAFT_MESG_CONFIG(raft_append_entries_thread_cnt)} {} - virtual ~proto_service() { _raft_thread_pool.join(); } void associate(sisl::GrpcServer* server) override; void bind(sisl::GrpcServer* server) override;