diff --git a/Cargo.lock b/Cargo.lock index 02360c33ddb..4c12e6fb984 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2311,10 +2311,9 @@ dependencies = [ name = "identify-example" version = "0.1.0" dependencies = [ - "async-std", - "async-trait", "futures", "libp2p", + "tokio", "tracing", "tracing-subscriber", ] @@ -2675,7 +2674,7 @@ dependencies = [ [[package]] name = "libp2p-allow-block-list" -version = "0.4.0" +version = "0.4.1" dependencies = [ "async-std", "libp2p-core", @@ -2838,7 +2837,7 @@ dependencies = [ [[package]] name = "libp2p-gossipsub" -version = "0.47.0" +version = "0.47.1" dependencies = [ "async-std", "asynchronous-codec", @@ -4984,8 +4983,6 @@ dependencies = [ name = "rendezvous-example" version = "0.1.0" dependencies = [ - "async-std", - "async-trait", "futures", "libp2p", "tokio", diff --git a/Cargo.toml b/Cargo.toml index ed9977f45a7..da8d32e1a4a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -76,14 +76,14 @@ asynchronous-codec = { version = "0.7.0" } futures-bounded = { version = "0.2.4" } futures-rustls = { version = "0.26.0", default-features = false } libp2p = { version = "0.54.1", path = "libp2p" } -libp2p-allow-block-list = { version = "0.4.0", path = "misc/allow-block-list" } +libp2p-allow-block-list = { version = "0.4.1", path = "misc/allow-block-list" } libp2p-autonat = { version = "0.13.0", path = "protocols/autonat" } libp2p-connection-limits = { version = "0.4.0", path = "misc/connection-limits" } libp2p-core = { version = "0.42.0", path = "core" } libp2p-dcutr = { version = "0.12.0", path = "protocols/dcutr" } libp2p-dns = { version = "0.42.0", path = "transports/dns" } libp2p-floodsub = { version = "0.45.0", path = "protocols/floodsub" } -libp2p-gossipsub = { version = "0.47.0", path = "protocols/gossipsub" } +libp2p-gossipsub = { version = "0.47.1", path = "protocols/gossipsub" } libp2p-identify = { version = "0.45.1", path = "protocols/identify" } libp2p-identity = { version = "0.2.9" } libp2p-kad = { version = "0.47.0", path = "protocols/kad" } diff --git a/FUNDING.json b/FUNDING.json new file mode 100644 index 00000000000..cce3fb3fe4c --- /dev/null +++ b/FUNDING.json @@ -0,0 +1,5 @@ +{ + "opRetro": { + "projectId": "0xdf1bb03d08808e2d789f5eac8462bdc560f1bb5b0877f0cf8c66ab53a0bc2f5c" + } +} diff --git a/examples/identify/Cargo.toml b/examples/identify/Cargo.toml index cbe569b9d07..8d12699afa7 100644 --- a/examples/identify/Cargo.toml +++ b/examples/identify/Cargo.toml @@ -9,10 +9,9 @@ license = "MIT" release = false [dependencies] -async-std = { version = "1.12", features = ["attributes"] } -async-trait = "0.1" +tokio = { version = "1.37.0", features = ["full"] } futures = { workspace = true } -libp2p = { path = "../../libp2p", features = ["async-std", "dns", "dcutr", "identify", "macros", "noise", "ping", "relay", "rendezvous", "tcp", "tokio","yamux"] } +libp2p = { path = "../../libp2p", features = ["identify", "noise", "tcp", "tokio", "yamux"] } tracing = { workspace = true } tracing-subscriber = { workspace = true, features = ["env-filter"] } diff --git a/examples/identify/src/main.rs b/examples/identify/src/main.rs index 916317a5a43..22474061da6 100644 --- a/examples/identify/src/main.rs +++ b/examples/identify/src/main.rs @@ -25,14 +25,14 @@ use libp2p::{core::multiaddr::Multiaddr, identify, noise, swarm::SwarmEvent, tcp use std::{error::Error, time::Duration}; use tracing_subscriber::EnvFilter; -#[async_std::main] +#[tokio::main] async fn main() -> Result<(), Box> { let _ = tracing_subscriber::fmt() .with_env_filter(EnvFilter::from_default_env()) .try_init(); let mut swarm = libp2p::SwarmBuilder::with_new_identity() - .with_async_std() + .with_tokio() .with_tcp( tcp::Config::default(), noise::Config::new, diff --git a/examples/rendezvous/Cargo.toml b/examples/rendezvous/Cargo.toml index 5a0672dcec8..4eea38616f4 100644 --- a/examples/rendezvous/Cargo.toml +++ b/examples/rendezvous/Cargo.toml @@ -9,10 +9,8 @@ license = "MIT" release = false [dependencies] -async-std = { version = "1.12", features = ["attributes"] } -async-trait = "0.1" futures = { workspace = true } -libp2p = { path = "../../libp2p", features = [ "async-std", "identify", "macros", "noise", "ping", "rendezvous", "tcp", "tokio", "yamux"] } +libp2p = { path = "../../libp2p", features = ["identify", "macros", "noise", "ping", "rendezvous", "tcp", "tokio", "yamux"] } tokio = { workspace = true, features = ["rt-multi-thread", "macros", "time"] } tracing = { workspace = true } tracing-subscriber = { workspace = true, features = ["env-filter"] } diff --git a/funding.json b/funding.json deleted file mode 100644 index bcf7fc2783d..00000000000 --- a/funding.json +++ /dev/null @@ -1,5 +0,0 @@ -{ - "opRetro": { - "projectId": "0x966804cb492e1a4bde5d781a676a44a23d69aa5dd2562fa7a4f95bb606021c8b" - } -} diff --git a/misc/allow-block-list/CHANGELOG.md b/misc/allow-block-list/CHANGELOG.md index 0017cbc8648..3cda0603ee4 100644 --- a/misc/allow-block-list/CHANGELOG.md +++ b/misc/allow-block-list/CHANGELOG.md @@ -1,3 +1,9 @@ +## 0.4.1 + +- Add getters & setters for the allowed/blocked peers. + Return a `bool` for every "insert/remove" function, informing if a change was performed. + See [PR 5572](https://github.com/libp2p/rust-libp2p/pull/5572). + ## 0.4.0 diff --git a/misc/allow-block-list/Cargo.toml b/misc/allow-block-list/Cargo.toml index 4209d72ab4f..1ff0ccff906 100644 --- a/misc/allow-block-list/Cargo.toml +++ b/misc/allow-block-list/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-allow-block-list" edition = "2021" rust-version = { workspace = true } description = "Allow/block list connection management for libp2p." -version = "0.4.0" +version = "0.4.1" license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" keywords = ["peer-to-peer", "libp2p", "networking"] diff --git a/misc/allow-block-list/src/lib.rs b/misc/allow-block-list/src/lib.rs index c877ab09c9b..7646638a651 100644 --- a/misc/allow-block-list/src/lib.rs +++ b/misc/allow-block-list/src/lib.rs @@ -94,44 +94,74 @@ pub struct BlockedPeers { } impl Behaviour { + /// Peers that are currently allowed. + pub fn allowed_peers(&self) -> &HashSet { + &self.state.peers + } + /// Allow connections to the given peer. - pub fn allow_peer(&mut self, peer: PeerId) { - self.state.peers.insert(peer); - if let Some(waker) = self.waker.take() { - waker.wake() + /// + /// Returns whether the peer was newly inserted. Does nothing if the peer was already present in the set. + pub fn allow_peer(&mut self, peer: PeerId) -> bool { + let inserted = self.state.peers.insert(peer); + if inserted { + if let Some(waker) = self.waker.take() { + waker.wake() + } } + inserted } /// Disallow connections to the given peer. /// /// All active connections to this peer will be closed immediately. - pub fn disallow_peer(&mut self, peer: PeerId) { - self.state.peers.remove(&peer); - self.close_connections.push_back(peer); - if let Some(waker) = self.waker.take() { - waker.wake() + /// + /// Returns whether the peer was present in the set. Does nothing if the peer was not present in the set. + pub fn disallow_peer(&mut self, peer: PeerId) -> bool { + let removed = self.state.peers.remove(&peer); + if removed { + self.close_connections.push_back(peer); + if let Some(waker) = self.waker.take() { + waker.wake() + } } + removed } } impl Behaviour { + /// Peers that are currently blocked. + pub fn blocked_peers(&self) -> &HashSet { + &self.state.peers + } + /// Block connections to a given peer. /// /// All active connections to this peer will be closed immediately. - pub fn block_peer(&mut self, peer: PeerId) { - self.state.peers.insert(peer); - self.close_connections.push_back(peer); - if let Some(waker) = self.waker.take() { - waker.wake() + /// + /// Returns whether the peer was newly inserted. Does nothing if the peer was already present in the set. + pub fn block_peer(&mut self, peer: PeerId) -> bool { + let inserted = self.state.peers.insert(peer); + if inserted { + self.close_connections.push_back(peer); + if let Some(waker) = self.waker.take() { + waker.wake() + } } + inserted } /// Unblock connections to a given peer. - pub fn unblock_peer(&mut self, peer: PeerId) { - self.state.peers.remove(&peer); - if let Some(waker) = self.waker.take() { - waker.wake() + /// + /// Returns whether the peer was present in the set. Does nothing if the peer was not present in the set. + pub fn unblock_peer(&mut self, peer: PeerId) -> bool { + let removed = self.state.peers.remove(&peer); + if removed { + if let Some(waker) = self.waker.take() { + waker.wake() + } } + removed } } diff --git a/misc/server/Dockerfile b/misc/server/Dockerfile index 9d2742f97e8..1583fba6bef 100644 --- a/misc/server/Dockerfile +++ b/misc/server/Dockerfile @@ -1,5 +1,5 @@ # syntax=docker/dockerfile:1.5-labs -FROM rust:1.73.0 as chef +FROM rust:1.75.0 as chef RUN wget -q -O- https://github.com/LukeMathWalker/cargo-chef/releases/download/v0.1.62/cargo-chef-x86_64-unknown-linux-gnu.tar.gz | tar -zx -C /usr/local/bin RUN cargo install --locked --root /usr/local libp2p-lookup --version 0.6.4 WORKDIR /app diff --git a/protocols/gossipsub/CHANGELOG.md b/protocols/gossipsub/CHANGELOG.md index 8e115052d31..c47a9f40f66 100644 --- a/protocols/gossipsub/CHANGELOG.md +++ b/protocols/gossipsub/CHANGELOG.md @@ -1,3 +1,8 @@ +## 0.47.1 + +- Attempt to publish to at least mesh_n peers when flood publish is disabled. + See [PR 5578](https://github.com/libp2p/rust-libp2p/pull/5578). + ## 0.47.0 diff --git a/protocols/gossipsub/Cargo.toml b/protocols/gossipsub/Cargo.toml index 4cb590bed0c..665f757fcb3 100644 --- a/protocols/gossipsub/Cargo.toml +++ b/protocols/gossipsub/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-gossipsub" edition = "2021" rust-version = { workspace = true } description = "Gossipsub protocol for libp2p" -version = "0.47.0" +version = "0.47.1" authors = ["Age Manning "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 0d1af1ada0c..16adb555a44 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -259,12 +259,6 @@ pub struct Behaviour { /// the set of [`ConnectionId`]s. connected_peers: HashMap, - /// A map of all connected peers - A map of topic hash to a list of gossipsub peer Ids. - topic_peers: HashMap>, - - /// A map of all connected peers to their subscribed topics. - peer_topics: HashMap>, - /// A set of all explicit peers. These are peers that remain connected and we unconditionally /// forward messages to, outside of the scoring system. explicit_peers: HashSet, @@ -443,8 +437,6 @@ where control_pool: HashMap::new(), publish_config: privacy.into(), duplicate_cache: DuplicateCache::new(config.duplicate_cache_time()), - topic_peers: HashMap::new(), - peer_topics: HashMap::new(), explicit_peers: HashSet::new(), blacklisted_peers: HashSet::new(), mesh: HashMap::new(), @@ -501,9 +493,9 @@ where /// Lists all known peers and their associated subscribed topics. pub fn all_peers(&self) -> impl Iterator)> { - self.peer_topics + self.connected_peers .iter() - .map(|(peer_id, topic_set)| (peer_id, topic_set.iter().collect())) + .map(|(peer_id, peer)| (peer_id, peer.topics.iter().collect())) } /// Lists all known peers and their associated protocol. @@ -535,7 +527,7 @@ where } // send subscription request to all peers - for peer in self.peer_topics.keys().copied().collect::>() { + for peer in self.connected_peers.keys().copied().collect::>() { tracing::debug!(%peer, "Sending SUBSCRIBE to peer"); let event = RpcOut::Subscribe(topic_hash.clone()); self.send_message(peer, event); @@ -563,7 +555,7 @@ where } // announce to all peers - for peer in self.peer_topics.keys().copied().collect::>() { + for peer in self.connected_peers.keys().copied().collect::>() { tracing::debug!(%peer, "Sending UNSUBSCRIBE to peer"); let event = RpcOut::Unsubscribe(topic_hash.clone()); self.send_message(peer, event); @@ -621,84 +613,103 @@ where let topic_hash = raw_message.topic.clone(); + let mut peers_on_topic = self + .connected_peers + .iter() + .filter(|(_, p)| p.topics.contains(&topic_hash)) + .map(|(peer_id, _)| peer_id) + .peekable(); + + if peers_on_topic.peek().is_none() { + return Err(PublishError::InsufficientPeers); + } + let mut recipient_peers = HashSet::new(); - if let Some(set) = self.topic_peers.get(&topic_hash) { - if self.config.flood_publish() { - // Forward to all peers above score and all explicit peers - recipient_peers.extend(set.iter().filter(|p| { - self.explicit_peers.contains(*p) - || !self.score_below_threshold(p, |ts| ts.publish_threshold).0 - })); - } else { - match self.mesh.get(&raw_message.topic) { - // Mesh peers - Some(mesh_peers) => { - recipient_peers.extend(mesh_peers); + if self.config.flood_publish() { + // Forward to all peers above score and all explicit peers + recipient_peers.extend(peers_on_topic.filter(|p| { + self.explicit_peers.contains(*p) + || !self.score_below_threshold(p, |ts| ts.publish_threshold).0 + })); + } else { + match self.mesh.get(&topic_hash) { + // Mesh peers + Some(mesh_peers) => { + // We have a mesh set. We want to make sure to publish to at least `mesh_n` + // peers (if possible). + let needed_extra_peers = self.config.mesh_n().saturating_sub(mesh_peers.len()); + + if needed_extra_peers > 0 { + // We don't have `mesh_n` peers in our mesh, we will randomly select extras + // and publish to them. + + // Get a random set of peers that are appropriate to send messages too. + let peer_list = get_random_peers( + &self.connected_peers, + &topic_hash, + needed_extra_peers, + |peer| { + !mesh_peers.contains(peer) + && !self.explicit_peers.contains(peer) + && !self + .score_below_threshold(peer, |pst| pst.publish_threshold) + .0 + }, + ); + recipient_peers.extend(peer_list); } - // Gossipsub peers - None => { - tracing::debug!(topic=%topic_hash, "Topic not in the mesh"); - // If we have fanout peers add them to the map. - if self.fanout.contains_key(&topic_hash) { - for peer in self.fanout.get(&topic_hash).expect("Topic must exist") { - recipient_peers.insert(*peer); - } - } else { - // We have no fanout peers, select mesh_n of them and add them to the fanout - let mesh_n = self.config.mesh_n(); - let new_peers = get_random_peers( - &self.topic_peers, - &self.connected_peers, - &topic_hash, - mesh_n, - { - |p| { - !self.explicit_peers.contains(p) - && !self - .score_below_threshold(p, |pst| { - pst.publish_threshold - }) - .0 - } - }, - ); - // Add the new peers to the fanout and recipient peers - self.fanout.insert(topic_hash.clone(), new_peers.clone()); - for peer in new_peers { - tracing::debug!(%peer, "Peer added to fanout"); - recipient_peers.insert(peer); - } + + recipient_peers.extend(mesh_peers); + } + // Gossipsub peers + None => { + tracing::debug!(topic=%topic_hash, "Topic not in the mesh"); + // If we have fanout peers add them to the map. + if self.fanout.contains_key(&topic_hash) { + for peer in self.fanout.get(&topic_hash).expect("Topic must exist") { + recipient_peers.insert(*peer); + } + } else { + // We have no fanout peers, select mesh_n of them and add them to the fanout + let mesh_n = self.config.mesh_n(); + let new_peers = + get_random_peers(&self.connected_peers, &topic_hash, mesh_n, { + |p| { + !self.explicit_peers.contains(p) + && !self + .score_below_threshold(p, |pst| pst.publish_threshold) + .0 + } + }); + // Add the new peers to the fanout and recipient peers + self.fanout.insert(topic_hash.clone(), new_peers.clone()); + for peer in new_peers { + tracing::debug!(%peer, "Peer added to fanout"); + recipient_peers.insert(peer); } - // We are publishing to fanout peers - update the time we published - self.fanout_last_pub - .insert(topic_hash.clone(), Instant::now()); } + // We are publishing to fanout peers - update the time we published + self.fanout_last_pub + .insert(topic_hash.clone(), Instant::now()); } + } - // Explicit peers - for peer in &self.explicit_peers { - if set.contains(peer) { - recipient_peers.insert(*peer); - } - } + // Explicit peers that are part of the topic + recipient_peers + .extend(peers_on_topic.filter(|peer_id| self.explicit_peers.contains(peer_id))); - // Floodsub peers - for (peer, connections) in &self.connected_peers { - if connections.kind == PeerKind::Floodsub - && !self - .score_below_threshold(peer, |ts| ts.publish_threshold) - .0 - { - recipient_peers.insert(*peer); - } + // Floodsub peers + for (peer, connections) in &self.connected_peers { + if connections.kind == PeerKind::Floodsub + && !self + .score_below_threshold(peer, |ts| ts.publish_threshold) + .0 + { + recipient_peers.insert(*peer); } } } - if recipient_peers.is_empty() { - return Err(PublishError::InsufficientPeers); - } - // If the message isn't a duplicate and we have sent it to some peers add it to the // duplicate cache and memcache. self.duplicate_cache.insert(msg_id.clone()); @@ -964,7 +975,6 @@ where if added_peers.len() < self.config.mesh_n() { // get the peers let new_peers = get_random_peers( - &self.topic_peers, &self.connected_peers, topic_hash, self.config.mesh_n() - added_peers.len(), @@ -1009,7 +1019,6 @@ where peer_id, vec![topic_hash], &self.mesh, - self.peer_topics.get(&peer_id), &mut self.events, &self.connected_peers, ); @@ -1056,7 +1065,6 @@ where // Select peers for peer exchange let peers = if do_px { get_random_peers( - &self.topic_peers, &self.connected_peers, topic_hash, self.config.prune_peers(), @@ -1107,7 +1115,6 @@ where peer, topic_hash, &self.mesh, - self.peer_topics.get(&peer), &mut self.events, &self.connected_peers, ); @@ -1118,7 +1125,7 @@ where /// Checks if the given peer is still connected and if not dials the peer again. fn check_explicit_peer_connection(&mut self, peer_id: &PeerId) { - if !self.peer_topics.contains_key(peer_id) { + if !self.connected_peers.contains_key(peer_id) { // Connect to peer tracing::debug!(peer=%peer_id, "Connecting to explicit peer"); self.events.push_back(ToSwarm::Dial { @@ -1329,17 +1336,19 @@ where let mut do_px = self.config.do_px(); + let Some(connected_peer) = self.connected_peers.get_mut(peer_id) else { + tracing::error!(peer_id = %peer_id, "Peer non-existent when handling graft"); + return; + }; + // For each topic, if a peer has grafted us, then we necessarily must be in their mesh // and they must be subscribed to the topic. Ensure we have recorded the mapping. for topic in &topics { - self.peer_topics - .entry(*peer_id) - .or_default() - .insert(topic.clone()); - self.topic_peers - .entry(topic.clone()) - .or_default() - .insert(*peer_id); + if connected_peer.topics.insert(topic.clone()) { + if let Some(m) = self.metrics.as_mut() { + m.inc_topic_peers(topic); + } + } } // we don't GRAFT to/from explicit peers; complain loudly if this happens @@ -1441,7 +1450,6 @@ where *peer_id, vec![&topic_hash], &self.mesh, - self.peer_topics.get(peer_id), &mut self.events, &self.connected_peers, ); @@ -1514,7 +1522,6 @@ where *peer_id, topic_hash, &self.mesh, - self.peer_topics.get(peer_id), &mut self.events, &self.connected_peers, ); @@ -1825,7 +1832,7 @@ where let mut unsubscribed_peers = Vec::new(); - let Some(subscribed_topics) = self.peer_topics.get_mut(propagation_source) else { + let Some(peer) = self.connected_peers.get_mut(propagation_source) else { tracing::error!( peer=%propagation_source, "Subscription by unknown peer" @@ -1841,7 +1848,7 @@ where let filtered_topics = match self .subscription_filter - .filter_incoming_subscriptions(subscriptions, subscribed_topics) + .filter_incoming_subscriptions(subscriptions, &peer.topics) { Ok(topics) => topics, Err(s) => { @@ -1857,29 +1864,24 @@ where for subscription in filtered_topics { // get the peers from the mapping, or insert empty lists if the topic doesn't exist let topic_hash = &subscription.topic_hash; - let peer_list = self.topic_peers.entry(topic_hash.clone()).or_default(); match subscription.action { SubscriptionAction::Subscribe => { - if peer_list.insert(*propagation_source) { + if peer.topics.insert(topic_hash.clone()) { tracing::debug!( peer=%propagation_source, topic=%topic_hash, "SUBSCRIPTION: Adding gossip peer to topic" ); - } - // add to the peer_topics mapping - subscribed_topics.insert(topic_hash.clone()); + if let Some(m) = self.metrics.as_mut() { + m.inc_topic_peers(topic_hash); + } + } // if the mesh needs peers add the peer to the mesh if !self.explicit_peers.contains(propagation_source) - && matches!( - self.connected_peers - .get(propagation_source) - .map(|v| &v.kind), - Some(PeerKind::Gossipsubv1_1) | Some(PeerKind::Gossipsub) - ) + && matches!(peer.kind, PeerKind::Gossipsubv1_1 | PeerKind::Gossipsub) && !Self::score_below_threshold_from_scores( &self.peer_score, propagation_source, @@ -1922,16 +1924,18 @@ where })); } SubscriptionAction::Unsubscribe => { - if peer_list.remove(propagation_source) { + if peer.topics.remove(topic_hash) { tracing::debug!( peer=%propagation_source, topic=%topic_hash, "SUBSCRIPTION: Removing gossip peer from topic" ); + + if let Some(m) = self.metrics.as_mut() { + m.dec_topic_peers(topic_hash); + } } - // remove topic from the peer_topics mapping - subscribed_topics.remove(topic_hash); unsubscribed_peers.push((*propagation_source, topic_hash.clone())); // generate an unsubscribe event to be polled application_event.push(ToSwarm::GenerateEvent(Event::Unsubscribed { @@ -1940,10 +1944,6 @@ where })); } } - - if let Some(m) = self.metrics.as_mut() { - m.set_topic_peers(topic_hash, peer_list.len()); - } } // remove unsubscribed peers from the mesh if it exists @@ -1958,7 +1958,6 @@ where *propagation_source, topics_joined, &self.mesh, - self.peer_topics.get(propagation_source), &mut self.events, &self.connected_peers, ); @@ -2039,7 +2038,6 @@ where for (topic_hash, peers) in self.mesh.iter_mut() { let explicit_peers = &self.explicit_peers; let backoffs = &self.backoffs; - let topic_peers = &self.topic_peers; let outbound_peers = &self.outbound_peers; // drop all peers with negative score, without PX @@ -2087,18 +2085,13 @@ where ); // not enough peers - get mesh_n - current_length more let desired_peers = self.config.mesh_n() - peers.len(); - let peer_list = get_random_peers( - topic_peers, - &self.connected_peers, - topic_hash, - desired_peers, - |peer| { + let peer_list = + get_random_peers(&self.connected_peers, topic_hash, desired_peers, |peer| { !peers.contains(peer) && !explicit_peers.contains(peer) && !backoffs.is_backoff_with_slack(topic_hash, peer) && *scores.get(peer).unwrap_or(&0.0) >= 0.0 - }, - ); + }); for peer in &peer_list { let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new); current_topic.push(topic_hash.clone()); @@ -2154,10 +2147,9 @@ where if outbound <= self.config.mesh_outbound_min() { // do not remove anymore outbound peers continue; - } else { - // an outbound peer gets removed - outbound -= 1; } + // an outbound peer gets removed + outbound -= 1; } // remove the peer @@ -2180,19 +2172,14 @@ where // if we have not enough outbound peers, graft to some new outbound peers if outbound < self.config.mesh_outbound_min() { let needed = self.config.mesh_outbound_min() - outbound; - let peer_list = get_random_peers( - topic_peers, - &self.connected_peers, - topic_hash, - needed, - |peer| { + let peer_list = + get_random_peers(&self.connected_peers, topic_hash, needed, |peer| { !peers.contains(peer) && !explicit_peers.contains(peer) && !backoffs.is_backoff_with_slack(topic_hash, peer) && *scores.get(peer).unwrap_or(&0.0) >= 0.0 && outbound_peers.contains(peer) - }, - ); + }); for peer in &peer_list { let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new); current_topic.push(topic_hash.clone()); @@ -2249,7 +2236,6 @@ where // GRAFT if median < thresholds.opportunistic_graft_threshold { let peer_list = get_random_peers( - topic_peers, &self.connected_peers, topic_hash, self.config.opportunistic_graft_peers(), @@ -2308,22 +2294,22 @@ where Some((_, thresholds, _, _)) => thresholds.publish_threshold, _ => 0.0, }; - for peer in peers.iter() { + for peer_id in peers.iter() { // is the peer still subscribed to the topic? - let peer_score = *scores.get(peer).unwrap_or(&0.0); - match self.peer_topics.get(peer) { - Some(topics) => { - if !topics.contains(topic_hash) || peer_score < publish_threshold { + let peer_score = *scores.get(peer_id).unwrap_or(&0.0); + match self.connected_peers.get(peer_id) { + Some(peer) => { + if !peer.topics.contains(topic_hash) || peer_score < publish_threshold { tracing::debug!( topic=%topic_hash, "HEARTBEAT: Peer removed from fanout for topic" ); - to_remove_peers.push(*peer); + to_remove_peers.push(*peer_id); } } None => { // remove if the peer has disconnected - to_remove_peers.push(*peer); + to_remove_peers.push(*peer_id); } } } @@ -2340,17 +2326,12 @@ where ); let needed_peers = self.config.mesh_n() - peers.len(); let explicit_peers = &self.explicit_peers; - let new_peers = get_random_peers( - &self.topic_peers, - &self.connected_peers, - topic_hash, - needed_peers, - |peer_id| { + let new_peers = + get_random_peers(&self.connected_peers, topic_hash, needed_peers, |peer_id| { !peers.contains(peer_id) && !explicit_peers.contains(peer_id) && *scores.get(peer_id).unwrap_or(&0.0) < publish_threshold - }, - ); + }); peers.extend(new_peers); } } @@ -2432,17 +2413,12 @@ where ) }; // get gossip_lazy random peers - let to_msg_peers = get_random_peers_dynamic( - &self.topic_peers, - &self.connected_peers, - topic_hash, - n_map, - |peer| { + let to_msg_peers = + get_random_peers_dynamic(&self.connected_peers, topic_hash, n_map, |peer| { !peers.contains(peer) && !self.explicit_peers.contains(peer) && !self.score_below_threshold(peer, |ts| ts.gossip_threshold).0 - }, - ); + }); tracing::debug!("Gossiping IHAVE to {} peers", to_msg_peers.len()); @@ -2492,7 +2468,6 @@ where peer, vec![topic], &self.mesh, - self.peer_topics.get(&peer), &mut self.events, &self.connected_peers, ); @@ -2543,7 +2518,6 @@ where *peer, topic_hash, &self.mesh, - self.peer_topics.get(peer), &mut self.events, &self.connected_peers, ); @@ -2577,11 +2551,11 @@ where // Add explicit peers for peer_id in &self.explicit_peers { - if let Some(topics) = self.peer_topics.get(peer_id) { + if let Some(peer) = self.connected_peers.get(peer_id) { if Some(peer_id) != propagation_source && !originating_peers.contains(peer_id) && Some(peer_id) != message.source.as_ref() - && topics.contains(&message.topic) + && peer.topics.contains(&message.topic) { recipient_peers.insert(*peer_id); } @@ -2790,6 +2764,7 @@ where .or_insert(PeerConnections { kind: PeerKind::Floodsub, connections: vec![], + topics: Default::default(), }) .connections .push(connection_id); @@ -2798,9 +2773,6 @@ where return; // Not our first connection to this peer, hence nothing to do. } - // Insert an empty set of the topics of this peer until known. - self.peer_topics.insert(peer_id, Default::default()); - if let Some((peer_score, ..)) = &mut self.peer_score { peer_score.add_peer(peer_id); } @@ -2843,28 +2815,26 @@ where if remaining_established != 0 { // Remove the connection from the list - if let Some(connections) = self.connected_peers.get_mut(&peer_id) { - let index = connections + if let Some(peer) = self.connected_peers.get_mut(&peer_id) { + let index = peer .connections .iter() .position(|v| v == &connection_id) .expect("Previously established connection to peer must be present"); - connections.connections.remove(index); + peer.connections.remove(index); // If there are more connections and this peer is in a mesh, inform the first connection // handler. - if !connections.connections.is_empty() { - if let Some(topics) = self.peer_topics.get(&peer_id) { - for topic in topics { - if let Some(mesh_peers) = self.mesh.get(topic) { - if mesh_peers.contains(&peer_id) { - self.events.push_back(ToSwarm::NotifyHandler { - peer_id, - event: HandlerIn::JoinedMesh, - handler: NotifyHandler::One(connections.connections[0]), - }); - break; - } + if !peer.connections.is_empty() { + for topic in &peer.topics { + if let Some(mesh_peers) = self.mesh.get(topic) { + if mesh_peers.contains(&peer_id) { + self.events.push_back(ToSwarm::NotifyHandler { + peer_id, + event: HandlerIn::JoinedMesh, + handler: NotifyHandler::One(peer.connections[0]), + }); + break; } } } @@ -2874,7 +2844,7 @@ where // remove from mesh, topic_peers, peer_topic and the fanout tracing::debug!(peer=%peer_id, "Peer disconnected"); { - let Some(topics) = self.peer_topics.get(&peer_id) else { + let Some(peer) = self.connected_peers.get(&peer_id) else { debug_assert!( self.blacklisted_peers.contains(&peer_id), "Disconnected node not in connected list" @@ -2883,7 +2853,7 @@ where }; // remove peer from all mappings - for topic in topics { + for topic in peer.topics.iter() { // check the mesh for the topic if let Some(mesh_peers) = self.mesh.get_mut(topic) { // check if the peer is in the mesh and remove it @@ -2895,24 +2865,8 @@ where }; } - // remove from topic_peers - if let Some(peer_list) = self.topic_peers.get_mut(topic) { - if !peer_list.remove(&peer_id) { - // debugging purposes - tracing::warn!( - peer=%peer_id, - "Disconnected node: peer not in topic_peers" - ); - } - if let Some(m) = self.metrics.as_mut() { - m.set_topic_peers(topic, peer_list.len()) - } - } else { - tracing::warn!( - peer=%peer_id, - topic=%topic, - "Disconnected node: peer with topic not in topic_peers" - ); + if let Some(m) = self.metrics.as_mut() { + m.dec_topic_peers(topic); } // remove from fanout @@ -2926,11 +2880,6 @@ where self.px_peers.remove(&peer_id); self.outbound_peers.remove(&peer_id); - // Remove peer from peer_topics and connected_peers - // NOTE: It is possible the peer has already been removed from all mappings if it does not - // support the protocol. - self.peer_topics.remove(&peer_id); - // If metrics are enabled, register the disconnection of a peer based on its protocol. if let Some(metrics) = self.metrics.as_mut() { let peer_kind = &self @@ -3190,7 +3139,6 @@ fn peer_added_to_mesh( peer_id: PeerId, new_topics: Vec<&TopicHash>, mesh: &HashMap>, - known_topics: Option<&BTreeSet>, events: &mut VecDeque>, connections: &HashMap, ) { @@ -3204,8 +3152,8 @@ fn peer_added_to_mesh( conn.connections[0] }; - if let Some(topics) = known_topics { - for topic in topics { + if let Some(peer) = connections.get(&peer_id) { + for topic in &peer.topics { if !new_topics.contains(&topic) { if let Some(mesh_peers) = mesh.get(topic) { if mesh_peers.contains(&peer_id) { @@ -3231,7 +3179,6 @@ fn peer_removed_from_mesh( peer_id: PeerId, old_topic: &TopicHash, mesh: &HashMap>, - known_topics: Option<&BTreeSet>, events: &mut VecDeque>, connections: &HashMap, ) { @@ -3243,8 +3190,8 @@ fn peer_removed_from_mesh( .first() .expect("There should be at least one connection to a peer."); - if let Some(topics) = known_topics { - for topic in topics { + if let Some(peer) = connections.get(&peer_id) { + for topic in &peer.topics { if topic != old_topic { if let Some(mesh_peers) = mesh.get(topic) { if mesh_peers.contains(&peer_id) { @@ -3267,28 +3214,19 @@ fn peer_removed_from_mesh( /// filtered by the function `f`. The number of peers to get equals the output of `n_map` /// that gets as input the number of filtered peers. fn get_random_peers_dynamic( - topic_peers: &HashMap>, connected_peers: &HashMap, topic_hash: &TopicHash, // maps the number of total peers to the number of selected peers n_map: impl Fn(usize) -> usize, mut f: impl FnMut(&PeerId) -> bool, ) -> BTreeSet { - let mut gossip_peers = match topic_peers.get(topic_hash) { - // if they exist, filter the peers by `f` - Some(peer_list) => peer_list - .iter() - .copied() - .filter(|p| { - f(p) && match connected_peers.get(p) { - Some(connections) if connections.kind == PeerKind::Gossipsub => true, - Some(connections) if connections.kind == PeerKind::Gossipsubv1_1 => true, - _ => false, - } - }) - .collect(), - None => Vec::new(), - }; + let mut gossip_peers = connected_peers + .iter() + .filter(|(_, p)| p.topics.contains(topic_hash)) + .filter(|(peer_id, _)| f(peer_id)) + .filter(|(_, p)| p.kind == PeerKind::Gossipsub || p.kind == PeerKind::Gossipsubv1_1) + .map(|(peer_id, _)| *peer_id) + .collect::>(); // if we have less than needed, return them let n = n_map(gossip_peers.len()); @@ -3309,13 +3247,12 @@ fn get_random_peers_dynamic( /// Helper function to get a set of `n` random gossipsub peers for a `topic_hash` /// filtered by the function `f`. fn get_random_peers( - topic_peers: &HashMap>, connected_peers: &HashMap, topic_hash: &TopicHash, n: usize, f: impl FnMut(&PeerId) -> bool, ) -> BTreeSet { - get_random_peers_dynamic(topic_peers, connected_peers, topic_hash, |_| n, f) + get_random_peers_dynamic(connected_peers, topic_hash, |_| n, f) } /// Validates the combination of signing, privacy and message validation to ensure the @@ -3355,8 +3292,6 @@ impl fmt::Debug for Behaviour>(), "First peer should be subscribed to three topics" ); - let peer_topics = gs.peer_topics.get(&peers[1]).unwrap().clone(); + let peer1 = gs.connected_peers.get(&peers[1]).unwrap(); assert!( - peer_topics == topic_hashes.iter().take(3).cloned().collect(), + peer1.topics + == topic_hashes + .iter() + .take(3) + .cloned() + .collect::>(), "Second peer should be subscribed to three topics" ); assert!( - !gs.peer_topics.contains_key(&unknown_peer), + !gs.connected_peers.contains_key(&unknown_peer), "Unknown peer should not have been added" ); for topic_hash in topic_hashes[..3].iter() { - let topic_peers = gs.topic_peers.get(topic_hash).unwrap().clone(); + let topic_peers = gs + .connected_peers + .iter() + .filter(|(_, p)| p.topics.contains(topic_hash)) + .map(|(peer_id, _)| *peer_id) + .collect::>(); assert!( topic_peers == peers[..2].iter().cloned().collect(), "Two peers should be added to the first three topics" @@ -894,13 +914,21 @@ fn test_handle_received_subscriptions() { &peers[0], ); - let peer_topics = gs.peer_topics.get(&peers[0]).unwrap().clone(); - assert!( - peer_topics == topic_hashes[1..3].iter().cloned().collect(), + let peer = gs.connected_peers.get(&peers[0]).unwrap().clone(); + assert_eq!( + peer.topics, + topic_hashes[1..3].iter().cloned().collect::>(), "Peer should be subscribed to two topics" ); - let topic_peers = gs.topic_peers.get(&topic_hashes[0]).unwrap().clone(); // only gossipsub at the moment + // only gossipsub at the moment + let topic_peers = gs + .connected_peers + .iter() + .filter(|(_, p)| p.topics.contains(&topic_hashes[0])) + .map(|(peer_id, _)| *peer_id) + .collect::>(); + assert!( topic_peers == peers[1..2].iter().cloned().collect(), "Only the second peers should be in the first topic" @@ -924,9 +952,8 @@ fn test_get_random_peers() { for _ in 0..20 { peers.push(PeerId::random()) } - - gs.topic_peers - .insert(topic_hash.clone(), peers.iter().cloned().collect()); + let mut topics = BTreeSet::new(); + topics.insert(topic_hash.clone()); gs.connected_peers = peers .iter() @@ -936,52 +963,32 @@ fn test_get_random_peers() { PeerConnections { kind: PeerKind::Gossipsubv1_1, connections: vec![ConnectionId::new_unchecked(0)], + topics: topics.clone(), }, ) }) .collect(); - let random_peers = - get_random_peers(&gs.topic_peers, &gs.connected_peers, &topic_hash, 5, |_| { - true - }); + let random_peers = get_random_peers(&gs.connected_peers, &topic_hash, 5, |_| true); assert_eq!(random_peers.len(), 5, "Expected 5 peers to be returned"); - let random_peers = get_random_peers( - &gs.topic_peers, - &gs.connected_peers, - &topic_hash, - 30, - |_| true, - ); + let random_peers = get_random_peers(&gs.connected_peers, &topic_hash, 30, |_| true); assert!(random_peers.len() == 20, "Expected 20 peers to be returned"); assert!( random_peers == peers.iter().cloned().collect(), "Expected no shuffling" ); - let random_peers = get_random_peers( - &gs.topic_peers, - &gs.connected_peers, - &topic_hash, - 20, - |_| true, - ); + let random_peers = get_random_peers(&gs.connected_peers, &topic_hash, 20, |_| true); assert!(random_peers.len() == 20, "Expected 20 peers to be returned"); assert!( random_peers == peers.iter().cloned().collect(), "Expected no shuffling" ); - let random_peers = - get_random_peers(&gs.topic_peers, &gs.connected_peers, &topic_hash, 0, |_| { - true - }); + let random_peers = get_random_peers(&gs.connected_peers, &topic_hash, 0, |_| true); assert!(random_peers.is_empty(), "Expected 0 peers to be returned"); // test the filter - let random_peers = - get_random_peers(&gs.topic_peers, &gs.connected_peers, &topic_hash, 5, |_| { - false - }); + let random_peers = get_random_peers(&gs.connected_peers, &topic_hash, 5, |_| false); assert!(random_peers.is_empty(), "Expected 0 peers to be returned"); - let random_peers = get_random_peers(&gs.topic_peers, &gs.connected_peers, &topic_hash, 10, { + let random_peers = get_random_peers(&gs.connected_peers, &topic_hash, 10, { |peer| peers.contains(peer) }); assert!(random_peers.len() == 10, "Expected 10 peers to be returned"); diff --git a/protocols/gossipsub/src/metrics.rs b/protocols/gossipsub/src/metrics.rs index e044ca67e71..7d4acada3c7 100644 --- a/protocols/gossipsub/src/metrics.rs +++ b/protocols/gossipsub/src/metrics.rs @@ -355,12 +355,17 @@ impl Metrics { } } - /// Register how many peers do we known are subscribed to this topic. - pub(crate) fn set_topic_peers(&mut self, topic: &TopicHash, count: usize) { + /// Increase the number of peers that are subscribed to this topic. + pub(crate) fn inc_topic_peers(&mut self, topic: &TopicHash) { if self.register_topic(topic).is_ok() { - self.topic_peers_count - .get_or_create(topic) - .set(count as i64); + self.topic_peers_count.get_or_create(topic).inc(); + } + } + + /// Decrease the number of peers that are subscribed to this topic. + pub(crate) fn dec_topic_peers(&mut self, topic: &TopicHash) { + if self.register_topic(topic).is_ok() { + self.topic_peers_count.get_or_create(topic).dec(); } } diff --git a/protocols/gossipsub/src/types.rs b/protocols/gossipsub/src/types.rs index d1b92ff0ba8..a88f4822ac2 100644 --- a/protocols/gossipsub/src/types.rs +++ b/protocols/gossipsub/src/types.rs @@ -24,8 +24,8 @@ use libp2p_identity::PeerId; use libp2p_swarm::ConnectionId; use prometheus_client::encoding::EncodeLabelValue; use quick_protobuf::MessageWrite; -use std::fmt; use std::fmt::Debug; +use std::{collections::BTreeSet, fmt}; use crate::rpc_proto::proto; #[cfg(feature = "serde")] @@ -77,6 +77,8 @@ pub(crate) struct PeerConnections { pub(crate) kind: PeerKind, /// Its current connections. pub(crate) connections: Vec, + /// Subscribed topics. + pub(crate) topics: BTreeSet, } /// Describes the types of peers that can exist in the gossipsub context. diff --git a/protocols/kad/CHANGELOG.md b/protocols/kad/CHANGELOG.md index 12ccca2d7f1..d0ab7986aad 100644 --- a/protocols/kad/CHANGELOG.md +++ b/protocols/kad/CHANGELOG.md @@ -2,6 +2,9 @@ - Expose a kad query facility allowing specify num_results dynamicly. See [PR 5555](https://github.com/libp2p/rust-libp2p/pull/5555). +- Add `mode` getter on `Behaviour`. + See [PR 5573](https://github.com/libp2p/rust-libp2p/pull/5573). + ## 0.46.2 diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index 50715c53c74..0b15e507ba4 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -1111,6 +1111,11 @@ where } } + /// Get the [`Mode`] in which the DHT is currently operating. + pub fn mode(&self) -> Mode { + self.mode + } + fn reconfigure_mode(&mut self) { if self.connections.is_empty() { return; diff --git a/protocols/perf/Dockerfile b/protocols/perf/Dockerfile index 6523e3bede1..1bd846cc228 100644 --- a/protocols/perf/Dockerfile +++ b/protocols/perf/Dockerfile @@ -1,5 +1,5 @@ # syntax=docker/dockerfile:1.5-labs -FROM rust:1.67.0 as builder +FROM rust:1.75.0 as builder # Run with access to the target cache to speed up builds WORKDIR /workspace