From 6fcbf3e9667ca6f3ee778978430bb98c422b22ef Mon Sep 17 00:00:00 2001 From: Sebastian Holmin Date: Fri, 20 Dec 2024 16:40:00 +0100 Subject: [PATCH 1/7] Add logging for ephemeral peer negotiation timeouts on Windows --- talpid-wireguard/src/lib.rs | 31 +++++++++++++++++++++++++++++-- 1 file changed, 29 insertions(+), 2 deletions(-) diff --git a/talpid-wireguard/src/lib.rs b/talpid-wireguard/src/lib.rs index 2d282c6315c6..7452cd4643e4 100644 --- a/talpid-wireguard/src/lib.rs +++ b/talpid-wireguard/src/lib.rs @@ -268,14 +268,22 @@ impl WireguardMonitor { let ephemeral_obfs_sender = close_obfs_sender.clone(); if config.quantum_resistant || config.daita { - ephemeral::config_ephemeral_peers( + if let Err(e) = ephemeral::config_ephemeral_peers( &tunnel, &mut config, args.retry_attempt, obfuscator.clone(), ephemeral_obfs_sender, ) - .await?; + .await + { + // We have received a small amount of reports about ephemeral peer nogationation + // timing out on Windows for 2024.9-beta1. These verbose data usage logs are + // a temporary measure to help us understand the issue. They can be removed + // if the issue is resolved. + log_tunnel_data_usage(&config, &tunnel).await; + return Err(e); + } let metadata = Self::tunnel_metadata(&iface_name, &config); event_hook @@ -965,6 +973,25 @@ impl WireguardMonitor { } } +async fn log_tunnel_data_usage(config: &Config, tunnel: &Arc>>>) { + let tunnel = tunnel.lock().await; + let Ok(tunnel_stats) = tunnel.as_ref().unwrap().get_tunnel_stats() else { + return; + }; + if let Some(stats) = config + .exit_peer + .as_ref() + .map(|peer| peer.public_key.as_bytes()) + .and_then(|pubkey| tunnel_stats.get(pubkey)) + { + log::warn!("Exit peer stats: {:?}", stats); + }; + let pubkey = config.entry_peer.public_key.as_bytes(); + if let Some(stats) = tunnel_stats.get(pubkey) { + log::warn!("Entry peer stats: {:?}", stats); + } +} + #[derive(Debug)] enum CloseMsg { Stop, From 3392a9455ea828e38612b28d5d9836c26e04ef83 Mon Sep 17 00:00:00 2001 From: Sebastian Holmin Date: Fri, 20 Dec 2024 16:46:08 +0100 Subject: [PATCH 2/7] Log data usage for tunnel config client --- talpid-tunnel-config-client/src/lib.rs | 78 ++++++++++++++++++++++++-- 1 file changed, 73 insertions(+), 5 deletions(-) diff --git a/talpid-tunnel-config-client/src/lib.rs b/talpid-tunnel-config-client/src/lib.rs index 7c80d4f3e5fa..f09cd32074af 100644 --- a/talpid-tunnel-config-client/src/lib.rs +++ b/talpid-tunnel-config-client/src/lib.rs @@ -5,6 +5,7 @@ use std::net::SocketAddr; #[cfg(not(target_os = "ios"))] use std::net::{IpAddr, Ipv4Addr}; use talpid_types::net::wireguard::{PresharedKey, PublicKey}; +use tokio::io::{AsyncRead, AsyncWrite}; use tonic::transport::Channel; #[cfg(not(target_os = "ios"))] use tonic::transport::Endpoint; @@ -107,7 +108,9 @@ pub async fn request_ephemeral_peer( enable_post_quantum: bool, enable_daita: bool, ) -> Result { + log::debug!("Connecting to relay config service at {service_address}"); let client = connect_relay_config_client(service_address).await?; + log::debug!("Connected to relay config service at {service_address}"); request_ephemeral_peer_with( client, @@ -128,6 +131,7 @@ pub async fn request_ephemeral_peer_with( ) -> Result { let (pq_request, kem_secrets) = if enable_quantum_resistant { let (pq_request, kem_secrets) = post_quantum_secrets().await; + log::debug!("Generated PQ secrets"); (Some(pq_request), Some(kem_secrets)) } else { (None, None) @@ -275,20 +279,84 @@ fn xor_assign(dst: &mut [u8; 32], src: &[u8; 32]) { /// value has been speficically lowered, to avoid MTU issues. See the `socket` module. #[cfg(not(target_os = "ios"))] async fn connect_relay_config_client(ip: Ipv4Addr) -> Result { - use futures::TryFutureExt; - let endpoint = Endpoint::from_static("tcp://0.0.0.0:0"); let addr = SocketAddr::new(IpAddr::V4(ip), CONFIG_SERVICE_PORT); let connection = endpoint .connect_with_connector(service_fn(move |_| async move { let sock = socket::TcpSocket::new()?; - sock.connect(addr) - .map_ok(hyper_util::rt::tokio::TokioIo::new) - .await + let stream = sock.connect(addr).await?; + let sniffer = SocketSniffer { + s: stream, + rx_bytes: 0, + tx_bytes: 0, + start_time: std::time::Instant::now(), + }; + Ok::<_, std::io::Error>(hyper_util::rt::tokio::TokioIo::new(sniffer)) })) .await .map_err(Error::GrpcConnectError)?; Ok(RelayConfigService::new(connection)) } + +struct SocketSniffer { + s: S, + rx_bytes: usize, + tx_bytes: usize, + start_time: std::time::Instant, +} + +impl Drop for SocketSniffer { + fn drop(&mut self) { + let duration = self.start_time.elapsed(); + log::debug!( + "Tunnel config client connection ended. RX: {} bytes, TX: {} bytes, duration: {} s", + self.rx_bytes, + self.tx_bytes, + duration.as_secs() + ); + } +} + +impl AsyncRead for SocketSniffer { + fn poll_read( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll> { + let bytes = std::task::ready!(std::pin::Pin::new(&mut self.s).poll_read(cx, buf)); + if bytes.is_ok() { + self.rx_bytes += buf.filled().len(); + } + std::task::Poll::Ready(bytes) + } +} + +impl AsyncWrite for SocketSniffer { + fn poll_write( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll> { + let bytes = std::task::ready!(std::pin::Pin::new(&mut self.s).poll_write(cx, buf)); + if bytes.is_ok() { + self.tx_bytes += buf.len(); + } + std::task::Poll::Ready(bytes) + } + + fn poll_flush( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + std::pin::Pin::new(&mut self.s).poll_flush(cx) + } + + fn poll_shutdown( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + std::pin::Pin::new(&mut self.s).poll_shutdown(cx) + } +} From 512c4aeb70f9b77afc2e9b6cb936571d91e4fbcd Mon Sep 17 00:00:00 2001 From: Sebastian Holmin Date: Fri, 20 Dec 2024 17:00:29 +0100 Subject: [PATCH 3/7] Add `log_tunnel_data_usage` to Android too --- talpid-wireguard/src/lib.rs | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/talpid-wireguard/src/lib.rs b/talpid-wireguard/src/lib.rs index 7452cd4643e4..c61d20bec604 100644 --- a/talpid-wireguard/src/lib.rs +++ b/talpid-wireguard/src/lib.rs @@ -472,7 +472,7 @@ impl WireguardMonitor { if should_negotiate_ephemeral_peer { let ephemeral_obfs_sender = close_obfs_sender.clone(); - ephemeral::config_ephemeral_peers( + if let Err(e) = ephemeral::config_ephemeral_peers( &tunnel, &mut config, args.retry_attempt, @@ -480,7 +480,15 @@ impl WireguardMonitor { ephemeral_obfs_sender, args.tun_provider, ) - .await?; + .await + { + // We have received a small amount of reports about ephemeral peer nogationation + // timing out on Windows for 2024.9-beta1. These verbose data usage logs are + // a temporary measure to help us understand the issue. They can be removed + // if the issue is resolved. + log_tunnel_data_usage(&config, &tunnel).await; + return Err(e); + } let metadata = Self::tunnel_metadata(&iface_name, &config); event_hook @@ -973,7 +981,7 @@ impl WireguardMonitor { } } -async fn log_tunnel_data_usage(config: &Config, tunnel: &Arc>>>) { +async fn log_tunnel_data_usage(config: &Config, tunnel: &Arc>>) { let tunnel = tunnel.lock().await; let Ok(tunnel_stats) = tunnel.as_ref().unwrap().get_tunnel_stats() else { return; From f58dfd59f013ca4873723b519abc367c557c9352 Mon Sep 17 00:00:00 2001 From: Sebastian Holmin Date: Fri, 20 Dec 2024 17:04:27 +0100 Subject: [PATCH 4/7] Remove unused `futures` dep --- Cargo.lock | 1 - talpid-tunnel-config-client/Cargo.toml | 1 - 2 files changed, 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 90e8b3e0ba36..c85c6b6f08e7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4495,7 +4495,6 @@ name = "talpid-tunnel-config-client" version = "0.0.0" dependencies = [ "classic-mceliece-rust", - "futures", "hyper-util", "libc", "log", diff --git a/talpid-tunnel-config-client/Cargo.toml b/talpid-tunnel-config-client/Cargo.toml index 5cf6e4b52215..cb63d93fc999 100644 --- a/talpid-tunnel-config-client/Cargo.toml +++ b/talpid-tunnel-config-client/Cargo.toml @@ -14,7 +14,6 @@ workspace = true log = { workspace = true } rand = "0.8" talpid-types = { path = "../talpid-types" } -futures = { workspace = true } tonic = { workspace = true } tower = { workspace = true } prost = { workspace = true } From bb087ff19f28ac839c9d892cab798d2666843047 Mon Sep 17 00:00:00 2001 From: Sebastian Holmin Date: Fri, 20 Dec 2024 17:12:46 +0100 Subject: [PATCH 5/7] Remove unwrap --- talpid-wireguard/src/lib.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/talpid-wireguard/src/lib.rs b/talpid-wireguard/src/lib.rs index c61d20bec604..cb19559dcf96 100644 --- a/talpid-wireguard/src/lib.rs +++ b/talpid-wireguard/src/lib.rs @@ -983,7 +983,8 @@ impl WireguardMonitor { async fn log_tunnel_data_usage(config: &Config, tunnel: &Arc>>) { let tunnel = tunnel.lock().await; - let Ok(tunnel_stats) = tunnel.as_ref().unwrap().get_tunnel_stats() else { + let Some(tunnel) = &*tunnel else { return }; + let Ok(tunnel_stats) = tunnel.get_tunnel_stats() else { return; }; if let Some(stats) = config From 4fc8028b301f59e0da4154aaa7d7c650a59cffeb Mon Sep 17 00:00:00 2001 From: Sebastian Holmin Date: Fri, 20 Dec 2024 17:13:26 +0100 Subject: [PATCH 6/7] Move SocketSniffer to separate module --- talpid-tunnel-config-client/src/lib.rs | 110 +++++++++++++------------ 1 file changed, 58 insertions(+), 52 deletions(-) diff --git a/talpid-tunnel-config-client/src/lib.rs b/talpid-tunnel-config-client/src/lib.rs index f09cd32074af..3a1b25bb4656 100644 --- a/talpid-tunnel-config-client/src/lib.rs +++ b/talpid-tunnel-config-client/src/lib.rs @@ -5,7 +5,6 @@ use std::net::SocketAddr; #[cfg(not(target_os = "ios"))] use std::net::{IpAddr, Ipv4Addr}; use talpid_types::net::wireguard::{PresharedKey, PublicKey}; -use tokio::io::{AsyncRead, AsyncWrite}; use tonic::transport::Channel; #[cfg(not(target_os = "ios"))] use tonic::transport::Endpoint; @@ -279,6 +278,8 @@ fn xor_assign(dst: &mut [u8; 32], src: &[u8; 32]) { /// value has been speficically lowered, to avoid MTU issues. See the `socket` module. #[cfg(not(target_os = "ios"))] async fn connect_relay_config_client(ip: Ipv4Addr) -> Result { + use hyper_util::rt::tokio::TokioIo; + let endpoint = Endpoint::from_static("tcp://0.0.0.0:0"); let addr = SocketAddr::new(IpAddr::V4(ip), CONFIG_SERVICE_PORT); @@ -286,13 +287,13 @@ async fn connect_relay_config_client(ip: Ipv4Addr) -> Result(hyper_util::rt::tokio::TokioIo::new(sniffer)) + Ok::<_, std::io::Error>(TokioIo::new(sniffer)) })) .await .map_err(Error::GrpcConnectError)?; @@ -300,63 +301,68 @@ async fn connect_relay_config_client(ip: Ipv4Addr) -> Result { - s: S, - rx_bytes: usize, - tx_bytes: usize, - start_time: std::time::Instant, -} - -impl Drop for SocketSniffer { - fn drop(&mut self) { - let duration = self.start_time.elapsed(); - log::debug!( - "Tunnel config client connection ended. RX: {} bytes, TX: {} bytes, duration: {} s", - self.rx_bytes, - self.tx_bytes, - duration.as_secs() - ); +mod socket_sniffer { + pub struct SocketSniffer { + pub s: S, + pub rx_bytes: usize, + pub tx_bytes: usize, + pub start_time: std::time::Instant, } -} + use std::{ + io, + pin::Pin, + task::{Context, Poll}, + }; + + use tokio::io::AsyncWrite; -impl AsyncRead for SocketSniffer { - fn poll_read( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &mut tokio::io::ReadBuf<'_>, - ) -> std::task::Poll> { - let bytes = std::task::ready!(std::pin::Pin::new(&mut self.s).poll_read(cx, buf)); - if bytes.is_ok() { - self.rx_bytes += buf.filled().len(); + use tokio::io::{AsyncRead, ReadBuf}; + + impl Drop for SocketSniffer { + fn drop(&mut self) { + let duration = self.start_time.elapsed(); + log::debug!( + "Tunnel config client connection ended. RX: {} bytes, TX: {} bytes, duration: {} s", + self.rx_bytes, + self.tx_bytes, + duration.as_secs() + ); } - std::task::Poll::Ready(bytes) } -} -impl AsyncWrite for SocketSniffer { - fn poll_write( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &[u8], - ) -> std::task::Poll> { - let bytes = std::task::ready!(std::pin::Pin::new(&mut self.s).poll_write(cx, buf)); - if bytes.is_ok() { - self.tx_bytes += buf.len(); + impl AsyncRead for SocketSniffer { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + let bytes = std::task::ready!(Pin::new(&mut self.s).poll_read(cx, buf)); + if bytes.is_ok() { + self.rx_bytes += buf.filled().len(); + } + Poll::Ready(bytes) } - std::task::Poll::Ready(bytes) } - fn poll_flush( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - std::pin::Pin::new(&mut self.s).poll_flush(cx) - } + impl AsyncWrite for SocketSniffer { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + let bytes = std::task::ready!(Pin::new(&mut self.s).poll_write(cx, buf)); + if bytes.is_ok() { + self.tx_bytes += buf.len(); + } + Poll::Ready(bytes) + } - fn poll_shutdown( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - std::pin::Pin::new(&mut self.s).poll_shutdown(cx) + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.s).poll_flush(cx) + } + + fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.s).poll_shutdown(cx) + } } } From 05e01e9ca9c434b165431a2776dbf948d95b90d6 Mon Sep 17 00:00:00 2001 From: Sebastian Holmin Date: Fri, 20 Dec 2024 17:24:46 +0100 Subject: [PATCH 7/7] Fix byte counting in socket sniffer --- talpid-tunnel-config-client/src/lib.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/talpid-tunnel-config-client/src/lib.rs b/talpid-tunnel-config-client/src/lib.rs index 3a1b25bb4656..f13832fa609b 100644 --- a/talpid-tunnel-config-client/src/lib.rs +++ b/talpid-tunnel-config-client/src/lib.rs @@ -336,9 +336,10 @@ mod socket_sniffer { cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll> { + let initial_data = buf.filled().len(); let bytes = std::task::ready!(Pin::new(&mut self.s).poll_read(cx, buf)); if bytes.is_ok() { - self.rx_bytes += buf.filled().len(); + self.rx_bytes += buf.filled().len().saturating_sub(initial_data); } Poll::Ready(bytes) } @@ -351,8 +352,8 @@ mod socket_sniffer { buf: &[u8], ) -> Poll> { let bytes = std::task::ready!(Pin::new(&mut self.s).poll_write(cx, buf)); - if bytes.is_ok() { - self.tx_bytes += buf.len(); + if let Ok(bytes) = bytes { + self.tx_bytes += bytes; } Poll::Ready(bytes) }