diff --git a/Cargo.lock b/Cargo.lock index bed4a26d61..0fa6aa094e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -759,6 +759,7 @@ dependencies = [ name = "ant-cli" version = "0.1.5" dependencies = [ + "ant-bootstrap", "ant-build-info", "ant-logging", "ant-peers-acquisition", @@ -852,6 +853,7 @@ name = "ant-networking" version = "0.19.5" dependencies = [ "aes-gcm-siv", + "ant-bootstrap", "ant-build-info", "ant-evm", "ant-protocol", @@ -898,6 +900,7 @@ dependencies = [ name = "ant-node" version = "0.112.6" dependencies = [ + "ant-bootstrap", "ant-build-info", "ant-evm", "ant-logging", diff --git a/ant-cli/Cargo.toml b/ant-cli/Cargo.toml index 7f1983fcfa..05cbd82eac 100644 --- a/ant-cli/Cargo.toml +++ b/ant-cli/Cargo.toml @@ -25,6 +25,7 @@ name = "files" harness = false [dependencies] +ant-bootstrap = { path = "../ant-bootstrap", version = "0.1.0" } ant-build-info = { path = "../ant-build-info", version = "0.1.19" } ant-logging = { path = "../ant-logging", version = "0.2.40" } ant-peers-acquisition = { path = "../ant-peers-acquisition", version = "0.5.7" } diff --git a/ant-cli/src/access/network.rs b/ant-cli/src/access/network.rs index fb7d5fe597..45f049e31f 100644 --- a/ant-cli/src/access/network.rs +++ b/ant-cli/src/access/network.rs @@ -6,15 +6,14 @@ // KIND, either express or implied. Please review the Licences for the specific language governing // permissions and limitations relating to use of the SAFE Network Software. 
-use ant_peers_acquisition::PeersArgs; -use ant_peers_acquisition::ANT_PEERS_ENV; +use ant_bootstrap::{PeersArgs, ANT_PEERS_ENV}; use autonomi::Multiaddr; use color_eyre::eyre::Context; use color_eyre::Result; use color_eyre::Section; pub async fn get_peers(peers: PeersArgs) -> Result> { - peers.get_peers().await + peers.get_addrs().await .wrap_err("Please provide valid Network peers to connect to") .with_suggestion(|| format!("make sure you've provided network peers using the --peers option or the {ANT_PEERS_ENV} env var")) .with_suggestion(|| "a peer address looks like this: /ip4/42.42.42.42/udp/4242/quic-v1/p2p/B64nodePeerIDvdjb3FAJF4ks3moreBase64CharsHere") diff --git a/ant-cli/src/commands.rs b/ant-cli/src/commands.rs index 663898b6ea..a1d1fd487a 100644 --- a/ant-cli/src/commands.rs +++ b/ant-cli/src/commands.rs @@ -11,11 +11,10 @@ mod register; mod vault; mod wallet; +use crate::opt::Opt; use clap::Subcommand; use color_eyre::Result; -use crate::opt::Opt; - #[derive(Subcommand, Debug)] pub enum SubCmd { /// Operations related to file handling. diff --git a/ant-cli/src/opt.rs b/ant-cli/src/opt.rs index 804156e4bd..3e84379fc0 100644 --- a/ant-cli/src/opt.rs +++ b/ant-cli/src/opt.rs @@ -6,14 +6,12 @@ // KIND, either express or implied. Please review the Licences for the specific language governing // permissions and limitations relating to use of the SAFE Network Software. -use std::time::Duration; - +use crate::commands::SubCmd; +use ant_bootstrap::PeersArgs; use ant_logging::{LogFormat, LogOutputDest}; -use ant_peers_acquisition::PeersArgs; use clap::Parser; use color_eyre::Result; - -use crate::commands::SubCmd; +use std::time::Duration; // Please do not remove the blank lines in these doc comments. // They are used for inserting line breaks when the help menu is rendered in the UI. 
diff --git a/ant-networking/Cargo.toml b/ant-networking/Cargo.toml index 98613fabf8..e1a9d7d20c 100644 --- a/ant-networking/Cargo.toml +++ b/ant-networking/Cargo.toml @@ -21,6 +21,7 @@ websockets = ["libp2p/tcp"] [dependencies] aes-gcm-siv = "0.11.1" +ant-bootstrap = { path = "../ant-bootstrap", version = "0.1.0" } ant-build-info = { path = "../ant-build-info", version = "0.1.19" } ant-evm = { path = "../ant-evm", version = "0.1.4" } ant-protocol = { path = "../ant-protocol", version = "0.17.15" } diff --git a/ant-networking/src/driver.rs b/ant-networking/src/driver.rs index a9792700da..87df73825b 100644 --- a/ant-networking/src/driver.rs +++ b/ant-networking/src/driver.rs @@ -30,6 +30,7 @@ use crate::{ }; use crate::{transport, NodeIssue}; +use ant_bootstrap::BootstrapCacheStore; use ant_evm::PaymentQuote; use ant_protocol::{ messages::{ChunkProof, Nonce, Request, Response}, @@ -71,8 +72,11 @@ use std::{ num::NonZeroUsize, path::PathBuf, }; -use tokio::sync::{mpsc, oneshot}; use tokio::time::Duration; +use tokio::{ + sync::{mpsc, oneshot}, + time::Interval, +}; use tracing::warn; use xor_name::XorName; @@ -260,13 +264,13 @@ pub(super) struct NodeBehaviour { #[derive(Debug)] pub struct NetworkBuilder { + bootstrap_cache: Option, is_behind_home_network: bool, keypair: Keypair, local: bool, listen_addr: Option, request_timeout: Option, concurrency_limit: Option, - initial_peers: Vec, #[cfg(feature = "open-metrics")] metrics_registries: Option, #[cfg(feature = "open-metrics")] @@ -278,13 +282,13 @@ pub struct NetworkBuilder { impl NetworkBuilder { pub fn new(keypair: Keypair, local: bool) -> Self { Self { + bootstrap_cache: None, is_behind_home_network: false, keypair, local, listen_addr: None, request_timeout: None, concurrency_limit: None, - initial_peers: Default::default(), #[cfg(feature = "open-metrics")] metrics_registries: None, #[cfg(feature = "open-metrics")] @@ -294,6 +298,10 @@ impl NetworkBuilder { } } + pub fn bootstrap_cache(&mut self, bootstrap_cache: 
BootstrapCacheStore) { + self.bootstrap_cache = Some(bootstrap_cache); + } + pub fn is_behind_home_network(&mut self, enable: bool) { self.is_behind_home_network = enable; } @@ -310,10 +318,6 @@ impl NetworkBuilder { self.concurrency_limit = Some(concurrency_limit); } - pub fn initial_peers(&mut self, initial_peers: Vec) { - self.initial_peers = initial_peers; - } - /// Set the registries used inside the metrics server. /// Configure the `metrics_server_port` to enable the metrics server. #[cfg(feature = "open-metrics")] @@ -720,6 +724,7 @@ impl NetworkBuilder { close_group: Vec::with_capacity(CLOSE_GROUP_SIZE), peers_in_rt: 0, bootstrap, + bootstrap_cache: self.bootstrap_cache, relay_manager, connected_relay_clients: Default::default(), external_address_manager, @@ -815,6 +820,7 @@ pub struct SwarmDriver { pub(crate) close_group: Vec, pub(crate) peers_in_rt: usize, pub(crate) bootstrap: ContinuousNetworkDiscover, + pub(crate) bootstrap_cache: Option, pub(crate) external_address_manager: Option, pub(crate) relay_manager: Option, /// The peers that are using our relay service. @@ -843,7 +849,7 @@ pub struct SwarmDriver { pub(crate) bootstrap_peers: BTreeMap, HashSet>, // Peers that having live connection to. Any peer got contacted during kad network query // will have live connection established. And they may not appear in the RT. - pub(crate) live_connected_peers: BTreeMap, + pub(crate) live_connected_peers: BTreeMap, /// The list of recently established connections ids. /// This is used to prevent log spamming. 
pub(crate) latest_established_connection_ids: HashMap, @@ -876,6 +882,24 @@ impl SwarmDriver { let mut set_farthest_record_interval = interval(CLOSET_RECORD_CHECK_INTERVAL); let mut relay_manager_reservation_interval = interval(RELAY_MANAGER_RESERVATION_INTERVAL); + let mut bootstrap_cache_save_interval = self.bootstrap_cache.as_ref().and_then(|cache| { + if cache.config().disable_cache_writing { + None + } else { + // add a variance of 10% to the interval, to avoid all nodes writing to disk at the same time. + let duration = + Self::duration_with_variance(cache.config().min_cache_save_duration, 10); + Some(interval(duration)) + } + }); + if let Some(interval) = bootstrap_cache_save_interval.as_mut() { + interval.tick().await; // first tick completes immediately + info!( + "Bootstrap cache save interval is set to {:?}", + interval.period() + ); + } + // temporarily skip processing IncomingConnectionError swarm event to avoid log spamming let mut previous_incoming_connection_error_event = None; loop { @@ -1005,6 +1029,37 @@ impl SwarmDriver { relay_manager.try_connecting_to_relay(&mut self.swarm, &self.bad_nodes) } }, + Some(()) = Self::conditional_interval(&mut bootstrap_cache_save_interval) => { + let Some(bootstrap_cache) = self.bootstrap_cache.as_mut() else { + continue; + }; + let Some(current_interval) = bootstrap_cache_save_interval.as_mut() else { + continue; + }; + + if let Err(err) = bootstrap_cache.sync_and_save_to_disk(true).await { + error!("Failed to save bootstrap cache: {err}"); + } + + if current_interval.period() >= bootstrap_cache.config().max_cache_save_duration { + continue; + } + + // add a variance of 1% to the max interval to avoid all nodes writing to disk at the same time. 
+                    let max_cache_save_duration =
+                        Self::duration_with_variance(bootstrap_cache.config().max_cache_save_duration, 1);
+
+                    // scale up the interval until we reach the max
+                    let new_duration = Duration::from_secs(
+                        std::cmp::min(
+                            current_interval.period().as_secs() * bootstrap_cache.config().cache_save_scaling_factor,
+                            max_cache_save_duration.as_secs(),
+                    ));
+                    info!("Scaling up the bootstrap cache save interval to {new_duration:?}");
+                    *current_interval = interval(new_duration);
+                    current_interval.tick().await; // first tick completes immediately
+
+                },
             }
         }
     }
@@ -1156,13 +1211,46 @@ impl SwarmDriver {
         info!("Listening on {id:?} with addr: {addr:?}");
         Ok(())
     }
+
+    /// Returns `duration` shifted by a random, whole-second offset of at most `variance` percent.
+    ///
+    /// `variance` is a percentage of `duration` (10 means +/- 10%, 1 means +/- 1%). Values above
+    /// 100 are treated as 100 so the offset can never exceed `duration`. When the allowed offset
+    /// is shorter than one second (including `variance == 0`), `duration` is returned unchanged.
+    fn duration_with_variance(duration: Duration, variance: u32) -> Duration {
+        // Largest offset we may apply, e.g. 10% of 100s = 10s. Cannot overflow: it is <= duration.
+        let max_adjustment = (duration / 100) * variance.min(100);
+        let max_adjustment_secs = max_adjustment.as_secs();
+        if max_adjustment_secs == 0 {
+            // `gen_range` panics on an empty range, so there is nothing to randomise here.
+            return duration;
+        }
+        let random_adjustment =
+            Duration::from_secs(rand::thread_rng().gen_range(0..max_adjustment_secs));
+        // The parity of the random offset decides whether it is subtracted or added.
+        if random_adjustment.as_secs() % 2 == 0 {
+            duration.saturating_sub(random_adjustment)
+        } else {
+            duration.saturating_add(random_adjustment)
+        }
+    }
+
+    /// To tick an optional interval inside tokio::select! without looping forever.
+ async fn conditional_interval(i: &mut Option) -> Option<()> { + match i { + Some(i) => { + i.tick().await; + Some(()) + } + None => None, + } + } } #[cfg(test)] mod tests { use super::check_and_wipe_storage_dir_if_necessary; - - use std::{fs, io::Read}; + use std::{fs, io::Read, time::Duration}; #[tokio::test] async fn version_file_update() { @@ -1219,4 +1296,18 @@ mod tests { // The storage_dir shall be removed as version_key changed assert!(fs::metadata(storage_dir.clone()).is_err()); } + + #[tokio::test] + async fn test_duration_variance_fn() { + let duration = Duration::from_secs(100); + let variance = 10; + for _ in 0..10000 { + let new_duration = crate::SwarmDriver::duration_with_variance(duration, variance); + if new_duration < duration - duration / variance + || new_duration > duration + duration / variance + { + panic!("new_duration: {new_duration:?} is not within the expected range",); + } + } + } } diff --git a/ant-networking/src/event/kad.rs b/ant-networking/src/event/kad.rs index 5934b11bfa..1af95f9d1d 100644 --- a/ant-networking/src/event/kad.rs +++ b/ant-networking/src/event/kad.rs @@ -242,11 +242,12 @@ impl SwarmDriver { peer, is_new_peer, old_peer, + addresses, .. 
} => { event_string = "kad_event::RoutingUpdated"; if is_new_peer { - self.update_on_peer_addition(peer); + self.update_on_peer_addition(peer, addresses); // This should only happen once if self.bootstrap.notify_new_peer() { diff --git a/ant-networking/src/event/mod.rs b/ant-networking/src/event/mod.rs index ad44f83da2..ae6e2aefca 100644 --- a/ant-networking/src/event/mod.rs +++ b/ant-networking/src/event/mod.rs @@ -16,7 +16,7 @@ use custom_debug::Debug as CustomDebug; #[cfg(feature = "local")] use libp2p::mdns; use libp2p::{ - kad::{Record, RecordKey, K_VALUE}, + kad::{Addresses, Record, RecordKey, K_VALUE}, request_response::ResponseChannel as PeerResponseChannel, Multiaddr, PeerId, }; @@ -232,7 +232,7 @@ impl SwarmDriver { } /// Update state on addition of a peer to the routing table. - pub(crate) fn update_on_peer_addition(&mut self, added_peer: PeerId) { + pub(crate) fn update_on_peer_addition(&mut self, added_peer: PeerId, addresses: Addresses) { self.peers_in_rt = self.peers_in_rt.saturating_add(1); let n_peers = self.peers_in_rt; info!("New peer added to routing table: {added_peer:?}, now we have #{n_peers} connected peers"); @@ -240,6 +240,12 @@ impl SwarmDriver { #[cfg(feature = "loud")] println!("New peer added to routing table: {added_peer:?}, now we have #{n_peers} connected peers"); + if let Some(bootstrap_cache) = &mut self.bootstrap_cache { + for addr in addresses.iter() { + bootstrap_cache.add_addr(addr.clone()); + } + } + self.log_kbuckets(&added_peer); self.send_event(NetworkEvent::PeerAdded(added_peer, self.peers_in_rt)); diff --git a/ant-networking/src/event/swarm.rs b/ant-networking/src/event/swarm.rs index c5fad1256b..6d0c283a0c 100644 --- a/ant-networking/src/event/swarm.rs +++ b/ant-networking/src/event/swarm.rs @@ -375,8 +375,17 @@ impl SwarmDriver { let _ = self.live_connected_peers.insert( connection_id, - (peer_id, Instant::now() + Duration::from_secs(60)), + ( + peer_id, + endpoint.get_remote_address().clone(), + Instant::now() + 
Duration::from_secs(60), + ), ); + + if let Some(bootstrap_cache) = self.bootstrap_cache.as_mut() { + bootstrap_cache.update_addr_status(endpoint.get_remote_address(), true); + } + self.insert_latest_established_connection_ids( connection_id, endpoint.get_remote_address(), @@ -406,7 +415,7 @@ impl SwarmDriver { } => { event_string = "OutgoingConnErr"; warn!("OutgoingConnectionError to {failed_peer_id:?} on {connection_id:?} - {error:?}"); - let _ = self.live_connected_peers.remove(&connection_id); + let connection_details = self.live_connected_peers.remove(&connection_id); self.record_connection_metrics(); // we need to decide if this was a critical error and the peer should be removed from the routing table @@ -506,6 +515,15 @@ impl SwarmDriver { } }; + // Just track failures during outgoing connection with `failed_peer_id` inside the bootstrap cache. + // OutgoingConnectionError without peer_id can happen when dialing multiple addresses of a peer. + // And similarly IncomingConnectionError can happen when a peer has multiple transports/listen addrs. + if let (Some((_, failed_addr, _)), Some(bootstrap_cache)) = + (connection_details, self.bootstrap_cache.as_mut()) + { + bootstrap_cache.update_addr_status(&failed_addr, false); + } + if should_clean_peer { warn!("Tracking issue of {failed_peer_id:?}. 
Clearing it out for now"); @@ -641,7 +659,7 @@ impl SwarmDriver { self.last_connection_pruning_time = Instant::now(); let mut removed_conns = 0; - self.live_connected_peers.retain(|connection_id, (peer_id, timeout_time)| { + self.live_connected_peers.retain(|connection_id, (peer_id, _addr, timeout_time)| { // skip if timeout isn't reached yet if Instant::now() < *timeout_time { diff --git a/ant-node/Cargo.toml b/ant-node/Cargo.toml index a1a5700b64..283dc940a3 100644 --- a/ant-node/Cargo.toml +++ b/ant-node/Cargo.toml @@ -28,6 +28,7 @@ upnp = ["ant-networking/upnp"] websockets = ["ant-networking/websockets"] [dependencies] +ant-bootstrap = { path = "../ant-bootstrap", version = "0.1.0" } ant-build-info = { path = "../ant-build-info", version = "0.1.19" } ant-evm = { path = "../ant-evm", version = "0.1.4" } ant-logging = { path = "../ant-logging", version = "0.2.40" } diff --git a/ant-node/src/bin/antnode/main.rs b/ant-node/src/bin/antnode/main.rs index cebbc0857c..caae71685f 100644 --- a/ant-node/src/bin/antnode/main.rs +++ b/ant-node/src/bin/antnode/main.rs @@ -13,12 +13,12 @@ mod rpc_service; mod subcommands; use crate::subcommands::EvmNetworkCommand; +use ant_bootstrap::{BootstrapCacheConfig, BootstrapCacheStore, PeersArgs}; use ant_evm::{get_evm_network_from_env, EvmNetwork, RewardsAddress}; #[cfg(feature = "metrics")] use ant_logging::metrics::init_metrics; use ant_logging::{Level, LogFormat, LogOutputDest, ReloadHandle}; use ant_node::{Marker, NodeBuilder, NodeEvent, NodeEventsReceiver}; -use ant_peers_acquisition::PeersArgs; use ant_protocol::{ node::get_antnode_root_dir, node_rpc::{NodeCtrl, StopResult}, @@ -172,12 +172,6 @@ struct Opt { #[clap(long)] rpc: Option, - /// Run the node in local mode. - /// - /// When this flag is set, we will not filter out local addresses that we observe. - #[clap(long)] - local: bool, - /// Specify the owner(readable discord user name). 
#[clap(long)] owner: Option, @@ -271,7 +265,9 @@ fn main() -> Result<()> { init_logging(&opt, keypair.public().to_peer_id())?; let rt = Runtime::new()?; - let bootstrap_peers = rt.block_on(opt.peers.get_peers())?; + let mut bootstrap_cache = BootstrapCacheStore::empty(BootstrapCacheConfig::default_config()?)?; + rt.block_on(bootstrap_cache.initialize_from_peers_arg(&opt.peers))?; + let msg = format!( "Running {} v{}", env!("CARGO_BIN_NAME"), @@ -285,7 +281,10 @@ fn main() -> Result<()> { ant_build_info::git_info() ); - info!("Node started with initial_peers {bootstrap_peers:?}"); + info!( + "Node started with bootstrap cache containing {} peers", + bootstrap_cache.peer_count() + ); // Create a tokio runtime per `run_node` attempt, this ensures // any spawned tasks are closed before we would attempt to run @@ -299,13 +298,13 @@ fn main() -> Result<()> { rewards_address, evm_network, node_socket_addr, - bootstrap_peers, - opt.local, + opt.peers.local, root_dir, #[cfg(feature = "upnp")] opt.upnp, ); - node_builder.is_behind_home_network = opt.home_network; + node_builder.bootstrap_cache(bootstrap_cache); + node_builder.is_behind_home_network(opt.home_network); #[cfg(feature = "open-metrics")] let mut node_builder = node_builder; // if enable flag is provided or only if the port is specified then enable the server by setting Some() diff --git a/ant-node/src/error.rs b/ant-node/src/error.rs index 86aba2df5c..4a80796eb2 100644 --- a/ant-node/src/error.rs +++ b/ant-node/src/error.rs @@ -81,6 +81,8 @@ pub enum Error { // ---------- Initialize Errors #[error("Failed to generate a reward key")] FailedToGenerateRewardKey, + #[error("Cannot set both initial_peers and bootstrap_cache")] + InitialPeersAndBootstrapCacheSet, // ---------- Miscellaneous Errors #[error("Failed to obtain node's current port")] diff --git a/ant-node/src/node.rs b/ant-node/src/node.rs index c1ea235239..c3b2ab710c 100644 --- a/ant-node/src/node.rs +++ b/ant-node/src/node.rs @@ -11,7 +11,8 @@ use 
super::{ }; #[cfg(feature = "open-metrics")] use crate::metrics::NodeMetricsRecorder; -use crate::RunningNode; +use crate::{error::Error, RunningNode}; +use ant_bootstrap::BootstrapCacheStore; use ant_evm::{AttoTokens, RewardsAddress}; #[cfg(feature = "open-metrics")] use ant_networking::MetricsRegistries; @@ -81,41 +82,42 @@ const NETWORK_DENSITY_SAMPLING_INTERVAL_MAX_S: u64 = 200; /// Helper to build and run a Node pub struct NodeBuilder { + bootstrap_cache: Option, + initial_peers: Vec, identity_keypair: Keypair, evm_address: RewardsAddress, evm_network: EvmNetwork, addr: SocketAddr, - initial_peers: Vec, local: bool, root_dir: PathBuf, #[cfg(feature = "open-metrics")] /// Set to Some to enable the metrics server metrics_server_port: Option, /// Enable hole punching for nodes connecting from home networks. - pub is_behind_home_network: bool, + is_behind_home_network: bool, #[cfg(feature = "upnp")] upnp: bool, } impl NodeBuilder { - /// Instantiate the builder - #[expect(clippy::too_many_arguments)] + /// Instantiate the builder. The initial peers can either be supplied via the `initial_peers` method + /// or fetched from the bootstrap cache set using `bootstrap_cache` method. pub fn new( identity_keypair: Keypair, evm_address: RewardsAddress, evm_network: EvmNetwork, addr: SocketAddr, - initial_peers: Vec, local: bool, root_dir: PathBuf, #[cfg(feature = "upnp")] upnp: bool, ) -> Self { Self { + bootstrap_cache: None, + initial_peers: vec![], identity_keypair, evm_address, evm_network, addr, - initial_peers, local, root_dir, #[cfg(feature = "open-metrics")] @@ -132,6 +134,21 @@ impl NodeBuilder { self.metrics_server_port = port; } + /// Set the initialized bootstrap cache. This is mutually exclusive with `initial_peers` + pub fn bootstrap_cache(&mut self, cache: BootstrapCacheStore) { + self.bootstrap_cache = Some(cache); + } + + /// Set the initial peers to dial at startup. 
This is mutually exclusive with `bootstrap_cache` + pub fn initial_peers(&mut self, peers: Vec) { + self.initial_peers = peers; + } + + /// Set the flag to indicate if the node is behind a home network + pub fn is_behind_home_network(&mut self, is_behind_home_network: bool) { + self.is_behind_home_network = is_behind_home_network; + } + /// Asynchronously runs a new node instance, setting up the swarm driver, /// creating a data storage, and handling network events. Returns the /// created `RunningNode` which contains a `NodeEventsChannel` for listening @@ -160,11 +177,25 @@ impl NodeBuilder { None }; + if !self.initial_peers.is_empty() && self.bootstrap_cache.is_some() { + return Err(Error::InitialPeersAndBootstrapCacheSet); + } + + let initial_peers = if !self.initial_peers.is_empty() { + self.initial_peers.clone() + } else if let Some(cache) = &self.bootstrap_cache { + cache.get_unique_peer_addr().cloned().collect() + } else { + vec![] + }; + network_builder.listen_addr(self.addr); #[cfg(feature = "open-metrics")] network_builder.metrics_server_port(self.metrics_server_port); - network_builder.initial_peers(self.initial_peers.clone()); network_builder.is_behind_home_network(self.is_behind_home_network); + if let Some(cache) = self.bootstrap_cache { + network_builder.bootstrap_cache(cache); + } #[cfg(feature = "upnp")] network_builder.upnp(self.upnp); @@ -176,7 +207,7 @@ impl NodeBuilder { let node = NodeInner { network: network.clone(), events_channel: node_events_channel.clone(), - initial_peers: self.initial_peers, + initial_peers, reward_address: self.evm_address, #[cfg(feature = "open-metrics")] metrics_recorder, diff --git a/ant-node/src/python.rs b/ant-node/src/python.rs index 954609b830..3d50520940 100644 --- a/ant-node/src/python.rs +++ b/ant-node/src/python.rs @@ -102,13 +102,13 @@ impl AntNode { rewards_address, evm_network, node_socket_addr, - initial_peers, local, root_dir.unwrap_or_else(|| PathBuf::from(".")), #[cfg(feature = "upnp")] false, ); - 
node_builder.is_behind_home_network = home_network; + node_builder.initial_peers(initial_peers); + node_builder.is_behind_home_network(home_network); node_builder .build_and_run()