From 9e2c3cc06d85c9a70410e5b0b91ddd934a5fb173 Mon Sep 17 00:00:00 2001 From: Julien Enoch Date: Mon, 13 Jan 2025 17:00:08 +0100 Subject: [PATCH 1/5] Config: add interests timeout --- DEFAULT_CONFIG.json5 | 6 ++++++ commons/zenoh-config/src/defaults.rs | 3 +++ commons/zenoh-config/src/lib.rs | 7 +++++++ zenoh/src/net/routing/dispatcher/interests.rs | 9 +++++---- zenoh/src/net/routing/hat/client/interests.rs | 7 ++++++- zenoh/src/net/routing/hat/client/mod.rs | 5 ++++- zenoh/src/net/routing/hat/mod.rs | 16 +++++++++++++--- zenoh/src/net/routing/hat/p2p_peer/interests.rs | 7 ++++++- zenoh/src/net/routing/hat/p2p_peer/mod.rs | 5 ++++- 9 files changed, 54 insertions(+), 11 deletions(-) diff --git a/DEFAULT_CONFIG.json5 b/DEFAULT_CONFIG.json5 index 23cbd00831..5bbe934e63 100644 --- a/DEFAULT_CONFIG.json5 +++ b/DEFAULT_CONFIG.json5 @@ -194,6 +194,12 @@ /// The routing strategy to use in peers. ("peer_to_peer" or "linkstate"). mode: "peer_to_peer", }, + /// The interests-based routing configuration. + /// This configuration applies regardless of the mode (router, peer or client). + interests: { + /// The timeout to wait for incoming interests declarations. + timeout: 10000, + }, }, // /// Overwrite QoS options for Zenoh messages by key expression (ignores Zenoh API QoS config for overwritten values) diff --git a/commons/zenoh-config/src/defaults.rs b/commons/zenoh-config/src/defaults.rs index eb000923e2..23887929d3 100644 --- a/commons/zenoh-config/src/defaults.rs +++ b/commons/zenoh-config/src/defaults.rs @@ -141,6 +141,9 @@ pub mod routing { pub mod peer { pub const mode: &str = "peer_to_peer"; } + pub mod interests { + pub const timeout: u64 = 10000; + } } impl Default for ListenConfig { diff --git a/commons/zenoh-config/src/lib.rs b/commons/zenoh-config/src/lib.rs index a00755c876..94358ee1ec 100644 --- a/commons/zenoh-config/src/lib.rs +++ b/commons/zenoh-config/src/lib.rs @@ -354,6 +354,13 @@ validated_struct::validator! { /// The routing strategy to use in peers. ("peer_to_peer" or "linkstate"). mode: Option, }, + /// The interests-based routing configuration. + /// This configuration applies regardless of the mode (router, peer or client). + pub interests: #[derive(Default)] + InterestsConf { + /// The timeout to wait for incoming interests declarations. + timeout: Option, + }, }, /// The declarations aggregation strategy. diff --git a/zenoh/src/net/routing/dispatcher/interests.rs b/zenoh/src/net/routing/dispatcher/interests.rs index ac4de8113d..f8887f6757 100644 --- a/zenoh/src/net/routing/dispatcher/interests.rs +++ b/zenoh/src/net/routing/dispatcher/interests.rs @@ -42,8 +42,6 @@ use crate::net::routing::{ RoutingContext, }; -static INTEREST_TIMEOUT_MS: u64 = 10000; - pub(crate) struct CurrentInterest { pub(crate) src_face: Arc, pub(crate) src_interest_id: InterestId, @@ -129,6 +127,7 @@ pub(crate) struct CurrentInterestCleanup { tables: Arc, face: Weak, id: InterestId, + interests_timeout: Duration, } impl CurrentInterestCleanup { @@ -136,18 +135,20 @@ impl CurrentInterestCleanup { face: &Arc, tables_ref: &Arc, id: u32, + interests_timeout: Duration, ) { let mut cleanup = CurrentInterestCleanup { tables: tables_ref.clone(), face: Arc::downgrade(face), id, + interests_timeout, }; if let Some((_, cancellation_token)) = face.pending_current_interests.get(&id) { let c_cancellation_token = cancellation_token.clone(); face.task_controller .spawn_with_rt(zenoh_runtime::ZRuntime::Net, async move { tokio::select! { - _ = tokio::time::sleep(Duration::from_millis(INTEREST_TIMEOUT_MS)) => { cleanup.run().await } + _ = tokio::time::sleep(cleanup.interests_timeout) => { cleanup.run().await } _ = c_cancellation_token.cancelled() => {} } }); @@ -170,7 +171,7 @@ impl Timed for CurrentInterestCleanup { interest.0.src_face, self.id, face, - Duration::from_millis(INTEREST_TIMEOUT_MS), + self.interests_timeout, ); finalize_pending_interest(interest, &mut |p, m| p.send_declare(m)); } diff --git a/zenoh/src/net/routing/hat/client/interests.rs b/zenoh/src/net/routing/hat/client/interests.rs index 1edb51eb60..e8d0796225 100644 --- a/zenoh/src/net/routing/hat/client/interests.rs +++ b/zenoh/src/net/routing/hat/client/interests.rs @@ -133,7 +133,12 @@ impl HatInterestTrait for HatCode { dst_face_mut .pending_current_interests .insert(id, (interest.clone(), cancellation_token)); - CurrentInterestCleanup::spawn_interest_clean_up_task(dst_face, tables_ref, id); + CurrentInterestCleanup::spawn_interest_clean_up_task( + dst_face, + tables_ref, + id, + self.interests_timeout, + ); } let wire_expr = res .as_ref() diff --git a/zenoh/src/net/routing/hat/client/mod.rs b/zenoh/src/net/routing/hat/client/mod.rs index 216e8732c4..274b09c591 100644 --- a/zenoh/src/net/routing/hat/client/mod.rs +++ b/zenoh/src/net/routing/hat/client/mod.rs @@ -21,6 +21,7 @@ use std::{ any::Any, collections::HashMap, sync::{atomic::AtomicU32, Arc}, + time::Duration, }; use token::{token_new_face, undeclare_simple_token}; @@ -81,7 +82,9 @@ impl HatTables { } } -pub(crate) struct HatCode {} +pub(crate) struct HatCode { + pub(crate) interests_timeout: Duration, +} impl HatBaseTrait for HatCode { fn init(&self, _tables: &mut Tables, _runtime: Runtime) -> ZResult<()> { diff --git a/zenoh/src/net/routing/hat/mod.rs b/zenoh/src/net/routing/hat/mod.rs index 27b772b3fc..43321eca00 100644 --- a/zenoh/src/net/routing/hat/mod.rs +++ b/zenoh/src/net/routing/hat/mod.rs @@ -17,7 +17,7 @@ //! This module is intended for Zenoh's internal use. //! //! [Click here for Zenoh's documentation](https://docs.rs/zenoh/latest/zenoh) -use std::{any::Any, sync::Arc}; +use std::{any::Any, sync::Arc, time::Duration}; use zenoh_config::{unwrap_or_default, Config, WhatAmI}; use zenoh_protocol::{ @@ -248,12 +248,22 @@ pub(crate) trait HatQueriesTrait { pub(crate) fn new_hat(whatami: WhatAmI, config: &Config) -> Box { match whatami { - WhatAmI::Client => Box::new(client::HatCode {}), + WhatAmI::Client => Box::new(client::HatCode { + interests_timeout: Duration::from_millis(unwrap_or_default!(config + .routing() + .interests() + .timeout())), + }), WhatAmI::Peer => { if unwrap_or_default!(config.routing().peer().mode()) == *"linkstate" { Box::new(linkstate_peer::HatCode {}) } else { - Box::new(p2p_peer::HatCode {}) + Box::new(p2p_peer::HatCode { + interests_timeout: Duration::from_millis(unwrap_or_default!(config + .routing() + .interests() + .timeout())), + }) } } WhatAmI::Router => Box::new(router::HatCode {}), diff --git a/zenoh/src/net/routing/hat/p2p_peer/interests.rs b/zenoh/src/net/routing/hat/p2p_peer/interests.rs index 26abd364ff..264ec3972e 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/interests.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/interests.rs @@ -171,7 +171,12 @@ impl HatInterestTrait for HatCode { dst_face_mut .pending_current_interests .insert(id, (interest.clone(), cancellation_token)); - CurrentInterestCleanup::spawn_interest_clean_up_task(dst_face, tables_ref, id); + CurrentInterestCleanup::spawn_interest_clean_up_task( + dst_face, + tables_ref, + id, + self.interests_timeout, + ); } let wire_expr = res.as_ref().map(|res| { Resource::decl_key(res, dst_face, dst_face.whatami != WhatAmI::Client) diff --git a/zenoh/src/net/routing/hat/p2p_peer/mod.rs b/zenoh/src/net/routing/hat/p2p_peer/mod.rs index 9beefbff5e..824f10b04e 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/mod.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/mod.rs @@ -21,6 +21,7 @@ use std::{ any::Any, collections::HashMap, sync::{atomic::AtomicU32, Arc}, + time::Duration, }; use token::{token_new_face, undeclare_simple_token}; @@ -106,7 +107,9 @@ impl HatTables { } } -pub(crate) struct HatCode {} +pub(crate) struct HatCode { + pub(crate) interests_timeout: Duration, +} impl HatBaseTrait for HatCode { fn init(&self, tables: &mut Tables, runtime: Runtime) -> ZResult<()> { From 70326c8a143e252004470211cc43ddb13d45a04a Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Mon, 13 Jan 2025 23:15:18 +0100 Subject: [PATCH 2/5] Fix indentation --- DEFAULT_CONFIG.json5 | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/DEFAULT_CONFIG.json5 b/DEFAULT_CONFIG.json5 index 5bbe934e63..8b0445837d 100644 --- a/DEFAULT_CONFIG.json5 +++ b/DEFAULT_CONFIG.json5 @@ -197,8 +197,8 @@ /// The interests-based routing configuration. /// This configuration applies regardless of the mode (router, peer or client). interests: { - /// The timeout to wait for incoming interests declarations. - timeout: 10000, + /// The timeout to wait for incoming interests declarations in milliseconds. + timeout: 10000, }, }, From 747264a7a9649e6bb6e0302dd5e8f190077c24d3 Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Mon, 13 Jan 2025 23:16:01 +0100 Subject: [PATCH 3/5] Remove fields from HatCode structs --- zenoh/src/net/routing/hat/client/interests.rs | 6 ++-- zenoh/src/net/routing/hat/client/mod.rs | 36 +++++++++++++++---- zenoh/src/net/routing/hat/mod.rs | 16 ++------- .../src/net/routing/hat/p2p_peer/interests.rs | 6 ++-- zenoh/src/net/routing/hat/p2p_peer/mod.rs | 22 +++++++++--- 5 files changed, 57 insertions(+), 29 deletions(-) diff --git a/zenoh/src/net/routing/hat/client/interests.rs b/zenoh/src/net/routing/hat/client/interests.rs index e8d0796225..ac9e50b876 100644 --- a/zenoh/src/net/routing/hat/client/interests.rs +++ b/zenoh/src/net/routing/hat/client/interests.rs @@ -23,7 +23,9 @@ use zenoh_protocol::{ }; use zenoh_sync::get_mut_unchecked; -use super::{face_hat, face_hat_mut, token::declare_token_interest, HatCode, HatFace}; +use super::{ + face_hat, face_hat_mut, hat, token::declare_token_interest, HatCode, HatFace, HatTables, +}; use crate::net::routing::{ dispatcher::{ face::{FaceState, InterestState}, @@ -137,7 +139,7 @@ impl HatInterestTrait for HatCode { dst_face, tables_ref, id, - self.interests_timeout, + hat!(tables).interests_timeout, ); } let wire_expr = res diff --git a/zenoh/src/net/routing/hat/client/mod.rs b/zenoh/src/net/routing/hat/client/mod.rs index 274b09c591..49b22197a5 100644 --- a/zenoh/src/net/routing/hat/client/mod.rs +++ b/zenoh/src/net/routing/hat/client/mod.rs @@ -25,7 +25,7 @@ use std::{ }; use token::{token_new_face, undeclare_simple_token}; -use zenoh_config::WhatAmI; +use zenoh_config::{unwrap_or_default, WhatAmI}; use zenoh_protocol::network::{ declare::{queryable::ext::QueryableInfoType, QueryableId, SubscriberId, TokenId}, interest::InterestId, @@ -60,6 +60,20 @@ mod pubsub; mod queries; mod token; +macro_rules! hat { + ($t:expr) => { + $t.hat.downcast_ref::().unwrap() + }; +} +use hat; + +macro_rules! hat_mut { + ($t:expr) => { + $t.hat.downcast_mut::().unwrap() + }; +} +use hat_mut; + macro_rules! face_hat { ($f:expr) => { $f.hat.downcast_ref::().unwrap() @@ -74,20 +88,28 @@ macro_rules! face_hat_mut { } use face_hat_mut; -struct HatTables {} +struct HatTables { + pub(crate) interests_timeout: Duration, +} impl HatTables { fn new() -> Self { - Self {} + Self { + interests_timeout: Duration::from_millis( + zenoh_config::defaults::routing::interests::timeout, + ), + } } } -pub(crate) struct HatCode { - pub(crate) interests_timeout: Duration, -} +pub(crate) struct HatCode {} impl HatBaseTrait for HatCode { - fn init(&self, _tables: &mut Tables, _runtime: Runtime) -> ZResult<()> { + fn init(&self, tables: &mut Tables, runtime: Runtime) -> ZResult<()> { + let config_guard = runtime.config().lock(); + let config = &config_guard.0; + hat_mut!(tables).interests_timeout = + Duration::from_millis(unwrap_or_default!(config.routing().interests().timeout())); Ok(()) } diff --git a/zenoh/src/net/routing/hat/mod.rs b/zenoh/src/net/routing/hat/mod.rs index 43321eca00..27b772b3fc 100644 --- a/zenoh/src/net/routing/hat/mod.rs +++ b/zenoh/src/net/routing/hat/mod.rs @@ -17,7 +17,7 @@ //! This module is intended for Zenoh's internal use. //! //! [Click here for Zenoh's documentation](https://docs.rs/zenoh/latest/zenoh) -use std::{any::Any, sync::Arc, time::Duration}; +use std::{any::Any, sync::Arc}; use zenoh_config::{unwrap_or_default, Config, WhatAmI}; use zenoh_protocol::{ @@ -248,22 +248,12 @@ pub(crate) trait HatQueriesTrait { pub(crate) fn new_hat(whatami: WhatAmI, config: &Config) -> Box { match whatami { - WhatAmI::Client => Box::new(client::HatCode { - interests_timeout: Duration::from_millis(unwrap_or_default!(config - .routing() - .interests() - .timeout())), - }), + WhatAmI::Client => Box::new(client::HatCode {}), WhatAmI::Peer => { if unwrap_or_default!(config.routing().peer().mode()) == *"linkstate" { Box::new(linkstate_peer::HatCode {}) } else { - Box::new(p2p_peer::HatCode { - interests_timeout: Duration::from_millis(unwrap_or_default!(config - .routing() - .interests() - .timeout())), - }) + Box::new(p2p_peer::HatCode {}) } } WhatAmI::Router => Box::new(router::HatCode {}), diff --git a/zenoh/src/net/routing/hat/p2p_peer/interests.rs b/zenoh/src/net/routing/hat/p2p_peer/interests.rs index 264ec3972e..e63fd2159c 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/interests.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/interests.rs @@ -24,8 +24,8 @@ use zenoh_protocol::{ use zenoh_sync::get_mut_unchecked; use super::{ - face_hat, face_hat_mut, initial_interest, pubsub::declare_sub_interest, - queries::declare_qabl_interest, token::declare_token_interest, HatCode, HatFace, + face_hat, face_hat_mut, hat, initial_interest, pubsub::declare_sub_interest, + queries::declare_qabl_interest, token::declare_token_interest, HatCode, HatFace, HatTables, INITIAL_INTEREST_ID, }; use crate::net::routing::{ @@ -175,7 +175,7 @@ impl HatInterestTrait for HatCode { dst_face, tables_ref, id, - self.interests_timeout, + hat!(tables).interests_timeout, ); } let wire_expr = res.as_ref().map(|res| { diff --git a/zenoh/src/net/routing/hat/p2p_peer/mod.rs b/zenoh/src/net/routing/hat/p2p_peer/mod.rs index 824f10b04e..c7b239c932 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/mod.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/mod.rs @@ -76,6 +76,13 @@ mod pubsub; mod queries; mod token; +macro_rules! hat { + ($t:expr) => { + $t.hat.downcast_ref::().unwrap() + }; +} +use hat; + macro_rules! hat_mut { ($t:expr) => { $t.hat.downcast_mut::().unwrap() @@ -99,17 +106,21 @@ use face_hat_mut; struct HatTables { gossip: Option, + interests_timeout: Duration, } impl HatTables { fn new() -> Self { - Self { gossip: None } + Self { + gossip: None, + interests_timeout: Duration::from_millis( + zenoh_config::defaults::routing::interests::timeout, + ), + } } } -pub(crate) struct HatCode { - pub(crate) interests_timeout: Duration, -} +pub(crate) struct HatCode {} impl HatBaseTrait for HatCode { fn init(&self, tables: &mut Tables, runtime: Runtime) -> ZResult<()> { @@ -130,6 +141,8 @@ impl HatBaseTrait for HatCode { let wait_declares = unwrap_or_default!(config.open().return_conditions().declares()); let router_peers_failover_brokering = unwrap_or_default!(config.routing().router().peers_failover_brokering()); + let interests_timeout = + Duration::from_millis(unwrap_or_default!(config.routing().interests().timeout())); drop(config_guard); if gossip { @@ -145,6 +158,7 @@ impl HatBaseTrait for HatCode { wait_declares, )); } + hat_mut!(tables).interests_timeout = interests_timeout; Ok(()) } From 313fc7d07a5739d2a9253feb2d79c7e0e046238c Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Mon, 13 Jan 2025 23:32:35 +0100 Subject: [PATCH 4/5] Move interests_timeout to generic Tables --- zenoh/src/net/routing/dispatcher/tables.rs | 4 +++ zenoh/src/net/routing/hat/client/interests.rs | 6 ++-- zenoh/src/net/routing/hat/client/mod.rs | 33 +++---------------- .../src/net/routing/hat/p2p_peer/interests.rs | 6 ++-- zenoh/src/net/routing/hat/p2p_peer/mod.rs | 19 +---------- 5 files changed, 14 insertions(+), 54 deletions(-) diff --git a/zenoh/src/net/routing/dispatcher/tables.rs b/zenoh/src/net/routing/dispatcher/tables.rs index ed130f1844..43486127cc 100644 --- a/zenoh/src/net/routing/dispatcher/tables.rs +++ b/zenoh/src/net/routing/dispatcher/tables.rs @@ -71,6 +71,7 @@ pub struct Tables { pub(crate) hlc: Option>, pub(crate) drop_future_timestamp: bool, pub(crate) queries_default_timeout: Duration, + pub(crate) interests_timeout: Duration, pub(crate) root_res: Arc, pub(crate) faces: HashMap>, pub(crate) mcast_groups: Vec>, @@ -93,6 +94,8 @@ impl Tables { unwrap_or_default!(config.routing().router().peers_failover_brokering()); let queries_default_timeout = Duration::from_millis(unwrap_or_default!(config.queries_default_timeout())); + let interests_timeout = + Duration::from_millis(unwrap_or_default!(config.routing().interests().timeout())); let hat_code = hat::new_hat(whatami, config); Ok(Tables { zid, @@ -102,6 +105,7 @@ impl Tables { hlc, drop_future_timestamp, queries_default_timeout, + interests_timeout, root_res: Resource::root(), faces: HashMap::new(), mcast_groups: vec![], diff --git a/zenoh/src/net/routing/hat/client/interests.rs b/zenoh/src/net/routing/hat/client/interests.rs index ac9e50b876..f6fee326fe 100644 --- a/zenoh/src/net/routing/hat/client/interests.rs +++ b/zenoh/src/net/routing/hat/client/interests.rs @@ -23,9 +23,7 @@ use zenoh_protocol::{ }; use zenoh_sync::get_mut_unchecked; -use super::{ - face_hat, face_hat_mut, hat, token::declare_token_interest, HatCode, HatFace, HatTables, -}; +use super::{face_hat, face_hat_mut, token::declare_token_interest, HatCode, HatFace}; use crate::net::routing::{ dispatcher::{ face::{FaceState, InterestState}, @@ -139,7 +137,7 @@ impl HatInterestTrait for HatCode { dst_face, tables_ref, id, - hat!(tables).interests_timeout, + tables.interests_timeout, ); } let wire_expr = res diff --git a/zenoh/src/net/routing/hat/client/mod.rs b/zenoh/src/net/routing/hat/client/mod.rs index 49b22197a5..216e8732c4 100644 --- a/zenoh/src/net/routing/hat/client/mod.rs +++ b/zenoh/src/net/routing/hat/client/mod.rs @@ -21,11 +21,10 @@ use std::{ any::Any, collections::HashMap, sync::{atomic::AtomicU32, Arc}, - time::Duration, }; use token::{token_new_face, undeclare_simple_token}; -use zenoh_config::{unwrap_or_default, WhatAmI}; +use zenoh_config::WhatAmI; use zenoh_protocol::network::{ declare::{queryable::ext::QueryableInfoType, QueryableId, SubscriberId, TokenId}, interest::InterestId, @@ -60,20 +59,6 @@ mod pubsub; mod queries; mod token; -macro_rules! hat { - ($t:expr) => { - $t.hat.downcast_ref::().unwrap() - }; -} -use hat; - -macro_rules! hat_mut { - ($t:expr) => { - $t.hat.downcast_mut::().unwrap() - }; -} -use hat_mut; - macro_rules! face_hat { ($f:expr) => { $f.hat.downcast_ref::().unwrap() @@ -88,28 +73,18 @@ macro_rules! face_hat_mut { } use face_hat_mut; -struct HatTables { - pub(crate) interests_timeout: Duration, -} +struct HatTables {} impl HatTables { fn new() -> Self { - Self { - interests_timeout: Duration::from_millis( - zenoh_config::defaults::routing::interests::timeout, - ), - } + Self {} } } pub(crate) struct HatCode {} impl HatBaseTrait for HatCode { - fn init(&self, tables: &mut Tables, runtime: Runtime) -> ZResult<()> { - let config_guard = runtime.config().lock(); - let config = &config_guard.0; - hat_mut!(tables).interests_timeout = - Duration::from_millis(unwrap_or_default!(config.routing().interests().timeout())); + fn init(&self, _tables: &mut Tables, _runtime: Runtime) -> ZResult<()> { Ok(()) } diff --git a/zenoh/src/net/routing/hat/p2p_peer/interests.rs b/zenoh/src/net/routing/hat/p2p_peer/interests.rs index e63fd2159c..07092c58c9 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/interests.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/interests.rs @@ -24,8 +24,8 @@ use zenoh_protocol::{ use zenoh_sync::get_mut_unchecked; use super::{ - face_hat, face_hat_mut, hat, initial_interest, pubsub::declare_sub_interest, - queries::declare_qabl_interest, token::declare_token_interest, HatCode, HatFace, HatTables, + face_hat, face_hat_mut, initial_interest, pubsub::declare_sub_interest, + queries::declare_qabl_interest, token::declare_token_interest, HatCode, HatFace, INITIAL_INTEREST_ID, }; use crate::net::routing::{ @@ -175,7 +175,7 @@ impl HatInterestTrait for HatCode { dst_face, tables_ref, id, - hat!(tables).interests_timeout, + tables.interests_timeout, ); } let wire_expr = res.as_ref().map(|res| { diff --git a/zenoh/src/net/routing/hat/p2p_peer/mod.rs b/zenoh/src/net/routing/hat/p2p_peer/mod.rs index c7b239c932..9beefbff5e 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/mod.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/mod.rs @@ -21,7 +21,6 @@ use std::{ any::Any, collections::HashMap, sync::{atomic::AtomicU32, Arc}, - time::Duration, }; use token::{token_new_face, undeclare_simple_token}; @@ -76,13 +75,6 @@ mod pubsub; mod queries; mod token; -macro_rules! hat { - ($t:expr) => { - $t.hat.downcast_ref::().unwrap() - }; -} -use hat; - macro_rules! hat_mut { ($t:expr) => { $t.hat.downcast_mut::().unwrap() @@ -106,17 +98,11 @@ use face_hat_mut; struct HatTables { gossip: Option, - interests_timeout: Duration, } impl HatTables { fn new() -> Self { - Self { - gossip: None, - interests_timeout: Duration::from_millis( - zenoh_config::defaults::routing::interests::timeout, - ), - } + Self { gossip: None } } } @@ -141,8 +127,6 @@ impl HatBaseTrait for HatCode { let wait_declares = unwrap_or_default!(config.open().return_conditions().declares()); let router_peers_failover_brokering = unwrap_or_default!(config.routing().router().peers_failover_brokering()); - let interests_timeout = - Duration::from_millis(unwrap_or_default!(config.routing().interests().timeout())); drop(config_guard); if gossip { @@ -158,7 +142,6 @@ impl HatBaseTrait for HatCode { wait_declares, )); } - hat_mut!(tables).interests_timeout = interests_timeout; Ok(()) } From dcb4b9ad64dee9382518903bb3b4942f543ff904 Mon Sep 17 00:00:00 2001 From: Julien Enoch Date: Tue, 14 Jan 2025 09:56:48 +0100 Subject: [PATCH 5/5] improve doc and log --- DEFAULT_CONFIG.json5 | 2 ++ zenoh/src/net/routing/dispatcher/interests.rs | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/DEFAULT_CONFIG.json5 b/DEFAULT_CONFIG.json5 index 8b0445837d..15bfab0290 100644 --- a/DEFAULT_CONFIG.json5 +++ b/DEFAULT_CONFIG.json5 @@ -198,6 +198,8 @@ /// This configuration applies regardless of the mode (router, peer or client). interests: { /// The timeout to wait for incoming interests declarations in milliseconds. + /// The expiration of this timeout implies that the discovery protocol might be incomplete, + /// leading to potential loss of messages, queries or liveliness tokens. timeout: 10000, }, }, diff --git a/zenoh/src/net/routing/dispatcher/interests.rs b/zenoh/src/net/routing/dispatcher/interests.rs index f8887f6757..a2cbcfeaf2 100644 --- a/zenoh/src/net/routing/dispatcher/interests.rs +++ b/zenoh/src/net/routing/dispatcher/interests.rs @@ -167,7 +167,7 @@ impl Timed for CurrentInterestCleanup { { drop(ctrl_lock); tracing::warn!( - "Didn't receive DeclareFinal {}:{} from {}: Timeout({:#?})!", + "Didn't receive DeclareFinal {}:{} from {} for interests: Timeout({:#?})!", interest.0.src_face, self.id, face,