From 7bbca116bcfa18278c252763ef3581c614ec3c5f Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 25 Oct 2023 09:53:38 +1100 Subject: [PATCH] feat(swarm): don't close connection in `OneShotHandler` Related: #3591. Pull-Request: #4715. --- protocols/floodsub/src/layer.rs | 18 +++++++++++++----- swarm/CHANGELOG.md | 3 +++ swarm/src/handler/one_shot.rs | 26 ++++++++------------------ 3 files changed, 24 insertions(+), 23 deletions(-) diff --git a/protocols/floodsub/src/layer.rs b/protocols/floodsub/src/layer.rs index a8598f397aa..5f80f63c38e 100644 --- a/protocols/floodsub/src/layer.rs +++ b/protocols/floodsub/src/layer.rs @@ -30,8 +30,8 @@ use libp2p_core::{Endpoint, Multiaddr}; use libp2p_identity::PeerId; use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, FromSwarm}; use libp2p_swarm::{ - dial_opts::DialOpts, ConnectionDenied, ConnectionId, NetworkBehaviour, NotifyHandler, - OneShotHandler, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, + dial_opts::DialOpts, CloseConnection, ConnectionDenied, ConnectionId, NetworkBehaviour, + NotifyHandler, OneShotHandler, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, }; use log::warn; use smallvec::SmallVec; @@ -354,13 +354,21 @@ impl NetworkBehaviour for Floodsub { fn on_connection_handler_event( &mut self, propagation_source: PeerId, - _connection_id: ConnectionId, + connection_id: ConnectionId, event: THandlerOutEvent, ) { // We ignore successful sends or timeouts. let event = match event { - InnerMessage::Rx(event) => event, - InnerMessage::Sent => return, + Ok(InnerMessage::Rx(event)) => event, + Ok(InnerMessage::Sent) => return, + Err(e) => { + log::debug!("Failed to send floodsub message: {e}"); + self.events.push_back(ToSwarm::CloseConnection { + peer_id: propagation_source, + connection: CloseConnection::One(connection_id), + }); + return; + } }; // Update connected peers topics diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index d54f19121df..36c76bcdc2b 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -8,6 +8,9 @@ See [PR 4225](https://github.com/libp2p/rust-libp2p/pull/4225). - Remove deprecated `keep_alive_timeout` in `OneShotHandlerConfig`. See [PR 4677](https://github.com/libp2p/rust-libp2p/pull/4677). +- Don't close entire connection upon `DialUpgradeError`s within `OneShotHandler`. + Instead, the error is reported as `Err(e)` via `ConnectionHandler::ToBehaviour`. + See [PR 4715](https://github.com/libp2p/rust-libp2p/pull/4715). ## 0.43.6 diff --git a/swarm/src/handler/one_shot.rs b/swarm/src/handler/one_shot.rs index 7f422cfa7d0..68854bdcaa3 100644 --- a/swarm/src/handler/one_shot.rs +++ b/swarm/src/handler/one_shot.rs @@ -20,10 +20,10 @@ use crate::handler::{ ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError, - FullyNegotiatedInbound, FullyNegotiatedOutbound, KeepAlive, StreamUpgradeError, - SubstreamProtocol, + FullyNegotiatedInbound, FullyNegotiatedOutbound, KeepAlive, SubstreamProtocol, }; use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend}; +use crate::StreamUpgradeError; use smallvec::SmallVec; use std::{error, fmt::Debug, task::Context, task::Poll, time::Duration}; @@ -35,10 +35,8 @@ where { /// The upgrade for inbound substreams. listen_protocol: SubstreamProtocol, - /// If `Some`, something bad happened and we should shut down the handler with an error. - pending_error: Option::Error>>, /// Queue of events to produce in `poll()`. - events_out: SmallVec<[TEvent; 4]>, + events_out: SmallVec<[Result>; 4]>, /// Queue of outbound substreams to open. dial_queue: SmallVec<[TOutbound; 4]>, /// Current number of concurrent outbound substreams being opened. @@ -60,7 +58,6 @@ where ) -> Self { OneShotHandler { listen_protocol, - pending_error: None, events_out: SmallVec::new(), dial_queue: SmallVec::new(), dial_negotiated: 0, @@ -121,8 +118,8 @@ where TEvent: Debug + Send + 'static, { type FromBehaviour = TOutbound; - type ToBehaviour = TEvent; - type Error = StreamUpgradeError<::Error>; + type ToBehaviour = Result>; + type Error = void::Void; type InboundProtocol = TInbound; type OutboundProtocol = TOutbound; type OutboundOpenInfo = (); @@ -151,10 +148,6 @@ where Self::Error, >, > { - if let Some(err) = self.pending_error.take() { - return Poll::Ready(ConnectionHandlerEvent::Close(err)); - } - if !self.events_out.is_empty() { return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( self.events_out.remove(0), @@ -197,20 +190,17 @@ where protocol: out, .. }) => { - self.events_out.push(out.into()); + self.events_out.push(Ok(out.into())); } ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound { protocol: out, .. }) => { self.dial_negotiated -= 1; - self.events_out.push(out.into()); + self.events_out.push(Ok(out.into())); } ConnectionEvent::DialUpgradeError(DialUpgradeError { error, .. }) => { - if self.pending_error.is_none() { - log::debug!("DialUpgradeError: {error}"); - self.keep_alive = KeepAlive::No; - } + self.events_out.push(Err(error)); } ConnectionEvent::AddressChange(_) | ConnectionEvent::ListenUpgradeError(_)