diff --git a/protocols/dcutr/src/handler/relayed.rs b/protocols/dcutr/src/handler/relayed.rs index a471943a37f..3b23a1a2a6b 100644 --- a/protocols/dcutr/src/handler/relayed.rs +++ b/protocols/dcutr/src/handler/relayed.rs @@ -33,7 +33,7 @@ use libp2p_swarm::handler::{ ListenUpgradeError, }; use libp2p_swarm::{ - ConnectionHandler, ConnectionHandlerEvent, KeepAlive, StreamUpgradeError, SubstreamProtocol, + ConnectionHandler, ConnectionHandlerEvent, StreamUpgradeError, SubstreamProtocol, }; use std::collections::VecDeque; use std::task::{Context, Poll}; @@ -249,20 +249,12 @@ impl ConnectionHandler for Handler { } } - fn connection_keep_alive(&self) -> KeepAlive { - if !self.queued_events.is_empty() { - return KeepAlive::Yes; - } - - if self.inbound_connect.is_some() { - return KeepAlive::Yes; - } - + fn connection_keep_alive(&self) -> bool { if self.attempts < MAX_NUMBER_OF_UPGRADE_ATTEMPTS { - return KeepAlive::Yes; + return true; } - KeepAlive::No + false } fn poll( diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 5c5260b4b13..4db3342365f 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -3265,7 +3265,6 @@ where type ConnectionHandler = Handler; type ToSwarm = Event; - #[allow(deprecated)] fn handle_established_inbound_connection( &mut self, _: ConnectionId, @@ -3276,7 +3275,6 @@ where Ok(Handler::new(self.config.protocol_config())) } - #[allow(deprecated)] fn handle_established_outbound_connection( &mut self, _: ConnectionId, diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index 1a50ef88fd5..44258bb5394 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -30,8 +30,7 @@ use instant::Instant; use libp2p_core::upgrade::DeniedUpgrade; use libp2p_swarm::handler::{ ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError, - FullyNegotiatedInbound, FullyNegotiatedOutbound, KeepAlive, StreamUpgradeError, - SubstreamProtocol, + FullyNegotiatedInbound, FullyNegotiatedOutbound, StreamUpgradeError, SubstreamProtocol, }; use libp2p_swarm::Stream; use smallvec::SmallVec; @@ -424,26 +423,8 @@ impl ConnectionHandler for Handler { } } - fn connection_keep_alive(&self) -> KeepAlive { - match self { - Handler::Enabled(handler) => { - if handler.in_mesh { - return KeepAlive::Yes; - } - - if let Some( - OutboundSubstreamState::PendingSend(_, _) - | OutboundSubstreamState::PendingFlush(_), - ) = handler.outbound_substream - { - return KeepAlive::Yes; - } - - #[allow(deprecated)] - KeepAlive::No - } - Handler::Disabled(_) => KeepAlive::No, - } + fn connection_keep_alive(&self) -> bool { + matches!(self, Handler::Enabled(h) if h.in_mesh) } fn poll( diff --git a/protocols/identify/src/handler.rs b/protocols/identify/src/handler.rs index b71f98e8509..51501d79f9c 100644 --- a/protocols/identify/src/handler.rs +++ b/protocols/identify/src/handler.rs @@ -33,7 +33,7 @@ use libp2p_swarm::handler::{ ProtocolSupport, }; use libp2p_swarm::{ - ConnectionHandler, ConnectionHandlerEvent, KeepAlive, StreamProtocol, StreamUpgradeError, + ConnectionHandler, ConnectionHandlerEvent, StreamProtocol, StreamUpgradeError, SubstreamProtocol, SupportedProtocols, }; use log::{warn, Level}; @@ -314,14 +314,6 @@ impl ConnectionHandler for Handler { } } - fn connection_keep_alive(&self) -> KeepAlive { - if !self.active_streams.is_empty() { - return KeepAlive::Yes; - } - - KeepAlive::No - } - fn poll( &mut self, cx: &mut Context<'_>, diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index b8dcf59139a..fce77bc13e4 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -33,7 +33,7 @@ use libp2p_swarm::handler::{ ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, }; use libp2p_swarm::{ - ConnectionHandler, ConnectionHandlerEvent, ConnectionId, KeepAlive, Stream, StreamUpgradeError, + ConnectionHandler, ConnectionHandlerEvent, ConnectionId, Stream, StreamUpgradeError, SubstreamProtocol, SupportedProtocols, }; use log::trace; @@ -702,14 +702,6 @@ impl ConnectionHandler for Handler { } } - fn connection_keep_alive(&self) -> KeepAlive { - if self.outbound_substreams.is_empty() && self.inbound_substreams.is_empty() { - return KeepAlive::No; - }; - - KeepAlive::Yes - } - fn poll( &mut self, cx: &mut Context<'_>, diff --git a/protocols/ping/src/handler.rs b/protocols/ping/src/handler.rs index 522663196e6..3a92ef4b249 100644 --- a/protocols/ping/src/handler.rs +++ b/protocols/ping/src/handler.rs @@ -28,8 +28,8 @@ use libp2p_swarm::handler::{ ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, }; use libp2p_swarm::{ - ConnectionHandler, ConnectionHandlerEvent, KeepAlive, Stream, StreamProtocol, - StreamUpgradeError, SubstreamProtocol, + ConnectionHandler, ConnectionHandlerEvent, Stream, StreamProtocol, StreamUpgradeError, + SubstreamProtocol, }; use std::collections::VecDeque; use std::{ @@ -225,10 +225,6 @@ impl ConnectionHandler for Handler { fn on_behaviour_event(&mut self, _: Void) {} - fn connection_keep_alive(&self) -> KeepAlive { - KeepAlive::No - } - fn poll( &mut self, cx: &mut Context<'_>, @@ -349,15 +345,17 @@ impl ConnectionHandler for Handler { ) { match event { ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound { - protocol: stream, + protocol: mut stream, .. }) => { + stream.ignore_for_keep_alive(); self.inbound = Some(protocol::recv_ping(stream).boxed()); } ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound { - protocol: stream, + protocol: mut stream, .. }) => { + stream.ignore_for_keep_alive(); self.outbound = Some(OutboundState::Ping( send_ping(stream, self.config.timeout).boxed(), )); diff --git a/protocols/relay/src/behaviour/handler.rs b/protocols/relay/src/behaviour/handler.rs index 6fb0a834d2f..13619cb45c6 100644 --- a/protocols/relay/src/behaviour/handler.rs +++ b/protocols/relay/src/behaviour/handler.rs @@ -37,7 +37,7 @@ use libp2p_swarm::handler::{ ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, }; use libp2p_swarm::{ - ConnectionHandler, ConnectionHandlerEvent, ConnectionId, KeepAlive, Stream, StreamProtocol, + ConnectionHandler, ConnectionHandlerEvent, ConnectionId, Stream, StreamProtocol, StreamUpgradeError, SubstreamProtocol, }; use std::collections::VecDeque; @@ -376,10 +376,6 @@ pub struct Handler { /// /// Contains a [`futures::future::Future`] for each lend out substream that /// resolves once the substream is dropped. - /// - /// Once all substreams are dropped and this handler has no other work, - /// [`KeepAlive::Until`] can be set, allowing the connection to be closed - /// eventually. alive_lend_out_substreams: FuturesUnordered>, /// Futures relaying data for circuit between two peers. circuits: Futures<(CircuitId, PeerId, Result<(), std::io::Error>)>, @@ -615,13 +611,12 @@ impl ConnectionHandler for Handler { } } - fn connection_keep_alive(&self) -> KeepAlive { - match self.idle_at { - Some(idle_at) if Instant::now().duration_since(idle_at) > Duration::from_secs(10) => { - KeepAlive::No - } - _ => KeepAlive::Yes, - } + fn connection_keep_alive(&self) -> bool { + let Some(idle_at) = self.idle_at else { + return true; + }; + + Instant::now().duration_since(idle_at) <= Duration::from_secs(10) } fn poll( @@ -881,13 +876,7 @@ impl ConnectionHandler for Handler { {} // Check keep alive status. - if self.reservation_request_future.is_none() - && self.circuit_accept_futures.is_empty() - && self.circuit_deny_futures.is_empty() - && self.alive_lend_out_substreams.is_empty() - && self.circuits.is_empty() - && self.active_reservation.is_none() - { + if self.active_reservation.is_none() { if self.idle_at.is_none() { self.idle_at = Some(Instant::now()); } diff --git a/protocols/relay/src/priv_client/handler.rs b/protocols/relay/src/priv_client/handler.rs index 2c6db0008bd..fb7428e3133 100644 --- a/protocols/relay/src/priv_client/handler.rs +++ b/protocols/relay/src/priv_client/handler.rs @@ -37,7 +37,7 @@ use libp2p_swarm::handler::{ ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, }; use libp2p_swarm::{ - ConnectionHandler, ConnectionHandlerEvent, KeepAlive, StreamProtocol, StreamUpgradeError, + ConnectionHandler, ConnectionHandlerEvent, StreamProtocol, StreamUpgradeError, SubstreamProtocol, }; use log::debug; @@ -319,28 +319,8 @@ impl ConnectionHandler for Handler { } } - fn connection_keep_alive(&self) -> KeepAlive { - if self.reservation.is_some() { - return KeepAlive::Yes; - } - - if !self.alive_lend_out_substreams.is_empty() { - return KeepAlive::Yes; - } - - if !self.circuit_deny_futs.is_empty() { - return KeepAlive::Yes; - } - - if !self.open_circuit_futs.is_empty() { - return KeepAlive::Yes; - } - - if !self.outbound_circuits.is_empty() { - return KeepAlive::Yes; - } - - KeepAlive::No + fn connection_keep_alive(&self) -> bool { + self.reservation.is_some() } fn poll( diff --git a/protocols/request-response/src/handler.rs b/protocols/request-response/src/handler.rs index 69929b77873..3a5fa8b0e61 100644 --- a/protocols/request-response/src/handler.rs +++ b/protocols/request-response/src/handler.rs @@ -32,7 +32,7 @@ use libp2p_swarm::handler::{ ListenUpgradeError, }; use libp2p_swarm::{ - handler::{ConnectionHandler, ConnectionHandlerEvent, KeepAlive, StreamUpgradeError}, + handler::{ConnectionHandler, ConnectionHandlerEvent, StreamUpgradeError}, SubstreamProtocol, }; use smallvec::SmallVec; @@ -59,8 +59,6 @@ where /// The timeout for inbound and outbound substreams (i.e. request /// and response processing). substream_timeout: Duration, - /// The current connection keep-alive. - keep_alive: KeepAlive, /// Queue of events to emit in `poll()`. pending_events: VecDeque>, /// Outbound upgrades waiting to be emitted as an `OutboundSubstreamRequest`. @@ -94,7 +92,6 @@ where Self { inbound_protocols, codec, - keep_alive: KeepAlive::Yes, substream_timeout, outbound: VecDeque::new(), inbound: FuturesUnordered::new(), @@ -274,14 +271,9 @@ where } fn on_behaviour_event(&mut self, request: Self::FromBehaviour) { - self.keep_alive = KeepAlive::Yes; self.outbound.push_back(request); } - fn connection_keep_alive(&self) -> KeepAlive { - self.keep_alive - } - fn poll( &mut self, cx: &mut Context<'_>, @@ -300,7 +292,6 @@ where match result { Ok(((id, rq), rs_sender)) => { // We received an inbound request. - self.keep_alive = KeepAlive::Yes; return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Event::Request { request_id: id, request: rq, @@ -330,13 +321,6 @@ where self.outbound.shrink_to_fit(); } - if self.inbound.is_empty() && self.keep_alive.is_yes() { - // No new inbound or outbound requests. We already check - // there is no active streams exist in swarm connection, - // so we can set keep-alive to no directly. - self.keep_alive = KeepAlive::No; - } - Poll::Pending } diff --git a/swarm/src/behaviour/toggle.rs b/swarm/src/behaviour/toggle.rs index 4f4f9585f0e..cd3713b201f 100644 --- a/swarm/src/behaviour/toggle.rs +++ b/swarm/src/behaviour/toggle.rs @@ -22,8 +22,7 @@ use crate::behaviour::FromSwarm; use crate::connection::ConnectionId; use crate::handler::{ AddressChange, ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError, - FullyNegotiatedInbound, FullyNegotiatedOutbound, KeepAlive, ListenUpgradeError, - SubstreamProtocol, + FullyNegotiatedInbound, FullyNegotiatedOutbound, ListenUpgradeError, SubstreamProtocol, }; use crate::upgrade::SendWrapper; use crate::{ @@ -291,11 +290,11 @@ where .on_behaviour_event(event) } - fn connection_keep_alive(&self) -> KeepAlive { + fn connection_keep_alive(&self) -> bool { self.inner .as_ref() .map(|h| h.connection_keep_alive()) - .unwrap_or(KeepAlive::No) + .unwrap_or(false) } fn poll( diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index a9c56c80d63..aa353a912ca 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -34,10 +34,10 @@ use crate::handler::{ FullyNegotiatedOutbound, ListenUpgradeError, ProtocolSupport, ProtocolsAdded, ProtocolsChange, UpgradeInfoSend, }; +use crate::stream::ActiveStreamCounter; use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend}; use crate::{ - ConnectionHandlerEvent, KeepAlive, Stream, StreamProtocol, StreamUpgradeError, - SubstreamProtocol, + ConnectionHandlerEvent, Stream, StreamProtocol, StreamUpgradeError, SubstreamProtocol, }; use futures::future::BoxFuture; use futures::stream::FuturesUnordered; @@ -52,7 +52,6 @@ use libp2p_core::upgrade; use libp2p_core::upgrade::{NegotiationError, ProtocolError}; use libp2p_core::Endpoint; use libp2p_identity::PeerId; -use std::cmp::max; use std::collections::HashSet; use std::fmt::{Display, Formatter}; use std::future::Future; @@ -157,6 +156,7 @@ where local_supported_protocols: HashSet, remote_supported_protocols: HashSet, idle_timeout: Duration, + stream_counter: ActiveStreamCounter, } impl fmt::Debug for Connection @@ -205,6 +205,7 @@ where local_supported_protocols: initial_protocols, remote_supported_protocols: Default::default(), idle_timeout, + stream_counter: ActiveStreamCounter::default(), } } @@ -237,6 +238,7 @@ where local_supported_protocols: supported_protocols, remote_supported_protocols, idle_timeout, + stream_counter, } = self.get_mut(); loop { @@ -344,19 +346,19 @@ where } } - // Compute new shutdown - if let Some(new_shutdown) = - compute_new_shutdown(handler.connection_keep_alive(), shutdown, *idle_timeout) - { - *shutdown = new_shutdown; - } - // Check if the connection (and handler) should be shut down. - // As long as we're still negotiating substreams, shutdown is always postponed. + // As long as we're still negotiating substreams or have any active streams shutdown is always postponed. if negotiating_in.is_empty() && negotiating_out.is_empty() && requested_substreams.is_empty() + && stream_counter.has_no_active_streams() { + if let Some(new_timeout) = + compute_new_shutdown(handler.connection_keep_alive(), shutdown, *idle_timeout) + { + *shutdown = new_timeout; + } + match shutdown { Shutdown::None => {} Shutdown::Asap => return Poll::Ready(Err(ConnectionError::KeepAliveTimeout)), @@ -367,6 +369,8 @@ where Poll::Pending => {} }, } + } else { + *shutdown = Shutdown::None; } match muxing.poll_unpin(cx)? { @@ -391,6 +395,7 @@ where timeout, upgrade, *substream_upgrade_protocol_override, + stream_counter.clone(), )); continue; // Go back to the top, handler can potentially make progress again. @@ -404,7 +409,11 @@ where Poll::Ready(substream) => { let protocol = handler.listen_protocol(); - negotiating_in.push(StreamUpgrade::new_inbound(substream, protocol)); + negotiating_in.push(StreamUpgrade::new_inbound( + substream, + protocol, + stream_counter.clone(), + )); continue; // Go back to the top, handler can potentially make progress again. } @@ -446,49 +455,14 @@ fn gather_supported_protocols(handler: &impl ConnectionHandler) -> HashSet Option { - #[allow(deprecated)] match (current_shutdown, handler_keep_alive) { - (Shutdown::Later(_, deadline), KeepAlive::Until(t)) => { - let now = Instant::now(); - - if *deadline != t { - let deadline = t; - if let Some(new_duration) = deadline.checked_duration_since(Instant::now()) { - let effective_keep_alive = max(new_duration, idle_timeout); - - let safe_keep_alive = checked_add_fraction(now, effective_keep_alive); - return Some(Shutdown::Later(Delay::new(safe_keep_alive), deadline)); - } - } - None - } - (_, KeepAlive::Until(earliest_shutdown)) => { - let now = Instant::now(); - - if let Some(requested) = earliest_shutdown.checked_duration_since(now) { - let effective_keep_alive = max(requested, idle_timeout); - - let safe_keep_alive = checked_add_fraction(now, effective_keep_alive); - - // Important: We store the _original_ `Instant` given by the `ConnectionHandler` in the `Later` instance to ensure we can compare it in the above branch. - // This is quite subtle but will hopefully become simpler soon once `KeepAlive::Until` is fully deprecated. See / - return Some(Shutdown::Later( - Delay::new(safe_keep_alive), - earliest_shutdown, - )); - } - None - } - (_, KeepAlive::No) if idle_timeout == Duration::ZERO => Some(Shutdown::Asap), - (Shutdown::Later(_, _), KeepAlive::No) => { - // Do nothing, i.e. let the shutdown timer continue to tick. - None - } - (_, KeepAlive::No) => { + (_, false) if idle_timeout == Duration::ZERO => Some(Shutdown::Asap), + (Shutdown::Later(_, _), false) => None, // Do nothing, i.e. let the shutdown timer continue to tick. + (_, false) => { let now = Instant::now(); let safe_keep_alive = checked_add_fraction(now, idle_timeout); @@ -497,7 +471,7 @@ fn compute_new_shutdown( now + safe_keep_alive, )) } - (_, KeepAlive::Yes) => Some(Shutdown::None), + (_, true) => Some(Shutdown::None), } } @@ -547,6 +521,7 @@ impl StreamUpgrade { timeout: Delay, upgrade: Upgrade, version_override: Option, + counter: ActiveStreamCounter, ) -> Self where Upgrade: OutboundUpgradeSend, @@ -578,7 +553,7 @@ impl StreamUpgrade { .map_err(to_stream_upgrade_error)?; let output = upgrade - .upgrade_outbound(Stream::new(stream), info) + .upgrade_outbound(Stream::new(stream, counter), info) .await .map_err(StreamUpgradeError::Apply)?; @@ -592,6 +567,7 @@ impl StreamUpgrade { fn new_inbound( substream: SubstreamBox, protocol: SubstreamProtocol, + counter: ActiveStreamCounter, ) -> Self where Upgrade: InboundUpgradeSend, @@ -610,7 +586,7 @@ impl StreamUpgrade { .map_err(to_stream_upgrade_error)?; let output = upgrade - .upgrade_inbound(Stream::new(stream), info) + .upgrade_inbound(Stream::new(stream, counter), info) .await .map_err(StreamUpgradeError::Apply)?; @@ -933,68 +909,6 @@ mod tests { )); } - #[tokio::test] - async fn idle_timeout_with_keep_alive_until_greater_than_idle_timeout() { - let idle_timeout = Duration::from_millis(100); - - let mut connection = Connection::new( - StreamMuxerBox::new(PendingStreamMuxer), - KeepAliveUntilConnectionHandler { - until: Instant::now() + idle_timeout * 2, - }, - None, - 0, - idle_timeout, - ); - - assert!(connection.poll_noop_waker().is_pending()); - - tokio::time::sleep(idle_timeout).await; - - assert!( - connection.poll_noop_waker().is_pending(), - "`KeepAlive::Until` is greater than idle-timeout, continue sleeping" - ); - - tokio::time::sleep(idle_timeout).await; - - assert!(matches!( - connection.poll_noop_waker(), - Poll::Ready(Err(ConnectionError::KeepAliveTimeout)) - )); - } - - #[tokio::test] - async fn idle_timeout_with_keep_alive_until_less_than_idle_timeout() { - let idle_timeout = Duration::from_millis(100); - - let mut connection = Connection::new( - StreamMuxerBox::new(PendingStreamMuxer), - KeepAliveUntilConnectionHandler { - until: Instant::now() + idle_timeout / 2, - }, - None, - 0, - idle_timeout, - ); - - assert!(connection.poll_noop_waker().is_pending()); - - tokio::time::sleep(idle_timeout / 2).await; - - assert!( - connection.poll_noop_waker().is_pending(), - "`KeepAlive::Until` is less than idle-timeout, honor idle-timeout" - ); - - tokio::time::sleep(idle_timeout / 2).await; - - assert!(matches!( - connection.poll_noop_waker(), - Poll::Ready(Err(ConnectionError::KeepAliveTimeout)) - )); - } - #[test] fn checked_add_fraction_can_add_u64_max() { let _ = env_logger::try_init(); @@ -1048,7 +962,7 @@ mod tests { } fn prop( - handler_keep_alive: KeepAlive, + handler_keep_alive: bool, current_shutdown: ArbitraryShutdown, idle_timeout: Duration, ) { @@ -1058,58 +972,6 @@ mod tests { QuickCheck::new().quickcheck(prop as fn(_, _, _)); } - struct KeepAliveUntilConnectionHandler { - until: Instant, - } - - impl ConnectionHandler for KeepAliveUntilConnectionHandler { - type FromBehaviour = Void; - type ToBehaviour = Void; - type Error = Void; - type InboundProtocol = DeniedUpgrade; - type OutboundProtocol = DeniedUpgrade; - type InboundOpenInfo = (); - type OutboundOpenInfo = Void; - - fn listen_protocol( - &self, - ) -> SubstreamProtocol { - SubstreamProtocol::new(DeniedUpgrade, ()) - } - - fn connection_keep_alive(&self) -> KeepAlive { - #[allow(deprecated)] - KeepAlive::Until(self.until) - } - - fn poll( - &mut self, - _: &mut Context<'_>, - ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::ToBehaviour, - Self::Error, - >, - > { - Poll::Pending - } - - fn on_behaviour_event(&mut self, _: Self::FromBehaviour) {} - - fn on_connection_event( - &mut self, - _: ConnectionEvent< - Self::InboundProtocol, - Self::OutboundProtocol, - Self::InboundOpenInfo, - Self::OutboundOpenInfo, - >, - ) { - } - } - struct DummyStreamMuxer { counter: Arc<()>, } @@ -1308,8 +1170,8 @@ mod tests { void::unreachable(event) } - fn connection_keep_alive(&self) -> KeepAlive { - KeepAlive::Yes + fn connection_keep_alive(&self) -> bool { + true } fn poll( @@ -1385,8 +1247,8 @@ mod tests { void::unreachable(event) } - fn connection_keep_alive(&self) -> KeepAlive { - KeepAlive::Yes + fn connection_keep_alive(&self) -> bool { + true } fn poll( diff --git a/swarm/src/dummy.rs b/swarm/src/dummy.rs index 067c9788e4d..3122825e2c0 100644 --- a/swarm/src/dummy.rs +++ b/swarm/src/dummy.rs @@ -4,8 +4,8 @@ use crate::handler::{ ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, }; use crate::{ - ConnectionDenied, ConnectionHandlerEvent, KeepAlive, StreamUpgradeError, SubstreamProtocol, - THandler, THandlerInEvent, THandlerOutEvent, + ConnectionDenied, ConnectionHandlerEvent, StreamUpgradeError, SubstreamProtocol, THandler, + THandlerInEvent, THandlerOutEvent, }; use libp2p_core::upgrade::DeniedUpgrade; use libp2p_core::Endpoint; @@ -94,10 +94,6 @@ impl crate::handler::ConnectionHandler for ConnectionHandler { void::unreachable(event) } - fn connection_keep_alive(&self) -> KeepAlive { - KeepAlive::No - } - fn poll( &mut self, _: &mut Context<'_>, diff --git a/swarm/src/handler.rs b/swarm/src/handler.rs index 02eb9f83935..aadb435242d 100644 --- a/swarm/src/handler.rs +++ b/swarm/src/handler.rs @@ -55,7 +55,6 @@ pub use select::ConnectionHandlerSelect; use crate::StreamProtocol; use ::either::Either; -use instant::Instant; use libp2p_core::Multiaddr; use once_cell::sync::Lazy; use smallvec::SmallVec; @@ -63,7 +62,7 @@ use std::collections::hash_map::RandomState; use std::collections::hash_set::{Difference, Intersection}; use std::collections::HashSet; use std::iter::Peekable; -use std::{cmp::Ordering, error, fmt, io, task::Context, task::Poll, time::Duration}; +use std::{error, fmt, io, task::Context, task::Poll, time::Duration}; /// A handler for a set of protocols used on a connection with a remote. /// @@ -123,27 +122,27 @@ pub trait ConnectionHandler: Send + 'static { /// > This allows a remote to put the list of supported protocols in a cache. fn listen_protocol(&self) -> SubstreamProtocol; - /// Returns until when the connection should be kept alive. + /// Returns whether the connection should be kept alive. /// - /// This method is called by the `Swarm` after each invocation of - /// [`ConnectionHandler::poll`] to determine if the connection and the associated - /// [`ConnectionHandler`]s should be kept alive as far as this handler is concerned - /// and if so, for how long. + /// ## Keep alive algorithm /// - /// Returning [`KeepAlive::No`] indicates that the connection should be - /// closed and this handler destroyed immediately. + /// A connection is always kept alive: /// - /// Returning [`KeepAlive::Until`] indicates that the connection may be closed - /// and this handler destroyed after the specified `Instant`. + /// - Whilst a [`ConnectionHandler`] returns [`Poll::Ready`]. + /// - We are negotiating inbound or outbound streams. + /// - There are active [`Stream`](crate::Stream)s on the connection. /// - /// Returning [`KeepAlive::Yes`] indicates that the connection should - /// be kept alive until the next call to this method. + /// The combination of the above means that _most_ protocols will not need to override this method. + /// This method is only invoked when all of the above are `false`, i.e. when the connection is entirely idle. /// - /// > **Note**: The connection is always closed and the handler destroyed - /// > when [`ConnectionHandler::poll`] returns an error. Furthermore, the - /// > connection may be closed for reasons outside of the control - /// > of the handler. - fn connection_keep_alive(&self) -> KeepAlive; + /// ## Exceptions + /// + /// - Protocols like [circuit-relay v2](https://github.com/libp2p/specs/blob/master/relay/circuit-v2.md) need to keep a connection alive beyond these circumstances and can thus override this method. + /// - Protocols like [ping](https://github.com/libp2p/specs/blob/master/ping/ping.md) **don't** want to keep a connection alive despite an active streams. + /// In that case, protocol authors can use [`Stream::ignore_for_keep_alive`](crate::Stream::ignore_for_keep_alive) to opt-out a particular stream from the keep-alive algorithm. + fn connection_keep_alive(&self) -> bool { + false + } /// Should behave like `Stream::poll()`. fn poll( @@ -178,10 +177,6 @@ pub trait ConnectionHandler: Send + 'static { /// Creates a new [`ConnectionHandler`] that selects either this handler or /// `other` by delegating methods calls appropriately. - /// - /// > **Note**: The largest `KeepAlive` returned by the two handlers takes precedence, - /// > i.e. is returned from [`ConnectionHandler::connection_keep_alive`] by the returned - /// > handler. fn select(self, other: TProto2) -> ConnectionHandlerSelect where Self: Sized, @@ -543,8 +538,7 @@ pub enum ConnectionHandlerEvent bool { - matches!(*self, KeepAlive::Yes) - } -} - -impl PartialOrd for KeepAlive { - fn partial_cmp(&self, other: &KeepAlive) -> Option { - Some(self.cmp(other)) - } -} - -#[allow(deprecated)] -impl Ord for KeepAlive { - fn cmp(&self, other: &KeepAlive) -> Ordering { - use self::KeepAlive::*; - - match (self, other) { - (No, No) | (Yes, Yes) => Ordering::Equal, - (No, _) | (_, Yes) => Ordering::Less, - (_, No) | (Yes, _) => Ordering::Greater, - (Until(t1), Until(t2)) => t1.cmp(t2), - } - } -} - -#[cfg(test)] -impl quickcheck::Arbitrary for KeepAlive { - fn arbitrary(g: &mut quickcheck::Gen) -> Self { - match quickcheck::GenRange::gen_range(g, 1u8..4) { - 1 => - { - #[allow(deprecated)] - KeepAlive::Until( - Instant::now() - .checked_add(Duration::arbitrary(g)) - .unwrap_or(Instant::now()), - ) - } - 2 => KeepAlive::Yes, - 3 => KeepAlive::No, - _ => unreachable!(), - } - } -} - /// A statically declared, empty [`HashSet`] allows us to work around borrow-checker rules for /// [`ProtocolsAdded::from_set`]. The lifetimes don't work unless we have a [`HashSet`] with a `'static' lifetime. static EMPTY_HASHSET: Lazy> = Lazy::new(HashSet::new); diff --git a/swarm/src/handler/either.rs b/swarm/src/handler/either.rs index 6a60427228d..093900135b8 100644 --- a/swarm/src/handler/either.rs +++ b/swarm/src/handler/either.rs @@ -20,7 +20,7 @@ use crate::handler::{ ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, FullyNegotiatedInbound, - InboundUpgradeSend, KeepAlive, ListenUpgradeError, SubstreamProtocol, + InboundUpgradeSend, ListenUpgradeError, SubstreamProtocol, }; use crate::upgrade::SendWrapper; use either::Either; @@ -108,7 +108,7 @@ where } } - fn connection_keep_alive(&self) -> KeepAlive { + fn connection_keep_alive(&self) -> bool { match self { Either::Left(handler) => handler.connection_keep_alive(), Either::Right(handler) => handler.connection_keep_alive(), diff --git a/swarm/src/handler/map_in.rs b/swarm/src/handler/map_in.rs index 82cb12a183d..e3458eb5451 100644 --- a/swarm/src/handler/map_in.rs +++ b/swarm/src/handler/map_in.rs @@ -19,7 +19,7 @@ // DEALINGS IN THE SOFTWARE. use crate::handler::{ - ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, KeepAlive, SubstreamProtocol, + ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, SubstreamProtocol, }; use std::{fmt::Debug, marker::PhantomData, task::Context, task::Poll}; @@ -68,7 +68,7 @@ where } } - fn connection_keep_alive(&self) -> KeepAlive { + fn connection_keep_alive(&self) -> bool { self.inner.connection_keep_alive() } diff --git a/swarm/src/handler/map_out.rs b/swarm/src/handler/map_out.rs index 8528b563ece..cc06a4c50c8 100644 --- a/swarm/src/handler/map_out.rs +++ b/swarm/src/handler/map_out.rs @@ -19,7 +19,7 @@ // DEALINGS IN THE SOFTWARE. use crate::handler::{ - ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, KeepAlive, SubstreamProtocol, + ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, SubstreamProtocol, }; use std::fmt::Debug; use std::task::{Context, Poll}; @@ -61,7 +61,7 @@ where self.inner.on_behaviour_event(event) } - fn connection_keep_alive(&self) -> KeepAlive { + fn connection_keep_alive(&self) -> bool { self.inner.connection_keep_alive() } diff --git a/swarm/src/handler/multi.rs b/swarm/src/handler/multi.rs index ced94f1213c..41e0cf42df9 100644 --- a/swarm/src/handler/multi.rs +++ b/swarm/src/handler/multi.rs @@ -23,8 +23,7 @@ use crate::handler::{ AddressChange, ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError, - FullyNegotiatedInbound, FullyNegotiatedOutbound, KeepAlive, ListenUpgradeError, - SubstreamProtocol, + FullyNegotiatedInbound, FullyNegotiatedOutbound, ListenUpgradeError, SubstreamProtocol, }; use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend, UpgradeInfoSend}; use crate::Stream; @@ -230,12 +229,12 @@ where } } - fn connection_keep_alive(&self) -> KeepAlive { + fn connection_keep_alive(&self) -> bool { self.handlers .values() .map(|h| h.connection_keep_alive()) .max() - .unwrap_or(KeepAlive::No) + .unwrap_or(false) } fn poll( diff --git a/swarm/src/handler/one_shot.rs b/swarm/src/handler/one_shot.rs index 68854bdcaa3..a611bc5073c 100644 --- a/swarm/src/handler/one_shot.rs +++ b/swarm/src/handler/one_shot.rs @@ -20,7 +20,7 @@ use crate::handler::{ ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError, - FullyNegotiatedInbound, FullyNegotiatedOutbound, KeepAlive, SubstreamProtocol, + FullyNegotiatedInbound, FullyNegotiatedOutbound, SubstreamProtocol, }; use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend}; use crate::StreamUpgradeError; @@ -41,8 +41,6 @@ where dial_queue: SmallVec<[TOutbound; 4]>, /// Current number of concurrent outbound substreams being opened. dial_negotiated: u32, - /// Value to return from `connection_keep_alive`. - keep_alive: KeepAlive, /// The configuration container for the handler config: OneShotHandlerConfig, } @@ -61,7 +59,6 @@ where events_out: SmallVec::new(), dial_queue: SmallVec::new(), dial_negotiated: 0, - keep_alive: KeepAlive::Yes, config, } } @@ -89,7 +86,6 @@ where /// Opens an outbound substream with `upgrade`. pub fn send_request(&mut self, upgrade: TOutbound) { - self.keep_alive = KeepAlive::Yes; self.dial_queue.push(upgrade); } } @@ -133,10 +129,6 @@ where self.send_request(event); } - fn connection_keep_alive(&self) -> KeepAlive { - self.keep_alive - } - fn poll( &mut self, _: &mut Context<'_>, @@ -167,10 +159,6 @@ where } } else { self.dial_queue.shrink_to_fit(); - - if self.dial_negotiated == 0 && self.keep_alive.is_yes() { - self.keep_alive = KeepAlive::No; - } } Poll::Pending @@ -220,7 +208,6 @@ pub struct OneShotHandlerConfig { } impl Default for OneShotHandlerConfig { - #[allow(deprecated)] fn default() -> Self { OneShotHandlerConfig { outbound_substream_timeout: Duration::from_secs(10), @@ -239,7 +226,6 @@ mod tests { use void::Void; #[test] - #[allow(deprecated)] fn do_not_keep_idle_connection_alive() { let mut handler: OneShotHandler<_, DeniedUpgrade, Void> = OneShotHandler::new( SubstreamProtocol::new(DeniedUpgrade {}, ()), @@ -252,6 +238,6 @@ mod tests { } })); - assert!(matches!(handler.connection_keep_alive(), KeepAlive::No)); + assert!(matches!(handler.connection_keep_alive(), false)); } } diff --git a/swarm/src/handler/pending.rs b/swarm/src/handler/pending.rs index ee6829356bd..90e6522404e 100644 --- a/swarm/src/handler/pending.rs +++ b/swarm/src/handler/pending.rs @@ -21,7 +21,7 @@ use crate::handler::{ ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, FullyNegotiatedInbound, - FullyNegotiatedOutbound, KeepAlive, SubstreamProtocol, + FullyNegotiatedOutbound, SubstreamProtocol, }; use libp2p_core::upgrade::PendingUpgrade; use std::task::{Context, Poll}; @@ -56,10 +56,6 @@ impl ConnectionHandler for PendingConnectionHandler { void::unreachable(v) } - fn connection_keep_alive(&self) -> KeepAlive { - KeepAlive::No - } - fn poll( &mut self, _: &mut Context<'_>, diff --git a/swarm/src/handler/select.rs b/swarm/src/handler/select.rs index 65db4ab525b..957ba43fbe7 100644 --- a/swarm/src/handler/select.rs +++ b/swarm/src/handler/select.rs @@ -20,8 +20,8 @@ use crate::handler::{ AddressChange, ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError, - FullyNegotiatedInbound, FullyNegotiatedOutbound, InboundUpgradeSend, KeepAlive, - ListenUpgradeError, OutboundUpgradeSend, StreamUpgradeError, SubstreamProtocol, + FullyNegotiatedInbound, FullyNegotiatedOutbound, InboundUpgradeSend, ListenUpgradeError, + OutboundUpgradeSend, StreamUpgradeError, SubstreamProtocol, }; use crate::upgrade::SendWrapper; use either::Either; @@ -208,7 +208,7 @@ where } } - fn connection_keep_alive(&self) -> KeepAlive { + fn connection_keep_alive(&self) -> bool { cmp::max( self.proto1.connection_keep_alive(), self.proto2.connection_keep_alive(), diff --git a/swarm/src/keep_alive.rs b/swarm/src/keep_alive.rs deleted file mode 100644 index deae4bf9bb3..00000000000 --- a/swarm/src/keep_alive.rs +++ /dev/null @@ -1,141 +0,0 @@ -use crate::behaviour::{FromSwarm, NetworkBehaviour, ToSwarm}; -use crate::connection::ConnectionId; -use crate::handler::{ - ConnectionEvent, ConnectionHandlerEvent, FullyNegotiatedInbound, FullyNegotiatedOutbound, - KeepAlive, SubstreamProtocol, -}; -use crate::{ConnectionDenied, THandler, THandlerInEvent, THandlerOutEvent}; -use libp2p_core::upgrade::DeniedUpgrade; -use libp2p_core::{Endpoint, Multiaddr}; -use libp2p_identity::PeerId; -use std::task::{Context, Poll}; -use void::Void; - -/// Implementation of [`NetworkBehaviour`] that doesn't do anything other than keep all connections alive. -/// -/// This is primarily useful for test code. In can however occasionally be useful for production code too. -/// The caveat is that open connections consume system resources and should thus be shutdown when -/// they are not in use. Connections can also fail at any time so really, your application should be -/// designed to establish them when necessary, making the use of this behaviour likely redundant. -#[derive(Default)] -pub struct Behaviour; - -impl NetworkBehaviour for Behaviour { - type ConnectionHandler = ConnectionHandler; - type ToSwarm = Void; - - fn handle_established_inbound_connection( - &mut self, - _: ConnectionId, - _: PeerId, - _: &Multiaddr, - _: &Multiaddr, - ) -> Result, ConnectionDenied> { - Ok(ConnectionHandler) - } - - fn handle_established_outbound_connection( - &mut self, - _: ConnectionId, - _: PeerId, - _: &Multiaddr, - _: Endpoint, - ) -> Result, ConnectionDenied> { - Ok(ConnectionHandler) - } - - fn on_connection_handler_event( - &mut self, - _: PeerId, - _: ConnectionId, - event: THandlerOutEvent, - ) { - void::unreachable(event) - } - - fn poll(&mut self, _: &mut Context<'_>) -> Poll>> { - Poll::Pending - } - - fn on_swarm_event(&mut self, event: FromSwarm) { - match event { - FromSwarm::ConnectionEstablished(_) - | FromSwarm::ConnectionClosed(_) - | FromSwarm::AddressChange(_) - | FromSwarm::DialFailure(_) - | FromSwarm::ListenFailure(_) - | FromSwarm::NewListener(_) - | FromSwarm::NewListenAddr(_) - | FromSwarm::ExpiredListenAddr(_) - | FromSwarm::ListenerError(_) - | FromSwarm::ListenerClosed(_) - | FromSwarm::NewExternalAddrCandidate(_) - | FromSwarm::ExternalAddrExpired(_) - | FromSwarm::ExternalAddrConfirmed(_) => {} - } - } -} - -/// Implementation of [`ConnectionHandler`] that doesn't handle anything but keeps the connection alive. -#[derive(Clone, Debug)] -pub struct ConnectionHandler; - -impl crate::handler::ConnectionHandler for ConnectionHandler { - type FromBehaviour = Void; - type ToBehaviour = Void; - type Error = Void; - type InboundProtocol = DeniedUpgrade; - type OutboundProtocol = DeniedUpgrade; - type InboundOpenInfo = (); - type OutboundOpenInfo = Void; - - fn listen_protocol(&self) -> SubstreamProtocol { - SubstreamProtocol::new(DeniedUpgrade, ()) - } - - fn on_behaviour_event(&mut self, v: Self::FromBehaviour) { - void::unreachable(v) - } - - fn connection_keep_alive(&self) -> KeepAlive { - KeepAlive::Yes - } - - fn poll( - &mut self, - _: &mut Context<'_>, - ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::ToBehaviour, - Self::Error, - >, - > { - Poll::Pending - } - - fn on_connection_event( - &mut self, - event: ConnectionEvent< - Self::InboundProtocol, - Self::OutboundProtocol, - Self::InboundOpenInfo, - Self::OutboundOpenInfo, - >, - ) { - match event { - ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound { - protocol, .. - }) => void::unreachable(protocol), - ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound { - protocol, .. - }) => void::unreachable(protocol), - ConnectionEvent::DialUpgradeError(_) - | ConnectionEvent::ListenUpgradeError(_) - | ConnectionEvent::AddressChange(_) - | ConnectionEvent::LocalProtocolsChange(_) - | ConnectionEvent::RemoteProtocolsChange(_) => {} - } - } -} diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 641357d3ae5..3482865f50e 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -67,10 +67,6 @@ pub mod behaviour; pub mod dial_opts; pub mod dummy; pub mod handler; -#[deprecated( - note = "Configure an appropriate idle connection timeout via `SwarmBuilder::idle_connection_timeout` instead. To keep connections alive 'forever', use `Duration::from_secs(u64::MAX)`." -)] -pub mod keep_alive; mod listen_opts; /// Bundles all symbols required for the [`libp2p_swarm_derive::NetworkBehaviour`] macro. @@ -119,7 +115,7 @@ pub use connection::pool::ConnectionCounters; pub use connection::{ConnectionError, ConnectionId, SupportedProtocols}; pub use executor::Executor; pub use handler::{ - ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerSelect, KeepAlive, OneShotHandler, + ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerSelect, OneShotHandler, OneShotHandlerConfig, StreamUpgradeError, SubstreamProtocol, }; #[cfg(feature = "macros")] diff --git a/swarm/src/stream.rs b/swarm/src/stream.rs index 3c4c52afc33..871352f3c6a 100644 --- a/swarm/src/stream.rs +++ b/swarm/src/stream.rs @@ -1,16 +1,55 @@ use futures::{AsyncRead, AsyncWrite}; use libp2p_core::muxing::SubstreamBox; use libp2p_core::Negotiated; -use std::io::{IoSlice, IoSliceMut}; -use std::pin::Pin; -use std::task::{Context, Poll}; +use std::{ + io::{IoSlice, IoSliceMut}, + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; + +/// Counter for the number of active streams on a connection. +#[derive(Debug, Clone)] +pub(crate) struct ActiveStreamCounter(Arc<()>); + +impl ActiveStreamCounter { + pub(crate) fn default() -> Self { + Self(Arc::new(())) + } + + pub(crate) fn has_no_active_streams(&self) -> bool { + self.num_alive_streams() == 1 + } + + fn num_alive_streams(&self) -> usize { + Arc::strong_count(&self.0) + } +} #[derive(Debug)] -pub struct Stream(Negotiated); +pub struct Stream { + stream: Negotiated, + counter: Option, +} impl Stream { - pub(crate) fn new(stream: Negotiated) -> Self { - Self(stream) + pub(crate) fn new(stream: Negotiated, counter: ActiveStreamCounter) -> Self { + Self { + stream, + counter: Some(counter), + } + } + + /// Ignore this stream in the [Swarm](crate::Swarm)'s connection-keep-alive algorithm. + /// + /// By default, any active stream keeps a connection alive. For most protocols, + /// this is a good default as it ensures that the protocol is completed before + /// a connection is shut down. + /// Some protocols like libp2p's [ping](https://github.com/libp2p/specs/blob/master/ping/ping.md) + /// for example never complete and are of an auxiliary nature. + /// These protocols should opt-out of the keep alive algorithm using this method. + pub fn ignore_for_keep_alive(&mut self) { + self.counter.take(); } } @@ -20,7 +59,7 @@ impl AsyncRead for Stream { cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll> { - Pin::new(&mut self.get_mut().0).poll_read(cx, buf) + Pin::new(&mut self.get_mut().stream).poll_read(cx, buf) } fn poll_read_vectored( @@ -28,7 +67,7 @@ impl AsyncRead for Stream { cx: &mut Context<'_>, bufs: &mut [IoSliceMut<'_>], ) -> Poll> { - Pin::new(&mut self.get_mut().0).poll_read_vectored(cx, bufs) + Pin::new(&mut self.get_mut().stream).poll_read_vectored(cx, bufs) } } @@ -38,7 +77,7 @@ impl AsyncWrite for Stream { cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { - Pin::new(&mut self.get_mut().0).poll_write(cx, buf) + Pin::new(&mut self.get_mut().stream).poll_write(cx, buf) } fn poll_write_vectored( @@ -46,14 +85,14 @@ impl AsyncWrite for Stream { cx: &mut Context<'_>, bufs: &[IoSlice<'_>], ) -> Poll> { - Pin::new(&mut self.get_mut().0).poll_write_vectored(cx, bufs) + Pin::new(&mut self.get_mut().stream).poll_write_vectored(cx, bufs) } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.get_mut().0).poll_flush(cx) + Pin::new(&mut self.get_mut().stream).poll_flush(cx) } fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.get_mut().0).poll_close(cx) + Pin::new(&mut self.get_mut().stream).poll_close(cx) } }