Skip to content

Commit

Permalink
Add advanced pub/sub heartbeat based last sample miss detection (#1701)
Browse files Browse the repository at this point in the history
* Add advanced publisher heartbeat

* Add subscriber heartbeat listener

* Remove unrelated history conf condition

* Fix session close deadlock

* Add heartbeat test

* Rename heartbeat API

* Change subscriber RecoveryConf to only allow one of periodic_queries or heartbeat

* Fix debug log string

* Fix known issues with heartbeat subscriber

* Change heartbeat test keyexpr

* Update z_advanced_sub example to use heartbeat

* Fix use statements
  • Loading branch information
oteffahi authored Jan 13, 2025
1 parent f9237ab commit 70820dd
Show file tree
Hide file tree
Showing 8 changed files with 357 additions and 38 deletions.
4 changes: 2 additions & 2 deletions zenoh-ext/examples/examples/z_advanced_pub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::time::Duration;
use clap::{arg, Parser};
use zenoh::{config::Config, key_expr::KeyExpr};
use zenoh_config::ModeDependentValue;
use zenoh_ext::{AdvancedPublisherBuilderExt, CacheConfig};
use zenoh_ext::{AdvancedPublisherBuilderExt, CacheConfig, MissDetectionConfig};
use zenoh_ext_examples::CommonArgs;

#[tokio::main]
Expand All @@ -33,7 +33,7 @@ async fn main() {
let publisher = session
.declare_publisher(&key_expr)
.cache(CacheConfig::default().max_samples(history))
.sample_miss_detection()
.sample_miss_detection(MissDetectionConfig::default().heartbeat(Duration::from_millis(500)))
.publisher_detection()
.await
.unwrap();
Expand Down
4 changes: 1 addition & 3 deletions zenoh-ext/examples/examples/z_advanced_sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use std::time::Duration;

use clap::{arg, Parser};
use zenoh::config::Config;
use zenoh_ext::{AdvancedSubscriberBuilderExt, HistoryConfig, RecoveryConfig};
Expand All @@ -32,7 +30,7 @@ async fn main() {
let subscriber = session
.declare_subscriber(key_expr)
.history(HistoryConfig::default().detect_late_publishers())
.recovery(RecoveryConfig::default().periodic_queries(Some(Duration::from_secs(1))))
.recovery(RecoveryConfig::default().heartbeat())
.subscriber_detection()
.await
.unwrap();
Expand Down
66 changes: 60 additions & 6 deletions zenoh-ext/src/advanced_publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,22 @@
//
use std::{
future::{IntoFuture, Ready},
sync::atomic::{AtomicU32, Ordering},
sync::{
atomic::{AtomicU32, Ordering},
Arc,
},
time::Duration,
};

use zenoh::{
bytes::{Encoding, OptionZBytes, ZBytes},
internal::{
bail,
runtime::ZRuntime,
traits::{
EncodingBuilderTrait, QoSBuilderTrait, SampleBuilderTrait, TimestampBuilderTrait,
},
TerminatableTask,
},
key_expr::{keyexpr, KeyExpr},
liveliness::LivelinessToken,
Expand All @@ -37,7 +43,10 @@ use zenoh::{
};
use zenoh_macros::ke;

use crate::advanced_cache::{AdvancedCache, AdvancedCacheBuilder, CacheConfig, KE_UHLC};
use crate::{
advanced_cache::{AdvancedCache, AdvancedCacheBuilder, CacheConfig, KE_UHLC},
z_serialize,
};

pub(crate) static KE_PUB: &keyexpr = ke!("pub");

Expand All @@ -49,6 +58,21 @@ pub(crate) enum Sequencing {
SequenceNumber,
}

#[derive(Default)]
#[zenoh_macros::unstable]
pub struct MissDetectionConfig {
pub(crate) state_publisher: Option<Duration>,
}

#[zenoh_macros::unstable]
impl MissDetectionConfig {
#[zenoh_macros::unstable]
pub fn heartbeat(mut self, period: Duration) -> Self {
self.state_publisher = Some(period);
self
}
}

/// The builder of PublicationCache, allowing to configure it.
#[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"]
#[zenoh_macros::unstable]
Expand All @@ -63,6 +87,7 @@ pub struct AdvancedPublisherBuilder<'a, 'b, 'c> {
is_express: bool,
meta_key_expr: Option<ZResult<KeyExpr<'c>>>,
sequencing: Sequencing,
miss_config: Option<MissDetectionConfig>,
liveliness: bool,
cache: bool,
history: CacheConfig,
Expand All @@ -83,6 +108,7 @@ impl<'a, 'b, 'c> AdvancedPublisherBuilder<'a, 'b, 'c> {
is_express: builder.is_express,
meta_key_expr: None,
sequencing: Sequencing::None,
miss_config: None,
liveliness: false,
cache: false,
history: CacheConfig::default(),
Expand Down Expand Up @@ -118,8 +144,9 @@ impl<'a, 'b, 'c> AdvancedPublisherBuilder<'a, 'b, 'c> {
///
/// Retransmission can only be achieved if [`cache`](crate::AdvancedPublisherBuilder::cache) is enabled.
#[zenoh_macros::unstable]
pub fn sample_miss_detection(mut self) -> Self {
pub fn sample_miss_detection(mut self, config: MissDetectionConfig) -> Self {
self.sequencing = Sequencing::SequenceNumber;
self.miss_config = Some(config);
self
}

Expand Down Expand Up @@ -230,9 +257,10 @@ impl IntoFuture for AdvancedPublisherBuilder<'_, '_, '_> {
#[zenoh_macros::unstable]
pub struct AdvancedPublisher<'a> {
publisher: Publisher<'a>,
seqnum: Option<AtomicU32>,
seqnum: Option<Arc<AtomicU32>>,
cache: Option<AdvancedCache>,
_token: Option<LivelinessToken>,
_state_publisher: Option<TerminatableTask>,
}

#[zenoh_macros::unstable]
Expand Down Expand Up @@ -270,7 +298,7 @@ impl<'a> AdvancedPublisher<'a> {
};

let seqnum = match conf.sequencing {
Sequencing::SequenceNumber => Some(AtomicU32::new(0)),
Sequencing::SequenceNumber => Some(Arc::new(AtomicU32::new(0))),
Sequencing::Timestamp => {
if conf.session.hlc().is_none() {
bail!(
Expand Down Expand Up @@ -299,18 +327,44 @@ impl<'a> AdvancedPublisher<'a> {
Some(
conf.session
.liveliness()
.declare_token(prefix / &key_expr)
.declare_token(&prefix / &key_expr)
.wait()?,
)
} else {
None
};

let state_publisher = if let Some(period) = conf.miss_config.and_then(|c| c.state_publisher)
{
if let Some(seqnum) = seqnum.as_ref() {
let seqnum = seqnum.clone();

let publisher = conf.session.declare_publisher(prefix / &key_expr).wait()?;
Some(TerminatableTask::spawn_abortable(
ZRuntime::Net,
async move {
loop {
tokio::time::sleep(period).await;
let seqnum = seqnum.load(Ordering::Relaxed);
if seqnum > 0 {
let _ = publisher.put(z_serialize(&(seqnum - 1))).await;
}
}
},
))
} else {
None
}
} else {
None
};

Ok(AdvancedPublisher {
publisher,
seqnum,
cache,
_token: token,
_state_publisher: state_publisher,
})
}

Expand Down
Loading

0 comments on commit 70820dd

Please sign in to comment.