From f4ab209ec8c90a883871d619f228265aebaec883 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 14 Nov 2023 14:21:10 +1100 Subject: [PATCH] Use `InflightProtocolDataQueue` in `libp2p-relay` --- Cargo.lock | 1 + protocols/relay/Cargo.toml | 1 + protocols/relay/src/priv_client/handler.rs | 219 ++++++++++----------- 3 files changed, 105 insertions(+), 116 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b4156f54ec2..04fb22b2fc2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2956,6 +2956,7 @@ dependencies = [ "libp2p-identity", "libp2p-ping", "libp2p-plaintext", + "libp2p-protocol-utils", "libp2p-swarm", "libp2p-swarm-test", "libp2p-yamux", diff --git a/protocols/relay/Cargo.toml b/protocols/relay/Cargo.toml index 54336549c35..032083dcdf8 100644 --- a/protocols/relay/Cargo.toml +++ b/protocols/relay/Cargo.toml @@ -28,6 +28,7 @@ static_assertions = "1" thiserror = "1.0" tracing = "0.1.37" void = "1" +libp2p-protocol-utils = { workspace = true } [dev-dependencies] libp2p-identity = { workspace = true, features = ["rand"] } diff --git a/protocols/relay/src/priv_client/handler.rs b/protocols/relay/src/priv_client/handler.rs index 1d24493be77..d4bca2e4574 100644 --- a/protocols/relay/src/priv_client/handler.rs +++ b/protocols/relay/src/priv_client/handler.rs @@ -28,17 +28,17 @@ use libp2p_core::multiaddr::Protocol; use libp2p_core::upgrade::ReadyUpgrade; use libp2p_core::Multiaddr; use libp2p_identity::PeerId; -use libp2p_swarm::handler::{ - ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, -}; +use libp2p_protocol_utils::InflightProtocolDataQueue; +use libp2p_swarm::handler::{ConnectionEvent, FullyNegotiatedInbound}; use libp2p_swarm::{ - ConnectionHandler, ConnectionHandlerEvent, StreamProtocol, StreamUpgradeError, + ConnectionHandler, ConnectionHandlerEvent, Stream, StreamProtocol, StreamUpgradeError, SubstreamProtocol, }; use std::collections::VecDeque; use std::task::{Context, Poll}; use std::time::Duration; use std::{fmt, io}; +use void::Void; /// The maximum number of circuits being denied concurrently. /// @@ -104,8 +104,12 @@ pub struct Handler { >, >, - /// We issue a stream upgrade for each pending request. - pending_requests: VecDeque, + /// Manages associated data whilst we wait for outbound streams to be opened. + pending_streams: InflightProtocolDataQueue< + PendingRequest, + StreamProtocol, + Result>, + >, /// A `RESERVE` request is in-flight for each item in this queue. active_reserve_requests: VecDeque>, @@ -136,7 +140,7 @@ impl Handler { remote_peer_id, remote_addr, queued_events: Default::default(), - pending_requests: Default::default(), + pending_streams: Default::default(), active_reserve_requests: Default::default(), inflight_reserve_requests: futures_bounded::FuturesSet::new( STREAM_TIMEOUT, @@ -159,57 +163,6 @@ impl Handler { } } - fn on_dial_upgrade_error( - &mut self, - DialUpgradeError { error, .. }: DialUpgradeError< - ::OutboundOpenInfo, - ::OutboundProtocol, - >, - ) { - let pending_request = self - .pending_requests - .pop_front() - .expect("got a stream error without a pending request"); - - match pending_request { - PendingRequest::Reserve { mut to_listener } => { - let error = match error { - StreamUpgradeError::Timeout => { - outbound_hop::ReserveError::Io(io::ErrorKind::TimedOut.into()) - } - StreamUpgradeError::Apply(never) => void::unreachable(never), - StreamUpgradeError::NegotiationFailed => { - outbound_hop::ReserveError::Unsupported - } - StreamUpgradeError::Io(e) => outbound_hop::ReserveError::Io(e), - }; - - if let Err(e) = - to_listener.try_send(transport::ToListenerMsg::Reservation(Err(error))) - { - tracing::debug!("Unable to send error to listener: {}", e.into_send_error()) - } - self.reservation.failed(); - } - PendingRequest::Connect { - to_dial: send_back, .. - } => { - let error = match error { - StreamUpgradeError::Timeout => { - outbound_hop::ConnectError::Io(io::ErrorKind::TimedOut.into()) - } - StreamUpgradeError::NegotiationFailed => { - outbound_hop::ConnectError::Unsupported - } - StreamUpgradeError::Io(e) => outbound_hop::ConnectError::Io(e), - StreamUpgradeError::Apply(v) => void::unreachable(v), - }; - - let _ = send_back.send(Err(error)); - } - } - } - fn insert_to_deny_futs(&mut self, circuit: inbound_stop::Circuit) { let src_peer_id = circuit.src_peer_id(); @@ -241,25 +194,20 @@ impl ConnectionHandler for Handler { fn on_behaviour_event(&mut self, event: Self::FromBehaviour) { match event { In::Reserve { to_listener } => { - self.pending_requests - .push_back(PendingRequest::Reserve { to_listener }); - self.queued_events - .push_back(ConnectionHandlerEvent::OutboundSubstreamRequest { - protocol: SubstreamProtocol::new(ReadyUpgrade::new(HOP_PROTOCOL_NAME), ()), - }); + self.pending_streams + .enqueue_request(HOP_PROTOCOL_NAME, PendingRequest::Reserve { to_listener }); } In::EstablishCircuit { to_dial: send_back, dst_peer_id, } => { - self.pending_requests.push_back(PendingRequest::Connect { - dst_peer_id, - to_dial: send_back, - }); - self.queued_events - .push_back(ConnectionHandlerEvent::OutboundSubstreamRequest { - protocol: SubstreamProtocol::new(ReadyUpgrade::new(HOP_PROTOCOL_NAME), ()), - }); + self.pending_streams.enqueue_request( + HOP_PROTOCOL_NAME, + PendingRequest::Connect { + dst_peer_id, + to_dial: send_back, + }, + ); } } } @@ -442,12 +390,9 @@ impl ConnectionHandler for Handler { } if let Poll::Ready(Some(to_listener)) = self.reservation.poll(cx) { - self.pending_requests - .push_back(PendingRequest::Reserve { to_listener }); - - return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { - protocol: SubstreamProtocol::new(ReadyUpgrade::new(HOP_PROTOCOL_NAME), ()), - }); + self.pending_streams + .enqueue_request(HOP_PROTOCOL_NAME, PendingRequest::Reserve { to_listener }); + continue; } // Deny incoming circuit requests. @@ -464,6 +409,81 @@ impl ConnectionHandler for Handler { Poll::Pending => {} } + if let Some(protocol) = self.pending_streams.next_request() { + return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { + protocol: SubstreamProtocol::new(ReadyUpgrade::new(protocol), ()), + }); + } + + match self.pending_streams.next_completed() { + Some((Ok(stream), PendingRequest::Reserve { to_listener })) => { + self.active_reserve_requests.push_back(to_listener); + if self + .inflight_reserve_requests + .try_push(outbound_hop::make_reservation(stream)) + .is_err() + { + tracing::warn!("Dropping outbound stream because we are at capacity") + } + continue; + } + Some((Err(error), PendingRequest::Reserve { mut to_listener })) => { + let error = match error { + StreamUpgradeError::Timeout => { + outbound_hop::ReserveError::Io(io::ErrorKind::TimedOut.into()) + } + StreamUpgradeError::Apply(never) => void::unreachable(never), + StreamUpgradeError::NegotiationFailed => { + outbound_hop::ReserveError::Unsupported + } + StreamUpgradeError::Io(e) => outbound_hop::ReserveError::Io(e), + }; + + if let Err(e) = + to_listener.try_send(transport::ToListenerMsg::Reservation(Err(error))) + { + tracing::debug!("Unable to send error to listener: {}", e.into_send_error()) + } + self.reservation.failed(); + continue; + } + Some(( + Ok(stream), + PendingRequest::Connect { + to_dial, + dst_peer_id, + }, + )) => { + self.active_connect_requests.push_back(to_dial); + + if self + .inflight_outbound_connect_requests + .try_push(outbound_hop::open_circuit(stream, dst_peer_id)) + .is_err() + { + tracing::warn!("Dropping outbound stream because we are at capacity") + } + continue; + } + + Some((Err(error), PendingRequest::Connect { to_dial, .. })) => { + let error = match error { + StreamUpgradeError::Timeout => { + outbound_hop::ConnectError::Io(io::ErrorKind::TimedOut.into()) + } + StreamUpgradeError::NegotiationFailed => { + outbound_hop::ConnectError::Unsupported + } + StreamUpgradeError::Io(e) => outbound_hop::ConnectError::Io(e), + StreamUpgradeError::Apply(v) => void::unreachable(v), + }; + + let _ = to_dial.send(Err(error)); + continue; + } + None => {} + } + return Poll::Pending; } } @@ -490,45 +510,12 @@ impl ConnectionHandler for Handler { tracing::warn!("Dropping inbound stream because we are at capacity") } } - ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound { - protocol: stream, - .. - }) => { - let pending_request = self.pending_requests.pop_front().expect( - "opened a stream without a pending connection command or a reserve listener", - ); - match pending_request { - PendingRequest::Reserve { to_listener } => { - self.active_reserve_requests.push_back(to_listener); - if self - .inflight_reserve_requests - .try_push(outbound_hop::make_reservation(stream)) - .is_err() - { - tracing::warn!("Dropping outbound stream because we are at capacity") - } - } - PendingRequest::Connect { - dst_peer_id, - to_dial: send_back, - } => { - self.active_connect_requests.push_back(send_back); - - if self - .inflight_outbound_connect_requests - .try_push(outbound_hop::open_circuit(stream, dst_peer_id)) - .is_err() - { - tracing::warn!("Dropping outbound stream because we are at capacity") - } - } - } - } - ConnectionEvent::ListenUpgradeError(listen_upgrade_error) => { - void::unreachable(listen_upgrade_error.error) + ConnectionEvent::FullyNegotiatedOutbound(ev) => { + self.pending_streams.submit_response(Ok(ev.protocol)); } - ConnectionEvent::DialUpgradeError(dial_upgrade_error) => { - self.on_dial_upgrade_error(dial_upgrade_error) + ConnectionEvent::ListenUpgradeError(ev) => void::unreachable(ev.error), + ConnectionEvent::DialUpgradeError(ev) => { + self.pending_streams.submit_response(Err(ev.error)); } _ => {} }