diff --git a/Cargo.lock b/Cargo.lock index f4fd13cfccd..b135804f917 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -194,6 +194,12 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6" +[[package]] +name = "arraydeque" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d902e3d592a523def97af8f317b08ce16b7ab854c1985a0c671e6f15cebc236" + [[package]] name = "arrayref" version = "0.3.7" @@ -2562,6 +2568,7 @@ dependencies = [ name = "libp2p-gossipsub" version = "0.46.0" dependencies = [ + "arraydeque", "async-std", "asynchronous-codec 0.6.2", "base64 0.21.4", @@ -2591,7 +2598,6 @@ dependencies = [ "regex", "serde", "sha2 0.10.8", - "smallvec", "unsigned-varint", "void", ] diff --git a/protocols/gossipsub/CHANGELOG.md b/protocols/gossipsub/CHANGELOG.md index b86ec4de6d4..0db1ef48edc 100644 --- a/protocols/gossipsub/CHANGELOG.md +++ b/protocols/gossipsub/CHANGELOG.md @@ -5,9 +5,9 @@ - Remove deprecated `gossipsub::Config::idle_timeout` in favor of `SwarmBuilder::idle_connection_timeout`. See [PR 4642](https://github.com/libp2p/rust-libp2p/pull/4642). - Return typed error from config builder. - See [PR 4445]. - -[PR 4445]: https://github.com/libp2p/rust-libp2p/pull/4445 + See [PR 4445](https://github.com/libp2p/rust-libp2p/pull/4445). +- Bound send_queue with an `ArrayDeque` and drop front-most item if full. + See [PR 4756](https://github.com/libp2p/rust-libp2p/pull/4756). ## 0.45.2 diff --git a/protocols/gossipsub/Cargo.toml b/protocols/gossipsub/Cargo.toml index 58ea50161a4..7df0c166625 100644 --- a/protocols/gossipsub/Cargo.toml +++ b/protocols/gossipsub/Cargo.toml @@ -35,12 +35,12 @@ rand = "0.8" regex = "1.10.2" serde = { version = "1", optional = true, features = ["derive"] } sha2 = "0.10.8" -smallvec = "1.11.1" unsigned-varint = { version = "0.7.2", features = ["asynchronous_codec"] } void = "1.0.2" # Metrics dependencies prometheus-client = { workspace = true } +arraydeque = "0.5.1" [dev-dependencies] async-std = { version = "1.6.3", features = ["unstable"] } diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index 44258bb5394..c074123086e 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -22,6 +22,7 @@ use crate::protocol::{GossipsubCodec, ProtocolConfig}; use crate::rpc_proto::proto; use crate::types::{PeerKind, RawMessage, Rpc}; use crate::ValidationError; +use arraydeque::{ArrayDeque, Wrapping}; use asynchronous_codec::Framed; use futures::future::Either; use futures::prelude::*; @@ -33,7 +34,7 @@ use libp2p_swarm::handler::{ FullyNegotiatedInbound, FullyNegotiatedOutbound, StreamUpgradeError, SubstreamProtocol, }; use libp2p_swarm::Stream; -use smallvec::SmallVec; +use log::warn; use std::{ pin::Pin, task::{Context, Poll}, @@ -95,7 +96,7 @@ pub struct EnabledHandler { inbound_substream: Option, /// Queue of values that we want to send to the remote. - send_queue: SmallVec<[proto::RPC; 16]>, + send_queue: ArrayDeque, /// Flag indicating that an outbound substream is being established to prevent duplicate /// requests. @@ -167,7 +168,7 @@ impl Handler { outbound_substream_establishing: false, outbound_substream_attempts: 0, inbound_substream_attempts: 0, - send_queue: SmallVec::new(), + send_queue: ArrayDeque::new(), peer_kind: None, peer_kind_sent: false, last_io_activity: Instant::now(), @@ -315,8 +316,7 @@ impl EnabledHandler { ) { // outbound idle state Some(OutboundSubstreamState::WaitingOutput(substream)) => { - if let Some(message) = self.send_queue.pop() { - self.send_queue.shrink_to_fit(); + if let Some(message) = self.send_queue.pop_front() { self.outbound_substream = Some(OutboundSubstreamState::PendingSend(substream, message)); continue; @@ -409,7 +409,11 @@ impl ConnectionHandler for Handler { fn on_behaviour_event(&mut self, message: HandlerIn) { match self { Handler::Enabled(handler) => match message { - HandlerIn::Message(m) => handler.send_queue.push(m), + HandlerIn::Message(m) => { + if handler.send_queue.push_back(m).is_some() { + warn!("Handler send queue is full. Dropping message at head"); + } + } HandlerIn::JoinedMesh => { handler.in_mesh = true; }