Skip to content

Commit

Permalink
Merge pull request #2652 from maqi/refactor-remove-mdns
Browse files Browse the repository at this point in the history
Refactor remove mdns
  • Loading branch information
maqi authored Jan 22, 2025
2 parents 9f4af65 + 76746f0 commit cc1e6a9
Show file tree
Hide file tree
Showing 14 changed files with 99 additions and 162 deletions.
6 changes: 0 additions & 6 deletions .github/workflows/merge.yml
Original file line number Diff line number Diff line change
Expand Up @@ -727,12 +727,6 @@ jobs:
echo "EVM_NETWORK has been set to $EVM_NETWORK"
fi
- name: Verify the routing tables of the nodes
run: cargo test --release -p ant-node --test verify_routing_table -- --nocapture
env:
CARGO_TARGET_DIR: ${{ matrix.os == 'windows-latest' && './test-target' || '.' }}
timeout-minutes: 5

- name: Verify the location of the data on the network
run: cargo test --release -p ant-node --test verify_data_location -- --nocapture
env:
Expand Down
10 changes: 2 additions & 8 deletions ant-bootstrap/src/cache_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,26 +184,20 @@ impl BootstrapCacheStore {
let mut config = if let Some(cfg) = config {
cfg
} else {
BootstrapCacheConfig::default_config()?
BootstrapCacheConfig::default_config(peers_arg.local)?
};
if let Some(bootstrap_cache_path) = peers_arg.get_bootstrap_cache_path()? {
config.cache_file_path = bootstrap_cache_path;
}

let mut store = Self::new(config)?;
let store = Self::new(config)?;

// If it is the first node, clear the cache.
if peers_arg.first {
info!("First node in network, writing empty cache to disk");
store.write()?;
}

// If local mode is enabled, return empty store (will use mDNS)
if peers_arg.local {
info!("Setting config to not write to cache, as 'local' mode is enabled");
store.config.disable_cache_writing = true;
}

Ok(store)
}

Expand Down
42 changes: 30 additions & 12 deletions ant-bootstrap/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,18 @@ pub struct BootstrapCacheConfig {

impl BootstrapCacheConfig {
/// Creates a new BootstrapConfig with default settings
pub fn default_config() -> Result<Self> {
///
/// When `local` is set to true, a different cache file name is used.
/// I.e. the file name will include `_local_` in the name.
pub fn default_config(local: bool) -> Result<Self> {
let cache_file_path = if local {
default_cache_path_local()?
} else {
default_cache_path()?
};
Ok(Self {
addr_expiry_duration: ADDR_EXPIRY_DURATION,
max_peers: MAX_PEERS,
max_addrs_per_peer: MAX_ADDRS_PER_PEER,
cache_file_path: default_cache_path()?,
disable_cache_writing: false,
min_cache_save_duration: MIN_BOOTSTRAP_CACHE_SAVE_INTERVAL,
max_cache_save_duration: MAX_BOOTSTRAP_CACHE_SAVE_INTERVAL,
cache_save_scaling_factor: 2,
cache_file_path,
..Self::empty()
})
}

Expand Down Expand Up @@ -110,19 +112,35 @@ impl BootstrapCacheConfig {

/// Returns the default path for the bootstrap cache file
fn default_cache_path() -> Result<PathBuf> {
Ok(default_cache_dir()?.join(cache_file_name()))
}
/// Returns the default path for the bootstrap cache file that is used for
/// local networks
fn default_cache_path_local() -> Result<PathBuf> {
Ok(default_cache_dir()?.join(cache_file_name_local()))
}

/// Returns the default dir that should contain the bootstrap cache file
fn default_cache_dir() -> Result<PathBuf> {
let dir = dirs_next::data_dir()
.ok_or_else(|| Error::CouldNotObtainDataDir)?
.join("autonomi")
.join("bootstrap_cache");

std::fs::create_dir_all(&dir)?;

let path = dir.join(cache_file_name());

Ok(path)
Ok(dir)
}

/// Returns the name of the cache file
pub fn cache_file_name() -> String {
format!("bootstrap_cache_{}.json", crate::get_network_version())
}

/// Returns the name of the cache file for local networks
pub fn cache_file_name_local() -> String {
format!(
"bootstrap_cache_local_{}.json",
crate::get_network_version()
)
}
14 changes: 3 additions & 11 deletions ant-bootstrap/src/initial_peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@ pub struct PeersArgs {
#[clap(long, conflicts_with = "first", value_delimiter = ',')]
pub network_contacts_url: Vec<String>,
/// Set to indicate this is a local network.
///
/// This would use mDNS for peer discovery.
#[clap(long, conflicts_with = "network_contacts_url", default_value = "false")]
pub local: bool,
/// Set to indicate this is a testnet.
Expand Down Expand Up @@ -116,12 +114,6 @@ impl PeersArgs {
return Ok(bootstrap_addresses);
}

// If local mode is enabled, return empty store (will use mDNS)
if self.local {
info!("Local mode enabled, using only local discovery.");
return Ok(vec![]);
}

// Add addrs from arguments if present
for addr in &self.addrs {
if let Some(addr) = craft_valid_multiaddr(addr, false) {
Expand All @@ -146,7 +138,7 @@ impl PeersArgs {
let cfg = if let Some(config) = config {
Some(config)
} else {
BootstrapCacheConfig::default_config().ok()
BootstrapCacheConfig::default_config(self.local).ok()
};
if let Some(mut cfg) = cfg {
if let Some(file_path) = self.get_bootstrap_cache_path()? {
Expand Down Expand Up @@ -177,7 +169,7 @@ impl PeersArgs {
}

// If we have a network contacts URL, fetch addrs from there.
if !self.network_contacts_url.is_empty() {
if !self.local && !self.network_contacts_url.is_empty() {
info!(
"Fetching bootstrap address from network contacts URLs: {:?}",
self.network_contacts_url
Expand All @@ -204,7 +196,7 @@ impl PeersArgs {
}
}

if !self.disable_mainnet_contacts {
if !self.local && !self.disable_mainnet_contacts {
let mut contacts_fetcher = ContactsFetcher::with_mainnet_endpoints()?;
if let Some(count) = count {
contacts_fetcher.set_max_addrs(count);
Expand Down
34 changes: 0 additions & 34 deletions ant-bootstrap/tests/cli_integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,40 +109,6 @@ async fn test_network_contacts_fallback() -> Result<(), Box<dyn std::error::Erro
Ok(())
}

#[tokio::test]
async fn test_local_mode() -> Result<(), Box<dyn std::error::Error>> {
let _guard = LogBuilder::init_single_threaded_tokio_test("cli_integration_tests", false);

let temp_dir = TempDir::new()?;
let cache_path = temp_dir.path().join("cache.json");

// Create a config with some peers in the cache
let config = BootstrapCacheConfig::empty().with_cache_path(&cache_path);

// Create args with local mode enabled
let args = PeersArgs {
first: false,
addrs: vec![],
network_contacts_url: vec![],
local: true,
disable_mainnet_contacts: false,
ignore_cache: false,
bootstrap_cache_dir: None,
};

let addrs = args.get_addrs(Some(config), None).await?;

assert!(addrs.is_empty(), "Local mode should have no peers");

// Verify cache was not touched
assert!(
!cache_path.exists(),
"Cache file should not exist in local mode"
);

Ok(())
}

#[tokio::test]
async fn test_test_network_peers() -> Result<(), Box<dyn std::error::Error>> {
let _guard = LogBuilder::init_single_threaded_tokio_test("cli_integration_tests", false);
Expand Down
1 change: 0 additions & 1 deletion ant-networking/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ itertools = "~0.12.1"
libp2p = { version = "0.54.1", features = [
"tokio",
"dns",
"mdns",
"kad",
"macros",
"request-response",
Expand Down
13 changes: 8 additions & 5 deletions ant-networking/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,22 @@ impl SwarmDriver {
let now = Instant::now();

// Find the farthest bucket that is not full. This is used to skip refreshing the RT of farthest full buckets.
let mut farthest_unfilled_bucket = 0;
let mut first_filled_bucket = 0;
// unfilled kbuckets will not be returned, hence the value shall be:
// * first_filled_kbucket.ilog2() - 1
for kbucket in self.swarm.behaviour_mut().kademlia.kbuckets() {
let Some(ilog2) = kbucket.range().0.ilog2() else {
continue;
};
if kbucket.num_entries() < K_VALUE.get() && ilog2 > farthest_unfilled_bucket {
farthest_unfilled_bucket = ilog2;
if kbucket.num_entries() >= K_VALUE.get() {
first_filled_bucket = ilog2;
break;
}
}
let farthest_unfilled_bucket = if farthest_unfilled_bucket == 0 {
let farthest_unfilled_bucket = if first_filled_bucket == 0 {
None
} else {
Some(farthest_unfilled_bucket)
Some(first_filled_bucket - 1)
};

let addrs = self.network_discovery.candidates(farthest_unfilled_bucket);
Expand Down
34 changes: 8 additions & 26 deletions ant-networking/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ use ant_protocol::{
};
use futures::future::Either;
use futures::StreamExt;
#[cfg(feature = "upnp")]
use libp2p::swarm::behaviour::toggle::Toggle;
use libp2p::{core::muxing::StreamMuxerBox, relay};
use libp2p::{
identity::Keypair,
Expand All @@ -55,7 +57,6 @@ use libp2p::{
},
Multiaddr, PeerId,
};
use libp2p::{mdns, swarm::behaviour::toggle::Toggle};
use libp2p::{swarm::SwarmEvent, Transport as _};
#[cfg(feature = "open-metrics")]
use prometheus_client::metrics::info::Info;
Expand Down Expand Up @@ -227,8 +228,6 @@ pub(super) struct NodeBehaviour {
pub(super) blocklist:
libp2p::allow_block_list::Behaviour<libp2p::allow_block_list::BlockedPeers>,
pub(super) identify: libp2p::identify::Behaviour,
/// mDNS behaviour to use in local mode
pub(super) mdns: Toggle<mdns::tokio::Behaviour>,
#[cfg(feature = "upnp")]
pub(super) upnp: Toggle<libp2p::upnp::tokio::Behaviour>,
pub(super) relay_client: libp2p::relay::client::Behaviour,
Expand Down Expand Up @@ -406,7 +405,7 @@ impl NetworkBuilder {
ProtocolSupport::Full,
#[cfg(feature = "upnp")]
upnp,
)?;
);

// Listen on the provided address
let listen_socket_addr = listen_addr.ok_or(NetworkError::ListenAddressNotProvided)?;
Expand All @@ -423,7 +422,7 @@ impl NetworkBuilder {
}

/// Same as `build_node` API but creates the network components in client mode
pub fn build_client(self) -> Result<(Network, mpsc::Receiver<NetworkEvent>, SwarmDriver)> {
pub fn build_client(self) -> (Network, mpsc::Receiver<NetworkEvent>, SwarmDriver) {
// Create a Kademlia behaviour for client mode, i.e. set req/resp protocol
// to outbound-only mode and don't listen on any address
let mut kad_cfg = kad::Config::new(KAD_STREAM_PROTOCOL_ID); // default query timeout is 60 secs
Expand All @@ -445,9 +444,9 @@ impl NetworkBuilder {
ProtocolSupport::Outbound,
#[cfg(feature = "upnp")]
false,
)?;
);

Ok((network, net_event_recv, driver))
(network, net_event_recv, driver)
}

/// Private helper to create the network components with the provided config and req/res behaviour
Expand All @@ -458,7 +457,7 @@ impl NetworkBuilder {
is_client: bool,
req_res_protocol: ProtocolSupport,
#[cfg(feature = "upnp")] upnp: bool,
) -> Result<(Network, mpsc::Receiver<NetworkEvent>, SwarmDriver)> {
) -> (Network, mpsc::Receiver<NetworkEvent>, SwarmDriver) {
let identify_protocol_str = IDENTIFY_PROTOCOL_STR
.read()
.expect("Failed to obtain read lock for IDENTIFY_PROTOCOL_STR")
Expand Down Expand Up @@ -593,22 +592,6 @@ impl NetworkBuilder {
}
};

let mdns = if self.local {
debug!("Enabling mDNS behavior (because of local mode)");

let mdns_config = mdns::Config {
// lower query interval to speed up peer discovery this
// increases traffic, but means we no longer have clients
// unable to connect after a few minutes
query_interval: Duration::from_secs(5),
..Default::default()
};
Some(mdns::tokio::Behaviour::new(mdns_config, peer_id)?)
} else {
None
}
.into(); // Into `Toggle<T>`

let agent_version = if is_client {
IDENTIFY_CLIENT_VERSION_STR
.read()
Expand Down Expand Up @@ -661,7 +644,6 @@ impl NetworkBuilder {
request_response,
kademlia,
identify,
mdns,
};

let swarm_config = libp2p::swarm::Config::with_tokio_executor()
Expand Down Expand Up @@ -741,7 +723,7 @@ impl NetworkBuilder {
self.keypair,
);

Ok((network, network_event_receiver, swarm_driver))
(network, network_event_receiver, swarm_driver)
}
}

Expand Down
8 changes: 0 additions & 8 deletions ant-networking/src/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ mod swarm;
use crate::{driver::SwarmDriver, error::Result};
use core::fmt;
use custom_debug::Debug as CustomDebug;
use libp2p::mdns;
use libp2p::{
kad::{Addresses, Record, RecordKey, K_VALUE},
request_response::ResponseChannel as PeerResponseChannel,
Expand Down Expand Up @@ -46,7 +45,6 @@ pub(super) enum NodeEvent {
Upnp(libp2p::upnp::Event),
MsgReceived(libp2p::request_response::Event<Request, Response>),
Kademlia(libp2p::kad::Event),
Mdns(Box<mdns::Event>),
Identify(Box<libp2p::identify::Event>),
RelayClient(Box<libp2p::relay::client::Event>),
RelayServer(Box<libp2p::relay::Event>),
Expand All @@ -72,12 +70,6 @@ impl From<libp2p::kad::Event> for NodeEvent {
}
}

impl From<mdns::Event> for NodeEvent {
fn from(event: mdns::Event) -> Self {
NodeEvent::Mdns(Box::new(event))
}
}

impl From<libp2p::identify::Event> for NodeEvent {
fn from(event: libp2p::identify::Event) -> Self {
NodeEvent::Identify(Box::new(event))
Expand Down
Loading

0 comments on commit cc1e6a9

Please sign in to comment.