Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: updated peer exchange #247

Merged
merged 1 commit into from
Feb 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion src/server/http/stats_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ pub(crate) struct StatsCollector {
total_peers: u64,
total_grey_list: u64,
total_black_list: u64,
total_non_squad_peers: u64,
pending_incoming: u32,
pending_outgoing: u32,
established_incoming: u32,
Expand Down Expand Up @@ -72,6 +73,7 @@ impl StatsCollector {
total_peers: 0,
total_grey_list: 0,
total_black_list: 0,
total_non_squad_peers: 0,
sha_network_difficulty: Difficulty::min(),
randomx_network_difficulty: Difficulty::min(),
sha_target_difficulty: Difficulty::min(),
Expand Down Expand Up @@ -133,11 +135,13 @@ impl StatsCollector {
total_peers,
total_grey_list,
total_black_list,
total_non_squad,
..
} => {
self.total_peers = total_peers;
self.total_grey_list = total_grey_list;
self.total_black_list = total_black_list;
self.total_non_squad_peers = total_non_squad;
},
StatData::TargetDifficultyChanged {
target_difficulty,
Expand Down Expand Up @@ -194,7 +198,7 @@ impl StatsCollector {
let formatter = Formatter::new();

info!(target: LOG_TARGET,
"========= Uptime: {}. v{}, Sqd: {}, Chains: Rx {}..{}, Sha3 {}..{}. Difficulty (Target/Network): Rx: {}/{} Sha3x: {}/{} Miner accepts(rx/sha): {}/{}. Pool accepts (rx/sha) {}/{}. Peers(a/g/b) {}/{}/{} libp2p (i/o) {}/{} Last gossip: {}==== ",
"========= Uptime: {}. v{}, Sqd: {}, Chains: Rx {}..{}, Sha3 {}..{}. Difficulty (Target/Network): Rx: {}/{} Sha3x: {}/{} Miner accepts(rx/sha): {}/{}. Pool accepts (rx/sha) {}/{}. Peers(a/g/b/o) {}/{}/{}/{} libp2p (i/o) {}/{} Last gossip: {}==== ",
humantime::format_duration(Duration::from_secs(
EpochTime::now().as_u64().checked_sub(
self.first_stat_received.unwrap_or(EpochTime::now()).as_u64())
Expand All @@ -216,6 +220,7 @@ impl StatsCollector {
self.total_peers,
self.total_grey_list,
self.total_black_list,
self.total_non_squad_peers,
self.established_incoming,
self.established_outgoing,
humantime::format_duration(Duration::from_secs(
Expand Down Expand Up @@ -329,6 +334,7 @@ pub(crate) enum StatData {
total_peers: u64,
total_grey_list: u64,
total_black_list: u64,
total_non_squad: u64,
timestamp: EpochTime,
},
LibP2PStats {
Expand Down Expand Up @@ -452,11 +458,13 @@ impl StatsBroadcastClient {
total_peers: u64,
total_grey_list: u64,
total_black_list: u64,
total_non_squad: u64,
) -> Result<(), anyhow::Error> {
self.broadcast(StatData::NewPeer {
total_peers,
total_grey_list,
total_black_list,
total_non_squad,
timestamp: EpochTime::now(),
})
}
Expand Down
19 changes: 10 additions & 9 deletions src/server/p2p/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -786,9 +786,7 @@ where S: ShareChain
return;
}
let peer_store_read_lock = self.network_peer_store.read().await;
let my_best_peers = peer_store_read_lock.best_peers_to_share(NUM_PEERS_TO_SYNC_PER_ALGO, &self.squad, &[]);

let my_best_peers: Vec<_> = my_best_peers.into_iter().map(|p| p.peer_info).collect();
let my_best_peers = vec![];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this will never return any best peers?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The person who requests does not include any peers. The responder returns peers.

let known_peers = peer_store_read_lock.get_known_peers();
drop(peer_store_read_lock);
self.swarm
Expand Down Expand Up @@ -850,6 +848,9 @@ where S: ShareChain
let num_peers = request.best_peers.len();

info!(target: PEER_INFO_LOGGING_LOG_TARGET, "[DIRECT_PEER_EXCHANGE_REQ] New peer info: {source_peer:?} with {num_peers} new peers");
for peer in &request.best_peers {
info!(target: PEER_INFO_LOGGING_LOG_TARGET, "new peer from request received. {:?}, squad: {}", peer.peer_id, peer.squad);
}
let local_peer_id = *self.swarm.local_peer_id();
if let Ok(info) = self
.create_peer_info(self.swarm.external_addresses().cloned().collect())
Expand Down Expand Up @@ -1069,12 +1070,12 @@ where S: ShareChain
// Keep going until we have all the peers
if num_peers_added > 0 {
self.initiate_direct_peer_exchange(&peer_id).await;
} else {
// Once we have peer info from the seed peers, disconnect from them.
if self.network_peer_store.read().await.is_seed_peer(&peer_id) {
warn!(target: LOG_TARGET, "Disconnecting from seed peer {}", peer_id);
let _ = self.swarm.disconnect_peer_id(peer_id);
}
}

// Once we have peer info from the seed peers, disconnect from them.
if self.network_peer_store.read().await.is_seed_peer(&peer_id) {
warn!(target: LOG_TARGET, "Disconnecting from seed peer {}", peer_id);
let _ = self.swarm.disconnect_peer_id(peer_id);
}
},
Err(error) => {
Expand Down
37 changes: 28 additions & 9 deletions src/server/p2p/peer_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,16 +206,19 @@ impl PeerStore {
}

pub fn best_peers_to_share(&self, count: usize, squad: &str, other_nodes_peers: &[PeerId]) -> Vec<PeerStoreRecord> {
let mut peers = if squad == self.my_squad {
self.whitelist_peers.values().collect::<Vec<_>>()
} else {
self.non_squad_peers.values().collect::<Vec<_>>()
};
peers.retain(|peer| {
!peer.peer_info.public_addresses().is_empty() && peer.last_ping.is_some() && peer.peer_info.squad == squad
// let mut peers = if squad == self.my_squad {
// self.whitelist_peers.values().collect::<Vec<_>>()
// } else {
// self.non_squad_peers.values().collect::<Vec<_>>()
// };
let mut peers = self.whitelist_peers.values().collect::<Vec<_>>();
peers.extend(self.non_squad_peers.values().collect::<Vec<_>>());
peers.retain(|peer| !peer.peer_info.public_addresses().is_empty() && peer.last_ping.is_some());
peers.sort_by(|a, b| {
(if a.peer_info.squad == squad { 0 } else { 1 })
.cmp(&(if b.peer_info.squad == squad { 0 } else { 1 }))
.then(b.last_seen().cmp(&a.last_seen()))
});
peers.sort_by_key(|a| a.last_seen());
peers.reverse();

peers.retain(|peer| !other_nodes_peers.contains(&peer.peer_id));
peers.truncate(count);
Expand All @@ -224,13 +227,18 @@ impl PeerStore {

pub fn best_peers_to_dial(&self, count: usize) -> Vec<PeerStoreRecord> {
let mut peers = self.whitelist_peers.values().collect::<Vec<_>>();
peers.extend(self.non_squad_peers.values().collect::<Vec<_>>());
peers.retain(|peer| {
!peer.peer_info.public_addresses().is_empty() &&
(peer.last_dial_attempt.is_none() || peer.last_dial_attempt.unwrap().elapsed().as_secs() > 120)
});
peers.sort_by(|a, b| {
b.num_grey_listings
.cmp(&a.num_grey_listings)
.then(
(if a.peer_info.squad == self.my_squad { 0 } else { 1 })
.cmp(&(if b.peer_info.squad == self.my_squad { 0 } else { 1 })),
)
.then(b.peer_info.current_random_x_pow.cmp(&a.peer_info.current_random_x_pow))
.then(b.peer_info.current_sha3x_pow.cmp(&a.peer_info.current_sha3x_pow))
});
Expand Down Expand Up @@ -308,6 +316,12 @@ impl PeerStore {
if peer_info.squad != self.my_squad {
self.non_squad_peers
.insert(peer_id.to_base58(), PeerStoreRecord::new(peer_id, peer_info));
let _unused = self.stats_broadcast_client.send_new_peer(
self.whitelist_peers.len() as u64,
self.greylist_peers.len() as u64,
self.blacklist_peers.len() as u64,
self.non_squad_peers.len() as u64,
);
return AddPeerStatus::NonSquad;
}

Expand All @@ -333,6 +347,7 @@ impl PeerStore {
self.whitelist_peers.len() as u64,
self.greylist_peers.len() as u64,
self.blacklist_peers.len() as u64,
self.non_squad_peers.len() as u64,
);

// self.peer_removals.insert(peer_id, removal_count).await;
Expand All @@ -359,6 +374,7 @@ impl PeerStore {
self.whitelist_peers.len() as u64,
self.greylist_peers.len() as u64,
self.blacklist_peers.len() as u64,
self.non_squad_peers.len() as u64,
);
}

Expand All @@ -373,6 +389,7 @@ impl PeerStore {
self.whitelist_peers.len() as u64,
self.greylist_peers.len() as u64,
self.blacklist_peers.len() as u64,
self.non_squad_peers.len() as u64,
);
}

Expand Down Expand Up @@ -428,6 +445,7 @@ impl PeerStore {
self.whitelist_peers.len() as u64,
self.greylist_peers.len() as u64,
self.blacklist_peers.len() as u64,
self.non_squad_peers.len() as u64,
);
}
}
Expand All @@ -449,6 +467,7 @@ impl PeerStore {
self.whitelist_peers.len() as u64,
self.greylist_peers.len() as u64,
self.blacklist_peers.len() as u64,
self.non_squad_peers.len() as u64,
);
}
}
Expand Down
Loading