Skip to content

Commit

Permalink
Switch to liveliness tokens
Browse files Browse the repository at this point in the history
Signed-off-by: Yadunund <[email protected]>
  • Loading branch information
Yadunund committed Nov 16, 2023
1 parent f92dd56 commit 2bbe95a
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 78 deletions.
23 changes: 0 additions & 23 deletions rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -259,27 +259,4 @@
},
},

/// Plugins configurations
/// Plugins are only loaded if present in the configuration. When starting
/// Once loaded, they may react to changes in the configuration made through the zenoh instance's adminspace.
plugins: {

/// Configure the storage manager plugin
storage_manager: {
/// Configure the storages supported by the volumes
storages: {
ros2_lv: {
/// Storages always need to know what set of keys they must work with. These sets are defined by a key expression.
key_expr: "@ros2_lv/**",
/// Storages also need to know which volume will be used to actually store their key-value pairs.
/// The "memory" volume is always available, and doesn't require any per-storage options, so requesting "memory" by string is always sufficient.
volume: "memory",
/// A complete storage advertises itself as containing all the known keys matching the configured key expression.
/// If not configured, complete defaults to false.
complete: "true",
},
},
},
},

}
15 changes: 10 additions & 5 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,10 @@ struct rmw_context_impl_s
///==============================================================================
struct rmw_node_data_t
{
// TODO(yadunund): Add a GraphCache object.

// Map topic name to topic types.
std::unordered_set<std::unordered_set<std::string>> publishers;
std::unordered_set<std::unordered_set<std::string>> subscriptions;
// TODO(Yadunund): Do we need a token at the node level? Right now I have one
// for cases where a node may spin up but does not have any publishers or subscriptions.
// Liveliness token for the node.
zc_owned_liveliness_token_t token;
};

///==============================================================================
Expand All @@ -71,6 +70,9 @@ struct rmw_publisher_data_t
// An owned publisher.
z_owned_publisher_t pub;

// Liveliness token for the publisher.
zc_owned_liveliness_token_t token;

// Type support fields
const void * type_support_impl;
const char * typesupport_identifier;
Expand Down Expand Up @@ -113,6 +115,9 @@ struct rmw_subscription_data_t
{
z_owned_subscriber_t sub;

// Liveliness token for the subscription.
zc_owned_liveliness_token_t token;

const void * type_support_impl;
const char * typesupport_identifier;
MessageTypeSupport * type_support;
Expand Down
34 changes: 24 additions & 10 deletions rmw_zenoh_cpp/src/rmw_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -241,18 +241,22 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context)
// Setup liveliness subscriptions for discovery.
const std::string liveliness_str = GenerateToken::liveliness(context->actual_domain_id);

// Query the router to get graph information before this session was started.
// TODO(Yadunund): This will not be needed once the zenoh-c liveliness API is available.
// Query the router/liveliness participants to get graph information before this session was started.
RCUTILS_LOG_WARN_NAMED(
"rmw_zenoh_cpp",
"Sending Query '%s' to fetch discovery data from router...",
"Sending Query '%s' to fetch discovery data...",
liveliness_str.c_str()
);
z_owned_reply_channel_t channel = zc_reply_fifo_new(16);
z_get_options_t opts = z_get_options_default();
z_get(
z_loan(context->impl->session), z_keyexpr(liveliness_str.c_str()), "", z_move(channel.send),
&opts); // here, the send is moved and will be dropped by zenoh when adequate
zc_liveliness_get(
z_loan(context->impl->session), z_keyexpr(liveliness_str.c_str()),
z_move(channel.send), NULL);
// Uncomment and rely if #if #endif blocks to enable this feature when building with
// zenoh-pico since liveliness is only available wiht zenoh-c.
// z_get_options_t opts = z_get_options_default();
// z_get(
// z_loan(context->impl->session), z_keyexpr(liveliness_str.c_str()), "", z_move(channel.send),
// &opts); // here, the send is moved and will be dropped by zenoh when adequate
z_owned_reply_t reply = z_reply_null();
for (z_call(channel.recv, &reply); z_check(reply); z_call(channel.recv, &reply)) {
if (z_reply_is_ok(&reply)) {
Expand All @@ -277,14 +281,24 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context)
liveliness_str.c_str()
);

auto sub_options = z_subscriber_options_default();
sub_options.reliability = Z_RELIABILITY_RELIABLE;
// Uncomment and rely if #if #endif blocks to enable this feature when building with
// zenoh-pico since liveliness is only available wiht zenoh-c.
// auto sub_options = z_subscriber_options_default();
// sub_options.reliability = Z_RELIABILITY_RELIABLE;
// context->impl->graph_subscriber = z_declare_subscriber(
// z_loan(context->impl->session),
// z_keyexpr(liveliness_str.c_str()),
// z_move(callback),
// &sub_options);
auto sub_options = zc_liveliness_subscriber_options_null();
z_owned_closure_sample_t callback = z_closure(graph_sub_data_handler, nullptr, context->impl);
context->impl->graph_subscriber = z_declare_subscriber(
context->impl->graph_subscriber = zc_liveliness_declare_subscriber(
z_loan(context->impl->session),
z_keyexpr(liveliness_str.c_str()),
z_move(callback),
&sub_options);
// TODO(Yadunund): Uncomment once linker issue is resolved.
// z_drop(z_move(sub_options));
auto undeclare_z_sub = rcpputils::make_scope_exit(
[context]() {
z_undeclare_subscriber(z_move(context->impl->graph_subscriber));
Expand Down
146 changes: 106 additions & 40 deletions rmw_zenoh_cpp/src/rmw_zenoh.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#include <new>
#include <sstream>

#include <zenoh.h>

#include "detail/guard_condition.hpp"
#include "detail/graph_cache.hpp"
#include "detail/identifier.hpp"
Expand Down Expand Up @@ -168,6 +170,7 @@ rmw_create_node(
// zenohd is not running.
// Put metadata into node->data.
node->data = allocator->zero_allocate(1, sizeof(rmw_node_data_t), allocator->state);
rmw_node_data_t * node_data = static_cast<rmw_node_data_t *>(node->data);
RMW_CHECK_FOR_NULL_WITH_MSG(
node->data,
"unable to allocate memory for node data",
Expand All @@ -180,19 +183,43 @@ rmw_create_node(
node->implementation_identifier = rmw_zenoh_identifier;
node->context = context;


// Uncomment and rely if #if #endif blocks to enable this feature when building with
// zenoh-pico since liveliness is only available wiht zenoh-c.
// Publish to the graph that a new node is in town
const bool pub_result = PublishToken::put(
&node->context->impl->session,
GenerateToken::node(context->actual_domain_id, namespace_, name)
// const bool pub_result = PublishToken::put(
// &node->context->impl->session,
// GenerateToken::node(context->actual_domain_id, namespace_, name)
// );
// if (!pub_result) {
// return nullptr;
// }
// Initialize liveliness token for the node to advertise that a new node is in town.
node_data->token = zc_liveliness_declare_token(
z_loan(node->context->impl->session),
z_keyexpr(GenerateToken::node(context->actual_domain_id, namespace_, name).c_str()),
NULL
);
if (!pub_result) {
return nullptr;
}
auto free_token = rcpputils::make_scope_exit(
[node]() {
if (node->data != nullptr) {
rmw_node_data_t * node_data = static_cast<rmw_node_data_t *>(node->data);
z_drop(z_move(node_data->token));
}
});
// TODO(Yadunund): Uncomment this after resolving build error.
// if (!z_check(node_data->token)) {
// RCUTILS_LOG_ERROR_NAMED(
// "rmw_zenoh_cpp",
// "Unable to create liveliness token for the node.");
// return nullptr;
// }

free_node_data.cancel();
free_namespace.cancel();
free_name.cancel();
free_node.cancel();
free_token.cancel();
return node;
}

Expand All @@ -204,20 +231,27 @@ rmw_destroy_node(rmw_node_t * node)
RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(node->context, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(node->data, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
node,
node->implementation_identifier,
rmw_zenoh_identifier,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);

// Uncomment and rely if #if #endif blocks to enable this feature when building with
// zenoh-pico since liveliness is only available wiht zenoh-c.
// Publish to the graph that a node has ridden off into the sunset
const bool del_result = PublishToken::del(
&node->context->impl->session,
GenerateToken::node(node->context->actual_domain_id, node->namespace_, node->name)
);
if (!del_result) {
return RMW_RET_ERROR;
}
// const bool del_result = PublishToken::del(
// &node->context->impl->session,
// GenerateToken::node(node->context->actual_domain_id, node->namespace_, node->name)
// );
// if (!del_result) {
// return RMW_RET_ERROR;
// }

// Undeclare liveliness token for the node to advertise that the node has ridden off into the sunset.
rmw_node_data_t * node_data = static_cast<rmw_node_data_t *>(node->data);
z_drop(z_move(node_data->token));

rcutils_allocator_t * allocator = &node->context->options.allocator;

Expand Down Expand Up @@ -516,21 +550,48 @@ rmw_create_publisher(
z_undeclare_publisher(z_move(publisher_data->pub));
});

// Uncomment and rely if #if #endif blocks to enable this feature when building with
// zenoh-pico since liveliness is only available wiht zenoh-c.
// Publish to the graph that a new publisher is in town
// TODO(Yadunund): Publish liveliness for the new publisher.
const bool pub_result = PublishToken::put(
&node->context->impl->session,
GenerateToken::publisher(
node->context->actual_domain_id,
node->namespace_,
node->name,
rmw_publisher->topic_name,
publisher_data->type_support->get_name(),
"reliable")
// const bool pub_result = PublishToken::put(
// &node->context->impl->session,
// GenerateToken::publisher(
// node->context->actual_domain_id,
// node->namespace_,
// node->name,
// rmw_publisher->topic_name,
// publisher_data->type_support->get_name(),
// "reliable")
// );
// if (!pub_result) {
// return nullptr;
// }
publisher_data->token = zc_liveliness_declare_token(
z_loan(node->context->impl->session),
z_keyexpr(
GenerateToken::publisher(
node->context->actual_domain_id,
node->namespace_,
node->name,
rmw_publisher->topic_name,
publisher_data->type_support->get_name(),
"reliable").c_str()),
NULL
);
if (!pub_result) {
return nullptr;
}
auto free_token = rcpputils::make_scope_exit(
[publisher_data]() {
if (publisher_data != nullptr) {
z_drop(z_move(publisher_data->token));
}
});
// TODO(Yadunund): Uncomment this after resolving build error.
// if (!z_check(publisher_data->token)) {
// RCUTILS_LOG_ERROR_NAMED(
// "rmw_zenoh_cpp",
// "Unable to create liveliness token for the publisher.");
// return nullptr;
// }

publisher_data->graph_cache_handle = node->context->impl->graph_cache.add_publisher(
rmw_publisher->topic_name, node->name, node->namespace_,
Expand All @@ -540,6 +601,7 @@ rmw_create_publisher(
node->context->impl->graph_cache.remove_publisher(publisher_data->graph_cache_handle);
});

free_token.cancel();
remove_from_graph_cache.cancel();
undeclare_z_publisher.cancel();
free_topic_name.cancel();
Expand Down Expand Up @@ -575,22 +637,26 @@ rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher)

auto publisher_data = static_cast<rmw_publisher_data_t *>(publisher->data);
if (publisher_data != nullptr) {
// Uncomment and rely if #if #endif blocks to enable this feature when building with
// zenoh-pico since liveliness is only available wiht zenoh-c.
// Publish to the graph that a publisher has ridden off into the sunset
const bool del_result = PublishToken::del(
&node->context->impl->session,
GenerateToken::publisher(
node->context->actual_domain_id,
node->namespace_,
node->name,
publisher->topic_name,
publisher_data->type_support->get_name(),
"reliable"
)
);
if (!del_result) {
// TODO(Yadunund): Should this really return an error?
return RMW_RET_ERROR;
}
// const bool del_result = PublishToken::del(
// &node->context->impl->session,
// GenerateToken::publisher(
// node->context->actual_domain_id,
// node->namespace_,
// node->name,
// publisher->topic_name,
// publisher_data->type_support->get_name(),
// "reliable"
// )
// );
// if (!del_result) {
// // TODO(Yadunund): Should this really return an error?
// return RMW_RET_ERROR;
// }
// TODO(Yadunund): Fix linker error.
z_drop(z_move(publisher_data->token));
node->context->impl->graph_cache.remove_publisher(publisher_data->graph_cache_handle);

RMW_TRY_DESTRUCTOR(publisher_data->type_support->~MessageTypeSupport(), MessageTypeSupport, );
Expand Down

0 comments on commit 2bbe95a

Please sign in to comment.