Skip to content

Commit

Permalink
feat(bootstrap): impl bootstrap cache into the codebase
Browse files Browse the repository at this point in the history
  • Loading branch information
RolandSherwin committed Dec 4, 2024
1 parent ad6e226 commit 1aab709
Show file tree
Hide file tree
Showing 15 changed files with 199 additions and 49 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions ant-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ name = "files"
harness = false

[dependencies]
ant-bootstrap = { path = "../ant-bootstrap", version = "0.1.0" }
ant-build-info = { path = "../ant-build-info", version = "0.1.19" }
ant-logging = { path = "../ant-logging", version = "0.2.40" }
ant-peers-acquisition = { path = "../ant-peers-acquisition", version = "0.5.7" }
Expand Down
5 changes: 2 additions & 3 deletions ant-cli/src/access/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,14 @@
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

use ant_peers_acquisition::PeersArgs;
use ant_peers_acquisition::ANT_PEERS_ENV;
use ant_bootstrap::{PeersArgs, ANT_PEERS_ENV};
use autonomi::Multiaddr;
use color_eyre::eyre::Context;
use color_eyre::Result;
use color_eyre::Section;

pub async fn get_peers(peers: PeersArgs) -> Result<Vec<Multiaddr>> {
peers.get_peers().await
peers.get_addrs().await
.wrap_err("Please provide valid Network peers to connect to")
.with_suggestion(|| format!("make sure you've provided network peers using the --peers option or the {ANT_PEERS_ENV} env var"))
.with_suggestion(|| "a peer address looks like this: /ip4/42.42.42.42/udp/4242/quic-v1/p2p/B64nodePeerIDvdjb3FAJF4ks3moreBase64CharsHere")
Expand Down
3 changes: 1 addition & 2 deletions ant-cli/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,10 @@ mod register;
mod vault;
mod wallet;

use crate::opt::Opt;
use clap::Subcommand;
use color_eyre::Result;

use crate::opt::Opt;

#[derive(Subcommand, Debug)]
pub enum SubCmd {
/// Operations related to file handling.
Expand Down
8 changes: 3 additions & 5 deletions ant-cli/src/opt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,12 @@
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

use std::time::Duration;

use crate::commands::SubCmd;
use ant_bootstrap::PeersArgs;
use ant_logging::{LogFormat, LogOutputDest};
use ant_peers_acquisition::PeersArgs;
use clap::Parser;
use color_eyre::Result;

use crate::commands::SubCmd;
use std::time::Duration;

// Please do not remove the blank lines in these doc comments.
// They are used for inserting line breaks when the help menu is rendered in the UI.
Expand Down
1 change: 1 addition & 0 deletions ant-networking/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ websockets = ["libp2p/tcp"]

[dependencies]
aes-gcm-siv = "0.11.1"
ant-bootstrap = { path = "../ant-bootstrap", version = "0.1.0" }
ant-build-info = { path = "../ant-build-info", version = "0.1.19" }
ant-evm = { path = "../ant-evm", version = "0.1.4" }
ant-protocol = { path = "../ant-protocol", version = "0.17.15" }
Expand Down
111 changes: 101 additions & 10 deletions ant-networking/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use crate::{
};
use crate::{transport, NodeIssue};

use ant_bootstrap::BootstrapCacheStore;
use ant_evm::PaymentQuote;
use ant_protocol::{
messages::{ChunkProof, Nonce, Request, Response},
Expand Down Expand Up @@ -71,8 +72,11 @@ use std::{
num::NonZeroUsize,
path::PathBuf,
};
use tokio::sync::{mpsc, oneshot};
use tokio::time::Duration;
use tokio::{
sync::{mpsc, oneshot},
time::Interval,
};
use tracing::warn;
use xor_name::XorName;

Expand Down Expand Up @@ -260,13 +264,13 @@ pub(super) struct NodeBehaviour {

#[derive(Debug)]
pub struct NetworkBuilder {
bootstrap_cache: Option<BootstrapCacheStore>,
is_behind_home_network: bool,
keypair: Keypair,
local: bool,
listen_addr: Option<SocketAddr>,
request_timeout: Option<Duration>,
concurrency_limit: Option<usize>,
initial_peers: Vec<Multiaddr>,
#[cfg(feature = "open-metrics")]
metrics_registries: Option<MetricsRegistries>,
#[cfg(feature = "open-metrics")]
Expand All @@ -278,13 +282,13 @@ pub struct NetworkBuilder {
impl NetworkBuilder {
pub fn new(keypair: Keypair, local: bool) -> Self {
Self {
bootstrap_cache: None,
is_behind_home_network: false,
keypair,
local,
listen_addr: None,
request_timeout: None,
concurrency_limit: None,
initial_peers: Default::default(),
#[cfg(feature = "open-metrics")]
metrics_registries: None,
#[cfg(feature = "open-metrics")]
Expand All @@ -294,6 +298,10 @@ impl NetworkBuilder {
}
}

pub fn bootstrap_cache(&mut self, bootstrap_cache: BootstrapCacheStore) {
self.bootstrap_cache = Some(bootstrap_cache);
}

pub fn is_behind_home_network(&mut self, enable: bool) {
self.is_behind_home_network = enable;
}
Expand All @@ -310,10 +318,6 @@ impl NetworkBuilder {
self.concurrency_limit = Some(concurrency_limit);
}

pub fn initial_peers(&mut self, initial_peers: Vec<Multiaddr>) {
self.initial_peers = initial_peers;
}

/// Set the registries used inside the metrics server.
/// Configure the `metrics_server_port` to enable the metrics server.
#[cfg(feature = "open-metrics")]
Expand Down Expand Up @@ -720,6 +724,7 @@ impl NetworkBuilder {
close_group: Vec::with_capacity(CLOSE_GROUP_SIZE),
peers_in_rt: 0,
bootstrap,
bootstrap_cache: self.bootstrap_cache,
relay_manager,
connected_relay_clients: Default::default(),
external_address_manager,
Expand Down Expand Up @@ -815,6 +820,7 @@ pub struct SwarmDriver {
pub(crate) close_group: Vec<PeerId>,
pub(crate) peers_in_rt: usize,
pub(crate) bootstrap: ContinuousNetworkDiscover,
pub(crate) bootstrap_cache: Option<BootstrapCacheStore>,
pub(crate) external_address_manager: Option<ExternalAddressManager>,
pub(crate) relay_manager: Option<RelayManager>,
/// The peers that are using our relay service.
Expand Down Expand Up @@ -843,7 +849,7 @@ pub struct SwarmDriver {
pub(crate) bootstrap_peers: BTreeMap<Option<u32>, HashSet<PeerId>>,
// Peers that having live connection to. Any peer got contacted during kad network query
// will have live connection established. And they may not appear in the RT.
pub(crate) live_connected_peers: BTreeMap<ConnectionId, (PeerId, Instant)>,
pub(crate) live_connected_peers: BTreeMap<ConnectionId, (PeerId, Multiaddr, Instant)>,
/// The list of recently established connections ids.
/// This is used to prevent log spamming.
pub(crate) latest_established_connection_ids: HashMap<usize, (IpAddr, Instant)>,
Expand Down Expand Up @@ -876,6 +882,24 @@ impl SwarmDriver {
let mut set_farthest_record_interval = interval(CLOSET_RECORD_CHECK_INTERVAL);
let mut relay_manager_reservation_interval = interval(RELAY_MANAGER_RESERVATION_INTERVAL);

let mut bootstrap_cache_save_interval = self.bootstrap_cache.as_ref().and_then(|cache| {
if cache.config().disable_cache_writing {
None
} else {
// add a variance of 10% to the interval, to avoid all nodes writing to disk at the same time.
let duration =
Self::duration_with_variance(cache.config().min_cache_save_duration, 10);
Some(interval(duration))
}
});
if let Some(interval) = bootstrap_cache_save_interval.as_mut() {
interval.tick().await; // first tick completes immediately
info!(
"Bootstrap cache save interval is set to {:?}",
interval.period()
);
}

// temporarily skip processing IncomingConnectionError swarm event to avoid log spamming
let mut previous_incoming_connection_error_event = None;
loop {
Expand Down Expand Up @@ -1005,6 +1029,37 @@ impl SwarmDriver {
relay_manager.try_connecting_to_relay(&mut self.swarm, &self.bad_nodes)
}
},
Some(()) = Self::conditional_interval(&mut bootstrap_cache_save_interval) => {
let Some(bootstrap_cache) = self.bootstrap_cache.as_mut() else {
continue;
};
let Some(current_interval) = bootstrap_cache_save_interval.as_mut() else {
continue;
};

if let Err(err) = bootstrap_cache.sync_and_save_to_disk(true).await {
error!("Failed to save bootstrap cache: {err}");
}

if current_interval.period() >= bootstrap_cache.config().max_cache_save_duration {
continue;
}

// add a variance of 1% to the max interval to avoid all nodes writing to disk at the same time.
let max_cache_save_duration =
Self::duration_with_variance(bootstrap_cache.config().max_cache_save_duration, 1);

// scale up the interval until we reach the max
let new_duration = Duration::from_secs(
std::cmp::min(
current_interval.period().as_secs() * bootstrap_cache.config().cache_save_scaling_factor,
max_cache_save_duration.as_secs(),
));
info!("Scaling up the bootstrap cache save interval to {new_duration:?}");
*current_interval = interval(new_duration);
current_interval.tick().await; // first tick completes immediately

},
}
}
}
Expand Down Expand Up @@ -1156,13 +1211,35 @@ impl SwarmDriver {
info!("Listening on {id:?} with addr: {addr:?}");
Ok(())
}

/// Returns a new duration that is within +/- variance of the provided duration.
fn duration_with_variance(duration: Duration, variance: u32) -> Duration {
let actual_variance = duration / variance;
let random_adjustment =
Duration::from_secs(rand::thread_rng().gen_range(0..actual_variance.as_secs()));
if random_adjustment.as_secs() % 2 == 0 {
duration - random_adjustment
} else {
duration + random_adjustment
}
}

/// To tick an optional interval inside tokio::select! without looping forever.
async fn conditional_interval(i: &mut Option<Interval>) -> Option<()> {
match i {
Some(i) => {
i.tick().await;
Some(())
}
None => None,
}
}
}

#[cfg(test)]
mod tests {
use super::check_and_wipe_storage_dir_if_necessary;

use std::{fs, io::Read};
use std::{fs, io::Read, time::Duration};

#[tokio::test]
async fn version_file_update() {
Expand Down Expand Up @@ -1219,4 +1296,18 @@ mod tests {
// The storage_dir shall be removed as version_key changed
assert!(fs::metadata(storage_dir.clone()).is_err());
}

#[tokio::test]
async fn test_duration_variance_fn() {
let duration = Duration::from_secs(100);
let variance = 10;
for _ in 0..10000 {
let new_duration = crate::SwarmDriver::duration_with_variance(duration, variance);
if new_duration < duration - duration / variance
|| new_duration > duration + duration / variance
{
panic!("new_duration: {new_duration:?} is not within the expected range",);
}
}
}
}
3 changes: 2 additions & 1 deletion ant-networking/src/event/kad.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,11 +242,12 @@ impl SwarmDriver {
peer,
is_new_peer,
old_peer,
addresses,
..
} => {
event_string = "kad_event::RoutingUpdated";
if is_new_peer {
self.update_on_peer_addition(peer);
self.update_on_peer_addition(peer, addresses);

// This should only happen once
if self.bootstrap.notify_new_peer() {
Expand Down
10 changes: 8 additions & 2 deletions ant-networking/src/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use custom_debug::Debug as CustomDebug;
#[cfg(feature = "local")]
use libp2p::mdns;
use libp2p::{
kad::{Record, RecordKey, K_VALUE},
kad::{Addresses, Record, RecordKey, K_VALUE},
request_response::ResponseChannel as PeerResponseChannel,
Multiaddr, PeerId,
};
Expand Down Expand Up @@ -232,14 +232,20 @@ impl SwarmDriver {
}

/// Update state on addition of a peer to the routing table.
pub(crate) fn update_on_peer_addition(&mut self, added_peer: PeerId) {
pub(crate) fn update_on_peer_addition(&mut self, added_peer: PeerId, addresses: Addresses) {
self.peers_in_rt = self.peers_in_rt.saturating_add(1);
let n_peers = self.peers_in_rt;
info!("New peer added to routing table: {added_peer:?}, now we have #{n_peers} connected peers");

#[cfg(feature = "loud")]
println!("New peer added to routing table: {added_peer:?}, now we have #{n_peers} connected peers");

if let Some(bootstrap_cache) = &mut self.bootstrap_cache {
for addr in addresses.iter() {
bootstrap_cache.add_addr(addr.clone());
}
}

self.log_kbuckets(&added_peer);
self.send_event(NetworkEvent::PeerAdded(added_peer, self.peers_in_rt));

Expand Down
Loading

0 comments on commit 1aab709

Please sign in to comment.