-
Notifications
You must be signed in to change notification settings - Fork 43
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: introduce the advanced publisher and subscriber #368
base: rolling
Are you sure you want to change the base?
Changes from all commits
b8ce3c6
ab081f2
05228d0
0b51b3f
25c6d55
a31fd16
448bb6c
9cc776b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -107,40 +107,38 @@ std::shared_ptr<PublisherData> PublisherData::make( | |
return nullptr; | ||
} | ||
|
||
zenoh::ZResult result; | ||
std::optional<zenoh::ext::PublicationCache> pub_cache; | ||
zenoh::KeyExpr pub_ke(entity->topic_info()->topic_keyexpr_); | ||
auto adv_pub_opts = zenoh::ext::SessionExt::AdvancedPublisherOptions::create_default(); | ||
|
||
// Create a Publication Cache if durability is transient_local. | ||
if (adapted_qos_profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) { | ||
zenoh::ext::SessionExt::PublicationCacheOptions pub_cache_opts = | ||
zenoh::ext::SessionExt::PublicationCacheOptions::create_default(); | ||
|
||
pub_cache_opts.history = adapted_qos_profile.depth; | ||
pub_cache_opts.queryable_complete = true; | ||
|
||
std::string queryable_prefix = entity->zid(); | ||
pub_cache_opts.queryable_prefix = zenoh::KeyExpr(queryable_prefix); | ||
|
||
pub_cache = session->ext().declare_publication_cache( | ||
pub_ke, std::move(pub_cache_opts), &result); | ||
Comment on lines
-118
to
-125
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We previously needed to do this to fix #263. Can you confirm that we no longer need to explicitly call There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should no longer need this hack. Otherwise, it's a bug in zenoh. |
||
|
||
if (result != Z_OK) { | ||
RMW_SET_ERROR_MSG("unable to create zenoh publisher cache"); | ||
return nullptr; | ||
} | ||
// Retransmission can only be done if history is enabled on subscriber side. | ||
adv_pub_opts.publisher_detection = true; | ||
// Allow this publisher to be detected through liveliness. | ||
adv_pub_opts.sample_miss_detection = true; | ||
adv_pub_opts.cache = | ||
zenoh::ext::SessionExt::AdvancedPublisherOptions::CacheOptions::create_default(); | ||
adv_pub_opts.cache->max_samples = adapted_qos_profile.depth; | ||
} | ||
|
||
zenoh::KeyExpr pub_ke(entity->topic_info()->topic_keyexpr_); | ||
// Set congestion_control to BLOCK if appropriate. | ||
zenoh::Session::PublisherOptions opts = zenoh::Session::PublisherOptions::create_default(); | ||
opts.congestion_control = Z_CONGESTION_CONTROL_DROP; | ||
auto pub_opts = zenoh::Session::PublisherOptions::create_default(); | ||
pub_opts.congestion_control = Z_CONGESTION_CONTROL_DROP; | ||
if (adapted_qos_profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) { | ||
opts.reliability = Z_RELIABILITY_RELIABLE; | ||
|
||
pub_opts.reliability = Z_RELIABILITY_RELIABLE; | ||
if (adapted_qos_profile.history == RMW_QOS_POLICY_HISTORY_KEEP_ALL) { | ||
opts.congestion_control = Z_CONGESTION_CONTROL_BLOCK; | ||
pub_opts.congestion_control = Z_CONGESTION_CONTROL_BLOCK; | ||
} | ||
} | ||
auto pub = session->declare_publisher(pub_ke, std::move(opts), &result); | ||
adv_pub_opts.publisher_options = pub_opts; | ||
|
||
zenoh::ZResult result; | ||
auto adv_pub = session->ext().declare_advanced_publisher( | ||
pub_ke, std::move(adv_pub_opts), &result); | ||
if (result != Z_OK) { | ||
RMW_SET_ERROR_MSG("unable to create zenoh publisher cache"); | ||
return nullptr; | ||
} | ||
|
||
if (result != Z_OK) { | ||
RMW_SET_ERROR_MSG("Unable to create Zenoh publisher."); | ||
|
@@ -165,8 +163,7 @@ std::shared_ptr<PublisherData> PublisherData::make( | |
node, | ||
std::move(entity), | ||
std::move(session), | ||
std::move(pub), | ||
std::move(pub_cache), | ||
std::move(adv_pub), | ||
std::move(token), | ||
type_support->data, | ||
std::move(message_type_support) | ||
|
@@ -179,8 +176,7 @@ PublisherData::PublisherData( | |
const rmw_node_t * rmw_node, | ||
std::shared_ptr<liveliness::Entity> entity, | ||
std::shared_ptr<zenoh::Session> sess, | ||
zenoh::Publisher pub, | ||
std::optional<zenoh::ext::PublicationCache> pub_cache, | ||
zenoh::ext::AdvancedPublisher pub, | ||
zenoh::LivelinessToken token, | ||
const void * type_support_impl, | ||
std::unique_ptr<MessageTypeSupport> type_support) | ||
|
@@ -189,7 +185,6 @@ PublisherData::PublisherData( | |
entity_(std::move(entity)), | ||
sess_(std::move(sess)), | ||
pub_(std::move(pub)), | ||
pub_cache_(std::move(pub_cache)), | ||
token_(std::move(token)), | ||
type_support_impl_(type_support_impl), | ||
type_support_(std::move(type_support)), | ||
|
@@ -253,8 +248,8 @@ rmw_ret_t PublisherData::publish( | |
// will be encoded with CDR so it does not really matter. | ||
zenoh::ZResult result; | ||
int64_t source_timestamp = rmw_zenoh_cpp::get_system_time_in_ns(); | ||
auto options = zenoh::Publisher::PutOptions::create_default(); | ||
options.attachment = rmw_zenoh_cpp::AttachmentData( | ||
auto opts = zenoh::ext::AdvancedPublisher::PutOptions::create_default(); | ||
opts.put_options.attachment = rmw_zenoh_cpp::AttachmentData( | ||
sequence_number_++, source_timestamp, entity_->copy_gid()).serialize_to_zbytes(); | ||
|
||
// TODO(ahcorde): shmbuf | ||
|
@@ -265,7 +260,7 @@ rmw_ret_t PublisherData::publish( | |
|
||
TRACETOOLS_TRACEPOINT( | ||
rmw_publish, static_cast<const void *>(rmw_publisher_), ros_message, source_timestamp); | ||
pub_.put(std::move(payload), std::move(options), &result); | ||
pub_.put(std::move(payload), std::move(opts), &result); | ||
if (result != Z_OK) { | ||
if (result == Z_ESESSION_CLOSED) { | ||
RMW_ZENOH_LOG_WARN_NAMED( | ||
|
@@ -301,8 +296,8 @@ rmw_ret_t PublisherData::publish_serialized_message( | |
// will be encoded with CDR so it does not really matter. | ||
zenoh::ZResult result; | ||
int64_t source_timestamp = rmw_zenoh_cpp::get_system_time_in_ns(); | ||
auto options = zenoh::Publisher::PutOptions::create_default(); | ||
options.attachment = rmw_zenoh_cpp::AttachmentData( | ||
auto opts = zenoh::ext::AdvancedPublisher::PutOptions::create_default(); | ||
opts.put_options.attachment = rmw_zenoh_cpp::AttachmentData( | ||
sequence_number_++, source_timestamp, entity_->copy_gid()).serialize_to_zbytes(); | ||
|
||
std::vector<uint8_t> raw_data( | ||
|
@@ -312,7 +307,7 @@ rmw_ret_t PublisherData::publish_serialized_message( | |
|
||
TRACETOOLS_TRACEPOINT( | ||
rmw_publish, static_cast<const void *>(rmw_publisher_), serialized_message, source_timestamp); | ||
pub_.put(std::move(payload), std::move(options), &result); | ||
pub_.put(std::move(payload), std::move(opts), &result); | ||
if (result != Z_OK) { | ||
if (result == Z_ESESSION_CLOSED) { | ||
RMW_ZENOH_LOG_WARN_NAMED( | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You'll also need to delete
querying_subs_cbs_
and updategraph_cache.cpp
to not iterate overquerying_subs_cbs_
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done in 9cc776b