From d1142287fda38ed52f0929f8f935730f6dc6bbdf Mon Sep 17 00:00:00 2001 From: Oussama Teffahi Date: Wed, 8 Jan 2025 19:40:13 +0100 Subject: [PATCH 01/12] Add advanced publisher heartbeat --- zenoh-ext/examples/examples/z_advanced_pub.rs | 6 +- zenoh-ext/src/advanced_publisher.rs | 66 +++++++++++++++++-- zenoh-ext/src/lib.rs | 2 +- zenoh-ext/src/publisher_ext.rs | 14 ++-- zenoh-ext/tests/advanced.rs | 10 +-- 5 files changed, 80 insertions(+), 18 deletions(-) diff --git a/zenoh-ext/examples/examples/z_advanced_pub.rs b/zenoh-ext/examples/examples/z_advanced_pub.rs index 6437515c8..56d05ba7c 100644 --- a/zenoh-ext/examples/examples/z_advanced_pub.rs +++ b/zenoh-ext/examples/examples/z_advanced_pub.rs @@ -16,7 +16,7 @@ use std::time::Duration; use clap::{arg, Parser}; use zenoh::{config::Config, key_expr::KeyExpr}; use zenoh_config::ModeDependentValue; -use zenoh_ext::{AdvancedPublisherBuilderExt, CacheConfig}; +use zenoh_ext::{AdvancedPublisherBuilderExt, CacheConfig, MissDetectionConfig}; use zenoh_ext_examples::CommonArgs; #[tokio::main] @@ -33,7 +33,9 @@ async fn main() { let publisher = session .declare_publisher(&key_expr) .cache(CacheConfig::default().max_samples(history)) - .sample_miss_detection() + .sample_miss_detection( + MissDetectionConfig::default().last_sample_miss_detection(Duration::from_secs(5)), + ) .publisher_detection() .await .unwrap(); diff --git a/zenoh-ext/src/advanced_publisher.rs b/zenoh-ext/src/advanced_publisher.rs index 9afc2fb09..2cab49b83 100644 --- a/zenoh-ext/src/advanced_publisher.rs +++ b/zenoh-ext/src/advanced_publisher.rs @@ -13,16 +13,22 @@ // use std::{ future::{IntoFuture, Ready}, - sync::atomic::{AtomicU32, Ordering}, + sync::{ + atomic::{AtomicU32, Ordering}, + Arc, + }, + time::Duration, }; use zenoh::{ bytes::{Encoding, OptionZBytes, ZBytes}, internal::{ bail, + runtime::ZRuntime, traits::{ EncodingBuilderTrait, QoSBuilderTrait, SampleBuilderTrait, TimestampBuilderTrait, }, + TerminatableTask, }, key_expr::{keyexpr, KeyExpr}, liveliness::LivelinessToken, @@ -37,7 +43,10 @@ use zenoh::{ }; use zenoh_macros::ke; -use crate::advanced_cache::{AdvancedCache, AdvancedCacheBuilder, CacheConfig, KE_UHLC}; +use crate::{ + advanced_cache::{AdvancedCache, AdvancedCacheBuilder, CacheConfig, KE_UHLC}, + z_serialize, +}; pub(crate) static KE_PUB: &keyexpr = ke!("pub"); @@ -49,6 +58,21 @@ pub(crate) enum Sequencing { SequenceNumber, } +#[derive(Default)] +#[zenoh_macros::unstable] +pub struct MissDetectionConfig { + pub(crate) state_publisher: Option, +} + +#[zenoh_macros::unstable] +impl MissDetectionConfig { + #[zenoh_macros::unstable] + pub fn last_sample_miss_detection(mut self, period: Duration) -> Self { + self.state_publisher = Some(period); + self + } +} + /// The builder of PublicationCache, allowing to configure it. #[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"] #[zenoh_macros::unstable] @@ -63,6 +87,7 @@ pub struct AdvancedPublisherBuilder<'a, 'b, 'c> { is_express: bool, meta_key_expr: Option>>, sequencing: Sequencing, + miss_config: Option, liveliness: bool, cache: bool, history: CacheConfig, @@ -83,6 +108,7 @@ impl<'a, 'b, 'c> AdvancedPublisherBuilder<'a, 'b, 'c> { is_express: builder.is_express, meta_key_expr: None, sequencing: Sequencing::None, + miss_config: None, liveliness: false, cache: false, history: CacheConfig::default(), @@ -118,8 +144,9 @@ impl<'a, 'b, 'c> AdvancedPublisherBuilder<'a, 'b, 'c> { /// /// Retransmission can only be achieved if [`cache`](crate::AdvancedPublisherBuilder::cache) is enabled. #[zenoh_macros::unstable] - pub fn sample_miss_detection(mut self) -> Self { + pub fn sample_miss_detection(mut self, config: MissDetectionConfig) -> Self { self.sequencing = Sequencing::SequenceNumber; + self.miss_config = Some(config); self } @@ -230,9 +257,10 @@ impl IntoFuture for AdvancedPublisherBuilder<'_, '_, '_> { #[zenoh_macros::unstable] pub struct AdvancedPublisher<'a> { publisher: Publisher<'a>, - seqnum: Option, + seqnum: Option>, cache: Option, _token: Option, + _state_publisher: Option, } #[zenoh_macros::unstable] @@ -270,7 +298,7 @@ impl<'a> AdvancedPublisher<'a> { }; let seqnum = match conf.sequencing { - Sequencing::SequenceNumber => Some(AtomicU32::new(0)), + Sequencing::SequenceNumber => Some(Arc::new(AtomicU32::new(0))), Sequencing::Timestamp => { if conf.session.hlc().is_none() { bail!( @@ -299,18 +327,44 @@ impl<'a> AdvancedPublisher<'a> { Some( conf.session .liveliness() - .declare_token(prefix / &key_expr) + .declare_token(&prefix / &key_expr) .wait()?, ) } else { None }; + let state_publisher = if let Some(period) = conf.miss_config.and_then(|c| c.state_publisher) + { + if let Some(seqnum) = seqnum.as_ref() { + let seqnum = seqnum.clone(); + + let publisher = conf.session.declare_publisher(prefix / &key_expr).wait()?; + Some(TerminatableTask::spawn_abortable( + ZRuntime::Net, + async move { + loop { + tokio::time::sleep(period).await; + let seqnum = seqnum.load(Ordering::Relaxed); + if seqnum > 0 { + let _ = publisher.put(z_serialize(&(seqnum - 1))).await; + } + } + }, + )) + } else { + None + } + } else { + None + }; + Ok(AdvancedPublisher { publisher, seqnum, cache, _token: token, + _state_publisher: state_publisher, }) } diff --git a/zenoh-ext/src/lib.rs b/zenoh-ext/src/lib.rs index 2d73bd16c..60d805730 100644 --- a/zenoh-ext/src/lib.rs +++ b/zenoh-ext/src/lib.rs @@ -41,7 +41,7 @@ pub use crate::serialization::{ #[allow(deprecated)] pub use crate::{ advanced_cache::{CacheConfig, RepliesConfig}, - advanced_publisher::{AdvancedPublisher, AdvancedPublisherBuilder}, + advanced_publisher::{AdvancedPublisher, AdvancedPublisherBuilder, MissDetectionConfig}, advanced_subscriber::{ AdvancedSubscriber, AdvancedSubscriberBuilder, HistoryConfig, Miss, RecoveryConfig, SampleMissHandlerUndeclaration, SampleMissListener, SampleMissListenerBuilder, diff --git a/zenoh-ext/src/publisher_ext.rs b/zenoh-ext/src/publisher_ext.rs index de045d0ff..2a4866623 100644 --- a/zenoh-ext/src/publisher_ext.rs +++ b/zenoh-ext/src/publisher_ext.rs @@ -13,7 +13,7 @@ // use zenoh::pubsub::PublisherBuilder; -use crate::{advanced_cache::CacheConfig, AdvancedPublisherBuilder}; +use crate::{advanced_cache::CacheConfig, AdvancedPublisherBuilder, MissDetectionConfig}; /// Some extensions to the [`zenoh::publication::PublisherBuilder`](zenoh::publication::PublisherBuilder) #[zenoh_macros::unstable] @@ -27,7 +27,10 @@ pub trait AdvancedPublisherBuilderExt<'a, 'b, 'c> { /// /// Retransmission can only be achieved if [`cache`](crate::AdvancedPublisherBuilder::cache) is also enabled. #[zenoh_macros::unstable] - fn sample_miss_detection(self) -> AdvancedPublisherBuilder<'a, 'b, 'c>; + fn sample_miss_detection( + self, + config: MissDetectionConfig, + ) -> AdvancedPublisherBuilder<'a, 'b, 'c>; /// Allow this publisher to be detected by [`AdvancedSubscribers`](crate::AdvancedSubscriber). /// @@ -53,8 +56,11 @@ impl<'a, 'b, 'c> AdvancedPublisherBuilderExt<'a, 'b, 'c> for PublisherBuilder<'a /// /// Retransmission can only be achieved if [`cache`](crate::AdvancedPublisherBuilder::cache) is also enabled. #[zenoh_macros::unstable] - fn sample_miss_detection(self) -> AdvancedPublisherBuilder<'a, 'b, 'c> { - AdvancedPublisherBuilder::new(self).sample_miss_detection() + fn sample_miss_detection( + self, + config: MissDetectionConfig, + ) -> AdvancedPublisherBuilder<'a, 'b, 'c> { + AdvancedPublisherBuilder::new(self).sample_miss_detection(config) } /// Allow this publisher to be detected by [`AdvancedSubscribers`](crate::AdvancedSubscriber). diff --git a/zenoh-ext/tests/advanced.rs b/zenoh-ext/tests/advanced.rs index f7d3593c6..f21cfb3d6 100644 --- a/zenoh-ext/tests/advanced.rs +++ b/zenoh-ext/tests/advanced.rs @@ -16,7 +16,7 @@ use zenoh::sample::SampleKind; use zenoh_config::{EndPoint, ModeDependentValue, WhatAmI}; use zenoh_ext::{ AdvancedPublisherBuilderExt, AdvancedSubscriberBuilderExt, CacheConfig, HistoryConfig, - RecoveryConfig, + MissDetectionConfig, RecoveryConfig, }; #[tokio::test(flavor = "multi_thread", worker_threads = 4)] @@ -170,7 +170,7 @@ async fn test_advanced_retransmission() { let publ = ztimeout!(client1 .declare_publisher(ADVANCED_RETRANSMISSION_KEYEXPR) .cache(CacheConfig::default().max_samples(10)) - .sample_miss_detection()) + .sample_miss_detection(MissDetectionConfig::default())) .unwrap(); ztimeout!(publ.put("1")).unwrap(); @@ -299,7 +299,7 @@ async fn test_advanced_retransmission_periodic() { let publ = ztimeout!(client1 .declare_publisher(ADVANCED_RETRANSMISSION_PERIODIC_KEYEXPR) .cache(CacheConfig::default().max_samples(10)) - .sample_miss_detection()) + .sample_miss_detection(MissDetectionConfig::default())) .unwrap(); ztimeout!(publ.put("1")).unwrap(); @@ -421,7 +421,7 @@ async fn test_advanced_sample_miss() { let publ = ztimeout!(client1 .declare_publisher(ADVANCED_SAMPLE_MISS_KEYEXPR) - .sample_miss_detection()) + .sample_miss_detection(MissDetectionConfig::default())) .unwrap(); ztimeout!(publ.put("1")).unwrap(); @@ -544,7 +544,7 @@ async fn test_advanced_retransmission_sample_miss() { let publ = ztimeout!(client1 .declare_publisher(ADVANCED_RETRANSMISSION_SAMPLE_MISS_KEYEXPR) .cache(CacheConfig::default().max_samples(1)) - .sample_miss_detection()) + .sample_miss_detection(MissDetectionConfig::default())) .unwrap(); ztimeout!(publ.put("1")).unwrap(); From 5748a1d91fc99a28730122f06ee8f066cf1a38e9 Mon Sep 17 00:00:00 2001 From: Oussama Teffahi Date: Thu, 9 Jan 2025 19:04:09 +0100 Subject: [PATCH 02/12] Add subscriber heartbeat listener --- zenoh-ext/examples/examples/z_advanced_sub.rs | 6 +- zenoh-ext/src/advanced_subscriber.rs | 136 +++++++++++++++++- 2 files changed, 138 insertions(+), 4 deletions(-) diff --git a/zenoh-ext/examples/examples/z_advanced_sub.rs b/zenoh-ext/examples/examples/z_advanced_sub.rs index 5bea70f7d..5d371a407 100644 --- a/zenoh-ext/examples/examples/z_advanced_sub.rs +++ b/zenoh-ext/examples/examples/z_advanced_sub.rs @@ -32,7 +32,11 @@ async fn main() { let subscriber = session .declare_subscriber(key_expr) .history(HistoryConfig::default().detect_late_publishers()) - .recovery(RecoveryConfig::default().periodic_queries(Some(Duration::from_secs(1)))) + .recovery( + RecoveryConfig::default() + .periodic_queries(Some(Duration::from_secs(1))) + .heartbeat_listener(true), + ) .subscriber_detection() .await .unwrap(); diff --git a/zenoh-ext/src/advanced_subscriber.rs b/zenoh-ext/src/advanced_subscriber.rs index e922c4ecb..b5504ab91 100644 --- a/zenoh-ext/src/advanced_subscriber.rs +++ b/zenoh-ext/src/advanced_subscriber.rs @@ -16,6 +16,7 @@ use std::{collections::BTreeMap, future::IntoFuture, str::FromStr}; use zenoh::{ config::ZenohId, handlers::{Callback, IntoHandler}, + internal::TerminatableTask, key_expr::KeyExpr, liveliness::{LivelinessSubscriberBuilder, LivelinessToken}, pubsub::SubscriberBuilder, @@ -46,7 +47,10 @@ use { zenoh::Result as ZResult, }; -use crate::advanced_cache::{ke_liveliness, KE_UHLC}; +use crate::{ + advanced_cache::{ke_liveliness, KE_UHLC}, + z_deserialize, +}; #[derive(Debug, Default, Clone)] /// Configure query for historical data. @@ -85,11 +89,12 @@ impl HistoryConfig { } } -#[derive(Default)] +#[derive(Default, Clone, Copy)] /// Configure retransmission. #[zenoh_macros::unstable] pub struct RecoveryConfig { periodic_queries: Option, + heartbeat_listener: bool, } impl std::fmt::Debug for RecoveryConfig { @@ -116,6 +121,19 @@ impl RecoveryConfig { self.periodic_queries = period; self } + + /// Subscribe to heartbeats of [`AdvancedPublishers`](crate::AdvancedPublisher). + /// + /// This allows to periodically receive the last published Sample's sequence number and check for misses. + /// Heartbeat listener must be paired with [`AdvancedPublishers`](crate::AdvancedPublisher) + /// that enable [`cache`](crate::AdvancedPublisherBuilder::cache) and + /// [`sample_miss_detection`](crate::AdvancedPublisherBuilder::sample_miss_detection). + #[zenoh_macros::unstable] + #[inline] + pub fn heartbeat_listener(mut self, enabled: bool) -> Self { + self.heartbeat_listener = enabled; + self + } } /// The builder of an [`AdvancedSubscriber`], allowing to configure it. @@ -441,6 +459,7 @@ pub struct AdvancedSubscriber { subscriber: Subscriber<()>, receiver: Receiver, liveliness_subscriber: Option>, + _heartbeat_subscriber_task: Option, } #[zenoh_macros::unstable] @@ -733,12 +752,13 @@ impl AdvancedSubscriber { .wait(); } - let liveliness_subscriber = if let Some(historyconf) = conf.history { + let liveliness_subscriber = if let Some(historyconf) = conf.history.as_ref() { if historyconf.liveliness { let live_callback = { let session = conf.session.clone(); let statesref = statesref.clone(); let key_expr = key_expr.clone().into_owned(); + let historyconf = historyconf.clone(); move |s: Sample| { if s.kind() == SampleKind::Put { if let Ok(parsed) = ke_liveliness::parse(s.key_expr().as_keyexpr()) { @@ -921,6 +941,115 @@ impl AdvancedSubscriber { None }; + let heartbeat_subscriber_task = if let Some(_historyconf) = conf.history { + if retransmission.is_some_and(|r| r.heartbeat_listener) { + let ke_heartbeat_sub = KE_ADV_PREFIX / KE_PUB / KE_STARSTAR / KE_AT / &key_expr; + let heartbeat_sub = conf + .session + .declare_subscriber(ke_heartbeat_sub) + .allowed_origin(conf.origin) + .wait()?; + + let statesref = statesref.clone(); + let task = async move { + loop { + if let Ok(sample_hb) = heartbeat_sub.recv_async().await { + if sample_hb.kind() != SampleKind::Put { + continue; + } + + let heartbeat_keyexpr = sample_hb.key_expr().as_keyexpr(); + let Ok(parsed_keyexpr) = ke_liveliness::parse(heartbeat_keyexpr) else { + continue; + }; + let source_id = { + let Ok(zid) = ZenohId::from_str(parsed_keyexpr.zid().as_str()) + else { + continue; + }; + let Ok(eid) = EntityId::from_str(parsed_keyexpr.eid().as_str()) + else { + continue; + }; + EntityGlobalId::new(zid, eid) + }; + + let Ok(heartbeat_sn) = z_deserialize::(sample_hb.payload()) else { + tracing::debug!( + "Skipping invalid heartbeat payload on '{}'", + heartbeat_keyexpr + ); + continue; + }; + + let mut lock = zlock!(statesref); + let states = &mut *lock; + let entry = states.sequenced_states.entry(source_id); + if matches!(&entry, Entry::Vacant(_)) + && states.global_pending_queries > 0 + { + tracing::debug!("Skipping heartbeat on '{}' from publisher that is currently being pulled by liveliness task", heartbeat_keyexpr); + continue; + } + + // FIXME: This breaks vacancy check in handle_sample: spawning periodic queries will not occur if heartbeat sample is received before data sample + let state = entry.or_insert(SourceState:: { + last_delivered: None, + pending_queries: 0, + pending_samples: BTreeMap::new(), + }); + // TODO: add state to avoid sending multiple queries for the same heartbeat if its periodicity is higher than the query response time + + // check that it's not an old sn or a pending sample's sn + if (state.last_delivered.is_none() + || state.last_delivered.is_some_and(|sn| heartbeat_sn > sn)) + && !state.pending_samples.contains_key(&heartbeat_sn) + { + let seq_num_range = seq_num_range( + state.last_delivered.map(|s| s + 1), + Some(heartbeat_sn), + ); + + let session = states.session.clone(); + let key_expr = states.key_expr.clone().into_owned(); + let query_target = states.query_target; + let query_timeout = states.query_timeout; + state.pending_queries += 1; + drop(lock); + + let handler = SequencedRepliesHandler { + source_id, + statesref: statesref.clone(), + }; + let _ = session + .get(Selector::from((heartbeat_keyexpr, seq_num_range))) + .callback({ + move |r: Reply| { + if let Ok(s) = r.into_result() { + if key_expr.intersects(s.key_expr()) { + let states = &mut *zlock!(handler.statesref); + handle_sample(states, s); + } + } + } + }) + .consolidation(ConsolidationMode::None) + .accept_replies(ReplyKeyExpr::Any) + .target(query_target) + .timeout(query_timeout) + .wait(); + } + } + } + }; + Some(TerminatableTask::spawn_abortable(ZRuntime::Net, task)) + } else { + None + } + } else { + None + }; + if conf.liveliness { let prefix = KE_ADV_PREFIX / KE_SUB @@ -944,6 +1073,7 @@ impl AdvancedSubscriber { subscriber, receiver, liveliness_subscriber, + _heartbeat_subscriber_task: heartbeat_subscriber_task, }; Ok(reliable_subscriber) From 045d3077cc0b4ddb3c7b721c269be51fa52b2f8e Mon Sep 17 00:00:00 2001 From: Oussama Teffahi Date: Thu, 9 Jan 2025 19:10:41 +0100 Subject: [PATCH 03/12] Remove unrelated history conf condition --- zenoh-ext/src/advanced_subscriber.rs | 178 +++++++++++++-------------- 1 file changed, 85 insertions(+), 93 deletions(-) diff --git a/zenoh-ext/src/advanced_subscriber.rs b/zenoh-ext/src/advanced_subscriber.rs index b5504ab91..507695405 100644 --- a/zenoh-ext/src/advanced_subscriber.rs +++ b/zenoh-ext/src/advanced_subscriber.rs @@ -941,111 +941,103 @@ impl AdvancedSubscriber { None }; - let heartbeat_subscriber_task = if let Some(_historyconf) = conf.history { - if retransmission.is_some_and(|r| r.heartbeat_listener) { - let ke_heartbeat_sub = KE_ADV_PREFIX / KE_PUB / KE_STARSTAR / KE_AT / &key_expr; - let heartbeat_sub = conf - .session - .declare_subscriber(ke_heartbeat_sub) - .allowed_origin(conf.origin) - .wait()?; - - let statesref = statesref.clone(); - let task = async move { - loop { - if let Ok(sample_hb) = heartbeat_sub.recv_async().await { - if sample_hb.kind() != SampleKind::Put { - continue; - } + let heartbeat_subscriber_task = if retransmission.is_some_and(|r| r.heartbeat_listener) { + let ke_heartbeat_sub = KE_ADV_PREFIX / KE_PUB / KE_STARSTAR / KE_AT / &key_expr; + let heartbeat_sub = conf + .session + .declare_subscriber(ke_heartbeat_sub) + .allowed_origin(conf.origin) + .wait()?; - let heartbeat_keyexpr = sample_hb.key_expr().as_keyexpr(); - let Ok(parsed_keyexpr) = ke_liveliness::parse(heartbeat_keyexpr) else { + let statesref = statesref.clone(); + let task = async move { + loop { + if let Ok(sample_hb) = heartbeat_sub.recv_async().await { + if sample_hb.kind() != SampleKind::Put { + continue; + } + + let heartbeat_keyexpr = sample_hb.key_expr().as_keyexpr(); + let Ok(parsed_keyexpr) = ke_liveliness::parse(heartbeat_keyexpr) else { + continue; + }; + let source_id = { + let Ok(zid) = ZenohId::from_str(parsed_keyexpr.zid().as_str()) else { continue; }; - let source_id = { - let Ok(zid) = ZenohId::from_str(parsed_keyexpr.zid().as_str()) - else { - continue; - }; - let Ok(eid) = EntityId::from_str(parsed_keyexpr.eid().as_str()) - else { - continue; - }; - EntityGlobalId::new(zid, eid) - }; - - let Ok(heartbeat_sn) = z_deserialize::(sample_hb.payload()) else { - tracing::debug!( - "Skipping invalid heartbeat payload on '{}'", - heartbeat_keyexpr - ); + let Ok(eid) = EntityId::from_str(parsed_keyexpr.eid().as_str()) else { continue; }; + EntityGlobalId::new(zid, eid) + }; + + let Ok(heartbeat_sn) = z_deserialize::(sample_hb.payload()) else { + tracing::debug!( + "Skipping invalid heartbeat payload on '{}'", + heartbeat_keyexpr + ); + continue; + }; + + let mut lock = zlock!(statesref); + let states = &mut *lock; + let entry = states.sequenced_states.entry(source_id); + if matches!(&entry, Entry::Vacant(_)) && states.global_pending_queries > 0 { + tracing::debug!("Skipping heartbeat on '{}' from publisher that is currently being pulled by liveliness task", heartbeat_keyexpr); + continue; + } - let mut lock = zlock!(statesref); - let states = &mut *lock; - let entry = states.sequenced_states.entry(source_id); - if matches!(&entry, Entry::Vacant(_)) - && states.global_pending_queries > 0 - { - tracing::debug!("Skipping heartbeat on '{}' from publisher that is currently being pulled by liveliness task", heartbeat_keyexpr); - continue; - } + // FIXME: This breaks vacancy check in handle_sample: spawning periodic queries will not occur if heartbeat sample is received before data sample + let state = entry.or_insert(SourceState:: { + last_delivered: None, + pending_queries: 0, + pending_samples: BTreeMap::new(), + }); + // TODO: add state to avoid sending multiple queries for the same heartbeat if its periodicity is higher than the query response time - // FIXME: This breaks vacancy check in handle_sample: spawning periodic queries will not occur if heartbeat sample is received before data sample - let state = entry.or_insert(SourceState:: { - last_delivered: None, - pending_queries: 0, - pending_samples: BTreeMap::new(), - }); - // TODO: add state to avoid sending multiple queries for the same heartbeat if its periodicity is higher than the query response time - - // check that it's not an old sn or a pending sample's sn - if (state.last_delivered.is_none() - || state.last_delivered.is_some_and(|sn| heartbeat_sn > sn)) - && !state.pending_samples.contains_key(&heartbeat_sn) - { - let seq_num_range = seq_num_range( - state.last_delivered.map(|s| s + 1), - Some(heartbeat_sn), - ); + // check that it's not an old sn or a pending sample's sn + if (state.last_delivered.is_none() + || state.last_delivered.is_some_and(|sn| heartbeat_sn > sn)) + && !state.pending_samples.contains_key(&heartbeat_sn) + { + let seq_num_range = seq_num_range( + state.last_delivered.map(|s| s + 1), + Some(heartbeat_sn), + ); + + let session = states.session.clone(); + let key_expr = states.key_expr.clone().into_owned(); + let query_target = states.query_target; + let query_timeout = states.query_timeout; + state.pending_queries += 1; + drop(lock); - let session = states.session.clone(); - let key_expr = states.key_expr.clone().into_owned(); - let query_target = states.query_target; - let query_timeout = states.query_timeout; - state.pending_queries += 1; - drop(lock); - - let handler = SequencedRepliesHandler { - source_id, - statesref: statesref.clone(), - }; - let _ = session - .get(Selector::from((heartbeat_keyexpr, seq_num_range))) - .callback({ - move |r: Reply| { - if let Ok(s) = r.into_result() { - if key_expr.intersects(s.key_expr()) { - let states = &mut *zlock!(handler.statesref); - handle_sample(states, s); - } + let handler = SequencedRepliesHandler { + source_id, + statesref: statesref.clone(), + }; + let _ = session + .get(Selector::from((heartbeat_keyexpr, seq_num_range))) + .callback({ + move |r: Reply| { + if let Ok(s) = r.into_result() { + if key_expr.intersects(s.key_expr()) { + let states = &mut *zlock!(handler.statesref); + handle_sample(states, s); } } - }) - .consolidation(ConsolidationMode::None) - .accept_replies(ReplyKeyExpr::Any) - .target(query_target) - .timeout(query_timeout) - .wait(); - } + } + }) + .consolidation(ConsolidationMode::None) + .accept_replies(ReplyKeyExpr::Any) + .target(query_target) + .timeout(query_timeout) + .wait(); } } - }; - Some(TerminatableTask::spawn_abortable(ZRuntime::Net, task)) - } else { - None - } + } + }; + Some(TerminatableTask::spawn_abortable(ZRuntime::Net, task)) } else { None }; From 9dea7c7f66e5905ebff86645a52f242ce47b2211 Mon Sep 17 00:00:00 2001 From: Oussama Teffahi Date: Fri, 10 Jan 2025 17:01:27 +0100 Subject: [PATCH 04/12] Fix session close deadlock --- zenoh-ext/src/advanced_subscriber.rs | 170 +++++++++++++-------------- 1 file changed, 82 insertions(+), 88 deletions(-) diff --git a/zenoh-ext/src/advanced_subscriber.rs b/zenoh-ext/src/advanced_subscriber.rs index 507695405..763ea8633 100644 --- a/zenoh-ext/src/advanced_subscriber.rs +++ b/zenoh-ext/src/advanced_subscriber.rs @@ -16,7 +16,6 @@ use std::{collections::BTreeMap, future::IntoFuture, str::FromStr}; use zenoh::{ config::ZenohId, handlers::{Callback, IntoHandler}, - internal::TerminatableTask, key_expr::KeyExpr, liveliness::{LivelinessSubscriberBuilder, LivelinessToken}, pubsub::SubscriberBuilder, @@ -459,7 +458,7 @@ pub struct AdvancedSubscriber { subscriber: Subscriber<()>, receiver: Receiver, liveliness_subscriber: Option>, - _heartbeat_subscriber_task: Option, + _heartbeat_subscriber: Option>, } #[zenoh_macros::unstable] @@ -941,103 +940,98 @@ impl AdvancedSubscriber { None }; - let heartbeat_subscriber_task = if retransmission.is_some_and(|r| r.heartbeat_listener) { + let heartbeat_subscriber = if retransmission.is_some_and(|r| r.heartbeat_listener) { let ke_heartbeat_sub = KE_ADV_PREFIX / KE_PUB / KE_STARSTAR / KE_AT / &key_expr; + let statesref = statesref.clone(); let heartbeat_sub = conf .session .declare_subscriber(ke_heartbeat_sub) - .allowed_origin(conf.origin) - .wait()?; - - let statesref = statesref.clone(); - let task = async move { - loop { - if let Ok(sample_hb) = heartbeat_sub.recv_async().await { - if sample_hb.kind() != SampleKind::Put { - continue; - } + .callback(move |sample_hb| { + if sample_hb.kind() != SampleKind::Put { + return; + } - let heartbeat_keyexpr = sample_hb.key_expr().as_keyexpr(); - let Ok(parsed_keyexpr) = ke_liveliness::parse(heartbeat_keyexpr) else { - continue; + let heartbeat_keyexpr = sample_hb.key_expr().as_keyexpr(); + let Ok(parsed_keyexpr) = ke_liveliness::parse(heartbeat_keyexpr) else { + return; + }; + let source_id = { + let Ok(zid) = ZenohId::from_str(parsed_keyexpr.zid().as_str()) else { + return; }; - let source_id = { - let Ok(zid) = ZenohId::from_str(parsed_keyexpr.zid().as_str()) else { - continue; - }; - let Ok(eid) = EntityId::from_str(parsed_keyexpr.eid().as_str()) else { - continue; - }; - EntityGlobalId::new(zid, eid) + let Ok(eid) = EntityId::from_str(parsed_keyexpr.eid().as_str()) else { + return; }; + EntityGlobalId::new(zid, eid) + }; + + let Ok(heartbeat_sn) = z_deserialize::(sample_hb.payload()) else { + tracing::debug!( + "Skipping invalid heartbeat payload on '{}'", + heartbeat_keyexpr + ); + return; + }; + + let mut lock = zlock!(statesref); + let states = &mut *lock; + let entry = states.sequenced_states.entry(source_id); + if matches!(&entry, Entry::Vacant(_)) && states.global_pending_queries > 0 { + tracing::debug!("Skipping heartbeat on '{}' from publisher that is currently being pulled by liveliness task", heartbeat_keyexpr); + return; + } - let Ok(heartbeat_sn) = z_deserialize::(sample_hb.payload()) else { - tracing::debug!( - "Skipping invalid heartbeat payload on '{}'", - heartbeat_keyexpr - ); - continue; + // FIXME: This breaks vacancy check in handle_sample: spawning periodic queries will not occur if heartbeat sample is received before data sample + let state = entry.or_insert(SourceState:: { + last_delivered: None, + pending_queries: 0, + pending_samples: BTreeMap::new(), + }); + // TODO: add state to avoid sending multiple queries for the same heartbeat if its periodicity is higher than the query response time + + // check that it's not an old sn or a pending sample's sn + if (state.last_delivered.is_none() + || state.last_delivered.is_some_and(|sn| heartbeat_sn > sn)) + && !state.pending_samples.contains_key(&heartbeat_sn) + { + let seq_num_range = seq_num_range( + state.last_delivered.map(|s| s + 1), + Some(heartbeat_sn), + ); + + let session = states.session.clone(); + let key_expr = states.key_expr.clone().into_owned(); + let query_target = states.query_target; + let query_timeout = states.query_timeout; + state.pending_queries += 1; + drop(lock); + + let handler = SequencedRepliesHandler { + source_id, + statesref: statesref.clone(), }; - - let mut lock = zlock!(statesref); - let states = &mut *lock; - let entry = states.sequenced_states.entry(source_id); - if matches!(&entry, Entry::Vacant(_)) && states.global_pending_queries > 0 { - tracing::debug!("Skipping heartbeat on '{}' from publisher that is currently being pulled by liveliness task", heartbeat_keyexpr); - continue; - } - - // FIXME: This breaks vacancy check in handle_sample: spawning periodic queries will not occur if heartbeat sample is received before data sample - let state = entry.or_insert(SourceState:: { - last_delivered: None, - pending_queries: 0, - pending_samples: BTreeMap::new(), - }); - // TODO: add state to avoid sending multiple queries for the same heartbeat if its periodicity is higher than the query response time - - // check that it's not an old sn or a pending sample's sn - if (state.last_delivered.is_none() - || state.last_delivered.is_some_and(|sn| heartbeat_sn > sn)) - && !state.pending_samples.contains_key(&heartbeat_sn) - { - let seq_num_range = seq_num_range( - state.last_delivered.map(|s| s + 1), - Some(heartbeat_sn), - ); - - let session = states.session.clone(); - let key_expr = states.key_expr.clone().into_owned(); - let query_target = states.query_target; - let query_timeout = states.query_timeout; - state.pending_queries += 1; - drop(lock); - - let handler = SequencedRepliesHandler { - source_id, - statesref: statesref.clone(), - }; - let _ = session - .get(Selector::from((heartbeat_keyexpr, seq_num_range))) - .callback({ - move |r: Reply| { - if let Ok(s) = r.into_result() { - if key_expr.intersects(s.key_expr()) { - let states = &mut *zlock!(handler.statesref); - handle_sample(states, s); - } + let _ = session + .get(Selector::from((heartbeat_keyexpr, seq_num_range))) + .callback({ + move |r: Reply| { + if let Ok(s) = r.into_result() { + if key_expr.intersects(s.key_expr()) { + let states = &mut *zlock!(handler.statesref); + handle_sample(states, s); } } - }) - .consolidation(ConsolidationMode::None) - .accept_replies(ReplyKeyExpr::Any) - .target(query_target) - .timeout(query_timeout) - .wait(); - } + } + }) + .consolidation(ConsolidationMode::None) + .accept_replies(ReplyKeyExpr::Any) + .target(query_target) + .timeout(query_timeout) + .wait(); } - } - }; - Some(TerminatableTask::spawn_abortable(ZRuntime::Net, task)) + }) + .allowed_origin(conf.origin) + .wait()?; + Some(heartbeat_sub) } else { None }; @@ -1065,7 +1059,7 @@ impl AdvancedSubscriber { subscriber, receiver, liveliness_subscriber, - _heartbeat_subscriber_task: heartbeat_subscriber_task, + _heartbeat_subscriber: heartbeat_subscriber, }; Ok(reliable_subscriber) From dd5a3926b0de62cf5fd3cd1861323ec3ca907fe8 Mon Sep 17 00:00:00 2001 From: Oussama Teffahi Date: Fri, 10 Jan 2025 17:02:03 +0100 Subject: [PATCH 05/12] Add heartbeat test --- zenoh-ext/tests/advanced.rs | 125 ++++++++++++++++++++++++++++++++++++ 1 file changed, 125 insertions(+) diff --git a/zenoh-ext/tests/advanced.rs b/zenoh-ext/tests/advanced.rs index f21cfb3d6..6481780b1 100644 --- a/zenoh-ext/tests/advanced.rs +++ b/zenoh-ext/tests/advanced.rs @@ -713,3 +713,128 @@ async fn test_advanced_late_joiner() { router.close().await.unwrap(); } + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn test_advanced_retransmission_heartbeat() { + use std::time::Duration; + + use zenoh::internal::ztimeout; + + const TIMEOUT: Duration = Duration::from_secs(60); + const SLEEP: Duration = Duration::from_secs(1); + const RECONNECT_SLEEP: Duration = Duration::from_secs(5); + const HEARTBEAT_PERIOD: Duration = Duration::from_secs(4); + const ROUTER_ENDPOINT: &str = "tcp/localhost:47456"; + + const ADVANCED_RETRANSMISSION_KEYEXPR: &str = "test/advanced/retransmission"; + + zenoh_util::init_log_from_env_or("error"); + + let router = { + let mut c = zenoh::Config::default(); + c.listen + .endpoints + .set(vec![ROUTER_ENDPOINT.parse::().unwrap()]) + .unwrap(); + c.scouting.multicast.set_enabled(Some(false)).unwrap(); + let _ = c.set_mode(Some(WhatAmI::Router)); + let s = ztimeout!(zenoh::open(c)).unwrap(); + tracing::info!("Router ZID: {}", s.zid()); + s + }; + + let client1 = { + let mut c = zenoh::Config::default(); + c.connect + .endpoints + .set(vec![ROUTER_ENDPOINT.parse::().unwrap()]) + .unwrap(); + c.scouting.multicast.set_enabled(Some(false)).unwrap(); + let _ = c.set_mode(Some(WhatAmI::Client)); + let s = ztimeout!(zenoh::open(c)).unwrap(); + tracing::info!("Client (1) ZID: {}", s.zid()); + s + }; + + let client2 = { + let mut c = zenoh::Config::default(); + c.connect + .endpoints + .set(vec![ROUTER_ENDPOINT.parse::().unwrap()]) + .unwrap(); + c.scouting.multicast.set_enabled(Some(false)).unwrap(); + let _ = c.set_mode(Some(WhatAmI::Client)); + let s = ztimeout!(zenoh::open(c)).unwrap(); + tracing::info!("Client (2) ZID: {}", s.zid()); + s + }; + + let sub = ztimeout!(client2 + .declare_subscriber(ADVANCED_RETRANSMISSION_KEYEXPR) + .recovery(RecoveryConfig::default().heartbeat_listener(true))) + .unwrap(); + tokio::time::sleep(SLEEP).await; + + let publ = ztimeout!(client1 + .declare_publisher(ADVANCED_RETRANSMISSION_KEYEXPR) + .cache(CacheConfig::default().max_samples(10)) + .sample_miss_detection( + MissDetectionConfig::default().last_sample_miss_detection(HEARTBEAT_PERIOD) + )) + .unwrap(); + ztimeout!(publ.put("1")).unwrap(); + + tokio::time::sleep(SLEEP).await; + + let sample = ztimeout!(sub.recv_async()).unwrap(); + assert_eq!(sample.kind(), SampleKind::Put); + assert_eq!(sample.payload().try_to_string().unwrap().as_ref(), "1"); + + assert!(sub.try_recv().unwrap().is_none()); + + router.close().await.unwrap(); + tokio::time::sleep(SLEEP).await; + + ztimeout!(publ.put("2")).unwrap(); + ztimeout!(publ.put("3")).unwrap(); + ztimeout!(publ.put("4")).unwrap(); + tokio::time::sleep(SLEEP).await; + + assert!(sub.try_recv().unwrap().is_none()); + + let router = { + let mut c = zenoh::Config::default(); + c.listen + .endpoints + .set(vec![ROUTER_ENDPOINT.parse::().unwrap()]) + .unwrap(); + c.scouting.multicast.set_enabled(Some(false)).unwrap(); + let _ = c.set_mode(Some(WhatAmI::Router)); + let s = ztimeout!(zenoh::open(c)).unwrap(); + tracing::info!("Router ZID: {}", s.zid()); + s + }; + tokio::time::sleep(RECONNECT_SLEEP).await; + + let sample = ztimeout!(sub.recv_async()).unwrap(); + assert_eq!(sample.kind(), SampleKind::Put); + assert_eq!(sample.payload().try_to_string().unwrap().as_ref(), "2"); + + let sample = ztimeout!(sub.recv_async()).unwrap(); + assert_eq!(sample.kind(), SampleKind::Put); + assert_eq!(sample.payload().try_to_string().unwrap().as_ref(), "3"); + + let sample = ztimeout!(sub.recv_async()).unwrap(); + assert_eq!(sample.kind(), SampleKind::Put); + assert_eq!(sample.payload().try_to_string().unwrap().as_ref(), "4"); + + assert!(sub.try_recv().unwrap().is_none()); + + publ.undeclare().await.unwrap(); + // sub.undeclare().await.unwrap(); + + client1.close().await.unwrap(); + client2.close().await.unwrap(); + + router.close().await.unwrap(); +} From 34a3c88e281865f361c761fd8e5a38b1fc17c0ff Mon Sep 17 00:00:00 2001 From: Oussama Teffahi Date: Mon, 13 Jan 2025 13:05:57 +0100 Subject: [PATCH 06/12] Rename heartbeat API --- zenoh-ext/examples/examples/z_advanced_pub.rs | 4 +--- zenoh-ext/examples/examples/z_advanced_sub.rs | 2 +- zenoh-ext/src/advanced_publisher.rs | 2 +- zenoh-ext/src/advanced_subscriber.rs | 8 ++++---- zenoh-ext/tests/advanced.rs | 6 ++---- 5 files changed, 9 insertions(+), 13 deletions(-) diff --git a/zenoh-ext/examples/examples/z_advanced_pub.rs b/zenoh-ext/examples/examples/z_advanced_pub.rs index 56d05ba7c..16cf143c1 100644 --- a/zenoh-ext/examples/examples/z_advanced_pub.rs +++ b/zenoh-ext/examples/examples/z_advanced_pub.rs @@ -33,9 +33,7 @@ async fn main() { let publisher = session .declare_publisher(&key_expr) .cache(CacheConfig::default().max_samples(history)) - .sample_miss_detection( - MissDetectionConfig::default().last_sample_miss_detection(Duration::from_secs(5)), - ) + .sample_miss_detection(MissDetectionConfig::default().heartbeat(Duration::from_secs(5))) .publisher_detection() .await .unwrap(); diff --git a/zenoh-ext/examples/examples/z_advanced_sub.rs b/zenoh-ext/examples/examples/z_advanced_sub.rs index 5d371a407..c2b04588f 100644 --- a/zenoh-ext/examples/examples/z_advanced_sub.rs +++ b/zenoh-ext/examples/examples/z_advanced_sub.rs @@ -35,7 +35,7 @@ async fn main() { .recovery( RecoveryConfig::default() .periodic_queries(Some(Duration::from_secs(1))) - .heartbeat_listener(true), + .heartbeat(true), ) .subscriber_detection() .await diff --git a/zenoh-ext/src/advanced_publisher.rs b/zenoh-ext/src/advanced_publisher.rs index 2cab49b83..88ebdfc1f 100644 --- a/zenoh-ext/src/advanced_publisher.rs +++ b/zenoh-ext/src/advanced_publisher.rs @@ -67,7 +67,7 @@ pub struct MissDetectionConfig { #[zenoh_macros::unstable] impl MissDetectionConfig { #[zenoh_macros::unstable] - pub fn last_sample_miss_detection(mut self, period: Duration) -> Self { + pub fn heartbeat(mut self, period: Duration) -> Self { self.state_publisher = Some(period); self } diff --git a/zenoh-ext/src/advanced_subscriber.rs b/zenoh-ext/src/advanced_subscriber.rs index 763ea8633..b922bc20e 100644 --- a/zenoh-ext/src/advanced_subscriber.rs +++ b/zenoh-ext/src/advanced_subscriber.rs @@ -93,7 +93,7 @@ impl HistoryConfig { #[zenoh_macros::unstable] pub struct RecoveryConfig { periodic_queries: Option, - heartbeat_listener: bool, + heartbeat: bool, } impl std::fmt::Debug for RecoveryConfig { @@ -129,8 +129,8 @@ impl RecoveryConfig { /// [`sample_miss_detection`](crate::AdvancedPublisherBuilder::sample_miss_detection). #[zenoh_macros::unstable] #[inline] - pub fn heartbeat_listener(mut self, enabled: bool) -> Self { - self.heartbeat_listener = enabled; + pub fn heartbeat(mut self, enabled: bool) -> Self { + self.heartbeat = enabled; self } } @@ -940,7 +940,7 @@ impl AdvancedSubscriber { None }; - let heartbeat_subscriber = if retransmission.is_some_and(|r| r.heartbeat_listener) { + let heartbeat_subscriber = if retransmission.is_some_and(|r| r.heartbeat) { let ke_heartbeat_sub = KE_ADV_PREFIX / KE_PUB / KE_STARSTAR / KE_AT / &key_expr; let statesref = statesref.clone(); let heartbeat_sub = conf diff --git a/zenoh-ext/tests/advanced.rs b/zenoh-ext/tests/advanced.rs index 6481780b1..01b7d2e93 100644 --- a/zenoh-ext/tests/advanced.rs +++ b/zenoh-ext/tests/advanced.rs @@ -771,16 +771,14 @@ async fn test_advanced_retransmission_heartbeat() { let sub = ztimeout!(client2 .declare_subscriber(ADVANCED_RETRANSMISSION_KEYEXPR) - .recovery(RecoveryConfig::default().heartbeat_listener(true))) + .recovery(RecoveryConfig::default().heartbeat(true))) .unwrap(); tokio::time::sleep(SLEEP).await; let publ = ztimeout!(client1 .declare_publisher(ADVANCED_RETRANSMISSION_KEYEXPR) .cache(CacheConfig::default().max_samples(10)) - .sample_miss_detection( - MissDetectionConfig::default().last_sample_miss_detection(HEARTBEAT_PERIOD) - )) + .sample_miss_detection(MissDetectionConfig::default().heartbeat(HEARTBEAT_PERIOD))) .unwrap(); ztimeout!(publ.put("1")).unwrap(); From d966d639c026da107100f7998724754771ec828c Mon Sep 17 00:00:00 2001 From: Oussama Teffahi Date: Mon, 13 Jan 2025 13:31:26 +0100 Subject: [PATCH 07/12] Change subscriber RecoveryConf to only allow one of periodic_queries or heartbeat --- zenoh-ext/examples/examples/z_advanced_pub.rs | 2 +- zenoh-ext/examples/examples/z_advanced_sub.rs | 6 +-- zenoh-ext/src/advanced_subscriber.rs | 45 ++++++++++++------- zenoh-ext/src/subscriber_ext.rs | 11 ++++- zenoh-ext/tests/advanced.rs | 6 +-- 5 files changed, 43 insertions(+), 27 deletions(-) diff --git a/zenoh-ext/examples/examples/z_advanced_pub.rs b/zenoh-ext/examples/examples/z_advanced_pub.rs index 16cf143c1..c8ba0e1b6 100644 --- a/zenoh-ext/examples/examples/z_advanced_pub.rs +++ b/zenoh-ext/examples/examples/z_advanced_pub.rs @@ -33,7 +33,7 @@ async fn main() { let publisher = session .declare_publisher(&key_expr) .cache(CacheConfig::default().max_samples(history)) - .sample_miss_detection(MissDetectionConfig::default().heartbeat(Duration::from_secs(5))) + .sample_miss_detection(MissDetectionConfig::default().heartbeat(Duration::from_millis(500))) .publisher_detection() .await .unwrap(); diff --git a/zenoh-ext/examples/examples/z_advanced_sub.rs b/zenoh-ext/examples/examples/z_advanced_sub.rs index c2b04588f..e05974bac 100644 --- a/zenoh-ext/examples/examples/z_advanced_sub.rs +++ b/zenoh-ext/examples/examples/z_advanced_sub.rs @@ -32,11 +32,7 @@ async fn main() { let subscriber = session .declare_subscriber(key_expr) .history(HistoryConfig::default().detect_late_publishers()) - .recovery( - RecoveryConfig::default() - .periodic_queries(Some(Duration::from_secs(1))) - .heartbeat(true), - ) + .recovery(RecoveryConfig::default().periodic_queries(Duration::from_secs(5))) .subscriber_detection() .await .unwrap(); diff --git a/zenoh-ext/src/advanced_subscriber.rs b/zenoh-ext/src/advanced_subscriber.rs index b922bc20e..1920dd68c 100644 --- a/zenoh-ext/src/advanced_subscriber.rs +++ b/zenoh-ext/src/advanced_subscriber.rs @@ -11,7 +11,7 @@ // Contributors: // ZettaScale Zenoh Team, // -use std::{collections::BTreeMap, future::IntoFuture, str::FromStr}; +use std::{collections::BTreeMap, future::IntoFuture, marker::PhantomData, str::FromStr}; use zenoh::{ config::ZenohId, @@ -88,24 +88,31 @@ impl HistoryConfig { } } +#[derive(Default, Clone, Copy)] +pub struct Unconfigured; +#[derive(Default, Clone, Copy)] +pub struct Configured; + #[derive(Default, Clone, Copy)] /// Configure retransmission. #[zenoh_macros::unstable] -pub struct RecoveryConfig { +pub struct RecoveryConfig { periodic_queries: Option, - heartbeat: bool, + heartbeat: Option<()>, + phantom: PhantomData, } -impl std::fmt::Debug for RecoveryConfig { +impl std::fmt::Debug for RecoveryConfig { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let mut s = f.debug_struct("RetransmissionConf"); s.field("periodic_queries", &self.periodic_queries); + s.field("heartbeat", &self.heartbeat); s.finish() } } #[zenoh_macros::unstable] -impl RecoveryConfig { +impl RecoveryConfig { /// Enable periodic queries for not yet received Samples and specify their period. /// /// This allows to retrieve the last Sample(s) if the last Sample(s) is/are lost. @@ -116,22 +123,28 @@ impl RecoveryConfig { /// [`sample_miss_detection`](crate::AdvancedPublisherBuilder::sample_miss_detection). #[zenoh_macros::unstable] #[inline] - pub fn periodic_queries(mut self, period: Option) -> Self { - self.periodic_queries = period; - self + pub fn periodic_queries(self, period: Duration) -> RecoveryConfig { + RecoveryConfig { + periodic_queries: Some(period), + heartbeat: None, + phantom: PhantomData::, + } } /// Subscribe to heartbeats of [`AdvancedPublishers`](crate::AdvancedPublisher). /// /// This allows to periodically receive the last published Sample's sequence number and check for misses. - /// Heartbeat listener must be paired with [`AdvancedPublishers`](crate::AdvancedPublisher) + /// Heartbeat subscriber must be paired with [`AdvancedPublishers`](crate::AdvancedPublisher) /// that enable [`cache`](crate::AdvancedPublisherBuilder::cache) and - /// [`sample_miss_detection`](crate::AdvancedPublisherBuilder::sample_miss_detection). + /// [`sample_miss_detection`](crate::AdvancedPublisherBuilder::sample_miss_detection) with heartbeat. #[zenoh_macros::unstable] #[inline] - pub fn heartbeat(mut self, enabled: bool) -> Self { - self.heartbeat = enabled; - self + pub fn heartbeat(self) -> RecoveryConfig { + RecoveryConfig { + periodic_queries: None, + heartbeat: Some(()), + phantom: PhantomData::, + } } } @@ -141,7 +154,7 @@ pub struct AdvancedSubscriberBuilder<'a, 'b, 'c, Handler, const BACKGROUND: bool pub(crate) session: &'a Session, pub(crate) key_expr: ZResult>, pub(crate) origin: Locality, - pub(crate) retransmission: Option, + pub(crate) retransmission: Option>, pub(crate) query_target: QueryTarget, pub(crate) query_timeout: Duration, pub(crate) history: Option, @@ -260,7 +273,7 @@ impl<'a, 'c, Handler, const BACKGROUND: bool> /// [`sample_miss_detection`](crate::AdvancedPublisherBuilder::sample_miss_detection). #[zenoh_macros::unstable] #[inline] - pub fn recovery(mut self, conf: RecoveryConfig) -> Self { + pub fn recovery(mut self, conf: RecoveryConfig) -> Self { self.retransmission = Some(conf); self } @@ -940,7 +953,7 @@ impl AdvancedSubscriber { None }; - let heartbeat_subscriber = if retransmission.is_some_and(|r| r.heartbeat) { + let heartbeat_subscriber = if retransmission.is_some_and(|r| r.heartbeat.is_some()) { let ke_heartbeat_sub = KE_ADV_PREFIX / KE_PUB / KE_STARSTAR / KE_AT / &key_expr; let statesref = statesref.clone(); let heartbeat_sub = conf diff --git a/zenoh-ext/src/subscriber_ext.rs b/zenoh-ext/src/subscriber_ext.rs index 4441ebabd..5c16fb740 100644 --- a/zenoh-ext/src/subscriber_ext.rs +++ b/zenoh-ext/src/subscriber_ext.rs @@ -23,6 +23,7 @@ use zenoh::{ Result as ZResult, }; +use crate::advanced_subscriber::Configured; #[allow(deprecated)] use crate::{ advanced_subscriber::HistoryConfig, querying_subscriber::QueryingSubscriberBuilder, @@ -147,7 +148,10 @@ pub trait AdvancedSubscriberBuilderExt<'a, 'b, 'c, Handler> { /// that enable [`cache`](crate::AdvancedPublisherBuilder::cache) and /// [`sample_miss_detection`](crate::AdvancedPublisherBuilder::sample_miss_detection). #[zenoh_macros::unstable] - fn recovery(self, conf: RecoveryConfig) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Handler>; + fn recovery( + self, + conf: RecoveryConfig, + ) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Handler>; /// Allow this subscriber to be detected through liveliness. #[zenoh_macros::unstable] @@ -284,7 +288,10 @@ impl<'a, 'b, 'c, Handler> AdvancedSubscriberBuilderExt<'a, 'b, 'c, Handler> /// that enable [`cache`](crate::AdvancedPublisherBuilder::cache) and /// [`sample_miss_detection`](crate::AdvancedPublisherBuilder::sample_miss_detection). #[zenoh_macros::unstable] - fn recovery(self, conf: RecoveryConfig) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Handler> { + fn recovery( + self, + conf: RecoveryConfig, + ) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Handler> { AdvancedSubscriberBuilder::new(self).recovery(conf) } diff --git a/zenoh-ext/tests/advanced.rs b/zenoh-ext/tests/advanced.rs index 01b7d2e93..584a8b867 100644 --- a/zenoh-ext/tests/advanced.rs +++ b/zenoh-ext/tests/advanced.rs @@ -292,7 +292,7 @@ async fn test_advanced_retransmission_periodic() { let sub = ztimeout!(client2 .declare_subscriber(ADVANCED_RETRANSMISSION_PERIODIC_KEYEXPR) - .recovery(RecoveryConfig::default().periodic_queries(Some(Duration::from_secs(1))))) + .recovery(RecoveryConfig::default().periodic_queries(Duration::from_secs(1)))) .unwrap(); tokio::time::sleep(SLEEP).await; @@ -536,7 +536,7 @@ async fn test_advanced_retransmission_sample_miss() { let sub = ztimeout!(client2 .declare_subscriber(ADVANCED_RETRANSMISSION_SAMPLE_MISS_KEYEXPR) - .recovery(RecoveryConfig::default().periodic_queries(Some(Duration::from_secs(1))))) + .recovery(RecoveryConfig::default().periodic_queries(Duration::from_secs(1)))) .unwrap(); let miss_listener = ztimeout!(sub.sample_miss_listener()).unwrap(); tokio::time::sleep(SLEEP).await; @@ -771,7 +771,7 @@ async fn test_advanced_retransmission_heartbeat() { let sub = ztimeout!(client2 .declare_subscriber(ADVANCED_RETRANSMISSION_KEYEXPR) - .recovery(RecoveryConfig::default().heartbeat(true))) + .recovery(RecoveryConfig::default().heartbeat())) .unwrap(); tokio::time::sleep(SLEEP).await; From 970bcf95d3356b3a262c337a1f02211526d83261 Mon Sep 17 00:00:00 2001 From: Oussama Teffahi Date: Mon, 13 Jan 2025 13:32:48 +0100 Subject: [PATCH 08/12] Fix debug log string --- zenoh-ext/src/advanced_subscriber.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/zenoh-ext/src/advanced_subscriber.rs b/zenoh-ext/src/advanced_subscriber.rs index 1920dd68c..2b98b3e59 100644 --- a/zenoh-ext/src/advanced_subscriber.rs +++ b/zenoh-ext/src/advanced_subscriber.rs @@ -990,11 +990,10 @@ impl AdvancedSubscriber { let states = &mut *lock; let entry = states.sequenced_states.entry(source_id); if matches!(&entry, Entry::Vacant(_)) && states.global_pending_queries > 0 { - tracing::debug!("Skipping heartbeat on '{}' from publisher that is currently being pulled by liveliness task", heartbeat_keyexpr); + tracing::debug!("Skipping heartbeat on '{}' from publisher that is currently being pulled by global query", heartbeat_keyexpr); return; } - // FIXME: This breaks vacancy check in handle_sample: spawning periodic queries will not occur if heartbeat sample is received before data sample let state = entry.or_insert(SourceState:: { last_delivered: None, pending_queries: 0, From 5bc4b10c54f16a3ef879fb2bfcaa7a47eba4476c Mon Sep 17 00:00:00 2001 From: Oussama Teffahi Date: Mon, 13 Jan 2025 13:46:01 +0100 Subject: [PATCH 09/12] Fix known issues with heartbeat subscriber --- zenoh-ext/src/advanced_subscriber.rs | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/zenoh-ext/src/advanced_subscriber.rs b/zenoh-ext/src/advanced_subscriber.rs index 2b98b3e59..8b8a67e2c 100644 --- a/zenoh-ext/src/advanced_subscriber.rs +++ b/zenoh-ext/src/advanced_subscriber.rs @@ -989,9 +989,13 @@ impl AdvancedSubscriber { let mut lock = zlock!(statesref); let states = &mut *lock; let entry = states.sequenced_states.entry(source_id); - if matches!(&entry, Entry::Vacant(_)) && states.global_pending_queries > 0 { - tracing::debug!("Skipping heartbeat on '{}' from publisher that is currently being pulled by global query", heartbeat_keyexpr); - return; + if matches!(&entry, Entry::Vacant(_)) { + // NOTE: API does not allow both heartbeat and periodic_queries + spawn_periodoic_queries!(states, source_id, statesref.clone()); + if states.global_pending_queries > 0 { + tracing::debug!("Skipping heartbeat on '{}' from publisher that is currently being pulled by global query", heartbeat_keyexpr); + return; + } } let state = entry.or_insert(SourceState:: { @@ -999,12 +1003,11 @@ impl AdvancedSubscriber { pending_queries: 0, pending_samples: BTreeMap::new(), }); - // TODO: add state to avoid sending multiple queries for the same heartbeat if its periodicity is higher than the query response time - // check that it's not an old sn or a pending sample's sn + // check that it's not an old sn, and that there are no pending queries if (state.last_delivered.is_none() || state.last_delivered.is_some_and(|sn| heartbeat_sn > sn)) - && !state.pending_samples.contains_key(&heartbeat_sn) + && state.pending_queries == 0 { let seq_num_range = seq_num_range( state.last_delivered.map(|s| s + 1), From 0eb421cb456b58e89734d0f2db1e9b56f2c13a84 Mon Sep 17 00:00:00 2001 From: Oussama Teffahi Date: Mon, 13 Jan 2025 13:46:49 +0100 Subject: [PATCH 10/12] Change heartbeat test keyexpr --- zenoh-ext/tests/advanced.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zenoh-ext/tests/advanced.rs b/zenoh-ext/tests/advanced.rs index 584a8b867..7e5b428ad 100644 --- a/zenoh-ext/tests/advanced.rs +++ b/zenoh-ext/tests/advanced.rs @@ -726,7 +726,7 @@ async fn test_advanced_retransmission_heartbeat() { const HEARTBEAT_PERIOD: Duration = Duration::from_secs(4); const ROUTER_ENDPOINT: &str = "tcp/localhost:47456"; - const ADVANCED_RETRANSMISSION_KEYEXPR: &str = "test/advanced/retransmission"; + const ADVANCED_RETRANSMISSION_KEYEXPR: &str = "test/advanced/retransmission/heartbeat"; zenoh_util::init_log_from_env_or("error"); From 6c7bf04fe3f8e3a928fd61ea485190426788e66b Mon Sep 17 00:00:00 2001 From: Oussama Teffahi Date: Mon, 13 Jan 2025 14:01:03 +0100 Subject: [PATCH 11/12] Update z_advanced_sub example to use heartbeat --- zenoh-ext/examples/examples/z_advanced_sub.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/zenoh-ext/examples/examples/z_advanced_sub.rs b/zenoh-ext/examples/examples/z_advanced_sub.rs index e05974bac..acbde4e64 100644 --- a/zenoh-ext/examples/examples/z_advanced_sub.rs +++ b/zenoh-ext/examples/examples/z_advanced_sub.rs @@ -11,8 +11,6 @@ // Contributors: // ZettaScale Zenoh Team, // -use std::time::Duration; - use clap::{arg, Parser}; use zenoh::config::Config; use zenoh_ext::{AdvancedSubscriberBuilderExt, HistoryConfig, RecoveryConfig}; @@ -32,7 +30,7 @@ async fn main() { let subscriber = session .declare_subscriber(key_expr) .history(HistoryConfig::default().detect_late_publishers()) - .recovery(RecoveryConfig::default().periodic_queries(Duration::from_secs(5))) + .recovery(RecoveryConfig::default().heartbeat()) .subscriber_detection() .await .unwrap(); From 20977345bc4293d1b861009a553880e1b588d6ed Mon Sep 17 00:00:00 2001 From: Oussama Teffahi Date: Mon, 13 Jan 2025 14:22:41 +0100 Subject: [PATCH 12/12] Fix use statements --- zenoh-ext/src/subscriber_ext.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/zenoh-ext/src/subscriber_ext.rs b/zenoh-ext/src/subscriber_ext.rs index 5c16fb740..29808b286 100644 --- a/zenoh-ext/src/subscriber_ext.rs +++ b/zenoh-ext/src/subscriber_ext.rs @@ -23,10 +23,10 @@ use zenoh::{ Result as ZResult, }; -use crate::advanced_subscriber::Configured; #[allow(deprecated)] use crate::{ - advanced_subscriber::HistoryConfig, querying_subscriber::QueryingSubscriberBuilder, + advanced_subscriber::{Configured, HistoryConfig}, + querying_subscriber::QueryingSubscriberBuilder, AdvancedSubscriberBuilder, ExtractSample, FetchingSubscriberBuilder, RecoveryConfig, };