Skip to content

Commit

Permalink
feat: updated peer exchange (#247)
Browse files Browse the repository at this point in the history
  • Loading branch information
stringhandler authored Feb 3, 2025
1 parent 78c1b24 commit be307e9
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 19 deletions.
10 changes: 9 additions & 1 deletion src/server/http/stats_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ pub(crate) struct StatsCollector {
total_peers: u64,
total_grey_list: u64,
total_black_list: u64,
total_non_squad_peers: u64,
pending_incoming: u32,
pending_outgoing: u32,
established_incoming: u32,
Expand Down Expand Up @@ -72,6 +73,7 @@ impl StatsCollector {
total_peers: 0,
total_grey_list: 0,
total_black_list: 0,
total_non_squad_peers: 0,
sha_network_difficulty: Difficulty::min(),
randomx_network_difficulty: Difficulty::min(),
sha_target_difficulty: Difficulty::min(),
Expand Down Expand Up @@ -133,11 +135,13 @@ impl StatsCollector {
total_peers,
total_grey_list,
total_black_list,
total_non_squad,
..
} => {
self.total_peers = total_peers;
self.total_grey_list = total_grey_list;
self.total_black_list = total_black_list;
self.total_non_squad_peers = total_non_squad;
},
StatData::TargetDifficultyChanged {
target_difficulty,
Expand Down Expand Up @@ -194,7 +198,7 @@ impl StatsCollector {
let formatter = Formatter::new();

info!(target: LOG_TARGET,
"========= Uptime: {}. v{}, Sqd: {}, Chains: Rx {}..{}, Sha3 {}..{}. Difficulty (Target/Network): Rx: {}/{} Sha3x: {}/{} Miner accepts(rx/sha): {}/{}. Pool accepts (rx/sha) {}/{}. Peers(a/g/b) {}/{}/{} libp2p (i/o) {}/{} Last gossip: {}==== ",
"========= Uptime: {}. v{}, Sqd: {}, Chains: Rx {}..{}, Sha3 {}..{}. Difficulty (Target/Network): Rx: {}/{} Sha3x: {}/{} Miner accepts(rx/sha): {}/{}. Pool accepts (rx/sha) {}/{}. Peers(a/g/b/o) {}/{}/{}/{} libp2p (i/o) {}/{} Last gossip: {}==== ",
humantime::format_duration(Duration::from_secs(
EpochTime::now().as_u64().checked_sub(
self.first_stat_received.unwrap_or(EpochTime::now()).as_u64())
Expand All @@ -216,6 +220,7 @@ impl StatsCollector {
self.total_peers,
self.total_grey_list,
self.total_black_list,
self.total_non_squad_peers,
self.established_incoming,
self.established_outgoing,
humantime::format_duration(Duration::from_secs(
Expand Down Expand Up @@ -329,6 +334,7 @@ pub(crate) enum StatData {
total_peers: u64,
total_grey_list: u64,
total_black_list: u64,
total_non_squad: u64,
timestamp: EpochTime,
},
LibP2PStats {
Expand Down Expand Up @@ -452,11 +458,13 @@ impl StatsBroadcastClient {
total_peers: u64,
total_grey_list: u64,
total_black_list: u64,
total_non_squad: u64,
) -> Result<(), anyhow::Error> {
self.broadcast(StatData::NewPeer {
total_peers,
total_grey_list,
total_black_list,
total_non_squad,
timestamp: EpochTime::now(),
})
}
Expand Down
19 changes: 10 additions & 9 deletions src/server/p2p/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -786,9 +786,7 @@ where S: ShareChain
return;
}
let peer_store_read_lock = self.network_peer_store.read().await;
let my_best_peers = peer_store_read_lock.best_peers_to_share(NUM_PEERS_TO_SYNC_PER_ALGO, &self.squad, &[]);

let my_best_peers: Vec<_> = my_best_peers.into_iter().map(|p| p.peer_info).collect();
let my_best_peers = vec![];
let known_peers = peer_store_read_lock.get_known_peers();
drop(peer_store_read_lock);
self.swarm
Expand Down Expand Up @@ -850,6 +848,9 @@ where S: ShareChain
let num_peers = request.best_peers.len();

info!(target: PEER_INFO_LOGGING_LOG_TARGET, "[DIRECT_PEER_EXCHANGE_REQ] New peer info: {source_peer:?} with {num_peers} new peers");
for peer in &request.best_peers {
info!(target: PEER_INFO_LOGGING_LOG_TARGET, "new peer from request received. {:?}, squad: {}", peer.peer_id, peer.squad);
}
let local_peer_id = *self.swarm.local_peer_id();
if let Ok(info) = self
.create_peer_info(self.swarm.external_addresses().cloned().collect())
Expand Down Expand Up @@ -1069,12 +1070,12 @@ where S: ShareChain
// Keep going until we have all the peers
if num_peers_added > 0 {
self.initiate_direct_peer_exchange(&peer_id).await;
} else {
// Once we have peer info from the seed peers, disconnect from them.
if self.network_peer_store.read().await.is_seed_peer(&peer_id) {
warn!(target: LOG_TARGET, "Disconnecting from seed peer {}", peer_id);
let _ = self.swarm.disconnect_peer_id(peer_id);
}
}

// Once we have peer info from the seed peers, disconnect from them.
if self.network_peer_store.read().await.is_seed_peer(&peer_id) {
warn!(target: LOG_TARGET, "Disconnecting from seed peer {}", peer_id);
let _ = self.swarm.disconnect_peer_id(peer_id);
}
},
Err(error) => {
Expand Down
37 changes: 28 additions & 9 deletions src/server/p2p/peer_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,16 +206,19 @@ impl PeerStore {
}

pub fn best_peers_to_share(&self, count: usize, squad: &str, other_nodes_peers: &[PeerId]) -> Vec<PeerStoreRecord> {
let mut peers = if squad == self.my_squad {
self.whitelist_peers.values().collect::<Vec<_>>()
} else {
self.non_squad_peers.values().collect::<Vec<_>>()
};
peers.retain(|peer| {
!peer.peer_info.public_addresses().is_empty() && peer.last_ping.is_some() && peer.peer_info.squad == squad
// let mut peers = if squad == self.my_squad {
// self.whitelist_peers.values().collect::<Vec<_>>()
// } else {
// self.non_squad_peers.values().collect::<Vec<_>>()
// };
let mut peers = self.whitelist_peers.values().collect::<Vec<_>>();
peers.extend(self.non_squad_peers.values().collect::<Vec<_>>());
peers.retain(|peer| !peer.peer_info.public_addresses().is_empty() && peer.last_ping.is_some());
peers.sort_by(|a, b| {
(if a.peer_info.squad == squad { 0 } else { 1 })
.cmp(&(if b.peer_info.squad == squad { 0 } else { 1 }))
.then(b.last_seen().cmp(&a.last_seen()))
});
peers.sort_by_key(|a| a.last_seen());
peers.reverse();

peers.retain(|peer| !other_nodes_peers.contains(&peer.peer_id));
peers.truncate(count);
Expand All @@ -224,13 +227,18 @@ impl PeerStore {

pub fn best_peers_to_dial(&self, count: usize) -> Vec<PeerStoreRecord> {
let mut peers = self.whitelist_peers.values().collect::<Vec<_>>();
peers.extend(self.non_squad_peers.values().collect::<Vec<_>>());
peers.retain(|peer| {
!peer.peer_info.public_addresses().is_empty() &&
(peer.last_dial_attempt.is_none() || peer.last_dial_attempt.unwrap().elapsed().as_secs() > 120)
});
peers.sort_by(|a, b| {
b.num_grey_listings
.cmp(&a.num_grey_listings)
.then(
(if a.peer_info.squad == self.my_squad { 0 } else { 1 })
.cmp(&(if b.peer_info.squad == self.my_squad { 0 } else { 1 })),
)
.then(b.peer_info.current_random_x_pow.cmp(&a.peer_info.current_random_x_pow))
.then(b.peer_info.current_sha3x_pow.cmp(&a.peer_info.current_sha3x_pow))
});
Expand Down Expand Up @@ -308,6 +316,12 @@ impl PeerStore {
if peer_info.squad != self.my_squad {
self.non_squad_peers
.insert(peer_id.to_base58(), PeerStoreRecord::new(peer_id, peer_info));
let _unused = self.stats_broadcast_client.send_new_peer(
self.whitelist_peers.len() as u64,
self.greylist_peers.len() as u64,
self.blacklist_peers.len() as u64,
self.non_squad_peers.len() as u64,
);
return AddPeerStatus::NonSquad;
}

Expand All @@ -333,6 +347,7 @@ impl PeerStore {
self.whitelist_peers.len() as u64,
self.greylist_peers.len() as u64,
self.blacklist_peers.len() as u64,
self.non_squad_peers.len() as u64,
);

// self.peer_removals.insert(peer_id, removal_count).await;
Expand All @@ -359,6 +374,7 @@ impl PeerStore {
self.whitelist_peers.len() as u64,
self.greylist_peers.len() as u64,
self.blacklist_peers.len() as u64,
self.non_squad_peers.len() as u64,
);
}

Expand All @@ -373,6 +389,7 @@ impl PeerStore {
self.whitelist_peers.len() as u64,
self.greylist_peers.len() as u64,
self.blacklist_peers.len() as u64,
self.non_squad_peers.len() as u64,
);
}

Expand Down Expand Up @@ -428,6 +445,7 @@ impl PeerStore {
self.whitelist_peers.len() as u64,
self.greylist_peers.len() as u64,
self.blacklist_peers.len() as u64,
self.non_squad_peers.len() as u64,
);
}
}
Expand All @@ -449,6 +467,7 @@ impl PeerStore {
self.whitelist_peers.len() as u64,
self.greylist_peers.len() as u64,
self.blacklist_peers.len() as u64,
self.non_squad_peers.len() as u64,
);
}
}
Expand Down

0 comments on commit be307e9

Please sign in to comment.