diff --git a/DEFAULT_CONFIG.json5 b/DEFAULT_CONFIG.json5 index 23cbd00831..15bfab0290 100644 --- a/DEFAULT_CONFIG.json5 +++ b/DEFAULT_CONFIG.json5 @@ -194,6 +194,14 @@ /// The routing strategy to use in peers. ("peer_to_peer" or "linkstate"). mode: "peer_to_peer", }, + /// The interests-based routing configuration. + /// This configuration applies regardless of the mode (router, peer or client). + interests: { + /// The timeout to wait for incoming interests declarations in milliseconds. + /// The expiration of this timeout implies that the discovery protocol might be incomplete, + /// leading to potential loss of messages, queries or liveliness tokens. + timeout: 10000, + }, }, // /// Overwrite QoS options for Zenoh messages by key expression (ignores Zenoh API QoS config for overwritten values) diff --git a/commons/zenoh-config/src/defaults.rs b/commons/zenoh-config/src/defaults.rs index eb000923e2..23887929d3 100644 --- a/commons/zenoh-config/src/defaults.rs +++ b/commons/zenoh-config/src/defaults.rs @@ -141,6 +141,9 @@ pub mod routing { pub mod peer { pub const mode: &str = "peer_to_peer"; } + pub mod interests { + pub const timeout: u64 = 10000; + } } impl Default for ListenConfig { diff --git a/commons/zenoh-config/src/lib.rs b/commons/zenoh-config/src/lib.rs index a00755c876..94358ee1ec 100644 --- a/commons/zenoh-config/src/lib.rs +++ b/commons/zenoh-config/src/lib.rs @@ -354,6 +354,13 @@ validated_struct::validator! { /// The routing strategy to use in peers. ("peer_to_peer" or "linkstate"). mode: Option, }, + /// The interests-based routing configuration. + /// This configuration applies regardless of the mode (router, peer or client). + pub interests: #[derive(Default)] + InterestsConf { + /// The timeout to wait for incoming interests declarations. + timeout: Option, + }, }, /// The declarations aggregation strategy. diff --git a/zenoh/src/net/routing/dispatcher/interests.rs b/zenoh/src/net/routing/dispatcher/interests.rs index ac4de8113d..a2cbcfeaf2 100644 --- a/zenoh/src/net/routing/dispatcher/interests.rs +++ b/zenoh/src/net/routing/dispatcher/interests.rs @@ -42,8 +42,6 @@ use crate::net::routing::{ RoutingContext, }; -static INTEREST_TIMEOUT_MS: u64 = 10000; - pub(crate) struct CurrentInterest { pub(crate) src_face: Arc, pub(crate) src_interest_id: InterestId, @@ -129,6 +127,7 @@ pub(crate) struct CurrentInterestCleanup { tables: Arc, face: Weak, id: InterestId, + interests_timeout: Duration, } impl CurrentInterestCleanup { @@ -136,18 +135,20 @@ impl CurrentInterestCleanup { face: &Arc, tables_ref: &Arc, id: u32, + interests_timeout: Duration, ) { let mut cleanup = CurrentInterestCleanup { tables: tables_ref.clone(), face: Arc::downgrade(face), id, + interests_timeout, }; if let Some((_, cancellation_token)) = face.pending_current_interests.get(&id) { let c_cancellation_token = cancellation_token.clone(); face.task_controller .spawn_with_rt(zenoh_runtime::ZRuntime::Net, async move { tokio::select! { - _ = tokio::time::sleep(Duration::from_millis(INTEREST_TIMEOUT_MS)) => { cleanup.run().await } + _ = tokio::time::sleep(cleanup.interests_timeout) => { cleanup.run().await } _ = c_cancellation_token.cancelled() => {} } }); @@ -166,11 +167,11 @@ impl Timed for CurrentInterestCleanup { { drop(ctrl_lock); tracing::warn!( - "Didn't receive DeclareFinal {}:{} from {}: Timeout({:#?})!", + "Didn't receive DeclareFinal {}:{} from {} for interests: Timeout({:#?})!", interest.0.src_face, self.id, face, - Duration::from_millis(INTEREST_TIMEOUT_MS), + self.interests_timeout, ); finalize_pending_interest(interest, &mut |p, m| p.send_declare(m)); } diff --git a/zenoh/src/net/routing/dispatcher/tables.rs b/zenoh/src/net/routing/dispatcher/tables.rs index ed130f1844..43486127cc 100644 --- a/zenoh/src/net/routing/dispatcher/tables.rs +++ b/zenoh/src/net/routing/dispatcher/tables.rs @@ -71,6 +71,7 @@ pub struct Tables { pub(crate) hlc: Option>, pub(crate) drop_future_timestamp: bool, pub(crate) queries_default_timeout: Duration, + pub(crate) interests_timeout: Duration, pub(crate) root_res: Arc, pub(crate) faces: HashMap>, pub(crate) mcast_groups: Vec>, @@ -93,6 +94,8 @@ impl Tables { unwrap_or_default!(config.routing().router().peers_failover_brokering()); let queries_default_timeout = Duration::from_millis(unwrap_or_default!(config.queries_default_timeout())); + let interests_timeout = + Duration::from_millis(unwrap_or_default!(config.routing().interests().timeout())); let hat_code = hat::new_hat(whatami, config); Ok(Tables { zid, @@ -102,6 +105,7 @@ impl Tables { hlc, drop_future_timestamp, queries_default_timeout, + interests_timeout, root_res: Resource::root(), faces: HashMap::new(), mcast_groups: vec![], diff --git a/zenoh/src/net/routing/hat/client/interests.rs b/zenoh/src/net/routing/hat/client/interests.rs index 1edb51eb60..f6fee326fe 100644 --- a/zenoh/src/net/routing/hat/client/interests.rs +++ b/zenoh/src/net/routing/hat/client/interests.rs @@ -133,7 +133,12 @@ impl HatInterestTrait for HatCode { dst_face_mut .pending_current_interests .insert(id, (interest.clone(), cancellation_token)); - CurrentInterestCleanup::spawn_interest_clean_up_task(dst_face, tables_ref, id); + CurrentInterestCleanup::spawn_interest_clean_up_task( + dst_face, + tables_ref, + id, + tables.interests_timeout, + ); } let wire_expr = res .as_ref() diff --git a/zenoh/src/net/routing/hat/p2p_peer/interests.rs b/zenoh/src/net/routing/hat/p2p_peer/interests.rs index 26abd364ff..07092c58c9 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/interests.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/interests.rs @@ -171,7 +171,12 @@ impl HatInterestTrait for HatCode { dst_face_mut .pending_current_interests .insert(id, (interest.clone(), cancellation_token)); - CurrentInterestCleanup::spawn_interest_clean_up_task(dst_face, tables_ref, id); + CurrentInterestCleanup::spawn_interest_clean_up_task( + dst_face, + tables_ref, + id, + tables.interests_timeout, + ); } let wire_expr = res.as_ref().map(|res| { Resource::decl_key(res, dst_face, dst_face.whatami != WhatAmI::Client)