Skip to content

Commit

Permalink
Add heartbeat test
Browse files Browse the repository at this point in the history
  • Loading branch information
oteffahi committed Jan 10, 2025
1 parent 9dea7c7 commit dd5a392
Showing 1 changed file with 125 additions and 0 deletions.
125 changes: 125 additions & 0 deletions zenoh-ext/tests/advanced.rs
Original file line number Diff line number Diff line change
Expand Up @@ -713,3 +713,128 @@ async fn test_advanced_late_joiner() {

router.close().await.unwrap();
}

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_advanced_retransmission_heartbeat() {
use std::time::Duration;

use zenoh::internal::ztimeout;

const TIMEOUT: Duration = Duration::from_secs(60);
const SLEEP: Duration = Duration::from_secs(1);
const RECONNECT_SLEEP: Duration = Duration::from_secs(5);
const HEARTBEAT_PERIOD: Duration = Duration::from_secs(4);
const ROUTER_ENDPOINT: &str = "tcp/localhost:47456";

const ADVANCED_RETRANSMISSION_KEYEXPR: &str = "test/advanced/retransmission";

zenoh_util::init_log_from_env_or("error");

let router = {
let mut c = zenoh::Config::default();
c.listen
.endpoints
.set(vec![ROUTER_ENDPOINT.parse::<EndPoint>().unwrap()])
.unwrap();
c.scouting.multicast.set_enabled(Some(false)).unwrap();
let _ = c.set_mode(Some(WhatAmI::Router));
let s = ztimeout!(zenoh::open(c)).unwrap();
tracing::info!("Router ZID: {}", s.zid());
s
};

let client1 = {
let mut c = zenoh::Config::default();
c.connect
.endpoints
.set(vec![ROUTER_ENDPOINT.parse::<EndPoint>().unwrap()])
.unwrap();
c.scouting.multicast.set_enabled(Some(false)).unwrap();
let _ = c.set_mode(Some(WhatAmI::Client));
let s = ztimeout!(zenoh::open(c)).unwrap();
tracing::info!("Client (1) ZID: {}", s.zid());
s
};

let client2 = {
let mut c = zenoh::Config::default();
c.connect
.endpoints
.set(vec![ROUTER_ENDPOINT.parse::<EndPoint>().unwrap()])
.unwrap();
c.scouting.multicast.set_enabled(Some(false)).unwrap();
let _ = c.set_mode(Some(WhatAmI::Client));
let s = ztimeout!(zenoh::open(c)).unwrap();
tracing::info!("Client (2) ZID: {}", s.zid());
s
};

let sub = ztimeout!(client2
.declare_subscriber(ADVANCED_RETRANSMISSION_KEYEXPR)
.recovery(RecoveryConfig::default().heartbeat_listener(true)))
.unwrap();
tokio::time::sleep(SLEEP).await;

let publ = ztimeout!(client1
.declare_publisher(ADVANCED_RETRANSMISSION_KEYEXPR)
.cache(CacheConfig::default().max_samples(10))
.sample_miss_detection(
MissDetectionConfig::default().last_sample_miss_detection(HEARTBEAT_PERIOD)
))
.unwrap();
ztimeout!(publ.put("1")).unwrap();

tokio::time::sleep(SLEEP).await;

let sample = ztimeout!(sub.recv_async()).unwrap();
assert_eq!(sample.kind(), SampleKind::Put);
assert_eq!(sample.payload().try_to_string().unwrap().as_ref(), "1");

assert!(sub.try_recv().unwrap().is_none());

router.close().await.unwrap();
tokio::time::sleep(SLEEP).await;

ztimeout!(publ.put("2")).unwrap();
ztimeout!(publ.put("3")).unwrap();
ztimeout!(publ.put("4")).unwrap();
tokio::time::sleep(SLEEP).await;

assert!(sub.try_recv().unwrap().is_none());

let router = {
let mut c = zenoh::Config::default();
c.listen
.endpoints
.set(vec![ROUTER_ENDPOINT.parse::<EndPoint>().unwrap()])
.unwrap();
c.scouting.multicast.set_enabled(Some(false)).unwrap();
let _ = c.set_mode(Some(WhatAmI::Router));
let s = ztimeout!(zenoh::open(c)).unwrap();
tracing::info!("Router ZID: {}", s.zid());
s
};
tokio::time::sleep(RECONNECT_SLEEP).await;

let sample = ztimeout!(sub.recv_async()).unwrap();
assert_eq!(sample.kind(), SampleKind::Put);
assert_eq!(sample.payload().try_to_string().unwrap().as_ref(), "2");

let sample = ztimeout!(sub.recv_async()).unwrap();
assert_eq!(sample.kind(), SampleKind::Put);
assert_eq!(sample.payload().try_to_string().unwrap().as_ref(), "3");

let sample = ztimeout!(sub.recv_async()).unwrap();
assert_eq!(sample.kind(), SampleKind::Put);
assert_eq!(sample.payload().try_to_string().unwrap().as_ref(), "4");

assert!(sub.try_recv().unwrap().is_none());

publ.undeclare().await.unwrap();
// sub.undeclare().await.unwrap();

client1.close().await.unwrap();
client2.close().await.unwrap();

router.close().await.unwrap();
}

0 comments on commit dd5a392

Please sign in to comment.