
chore: not using seen_cache when add replication list
maqi committed Nov 21, 2023
1 parent 4605345 commit c20f094
Showing 5 changed files with 50 additions and 62 deletions.
.github/workflows/benchmark-prs.yml (6 changes: 3 additions & 3 deletions)
@@ -90,7 +90,7 @@ jobs:
shell: bash
run: |
client_peak_mem_limit_mb="700" # mb
client_avg_mem_limit_mb="350" # mb
client_avg_mem_limit_mb="400" # mb
peak_mem_usage=$(
rg '"memory_used_mb":[^,]*' $CLIENT_DATA_PATH/logs --glob safe.* -o --no-line-number --no-filename |
@@ -325,8 +325,8 @@ jobs:
echo "Total swarm_driver long handling times is: $total_num_of_times"
echo "Total swarm_driver long handling duration is: $total_long_handling ms"
echo "Total average swarm_driver long handling duration is: $average_handling_ms ms"
total_num_of_times_limit_hits="5000" # hits
total_long_handling_limit_ms="75000" # ms
total_num_of_times_limit_hits="10000" # hits
total_long_handling_limit_ms="150000" # ms
average_handling_limit_ms="20" # ms
if (( $(echo "$total_num_of_times > $total_num_of_times_limit_hits" | bc -l) )); then
echo "Swarm_driver long handling times exceeded threshold: $total_num_of_times hits"
.github/workflows/memcheck.yml (2 changes: 1 addition & 1 deletion)
@@ -237,7 +237,7 @@ jobs:
# limits here are lower than benchmark tests as there is less going on.
run: |
client_peak_mem_limit_mb="700" # mb
client_avg_mem_limit_mb="350" # mb
client_avg_mem_limit_mb="400" # mb
peak_mem_usage=$(
rg '"memory_used_mb":[^,]*' $CLIENT_DATA_PATH/logs --glob safe.* -o --no-line-number --no-filename |
sn_networking/src/cmd.rs (76 changes: 40 additions & 36 deletions)
@@ -14,7 +14,7 @@ use crate::{
};
use bytes::Bytes;
use libp2p::{
kad::{store::RecordStore, Quorum, Record, RecordKey, K_VALUE},
kad::{store::RecordStore, Quorum, Record, RecordKey},
swarm::dial_opts::DialOpts,
Multiaddr, PeerId,
};
@@ -25,16 +25,12 @@ use sn_protocol::{
};
use sn_transfers::NanoTokens;
use std::{
collections::{hash_map::Entry, HashMap, HashSet},
collections::{HashMap, HashSet},
fmt::Debug,
time::{Duration, Instant},
};
use tokio::sync::oneshot;
use xor_name::XorName;

// Timeout for seen replications to faded out.
const SEEN_REPLICATION_FADED_OUT_S: Duration = Duration::from_secs(120);

/// Commands to send to the Swarm
#[allow(clippy::large_enum_variant)]
pub enum SwarmCmd {
@@ -282,44 +278,52 @@ impl SwarmDriver {
#[allow(clippy::mutable_key_type)] // for Bytes in NetworkAddress
fn select_new_replications(
&mut self,
holder: PeerId,
incoming_keys: &mut HashMap<NetworkAddress, RecordType>,
) {
if self.seen_replications.len() / 2 > K_VALUE.into() {
self.seen_replications.retain(|_k, (time_stamp, _keys)| {
time_stamp.elapsed() < SEEN_REPLICATION_FADED_OUT_S
});
}
incoming_keys: &HashMap<NetworkAddress, RecordType>,
) -> Vec<(NetworkAddress, RecordType)> {
#[allow(clippy::mutable_key_type)]
let locally_stored_keys = self
.swarm
.behaviour_mut()
.kademlia
.store_mut()
.record_addresses_ref();
let non_exist_keys: Vec<_> = incoming_keys
.iter()
.filter(|(addr, record_type)| {
let key = addr.to_record_key();
let local = locally_stored_keys.get(&key);

let entry = self.seen_replications.entry(holder);
if let Entry::Occupied(mut occupied_entry) = entry {
let (time_stamp, existing_keys) = occupied_entry.get_mut();
incoming_keys.retain(|k, v| match existing_keys.get(k) {
Some(existing_v) => v != existing_v,
None => true,
});
existing_keys.extend(incoming_keys.clone());
*time_stamp = Instant::now();
} else {
entry.or_insert((Instant::now(), incoming_keys.clone()));
}
// if we have a local value of matching record_type, we don't need to fetch it
if let Some((_, local_record_type)) = local {
&local_record_type != record_type
} else {
true
}
})
.collect();

let closest_k_peers = self.get_closest_k_value_local_peers();
non_exist_keys
.into_iter()
.filter_map(|(key, record_type)| {
if self.is_in_close_range(key, &closest_k_peers) {
Some((key.clone(), record_type.clone()))
} else {
None
}
})
.collect()
}

pub(crate) fn handle_cmd(&mut self, cmd: SwarmCmd) -> Result<(), Error> {
match cmd {
SwarmCmd::AddKeysToReplicationFetcher { holder, mut keys } => {
// Only handle those `first time see` keys
self.select_new_replications(holder, &mut keys);
if keys.is_empty() {
SwarmCmd::AddKeysToReplicationFetcher { holder, keys } => {
// Only handle keys that do not exist locally and are within close range
let keys_to_store = self.select_new_replications(&keys);
if keys_to_store.is_empty() {
return Ok(());
}

// Only store record from Replication that close enough to us.
let closest_k_peers = self.get_closest_k_value_local_peers();
let keys_to_store = keys
.into_iter()
.filter(|(key, _)| self.is_in_close_range(key, &closest_k_peers))
.collect();
#[allow(clippy::mutable_key_type)]
let all_keys = self
.swarm
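For readers following the change, here is a minimal, self-contained sketch of the selection logic the reworked `select_new_replications` now performs: incoming keys are filtered against locally stored records and then against the node's close range, with no per-peer seen cache. The types and the `is_in_close_range` closure below are simplified stand-ins, not the actual sn_networking definitions.

```rust
use std::collections::HashMap;

// Simplified stand-ins for the real sn_networking types; illustrative only.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
struct NetworkAddress(String);

#[derive(Clone, PartialEq, Debug)]
enum RecordType {
    Chunk,
    NonChunk,
}

/// Keep only the incoming keys that (a) are not already stored locally with a
/// matching record type and (b) fall within our close range. This mirrors the
/// two-step filter that replaces the old per-peer seen cache.
fn select_new_replications(
    locally_stored: &HashMap<NetworkAddress, RecordType>,
    incoming_keys: &HashMap<NetworkAddress, RecordType>,
    is_in_close_range: impl Fn(&NetworkAddress) -> bool,
) -> Vec<(NetworkAddress, RecordType)> {
    let mut selected = Vec::new();
    for (addr, record_type) in incoming_keys {
        // Skip keys we already hold locally with a matching record type.
        if locally_stored.get(addr) == Some(record_type) {
            continue;
        }
        // Skip keys that are not close enough to this node.
        if !is_in_close_range(addr) {
            continue;
        }
        selected.push((addr.clone(), record_type.clone()));
    }
    selected
}

fn main() {
    let locally_stored =
        HashMap::from([(NetworkAddress("a".into()), RecordType::Chunk)]);
    let incoming = HashMap::from([
        (NetworkAddress("a".into()), RecordType::Chunk), // already held, skipped
        (NetworkAddress("b".into()), RecordType::NonChunk), // new, selected
    ]);
    let selected = select_new_replications(&locally_stored, &incoming, |_| true);
    assert_eq!(selected.len(), 1);
}
```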
sn_networking/src/driver.rs (5 changes: 1 addition & 4 deletions)
@@ -47,15 +47,14 @@ use libp2p::{
use prometheus_client::registry::Registry;
use sn_protocol::{
messages::{Request, Response},
storage::RecordType,
NetworkAddress, PrettyPrintKBucketKey,
};
use std::{
collections::{HashMap, HashSet},
net::SocketAddr,
num::NonZeroUsize,
path::PathBuf,
time::{Duration, Instant},
time::Duration,
};
use tiny_keccak::{Hasher, Sha3};
use tokio::sync::{mpsc, oneshot};
@@ -480,7 +479,6 @@ impl NetworkBuilder {
// `identify` protocol to kick in and get them in the routing table.
dialed_peers: CircularVec::new(63),
is_gossip_listener: false,
seen_replications: Default::default(),
};

Ok((
@@ -520,7 +518,6 @@ pub struct SwarmDriver {
/// A list of the most recent peers we have dialed ourselves.
pub(crate) dialed_peers: CircularVec<PeerId>,
pub(crate) is_gossip_listener: bool,
pub(crate) seen_replications: HashMap<PeerId, (Instant, HashMap<NetworkAddress, RecordType>)>,
}

impl SwarmDriver {
sn_networking/src/replication_fetcher.rs (23 changes: 5 additions & 18 deletions)
@@ -45,8 +45,10 @@ impl ReplicationFetcher {
}
}

// Adds the non existing incoming keys from the peer to the fetcher. Returns the next set of keys that has to be
// fetched from the peer/network.
// Adds the non-existing incoming keys from the peer to the fetcher.
// Returns the next set of keys that has to be fetched from the peer/network.
//
// Note: the `incoming_keys` shall already be filtered for existence.
pub(crate) fn add_keys(
&mut self,
holder: PeerId,
@@ -58,22 +60,7 @@ impl ReplicationFetcher {
// add non existing keys to the fetcher
incoming_keys
.into_iter()
.filter_map(|(addr, record_type)| {
let key = addr.to_record_key();
let local = locally_stored_keys.get(&key);

// if we have a local value of matching record_type, we don't need to fetch it
if let Some((_, local_record_type)) = local {
if local_record_type == &record_type {
None
} else {
Some((key, record_type))
}
} else {
Some((key, record_type))
}
})
.for_each(|(key, record_type)| self.add_key(holder, key, record_type));
.for_each(|(key, record_type)| self.add_key(holder, key.to_record_key(), record_type));

self.next_keys_to_fetch()
}
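A rough sketch (with made-up, simplified types) of the resulting division of responsibility: the swarm driver pre-filters replication keys for local existence and close range, so the fetcher's `add_keys` only has to convert addresses to record keys and queue them per holder. This illustrates the contract under those assumptions, not the actual `ReplicationFetcher` implementation.

```rust
use std::collections::HashMap;

// Illustrative placeholders; the real code uses libp2p PeerId/RecordKey types.
type PeerId = u64;
type NetworkAddress = String;
type RecordKey = Vec<u8>;

#[derive(Default)]
struct ReplicationFetcher {
    // Keys queued for fetching, grouped by the holder that advertised them.
    to_fetch: HashMap<PeerId, Vec<RecordKey>>,
}

impl ReplicationFetcher {
    /// Queue already-filtered incoming keys from `holder`.
    /// The caller is expected to have dropped keys that are stored locally or
    /// that fall outside this node's close range (see select_new_replications).
    fn add_keys(&mut self, holder: PeerId, incoming_keys: Vec<NetworkAddress>) {
        for addr in incoming_keys {
            // Stand-in for the real `addr.to_record_key()` conversion.
            let key: RecordKey = addr.into_bytes();
            self.to_fetch.entry(holder).or_default().push(key);
        }
    }
}

fn main() {
    let mut fetcher = ReplicationFetcher::default();
    fetcher.add_keys(42, vec!["record-1".to_string(), "record-2".to_string()]);
    assert_eq!(fetcher.to_fetch[&42].len(), 2);
}
```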
