Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Config: add interests timeout #1710

Merged
merged 5 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions DEFAULT_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,12 @@
/// The routing strategy to use in peers. ("peer_to_peer" or "linkstate").
mode: "peer_to_peer",
},
/// The interests-based routing configuration.
/// This configuration applies regardless of the mode (router, peer or client).
interests: {
/// The timeout to wait for incoming interests declarations in milliseconds.
timeout: 10000,
},
},

// /// Overwrite QoS options for Zenoh messages by key expression (ignores Zenoh API QoS config for overwritten values)
Expand Down
3 changes: 3 additions & 0 deletions commons/zenoh-config/src/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,9 @@ pub mod routing {
pub mod peer {
pub const mode: &str = "peer_to_peer";
}
pub mod interests {
pub const timeout: u64 = 10000;
}
}

impl Default for ListenConfig {
Expand Down
7 changes: 7 additions & 0 deletions commons/zenoh-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,13 @@ validated_struct::validator! {
/// The routing strategy to use in peers. ("peer_to_peer" or "linkstate").
mode: Option<String>,
},
/// The interests-based routing configuration.
/// This configuration applies regardless of the mode (router, peer or client).
pub interests: #[derive(Default)]
InterestsConf {
/// The timeout to wait for incoming interests declarations.
timeout: Option<u64>,
},
},

/// The declarations aggregation strategy.
Expand Down
9 changes: 5 additions & 4 deletions zenoh/src/net/routing/dispatcher/interests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ use crate::net::routing::{
RoutingContext,
};

static INTEREST_TIMEOUT_MS: u64 = 10000;

pub(crate) struct CurrentInterest {
pub(crate) src_face: Arc<FaceState>,
pub(crate) src_interest_id: InterestId,
Expand Down Expand Up @@ -129,25 +127,28 @@ pub(crate) struct CurrentInterestCleanup {
tables: Arc<TablesLock>,
face: Weak<FaceState>,
id: InterestId,
interests_timeout: Duration,
}

impl CurrentInterestCleanup {
pub(crate) fn spawn_interest_clean_up_task(
face: &Arc<FaceState>,
tables_ref: &Arc<TablesLock>,
id: u32,
interests_timeout: Duration,
) {
let mut cleanup = CurrentInterestCleanup {
tables: tables_ref.clone(),
face: Arc::downgrade(face),
id,
interests_timeout,
};
if let Some((_, cancellation_token)) = face.pending_current_interests.get(&id) {
let c_cancellation_token = cancellation_token.clone();
face.task_controller
.spawn_with_rt(zenoh_runtime::ZRuntime::Net, async move {
tokio::select! {
_ = tokio::time::sleep(Duration::from_millis(INTEREST_TIMEOUT_MS)) => { cleanup.run().await }
_ = tokio::time::sleep(cleanup.interests_timeout) => { cleanup.run().await }
_ = c_cancellation_token.cancelled() => {}
}
});
Expand All @@ -170,7 +171,7 @@ impl Timed for CurrentInterestCleanup {
interest.0.src_face,
self.id,
face,
Duration::from_millis(INTEREST_TIMEOUT_MS),
self.interests_timeout,
);
finalize_pending_interest(interest, &mut |p, m| p.send_declare(m));
}
Expand Down
4 changes: 4 additions & 0 deletions zenoh/src/net/routing/dispatcher/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ pub struct Tables {
pub(crate) hlc: Option<Arc<HLC>>,
pub(crate) drop_future_timestamp: bool,
pub(crate) queries_default_timeout: Duration,
pub(crate) interests_timeout: Duration,
pub(crate) root_res: Arc<Resource>,
pub(crate) faces: HashMap<usize, Arc<FaceState>>,
pub(crate) mcast_groups: Vec<Arc<FaceState>>,
Expand All @@ -93,6 +94,8 @@ impl Tables {
unwrap_or_default!(config.routing().router().peers_failover_brokering());
let queries_default_timeout =
Duration::from_millis(unwrap_or_default!(config.queries_default_timeout()));
let interests_timeout =
Duration::from_millis(unwrap_or_default!(config.routing().interests().timeout()));
let hat_code = hat::new_hat(whatami, config);
Ok(Tables {
zid,
Expand All @@ -102,6 +105,7 @@ impl Tables {
hlc,
drop_future_timestamp,
queries_default_timeout,
interests_timeout,
root_res: Resource::root(),
faces: HashMap::new(),
mcast_groups: vec![],
Expand Down
7 changes: 6 additions & 1 deletion zenoh/src/net/routing/hat/client/interests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,12 @@ impl HatInterestTrait for HatCode {
dst_face_mut
.pending_current_interests
.insert(id, (interest.clone(), cancellation_token));
CurrentInterestCleanup::spawn_interest_clean_up_task(dst_face, tables_ref, id);
CurrentInterestCleanup::spawn_interest_clean_up_task(
dst_face,
tables_ref,
id,
tables.interests_timeout,
);
}
let wire_expr = res
.as_ref()
Expand Down
7 changes: 6 additions & 1 deletion zenoh/src/net/routing/hat/p2p_peer/interests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,12 @@ impl HatInterestTrait for HatCode {
dst_face_mut
.pending_current_interests
.insert(id, (interest.clone(), cancellation_token));
CurrentInterestCleanup::spawn_interest_clean_up_task(dst_face, tables_ref, id);
CurrentInterestCleanup::spawn_interest_clean_up_task(
dst_face,
tables_ref,
id,
tables.interests_timeout,
);
}
let wire_expr = res.as_ref().map(|res| {
Resource::decl_key(res, dst_face, dst_face.whatami != WhatAmI::Client)
Expand Down
Loading