Skip to content

Commit

Permalink
Merge branch 'master' into webrtc-message-size
Browse files Browse the repository at this point in the history
  • Loading branch information
guillaumemichel authored Sep 3, 2024
2 parents 094c764 + 93169cc commit 546a800
Show file tree
Hide file tree
Showing 8 changed files with 242 additions and 288 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ libp2p-core = { version = "0.42.0", path = "core" }
libp2p-dcutr = { version = "0.12.0", path = "protocols/dcutr" }
libp2p-dns = { version = "0.42.0", path = "transports/dns" }
libp2p-floodsub = { version = "0.45.0", path = "protocols/floodsub" }
libp2p-gossipsub = { version = "0.47.0", path = "protocols/gossipsub" }
libp2p-gossipsub = { version = "0.47.1", path = "protocols/gossipsub" }
libp2p-identify = { version = "0.45.0", path = "protocols/identify" }
libp2p-identity = { version = "0.2.9" }
libp2p-kad = { version = "0.47.0", path = "protocols/kad" }
Expand Down
5 changes: 5 additions & 0 deletions protocols/gossipsub/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 0.47.1

- Attempt to publish to at least mesh_n peers when flood publish is disabled.
See [PR 5578](https://github.com/libp2p/rust-libp2p/pull/5578).

## 0.47.0

<!-- Update to libp2p-swarm v0.45.0 -->
Expand Down
2 changes: 1 addition & 1 deletion protocols/gossipsub/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "libp2p-gossipsub"
edition = "2021"
rust-version = { workspace = true }
description = "Gossipsub protocol for libp2p"
version = "0.47.0"
version = "0.47.1"
authors = ["Age Manning <[email protected]>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand Down
399 changes: 167 additions & 232 deletions protocols/gossipsub/src/behaviour.rs

Large diffs are not rendered by default.

101 changes: 54 additions & 47 deletions protocols/gossipsub/src/behaviour/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,9 @@ fn test_unsubscribe() {

for topic_hash in &topic_hashes {
assert!(
gs.topic_peers.contains_key(topic_hash),
gs.connected_peers
.values()
.any(|p| p.topics.contains(topic_hash)),
"Topic_peers contain a topic entry"
);
assert!(
Expand Down Expand Up @@ -629,8 +631,11 @@ fn test_publish_without_flood_publishing() {

// all peers should be subscribed to the topic
assert_eq!(
gs.topic_peers.get(&topic_hashes[0]).map(|p| p.len()),
Some(20),
gs.connected_peers
.values()
.filter(|p| p.topics.contains(&topic_hashes[0]))
.count(),
20,
"Peers should be subscribed to the topic"
);

Expand Down Expand Up @@ -669,8 +674,8 @@ fn test_publish_without_flood_publishing() {
let config: Config = Config::default();
assert_eq!(
publishes.len(),
config.mesh_n_low(),
"Should send a publish message to all known peers"
config.mesh_n(),
"Should send a publish message to at least mesh_n peers"
);

assert!(
Expand Down Expand Up @@ -809,9 +814,9 @@ fn test_inject_connected() {

// should add the new peers to `peer_topics` with an empty vec as a gossipsub node
for peer in peers {
let known_topics = gs.peer_topics.get(&peer).unwrap();
let peer = gs.connected_peers.get(&peer).unwrap();
assert!(
known_topics == &topic_hashes.iter().cloned().collect(),
peer.topics == topic_hashes.iter().cloned().collect(),
"The topics for each node should all topics"
);
}
Expand Down Expand Up @@ -860,24 +865,39 @@ fn test_handle_received_subscriptions() {

// verify the result

let peer_topics = gs.peer_topics.get(&peers[0]).unwrap().clone();
let peer = gs.connected_peers.get(&peers[0]).unwrap();
assert!(
peer_topics == topic_hashes.iter().take(3).cloned().collect(),
peer.topics
== topic_hashes
.iter()
.take(3)
.cloned()
.collect::<BTreeSet<_>>(),
"First peer should be subscribed to three topics"
);
let peer_topics = gs.peer_topics.get(&peers[1]).unwrap().clone();
let peer1 = gs.connected_peers.get(&peers[1]).unwrap();
assert!(
peer_topics == topic_hashes.iter().take(3).cloned().collect(),
peer1.topics
== topic_hashes
.iter()
.take(3)
.cloned()
.collect::<BTreeSet<_>>(),
"Second peer should be subscribed to three topics"
);

assert!(
!gs.peer_topics.contains_key(&unknown_peer),
!gs.connected_peers.contains_key(&unknown_peer),
"Unknown peer should not have been added"
);

for topic_hash in topic_hashes[..3].iter() {
let topic_peers = gs.topic_peers.get(topic_hash).unwrap().clone();
let topic_peers = gs
.connected_peers
.iter()
.filter(|(_, p)| p.topics.contains(topic_hash))
.map(|(peer_id, _)| *peer_id)
.collect::<BTreeSet<PeerId>>();
assert!(
topic_peers == peers[..2].iter().cloned().collect(),
"Two peers should be added to the first three topics"
Expand All @@ -894,13 +914,21 @@ fn test_handle_received_subscriptions() {
&peers[0],
);

let peer_topics = gs.peer_topics.get(&peers[0]).unwrap().clone();
assert!(
peer_topics == topic_hashes[1..3].iter().cloned().collect(),
let peer = gs.connected_peers.get(&peers[0]).unwrap().clone();
assert_eq!(
peer.topics,
topic_hashes[1..3].iter().cloned().collect::<BTreeSet<_>>(),
"Peer should be subscribed to two topics"
);

let topic_peers = gs.topic_peers.get(&topic_hashes[0]).unwrap().clone(); // only gossipsub at the moment
// only gossipsub at the moment
let topic_peers = gs
.connected_peers
.iter()
.filter(|(_, p)| p.topics.contains(&topic_hashes[0]))
.map(|(peer_id, _)| *peer_id)
.collect::<BTreeSet<PeerId>>();

assert!(
topic_peers == peers[1..2].iter().cloned().collect(),
"Only the second peers should be in the first topic"
Expand All @@ -924,9 +952,8 @@ fn test_get_random_peers() {
for _ in 0..20 {
peers.push(PeerId::random())
}

gs.topic_peers
.insert(topic_hash.clone(), peers.iter().cloned().collect());
let mut topics = BTreeSet::new();
topics.insert(topic_hash.clone());

gs.connected_peers = peers
.iter()
Expand All @@ -936,52 +963,32 @@ fn test_get_random_peers() {
PeerConnections {
kind: PeerKind::Gossipsubv1_1,
connections: vec![ConnectionId::new_unchecked(0)],
topics: topics.clone(),
},
)
})
.collect();

let random_peers =
get_random_peers(&gs.topic_peers, &gs.connected_peers, &topic_hash, 5, |_| {
true
});
let random_peers = get_random_peers(&gs.connected_peers, &topic_hash, 5, |_| true);
assert_eq!(random_peers.len(), 5, "Expected 5 peers to be returned");
let random_peers = get_random_peers(
&gs.topic_peers,
&gs.connected_peers,
&topic_hash,
30,
|_| true,
);
let random_peers = get_random_peers(&gs.connected_peers, &topic_hash, 30, |_| true);
assert!(random_peers.len() == 20, "Expected 20 peers to be returned");
assert!(
random_peers == peers.iter().cloned().collect(),
"Expected no shuffling"
);
let random_peers = get_random_peers(
&gs.topic_peers,
&gs.connected_peers,
&topic_hash,
20,
|_| true,
);
let random_peers = get_random_peers(&gs.connected_peers, &topic_hash, 20, |_| true);
assert!(random_peers.len() == 20, "Expected 20 peers to be returned");
assert!(
random_peers == peers.iter().cloned().collect(),
"Expected no shuffling"
);
let random_peers =
get_random_peers(&gs.topic_peers, &gs.connected_peers, &topic_hash, 0, |_| {
true
});
let random_peers = get_random_peers(&gs.connected_peers, &topic_hash, 0, |_| true);
assert!(random_peers.is_empty(), "Expected 0 peers to be returned");
// test the filter
let random_peers =
get_random_peers(&gs.topic_peers, &gs.connected_peers, &topic_hash, 5, |_| {
false
});
let random_peers = get_random_peers(&gs.connected_peers, &topic_hash, 5, |_| false);
assert!(random_peers.is_empty(), "Expected 0 peers to be returned");
let random_peers = get_random_peers(&gs.topic_peers, &gs.connected_peers, &topic_hash, 10, {
let random_peers = get_random_peers(&gs.connected_peers, &topic_hash, 10, {
|peer| peers.contains(peer)
});
assert!(random_peers.len() == 10, "Expected 10 peers to be returned");
Expand Down
15 changes: 10 additions & 5 deletions protocols/gossipsub/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,12 +355,17 @@ impl Metrics {
}
}

/// Register how many peers do we known are subscribed to this topic.
pub(crate) fn set_topic_peers(&mut self, topic: &TopicHash, count: usize) {
/// Increase the number of peers that are subscribed to this topic.
pub(crate) fn inc_topic_peers(&mut self, topic: &TopicHash) {
if self.register_topic(topic).is_ok() {
self.topic_peers_count
.get_or_create(topic)
.set(count as i64);
self.topic_peers_count.get_or_create(topic).inc();
}
}

/// Decrease the number of peers that are subscribed to this topic.
pub(crate) fn dec_topic_peers(&mut self, topic: &TopicHash) {
if self.register_topic(topic).is_ok() {
self.topic_peers_count.get_or_create(topic).dec();
}
}

Expand Down
4 changes: 3 additions & 1 deletion protocols/gossipsub/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ use libp2p_identity::PeerId;
use libp2p_swarm::ConnectionId;
use prometheus_client::encoding::EncodeLabelValue;
use quick_protobuf::MessageWrite;
use std::fmt;
use std::fmt::Debug;
use std::{collections::BTreeSet, fmt};

use crate::rpc_proto::proto;
#[cfg(feature = "serde")]
Expand Down Expand Up @@ -77,6 +77,8 @@ pub(crate) struct PeerConnections {
pub(crate) kind: PeerKind,
/// Its current connections.
pub(crate) connections: Vec<ConnectionId>,
/// Subscribed topics.
pub(crate) topics: BTreeSet<TopicHash>,
}

/// Describes the types of peers that can exist in the gossipsub context.
Expand Down

0 comments on commit 546a800

Please sign in to comment.