Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(gossipsub): implement gossipsub 1.2 beta #5697

Merged
merged 22 commits into from
Dec 27, 2024
Merged
Changes from 1 commit
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
merge with remote branch origin/master
  • Loading branch information
hopinheimer committed Nov 28, 2024
commit ad94224bfc467d7293f291a7b0061ad947f1a3ad
18 changes: 3 additions & 15 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
@@ -28,7 +28,7 @@
time::Duration,
};

use futures::{FutureExt, StreamExt};

Check failure on line 31 in protocols/gossipsub/src/behaviour.rs

GitHub Actions / Check rustdoc intra-doc links

unused import: `StreamExt`

Check failure on line 31 in protocols/gossipsub/src/behaviour.rs

GitHub Actions / clippy (1.80.0)

unused import: `StreamExt`

Check failure on line 31 in protocols/gossipsub/src/behaviour.rs

GitHub Actions / Compile on wasm32-unknown-unknown

unused import: `StreamExt`

Check failure on line 31 in protocols/gossipsub/src/behaviour.rs

GitHub Actions / clippy (beta)

unused import: `StreamExt`

Check failure on line 31 in protocols/gossipsub/src/behaviour.rs

GitHub Actions / examples

unused import: `StreamExt`

Check failure on line 31 in protocols/gossipsub/src/behaviour.rs

GitHub Actions / Compile on wasm32-unknown-emscripten

unused import: `StreamExt`

Check failure on line 31 in protocols/gossipsub/src/behaviour.rs

GitHub Actions / Compile with MSRV

unused import: `StreamExt`

Check failure on line 31 in protocols/gossipsub/src/behaviour.rs

GitHub Actions / Test libp2p

unused import: `StreamExt`

Check failure on line 31 in protocols/gossipsub/src/behaviour.rs

GitHub Actions / Test libp2p-gossipsub

unused import: `StreamExt`

Check failure on line 31 in protocols/gossipsub/src/behaviour.rs

GitHub Actions / Test libp2p-metrics

unused import: `StreamExt`
use futures_timer::Delay;
use libp2p_core::{
multiaddr::Protocol::{Ip4, Ip6},
@@ -46,16 +46,8 @@
use quick_protobuf::{MessageWrite, Writer};
use rand::{seq::SliceRandom, thread_rng};
use web_time::{Instant, SystemTime};
use hashlink::LinkedHashMap;

use crate::peer_score::{PeerScore, PeerScoreParams, PeerScoreThresholds, RejectReason};
use crate::protocol::SIGNING_PREFIX;
use crate::subscription_filter::{AllowAllSubscriptionFilter, TopicSubscriptionFilter};
use crate::time_cache::DuplicateCache;
use crate::topic::{Hasher, Topic, TopicHash};
use crate::transform::{DataTransform, IdentityTransform};
use crate::types::{ControlAction, IDontWant, Message, MessageAcceptance, MessageId, PeerInfo, RawMessage, Subscription, SubscriptionAction};
use crate::types::{PeerConnections, PeerKind, RpcOut};
use crate::{backoff::BackoffStorage, FailedMessages};
use crate::{
backoff::BackoffStorage,
config::{Config, ValidationMode},
@@ -72,16 +64,12 @@
topic::{Hasher, Topic, TopicHash},
transform::{DataTransform, IdentityTransform},
types::{
ControlAction, Graft, IHave, IWant, Message, MessageAcceptance, MessageId, PeerConnections,
ControlAction, Graft, IHave, IWant, IDontWant, Message, MessageAcceptance, MessageId, PeerConnections,
PeerInfo, PeerKind, Prune, RawMessage, RpcOut, Subscription, SubscriptionAction,
},
FailedMessages, PublishError, SubscriptionError, TopicScoreParams, ValidationError,
};
use crate::{rpc_proto::proto, TopicScoreParams};
use crate::{PublishError, SubscriptionError, ValidationError};
use quick_protobuf::{MessageWrite, Writer};
use std::{cmp::Ordering::Equal, fmt::Debug};
use hashlink::LinkedHashMap;


#[cfg(test)]
mod tests;
@@ -2432,7 +2420,7 @@
// Flush stale IDONTWANTs.
for peer in self.connected_peers.values_mut() {
while let Some((_front, instant)) = peer.dont_send.front() {
if (*instant + IDONTWANT_TIMEOUT) >= Instant::now() {

Check failure on line 2423 in protocols/gossipsub/src/behaviour.rs

GitHub Actions / Compile on wasm32-unknown-unknown

mismatched types
break;
} else {
peer.dont_send.pop_front();
@@ -3309,7 +3297,7 @@
metrics.register_idontwant(message_ids.len());
}
for message_id in message_ids {
peer.dont_send.insert(message_id, Instant::now());

Check failure on line 3300 in protocols/gossipsub/src/behaviour.rs

GitHub Actions / Compile on wasm32-unknown-unknown

mismatched types
// Don't exceed capacity.
if peer.dont_send.len() > IDONTWANT_CAP {
peer.dont_send.pop_front();
28 changes: 2 additions & 26 deletions protocols/gossipsub/src/protocol.rs
Original file line number Diff line number Diff line change
@@ -18,12 +18,8 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::config::ValidationMode;
use crate::handler::HandlerEvent;
use crate::rpc_proto::proto;
use crate::topic::TopicHash;
use crate::types::{ControlAction, Graft, IDontWant, IHave, IWant, MessageId, PeerInfo, PeerKind, Prune, RawMessage, Rpc, Subscription, SubscriptionAction};
use crate::ValidationError;
use std::{convert::Infallible, pin::Pin};

use asynchronous_codec::{Decoder, Encoder, Framed};
use byteorder::{BigEndian, ByteOrder};
use bytes::BytesMut;
@@ -47,11 +43,6 @@ use crate::{

pub(crate) const SIGNING_PREFIX: &[u8] = b"libp2p-pubsub:";

pub(crate) const GOSSIPSUB_1_2_0_BETA_PROTOCOL: ProtocolId = ProtocolId {
protocol: StreamProtocol::new("/meshsub/1.2.0"),
kind: PeerKind::Gossipsubv1_2_beta,
};

pub(crate) const GOSSIPSUB_1_1_0_PROTOCOL: ProtocolId = ProtocolId {
protocol: StreamProtocol::new("/meshsub/1.1.0"),
kind: PeerKind::Gossipsubv1_1,
@@ -488,25 +479,10 @@ impl Decoder for GossipsubCodec {
}));
}

let idontwant_msgs: Vec<ControlAction> = rpc_control
.idontwant
.into_iter()
.map(|idontwant| {
ControlAction::IDontWant(IDontWant {
message_ids: idontwant
.message_ids
.into_iter()
.map(MessageId::from)
.collect::<Vec<_>>(),
})
})
.collect();

control_msgs.extend(ihave_msgs);
control_msgs.extend(iwant_msgs);
control_msgs.extend(graft_msgs);
control_msgs.extend(prune_msgs);
control_msgs.extend(idontwant_msgs);
}

Ok(Some(HandlerEvent::Message {
7 changes: 2 additions & 5 deletions protocols/gossipsub/src/types.rs
Original file line number Diff line number Diff line change
@@ -26,13 +26,10 @@ use libp2p_identity::PeerId;
use libp2p_swarm::ConnectionId;
use prometheus_client::encoding::EncodeLabelValue;
use quick_protobuf::MessageWrite;
use std::fmt::Debug;
use std::{collections::BTreeSet, fmt};
use std::time::Instant;
use hashlink::LinkedHashMap;
use crate::rpc_proto::proto;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
use std::time::Instant;
use hashlink::LinkedHashMap;

use crate::{rpc::Sender, rpc_proto::proto, TopicHash};

You are viewing a condensed version of this merge commit. You can view the full changes here.