Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve-ephemeral-peer-error-logs #7396

Merged
merged 7 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion talpid-tunnel-config-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ workspace = true
log = { workspace = true }
rand = "0.8"
talpid-types = { path = "../talpid-types" }
futures = { workspace = true }
tonic = { workspace = true }
tower = { workspace = true }
prost = { workspace = true }
Expand Down
83 changes: 79 additions & 4 deletions talpid-tunnel-config-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,9 @@ pub async fn request_ephemeral_peer(
enable_post_quantum: bool,
enable_daita: bool,
) -> Result<EphemeralPeer, Error> {
log::debug!("Connecting to relay config service at {service_address}");
let client = connect_relay_config_client(service_address).await?;
log::debug!("Connected to relay config service at {service_address}");

request_ephemeral_peer_with(
client,
Expand All @@ -128,6 +130,7 @@ pub async fn request_ephemeral_peer_with(
) -> Result<EphemeralPeer, Error> {
let (pq_request, kem_secrets) = if enable_quantum_resistant {
let (pq_request, kem_secrets) = post_quantum_secrets().await;
log::debug!("Generated PQ secrets");
(Some(pq_request), Some(kem_secrets))
} else {
(None, None)
Expand Down Expand Up @@ -275,20 +278,92 @@ fn xor_assign(dst: &mut [u8; 32], src: &[u8; 32]) {
/// value has been speficically lowered, to avoid MTU issues. See the `socket` module.
#[cfg(not(target_os = "ios"))]
async fn connect_relay_config_client(ip: Ipv4Addr) -> Result<RelayConfigService, Error> {
use futures::TryFutureExt;
use hyper_util::rt::tokio::TokioIo;

let endpoint = Endpoint::from_static("tcp://0.0.0.0:0");
let addr = SocketAddr::new(IpAddr::V4(ip), CONFIG_SERVICE_PORT);

let connection = endpoint
.connect_with_connector(service_fn(move |_| async move {
let sock = socket::TcpSocket::new()?;
sock.connect(addr)
.map_ok(hyper_util::rt::tokio::TokioIo::new)
.await
let stream = sock.connect(addr).await?;
let sniffer = socket_sniffer::SocketSniffer {
s: stream,
rx_bytes: 0,
tx_bytes: 0,
start_time: std::time::Instant::now(),
};
Ok::<_, std::io::Error>(TokioIo::new(sniffer))
}))
.await
.map_err(Error::GrpcConnectError)?;

Ok(RelayConfigService::new(connection))
}

mod socket_sniffer {
pub struct SocketSniffer<S> {
pub s: S,
pub rx_bytes: usize,
pub tx_bytes: usize,
pub start_time: std::time::Instant,
}
use std::{
io,
pin::Pin,
task::{Context, Poll},
};

use tokio::io::AsyncWrite;

use tokio::io::{AsyncRead, ReadBuf};

impl<S> Drop for SocketSniffer<S> {
fn drop(&mut self) {
let duration = self.start_time.elapsed();
log::debug!(
"Tunnel config client connection ended. RX: {} bytes, TX: {} bytes, duration: {} s",
self.rx_bytes,
self.tx_bytes,
duration.as_secs()
);
}
}

impl<S: AsyncRead + AsyncWrite + Unpin> AsyncRead for SocketSniffer<S> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
let initial_data = buf.filled().len();
let bytes = std::task::ready!(Pin::new(&mut self.s).poll_read(cx, buf));
if bytes.is_ok() {
self.rx_bytes += buf.filled().len().saturating_sub(initial_data);
}
Poll::Ready(bytes)
}
}

impl<S: AsyncRead + AsyncWrite + Unpin> AsyncWrite for SocketSniffer<S> {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
let bytes = std::task::ready!(Pin::new(&mut self.s).poll_write(cx, buf));
if let Ok(bytes) = bytes {
self.tx_bytes += bytes;
}
Poll::Ready(bytes)
}

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.s).poll_flush(cx)
}

fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.s).poll_shutdown(cx)
}
}
}
44 changes: 40 additions & 4 deletions talpid-wireguard/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,14 +268,22 @@ impl WireguardMonitor {

let ephemeral_obfs_sender = close_obfs_sender.clone();
if config.quantum_resistant || config.daita {
ephemeral::config_ephemeral_peers(
if let Err(e) = ephemeral::config_ephemeral_peers(
&tunnel,
&mut config,
args.retry_attempt,
obfuscator.clone(),
ephemeral_obfs_sender,
)
.await?;
.await
{
// We have received a small amount of reports about ephemeral peer nogationation
// timing out on Windows for 2024.9-beta1. These verbose data usage logs are
// a temporary measure to help us understand the issue. They can be removed
// if the issue is resolved.
log_tunnel_data_usage(&config, &tunnel).await;
return Err(e);
}

let metadata = Self::tunnel_metadata(&iface_name, &config);
event_hook
Expand Down Expand Up @@ -464,15 +472,23 @@ impl WireguardMonitor {
if should_negotiate_ephemeral_peer {
let ephemeral_obfs_sender = close_obfs_sender.clone();

ephemeral::config_ephemeral_peers(
if let Err(e) = ephemeral::config_ephemeral_peers(
&tunnel,
&mut config,
args.retry_attempt,
obfuscator.clone(),
ephemeral_obfs_sender,
args.tun_provider,
)
.await?;
.await
{
// We have received a small amount of reports about ephemeral peer nogationation
// timing out on Windows for 2024.9-beta1. These verbose data usage logs are
// a temporary measure to help us understand the issue. They can be removed
// if the issue is resolved.
log_tunnel_data_usage(&config, &tunnel).await;
return Err(e);
}

let metadata = Self::tunnel_metadata(&iface_name, &config);
event_hook
Expand Down Expand Up @@ -965,6 +981,26 @@ impl WireguardMonitor {
}
}

async fn log_tunnel_data_usage(config: &Config, tunnel: &Arc<AsyncMutex<Option<TunnelType>>>) {
let tunnel = tunnel.lock().await;
let Some(tunnel) = &*tunnel else { return };
let Ok(tunnel_stats) = tunnel.get_tunnel_stats() else {
return;
};
if let Some(stats) = config
.exit_peer
.as_ref()
.map(|peer| peer.public_key.as_bytes())
.and_then(|pubkey| tunnel_stats.get(pubkey))
{
log::warn!("Exit peer stats: {:?}", stats);
};
let pubkey = config.entry_peer.public_key.as_bytes();
if let Some(stats) = tunnel_stats.get(pubkey) {
log::warn!("Entry peer stats: {:?}", stats);
}
}

#[derive(Debug)]
enum CloseMsg {
Stop,
Expand Down
Loading