diff --git a/zenoh-ext/examples/examples/z_advanced_pub.rs b/zenoh-ext/examples/examples/z_advanced_pub.rs index 6437515c8f..c8ba0e1b64 100644 --- a/zenoh-ext/examples/examples/z_advanced_pub.rs +++ b/zenoh-ext/examples/examples/z_advanced_pub.rs @@ -16,7 +16,7 @@ use std::time::Duration; use clap::{arg, Parser}; use zenoh::{config::Config, key_expr::KeyExpr}; use zenoh_config::ModeDependentValue; -use zenoh_ext::{AdvancedPublisherBuilderExt, CacheConfig}; +use zenoh_ext::{AdvancedPublisherBuilderExt, CacheConfig, MissDetectionConfig}; use zenoh_ext_examples::CommonArgs; #[tokio::main] @@ -33,7 +33,7 @@ async fn main() { let publisher = session .declare_publisher(&key_expr) .cache(CacheConfig::default().max_samples(history)) - .sample_miss_detection() + .sample_miss_detection(MissDetectionConfig::default().heartbeat(Duration::from_millis(500))) .publisher_detection() .await .unwrap(); diff --git a/zenoh-ext/examples/examples/z_advanced_sub.rs b/zenoh-ext/examples/examples/z_advanced_sub.rs index 5bea70f7d5..acbde4e643 100644 --- a/zenoh-ext/examples/examples/z_advanced_sub.rs +++ b/zenoh-ext/examples/examples/z_advanced_sub.rs @@ -11,8 +11,6 @@ // Contributors: // ZettaScale Zenoh Team, // -use std::time::Duration; - use clap::{arg, Parser}; use zenoh::config::Config; use zenoh_ext::{AdvancedSubscriberBuilderExt, HistoryConfig, RecoveryConfig}; @@ -32,7 +30,7 @@ async fn main() { let subscriber = session .declare_subscriber(key_expr) .history(HistoryConfig::default().detect_late_publishers()) - .recovery(RecoveryConfig::default().periodic_queries(Some(Duration::from_secs(1)))) + .recovery(RecoveryConfig::default().heartbeat()) .subscriber_detection() .await .unwrap(); diff --git a/zenoh-ext/src/advanced_publisher.rs b/zenoh-ext/src/advanced_publisher.rs index 9afc2fb091..88ebdfc1f4 100644 --- a/zenoh-ext/src/advanced_publisher.rs +++ b/zenoh-ext/src/advanced_publisher.rs @@ -13,16 +13,22 @@ // use std::{ future::{IntoFuture, Ready}, - sync::atomic::{AtomicU32, Ordering}, + sync::{ + atomic::{AtomicU32, Ordering}, + Arc, + }, + time::Duration, }; use zenoh::{ bytes::{Encoding, OptionZBytes, ZBytes}, internal::{ bail, + runtime::ZRuntime, traits::{ EncodingBuilderTrait, QoSBuilderTrait, SampleBuilderTrait, TimestampBuilderTrait, }, + TerminatableTask, }, key_expr::{keyexpr, KeyExpr}, liveliness::LivelinessToken, @@ -37,7 +43,10 @@ use zenoh::{ }; use zenoh_macros::ke; -use crate::advanced_cache::{AdvancedCache, AdvancedCacheBuilder, CacheConfig, KE_UHLC}; +use crate::{ + advanced_cache::{AdvancedCache, AdvancedCacheBuilder, CacheConfig, KE_UHLC}, + z_serialize, +}; pub(crate) static KE_PUB: &keyexpr = ke!("pub"); @@ -49,6 +58,21 @@ pub(crate) enum Sequencing { SequenceNumber, } +#[derive(Default)] +#[zenoh_macros::unstable] +pub struct MissDetectionConfig { + pub(crate) state_publisher: Option, +} + +#[zenoh_macros::unstable] +impl MissDetectionConfig { + #[zenoh_macros::unstable] + pub fn heartbeat(mut self, period: Duration) -> Self { + self.state_publisher = Some(period); + self + } +} + /// The builder of PublicationCache, allowing to configure it. #[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"] #[zenoh_macros::unstable] @@ -63,6 +87,7 @@ pub struct AdvancedPublisherBuilder<'a, 'b, 'c> { is_express: bool, meta_key_expr: Option>>, sequencing: Sequencing, + miss_config: Option, liveliness: bool, cache: bool, history: CacheConfig, @@ -83,6 +108,7 @@ impl<'a, 'b, 'c> AdvancedPublisherBuilder<'a, 'b, 'c> { is_express: builder.is_express, meta_key_expr: None, sequencing: Sequencing::None, + miss_config: None, liveliness: false, cache: false, history: CacheConfig::default(), @@ -118,8 +144,9 @@ impl<'a, 'b, 'c> AdvancedPublisherBuilder<'a, 'b, 'c> { /// /// Retransmission can only be achieved if [`cache`](crate::AdvancedPublisherBuilder::cache) is enabled. #[zenoh_macros::unstable] - pub fn sample_miss_detection(mut self) -> Self { + pub fn sample_miss_detection(mut self, config: MissDetectionConfig) -> Self { self.sequencing = Sequencing::SequenceNumber; + self.miss_config = Some(config); self } @@ -230,9 +257,10 @@ impl IntoFuture for AdvancedPublisherBuilder<'_, '_, '_> { #[zenoh_macros::unstable] pub struct AdvancedPublisher<'a> { publisher: Publisher<'a>, - seqnum: Option, + seqnum: Option>, cache: Option, _token: Option, + _state_publisher: Option, } #[zenoh_macros::unstable] @@ -270,7 +298,7 @@ impl<'a> AdvancedPublisher<'a> { }; let seqnum = match conf.sequencing { - Sequencing::SequenceNumber => Some(AtomicU32::new(0)), + Sequencing::SequenceNumber => Some(Arc::new(AtomicU32::new(0))), Sequencing::Timestamp => { if conf.session.hlc().is_none() { bail!( @@ -299,18 +327,44 @@ impl<'a> AdvancedPublisher<'a> { Some( conf.session .liveliness() - .declare_token(prefix / &key_expr) + .declare_token(&prefix / &key_expr) .wait()?, ) } else { None }; + let state_publisher = if let Some(period) = conf.miss_config.and_then(|c| c.state_publisher) + { + if let Some(seqnum) = seqnum.as_ref() { + let seqnum = seqnum.clone(); + + let publisher = conf.session.declare_publisher(prefix / &key_expr).wait()?; + Some(TerminatableTask::spawn_abortable( + ZRuntime::Net, + async move { + loop { + tokio::time::sleep(period).await; + let seqnum = seqnum.load(Ordering::Relaxed); + if seqnum > 0 { + let _ = publisher.put(z_serialize(&(seqnum - 1))).await; + } + } + }, + )) + } else { + None + } + } else { + None + }; + Ok(AdvancedPublisher { publisher, seqnum, cache, _token: token, + _state_publisher: state_publisher, }) } diff --git a/zenoh-ext/src/advanced_subscriber.rs b/zenoh-ext/src/advanced_subscriber.rs index e922c4ecb2..8b8a67e2c1 100644 --- a/zenoh-ext/src/advanced_subscriber.rs +++ b/zenoh-ext/src/advanced_subscriber.rs @@ -11,7 +11,7 @@ // Contributors: // ZettaScale Zenoh Team, // -use std::{collections::BTreeMap, future::IntoFuture, str::FromStr}; +use std::{collections::BTreeMap, future::IntoFuture, marker::PhantomData, str::FromStr}; use zenoh::{ config::ZenohId, @@ -46,7 +46,10 @@ use { zenoh::Result as ZResult, }; -use crate::advanced_cache::{ke_liveliness, KE_UHLC}; +use crate::{ + advanced_cache::{ke_liveliness, KE_UHLC}, + z_deserialize, +}; #[derive(Debug, Default, Clone)] /// Configure query for historical data. @@ -85,23 +88,31 @@ impl HistoryConfig { } } -#[derive(Default)] +#[derive(Default, Clone, Copy)] +pub struct Unconfigured; +#[derive(Default, Clone, Copy)] +pub struct Configured; + +#[derive(Default, Clone, Copy)] /// Configure retransmission. #[zenoh_macros::unstable] -pub struct RecoveryConfig { +pub struct RecoveryConfig { periodic_queries: Option, + heartbeat: Option<()>, + phantom: PhantomData, } -impl std::fmt::Debug for RecoveryConfig { +impl std::fmt::Debug for RecoveryConfig { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let mut s = f.debug_struct("RetransmissionConf"); s.field("periodic_queries", &self.periodic_queries); + s.field("heartbeat", &self.heartbeat); s.finish() } } #[zenoh_macros::unstable] -impl RecoveryConfig { +impl RecoveryConfig { /// Enable periodic queries for not yet received Samples and specify their period. /// /// This allows to retrieve the last Sample(s) if the last Sample(s) is/are lost. @@ -112,9 +123,28 @@ impl RecoveryConfig { /// [`sample_miss_detection`](crate::AdvancedPublisherBuilder::sample_miss_detection). #[zenoh_macros::unstable] #[inline] - pub fn periodic_queries(mut self, period: Option) -> Self { - self.periodic_queries = period; - self + pub fn periodic_queries(self, period: Duration) -> RecoveryConfig { + RecoveryConfig { + periodic_queries: Some(period), + heartbeat: None, + phantom: PhantomData::, + } + } + + /// Subscribe to heartbeats of [`AdvancedPublishers`](crate::AdvancedPublisher). + /// + /// This allows to periodically receive the last published Sample's sequence number and check for misses. + /// Heartbeat subscriber must be paired with [`AdvancedPublishers`](crate::AdvancedPublisher) + /// that enable [`cache`](crate::AdvancedPublisherBuilder::cache) and + /// [`sample_miss_detection`](crate::AdvancedPublisherBuilder::sample_miss_detection) with heartbeat. + #[zenoh_macros::unstable] + #[inline] + pub fn heartbeat(self) -> RecoveryConfig { + RecoveryConfig { + periodic_queries: None, + heartbeat: Some(()), + phantom: PhantomData::, + } } } @@ -124,7 +154,7 @@ pub struct AdvancedSubscriberBuilder<'a, 'b, 'c, Handler, const BACKGROUND: bool pub(crate) session: &'a Session, pub(crate) key_expr: ZResult>, pub(crate) origin: Locality, - pub(crate) retransmission: Option, + pub(crate) retransmission: Option>, pub(crate) query_target: QueryTarget, pub(crate) query_timeout: Duration, pub(crate) history: Option, @@ -243,7 +273,7 @@ impl<'a, 'c, Handler, const BACKGROUND: bool> /// [`sample_miss_detection`](crate::AdvancedPublisherBuilder::sample_miss_detection). #[zenoh_macros::unstable] #[inline] - pub fn recovery(mut self, conf: RecoveryConfig) -> Self { + pub fn recovery(mut self, conf: RecoveryConfig) -> Self { self.retransmission = Some(conf); self } @@ -441,6 +471,7 @@ pub struct AdvancedSubscriber { subscriber: Subscriber<()>, receiver: Receiver, liveliness_subscriber: Option>, + _heartbeat_subscriber: Option>, } #[zenoh_macros::unstable] @@ -733,12 +764,13 @@ impl AdvancedSubscriber { .wait(); } - let liveliness_subscriber = if let Some(historyconf) = conf.history { + let liveliness_subscriber = if let Some(historyconf) = conf.history.as_ref() { if historyconf.liveliness { let live_callback = { let session = conf.session.clone(); let statesref = statesref.clone(); let key_expr = key_expr.clone().into_owned(); + let historyconf = historyconf.clone(); move |s: Sample| { if s.kind() == SampleKind::Put { if let Ok(parsed) = ke_liveliness::parse(s.key_expr().as_keyexpr()) { @@ -921,6 +953,104 @@ impl AdvancedSubscriber { None }; + let heartbeat_subscriber = if retransmission.is_some_and(|r| r.heartbeat.is_some()) { + let ke_heartbeat_sub = KE_ADV_PREFIX / KE_PUB / KE_STARSTAR / KE_AT / &key_expr; + let statesref = statesref.clone(); + let heartbeat_sub = conf + .session + .declare_subscriber(ke_heartbeat_sub) + .callback(move |sample_hb| { + if sample_hb.kind() != SampleKind::Put { + return; + } + + let heartbeat_keyexpr = sample_hb.key_expr().as_keyexpr(); + let Ok(parsed_keyexpr) = ke_liveliness::parse(heartbeat_keyexpr) else { + return; + }; + let source_id = { + let Ok(zid) = ZenohId::from_str(parsed_keyexpr.zid().as_str()) else { + return; + }; + let Ok(eid) = EntityId::from_str(parsed_keyexpr.eid().as_str()) else { + return; + }; + EntityGlobalId::new(zid, eid) + }; + + let Ok(heartbeat_sn) = z_deserialize::(sample_hb.payload()) else { + tracing::debug!( + "Skipping invalid heartbeat payload on '{}'", + heartbeat_keyexpr + ); + return; + }; + + let mut lock = zlock!(statesref); + let states = &mut *lock; + let entry = states.sequenced_states.entry(source_id); + if matches!(&entry, Entry::Vacant(_)) { + // NOTE: API does not allow both heartbeat and periodic_queries + spawn_periodoic_queries!(states, source_id, statesref.clone()); + if states.global_pending_queries > 0 { + tracing::debug!("Skipping heartbeat on '{}' from publisher that is currently being pulled by global query", heartbeat_keyexpr); + return; + } + } + + let state = entry.or_insert(SourceState:: { + last_delivered: None, + pending_queries: 0, + pending_samples: BTreeMap::new(), + }); + + // check that it's not an old sn, and that there are no pending queries + if (state.last_delivered.is_none() + || state.last_delivered.is_some_and(|sn| heartbeat_sn > sn)) + && state.pending_queries == 0 + { + let seq_num_range = seq_num_range( + state.last_delivered.map(|s| s + 1), + Some(heartbeat_sn), + ); + + let session = states.session.clone(); + let key_expr = states.key_expr.clone().into_owned(); + let query_target = states.query_target; + let query_timeout = states.query_timeout; + state.pending_queries += 1; + drop(lock); + + let handler = SequencedRepliesHandler { + source_id, + statesref: statesref.clone(), + }; + let _ = session + .get(Selector::from((heartbeat_keyexpr, seq_num_range))) + .callback({ + move |r: Reply| { + if let Ok(s) = r.into_result() { + if key_expr.intersects(s.key_expr()) { + let states = &mut *zlock!(handler.statesref); + handle_sample(states, s); + } + } + } + }) + .consolidation(ConsolidationMode::None) + .accept_replies(ReplyKeyExpr::Any) + .target(query_target) + .timeout(query_timeout) + .wait(); + } + }) + .allowed_origin(conf.origin) + .wait()?; + Some(heartbeat_sub) + } else { + None + }; + if conf.liveliness { let prefix = KE_ADV_PREFIX / KE_SUB @@ -944,6 +1074,7 @@ impl AdvancedSubscriber { subscriber, receiver, liveliness_subscriber, + _heartbeat_subscriber: heartbeat_subscriber, }; Ok(reliable_subscriber) diff --git a/zenoh-ext/src/lib.rs b/zenoh-ext/src/lib.rs index 2d73bd16c7..60d805730b 100644 --- a/zenoh-ext/src/lib.rs +++ b/zenoh-ext/src/lib.rs @@ -41,7 +41,7 @@ pub use crate::serialization::{ #[allow(deprecated)] pub use crate::{ advanced_cache::{CacheConfig, RepliesConfig}, - advanced_publisher::{AdvancedPublisher, AdvancedPublisherBuilder}, + advanced_publisher::{AdvancedPublisher, AdvancedPublisherBuilder, MissDetectionConfig}, advanced_subscriber::{ AdvancedSubscriber, AdvancedSubscriberBuilder, HistoryConfig, Miss, RecoveryConfig, SampleMissHandlerUndeclaration, SampleMissListener, SampleMissListenerBuilder, diff --git a/zenoh-ext/src/publisher_ext.rs b/zenoh-ext/src/publisher_ext.rs index de045d0ff0..2a48666235 100644 --- a/zenoh-ext/src/publisher_ext.rs +++ b/zenoh-ext/src/publisher_ext.rs @@ -13,7 +13,7 @@ // use zenoh::pubsub::PublisherBuilder; -use crate::{advanced_cache::CacheConfig, AdvancedPublisherBuilder}; +use crate::{advanced_cache::CacheConfig, AdvancedPublisherBuilder, MissDetectionConfig}; /// Some extensions to the [`zenoh::publication::PublisherBuilder`](zenoh::publication::PublisherBuilder) #[zenoh_macros::unstable] @@ -27,7 +27,10 @@ pub trait AdvancedPublisherBuilderExt<'a, 'b, 'c> { /// /// Retransmission can only be achieved if [`cache`](crate::AdvancedPublisherBuilder::cache) is also enabled. #[zenoh_macros::unstable] - fn sample_miss_detection(self) -> AdvancedPublisherBuilder<'a, 'b, 'c>; + fn sample_miss_detection( + self, + config: MissDetectionConfig, + ) -> AdvancedPublisherBuilder<'a, 'b, 'c>; /// Allow this publisher to be detected by [`AdvancedSubscribers`](crate::AdvancedSubscriber). /// @@ -53,8 +56,11 @@ impl<'a, 'b, 'c> AdvancedPublisherBuilderExt<'a, 'b, 'c> for PublisherBuilder<'a /// /// Retransmission can only be achieved if [`cache`](crate::AdvancedPublisherBuilder::cache) is also enabled. #[zenoh_macros::unstable] - fn sample_miss_detection(self) -> AdvancedPublisherBuilder<'a, 'b, 'c> { - AdvancedPublisherBuilder::new(self).sample_miss_detection() + fn sample_miss_detection( + self, + config: MissDetectionConfig, + ) -> AdvancedPublisherBuilder<'a, 'b, 'c> { + AdvancedPublisherBuilder::new(self).sample_miss_detection(config) } /// Allow this publisher to be detected by [`AdvancedSubscribers`](crate::AdvancedSubscriber). diff --git a/zenoh-ext/src/subscriber_ext.rs b/zenoh-ext/src/subscriber_ext.rs index 4441ebabdc..29808b2863 100644 --- a/zenoh-ext/src/subscriber_ext.rs +++ b/zenoh-ext/src/subscriber_ext.rs @@ -25,7 +25,8 @@ use zenoh::{ #[allow(deprecated)] use crate::{ - advanced_subscriber::HistoryConfig, querying_subscriber::QueryingSubscriberBuilder, + advanced_subscriber::{Configured, HistoryConfig}, + querying_subscriber::QueryingSubscriberBuilder, AdvancedSubscriberBuilder, ExtractSample, FetchingSubscriberBuilder, RecoveryConfig, }; @@ -147,7 +148,10 @@ pub trait AdvancedSubscriberBuilderExt<'a, 'b, 'c, Handler> { /// that enable [`cache`](crate::AdvancedPublisherBuilder::cache) and /// [`sample_miss_detection`](crate::AdvancedPublisherBuilder::sample_miss_detection). #[zenoh_macros::unstable] - fn recovery(self, conf: RecoveryConfig) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Handler>; + fn recovery( + self, + conf: RecoveryConfig, + ) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Handler>; /// Allow this subscriber to be detected through liveliness. #[zenoh_macros::unstable] @@ -284,7 +288,10 @@ impl<'a, 'b, 'c, Handler> AdvancedSubscriberBuilderExt<'a, 'b, 'c, Handler> /// that enable [`cache`](crate::AdvancedPublisherBuilder::cache) and /// [`sample_miss_detection`](crate::AdvancedPublisherBuilder::sample_miss_detection). #[zenoh_macros::unstable] - fn recovery(self, conf: RecoveryConfig) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Handler> { + fn recovery( + self, + conf: RecoveryConfig, + ) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Handler> { AdvancedSubscriberBuilder::new(self).recovery(conf) } diff --git a/zenoh-ext/tests/advanced.rs b/zenoh-ext/tests/advanced.rs index f7d3593c63..7e5b428add 100644 --- a/zenoh-ext/tests/advanced.rs +++ b/zenoh-ext/tests/advanced.rs @@ -16,7 +16,7 @@ use zenoh::sample::SampleKind; use zenoh_config::{EndPoint, ModeDependentValue, WhatAmI}; use zenoh_ext::{ AdvancedPublisherBuilderExt, AdvancedSubscriberBuilderExt, CacheConfig, HistoryConfig, - RecoveryConfig, + MissDetectionConfig, RecoveryConfig, }; #[tokio::test(flavor = "multi_thread", worker_threads = 4)] @@ -170,7 +170,7 @@ async fn test_advanced_retransmission() { let publ = ztimeout!(client1 .declare_publisher(ADVANCED_RETRANSMISSION_KEYEXPR) .cache(CacheConfig::default().max_samples(10)) - .sample_miss_detection()) + .sample_miss_detection(MissDetectionConfig::default())) .unwrap(); ztimeout!(publ.put("1")).unwrap(); @@ -292,14 +292,14 @@ async fn test_advanced_retransmission_periodic() { let sub = ztimeout!(client2 .declare_subscriber(ADVANCED_RETRANSMISSION_PERIODIC_KEYEXPR) - .recovery(RecoveryConfig::default().periodic_queries(Some(Duration::from_secs(1))))) + .recovery(RecoveryConfig::default().periodic_queries(Duration::from_secs(1)))) .unwrap(); tokio::time::sleep(SLEEP).await; let publ = ztimeout!(client1 .declare_publisher(ADVANCED_RETRANSMISSION_PERIODIC_KEYEXPR) .cache(CacheConfig::default().max_samples(10)) - .sample_miss_detection()) + .sample_miss_detection(MissDetectionConfig::default())) .unwrap(); ztimeout!(publ.put("1")).unwrap(); @@ -421,7 +421,7 @@ async fn test_advanced_sample_miss() { let publ = ztimeout!(client1 .declare_publisher(ADVANCED_SAMPLE_MISS_KEYEXPR) - .sample_miss_detection()) + .sample_miss_detection(MissDetectionConfig::default())) .unwrap(); ztimeout!(publ.put("1")).unwrap(); @@ -536,7 +536,7 @@ async fn test_advanced_retransmission_sample_miss() { let sub = ztimeout!(client2 .declare_subscriber(ADVANCED_RETRANSMISSION_SAMPLE_MISS_KEYEXPR) - .recovery(RecoveryConfig::default().periodic_queries(Some(Duration::from_secs(1))))) + .recovery(RecoveryConfig::default().periodic_queries(Duration::from_secs(1)))) .unwrap(); let miss_listener = ztimeout!(sub.sample_miss_listener()).unwrap(); tokio::time::sleep(SLEEP).await; @@ -544,7 +544,7 @@ async fn test_advanced_retransmission_sample_miss() { let publ = ztimeout!(client1 .declare_publisher(ADVANCED_RETRANSMISSION_SAMPLE_MISS_KEYEXPR) .cache(CacheConfig::default().max_samples(1)) - .sample_miss_detection()) + .sample_miss_detection(MissDetectionConfig::default())) .unwrap(); ztimeout!(publ.put("1")).unwrap(); @@ -713,3 +713,126 @@ async fn test_advanced_late_joiner() { router.close().await.unwrap(); } + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn test_advanced_retransmission_heartbeat() { + use std::time::Duration; + + use zenoh::internal::ztimeout; + + const TIMEOUT: Duration = Duration::from_secs(60); + const SLEEP: Duration = Duration::from_secs(1); + const RECONNECT_SLEEP: Duration = Duration::from_secs(5); + const HEARTBEAT_PERIOD: Duration = Duration::from_secs(4); + const ROUTER_ENDPOINT: &str = "tcp/localhost:47456"; + + const ADVANCED_RETRANSMISSION_KEYEXPR: &str = "test/advanced/retransmission/heartbeat"; + + zenoh_util::init_log_from_env_or("error"); + + let router = { + let mut c = zenoh::Config::default(); + c.listen + .endpoints + .set(vec![ROUTER_ENDPOINT.parse::().unwrap()]) + .unwrap(); + c.scouting.multicast.set_enabled(Some(false)).unwrap(); + let _ = c.set_mode(Some(WhatAmI::Router)); + let s = ztimeout!(zenoh::open(c)).unwrap(); + tracing::info!("Router ZID: {}", s.zid()); + s + }; + + let client1 = { + let mut c = zenoh::Config::default(); + c.connect + .endpoints + .set(vec![ROUTER_ENDPOINT.parse::().unwrap()]) + .unwrap(); + c.scouting.multicast.set_enabled(Some(false)).unwrap(); + let _ = c.set_mode(Some(WhatAmI::Client)); + let s = ztimeout!(zenoh::open(c)).unwrap(); + tracing::info!("Client (1) ZID: {}", s.zid()); + s + }; + + let client2 = { + let mut c = zenoh::Config::default(); + c.connect + .endpoints + .set(vec![ROUTER_ENDPOINT.parse::().unwrap()]) + .unwrap(); + c.scouting.multicast.set_enabled(Some(false)).unwrap(); + let _ = c.set_mode(Some(WhatAmI::Client)); + let s = ztimeout!(zenoh::open(c)).unwrap(); + tracing::info!("Client (2) ZID: {}", s.zid()); + s + }; + + let sub = ztimeout!(client2 + .declare_subscriber(ADVANCED_RETRANSMISSION_KEYEXPR) + .recovery(RecoveryConfig::default().heartbeat())) + .unwrap(); + tokio::time::sleep(SLEEP).await; + + let publ = ztimeout!(client1 + .declare_publisher(ADVANCED_RETRANSMISSION_KEYEXPR) + .cache(CacheConfig::default().max_samples(10)) + .sample_miss_detection(MissDetectionConfig::default().heartbeat(HEARTBEAT_PERIOD))) + .unwrap(); + ztimeout!(publ.put("1")).unwrap(); + + tokio::time::sleep(SLEEP).await; + + let sample = ztimeout!(sub.recv_async()).unwrap(); + assert_eq!(sample.kind(), SampleKind::Put); + assert_eq!(sample.payload().try_to_string().unwrap().as_ref(), "1"); + + assert!(sub.try_recv().unwrap().is_none()); + + router.close().await.unwrap(); + tokio::time::sleep(SLEEP).await; + + ztimeout!(publ.put("2")).unwrap(); + ztimeout!(publ.put("3")).unwrap(); + ztimeout!(publ.put("4")).unwrap(); + tokio::time::sleep(SLEEP).await; + + assert!(sub.try_recv().unwrap().is_none()); + + let router = { + let mut c = zenoh::Config::default(); + c.listen + .endpoints + .set(vec![ROUTER_ENDPOINT.parse::().unwrap()]) + .unwrap(); + c.scouting.multicast.set_enabled(Some(false)).unwrap(); + let _ = c.set_mode(Some(WhatAmI::Router)); + let s = ztimeout!(zenoh::open(c)).unwrap(); + tracing::info!("Router ZID: {}", s.zid()); + s + }; + tokio::time::sleep(RECONNECT_SLEEP).await; + + let sample = ztimeout!(sub.recv_async()).unwrap(); + assert_eq!(sample.kind(), SampleKind::Put); + assert_eq!(sample.payload().try_to_string().unwrap().as_ref(), "2"); + + let sample = ztimeout!(sub.recv_async()).unwrap(); + assert_eq!(sample.kind(), SampleKind::Put); + assert_eq!(sample.payload().try_to_string().unwrap().as_ref(), "3"); + + let sample = ztimeout!(sub.recv_async()).unwrap(); + assert_eq!(sample.kind(), SampleKind::Put); + assert_eq!(sample.payload().try_to_string().unwrap().as_ref(), "4"); + + assert!(sub.try_recv().unwrap().is_none()); + + publ.undeclare().await.unwrap(); + // sub.undeclare().await.unwrap(); + + client1.close().await.unwrap(); + client2.close().await.unwrap(); + + router.close().await.unwrap(); +}