diff --git a/Cargo.lock b/Cargo.lock index 1ac9743a0031..2e4357788d20 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1162,16 +1162,6 @@ dependencies = [ "pin-project-lite", ] -[[package]] -name = "eyre" -version = "0.6.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7cd915d99f24784cdc19fd37ef22b97e3ff0ae756c7e492e9fbfe897d61e2aec" -dependencies = [ - "indenter", - "once_cell", -] - [[package]] name = "fastrand" version = "2.0.2" @@ -1935,12 +1925,6 @@ dependencies = [ "icu_properties", ] -[[package]] -name = "indenter" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce23b50ad8242c51a442f3ff322d56b02f08852c77e4c0b4d3fd684abc89c683" - [[package]] name = "indexmap" version = "1.9.3" @@ -2214,11 +2198,10 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" name = "leak-checker" version = "0.1.0" dependencies = [ + "anyhow", "clap", - "eyre", "futures", "log", - "match_cfg", "nix 0.29.0", "pnet_packet 0.35.0", "pretty_env_logger", @@ -2570,6 +2553,7 @@ name = "mullvad-daemon" version = "0.0.0" dependencies = [ "android_logger", + "anyhow", "async-trait", "chrono", "clap", @@ -2579,6 +2563,7 @@ dependencies = [ "fern", "futures", "hickory-resolver", + "leak-checker", "libc", "log", "log-panics", @@ -2596,10 +2581,12 @@ dependencies = [ "serde", "serde_json", "simple-signal", + "socket2", "talpid-core", "talpid-dbus", "talpid-future", "talpid-platform-metadata", + "talpid-routing", "talpid-time", "talpid-types", "talpid-windows", @@ -3346,6 +3333,8 @@ dependencies = [ [[package]] name = "pfctl" version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a44e65c0d3523afa79a600a3964c3ac0fabdabe2d7c68da624b2bb0b441b9d61" dependencies = [ "derive_builder", "ioctl-sys 0.8.0", @@ -4663,7 +4652,7 @@ dependencies = [ "parking_lot", "pcap", "pfctl", - "pnet_packet 0.34.0", + "pnet_packet 0.35.0", "rand 0.8.5", "resolv-conf", "serde", diff --git a/Cargo.toml b/Cargo.toml index 13206e5db715..ac49a9b5e6a1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -84,6 +84,7 @@ hickory-server = { version = "0.24.2", features = ["resolver"] } tokio = { version = "1.42" } parity-tokio-ipc = "0.9" futures = "0.3.15" + # Tonic and related crates tonic = "0.12.3" tonic-build = { version = "0.10.0", default-features = false } @@ -94,6 +95,7 @@ hyper-util = {version = "0.1.8", features = ["client", "client-legacy", "http2", env_logger = "0.10.0" thiserror = "2.0" +anyhow = "1.0" log = "0.4" shadowsocks = "1.20.3" @@ -107,8 +109,10 @@ once_cell = "1.16" serde = "1.0.204" serde_json = "1.0.122" +pnet_packet = "0.35.0" ipnetwork = "0.20" tun = { version = "0.7", features = ["async"] } +socket2 = "0.5.7" # Test dependencies proptest = "1.4" diff --git a/leak-checker/Cargo.toml b/leak-checker/Cargo.toml index 6a24daba0cb3..e880c7f18a98 100644 --- a/leak-checker/Cargo.toml +++ b/leak-checker/Cargo.toml @@ -7,24 +7,28 @@ license.workspace = true edition.workspace = true rust-version.workspace = true +[features] +default = ["am-i-mullvad"] +am-i-mullvad = ["dep:reqwest"] + [dependencies] log.workspace = true -eyre = "0.6.12" -socket2 = { version = "0.5.7", features = ["all"] } -match_cfg = "0.1.0" -pnet_packet = "0.35.0" +anyhow.workspace = true +socket2 = { workspace = true, features = ["all"] } +pnet_packet.workspace = true pretty_env_logger = "0.5.0" -tokio = { workspace = true, features = ["macros", "time", "rt", "sync", "net"] } +tokio = { workspace = true, features = ["macros", "time", "rt", "sync", "net", "process"] } futures.workspace = true serde = { workspace = true, features = ["derive"] } -reqwest = { version = "0.12.9", default-features = false, features = ["json", "rustls-tls"] } -clap = { version = "*", features = ["derive"] } +clap = { workspace = true, features = ["derive"] } + +reqwest = { version = "0.12.9", optional = true, default-features = false, features = ["json", "rustls-tls"] } [dev-dependencies] tokio = { workspace = true, features = ["full"] } [target.'cfg(unix)'.dependencies] -nix = { version = "0.29.0", features = ["net"] } +nix = { version = "0.29.0", features = ["net", "socket", "uio"] } [target.'cfg(windows)'.dependencies] windows-sys.workspace = true diff --git a/leak-checker/examples/leaker-cli.rs b/leak-checker/examples/leaker-cli.rs index 3a391e7bf1ca..66ea1541d6c0 100644 --- a/leak-checker/examples/leaker-cli.rs +++ b/leak-checker/examples/leaker-cli.rs @@ -1,5 +1,5 @@ use clap::{Parser, Subcommand}; -use leak_checker::{am_i_mullvad::AmIMullvadOpt, traceroute::TracerouteOpt}; +use leak_checker::traceroute::TracerouteOpt; #[derive(Parser)] pub struct Opt { @@ -13,11 +13,12 @@ pub enum LeakMethod { Traceroute(#[clap(flatten)] TracerouteOpt), /// Ask `am.i.mullvad.net` whether you are leaking. - AmIMullvad(#[clap(flatten)] AmIMullvadOpt), + #[cfg(feature = "am-i-mullvad")] + AmIMullvad(#[clap(flatten)] leak_checker::am_i_mullvad::AmIMullvadOpt), } #[tokio::main] -async fn main() -> eyre::Result<()> { +async fn main() -> anyhow::Result<()> { pretty_env_logger::formatted_builder() .filter_level(log::LevelFilter::Debug) .parse_default_env() @@ -27,6 +28,7 @@ async fn main() -> eyre::Result<()> { let leak_status = match &opt.method { LeakMethod::Traceroute(opt) => leak_checker::traceroute::run_leak_test(opt).await, + #[cfg(feature = "am-i-mullvad")] LeakMethod::AmIMullvad(opt) => leak_checker::am_i_mullvad::run_leak_test(opt).await, }; diff --git a/leak-checker/notes.md b/leak-checker/notes.md deleted file mode 100644 index 237bc2f12ba5..000000000000 --- a/leak-checker/notes.md +++ /dev/null @@ -1,16 +0,0 @@ -# Apple notes - -The first packet is always dropped when a connection is routed and NATed - - -The NAT rules do not match up with the firewall rules in regards to the relay - - -``` -# NAT-rule -no nat inet from any to 185.213.154.68 - -# FW-rule -pass out quick inet proto udp from any to 185.213.154.68 port = 49020 user = 0 keep state -``` - diff --git a/leak-checker/src/am_i_mullvad.rs b/leak-checker/src/am_i_mullvad.rs index f024e54ea744..06404ad1e317 100644 --- a/leak-checker/src/am_i_mullvad.rs +++ b/leak-checker/src/am_i_mullvad.rs @@ -1,6 +1,5 @@ -use eyre::{eyre, Context}; +use anyhow::{anyhow, Context}; use futures::TryFutureExt; -use match_cfg::match_cfg; use reqwest::{Client, ClientBuilder}; use serde::Deserialize; @@ -24,7 +23,7 @@ pub async fn run_leak_test(opt: &AmIMullvadOpt) -> LeakStatus { } /// Check if connected to Mullvad and print the result to stdout -pub async fn try_run_leak_test(opt: &AmIMullvadOpt) -> eyre::Result { +pub async fn try_run_leak_test(opt: &AmIMullvadOpt) -> anyhow::Result { #[derive(Debug, Deserialize)] struct Response { ip: String, @@ -37,14 +36,14 @@ pub async fn try_run_leak_test(opt: &AmIMullvadOpt) -> eyre::Result client = bind_client_to_interface(client, interface)?; } - let client = client.build().wrap_err("Failed to create HTTP client")?; + let client = client.build().context("Failed to create HTTP client")?; let response: Response = client .get(AM_I_MULLVAD_URL) //.timeout(Duration::from_secs(opt.timeout)) .send() .and_then(|r| r.json()) .await - .wrap_err_with(|| eyre!("Failed to GET {AM_I_MULLVAD_URL}"))?; + .with_context(|| anyhow!("Failed to GET {AM_I_MULLVAD_URL}"))?; if let Some(server) = &response.mullvad_exit_ip_hostname { log::debug!( @@ -59,32 +58,32 @@ pub async fn try_run_leak_test(opt: &AmIMullvadOpt) -> eyre::Result response.ip ); Ok(LeakStatus::LeakDetected(LeakInfo::AmIMullvad { - ip: response.ip.parse().wrap_err("Malformed IP")?, + ip: response.ip.parse().context("Malformed IP")?, })) } } -match_cfg! { - #[cfg(target_os = "linux")] => { - fn bind_client_to_interface( - builder: ClientBuilder, - interface: &str - ) -> eyre::Result { - log::debug!("Binding HTTP client to {interface}"); - Ok(builder.interface(interface)) - } - } - #[cfg(any(target_os = "macos", target_os = "windows", target_os = "android"))] => { - fn bind_client_to_interface( - builder: ClientBuilder, - interface: &str - ) -> eyre::Result { - use crate::util::get_interface_ip; +#[cfg(target_os = "linux")] +fn bind_client_to_interface( + builder: ClientBuilder, + interface: &str, +) -> anyhow::Result { + log::debug!("Binding HTTP client to {interface}"); + Ok(builder.interface(interface)) +} - let ip = get_interface_ip(interface)?; +#[cfg(any(target_os = "macos", target_os = "windows", target_os = "android"))] +fn bind_client_to_interface( + builder: ClientBuilder, + interface: &str, +) -> anyhow::Result { + use crate::util::{get_interface_ip, Ip}; + use crate::Interface; - log::debug!("Binding HTTP client to {ip} ({interface})"); - Ok(builder.local_address(ip)) - } - } + let interface = Interface::Name(interface.to_string()); + let ip = get_interface_ip(&interface, Ip::v6()) + .or_else(|_| get_interface_ip(&interface, Ip::v4()))?; + + log::debug!("Binding HTTP client to {ip} ({interface:?})"); + Ok(builder.local_address(ip)) } diff --git a/leak-checker/src/lib.rs b/leak-checker/src/lib.rs index 1927385bc175..eb802115186a 100644 --- a/leak-checker/src/lib.rs +++ b/leak-checker/src/lib.rs @@ -1,5 +1,6 @@ -use std::net::IpAddr; +use std::{fmt, net::IpAddr}; +#[cfg(feature = "am-i-mullvad")] pub mod am_i_mullvad; pub mod traceroute; mod util; @@ -16,9 +17,42 @@ pub enum LeakInfo { /// Managed to reach another network node on the physical interface, bypassing firewall rules. NodeReachableOnInterface { reachable_nodes: Vec, - interface: String, + interface: Interface, }, /// Queried a , and was not mullvad. + #[cfg(feature = "am-i-mullvad")] AmIMullvad { ip: IpAddr }, } + +#[derive(Clone)] +pub enum Interface { + Name(String), + + #[cfg(target_os = "windows")] + Luid(windows_sys::Win32::NetworkManagement::Ndis::NET_LUID_LH), + + #[cfg(target_os = "macos")] + Index(std::num::NonZeroU32), +} + +impl From for Interface { + fn from(name: String) -> Self { + Interface::Name(name) + } +} + +impl fmt::Debug for Interface { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Name(arg0) => f.debug_tuple("Name").field(arg0).finish(), + + // SAFETY: u64 is valid for all bit patterns, so reading the union as a u64 is safe. + #[cfg(target_os = "windows")] + Self::Luid(arg0) => f.debug_tuple("Luid").field(&unsafe { arg0.Value }).finish(), + + #[cfg(target_os = "macos")] + Self::Index(arg0) => f.debug_tuple("Luid").field(arg0).finish(), + } + } +} diff --git a/leak-checker/src/traceroute.rs b/leak-checker/src/traceroute.rs index 59b1e0fc3fd9..22a861a05153 100644 --- a/leak-checker/src/traceroute.rs +++ b/leak-checker/src/traceroute.rs @@ -1,62 +1,41 @@ -use std::{ - ascii::escape_default, - io, - net::{IpAddr, Ipv4Addr}, - ops::{Range, RangeFrom}, - os::fd::{FromRawFd, IntoRawFd}, - time::Duration, -}; +use std::{net::IpAddr, ops::Range, time::Duration}; -use eyre::{bail, ensure, eyre, OptionExt, WrapErr}; -use futures::{future::pending, stream, StreamExt, TryFutureExt, TryStreamExt}; -use match_cfg::match_cfg; -use pnet_packet::{ - icmp::{ - echo_request::EchoRequestPacket, time_exceeded::TimeExceededPacket, IcmpPacket, IcmpTypes, - }, - ip::IpNextHeaderProtocols as IpProtocol, - ipv4::Ipv4Packet, - udp::UdpPacket, - Packet, -}; -use socket2::{Domain, Protocol, Socket, Type}; -use tokio::{ - net::UdpSocket, - select, - time::{sleep, sleep_until, timeout, Instant}, -}; +use crate::{Interface, LeakStatus}; -use crate::{LeakInfo, LeakStatus}; +/// Traceroute implementation for windows. +#[cfg(target_os = "windows")] +mod windows; + +/// Traceroute implementation for unix. +#[cfg(unix)] +mod unix; #[derive(Clone, clap::Args)] pub struct TracerouteOpt { /// Try to bind to a specific interface #[clap(short, long)] - pub interface: String, + pub interface: Interface, /// Destination IP of the probe packets #[clap(short, long)] - pub destination: Ipv4Addr, + pub destination: IpAddr, - /// Avoid sending probe packets to this port - #[clap(long)] + /// Avoid sending UDP probe packets to this port. + #[clap(long, conflicts_with = "icmp")] + #[cfg(unix)] pub exclude_port: Option, - /// Send probe packets only to this port, instead of the default ports. - #[clap(long)] + /// Send UDP probe packets only to this port, instead of the default ports. + #[clap(long, conflicts_with = "icmp")] + #[cfg(unix)] pub port: Option, /// Use ICMP-Echo for the probe packets instead of UDP. #[clap(long)] + #[cfg(unix)] pub icmp: bool, } -/// Type of the UDP payload of the probe packets -type ProbePayload = [u8; 32]; - -/// Value of the UDP payload of the probe packets -const PROBE_PAYLOAD: ProbePayload = *b"ABCDEFGHIJKLMNOPQRSTUVWXYZ123456"; - /// Timeout of the leak test as a whole. Should be more than [SEND_TIMEOUT] + [RECV_TIMEOUT]. const LEAK_TIMEOUT: Duration = Duration::from_secs(5); @@ -64,10 +43,11 @@ const LEAK_TIMEOUT: Duration = Duration::from_secs(5); const SEND_TIMEOUT: Duration = Duration::from_secs(1); /// Timeout of receiving additional probe packets after the first one -const RECV_TIMEOUT: Duration = Duration::from_secs(1); +#[cfg(not(target_os = "windows"))] +const RECV_GRACE_TIME: Duration = Duration::from_millis(220); -/// Default range of ports for the probe packets. Stolen from `traceroute`. -const DEFAULT_PORT_RANGE: RangeFrom = 33434..; +/// Time in-between send of each probe packet. +const PROBE_INTERVAL: Duration = Duration::from_millis(100); /// Range of TTL values for the probe packets. const DEFAULT_TTL_RANGE: Range = 1..6; @@ -95,649 +75,19 @@ pub async fn run_leak_test(opt: &TracerouteOpt) -> LeakStatus { /// /// This test needs a raw socket to be able to listen for the ICMP responses, therefore it requires /// root/admin priviliges. -pub async fn try_run_leak_test(opt: &TracerouteOpt) -> eyre::Result { - // create the socket used for receiving the ICMP/TimeExceeded responses - let icmp_socket = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::ICMPV4)) - .wrap_err("Failed to open ICMP socket")?; - - icmp_socket - .set_nonblocking(true) - .wrap_err("Failed to set icmp_socket to nonblocking")?; - - #[cfg(any(target_os = "linux", target_os = "android"))] - { - use std::ffi::c_void; - use std::os::fd::{AsFd, AsRawFd}; - - let n = 1; - unsafe { - setsockopt( - icmp_socket.as_fd().as_raw_fd(), - nix::libc::SOL_IP, - nix::libc::IP_RECVERR, - &n as *const _ as *const std::ffi::c_void, - size_of_val(&n) as u32, - ) - }; - } - - bind_socket_to_interface(&icmp_socket, &opt.interface)?; - - // HACK: Wrap the socket in a tokio::net::UdpSocket to be able to use it async - // SAFETY: `into_raw_fd()` consumes the socket and returns an owned & open file descriptor. - let icmp_socket = unsafe { std::net::UdpSocket::from_raw_fd(icmp_socket.into_raw_fd()) }; - let mut icmp_socket = UdpSocket::from_std(icmp_socket)?; - - // on Windows, we need to do some additional configuration of the raw socket - #[cfg(target_os = "windows")] - configure_listen_socket(&icmp_socket, interface)?; - - if opt.icmp { - timeout(SEND_TIMEOUT, send_icmp_probes(&mut icmp_socket, opt)) - .map_err(|_timeout| eyre!("Timed out while trying to send probe packet")) - .await??; - } else { - // create the socket used for sending the UDP probing packets - let udp_socket = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP)) - .wrap_err("Failed to open UDP socket")?; - bind_socket_to_interface(&udp_socket, &opt.interface) - .wrap_err("Failed to bind UDP socket to interface")?; - udp_socket - .set_nonblocking(true) - .wrap_err("Failed to set udp_socket to nonblocking")?; - - // HACK: Wrap the socket in a tokio::net::UdpSocket to be able to use it async - // SAFETY: `into_raw_fd()` consumes the socket and returns an owned & open file descriptor. - let udp_socket = unsafe { std::net::UdpSocket::from_raw_fd(udp_socket.into_raw_fd()) }; - let mut udp_socket = UdpSocket::from_std(udp_socket)?; - - timeout(SEND_TIMEOUT, send_udp_probes(&mut udp_socket, opt)) - .map_err(|_timeout| eyre!("Timed out while trying to send probe packet")) - .await??; - } - - //let recv_task = read_probe_responses(&opt.interface, icmp_socket); - let recv_task = read_probe_responses(&opt.interface, icmp_socket); - - // wait until either task exits, or the timeout is reached - let leak_status = select! { - _ = sleep(LEAK_TIMEOUT) => LeakStatus::NoLeak, - result = recv_task => result?, +pub async fn try_run_leak_test(opt: &TracerouteOpt) -> anyhow::Result { + #[cfg(unix)] + return { + #[cfg(target_os = "android")] + type Impl = unix::android::TracerouteAndroid; + #[cfg(target_os = "linux")] + type Impl = unix::linux::TracerouteLinux; + #[cfg(target_os = "macos")] + type Impl = unix::macos::TracerouteMacos; + + unix::try_run_leak_test::(opt).await }; - // let send_task = timeout(SEND_TIMEOUT, send_icmp_probes(&mut udp_socket, opt)) - // .map_err(|_timeout| eyre!("Timed out while trying to send probe packet")) - // // never return on success - // .and_then(|_| pending()); - // - // let recv_task = read_probe_responses(&opt.interface, icmp_socket); - // - // wait until either thread exits, or the timeout is reached - // let leak_status = select! { - // _ = sleep(LEAK_TIMEOUT) => LeakStatus::NoLeak, - // result = recv_task => result?, - // result = send_task => result?, - // }; - - Ok(leak_status) -} - -async fn send_icmp_probes(socket: &mut UdpSocket, opt: &TracerouteOpt) -> eyre::Result<()> { - use pnet_packet::icmp::{echo_request::*, *}; - - let ports = DEFAULT_PORT_RANGE - // ensure we don't send anything to `opt.exclude_port` - .filter(|&p| Some(p) != opt.exclude_port) - // `opt.port` overrides the default port range - .map(|port| opt.port.unwrap_or(port)); - - for (port, ttl) in ports.zip(DEFAULT_TTL_RANGE) { - log::debug!("sending probe packet (ttl={ttl})"); - - socket - .set_ttl(ttl.into()) - .wrap_err("Failed to set TTL on socket")?; - - // the first packet will sometimes get dropped on MacOS, thus we send two packets - let number_of_sends = if cfg!(target_os = "macos") { 2 } else { 1 }; - - let echo = EchoRequest { - icmp_type: IcmpTypes::EchoRequest, - icmp_code: IcmpCode(0), - checksum: 0, - identifier: 1, - sequence_number: 1, - payload: PROBE_PAYLOAD.to_vec(), - }; - let mut packet = - MutableEchoRequestPacket::owned(vec![0u8; 8 + PROBE_PAYLOAD.len()]).unwrap(); - packet.populate(&echo); - packet.set_checksum(checksum(&IcmpPacket::new(packet.packet()).unwrap())); - - let result: io::Result<()> = stream::iter(0..number_of_sends) - // call `send_to` `number_of_sends` times - .then(|_| socket.send_to(&packet.packet(), (opt.destination, port))) - .map_ok(drop) - .try_collect() // abort on the first error - .await; - - let Err(e) = result else { continue }; - match e.kind() { - io::ErrorKind::PermissionDenied => { - // Linux returns this error if our packet was rejected by nftables. - log::debug!("send_to failed with 'permission denied'"); - } - _ => return Err(e).wrap_err("Failed to send packet")?, - } - } - - Ok(()) -} - -async fn send_udp_probes(socket: &mut UdpSocket, opt: &TracerouteOpt) -> eyre::Result<()> { - // ensure we don't send anything to `opt.exclude_port` - let ports = DEFAULT_PORT_RANGE - // skip the excluded port - .filter(|&p| Some(p) != opt.exclude_port) - // `opt.port` overrides the default port range - .map(|port| opt.port.unwrap_or(port)); - - for (port, ttl) in ports.zip(DEFAULT_TTL_RANGE) { - log::debug!("sending probe packet (ttl={ttl})"); - - socket - .set_ttl(ttl.into()) - .wrap_err("Failed to set TTL on socket")?; - - // the first packet will sometimes get dropped on MacOS, thus we send two packets - let number_of_sends = if cfg!(target_os = "macos") { 2 } else { 1 }; - - let result: io::Result<()> = stream::iter(0..number_of_sends) - // call `send_to` `number_of_sends` times - .then(|_| socket.send_to(&PROBE_PAYLOAD, (opt.destination, port))) - .map_ok(drop) - .try_collect() // abort on the first error - .await; - - let Err(e) = result else { continue }; - match e.kind() { - io::ErrorKind::PermissionDenied => { - // Linux returns this error if our packet was rejected by nftables. - log::debug!("send_to failed with 'permission denied'"); - } - _ => return Err(e).wrap_err("Failed to send packet")?, - } - } - - Ok(()) -} - -/// Experimental PoC of a linux implementation that doesn't need root. -#[cfg(any(target_os = "linux", target_os = "android"))] -#[allow(dead_code)] -async fn read_probe_responses_no_root( - _interface: &str, - socket: UdpSocket, -) -> eyre::Result { - use nix::libc::{errno::Errno, libc::setsockopt, setsockopt, sock_extended_err}; - use std::ffi::c_void; - use std::mem::transmute; - use std::os::fd::AsRawFd; - - // the list of node IP addresses from which we received a response to our probe packets. - let mut reachable_nodes = vec![]; - - let mut read_buf = vec![0u8; usize::from(u16::MAX)].into_boxed_slice(); - loop { - log::debug!("Reading from ICMP socket"); - - // XXX: only works for ipv4 - let mut msg_name: nix::libc::sockaddr_in = unsafe { std::mem::zeroed() }; - let mut msg_iov = vec![nix::libc::iovec { - iov_base: read_buf.as_mut_ptr() as *mut _, - iov_len: read_buf.len(), - }]; - let mut msg_control = vec![0u8; 2048]; - - let mut msg_header = nix::libc::msghdr { - msg_name: &mut msg_name as *mut _ as *mut c_void, - msg_namelen: size_of_val(&msg_name) as u32, - msg_iov: msg_iov.as_mut_ptr() as *mut _, - msg_iovlen: msg_iov.len(), - msg_control: msg_control.as_mut_ptr() as *mut _, - msg_controllen: msg_control.len(), - msg_flags: 0, - }; - log::debug!("header: {msg_header:?}"); - - // Calling recvmsg with MSG_ERRQUEUE will prompt linux to tell us if we get any ICMP errorr - // replies to our Echos. - let flags = nix::libc::MSG_ERRQUEUE; - let n = loop { - match unsafe { nix::libc::recvmsg(socket.as_raw_fd(), &mut msg_header, flags) } { - ..0 => match nix::errno::Errno::last() { - nix::errno::Errno::EWOULDBLOCK => { - sleep(Duration::from_millis(10)).await; - continue; - } - e => bail!("Faileed to read from socket {e}"), - }, - n => break n as usize, - } - }; - - log::debug!("header after: {msg_header:?}"); - msg_iov.truncate(msg_header.msg_iovlen); - msg_control.truncate(msg_header.msg_controllen); - let _ = msg_header; - - log::debug!("msg_name: {msg_name:?}"); - log::debug!("msg_iov: {msg_iov:?}"); - log::debug!("msg_iov[0]: {:?}", &read_buf[..n]); - log::debug!("msg_control: {msg_control:?}"); - - let source = Ipv4Addr::from_bits(msg_name.sin_addr.s_addr); - //let source = source.ip(); - let (control_header, rest) = msg_control - .split_first_chunk::<{ size_of::() }>() - .ok_or_eyre("Foo")?; - let control_header: nix::libc::cmsghdr = unsafe { transmute(*control_header) }; - let _control_message_len = control_header - .cmsg_len - .saturating_sub(size_of::()); - - debug_assert_eq!(control_header.cmsg_level, nix::libc::IPPROTO_IP); - debug_assert_eq!(control_header.cmsg_type, nix::libc::IP_RECVERR); - - let (control_message, rest) = rest - .split_first_chunk::<{ size_of::() }>() - .ok_or_eyre("ASADAD")?; - //debug_assert_eq!(control_message_len, control_message.len()); - - let control_message: sock_extended_err = unsafe { transmute(*control_message) }; - - let result = parse_icmp_time_exceeded_raw(&rest) - .map_err(|e| eyre!("Ignoring packet (len={n}, ip.src={source}): {e}",)); - - log::debug!("{control_header:?}"); - log::debug!("{control_message:?}"); - log::debug!("rest: {rest:?}"); - log::debug!("{:?}", Errno::from_raw(control_message.ee_errno as i32)); - - let _original_icmp_echo = &read_buf[..n]; - - // contains the source address of the ICMP Time Exceeded packet - let _icmp_source/*: nix::libc::sockaddr */ = rest; - - match result { - Ok(..) => { - log::debug!("Got a probe response, we are leaking!"); - //timeout_at.get_or_insert_with(|| Instant::now() + RECV_TIMEOUT); - //let ip = IpAddr::from(ip); - let ip = IpAddr::from(Ipv4Addr::new(1, 3, 3, 7)); - if !reachable_nodes.contains(&ip) { - reachable_nodes.push(ip); - } - } - - // an error means the packet wasn't the ICMP/TimeExceeded we're listening for. - Err(e) => log::debug!("{e}"), - } - } -} - -async fn read_probe_responses(interface: &str, socket: UdpSocket) -> eyre::Result { - // the list of node IP addresses from which we received a response to our probe packets. - let mut reachable_nodes = vec![]; - - // a time at which this function should exit. this is set when we receive the first probe - // response, and allows us to wait a while to collect any additional probe responses before - // returning. - let mut timeout_at = None; - - let mut read_buf = vec![0u8; usize::from(u16::MAX)].into_boxed_slice(); - loop { - let timer = async { - match timeout_at { - // resolve future at the timeout, if it's set - Some(time) => sleep_until(time).await, - - // otherwise, never resolve - None => pending().await, - } - }; - - log::debug!("Reading from ICMP socket"); - - // let n = socket - // .recv(unsafe { &mut *(&mut read_buf[..] as *mut [u8] as *mut [MaybeUninit]) }) - // .wrap_err("Failed to read from raw socket")?; - - let (n, source) = select! { - result = socket.recv_from(&mut read_buf[..]) => result - .wrap_err("Failed to read from raw socket")?, - - _timeout = timer => { - return Ok(LeakStatus::LeakDetected(LeakInfo::NodeReachableOnInterface { - reachable_nodes, - interface: interface.to_string(), - })); - } - }; - - let source = source.ip(); - let packet = &read_buf[..n]; - let result = parse_ipv4(packet) - .map_err(|e| eyre!("Ignoring packet: (len={n}, ip.src={source}) {e} ({packet:02x?})")) - .and_then(|ip_packet| { - parse_icmp_time_exceeded(&ip_packet).map_err(|e| { - eyre!( - "Ignoring packet (len={n}, ip.src={source}, ip.dest={}): {e}", - ip_packet.get_destination(), - ) - }) - }); - - match result { - Ok(ip) => { - log::debug!("Got a probe response, we are leaking!"); - timeout_at.get_or_insert_with(|| Instant::now() + RECV_TIMEOUT); - let ip = IpAddr::from(ip); - if !reachable_nodes.contains(&ip) { - reachable_nodes.push(ip); - } - } - - // an error means the packet wasn't the ICMP/TimeExceeded we're listening for. - Err(e) => log::debug!("{e}"), - } - } -} - -/// Configure the raw socket we use for listening to ICMP responses. -/// -/// This will bind the socket to an interface, and set the `SIO_RCVALL`-option. -#[cfg(target_os = "windows")] -fn configure_listen_socket(socket: &Socket, interface: &str) -> eyre::Result<()> { - use std::{ffi::c_void, os::windows::io::AsRawSocket, ptr::null_mut}; - use windows_sys::Win32::Networking::WinSock::{ - WSAGetLastError, WSAIoctl, SIO_RCVALL, SOCKET, SOCKET_ERROR, - }; - - bind_socket_to_interface(&socket, interface) - .wrap_err("Failed to bind listen socket to interface")?; - - let j = 1; - let mut _in: u32 = 0; - let result = unsafe { - WSAIoctl( - socket.as_raw_socket() as SOCKET, - SIO_RCVALL, - &j as *const _ as *const c_void, - size_of_val(&j) as u32, - null_mut(), - 0, - &mut _in as *mut u32, - null_mut(), - None, - ) - }; - - if result == SOCKET_ERROR { - let code = unsafe { WSAGetLastError() }; - bail!("Failed to call WSAIoctl(listen_socket, SIO_RCVALL, ...), code = {code}"); - } - - Ok(()) -} - -/// Try to parse the bytes as an IPv4 packet. -/// -/// This only valdiates the IPv4 header, not the payload. -fn parse_ipv4(packet: &[u8]) -> eyre::Result> { - let ip_packet = Ipv4Packet::new(packet).ok_or_eyre("Too small")?; - ensure!(ip_packet.get_version() == 4, "Not IPv4"); - eyre::Ok(ip_packet) -} - -/// Try to parse an [Ipv4Packet] as an ICMP/TimeExceeded response to a packet sent by -/// [send_probes]. If successful, returns the [Ipv4Addr] of the packet source. -/// -/// If the packet fails to parse, or is not a reply to a packet sent by [send_probes], this -/// function returns an error. -fn parse_icmp_time_exceeded(ip_packet: &Ipv4Packet<'_>) -> eyre::Result { - let ip_protocol = ip_packet.get_next_level_protocol(); - ensure!(ip_protocol == IpProtocol::Icmp, "Not ICMP"); - parse_icmp_time_exceeded_raw(ip_packet.payload())?; - Ok(ip_packet.get_source()) -} - -fn parse_icmp_time_exceeded_raw(bytes: &[u8]) -> eyre::Result<()> { - let icmp_packet = IcmpPacket::new(bytes).ok_or(eyre!("Too small"))?; - let too_small = || eyre!("Too small"); - - let correct_type = icmp_packet.get_icmp_type() == IcmpTypes::TimeExceeded; - ensure!(correct_type, "Not ICMP/TimeExceeded"); - - let time_exceeeded = TimeExceededPacket::new(icmp_packet.packet()).ok_or_else(too_small)?; - - let original_ip_packet = Ipv4Packet::new(time_exceeeded.payload()).ok_or_else(too_small)?; - let original_ip_protocol = original_ip_packet.get_next_level_protocol(); - ensure!(original_ip_packet.get_version() == 4, "Not IPv4"); - - match original_ip_protocol { - IpProtocol::Udp => { - let original_udp_packet = - UdpPacket::new(original_ip_packet.payload()).ok_or_else(too_small)?; - - // check if payload looks right - // some network nodes will strip the payload, that's fine. - if !original_udp_packet.payload().is_empty() { - let udp_len = usize::from(original_udp_packet.get_length()); - let udp_payload = udp_len - .checked_sub(UdpPacket::minimum_packet_size()) - .and_then(|len| original_udp_packet.payload().get(..len)) - .ok_or_eyre("Invalid UDP length")?; - if udp_payload != &PROBE_PAYLOAD { - let udp_payload: String = udp_payload - .iter() - .copied() - .flat_map(escape_default) - .map(char::from) - .collect(); - bail!("Wrong UDP payload: {udp_payload:?}"); - } - } - - Ok(()) - } - - IpProtocol::Icmp => { - let original_icmp_packet = - EchoRequestPacket::new(original_ip_packet.payload()).ok_or_else(too_small)?; - - ensure!( - original_icmp_packet.get_icmp_type() == IcmpTypes::EchoRequest, - "Not ICMP/EchoRequest" - ); - - // check if payload looks right - // some network nodes will strip the payload, that's fine. - let echo_payload = original_icmp_packet.payload(); - if !echo_payload.is_empty() && !echo_payload.starts_with(&PROBE_PAYLOAD) { - let echo_payload: String = echo_payload - .iter() - .copied() - .flat_map(escape_default) - .map(char::from) - .collect(); - bail!("Wrong ICMP/Echo payload: {echo_payload:?}"); - } - - Ok(()) - } - - _ => bail!("Not UDP/ICMP"), - } -} - -match_cfg! { - #[cfg(any(target_os = "windows", target_os = "android"))] => { - fn bind_socket_to_interface(socket: &Socket, interface: &str) -> eyre::Result<()> { - use crate::util::get_interface_ip; - use std::net::SocketAddr; - - let interface_ip = get_interface_ip(interface)?; - - log::info!("Binding socket to {interface_ip} ({interface:?})"); - - socket.bind(&SocketAddr::new(interface_ip, 0).into()) - .wrap_err("Failed to bind socket to interface address")?; - - return Ok(()); - } - } - #[cfg(target_os = "linux")] => { - fn bind_socket_to_interface(socket: &Socket, interface: &str) -> eyre::Result<()> { - log::info!("Binding socket to {interface:?}"); - - socket - .bind_device(Some(interface.as_bytes())) - .wrap_err("Failed to bind socket to interface")?; - - Ok(()) - } - } - #[cfg(target_os = "macos")] => { - fn bind_socket_to_interface(socket: &Socket, interface: &str) -> eyre::Result<()> { - use nix::net::if_::if_nametoindex; - use std::num::NonZero; - - log::info!("Binding socket to {interface:?}"); - - let interface_index = if_nametoindex(interface) - .map_err(eyre::Report::from) - .and_then(|code| NonZero::new(code).ok_or_eyre("Non-zero error code")) - .wrap_err("Failed to get interface index")?; - - socket.bind_device_by_index_v4(Some(interface_index))?; - Ok(()) - } - } + #[cfg(target_os = "windows")] + return windows::traceroute_using_ping(opt).await; } - -// OLD ICMP SEND CODE -// -// use talpid_windows::net::{get_ip_address_for_interface, luid_from_alias, AddressFamily}; -// let interface_luid = luid_from_alias(INTERFACE)?; -// let IpAddr::V4(interface_ip) = -// get_ip_address_for_interface(AddressFamily::Ipv4, interface_luid)? -// .ok_or(eyre!("No IP for interface {INTERFACE:?}"))? -// else { -// panic!() -// }; -// -// for ttl in 1..=5 { -// let mut packet = Packet { -// ip: Ipv4Header { -// version_and_ihl: 0x45, -// dscp_and_ecn: 0, // should be fine -// total_length: (size_of::() as u16).to_be_bytes(), -// _stuff: Default::default(), // should be fine -// ttl, -// protocol: 1, // icmp -// header_checksum: Default::default(), -// source_address: interface_ip.octets(), -// destination_address: destination.octets(), -// }, -// icmp: Icmpv4Header { -// icmp_type: 8, // echo -// code: 0, -// checksum: Default::default(), -// }, -// }; -// let icmp = Icmpv4Header { -// icmp_type: 8, // echo -// code: 0, -// checksum: Default::default(), -// }; -// -// packet.ip.header_checksum = checksum(packet.ip.as_bytes()); -// let mut packet = Icmpv4Packet { -// header: icmp, -// payload: Icmpv4EchoPayload { -// identifier: 0u16.to_be_bytes(), -// sequence_number: (ttl as u16).to_be_bytes(), -// data: [0x77; 32], -// }, -// }; -// -// packet.header.checksum = checksum(packet.as_bytes()); -// -// let packet = packet; -// -// listen_socket.set_ttl(ttl).wrap_err("Failed to set TTL")?; -// listen_socket -// .send_to( -// packet.as_bytes(), -// &SocketAddrV4::new(destination, 0u16).into(), -// ) -// .wrap_err("Failed to send on raw socket")?; -// } - -// use talpid_windows::net::{get_ip_address_for_interface, luid_from_alias, AddressFamily}; -// let interface_luid = luid_from_alias(INTERFACE)?; -// let IpAddr::V4(interface_ip) = -// get_ip_address_for_interface(AddressFamily::Ipv4, interface_luid)? -// .ok_or(eyre!("No IP for interface {INTERFACE:?}"))? -// else { -// panic!() -// }; -// -// for ttl in 1..=5 { -// let mut packet = Packet { -// ip: Ipv4Header { -// version_and_ihl: 0x45, -// dscp_and_ecn: 0, // should be fine -// total_length: (size_of::() as u16).to_be_bytes(), -// _stuff: Default::default(), // should be fine -// ttl, -// protocol: 1, // icmp -// header_checksum: Default::default(), -// source_address: interface_ip.octets(), -// destination_address: destination.octets(), -// }, -// icmp: Icmpv4Header { -// icmp_type: 8, // echo -// code: 0, -// checksum: Default::default(), -// }, -// }; -// let icmp = Icmpv4Header { -// icmp_type: 8, // echo -// code: 0, -// checksum: Default::default(), -// }; -// -// packet.ip.header_checksum = checksum(packet.ip.as_bytes()); -// let mut packet = Icmpv4Packet { -// header: icmp, -// payload: Icmpv4EchoPayload { -// identifier: 0u16.to_be_bytes(), -// sequence_number: (ttl as u16).to_be_bytes(), -// data: [0x77; 32], -// }, -// }; -// -// packet.header.checksum = checksum(packet.as_bytes()); -// -// let packet = packet; -// -// listen_socket.set_ttl(ttl).wrap_err("Failed to set TTL")?; -// listen_socket -// .send_to( -// packet.as_bytes(), -// &SocketAddrV4::new(destination, 0u16).into(), -// ) -// .wrap_err("Failed to send on raw socket")?; -// } diff --git a/leak-checker/src/traceroute/unix/android.rs b/leak-checker/src/traceroute/unix/android.rs new file mode 100644 index 000000000000..4d3e979b44c4 --- /dev/null +++ b/leak-checker/src/traceroute/unix/android.rs @@ -0,0 +1,21 @@ +use socket2::Socket; + +use crate::{util::Ip, Interface}; + +use super::{common::bind_socket_to_interface, linux, Traceroute}; + +pub struct TracerouteAndroid; + +impl Traceroute for TracerouteAndroid { + type AsyncIcmpSocket = linux::AsyncIcmpSocketImpl; + + fn bind_socket_to_interface( + socket: &Socket, + interface: &Interface, + ip_version: Ip, + ) -> anyhow::Result<()> { + // We do not have permission to bind directly to an interface on Android, + // unlike desktop Linux. Therefore we bind to the interface IP instead. + bind_socket_to_interface(socket, interface, ip_version) + } +} diff --git a/leak-checker/src/traceroute/unix/common.rs b/leak-checker/src/traceroute/unix/common.rs new file mode 100644 index 000000000000..1efcdba7b405 --- /dev/null +++ b/leak-checker/src/traceroute/unix/common.rs @@ -0,0 +1,25 @@ +#![allow(dead_code)] // some code here is not used on some targets. + +use std::net::SocketAddr; + +use anyhow::Context; +use socket2::Socket; + +use crate::util::{get_interface_ip, Ip}; +use crate::Interface; + +pub(crate) fn bind_socket_to_interface( + socket: &Socket, + interface: &Interface, + ip_version: Ip, +) -> anyhow::Result<()> { + let interface_ip = get_interface_ip(interface, ip_version)?; + + log::info!("Binding socket to {interface_ip} ({interface:?})"); + + socket + .bind(&SocketAddr::new(interface_ip, 0).into()) + .context("Failed to bind socket to interface address")?; + + Ok(()) +} diff --git a/leak-checker/src/traceroute/unix/linux.rs b/leak-checker/src/traceroute/unix/linux.rs new file mode 100644 index 000000000000..3f5dcb5bfb4c --- /dev/null +++ b/leak-checker/src/traceroute/unix/linux.rs @@ -0,0 +1,352 @@ +use std::{ + ffi::c_int, + io::{self, IoSliceMut}, + net::IpAddr, + os::fd::{AsRawFd, RawFd}, + time::Duration, +}; + +use anyhow::{anyhow, Context}; +use nix::{ + cmsg_space, + errno::Errno, + libc, + sys::socket::{ + recvmsg, setsockopt, + sockopt::{Ipv4RecvErr, Ipv4Ttl, Ipv6RecvErr, Ipv6Ttl}, + ControlMessageOwned, MsgFlags, SockaddrIn, SockaddrIn6, SockaddrLike, + }, +}; +use pnet_packet::{ + icmp::{time_exceeded::IcmpCodes, IcmpCode, IcmpType, IcmpTypes}, + icmpv6::{Icmpv6Code, Icmpv6Type, Icmpv6Types}, +}; +use socket2::Socket; +use tokio::time::{sleep, Instant}; + +use crate::{ + traceroute::{unix::parse_icmp_probe, TracerouteOpt, RECV_GRACE_TIME}, + util::Ip, + Interface, LeakInfo, LeakStatus, +}; + +use super::{AsyncIcmpSocket, Traceroute}; + +pub struct TracerouteLinux; + +pub struct AsyncIcmpSocketImpl { + ip_version: Ip, + inner: tokio::net::UdpSocket, +} + +impl Traceroute for TracerouteLinux { + type AsyncIcmpSocket = AsyncIcmpSocketImpl; + + fn bind_socket_to_interface( + socket: &Socket, + interface: &Interface, + _: Ip, + ) -> anyhow::Result<()> { + bind_socket_to_interface(socket, interface) + } +} + +impl AsyncIcmpSocket for AsyncIcmpSocketImpl { + fn from_socket2(socket: Socket, ip_version: Ip) -> anyhow::Result { + // IP_RECVERR tells Linux to pass any error packets received over ICMP to us through `recvmsg` control messages. + match ip_version { + Ip::V4(_) => { + setsockopt(&socket, Ipv4RecvErr, &true).context("Failed to set IP_RECVERR")? + } + Ip::V6(_) => { + setsockopt(&socket, Ipv6RecvErr, &true).context("Failed to set IPV6_RECVERR")? + } + } + + let std_socket = std::net::UdpSocket::from(socket); + let tokio_socket = tokio::net::UdpSocket::from_std(std_socket).unwrap(); + Ok(AsyncIcmpSocketImpl { + ip_version, + inner: tokio_socket, + }) + } + + fn set_ttl(&self, ttl: u32) -> anyhow::Result<()> { + let ttl = ttl as c_int; + match self.ip_version { + Ip::V4(_) => setsockopt(&self.inner, Ipv4Ttl, &ttl), + Ip::V6(_) => setsockopt(&self.inner, Ipv6Ttl, &ttl), + } + .context("Failed to set TTL value for socket") + } + + async fn send_to(&self, packet: &[u8], destination: impl Into) -> io::Result { + self.inner.send_to(packet, (destination.into(), 0)).await + } + + async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, IpAddr)> { + self.inner + .recv_from(buf) + .await + .map(|(n, source)| (n, source.ip())) + } + + async fn recv_ttl_responses(&self, opt: &TracerouteOpt) -> anyhow::Result { + recv_ttl_responses(opt.destination, &opt.interface, &self.inner).await + } +} + +fn bind_socket_to_interface(socket: &Socket, interface: &Interface) -> anyhow::Result<()> { + log::info!("Binding socket to {interface:?}"); + + let Interface::Name(interface) = interface; + + socket + .bind_device(Some(interface.as_bytes())) + .context("Failed to bind socket to interface")?; + + Ok(()) +} + +/// Try to read ICMP/TimeExceeded error packets from an ICMP socket. +/// +/// This method does not require root, but only works on Linux (including Android). +async fn recv_ttl_responses( + destination: IpAddr, + interface: &Interface, + socket: &impl AsRawFd, +) -> anyhow::Result { + // the list of node IP addresses from which we received a response to our probe packets. + let mut reachable_nodes = vec![]; + + // A time at which this function should exit. This is set when we receive the first probe + // response, and allows us to wait a while to collect any additional probe responses before + // returning. + let mut timeout_at = None; + + // Allocate buffer for receiving packets. + let mut recv_buf = vec![0u8; usize::from(u16::MAX)].into_boxed_slice(); + let mut io_vec = [IoSliceMut::new(&mut recv_buf)]; + + // Allocate space for EHOSTUNREACH errors caused by ICMP/TimeExceeded packets. + let mut control_buf = match destination { + // This is the size of ControlMessageOwned::Ipv4RecvErr(sock_extended_err, sockaddr_in). + IpAddr::V4(..) => cmsg_space!(libc::sock_extended_err, libc::sockaddr_in), + + // This is the size of ControlMessageOwned::Ipv6RecvErr(sock_extended_err, sockaddr_in6). + IpAddr::V6(..) => cmsg_space!(libc::sock_extended_err, libc::sockaddr_in6), + }; + + 'outer: loop { + log::debug!("Reading from ICMP socket"); + + // Call recvmsg in a loop + let recv_packet = loop { + if let Some(timeout_at) = timeout_at { + if Instant::now() >= timeout_at { + break 'outer; + } + } + + let recv_packet = match destination { + IpAddr::V4(..) => recvmsg_with_control_message::( + socket.as_raw_fd(), + &mut io_vec, + &mut control_buf, + )? + .map(|packet| packet.map_source_addr(|a| IpAddr::from(a.ip()))), + IpAddr::V6(..) => recvmsg_with_control_message::( + socket.as_raw_fd(), + &mut io_vec, + &mut control_buf, + )? + .map(|packet| packet.map_source_addr(|a| IpAddr::from(a.ip()))), + }; + + let Some(recv_packet) = recv_packet else { + // poor-mans async IO :'( + sleep(Duration::from_millis(10)).await; + continue; + }; + + break recv_packet; + }; + + let RecvPacket { + source_addr, + packet, + control_message, + } = recv_packet; + + macro_rules! skip_if { + ($skip_condition:expr, $note:expr) => {{ + if $skip_condition { + log::debug!("Ignoring received message: {}", $note); + continue 'outer; + } + }}; + } + + // NOTE: This should be the IP destination of our ping packets. That does NOT mean the + // packets reached the destination. Instead, if we see an EHOSTUNREACH control message, + // it means the packets was instead dropped along the way. Seeing this address helps us + // identify that this is a response to the ping we sent. + skip_if!(source_addr != destination, "Unknown source"); + + let error_source = match control_message { + ControlMessageOwned::Ipv4RecvErr(socket_error, source_addr) => { + let libc::sock_extended_err { + ee_errno, // Error Number: Should be EHOSTUNREACH + ee_origin, // Error Origin: 2 = Icmp + ee_type, // ICMP Type: 11 = ICMP/TimeExceeded. + ee_code, // ICMP Code. 0 = TTL exceeded in transit. + .. + } = socket_error; + + let errno = Errno::from_raw(ee_errno as i32); + skip_if!(errno != Errno::EHOSTUNREACH, "Unexpected errno"); + skip_if!( + ee_origin != nix::libc::SO_EE_ORIGIN_ICMP, + "Unexpected origin" + ); + + let icmp_type = IcmpType::new(ee_type); + skip_if!(icmp_type != IcmpTypes::TimeExceeded, "Unexpected ICMP type"); + + let icmp_code = IcmpCode::new(ee_code); + skip_if!( + icmp_code != IcmpCodes::TimeToLiveExceededInTransit, + "Unexpected ICMP code" + ); + + // NOTE: This is the IP of the node that dropped the packet due to TTL exceeded. + let error_source = SockaddrIn::from(source_addr.unwrap()); + log::debug!("addr: {error_source}"); + + // Ensure that this is the original Echo packet that we sent. + skip_if!( + parse_icmp_probe(Ip::V4(packet)).is_err(), + "Not a response to us" + ); + + IpAddr::from(error_source.ip()) + } + ControlMessageOwned::Ipv6RecvErr(socket_error, source_addr) => { + let libc::sock_extended_err { + ee_errno, // Error Number: Should be EHOSTUNREACH + ee_origin, // Error Origin: 3 = Icmp6. + ee_type, // ICMP Type: 3 = ICMP6/TimeExceeded + ee_code, // ICMP Code. 0 = TTL exceeded in transit. + .. + } = socket_error; + + let errno = Errno::from_raw(ee_errno as i32); + skip_if!(errno != Errno::EHOSTUNREACH, "Unexpected errno"); + skip_if!( + ee_origin != nix::libc::SO_EE_ORIGIN_ICMP6, + "Unexpected origin" + ); + + let icmp_type = Icmpv6Type::new(ee_type); + skip_if!( + icmp_type != Icmpv6Types::TimeExceeded, + "Unexpected ICMP type" + ); + + let icmp_code = Icmpv6Code::new(ee_code); + skip_if!(icmp_code != Icmpv6Code::new(0), "Unexpected ICMP code"); + + // NOTE: This is the IP of the node that dropped the packet due to TTL exceeded. + let error_source = SockaddrIn6::from(source_addr.unwrap()); + log::debug!("addr: {error_source}"); + + // Ensure that this is the original Echo packet that we sent. + skip_if!( + parse_icmp_probe(Ip::V6(packet)).is_err(), + "Not a response to us" + ); + + IpAddr::from(error_source.ip()) + } + other_message => { + log::debug!("Unhandled control message: {other_message:?}"); + continue 'outer; + } + }; + + log::debug!("Got a probe response, we are leaking!"); + timeout_at.get_or_insert_with(|| Instant::now() + RECV_GRACE_TIME); + reachable_nodes.push(error_source); + } + + debug_assert!(!reachable_nodes.is_empty()); + + Ok(LeakStatus::LeakDetected( + LeakInfo::NodeReachableOnInterface { + reachable_nodes, + interface: interface.clone(), + }, + )) +} + +struct RecvPacket<'a, S> { + pub source_addr: S, + pub packet: &'a [u8], + pub control_message: ControlMessageOwned, +} + +impl<'a, S> RecvPacket<'a, S> { + /// Convert the type of [RecvPacket::source_addr], e.g. from [SockaddrIn6] to [IpAddr]. + fn map_source_addr(self, f: impl FnOnce(S) -> T) -> RecvPacket<'a, T> { + RecvPacket { + source_addr: f(self.source_addr), + packet: self.packet, + control_message: self.control_message, + } + } +} + +/// Call recvmsg and expect exactly one control message. +/// +/// See [ControlMessageOwned] for details on control messages. +/// Returns `Ok(None)` on `EWOULDBLOCK`, or if recvmsg returns no control message. +fn recvmsg_with_control_message<'a, S: SockaddrLike + Copy>( + socket: RawFd, + io_vec: &'a mut [IoSliceMut<'_>; 1], + control_buf: &mut Vec, +) -> anyhow::Result>> { + // MSG_ERRQUEUE asks linux to tell us if we get any ICMP error replies to + // our Echo packets. + let flags = MsgFlags::MSG_ERRQUEUE; + + let result = recvmsg::(socket.as_raw_fd(), io_vec, Some(control_buf), flags); + + let recv = match result { + Ok(recv) => recv, + Err(Errno::EWOULDBLOCK) => return Ok(None), + Err(e) => return Err(anyhow!("Failed to read from socket: {e}")), + }; + + let source_addr = recv.address.unwrap(); + + let mut control_messages = recv + .cmsgs() + .context("Failed to decode cmsgs from recvmsg")?; + + let Some(control_message) = control_messages.next() else { + // We're looking for EHOSTUNREACH errors. No errors means skip. + log::debug!("Skipping recvmsg that produced no control messages."); + return Ok(None); + }; + + let Some(packet) = recv.iovs().next() else { + log::debug!("Skipping recvmsg that produced no data."); + return Ok(None); + }; + + Ok(Some(RecvPacket { + source_addr, + packet, + control_message, + })) +} diff --git a/leak-checker/src/traceroute/unix/macos.rs b/leak-checker/src/traceroute/unix/macos.rs new file mode 100644 index 000000000000..fcf0f942e7dd --- /dev/null +++ b/leak-checker/src/traceroute/unix/macos.rs @@ -0,0 +1,297 @@ +use std::{ascii::escape_default, ffi::c_int, future::pending, io, net::IpAddr, num::NonZero}; + +use anyhow::{anyhow, bail, ensure, Context}; +use nix::{ + net::if_::if_nametoindex, + sys::socket::{setsockopt, sockopt::Ipv6Ttl}, +}; +use pnet_packet::{ + icmp::{self, time_exceeded::TimeExceededPacket, IcmpPacket, IcmpTypes}, + icmpv6::{Icmpv6Packet, Icmpv6Types}, + ip::IpNextHeaderProtocols, + ipv4::Ipv4Packet, + ipv6::Ipv6Packet, + udp::UdpPacket, + Packet, +}; +use socket2::Socket; +use tokio::{ + select, + time::{sleep_until, Instant}, +}; + +use crate::{ + traceroute::{TracerouteOpt, RECV_GRACE_TIME}, + util::Ip, + Interface, LeakInfo, LeakStatus, +}; + +use super::{parse_icmp_probe, too_small, AsyncIcmpSocket, Traceroute, PROBE_PAYLOAD}; + +pub struct TracerouteMacos; + +pub struct AsyncIcmpSocketImpl { + ip_version: Ip, + inner: tokio::net::UdpSocket, +} + +impl Traceroute for TracerouteMacos { + type AsyncIcmpSocket = AsyncIcmpSocketImpl; + + fn bind_socket_to_interface( + socket: &Socket, + interface: &Interface, + ip_version: Ip, + ) -> anyhow::Result<()> { + // can't use the same method as desktop-linux here beacuse reasons + bind_socket_to_interface(socket, interface, ip_version) + } +} + +impl AsyncIcmpSocket for AsyncIcmpSocketImpl { + fn from_socket2(socket: Socket, ip_version: Ip) -> anyhow::Result { + let std_socket = std::net::UdpSocket::from(socket); + let tokio_socket = tokio::net::UdpSocket::from_std(std_socket).unwrap(); + Ok(AsyncIcmpSocketImpl { + ip_version, + inner: tokio_socket, + }) + } + + fn set_ttl(&self, ttl: u32) -> anyhow::Result<()> { + match self.ip_version { + Ip::V6(_) => { + let ttl = ttl as c_int; + setsockopt(&self.inner, Ipv6Ttl, &ttl).context("Failed to set TTL value for socket") + } + Ip::V4(..) => self + .inner + .set_ttl(ttl) + .context("Failed to set TTL value for socket"), + } + } + + async fn send_to(&self, packet: &[u8], destination: impl Into) -> io::Result { + self.inner.send_to(packet, (destination.into(), 0)).await + } + + async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, IpAddr)> { + self.inner + .recv_from(buf) + .await + .map(|(n, source)| (n, source.ip())) + } + + async fn recv_ttl_responses(&self, opt: &TracerouteOpt) -> anyhow::Result { + recv_ttl_responses(self, opt).await + } +} + +fn bind_socket_to_interface( + socket: &Socket, + interface: &Interface, + ip_version: Ip, +) -> anyhow::Result<()> { + log::info!("Binding socket to {interface:?}"); + + let interface_index = match interface { + &Interface::Index(index) => index, + Interface::Name(interface) => if_nametoindex(interface.as_str()) + .map_err(anyhow::Error::from) + .and_then(|code| NonZero::new(code).ok_or(anyhow!("Non-zero error code"))) + .context("Failed to get interface index")?, + }; + + match ip_version { + Ip::V4(..) => socket.bind_device_by_index_v4(Some(interface_index))?, + Ip::V6(..) => socket.bind_device_by_index_v6(Some(interface_index))?, + } + Ok(()) +} + +async fn recv_ttl_responses( + socket: &impl AsyncIcmpSocket, + opt: &TracerouteOpt, +) -> anyhow::Result { + let interface = &opt.interface; + + // the list of node IP addresses from which we received a response to our probe packets. + let mut reachable_nodes = vec![]; + + // A time at which this function should exit. This is set when we receive the first probe + // response, and allows us to wait a while to collect any additional probe responses before + // returning. + let mut timeout_at = None; + + let mut read_buf = vec![0u8; usize::from(u16::MAX)].into_boxed_slice(); + loop { + let timer = async { + match timeout_at { + // resolve future at the timeout, if it's set + Some(time) => sleep_until(time).await, + + // otherwise, never resolve + None => pending().await, + } + }; + + log::debug!("Reading from ICMP socket"); + + let (n, source) = select! { + result = socket.recv_from(&mut read_buf[..]) => result + .context("Failed to read from raw socket")?, + + _timeout = timer => { + return Ok(LeakStatus::LeakDetected(LeakInfo::NodeReachableOnInterface { + reachable_nodes, + interface: interface.clone(), + })); + } + }; + + let packet = &read_buf[..n]; + log::debug!("packet: {packet:02x?}"); + + let result = match opt.destination { + // Reading on an ICMPv6 raw socket returns ICMPv6 packets. + IpAddr::V6(..) => parse_icmp_time_exceeded_raw(Ip::V6(packet)).map(|_| source), + + // Reading on an ICMPv4 raw socket returns whole IP packets. + IpAddr::V4(..) => { + parse_ipv4(packet).and_then(|ip_packet| parse_icmp4_time_exceeded(&ip_packet)) + } + } + .map_err(|e| anyhow!("Ignoring packet (len={n}, ip.src={source}): {e}")); + + match result { + Ok(ip) => { + log::debug!("Got a probe response, we are leaking!"); + timeout_at.get_or_insert_with(|| Instant::now() + RECV_GRACE_TIME); + if !reachable_nodes.contains(&ip) { + reachable_nodes.push(ip); + } + } + + // an error means the packet wasn't the ICMP/TimeExceeded we're listening for. + Err(e) => log::debug!("{e}"), + } + } +} + +/// Try to parse the bytes as an IPv4 packet. +/// +/// This only valdiates the IP header, not the payload. +fn parse_ipv4(packet: &[u8]) -> anyhow::Result> { + let packet = Ipv4Packet::new(packet).ok_or_else(too_small)?; + let version = packet.get_version(); + if version != 4 { + bail!("Invalid IP version: {version}") + } + Ok(packet) +} + +/// Try to parse the bytes as an IPv4 or IPv6 packet. +/// +/// This only valdiates the IP header, not the payload. +fn parse_ip(packet: &[u8]) -> anyhow::Result, Ipv6Packet<'_>>> { + let ipv4_packet = Ipv4Packet::new(packet).ok_or_else(too_small)?; + + // ipv4-packets are smaller than ipv6, so we use an Ipv4Packet to check the version. + Ok(match ipv4_packet.get_version() { + 4 => Ip::V4(ipv4_packet), + 6 => { + let ipv6_packet = Ipv6Packet::new(packet).ok_or_else(too_small)?; + Ip::V6(ipv6_packet) + } + _ => bail!("Not a valid IP header"), + }) +} + +/// Try to parse an [Ipv4Packet] as an ICMP/TimeExceeded response to a packet sent by +/// [send_udp_probes] or [send_icmp_probes]. If successful, returns the [Ipv4Addr] of the packet +/// source. +/// +/// If the packet fails to parse, or is not a reply to a packet sent by us, this function returns +/// an error. +fn parse_icmp4_time_exceeded(ip_packet: &Ipv4Packet<'_>) -> anyhow::Result { + let ip_protocol = ip_packet.get_next_level_protocol(); + ensure!(ip_protocol == IpNextHeaderProtocols::Icmp, "Not ICMP"); + parse_icmp_time_exceeded_raw(Ip::V4(ip_packet.payload()))?; + Ok(ip_packet.get_source().into()) +} + +/// Try to parse some bytes into an ICMP or ICMP6 TimeExceeded response to a probe packet sent by +/// [send_udp_probes] or [send_icmp_probes]. +/// +/// If the packet fails to parse, or is not a reply to a packet sent by us, this function returns +/// an error. +fn parse_icmp_time_exceeded_raw(ip_payload: Ip<&[u8], &[u8]>) -> anyhow::Result<()> { + let icmpv4_packet; + let icmpv6_packet; + let icmp_packet: &[u8] = match ip_payload { + Ip::V4(ipv4_payload) => { + icmpv4_packet = IcmpPacket::new(ipv4_payload).ok_or(anyhow!("Too small"))?; + + let correct_type = icmpv4_packet.get_icmp_type() == IcmpTypes::TimeExceeded; + ensure!(correct_type, "Not ICMP/TimeExceeded"); + + icmpv4_packet.packet() + } + Ip::V6(ipv6_payload) => { + icmpv6_packet = Icmpv6Packet::new(ipv6_payload).ok_or(anyhow!("Too small"))?; + + let correct_type = icmpv6_packet.get_icmpv6_type() == Icmpv6Types::TimeExceeded; + ensure!(correct_type, "Not ICMP6/TimeExceeded"); + + icmpv6_packet.packet() + } + }; + + // TimeExceededPacket looks the same for both ICMP and ICMP6. + let time_exceeded = TimeExceededPacket::new(icmp_packet).ok_or_else(too_small)?; + ensure!( + time_exceeded.get_icmp_code() + == icmp::time_exceeded::IcmpCodes::TimeToLiveExceededInTransit, + "Not TTL Exceeded", + ); + + let original_ip_packet = parse_ip(time_exceeded.payload()).context("ICMP-wrapped IP packet")?; + + let (original_ip_protocol, original_ip_payload) = match &original_ip_packet { + Ip::V4(ipv4_packet) => (ipv4_packet.get_next_level_protocol(), ipv4_packet.payload()), + Ip::V6(ipv6_packet) => (ipv6_packet.get_next_header(), ipv6_packet.payload()), + }; + + match original_ip_protocol { + IpNextHeaderProtocols::Udp => { + let original_udp_packet = UdpPacket::new(original_ip_payload).ok_or_else(too_small)?; + + // check if payload looks right + // some network nodes will strip the payload, that's fine. + if !original_udp_packet.payload().is_empty() { + let udp_len = usize::from(original_udp_packet.get_length()); + let udp_payload = udp_len + .checked_sub(UdpPacket::minimum_packet_size()) + .and_then(|len| original_udp_packet.payload().get(..len)) + .ok_or(anyhow!("Invalid UDP length"))?; + if udp_payload != PROBE_PAYLOAD { + let udp_payload: String = udp_payload + .iter() + .copied() + .flat_map(escape_default) + .map(char::from) + .collect(); + bail!("Wrong UDP payload: {udp_payload:?}"); + } + } + + Ok(()) + } + + IpNextHeaderProtocols::Icmp => parse_icmp_probe(Ip::V4(original_ip_payload)), + + IpNextHeaderProtocols::Icmpv6 => parse_icmp_probe(Ip::V6(original_ip_payload)), + + _ => bail!("Not UDP/ICMP"), + } +} diff --git a/leak-checker/src/traceroute/unix/mod.rs b/leak-checker/src/traceroute/unix/mod.rs new file mode 100644 index 000000000000..ef95e57b600d --- /dev/null +++ b/leak-checker/src/traceroute/unix/mod.rs @@ -0,0 +1,350 @@ +use std::{ + ascii::escape_default, + convert::Infallible, + io, + net::{IpAddr, SocketAddr}, + ops::RangeFrom, + os::fd::{FromRawFd, IntoRawFd}, +}; + +use crate::{ + traceroute::{TracerouteOpt, DEFAULT_TTL_RANGE, LEAK_TIMEOUT, PROBE_INTERVAL, SEND_TIMEOUT}, + util::{get_interface_ip, Ip}, + Interface, LeakStatus, +}; + +use anyhow::{anyhow, bail, ensure, Context}; +use futures::{future::pending, select, stream, FutureExt, StreamExt, TryStreamExt}; +use pnet_packet::{ + icmp::{self, IcmpCode, IcmpTypes}, + icmpv6::{self, Icmpv6Code, Icmpv6Types}, + Packet, +}; +use socket2::{Domain, Protocol, Socket, Type}; +use tokio::time::{sleep, timeout}; + +#[cfg(any(target_os = "linux", target_os = "android"))] +pub mod android; + +#[cfg(any(target_os = "linux", target_os = "android"))] +pub mod linux; + +#[cfg(target_os = "macos")] +pub mod macos; + +pub mod common; + +/// Type of the UDP payload of the probe packets +type ProbePayload = [u8; 32]; + +/// Value of the UDP payload of the probe packets +const PROBE_PAYLOAD: ProbePayload = *b"ABCDEFGHIJKLMNOPQRSTUVWXYZ123456"; + +/// Default range of ports for the UDP probe packets. Stolen from `traceroute`. +const DEFAULT_PORT_RANGE: RangeFrom = 33434..; + +/// Private trait that let's us define the platform-specific implementations and types required for +/// tracerouting. +pub trait Traceroute { + type AsyncIcmpSocket: AsyncIcmpSocket; + + fn bind_socket_to_interface( + socket: &socket2::Socket, + interface: &Interface, + ip_version: Ip, + ) -> anyhow::Result<()>; +} + +pub trait AsyncIcmpSocket: Sized { + fn from_socket2(socket: socket2::Socket, ip_version: Ip) -> anyhow::Result; + + fn set_ttl(&self, ttl: u32) -> anyhow::Result<()>; + + /// Send an ICMP packet to the destination. + async fn send_to(&self, packet: &[u8], destination: impl Into) -> io::Result; + + /// Receive an ICMP packet. + #[cfg_attr(target_os = "linux", allow(dead_code))] + async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, IpAddr)>; + + /// Try to read ICMP/TimeExceeded error packets to see if probe packets leaked. + async fn recv_ttl_responses(&self, opt: &TracerouteOpt) -> anyhow::Result; +} + +struct AsyncUdpSocket(tokio::net::UdpSocket); + +pub async fn try_run_leak_test( + opt: &TracerouteOpt, +) -> anyhow::Result { + // If we ever change this to support windows, this probably needs to be Type::DGRAM. + let icmp_socket_type = Type::RAW; + + let (ip_version, domain, icmp_protocol) = match opt.destination { + IpAddr::V4(..) => (Ip::v4(), Domain::IPV4, Protocol::ICMPV4), + IpAddr::V6(..) => (Ip::v6(), Domain::IPV6, Protocol::ICMPV6), + }; + + // create the socket used for receiving the ICMP/TimeExceeded responses + let icmp_socket = Socket::new(domain, icmp_socket_type, Some(icmp_protocol)) + .context("Failed to open ICMP socket")?; + + icmp_socket + .set_nonblocking(true) + .context("Failed to set icmp_socket to nonblocking")?; + + Impl::bind_socket_to_interface(&icmp_socket, &opt.interface, ip_version)?; + + let icmp_socket = Impl::AsyncIcmpSocket::from_socket2(icmp_socket, ip_version)?; + + let send_probes = async { + if opt.icmp { + send_icmp_probes::(opt, &icmp_socket).await?; + } else { + // create the socket used for sending the UDP probing packets + let udp_socket = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP)) + .context("Failed to open UDP socket")?; + + Impl::bind_socket_to_interface(&udp_socket, &opt.interface, ip_version) + .context("Failed to bind UDP socket to interface")?; + + udp_socket + .set_nonblocking(true) + .context("Failed to set udp_socket to nonblocking")?; + + let mut udp_socket = AsyncUdpSocket::from_socket2(udp_socket); + + send_udp_probes(opt, &mut udp_socket).await?; + } + + anyhow::Ok(()) + }; + + let send_probes = async { + timeout(SEND_TIMEOUT, send_probes) + .await + .map_err(|_timeout| anyhow!("Timed out while trying to send probe packet"))??; + Ok(pending::().await) + }; + + let recv_probe_responses = icmp_socket.recv_ttl_responses(opt); + + // wait until either future returns, or the timeout is reached + // friendship ended with tokio::select. now futures::select is my best friend! + let leak_status = select! { + result = recv_probe_responses.fuse() => result?, + Err(e) = send_probes.fuse() => return Err(e), + _ = sleep(LEAK_TIMEOUT).fuse() => LeakStatus::NoLeak, + }; + + Ok(leak_status) +} + +async fn send_icmp_probes( + opt: &TracerouteOpt, + socket: &impl AsyncIcmpSocket, +) -> anyhow::Result<()> { + for ttl in DEFAULT_TTL_RANGE { + log::debug!("sending probe packet (ttl={ttl})"); + + socket + .set_ttl(ttl.into()) + .context("Failed to set TTL on socket")?; + + // the first packet will sometimes get dropped on MacOS, thus we send two packets + let number_of_sends = if cfg!(target_os = "macos") { 2 } else { 1 }; + + // construct ICMP/ICMP6 echo request packet + let mut packet_v4; + let mut packet_v6; + let packet_bytes; + const ECHO_REQUEST_HEADER_LEN: usize = 8; + match opt.destination { + IpAddr::V4(..) => { + let echo = icmp::echo_request::EchoRequest { + icmp_type: IcmpTypes::EchoRequest, + icmp_code: IcmpCode(0), + checksum: 0, + identifier: 1, + sequence_number: 1, + payload: PROBE_PAYLOAD.to_vec(), + }; + + let len = ECHO_REQUEST_HEADER_LEN + PROBE_PAYLOAD.len(); + packet_v4 = + icmp::echo_request::MutableEchoRequestPacket::owned(vec![0u8; len]).unwrap(); + packet_v4.populate(&echo); + packet_v4.set_checksum(icmp::checksum( + &icmp::IcmpPacket::new(packet_v4.packet()).unwrap(), + )); + packet_bytes = packet_v4.packet(); + } + IpAddr::V6(destination) => { + let IpAddr::V6(source) = get_interface_ip(&opt.interface, Ip::V6(()))? else { + bail!("Tried to send IPv6 on IPv4 interface"); + }; + + let echo = icmpv6::echo_request::EchoRequest { + icmpv6_type: Icmpv6Types::EchoRequest, + icmpv6_code: Icmpv6Code(0), + checksum: 0, + identifier: 1, + sequence_number: 1, + payload: PROBE_PAYLOAD.to_vec(), + }; + + let len = ECHO_REQUEST_HEADER_LEN + PROBE_PAYLOAD.len(); + packet_v6 = + icmpv6::echo_request::MutableEchoRequestPacket::owned(vec![0u8; len]).unwrap(); + packet_v6.populate(&echo); + packet_v6.set_checksum(icmpv6::checksum( + &icmpv6::Icmpv6Packet::new(packet_v6.packet()).unwrap(), + &source, + &destination, + )); + packet_bytes = packet_v6.packet(); + } + } + + let result: io::Result<()> = stream::iter(0..number_of_sends) + // call `send_to` `number_of_sends` times + .then(|_| socket.send_to(packet_bytes, opt.destination)) + .map_ok(drop) + .try_collect() // abort on the first error + .await; + + // if there was an error, handle it, otherwise continue probing. + let Err(e) = result else { + sleep(PROBE_INTERVAL).await; + continue; + }; + + match e.kind() { + io::ErrorKind::PermissionDenied | io::ErrorKind::ConnectionRefused => { + // Linux returns one of these errors if our packet was rejected by nftables. + log::debug!("send_to failed, was probably caught by firewall"); + break; + } + _ => return Err(e).context("Failed to send packet")?, + } + } + + Ok(()) +} + +impl AsyncUdpSocket { + pub fn from_socket2(socket: socket2::Socket) -> Self { + // HACK: Wrap the socket in a tokio::net::UdpSocket to be able to use it async + // SAFETY: `into_raw_fd()` consumes the socket and returns an owned & open file descriptor. + let udp_socket = unsafe { std::net::UdpSocket::from_raw_fd(socket.into_raw_fd()) }; + let udp_socket = tokio::net::UdpSocket::from_std(udp_socket).unwrap(); + AsyncUdpSocket(udp_socket) + } + + pub fn set_ttl(&self, ttl: u32) -> anyhow::Result<()> { + self.0 + .set_ttl(ttl) + .context("Failed to set TTL value for UDP socket") + } + + pub async fn send_to( + &self, + packet: &[u8], + destination: impl Into, + ) -> std::io::Result { + self.0.send_to(packet, destination.into()).await + } +} + +/// Send ICMP/Echo packets with a very low TTL to `opt.destination`. +/// +/// Use [AsyncIcmpSocket::recv_ttl_responses] to receive replies. +/// Send UDP packets with a very low TTL to `opt.destination`. +/// +/// Use [Impl::recv_ttl_responses] to receive replies. +async fn send_udp_probes(opt: &TracerouteOpt, socket: &mut AsyncUdpSocket) -> anyhow::Result<()> { + // ensure we don't send anything to `opt.exclude_port` + let ports = DEFAULT_PORT_RANGE + // skip the excluded port + .filter(|&p| Some(p) != opt.exclude_port) + // `opt.port` overrides the default port range + .map(|port| opt.port.unwrap_or(port)); + + for (port, ttl) in ports.zip(DEFAULT_TTL_RANGE) { + log::debug!("sending probe packet (ttl={ttl})"); + + socket + .set_ttl(ttl.into()) + .context("Failed to set TTL on socket")?; + + // the first packet will sometimes get dropped on MacOS, thus we send two packets + let number_of_sends = if cfg!(target_os = "macos") { 2 } else { 1 }; + + let result: io::Result<()> = stream::iter(0..number_of_sends) + // call `send_to` `number_of_sends` times + .then(|_| socket.send_to(&PROBE_PAYLOAD, (opt.destination, port))) + .map_ok(drop) + .try_collect() // abort on the first error + .await; + + let Err(e) = result else { continue }; + match e.kind() { + io::ErrorKind::PermissionDenied => { + // Linux returns this error if our packet was rejected by nftables. + log::debug!("send_to failed with 'permission denied'"); + } + _ => return Err(e).context("Failed to send packet")?, + } + } + + Ok(()) +} + +/// Try to parse bytes as an ICMP/ICMP6 Echo Request matching the probe packets send by +/// [send_icmp_probes]. +fn parse_icmp_probe(icmp_bytes: Ip<&[u8], &[u8]>) -> anyhow::Result<()> { + let echo_packet_v4; + let echo_packet_v6; + let echo_payload = match icmp_bytes { + Ip::V4(icmpv4_bytes) => { + echo_packet_v4 = + icmp::echo_request::EchoRequestPacket::new(icmpv4_bytes).ok_or_else(too_small)?; + + ensure!( + echo_packet_v4.get_icmp_type() == IcmpTypes::EchoRequest, + "Not ICMP/EchoRequest" + ); + + echo_packet_v4.payload() + } + Ip::V6(icmpv6_bytes) => { + echo_packet_v6 = + icmpv6::echo_request::EchoRequestPacket::new(icmpv6_bytes).ok_or_else(too_small)?; + + ensure!( + echo_packet_v6.get_icmpv6_type() == Icmpv6Types::EchoRequest, + "Not ICMP6/EchoRequest" + ); + + echo_packet_v6.payload() + } + }; + + // check if payload looks right + // some network nodes will strip the payload. + // some network nodes will add a bunch of zeros at the end. + if !echo_payload.is_empty() && !echo_payload.starts_with(&PROBE_PAYLOAD) { + let echo_payload: String = echo_payload + .iter() + .copied() + .flat_map(escape_default) + .map(char::from) + .collect(); + bail!("Wrong ICMP6/Echo payload: {echo_payload:?}"); + } + + Ok(()) +} + +fn too_small() -> anyhow::Error { + anyhow!("Too small") +} diff --git a/leak-checker/src/traceroute/windows.rs b/leak-checker/src/traceroute/windows.rs new file mode 100644 index 000000000000..ff5aad54e3aa --- /dev/null +++ b/leak-checker/src/traceroute/windows.rs @@ -0,0 +1,241 @@ +use std::{net::IpAddr, str}; + +use anyhow::{anyhow, Context}; +use futures::{select, stream::FuturesUnordered, FutureExt, StreamExt}; + +use tokio::time::sleep; + +use crate::{ + traceroute::{TracerouteOpt, DEFAULT_TTL_RANGE, LEAK_TIMEOUT, PROBE_INTERVAL, SEND_TIMEOUT}, + util::{get_interface_ip, Ip}, + LeakInfo, LeakStatus, +}; + +/// Implementation of traceroute using `ping.exe` +/// +/// This monstrosity exists because the Windows firewall is not helpful enough to allow us to +/// permit a process (the daemon) to receive ICMP TimeExceeded packets. We can get around this by +/// using `ping.exe`, which does work for some reason. My best guess is that it has special kernel +/// access to be able to do this. +pub async fn traceroute_using_ping(opt: &TracerouteOpt) -> anyhow::Result { + let ip_version = match opt.destination { + IpAddr::V4(..) => Ip::v4(), + IpAddr::V6(..) => Ip::v6(), + }; + + let interface_ip = get_interface_ip(&opt.interface, ip_version)?; + + let mut ping_tasks = FuturesUnordered::new(); + + for (i, ttl) in DEFAULT_TTL_RANGE.enumerate() { + // Don't send all pings at once, wait a bit in between + // each one to avoid sending more than necessary + let probe_delay = PROBE_INTERVAL * i as u32; + + ping_tasks.push(async move { + sleep(probe_delay).await; + + log::debug!("sending probe packet (ttl={ttl})"); + + // ping.exe will send ICMP Echo packets to the destination, and since it's running in + // the kernel it will be able to receive TimeExceeded responses. + let ping_path = r"C:\Windows\System32\ping.exe"; + let output = tokio::process::Command::new(ping_path) + .args(["-i", &ttl.to_string()]) + .args(["-n", "1"]) // number of pings + .args(["-w", &SEND_TIMEOUT.as_millis().to_string()]) + .args(["-S", &interface_ip.to_string()]) // bind to interface IP + .arg(opt.destination.to_string()) + .kill_on_drop(true) + .output() + .await + .context(anyhow!("Failed to execute {ping_path}"))?; + + let output_err = || anyhow!("Unexpected output from `ping.exe`"); + + let stdout = str::from_utf8(&output.stdout).with_context(output_err)?; + let _stderr = str::from_utf8(&output.stderr).with_context(output_err)?; + + log::trace!("ping stdout: {stdout}"); + log::trace!("ping stderr: {_stderr}"); + + // Dumbly parse stdout for a line that looks like this: + // "Reply from : TTL expired" + + if !stdout.contains("TTL expired") { + // No "TTL expired" means we did not receive any TimeExceeded replies. + return Ok(None); + } + + // NOTE: for IPv6, ping outputs the incorrect address here. + // No way to work around that unfortunately. + let (ip, ..) = stdout + .split_once("Reply from ") + .and_then(|(.., s)| s.split_once(": TTL expired")) + .with_context(output_err)?; + + let ip: IpAddr = ip + .parse() + .context("`ping.exe` outputted an invalid IP address")?; + + anyhow::Ok(Some(ip)) + }); + } + + let wait_for_first_leak = async move { + while let Some(result) = ping_tasks.next().await { + let Some(ip) = result? else { continue }; + + return Ok(LeakStatus::LeakDetected( + LeakInfo::NodeReachableOnInterface { + reachable_nodes: vec![ip], + interface: opt.interface.clone(), + }, + )); + } + + anyhow::Ok(LeakStatus::NoLeak) + }; + + select! { + _ = sleep(LEAK_TIMEOUT).fuse() => Ok(LeakStatus::NoLeak), + result = wait_for_first_leak.fuse() => result, + } +} + +// TODO: remove this +/* +use std::{ + ffi::c_void, + io, mem, + net::{IpAddr, SocketAddr}, + os::windows::io::{AsRawSocket, AsSocket, FromRawSocket, IntoRawSocket}, + ptr::null_mut, + str, +}; + +use anyhow::{anyhow, bail, Context}; +use futures::{select, stream::FuturesUnordered, FutureExt, StreamExt}; +use socket2::Socket; +use talpid_windows::net::{get_ip_address_for_interface, luid_from_alias, AddressFamily}; + +use tokio::time::sleep; +use windows_sys::Win32::Networking::WinSock::{ + WSAGetLastError, WSAIoctl, SIO_RCVALL, SOCKET, SOCKET_ERROR, +}; + +use crate::{ + traceroute::{ + Ip, TracerouteOpt, DEFAULT_TTL_RANGE, LEAK_TIMEOUT, PROBE_INTERVAL, SEND_TIMEOUT, + }, + Interface, LeakInfo, LeakStatus, +}; +use super::{common, AsyncIcmpSocket, AsyncUdpSocket, Traceroute}; + +pub struct TracerouteWindows; + +pub struct AsyncIcmpSocketImpl(tokio::net::UdpSocket); + +pub struct AsyncUdpSocketWindows(tokio::net::UdpSocket); + +impl Traceroute for TracerouteWindows { + type AsyncIcmpSocket = AsyncIcmpSocketImpl; + type AsyncUdpSocket = AsyncUdpSocketWindows; + + fn bind_socket_to_interface( + socket: &Socket, + interface: &Interface, + ip_version: Ip, + ) -> anyhow::Result<()> { + common::bind_socket_to_interface::(socket, interface, ip_version) + } + + fn get_interface_ip(interface: &Interface, ip_version: Ip) -> anyhow::Result { + get_interface_ip(interface, ip_version) + } + + fn configure_icmp_socket(socket: &socket2::Socket, _opt: &TracerouteOpt) -> anyhow::Result<()> { + configure_icmp_socket(socket) + } +} + +impl AsyncIcmpSocket for AsyncIcmpSocketImpl { + fn from_socket2(socket: Socket) -> Self { + let raw_socket = socket.as_socket().as_raw_socket(); + mem::forget(socket); + let std_socket = unsafe { std::net::UdpSocket::from_raw_socket(raw_socket) }; + let tokio_socket = tokio::net::UdpSocket::from_std(std_socket).unwrap(); + AsyncIcmpSocketImpl(tokio_socket) + } + + fn set_ttl(&self, ttl: u32) -> anyhow::Result<()> { + self.0 + .set_ttl(ttl) + .context("Failed to set TTL value for ICMP socket") + } + + async fn send_to(&self, packet: &[u8], destination: impl Into) -> io::Result { + self.0.send_to(packet, (destination.into(), 0)).await + } + + async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, std::net::IpAddr)> { + let (n, source) = self.0.recv_from(buf).await?; + Ok((n, source.ip())) + } + + async fn recv_ttl_responses(&self, opt: &TracerouteOpt) -> anyhow::Result { + common::recv_ttl_responses(self, &opt.interface).await + } +} + +impl AsyncUdpSocket for AsyncUdpSocketWindows { + fn from_socket2(socket: socket2::Socket) -> Self { + // HACK: Wrap the socket in a tokio::net::UdpSocket to be able to use it async + let udp_socket = unsafe { std::net::UdpSocket::from_raw_socket(socket.into_raw_socket()) }; + let udp_socket = tokio::net::UdpSocket::from_std(udp_socket).unwrap(); + AsyncUdpSocketWindows(udp_socket) + } + + fn set_ttl(&self, ttl: u32) -> anyhow::Result<()> { + self.0 + .set_ttl(ttl) + .context("Failed to set TTL value for UDP socket") + } + + async fn send_to( + &self, + packet: &[u8], + destination: impl Into, + ) -> std::io::Result { + self.0.send_to(packet, destination.into()).await + } +} + +/// Configure the raw socket we use for listening to ICMP responses. +/// +/// This will set the `SIO_RCVALL`-option. +pub fn configure_icmp_socket(socket: &Socket) -> anyhow::Result<()> { + let j = 1; + let mut _in: u32 = 0; + let result = unsafe { + WSAIoctl( + socket.as_raw_socket() as SOCKET, + SIO_RCVALL, + &j as *const _ as *const c_void, + size_of_val(&j) as u32, + null_mut(), + 0, + &mut _in as *mut u32, + null_mut(), + None, + ) + }; + + if result == SOCKET_ERROR { + let code = unsafe { WSAGetLastError() }; + bail!("Failed to call WSAIoctl(listen_socket, SIO_RCVALL, ...), code = {code}"); + } + + Ok(()) +} +*/ diff --git a/leak-checker/src/util.rs b/leak-checker/src/util.rs index a7a61febf31b..e1b6ba29f9ee 100644 --- a/leak-checker/src/util.rs +++ b/leak-checker/src/util.rs @@ -1,35 +1,88 @@ -use match_cfg::match_cfg; - -#[cfg(any(target_os = "windows", target_os = "macos", target_os = "android"))] +use crate::Interface; use std::net::IpAddr; -match_cfg! { - #[cfg(target_os = "windows")] => { - pub fn get_interface_ip(interface: &str) -> eyre::Result { - use talpid_windows::net::{get_ip_address_for_interface, luid_from_alias, AddressFamily}; +/// IP version, v4 or v6, with some associated data. +#[derive(Clone, Copy)] +pub enum Ip { + V4(V4), + V6(V6), +} + +impl Ip { + pub const fn v4() -> Self { + Ip::V4(()) + } + + pub const fn v6() -> Self { + Ip::V6(()) + } +} + +#[cfg(target_os = "windows")] +pub fn get_interface_ip(interface: &Interface, ip_version: Ip) -> anyhow::Result { + use anyhow::{anyhow, Context}; + use talpid_windows::net::{get_ip_address_for_interface, luid_from_alias, AddressFamily}; + + let interface_luid = match interface { + Interface::Name(name) => luid_from_alias(name)?, + Interface::Luid(luid) => *luid, + }; + + let address_family = match ip_version { + Ip::V4(..) => AddressFamily::Ipv4, + Ip::V6(..) => AddressFamily::Ipv6, + }; - let interface_luid = luid_from_alias(interface)?; + get_ip_address_for_interface(address_family, interface_luid) + .with_context(|| anyhow!("Failed to get IP for interface {interface:?}"))? + .ok_or(anyhow!("No IP for interface {interface:?}")) +} + +#[cfg(unix)] +pub fn get_interface_ip(interface: &Interface, ip_version: Ip) -> anyhow::Result { + #[cfg(target_os = "macos")] + let interface_name; + + let interface_name = match interface { + Interface::Name(name) => name.as_str(), - // TODO: ipv6 - let interface_ip = get_ip_address_for_interface(AddressFamily::Ipv4, interface_luid)? - .ok_or(eyre!("No IP for interface {interface:?}"))?; + #[cfg(target_os = "macos")] + &Interface::Index(index) => { + use anyhow::{anyhow, Context}; + use std::ffi::c_uint; - Ok(interface_ip) + // nix getifaddrs provides no way of getting an interface by index, so we need to get + // the interface name + interface_name = nix::net::if_::if_indextoname(c_uint::from(index)) + .with_context(|| anyhow!("Failed to get name of iface with index {index}"))?; + + interface_name + .to_str() + .context("Network interface name was not UTF-8")? } - } - #[cfg(any(target_os = "macos", target_os = "android"))] => { - pub fn get_interface_ip(interface: &str) -> eyre::Result { - for interface_address in nix::ifaddrs::getifaddrs()? { - if interface_address.interface_name != interface { continue }; - let Some(address) = interface_address.address else { continue }; - let Some(address) = address.as_sockaddr_in() else { continue }; - // TODO: ipv6 - //let Some(address) = address.as_sockaddr_in6() else { continue }; - - return Ok(address.ip().into()); - } + }; + + for interface_address in nix::ifaddrs::getifaddrs()? { + if interface_address.interface_name != interface_name { + continue; + }; + let Some(address) = interface_address.address else { + continue; + }; - eyre::bail!("Interface {interface:?} has no valid IP to bind to"); + match ip_version { + Ip::V4(()) => { + if let Some(address) = address.as_sockaddr_in() { + return Ok(IpAddr::V4(address.ip())); + }; + } + Ip::V6(()) => { + if let Some(address) = address.as_sockaddr_in6() { + return Ok(IpAddr::V6(address.ip())); + }; + } } } + + anyhow::bail!("Interface {interface:?} has no valid IP to bind to"); } diff --git a/mullvad-cli/Cargo.toml b/mullvad-cli/Cargo.toml index 9997531d53f7..a489123847fd 100644 --- a/mullvad-cli/Cargo.toml +++ b/mullvad-cli/Cargo.toml @@ -15,7 +15,7 @@ name = "mullvad" path = "src/main.rs" [dependencies] -anyhow = "1.0" +anyhow = { workspace = true } chrono = { workspace = true } clap = { workspace = true } thiserror = { workspace = true } diff --git a/mullvad-daemon/Cargo.toml b/mullvad-daemon/Cargo.toml index 4c9fce37ec99..af3411c8aa3a 100644 --- a/mullvad-daemon/Cargo.toml +++ b/mullvad-daemon/Cargo.toml @@ -15,6 +15,7 @@ workspace = true api-override = ["mullvad-api/api-override"] [dependencies] +anyhow = { workspace = true } chrono = { workspace = true } thiserror = { workspace = true } either = "1.11" @@ -27,6 +28,7 @@ serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } tokio = { workspace = true, features = ["fs", "io-util", "rt-multi-thread", "sync", "time"] } tokio-stream = "0.1" +socket2 = { workspace = true } mullvad-relay-selector = { path = "../mullvad-relay-selector" } mullvad-types = { path = "../mullvad-types" } @@ -40,6 +42,9 @@ talpid-future = { path = "../talpid-future" } talpid-platform-metadata = { path = "../talpid-platform-metadata" } talpid-time = { path = "../talpid-time" } talpid-types = { path = "../talpid-types" } +talpid-routing = { path = "../talpid-routing" } + +leak-checker = { path = "../leak-checker", default-features = false } clap = { workspace = true } log-panics = "2.0.0" diff --git a/mullvad-daemon/src/leak_checker/mod.rs b/mullvad-daemon/src/leak_checker/mod.rs index e3cd57d194a6..a7186af60e25 100644 --- a/mullvad-daemon/src/leak_checker/mod.rs +++ b/mullvad-daemon/src/leak_checker/mod.rs @@ -1,26 +1,254 @@ -pub fn check_for_leaks() { - // TODO: When do we run this? - // After connecting? - // Periodically? - // Whenever something changes? (interface, connection state, dns server, etc) - // All of the above? +use anyhow::{anyhow, Context}; +use futures::{select, FutureExt}; +use leak_checker::traceroute::TracerouteOpt; +pub use leak_checker::LeakInfo; +use std::time::Duration; +use talpid_routing::RouteManagerHandle; +use talpid_types::{net::Endpoint, tunnel::TunnelStateTransition}; +use tokio::sync::mpsc; - // TODO: Figure out which interface(s) to bind to +/// An actor that tries to leak traffic outside the tunnel while we are connected. +pub struct LeakChecker { + task_event_tx: mpsc::UnboundedSender, +} + +/// [LeakChecker] internal task state. +struct Task { + events_rx: mpsc::UnboundedReceiver, + route_manager: RouteManagerHandle, + callbacks: Vec>, +} + +enum TaskEvent { + NewTunnelState(TunnelStateTransition), + AddCallback(Box), +} + +#[derive(PartialEq, Eq)] +pub enum CallbackResult { + /// Callback completed successfully + Ok, + + /// Callback is no longer valid and should be dropped. + Drop, +} + +pub trait LeakCheckerCallback: Send + 'static { + fn on_leak(&mut self, info: LeakInfo) -> CallbackResult; +} + +impl LeakChecker { + pub fn new(route_manager: RouteManagerHandle) -> Self { + let (task_event_tx, events_rx) = mpsc::unbounded_channel(); + + let task = Task { + events_rx, + route_manager, + callbacks: vec![], + }; + + tokio::task::spawn(task.run()); + + LeakChecker { task_event_tx } + } + + /// Call when we transition to a new tunnel state. + pub fn on_tunnel_state_transition(&mut self, tunnel_state: TunnelStateTransition) { + self.send(TaskEvent::NewTunnelState(tunnel_state)) + } + + /// Call `callback` if a leak is detected. + pub fn add_leak_callback(&mut self, callback: impl LeakCheckerCallback) { + self.send(TaskEvent::AddCallback(Box::new(callback))) + } + + /// Send a [TaskEvent] to the running [Task]; + fn send(&mut self, event: TaskEvent) { + if self.task_event_tx.send(event).is_err() { + panic!("LeakChecker unexpectedly closed"); + } + } +} + +impl Task { + async fn run(mut self) { + loop { + let Some(event) = self.events_rx.recv().await else { + break; // All LeakChecker handles dropped. + }; + + match event { + TaskEvent::NewTunnelState(s) => self.on_new_tunnel_state(s).await, + TaskEvent::AddCallback(c) => self.on_add_callback(c), + } + } + } + + fn on_add_callback(&mut self, c: Box) { + self.callbacks.push(c); + } - // TODO: get connection check config - // http get https://am.i.mullvad.net/config + async fn on_new_tunnel_state(&mut self, mut tunnel_state: TunnelStateTransition) { + 'leak_test: loop { + let TunnelStateTransition::Connected(tunnel) = &tunnel_state else { + break 'leak_test; + }; - // TODO: For each interface: + let ping_destination = tunnel.endpoint; + let route_manager = self.route_manager.clone(); + let leak_test = async { + // Give the connection a little time to settle before starting the test. + tokio::time::sleep(Duration::from_millis(5000)).await; - // TODO: send an ICMP ping (to the relay?) - // TODO: how to see if the pings are actually going outside the tunnel? + check_for_leaks(&route_manager, ping_destination).await + }; - // TODO: send a DNS request to leak check endpoint - // TODO: will the service be able to handle all of the mullvad users constantly doing leak - // checks + // Make sure the tunnel state doesn't change while we're doing the leak test. + // If that happens, then our results might be invalid. + let another_tunnel_state = async { + 'listen_for_events: while let Some(event) = self.events_rx.recv().await { + let new_state = match event { + TaskEvent::NewTunnelState(tunnel_state) => tunnel_state, + TaskEvent::AddCallback(c) => { + self.on_add_callback(c); + continue 'listen_for_events; + } + }; - // TODO: query DNS leak checker HTTPS endpoint + if let TunnelStateTransition::Connected(..) = new_state { + // Still connected, all is well... + } else { + // Tunnel state changed! We have to discard the leak test and try again. + tunnel_state = new_state; + break 'listen_for_events; + } + } + }; + + let leak_result = select! { + // If tunnel state changes, restart the test. + _ = another_tunnel_state.fuse() => continue 'leak_test, + + leak_result = leak_test.fuse() => leak_result, + }; + + let leak_info = match leak_result { + Ok(Some(leak_info)) => leak_info, + Ok(None) => { + log::debug!("No leak detected"); + break 'leak_test; + } + Err(e) => { + log::debug!("Leak check errored: {e:#?}"); + break 'leak_test; + } + }; + + log::debug!("Leak detected: {leak_info:?}"); + + self.callbacks + .retain_mut(|callback| callback.on_leak(leak_info.clone()) == CallbackResult::Ok); + + break 'leak_test; + } + } +} + +async fn check_for_leaks( + route_manager: &RouteManagerHandle, + destination: Endpoint, +) -> anyhow::Result> { + #[cfg(target_os = "linux")] + let interface = { + // By setting FWMARK, we are effectively getting the same route as when using split tunneling. + let route = route_manager + .get_destination_route(destination.address.ip(), Some(mullvad_types::TUNNEL_FWMARK)) + .await + .context("Failed to get route to relay")? + .ok_or(anyhow!("No route to relay"))?; + + route + .get_node() + .get_device() + .context("No device for default route")? + .to_string() + .into() + }; + + #[cfg(target_os = "android")] + let interface = { + // TODO: We currently don't have a way to get the non-tunnel interface on Android. + return Ok(None); + }; + + #[cfg(target_os = "macos")] + let interface = { + let (v4_route, v6_route) = route_manager + .get_default_routes() + .await + .context("Failed to get default interface")?; + let index = if destination.address.is_ipv4() { + let v4_route = v4_route.context("Missing IPv4 default interface")?; + v4_route.interface_index + } else { + let v6_route = v6_route.context("Missing IPv6 default interface")?; + v6_route.interface_index + }; + + let index = + std::num::NonZeroU32::try_from(u32::from(index)).context("Interface index was 0")?; + leak_checker::Interface::Index(index) + }; + + #[cfg(target_os = "windows")] + let interface = { + use std::net::IpAddr; + use talpid_windows::net::AddressFamily; + + let _ = route_manager; // don't need this on windows + + let family = match destination.address.ip() { + IpAddr::V4(..) => AddressFamily::Ipv4, + IpAddr::V6(..) => AddressFamily::Ipv6, + }; + + let route = talpid_routing::get_best_default_route(family) + .context("Failed to get best default route")? + .ok_or_else(|| anyhow!("No default route found"))?; + + leak_checker::Interface::Luid(route.iface) + }; + + log::debug!("attempting to leak traffic on interface {interface:?} to {destination}"); + + leak_checker::traceroute::try_run_leak_test(&TracerouteOpt { + interface, + destination: destination.address.ip(), + + #[cfg(unix)] + port: None, + #[cfg(unix)] + exclude_port: None, + #[cfg(unix)] + icmp: true, + }) + .await + .map_err(|e| anyhow!("{e:#}")) + .map(|status| match status { + leak_checker::LeakStatus::NoLeak => None, + leak_checker::LeakStatus::LeakDetected(info) => Some(info), + }) +} - // TODO: query https://ipv4.am.i.mullvad.net/ - // TODO: query https://ipv6.am.i.mullvad.net/ +impl LeakCheckerCallback for T +where + T: FnMut(LeakInfo) -> bool + Send + 'static, +{ + fn on_leak(&mut self, info: LeakInfo) -> CallbackResult { + if self(info) { + CallbackResult::Ok + } else { + CallbackResult::Drop + } + } } diff --git a/mullvad-daemon/src/lib.rs b/mullvad-daemon/src/lib.rs index e4274adf63a6..0da3c02eb585 100644 --- a/mullvad-daemon/src/lib.rs +++ b/mullvad-daemon/src/lib.rs @@ -39,6 +39,7 @@ use futures::{ StreamExt, }; use geoip::GeoIpHandler; +use leak_checker::{LeakChecker, LeakInfo}; use management_interface::ManagementInterfaceServer; use mullvad_api::ApiEndpoint; use mullvad_relay_selector::{RelaySelector, SelectorConfig}; @@ -83,6 +84,7 @@ use talpid_core::{ split_tunnel, tunnel_state_machine::{self, TunnelCommand, TunnelStateMachineHandle}, }; +use talpid_routing::RouteManagerHandle; #[cfg(target_os = "android")] use talpid_types::android::AndroidContext; #[cfg(target_os = "windows")] @@ -182,6 +184,10 @@ pub enum Error { #[error("Tunnel state machine error")] TunnelError(#[source] tunnel_state_machine::Error), + /// Errors from [talpid_routing::RouteManagerHandle]. + #[error("Route manager error")] + RouteManager(#[source] talpid_routing::Error), + /// Custom list already exists #[error("Custom list error: {0}")] CustomListError(#[source] mullvad_types::custom_list::Error), @@ -414,6 +420,8 @@ pub(crate) enum InternalDaemonEvent { /// The split tunnel paths or state were updated. #[cfg(any(windows, target_os = "android", target_os = "macos"))] ExcludedPathsEvent(ExcludedPathsUpdate, oneshot::Sender>), + /// A network leak was detected. + LeakDetected(LeakInfo), } #[cfg(any(windows, target_os = "android", target_os = "macos"))] @@ -588,6 +596,7 @@ pub struct Daemon { #[cfg(target_os = "windows")] volume_update_tx: mpsc::UnboundedSender<()>, location_handler: GeoIpHandler, + leak_checker: LeakChecker, } pub struct DaemonConfig { pub log_dir: Option, @@ -776,6 +785,15 @@ impl Daemon { let _ = settings_changed_event_sender.send(InternalDaemonEvent::SettingsChanged); }); + let route_manager = RouteManagerHandle::spawn( + #[cfg(target_os = "linux")] + mullvad_types::TUNNEL_FWMARK, + #[cfg(target_os = "linux")] + mullvad_types::TUNNEL_TABLE_ID, + ) + .await + .map_err(Error::RouteManager)?; + let (offline_state_tx, offline_state_rx) = mpsc::unbounded(); #[cfg(target_os = "windows")] let (volume_update_tx, volume_update_rx) = mpsc::unbounded(); @@ -799,6 +817,7 @@ impl Daemon { config.resource_dir.clone(), internal_event_tx.to_specialized_sender(), offline_state_tx, + route_manager.clone(), #[cfg(target_os = "windows")] volume_update_rx, #[cfg(target_os = "android")] @@ -850,6 +869,17 @@ impl Daemon { internal_event_tx.clone().to_specialized_sender(), ); + let leak_checker = { + let mut leak_checker = LeakChecker::new(route_manager); + let internal_event_tx = internal_event_tx.clone(); + leak_checker.add_leak_callback(move |info| { + internal_event_tx + .send(InternalDaemonEvent::LeakDetected(info)) + .is_ok() + }); + leak_checker + }; + let daemon = Daemon { tunnel_state: TunnelState::Disconnected { location: None, @@ -880,6 +910,7 @@ impl Daemon { #[cfg(target_os = "windows")] volume_update_tx, location_handler, + leak_checker, }; api_availability.unsuspend(); @@ -978,7 +1009,7 @@ impl Daemon { let mut should_stop = false; match event { TunnelStateTransition(transition) => { - self.handle_tunnel_state_transition(transition).await + self.handle_tunnel_state_transition(transition).await; } Command(command) => self.handle_command(command).await, TriggerShutdown(user_init_shutdown) => { @@ -1000,6 +1031,10 @@ impl Daemon { } #[cfg(any(windows, target_os = "android", target_os = "macos"))] ExcludedPathsEvent(update, tx) => self.handle_new_excluded_paths(update, tx).await, + LeakDetected(leak_info) => { + log::warn!("Network leak detected! Please contact Mullvad support."); + log::warn!("{leak_info:?}") + } } should_stop } @@ -1008,6 +1043,9 @@ impl Daemon { &mut self, tunnel_state_transition: TunnelStateTransition, ) { + self.leak_checker + .on_tunnel_state_transition(tunnel_state_transition.clone()); + self.reset_rpc_sockets_on_tunnel_state_transition(&tunnel_state_transition); self.device_checker .handle_state_transition(&tunnel_state_transition); diff --git a/talpid-core/Cargo.toml b/talpid-core/Cargo.toml index 2552f6089b05..e9a971fdb8da 100644 --- a/talpid-core/Cargo.toml +++ b/talpid-core/Cargo.toml @@ -46,14 +46,13 @@ duct = "0.13" [target.'cfg(target_os = "macos")'.dependencies] async-trait = "0.1" -#pfctl = "0.6.1" -pfctl = { path = "../../pfctl-rs" } +pfctl = "0.6.1" system-configuration = "0.5.1" hickory-proto = { workspace = true } hickory-server = { workspace = true, features = ["resolver"] } talpid-platform-metadata = { path = "../talpid-platform-metadata" } pcap = { version = "2.1", features = ["capture-stream"] } -pnet_packet = "0.34" +pnet_packet = { workspace = true } tun = { version = "0.5.5", features = ["async"] } nix = { version = "0.28", features = ["socket", "signal"] } serde = { workspace = true, features = ["derive"] } diff --git a/talpid-core/src/firewall/macos.rs b/talpid-core/src/firewall/macos.rs index e608d946688e..c0e7b67c4cd3 100644 --- a/talpid-core/src/firewall/macos.rs +++ b/talpid-core/src/firewall/macos.rs @@ -322,30 +322,18 @@ impl Firewall { } // no nat to [vpn ip] - //let no_nat_to_vpn_server = pfctl::NatRuleBuilder::default() - // .action(pfctl::NatRuleAction::NoNat) - // .to(peer_endpoint.endpoint.address) - // .user(Uid::from(0)) - // .build()?; - //rules.push(no_nat_to_vpn_server); - - //for ip in &tunnel.ips { - // rules.push( - // pfctl::NatRuleBuilder::default() - // .action(pfctl::NatRuleAction::Nat { - // nat_to: pfctl::NatEndpoint::from(pfctl::Ip::from(*ip)), - // }) - // .to(peer_endpoint.endpoint.address.ip()) - // .build()?, - // ); - //} - - //// no nat on [tun interface] - //let no_nat_on_tun = pfctl::NatRuleBuilder::default() - // .action(pfctl::NatRuleAction::NoNat) - // .interface(&tunnel.interface) - // .build()?; - //rules.push(no_nat_on_tun); + let no_nat_to_vpn_server = pfctl::NatRuleBuilder::default() + .action(pfctl::NatRuleAction::NoNat) + .to(peer_endpoint.endpoint.address) + .build()?; + rules.push(no_nat_to_vpn_server); + + // no nat on [tun interface] + let no_nat_on_tun = pfctl::NatRuleBuilder::default() + .action(pfctl::NatRuleAction::NoNat) + .interface(&tunnel.interface) + .build()?; + rules.push(no_nat_on_tun); // Masquerade other traffic via VPN utun for ip in &tunnel.ips { @@ -438,7 +426,6 @@ impl Firewall { } rules.push(self.get_allow_relay_rule(peer_endpoint)?); - //rules.push(self.get_block_relay_rule(peer_endpoint)?); // Important to block DNS *before* we allow the tunnel and allow LAN. So DNS // can't leak to the wrong IPs in the tunnel or on the LAN. @@ -604,20 +591,6 @@ impl Firewall { builder.build() } - /// Block traffic to relay_endpoint ip. Should come after [Self::get_allow_relay_rule]. - fn get_block_relay_rule( - &self, - relay_endpoint: &net::AllowedEndpoint, - ) -> Result { - let mut builder = self.create_rule_builder(FilterRuleAction::Drop(DropAction::Return)); - builder - .direction(pfctl::Direction::Out) - .to(relay_endpoint.endpoint.address.ip()) - .quick(true); - - builder.build() - } - /// Produces a rule that allows traffic to flow to the API. Allows the app (or other apps if /// configured) to reach the API in blocked states. fn get_allowed_endpoint_rule( diff --git a/talpid-core/src/tunnel_state_machine/mod.rs b/talpid-core/src/tunnel_state_machine/mod.rs index e8bd4ed64980..cae7f2384bf5 100644 --- a/talpid-core/src/tunnel_state_machine/mod.rs +++ b/talpid-core/src/tunnel_state_machine/mod.rs @@ -131,6 +131,7 @@ pub async fn spawn( resource_dir: PathBuf, state_change_listener: impl Sender + Send + 'static, offline_state_listener: mpsc::UnboundedSender, + route_manager: RouteManagerHandle, #[cfg(target_os = "windows")] volume_update_rx: mpsc::UnboundedReceiver<()>, #[cfg(target_os = "android")] android_context: AndroidContext, #[cfg(target_os = "android")] connectivity_listener: ConnectivityListener, @@ -158,6 +159,7 @@ pub async fn spawn( log_dir, resource_dir, commands_rx: command_rx, + route_manager, #[cfg(target_os = "windows")] volume_update_rx, #[cfg(target_os = "android")] @@ -258,6 +260,7 @@ struct TunnelStateMachineInitArgs { log_dir: Option, resource_dir: PathBuf, commands_rx: mpsc::UnboundedReceiver, + route_manager: RouteManagerHandle, #[cfg(target_os = "windows")] volume_update_rx: mpsc::UnboundedReceiver<()>, #[cfg(target_os = "android")] @@ -280,28 +283,19 @@ impl TunnelStateMachine { #[cfg(target_os = "macos")] let filtering_resolver = crate::resolver::start_resolver().await?; - let route_manager = RouteManagerHandle::spawn( - #[cfg(target_os = "linux")] - args.linux_ids.fwmark, - #[cfg(target_os = "linux")] - args.linux_ids.table_id, - ) - .await - .map_err(Error::InitRouteManagerError)?; - #[cfg(windows)] let split_tunnel = split_tunnel::SplitTunnel::new( runtime.clone(), args.resource_dir.clone(), args.command_tx.clone(), volume_update_rx, - route_manager.clone(), + args.route_manager.clone(), ) .map_err(Error::InitSplitTunneling)?; #[cfg(target_os = "macos")] let split_tunnel = - split_tunnel::SplitTunnel::spawn(args.command_tx.clone(), route_manager.clone()); + split_tunnel::SplitTunnel::spawn(args.command_tx.clone(), args.route_manager.clone()); let fw_args = FirewallArguments { #[cfg(not(target_os = "android"))] @@ -326,7 +320,7 @@ impl TunnelStateMachine { #[cfg(target_os = "linux")] runtime.clone(), #[cfg(target_os = "linux")] - route_manager.clone(), + args.route_manager.clone(), ) .map_err(Error::InitDnsMonitorError)?; @@ -345,7 +339,7 @@ impl TunnelStateMachine { let offline_monitor = offline::spawn_monitor( offline_tx, #[cfg(not(target_os = "android"))] - route_manager.clone(), + args.route_manager.clone(), #[cfg(target_os = "linux")] Some(args.linux_ids.fwmark), #[cfg(target_os = "android")] @@ -385,7 +379,7 @@ impl TunnelStateMachine { runtime, firewall, dns_monitor, - route_manager, + route_manager: args.route_manager, _offline_monitor: offline_monitor, allow_lan: args.settings.allow_lan, #[cfg(not(target_os = "android"))] diff --git a/talpid-net/Cargo.toml b/talpid-net/Cargo.toml index aa30ed1b5b6a..861e1765cc60 100644 --- a/talpid-net/Cargo.toml +++ b/talpid-net/Cargo.toml @@ -13,5 +13,5 @@ workspace = true [target.'cfg(unix)'.dependencies] libc = "0.2" talpid-types = { path = "../talpid-types" } -socket2 = { version = "0.5.3", features = ["all"] } +socket2 = { workspace = true, features = ["all"] } log = { workspace = true } diff --git a/talpid-routing/src/unix/mod.rs b/talpid-routing/src/unix/mod.rs index d257140f7e3c..34d2570137c6 100644 --- a/talpid-routing/src/unix/mod.rs +++ b/talpid-routing/src/unix/mod.rs @@ -34,7 +34,7 @@ mod imp; pub use imp::Error as PlatformError; -/// Errors that can be encountered whilst initializing route manager +/// Errors that can be encountered whilst interacting with a [RouteManagerHandle]. #[derive(thiserror::Error, Debug)] pub enum Error { /// Route manager thread may have panicked diff --git a/talpid-windows/Cargo.toml b/talpid-windows/Cargo.toml index a44229b61d07..0b9e1d267217 100644 --- a/talpid-windows/Cargo.toml +++ b/talpid-windows/Cargo.toml @@ -12,7 +12,7 @@ workspace = true [target.'cfg(windows)'.dependencies] thiserror = { workspace = true } -socket2 = { version = "0.5.3" } +socket2 = { workspace = true } futures = { workspace = true } talpid-types = { path = "../talpid-types" } diff --git a/talpid-wireguard/Cargo.toml b/talpid-wireguard/Cargo.toml index e02bf874d253..620cbab5cc26 100644 --- a/talpid-wireguard/Cargo.toml +++ b/talpid-wireguard/Cargo.toml @@ -39,7 +39,7 @@ duct = "0.13" [target.'cfg(not(target_os="android"))'.dependencies] byteorder = "1" internet-checksum = "0.2" -socket2 = { version = "0.5.3", features = ["all"] } +socket2 = { workspace = true, features = ["all"] } tokio-stream = { version = "0.1", features = ["io-util"] } [target.'cfg(unix)'.dependencies] diff --git a/test/Cargo.lock b/test/Cargo.lock index a4e2ee9df70a..ceda1733a4fc 100644 --- a/test/Cargo.lock +++ b/test/Cargo.lock @@ -556,7 +556,7 @@ dependencies = [ "ping", "reqwest", "serde", - "socket2 0.5.6", + "socket2 0.5.8", ] [[package]] @@ -1417,7 +1417,7 @@ dependencies = [ "http-body", "hyper", "pin-project-lite", - "socket2 0.5.6", + "socket2 0.5.8", "tokio", "tower-service", "tracing", @@ -1686,7 +1686,7 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b58db92f96b720de98181bbbe63c831e87005ab460c1bf306eb2622b4707997f" dependencies = [ - "socket2 0.5.6", + "socket2 0.5.8", "widestring", "windows-sys 0.48.0", "winreg", @@ -2728,7 +2728,7 @@ dependencies = [ "quinn-udp", "rustc-hash", "rustls 0.23.18", - "socket2 0.5.6", + "socket2 0.5.8", "thiserror 1.0.59", "tokio", "tracing", @@ -2759,7 +2759,7 @@ checksum = "8bffec3605b73c6f1754535084a85229fa8a30f86014e6c81aeec4abb68b0285" dependencies = [ "libc", "once_cell", - "socket2 0.5.6", + "socket2 0.5.8", "tracing", "windows-sys 0.52.0", ] @@ -3267,7 +3267,7 @@ dependencies = [ "serde_json", "serde_urlencoded", "shadowsocks-crypto", - "socket2 0.5.6", + "socket2 0.5.8", "spin", "thiserror 1.0.59", "tokio", @@ -3356,9 +3356,9 @@ dependencies = [ [[package]] name = "socket2" -version = "0.5.6" +version = "0.5.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05ffd9c0a93b7543e062e759284fcf5f5e3b098501104bfbdde4d404db792871" +checksum = "c970269d99b64e60ec3bd6ad27270092a5394c4e309314b18ae3fe575695fbe8" dependencies = [ "libc", "windows-sys 0.52.0", @@ -3440,7 +3440,7 @@ dependencies = [ "parking_lot 0.12.1", "pnet_packet", "rand 0.8.5", - "socket2 0.5.6", + "socket2 0.5.8", "thiserror 1.0.59", "tokio", "tracing", @@ -3529,7 +3529,7 @@ name = "talpid-windows" version = "0.0.0" dependencies = [ "futures", - "socket2 0.5.6", + "socket2 0.5.8", "talpid-types", "thiserror 2.0.3", "windows-sys 0.52.0", @@ -3616,7 +3616,7 @@ dependencies = [ "scopeguard", "serde", "serde_json", - "socket2 0.5.6", + "socket2 0.5.8", "socks-server", "ssh2", "talpid-types", @@ -3676,7 +3676,7 @@ dependencies = [ "rs-release", "serde", "serde_json", - "socket2 0.5.6", + "socket2 0.5.8", "surge-ping", "talpid-platform-metadata", "talpid-windows", @@ -3821,7 +3821,7 @@ dependencies = [ "parking_lot 0.12.1", "pin-project-lite", "signal-hook-registry", - "socket2 0.5.6", + "socket2 0.5.8", "tokio-macros", "windows-sys 0.52.0", ] @@ -3921,7 +3921,7 @@ dependencies = [ "log", "once_cell", "pin-project", - "socket2 0.5.6", + "socket2 0.5.8", "tokio", "windows-sys 0.52.0", ] @@ -3971,7 +3971,7 @@ dependencies = [ "percent-encoding", "pin-project", "prost 0.13.3", - "socket2 0.5.6", + "socket2 0.5.8", "tokio", "tokio-stream", "tower 0.4.13", diff --git a/test/Cargo.toml b/test/Cargo.toml index 763a21d1db17..a0b77826914e 100644 --- a/test/Cargo.toml +++ b/test/Cargo.toml @@ -77,5 +77,6 @@ chrono = { version = "0.4.26", default-features = false } clap = { version = "4.2.7", features = ["cargo", "derive"] } bytes = "1.3.0" async-trait = "0.1.58" +socket2 = "0.5.7" surge-ping = "0.8" nix = { version = "0.29", features = ["ioctl", "socket", "net"] } diff --git a/test/connection-checker/Cargo.toml b/test/connection-checker/Cargo.toml index 730a57ca5ab6..9ab2b3fb5ba5 100644 --- a/test/connection-checker/Cargo.toml +++ b/test/connection-checker/Cargo.toml @@ -17,4 +17,4 @@ eyre = "0.6.12" ping = "0.5.2" reqwest = { version = "0.12.7", default-features = false, features = ["blocking", "rustls-tls", "json"] } serde = { workspace = true, features = ["derive"] } -socket2 = { version = "0.5.4", features = ["all"] } +socket2 = { workspace = true, features = ["all"] } diff --git a/test/test-manager/Cargo.toml b/test/test-manager/Cargo.toml index 2671ea454a4d..51aed8c824b4 100644 --- a/test/test-manager/Cargo.toml +++ b/test/test-manager/Cargo.toml @@ -62,7 +62,7 @@ talpid-types = { path = "../../talpid-types" } ssh2 = "0.9.4" nix = { workspace = true } -socket2 = "0.5.6" +socket2 = { workspace = true } [target.'cfg(target_os = "macos")'.dependencies] tun = "0.5.1" diff --git a/test/test-runner/Cargo.toml b/test/test-runner/Cargo.toml index fd53f4b7cb79..af84ef4daeb0 100644 --- a/test/test-runner/Cargo.toml +++ b/test/test-runner/Cargo.toml @@ -33,7 +33,7 @@ test-rpc = { path = "../test-rpc" } mullvad-paths = { path = "../../mullvad-paths" } talpid-platform-metadata = { path = "../../talpid-platform-metadata", default-features = false } -socket2 = { version = "0.5.4", features = ["all"] } +socket2 = { workspace = true, features = ["all"] } [target."cfg(target_os=\"windows\")".dependencies] talpid-windows = { path = "../../talpid-windows" } diff --git a/windows-installer/Cargo.toml b/windows-installer/Cargo.toml index 5c09cf560c70..518fe2d82d53 100644 --- a/windows-installer/Cargo.toml +++ b/windows-installer/Cargo.toml @@ -13,11 +13,11 @@ workspace = true [target.'cfg(all(target_os = "windows", target_arch = "x86_64"))'.dependencies] windows-sys = { version = "0.52.0", features = ["Win32_System", "Win32_System_LibraryLoader", "Win32_System_SystemInformation", "Win32_System_Threading"] } tempfile = "3.10" -anyhow = "1.0" +anyhow.workspace = true [build-dependencies] winres = "0.1" -anyhow = "1.0" +anyhow.workspace = true windows-sys = { version = "0.52.0", features = ["Win32_System", "Win32_System_LibraryLoader", "Win32_System_SystemServices"] } mullvad-version = { path = "../mullvad-version" } diff --git a/wireguard-go-rs/Cargo.toml b/wireguard-go-rs/Cargo.toml index cfaef554cc40..cdea4afdad1d 100644 --- a/wireguard-go-rs/Cargo.toml +++ b/wireguard-go-rs/Cargo.toml @@ -5,7 +5,7 @@ edition = "2021" license.workspace = true [build-dependencies] -anyhow = "1.0" +anyhow.workspace = true [target.'cfg(unix)'.dependencies] thiserror.workspace = true