From f303b3f2e25d6b8c8d1cae529e47f51353f0e75d Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 1 Nov 2023 12:31:52 +1100 Subject: [PATCH] refactor(dcutr): simplify public API We refactor the `libp2p-dcutr` API to only emit a single event: whether the hole-punch was successful or not. All other intermediate events are removed. Hole-punching is something that we try to do automatically as soon as we are connected to a peer over a relayed connection. The lack of explicit user intent means any event we emit is at best informational and not a "response" that the user would wait for. Thus, I chose to not expose the details of why the hole-punch failed but return an opaque error. Lastly, this PR also removes the usage of `ConnectionHandlerEvent::Close`. Just because something went wrong during the DCUtR handshake, doesn't mean we should close the relayed connection. Related: #3591. Pull-Request: #4749. --- Cargo.lock | 1 + hole-punching-tests/src/main.rs | 22 +-- libp2p/src/tutorials/hole_punching.rs | 15 +- misc/metrics/src/dcutr.rs | 19 +- protocols/dcutr/CHANGELOG.md | 8 +- protocols/dcutr/Cargo.toml | 1 + protocols/dcutr/src/behaviour.rs | 108 +++++------- protocols/dcutr/src/handler/relayed.rs | 211 +++++++++++------------ protocols/dcutr/src/lib.rs | 4 +- protocols/dcutr/src/protocol/inbound.rs | 155 +++++++---------- protocols/dcutr/src/protocol/outbound.rs | 155 ++++++++--------- protocols/dcutr/tests/lib.rs | 22 +-- 12 files changed, 312 insertions(+), 409 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7d3f2e5b07a..ef942f48c08 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2502,6 +2502,7 @@ dependencies = [ "either", "env_logger 0.10.0", "futures", + "futures-bounded", "futures-timer", "instant", "libp2p-core", diff --git a/hole-punching-tests/src/main.rs b/hole-punching-tests/src/main.rs index f7373aa4f94..72b81f776ad 100644 --- a/hole-punching-tests/src/main.rs +++ b/hole-punching-tests/src/main.rs @@ -111,12 +111,10 @@ async fn main() -> Result<()> { .await?; } ( - SwarmEvent::Behaviour(BehaviourEvent::Dcutr( - dcutr::Event::DirectConnectionUpgradeSucceeded { - remote_peer_id, - connection_id, - }, - )), + SwarmEvent::Behaviour(BehaviourEvent::Dcutr(dcutr::Event { + remote_peer_id, + result: Ok(connection_id), + })), _, _, ) => { @@ -138,13 +136,11 @@ async fn main() -> Result<()> { return Ok(()); } ( - SwarmEvent::Behaviour(BehaviourEvent::Dcutr( - dcutr::Event::DirectConnectionUpgradeFailed { - remote_peer_id, - error, - .. - }, - )), + SwarmEvent::Behaviour(BehaviourEvent::Dcutr(dcutr::Event { + remote_peer_id, + result: Err(error), + .. + })), _, _, ) => { diff --git a/libp2p/src/tutorials/hole_punching.rs b/libp2p/src/tutorials/hole_punching.rs index 5fd74fe754e..f9f42432ba4 100644 --- a/libp2p/src/tutorials/hole_punching.rs +++ b/libp2p/src/tutorials/hole_punching.rs @@ -166,18 +166,9 @@ //! [2022-01-30T12:54:10Z INFO client] Established connection to PeerId("12D3KooWPjceQrSwdWXPyLLeABRXmuqt69Rg3sBYbU1Nft9HyQ6X") via Dialer { address: "/ip4/$RELAY_PEER_ID/tcp/4001/p2p/12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN/p2p-circuit/p2p/12D3KooWPjceQrSwdWXPyLLeABRXmuqt69Rg3sBYbU1Nft9HyQ6X", role_override: Dialer } //! ``` //! -//! 2. The listening client initiating a direct connection upgrade for the new relayed connection. -//! Reported by [`dcutr`](crate::dcutr) through -//! [`Event::RemoteInitiatedDirectConnectionUpgrade`](crate::dcutr::Event::RemoteInitiatedDirectConnectionUpgrade). +//! 2. The direct connection upgrade, also known as hole punch, succeeding. +//! Reported by [`dcutr`](crate::dcutr) through [`Event`](crate::dcutr::Event) containing [`Result::Ok`] with the [`ConnectionId`](libp2p_swarm::ConnectionId) of the new direct connection. //! //! ``` ignore -//! [2022-01-30T12:54:11Z INFO client] RemoteInitiatedDirectConnectionUpgrade { remote_peer_id: PeerId("12D3KooWPjceQrSwdWXPyLLeABRXmuqt69Rg3sBYbU1Nft9HyQ6X"), remote_relayed_addr: "/ip4/$RELAY_PEER_ID/tcp/4001/p2p/12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN/p2p-circuit/p2p/12D3KooWPjceQrSwdWXPyLLeABRXmuqt69Rg3sBYbU1Nft9HyQ6X" } -//! ``` -//! -//! 3. The direct connection upgrade, also known as hole punch, succeeding. Reported by -//! [`dcutr`](crate::dcutr) through -//! [`Event::RemoteInitiatedDirectConnectionUpgrade`](crate::dcutr::Event::DirectConnectionUpgradeSucceeded). -//! -//! ``` ignore -//! [2022-01-30T12:54:11Z INFO client] DirectConnectionUpgradeSucceeded { remote_peer_id: PeerId("12D3KooWPjceQrSwdWXPyLLeABRXmuqt69Rg3sBYbU1Nft9HyQ6X") } +//! [2022-01-30T12:54:11Z INFO client] Event { remote_peer_id: PeerId("12D3KooWPjceQrSwdWXPyLLeABRXmuqt69Rg3sBYbU1Nft9HyQ6X"), result: Ok(2) } //! ``` diff --git a/misc/metrics/src/dcutr.rs b/misc/metrics/src/dcutr.rs index dc15e1f838d..3e60dca2cab 100644 --- a/misc/metrics/src/dcutr.rs +++ b/misc/metrics/src/dcutr.rs @@ -49,8 +49,6 @@ struct EventLabels { #[derive(Debug, Clone, Hash, PartialEq, Eq, EncodeLabelValue)] enum EventType { - InitiateDirectConnectionUpgrade, - RemoteInitiatedDirectConnectionUpgrade, DirectConnectionUpgradeSucceeded, DirectConnectionUpgradeFailed, } @@ -58,22 +56,13 @@ enum EventType { impl From<&libp2p_dcutr::Event> for EventType { fn from(event: &libp2p_dcutr::Event) -> Self { match event { - libp2p_dcutr::Event::InitiatedDirectConnectionUpgrade { + libp2p_dcutr::Event { remote_peer_id: _, - local_relayed_addr: _, - } => EventType::InitiateDirectConnectionUpgrade, - libp2p_dcutr::Event::RemoteInitiatedDirectConnectionUpgrade { - remote_peer_id: _, - remote_relayed_addr: _, - } => EventType::RemoteInitiatedDirectConnectionUpgrade, - libp2p_dcutr::Event::DirectConnectionUpgradeSucceeded { - remote_peer_id: _, - connection_id: _, + result: Ok(_), } => EventType::DirectConnectionUpgradeSucceeded, - libp2p_dcutr::Event::DirectConnectionUpgradeFailed { + libp2p_dcutr::Event { remote_peer_id: _, - connection_id: _, - error: _, + result: Err(_), } => EventType::DirectConnectionUpgradeFailed, } } diff --git a/protocols/dcutr/CHANGELOG.md b/protocols/dcutr/CHANGELOG.md index 179db86dff2..cb84020ec5f 100644 --- a/protocols/dcutr/CHANGELOG.md +++ b/protocols/dcutr/CHANGELOG.md @@ -1,13 +1,13 @@ ## 0.11.0 - unreleased - Add `ConnectionId` to `Event::DirectConnectionUpgradeSucceeded` and `Event::DirectConnectionUpgradeFailed`. - See [PR 4558]. - -[PR 4558]: https://github.com/libp2p/rust-libp2p/pull/4558 - + See [PR 4558](https://github.com/libp2p/rust-libp2p/pull/4558). - Exchange address _candidates_ instead of external addresses in `CONNECT`. If hole-punching wasn't working properly for you until now, this might be the reason why. See [PR 4624](https://github.com/libp2p/rust-libp2p/pull/4624). +- Simplify public API. + We now only emit a single event: whether the hole-punch was successful or not. + See [PR XXXX](https://github.com/libp2p/rust-libp2p/pull/XXXX). ## 0.10.0 diff --git a/protocols/dcutr/Cargo.toml b/protocols/dcutr/Cargo.toml index 33dc570d112..0e59585a416 100644 --- a/protocols/dcutr/Cargo.toml +++ b/protocols/dcutr/Cargo.toml @@ -25,6 +25,7 @@ quick-protobuf-codec = { workspace = true } thiserror = "1.0" void = "1" lru = "0.11.1" +futures-bounded = { workspace = true } [dev-dependencies] async-std = { version = "1.12.0", features = ["attributes"] } diff --git a/protocols/dcutr/src/behaviour.rs b/protocols/dcutr/src/behaviour.rs index 72b30421346..6aecc596c71 100644 --- a/protocols/dcutr/src/behaviour.rs +++ b/protocols/dcutr/src/behaviour.rs @@ -20,7 +20,7 @@ //! [`NetworkBehaviour`] to act as a direct connection upgrade through relay node. -use crate::handler; +use crate::{handler, protocol}; use either::Either; use libp2p_core::connection::ConnectedPoint; use libp2p_core::multiaddr::Protocol; @@ -32,7 +32,7 @@ use libp2p_swarm::{ dummy, ConnectionDenied, ConnectionHandler, ConnectionId, NewExternalAddrCandidate, THandler, THandlerOutEvent, }; -use libp2p_swarm::{NetworkBehaviour, NotifyHandler, StreamUpgradeError, THandlerInEvent, ToSwarm}; +use libp2p_swarm::{NetworkBehaviour, NotifyHandler, THandlerInEvent, ToSwarm}; use lru::LruCache; use std::collections::{HashMap, HashSet, VecDeque}; use std::num::NonZeroUsize; @@ -44,32 +44,25 @@ pub(crate) const MAX_NUMBER_OF_UPGRADE_ATTEMPTS: u8 = 3; /// The events produced by the [`Behaviour`]. #[derive(Debug)] -pub enum Event { - InitiatedDirectConnectionUpgrade { - remote_peer_id: PeerId, - local_relayed_addr: Multiaddr, - }, - RemoteInitiatedDirectConnectionUpgrade { - remote_peer_id: PeerId, - remote_relayed_addr: Multiaddr, - }, - DirectConnectionUpgradeSucceeded { - remote_peer_id: PeerId, - connection_id: ConnectionId, - }, - DirectConnectionUpgradeFailed { - remote_peer_id: PeerId, - connection_id: ConnectionId, - error: Error, - }, +pub struct Event { + pub remote_peer_id: PeerId, + pub result: Result, +} + +#[derive(Debug, Error)] +#[error("Failed to hole-punch connection: {inner}")] +pub struct Error { + inner: InnerError, } #[derive(Debug, Error)] -pub enum Error { - #[error("Failed to dial peer.")] - Dial, - #[error("Failed to establish substream: {0}.")] - Handler(StreamUpgradeError), +enum InnerError { + #[error("Giving up after {0} dial attempts")] + AttemptsExceeded(u8), + #[error("Inbound stream error: {0}")] + InboundError(protocol::inbound::Error), + #[error("Outbound stream error: {0}")] + OutboundError(protocol::outbound::Error), } pub struct Behaviour { @@ -142,13 +135,12 @@ impl Behaviour { event: Either::Left(handler::relayed::Command::Connect), }) } else { - self.queued_events.extend([ToSwarm::GenerateEvent( - Event::DirectConnectionUpgradeFailed { - remote_peer_id: peer_id, - connection_id: failed_direct_connection, - error: Error::Dial, - }, - )]); + self.queued_events.extend([ToSwarm::GenerateEvent(Event { + remote_peer_id: peer_id, + result: Err(Error { + inner: InnerError::AttemptsExceeded(MAX_NUMBER_OF_UPGRADE_ATTEMPTS), + }), + })]); } } @@ -197,13 +189,6 @@ impl NetworkBehaviour for Behaviour { handler::relayed::Handler::new(connected_point, self.observed_addresses()); handler.on_behaviour_event(handler::relayed::Command::Connect); - self.queued_events.extend([ToSwarm::GenerateEvent( - Event::InitiatedDirectConnectionUpgrade { - remote_peer_id: peer, - local_relayed_addr: local_addr.clone(), - }, - )]); - return Ok(Either::Left(handler)); // TODO: We could make two `handler::relayed::Handler` here, one inbound one outbound. } self.direct_connections @@ -255,12 +240,10 @@ impl NetworkBehaviour for Behaviour { ); } - self.queued_events.extend([ToSwarm::GenerateEvent( - Event::DirectConnectionUpgradeSucceeded { - remote_peer_id: peer, - connection_id, - }, - )]); + self.queued_events.extend([ToSwarm::GenerateEvent(Event { + remote_peer_id: peer, + result: Ok(connection_id), + })]); } Ok(Either::Right(dummy::ConnectionHandler)) @@ -284,15 +267,7 @@ impl NetworkBehaviour for Behaviour { }; match handler_event { - Either::Left(handler::relayed::Event::InboundConnectRequest { remote_addr }) => { - self.queued_events.extend([ToSwarm::GenerateEvent( - Event::RemoteInitiatedDirectConnectionUpgrade { - remote_peer_id: event_source, - remote_relayed_addr: remote_addr, - }, - )]); - } - Either::Left(handler::relayed::Event::InboundConnectNegotiated(remote_addrs)) => { + Either::Left(handler::relayed::Event::InboundConnectNegotiated { remote_addrs }) => { log::debug!( "Attempting to hole-punch as dialer to {event_source} using {remote_addrs:?}" ); @@ -308,14 +283,23 @@ impl NetworkBehaviour for Behaviour { .insert(maybe_direct_connection_id, relayed_connection_id); self.queued_events.push_back(ToSwarm::Dial { opts }); } - Either::Left(handler::relayed::Event::OutboundNegotiationFailed { error }) => { - self.queued_events.push_back(ToSwarm::GenerateEvent( - Event::DirectConnectionUpgradeFailed { - remote_peer_id: event_source, - connection_id: relayed_connection_id, - error: Error::Handler(error), - }, - )); + Either::Left(handler::relayed::Event::InboundConnectFailed { error }) => { + self.queued_events.push_back(ToSwarm::GenerateEvent(Event { + remote_peer_id: event_source, + result: Err(Error { + inner: InnerError::InboundError(error), + }), + })); + } + Either::Left(handler::relayed::Event::OutboundConnectFailed { error }) => { + self.queued_events.push_back(ToSwarm::GenerateEvent(Event { + remote_peer_id: event_source, + result: Err(Error { + inner: InnerError::OutboundError(error), + }), + })); + + // Maybe treat these as transient and retry? } Either::Left(handler::relayed::Event::OutboundConnectNegotiated { remote_addrs }) => { log::debug!( diff --git a/protocols/dcutr/src/handler/relayed.rs b/protocols/dcutr/src/handler/relayed.rs index 23ab9f4ae5a..9d600d234e5 100644 --- a/protocols/dcutr/src/handler/relayed.rs +++ b/protocols/dcutr/src/handler/relayed.rs @@ -21,22 +21,26 @@ //! [`ConnectionHandler`] handling relayed connection potentially upgraded to a direct connection. use crate::behaviour::MAX_NUMBER_OF_UPGRADE_ATTEMPTS; -use crate::protocol; +use crate::{protocol, PROTOCOL_NAME}; use either::Either; use futures::future; -use futures::future::{BoxFuture, FutureExt}; use libp2p_core::multiaddr::Multiaddr; -use libp2p_core::upgrade::DeniedUpgrade; +use libp2p_core::upgrade::{DeniedUpgrade, ReadyUpgrade}; use libp2p_core::ConnectedPoint; use libp2p_swarm::handler::{ ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, ListenUpgradeError, }; use libp2p_swarm::{ - ConnectionHandler, ConnectionHandlerEvent, StreamUpgradeError, SubstreamProtocol, + ConnectionHandler, ConnectionHandlerEvent, StreamProtocol, StreamUpgradeError, + SubstreamProtocol, }; +use protocol::{inbound, outbound}; use std::collections::VecDeque; +use std::io; use std::task::{Context, Poll}; +use std::time::Duration; +use void::Void; #[derive(Debug)] pub enum Command { @@ -45,26 +49,14 @@ pub enum Command { #[derive(Debug)] pub enum Event { - InboundConnectRequest { - remote_addr: Multiaddr, - }, - InboundConnectNegotiated(Vec), - OutboundNegotiationFailed { - error: StreamUpgradeError, - }, - OutboundConnectNegotiated { - remote_addrs: Vec, - }, + InboundConnectNegotiated { remote_addrs: Vec }, + OutboundConnectNegotiated { remote_addrs: Vec }, + InboundConnectFailed { error: inbound::Error }, + OutboundConnectFailed { error: outbound::Error }, } pub struct Handler { endpoint: ConnectedPoint, - /// A pending fatal error that results in the connection being closed. - pending_error: Option< - StreamUpgradeError< - Either, - >, - >, /// Queue of events to return when polled. queued_events: VecDeque< ConnectionHandlerEvent< @@ -74,9 +66,12 @@ pub struct Handler { ::Error, >, >, - /// Inbound connect, accepted by the behaviour, pending completion. - inbound_connect: - Option, protocol::inbound::UpgradeError>>>, + + // Inbound DCUtR handshakes + inbound_stream: futures_bounded::FuturesSet, inbound::Error>>, + + // Outbound DCUtR handshake. + outbound_stream: futures_bounded::FuturesSet, outbound::Error>>, /// The addresses we will send to the other party for hole-punching attempts. holepunch_candidates: Vec, @@ -88,9 +83,9 @@ impl Handler { pub fn new(endpoint: ConnectedPoint, holepunch_candidates: Vec) -> Self { Self { endpoint, - pending_error: Default::default(), queued_events: Default::default(), - inbound_connect: Default::default(), + inbound_stream: futures_bounded::FuturesSet::new(Duration::from_secs(10), 1), + outbound_stream: futures_bounded::FuturesSet::new(Duration::from_secs(10), 1), holepunch_candidates, attempts: 0, } @@ -106,29 +101,19 @@ impl Handler { >, ) { match output { - future::Either::Left(inbound_connect) => { + future::Either::Left(stream) => { if self - .inbound_connect - .replace( - inbound_connect - .accept(self.holepunch_candidates.clone()) - .boxed(), - ) - .is_some() + .inbound_stream + .try_push(inbound::handshake( + stream, + self.holepunch_candidates.clone(), + )) + .is_err() { log::warn!( - "New inbound connect stream while still upgrading previous one. \ - Replacing previous with new.", + "New inbound connect stream while still upgrading previous one. Replacing previous with new.", ); } - let remote_addr = match &self.endpoint { - ConnectedPoint::Dialer { address, role_override: _ } => address.clone(), - ConnectedPoint::Listener { ..} => unreachable!("`::listen_protocol` denies all incoming substreams as a listener."), - }; - self.queued_events - .push_back(ConnectionHandlerEvent::NotifyBehaviour( - Event::InboundConnectRequest { remote_addr }, - )); self.attempts += 1; } // A connection listener denies all incoming substreams, thus none can ever be fully negotiated. @@ -139,8 +124,7 @@ impl Handler { fn on_fully_negotiated_outbound( &mut self, FullyNegotiatedOutbound { - protocol: protocol::outbound::Connect { obs_addrs }, - .. + protocol: stream, .. }: FullyNegotiatedOutbound< ::OutboundProtocol, ::OutboundOpenInfo, @@ -150,12 +134,18 @@ impl Handler { self.endpoint.is_listener(), "A connection dialer never initiates a connection upgrade." ); - self.queued_events - .push_back(ConnectionHandlerEvent::NotifyBehaviour( - Event::OutboundConnectNegotiated { - remote_addrs: obs_addrs, - }, - )); + if self + .outbound_stream + .try_push(outbound::handshake( + stream, + self.holepunch_candidates.clone(), + )) + .is_err() + { + log::warn!( + "New outbound connect stream while still upgrading previous one. Replacing previous with new.", + ); + } } fn on_listen_upgrade_error( @@ -165,10 +155,7 @@ impl Handler { ::InboundProtocol, >, ) { - self.pending_error = Some(StreamUpgradeError::Apply(match error { - Either::Left(e) => Either::Left(e), - Either::Right(v) => void::unreachable(v), - })); + void::unreachable(error.into_inner()); } fn on_dial_upgrade_error( @@ -178,50 +165,33 @@ impl Handler { ::OutboundProtocol, >, ) { - match error { - StreamUpgradeError::Timeout => { - self.queued_events - .push_back(ConnectionHandlerEvent::NotifyBehaviour( - Event::OutboundNegotiationFailed { - error: StreamUpgradeError::Timeout, - }, - )); - } - StreamUpgradeError::NegotiationFailed => { - // The remote merely doesn't support the DCUtR protocol. - // This is no reason to close the connection, which may - // successfully communicate with other protocols already. - self.queued_events - .push_back(ConnectionHandlerEvent::NotifyBehaviour( - Event::OutboundNegotiationFailed { - error: StreamUpgradeError::NegotiationFailed, - }, - )); - } - _ => { - // Anything else is considered a fatal error or misbehaviour of - // the remote peer and results in closing the connection. - self.pending_error = Some(error.map_upgrade_err(Either::Right)); - } - } + let error = match error { + StreamUpgradeError::Apply(v) => void::unreachable(v), + StreamUpgradeError::NegotiationFailed => outbound::Error::Unsupported, + StreamUpgradeError::Io(e) => outbound::Error::Io(e), + StreamUpgradeError::Timeout => outbound::Error::Io(io::ErrorKind::TimedOut.into()), + }; + + self.queued_events + .push_back(ConnectionHandlerEvent::NotifyBehaviour( + Event::OutboundConnectFailed { error }, + )) } } impl ConnectionHandler for Handler { type FromBehaviour = Command; type ToBehaviour = Event; - type Error = StreamUpgradeError< - Either, - >; - type InboundProtocol = Either; - type OutboundProtocol = protocol::outbound::Upgrade; + type Error = Void; + type InboundProtocol = Either, DeniedUpgrade>; + type OutboundProtocol = ReadyUpgrade; type OutboundOpenInfo = (); type InboundOpenInfo = (); fn listen_protocol(&self) -> SubstreamProtocol { match self.endpoint { ConnectedPoint::Dialer { .. } => { - SubstreamProtocol::new(Either::Left(protocol::inbound::Upgrade {}), ()) + SubstreamProtocol::new(Either::Left(ReadyUpgrade::new(PROTOCOL_NAME)), ()) } ConnectedPoint::Listener { .. } => { // By the protocol specification the listening side of a relayed connection @@ -239,10 +209,7 @@ impl ConnectionHandler for Handler { Command::Connect => { self.queued_events .push_back(ConnectionHandlerEvent::OutboundSubstreamRequest { - protocol: SubstreamProtocol::new( - protocol::outbound::Upgrade::new(self.holepunch_candidates.clone()), - (), - ), + protocol: SubstreamProtocol::new(ReadyUpgrade::new(PROTOCOL_NAME), ()), }); self.attempts += 1; } @@ -268,31 +235,55 @@ impl ConnectionHandler for Handler { Self::Error, >, > { - // Check for a pending (fatal) error. - if let Some(err) = self.pending_error.take() { - // The handler will not be polled again by the `Swarm`. - return Poll::Ready(ConnectionHandlerEvent::Close(err)); - } - // Return queued events. if let Some(event) = self.queued_events.pop_front() { return Poll::Ready(event); } - if let Some(Poll::Ready(result)) = self.inbound_connect.as_mut().map(|f| f.poll_unpin(cx)) { - self.inbound_connect = None; - match result { - Ok(addresses) => { - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( - Event::InboundConnectNegotiated(addresses), - )); - } - Err(e) => { - return Poll::Ready(ConnectionHandlerEvent::Close(StreamUpgradeError::Apply( - Either::Left(e), - ))) - } + match self.inbound_stream.poll_unpin(cx) { + Poll::Ready(Ok(Ok(addresses))) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::InboundConnectNegotiated { + remote_addrs: addresses, + }, + )) + } + Poll::Ready(Ok(Err(error))) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::InboundConnectFailed { error }, + )) + } + Poll::Ready(Err(futures_bounded::Timeout { .. })) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::InboundConnectFailed { + error: inbound::Error::Io(io::ErrorKind::TimedOut.into()), + }, + )) + } + Poll::Pending => {} + } + + match self.outbound_stream.poll_unpin(cx) { + Poll::Ready(Ok(Ok(addresses))) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::OutboundConnectNegotiated { + remote_addrs: addresses, + }, + )) + } + Poll::Ready(Ok(Err(error))) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::OutboundConnectFailed { error }, + )) + } + Poll::Ready(Err(futures_bounded::Timeout { .. })) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::OutboundConnectFailed { + error: outbound::Error::Io(io::ErrorKind::TimedOut.into()), + }, + )) } + Poll::Pending => {} } Poll::Pending diff --git a/protocols/dcutr/src/lib.rs b/protocols/dcutr/src/lib.rs index 389365f94c5..7c5d28aba19 100644 --- a/protocols/dcutr/src/lib.rs +++ b/protocols/dcutr/src/lib.rs @@ -36,8 +36,8 @@ mod proto { pub use behaviour::{Behaviour, Error, Event}; pub use protocol::PROTOCOL_NAME; pub mod inbound { - pub use crate::protocol::inbound::UpgradeError; + pub use crate::protocol::inbound::ProtocolViolation; } pub mod outbound { - pub use crate::protocol::outbound::UpgradeError; + pub use crate::protocol::outbound::ProtocolViolation; } diff --git a/protocols/dcutr/src/protocol/inbound.rs b/protocols/dcutr/src/protocol/inbound.rs index d38b6f4559a..95665843724 100644 --- a/protocols/dcutr/src/protocol/inbound.rs +++ b/protocols/dcutr/src/protocol/inbound.rs @@ -20,114 +20,91 @@ use crate::proto; use asynchronous_codec::Framed; -use futures::{future::BoxFuture, prelude::*}; -use libp2p_core::{multiaddr::Protocol, upgrade, Multiaddr}; -use libp2p_swarm::{Stream, StreamProtocol}; +use futures::prelude::*; +use libp2p_core::{multiaddr::Protocol, Multiaddr}; +use libp2p_swarm::Stream; use std::convert::TryFrom; -use std::iter; +use std::io; use thiserror::Error; -pub struct Upgrade {} - -impl upgrade::UpgradeInfo for Upgrade { - type Info = StreamProtocol; - type InfoIter = iter::Once; +pub(crate) async fn handshake( + stream: Stream, + candidates: Vec, +) -> Result, Error> { + let mut stream = Framed::new( + stream, + quick_protobuf_codec::Codec::new(super::MAX_MESSAGE_SIZE_BYTES), + ); + + let proto::HolePunch { type_pb, ObsAddrs } = stream + .next() + .await + .ok_or(io::Error::from(io::ErrorKind::UnexpectedEof))??; + + if ObsAddrs.is_empty() { + return Err(Error::Protocol(ProtocolViolation::NoAddresses)); + }; + + let obs_addrs = ObsAddrs + .into_iter() + .filter_map(|a| match Multiaddr::try_from(a.to_vec()) { + Ok(a) => Some(a), + Err(e) => { + log::debug!("Unable to parse multiaddr: {e}"); + None + } + }) + // Filter out relayed addresses. + .filter(|a| { + if a.iter().any(|p| p == Protocol::P2pCircuit) { + log::debug!("Dropping relayed address {a}"); + false + } else { + true + } + }) + .collect(); - fn protocol_info(&self) -> Self::InfoIter { - iter::once(super::PROTOCOL_NAME) + if !matches!(type_pb, proto::Type::CONNECT) { + return Err(Error::Protocol(ProtocolViolation::UnexpectedTypeSync)); } -} - -impl upgrade::InboundUpgrade for Upgrade { - type Output = PendingConnect; - type Error = UpgradeError; - type Future = BoxFuture<'static, Result>; - fn upgrade_inbound(self, substream: Stream, _: Self::Info) -> Self::Future { - let mut substream = Framed::new( - substream, - quick_protobuf_codec::Codec::new(super::MAX_MESSAGE_SIZE_BYTES), - ); + let msg = proto::HolePunch { + type_pb: proto::Type::CONNECT, + ObsAddrs: candidates.into_iter().map(|a| a.to_vec()).collect(), + }; - async move { - let proto::HolePunch { type_pb, ObsAddrs } = - substream.next().await.ok_or(UpgradeError::StreamClosed)??; - - let obs_addrs = if ObsAddrs.is_empty() { - return Err(UpgradeError::NoAddresses); - } else { - ObsAddrs - .into_iter() - .filter_map(|a| match Multiaddr::try_from(a.to_vec()) { - Ok(a) => Some(a), - Err(e) => { - log::debug!("Unable to parse multiaddr: {e}"); - None - } - }) - // Filter out relayed addresses. - .filter(|a| { - if a.iter().any(|p| p == Protocol::P2pCircuit) { - log::debug!("Dropping relayed address {a}"); - false - } else { - true - } - }) - .collect::>() - }; + stream.send(msg).await?; + let proto::HolePunch { type_pb, .. } = stream + .next() + .await + .ok_or(io::Error::from(io::ErrorKind::UnexpectedEof))??; - match type_pb { - proto::Type::CONNECT => {} - proto::Type::SYNC => return Err(UpgradeError::UnexpectedTypeSync), - } - - Ok(PendingConnect { - substream, - remote_obs_addrs: obs_addrs, - }) - } - .boxed() + if !matches!(type_pb, proto::Type::SYNC) { + return Err(Error::Protocol(ProtocolViolation::UnexpectedTypeConnect)); } -} -pub struct PendingConnect { - substream: Framed>, - remote_obs_addrs: Vec, + Ok(obs_addrs) } -impl PendingConnect { - pub async fn accept( - mut self, - local_obs_addrs: Vec, - ) -> Result, UpgradeError> { - let msg = proto::HolePunch { - type_pb: proto::Type::CONNECT, - ObsAddrs: local_obs_addrs.into_iter().map(|a| a.to_vec()).collect(), - }; - - self.substream.send(msg).await?; - let proto::HolePunch { type_pb, .. } = self - .substream - .next() - .await - .ok_or(UpgradeError::StreamClosed)??; - - match type_pb { - proto::Type::CONNECT => return Err(UpgradeError::UnexpectedTypeConnect), - proto::Type::SYNC => {} - } +#[derive(Debug, Error)] +pub enum Error { + #[error("IO error")] + Io(#[from] io::Error), + #[error("Protocol error")] + Protocol(#[from] ProtocolViolation), +} - Ok(self.remote_obs_addrs) +impl From for Error { + fn from(e: quick_protobuf_codec::Error) -> Self { + Error::Protocol(ProtocolViolation::Codec(e)) } } #[derive(Debug, Error)] -pub enum UpgradeError { +pub enum ProtocolViolation { #[error(transparent)] Codec(#[from] quick_protobuf_codec::Error), - #[error("Stream closed")] - StreamClosed, #[error("Expected at least one address in reservation.")] NoAddresses, #[error("Failed to parse response type field.")] diff --git a/protocols/dcutr/src/protocol/outbound.rs b/protocols/dcutr/src/protocol/outbound.rs index 960d98cbe66..67c7116d706 100644 --- a/protocols/dcutr/src/protocol/outbound.rs +++ b/protocols/dcutr/src/protocol/outbound.rs @@ -19,115 +19,102 @@ // DEALINGS IN THE SOFTWARE. use crate::proto; +use crate::PROTOCOL_NAME; use asynchronous_codec::Framed; -use futures::{future::BoxFuture, prelude::*}; +use futures::prelude::*; use futures_timer::Delay; use instant::Instant; -use libp2p_core::{multiaddr::Protocol, upgrade, Multiaddr}; -use libp2p_swarm::{Stream, StreamProtocol}; +use libp2p_core::{multiaddr::Protocol, Multiaddr}; +use libp2p_swarm::Stream; use std::convert::TryFrom; -use std::iter; +use std::io; use thiserror::Error; -pub struct Upgrade { - obs_addrs: Vec, -} +pub(crate) async fn handshake( + stream: Stream, + candidates: Vec, +) -> Result, Error> { + let mut stream = Framed::new( + stream, + quick_protobuf_codec::Codec::new(super::MAX_MESSAGE_SIZE_BYTES), + ); -impl upgrade::UpgradeInfo for Upgrade { - type Info = StreamProtocol; - type InfoIter = iter::Once; + let msg = proto::HolePunch { + type_pb: proto::Type::CONNECT, + ObsAddrs: candidates.into_iter().map(|a| a.to_vec()).collect(), + }; - fn protocol_info(&self) -> Self::InfoIter { - iter::once(super::PROTOCOL_NAME) - } -} + stream.send(msg).await?; -impl Upgrade { - pub fn new(obs_addrs: Vec) -> Self { - Self { obs_addrs } - } -} + let sent_time = Instant::now(); -impl upgrade::OutboundUpgrade for Upgrade { - type Output = Connect; - type Error = UpgradeError; - type Future = BoxFuture<'static, Result>; + let proto::HolePunch { type_pb, ObsAddrs } = stream + .next() + .await + .ok_or(io::Error::from(io::ErrorKind::UnexpectedEof))??; - fn upgrade_outbound(self, substream: Stream, _: Self::Info) -> Self::Future { - let mut substream = Framed::new( - substream, - quick_protobuf_codec::Codec::new(super::MAX_MESSAGE_SIZE_BYTES), - ); + let rtt = sent_time.elapsed(); - let msg = proto::HolePunch { - type_pb: proto::Type::CONNECT, - ObsAddrs: self.obs_addrs.into_iter().map(|a| a.to_vec()).collect(), - }; + if !matches!(type_pb, proto::Type::CONNECT) { + return Err(Error::Protocol(ProtocolViolation::UnexpectedTypeSync)); + } - async move { - substream.send(msg).await?; + if ObsAddrs.is_empty() { + return Err(Error::Protocol(ProtocolViolation::NoAddresses)); + } - let sent_time = Instant::now(); + let obs_addrs = ObsAddrs + .into_iter() + .filter_map(|a| match Multiaddr::try_from(a.to_vec()) { + Ok(a) => Some(a), + Err(e) => { + log::debug!("Unable to parse multiaddr: {e}"); + None + } + }) + // Filter out relayed addresses. + .filter(|a| { + if a.iter().any(|p| p == Protocol::P2pCircuit) { + log::debug!("Dropping relayed address {a}"); + false + } else { + true + } + }) + .collect(); - let proto::HolePunch { type_pb, ObsAddrs } = - substream.next().await.ok_or(UpgradeError::StreamClosed)??; + let msg = proto::HolePunch { + type_pb: proto::Type::SYNC, + ObsAddrs: vec![], + }; - let rtt = sent_time.elapsed(); + stream.send(msg).await?; - match type_pb { - proto::Type::CONNECT => {} - proto::Type::SYNC => return Err(UpgradeError::UnexpectedTypeSync), - } + Delay::new(rtt / 2).await; - let obs_addrs = if ObsAddrs.is_empty() { - return Err(UpgradeError::NoAddresses); - } else { - ObsAddrs - .into_iter() - .filter_map(|a| match Multiaddr::try_from(a.to_vec()) { - Ok(a) => Some(a), - Err(e) => { - log::debug!("Unable to parse multiaddr: {e}"); - None - } - }) - // Filter out relayed addresses. - .filter(|a| { - if a.iter().any(|p| p == Protocol::P2pCircuit) { - log::debug!("Dropping relayed address {a}"); - false - } else { - true - } - }) - .collect::>() - }; - - let msg = proto::HolePunch { - type_pb: proto::Type::SYNC, - ObsAddrs: vec![], - }; - - substream.send(msg).await?; - - Delay::new(rtt / 2).await; - - Ok(Connect { obs_addrs }) - } - .boxed() - } + Ok(obs_addrs) } -pub struct Connect { - pub obs_addrs: Vec, +#[derive(Debug, Error)] +pub enum Error { + #[error("IO error")] + Io(#[from] io::Error), + #[error("Remote does not support the `{PROTOCOL_NAME}` protocol")] + Unsupported, + #[error("Protocol error")] + Protocol(#[from] ProtocolViolation), +} + +impl From for Error { + fn from(e: quick_protobuf_codec::Error) -> Self { + Error::Protocol(ProtocolViolation::Codec(e)) + } } #[derive(Debug, Error)] -pub enum UpgradeError { +pub enum ProtocolViolation { #[error(transparent)] Codec(#[from] quick_protobuf_codec::Error), - #[error("Stream closed")] - StreamClosed, #[error("Expected 'status' field to be set.")] MissingStatusField, #[error("Expected 'reservation' field to be set.")] diff --git a/protocols/dcutr/tests/lib.rs b/protocols/dcutr/tests/lib.rs index f43144154a7..1c5ddb5a972 100644 --- a/protocols/dcutr/tests/lib.rs +++ b/protocols/dcutr/tests/lib.rs @@ -69,21 +69,6 @@ async fn connect() { src.dial_and_wait(dst_relayed_addr.clone()).await; - while let Ok(event) = src.next_swarm_event().await.try_into_behaviour_event() { - match event { - ClientEvent::Dcutr(dcutr::Event::RemoteInitiatedDirectConnectionUpgrade { - remote_peer_id, - remote_relayed_addr, - }) => { - if remote_peer_id == dst_peer_id && remote_relayed_addr == dst_relayed_addr { - break; - } - } - ClientEvent::Identify(_) => {} - other => panic!("Unexpected event: {other:?}."), - } - } - let dst_addr = dst_tcp_addr.with(Protocol::P2p(dst_peer_id)); let established_conn_id = src @@ -99,9 +84,10 @@ async fn connect() { let reported_conn_id = src .wait(move |e| match e { - SwarmEvent::Behaviour(ClientEvent::Dcutr( - dcutr::Event::DirectConnectionUpgradeSucceeded { connection_id, .. }, - )) => Some(connection_id), + SwarmEvent::Behaviour(ClientEvent::Dcutr(dcutr::Event { + result: Ok(connection_id), + .. + })) => Some(connection_id), _ => None, }) .await;