diff --git a/.github/workflows/merge.yml b/.github/workflows/merge.yml index 128dddd601..a250acc2b2 100644 --- a/.github/workflows/merge.yml +++ b/.github/workflows/merge.yml @@ -727,12 +727,6 @@ jobs: echo "EVM_NETWORK has been set to $EVM_NETWORK" fi - - name: Verify the routing tables of the nodes - run: cargo test --release -p ant-node --test verify_routing_table -- --nocapture - env: - CARGO_TARGET_DIR: ${{ matrix.os == 'windows-latest' && './test-target' || '.' }} - timeout-minutes: 5 - - name: Verify the location of the data on the network run: cargo test --release -p ant-node --test verify_data_location -- --nocapture env: diff --git a/ant-bootstrap/src/cache_store.rs b/ant-bootstrap/src/cache_store.rs index d9d9bd131a..760c243b1a 100644 --- a/ant-bootstrap/src/cache_store.rs +++ b/ant-bootstrap/src/cache_store.rs @@ -184,13 +184,13 @@ impl BootstrapCacheStore { let mut config = if let Some(cfg) = config { cfg } else { - BootstrapCacheConfig::default_config()? + BootstrapCacheConfig::default_config(peers_arg.local)? }; if let Some(bootstrap_cache_path) = peers_arg.get_bootstrap_cache_path()? { config.cache_file_path = bootstrap_cache_path; } - let mut store = Self::new(config)?; + let store = Self::new(config)?; // If it is the first node, clear the cache. if peers_arg.first { @@ -198,12 +198,6 @@ impl BootstrapCacheStore { store.write()?; } - // If local mode is enabled, return empty store (will use mDNS) - if peers_arg.local { - info!("Setting config to not write to cache, as 'local' mode is enabled"); - store.config.disable_cache_writing = true; - } - Ok(store) } diff --git a/ant-bootstrap/src/config.rs b/ant-bootstrap/src/config.rs index b81c6377d8..6e5d1d3782 100644 --- a/ant-bootstrap/src/config.rs +++ b/ant-bootstrap/src/config.rs @@ -50,16 +50,18 @@ pub struct BootstrapCacheConfig { impl BootstrapCacheConfig { /// Creates a new BootstrapConfig with default settings - pub fn default_config() -> Result { + /// + /// When `local` is set to true, a different cache file name is used. + /// I.e. the file name will include `_local_` in the name. + pub fn default_config(local: bool) -> Result { + let cache_file_path = if local { + default_cache_path_local()? + } else { + default_cache_path()? + }; Ok(Self { - addr_expiry_duration: ADDR_EXPIRY_DURATION, - max_peers: MAX_PEERS, - max_addrs_per_peer: MAX_ADDRS_PER_PEER, - cache_file_path: default_cache_path()?, - disable_cache_writing: false, - min_cache_save_duration: MIN_BOOTSTRAP_CACHE_SAVE_INTERVAL, - max_cache_save_duration: MAX_BOOTSTRAP_CACHE_SAVE_INTERVAL, - cache_save_scaling_factor: 2, + cache_file_path, + ..Self::empty() }) } @@ -110,6 +112,16 @@ impl BootstrapCacheConfig { /// Returns the default path for the bootstrap cache file fn default_cache_path() -> Result { + Ok(default_cache_dir()?.join(cache_file_name())) +} +/// Returns the default path for the bootstrap cache file that is used for +/// local networks +fn default_cache_path_local() -> Result { + Ok(default_cache_dir()?.join(cache_file_name_local())) +} + +/// Returns the default dir that should contain the bootstrap cache file +fn default_cache_dir() -> Result { let dir = dirs_next::data_dir() .ok_or_else(|| Error::CouldNotObtainDataDir)? .join("autonomi") @@ -117,12 +129,18 @@ fn default_cache_path() -> Result { std::fs::create_dir_all(&dir)?; - let path = dir.join(cache_file_name()); - - Ok(path) + Ok(dir) } /// Returns the name of the cache file pub fn cache_file_name() -> String { format!("bootstrap_cache_{}.json", crate::get_network_version()) } + +/// Returns the name of the cache file for local networks +pub fn cache_file_name_local() -> String { + format!( + "bootstrap_cache_local_{}.json", + crate::get_network_version() + ) +} diff --git a/ant-bootstrap/src/initial_peers.rs b/ant-bootstrap/src/initial_peers.rs index 00241bb7af..f323af2796 100644 --- a/ant-bootstrap/src/initial_peers.rs +++ b/ant-bootstrap/src/initial_peers.rs @@ -54,8 +54,6 @@ pub struct PeersArgs { #[clap(long, conflicts_with = "first", value_delimiter = ',')] pub network_contacts_url: Vec, /// Set to indicate this is a local network. - /// - /// This would use mDNS for peer discovery. #[clap(long, conflicts_with = "network_contacts_url", default_value = "false")] pub local: bool, /// Set to indicate this is a testnet. @@ -116,12 +114,6 @@ impl PeersArgs { return Ok(bootstrap_addresses); } - // If local mode is enabled, return empty store (will use mDNS) - if self.local { - info!("Local mode enabled, using only local discovery."); - return Ok(vec![]); - } - // Add addrs from arguments if present for addr in &self.addrs { if let Some(addr) = craft_valid_multiaddr(addr, false) { @@ -146,7 +138,7 @@ impl PeersArgs { let cfg = if let Some(config) = config { Some(config) } else { - BootstrapCacheConfig::default_config().ok() + BootstrapCacheConfig::default_config(self.local).ok() }; if let Some(mut cfg) = cfg { if let Some(file_path) = self.get_bootstrap_cache_path()? { @@ -177,7 +169,7 @@ impl PeersArgs { } // If we have a network contacts URL, fetch addrs from there. - if !self.network_contacts_url.is_empty() { + if !self.local && !self.network_contacts_url.is_empty() { info!( "Fetching bootstrap address from network contacts URLs: {:?}", self.network_contacts_url @@ -204,7 +196,7 @@ impl PeersArgs { } } - if !self.disable_mainnet_contacts { + if !self.local && !self.disable_mainnet_contacts { let mut contacts_fetcher = ContactsFetcher::with_mainnet_endpoints()?; if let Some(count) = count { contacts_fetcher.set_max_addrs(count); diff --git a/ant-bootstrap/tests/cli_integration_tests.rs b/ant-bootstrap/tests/cli_integration_tests.rs index 98341ae452..ef36457915 100644 --- a/ant-bootstrap/tests/cli_integration_tests.rs +++ b/ant-bootstrap/tests/cli_integration_tests.rs @@ -109,40 +109,6 @@ async fn test_network_contacts_fallback() -> Result<(), Box Result<(), Box> { - let _guard = LogBuilder::init_single_threaded_tokio_test("cli_integration_tests", false); - - let temp_dir = TempDir::new()?; - let cache_path = temp_dir.path().join("cache.json"); - - // Create a config with some peers in the cache - let config = BootstrapCacheConfig::empty().with_cache_path(&cache_path); - - // Create args with local mode enabled - let args = PeersArgs { - first: false, - addrs: vec![], - network_contacts_url: vec![], - local: true, - disable_mainnet_contacts: false, - ignore_cache: false, - bootstrap_cache_dir: None, - }; - - let addrs = args.get_addrs(Some(config), None).await?; - - assert!(addrs.is_empty(), "Local mode should have no peers"); - - // Verify cache was not touched - assert!( - !cache_path.exists(), - "Cache file should not exist in local mode" - ); - - Ok(()) -} - #[tokio::test] async fn test_test_network_peers() -> Result<(), Box> { let _guard = LogBuilder::init_single_threaded_tokio_test("cli_integration_tests", false); diff --git a/ant-networking/Cargo.toml b/ant-networking/Cargo.toml index 75452bbc22..dc5ec7d678 100644 --- a/ant-networking/Cargo.toml +++ b/ant-networking/Cargo.toml @@ -37,7 +37,6 @@ itertools = "~0.12.1" libp2p = { version = "0.54.1", features = [ "tokio", "dns", - "mdns", "kad", "macros", "request-response", diff --git a/ant-networking/src/bootstrap.rs b/ant-networking/src/bootstrap.rs index 84a9e73c51..30511009c9 100644 --- a/ant-networking/src/bootstrap.rs +++ b/ant-networking/src/bootstrap.rs @@ -53,19 +53,22 @@ impl SwarmDriver { let now = Instant::now(); // Find the farthest bucket that is not full. This is used to skip refreshing the RT of farthest full buckets. - let mut farthest_unfilled_bucket = 0; + let mut first_filled_bucket = 0; + // unfilled kbuckets will not be returned, hence the value shall be: + // * first_filled_kbucket.ilog2() - 1 for kbucket in self.swarm.behaviour_mut().kademlia.kbuckets() { let Some(ilog2) = kbucket.range().0.ilog2() else { continue; }; - if kbucket.num_entries() < K_VALUE.get() && ilog2 > farthest_unfilled_bucket { - farthest_unfilled_bucket = ilog2; + if kbucket.num_entries() >= K_VALUE.get() { + first_filled_bucket = ilog2; + break; } } - let farthest_unfilled_bucket = if farthest_unfilled_bucket == 0 { + let farthest_unfilled_bucket = if first_filled_bucket == 0 { None } else { - Some(farthest_unfilled_bucket) + Some(first_filled_bucket - 1) }; let addrs = self.network_discovery.candidates(farthest_unfilled_bucket); diff --git a/ant-networking/src/driver.rs b/ant-networking/src/driver.rs index 36969c7ce7..d8c804e1f9 100644 --- a/ant-networking/src/driver.rs +++ b/ant-networking/src/driver.rs @@ -43,6 +43,8 @@ use ant_protocol::{ }; use futures::future::Either; use futures::StreamExt; +#[cfg(feature = "upnp")] +use libp2p::swarm::behaviour::toggle::Toggle; use libp2p::{core::muxing::StreamMuxerBox, relay}; use libp2p::{ identity::Keypair, @@ -55,7 +57,6 @@ use libp2p::{ }, Multiaddr, PeerId, }; -use libp2p::{mdns, swarm::behaviour::toggle::Toggle}; use libp2p::{swarm::SwarmEvent, Transport as _}; #[cfg(feature = "open-metrics")] use prometheus_client::metrics::info::Info; @@ -227,8 +228,6 @@ pub(super) struct NodeBehaviour { pub(super) blocklist: libp2p::allow_block_list::Behaviour, pub(super) identify: libp2p::identify::Behaviour, - /// mDNS behaviour to use in local mode - pub(super) mdns: Toggle, #[cfg(feature = "upnp")] pub(super) upnp: Toggle, pub(super) relay_client: libp2p::relay::client::Behaviour, @@ -406,7 +405,7 @@ impl NetworkBuilder { ProtocolSupport::Full, #[cfg(feature = "upnp")] upnp, - )?; + ); // Listen on the provided address let listen_socket_addr = listen_addr.ok_or(NetworkError::ListenAddressNotProvided)?; @@ -423,7 +422,7 @@ impl NetworkBuilder { } /// Same as `build_node` API but creates the network components in client mode - pub fn build_client(self) -> Result<(Network, mpsc::Receiver, SwarmDriver)> { + pub fn build_client(self) -> (Network, mpsc::Receiver, SwarmDriver) { // Create a Kademlia behaviour for client mode, i.e. set req/resp protocol // to outbound-only mode and don't listen on any address let mut kad_cfg = kad::Config::new(KAD_STREAM_PROTOCOL_ID); // default query timeout is 60 secs @@ -445,9 +444,9 @@ impl NetworkBuilder { ProtocolSupport::Outbound, #[cfg(feature = "upnp")] false, - )?; + ); - Ok((network, net_event_recv, driver)) + (network, net_event_recv, driver) } /// Private helper to create the network components with the provided config and req/res behaviour @@ -458,7 +457,7 @@ impl NetworkBuilder { is_client: bool, req_res_protocol: ProtocolSupport, #[cfg(feature = "upnp")] upnp: bool, - ) -> Result<(Network, mpsc::Receiver, SwarmDriver)> { + ) -> (Network, mpsc::Receiver, SwarmDriver) { let identify_protocol_str = IDENTIFY_PROTOCOL_STR .read() .expect("Failed to obtain read lock for IDENTIFY_PROTOCOL_STR") @@ -593,22 +592,6 @@ impl NetworkBuilder { } }; - let mdns = if self.local { - debug!("Enabling mDNS behavior (because of local mode)"); - - let mdns_config = mdns::Config { - // lower query interval to speed up peer discovery this - // increases traffic, but means we no longer have clients - // unable to connect after a few minutes - query_interval: Duration::from_secs(5), - ..Default::default() - }; - Some(mdns::tokio::Behaviour::new(mdns_config, peer_id)?) - } else { - None - } - .into(); // Into `Toggle` - let agent_version = if is_client { IDENTIFY_CLIENT_VERSION_STR .read() @@ -661,7 +644,6 @@ impl NetworkBuilder { request_response, kademlia, identify, - mdns, }; let swarm_config = libp2p::swarm::Config::with_tokio_executor() @@ -741,7 +723,7 @@ impl NetworkBuilder { self.keypair, ); - Ok((network, network_event_receiver, swarm_driver)) + (network, network_event_receiver, swarm_driver) } } diff --git a/ant-networking/src/event/mod.rs b/ant-networking/src/event/mod.rs index 6dadbfb0a8..c04b256742 100644 --- a/ant-networking/src/event/mod.rs +++ b/ant-networking/src/event/mod.rs @@ -13,7 +13,6 @@ mod swarm; use crate::{driver::SwarmDriver, error::Result}; use core::fmt; use custom_debug::Debug as CustomDebug; -use libp2p::mdns; use libp2p::{ kad::{Addresses, Record, RecordKey, K_VALUE}, request_response::ResponseChannel as PeerResponseChannel, @@ -46,7 +45,6 @@ pub(super) enum NodeEvent { Upnp(libp2p::upnp::Event), MsgReceived(libp2p::request_response::Event), Kademlia(libp2p::kad::Event), - Mdns(Box), Identify(Box), RelayClient(Box), RelayServer(Box), @@ -72,12 +70,6 @@ impl From for NodeEvent { } } -impl From for NodeEvent { - fn from(event: mdns::Event) -> Self { - NodeEvent::Mdns(Box::new(event)) - } -} - impl From for NodeEvent { fn from(event: libp2p::identify::Event) -> Self { NodeEvent::Identify(Box::new(event)) diff --git a/ant-networking/src/event/swarm.rs b/ant-networking/src/event/swarm.rs index e6eef4e576..832933d066 100644 --- a/ant-networking/src/event/swarm.rs +++ b/ant-networking/src/event/swarm.rs @@ -10,8 +10,8 @@ use crate::{ event::NodeEvent, multiaddr_get_ip, multiaddr_is_global, multiaddr_strip_p2p, relay_manager::is_a_relayed_peer, time::Instant, NetworkEvent, Result, SwarmDriver, }; +use ant_bootstrap::BootstrapCacheStore; use ant_protocol::version::{IDENTIFY_NODE_VERSION_STR, IDENTIFY_PROTOCOL_STR}; -use libp2p::mdns; #[cfg(feature = "open-metrics")] use libp2p::metrics::Recorder; use libp2p::{ @@ -286,29 +286,6 @@ impl SwarmDriver { libp2p::identify::Event::Error { .. } => debug!("identify: {iden:?}"), } } - SwarmEvent::Behaviour(NodeEvent::Mdns(mdns_event)) => { - event_string = "mdns"; - // mDNS is only relevant in local mode - if self.local { - match *mdns_event { - mdns::Event::Discovered(list) => { - for (peer_id, addr) in list { - // The multiaddr does not contain the peer ID, so add it. - let addr = addr.with(Protocol::P2p(peer_id)); - - info!(%addr, "mDNS node discovered and dialing"); - - if let Err(err) = self.dial(addr.clone()) { - warn!(%addr, "mDNS node dial error: {err:?}"); - } - } - } - mdns::Event::Expired(peer) => { - debug!("mdns peer {peer:?} expired"); - } - } - } - } SwarmEvent::NewListenAddr { mut address, listener_id, @@ -330,6 +307,26 @@ impl SwarmDriver { // all addresses are effectively external here... // this is needed for Kad Mode::Server self.swarm.add_external_address(address.clone()); + + // If we are local, add our own address(es) to cache + if let Some(bootstrap_cache) = self.bootstrap_cache.as_mut() { + tracing::info!("Adding listen address to bootstrap cache"); + + let config = bootstrap_cache.config().clone(); + let mut old_cache = bootstrap_cache.clone(); + + if let Ok(new) = BootstrapCacheStore::new(config) { + self.bootstrap_cache = Some(new); + old_cache.add_addr(address.clone()); + + // Save cache to disk. + crate::time::spawn(async move { + if let Err(err) = old_cache.sync_and_flush_to_disk(true) { + error!("Failed to save bootstrap cache: {err}"); + } + }); + } + } } else if let Some(external_add_manager) = self.external_address_manager.as_mut() { diff --git a/ant-networking/src/lib.rs b/ant-networking/src/lib.rs index 620017fcf6..33cd1179c8 100644 --- a/ant-networking/src/lib.rs +++ b/ant-networking/src/lib.rs @@ -1260,7 +1260,7 @@ mod tests { #[tokio::test] async fn test_network_sign_verify() -> eyre::Result<()> { let (network, _, _) = - NetworkBuilder::new(Keypair::generate_ed25519(), false).build_client()?; + NetworkBuilder::new(Keypair::generate_ed25519(), false).build_client(); let msg = b"test message"; let sig = network.sign(msg)?; assert!(network.verify(msg, &sig)); diff --git a/ant-node/src/bin/antnode/main.rs b/ant-node/src/bin/antnode/main.rs index 9187292200..bbf9a20900 100644 --- a/ant-node/src/bin/antnode/main.rs +++ b/ant-node/src/bin/antnode/main.rs @@ -13,7 +13,7 @@ mod rpc_service; mod subcommands; use crate::subcommands::EvmNetworkCommand; -use ant_bootstrap::{BootstrapCacheConfig, BootstrapCacheStore, PeersArgs}; +use ant_bootstrap::{BootstrapCacheStore, PeersArgs}; use ant_evm::{get_evm_network, EvmNetwork, RewardsAddress}; use ant_logging::metrics::init_metrics; use ant_logging::{Level, LogFormat, LogOutputDest, ReloadHandle}; @@ -275,13 +275,14 @@ fn main() -> Result<()> { let (log_output_dest, log_reload_handle, _log_appender_guard) = init_logging(&opt, keypair.public().to_peer_id())?; - let rt = Runtime::new()?; - let mut bootstrap_cache = BootstrapCacheStore::new_from_peers_args( - &opt.peers, - Some(BootstrapCacheConfig::default_config()?), - )?; - // To create the file before startup if it doesn't exist. - bootstrap_cache.sync_and_flush_to_disk(true)?; + let mut bootstrap_cache = BootstrapCacheStore::new_from_peers_args(&opt.peers, None)?; + // If we are the first node, write initial cache to disk. + if opt.peers.first { + bootstrap_cache.write()?; + } else { + // Else we check/clean the file, write it back, and ensure its existence. + bootstrap_cache.sync_and_flush_to_disk(true)?; + } let msg = format!( "Running {} v{}", @@ -304,6 +305,7 @@ fn main() -> Result<()> { // Create a tokio runtime per `run_node` attempt, this ensures // any spawned tasks are closed before we would attempt to run // another process with these args. + let rt = Runtime::new()?; if opt.peers.local { rt.spawn(init_metrics(std::process::id())); } @@ -652,11 +654,14 @@ fn start_new_node_process(retain_peer_id: bool, root_dir: PathBuf, port: u16) { let current_exe = env::current_exe().expect("could not get current executable path"); // Retrieve the command-line arguments passed to this process - let args: Vec = env::args().collect(); + let mut args: Vec = env::args().collect(); info!("Original args are: {args:?}"); info!("Current exe is: {current_exe:?}"); + // Remove `--first` argument. If node is restarted, it is not the first anymore. + args.retain(|arg| arg != "--first"); + // Convert current exe path to string, log an error and return if it fails let current_exe = match current_exe.to_str() { Some(s) => { diff --git a/ant-node/src/node.rs b/ant-node/src/node.rs index e9a70c4249..3c0444a1c7 100644 --- a/ant-node/src/node.rs +++ b/ant-node/src/node.rs @@ -192,7 +192,6 @@ impl NodeBuilder { let node_events_channel = NodeEventsChannel::default(); let node = NodeInner { - local: self.local, network: network.clone(), events_channel: node_events_channel.clone(), initial_peers: self.initial_peers, @@ -229,7 +228,6 @@ pub(crate) struct Node { /// The actual implementation of the Node. The other is just a wrapper around this, so that we don't expose /// the Arc from the interface. struct NodeInner { - local: bool, events_channel: NodeEventsChannel, // Peers that are dialed at startup of node. initial_peers: Vec, @@ -458,17 +456,15 @@ impl Node { } NetworkEvent::NewListenAddr(_) => { event_header = "NewListenAddr"; - if !self.inner.local { - let network = self.network().clone(); - let peers = self.initial_peers().clone(); - let _handle = spawn(async move { - for addr in peers { - if let Err(err) = network.dial(addr.clone()).await { - tracing::error!("Failed to dial {addr}: {err:?}"); - }; - } - }); - } + let network = self.network().clone(); + let peers = self.initial_peers().clone(); + let _handle = spawn(async move { + for addr in peers { + if let Err(err) = network.dial(addr.clone()).await { + tracing::error!("Failed to dial {addr}: {err:?}"); + }; + } + }); } NetworkEvent::ResponseReceived { res } => { event_header = "ResponseReceived"; diff --git a/autonomi/src/client/mod.rs b/autonomi/src/client/mod.rs index e3eb72ddfd..e126b5647a 100644 --- a/autonomi/src/client/mod.rs +++ b/autonomi/src/client/mod.rs @@ -203,7 +203,7 @@ impl Client { /// Initialize the client with the given configuration. /// - /// This will block until [`CLOSE_GROUP_SIZE`] have been added to the routing table. + /// This will block until `CLOSE_GROUP_SIZE` have been added to the routing table. /// /// See [`ClientConfig`]. /// @@ -267,7 +267,7 @@ impl Client { fn build_client_and_run_swarm(local: bool) -> (Network, mpsc::Receiver) { let mut network_builder = NetworkBuilder::new(Keypair::generate_ed25519(), local); - if let Ok(mut config) = BootstrapCacheConfig::default_config() { + if let Ok(mut config) = BootstrapCacheConfig::default_config(local) { if local { config.disable_cache_writing = true; } @@ -278,8 +278,7 @@ fn build_client_and_run_swarm(local: bool) -> (Network, mpsc::Receiver` from `ant-networking`. Else users need to keep their `tokio` dependency in sync. // TODO: Think about handling the mDNS error here. - let (network, event_receiver, swarm_driver) = - network_builder.build_client().expect("mdns to succeed"); + let (network, event_receiver, swarm_driver) = network_builder.build_client(); let _swarm_driver = ant_networking::time::spawn(swarm_driver.run()); debug!("Client swarm driver is running");