Skip to content

Commit

Permalink
Make transient_local work
Browse files Browse the repository at this point in the history
Signed-off-by: Yadunund <[email protected]>
  • Loading branch information
Yadunund committed Jan 16, 2024
1 parent 13509dc commit c2b749b
Showing 1 changed file with 29 additions and 6 deletions.
35 changes: 29 additions & 6 deletions rmw_zenoh_cpp/src/rmw_zenoh.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,12 @@ rmw_create_publisher(
return nullptr;
}
}

RCUTILS_LOG_INFO_NAMED(
"rmw_zenoh_cpp",
"[rmw_create_publisher] %s",
topic_name);

// Adapt any 'best available' QoS options
rmw_qos_profile_t adapted_qos_profile = *qos_profile;
rmw_ret_t ret = rmw_dds_common::qos_profile_get_best_available_for_topic_publisher(
Expand Down Expand Up @@ -584,7 +590,6 @@ rmw_create_publisher(
}

// Create a Publication Cache if durability is transient_local.
publisher_data->pub_cache = ze_publication_cache_null();
if (adapted_qos_profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) {
ze_publication_cache_options_t pub_cache_opts = ze_publication_cache_options_default();
pub_cache_opts.history = adapted_qos_profile.depth;
Expand Down Expand Up @@ -1192,6 +1197,11 @@ rmw_create_subscription(
}
RMW_CHECK_ARGUMENT_FOR_NULL(subscription_options, nullptr);

RCUTILS_LOG_INFO_NAMED(
"rmw_zenoh_cpp",
"[rmw_create_subscription] %s",
topic_name);

const rosidl_message_type_support_t * type_support = find_message_type_support(type_supports);
if (type_support == nullptr) {
// error was already set by find_message_type_support
Expand Down Expand Up @@ -1327,13 +1337,18 @@ rmw_create_subscription(
// adapted_qos_profile.
// TODO(Yadunund): Rely on a separate function to return the sub
// as we start supporting more qos settings.
z_owned_str_t owned_key_str = z_keyexpr_to_string(z_loan(keyexpr));
auto always_drop_keystr = rcpputils::make_scope_exit(
[&owned_key_str]() {
z_drop(z_move(owned_key_str));
});

sub_data->reliable = false;
if (adapted_qos_profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) {
ze_querying_subscriber_options_t sub_options = ze_querying_subscriber_options_default();
if (adapted_qos_profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) {
sub_options.reliability = Z_RELIABILITY_RELIABLE;
sub_data->reliable = true;
sub_options.query_target = Z_QUERY_TARGET_ALL_COMPLETE;
}
sub_data->sub = ze_declare_querying_subscriber(
z_loan(context_impl->session),
Expand All @@ -1345,6 +1360,7 @@ rmw_create_subscription(
RMW_SET_ERROR_MSG("unable to create zenoh subscription");
return nullptr;
}
printf("Created querying sub %s\n", owned_key_str._cstr);
}
// Create a regular subscriber for all other durability settings.
else {
Expand All @@ -1363,14 +1379,21 @@ rmw_create_subscription(
RMW_SET_ERROR_MSG("unable to create zenoh subscription");
return nullptr;
}
printf("Created regular sub %s\n", owned_key_str._cstr);
}

auto undeclare_z_sub = rcpputils::make_scope_exit(
[sub_data]() {
// TODO(Yadunund): Check if this is okay or if it is better
// to cast into explicit types and call appropriate undeclare method.
// See rmw_destroy_subscription()
z_drop(z_move(sub_data->sub));
z_owned_subscriber_t * sub = std::get_if<z_owned_subscriber_t>(&sub_data->sub);
if (sub == nullptr || z_undeclare_subscriber(sub)) {
RMW_SET_ERROR_MSG("failed to undeclare sub");
} else {
ze_owned_querying_subscriber_t * querying_sub =
std::get_if<ze_owned_querying_subscriber_t>(&sub_data->sub);
if (querying_sub == nullptr || ze_undeclare_querying_subscriber(querying_sub)) {
RMW_SET_ERROR_MSG("failed to undeclare sub");
}
}
});

// Publish to the graph that a new subscription is in town
Expand Down

0 comments on commit c2b749b

Please sign in to comment.