Skip to content

Commit

Permalink
feat: introduce the advanced publisher/subscriber
Browse files Browse the repository at this point in the history
  • Loading branch information
YuanYuYuan committed Jan 7, 2025
1 parent 2429bf2 commit 56c65b5
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 250 deletions.
34 changes: 0 additions & 34 deletions rmw_zenoh_cpp/src/detail/graph_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1253,38 +1253,4 @@ void GraphCache::update_event_counters(
}
}
}

///=============================================================================
void GraphCache::set_querying_subscriber_callback(
const std::string & sub_keyexpr,
const std::size_t sub_keyxpr_hash,
QueryingSubscriberCallback cb)
{
std::unordered_map<
std::string,
std::unordered_map<std::size_t, QueryingSubscriberCallback>
>::iterator cb_it = querying_subs_cbs_.find(sub_keyexpr);
if (cb_it == querying_subs_cbs_.end()) {
querying_subs_cbs_[sub_keyexpr] =
std::unordered_map<std::size_t, QueryingSubscriberCallback>{};
cb_it = querying_subs_cbs_.find(sub_keyexpr);
}
cb_it->second.insert(std::make_pair(sub_keyxpr_hash, std::move(cb)));
}

///=============================================================================
void GraphCache::remove_querying_subscriber_callback(
const std::string & sub_keyexpr,
const std::size_t sub_keyexpr_hash)
{
std::unordered_map<
std::string,
std::unordered_map<std::size_t, QueryingSubscriberCallback>
>::iterator cb_map_it = querying_subs_cbs_.find(sub_keyexpr);
if (cb_map_it == querying_subs_cbs_.end()) {
return;
}
cb_map_it->second.erase(sub_keyexpr_hash);
}

} // namespace rmw_zenoh_cpp
9 changes: 0 additions & 9 deletions rmw_zenoh_cpp/src/detail/graph_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,15 +189,6 @@ class GraphCache final
/// Returns true if the entity is a publisher or client. False otherwise.
static bool is_entity_pub(const liveliness::Entity & entity);

void set_querying_subscriber_callback(
const std::string & sub_keyexpr,
const std::size_t sub_keyexpr_hash,
QueryingSubscriberCallback cb);

void remove_querying_subscriber_callback(
const std::string & sub_keyexpr,
const std::size_t sub_keyexpr_hash);

private:
// Helper function to convert an Entity into a GraphNode.
// Note: this will update bookkeeping variables in GraphCache.
Expand Down
64 changes: 28 additions & 36 deletions rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,40 +107,35 @@ std::shared_ptr<PublisherData> PublisherData::make(
return nullptr;
}

zenoh::ZResult result;
std::optional<zenoh::ext::PublicationCache> pub_cache;
zenoh::KeyExpr pub_ke(entity->topic_info()->topic_keyexpr_);
auto adv_pub_opts = zenoh::ext::SessionExt::AdvancedPublisherOptions::create_default();
adv_pub_opts.publisher_detection = true;
adv_pub_opts.sample_miss_detection = true;

// Create a Publication Cache if durability is transient_local.
if (adapted_qos_profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) {
zenoh::ext::SessionExt::PublicationCacheOptions pub_cache_opts =
zenoh::ext::SessionExt::PublicationCacheOptions::create_default();

pub_cache_opts.history = adapted_qos_profile.depth;
pub_cache_opts.queryable_complete = true;

std::string queryable_prefix = entity->zid();
pub_cache_opts.queryable_prefix = zenoh::KeyExpr(queryable_prefix);

pub_cache = session->ext().declare_publication_cache(
pub_ke, std::move(pub_cache_opts), &result);

if (result != Z_OK) {
RMW_SET_ERROR_MSG("unable to create zenoh publisher cache");
return nullptr;
}
adv_pub_opts.cache = zenoh::ext::SessionExt::AdvancedPublisherOptions::CacheOptions::create_default();
adv_pub_opts.cache->max_samples = adapted_qos_profile.depth;
}

zenoh::KeyExpr pub_ke(entity->topic_info()->topic_keyexpr_);
// Set congestion_control to BLOCK if appropriate.
zenoh::Session::PublisherOptions opts = zenoh::Session::PublisherOptions::create_default();
opts.congestion_control = Z_CONGESTION_CONTROL_DROP;
auto pub_opts = zenoh::Session::PublisherOptions::create_default();
pub_opts.congestion_control = Z_CONGESTION_CONTROL_DROP;
if (adapted_qos_profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) {
opts.reliability = Z_RELIABILITY_RELIABLE;

pub_opts.reliability = Z_RELIABILITY_RELIABLE;
if (adapted_qos_profile.history == RMW_QOS_POLICY_HISTORY_KEEP_ALL) {
opts.congestion_control = Z_CONGESTION_CONTROL_BLOCK;
pub_opts.congestion_control = Z_CONGESTION_CONTROL_BLOCK;
}
}
auto pub = session->declare_publisher(pub_ke, std::move(opts), &result);
adv_pub_opts.publisher_options = pub_opts;

zenoh::ZResult result;
auto adv_pub = session->ext().declare_advanced_publisher(
pub_ke, std::move(adv_pub_opts), &result);
if (result != Z_OK) {
RMW_SET_ERROR_MSG("unable to create zenoh publisher cache");
return nullptr;
}

if (result != Z_OK) {
RMW_SET_ERROR_MSG("Unable to create Zenoh publisher.");
Expand All @@ -165,8 +160,7 @@ std::shared_ptr<PublisherData> PublisherData::make(
node,
std::move(entity),
std::move(session),
std::move(pub),
std::move(pub_cache),
std::move(adv_pub),
std::move(token),
type_support->data,
std::move(message_type_support)
Expand All @@ -179,8 +173,7 @@ PublisherData::PublisherData(
const rmw_node_t * rmw_node,
std::shared_ptr<liveliness::Entity> entity,
std::shared_ptr<zenoh::Session> sess,
zenoh::Publisher pub,
std::optional<zenoh::ext::PublicationCache> pub_cache,
zenoh::ext::AdvancedPublisher pub,
zenoh::LivelinessToken token,
const void * type_support_impl,
std::unique_ptr<MessageTypeSupport> type_support)
Expand All @@ -189,7 +182,6 @@ PublisherData::PublisherData(
entity_(std::move(entity)),
sess_(std::move(sess)),
pub_(std::move(pub)),
pub_cache_(std::move(pub_cache)),
token_(std::move(token)),
type_support_impl_(type_support_impl),
type_support_(std::move(type_support)),
Expand Down Expand Up @@ -253,8 +245,8 @@ rmw_ret_t PublisherData::publish(
// will be encoded with CDR so it does not really matter.
zenoh::ZResult result;
int64_t source_timestamp = rmw_zenoh_cpp::get_system_time_in_ns();
auto options = zenoh::Publisher::PutOptions::create_default();
options.attachment = rmw_zenoh_cpp::AttachmentData(
auto opts = zenoh::ext::AdvancedPublisher::PutOptions::create_default();
opts.put_options.attachment = rmw_zenoh_cpp::AttachmentData(
sequence_number_++, source_timestamp, entity_->copy_gid()).serialize_to_zbytes();

// TODO(ahcorde): shmbuf
Expand All @@ -265,7 +257,7 @@ rmw_ret_t PublisherData::publish(

TRACETOOLS_TRACEPOINT(
rmw_publish, static_cast<const void *>(rmw_publisher_), ros_message, source_timestamp);
pub_.put(std::move(payload), std::move(options), &result);
pub_.put(std::move(payload), std::move(opts), &result);
if (result != Z_OK) {
if (result == Z_ESESSION_CLOSED) {
RMW_ZENOH_LOG_WARN_NAMED(
Expand Down Expand Up @@ -301,8 +293,8 @@ rmw_ret_t PublisherData::publish_serialized_message(
// will be encoded with CDR so it does not really matter.
zenoh::ZResult result;
int64_t source_timestamp = rmw_zenoh_cpp::get_system_time_in_ns();
auto options = zenoh::Publisher::PutOptions::create_default();
options.attachment = rmw_zenoh_cpp::AttachmentData(
auto opts = zenoh::ext::AdvancedPublisher::PutOptions::create_default();
opts.put_options.attachment = rmw_zenoh_cpp::AttachmentData(
sequence_number_++, source_timestamp, entity_->copy_gid()).serialize_to_zbytes();

std::vector<uint8_t> raw_data(
Expand All @@ -312,7 +304,7 @@ rmw_ret_t PublisherData::publish_serialized_message(

TRACETOOLS_TRACEPOINT(
rmw_publish, static_cast<const void *>(rmw_publisher_), serialized_message, source_timestamp);
pub_.put(std::move(payload), std::move(options), &result);
pub_.put(std::move(payload), std::move(opts), &result);
if (result != Z_OK) {
if (result == Z_ESESSION_CLOSED) {
RMW_ZENOH_LOG_WARN_NAMED(
Expand Down
9 changes: 3 additions & 6 deletions rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,7 @@ class PublisherData final
const rmw_node_t * rmw_node,
std::shared_ptr<liveliness::Entity> entity,
std::shared_ptr<zenoh::Session> session,
zenoh::Publisher pub,
std::optional<zenoh::ext::PublicationCache> pub_cache,
zenoh::ext::AdvancedPublisher pub,
zenoh::LivelinessToken token,
const void * type_support_impl,
std::unique_ptr<MessageTypeSupport> type_support);
Expand All @@ -111,10 +110,8 @@ class PublisherData final
std::shared_ptr<liveliness::Entity> entity_;
// A shared session.
std::shared_ptr<zenoh::Session> sess_;
// An owned publisher.
zenoh::Publisher pub_;
// Optional publication cache when durability is transient_local.
std::optional<zenoh::ext::PublicationCache> pub_cache_;
// An owned AdvancedPublisher.
zenoh::ext::AdvancedPublisher pub_;
// Liveliness token for the publisher.
std::optional<zenoh::LivelinessToken> token_;
// Type support fields
Expand Down
Loading

0 comments on commit 56c65b5

Please sign in to comment.