Skip to content

Commit

Permalink
Rely on unordered_map instead of yaml for graph cache
Browse files Browse the repository at this point in the history
Signed-off-by: Yadunund <[email protected]>
  • Loading branch information
Yadunund committed Nov 15, 2023
1 parent c2bf0a4 commit a6ce6e5
Show file tree
Hide file tree
Showing 2 changed files with 175 additions and 65 deletions.
179 changes: 133 additions & 46 deletions rmw_zenoh_cpp/src/detail/graph_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include <memory>
#include <mutex>
#include <optional>
#include <sstream>
#include <string>
#include <utility>
Expand Down Expand Up @@ -50,8 +51,7 @@ static std::string generate_base_token(
// will always result in 5 parts.
if (namespace_ == "/") {
token_ss << "_/";
}
else {
} else {
token_ss << "/";
}
// Finally append node name.
Expand Down Expand Up @@ -253,7 +253,9 @@ GraphCache::remove_subscription(uint64_t handle)
}

///=============================================================================
static std::vector<std::string> split_keyexpr(const std::string & keyexpr)
namespace
{
std::vector<std::string> split_keyexpr(const std::string & keyexpr)
{
std::vector<std::size_t> delim_idx = {};
// Insert -1 for starting position to make the split easier when using substr.
Expand All @@ -280,32 +282,108 @@ static std::vector<std::string> split_keyexpr(const std::string & keyexpr)
result.push_back(keyexpr.substr(delim_idx.back() + 1));
return result;
}

///=============================================================================
void GraphCache::parse_put(const std::string & keyexpr)
// Convert a liveliness token into a <entity, Node>
std::optional<std::pair<std::string, GraphNode>> _parse_token(const std::string & keyexpr)
{
std::vector<std::string> parts = split_keyexpr(keyexpr);
// At minimum, a token will contain 5 parts (@ros2_lv, domain_id, entity, namespace, node_name).
// Basic validation.
if (parts.size() < 5) {
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Received invalid liveliness token");
return;
return std::nullopt;
}
for (const std::string & p : parts) {
if (p.empty()) {
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Received invalid liveliness token");
return std::nullopt;
}
}
// TODO(Yadunund): Validate individual parts.

// Get the entity, ie N, MP, MS, SS, SC.
const std::string & entity = parts[2];
std::string & entity = parts[2];
if (entity != "NN" &&
entity != "MP" &&
entity != "MS" &&
entity != "SS" &&
entity != "SC")
{
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Invalid entity [%s] in liveliness token", entity.c_str());
return std::nullopt;
}


GraphNode node;
// Nodes with empty namespaces will contain a "_".
node.ns = parts.at(3) == "_" ? "/" : "/" + parts.at(3);
node.name = std::move(parts[4]);

if (entity != "NN") {
if (parts.size() < 8) {
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Received invalid liveliness token");
return std::nullopt;
}
GraphNode::PubSubData data;
data.topic = std::move(parts[5]);
data.type = std::move(parts[6]);
data.qos = std::move(parts[7]);

if (entity == "MP") {
node.pubs.push_back(std::move(data));
} else if (entity == "MS") {
node.subs.push_back(std::move(data));
} else {
// TODO.
}
}

return std::make_pair(std::move(entity), std::move(node));
}
} // namespace anonymous

///=============================================================================
void GraphCache::parse_put(const std::string & keyexpr)
{
auto valid_token = _parse_token(keyexpr);
if (!valid_token.has_value()) {
// Error message has already been logged.
return;
}
const std::string & entity = valid_token->first;
auto node = std::make_shared<GraphNode>(std::move(valid_token->second));
std::lock_guard<std::mutex> lock(graph_mutex_);

if (entity == "NN") {
// Node
RCUTILS_LOG_WARN_NAMED("rmw_zenoh_cpp", "Adding node %s to the graph.", parts.back().c_str());
// Nodes with empty namespaces will contain an "_".
const bool has_namespace = parts[3] == "_" ? false : true;
graph_[parts.back()] = YAML::Node();
// TODO(Yadunund): Implement enclave support.
graph_[parts.back()]["enclave"] = "";
graph_[parts.back()]["namespace"] = has_namespace ? "/" + parts.at(3) : "/";
auto ns_it = graph_.find(node->ns);
if (ns_it == graph_.end()) {
// New namespace.
std::string ns = node->ns;
std::unordered_map<std::string, GraphNodePtr> map = {
{node->name, node}
};
graph_.insert(std::make_pair(std::move(ns), std::move(map)));
} else {
auto insertion = ns_it->second.insert(std::make_pair(node->name, node));
if (!insertion.second) {
RCUTILS_LOG_WARN_NAMED(
"rmw_zenoh_cpp", "Unable to add duplicate node %s to the graph.",
node->name.c_str());
}
}
RCUTILS_LOG_WARN_NAMED(
"rmw_zenoh_cpp", "Added node %s to the graph.",
node->name.c_str());
return;

} else if (entity == "MP") {
// Publisher
} else if (entity == "MS") {
Expand All @@ -325,25 +403,25 @@ void GraphCache::parse_put(const std::string & keyexpr)
///=============================================================================
void GraphCache::parse_del(const std::string & keyexpr)
{
// TODO(Yadunund): Validate data.
std::vector<std::string> parts = split_keyexpr(keyexpr);
if (parts.size() < 3) {
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Received invalid liveliness token");
auto valid_token = _parse_token(keyexpr);
if (!valid_token.has_value()) {
// Error message has already been logged.
return;
}
// Get the entity, ie N, MP, MS, SS, SC.
const std::string & entity = parts[2];
const std::string & entity = valid_token->first;
auto node = std::make_shared<GraphNode>(std::move(valid_token->second));
std::lock_guard<std::mutex> lock(graph_mutex_);
if (entity == "NN") {
// Node
auto ns_it = graph_.find(node->ns);
if (ns_it != graph_.end()) {
ns_it->second.erase(node->name);
}
RCUTILS_LOG_WARN_NAMED(
"rmw_zenoh_cpp",
"Removing node %s from the graph.",
parts.back().c_str()
"Removed node %s from the graph.",
node->name.c_str()
);
graph_.remove(entity.back());
} else if (entity == "MP") {
// Publisher
} else if (entity == "MS") {
Expand Down Expand Up @@ -383,7 +461,14 @@ rmw_ret_t GraphCache::get_node_names(
RCUTILS_CHECK_ALLOCATOR_WITH_MSG(
allocator, "get_node_names allocator is not valid", return RMW_RET_INVALID_ARGUMENT);

size_t nodes_number = graph_.size();
size_t nodes_number = 0;
for (auto it = graph_.begin(); it != graph_.end(); ++it) {
nodes_number += it->second.size();
}
// TODO(Yadunund): Delete.
RCUTILS_LOG_WARN_NAMED(
"rmw_zenoh_cpp",
"nodes_number %ld", nodes_number);

rcutils_ret_t rcutils_ret =
rcutils_string_array_init(node_names, nodes_number, allocator);
Expand Down Expand Up @@ -437,30 +522,32 @@ rmw_ret_t GraphCache::get_node_names(
}

// TODO(Yadunund): Remove this printout.
const std::string & graph_str = YAML::Dump(graph_);
RCUTILS_LOG_WARN_NAMED("rmw_zenoh_cpp", "[graph]\n%s\n", graph_str.c_str());
// const std::string & graph_str = YAML::Dump(graph_);
// RCUTILS_LOG_WARN_NAMED("rmw_zenoh_cpp", "[graph]\n%s\n", graph_str.c_str());
// Fill node names, namespaces and enclaves.
std::size_t j = 0;
for (auto it = graph_.begin(); it != graph_.end(); ++it) {
const auto & node_name = it->first.as<std::string>();
const auto & yaml_node = it->second;
node_names->data[j] = rcutils_strdup(node_name.c_str(), *allocator);
if (!node_names->data[j]) {
return RMW_RET_BAD_ALLOC;
}
node_namespaces->data[j] = rcutils_strdup(
yaml_node["namespace"].as<std::string>().c_str(), *allocator);
if (!node_namespaces->data[j]) {
return RMW_RET_BAD_ALLOC;
}
if (enclaves) {
enclaves->data[j] = rcutils_strdup(
yaml_node["enclaves"].as<std::string>().c_str(), *allocator);
if (!enclaves->data[j]) {
for (auto ns_it = graph_.begin(); ns_it != graph_.end(); ++ns_it) {
const std::string & ns = ns_it->first;
for (auto node_it = ns_it->second.begin(); node_it != ns_it->second.end(); ++node_it) {
const auto node = node_it->second;
node_names->data[j] = rcutils_strdup(node->name.c_str(), *allocator);
if (!node_names->data[j]) {
return RMW_RET_BAD_ALLOC;
}
node_namespaces->data[j] = rcutils_strdup(
ns.c_str(), *allocator);
if (!node_namespaces->data[j]) {
return RMW_RET_BAD_ALLOC;
}
if (enclaves) {
enclaves->data[j] = rcutils_strdup(
node->enclave.c_str(), *allocator);
if (!enclaves->data[j]) {
return RMW_RET_BAD_ALLOC;
}
}
++j;
}
++j;
}

if (free_enclaves) {
Expand Down
61 changes: 42 additions & 19 deletions rmw_zenoh_cpp/src/detail/graph_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,27 @@ class SubscriptionData final
char * type_name_{nullptr};
};

///=============================================================================
// TODO(Yadunund): Expand to services and clients.
struct GraphNode
{

struct PubSubData
{
std::string topic;
std::string type;
std::string qos;
};

std::string ns;
std::string name;
// TODO(Yadunund): Should enclave be the parent to the namespace key and not within a Node?
std::string enclave;
std::vector<PubSubData> pubs;
std::vector<PubSubData> subs;
};
using GraphNodePtr = std::shared_ptr<GraphNode>;

///=============================================================================
class GraphCache final
{
Expand Down Expand Up @@ -141,26 +162,28 @@ class GraphCache final
std::map<uint64_t, std::unique_ptr<SubscriptionData>> subscriptions_;

/*
node_1:
enclave:
namespace:
publishers: [
{
topic:
type:
qos:
}
]
subscriptions: [
{
topic:
type:
qos:
}
]
node_n:
namespace_1:
node_1:
enclave:
publishers: [
{
topic:
type:
qos:
}
],
subscriptions: [
{
topic:
type:
qos:
}
],
namespace_2:
node_n:
*/
YAML::Node graph_;
// Map namespace to a map of <node_name, GraphNodePtr>.
std::unordered_map<std::string, std::unordered_map<std::string, GraphNodePtr>> graph_;
mutable std::mutex graph_mutex_;
};

Expand Down

0 comments on commit a6ce6e5

Please sign in to comment.