diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index 268fc774..44abbd3e 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -14,6 +14,7 @@ #include #include +#include #include #include #include @@ -50,8 +51,7 @@ static std::string generate_base_token( // will always result in 5 parts. if (namespace_ == "/") { token_ss << "_/"; - } - else { + } else { token_ss << "/"; } // Finally append node name. @@ -253,7 +253,9 @@ GraphCache::remove_subscription(uint64_t handle) } ///============================================================================= -static std::vector split_keyexpr(const std::string & keyexpr) +namespace +{ +std::vector split_keyexpr(const std::string & keyexpr) { std::vector delim_idx = {}; // Insert -1 for starting position to make the split easier when using substr. @@ -280,32 +282,108 @@ static std::vector split_keyexpr(const std::string & keyexpr) result.push_back(keyexpr.substr(delim_idx.back() + 1)); return result; } - ///============================================================================= -void GraphCache::parse_put(const std::string & keyexpr) +// Convert a liveliness token into a +std::optional> _parse_token(const std::string & keyexpr) { std::vector parts = split_keyexpr(keyexpr); // At minimum, a token will contain 5 parts (@ros2_lv, domain_id, entity, namespace, node_name). + // Basic validation. if (parts.size() < 5) { RCUTILS_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Received invalid liveliness token"); - return; + return std::nullopt; + } + for (const std::string & p : parts) { + if (p.empty()) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Received invalid liveliness token"); + return std::nullopt; + } } - // TODO(Yadunund): Validate individual parts. // Get the entity, ie N, MP, MS, SS, SC. - const std::string & entity = parts[2]; + std::string & entity = parts[2]; + if (entity != "NN" && + entity != "MP" && + entity != "MS" && + entity != "SS" && + entity != "SC") + { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Invalid entity [%s] in liveliness token", entity.c_str()); + return std::nullopt; + } + + + GraphNode node; + // Nodes with empty namespaces will contain a "_". + node.ns = parts.at(3) == "_" ? "/" : "/" + parts.at(3); + node.name = std::move(parts[4]); + + if (entity != "NN") { + if (parts.size() < 8) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Received invalid liveliness token"); + return std::nullopt; + } + GraphNode::PubSubData data; + data.topic = std::move(parts[5]); + data.type = std::move(parts[6]); + data.qos = std::move(parts[7]); + + if (entity == "MP") { + node.pubs.push_back(std::move(data)); + } else if (entity == "MS") { + node.subs.push_back(std::move(data)); + } else { + // TODO. + } + } + + return std::make_pair(std::move(entity), std::move(node)); +} +} // namespace anonymous + +///============================================================================= +void GraphCache::parse_put(const std::string & keyexpr) +{ + auto valid_token = _parse_token(keyexpr); + if (!valid_token.has_value()) { + // Error message has already been logged. + return; + } + const std::string & entity = valid_token->first; + auto node = std::make_shared(std::move(valid_token->second)); std::lock_guard lock(graph_mutex_); + if (entity == "NN") { // Node - RCUTILS_LOG_WARN_NAMED("rmw_zenoh_cpp", "Adding node %s to the graph.", parts.back().c_str()); - // Nodes with empty namespaces will contain an "_". - const bool has_namespace = parts[3] == "_" ? false : true; - graph_[parts.back()] = YAML::Node(); - // TODO(Yadunund): Implement enclave support. - graph_[parts.back()]["enclave"] = ""; - graph_[parts.back()]["namespace"] = has_namespace ? "/" + parts.at(3) : "/"; + auto ns_it = graph_.find(node->ns); + if (ns_it == graph_.end()) { + // New namespace. + std::string ns = node->ns; + std::unordered_map map = { + {node->name, node} + }; + graph_.insert(std::make_pair(std::move(ns), std::move(map))); + } else { + auto insertion = ns_it->second.insert(std::make_pair(node->name, node)); + if (!insertion.second) { + RCUTILS_LOG_WARN_NAMED( + "rmw_zenoh_cpp", "Unable to add duplicate node %s to the graph.", + node->name.c_str()); + } + } + RCUTILS_LOG_WARN_NAMED( + "rmw_zenoh_cpp", "Added node %s to the graph.", + node->name.c_str()); + return; + } else if (entity == "MP") { // Publisher } else if (entity == "MS") { @@ -325,25 +403,25 @@ void GraphCache::parse_put(const std::string & keyexpr) ///============================================================================= void GraphCache::parse_del(const std::string & keyexpr) { - // TODO(Yadunund): Validate data. - std::vector parts = split_keyexpr(keyexpr); - if (parts.size() < 3) { - RCUTILS_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Received invalid liveliness token"); + auto valid_token = _parse_token(keyexpr); + if (!valid_token.has_value()) { + // Error message has already been logged. return; } - // Get the entity, ie N, MP, MS, SS, SC. - const std::string & entity = parts[2]; + const std::string & entity = valid_token->first; + auto node = std::make_shared(std::move(valid_token->second)); std::lock_guard lock(graph_mutex_); if (entity == "NN") { // Node + auto ns_it = graph_.find(node->ns); + if (ns_it != graph_.end()) { + ns_it->second.erase(node->name); + } RCUTILS_LOG_WARN_NAMED( "rmw_zenoh_cpp", - "Removing node %s from the graph.", - parts.back().c_str() + "Removed node %s from the graph.", + node->name.c_str() ); - graph_.remove(entity.back()); } else if (entity == "MP") { // Publisher } else if (entity == "MS") { @@ -383,7 +461,14 @@ rmw_ret_t GraphCache::get_node_names( RCUTILS_CHECK_ALLOCATOR_WITH_MSG( allocator, "get_node_names allocator is not valid", return RMW_RET_INVALID_ARGUMENT); - size_t nodes_number = graph_.size(); + size_t nodes_number = 0; + for (auto it = graph_.begin(); it != graph_.end(); ++it) { + nodes_number += it->second.size(); + } + // TODO(Yadunund): Delete. + RCUTILS_LOG_WARN_NAMED( + "rmw_zenoh_cpp", + "nodes_number %ld", nodes_number); rcutils_ret_t rcutils_ret = rcutils_string_array_init(node_names, nodes_number, allocator); @@ -437,30 +522,32 @@ rmw_ret_t GraphCache::get_node_names( } // TODO(Yadunund): Remove this printout. - const std::string & graph_str = YAML::Dump(graph_); - RCUTILS_LOG_WARN_NAMED("rmw_zenoh_cpp", "[graph]\n%s\n", graph_str.c_str()); + // const std::string & graph_str = YAML::Dump(graph_); + // RCUTILS_LOG_WARN_NAMED("rmw_zenoh_cpp", "[graph]\n%s\n", graph_str.c_str()); // Fill node names, namespaces and enclaves. std::size_t j = 0; - for (auto it = graph_.begin(); it != graph_.end(); ++it) { - const auto & node_name = it->first.as(); - const auto & yaml_node = it->second; - node_names->data[j] = rcutils_strdup(node_name.c_str(), *allocator); - if (!node_names->data[j]) { - return RMW_RET_BAD_ALLOC; - } - node_namespaces->data[j] = rcutils_strdup( - yaml_node["namespace"].as().c_str(), *allocator); - if (!node_namespaces->data[j]) { - return RMW_RET_BAD_ALLOC; - } - if (enclaves) { - enclaves->data[j] = rcutils_strdup( - yaml_node["enclaves"].as().c_str(), *allocator); - if (!enclaves->data[j]) { + for (auto ns_it = graph_.begin(); ns_it != graph_.end(); ++ns_it) { + const std::string & ns = ns_it->first; + for (auto node_it = ns_it->second.begin(); node_it != ns_it->second.end(); ++node_it) { + const auto node = node_it->second; + node_names->data[j] = rcutils_strdup(node->name.c_str(), *allocator); + if (!node_names->data[j]) { return RMW_RET_BAD_ALLOC; } + node_namespaces->data[j] = rcutils_strdup( + ns.c_str(), *allocator); + if (!node_namespaces->data[j]) { + return RMW_RET_BAD_ALLOC; + } + if (enclaves) { + enclaves->data[j] = rcutils_strdup( + node->enclave.c_str(), *allocator); + if (!enclaves->data[j]) { + return RMW_RET_BAD_ALLOC; + } + } + ++j; } - ++j; } if (free_enclaves) { diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.hpp b/rmw_zenoh_cpp/src/detail/graph_cache.hpp index 0bb878b0..0e0c4c14 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.hpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.hpp @@ -102,6 +102,27 @@ class SubscriptionData final char * type_name_{nullptr}; }; +///============================================================================= +// TODO(Yadunund): Expand to services and clients. +struct GraphNode +{ + + struct PubSubData + { + std::string topic; + std::string type; + std::string qos; + }; + + std::string ns; + std::string name; + // TODO(Yadunund): Should enclave be the parent to the namespace key and not within a Node? + std::string enclave; + std::vector pubs; + std::vector subs; +}; +using GraphNodePtr = std::shared_ptr; + ///============================================================================= class GraphCache final { @@ -141,26 +162,28 @@ class GraphCache final std::map> subscriptions_; /* - node_1: - enclave: - namespace: - publishers: [ - { - topic: - type: - qos: - } - ] - subscriptions: [ - { - topic: - type: - qos: - } - ] - node_n: + namespace_1: + node_1: + enclave: + publishers: [ + { + topic: + type: + qos: + } + ], + subscriptions: [ + { + topic: + type: + qos: + } + ], + namespace_2: + node_n: */ - YAML::Node graph_; + // Map namespace to a map of . + std::unordered_map> graph_; mutable std::mutex graph_mutex_; };