diff --git a/Cargo.lock b/Cargo.lock index f662e0c9fbf..6538b174a54 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3182,7 +3182,7 @@ dependencies = [ [[package]] name = "libp2p-server" -version = "0.12.6" +version = "0.12.7" dependencies = [ "base64 0.21.7", "clap", diff --git a/Cargo.toml b/Cargo.toml index 556919dcae6..8982a65537e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -99,7 +99,7 @@ libp2p-quic = { version = "0.10.2", path = "transports/quic" } libp2p-relay = { version = "0.17.1", path = "protocols/relay" } libp2p-rendezvous = { version = "0.14.0", path = "protocols/rendezvous" } libp2p-request-response = { version = "0.26.2", path = "protocols/request-response" } -libp2p-server = { version = "0.12.6", path = "misc/server" } +libp2p-server = { version = "0.12.7", path = "misc/server" } libp2p-stream = { version = "0.1.0-alpha.1", path = "protocols/stream" } libp2p-swarm = { version = "0.44.2", path = "swarm" } libp2p-swarm-derive = { version = "=0.34.3", path = "swarm-derive" } # `libp2p-swarm-derive` may not be compatible with different `libp2p-swarm` non-breaking releases. E.g. `libp2p-swarm` might introduce a new enum variant `FromSwarm` (which is `#[non-exhaustive]`) in a non-breaking release. Older versions of `libp2p-swarm-derive` would not forward this enum variant within the `NetworkBehaviour` hierarchy. Thus the version pinning is required. diff --git a/misc/server/CHANGELOG.md b/misc/server/CHANGELOG.md index 484964e27e9..254ab1d92be 100644 --- a/misc/server/CHANGELOG.md +++ b/misc/server/CHANGELOG.md @@ -1,6 +1,14 @@ +## 0.12.7 + +### Changed + +- Use periodic and automatic bootstrap of Kademlia. + See [PR 4838](https://github.com/libp2p/rust-libp2p/pull/4838). + ## 0.12.6 ### Changed + - Stop using kad default protocol. See [PR 5122](https://github.com/libp2p/rust-libp2p/pull/5122) diff --git a/misc/server/Cargo.toml b/misc/server/Cargo.toml index af71a57561a..3f46de701e2 100644 --- a/misc/server/Cargo.toml +++ b/misc/server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "libp2p-server" -version = "0.12.6" +version = "0.12.7" authors = ["Max Inden "] edition = "2021" repository = "https://github.com/libp2p/rust-libp2p" diff --git a/misc/server/src/main.rs b/misc/server/src/main.rs index 16e6530e946..6dfa035f3b1 100644 --- a/misc/server/src/main.rs +++ b/misc/server/src/main.rs @@ -1,7 +1,6 @@ use base64::Engine; use clap::Parser; use futures::stream::StreamExt; -use futures_timer::Delay; use libp2p::identity; use libp2p::identity::PeerId; use libp2p::kad; @@ -14,8 +13,6 @@ use prometheus_client::registry::Registry; use std::error::Error; use std::path::PathBuf; use std::str::FromStr; -use std::task::Poll; -use std::time::Duration; use tracing_subscriber::EnvFilter; use zeroize::Zeroizing; @@ -23,8 +20,6 @@ mod behaviour; mod config; mod http_service; -const BOOTSTRAP_INTERVAL: Duration = Duration::from_secs(5 * 60); - #[derive(Debug, Parser)] #[clap(name = "libp2p server", about = "A rust-libp2p server binary.")] struct Opts { @@ -127,18 +122,7 @@ async fn main() -> Result<(), Box> { } }); - let mut bootstrap_timer = Delay::new(BOOTSTRAP_INTERVAL); - loop { - if let Poll::Ready(()) = futures::poll!(&mut bootstrap_timer) { - bootstrap_timer.reset(BOOTSTRAP_INTERVAL); - let _ = swarm - .behaviour_mut() - .kademlia - .as_mut() - .map(|k| k.bootstrap()); - } - let event = swarm.next().await.expect("Swarm not to terminate."); metrics.record(&event); match event { diff --git a/protocols/kad/CHANGELOG.md b/protocols/kad/CHANGELOG.md index f7baee2d288..0e781c18916 100644 --- a/protocols/kad/CHANGELOG.md +++ b/protocols/kad/CHANGELOG.md @@ -1,5 +1,7 @@ ## 0.45.4 +- Add periodic and automatic bootstrap. + See [PR 4838](https://github.com/libp2p/rust-libp2p/pull/4838). - Make it mandatory to provide protocol names when creating a `kad::Config`. Deprecate `kad::Config::default()`, replaced by `kad::Config::new(StreamProtocol)`. See [PR 5122](https://github.com/libp2p/rust-libp2p/pull/5122). diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index b237fe11dda..31568b0f9a8 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -23,6 +23,7 @@ mod test; use crate::addresses::Addresses; +use crate::bootstrap; use crate::handler::{Handler, HandlerEvent, HandlerIn, RequestId}; use crate::kbucket::{self, Distance, KBucketsTable, NodeStatus}; use crate::protocol::{ConnectionType, KadPeer, ProtocolConfig}; @@ -116,6 +117,9 @@ pub struct Behaviour { /// The record storage. store: TStore, + + /// Tracks the status of the current bootstrap. + bootstrap_status: bootstrap::Status, } /// The configurable strategies for the insertion of peers @@ -181,6 +185,8 @@ pub struct Config { provider_publication_interval: Option, kbucket_inserts: BucketInserts, caching: Caching, + periodic_bootstrap_interval: Option, + automatic_bootstrap_throttle: Option, } impl Default for Config { @@ -222,6 +228,8 @@ impl Config { provider_record_ttl: Some(Duration::from_secs(24 * 60 * 60)), kbucket_inserts: BucketInserts::OnConnected, caching: Caching::Enabled { max_peers: 1 }, + periodic_bootstrap_interval: Some(Duration::from_secs(5 * 60)), + automatic_bootstrap_throttle: Some(bootstrap::DEFAULT_AUTOMATIC_THROTTLE), } } @@ -408,6 +416,34 @@ impl Config { self.caching = c; self } + + /// Sets the interval on which [`Behaviour::bootstrap`] is called periodically. + /// + /// * Default to `5` minutes. + /// * Set to `None` to disable periodic bootstrap. + pub fn set_periodic_bootstrap_interval(&mut self, interval: Option) -> &mut Self { + self.periodic_bootstrap_interval = interval; + self + } + + /// Sets the time to wait before calling [`Behaviour::bootstrap`] after a new peer is inserted in the routing table. + /// This prevent cascading bootstrap requests when multiple peers are inserted into the routing table "at the same time". + /// This also allows to wait a little bit for other potential peers to be inserted into the routing table before + /// triggering a bootstrap, giving more context to the future bootstrap request. + /// + /// * Default to `500` ms. + /// * Set to `Some(Duration::ZERO)` to never wait before triggering a bootstrap request when a new peer + /// is inserted in the routing table. + /// * Set to `None` to disable automatic bootstrap (no bootstrap request will be triggered when a new + /// peer is inserted in the routing table). + #[cfg(test)] + pub(crate) fn set_automatic_bootstrap_throttle( + &mut self, + duration: Option, + ) -> &mut Self { + self.automatic_bootstrap_throttle = duration; + self + } } impl Behaviour @@ -465,6 +501,10 @@ where mode: Mode::Client, auto_mode: true, no_events_waker: None, + bootstrap_status: bootstrap::Status::new( + config.periodic_bootstrap_interval, + config.automatic_bootstrap_throttle, + ), } } @@ -566,6 +606,7 @@ where }; match entry.insert(addresses.clone(), status) { kbucket::InsertResult::Inserted => { + self.bootstrap_status.on_new_peer_in_routing_table(); self.queued_events.push_back(ToSwarm::GenerateEvent( Event::RoutingUpdated { peer: *peer, @@ -884,6 +925,13 @@ where /// /// > **Note**: Bootstrapping requires at least one node of the DHT to be known. /// > See [`Behaviour::add_address`]. + /// + /// > **Note**: Bootstrap does not require to be called manually. It is periodically + /// invoked at regular intervals based on the configured `periodic_bootstrap_interval` (see + /// [`Config::set_periodic_bootstrap_interval`] for details) and it is also automatically invoked + /// when a new peer is inserted in the routing table. + /// This parameter is used to call [`Behaviour::bootstrap`] periodically and automatically + /// to ensure a healthy routing table. pub fn bootstrap(&mut self) -> Result { let local_key = self.kbuckets.local_key().clone(); let info = QueryInfo::Bootstrap { @@ -895,6 +943,7 @@ where if peers.is_empty() { Err(NoKnownPeers()) } else { + self.bootstrap_status.on_started(); let inner = QueryInner::new(info); Ok(self.queries.add_iter_closest(local_key, peers, inner)) } @@ -1291,6 +1340,7 @@ where let addresses = Addresses::new(a); match entry.insert(addresses.clone(), new_status) { kbucket::InsertResult::Inserted => { + self.bootstrap_status.on_new_peer_in_routing_table(); let event = Event::RoutingUpdated { peer, is_new_peer: true, @@ -1406,6 +1456,7 @@ where .continue_iter_closest(query_id, target.clone(), peers, inner); } else { step.last = true; + self.bootstrap_status.on_finish(); }; Some(Event::OutboundQueryProgressed { @@ -1608,6 +1659,7 @@ where .continue_iter_closest(query_id, target.clone(), peers, inner); } else { step.last = true; + self.bootstrap_status.on_finish(); } Some(Event::OutboundQueryProgressed { @@ -2480,6 +2532,13 @@ where self.put_record_job = Some(job); } + // Poll bootstrap periodically and automatically. + if let Poll::Ready(()) = self.bootstrap_status.poll_next_bootstrap(cx) { + if let Err(e) = self.bootstrap() { + tracing::warn!("Failed to trigger bootstrap: {e}"); + } + } + loop { // Drain queued events first. if let Some(event) = self.queued_events.pop_front() { diff --git a/protocols/kad/src/behaviour/test.rs b/protocols/kad/src/behaviour/test.rs index 20378bb6a3f..b879084c54f 100644 --- a/protocols/kad/src/behaviour/test.rs +++ b/protocols/kad/src/behaviour/test.rs @@ -174,6 +174,9 @@ fn bootstrap() { let num_group = rng.gen_range(1..(num_total % K_VALUE.get()) + 2); let mut cfg = Config::new(PROTOCOL_NAME); + // Disabling periodic bootstrap and automatic bootstrap to prevent the bootstrap from triggering automatically. + cfg.set_periodic_bootstrap_interval(None); + cfg.set_automatic_bootstrap_throttle(None); if rng.gen() { cfg.disjoint_query_paths(true); } @@ -252,7 +255,11 @@ fn query_iter() { fn run(rng: &mut impl Rng) { let num_total = rng.gen_range(2..20); - let mut swarms = build_connected_nodes(num_total, 1) + let mut config = Config::new(PROTOCOL_NAME); + // Disabling periodic bootstrap and automatic bootstrap to prevent the bootstrap from triggering automatically. + config.set_periodic_bootstrap_interval(None); + config.set_automatic_bootstrap_throttle(None); + let mut swarms = build_connected_nodes_with_config(num_total, 1, config) .into_iter() .map(|(_a, s)| s) .collect::>(); @@ -500,6 +507,9 @@ fn put_record() { let mut config = Config::new(PROTOCOL_NAME); config.set_replication_factor(replication_factor); + // Disabling periodic bootstrap and automatic bootstrap to prevent the bootstrap from triggering automatically. + config.set_periodic_bootstrap_interval(None); + config.set_automatic_bootstrap_throttle(None); if rng.gen() { config.disjoint_query_paths(true); } @@ -869,6 +879,9 @@ fn add_provider() { let mut config = Config::new(PROTOCOL_NAME); config.set_replication_factor(replication_factor); + // Disabling periodic bootstrap and automatic bootstrap to prevent the bootstrap from triggering automatically. + config.set_periodic_bootstrap_interval(None); + config.set_automatic_bootstrap_throttle(None); if rng.gen() { config.disjoint_query_paths(true); } @@ -1094,6 +1107,9 @@ fn disjoint_query_does_not_finish_before_all_paths_did() { config.disjoint_query_paths(true); // I.e. setting the amount disjoint paths to be explored to 2. config.set_parallelism(NonZeroUsize::new(2).unwrap()); + // Disabling periodic bootstrap and automatic bootstrap to prevent the bootstrap from triggering automatically. + config.set_periodic_bootstrap_interval(None); + config.set_automatic_bootstrap_throttle(None); let mut alice = build_node_with_config(config); let mut trudy = build_node(); // Trudy the intrudor, an adversary. diff --git a/protocols/kad/src/bootstrap.rs b/protocols/kad/src/bootstrap.rs new file mode 100644 index 00000000000..58fa662d414 --- /dev/null +++ b/protocols/kad/src/bootstrap.rs @@ -0,0 +1,346 @@ +use futures::FutureExt; +use std::task::{Context, Poll, Waker}; +use std::time::Duration; + +use futures_timer::Delay; + +/// Default value chosen at ``. +pub(crate) const DEFAULT_AUTOMATIC_THROTTLE: Duration = Duration::from_millis(500); + +#[derive(Debug)] +pub(crate) struct Status { + /// If the user did not disable periodic bootstrap (by providing `None` for `periodic_interval`) + /// this is the periodic interval and the delay of the current period. When `Delay` finishes, + /// a bootstrap will be triggered and the `Delay` will be reset. + interval_and_delay: Option<(Duration, Delay)>, + + /// Configured duration to wait before triggering a bootstrap when a new peer + /// is inserted in the routing table. `None` if automatic bootstrap is disabled. + automatic_throttle: Option, + /// Timer that will be set (if automatic bootstrap is not disabled) when a new peer is inserted + /// in the routing table. When it finishes, it will trigger a bootstrap and will be set to `None` + /// again. If an other new peer is inserted in the routing table before this timer finishes, + /// the timer is reset. + throttle_timer: Option, + + /// Number of bootstrap requests currently in progress. We ensure neither periodic bootstrap + /// or automatic bootstrap trigger new requests when there is still some running. + current_bootstrap_requests: usize, + /// Waker to wake up the `poll` method if progress is ready to be made. + waker: Option, +} + +impl Status { + pub(crate) fn new( + periodic_interval: Option, + automatic_throttle: Option, + ) -> Self { + Self { + interval_and_delay: periodic_interval.map(|interval| (interval, Delay::new(interval))), + waker: None, + automatic_throttle, + throttle_timer: None, + current_bootstrap_requests: 0, + } + } + + pub(crate) fn on_new_peer_in_routing_table(&mut self) { + // Registering `self.throttle_timer` means scheduling a bootstrap. + // A bootstrap will be triggered when `self.throttle_timer` finishes. + // A `throttle_timer` is useful to not trigger a batch of bootstraps when a + // batch of peers is inserted into the routing table. + if let Some(throttle_duration) = self.automatic_throttle { + self.throttle_timer = Some(throttle_duration.into()); + } else { + // The user disabled bootstrapping on new peer in the routing table. + } + + // Waking up the waker that could have been registered. + if let Some(waker) = self.waker.take() { + waker.wake() + } + } + + pub(crate) fn on_started(&mut self) { + // No periodic or automatic bootstrap will be triggered as long as + // `self.current_bootstrap_requests > 0` but the user could still manually + // trigger a bootstrap. + self.current_bootstrap_requests += 1; + + // Canceling the `throttle_timer` if any since a bootstrap request is being triggered right now. + self.throttle_timer = None; + + // Resetting the `delay` if any since a bootstrap request is being triggered right now. + if let Some((interval, delay)) = self.interval_and_delay.as_mut() { + delay.reset(*interval); + } + } + + pub(crate) fn on_finish(&mut self) { + if let Some(value) = self.current_bootstrap_requests.checked_sub(1) { + self.current_bootstrap_requests = value; + } else { + debug_assert!( + false, + "Could not decrement current_bootstrap_requests because it's already 0" + ); + } + + // Waking up the waker that could have been registered. + if let Some(waker) = self.waker.take() { + waker.wake(); + } + } + + pub(crate) fn poll_next_bootstrap(&mut self, cx: &mut Context<'_>) -> Poll<()> { + if self.current_bootstrap_requests > 0 { + // Some bootstrap request(s) is(are) currently running. + self.waker = Some(cx.waker().clone()); + return Poll::Pending; + } + + if let Some(throttle_delay) = &mut self.throttle_timer { + // A `throttle_timer` has been registered. It means one or more peers have been + // inserted into the routing table and that a bootstrap request should be triggered. + // However, to not risk cascading bootstrap requests, we wait a little time to ensure + // the user will not add more peers in the routing table in the next "throttle_timer" remaining. + if throttle_delay.poll_unpin(cx).is_ready() { + // The `throttle_timer` is finished, triggering bootstrap right now. + // The call to `on_started` will reset `throttle_delay`. + return Poll::Ready(()); + } + + // The `throttle_timer` is not finished but the periodic interval for triggering bootstrap might be reached. + } else { + // No new peer has recently been inserted into the routing table or automatic bootstrap is disabled. + } + + // Checking if the user has enabled the periodic bootstrap feature. + if let Some((_, delay)) = self.interval_and_delay.as_mut() { + if let Poll::Ready(()) = delay.poll_unpin(cx) { + // It is time to run the periodic bootstrap. + // The call to `on_started` will reset `delay`. + return Poll::Ready(()); + } + } else { + // The user disabled periodic bootstrap. + } + + // Registering the `waker` so that we can wake up when calling `on_new_peer_in_routing_table`. + self.waker = Some(cx.waker().clone()); + Poll::Pending + } + + #[cfg(test)] + async fn next(&mut self) { + std::future::poll_fn(|cx| self.poll_next_bootstrap(cx)).await + } +} + +/// Simple enum to indicate when the throttle timer resolves. +/// A dedicated `Immediate` variant is necessary because creating +/// `Delay::new(Duration::ZERO)` does not always actually resolve +/// immediately. +#[derive(Debug)] +enum ThrottleTimer { + Immediate, + Delay(Delay), +} + +impl From for ThrottleTimer { + fn from(value: Duration) -> Self { + if value.is_zero() { + Self::Immediate + } else { + Self::Delay(Delay::new(value)) + } + } +} + +impl futures::Future for ThrottleTimer { + type Output = (); + + fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.get_mut() { + Self::Immediate => Poll::Ready(()), + Self::Delay(delay) => delay.poll_unpin(cx), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use instant::Instant; + + const MS_5: Duration = Duration::from_millis(5); + const MS_100: Duration = Duration::from_millis(100); + + fn do_bootstrap(status: &mut Status) { + status.on_started(); + status.on_finish(); + } + + async fn await_and_do_bootstrap(status: &mut Status) { + status.next().await; + do_bootstrap(status); + } + + #[async_std::test] + async fn immediate_automatic_bootstrap_is_triggered_immediately() { + let mut status = Status::new(Some(Duration::from_secs(1)), Some(Duration::ZERO)); + + await_and_do_bootstrap(&mut status).await; // Wait for periodic bootstrap + + assert!( + status.next().now_or_never().is_none(), + "bootstrap to not be triggered immediately because periodic bootstrap is in ~1s" + ); + + status.on_new_peer_in_routing_table(); // Connected to a new peer though! + assert!( + status.next().now_or_never().is_some(), + "bootstrap to be triggered immediately because we connected to a new peer" + ); + + assert!( + async_std::future::timeout(Duration::from_millis(500), status.next()) + .await + .is_ok(), + "bootstrap to be triggered in less then the configured delay because we connected to a new peer" + ); + } + + #[async_std::test] + async fn delayed_automatic_bootstrap_is_triggered_before_periodic_bootstrap() { + let mut status = Status::new(Some(Duration::from_secs(1)), Some(MS_5)); + + await_and_do_bootstrap(&mut status).await; // Wait for periodic bootstrap + + assert!( + status.next().now_or_never().is_none(), + "bootstrap to not be triggered immediately because periodic bootstrap is in ~1s" + ); + + status.on_new_peer_in_routing_table(); // Connected to a new peer though! + assert!( + status.next().now_or_never().is_none(), + "bootstrap to not be triggered immediately because throttle is 5ms" + ); + + assert!( + async_std::future::timeout(MS_5 * 2, status.next()) + .await + .is_ok(), + "bootstrap to be triggered in less then the configured periodic delay because we connected to a new peer" + ); + } + + #[test] + fn given_no_periodic_bootstrap_and_immediate_automatic_bootstrap_try_on_next_connection() { + let mut status = Status::new(None, Some(Duration::ZERO)); + + // User manually triggered a bootstrap + do_bootstrap(&mut status); + + status.on_new_peer_in_routing_table(); // Connected to a new peer though! + + assert!( + status.next().now_or_never().is_some(), + "bootstrap to be triggered immediately because we connected to a new peer" + ) + } + + #[async_std::test] + async fn given_periodic_bootstrap_when_routing_table_updated_then_wont_bootstrap_until_next_interval( + ) { + let mut status = Status::new(Some(MS_100), Some(MS_5)); + + status.on_new_peer_in_routing_table(); + + let start = Instant::now(); + await_and_do_bootstrap(&mut status).await; + let elapsed = Instant::now().duration_since(start); + + assert!(elapsed < MS_5 * 2); + + let start = Instant::now(); + await_and_do_bootstrap(&mut status).await; + let elapsed = Instant::now().duration_since(start); + + assert!(elapsed > MS_100); + } + + #[async_std::test] + async fn given_no_periodic_bootstrap_and_automatic_bootstrap_when_new_entry_then_will_bootstrap( + ) { + let mut status = Status::new(None, Some(Duration::ZERO)); + + status.on_new_peer_in_routing_table(); + + status.next().await; + } + + #[async_std::test] + async fn given_periodic_bootstrap_and_no_automatic_bootstrap_triggers_periodically() { + let mut status = Status::new(Some(MS_100), None); + + let start = Instant::now(); + for i in 1..6 { + await_and_do_bootstrap(&mut status).await; + + let elapsed = Instant::now().duration_since(start); + + assert!(elapsed > (i * MS_100 - Duration::from_millis(10))); // Subtract 10ms to avoid flakes. + } + } + + #[async_std::test] + async fn given_no_periodic_bootstrap_and_automatic_bootstrap_reset_throttle_when_multiple_peers( + ) { + let mut status = Status::new(None, Some(MS_100)); + + status.on_new_peer_in_routing_table(); + for _ in 0..10 { + Delay::new(MS_100 / 2).await; + status.on_new_peer_in_routing_table(); // should reset throttle_timer + } + assert!( + status.next().now_or_never().is_none(), + "bootstrap to not be triggered immediately because throttle has been reset" + ); + + Delay::new(MS_100 - MS_5).await; + + assert!( + async_std::future::timeout(MS_5*2, status.next()) + .await + .is_ok(), + "bootstrap to be triggered in the configured throttle delay because we connected to a new peer" + ); + } + + #[async_std::test] + async fn given_periodic_bootstrap_and_no_automatic_bootstrap_manually_triggering_prevent_periodic( + ) { + let mut status = Status::new(Some(MS_100), None); + + status.on_started(); // first manually triggering + status.on_started(); // second manually triggering + status.on_finish(); // one finishes + + assert!( + async_std::future::timeout(10 * MS_100, status.next()) + .await + .is_err(), + "periodic bootstrap to never be triggered because one is still being run" + ); + + status.on_finish(); // all manual bootstrap finished + + assert!( + status.next().now_or_never().is_some(), + "bootstrap to be triggered immediately because no more bootstrap requests are running" + ) + } +} diff --git a/protocols/kad/src/lib.rs b/protocols/kad/src/lib.rs index 519b67f9d7a..bc01b9fd3ce 100644 --- a/protocols/kad/src/lib.rs +++ b/protocols/kad/src/lib.rs @@ -37,6 +37,7 @@ mod addresses; mod behaviour; +mod bootstrap; mod handler; mod jobs; mod kbucket;