From a6576cd1f47c1f3622ddaeb407711015a4eac986 Mon Sep 17 00:00:00 2001 From: Roland Sherwin Date: Wed, 4 Dec 2024 05:17:09 +0530 Subject: [PATCH] --wip-- [skip ci] --- ant-networking/Cargo.toml | 1 + ant-networking/src/driver.rs | 103 +++++++++++++++++++++++++++--- ant-networking/src/event/kad.rs | 3 +- ant-networking/src/event/mod.rs | 10 ++- ant-networking/src/event/swarm.rs | 24 ++++++- ant-node/Cargo.toml | 1 + ant-node/src/node.rs | 1 - 7 files changed, 126 insertions(+), 17 deletions(-) diff --git a/ant-networking/Cargo.toml b/ant-networking/Cargo.toml index 98613fabf8..e1a9d7d20c 100644 --- a/ant-networking/Cargo.toml +++ b/ant-networking/Cargo.toml @@ -21,6 +21,7 @@ websockets = ["libp2p/tcp"] [dependencies] aes-gcm-siv = "0.11.1" +ant-bootstrap = { path = "../ant-bootstrap", version = "0.1.0" } ant-build-info = { path = "../ant-build-info", version = "0.1.19" } ant-evm = { path = "../ant-evm", version = "0.1.4" } ant-protocol = { path = "../ant-protocol", version = "0.17.15" } diff --git a/ant-networking/src/driver.rs b/ant-networking/src/driver.rs index a9792700da..bf88693d0f 100644 --- a/ant-networking/src/driver.rs +++ b/ant-networking/src/driver.rs @@ -30,6 +30,7 @@ use crate::{ }; use crate::{transport, NodeIssue}; +use ant_bootstrap::BootstrapCacheStore; use ant_evm::PaymentQuote; use ant_protocol::{ messages::{ChunkProof, Nonce, Request, Response}, @@ -71,8 +72,11 @@ use std::{ num::NonZeroUsize, path::PathBuf, }; -use tokio::sync::{mpsc, oneshot}; use tokio::time::Duration; +use tokio::{ + sync::{mpsc, oneshot}, + time::Interval, +}; use tracing::warn; use xor_name::XorName; @@ -260,13 +264,13 @@ pub(super) struct NodeBehaviour { #[derive(Debug)] pub struct NetworkBuilder { + bootstrap_cache: Option, is_behind_home_network: bool, keypair: Keypair, local: bool, listen_addr: Option, request_timeout: Option, concurrency_limit: Option, - initial_peers: Vec, #[cfg(feature = "open-metrics")] metrics_registries: Option, #[cfg(feature = "open-metrics")] @@ -278,13 +282,13 @@ pub struct NetworkBuilder { impl NetworkBuilder { pub fn new(keypair: Keypair, local: bool) -> Self { Self { + bootstrap_cache: None, is_behind_home_network: false, keypair, local, listen_addr: None, request_timeout: None, concurrency_limit: None, - initial_peers: Default::default(), #[cfg(feature = "open-metrics")] metrics_registries: None, #[cfg(feature = "open-metrics")] @@ -294,6 +298,10 @@ impl NetworkBuilder { } } + pub fn bootstrap_cache(&mut self, bootstrap_cache: BootstrapCacheStore) { + self.bootstrap_cache = Some(bootstrap_cache); + } + pub fn is_behind_home_network(&mut self, enable: bool) { self.is_behind_home_network = enable; } @@ -310,10 +318,6 @@ impl NetworkBuilder { self.concurrency_limit = Some(concurrency_limit); } - pub fn initial_peers(&mut self, initial_peers: Vec) { - self.initial_peers = initial_peers; - } - /// Set the registries used inside the metrics server. /// Configure the `metrics_server_port` to enable the metrics server. #[cfg(feature = "open-metrics")] @@ -720,6 +724,7 @@ impl NetworkBuilder { close_group: Vec::with_capacity(CLOSE_GROUP_SIZE), peers_in_rt: 0, bootstrap, + bootstrap_cache: self.bootstrap_cache, relay_manager, connected_relay_clients: Default::default(), external_address_manager, @@ -815,6 +820,7 @@ pub struct SwarmDriver { pub(crate) close_group: Vec, pub(crate) peers_in_rt: usize, pub(crate) bootstrap: ContinuousNetworkDiscover, + pub(crate) bootstrap_cache: Option, pub(crate) external_address_manager: Option, pub(crate) relay_manager: Option, /// The peers that are using our relay service. @@ -843,7 +849,7 @@ pub struct SwarmDriver { pub(crate) bootstrap_peers: BTreeMap, HashSet>, // Peers that having live connection to. Any peer got contacted during kad network query // will have live connection established. And they may not appear in the RT. - pub(crate) live_connected_peers: BTreeMap, + pub(crate) live_connected_peers: BTreeMap, /// The list of recently established connections ids. /// This is used to prevent log spamming. pub(crate) latest_established_connection_ids: HashMap, @@ -876,6 +882,17 @@ impl SwarmDriver { let mut set_farthest_record_interval = interval(CLOSET_RECORD_CHECK_INTERVAL); let mut relay_manager_reservation_interval = interval(RELAY_MANAGER_RESERVATION_INTERVAL); + let mut bootstrap_cache_save_interval = self.bootstrap_cache.as_ref().and_then(|cache| { + if cache.config().disable_cache_writing { + None + } else { + // add a variance of 10% to the interval, to avoid all nodes writing to disk at the same time. + let duration = + Self::duration_with_variance(cache.config().min_cache_save_duration, 10); + Some(interval(duration)) + } + }); + // temporarily skip processing IncomingConnectionError swarm event to avoid log spamming let mut previous_incoming_connection_error_event = None; loop { @@ -1005,6 +1022,36 @@ impl SwarmDriver { relay_manager.try_connecting_to_relay(&mut self.swarm, &self.bad_nodes) } }, + Some(_) = Self::conditional_interval(&mut bootstrap_cache_save_interval) => { + let Some(bootstrap_cache) = self.bootstrap_cache.as_mut() else { + continue; + }; + let Some(current_interval) = bootstrap_cache_save_interval.as_mut() else { + continue; + }; + + if let Err(err) = bootstrap_cache.sync_and_save_to_disk(true).await { + error!("Failed to save bootstrap cache: {err}"); + } + + if current_interval.period() >= bootstrap_cache.config().max_cache_save_duration { + continue; + } + + // add a variance of 1% to the max interval to avoid all nodes writing to disk at the same time. + let max_cache_save_duration = + Self::duration_with_variance(bootstrap_cache.config().max_cache_save_duration, 1); + + // scale up the interval until we reach the max + let new_duration = Duration::from_secs( + std::cmp::min( + current_interval.period().as_secs() * bootstrap_cache.config().cache_save_scaling_factor, + max_cache_save_duration.as_secs(), + )); + debug!("Scaling up the bootstrap cache save interval to {new_duration:?}"); + *current_interval = interval(new_duration); + + }, } } } @@ -1156,13 +1203,35 @@ impl SwarmDriver { info!("Listening on {id:?} with addr: {addr:?}"); Ok(()) } + + /// Returns a new duration that is within +/- variance of the provided duration. + fn duration_with_variance(duration: Duration, variance: u32) -> Duration { + let actual_variance = duration / variance; + let random_adjustment = + Duration::from_secs(rand::thread_rng().gen_range(0..actual_variance.as_secs())); + if random_adjustment.as_secs() % 2 == 0 { + duration - random_adjustment + } else { + duration + random_adjustment + } + } + + /// To tick an optional interval inside tokio::select! without looping forever. + async fn conditional_interval(i: &mut Option) -> Option<()> { + match i { + Some(i) => { + i.tick().await; + Some(()) + } + None => None, + } + } } #[cfg(test)] mod tests { use super::check_and_wipe_storage_dir_if_necessary; - - use std::{fs, io::Read}; + use std::{fs, io::Read, time::Duration}; #[tokio::test] async fn version_file_update() { @@ -1219,4 +1288,18 @@ mod tests { // The storage_dir shall be removed as version_key changed assert!(fs::metadata(storage_dir.clone()).is_err()); } + + #[tokio::test] + async fn test_duration_variance_fn() { + let duration = Duration::from_secs(100); + let variance = 10; + for _ in 0..10000 { + let new_duration = crate::SwarmDriver::duration_with_variance(duration, variance); + if new_duration < duration - duration / variance + || new_duration > duration + duration / variance + { + panic!("new_duration: {new_duration:?} is not within the expected range",); + } + } + } } diff --git a/ant-networking/src/event/kad.rs b/ant-networking/src/event/kad.rs index 5934b11bfa..1af95f9d1d 100644 --- a/ant-networking/src/event/kad.rs +++ b/ant-networking/src/event/kad.rs @@ -242,11 +242,12 @@ impl SwarmDriver { peer, is_new_peer, old_peer, + addresses, .. } => { event_string = "kad_event::RoutingUpdated"; if is_new_peer { - self.update_on_peer_addition(peer); + self.update_on_peer_addition(peer, addresses); // This should only happen once if self.bootstrap.notify_new_peer() { diff --git a/ant-networking/src/event/mod.rs b/ant-networking/src/event/mod.rs index ad44f83da2..ae6e2aefca 100644 --- a/ant-networking/src/event/mod.rs +++ b/ant-networking/src/event/mod.rs @@ -16,7 +16,7 @@ use custom_debug::Debug as CustomDebug; #[cfg(feature = "local")] use libp2p::mdns; use libp2p::{ - kad::{Record, RecordKey, K_VALUE}, + kad::{Addresses, Record, RecordKey, K_VALUE}, request_response::ResponseChannel as PeerResponseChannel, Multiaddr, PeerId, }; @@ -232,7 +232,7 @@ impl SwarmDriver { } /// Update state on addition of a peer to the routing table. - pub(crate) fn update_on_peer_addition(&mut self, added_peer: PeerId) { + pub(crate) fn update_on_peer_addition(&mut self, added_peer: PeerId, addresses: Addresses) { self.peers_in_rt = self.peers_in_rt.saturating_add(1); let n_peers = self.peers_in_rt; info!("New peer added to routing table: {added_peer:?}, now we have #{n_peers} connected peers"); @@ -240,6 +240,12 @@ impl SwarmDriver { #[cfg(feature = "loud")] println!("New peer added to routing table: {added_peer:?}, now we have #{n_peers} connected peers"); + if let Some(bootstrap_cache) = &mut self.bootstrap_cache { + for addr in addresses.iter() { + bootstrap_cache.add_addr(addr.clone()); + } + } + self.log_kbuckets(&added_peer); self.send_event(NetworkEvent::PeerAdded(added_peer, self.peers_in_rt)); diff --git a/ant-networking/src/event/swarm.rs b/ant-networking/src/event/swarm.rs index c5fad1256b..6d0c283a0c 100644 --- a/ant-networking/src/event/swarm.rs +++ b/ant-networking/src/event/swarm.rs @@ -375,8 +375,17 @@ impl SwarmDriver { let _ = self.live_connected_peers.insert( connection_id, - (peer_id, Instant::now() + Duration::from_secs(60)), + ( + peer_id, + endpoint.get_remote_address().clone(), + Instant::now() + Duration::from_secs(60), + ), ); + + if let Some(bootstrap_cache) = self.bootstrap_cache.as_mut() { + bootstrap_cache.update_addr_status(endpoint.get_remote_address(), true); + } + self.insert_latest_established_connection_ids( connection_id, endpoint.get_remote_address(), @@ -406,7 +415,7 @@ impl SwarmDriver { } => { event_string = "OutgoingConnErr"; warn!("OutgoingConnectionError to {failed_peer_id:?} on {connection_id:?} - {error:?}"); - let _ = self.live_connected_peers.remove(&connection_id); + let connection_details = self.live_connected_peers.remove(&connection_id); self.record_connection_metrics(); // we need to decide if this was a critical error and the peer should be removed from the routing table @@ -506,6 +515,15 @@ impl SwarmDriver { } }; + // Just track failures during outgoing connection with `failed_peer_id` inside the bootstrap cache. + // OutgoingConnectionError without peer_id can happen when dialing multiple addresses of a peer. + // And similarly IncomingConnectionError can happen when a peer has multiple transports/listen addrs. + if let (Some((_, failed_addr, _)), Some(bootstrap_cache)) = + (connection_details, self.bootstrap_cache.as_mut()) + { + bootstrap_cache.update_addr_status(&failed_addr, false); + } + if should_clean_peer { warn!("Tracking issue of {failed_peer_id:?}. Clearing it out for now"); @@ -641,7 +659,7 @@ impl SwarmDriver { self.last_connection_pruning_time = Instant::now(); let mut removed_conns = 0; - self.live_connected_peers.retain(|connection_id, (peer_id, timeout_time)| { + self.live_connected_peers.retain(|connection_id, (peer_id, _addr, timeout_time)| { // skip if timeout isn't reached yet if Instant::now() < *timeout_time { diff --git a/ant-node/Cargo.toml b/ant-node/Cargo.toml index a1a5700b64..283dc940a3 100644 --- a/ant-node/Cargo.toml +++ b/ant-node/Cargo.toml @@ -28,6 +28,7 @@ upnp = ["ant-networking/upnp"] websockets = ["ant-networking/websockets"] [dependencies] +ant-bootstrap = { path = "../ant-bootstrap", version = "0.1.0" } ant-build-info = { path = "../ant-build-info", version = "0.1.19" } ant-evm = { path = "../ant-evm", version = "0.1.4" } ant-logging = { path = "../ant-logging", version = "0.2.40" } diff --git a/ant-node/src/node.rs b/ant-node/src/node.rs index c1ea235239..e9d2b14155 100644 --- a/ant-node/src/node.rs +++ b/ant-node/src/node.rs @@ -163,7 +163,6 @@ impl NodeBuilder { network_builder.listen_addr(self.addr); #[cfg(feature = "open-metrics")] network_builder.metrics_server_port(self.metrics_server_port); - network_builder.initial_peers(self.initial_peers.clone()); network_builder.is_behind_home_network(self.is_behind_home_network); #[cfg(feature = "upnp")]