From eb695604a983eeb3aded49dc222ee7a19b17414c Mon Sep 17 00:00:00 2001 From: Yadunund Date: Thu, 16 Nov 2023 00:09:36 +0800 Subject: [PATCH] Update graph cache with publisher data Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/graph_cache.cpp | 54 ++++++++++++++++++++++++ rmw_zenoh_cpp/src/rmw_zenoh.cpp | 43 +++++++++++++++---- 2 files changed, 88 insertions(+), 9 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index 44abbd3e..99403ec8 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -79,6 +79,7 @@ std::string GenerateToken::publisher( { std::string token = generate_base_token("MP", domain_id, node_namespace, node_name); token += topic + "/" + type + "/" + qos; + printf("GenerateToken::Publisher: %s\n", token.c_str()); return token; } @@ -386,6 +387,26 @@ void GraphCache::parse_put(const std::string & keyexpr) } else if (entity == "MP") { // Publisher + auto ns_it = graph_.find(node->ns); + if (ns_it == graph_.end()) { + // Potential edge case where a liveliness update for a node creation was missed. + // So we add the node here. + std::string ns = node->ns; + std::unordered_map map = { + {node->name, node} + }; + graph_.insert(std::make_pair(std::move(ns), std::move(map))); + } else { + auto insertion = ns_it->second.insert(std::make_pair(node->name, node)); + if (!insertion.second && !node->pubs.empty()) { + // Node already exists so just append the publisher. + insertion.first->second->pubs.push_back(std::move(node->pubs[0])); + } + } + RCUTILS_LOG_WARN_NAMED( + "rmw_zenoh_cpp", "Added publisher to node %s in graph.", + node->name.c_str()); + return; } else if (entity == "MS") { // Subscription } else if (entity == "SS") { @@ -424,6 +445,39 @@ void GraphCache::parse_del(const std::string & keyexpr) ); } else if (entity == "MP") { // Publisher + if (node->pubs.empty()) { + // This should never happen but we make sure _parse_token() has no error. + return; + } + auto ns_it = graph_.find(node->ns); + if (ns_it != graph_.end()) { + auto node_it = ns_it->second.find(node->name); + if (node_it != ns_it->second.end()) { + const auto found_node = node_it->second; + // Here we iterate throught the list of publishers and remove the one + // with matching name, type and qos. + // TODO(Yadunund): This can be more optimal than O(n) with some caching. + auto erase_it = found_node->pubs.begin(); + for (; erase_it != found_node->pubs.end(); ++erase_it) { + const auto & pub = *erase_it; + if (pub.topic == node->pubs.at(0).topic && + pub.type == node->pubs.at(0).type && + pub.qos == node->pubs.at(0).qos) + { + break; + } + } + if (erase_it != found_node->pubs.end()) { + found_node->pubs.erase(erase_it); + RCUTILS_LOG_WARN_NAMED( + "rmw_zenoh_cpp", + "Removed publisher %s from node %s in the graph.", + node->pubs.at(0).topic.c_str(), + node->name.c_str() + ); + } + } + } } else if (entity == "MS") { // Subscription } else if (entity == "SS") { diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 5df96482..af7ca85e 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -181,11 +181,11 @@ rmw_create_node( node->context = context; // Publish to the graph that a new node is in town - const bool result = PublishToken::put( + const bool pub_result = PublishToken::put( &node->context->impl->session, GenerateToken::node(context->actual_domain_id, namespace_, name) ); - if (!result) { + if (!pub_result) { return nullptr; } @@ -211,11 +211,11 @@ rmw_destroy_node(rmw_node_t * node) return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); // Publish to the graph that a node has ridden off into the sunset - const bool result = PublishToken::del( + const bool del_result = PublishToken::del( &node->context->impl->session, GenerateToken::node(node->context->actual_domain_id, node->namespace_, node->name) ); - if (!result) { + if (!del_result) { return RMW_RET_ERROR; } @@ -518,6 +518,19 @@ rmw_create_publisher( // Publish to the graph that a new publisher is in town // TODO(Yadunund): Publish liveliness for the new publisher. + const bool pub_result = PublishToken::put( + &node->context->impl->session, + GenerateToken::publisher( + node->context->actual_domain_id, + node->namespace_, + node->name, + rmw_publisher->topic_name, + publisher_data->type_support->get_name(), + "reliable") + ); + if (!pub_result) { + return nullptr; + } publisher_data->graph_cache_handle = node->context->impl->graph_cache.add_publisher( rmw_publisher->topic_name, node->name, node->namespace_, @@ -558,15 +571,26 @@ rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher) rmw_ret_t ret = RMW_RET_OK; - // Publish to the graph that a publisher has ridden off into the sunset - // TODO(Yadunund): Publish liveliness for the deleted publisher. - rcutils_allocator_t * allocator = &node->context->options.allocator; - allocator->deallocate(const_cast(publisher->topic_name), allocator->state); - auto publisher_data = static_cast(publisher->data); if (publisher_data != nullptr) { + // Publish to the graph that a publisher has ridden off into the sunset + const bool del_result = PublishToken::del( + &node->context->impl->session, + GenerateToken::publisher( + node->context->actual_domain_id, + node->namespace_, + node->name, + publisher->topic_name, + publisher_data->type_support->get_name(), + "reliable" + ) + ); + if (!del_result) { + // TODO(Yadunund): Should this really return an error? + return RMW_RET_ERROR; + } node->context->impl->graph_cache.remove_publisher(publisher_data->graph_cache_handle); RMW_TRY_DESTRUCTOR(publisher_data->type_support->~MessageTypeSupport(), MessageTypeSupport, ); @@ -577,6 +601,7 @@ rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher) } allocator->deallocate(publisher_data, allocator->state); } + allocator->deallocate(const_cast(publisher->topic_name), allocator->state); allocator->deallocate(publisher, allocator->state); return ret;