From 271da8dab6ab00287f7962ca143b2b44e1e1fe37 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alejandro=20Hern=C3=A1ndez=20Cordero?= Date: Tue, 17 Dec 2024 22:36:42 +0100 Subject: [PATCH] Port zenoh_c to zenoh_cpp (#327) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * zenoh_c to zenoh_cpp removed zenoh_router_check removed rmw_init_options_impl.hpp Publisher Added subscribers Removed zid_to_str use zenoh::KeyExpr service data client data make linters happy wrap the zenoh session with a shared pointer * Removed shm and small fix Fix the high latency on the subscriber due to zenoh bytes conversion * Fixed windows * Switch to storing GIDs with a std::array. They are a fixed size, so should use the fixed-size data structure. * Remove unnecessary ClientData init() method. Since we aren't calling shared_from_this() in it anymore, we can do all of the work in the constructor. * Rewrite connecting to the router to only print when it fails. This simplifies the code, and makes it less noisy. Signed-off-by: Alejandro Hernández Cordero Signed-off-by: Yadunund Signed-off-by: Chris Lalancette Co-authored-by: Yuyuan Yuan --- .github/workflows/build.yaml | 2 +- README.md | 2 +- rmw_zenoh_cpp/CMakeLists.txt | 7 +- rmw_zenoh_cpp/package.xml | 4 +- .../src/detail/attachment_helpers.cpp | 123 +++---- .../src/detail/attachment_helpers.hpp | 16 +- rmw_zenoh_cpp/src/detail/graph_cache.cpp | 9 +- rmw_zenoh_cpp/src/detail/graph_cache.hpp | 3 +- rmw_zenoh_cpp/src/detail/liveliness_utils.cpp | 32 +- rmw_zenoh_cpp/src/detail/liveliness_utils.hpp | 18 +- rmw_zenoh_cpp/src/detail/rmw_client_data.cpp | 273 ++++++---------- rmw_zenoh_cpp/src/detail/rmw_client_data.hpp | 20 +- .../src/detail/rmw_context_impl_s.cpp | 271 +++++++-------- .../src/detail/rmw_context_impl_s.hpp | 10 +- .../src/detail/rmw_init_options_impl.hpp | 30 -- rmw_zenoh_cpp/src/detail/rmw_node_data.cpp | 46 +-- rmw_zenoh_cpp/src/detail/rmw_node_data.hpp | 18 +- .../src/detail/rmw_publisher_data.cpp | 218 +++++------- .../src/detail/rmw_publisher_data.hpp | 31 +- rmw_zenoh_cpp/src/detail/rmw_service_data.cpp | 227 ++++++------- rmw_zenoh_cpp/src/detail/rmw_service_data.hpp | 39 ++- .../src/detail/rmw_subscription_data.cpp | 309 ++++++++++-------- .../src/detail/rmw_subscription_data.hpp | 19 +- rmw_zenoh_cpp/src/detail/type_support.cpp | 6 +- rmw_zenoh_cpp/src/detail/zenoh_config.cpp | 24 +- rmw_zenoh_cpp/src/detail/zenoh_config.hpp | 9 +- .../src/detail/zenoh_router_check.cpp | 60 ---- .../src/detail/zenoh_router_check.hpp | 30 -- rmw_zenoh_cpp/src/detail/zenoh_utils.cpp | 38 +-- rmw_zenoh_cpp/src/detail/zenoh_utils.hpp | 21 +- rmw_zenoh_cpp/src/rmw_init.cpp | 6 +- rmw_zenoh_cpp/src/rmw_init_options.cpp | 4 +- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 10 +- rmw_zenoh_cpp/src/zenohd/main.cpp | 47 ++- .../CMakeLists.txt | 13 +- .../package.xml | 2 +- 36 files changed, 881 insertions(+), 1116 deletions(-) delete mode 100644 rmw_zenoh_cpp/src/detail/rmw_init_options_impl.hpp delete mode 100644 rmw_zenoh_cpp/src/detail/zenoh_router_check.cpp delete mode 100644 rmw_zenoh_cpp/src/detail/zenoh_router_check.hpp rename {zenoh_c_vendor => zenoh_cpp_vendor}/CMakeLists.txt (85%) rename {zenoh_c_vendor => zenoh_cpp_vendor}/package.xml (95%) diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index a66af168..4fa7b445 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -40,6 +40,6 @@ jobs: with: package-name: | rmw_zenoh_cpp - zenoh_c_vendor + zenoh_cpp_vendor target-ros2-distro: ${{ matrix.ROS_DISTRO }} vcs-repo-file-url: ${{ matrix.BUILD_TYPE == 'source' && env.ROS2_REPOS_FILE_URL || '' }} diff --git a/README.md b/README.md index 164fe0f8..0359f92b 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,7 @@ Build `rmw_zenoh_cpp` >Note: By default, we vendor and compile `zenoh-c` with a subset of `zenoh` features. The `ZENOHC_CARGO_FLAGS` CMake argument may be overwritten with other features included if required. -See [zenoh_c_vendor/CMakeLists.txt](./zenoh_c_vendor/CMakeLists.txt) for more details. +See [zenoh_cpp_vendor/CMakeLists.txt](./zenoh_cpp_vendor/CMakeLists.txt) for more details. ```bash mkdir ~/ws_rmw_zenoh/src -p && cd ~/ws_rmw_zenoh/src diff --git a/rmw_zenoh_cpp/CMakeLists.txt b/rmw_zenoh_cpp/CMakeLists.txt index 89b5a598..185fa804 100644 --- a/rmw_zenoh_cpp/CMakeLists.txt +++ b/rmw_zenoh_cpp/CMakeLists.txt @@ -21,7 +21,7 @@ find_package(rcutils REQUIRED) find_package(rosidl_typesupport_fastrtps_c REQUIRED) find_package(rosidl_typesupport_fastrtps_cpp REQUIRED) find_package(rmw REQUIRED) -find_package(zenoh_c_vendor REQUIRED) +find_package(zenoh_cpp_vendor REQUIRED) add_library(rmw_zenoh_cpp SHARED src/detail/attachment_helpers.cpp @@ -45,7 +45,6 @@ add_library(rmw_zenoh_cpp SHARED src/detail/type_support.cpp src/detail/type_support_common.cpp src/detail/zenoh_config.cpp - src/detail/zenoh_router_check.cpp src/detail/zenoh_utils.cpp src/rmw_event.cpp src/rmw_get_network_flow_endpoints.cpp @@ -68,7 +67,7 @@ target_link_libraries(rmw_zenoh_cpp rosidl_typesupport_fastrtps_c::rosidl_typesupport_fastrtps_c rosidl_typesupport_fastrtps_cpp::rosidl_typesupport_fastrtps_cpp rmw::rmw - zenohc::lib + zenohcxx::zenohc ) configure_rmw_library(rmw_zenoh_cpp) @@ -130,7 +129,7 @@ target_link_libraries(rmw_zenohd rcutils::rcutils rcpputils::rcpputils rmw::rmw - zenohc::lib + zenohcxx::zenohc ) install( diff --git a/rmw_zenoh_cpp/package.xml b/rmw_zenoh_cpp/package.xml index e1d2a24a..79272665 100644 --- a/rmw_zenoh_cpp/package.xml +++ b/rmw_zenoh_cpp/package.xml @@ -15,8 +15,8 @@ ament_cmake - zenoh_c_vendor - zenoh_c_vendor + zenoh_cpp_vendor + zenoh_cpp_vendor ament_index_cpp fastcdr diff --git a/rmw_zenoh_cpp/src/detail/attachment_helpers.cpp b/rmw_zenoh_cpp/src/detail/attachment_helpers.cpp index da2571ce..6f128b47 100644 --- a/rmw_zenoh_cpp/src/detail/attachment_helpers.cpp +++ b/rmw_zenoh_cpp/src/detail/attachment_helpers.cpp @@ -12,14 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include - +#include #include #include #include -#include +#include #include +#include + #include "rmw/types.h" #include "attachment_helpers.hpp" @@ -27,75 +28,24 @@ namespace rmw_zenoh_cpp { -///============================================================================= + AttachmentData::AttachmentData( const int64_t sequence_number, const int64_t source_timestamp, - const uint8_t source_gid[RMW_GID_STORAGE_SIZE]) + const std::array source_gid) : sequence_number_(sequence_number), - source_timestamp_(source_timestamp) + source_timestamp_(source_timestamp), + source_gid_(source_gid), + gid_hash_(hash_gid(source_gid)) { - memcpy(source_gid_, source_gid, RMW_GID_STORAGE_SIZE); - gid_hash_ = hash_gid(source_gid_); } -///============================================================================= AttachmentData::AttachmentData(AttachmentData && data) -: sequence_number_(std::move(data.sequence_number_)), - source_timestamp_(std::move(data.source_timestamp_)), - gid_hash_(std::move(data.gid_hash_)) -{ - memcpy(source_gid_, data.source_gid_, RMW_GID_STORAGE_SIZE); -} - -///============================================================================= -AttachmentData::AttachmentData(const z_loaned_bytes_t * attachment) { - ze_deserializer_t deserializer = ze_deserializer_from_bytes(attachment); - z_owned_string_t key; - - // Deserialize the sequence_number - ze_deserializer_deserialize_string(&deserializer, &key); - if (std::string_view( - z_string_data(z_loan(key)), - z_string_len(z_loan(key))) != "sequence_number") - { - throw std::runtime_error("sequence_number is not found in the attachment."); - } - z_drop(z_move(key)); - if (ze_deserializer_deserialize_int64(&deserializer, &this->sequence_number_)) { - throw std::runtime_error("Failed to deserialize the sequence_number."); - } - - // Deserialize the source_timestamp - ze_deserializer_deserialize_string(&deserializer, &key); - if (std::string_view( - z_string_data(z_loan(key)), - z_string_len(z_loan(key))) != "source_timestamp") - { - throw std::runtime_error("source_timestamp is not found in the attachment"); - } - z_drop(z_move(key)); - if (ze_deserializer_deserialize_int64(&deserializer, &this->source_timestamp_)) { - throw std::runtime_error("Failed to deserialize the source_timestamp."); - } - - // Deserialize the source_gid - ze_deserializer_deserialize_string(&deserializer, &key); - if (std::string_view(z_string_data(z_loan(key)), z_string_len(z_loan(key))) != "source_gid") { - throw std::runtime_error("Invalid attachment: the key source_gid is not found"); - } - z_drop(z_move(key)); - z_owned_slice_t slice; - if (ze_deserializer_deserialize_slice(&deserializer, &slice)) { - throw std::runtime_error("Failed to deserialize the source_gid."); - } - if (z_slice_len(z_loan(slice)) != RMW_GID_STORAGE_SIZE) { - throw std::runtime_error("The length of source_gid mismatched."); - } - memcpy(this->source_gid_, z_slice_data(z_loan(slice)), z_slice_len(z_loan(slice))); - z_drop(z_move(slice)); - gid_hash_ = hash_gid(this->source_gid_); + gid_hash_ = std::move(data.gid_hash_); + sequence_number_ = std::move(data.sequence_number_); + source_timestamp_ = std::move(data.source_timestamp_); + source_gid_ = data.source_gid_; } ///============================================================================= @@ -111,9 +61,9 @@ int64_t AttachmentData::source_timestamp() const } ///============================================================================= -void AttachmentData::copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const +std::array AttachmentData::copy_gid() const { - memcpy(out_gid, source_gid_, RMW_GID_STORAGE_SIZE); + return source_gid_; } ///============================================================================= @@ -122,17 +72,38 @@ size_t AttachmentData::gid_hash() const return gid_hash_; } -///============================================================================= -void AttachmentData::serialize_to_zbytes(z_owned_bytes_t * attachment) +zenoh::Bytes AttachmentData::serialize_to_zbytes() +{ + auto serializer = zenoh::ext::Serializer(); + serializer.serialize(std::string("sequence_number")); + serializer.serialize(this->sequence_number_); + serializer.serialize(std::string("source_timestamp")); + serializer.serialize(this->source_timestamp_); + serializer.serialize(std::string("source_gid")); + serializer.serialize(this->source_gid_); + return std::move(serializer).finish(); +} + +AttachmentData::AttachmentData(const zenoh::Bytes & bytes) { - ze_owned_serializer_t serializer; - ze_serializer_empty(&serializer); - ze_serializer_serialize_str(z_loan_mut(serializer), "sequence_number"); - ze_serializer_serialize_int64(z_loan_mut(serializer), this->sequence_number_); - ze_serializer_serialize_str(z_loan_mut(serializer), "source_timestamp"); - ze_serializer_serialize_int64(z_loan_mut(serializer), this->source_timestamp_); - ze_serializer_serialize_str(z_loan_mut(serializer), "source_gid"); - ze_serializer_serialize_buf(z_loan_mut(serializer), this->source_gid_, RMW_GID_STORAGE_SIZE); - ze_serializer_finish(z_move(serializer), attachment); + zenoh::ext::Deserializer deserializer(bytes); + const auto sequence_number_str = deserializer.deserialize(); + if (sequence_number_str != "sequence_number") { + throw std::runtime_error("sequence_number is not found in the attachment."); + } + this->sequence_number_ = deserializer.deserialize(); + + const auto source_timestamp_str = deserializer.deserialize(); + if (source_timestamp_str != "source_timestamp") { + throw std::runtime_error("source_timestamp is not found in the attachment."); + } + this->source_timestamp_ = deserializer.deserialize(); + + const auto source_gid_str = deserializer.deserialize(); + if (source_gid_str != "source_gid") { + throw std::runtime_error("source_gid is not found in the attachment."); + } + this->source_gid_ = deserializer.deserialize>(); + gid_hash_ = hash_gid(this->source_gid_); } } // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/attachment_helpers.hpp b/rmw_zenoh_cpp/src/detail/attachment_helpers.hpp index ec460620..64f444d1 100644 --- a/rmw_zenoh_cpp/src/detail/attachment_helpers.hpp +++ b/rmw_zenoh_cpp/src/detail/attachment_helpers.hpp @@ -15,7 +15,10 @@ #ifndef DETAIL__ATTACHMENT_HELPERS_HPP_ #define DETAIL__ATTACHMENT_HELPERS_HPP_ -#include +#include +#include + +#include #include "rmw/types.h" @@ -28,21 +31,22 @@ class AttachmentData final AttachmentData( const int64_t sequence_number, const int64_t source_timestamp, - const uint8_t source_gid[RMW_GID_STORAGE_SIZE]); - explicit AttachmentData(const z_loaned_bytes_t *); + const std::array source_gid); + + explicit AttachmentData(const zenoh::Bytes & bytes); explicit AttachmentData(AttachmentData && data); int64_t sequence_number() const; int64_t source_timestamp() const; - void copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const; + std::array copy_gid() const; size_t gid_hash() const; - void serialize_to_zbytes(z_owned_bytes_t *); + zenoh::Bytes serialize_to_zbytes(); private: int64_t sequence_number_; int64_t source_timestamp_; - uint8_t source_gid_[RMW_GID_STORAGE_SIZE]; + std::array source_gid_; size_t gid_hash_; }; } // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index 9401a5c8..a1af2d46 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -71,8 +71,8 @@ TopicData::TopicData(ConstEntityPtr entity) } ///============================================================================= -GraphCache::GraphCache(const z_id_t & zid) -: zid_str_(liveliness::zid_to_str(zid)) +GraphCache::GraphCache(const zenoh::Id & zid) +: zid_str_(zid.to_string()) { // Do nothing. } @@ -1168,8 +1168,7 @@ rmw_ret_t GraphCache::get_entities_info_by_topic( } } - memset(ep.endpoint_gid, 0, RMW_GID_STORAGE_SIZE); - entity->copy_gid(ep.endpoint_gid); + memcpy(ep.endpoint_gid, entity->copy_gid().data(), RMW_GID_STORAGE_SIZE); endpoints.push_back(ep); } @@ -1202,7 +1201,7 @@ rmw_ret_t GraphCache::service_server_is_available( service_it->second.find(client_topic_info.type_); if (type_it != service_it->second.end()) { for (const auto & [_, topic_data] : type_it->second) { - if (topic_data->subs_.size() > 0) { + if (!topic_data->subs_.empty()) { *is_available = true; return RMW_RET_OK; } diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.hpp b/rmw_zenoh_cpp/src/detail/graph_cache.hpp index 8ac2172d..6c4f6e54 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.hpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.hpp @@ -36,6 +36,7 @@ #include "rmw/get_topic_endpoint_info.h" #include "rmw/names_and_types.h" +#include namespace rmw_zenoh_cpp { @@ -110,7 +111,7 @@ class GraphCache final /// @param id The id of the zenoh session that is building the graph cache. /// This is used to infer which entities originated from the current session /// so that appropriate event callbacks may be triggered. - explicit GraphCache(const z_id_t & zid); + explicit GraphCache(const zenoh::Id & zid); // Parse a PUT message over a token's key-expression and update the graph. void parse_put(const std::string & keyexpr, bool ignore_from_current_session = false); diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp index 52c8ec7f..c1792091 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp @@ -26,6 +26,8 @@ #include #include +#include + #include "logging_macros.hpp" #include "qos.hpp" #include "simplified_xxhash3.hpp" @@ -371,16 +373,6 @@ std::optional keyexpr_to_qos(const std::string & keyexpr) return qos; } -///============================================================================= -std::string zid_to_str(const z_id_t & id) -{ - z_owned_string_t z_str; - z_id_to_string(&id, &z_str); - std::string str(z_string_data(z_loan(z_str)), z_string_len(z_loan(z_str))); - z_drop(z_move(z_str)); - return str; -} - ///============================================================================= std::string subscription_token(size_t domain_id) { @@ -445,8 +437,9 @@ Entity::Entity( // returned to the RMW layer as necessary. simplified_XXH128_hash_t keyexpr_gid = simplified_XXH3_128bits(this->liveliness_keyexpr_.c_str(), this->liveliness_keyexpr_.length()); - memcpy(this->gid_, &keyexpr_gid.low64, sizeof(keyexpr_gid.low64)); - memcpy(this->gid_ + sizeof(keyexpr_gid.low64), &keyexpr_gid.high64, sizeof(keyexpr_gid.high64)); + memcpy(this->gid_.data(), &keyexpr_gid.low64, sizeof(keyexpr_gid.low64)); + memcpy(this->gid_.data() + sizeof(keyexpr_gid.low64), &keyexpr_gid.high64, + sizeof(keyexpr_gid.high64)); // We also hash the liveliness keyexpression into a size_t that we use to index into our maps. this->keyexpr_hash_ = hash_gid(this->gid_); @@ -454,7 +447,7 @@ Entity::Entity( ///============================================================================= std::shared_ptr Entity::make( - z_id_t zid, + zenoh::Id zid, const std::string & nid, const std::string & id, EntityType type, @@ -480,7 +473,7 @@ std::shared_ptr Entity::make( return std::make_shared( Entity{ - zid_to_str(zid), + std::string(zid.to_string()), nid, id, std::move(type), @@ -638,9 +631,9 @@ std::string Entity::liveliness_keyexpr() const } ///============================================================================= -void Entity::copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const +std::array Entity::copy_gid() const { - memcpy(out_gid, gid_, RMW_GID_STORAGE_SIZE); + return gid_; } ///============================================================================= @@ -679,13 +672,12 @@ std::string demangle_name(const std::string & input) } // namespace liveliness ///============================================================================= -size_t hash_gid(const uint8_t gid[RMW_GID_STORAGE_SIZE]) +size_t hash_gid(const std::array gid) { std::stringstream hash_str; hash_str << std::hex; - size_t i = 0; - for (; i < (RMW_GID_STORAGE_SIZE - 1); i++) { - hash_str << static_cast(gid[i]); + for (const auto & g : gid) { + hash_str << static_cast(g); } return std::hash{}(hash_str.str()); } diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp index 895ae604..81880df6 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp @@ -15,8 +15,7 @@ #ifndef DETAIL__LIVELINESS_UTILS_HPP_ #define DETAIL__LIVELINESS_UTILS_HPP_ -#include - +#include #include #include #include @@ -25,6 +24,8 @@ #include #include +#include + #include "rmw/types.h" namespace rmw_zenoh_cpp @@ -127,7 +128,7 @@ class Entity /// @param topic_info An optional topic information for relevant entities. /// @return An entity if all inputs are valid. This way no invalid entities can be created. static EntityPtr make( - z_id_t zid, + zenoh::Id zid, const std::string & nid, const std::string & id, EntityType type, @@ -172,7 +173,7 @@ class Entity // Two entities are equal if their keyexpr_hash are equal. bool operator==(const Entity & other) const; - void copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const; + std::array copy_gid() const; private: Entity( @@ -191,7 +192,7 @@ class Entity NodeInfo node_info_; std::optional topic_info_; std::string liveliness_keyexpr_; - uint8_t gid_[RMW_GID_STORAGE_SIZE]; + std::array gid_{}; }; ///============================================================================= @@ -231,15 +232,10 @@ std::string qos_to_keyexpr(const rmw_qos_profile_t & qos); ///============================================================================= /// Convert a rmw_qos_profile_t from a keyexpr. Return std::nullopt if invalid. std::optional keyexpr_to_qos(const std::string & keyexpr); - -///============================================================================= -/// Convert a Zenoh id to a string. -std::string zid_to_str(const z_id_t & id); } // namespace liveliness ///============================================================================= -/// Generate a hash for a given GID. -size_t hash_gid(const uint8_t gid[RMW_GID_STORAGE_SIZE]); +size_t hash_gid(const std::array gid); } // namespace rmw_zenoh_cpp ///============================================================================= diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp index 1236d453..1c24570d 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp @@ -16,6 +16,7 @@ #include +#include #include #include #include @@ -25,6 +26,9 @@ #include #include #include +#include + +#include #include "attachment_helpers.hpp" #include "cdr.hpp" @@ -40,82 +44,11 @@ #include "rmw/get_topic_endpoint_info.h" #include "rmw/impl/cpp/macros.hpp" -namespace -{ -///============================================================================= -struct ClientDataWrapper -{ - explicit ClientDataWrapper(std::shared_ptr data) - : client_data(std::move(data)) - { - } - - std::shared_ptr client_data; -}; - -///============================================================================= -void client_data_handler(z_loaned_reply_t * reply, void * data) -{ - auto wrapper = static_cast(data); - if (wrapper == nullptr) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to obtain client_data_t from data in client_data_handler." - ); - return; - } - - if (wrapper->client_data->is_shutdown()) { - return; - } - - if (!z_reply_is_ok(reply)) { - const z_loaned_reply_err_t * err = z_reply_err(reply); - const z_loaned_bytes_t * err_payload = z_reply_err_payload(err); - - z_owned_string_t err_str; - z_bytes_to_string(err_payload, &err_str); - - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "z_reply_is_ok returned False for keyexpr %s. Reason: %.*s", - wrapper->client_data->topic_info().topic_keyexpr_.c_str(), - static_cast(z_string_len(z_loan(err_str))), - z_string_data(z_loan(err_str))); - z_drop(z_move(err_str)); - - return; - } - - std::chrono::nanoseconds::rep received_timestamp = - std::chrono::system_clock::now().time_since_epoch().count(); - - wrapper->client_data->add_new_reply( - std::make_unique(reply, received_timestamp)); -} - -///============================================================================= -void client_data_drop(void * data) -{ - auto wrapper = static_cast(data); - if (wrapper == nullptr) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to obtain client_data_t from data in client_data_drop." - ); - return; - } - - delete wrapper; -} - -} // namespace - namespace rmw_zenoh_cpp { ///============================================================================= std::shared_ptr ClientData::make( - std::shared_ptr session, + std::shared_ptr session, const rmw_node_t * const node, const rmw_client_t * client, liveliness::NodeInfo node_info, @@ -177,7 +110,7 @@ std::shared_ptr ClientData::make( std::size_t domain_id = node_info.domain_id_; auto entity = liveliness::Entity::make( - z_info_zid(session->loan()), + session->get_zid(), std::to_string(node_id), std::to_string(service_id), liveliness::EntityType::Client, @@ -209,11 +142,6 @@ std::shared_ptr ClientData::make( response_type_support }); - if (!client_data->init(session)) { - // init() already set the error. - return nullptr; - } - return client_data; } @@ -222,7 +150,7 @@ ClientData::ClientData( const rmw_node_t * rmw_node, const rmw_client_t * rmw_client, std::shared_ptr entity, - std::shared_ptr sess, + std::shared_ptr sess, const void * request_type_support_impl, const void * response_type_support_impl, std::shared_ptr request_type_support, @@ -240,51 +168,22 @@ ClientData::ClientData( is_shutdown_(false), initialized_(false) { - // Do nothing. -} - -///============================================================================= -bool ClientData::init(std::shared_ptr session) -{ - if (z_keyexpr_from_str( - &this->keyexpr_, - this->entity_->topic_info().value().topic_keyexpr_.c_str()) != Z_OK) - { - RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); - return false; - } - auto free_ros_keyexpr = rcpputils::make_scope_exit( - [this]() { - z_drop(z_move(this->keyexpr_)); - }); - + std::string topic_keyexpr = this->entity_->topic_info().value().topic_keyexpr_; + keyexpr_ = zenoh::KeyExpr(topic_keyexpr); std::string liveliness_keyexpr = this->entity_->liveliness_keyexpr(); - z_view_keyexpr_t liveliness_ke; - z_view_keyexpr_from_str(&liveliness_ke, liveliness_keyexpr.c_str()); - if (z_liveliness_declare_token( - session->loan(), - &this->token_, - z_loan(liveliness_ke), - NULL - ) != Z_OK) - { + zenoh::ZResult result; + this->token_ = sess_->liveliness_declare_token( + zenoh::KeyExpr(liveliness_keyexpr), + zenoh::Session::LivelinessDeclarationOptions::create_default(), + &result); + if (result != Z_OK) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Unable to create liveliness token for the client."); - return false; + throw std::runtime_error("Unable to create liveliness token for the client."); } - auto free_token = rcpputils::make_scope_exit( - [this]() { - z_drop(z_move(this->token_)); - }); - sess_ = session; initialized_ = true; - - free_ros_keyexpr.cancel(); - free_token.cancel(); - - return true; } ///============================================================================= @@ -305,10 +204,9 @@ bool ClientData::liveliness_is_valid() const } ///============================================================================= -void ClientData::copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const +std::array ClientData::copy_gid() const { - std::lock_guard lock(mutex_); - entity_->copy_gid(out_gid); + return entity_->copy_gid(); } ///============================================================================= @@ -321,15 +219,12 @@ void ClientData::add_new_reply(std::unique_ptr reply) reply_queue_.size() >= adapted_qos_profile.depth) { // Log warning if message is discarded due to hitting the queue depth - z_view_string_t keystr; - z_keyexpr_as_view_string(z_loan(keyexpr_), &keystr); RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Query queue depth of %ld reached, discarding oldest Query " - "for client for %.*s", + "for client for %s", adapted_qos_profile.depth, - static_cast(z_string_len(z_loan(keystr))), - z_string_data(z_loan(keystr))); + std::string(keyexpr_.value().as_string_view()).c_str()); reply_queue_.pop_front(); } reply_queue_.emplace_back(std::move(reply)); @@ -359,18 +254,34 @@ rmw_ret_t ClientData::take_response( std::unique_ptr latest_reply = std::move(reply_queue_.front()); reply_queue_.pop_front(); - if (!latest_reply->get_sample().has_value()) { + auto & reply = latest_reply->get_sample(); + + if (!reply.is_ok()) { RMW_SET_ERROR_MSG("invalid reply sample"); return RMW_RET_ERROR; } - const z_loaned_sample_t * sample = latest_reply->get_sample().value(); + + const zenoh::Sample & sample = reply.get_ok(); // Object that manages the raw buffer - z_owned_slice_t payload; - z_bytes_to_slice(z_sample_payload(sample), &payload); + std::vector payload = sample.get_payload().as_vector(); + if (payload.size() == 0) { + RMW_ZENOH_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", + "ClientData not able to get slice data"); + return RMW_RET_ERROR; + } + + // Fill in the request_header + if (!sample.get_attachment().has_value()) { + RMW_ZENOH_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", + "ClientData take_request attachment is empty"); + return RMW_RET_ERROR; + } + eprosima::fastcdr::FastBuffer fastbuffer( - reinterpret_cast(const_cast(z_slice_data(z_loan(payload)))), - z_slice_len(z_loan(payload))); + reinterpret_cast(const_cast(payload.data())), payload.size()); // Object that serializes the data rmw_zenoh_cpp::Cdr deser(fastbuffer); @@ -383,8 +294,7 @@ rmw_ret_t ClientData::take_response( return RMW_RET_ERROR; } - // Fill in the request_header - AttachmentData attachment(z_sample_attachment(sample)); + rmw_zenoh_cpp::AttachmentData attachment(std::move(sample.get_attachment().value().get())); request_header->request_id.sequence_number = attachment.sequence_number(); if (request_header->request_id.sequence_number < 0) { RMW_SET_ERROR_MSG("Failed to get sequence_number from client call attachment"); @@ -395,10 +305,12 @@ rmw_ret_t ClientData::take_response( RMW_SET_ERROR_MSG("Failed to get source_timestamp from client call attachment"); return RMW_RET_ERROR; } - attachment.copy_gid(request_header->request_id.writer_guid); + memcpy( + request_header->request_id.writer_guid, + attachment.copy_gid().data(), + RMW_GID_STORAGE_SIZE); request_header->received_timestamp = latest_reply->get_received_timestamp(); - z_drop(z_move(payload)); *taken = true; return RMW_RET_OK; @@ -452,16 +364,9 @@ rmw_ret_t ClientData::send_request( *sequence_id = sequence_number_++; // Send request - z_get_options_t opts; - z_get_options_default(&opts); - z_owned_bytes_t attachment; - uint8_t local_gid[RMW_GID_STORAGE_SIZE]; - entity_->copy_gid(local_gid); - create_map_and_set_sequence_num( - &attachment, *sequence_id, - local_gid); - opts.attachment = z_move(attachment); - + zenoh::Session::GetOptions opts = zenoh::Session::GetOptions::create_default(); + std::array local_gid = entity_->copy_gid(); + opts.attachment = rmw_zenoh_cpp::create_map_and_set_sequence_num(*sequence_id, local_gid); opts.target = Z_QUERY_TARGET_ALL_COMPLETE; // The default timeout for a z_get query is 10 seconds and if a response is not received within // this window, the queryable will return an invalid reply. However, it is common for actions, @@ -471,23 +376,57 @@ rmw_ret_t ClientData::send_request( // Latest consolidation guarantees unicity of replies for the same key expression, // which optimizes bandwidth. The default is "None", which imples replies may come in any order // and any number. - opts.consolidation = z_query_consolidation_latest(); - z_owned_bytes_t payload; - z_bytes_copy_from_buf( - &payload, reinterpret_cast(request_bytes), data_length); - opts.payload = z_move(payload); - - // TODO(Yadunund): Once we switch to zenoh-cpp with lambda closures, - // capture shared_from_this() instead of this. - ClientDataWrapper * wrapper = new ClientDataWrapper(shared_from_this()); - z_owned_closure_reply_t zn_closure_reply; - z_closure(&zn_closure_reply, client_data_handler, client_data_drop, wrapper); - z_get( - sess_->loan(), - z_loan(keyexpr_), "", - z_move(zn_closure_reply), - &opts); - + opts.consolidation = zenoh::ConsolidationMode::Z_CONSOLIDATION_MODE_NONE; + + std::vector raw_bytes( + reinterpret_cast(request_bytes), + reinterpret_cast(request_bytes) + data_length); + opts.payload = zenoh::Bytes(std::move(raw_bytes)); + + std::weak_ptr client_data = shared_from_this(); + zenoh::ZResult result; + std::string parameters; + context_impl->session()->get( + keyexpr_.value(), + parameters, + [client_data](const zenoh::Reply & reply) { + if (!reply.is_ok()) { + auto reply_err_str = reply.get_err().get_payload().as_string(); + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "z_reply_is_ok returned False Reason: %s", + reply_err_str.c_str()) + return; + } + const zenoh::Sample & sample = reply.get_ok(); + + auto sub_data = client_data.lock(); + if (sub_data == nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to obtain ClientData from data for %s.", + std::string(sample.get_keyexpr().as_string_view()).c_str()); + return; + } + + if (sub_data->is_shutdown()) { + return; + } + + std::chrono::time_point now = std::chrono::system_clock::now(); + + sub_data->add_new_reply( + std::make_unique(reply, now.time_since_epoch().count())); + }, + zenoh::closures::none, + std::move(opts), + &result); + if (result != Z_OK) { + RMW_ZENOH_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", + "ClientData unable to call get"); + return RMW_RET_ERROR; + } return RMW_RET_OK; } @@ -545,8 +484,14 @@ rmw_ret_t ClientData::shutdown() // Unregister this node from the ROS graph. if (initialized_) { - z_liveliness_undeclare_token(z_move(token_)); - z_drop(z_move(keyexpr_)); + zenoh::ZResult result; + std::move(token_).value().undeclare(&result); + if (result != Z_OK) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to undeclare liveliness token"); + return RMW_RET_ERROR; + } } sess_.reset(); diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp index f582f539..44b187ba 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp @@ -15,8 +15,7 @@ #ifndef DETAIL__RMW_CLIENT_DATA_HPP_ #define DETAIL__RMW_CLIENT_DATA_HPP_ -#include - +#include #include #include #include @@ -26,6 +25,8 @@ #include #include +#include + #include "event.hpp" #include "liveliness_utils.hpp" #include "message_type_support.hpp" @@ -47,7 +48,7 @@ class ClientData final : public std::enable_shared_from_this public: // Make a shared_ptr of ClientData. static std::shared_ptr make( - std::shared_ptr session, + std::shared_ptr session, const rmw_node_t * const node, const rmw_client_t * client, liveliness::NodeInfo node_info, @@ -64,7 +65,7 @@ class ClientData final : public std::enable_shared_from_this bool liveliness_is_valid() const; // Copy the GID of this ClientData into an rmw_gid_t. - void copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const; + std::array copy_gid() const; // Add a new ZenohReply to the queue. void add_new_reply(std::unique_ptr reply); @@ -107,15 +108,12 @@ class ClientData final : public std::enable_shared_from_this const rmw_node_t * rmw_node, const rmw_client_t * client, std::shared_ptr entity, - std::shared_ptr sess, + std::shared_ptr sess, const void * request_type_support_impl, const void * response_type_support_impl, std::shared_ptr request_type_support, std::shared_ptr response_type_support); - // Initialize the Zenoh objects for this entity. - bool init(std::shared_ptr session); - // Internal mutex. mutable std::recursive_mutex mutex_; // The parent node. @@ -124,11 +122,11 @@ class ClientData final : public std::enable_shared_from_this // The Entity generated for the service. std::shared_ptr entity_; // A shared session. - std::shared_ptr sess_; + std::shared_ptr sess_; // An owned keyexpression. - z_owned_keyexpr_t keyexpr_; + std::optional keyexpr_; // Liveliness token for the service. - z_owned_liveliness_token_t token_; + std::optional token_; // Type support fields. const void * request_type_support_impl_; const void * response_type_support_impl_; diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp index e800ea1f..206d2d3f 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp @@ -26,13 +26,14 @@ #include #include +#include + #include "graph_cache.hpp" #include "guard_condition.hpp" #include "identifier.hpp" #include "logging_macros.hpp" #include "rmw_node_data.hpp" #include "zenoh_config.hpp" -#include "zenoh_router_check.hpp" #include "rcpputils/scope_exit.hpp" #include "rmw/error_handling.h" @@ -49,8 +50,6 @@ static std::mutex data_to_data_shared_ptr_map_mutex; static std::unordered_map> data_to_data_shared_ptr_map; -static void graph_sub_data_handler(z_loaned_sample_t * sample, void * data); - // Bundle all class members into a data struct which can be passed as a // weak ptr to various threads for thread-safe access without capturing // "this" ptr by reference. @@ -68,49 +67,49 @@ class rmw_context_impl_s::Data final nodes_({}) { // Initialize the zenoh configuration. - z_owned_config_t config; - rmw_ret_t ret; - if ((ret = - rmw_zenoh_cpp::get_z_config( - rmw_zenoh_cpp::ConfigurableEntity::Session, - &config)) != RMW_RET_OK) - { + std::optional config = rmw_zenoh_cpp::get_z_config( + rmw_zenoh_cpp::ConfigurableEntity::Session); + + if (!config.has_value()) { throw std::runtime_error("Error configuring Zenoh session."); } + zenoh::ZResult result; + #ifndef _MSC_VER // Check if shm is enabled. - z_owned_string_t shm_enabled; - zc_config_get_from_str(z_loan(config), Z_CONFIG_SHARED_MEMORY_KEY, &shm_enabled); - auto always_free_shm_enabled = rcpputils::make_scope_exit( - [&shm_enabled]() { - z_drop(z_move(shm_enabled)); - }); + std::string shm_enabled = config.value().get(Z_CONFIG_SHARED_MEMORY_KEY, &result); + if (result != Z_OK) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Not able to get %s from the config file", + Z_CONFIG_SHARED_MEMORY_KEY); + } #endif // Initialize the zenoh session. - z_owned_session_t raw_session; - if (z_open(&raw_session, z_move(config), NULL) != Z_OK) { - RMW_SET_ERROR_MSG("Error setting up zenoh session."); - throw std::runtime_error("Error setting up zenoh session."); - } - if (session_ != nullptr) { - session_.reset(); + session_ = std::make_shared( + std::move(config.value()), + zenoh::Session::SessionOptions::create_default(), + &result); + if (result != Z_OK) { + throw std::runtime_error("Error setting up zenoh session. "); } - session_ = std::make_shared(raw_session); - auto close_session = rcpputils::make_scope_exit( - [&raw_session]() { - z_close(z_loan_mut(raw_session), NULL); - }); // Verify if the zenoh router is running if configured. const std::optional configured_connection_attempts = rmw_zenoh_cpp::zenoh_router_check_attempts(); if (configured_connection_attempts.has_value()) { uint64_t connection_attempts = 0; + // Retry until the connection is successful. constexpr std::chrono::milliseconds sleep_time(1000); constexpr int64_t ticks_between_print(std::chrono::milliseconds(1000) / sleep_time); - while ((ret = rmw_zenoh_cpp::zenoh_router_check(session_->loan())) != RMW_RET_OK) { + do { + zenoh::ZResult result; + this->session_->get_routers_z_id(&result); + if (result == Z_OK) { + break; + } if ((connection_attempts % ticks_between_print) == 0) { RMW_ZENOH_LOG_WARN_NAMED( "rmw_zenoh_cpp", @@ -121,12 +120,12 @@ class rmw_context_impl_s::Data final break; } std::this_thread::sleep_for(sleep_time); - } + } while(connection_attempts < configured_connection_attempts.value()); } // Initialize the graph cache. - const z_id_t zid = z_info_zid(session_->loan()); - graph_cache_ = std::make_shared(zid); + graph_cache_ = + std::make_shared(this->session_->get_zid()); // Setup liveliness subscriptions for discovery. std::string liveliness_str = rmw_zenoh_cpp::liveliness::subscription_token(domain_id); @@ -145,59 +144,51 @@ class rmw_context_impl_s::Data final // the code will be simpler, and if we're just going to spin over the non-blocking // reads until we obtain responses, we'll just be hogging CPU time by convincing // the OS that we're doing actual work when it could instead park the thread. - z_owned_fifo_handler_reply_t handler; - z_owned_closure_reply_t closure; - z_fifo_channel_reply_new(&closure, &handler, SIZE_MAX - 1); - - z_view_keyexpr_t keyexpr; - z_view_keyexpr_from_str(&keyexpr, liveliness_str.c_str()); - z_liveliness_get( - session_->loan(), z_loan(keyexpr), - z_move(closure), NULL); - z_owned_reply_t reply; - while (z_recv(z_loan(handler), &reply) == Z_OK) { - if (z_reply_is_ok(z_loan(reply))) { - const z_loaned_sample_t * sample = z_reply_ok(z_loan(reply)); - z_view_string_t keystr; - z_keyexpr_as_view_string(z_sample_keyexpr(sample), &keystr); - std::string liveliness_str(z_string_data(z_loan(keystr)), z_string_len(z_loan(keystr))); - // Ignore tokens from the same session to avoid race conditions from this - // query and the liveliness subscription. - graph_cache_->parse_put(std::move(liveliness_str), true); + + // Intuitively use a FIFO channel rather than building it up with a closure and a + // handler to FIFO channel + zenoh::KeyExpr keyexpr(liveliness_str); + + zenoh::Session::GetOptions options = zenoh::Session::GetOptions::create_default(); + options.target = zenoh::QueryTarget::Z_QUERY_TARGET_ALL; + options.payload = ""; + + auto replies = session_->liveliness_get( + keyexpr, + zenoh::channels::FifoChannel(SIZE_MAX - 1), + zenoh::Session::LivelinessGetOptions::create_default(), + &result); + + if (result != Z_OK) { + throw std::runtime_error("Error getting liveliness. "); + } + + for (auto res = replies.recv(); std::holds_alternative(res); + res = replies.recv()) + { + const zenoh::Reply & reply = std::get(res); + if (reply.is_ok()) { + const auto & sample = reply.get_ok(); + graph_cache_->parse_put(std::string(sample.get_keyexpr().as_string_view()), true); } else { RMW_ZENOH_LOG_DEBUG_NAMED( "rmw_zenoh_cpp", "[rmw_context_impl_s] z_call received an invalid reply.\n"); } - z_drop(z_move(reply)); } - z_drop(z_move(handler)); // Initialize the shm manager if shared_memory is enabled in the config. shm_provider_ = std::nullopt; #ifndef _MSC_VER - if (strncmp( - z_string_data(z_loan(shm_enabled)), - "true", - z_string_len(z_loan(shm_enabled))) == 0) - { - // TODO(yuyuan): determine the default alignment of SHM - z_alloc_alignment_t alignment = {5}; - z_owned_memory_layout_t layout; - z_memory_layout_new(&layout, SHM_BUFFER_SIZE_MB * 1024 * 1024, alignment); - - z_owned_shm_provider_t provider; - if (z_posix_shm_provider_new(&provider, z_loan(layout)) != Z_OK) { - RMW_ZENOH_LOG_ERROR_NAMED("rmw_zenoh_cpp", "Unable to create an SHM provider."); - throw std::runtime_error("Unable to create an SHM provider."); + if (shm_enabled == "true") { + auto layout = zenoh::MemoryLayout( + SHM_BUFFER_SIZE_MB * 1024 * 1024, + zenoh::AllocAlignment({5})); + zenoh::PosixShmProvider provider(layout, &result); + if (result != Z_OK) { + throw std::runtime_error("Unable to create shm provider."); } - shm_provider_ = provider; + shm_provider_ = std::move(provider); } - auto free_shm_provider = rcpputils::make_scope_exit( - [this]() { - if (shm_provider_.has_value()) { - z_drop(z_move(shm_provider_.value())); - } - }); #endif graph_guard_condition_ = std::make_unique(); graph_guard_condition_->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; @@ -205,30 +196,37 @@ class rmw_context_impl_s::Data final // Setup the liveliness subscriber to receives updates from the ROS graph // and update the graph cache. - z_liveliness_subscriber_options_t sub_options; - z_liveliness_subscriber_options_default(&sub_options); - z_owned_closure_sample_t callback; - z_closure(&callback, graph_sub_data_handler, nullptr, this); - z_view_keyexpr_t liveliness_ke; - z_view_keyexpr_from_str(&liveliness_ke, liveliness_str.c_str()); - if (z_liveliness_declare_subscriber( - session_->loan(), - &graph_subscriber_, z_loan(liveliness_ke), - z_move(callback), &sub_options) != Z_OK) - { + zenoh::KeyExpr keyexpr_cpp(liveliness_str.c_str()); + zenoh::Session::LivelinessSubscriberOptions sub_options = + zenoh::Session::LivelinessSubscriberOptions::create_default(); + sub_options.history = true; + graph_subscriber_ = session_->liveliness_declare_subscriber( + keyexpr_cpp, + [&](const zenoh::Sample & sample) { + // Look up the data shared_ptr in the global map. If it is in there, use it. + // If not, it is being shutdown so we can just ignore this update. + std::shared_ptr data_shared_ptr{nullptr}; + { + std::lock_guard lk(data_to_data_shared_ptr_map_mutex); + if (data_to_data_shared_ptr_map.count(this) == 0) { + return; + } + data_shared_ptr = data_to_data_shared_ptr_map[this]; + } + + // Update the graph cache. + data_shared_ptr->update_graph_cache( + sample, + std::string(sample.get_keyexpr().as_string_view())); + }, + zenoh::closures::none, + std::move(sub_options), + &result); + + if (result != Z_OK) { RMW_SET_ERROR_MSG("unable to create zenoh subscription"); throw std::runtime_error("Unable to subscribe to ROS graph updates."); } - auto undeclare_z_sub = rcpputils::make_scope_exit( - [this]() { - z_undeclare_subscriber(z_move(this->graph_subscriber_)); - }); - - close_session.cancel(); -#ifndef _MSC_VER - free_shm_provider.cancel(); -#endif - undeclare_z_sub.cancel(); } // Shutdown the Zenoh session. @@ -241,10 +239,15 @@ class rmw_context_impl_s::Data final return ret; } - z_undeclare_subscriber(z_move(graph_subscriber_)); - if (shm_provider_.has_value()) { - z_drop(z_move(shm_provider_.value())); + zenoh::ZResult result; + std::move(graph_subscriber_).value().undeclare(&result); + if (result != Z_OK) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to undeclare liveliness token"); + return RMW_RET_ERROR; } + is_shutdown_ = true; // We specifically do *not* hold the mutex_ while tearing down the session; this allows us @@ -263,13 +266,13 @@ class rmw_context_impl_s::Data final return enclave_; } - std::shared_ptr session() const + const std::shared_ptr session() const { std::lock_guard lock(mutex_); return session_; } - std::optional & shm_provider() + std::optional & shm_provider() { std::lock_guard lock(mutex_); return shm_provider_; @@ -296,7 +299,7 @@ class rmw_context_impl_s::Data final bool session_is_valid() const { std::lock_guard lock(mutex_); - return !z_session_is_closed(session_->loan()); + return !session_->is_closed(); } std::shared_ptr graph_cache() @@ -317,7 +320,7 @@ class rmw_context_impl_s::Data final } // Check that the Zenoh session is still valid. - if (z_session_is_closed(session_->loan())) { + if (!session_is_valid()) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Unable to create NodeData as Zenoh session is invalid."); @@ -327,7 +330,7 @@ class rmw_context_impl_s::Data final auto node_data = rmw_zenoh_cpp::NodeData::make( node, this->get_next_entity_id(), - session_->loan(), + session(), domain_id_, ns, node_name, @@ -361,17 +364,17 @@ class rmw_context_impl_s::Data final nodes_.erase(node); } - void update_graph_cache(z_sample_kind_t sample_kind, const std::string & keystr) + void update_graph_cache(const zenoh::Sample & sample_kind, const std::string & keystr) { std::lock_guard lock(mutex_); if (is_shutdown_) { return; } - switch (sample_kind) { - case z_sample_kind_t::Z_SAMPLE_KIND_PUT: + switch (sample_kind.get_kind()) { + case zenoh::SampleKind::Z_SAMPLE_KIND_PUT: graph_cache_->parse_put(keystr); break; - case z_sample_kind_t::Z_SAMPLE_KIND_DELETE: + case zenoh::SampleKind::Z_SAMPLE_KIND_DELETE: graph_cache_->parse_del(keystr); break; default: @@ -383,7 +386,7 @@ class rmw_context_impl_s::Data final if (RMW_RET_OK != rmw_ret) { RMW_ZENOH_LOG_WARN_NAMED( "rmw_zenoh_cpp", - "[graph_sub_data_handler] Unable to trigger graph guard condition." + "[update_graph_cache] Unable to trigger graph guard condition." ); } } @@ -403,15 +406,24 @@ class rmw_context_impl_s::Data final std::size_t domain_id_; // Enclave, name used to find security artifacts in a sros2 keystore. std::string enclave_; - // A shared session. - std::shared_ptr session_{nullptr}; + // An owned session. + std::shared_ptr session_; // An optional SHM manager that is initialized of SHM is enabled in the // zenoh session config. - std::optional shm_provider_; + std::optional shm_provider_; // Graph cache. std::shared_ptr graph_cache_; // ROS graph liveliness subscriber. - z_owned_subscriber_t graph_subscriber_; + // The graph_subscriber *must* exist in order for anything in this Data class, + // and hence rmw_zenoh_cpp, to work. + // However, zenoh::Subscriber does not have an empty constructor, + // so just declaring this as a zenoh::Subscriber fails to compile. + // We work around that by wrapping it in a std::optional, + // so the std::optional gets constructed at Data constructor time, + // and then we initialize graph_subscriber_ later. Note that the zenoh-cpp API + // liveliness_declare_subscriber() throws an exception if it fails, + // so this should all be safe to do. + std::optional> graph_subscriber_; // Equivalent to rmw_dds_common::Context's guard condition. // Guard condition that should be triggered when the graph changes. std::unique_ptr graph_guard_condition_; @@ -425,37 +437,6 @@ class rmw_context_impl_s::Data final std::unordered_map> nodes_; }; -///============================================================================= -static void graph_sub_data_handler(z_loaned_sample_t * sample, void * data) -{ - z_view_string_t keystr; - z_keyexpr_as_view_string(z_sample_keyexpr(sample), &keystr); - - auto data_ptr = static_cast(data); - if (data_ptr == nullptr) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "[graph_sub_data_handler] Invalid data_ptr." - ); - return; - } - - // Look up the data shared_ptr in the global map. If it is in there, use it. - // If not, it is being shutdown so we can just ignore this update. - std::shared_ptr data_shared_ptr{nullptr}; - { - std::lock_guard lk(data_to_data_shared_ptr_map_mutex); - if (data_to_data_shared_ptr_map.count(data_ptr) == 0) { - return; - } - data_shared_ptr = data_to_data_shared_ptr_map[data_ptr]; - } - - // Update the graph cache. - std::string liveliness_str(z_string_data(z_loan(keystr)), z_string_len(z_loan(keystr))); - data_shared_ptr->update_graph_cache(z_sample_kind(sample), std::move(liveliness_str)); -} - ///============================================================================= rmw_context_impl_s::rmw_context_impl_s( const std::size_t domain_id, @@ -480,13 +461,13 @@ std::string rmw_context_impl_s::enclave() const } ///============================================================================= -std::shared_ptr rmw_context_impl_s::session() const +const std::shared_ptr rmw_context_impl_s::session() const { return data_->session(); } ///============================================================================= -std::optional & rmw_context_impl_s::shm_provider() +std::optional & rmw_context_impl_s::shm_provider() { return data_->shm_provider(); } diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp index ee30b3e0..a2fdaf5e 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp @@ -15,13 +15,13 @@ #ifndef DETAIL__RMW_CONTEXT_IMPL_S_HPP_ #define DETAIL__RMW_CONTEXT_IMPL_S_HPP_ -#include - #include #include #include #include +#include + #include "graph_cache.hpp" #include "rmw_node_data.hpp" @@ -47,14 +47,14 @@ struct rmw_context_impl_s final // Get a copy of the enclave. std::string enclave() const; - // Share the Zenoh session. - std::shared_ptr session() const; + // Loan the Zenoh session. + const std::shared_ptr session() const; // Get a reference to the shm_provider. // Note: This is not thread-safe. // TODO(Yadunund): Remove this API and instead include a publish() API // that handles the shm_provider once the context manages publishers. - std::optional & shm_provider(); + std::optional & shm_provider(); // Get the graph guard condition. rmw_guard_condition_t * graph_guard_condition(); diff --git a/rmw_zenoh_cpp/src/detail/rmw_init_options_impl.hpp b/rmw_zenoh_cpp/src/detail/rmw_init_options_impl.hpp deleted file mode 100644 index 1574abc5..00000000 --- a/rmw_zenoh_cpp/src/detail/rmw_init_options_impl.hpp +++ /dev/null @@ -1,30 +0,0 @@ -// Copyright 2023 Open Source Robotics Foundation, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifndef DETAIL__RMW_INIT_OPTIONS_IMPL_HPP_ -#define DETAIL__RMW_INIT_OPTIONS_IMPL_HPP_ - -#include - -namespace rmw_zenoh_cpp -{ -// TODO(YV): Consider using this again. -struct rmw_init_options_impl_s -{ - // An owned config. - z_owned_config_t config; -}; -} // namespace rmw_zenoh_cpp - -#endif // DETAIL__RMW_INIT_OPTIONS_IMPL_HPP_ diff --git a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp index c61daa2a..ed1b5b2b 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp @@ -30,7 +30,7 @@ namespace rmw_zenoh_cpp std::shared_ptr NodeData::make( const rmw_node_t * const node, std::size_t id, - const z_loaned_session_t * session, + std::shared_ptr session, std::size_t domain_id, const std::string & namespace_, const std::string & node_name, @@ -38,7 +38,7 @@ std::shared_ptr NodeData::make( { // Create the entity. auto entity = rmw_zenoh_cpp::liveliness::Entity::make( - z_info_zid(session), + session->get_zid(), std::to_string(id), std::to_string(id), rmw_zenoh_cpp::liveliness::EntityType::Node, @@ -58,21 +58,18 @@ std::shared_ptr NodeData::make( // Create the liveliness token. std::string liveliness_keyexpr = entity->liveliness_keyexpr(); - z_view_keyexpr_t liveliness_ke; - z_view_keyexpr_from_str(&liveliness_ke, liveliness_keyexpr.c_str()); - z_owned_liveliness_token_t token; - if (z_liveliness_declare_token(session, &token, z_loan(liveliness_ke), NULL) != Z_OK) { + zenoh::ZResult result; + auto token = session->liveliness_declare_token( + zenoh::KeyExpr(liveliness_keyexpr), + zenoh::Session::LivelinessDeclarationOptions::create_default(), + &result); + if (result != Z_OK) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Unable to create liveliness token for the node."); return nullptr; } - auto free_token = rcpputils::make_scope_exit( - [&token]() { - z_drop(z_move(token)); - }); - free_token.cancel(); return std::shared_ptr( new NodeData{ node, @@ -87,7 +84,7 @@ NodeData::NodeData( const rmw_node_t * const node, std::size_t id, std::shared_ptr entity, - z_owned_liveliness_token_t token) + zenoh::LivelinessToken token) : node_(node), id_(std::move(id)), entity_(std::move(entity)), @@ -121,7 +118,7 @@ std::size_t NodeData::id() const ///============================================================================= bool NodeData::create_pub_data( const rmw_publisher_t * const publisher, - std::shared_ptr session, + std::shared_ptr session, std::size_t id, const std::string & topic_name, const rosidl_message_type_support_t * type_support, @@ -143,7 +140,7 @@ bool NodeData::create_pub_data( } auto pub_data = PublisherData::make( - std::move(session), + session, node_, entity_->node_info(), id_, @@ -187,7 +184,7 @@ void NodeData::delete_pub_data(const rmw_publisher_t * const publisher) ///============================================================================= bool NodeData::create_sub_data( const rmw_subscription_t * const subscription, - std::shared_ptr session, + std::shared_ptr session, std::shared_ptr graph_cache, std::size_t id, const std::string & topic_name, @@ -210,7 +207,7 @@ bool NodeData::create_sub_data( } auto sub_data = SubscriptionData::make( - std::move(session), + session, std::move(graph_cache), node_, entity_->node_info(), @@ -255,7 +252,7 @@ void NodeData::delete_sub_data(const rmw_subscription_t * const subscription) ///============================================================================= bool NodeData::create_service_data( const rmw_service_t * const service, - std::shared_ptr session, + std::shared_ptr session, std::size_t id, const std::string & service_name, const rosidl_service_type_support_t * type_supports, @@ -277,7 +274,7 @@ bool NodeData::create_service_data( } auto service_data = ServiceData::make( - std::move(session), + session, node_, entity_->node_info(), id_, @@ -322,7 +319,7 @@ void NodeData::delete_service_data(const rmw_service_t * const service) ///============================================================================= bool NodeData::create_client_data( const rmw_client_t * const client, - std::shared_ptr session, + std::shared_ptr session, std::size_t id, const std::string & service_name, const rosidl_service_type_support_t * type_supports, @@ -344,7 +341,7 @@ bool NodeData::create_client_data( } auto client_data = ClientData::make( - std::move(session), + session, node_, client, entity_->node_info(), @@ -396,7 +393,14 @@ rmw_ret_t NodeData::shutdown() } // Unregister this node from the ROS graph. - z_liveliness_undeclare_token(z_move(token_)); + zenoh::ZResult result; + std::move(token_).value().undeclare(&result); + if (result != Z_OK) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to undeclare liveliness token"); + return RMW_RET_ERROR; + } is_shutdown_ = true; return ret; diff --git a/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp index 4429b589..e6fbede5 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp @@ -15,14 +15,14 @@ #ifndef DETAIL__RMW_NODE_DATA_HPP_ #define DETAIL__RMW_NODE_DATA_HPP_ -#include - #include #include #include #include #include +#include + #include "graph_cache.hpp" #include "liveliness_utils.hpp" #include "rmw_client_data.hpp" @@ -43,7 +43,7 @@ class NodeData final static std::shared_ptr make( const rmw_node_t * const node, std::size_t id, - const z_loaned_session_t * session, + std::shared_ptr session, std::size_t domain_id, const std::string & namespace_, const std::string & node_name, @@ -55,7 +55,7 @@ class NodeData final // Create a new PublisherData for a given rmw_publisher_t. bool create_pub_data( const rmw_publisher_t * const publisher, - std::shared_ptr sess, + std::shared_ptr session, std::size_t id, const std::string & topic_name, const rosidl_message_type_support_t * type_support, @@ -70,7 +70,7 @@ class NodeData final // Create a new SubscriptionData for a given rmw_subscription_t. bool create_sub_data( const rmw_subscription_t * const subscription, - std::shared_ptr sess, + std::shared_ptr session, std::shared_ptr graph_cache, std::size_t id, const std::string & topic_name, @@ -86,7 +86,7 @@ class NodeData final // Create a new ServiceData for a given rmw_service_t. bool create_service_data( const rmw_service_t * const service, - std::shared_ptr session, + std::shared_ptr session, std::size_t id, const std::string & service_name, const rosidl_service_type_support_t * type_support, @@ -101,7 +101,7 @@ class NodeData final // Create a new ClientData for a given rmw_client_t. bool create_client_data( const rmw_client_t * const client, - std::shared_ptr session, + std::shared_ptr session, std::size_t id, const std::string & service_name, const rosidl_service_type_support_t * type_support, @@ -128,7 +128,7 @@ class NodeData final const rmw_node_t * const node, std::size_t id, std::shared_ptr entity, - z_owned_liveliness_token_t token); + zenoh::LivelinessToken token); // Internal mutex. mutable std::recursive_mutex mutex_; // The rmw_node_t associated with this NodeData. @@ -139,7 +139,7 @@ class NodeData final // The Entity generated for the node. std::shared_ptr entity_; // Liveliness token for the node. - z_owned_liveliness_token_t token_; + std::optional token_; // Shutdown flag. bool is_shutdown_; // Map of publishers. diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp index f54c50ab..b228c7cb 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp @@ -16,11 +16,13 @@ #include +#include #include #include #include #include #include +#include #include "cdr.hpp" #include "rmw_context_impl_s.hpp" @@ -42,7 +44,7 @@ namespace rmw_zenoh_cpp ///============================================================================= std::shared_ptr PublisherData::make( - std::shared_ptr session, + std::shared_ptr session, const rmw_node_t * const node, liveliness::NodeInfo node_info, std::size_t node_id, @@ -82,7 +84,7 @@ std::shared_ptr PublisherData::make( std::size_t domain_id = node_info.domain_id_; auto entity = liveliness::Entity::make( - z_info_zid(session->loan()), + session->get_zid(), std::to_string(node_id), std::to_string(publisher_id), liveliness::EntityType::Publisher, @@ -102,52 +104,31 @@ std::shared_ptr PublisherData::make( return nullptr; } - std::string topic_keyexpr = entity->topic_info()->topic_keyexpr_; - z_view_keyexpr_t pub_ke; - if (z_view_keyexpr_from_str(&pub_ke, topic_keyexpr.c_str()) != Z_OK) { - RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); - return nullptr; - } - + zenoh::ZResult result; + std::optional pub_cache; + zenoh::KeyExpr pub_ke(entity->topic_info()->topic_keyexpr_); // Create a Publication Cache if durability is transient_local. - std::optional pub_cache = std::nullopt; if (adapted_qos_profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) { - ze_publication_cache_options_t pub_cache_opts; - ze_publication_cache_options_default(&pub_cache_opts); + zenoh::Session::PublicationCacheOptions pub_cache_opts = + zenoh::Session::PublicationCacheOptions::create_default(); + pub_cache_opts.history = adapted_qos_profile.depth; pub_cache_opts.queryable_complete = true; - // Set the queryable_prefix to the session id so that querying subscribers can specify this - // session id to obtain latest data from this specific publication caches when querying over - // the same keyexpression. - // When such a prefix is added to the PublicationCache, it listens to queries with this extra - // prefix (allowing to be queried in a unique way), but still replies with the original - // publications' key expressions. + std::string queryable_prefix = entity->zid(); - z_view_keyexpr_t prefix_ke; - z_view_keyexpr_from_str(&prefix_ke, queryable_prefix.c_str()); - pub_cache_opts.queryable_prefix = z_loan(prefix_ke); - - ze_owned_publication_cache_t pub_cache_; - if (ze_declare_publication_cache( - session->loan(), &pub_cache_, z_loan(pub_ke), &pub_cache_opts)) - { + pub_cache_opts.queryable_prefix = zenoh::KeyExpr(queryable_prefix); + + pub_cache = session->declare_publication_cache(pub_ke, std::move(pub_cache_opts), &result); + + if (result != Z_OK) { RMW_SET_ERROR_MSG("unable to create zenoh publisher cache"); return nullptr; } - pub_cache = pub_cache_; } - auto undeclare_z_publisher_cache = rcpputils::make_scope_exit( - [&pub_cache]() { - if (pub_cache.has_value()) { - z_drop(z_move(pub_cache.value())); - } - }); // Set congestion_control to BLOCK if appropriate. - z_publisher_options_t opts; - z_publisher_options_default(&opts); + zenoh::Session::PublisherOptions opts = zenoh::Session::PublisherOptions::create_default(); opts.congestion_control = Z_CONGESTION_CONTROL_DROP; - if (adapted_qos_profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) { opts.reliability = Z_RELIABILITY_RELIABLE; @@ -155,40 +136,24 @@ std::shared_ptr PublisherData::make( opts.congestion_control = Z_CONGESTION_CONTROL_BLOCK; } } - z_owned_publisher_t pub; - // TODO(clalancette): What happens if the key name is a valid but empty string? - if (z_declare_publisher( - session->loan(), &pub, z_loan(pub_ke), &opts) != Z_OK) - { + auto pub = session->declare_publisher(pub_ke, std::move(opts), &result); + + if (result != Z_OK) { RMW_SET_ERROR_MSG("Unable to create Zenoh publisher."); return nullptr; } - auto undeclare_z_publisher = rcpputils::make_scope_exit( - [&pub]() { - z_undeclare_publisher(z_move(pub)); - }); std::string liveliness_keyexpr = entity->liveliness_keyexpr(); - z_view_keyexpr_t liveliness_ke; - z_view_keyexpr_from_str(&liveliness_ke, liveliness_keyexpr.c_str()); - z_owned_liveliness_token_t token; - if (z_liveliness_declare_token( - session->loan(), &token, z_loan(liveliness_ke), - NULL) != Z_OK) - { + auto token = session->liveliness_declare_token( + zenoh::KeyExpr(liveliness_keyexpr), + zenoh::Session::LivelinessDeclarationOptions::create_default(), + &result); + if (result != Z_OK) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Unable to create liveliness token for the publisher."); return nullptr; } - auto free_token = rcpputils::make_scope_exit( - [&token]() { - z_drop(z_move(token)); - }); - - undeclare_z_publisher_cache.cancel(); - undeclare_z_publisher.cancel(); - free_token.cancel(); return std::shared_ptr( new PublisherData{ @@ -207,10 +172,10 @@ std::shared_ptr PublisherData::make( PublisherData::PublisherData( const rmw_node_t * rmw_node, std::shared_ptr entity, - std::shared_ptr sess, - z_owned_publisher_t pub, - std::optional pub_cache, - z_owned_liveliness_token_t token, + std::shared_ptr sess, + zenoh::Publisher pub, + std::optional pub_cache, + zenoh::LivelinessToken token, const void * type_support_impl, std::unique_ptr type_support) : rmw_node_(rmw_node), @@ -230,7 +195,7 @@ PublisherData::PublisherData( ///============================================================================= rmw_ret_t PublisherData::publish( const void * ros_message, - std::optional & shm_provider) + std::optional & /*shm_provider*/) { std::lock_guard lock(mutex_); if (is_shutdown_) { @@ -245,47 +210,20 @@ rmw_ret_t PublisherData::publish( // To store serialized message byte array. char * msg_bytes = nullptr; - std::optional shmbuf = std::nullopt; - auto always_free_shmbuf = rcpputils::make_scope_exit( - [&shmbuf]() { - if (shmbuf.has_value()) { - z_drop(z_move(shmbuf.value())); - } - }); rcutils_allocator_t * allocator = &rmw_node_->context->options.allocator; auto always_free_msg_bytes = rcpputils::make_scope_exit( - [&msg_bytes, allocator, &shmbuf]() { - if (msg_bytes && !shmbuf.has_value()) { + [&msg_bytes, allocator]() { + if (msg_bytes) { allocator->deallocate(msg_bytes, allocator->state); } }); - // Get memory from SHM buffer if available. - if (shm_provider.has_value()) { - RMW_ZENOH_LOG_DEBUG_NAMED("rmw_zenoh_cpp", "SHM is enabled."); - - auto provider = shm_provider.value(); - z_buf_layout_alloc_result_t alloc; - // TODO(yuyuan): SHM, configure this - z_alloc_alignment_t alignment = {5}; - z_shm_provider_alloc_gc_defrag_blocking(&alloc, z_loan(provider), SHM_BUF_OK_SIZE, alignment); - - if (alloc.status == ZC_BUF_LAYOUT_ALLOC_STATUS_OK) { - shmbuf = std::make_optional(alloc.buf); - msg_bytes = reinterpret_cast(z_shm_mut_data_mut(z_loan_mut(alloc.buf))); - } else { - // TODO(Yadunund): Should we revert to regular allocation and not return an error? - RMW_SET_ERROR_MSG("Failed to allocate a SHM buffer, even after GCing."); - return RMW_RET_ERROR; - } - } else { - // Get memory from the allocator. - msg_bytes = static_cast(allocator->allocate(max_data_length, allocator->state)); - RMW_CHECK_FOR_NULL_WITH_MSG( - msg_bytes, "bytes for message is null", return RMW_RET_BAD_ALLOC); - } + // Get memory from the allocator. + msg_bytes = static_cast(allocator->allocate(max_data_length, allocator->state)); + RMW_CHECK_FOR_NULL_WITH_MSG( + msg_bytes, "bytes for message is null", return RMW_RET_BAD_ALLOC); // Object that manages the raw buffer eprosima::fastcdr::FastBuffer fastbuffer(msg_bytes, max_data_length); @@ -306,24 +244,21 @@ rmw_ret_t PublisherData::publish( // The encoding is simply forwarded and is useful when key expressions in the // session use different encoding formats. In our case, all key expressions // will be encoded with CDR so it does not really matter. - z_publisher_put_options_t options; - z_publisher_put_options_default(&options); - z_owned_bytes_t attachment; - uint8_t local_gid[RMW_GID_STORAGE_SIZE]; - entity_->copy_gid(local_gid); - create_map_and_set_sequence_num(&attachment, sequence_number_++, local_gid); - options.attachment = z_move(attachment); - - z_owned_bytes_t payload; - if (shmbuf.has_value()) { - z_bytes_from_shm_mut(&payload, z_move(shmbuf.value())); - } else { - z_bytes_copy_from_buf(&payload, reinterpret_cast(msg_bytes), data_length); - } - - z_result_t res = z_publisher_put(z_loan(pub_), z_move(payload), &options); - if (res != Z_OK) { - if (res == Z_ESESSION_CLOSED) { + zenoh::ZResult result; + auto options = zenoh::Publisher::PutOptions::create_default(); + options.attachment = create_map_and_set_sequence_num( + sequence_number_++, + entity_->copy_gid()); + + // TODO(ahcorde): shmbuf + std::vector raw_data( + reinterpret_cast(msg_bytes), + reinterpret_cast(msg_bytes) + data_length); + zenoh::Bytes payload(std::move(raw_data)); + + pub_.put(std::move(payload), std::move(options), &result); + if (result != Z_OK) { + if (result == Z_ESESSION_CLOSED) { RMW_ZENOH_LOG_WARN_NAMED( "rmw_zenoh_cpp", "unable to publish message since the zenoh session is closed"); @@ -339,7 +274,7 @@ rmw_ret_t PublisherData::publish( ///============================================================================= rmw_ret_t PublisherData::publish_serialized_message( const rmw_serialized_message_t * serialized_message, - std::optional & /*shm_provider*/) + std::optional & /*shm_provider*/) { eprosima::fastcdr::FastBuffer buffer( reinterpret_cast(serialized_message->buffer), serialized_message->buffer_length); @@ -355,19 +290,18 @@ rmw_ret_t PublisherData::publish_serialized_message( // The encoding is simply forwarded and is useful when key expressions in the // session use different encoding formats. In our case, all key expressions // will be encoded with CDR so it does not really matter. - z_publisher_put_options_t options; - z_publisher_put_options_default(&options); - uint8_t local_gid[RMW_GID_STORAGE_SIZE]; - entity_->copy_gid(local_gid); - z_owned_bytes_t attachment; - create_map_and_set_sequence_num(&attachment, sequence_number_++, local_gid); - options.attachment = z_move(attachment); - z_owned_bytes_t payload; - z_bytes_copy_from_buf(&payload, serialized_message->buffer, data_length); - - z_result_t res = z_publisher_put(z_loan(pub_), z_move(payload), &options); - if (res != Z_OK) { - if (res == Z_ESESSION_CLOSED) { + zenoh::ZResult result; + auto options = zenoh::Publisher::PutOptions::create_default(); + options.attachment = create_map_and_set_sequence_num(sequence_number_++, entity_->copy_gid()); + + std::vector raw_data( + serialized_message->buffer, + serialized_message->buffer + data_length); + zenoh::Bytes payload(std::move(raw_data)); + + pub_.put(std::move(payload), std::move(options), &result); + if (result != Z_OK) { + if (result == Z_ESESSION_CLOSED) { RMW_ZENOH_LOG_WARN_NAMED( "rmw_zenoh_cpp", "unable to publish message since the zenoh session is closed"); @@ -376,7 +310,6 @@ rmw_ret_t PublisherData::publish_serialized_message( return RMW_RET_ERROR; } } - return RMW_RET_OK; } @@ -394,11 +327,10 @@ liveliness::TopicInfo PublisherData::topic_info() const return entity_->topic_info().value(); } -///============================================================================= -void PublisherData::copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const +std::array PublisherData::copy_gid() const { std::lock_guard lock(mutex_); - entity_->copy_gid(out_gid); + return entity_->copy_gid(); } ///============================================================================= @@ -440,11 +372,21 @@ rmw_ret_t PublisherData::shutdown() } // Unregister this publisher from the ROS graph. - z_liveliness_undeclare_token(z_move(token_)); - if (pub_cache_.has_value()) { - ze_undeclare_publication_cache(z_move(pub_cache_.value())); + zenoh::ZResult result; + std::move(token_).value().undeclare(&result); + if (result != Z_OK) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to undeclare liveliness token"); + return RMW_RET_ERROR; + } + std::move(pub_).undeclare(&result); + if (result != Z_OK) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to undeclare publisher"); + return RMW_RET_ERROR; } - z_undeclare_publisher(z_move(pub_)); sess_.reset(); is_shutdown_ = true; diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp index 1853ff8a..52c22523 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp @@ -15,8 +15,7 @@ #ifndef DETAIL__RMW_PUBLISHER_DATA_HPP_ #define DETAIL__RMW_PUBLISHER_DATA_HPP_ -#include - +#include #include #include #include @@ -24,6 +23,8 @@ #include #include +#include + #include "event.hpp" #include "liveliness_utils.hpp" #include "message_type_support.hpp" @@ -43,7 +44,7 @@ class PublisherData final public: // Make a shared_ptr of PublisherData. static std::shared_ptr make( - std::shared_ptr session, + std::shared_ptr session, const rmw_node_t * const node, liveliness::NodeInfo node_info, std::size_t node_id, @@ -55,12 +56,12 @@ class PublisherData final // Publish a ROS message. rmw_ret_t publish( const void * ros_message, - std::optional & shm_provider); + std::optional & shm_provider); // Publish a serialized ROS message. rmw_ret_t publish_serialized_message( const rmw_serialized_message_t * serialized_message, - std::optional & shm_provider); + std::optional & shm_provider); // Get a copy of the keyexpr_hash of this PublisherData's liveliness::Entity. std::size_t keyexpr_hash() const; @@ -68,8 +69,8 @@ class PublisherData final // Get a copy of the TopicInfo of this PublisherData. liveliness::TopicInfo topic_info() const; - // Copy the GID of this PublisherData into an rmw_gid_t. - void copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const; + // Return a copy of the GID of this publisher. + std::array copy_gid() const; // Returns true if liveliness token is still valid. bool liveliness_is_valid() const; @@ -91,10 +92,10 @@ class PublisherData final PublisherData( const rmw_node_t * rmw_node, std::shared_ptr entity, - std::shared_ptr sess, - z_owned_publisher_t pub, - std::optional pub_cache, - z_owned_liveliness_token_t token, + std::shared_ptr session, + zenoh::Publisher pub, + std::optional pub_cache, + zenoh::LivelinessToken token, const void * type_support_impl, std::unique_ptr type_support); @@ -105,13 +106,13 @@ class PublisherData final // The Entity generated for the publisher. std::shared_ptr entity_; // A shared session. - std::shared_ptr sess_; + std::shared_ptr sess_; // An owned publisher. - z_owned_publisher_t pub_; + zenoh::Publisher pub_; // Optional publication cache when durability is transient_local. - std::optional pub_cache_; + std::optional pub_cache_; // Liveliness token for the publisher. - z_owned_liveliness_token_t token_; + std::optional token_; // Type support fields const void * type_support_impl_; std::unique_ptr type_support_; diff --git a/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp index 3d456a02..9fe075a9 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp @@ -16,12 +16,16 @@ #include +#include #include #include #include #include #include #include +#include + +#include #include "attachment_helpers.hpp" #include "cdr.hpp" @@ -38,34 +42,9 @@ namespace rmw_zenoh_cpp { -///============================================================================== -void service_data_handler(z_loaned_query_t * query, void * data) -{ - z_view_string_t keystr; - z_keyexpr_as_view_string(z_query_keyexpr(query), &keystr); - - ServiceData * service_data = - static_cast(data); - if (service_data == nullptr) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to obtain ServiceData from data for " - "service for %.*s", - static_cast(z_string_len(z_loan(keystr))), - z_string_data(z_loan(keystr)) - ); - return; - } - - std::chrono::nanoseconds::rep received_timestamp = - std::chrono::system_clock::now().time_since_epoch().count(); - - service_data->add_new_query(std::make_unique(query, received_timestamp)); -} - ///============================================================================= std::shared_ptr ServiceData::make( - std::shared_ptr session, + std::shared_ptr session, const rmw_node_t * const node, liveliness::NodeInfo node_info, std::size_t node_id, @@ -126,7 +105,7 @@ std::shared_ptr ServiceData::make( std::size_t domain_id = node_info.domain_id_; auto entity = liveliness::Entity::make( - z_info_zid(session->loan()), + session->get_zid(), std::to_string(node_id), std::to_string(service_id), liveliness::EntityType::Service, @@ -157,71 +136,56 @@ std::shared_ptr ServiceData::make( std::move(response_type_support) }); - // TODO(Yadunund): Instead of passing a rawptr, rely on capturing weak_ptr - // in the closure callback once we switch to zenoh-cpp. - if (z_keyexpr_from_str( - &service_data->keyexpr_, - service_data->entity_->topic_info().value().topic_keyexpr_.c_str()) != Z_OK) - { + zenoh::ZResult result; + service_data->keyexpr_ = service_data->entity_->topic_info()->topic_keyexpr_; + zenoh::KeyExpr service_ke(service_data->keyexpr_, true, &result); + if (result != Z_OK) { RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); return nullptr; } - auto free_ros_keyexpr = rcpputils::make_scope_exit( - [service_data]() { - if (service_data != nullptr) { - z_drop(z_move(service_data->keyexpr_)); + + zenoh::Session::QueryableOptions qable_options = + zenoh::Session::QueryableOptions::create_default(); + qable_options.complete = true; + + std::weak_ptr data_wp = service_data; + service_data->qable_ = session->declare_queryable( + service_ke, + [data_wp](const zenoh::Query & query) { + auto sub_data = data_wp.lock(); + if (sub_data == nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to obtain ServiceData from data for %s.", + query.get_keyexpr().as_string_view()); + return; } - }); - z_owned_closure_query_t callback; - z_closure(&callback, service_data_handler, nullptr, service_data.get()); + std::chrono::nanoseconds::rep received_timestamp = + std::chrono::system_clock::now().time_since_epoch().count(); - // Configure the queryable to process complete queries. - z_queryable_options_t qable_options; - z_queryable_options_default(&qable_options); - qable_options.complete = true; - if (z_declare_queryable( - session->loan(), &service_data->qable_, z_loan(service_data->keyexpr_), - z_move(callback), &qable_options) != Z_OK) - { + sub_data->add_new_query(std::make_unique(query, received_timestamp)); + }, + zenoh::closures::none, + std::move(qable_options), + &result); + if (result != Z_OK) { RMW_SET_ERROR_MSG("unable to create zenoh queryable"); return nullptr; } - auto undeclare_z_queryable = rcpputils::make_scope_exit( - [service_data]() { - if (service_data != nullptr) { - z_undeclare_queryable(z_move(service_data->qable_)); - } - }); std::string liveliness_keyexpr = service_data->entity_->liveliness_keyexpr(); - z_view_keyexpr_t liveliness_ke; - if (z_view_keyexpr_from_str(&liveliness_ke, liveliness_keyexpr.c_str()) != Z_OK) { - RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); - return nullptr; - } - auto free_token = rcpputils::make_scope_exit( - [service_data]() { - if (service_data != nullptr) { - z_drop(z_move(service_data->token_)); - } - }); - if (z_liveliness_declare_token( - session->loan(), &service_data->token_, z_loan(liveliness_ke), - NULL) != Z_OK) - { + service_data->token_ = session->liveliness_declare_token( + zenoh::KeyExpr(liveliness_keyexpr), + zenoh::Session::LivelinessDeclarationOptions::create_default(), + &result); + if (result != Z_OK) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Unable to create liveliness token for the service."); return nullptr; } - service_data->initialized_ = true; - - free_ros_keyexpr.cancel(); - undeclare_z_queryable.cancel(); - free_token.cancel(); - return service_data; } @@ -229,14 +193,14 @@ std::shared_ptr ServiceData::make( ServiceData::ServiceData( const rmw_node_t * rmw_node, std::shared_ptr entity, - std::shared_ptr sess, + std::shared_ptr session, const void * request_type_support_impl, const void * response_type_support_impl, std::unique_ptr request_type_support, std::unique_ptr response_type_support) : rmw_node_(rmw_node), entity_(std::move(entity)), - sess_(std::move(sess)), + sess_(std::move(session)), request_type_support_impl_(request_type_support_impl), response_type_support_impl_(response_type_support_impl), request_type_support_(std::move(request_type_support)), @@ -282,15 +246,12 @@ void ServiceData::add_new_query(std::unique_ptr query) query_queue_.size() >= adapted_qos_profile.depth) { // Log warning if message is discarded due to hitting the queue depth - z_view_string_t keystr; - z_keyexpr_as_view_string(z_loan(keyexpr_), &keystr); RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Query queue depth of %ld reached, discarding oldest Query " - "for service for %.*s", + "for service for %s", adapted_qos_profile.depth, - static_cast(z_string_len(z_loan(keystr))), - z_string_data(z_loan(keystr))); + keyexpr_.c_str()); query_queue_.pop_front(); } query_queue_.emplace_back(std::move(query)); @@ -319,16 +280,28 @@ rmw_ret_t ServiceData::take_request( } std::unique_ptr query = std::move(query_queue_.front()); query_queue_.pop_front(); - const z_loaned_query_t * loaned_query = query->get_query(); + const zenoh::Query & loaned_query = query->get_query(); // DESERIALIZE MESSAGE ======================================================== - z_owned_slice_t payload; - z_bytes_to_slice(z_query_payload(loaned_query), &payload); + auto payload = loaned_query.get_payload(); + if (!payload.has_value()) { + RMW_ZENOH_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", + "ServiceData take_request payload is empty"); + return RMW_RET_ERROR; + } + + auto payload_data = payload.value().get().as_vector(); + if (payload_data.empty()) { + RMW_ZENOH_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", + "ServiceData not able to get slice data"); + return RMW_RET_ERROR; + } // Object that manages the raw buffer eprosima::fastcdr::FastBuffer fastbuffer( - reinterpret_cast(const_cast(z_slice_data(z_loan(payload)))), - z_slice_len(z_loan(payload))); + reinterpret_cast(const_cast(payload_data.data())), payload_data.size()); // Object that serializes the data Cdr deser(fastbuffer); @@ -343,7 +316,15 @@ rmw_ret_t ServiceData::take_request( // Fill in the request header. // Get the sequence_number out of the attachment - AttachmentData attachment(z_query_attachment(loaned_query)); + if (!loaned_query.get_attachment().has_value()) { + RMW_ZENOH_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", + "ServiceData take_request attachment is empty"); + return RMW_RET_ERROR; + } + + rmw_zenoh_cpp::AttachmentData attachment(std::move( + loaned_query.get_attachment().value().get())); request_header->request_id.sequence_number = attachment.sequence_number(); if (request_header->request_id.sequence_number < 0) { @@ -351,7 +332,8 @@ rmw_ret_t ServiceData::take_request( return RMW_RET_ERROR; } - attachment.copy_gid(request_header->request_id.writer_guid); + std::array writer_guid = attachment.copy_gid(); + memcpy(request_header->request_id.writer_guid, writer_guid.data(), RMW_GID_STORAGE_SIZE); request_header->source_timestamp = attachment.source_timestamp(); if (request_header->source_timestamp < 0) { @@ -361,7 +343,7 @@ rmw_ret_t ServiceData::take_request( request_header->received_timestamp = query->get_received_timestamp(); // Add this query to the map, so that rmw_send_response can quickly look it up later. - const size_t hash = rmw_zenoh_cpp::hash_gid(request_header->request_id.writer_guid); + const size_t hash = rmw_zenoh_cpp::hash_gid(writer_guid); std::unordered_map::iterator it = sequence_to_query_map_.find(hash); if (it == sequence_to_query_map_.end()) { SequenceToQuery stq; @@ -378,8 +360,6 @@ rmw_ret_t ServiceData::take_request( it->second.insert(std::make_pair(request_header->request_id.sequence_number, std::move(query))); *taken = true; - z_drop(z_move(payload)); - return RMW_RET_OK; } @@ -396,8 +376,12 @@ rmw_ret_t ServiceData::send_response( ); return RMW_RET_OK; } + + std::array writer_guid; + memcpy(writer_guid.data(), request_id->writer_guid, RMW_GID_STORAGE_SIZE); + // Create the queryable payload - const size_t hash = hash_gid(request_id->writer_guid); + const size_t hash = hash_gid(writer_guid); std::unordered_map::iterator it = sequence_to_query_map_.find(hash); if (it == sequence_to_query_map_.end()) { // If there is no data associated with this request, the higher layers of @@ -450,21 +434,31 @@ rmw_ret_t ServiceData::send_response( size_t data_length = ser.get_serialized_data_length(); - const z_loaned_query_t * loaned_query = query->get_query(); - z_query_reply_options_t options; - z_query_reply_options_default(&options); - - z_owned_bytes_t attachment; - rmw_zenoh_cpp::create_map_and_set_sequence_num( - &attachment, request_id->sequence_number, - request_id->writer_guid); - options.attachment = z_move(attachment); + const zenoh::Query & loaned_query = query->get_query(); + zenoh::Query::ReplyOptions options = zenoh::Query::ReplyOptions::create_default(); + std::array writer_gid; + memcpy(writer_gid.data(), request_id->writer_guid, RMW_GID_STORAGE_SIZE); + options.attachment = create_map_and_set_sequence_num( + request_id->sequence_number, + writer_gid); + + std::vector raw_bytes( + reinterpret_cast(response_bytes), + reinterpret_cast(response_bytes) + data_length); + zenoh::Bytes payload(std::move(raw_bytes)); + + zenoh::ZResult result; + zenoh::KeyExpr service_ke(keyexpr_.c_str(), true, &result); + if (result != Z_OK) { + RMW_SET_ERROR_MSG("unable to create KeyExpr"); + return RMW_RET_ERROR; + } - z_owned_bytes_t payload; - z_bytes_copy_from_buf( - &payload, reinterpret_cast(response_bytes), data_length); - z_query_reply( - loaned_query, z_loan(keyexpr_), z_move(payload), &options); + loaned_query.reply(service_ke, std::move(payload), std::move(options), &result); + if (result != Z_OK) { + RMW_SET_ERROR_MSG("unable to reply"); + return RMW_RET_ERROR; + } return RMW_RET_OK; } @@ -524,9 +518,22 @@ rmw_ret_t ServiceData::shutdown() // Unregister this node from the ROS graph. if (initialized_) { - z_drop(z_move(keyexpr_)); - z_liveliness_undeclare_token(z_move(token_)); - z_undeclare_queryable(z_move(qable_)); + zenoh::ZResult result; + std::move(token_).value().undeclare(&result); + if (result != Z_OK) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to undeclare liveliness token"); + return RMW_RET_ERROR; + } + + std::move(qable_).value().undeclare(&result); + if (result != Z_OK) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to undeclare queryable"); + return RMW_RET_ERROR; + } } sess_.reset(); diff --git a/rmw_zenoh_cpp/src/detail/rmw_service_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_service_data.hpp index a8bcbd66..eeeeee6c 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_service_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_service_data.hpp @@ -15,8 +15,6 @@ #ifndef DETAIL__RMW_SERVICE_DATA_HPP_ #define DETAIL__RMW_SERVICE_DATA_HPP_ -#include - #include #include #include @@ -26,6 +24,8 @@ #include #include +#include + #include "event.hpp" #include "liveliness_utils.hpp" #include "message_type_support.hpp" @@ -42,12 +42,12 @@ namespace rmw_zenoh_cpp { ///============================================================================= -class ServiceData final +class ServiceData final : public std::enable_shared_from_this { public: // Make a shared_ptr of ServiceData. static std::shared_ptr make( - std::shared_ptr session, + std::shared_ptr session, const rmw_node_t * const node, liveliness::NodeInfo node_info, std::size_t node_id, @@ -99,7 +99,7 @@ class ServiceData final ServiceData( const rmw_node_t * rmw_node, std::shared_ptr entity, - std::shared_ptr sess, + std::shared_ptr session, const void * request_type_support_impl, const void * response_type_support_impl, std::unique_ptr request_type_support, @@ -111,14 +111,31 @@ class ServiceData final const rmw_node_t * rmw_node_; // The Entity generated for the service. std::shared_ptr entity_; - // An owned keyexpression. - z_owned_keyexpr_t keyexpr_; - // A shared session. - std::shared_ptr sess_; + + // A shared session + std::shared_ptr sess_; + // The keyexpr string. + std::string keyexpr_; // An owned queryable. - z_owned_queryable_t qable_; + // The Queryable *must* exist in order for anything in this ServiceData class, + // and hence rmw_zenoh_cpp, to work. + // However, zenoh::Queryable does not have an empty constructor, + // so just declaring this as a zenoh::Queryable fails to compile. + // We work around that by wrapping it in a std::optional, so the std::optional + // gets constructed at ServiceData constructor time, + // and then we initialize qable_ later. Note that the zenoh-cpp API declare_queryable() throws an + // exception if it fails, so this should all be safe to do. + std::optional> qable_; // Liveliness token for the service. - z_owned_liveliness_token_t token_; + // The token_ *must* exist in order for anything in this ServiceData class, + // and hence rmw_zenoh_cpp, to work. + // However, zenoh::LivelinessToken does not have an empty constructor, + // so just declaring this as a zenoh::LivelinessToken fails to compile. + // We work around that by wrapping it in a std::optional, so the std::optional + // gets constructed at ServiceData constructor time, + // and then we initialize token_ later. Note that the zenoh-cpp API + // liveliness_declare_token() throws an exception if it fails, so this should all be safe to do. + std::optional token_; // Type support fields. const void * request_type_support_impl_; const void * response_type_support_impl_; diff --git a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp index 14f1ed8f..dc1cba81 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp @@ -16,6 +16,7 @@ #include +#include #include #include #include @@ -23,6 +24,7 @@ #include #include #include +#include #include "attachment_helpers.hpp" #include "cdr.hpp" @@ -40,61 +42,23 @@ namespace rmw_zenoh_cpp { -namespace -{ -//============================================================================== -// TODO(Yadunund): Make this a class member and lambda capture weak_from_this() -// instead of passing a rawptr to SubscriptionData when we switch to zenoh-cpp. -void sub_data_handler(z_loaned_sample_t * sample, void * data) -{ - z_view_string_t keystr; - z_keyexpr_as_view_string(z_sample_keyexpr(sample), &keystr); - - auto sub_data = static_cast(data); - if (sub_data == nullptr) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to obtain SubscriptionData from data for %s.", - z_loan(keystr) - ); - return; - } - - AttachmentData attachment(z_sample_attachment(sample)); - const z_loaned_bytes_t * payload = z_sample_payload(sample); - - z_owned_slice_t slice; - z_bytes_to_slice(payload, &slice); - - std::string topic_name(z_string_data(z_loan(keystr)), z_string_len(z_loan(keystr))); - - sub_data->add_new_message( - std::make_unique( - slice, - std::chrono::system_clock::now().time_since_epoch().count(), - std::move(attachment)), - topic_name); -} -} // namespace - ///============================================================================= SubscriptionData::Message::Message( - z_owned_slice_t p, + std::vector && p, uint64_t recv_ts, AttachmentData && attachment_) -: payload(p), recv_timestamp(recv_ts), attachment(std::move(attachment_)) +: payload(std::move(p)), recv_timestamp(recv_ts), attachment(std::move(attachment_)) { } ///============================================================================= SubscriptionData::Message::~Message() { - z_drop(z_move(payload)); } ///============================================================================= std::shared_ptr SubscriptionData::make( - std::shared_ptr session, + std::shared_ptr session, std::shared_ptr graph_cache, const rmw_node_t * const node, liveliness::NodeInfo node_info, @@ -136,7 +100,7 @@ std::shared_ptr SubscriptionData::make( // with Zenoh; after this, callbacks may come in at any time. std::size_t domain_id = node_info.domain_id_; auto entity = liveliness::Entity::make( - z_info_zid(session->loan()), + session->get_zid(), std::to_string(node_id), std::to_string(subscription_id), liveliness::EntityType::Subscription, @@ -179,13 +143,13 @@ SubscriptionData::SubscriptionData( const rmw_node_t * rmw_node, std::shared_ptr graph_cache, std::shared_ptr entity, - std::shared_ptr sess, + std::shared_ptr session, const void * type_support_impl, std::unique_ptr type_support) : rmw_node_(rmw_node), graph_cache_(std::move(graph_cache)), entity_(std::move(entity)), - sess_(std::move(sess)), + sess_(std::move(session)), type_support_impl_(type_support_impl), type_support_(std::move(type_support)), last_known_published_msg_({}), @@ -201,14 +165,9 @@ SubscriptionData::SubscriptionData( // enable_shared_from_this, which is not available in constructors. bool SubscriptionData::init() { - // TODO(Yadunund): Instead of passing a rawptr, rely on capturing weak_ptr - // in the closure callback once we switch to zenoh-cpp. - z_owned_closure_sample_t callback; - z_closure(&callback, sub_data_handler, nullptr, this); - - std::string topic_keyexpr = entity_->topic_info()->topic_keyexpr_; - z_view_keyexpr_t sub_ke; - if (z_view_keyexpr_from_str(&sub_ke, topic_keyexpr.c_str()) != Z_OK) { + zenoh::ZResult result; + zenoh::KeyExpr sub_ke(entity_->topic_info()->topic_keyexpr_, true, &result); + if (result != Z_OK) { RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); return false; } @@ -222,14 +181,11 @@ bool SubscriptionData::init() // TODO(Yadunund): Rely on a separate function to return the sub // as we start supporting more qos settings. if (entity_->topic_info()->qos_.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) { - ze_querying_subscriber_options_t sub_options; - ze_querying_subscriber_options_default(&sub_options); - // Make the initial query to hit all the PublicationCaches, using a query_selector with - // '*' in place of the queryable_prefix of each PublicationCache + zenoh::Session::QueryingSubscriberOptions sub_options = + zenoh::Session::QueryingSubscriberOptions::create_default(); const std::string selector = "*/" + entity_->topic_info()->topic_keyexpr_; - z_view_keyexpr_t selector_ke; - z_view_keyexpr_from_str(&selector_ke, selector.c_str()); - sub_options.query_selector = z_loan(selector_ke); + zenoh::KeyExpr selector_ke(selector); + sub_options.query_keyexpr = std::move(selector_ke); // Tell the PublicationCache's Queryable that the query accepts any key expression as a reply. // By default a query accepts only replies that matches its query selector. // This allows us to selectively query certain PublicationCaches when defining the @@ -240,19 +196,51 @@ bool SubscriptionData::init() // We set consolidation to none as we need to receive transient local messages // from a number of publishers. Eg: To receive TF data published over /tf_static // by various publishers. - sub_options.query_consolidation = z_query_consolidation_none(); - ze_owned_querying_subscriber_t sub; - if (ze_declare_querying_subscriber( - sess_->loan(), &sub, z_loan(sub_ke), z_move(callback), &sub_options)) - { + sub_options.query_consolidation = + zenoh::QueryConsolidation(zenoh::ConsolidationMode::Z_CONSOLIDATION_MODE_NONE); + + std::weak_ptr data_wp = shared_from_this(); + auto sub = context_impl->session()->declare_querying_subscriber( + sub_ke, + [data_wp](const zenoh::Sample & sample) { + auto sub_data = data_wp.lock(); + if (sub_data == nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to obtain SubscriptionData from data for %s.", + sample.get_keyexpr().as_string_view()); + return; + } + + auto attachment = sample.get_attachment(); + if (!attachment.has_value()) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to obtain attachment") + return; + } + + auto attachment_value = attachment.value(); + AttachmentData attachment_data(attachment_value); + + sub_data->add_new_message( + std::make_unique( + sample.get_payload().as_vector(), + std::chrono::system_clock::now().time_since_epoch().count(), + std::move(attachment_data)), + std::string(sample.get_keyexpr().as_string_view())); + }, + zenoh::closures::none, + std::move(sub_options), + &result); + if (result != Z_OK) { RMW_SET_ERROR_MSG("unable to create zenoh subscription"); return false; } - sub_ = sub; + sub_ = std::move(sub); // Register the querying subscriber with the graph cache to get latest // messages from publishers that were discovered after their first publication. - std::weak_ptr data_wp = shared_from_this(); graph_cache_->set_querying_subscriber_callback( entity_->topic_info().value().topic_keyexpr_, entity_->keyexpr_hash(), @@ -276,71 +264,79 @@ bool SubscriptionData::init() "QueryingSubscriberCallback triggered over %s.", selector.c_str() ); - z_get_options_t opts; - z_get_options_default(&opts); + zenoh::Session::GetOptions opts = zenoh::Session::GetOptions::create_default(); opts.timeout_ms = std::numeric_limits::max(); - opts.consolidation = z_query_consolidation_latest(); + opts.consolidation = zenoh::ConsolidationMode::Z_CONSOLIDATION_MODE_NONE; opts.accept_replies = ZC_REPLY_KEYEXPR_ANY; - z_view_keyexpr_t ke; - z_view_keyexpr_from_str(&ke, selector.c_str()); - ze_querying_subscriber_get( - z_loan(std::get(sub_data->sub_)), - z_loan(ke), - &opts); + zenoh::ZResult result; + std::get>(sub_data->sub_.value()).get( + zenoh::KeyExpr(selector), + std::move(opts), + &result); + + if (result != Z_OK) { + RMW_SET_ERROR_MSG("unable to get querying subscriber."); + return; + } } ); } else { - // Create a regular subscriber for all other durability settings. - z_subscriber_options_t sub_options; - z_subscriber_options_default(&sub_options); - - z_owned_subscriber_t sub; - if (z_declare_subscriber( - sess_->loan(), &sub, z_loan(sub_ke), z_move(callback), - &sub_options) != Z_OK) - { + zenoh::Session::SubscriberOptions sub_options = + zenoh::Session::SubscriberOptions::create_default(); + std::weak_ptr data_wp = shared_from_this(); + zenoh::Subscriber sub = context_impl->session()->declare_subscriber( + sub_ke, + [data_wp](const zenoh::Sample & sample) { + auto sub_data = data_wp.lock(); + if (sub_data == nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to lock weak_ptr within querying subscription callback." + ); + return; + } + auto attachment = sample.get_attachment(); + if (!attachment.has_value()) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to obtain attachment") + return; + } + auto payload = sample.get_payload().clone(); + auto attachment_value = attachment.value(); + + AttachmentData attachment_data(attachment_value); + sub_data->add_new_message( + std::make_unique( + payload.as_vector(), + std::chrono::system_clock::now().time_since_epoch().count(), + std::move(attachment_data)), + std::string(sample.get_keyexpr().as_string_view())); + }, + zenoh::closures::none, + std::move(sub_options), + &result); + if (result != Z_OK) { RMW_SET_ERROR_MSG("unable to create zenoh subscription"); return false; } - sub_ = sub; + sub_ = std::move(sub); } - auto undeclare_z_sub = rcpputils::make_scope_exit( - [this]() { - z_owned_subscriber_t * sub = std::get_if(&sub_); - if (sub == nullptr || z_undeclare_subscriber(z_move(*sub))) { - RMW_SET_ERROR_MSG("failed to undeclare sub"); - } else { - ze_owned_querying_subscriber_t * querying_sub = - std::get_if(&sub_); - if (querying_sub == nullptr || ze_undeclare_querying_subscriber(z_move(*querying_sub))) { - RMW_SET_ERROR_MSG("failed to undeclare sub"); - } - } - }); - // Publish to the graph that a new subscription is in town. std::string liveliness_keyexpr = entity_->liveliness_keyexpr(); - z_view_keyexpr_t liveliness_ke; - z_view_keyexpr_from_str(&liveliness_ke, liveliness_keyexpr.c_str()); - - auto free_token = rcpputils::make_scope_exit( - [this]() { - z_drop(z_move(token_)); - }); - if (z_liveliness_declare_token( - sess_->loan(), &token_, z_loan(liveliness_ke), NULL) != Z_OK) - { + token_ = context_impl->session()->liveliness_declare_token( + zenoh::KeyExpr(liveliness_keyexpr), + zenoh::Session::LivelinessDeclarationOptions::create_default(), + &result); + if (result != Z_OK) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Unable to create liveliness token for the subscription."); return false; } - undeclare_z_sub.cancel(); - free_token.cancel(); - initialized_ = true; return true; @@ -408,21 +404,36 @@ rmw_ret_t SubscriptionData::shutdown() graph_cache_->remove_qos_event_callbacks(entity_->keyexpr_hash()); // Unregister this subscription from the ROS graph. - z_liveliness_undeclare_token(z_move(token_)); + zenoh::ZResult result; + std::move(token_).value().undeclare(&result); + if (result != Z_OK) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to undeclare liveliness token"); + return RMW_RET_ERROR; + } - z_owned_subscriber_t * sub = std::get_if(&sub_); - if (sub != nullptr) { - if (z_undeclare_subscriber(z_move(*sub)) != Z_OK) { - RMW_SET_ERROR_MSG("failed to undeclare sub."); - ret = RMW_RET_ERROR; - } - } else { - ze_owned_querying_subscriber_t * querying_sub = - std::get_if(&sub_); - if (querying_sub != nullptr) { - if (ze_undeclare_querying_subscriber(z_move(*querying_sub)) != Z_OK) { - RMW_SET_ERROR_MSG("failed to undeclare querying sub."); - ret = RMW_RET_ERROR; + if (sub_.has_value()) { + zenoh::Subscriber * sub = std::get_if>(&sub_.value()); + if (sub != nullptr) { + std::move(*sub).undeclare(&result); + if (result != Z_OK) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "failed to undeclare sub."); + return RMW_RET_ERROR; + } + } else { + zenoh::ext::QueryingSubscriber * sub = + std::get_if>(&sub_.value()); + if (sub != nullptr) { + std::move(*sub).undeclare(&result); + if (result != Z_OK) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "failed to undeclare querying sub."); + return RMW_RET_ERROR; + } } } } @@ -479,13 +490,18 @@ rmw_ret_t SubscriptionData::take_one_message( std::unique_ptr msg_data = std::move(message_queue_.front()); message_queue_.pop_front(); - const uint8_t * payload = z_slice_data(z_loan(msg_data->payload)); - const size_t payload_len = z_slice_len(z_loan(msg_data->payload)); + auto & payload_data = msg_data->payload; + if (payload_data.empty()) { + RMW_ZENOH_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", + "SubscriptionData not able to get slice data"); + return RMW_RET_ERROR; + } // Object that manages the raw buffer eprosima::fastcdr::FastBuffer fastbuffer( - reinterpret_cast(const_cast(payload)), - payload_len); + reinterpret_cast(const_cast(payload_data.data())), + payload_data.size()); // Object that serializes the data rmw_zenoh_cpp::Cdr deser(fastbuffer); @@ -505,10 +521,12 @@ rmw_ret_t SubscriptionData::take_one_message( // TODO(clalancette): fill in reception_sequence_number message_info->reception_sequence_number = 0; message_info->publisher_gid.implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; - msg_data->attachment.copy_gid(message_info->publisher_gid.data); + memcpy( + message_info->publisher_gid.data, + msg_data->attachment.copy_gid().data(), + RMW_GID_STORAGE_SIZE); message_info->from_intra_process = false; } - *taken = true; return RMW_RET_OK; @@ -530,18 +548,26 @@ rmw_ret_t SubscriptionData::take_serialized_message( std::unique_ptr msg_data = std::move(message_queue_.front()); message_queue_.pop_front(); - const uint8_t * payload = z_slice_data(z_loan(msg_data->payload)); - const size_t payload_len = z_slice_len(z_loan(msg_data->payload)); + auto & payload_data = msg_data->payload; - if (serialized_message->buffer_capacity < payload_len) { + if (payload_data.empty()) { + RMW_ZENOH_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", + "SubscriptionData not able to get slice data"); + return RMW_RET_ERROR; + } + if (serialized_message->buffer_capacity < payload_data.size()) { rmw_ret_t ret = - rmw_serialized_message_resize(serialized_message, payload_len); + rmw_serialized_message_resize(serialized_message, payload_data.size()); if (ret != RMW_RET_OK) { return ret; // Error message already set } } - serialized_message->buffer_length = payload_len; - memcpy(serialized_message->buffer, payload, payload_len); + serialized_message->buffer_length = payload_data.size(); + memcpy( + serialized_message->buffer, + reinterpret_cast(const_cast(payload_data.data())), + payload_data.size()); *taken = true; @@ -552,7 +578,10 @@ rmw_ret_t SubscriptionData::take_serialized_message( // TODO(clalancette): fill in reception_sequence_number message_info->reception_sequence_number = 0; message_info->publisher_gid.implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; - msg_data->attachment.copy_gid(message_info->publisher_gid.data); + memcpy( + message_info->publisher_gid.data, + msg_data->attachment.copy_gid().data(), + RMW_GID_STORAGE_SIZE); message_info->from_intra_process = false; } @@ -590,7 +619,7 @@ void SubscriptionData::add_new_message( } // Check for messages lost if the new sequence number is not monotonically increasing. - const size_t gid_hash = msg->attachment.gid_hash(); + const size_t gid_hash = hash_gid(msg->attachment.copy_gid()); auto last_known_pub_it = last_known_published_msg_.find(gid_hash); if (last_known_pub_it != last_known_published_msg_.end()) { const int64_t seq_increment = std::abs( diff --git a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp index 08a09def..a3fab3f9 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp @@ -15,8 +15,6 @@ #ifndef DETAIL__RMW_SUBSCRIPTION_DATA_HPP_ #define DETAIL__RMW_SUBSCRIPTION_DATA_HPP_ -#include - #include #include #include @@ -27,6 +25,9 @@ #include #include #include +#include + +#include #include "event.hpp" #include "graph_cache.hpp" @@ -50,20 +51,20 @@ class SubscriptionData final : public std::enable_shared_from_this && p, uint64_t recv_ts, AttachmentData && attachment); ~Message(); - z_owned_slice_t payload; + std::vector payload; uint64_t recv_timestamp; AttachmentData attachment; }; // Make a shared_ptr of SubscriptionData. static std::shared_ptr make( - std::shared_ptr session, + std::shared_ptr session, std::shared_ptr graph_cache, const rmw_node_t * const node, liveliness::NodeInfo node_info, @@ -127,7 +128,7 @@ class SubscriptionData final : public std::enable_shared_from_this graph_cache, std::shared_ptr entity, - std::shared_ptr sess, + std::shared_ptr session, const void * type_support_impl, std::unique_ptr type_support); @@ -142,11 +143,11 @@ class SubscriptionData final : public std::enable_shared_from_this entity_; // A shared session - std::shared_ptr sess_; + std::shared_ptr sess_; // An owned subscriber or querying_subscriber depending on the QoS settings. - std::variant sub_; + std::optional, zenoh::ext::QueryingSubscriber>> sub_; // Liveliness token for the subscription. - z_owned_liveliness_token_t token_; + std::optional token_; // Type support fields const void * type_support_impl_; std::unique_ptr type_support_; diff --git a/rmw_zenoh_cpp/src/detail/type_support.cpp b/rmw_zenoh_cpp/src/detail/type_support.cpp index fff22474..fc462975 100644 --- a/rmw_zenoh_cpp/src/detail/type_support.cpp +++ b/rmw_zenoh_cpp/src/detail/type_support.cpp @@ -131,10 +131,10 @@ bool TypeSupport::deserialize_ros_message( uint8_t dump = 0; deser >> dump; (void)dump; - } catch (const eprosima::fastcdr::exception::Exception &) { + } catch (const eprosima::fastcdr::exception::Exception & e) { RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( - "Fast CDR exception deserializing message of type %s.", - get_name()); + "Fast CDR exception deserializing message of type %s. %s", + get_name(), e.what()); return false; } diff --git a/rmw_zenoh_cpp/src/detail/zenoh_config.cpp b/rmw_zenoh_cpp/src/detail/zenoh_config.cpp index 0e4116ef..fa100d0a 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_config.cpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_config.cpp @@ -19,6 +19,9 @@ #include #include +#include +#include + #include "logging_macros.hpp" #include @@ -41,10 +44,9 @@ static const std::unordered_map _get_z_config( const char * envar_name, - const char * default_uri, - z_owned_config_t * config) + const char * default_uri) { const char * configured_uri; const char * envar_uri; @@ -53,40 +55,42 @@ rmw_ret_t _get_z_config( // NULL is returned if everything is ok. RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Envar %s cannot be read.", envar_name); - return RMW_RET_ERROR; + return std::nullopt; } // If the environment variable is set, try to read the configuration from the file, // if the environment variable is not set use internal configuration configured_uri = envar_uri[0] != '\0' ? envar_uri : default_uri; // Try to read the configuration - if (zc_config_from_file(config, configured_uri) != Z_OK) { + zenoh::ZResult result; + zenoh::Config config = zenoh::Config::from_file(configured_uri, &result); + if (result != Z_OK) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Invalid configuration file %s", configured_uri); - return RMW_RET_ERROR; + return std::nullopt; } RMW_ZENOH_LOG_DEBUG_NAMED( "rmw_zenoh_cpp", "configured using configuration file %s", configured_uri); - return RMW_RET_OK; + return config; } } // namespace ///============================================================================= -rmw_ret_t get_z_config(const ConfigurableEntity & entity, z_owned_config_t * config) +std::optional get_z_config(const ConfigurableEntity & entity) { auto envar_map_it = envar_map.find(entity); if (envar_map_it == envar_map.end()) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "get_z_config called with invalid ConfigurableEntity."); - return RMW_RET_ERROR; + return std::nullopt; } // Get the absolute path to the default configuration file. static const std::string path_to_config_folder = ament_index_cpp::get_package_share_directory("rmw_zenoh_cpp") + "/config/"; const std::string default_config_path = path_to_config_folder + envar_map_it->second.second; - return _get_z_config(envar_map_it->second.first, default_config_path.c_str(), config); + return _get_z_config(envar_map_it->second.first, default_config_path.c_str()); } ///============================================================================= diff --git a/rmw_zenoh_cpp/src/detail/zenoh_config.hpp b/rmw_zenoh_cpp/src/detail/zenoh_config.hpp index dceafe5d..1f1e194f 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_config.hpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_config.hpp @@ -15,12 +15,13 @@ #ifndef DETAIL__ZENOH_CONFIG_HPP_ #define DETAIL__ZENOH_CONFIG_HPP_ -#include - #include #include #include +#include +#include + #include "rmw/ret_types.h" #define ZENOH_LOG_ENV_VAR_STR "RUST_LOG" @@ -46,9 +47,9 @@ enum class ConfigurableEntity : uint8_t /// is configured using the rmw_zenoh default configuration file. /// @param entity The zenoh entity to be configured. /// @param config The zenoh configuration to be filled. -/// @returns `RMW_RET_OK` if the configuration was successfully loaded. +/// @returns The zenoh configuration to be filled. [[nodiscard]] -rmw_ret_t get_z_config(const ConfigurableEntity & entity, z_owned_config_t * config); +std::optional get_z_config(const ConfigurableEntity & entity); ///============================================================================= /// Get the number of times rmw_init should try to connect to a zenoh router diff --git a/rmw_zenoh_cpp/src/detail/zenoh_router_check.cpp b/rmw_zenoh_cpp/src/detail/zenoh_router_check.cpp deleted file mode 100644 index b6c5d21e..00000000 --- a/rmw_zenoh_cpp/src/detail/zenoh_router_check.cpp +++ /dev/null @@ -1,60 +0,0 @@ -// Copyright 2023 Open Source Robotics Foundation, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "zenoh_router_check.hpp" - -#include - -#include -#include -#include - -#include "logging_macros.hpp" -#include "liveliness_utils.hpp" - -namespace rmw_zenoh_cpp -{ -///============================================================================= -rmw_ret_t zenoh_router_check(const z_loaned_session_t * session) -{ - // Initialize context for callback - int context = 0; - - // Define callback - auto callback = [](const struct z_id_t * id, void * ctx) { - const std::string id_str = liveliness::zid_to_str(*id); - RMW_ZENOH_LOG_INFO_NAMED( - "rmw_zenoh_cpp", - "Successfully connected to a Zenoh router with id %s.", id_str.c_str()); - // Note: Callback is guaranteed to never be called - // concurrently according to z_info_routers_zid docstring - (*(static_cast(ctx)))++; - }; - - z_owned_closure_zid_t router_callback; - z_closure(&router_callback, callback, NULL, &context); - if (z_info_routers_zid(session, z_move(router_callback)) != Z_OK) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Failed to evaluate if Zenoh routers are connected to the session."); - return RMW_RET_ERROR; - } else { - if (context == 0) { - return RMW_RET_ERROR; - } - } - - return RMW_RET_OK; -} -} // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/zenoh_router_check.hpp b/rmw_zenoh_cpp/src/detail/zenoh_router_check.hpp deleted file mode 100644 index 1db74a4c..00000000 --- a/rmw_zenoh_cpp/src/detail/zenoh_router_check.hpp +++ /dev/null @@ -1,30 +0,0 @@ -// Copyright 2023 Open Source Robotics Foundation, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifndef DETAIL__ZENOH_ROUTER_CHECK_HPP_ -#define DETAIL__ZENOH_ROUTER_CHECK_HPP_ - -#include - -#include "rmw/ret_types.h" - -namespace rmw_zenoh_cpp -{ -/// Check if a Zenoh router is connected to the session. -/// @param session Zenoh session to check. -/// @return RMW_RET_OK if a Zenoh router is connected to the session. -rmw_ret_t zenoh_router_check(const z_loaned_session_t * session); -} // namespace rmw_zenoh_cpp - -#endif // DETAIL__ZENOH_ROUTER_CHECK_HPP_ diff --git a/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp b/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp index be5721b7..35173656 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp @@ -14,8 +14,10 @@ #include "zenoh_utils.hpp" +#include #include #include +#include #include "attachment_helpers.hpp" #include "rcpputils/scope_exit.hpp" @@ -39,23 +41,23 @@ ZenohSession::~ZenohSession() } ///============================================================================= -void create_map_and_set_sequence_num( - z_owned_bytes_t * out_bytes, int64_t sequence_number, uint8_t gid[RMW_GID_STORAGE_SIZE]) +zenoh::Bytes create_map_and_set_sequence_num( + int64_t sequence_number, std::array gid) { auto now = std::chrono::system_clock::now().time_since_epoch(); auto now_ns = std::chrono::duration_cast(now); int64_t source_timestamp = now_ns.count(); - AttachmentData data(sequence_number, source_timestamp, gid); - data.serialize_to_zbytes(out_bytes); + rmw_zenoh_cpp::AttachmentData data(sequence_number, source_timestamp, gid); + return data.serialize_to_zbytes(); } ///============================================================================= ZenohQuery::ZenohQuery( - const z_loaned_query_t * query, + const zenoh::Query & query, std::chrono::nanoseconds::rep received_timestamp) +: query_(query.clone()) { - z_query_clone(&query_, query); received_timestamp_ = received_timestamp; } @@ -66,37 +68,27 @@ std::chrono::nanoseconds::rep ZenohQuery::get_received_timestamp() const } ///============================================================================= -ZenohQuery::~ZenohQuery() -{ - z_drop(z_move(query_)); -} +ZenohQuery::~ZenohQuery() {} ///============================================================================= -const z_loaned_query_t * ZenohQuery::get_query() const {return z_loan(query_);} +const zenoh::Query & ZenohQuery::get_query() const {return query_;} ///============================================================================= ZenohReply::ZenohReply( - const z_loaned_reply_t * reply, + const zenoh::Reply & reply, std::chrono::nanoseconds::rep received_timestamp) { - z_reply_clone(&reply_, reply); + reply_ = reply.clone(); received_timestamp_ = received_timestamp; } ///============================================================================= -ZenohReply::~ZenohReply() -{ - z_drop(z_move(reply_)); -} +ZenohReply::~ZenohReply() {} ///============================================================================= -std::optional ZenohReply::get_sample() const +const zenoh::Reply & ZenohReply::get_sample() const { - if (z_reply_is_ok(z_loan(reply_))) { - return z_reply_ok(z_loan(reply_)); - } - - return std::nullopt; + return reply_.value(); } ///============================================================================= diff --git a/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp b/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp index ae2aed3a..1b1efc87 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp @@ -15,8 +15,9 @@ #ifndef DETAIL__ZENOH_UTILS_HPP_ #define DETAIL__ZENOH_UTILS_HPP_ -#include +#include +#include #include #include #include @@ -41,26 +42,24 @@ class ZenohSession final }; ///============================================================================= -void -create_map_and_set_sequence_num( - z_owned_bytes_t * out_bytes, int64_t sequence_number, - uint8_t gid[RMW_GID_STORAGE_SIZE]); +zenoh::Bytes create_map_and_set_sequence_num( + int64_t sequence_number, std::array gid); ///============================================================================= // A class to store the replies to service requests. class ZenohReply final { public: - ZenohReply(const z_loaned_reply_t * reply, std::chrono::nanoseconds::rep received_timestamp); + ZenohReply(const zenoh::Reply & reply, std::chrono::nanoseconds::rep received_timestamp); ~ZenohReply(); - std::optional get_sample() const; + const zenoh::Reply & get_sample() const; std::chrono::nanoseconds::rep get_received_timestamp() const; private: - z_owned_reply_t reply_; + std::optional reply_; std::chrono::nanoseconds::rep received_timestamp_; }; @@ -69,16 +68,16 @@ class ZenohReply final class ZenohQuery final { public: - ZenohQuery(const z_loaned_query_t * query, std::chrono::nanoseconds::rep received_timestamp); + ZenohQuery(const zenoh::Query & query, std::chrono::nanoseconds::rep received_timestamp); ~ZenohQuery(); - const z_loaned_query_t * get_query() const; + const zenoh::Query & get_query() const; std::chrono::nanoseconds::rep get_received_timestamp() const; private: - z_owned_query_t query_; + zenoh::Query query_; std::chrono::nanoseconds::rep received_timestamp_; }; } // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/rmw_init.cpp b/rmw_zenoh_cpp/src/rmw_init.cpp index ac754aaa..cb34697c 100644 --- a/rmw_zenoh_cpp/src/rmw_init.cpp +++ b/rmw_zenoh_cpp/src/rmw_init.cpp @@ -12,13 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include - -#include #include #include -#include "detail/guard_condition.hpp" +#include + #include "detail/identifier.hpp" #include "detail/liveliness_utils.hpp" #include "detail/rmw_context_impl_s.hpp" diff --git a/rmw_zenoh_cpp/src/rmw_init_options.cpp b/rmw_zenoh_cpp/src/rmw_init_options.cpp index 783c5a6a..15e06e2b 100644 --- a/rmw_zenoh_cpp/src/rmw_init_options.cpp +++ b/rmw_zenoh_cpp/src/rmw_init_options.cpp @@ -12,11 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include -#include +#include #include "detail/identifier.hpp" -#include "detail/rmw_init_options_impl.hpp" #include "rcpputils/scope_exit.hpp" diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 1e13a710..23220fdf 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -13,8 +13,7 @@ // limitations under the License. #include - -#include +#include #include #include @@ -26,7 +25,8 @@ #include #include -#include "detail/attachment_helpers.hpp" +#include + #include "detail/cdr.hpp" #include "detail/guard_condition.hpp" #include "detail/graph_cache.hpp" @@ -2451,7 +2451,7 @@ rmw_get_gid_for_publisher(const rmw_publisher_t * publisher, rmw_gid_t * gid) RMW_CHECK_ARGUMENT_FOR_NULL(pub_data, RMW_RET_INVALID_ARGUMENT); gid->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; - pub_data->copy_gid(gid->data); + memcpy(gid->data, pub_data->copy_gid().data(), RMW_GID_STORAGE_SIZE); return RMW_RET_OK; } @@ -2468,7 +2468,7 @@ rmw_get_gid_for_client(const rmw_client_t * client, rmw_gid_t * gid) RMW_CHECK_ARGUMENT_FOR_NULL(client_data, RMW_RET_INVALID_ARGUMENT); gid->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; - client_data->copy_gid(gid->data); + memcpy(gid->data, client_data->copy_gid().data(), RMW_GID_STORAGE_SIZE); return RMW_RET_OK; } diff --git a/rmw_zenoh_cpp/src/zenohd/main.cpp b/rmw_zenoh_cpp/src/zenohd/main.cpp index 58a1b39f..bbba50f1 100644 --- a/rmw_zenoh_cpp/src/zenohd/main.cpp +++ b/rmw_zenoh_cpp/src/zenohd/main.cpp @@ -17,22 +17,18 @@ #include #include +#include +#include + #ifdef _WIN32 #include #else #include #endif -#include - -#include - #include "../detail/zenoh_config.hpp" -#include "../detail/liveliness_utils.hpp" #include "rmw/error_handling.h" - -#include "rcpputils/scope_exit.hpp" #include "rcutils/env.h" static bool running = true; @@ -66,32 +62,31 @@ int main(int argc, char ** argv) return 1; } - // Enable the zenoh built-in logger. - zc_try_init_log_from_env(); + // Enable the zenoh built-in logger + zenoh::try_init_log_from_env(); - // Initialize the zenoh configuration for the router. - z_owned_config_t config; - if ((rmw_zenoh_cpp::get_z_config( - rmw_zenoh_cpp::ConfigurableEntity::Router, - &config)) != RMW_RET_OK) - { + std::optional config = rmw_zenoh_cpp::get_z_config( + rmw_zenoh_cpp::ConfigurableEntity::Router); + + if (!config.has_value()) { RMW_SET_ERROR_MSG("Error configuring Zenoh router."); return 1; } - z_owned_session_t session; - if (z_open(&session, z_move(config), NULL) != Z_OK) { - printf("Unable to open router session!\n"); + zenoh::ZResult result; + auto session = zenoh::Session::open( + std::move(config.value()), + zenoh::Session::SessionOptions::create_default(), + &result); + if (result != Z_OK) { + std::cout << "Error opening Session!" << "\\n"; return 1; } - auto always_close_session = rcpputils::make_scope_exit( - [&session]() { - z_close(z_loan_mut(session), NULL); - }); - - printf( - "Started Zenoh router with id %s.\n", - rmw_zenoh_cpp::liveliness::zid_to_str(z_info_zid(z_session_loan(&session))).c_str()); + + std::cout << "Started Zenoh router with id" + << session.get_zid().to_string() + << std::endl; + #ifdef _WIN32 SetConsoleCtrlHandler(quit, TRUE); #else diff --git a/zenoh_c_vendor/CMakeLists.txt b/zenoh_cpp_vendor/CMakeLists.txt similarity index 85% rename from zenoh_c_vendor/CMakeLists.txt rename to zenoh_cpp_vendor/CMakeLists.txt index cddd2fd4..37bdbd41 100644 --- a/zenoh_c_vendor/CMakeLists.txt +++ b/zenoh_cpp_vendor/CMakeLists.txt @@ -1,5 +1,5 @@ cmake_minimum_required(VERSION 3.16) -project(zenoh_c_vendor) +project(zenoh_cpp_vendor) # Default to C++17 if(NOT CMAKE_CXX_STANDARD) @@ -35,4 +35,15 @@ ament_vendor(zenoh_c_vendor ament_export_dependencies(zenohc) +ament_vendor(zenoh_cpp_vendor + VCS_URL https://github.com/eclipse-zenoh/zenoh-cpp + VCS_VERSION e8eca99b4eaff0963e52aefd8405909c1552080d + CMAKE_ARGS + -DZENOHCXX_ZENOHC=OFF +) + +externalproject_add_stepdependencies(zenoh_cpp_vendor configure zenoh_c_vendor) + +ament_export_dependencies(zenohcxx) + ament_package() diff --git a/zenoh_c_vendor/package.xml b/zenoh_cpp_vendor/package.xml similarity index 95% rename from zenoh_c_vendor/package.xml rename to zenoh_cpp_vendor/package.xml index b907feff..eb2c8c20 100644 --- a/zenoh_c_vendor/package.xml +++ b/zenoh_cpp_vendor/package.xml @@ -1,7 +1,7 @@ - zenoh_c_vendor + zenoh_cpp_vendor 0.0.1 Vendor pkg to install zenoh-c Yadunund