diff --git a/.github/workflows/interop-test.yml b/.github/workflows/interop-test.yml index 7584380a10f..5515a9c7f19 100644 --- a/.github/workflows/interop-test.yml +++ b/.github/workflows/interop-test.yml @@ -37,3 +37,20 @@ jobs: s3-access-key-id: ${{ vars.TEST_PLANS_BUILD_CACHE_KEY_ID }} s3-secret-access-key: ${{ secrets.TEST_PLANS_BUILD_CACHE_KEY }} worker-count: 16 + run-holepunching-interop: + name: Run hole-punch interoperability tests + runs-on: ${{ fromJSON(github.repository == 'libp2p/rust-libp2p' && '["self-hosted", "linux", "x64", "4xlarge"]' || '"ubuntu-latest"') }} + steps: + - uses: actions/checkout@v4 + - uses: docker/setup-buildx-action@v3 + - name: Build image + run: docker buildx build --load -t rust-libp2p-head . -f hole-punching-tests/Dockerfile + - name: Run tests + uses: libp2p/test-plans/.github/actions/run-interop-hole-punch-test@master + with: + test-filter: rust-libp2p-head + extra-versions: ${{ github.workspace }}/hole-punching-tests/version.json + s3-cache-bucket: libp2p-by-tf-aws-bootstrap + s3-access-key-id: ${{ vars.TEST_PLANS_BUILD_CACHE_KEY_ID }} + s3-secret-access-key: ${{ secrets.TEST_PLANS_BUILD_CACHE_KEY }} + worker-count: 1 diff --git a/Cargo.lock b/Cargo.lock index 378ea5af2f8..e7b67a47724 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1937,6 +1937,21 @@ dependencies = [ "hmac 0.8.1", ] +[[package]] +name = "hole-punching-tests" +version = "0.1.0" +dependencies = [ + "anyhow", + "env_logger 0.10.0", + "futures", + "libp2p", + "log", + "redis", + "serde", + "serde_json", + "tokio", +] + [[package]] name = "hostname" version = "0.3.1" diff --git a/Cargo.toml b/Cargo.toml index 8f487a4e06a..699cb0619cd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,7 @@ members = [ "examples/relay-server", "examples/rendezvous", "examples/upnp", + "hole-punching-tests", "identity", "interop-tests", "misc/allow-block-list", diff --git a/hole-punching-tests/Cargo.toml b/hole-punching-tests/Cargo.toml new file mode 100644 index 
00000000000..8398d429014 --- /dev/null +++ b/hole-punching-tests/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "hole-punching-tests" +version = "0.1.0" +edition = "2021" +publish = false +license = "MIT" + +[dependencies] +anyhow = "1" +env_logger = "0.10.0" +futures = "0.3.28" +libp2p = { path = "../libp2p", features = ["tokio", "dcutr", "identify", "macros", "noise", "ping", "relay", "tcp", "yamux", "quic"] } +log = "0.4" +redis = { version = "0.23.0", default-features = false, features = ["tokio-comp"] } +tokio = { version = "1.29.1", features = ["full"] } +serde = { version = "1.0.188", features = ["derive"] } +serde_json = "1.0.107" diff --git a/hole-punching-tests/Dockerfile b/hole-punching-tests/Dockerfile new file mode 100644 index 00000000000..864f058799e --- /dev/null +++ b/hole-punching-tests/Dockerfile @@ -0,0 +1,19 @@ +# syntax=docker/dockerfile:1.5-labs +FROM rust:1.73.0 as builder + +# Run with access to the target cache to speed up builds +WORKDIR /workspace +ADD . . + +# Build the hole-punching test client as a statically-linked binary. Unfortunately, we must specify the `--target` explicitly. See https://msfjarvis.dev/posts/building-static-rust-binaries-for-linux/. 
+RUN --mount=type=cache,target=./target \ + --mount=type=cache,target=/usr/local/cargo/registry \ + RUSTFLAGS='-C target-feature=+crt-static' cargo build --release --package hole-punching-tests --target $(rustc -vV | grep host | awk '{print $2}') + +RUN --mount=type=cache,target=./target \ + mv ./target/$(rustc -vV | grep host | awk '{print $2}')/release/hole-punching-tests /usr/local/bin/hole-punching-tests + +FROM alpine:3 +COPY --from=builder /usr/local/bin/hole-punching-tests /usr/bin/hole-punch-client +RUN --mount=type=cache,target=/var/cache/apk apk add bind-tools jq curl tcpdump iproute2-tc +ENV RUST_BACKTRACE=1 diff --git a/hole-punching-tests/src/main.rs b/hole-punching-tests/src/main.rs new file mode 100644 index 00000000000..4bd7312df4b --- /dev/null +++ b/hole-punching-tests/src/main.rs @@ -0,0 +1,391 @@ +// Copyright 2023 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. 
+ +use anyhow::{Context, Result}; +use futures::stream::StreamExt; +use libp2p::swarm::dial_opts::DialOpts; +use libp2p::swarm::ConnectionId; +use libp2p::{ + core::multiaddr::{Multiaddr, Protocol}, + dcutr, identify, noise, ping, relay, + swarm::{NetworkBehaviour, SwarmEvent}, + tcp, yamux, Swarm, +}; +use redis::AsyncCommands; +use std::collections::HashMap; +use std::net::{IpAddr, Ipv4Addr}; +use std::str::FromStr; +use std::time::Duration; +use std::{fmt, io}; + +/// The redis key we push the relay's TCP listen address to. +const RELAY_TCP_ADDRESS: &str = "RELAY_TCP_ADDRESS"; +/// The redis key we push the relay's QUIC listen address to. +const RELAY_QUIC_ADDRESS: &str = "RELAY_QUIC_ADDRESS"; +/// The redis key we push the listen client's PeerId to. +const LISTEN_CLIENT_PEER_ID: &str = "LISTEN_CLIENT_PEER_ID"; + +#[tokio::main] +async fn main() -> Result<()> { + env_logger::builder() + .parse_filters("debug,netlink_proto=warn,rustls=warn,multistream_select=warn,libp2p_core::transport::choice=off,libp2p_swarm::connection=warn,libp2p_quic=trace") + .parse_default_env() + .init(); + + let mode = get_env("MODE")?; + let transport = get_env("TRANSPORT")?; + + let mut redis = RedisClient::new("redis", 6379).await?; + + let relay_addr = match transport { + TransportProtocol::Tcp => redis.pop::(RELAY_TCP_ADDRESS).await?, + TransportProtocol::Quic => redis.pop::(RELAY_QUIC_ADDRESS).await?, + }; + + let mut swarm = libp2p::SwarmBuilder::with_new_identity() + .with_tokio() + .with_tcp( + tcp::Config::new().port_reuse(true).nodelay(true), + noise::Config::new, + yamux::Config::default, + )? + .with_quic() + .with_relay_client(noise::Config::new, yamux::Config::default)? 
+ .with_behaviour(|key, relay_client| { + Ok(Behaviour { + relay_client, + identify: identify::Behaviour::new(identify::Config::new( + "/hole-punch-tests/1".to_owned(), + key.public(), + )), + dcutr: dcutr::Behaviour::new(key.public().to_peer_id()), + ping: ping::Behaviour::new( + ping::Config::default().with_interval(Duration::from_secs(1)), + ), + }) + })? + .with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(60))) + .build(); + + client_listen_on_transport(&mut swarm, transport).await?; + let relay_conn_id = client_connect_to_relay(&mut swarm, relay_addr.clone()) + .await + .context("Failed to connect to relay")?; + + client_setup(&mut swarm, &mut redis, relay_addr.clone(), mode).await?; + + let mut hole_punched_peer_connection = None; + + loop { + match (swarm.next().await.unwrap(), hole_punched_peer_connection) { + ( + SwarmEvent::Behaviour(BehaviourEvent::RelayClient( + relay::client::Event::ReservationReqAccepted { .. }, + )), + _, + ) => { + log::info!("Relay accepted our reservation request."); + + redis + .push(LISTEN_CLIENT_PEER_ID, swarm.local_peer_id()) + .await?; + } + ( + SwarmEvent::Behaviour(BehaviourEvent::Dcutr( + dcutr::Event::DirectConnectionUpgradeSucceeded { + remote_peer_id, + connection_id, + }, + )), + _, + ) => { + log::info!("Successfully hole-punched to {remote_peer_id}"); + + // Closing the connection to the relay will implicitly close the relayed connection to the other peer. + swarm.close_connection(relay_conn_id); + hole_punched_peer_connection = Some(connection_id); + } + ( + SwarmEvent::Behaviour(BehaviourEvent::Ping(ping::Event { + connection, + result: Ok(rtt), + .. + })), + Some(hole_punched_connection), + ) if mode == Mode::Dial && connection == hole_punched_connection => { + println!("{}", serde_json::to_string(&Report::new(rtt))?); + + return Ok(()); + } + ( + SwarmEvent::Behaviour(BehaviourEvent::Dcutr( + dcutr::Event::DirectConnectionUpgradeFailed { + remote_peer_id, + error, + .. 
+ }, + )), + _, + ) => { + log::info!("Failed to hole-punched to {remote_peer_id}"); + return Err(anyhow::Error::new(error)); + } + (SwarmEvent::OutgoingConnectionError { error, .. }, _) => { + anyhow::bail!(error) + } + ( + SwarmEvent::ConnectionClosed { + connection_id, + cause: Some(error), + .. + }, + _, + ) if connection_id == relay_conn_id => { + log::warn!("Connection to relay failed: {error}"); + + // TODO: Re-connecting is a bit of a hack, we should figure out why the connection sometimes fails. + client_setup(&mut swarm, &mut redis, relay_addr.clone(), mode).await?; + } + _ => {} + } + } +} + +#[derive(serde::Serialize)] +struct Report { + rtt_to_holepunched_peer_millis: u128, +} + +impl Report { + fn new(rtt: Duration) -> Self { + Self { + rtt_to_holepunched_peer_millis: rtt.as_millis(), + } + } +} + +fn get_env(key: &'static str) -> Result +where + T: FromStr, + T::Err: std::error::Error + Send + Sync + 'static, +{ + let val = std::env::var(key) + .with_context(|| format!("Missing env var `{key}`"))? + .parse() + .with_context(|| format!("Failed to parse `{key}`)"))?; + + Ok(val) +} + +async fn client_connect_to_relay( + swarm: &mut Swarm, + relay_addr: Multiaddr, +) -> Result { + let opts = DialOpts::from(relay_addr); + let relay_connection_id = opts.connection_id(); + + // Connect to the relay server. + swarm.dial(opts)?; + + loop { + match swarm.next().await.unwrap() { + SwarmEvent::Behaviour(BehaviourEvent::Identify(identify::Event::Received { + info: identify::Info { observed_addr, .. }, + .. + })) => { + log::info!("Relay told us our public address: {observed_addr}"); + swarm.add_external_address(observed_addr); + break; + } + SwarmEvent::ConnectionEstablished { connection_id, .. } + if connection_id == relay_connection_id => + { + log::info!("Connected to the relay"); + } + SwarmEvent::OutgoingConnectionError { error, .. 
} => { + anyhow::bail!(error) + } + _ => {} + } + } + + Ok(relay_connection_id) +} + +async fn client_listen_on_transport( + swarm: &mut Swarm, + transport: TransportProtocol, +) -> Result<()> { + let listen_addr = match transport { + TransportProtocol::Tcp => tcp_addr(Ipv4Addr::UNSPECIFIED.into()), + TransportProtocol::Quic => quic_addr(Ipv4Addr::UNSPECIFIED.into()), + }; + let expected_listener_id = swarm + .listen_on(listen_addr) + .context("Failed to listen on address")?; + + let mut listen_addresses = 0; + + // We should have at least two listen addresses, one for localhost and the actual interface. + while listen_addresses < 2 { + if let SwarmEvent::NewListenAddr { + listener_id, + address, + } = swarm.next().await.unwrap() + { + if listener_id == expected_listener_id { + listen_addresses += 1; + } + + log::info!("Listening on {address}"); + } + } + Ok(()) +} + +async fn client_setup( + swarm: &mut Swarm, + redis: &mut RedisClient, + relay_addr: Multiaddr, + mode: Mode, +) -> Result<()> { + match mode { + Mode::Listen => { + swarm.listen_on(relay_addr.with(Protocol::P2pCircuit))?; + } + Mode::Dial => { + let remote_peer_id = redis.pop(LISTEN_CLIENT_PEER_ID).await?; + + swarm.dial( + relay_addr + .with(Protocol::P2pCircuit) + .with(Protocol::P2p(remote_peer_id)), + )?; + } + }; + + Ok(()) +} + +fn tcp_addr(addr: IpAddr) -> Multiaddr { + Multiaddr::empty().with(addr.into()).with(Protocol::Tcp(0)) +} + +fn quic_addr(addr: IpAddr) -> Multiaddr { + Multiaddr::empty() + .with(addr.into()) + .with(Protocol::Udp(0)) + .with(Protocol::QuicV1) +} + +struct RedisClient { + inner: redis::aio::Connection, +} + +impl RedisClient { + async fn new(host: &str, port: u16) -> Result { + let client = redis::Client::open(format!("redis://{host}:{port}/")) + .context("Bad redis server URL")?; + let connection = client + .get_async_connection() + .await + .context("Failed to connect to redis server")?; + + Ok(Self { inner: connection }) + } + + async fn push(&mut self, key: &str, 
value: impl ToString) -> Result<()> { + let value = value.to_string(); + + log::debug!("Pushing {key}={value} to redis"); + + self.inner.rpush(key, value).await?; + + Ok(()) + } + + async fn pop(&mut self, key: &str) -> Result + where + V: FromStr + fmt::Display, + V::Err: std::error::Error + Send + Sync + 'static, + { + log::debug!("Fetching {key} from redis"); + + let value = self + .inner + .blpop::<_, HashMap>(key, 0) + .await? + .remove(key) + .with_context(|| format!("Failed to get value for {key} from redis"))? + .parse()?; + + log::debug!("{key}={value}"); + + Ok(value) + } +} + +#[derive(Clone, Copy, Debug, PartialEq)] +enum TransportProtocol { + Tcp, + Quic, +} + +impl FromStr for TransportProtocol { + type Err = io::Error; + fn from_str(mode: &str) -> Result { + match mode { + "tcp" => Ok(TransportProtocol::Tcp), + "quic" => Ok(TransportProtocol::Quic), + _ => Err(io::Error::new( + io::ErrorKind::Other, + "Expected either 'tcp' or 'quic'", + )), + } + } +} + +#[derive(Clone, Copy, Debug, PartialEq)] +enum Mode { + Dial, + Listen, +} + +impl FromStr for Mode { + type Err = io::Error; + fn from_str(mode: &str) -> Result { + match mode { + "dial" => Ok(Mode::Dial), + "listen" => Ok(Mode::Listen), + _ => Err(io::Error::new( + io::ErrorKind::Other, + "Expected either 'dial' or 'listen'", + )), + } + } +} + +#[derive(NetworkBehaviour)] +struct Behaviour { + relay_client: relay::client::Behaviour, + identify: identify::Behaviour, + dcutr: dcutr::Behaviour, + ping: ping::Behaviour, +} diff --git a/hole-punching-tests/version.json b/hole-punching-tests/version.json new file mode 100644 index 00000000000..f5db52d1c2d --- /dev/null +++ b/hole-punching-tests/version.json @@ -0,0 +1,8 @@ +{ + "id": "rust-libp2p-head", + "containerImageID": "rust-libp2p-head", + "transports": [ + "tcp", + "quic" + ] +} diff --git a/transports/quic/CHANGELOG.md b/transports/quic/CHANGELOG.md index 9f1353fdfe7..f947c387584 100644 --- a/transports/quic/CHANGELOG.md +++ 
b/transports/quic/CHANGELOG.md @@ -1,5 +1,8 @@ ## 0.10.0 - unreleased +- Improve hole-punch timing. + This should improve success rates for hole-punching QUIC connections. + See [PR 4549](https://github.com/libp2p/rust-libp2p/pull/4549). ## 0.9.3 diff --git a/transports/quic/src/hole_punching.rs b/transports/quic/src/hole_punching.rs index 874bc659b2e..41f55c5cada 100644 --- a/transports/quic/src/hole_punching.rs +++ b/transports/quic/src/hole_punching.rs @@ -4,6 +4,7 @@ use futures::future::Either; use rand::{distributions, Rng}; +use std::convert::Infallible; use std::{ net::{SocketAddr, UdpSocket}, time::Duration, @@ -18,22 +19,26 @@ pub(crate) async fn hole_puncher( futures::pin_mut!(punch_holes_future); match futures::future::select(P::sleep(timeout_duration), punch_holes_future).await { Either::Left(_) => Error::HandshakeTimedOut, - Either::Right((hole_punch_err, _)) => hole_punch_err, + Either::Right((Err(hole_punch_err), _)) => hole_punch_err, + Either::Right((Ok(never), _)) => match never {}, } } -async fn punch_holes(socket: UdpSocket, remote_addr: SocketAddr) -> Error { +async fn punch_holes( + socket: UdpSocket, + remote_addr: SocketAddr, +) -> Result { loop { - let sleep_duration = Duration::from_millis(rand::thread_rng().gen_range(10..=200)); - P::sleep(sleep_duration).await; - let contents: Vec = rand::thread_rng() .sample_iter(distributions::Standard) .take(64) .collect(); - if let Err(e) = P::send_to(&socket, &contents, remote_addr).await { - return Error::Io(e); - } + log::trace!("Sending random UDP packet to {remote_addr}"); + + P::send_to(&socket, &contents, remote_addr).await?; + + let sleep_duration = Duration::from_millis(rand::thread_rng().gen_range(10..=200)); + P::sleep(sleep_duration).await; } } diff --git a/transports/quic/src/transport.rs b/transports/quic/src/transport.rs index 12126b865d2..24527649edf 100644 --- a/transports/quic/src/transport.rs +++ b/transports/quic/src/transport.rs @@ -308,7 +308,7 @@ impl Transport for GenTransport

{ ) -> Result> { let (socket_addr, _version, peer_id) = self.remote_multiaddr_to_socketaddr(addr.clone(), true)?; - let peer_id = peer_id.ok_or(TransportError::MultiaddrNotSupported(addr))?; + let peer_id = peer_id.ok_or(TransportError::MultiaddrNotSupported(addr.clone()))?; let socket = self .eligible_listener(&socket_addr) @@ -318,6 +318,8 @@ impl Transport for GenTransport

{ .try_clone_socket() .map_err(Self::Error::from)?; + log::debug!("Preparing for hole-punch from {addr}"); + let hole_puncher = hole_puncher::

(socket, socket_addr, self.handshake_timeout); let (sender, receiver) = oneshot::channel();