From 303421306ef0497c561021572ce4e156d7070223 Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Thu, 5 Dec 2024 16:41:09 +0300 Subject: [PATCH 1/4] Support background Session close that can be safely waited even in atexit --- .../src/detail/rmw_context_impl_s.cpp | 60 +++++++++++++------ .../src/detail/rmw_context_impl_s.hpp | 5 +- zenoh_c_vendor/CMakeLists.txt | 4 +- 3 files changed, 48 insertions(+), 21 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp index ed3bd9a9..e997330f 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp @@ -218,29 +218,43 @@ class rmw_context_impl_s::Data final // Shutdown the Zenoh session. rmw_ret_t shutdown() { - { - std::lock_guard lock(mutex_); - rmw_ret_t ret = RMW_RET_OK; - if (is_shutdown_) { - return ret; - } - - z_undeclare_subscriber(z_move(graph_subscriber_)); - if (shm_provider_.has_value()) { - z_drop(z_move(shm_provider_.value())); - } - is_shutdown_ = true; - - // We specifically do *not* hold the mutex_ while tearing down the session; this allows us - // to avoid an AB/BA deadlock if shutdown is racing with graph_sub_data_handler(). + std::lock_guard lock(mutex_); + rmw_ret_t ret = RMW_RET_OK; + if (is_shutdown_) { + return ret; } - // Close the zenoh session - if (z_close(z_loan_mut(session_), NULL) != Z_OK) { - RMW_SET_ERROR_MSG("Error while closing zenoh session"); + z_undeclare_subscriber(z_move(graph_subscriber_)); + if (shm_provider_.has_value()) { + z_drop(z_move(shm_provider_.value())); + } + is_shutdown_ = true; + + // Close the zenoh session in background + close_handle = zc_owned_concurrent_close_handle_t(); + z_close_options_t options; + z_close_options_default(&options); + options.internal_out_concurrent = &close_handle.value(); + if (z_close(z_loan_mut(session_), &options) != Z_OK) { + close_handle.reset(); + RMW_SET_ERROR_MSG("Error while starting zenoh session close!"); return RMW_RET_ERROR; } + + return RMW_RET_OK; + } + ///============================================================================= + rmw_ret_t wait_for_session_close() + { + if (close_handle.has_value()) { + zc_owned_concurrent_close_handle_t handle = close_handle.value(); + close_handle.reset(); + if (zc_concurrent_close_handle_wait(z_move(handle)) < 0) { + RMW_SET_ERROR_MSG("Error closing session!"); + return RMW_RET_ERROR; + } + } return RMW_RET_OK; } @@ -379,8 +393,10 @@ class rmw_context_impl_s::Data final ~Data() { auto ret = this->shutdown(); + auto ret2 = this->wait_for_session_close(); nodes_.clear(); static_cast(ret); + static_cast(ret2); } private: @@ -406,6 +422,8 @@ class rmw_context_impl_s::Data final rmw_zenoh_cpp::GuardCondition guard_condition_data_; // Shutdown flag. bool is_shutdown_; + // Close operation handle + std::optional close_handle; // A counter to assign a local id for every entity created in this session. std::size_t next_entity_id_; // Nodes created from this context. @@ -501,6 +519,12 @@ rmw_ret_t rmw_context_impl_s::shutdown() return data_->shutdown(); } +///============================================================================= +rmw_ret_t rmw_context_impl_s::wait_for_session_close() +{ + return data_->wait_for_session_close(); +} + ///============================================================================= bool rmw_context_impl_s::is_shutdown() const { diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp index 1e46d6af..0461c1df 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp @@ -62,9 +62,12 @@ class rmw_context_impl_s final // Get a unique id for a new entity. std::size_t get_next_entity_id(); - // Shutdown the Zenoh session. + // Asynchronously shutdown the Zenoh session. rmw_ret_t shutdown(); + // Wait for Zenoh session shutdown to comlete. + rmw_ret_t wait_for_session_close(); + // Check if the Zenoh session is shutdown. bool is_shutdown() const; diff --git a/zenoh_c_vendor/CMakeLists.txt b/zenoh_c_vendor/CMakeLists.txt index cddd2fd4..d2cdb718 100644 --- a/zenoh_c_vendor/CMakeLists.txt +++ b/zenoh_c_vendor/CMakeLists.txt @@ -25,8 +25,8 @@ set(ZENOHC_CARGO_FLAGS "--no-default-features$--features=shared-memor # - https://github.com/eclipse-zenoh/zenoh/pull/1150 (fix deadlock issue https://github.com/ros2/rmw_zenoh/issues/182) # - https://github.com/eclipse-zenoh/zenoh-c/pull/620 (fix ze_querying_subscriber_get API to query newly discovered publishers) ament_vendor(zenoh_c_vendor - VCS_URL https://github.com/eclipse-zenoh/zenoh-c.git - VCS_VERSION 42e717ff7b633649f11ebb7800b71d4939cd21c7 + VCS_URL https://github.com/ZettaScaleLabs/zenoh-c.git + VCS_VERSION 4cf9b92e0ad8ba494b935057ecc771a640e8b72d CMAKE_ARGS "-DZENOHC_CARGO_FLAGS=${ZENOHC_CARGO_FLAGS}" "-DZENOHC_BUILD_WITH_UNSTABLE_API=TRUE" From f57740926d677ee1f4569560406b87ff4c5bd413 Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Thu, 5 Dec 2024 16:57:01 +0300 Subject: [PATCH 2/4] Update CMakeLists.txt --- zenoh_c_vendor/CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zenoh_c_vendor/CMakeLists.txt b/zenoh_c_vendor/CMakeLists.txt index d2cdb718..a872e719 100644 --- a/zenoh_c_vendor/CMakeLists.txt +++ b/zenoh_c_vendor/CMakeLists.txt @@ -26,7 +26,7 @@ set(ZENOHC_CARGO_FLAGS "--no-default-features$--features=shared-memor # - https://github.com/eclipse-zenoh/zenoh-c/pull/620 (fix ze_querying_subscriber_get API to query newly discovered publishers) ament_vendor(zenoh_c_vendor VCS_URL https://github.com/ZettaScaleLabs/zenoh-c.git - VCS_VERSION 4cf9b92e0ad8ba494b935057ecc771a640e8b72d + VCS_VERSION 3cdfce09b9b3fec27c528589f5dae663811a18e3 CMAKE_ARGS "-DZENOHC_CARGO_FLAGS=${ZENOHC_CARGO_FLAGS}" "-DZENOHC_BUILD_WITH_UNSTABLE_API=TRUE" From 5ebce21cac5228d93e4a9d3cc3dc0f06f8dd8652 Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Thu, 5 Dec 2024 17:00:52 +0300 Subject: [PATCH 3/4] code format fix --- rmw_zenoh_cpp/src/detail/graph_cache.cpp | 14 +++++++------- rmw_zenoh_cpp/src/detail/logging_macros.hpp | 10 +++++----- rmw_zenoh_cpp/src/detail/qos.hpp | 2 +- rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp | 2 +- rmw_zenoh_cpp/src/detail/zenoh_router_check.cpp | 16 ++++++++-------- 5 files changed, 22 insertions(+), 22 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index b622e06d..6692c3bb 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -700,13 +700,13 @@ rmw_ret_t GraphCache::get_node_names( }); auto free_enclaves_lambda = [enclaves]() -> void { - rcutils_ret_t ret = rcutils_string_array_fini(enclaves); - if (ret != RCUTILS_RET_OK) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "failed to cleanup during error handling: %s", rcutils_get_error_string().str); - } - }; + rcutils_ret_t ret = rcutils_string_array_fini(enclaves); + if (ret != RCUTILS_RET_OK) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "failed to cleanup during error handling: %s", rcutils_get_error_string().str); + } + }; std::shared_ptr> free_enclaves{nullptr}; if (enclaves) { diff --git a/rmw_zenoh_cpp/src/detail/logging_macros.hpp b/rmw_zenoh_cpp/src/detail/logging_macros.hpp index 81201856..44a4bc34 100644 --- a/rmw_zenoh_cpp/src/detail/logging_macros.hpp +++ b/rmw_zenoh_cpp/src/detail/logging_macros.hpp @@ -24,14 +24,14 @@ // invoke GraphCache::parse_put() and GraphCache::parse_del() functions. // See https://github.com/ros2/rmw_zenoh/issues/182 for more details. #define RMW_ZENOH_LOG_DEBUG_NAMED(...) {rmw_zenoh_cpp::Logger::get().log_named( \ - RCUTILS_LOG_SEVERITY_DEBUG, __func__, __FILE__, __LINE__, __VA_ARGS__);} + RCUTILS_LOG_SEVERITY_DEBUG, __func__, __FILE__, __LINE__, __VA_ARGS__);} #define RMW_ZENOH_LOG_ERROR_NAMED(...) {rmw_zenoh_cpp::Logger::get().log_named( \ - RCUTILS_LOG_SEVERITY_ERROR, __func__, __FILE__, __LINE__, __VA_ARGS__);} + RCUTILS_LOG_SEVERITY_ERROR, __func__, __FILE__, __LINE__, __VA_ARGS__);} #define RMW_ZENOH_LOG_FATAL_NAMED(...) {rmw_zenoh_cpp::Logger::get().log_named( \ - RCUTILS_LOG_SEVERITY_FATAL, __func__, __FILE__, __LINE__, __VA_ARGS__);} + RCUTILS_LOG_SEVERITY_FATAL, __func__, __FILE__, __LINE__, __VA_ARGS__);} #define RMW_ZENOH_LOG_INFO_NAMED(...) {rmw_zenoh_cpp::Logger::get().log_named( \ - RCUTILS_LOG_SEVERITY_INFO, __func__, __FILE__, __LINE__, __VA_ARGS__);} + RCUTILS_LOG_SEVERITY_INFO, __func__, __FILE__, __LINE__, __VA_ARGS__);} #define RMW_ZENOH_LOG_WARN_NAMED(...) {rmw_zenoh_cpp::Logger::get().log_named( \ - RCUTILS_LOG_SEVERITY_WARN, __func__, __FILE__, __LINE__, __VA_ARGS__);} + RCUTILS_LOG_SEVERITY_WARN, __func__, __FILE__, __LINE__, __VA_ARGS__);} #endif // DETAIL__LOGGING_MACROS_HPP_ diff --git a/rmw_zenoh_cpp/src/detail/qos.hpp b/rmw_zenoh_cpp/src/detail/qos.hpp index eee74c05..6b8e9230 100644 --- a/rmw_zenoh_cpp/src/detail/qos.hpp +++ b/rmw_zenoh_cpp/src/detail/qos.hpp @@ -25,7 +25,7 @@ namespace rmw_zenoh_cpp { //============================================================================== /// Signature matching rmw_get_publishers_info_by_topic and rmw_get_subscriptions_info_by_topic -using GetEndpointInfoByTopicFunction = std::function(ctx)))++; - }; + const std::string id_str = liveliness::zid_to_str(*id); + RMW_ZENOH_LOG_INFO_NAMED( + "rmw_zenoh_cpp", + "Successfully connected to a Zenoh router with id %s.", id_str.c_str()); + // Note: Callback is guaranteed to never be called + // concurrently according to z_info_routers_zid docstring + (*(static_cast(ctx)))++; + }; z_owned_closure_zid_t router_callback; z_closure(&router_callback, callback, NULL, &context); From 2b33def6287594a48fbab785157eeaa806ab769a Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Thu, 5 Dec 2024 17:04:07 +0300 Subject: [PATCH 4/4] Revert "code format fix" This reverts commit 5ebce21cac5228d93e4a9d3cc3dc0f06f8dd8652. --- rmw_zenoh_cpp/src/detail/graph_cache.cpp | 14 +++++++------- rmw_zenoh_cpp/src/detail/logging_macros.hpp | 10 +++++----- rmw_zenoh_cpp/src/detail/qos.hpp | 2 +- rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp | 2 +- rmw_zenoh_cpp/src/detail/zenoh_router_check.cpp | 16 ++++++++-------- 5 files changed, 22 insertions(+), 22 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index 6692c3bb..b622e06d 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -700,13 +700,13 @@ rmw_ret_t GraphCache::get_node_names( }); auto free_enclaves_lambda = [enclaves]() -> void { - rcutils_ret_t ret = rcutils_string_array_fini(enclaves); - if (ret != RCUTILS_RET_OK) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "failed to cleanup during error handling: %s", rcutils_get_error_string().str); - } - }; + rcutils_ret_t ret = rcutils_string_array_fini(enclaves); + if (ret != RCUTILS_RET_OK) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "failed to cleanup during error handling: %s", rcutils_get_error_string().str); + } + }; std::shared_ptr> free_enclaves{nullptr}; if (enclaves) { diff --git a/rmw_zenoh_cpp/src/detail/logging_macros.hpp b/rmw_zenoh_cpp/src/detail/logging_macros.hpp index 44a4bc34..81201856 100644 --- a/rmw_zenoh_cpp/src/detail/logging_macros.hpp +++ b/rmw_zenoh_cpp/src/detail/logging_macros.hpp @@ -24,14 +24,14 @@ // invoke GraphCache::parse_put() and GraphCache::parse_del() functions. // See https://github.com/ros2/rmw_zenoh/issues/182 for more details. #define RMW_ZENOH_LOG_DEBUG_NAMED(...) {rmw_zenoh_cpp::Logger::get().log_named( \ - RCUTILS_LOG_SEVERITY_DEBUG, __func__, __FILE__, __LINE__, __VA_ARGS__);} + RCUTILS_LOG_SEVERITY_DEBUG, __func__, __FILE__, __LINE__, __VA_ARGS__);} #define RMW_ZENOH_LOG_ERROR_NAMED(...) {rmw_zenoh_cpp::Logger::get().log_named( \ - RCUTILS_LOG_SEVERITY_ERROR, __func__, __FILE__, __LINE__, __VA_ARGS__);} + RCUTILS_LOG_SEVERITY_ERROR, __func__, __FILE__, __LINE__, __VA_ARGS__);} #define RMW_ZENOH_LOG_FATAL_NAMED(...) {rmw_zenoh_cpp::Logger::get().log_named( \ - RCUTILS_LOG_SEVERITY_FATAL, __func__, __FILE__, __LINE__, __VA_ARGS__);} + RCUTILS_LOG_SEVERITY_FATAL, __func__, __FILE__, __LINE__, __VA_ARGS__);} #define RMW_ZENOH_LOG_INFO_NAMED(...) {rmw_zenoh_cpp::Logger::get().log_named( \ - RCUTILS_LOG_SEVERITY_INFO, __func__, __FILE__, __LINE__, __VA_ARGS__);} + RCUTILS_LOG_SEVERITY_INFO, __func__, __FILE__, __LINE__, __VA_ARGS__);} #define RMW_ZENOH_LOG_WARN_NAMED(...) {rmw_zenoh_cpp::Logger::get().log_named( \ - RCUTILS_LOG_SEVERITY_WARN, __func__, __FILE__, __LINE__, __VA_ARGS__);} + RCUTILS_LOG_SEVERITY_WARN, __func__, __FILE__, __LINE__, __VA_ARGS__);} #endif // DETAIL__LOGGING_MACROS_HPP_ diff --git a/rmw_zenoh_cpp/src/detail/qos.hpp b/rmw_zenoh_cpp/src/detail/qos.hpp index 6b8e9230..eee74c05 100644 --- a/rmw_zenoh_cpp/src/detail/qos.hpp +++ b/rmw_zenoh_cpp/src/detail/qos.hpp @@ -25,7 +25,7 @@ namespace rmw_zenoh_cpp { //============================================================================== /// Signature matching rmw_get_publishers_info_by_topic and rmw_get_subscriptions_info_by_topic -using GetEndpointInfoByTopicFunction = std::function(ctx)))++; - }; + const std::string id_str = liveliness::zid_to_str(*id); + RMW_ZENOH_LOG_INFO_NAMED( + "rmw_zenoh_cpp", + "Successfully connected to a Zenoh router with id %s.", id_str.c_str()); + // Note: Callback is guaranteed to never be called + // concurrently according to z_info_routers_zid docstring + (*(static_cast(ctx)))++; + }; z_owned_closure_zid_t router_callback; z_closure(&router_callback, callback, NULL, &context);