
Commit

chore(node): move add_to_replicate_fetcher to driver
maqi authored and joshuef committed Jan 9, 2024
1 parent ca03c8e commit 650e12d
Showing 6 changed files with 145 additions and 211 deletions.
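
Before the per-file diffs, here is a minimal, self-contained sketch of the control-flow change this commit makes. The types below (`Key`, `ReplicationFetcher`, `Driver`, `NetworkEvent`) are simplified stand-ins, not the real sn_networking API: previously the swarm driver forwarded an incoming `Cmd::Replicate` to the node layer as a `CmdRequestReceived` event; now it feeds its replication fetcher directly and only surfaces `KeysToFetchForReplication` when something actually needs fetching.

// Simplified stand-ins for the real sn_networking types; the real fetcher also
// deduplicates against locally stored records and keys it already tracks.
type Key = String; // stand-in for NetworkAddress

enum Request {
    Replicate { holder: String, keys: Vec<Key> },
}

enum NetworkEvent {
    KeysToFetchForReplication(Vec<(String, Key)>),
}

#[derive(Default)]
struct ReplicationFetcher {
    pending: Vec<(String, Key)>,
}

impl ReplicationFetcher {
    // Track the keys and return the ones that still need fetching.
    fn add_keys(&mut self, holder: String, keys: Vec<Key>) -> Vec<(String, Key)> {
        let new: Vec<_> = keys.into_iter().map(|k| (holder.clone(), k)).collect();
        self.pending.extend(new.clone());
        new
    }
}

#[derive(Default)]
struct Driver {
    replication_fetcher: ReplicationFetcher,
    events: Vec<NetworkEvent>,
}

impl Driver {
    // After this commit, the driver resolves replication lists in-line instead
    // of emitting a CmdRequestReceived event for the node layer to handle.
    fn handle_request(&mut self, req: Request) {
        match req {
            Request::Replicate { holder, keys } => {
                let to_fetch = self.replication_fetcher.add_keys(holder, keys);
                if !to_fetch.is_empty() {
                    self.events
                        .push(NetworkEvent::KeysToFetchForReplication(to_fetch));
                }
                // A CmdResponse::Replicate(Ok(())) reply would be sent back here.
            }
        }
    }
}

fn main() {
    let mut driver = Driver::default();
    driver.handle_request(Request::Replicate {
        holder: "peer-1".into(),
        keys: vec!["record-a".into(), "record-b".into()],
    });
    assert_eq!(driver.replication_fetcher.pending.len(), 2);
    match &driver.events[0] {
        NetworkEvent::KeysToFetchForReplication(keys) => assert_eq!(keys.len(), 2),
    }
}
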
131 changes: 13 additions & 118 deletions sn_networking/src/cmd.rs
@@ -9,8 +9,8 @@
use crate::{
driver::{PendingGetClosestType, SwarmDriver},
error::{Error, Result},
multiaddr_pop_p2p, sort_peers_by_address, GetRecordCfg, GetRecordError, MsgResponder,
NetworkEvent, CLOSE_GROUP_SIZE, REPLICATE_RANGE,
multiaddr_pop_p2p, GetRecordCfg, GetRecordError, MsgResponder, NetworkEvent, CLOSE_GROUP_SIZE,
REPLICATE_RANGE,
};
use bytes::Bytes;
use libp2p::{
@@ -146,11 +146,6 @@ pub enum SwarmCmd {
},
/// Triggers interval replication
TriggerIntervalReplication,
/// The keys added to the replication fetcher are later used to fetch the Record from network
AddKeysToReplicationFetcher {
holder: PeerId,
keys: HashMap<NetworkAddress, RecordType>,
},
/// Subscribe to a given Gossipsub topic
GossipsubSubscribe(String),
/// Unsubscribe from a given Gossipsub topic
@@ -221,13 +216,6 @@ impl Debug for SwarmCmd {
SwarmCmd::TriggerIntervalReplication => {
write!(f, "SwarmCmd::TriggerIntervalReplication")
}
SwarmCmd::AddKeysToReplicationFetcher { holder, keys } => {
write!(
f,
"SwarmCmd::AddKeysToReplicationFetcher {{ holder: {holder:?}, keys_len: {:?} }}",
keys.len()
)
}
SwarmCmd::GossipsubSubscribe(topic) => {
write!(f, "SwarmCmd::GossipsubSubscribe({topic:?})")
}
@@ -307,84 +295,11 @@ pub struct SwarmLocalState {
}

impl SwarmDriver {
/// Checks suggested records against what we hold, so we only
/// enqueue what we do not have
#[allow(clippy::mutable_key_type)] // for Bytes in NetworkAddress
fn select_non_existent_records_for_replications(
&mut self,
incoming_keys: &HashMap<NetworkAddress, RecordType>,
) -> Vec<(NetworkAddress, RecordType)> {
#[allow(clippy::mutable_key_type)]
let locally_stored_keys = self
.swarm
.behaviour_mut()
.kademlia
.store_mut()
.record_addresses_ref();
let non_existent_keys: Vec<_> = incoming_keys
.iter()
.filter(|(addr, record_type)| {
let key = addr.to_record_key();
let local = locally_stored_keys.get(&key);

// if we have a local value of matching record_type, we don't need to fetch it
if let Some((_, local_record_type)) = local {
&local_record_type != record_type
} else {
true
}
})
.collect();

let closest_k_peers = self.get_closest_k_value_local_peers();

non_existent_keys
.into_iter()
.filter_map(|(key, record_type)| {
if self.is_in_close_range(key, &closest_k_peers) {
Some((key.clone(), record_type.clone()))
} else {
// Reduce the log level, as around 40% of the records in a replication list
// will always fall outside our close range (the sender uses `CLOSE_GROUP_SIZE + 2`
// recipients for its replication list, to provide an addressing margin).
// With normally 6 nodes sending such lists every 5-10s, this would otherwise
// accumulate into a lot of logs as the number of uploaded records grows.
trace!("not in close range for key {key:?}");
None
}
})
.collect()
}

pub(crate) fn handle_cmd(&mut self, cmd: SwarmCmd) -> Result<(), Error> {
match cmd {
SwarmCmd::TriggerIntervalReplication => {
self.try_interval_replication()?;
}
SwarmCmd::AddKeysToReplicationFetcher { holder, keys } => {
// Only handle keys that we don't hold locally and that are within our close range
let keys_to_store = self.select_non_existent_records_for_replications(&keys);
if keys_to_store.is_empty() {
debug!("Empty keys to store after adding to");
return Ok(());
}

#[allow(clippy::mutable_key_type)]
let all_keys = self
.swarm
.behaviour_mut()
.kademlia
.store_mut()
.record_addresses_ref();
let keys_to_fetch =
self.replication_fetcher
.add_keys(holder, keys_to_store, all_keys);
if !keys_to_fetch.is_empty() {
self.send_event(NetworkEvent::KeysToFetchForReplication(keys_to_fetch));
} else {
trace!("no waiting keys to fetch from the network");
}
}
SwarmCmd::GetNetworkRecord { key, sender, cfg } => {
let query_id = self.swarm.behaviour_mut().kademlia.get_record(key.clone());

@@ -748,35 +663,16 @@ impl SwarmDriver {
Ok(())
}

// A close target may not fall within our own close peers range:
// For example, a node b11111X has an RT of [(1, b1111), (2, b111), (5, b11), (9, b1), (7, b0)].
// For a target bearing the prefix b011111, all nodes in (7, b0) are its close_group peers,
// and so is the node b11111X itself. But b11111X's own close_group peers
// [(1, b1111), (2, b111), (5, b11)] are not within target b011111's close range.
// Hence, an ilog2 calculation based on close_range cannot cover such a case,
// and we have to sort all nodes to figure out whether self is among the close_group of the target.
fn is_in_close_range(&self, target: &NetworkAddress, all_peers: &Vec<PeerId>) -> bool {
if all_peers.len() <= REPLICATE_RANGE {
return true;
}

// Margin of 2 to allow for our RT being a bit behind.
match sort_peers_by_address(all_peers, target, REPLICATE_RANGE) {
Ok(close_group) => close_group.contains(&&self.self_peer_id),
Err(err) => {
warn!("Could not get sorted peers for {target:?} with error {err:?}");
true
}
}
}

fn try_interval_replication(&mut self) -> Result<()> {
// Already contains self_peer_id
let mut closest_k_peers = self.get_closest_k_value_local_peers();

// remove our peer id from the calculations here:
let our_peer_id = *self.swarm.local_peer_id();
closest_k_peers.retain(|peer_id| peer_id != &our_peer_id);
// get closest peers from buckets, sorted by increasing distance to us
let our_peer_id = self.self_peer_id.into();
let closest_k_peers = self
.swarm
.behaviour_mut()
.kademlia
.get_closest_local_peers(&our_peer_id)
// Map KBucketKey<PeerId> to PeerId.
.map(|key| key.into_preimage());

// Only grab the closest nodes within the REPLICATE_RANGE
let replicate_targets = closest_k_peers
@@ -785,8 +681,7 @@ impl SwarmDriver {
.take(REPLICATE_RANGE)
.collect::<Vec<_>>();

#[allow(clippy::mutable_key_type)] // for Bytes in NetworkAddress
let all_records: HashMap<_, _> = self
let all_records: Vec<_> = self
.swarm
.behaviour_mut()
.kademlia
@@ -802,7 +697,7 @@ impl SwarmDriver {
all_records.len()
);
let request = Request::Cmd(Cmd::Replicate {
holder: NetworkAddress::from_peer(our_peer_id),
holder: NetworkAddress::from_peer(self.self_peer_id),
keys: all_records,
});
for peer_id in replicate_targets {
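
Both the reworked `try_interval_replication` above and the `is_in_close_range` helper that this commit moves into event.rs rest on the same idea: sort peers by XOR distance to a target and keep only the closest `REPLICATE_RANGE` of them. Below is an illustrative, standalone sketch of that selection, using plain 32-byte keys instead of libp2p kbucket keys; `REPLICATE_RANGE = CLOSE_GROUP_SIZE + 2` follows the margin mentioned in the comments, while `CLOSE_GROUP_SIZE = 8` is an assumed value, not taken from this diff.

// Assumed constants for illustration (only the +2 margin is stated in the diff).
const CLOSE_GROUP_SIZE: usize = 8;
const REPLICATE_RANGE: usize = CLOSE_GROUP_SIZE + 2;

// Stand-in for a kademlia key / NetworkAddress.
type Addr = [u8; 32];

// XOR distance between two addresses; byte arrays compare lexicographically,
// which matches the ordering of the underlying big-endian integer distance.
fn distance(a: &Addr, b: &Addr) -> Addr {
    let mut d = [0u8; 32];
    for i in 0..32 {
        d[i] = a[i] ^ b[i];
    }
    d
}

// Sort peers by increasing XOR distance to `target` and keep the closest
// REPLICATE_RANGE of them (the role sort_peers_by_address plays in the crate).
fn closest_in_replicate_range(mut peers: Vec<Addr>, target: &Addr) -> Vec<Addr> {
    peers.sort_by_key(|p| distance(p, target));
    peers.truncate(REPLICATE_RANGE);
    peers
}

// The membership test behind is_in_close_range: are we one of the
// REPLICATE_RANGE peers closest to `target`?
fn is_in_close_range(me: &Addr, target: &Addr, all_peers: Vec<Addr>) -> bool {
    if all_peers.len() <= REPLICATE_RANGE {
        return true;
    }
    closest_in_replicate_range(all_peers, target).contains(me)
}

fn main() {
    let me = [0u8; 32];
    let target = [1u8; 32];
    // 20 peers whose first byte varies; `me` is peer 0.
    let peers: Vec<Addr> = (0..20u8)
        .map(|i| {
            let mut a = [0u8; 32];
            a[0] = i;
            a
        })
        .collect();
    println!("in close range: {}", is_in_close_range(&me, &target, peers));
}
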
139 changes: 124 additions & 15 deletions sn_networking/src/event.rs
@@ -9,7 +9,8 @@
use crate::{
driver::{truncate_patch_version, PendingGetClosestType, SwarmDriver},
error::{Error, Result},
multiaddr_is_global, multiaddr_strip_p2p, CLOSE_GROUP_SIZE,
multiaddr_is_global, multiaddr_strip_p2p, sort_peers_by_address, CLOSE_GROUP_SIZE,
REPLICATE_RANGE,
};
use bytes::Bytes;
use core::fmt;
@@ -32,7 +33,7 @@ use libp2p::{
};

use sn_protocol::{
messages::{Cmd, CmdResponse, Query, Request, Response},
messages::{CmdResponse, Query, Request, Response},
storage::RecordType,
NetworkAddress, PrettyPrintRecordKey,
};
@@ -108,11 +109,6 @@ pub enum MsgResponder {
#[allow(clippy::large_enum_variant)]
/// Events forwarded by the underlying Network; to be used by the upper layers
pub enum NetworkEvent {
/// Incoming `Cmd` from a peer
CmdRequestReceived {
/// Cmd
cmd: Cmd,
},
/// Incoming `Query` from a peer
QueryRequestReceived {
/// Query
@@ -161,9 +157,6 @@ pub enum NetworkEvent {
impl Debug for NetworkEvent {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
NetworkEvent::CmdRequestReceived { cmd, .. } => {
write!(f, "NetworkEvent::CmdRequestReceived({cmd:?})")
}
NetworkEvent::QueryRequestReceived { query, .. } => {
write!(f, "NetworkEvent::QueryRequestReceived({query:?})")
}
@@ -646,7 +639,8 @@ impl SwarmDriver {
// as we send that regardless of how we handle the request, since it's unimportant to the sender.
match request {
Request::Cmd(sn_protocol::messages::Cmd::Replicate { holder, keys }) => {
trace!("Short circuit ReplicateOk response to peer {peer:?}");
self.add_keys_to_replication_fetcher(holder, keys);

let response = Response::Cmd(
sn_protocol::messages::CmdResponse::Replicate(Ok(())),
);
@@ -655,10 +649,6 @@ impl SwarmDriver {
.request_response
.send_response(channel, response)
.map_err(|_| Error::InternalMsgChannelDropped)?;

self.send_event(NetworkEvent::CmdRequestReceived {
cmd: sn_protocol::messages::Cmd::Replicate { holder, keys },
});
}
Request::Query(query) => {
self.send_event(NetworkEvent::QueryRequestReceived {
@@ -1103,6 +1093,125 @@ impl SwarmDriver {
let _result = self.swarm.close_connection(connection_id);
}
}

fn add_keys_to_replication_fetcher(
&mut self,
sender: NetworkAddress,
incoming_keys: Vec<(NetworkAddress, RecordType)>,
) {
let holder = if let Some(peer_id) = sender.as_peer_id() {
peer_id
} else {
warn!("Replication list sender is not a peer_id {sender:?}");
return;
};

trace!(
"Received replication list from {holder:?} of {} keys",
incoming_keys.len()
);

// Accept replication requests from any of our K_VALUE closest peers,
// giving us some margin for replication.
let closest_k_peers = self.get_closest_k_value_local_peers();
if !closest_k_peers.contains(&holder) || holder == self.self_peer_id {
trace!("Holder {holder:?} is self or not in replication range.");
return;
}

// Only handle keys that we don't hold locally and that are within our close range
let keys_to_store =
self.select_non_existent_records_for_replications(&incoming_keys, &closest_k_peers);
if keys_to_store.is_empty() {
debug!("Empty keys to store after adding to");
return;
}

#[allow(clippy::mutable_key_type)]
let all_keys = self
.swarm
.behaviour_mut()
.kademlia
.store_mut()
.record_addresses_ref();
let keys_to_fetch = self
.replication_fetcher
.add_keys(holder, keys_to_store, all_keys);
if !keys_to_fetch.is_empty() {
self.send_event(NetworkEvent::KeysToFetchForReplication(keys_to_fetch));
} else {
trace!("no waiting keys to fetch from the network");
}
}

/// Checks suggested records against what we hold, so we only
/// enqueue what we do not have
fn select_non_existent_records_for_replications(
&mut self,
incoming_keys: &[(NetworkAddress, RecordType)],
closest_k_peers: &Vec<PeerId>,
) -> Vec<(NetworkAddress, RecordType)> {
#[allow(clippy::mutable_key_type)]
let locally_stored_keys = self
.swarm
.behaviour_mut()
.kademlia
.store_mut()
.record_addresses_ref();
let non_existent_keys: Vec<_> = incoming_keys
.iter()
.filter(|(addr, record_type)| {
let key = addr.to_record_key();
let local = locally_stored_keys.get(&key);

// if we have a local value of matching record_type, we don't need to fetch it
if let Some((_, local_record_type)) = local {
local_record_type != record_type
} else {
true
}
})
.collect();

non_existent_keys
.into_iter()
.filter_map(|(key, record_type)| {
if self.is_in_close_range(key, closest_k_peers) {
Some((key.clone(), record_type.clone()))
} else {
// Reduce the log level, as around 40% of the records in a replication list
// will always fall outside our close range (the sender uses `CLOSE_GROUP_SIZE + 2`
// recipients for its replication list, to provide an addressing margin).
// With normally 6 nodes sending such lists every 5-10s, this would otherwise
// accumulate into a lot of logs as the number of uploaded records grows.
trace!("not in close range for key {key:?}");
None
}
})
.collect()
}

// A close target may not fall within our own close peers range:
// For example, a node b11111X has an RT of [(1, b1111), (2, b111), (5, b11), (9, b1), (7, b0)].
// For a target bearing the prefix b011111, all nodes in (7, b0) are its close_group peers,
// and so is the node b11111X itself. But b11111X's own close_group peers
// [(1, b1111), (2, b111), (5, b11)] are not within target b011111's close range.
// Hence, an ilog2 calculation based on close_range cannot cover such a case,
// and we have to sort all nodes to figure out whether self is among the close_group of the target.
fn is_in_close_range(&self, target: &NetworkAddress, all_peers: &Vec<PeerId>) -> bool {
if all_peers.len() <= REPLICATE_RANGE {
return true;
}

// Margin of 2 to allow for our RT being a bit behind.
match sort_peers_by_address(all_peers, target, REPLICATE_RANGE) {
Ok(close_group) => close_group.contains(&&self.self_peer_id),
Err(err) => {
warn!("Could not get sorted peers for {target:?} with error {err:?}");
true
}
}
}
}

/// Helper function to print formatted connection role info.
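
The new driver-side `add_keys_to_replication_fetcher` only enqueues keys that we either do not hold or hold under a different `RecordType`. Here is a minimal sketch of that dedupe step; the types are simplified stand-ins, and the `RecordType` variants shown are illustrative rather than the exact sn_protocol definition.

use std::collections::HashMap;

// Stand-in for NetworkAddress / RecordKey.
type Key = String;

// Illustrative stand-in for sn_protocol::storage::RecordType.
#[derive(Clone, PartialEq, Eq, Debug)]
enum RecordType {
    Chunk,
    NonChunk(u64), // e.g. a content hash that changes when the record mutates
}

// Keep only the keys that are missing locally or stored under a different type,
// mirroring select_non_existent_records_for_replications above.
fn select_non_existent(
    incoming: &[(Key, RecordType)],
    locally_stored: &HashMap<Key, RecordType>,
) -> Vec<(Key, RecordType)> {
    incoming
        .iter()
        .filter(|(key, record_type)| locally_stored.get(key) != Some(record_type))
        .cloned()
        .collect()
}

fn main() {
    let local = HashMap::from([
        ("a".to_string(), RecordType::Chunk),
        ("b".to_string(), RecordType::NonChunk(1)),
    ]);
    let incoming = vec![
        ("a".to_string(), RecordType::Chunk),       // held with same type: skip
        ("b".to_string(), RecordType::NonChunk(2)), // held but content changed: fetch
        ("c".to_string(), RecordType::Chunk),       // not held: fetch
    ];
    let to_fetch = select_non_existent(&incoming, &local);
    assert_eq!(to_fetch.len(), 2);
    println!("{to_fetch:?}");
}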
