fix(networking): remove excessive parsing of incoming keys
Removes compute-intensive logic that is functionally the same as the result of
replication_fetcher.add_keys.
joshuef committed Jul 18, 2024
1 parent 275d893 commit 680e44e
Showing 2 changed files with 50 additions and 122 deletions.
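To make the refactor concrete, here is a minimal, self-contained sketch of the pattern after this commit. All names and types below (Key, Holder, the HashSet-backed to_be_fetched, the simplified add_keys signature) are illustrative stand-ins rather than the real sn_networking API; the point is only that add_keys itself drops keys that are already stored locally or already queued, which is the check the removed select_non_existent_records_for_replications pre-pass used to duplicate.

```rust
use std::collections::{HashMap, HashSet};

// Simplified stand-ins for NetworkAddress/RecordKey and PeerId.
type Key = String;
type Holder = u64;

#[derive(Default)]
struct ReplicationFetcher {
    // Keys already queued for fetching, keyed by (key, holder).
    to_be_fetched: HashSet<(Key, Holder)>,
}

impl ReplicationFetcher {
    // After this commit the dedup against the local store happens here,
    // so callers can hand over the raw incoming replication list.
    fn add_keys(
        &mut self,
        holder: Holder,
        incoming_keys: Vec<Key>,
        locally_stored_keys: &HashMap<Key, ()>,
    ) -> Vec<(Holder, Key)> {
        let new_keys: Vec<Key> = incoming_keys
            .into_iter()
            .filter(|key| {
                !locally_stored_keys.contains_key(key)
                    && !self.to_be_fetched.contains(&(key.clone(), holder))
            })
            .collect();

        let mut keys_to_fetch = Vec::new();
        for key in new_keys {
            if self.to_be_fetched.insert((key.clone(), holder)) {
                keys_to_fetch.push((holder, key));
            }
        }
        keys_to_fetch
    }
}

fn main() {
    let mut fetcher = ReplicationFetcher::default();
    let mut local: HashMap<Key, ()> = HashMap::new();
    local.insert("already_stored".into(), ());

    // The caller no longer pre-filters: locally held keys are dropped
    // inside add_keys itself.
    let to_fetch = fetcher.add_keys(
        1,
        vec!["already_stored".into(), "new_record".into()],
        &local,
    );
    assert_eq!(to_fetch, vec![(1u64, "new_record".to_string())]);
    println!("keys to fetch: {to_fetch:?}");
}
```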
123 changes: 17 additions & 106 deletions sn_networking/src/event/request_response.rs
@@ -8,13 +8,9 @@

use crate::{
sort_peers_by_address, MsgResponder, NetworkError, NetworkEvent, SwarmDriver, CLOSE_GROUP_SIZE,
REPLICATION_PEERS_COUNT,
};
use itertools::Itertools;
use libp2p::{
request_response::{self, Message},
PeerId,
};
use libp2p::request_response::{self, Message};
use rand::{rngs::OsRng, thread_rng, Rng};
use sn_protocol::{
messages::{CmdResponse, Request, Response},
@@ -203,42 +199,35 @@ impl SwarmDriver {
return;
}

let more_than_one_key = incoming_keys.len() > 1;

// On receiving a replication_list from a close_group peer, we undertake three tasks:
// 1, For those keys that we don't have:
// fetch them if close enough to us
// 2, For those keys that we have and that are supposed to be held by the sender as well:
// start a chunk_proof check against a randomly selected chunk type record to the sender
// 3, For those spends that we have that differ in the hash, we fetch the other version
// and update our local copy.

// For fetching, only handle those non-exist and in close range keys
let keys_to_store =
self.select_non_existent_records_for_replications(&incoming_keys, &closest_k_peers);

if keys_to_store.is_empty() {
debug!("Empty keys to store after adding to");
#[allow(clippy::mutable_key_type)]
let all_keys = self
.swarm
.behaviour_mut()
.kademlia
.store_mut()
.record_addresses_ref();
let keys_to_fetch = self
.replication_fetcher
.add_keys(holder, incoming_keys, all_keys);
if keys_to_fetch.is_empty() {
trace!("no waiting keys to fetch from the network");
} else {
#[allow(clippy::mutable_key_type)]
let all_keys = self
.swarm
.behaviour_mut()
.kademlia
.store_mut()
.record_addresses_ref();
let keys_to_fetch = self
.replication_fetcher
.add_keys(holder, keys_to_store, all_keys);
if keys_to_fetch.is_empty() {
trace!("no waiting keys to fetch from the network");
} else {
self.send_event(NetworkEvent::KeysToFetchForReplication(keys_to_fetch));
}
self.send_event(NetworkEvent::KeysToFetchForReplication(keys_to_fetch));
}

// Only trigger the chunk_proof check X% of the time
let mut rng = thread_rng();
// 5% probability
if incoming_keys.len() > 1 && rng.gen_bool(0.05) {
if more_than_one_key && rng.gen_bool(0.05) {
let keys_to_verify = self.select_verification_data_candidates(sender);

if keys_to_verify.is_empty() {
@@ -252,84 +241,6 @@ impl SwarmDriver {
}
}

/// Checks suggested records against what we hold, so we only
/// enqueue what we do not have
fn select_non_existent_records_for_replications(
&mut self,
incoming_keys: &[(NetworkAddress, RecordType)],
closest_k_peers: &Vec<PeerId>,
) -> Vec<(NetworkAddress, RecordType)> {
#[allow(clippy::mutable_key_type)]
let locally_stored_keys = self
.swarm
.behaviour_mut()
.kademlia
.store_mut()
.record_addresses_ref();
let non_existent_keys: Vec<_> = incoming_keys
.iter()
.filter(|(addr, record_type)| {
let key = addr.to_record_key();
let local = locally_stored_keys.get(&key);

// if we have a local value of matching record_type, we don't need to fetch it
if let Some((_, local_record_type)) = local {
let not_same_type = local_record_type != record_type;
if not_same_type {
// Shall only happens for Register, or DoubleSpendAttempts
info!("Record {addr:?} has different type: local {local_record_type:?}, incoming {record_type:?}");
}
not_same_type
} else {
true
}
})
.collect();

non_existent_keys
.into_iter()
.filter_map(|(key, record_type)| {
if Self::is_in_close_range(&self.self_peer_id, key, closest_k_peers) {
Some((key.clone(), record_type.clone()))
} else {
// Reduce the log level as there will always be around 40% records being
// out of the close range, as the sender side is using `CLOSE_GROUP_SIZE + 2`
// to send our replication list to provide addressing margin.
// Given there will normally be 6 nodes sending such list with interval of 5-10s,
// this will accumulate to a lot of logs with the increasing records uploaded.
trace!("not in close range for key {key:?}");
None
}
})
.collect()
}

/// A close target doesn't falls into the close peers range:
/// For example, a node b11111X has an RT: [(1, b1111), (2, b111), (5, b11), (9, b1), (7, b0)]
/// Then for a target bearing b011111 as prefix, all nodes in (7, b0) are its close_group peers.
/// Then the node b11111X. But b11111X's close_group peers [(1, b1111), (2, b111), (5, b11)]
/// are none among target b011111's close range.
/// Hence, the ilog2 calculation based on close_range cannot cover such case.
/// And have to sort all nodes to figure out whether self is among the close_group to the target.
fn is_in_close_range(
our_peer_id: &PeerId,
target: &NetworkAddress,
all_peers: &Vec<PeerId>,
) -> bool {
if all_peers.len() <= REPLICATION_PEERS_COUNT {
return true;
}

// Margin of 2 to allow our RT being bit lagging.
match sort_peers_by_address(all_peers, target, REPLICATION_PEERS_COUNT) {
Ok(close_group) => close_group.contains(&our_peer_id),
Err(err) => {
warn!("Could not get sorted peers for {target:?} with error {err:?}");
true
}
}
}

/// Check among all chunk type records that we have, select those close to the peer,
/// and randomly pick one as the verification candidate.
#[allow(clippy::mutable_key_type)]
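The handler above still keeps the probabilistic chunk-proof gate (more_than_one_key plus gen_bool(0.05)) and the random candidate selection described by select_verification_data_candidates. The stand-alone sketch below only illustrates that sampling pattern, assuming the rand crate; maybe_trigger_verification and the String candidates are hypothetical, not the real API.

```rust
use rand::{thread_rng, Rng};

// Illustrative gate: verify the sender only when the replication list carried
// more than one key, and then only about 5% of the time.
fn maybe_trigger_verification(incoming_key_count: usize, candidates: &[String]) -> Option<&String> {
    let more_than_one_key = incoming_key_count > 1;
    let mut rng = thread_rng();
    // 5% probability, matching the gen_bool(0.05) gate in the handler.
    if more_than_one_key && rng.gen_bool(0.05) {
        if candidates.is_empty() {
            return None;
        }
        // Randomly pick one locally held chunk to challenge the sender with.
        let idx = rng.gen_range(0..candidates.len());
        return Some(&candidates[idx]);
    }
    None
}

fn main() {
    let candidates = vec!["chunk_a".to_string(), "chunk_b".to_string()];
    match maybe_trigger_verification(3, &candidates) {
        Some(key) => println!("verify sender against {key}"),
        None => println!("skipped verification this round"),
    }
}
```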
49 changes: 33 additions & 16 deletions sn_networking/src/replication_fetcher.rs
@@ -74,17 +74,30 @@ impl ReplicationFetcher {
pub(crate) fn add_keys(
&mut self,
holder: PeerId,
mut incoming_keys: Vec<(NetworkAddress, RecordType)>,
incoming_keys: Vec<(NetworkAddress, RecordType)>,
locally_stored_keys: &HashMap<RecordKey, (NetworkAddress, RecordType)>,
) -> Vec<(PeerId, RecordKey)> {
// remove locally stored from incoming_keys
let mut new_incoming_keys: Vec<_> = incoming_keys
.iter()
.filter(|(addr, record_type)| {
let key = &addr.to_record_key();
!locally_stored_keys.contains_key(key)
&& !self
.to_be_fetched
.contains_key(&(key.clone(), record_type.clone(), holder))
})
.cloned()
.collect();

self.remove_stored_keys(locally_stored_keys);
let self_address = NetworkAddress::from_peer(self.self_peer_id);
let total_incoming_keys = incoming_keys.len();
let total_incoming_keys = new_incoming_keys.len();

// In case of node full, restrict fetch range
if let Some(farthest_distance) = self.farthest_acceptable_distance {
let mut out_of_range_keys = vec![];
incoming_keys.retain(|(addr, _)| {
new_incoming_keys.retain(|(addr, _)| {
let is_in_range = self_address.distance(addr) <= farthest_distance;
if !is_in_range {
out_of_range_keys.push(addr.clone());
@@ -101,8 +114,8 @@ impl ReplicationFetcher {
let mut keys_to_fetch = vec![];
// For new data, it will be replicated out in a special replication_list of length 1.
// And we shall `fetch` that copy immediately (if in range), if it's not being fetched.
if incoming_keys.len() == 1 {
let (record_address, record_type) = incoming_keys[0].clone();
if new_incoming_keys.len() == 1 {
let (record_address, record_type) = new_incoming_keys[0].clone();

let new_data_key = (record_address.to_record_key(), record_type);

@@ -113,16 +126,16 @@ impl ReplicationFetcher {
}

// To avoid unnecessary actions later on.
incoming_keys.clear();
new_incoming_keys.clear();
}

self.to_be_fetched
.retain(|_, time_out| *time_out > Instant::now());

let mut out_of_range_keys = vec![];
// Filter out those out_of_range ones among the imcoming_keys.
// Filter out those out_of_range ones among the incoming_keys.
if let Some(ref distance_range) = self.distance_range {
incoming_keys.retain(|(addr, _record_type)| {
new_incoming_keys.retain(|(addr, _record_type)| {
let is_in_range =
self_address.distance(addr).ilog2().unwrap_or(0) <= *distance_range;
if !is_in_range {
@@ -141,12 +154,14 @@ impl ReplicationFetcher {
}

// add in-range AND non existing keys to the fetcher
incoming_keys.into_iter().for_each(|(addr, record_type)| {
let _ = self
.to_be_fetched
.entry((addr.to_record_key(), record_type, holder))
.or_insert(Instant::now() + PENDING_TIMEOUT);
});
new_incoming_keys
.into_iter()
.for_each(|(addr, record_type)| {
let _ = self
.to_be_fetched
.entry((addr.to_record_key(), record_type, holder))
.or_insert(Instant::now() + PENDING_TIMEOUT);
});

keys_to_fetch.extend(self.next_keys_to_fetch());

@@ -469,11 +484,13 @@ mod tests {
replication_fetcher.add_keys(PeerId::random(), incoming_keys, &Default::default());
assert_eq!(
keys_to_fetch.len(),
replication_fetcher.on_going_fetches.len()
replication_fetcher.on_going_fetches.len(),
"keys to fetch and ongoing fetches should match"
);
assert_eq!(
in_range_keys,
keys_to_fetch.len() + replication_fetcher.to_be_fetched.len()
keys_to_fetch.len() + replication_fetcher.to_be_fetched.len(),
"all keys should be in range and in the fetcher"
);
}
}
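Per the commit message, the close-range aspect of the removed caller-side check is covered by the range filters that add_keys already applies, including the distance_range comparison shown above (self_address.distance(addr).ilog2().unwrap_or(0) <= *distance_range). The toy below uses u64 addresses and the Kademlia XOR metric purely to illustrate that bucket comparison; the real code works on NetworkAddress/Distance values, not u64.

```rust
// Toy stand-in for the distance_range filter: XOR distance bucketed by ilog2,
// keeping only keys whose bucket index is within the accepted range.
fn within_distance_range(self_addr: u64, key_addr: u64, distance_range: u32) -> bool {
    // XOR metric, as in Kademlia.
    let distance = self_addr ^ key_addr;
    // A zero distance (our own address) has no ilog2, so treat it as bucket 0,
    // mirroring the real code's unwrap_or(0).
    let bucket = if distance == 0 { 0 } else { distance.ilog2() };
    bucket <= distance_range
}

fn main() {
    let self_addr = 0b1010_0000u64;
    let keys = [0b1010_0001u64, 0b1010_1111, 0b0001_0000];
    let in_range: Vec<u64> = keys
        .into_iter()
        .filter(|&k| within_distance_range(self_addr, k, 3))
        .collect();
    // The key whose XOR distance falls outside the 2^3 bucket is dropped.
    println!("in-range keys: {in_range:?}");
    assert_eq!(in_range, vec![0b1010_0001, 0b1010_1111]);
}
```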
