diff --git a/Cargo.lock b/Cargo.lock index 66b17974d13b..a68c4d9823e5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1374,8 +1374,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94b22e06ecb0110981051723910cbf0b5f5e09a2062dd7663334ee79a9d1286c" dependencies = [ "cfg-if", + "js-sys", "libc", "wasi 0.11.0+wasi-snapshot-preview1", + "wasm-bindgen", ] [[package]] @@ -1705,6 +1707,24 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-rustls" +version = "0.27.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d191583f3da1305256f22463b9bb0471acad48a4e534a5218b9963e9c1f59b2" +dependencies = [ + "futures-util", + "http 1.1.0", + "hyper", + "hyper-util", + "rustls 0.23.18", + "rustls-pki-types", + "tokio", + "tokio-rustls 0.26.0", + "tower-service", + "webpki-roots 0.26.7", +] + [[package]] name = "hyper-timeout" version = "0.5.1" @@ -2174,6 +2194,26 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" +[[package]] +name = "leak-checker" +version = "0.1.0" +dependencies = [ + "anyhow", + "clap", + "futures", + "log", + "match_cfg", + "nix 0.29.0", + "pnet_packet 0.35.0", + "pretty_env_logger", + "reqwest", + "serde", + "socket2", + "talpid-windows", + "tokio", + "windows-sys 0.52.0", +] + [[package]] name = "libc" version = "0.2.158" @@ -2523,6 +2563,7 @@ name = "mullvad-daemon" version = "0.0.0" dependencies = [ "android_logger", + "anyhow", "async-trait", "chrono", "clap", @@ -2532,6 +2573,7 @@ dependencies = [ "fern", "futures", "hickory-resolver", + "leak-checker", "libc", "log", "log-panics", @@ -2549,10 +2591,12 @@ dependencies = [ "serde", "serde_json", "simple-signal", + "socket2", "talpid-core", "talpid-dbus", "talpid-future", "talpid-platform-metadata", + "talpid-routing", "talpid-time", "talpid-types", "talpid-windows", @@ -2575,7 +2619,7 @@ dependencies = [ "rustls 0.21.11", "serde", "tokio", - "webpki-roots", + "webpki-roots 0.25.4", ] [[package]] @@ -2934,6 +2978,7 @@ dependencies = [ "cfg-if", "cfg_aliases 0.2.1", "libc", + "memoffset 0.9.1", ] [[package]] @@ -3429,6 +3474,15 @@ dependencies = [ "no-std-net", ] +[[package]] +name = "pnet_base" +version = "0.35.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffc190d4067df16af3aba49b3b74c469e611cad6314676eaf1157f31aa0fb2f7" +dependencies = [ + "no-std-net", +] + [[package]] name = "pnet_macros" version = "0.34.0" @@ -3441,13 +3495,34 @@ dependencies = [ "syn 2.0.89", ] +[[package]] +name = "pnet_macros" +version = "0.35.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13325ac86ee1a80a480b0bc8e3d30c25d133616112bb16e86f712dcf8a71c863" +dependencies = [ + "proc-macro2", + "quote", + "regex", + "syn 2.0.89", +] + [[package]] name = "pnet_macros_support" version = "0.34.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eea925b72f4bd37f8eab0f221bbe4c78b63498350c983ffa9dd4bcde7e030f56" dependencies = [ - "pnet_base", + "pnet_base 0.34.0", +] + +[[package]] +name = "pnet_macros_support" +version = "0.35.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eed67a952585d509dd0003049b1fc56b982ac665c8299b124b90ea2bdb3134ab" +dependencies = [ + "pnet_base 0.35.0", ] [[package]] @@ -3457,9 +3532,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a9a005825396b7fe7a38a8e288dbc342d5034dac80c15212436424fef8ea90ba" dependencies = [ "glob", - "pnet_base", - "pnet_macros", - "pnet_macros_support", + "pnet_base 0.34.0", + "pnet_macros 0.34.0", + "pnet_macros_support 0.34.0", +] + +[[package]] +name = "pnet_packet" +version = "0.35.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c96ebadfab635fcc23036ba30a7d33a80c39e8461b8bd7dc7bb186acb96560f" +dependencies = [ + "glob", + "pnet_base 0.35.0", + "pnet_macros 0.35.0", + "pnet_macros_support 0.35.0", ] [[package]] @@ -3497,6 +3584,16 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" +[[package]] +name = "pretty_env_logger" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "865724d4dbe39d9f3dd3b52b88d859d66bcb2d6a0acfd5ea68a65fb66d4bdc1c" +dependencies = [ + "env_logger 0.10.2", + "log", +] + [[package]] name = "prettyplease" version = "0.2.19" @@ -3646,6 +3743,58 @@ dependencies = [ "serde", ] +[[package]] +name = "quinn" +version = "0.11.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62e96808277ec6f97351a2380e6c25114bc9e67037775464979f3037c92d05ef" +dependencies = [ + "bytes", + "pin-project-lite", + "quinn-proto", + "quinn-udp", + "rustc-hash", + "rustls 0.23.18", + "socket2", + "thiserror 2.0.9", + "tokio", + "tracing", +] + +[[package]] +name = "quinn-proto" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2fe5ef3495d7d2e377ff17b1a8ce2ee2ec2a18cde8b6ad6619d65d0701c135d" +dependencies = [ + "bytes", + "getrandom 0.2.14", + "rand 0.8.5", + "ring", + "rustc-hash", + "rustls 0.23.18", + "rustls-pki-types", + "slab", + "thiserror 2.0.9", + "tinyvec", + "tracing", + "web-time", +] + +[[package]] +name = "quinn-udp" +version = "0.5.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c40286217b4ba3a71d644d752e6a0b71f13f1b6a2c5311acfcbe0c2418ed904" +dependencies = [ + "cfg_aliases 0.2.1", + "libc", + "once_cell", + "socket2", + "tracing", + "windows-sys 0.59.0", +] + [[package]] name = "quote" version = "1.0.36" @@ -3794,6 +3943,48 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "adad44e29e4c806119491a7f06f03de4d1af22c3a680dd47f1e6e179439d1f56" +[[package]] +name = "reqwest" +version = "0.12.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a77c62af46e79de0a562e1a9849205ffcb7fc1238876e9bd743357570e04046f" +dependencies = [ + "base64 0.22.1", + "bytes", + "futures-core", + "futures-util", + "http 1.1.0", + "http-body", + "http-body-util", + "hyper", + "hyper-rustls", + "hyper-util", + "ipnet", + "js-sys", + "log", + "mime", + "once_cell", + "percent-encoding", + "pin-project-lite", + "quinn", + "rustls 0.23.18", + "rustls-pemfile 2.1.3", + "rustls-pki-types", + "serde", + "serde_json", + "serde_urlencoded", + "sync_wrapper 1.0.1", + "tokio", + "tokio-rustls 0.26.0", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "webpki-roots 0.26.7", + "windows-registry", +] + [[package]] name = "resolv-conf" version = "0.7.0" @@ -3865,6 +4056,12 @@ version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" +[[package]] +name = "rustc-hash" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7fb8039b3032c191086b10f11f319a6e99e1e82889c5cc6046f515c9db1d497" + [[package]] name = "rustc_version" version = "0.4.0" @@ -3938,6 +4135,9 @@ name = "rustls-pki-types" version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "16f1201b3c9a7ee8039bcadc17b7e605e2945b27eee7631788c1bd2b0643674b" +dependencies = [ + "web-time", +] [[package]] name = "rustls-webpki" @@ -4312,9 +4512,9 @@ checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" [[package]] name = "socket2" -version = "0.5.6" +version = "0.5.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05ffd9c0a93b7543e062e759284fcf5f5e3b098501104bfbdde4d404db792871" +checksum = "c970269d99b64e60ec3bd6ad27270092a5394c4e309314b18ae3fe575695fbe8" dependencies = [ "libc", "windows-sys 0.52.0", @@ -4365,7 +4565,7 @@ checksum = "efbf95ce4c7c5b311d2ce3f088af2b93edef0f09727fa50fbe03c7a979afce77" dependencies = [ "hex", "parking_lot", - "pnet_packet", + "pnet_packet 0.34.0", "rand 0.8.5", "socket2", "thiserror 1.0.59", @@ -4417,6 +4617,9 @@ name = "sync_wrapper" version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7065abeca94b6a8a577f9bd45aa0867a2238b74e8eb67cf10d492bc39351394" +dependencies = [ + "futures-core", +] [[package]] name = "synstructure" @@ -4475,7 +4678,7 @@ dependencies = [ "parking_lot", "pcap", "pfctl", - "pnet_packet", + "pnet_packet 0.34.0", "rand 0.8.5", "resolv-conf", "serde", @@ -5367,6 +5570,18 @@ dependencies = [ "wasm-bindgen-shared", ] +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.42" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76bc14366121efc8dbb487ab05bcc9d346b3b5ec0eaa76e46594cabbe51762c0" +dependencies = [ + "cfg-if", + "js-sys", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "wasm-bindgen-macro" version = "0.2.92" @@ -5396,12 +5611,41 @@ version = "0.2.92" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "af190c94f2773fdb3729c55b007a722abb5384da03bc0986df4c289bf5567e96" +[[package]] +name = "web-sys" +version = "0.3.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77afa9a11836342370f4817622a2f0f418b134426d91a82dfb48f532d2ec13ef" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + +[[package]] +name = "web-time" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "webpki-roots" version = "0.25.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1" +[[package]] +name = "webpki-roots" +version = "0.26.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d642ff16b7e79272ae451b7322067cdc17cadf68c23264be9d94a32319efe7e" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "which" version = "4.4.2" @@ -5516,6 +5760,17 @@ dependencies = [ "syn 2.0.89", ] +[[package]] +name = "windows-registry" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e400001bb720a623c1c69032f8e3e4cf09984deec740f007dd2b03ec864804b0" +dependencies = [ + "windows-result", + "windows-strings", + "windows-targets 0.52.6", +] + [[package]] name = "windows-result" version = "0.2.0" diff --git a/Cargo.toml b/Cargo.toml index 1fd83313b634..505ed7bc47fc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,7 +44,9 @@ members = [ "tunnel-obfuscation", "wireguard-go-rs", "windows-installer", + "leak-checker", ] + # The default members may exclude packages that cannot be built for all targets, or that do not always # need to be built default-members = [ @@ -113,6 +115,7 @@ hickory-server = { version = "0.24.2", features = ["resolver"] } tokio = { version = "1.42" } parity-tokio-ipc = "0.9" futures = "0.3.15" + # Tonic and related crates tonic = "0.12.3" tonic-build = { version = "0.10.0", default-features = false } @@ -123,6 +126,7 @@ hyper-util = {version = "0.1.8", features = ["client", "client-legacy", "http2", env_logger = "0.10.0" thiserror = "2.0" +anyhow = "1.0" log = "0.4" shadowsocks = "1.20.3" @@ -138,6 +142,7 @@ serde_json = "1.0.122" ipnetwork = "0.20" tun = { version = "0.7", features = ["async"] } +socket2 = "0.5.7" # Test dependencies proptest = "1.4" diff --git a/leak-checker/Cargo.toml b/leak-checker/Cargo.toml new file mode 100644 index 000000000000..0b7d9744398e --- /dev/null +++ b/leak-checker/Cargo.toml @@ -0,0 +1,34 @@ +[package] +name = "leak-checker" +version = "0.1.0" +authors.workspace = true +repository.workspace = true +license.workspace = true +edition.workspace = true +rust-version.workspace = true + +[dependencies] +log.workspace = true +anyhow.workspace = true +socket2 = { workspace = true, features = ["all"] } +match_cfg = "0.1.0" +pnet_packet = "0.35.0" +pretty_env_logger = "0.5.0" +tokio = { workspace = true, features = ["macros", "time", "rt", "sync", "net"] } +futures.workspace = true +serde = { workspace = true, features = ["derive"] } +reqwest = { version = "0.12.9", default-features = false, features = ["json", "rustls-tls"] } +clap = { workspace = true, features = ["derive"] } + +[dev-dependencies] +tokio = { workspace = true, features = ["full"] } + +[target.'cfg(unix)'.dependencies] +nix = { version = "0.29.0", features = ["net", "socket", "uio"] } + +[target.'cfg(windows)'.dependencies] +windows-sys.workspace = true +talpid-windows = { path = "../talpid-windows" } + +[lints] +workspace = true diff --git a/leak-checker/examples/leaker-cli.rs b/leak-checker/examples/leaker-cli.rs new file mode 100644 index 000000000000..ce5921ddf660 --- /dev/null +++ b/leak-checker/examples/leaker-cli.rs @@ -0,0 +1,36 @@ +use clap::{Parser, Subcommand}; +use leak_checker::{am_i_mullvad::AmIMullvadOpt, traceroute::TracerouteOpt}; + +#[derive(Parser)] +pub struct Opt { + #[clap(subcommand)] + pub method: LeakMethod, +} + +#[derive(Subcommand, Clone)] +pub enum LeakMethod { + /// Check for leaks by binding to a non-tunnel interface and probing for reachable nodes. + Traceroute(#[clap(flatten)] TracerouteOpt), + + /// Ask `am.i.mullvad.net` whether you are leaking. + AmIMullvad(#[clap(flatten)] AmIMullvadOpt), +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + pretty_env_logger::formatted_builder() + .filter_level(log::LevelFilter::Debug) + .parse_default_env() + .init(); + + let opt = Opt::parse(); + + let leak_status = match &opt.method { + LeakMethod::Traceroute(opt) => leak_checker::traceroute::run_leak_test(opt).await, + LeakMethod::AmIMullvad(opt) => leak_checker::am_i_mullvad::run_leak_test(opt).await, + }; + + log::info!("Leak status: {leak_status:#?}"); + + Ok(()) +} diff --git a/leak-checker/src/am_i_mullvad.rs b/leak-checker/src/am_i_mullvad.rs new file mode 100644 index 000000000000..81fe90815c93 --- /dev/null +++ b/leak-checker/src/am_i_mullvad.rs @@ -0,0 +1,90 @@ +use anyhow::{anyhow, Context}; +use futures::TryFutureExt; +use match_cfg::match_cfg; +use reqwest::{Client, ClientBuilder}; +use serde::Deserialize; + +use crate::{LeakInfo, LeakStatus}; + +#[derive(Clone, clap::Args)] +pub struct AmIMullvadOpt { + /// Try to bind to a specific interface + #[clap(short, long)] + interface: Option, +} + +const AM_I_MULLVAD_URL: &str = "https://am.i.mullvad.net/json"; + +/// [try_run_leak_test], but on an error, assume we aren't leaking. +pub async fn run_leak_test(opt: &AmIMullvadOpt) -> LeakStatus { + try_run_leak_test(opt) + .await + .inspect_err(|e| log::debug!("Leak test errored, assuming no leak. {e:?}")) + .unwrap_or(LeakStatus::NoLeak) +} + +/// Check if connected to Mullvad and print the result to stdout +pub async fn try_run_leak_test(opt: &AmIMullvadOpt) -> anyhow::Result { + #[derive(Debug, Deserialize)] + struct Response { + ip: String, + mullvad_exit_ip_hostname: Option, + } + + let mut client = Client::builder(); + + if let Some(interface) = &opt.interface { + client = bind_client_to_interface(client, interface)?; + } + + let client = client.build().context("Failed to create HTTP client")?; + let response: Response = client + .get(AM_I_MULLVAD_URL) + //.timeout(Duration::from_secs(opt.timeout)) + .send() + .and_then(|r| r.json()) + .await + .with_context(|| anyhow!("Failed to GET {AM_I_MULLVAD_URL}"))?; + + if let Some(server) = &response.mullvad_exit_ip_hostname { + log::debug!( + "You are connected to Mullvad (server {}). Your IP address is {}", + server, + response.ip + ); + Ok(LeakStatus::NoLeak) + } else { + log::debug!( + "You are not connected to Mullvad. Your IP address is {}", + response.ip + ); + Ok(LeakStatus::LeakDetected(LeakInfo::AmIMullvad { + ip: response.ip.parse().context("Malformed IP")?, + })) + } +} + +match_cfg! { + #[cfg(target_os = "linux")] => { + fn bind_client_to_interface( + builder: ClientBuilder, + interface: &str + ) -> anyhow::Result { + log::debug!("Binding HTTP client to {interface}"); + Ok(builder.interface(interface)) + } + } + #[cfg(any(target_os = "macos", target_os = "windows", target_os = "android"))] => { + fn bind_client_to_interface( + builder: ClientBuilder, + interface: &str + ) -> anyhow::Result { + use crate::util::get_interface_ip; + + let ip = get_interface_ip(interface)?; + + log::debug!("Binding HTTP client to {ip} ({interface})"); + Ok(builder.local_address(ip)) + } + } +} diff --git a/leak-checker/src/lib.rs b/leak-checker/src/lib.rs new file mode 100644 index 000000000000..7b3caa57bf40 --- /dev/null +++ b/leak-checker/src/lib.rs @@ -0,0 +1,50 @@ +use std::{fmt, net::IpAddr}; + +pub mod am_i_mullvad; +pub mod traceroute; +mod util; + +#[derive(Clone, Debug)] +pub enum LeakStatus { + NoLeak, + LeakDetected(LeakInfo), +} + +/// Details about how a leak happened +#[derive(Clone, Debug)] +pub enum LeakInfo { + /// Managed to reach another network node on the physical interface, bypassing firewall rules. + NodeReachableOnInterface { + reachable_nodes: Vec, + interface: Interface, + }, + + /// Queried a , and was not mullvad. + AmIMullvad { ip: IpAddr }, +} + +#[derive(Clone)] +pub enum Interface { + Name(String), + + #[cfg(target_os = "windows")] + Luid(windows_sys::Win32::NetworkManagement::Ndis::NET_LUID_LH), +} + +impl From for Interface { + fn from(name: String) -> Self { + Interface::Name(name) + } +} + +impl fmt::Debug for Interface { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Name(arg0) => f.debug_tuple("Name").field(arg0).finish(), + + // TODO: + #[cfg(target_os = "windows")] + Self::Luid(arg0) => f.debug_tuple("Luid").field(unsafe { &arg0.Value }).finish(), + } + } +} diff --git a/leak-checker/src/traceroute.rs b/leak-checker/src/traceroute.rs new file mode 100644 index 000000000000..45a58164fc70 --- /dev/null +++ b/leak-checker/src/traceroute.rs @@ -0,0 +1,96 @@ +use std::{net::IpAddr, ops::Range, time::Duration}; + +use crate::{Interface, LeakStatus}; + +/// Traceroute implementation for windows. +#[cfg(target_os = "windows")] +mod windows; + +/// Traceroute implementation for unix. +#[cfg(unix)] +mod unix; + +#[derive(Clone, clap::Args)] +pub struct TracerouteOpt { + /// Try to bind to a specific interface + #[clap(short, long)] + pub interface: Interface, + + /// Destination IP of the probe packets + #[clap(short, long)] + pub destination: IpAddr, + + /// Avoid sending UDP probe packets to this port. + #[clap(long, conflicts_with = "icmp")] + pub exclude_port: Option, + + /// Send UDP probe packets only to this port, instead of the default ports. + #[clap(long, conflicts_with = "icmp")] + pub port: Option, + + /// Use ICMP-Echo for the probe packets instead of UDP. + #[clap(long)] + pub icmp: bool, +} + +/// Timeout of the leak test as a whole. Should be more than [SEND_TIMEOUT] + [RECV_TIMEOUT]. +const LEAK_TIMEOUT: Duration = Duration::from_secs(5); + +/// Timeout of sending probe packets +const SEND_TIMEOUT: Duration = Duration::from_secs(1); + +/// Timeout of receiving additional probe packets after the first one +const RECV_GRACE_TIME: Duration = Duration::from_millis(220); + +/// Time in-between send of each probe packet. +const PROBE_INTERVAL: Duration = Duration::from_millis(100); + +/// Range of TTL values for the probe packets. +const DEFAULT_TTL_RANGE: Range = 1..6; + +/// [try_run_leak_test], but on an error, assume we aren't leaking. +pub async fn run_leak_test(opt: &TracerouteOpt) -> LeakStatus { + try_run_leak_test(opt) + .await + .inspect_err(|e| log::debug!("Leak test errored, assuming no leak. {e:?}")) + .unwrap_or(LeakStatus::NoLeak) +} + +/// Run a traceroute-based leak test. +/// +/// This test will try to create a socket and bind it to `interface`. Then it will send either UDP +/// or ICMP Echo packets to `destination` with very low TTL values. If any network nodes between +/// this one and `destination` see a packet with a TTL value of 0, they will _probably_ return an +/// ICMP/TimeExceeded response. +/// +/// If we receive the response, we know the outgoing packet was NOT blocked by the firewall, and +/// therefore we are leaking. Since we set the TTL very low, this also means that in the event of a +/// leak, the packet will _probably_ not make it out of the users local network, e.g. the local +/// router will probably be the first node that gives a reply. Since the packet should not actually +/// reach `destination`, this testing method is resistant to being fingerprinted or censored. +/// +/// This test needs a raw socket to be able to listen for the ICMP responses, therefore it requires +/// root/admin priviliges. +pub async fn try_run_leak_test(opt: &TracerouteOpt) -> anyhow::Result { + #[cfg(unix)] + return { + #[cfg(target_os = "android")] + type Impl = unix::android::TracerouteAndroid; + #[cfg(target_os = "linux")] + type Impl = unix::linux::TracerouteLinux; + #[cfg(target_os = "macos")] + type Impl = unix::macos::TracerouteMacos; + + unix::try_run_leak_test::(opt).await + }; + + #[cfg(target_os = "windows")] + return windows::traceroute_using_ping(opt).await; +} + +/// IP version, v4 or v6, with some associated data. +#[derive(Clone, Copy)] +enum Ip { + V4(V4), + V6(V6), +} diff --git a/leak-checker/src/traceroute/unix/android.rs b/leak-checker/src/traceroute/unix/android.rs new file mode 100644 index 000000000000..d42a108045ee --- /dev/null +++ b/leak-checker/src/traceroute/unix/android.rs @@ -0,0 +1,31 @@ +use socket2::Socket; + +use crate::{ + traceroute::{Ip, TracerouteOpt}, + Interface, +}; + +use super::{ + common::bind_socket_to_interface, + linux::{self, TracerouteLinux}, + Traceroute, +}; + +pub struct TracerouteAndroid; + +impl Traceroute for TracerouteAndroid { + type AsyncIcmpSocket = linux::AsyncIcmpSocketImpl; + + fn bind_socket_to_interface( + socket: &Socket, + interface: &Interface, + ip_version: Ip, + ) -> anyhow::Result<()> { + // can't use the same method as desktop-linux here beacuse reasons + bind_socket_to_interface(socket, interface, ip_version) + } + + fn configure_icmp_socket(socket: &socket2::Socket, opt: &TracerouteOpt) -> anyhow::Result<()> { + TracerouteLinux::configure_icmp_socket(socket, opt) + } +} diff --git a/leak-checker/src/traceroute/unix/common.rs b/leak-checker/src/traceroute/unix/common.rs new file mode 100644 index 000000000000..e7810c7fb407 --- /dev/null +++ b/leak-checker/src/traceroute/unix/common.rs @@ -0,0 +1,52 @@ +#![allow(dead_code)] // some code here is not used on some targets. + +use std::net::{IpAddr, SocketAddr}; + +use anyhow::Context; +use socket2::Socket; + +use crate::{traceroute::Ip, Interface}; + +pub(crate) fn get_interface_ip(interface: &Interface, ip_version: Ip) -> anyhow::Result { + let Interface::Name(interface) = interface; + + for interface_address in nix::ifaddrs::getifaddrs()? { + if &interface_address.interface_name != interface { + continue; + }; + let Some(address) = interface_address.address else { + continue; + }; + + match ip_version { + Ip::V4(()) => { + if let Some(address) = address.as_sockaddr_in() { + return Ok(IpAddr::V4(address.ip())); + }; + } + Ip::V6(()) => { + if let Some(address) = address.as_sockaddr_in6() { + return Ok(IpAddr::V6(address.ip())); + }; + } + } + } + + anyhow::bail!("Interface {interface:?} has no valid IP to bind to"); +} + +pub(crate) fn bind_socket_to_interface( + socket: &Socket, + interface: &Interface, + ip_version: Ip, +) -> anyhow::Result<()> { + let interface_ip = get_interface_ip(interface, ip_version)?; + + log::info!("Binding socket to {interface_ip} ({interface:?})"); + + socket + .bind(&SocketAddr::new(interface_ip, 0).into()) + .context("Failed to bind socket to interface address")?; + + Ok(()) +} diff --git a/leak-checker/src/traceroute/unix/linux.rs b/leak-checker/src/traceroute/unix/linux.rs new file mode 100644 index 000000000000..def0f698485b --- /dev/null +++ b/leak-checker/src/traceroute/unix/linux.rs @@ -0,0 +1,329 @@ +use std::io::{self, IoSliceMut}; +use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; +use std::{net::IpAddr, time::Duration}; + +use anyhow::{anyhow, bail, Context}; +use nix::errno::Errno; +use nix::sys::socket::{ + recvmsg, setsockopt, sockopt::Ipv4RecvErr, ControlMessageOwned, MsgFlags, SockaddrIn, + SockaddrIn6, SockaddrLike, +}; +use nix::{cmsg_space, libc}; +use pnet_packet::icmp::time_exceeded::IcmpCodes; +use pnet_packet::icmp::{IcmpCode, IcmpType, IcmpTypes}; +use pnet_packet::icmpv6::{Icmpv6Code, Icmpv6Type, Icmpv6Types}; +use socket2::Socket; +use tokio::time::{sleep, Instant}; + +use crate::traceroute::unix::parse_icmp_probe; +use crate::traceroute::{Ip, TracerouteOpt, RECV_GRACE_TIME}; +use crate::{Interface, LeakInfo, LeakStatus}; + +use super::{AsyncIcmpSocket, Traceroute}; + +pub struct TracerouteLinux; + +pub struct AsyncIcmpSocketImpl(tokio::net::UdpSocket); + +impl Traceroute for TracerouteLinux { + type AsyncIcmpSocket = AsyncIcmpSocketImpl; + + fn bind_socket_to_interface( + socket: &Socket, + interface: &Interface, + _: Ip, + ) -> anyhow::Result<()> { + bind_socket_to_interface(socket, interface) + } + + fn configure_icmp_socket(socket: &socket2::Socket, _opt: &TracerouteOpt) -> anyhow::Result<()> { + // IP_RECVERR tells Linux to pass any error packets received over ICMP to us through `recvmsg` control messages. + setsockopt(socket, Ipv4RecvErr, &true).context("Failed to set IP_RECVERR") + } +} + +impl AsyncIcmpSocket for AsyncIcmpSocketImpl { + fn from_socket2(socket: Socket) -> Self { + let raw_socket = socket.into_raw_fd(); + let std_socket = unsafe { std::net::UdpSocket::from_raw_fd(raw_socket) }; + let tokio_socket = tokio::net::UdpSocket::from_std(std_socket).unwrap(); + AsyncIcmpSocketImpl(tokio_socket) + } + + fn set_ttl(&self, ttl: u32) -> anyhow::Result<()> { + self.0 + .set_ttl(ttl) + .context("Failed to set TTL value for socket") + } + + async fn send_to(&self, packet: &[u8], destination: impl Into) -> io::Result { + self.0.send_to(packet, (destination.into(), 0)).await + } + + async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, IpAddr)> { + self.0 + .recv_from(buf) + .await + .map(|(n, source)| (n, source.ip())) + } + + async fn recv_ttl_responses(&self, opt: &TracerouteOpt) -> anyhow::Result { + recv_ttl_responses(opt.destination, &opt.interface, &self.0).await + } +} + +fn bind_socket_to_interface(socket: &Socket, interface: &Interface) -> anyhow::Result<()> { + log::info!("Binding socket to {interface:?}"); + + let Interface::Name(interface) = interface; + + socket + .bind_device(Some(interface.as_bytes())) + .context("Failed to bind socket to interface")?; + + Ok(()) +} + +/// Try to read ICMP/TimeExceeded error packets from an ICMP socket. +/// +/// This method does not require root, but only works on Linux (including Android). +async fn recv_ttl_responses( + destination: IpAddr, + interface: &Interface, + socket: &impl AsRawFd, +) -> anyhow::Result { + // the list of node IP addresses from which we received a response to our probe packets. + let mut reachable_nodes = vec![]; + + // A time at which this function should exit. This is set when we receive the first probe + // response, and allows us to wait a while to collect any additional probe responses before + // returning. + let mut timeout_at = None; + + // Allocate buffer for receiving packets. + let mut recv_buf = vec![0u8; usize::from(u16::MAX)].into_boxed_slice(); + let mut io_vec = [IoSliceMut::new(&mut recv_buf)]; + + // Allocate space for EHOSTUNREACH errors caused by ICMP/TimeExceeded packets. + let mut control_buf = match destination { + // This is the size of ControlMessageOwned::Ipv4RecvErr(sock_extended_err, sockaddr_in). + IpAddr::V4(..) => cmsg_space!(libc::sock_extended_err, libc::sockaddr_in), + + // This is the size of ControlMessageOwned::Ipv6RecvErr(sock_extended_err, sockaddr_in6). + IpAddr::V6(..) => cmsg_space!(libc::sock_extended_err, libc::sockaddr_in6), + }; + + 'outer: loop { + log::debug!("Reading from ICMP socket"); + + // Call recvmsg in a loop + let recv_packet = loop { + if let Some(timeout_at) = timeout_at { + if Instant::now() >= timeout_at { + break 'outer; + } + } + + let recv_packet = match destination { + IpAddr::V4(..) => recvmsg_with_control_message::( + socket.as_raw_fd(), + &mut io_vec, + &mut control_buf, + )? + .map(|packet| packet.map_source_addr(|a| IpAddr::from(a.ip()))), + IpAddr::V6(..) => recvmsg_with_control_message::( + socket.as_raw_fd(), + &mut io_vec, + &mut control_buf, + )? + .map(|packet| packet.map_source_addr(|a| IpAddr::from(a.ip()))), + }; + + let Some(recv_packet) = recv_packet else { + // poor-mans async IO :'( + sleep(Duration::from_millis(10)).await; + continue; + }; + + break recv_packet; + }; + + let RecvPacket { + source_addr, + packet, + control_message, + } = recv_packet; + + macro_rules! skip_if { + ($skip_condition:expr, $note:expr) => {{ + if $skip_condition { + log::debug!("Ignoring received message: {}", $note); + continue 'outer; + } + }}; + } + + // NOTE: This should be the IP destination of our ping packets. That does NOT mean the + // packets reached the destination. Instead, if we see an EHOSTUNREACH control message, + // it means the packets was instead dropped along the way. Seeing this address helps us + // identify that this is a response to the ping we sent. + skip_if!(source_addr != destination, "Unknown source"); + + let error_source = match control_message { + ControlMessageOwned::Ipv4RecvErr(socket_error, source_addr) => { + let libc::sock_extended_err { + ee_errno, // Error Number: Should be EHOSTUNREACH + ee_origin, // Error Origin: 2 = Icmp + ee_type, // ICMP Type: 11 = ICMP/TimeExceeded. + ee_code, // ICMP Code. 0 = TTL exceeded in transit. + .. + } = socket_error; + + let errno = Errno::from_raw(ee_errno as i32); + skip_if!(errno != Errno::EHOSTUNREACH, "Unexpected errno"); + skip_if!( + ee_origin != nix::libc::SO_EE_ORIGIN_ICMP, + "Unexpected origin" + ); + + let icmp_type = IcmpType::new(ee_type); + skip_if!(icmp_type != IcmpTypes::TimeExceeded, "Unexpected ICMP type"); + + let icmp_code = IcmpCode::new(ee_code); + skip_if!( + icmp_code != IcmpCodes::TimeToLiveExceededInTransit, + "Unexpected ICMP code" + ); + + // NOTE: This is the IP of the node that dropped the packet due to TTL exceeded. + let error_source = SockaddrIn::from(source_addr.unwrap()); + log::debug!("addr: {error_source}"); + + // Ensure that this is the original Echo packet that we sent. + skip_if!( + parse_icmp_probe(Ip::V4(packet)).is_err(), + "Not a response to us" + ); + + IpAddr::from(error_source.ip()) + } + ControlMessageOwned::Ipv6RecvErr(socket_error, source_addr) => { + let libc::sock_extended_err { + ee_errno, // Error Number: Should be EHOSTUNREACH + ee_origin, // Error Origin: 3 = Icmp6. + ee_type, // ICMP Type: 3 = ICMP6/TimeExceeded + ee_code, // ICMP Code. 0 = TTL exceeded in transit. + .. + } = socket_error; + + let errno = Errno::from_raw(ee_errno as i32); + skip_if!(errno != Errno::EHOSTUNREACH, "Unexpected errno"); + skip_if!( + ee_origin != nix::libc::SO_EE_ORIGIN_ICMP6, + "Unexpected origin" + ); + + let icmp_type = Icmpv6Type::new(ee_type); + skip_if!( + icmp_type != Icmpv6Types::TimeExceeded, + "Unexpected ICMP type" + ); + + let icmp_code = Icmpv6Code::new(ee_code); + skip_if!(icmp_code != Icmpv6Code::new(0), "Unexpected ICMP code"); + + // NOTE: This is the IP of the node that dropped the packet due to TTL exceeded. + let error_source = SockaddrIn6::from(source_addr.unwrap()); + log::debug!("addr: {error_source}"); + + // Ensure that this is the original Echo packet that we sent. + skip_if!( + parse_icmp_probe(Ip::V6(packet)).is_err(), + "Not a response to us" + ); + + IpAddr::from(error_source.ip()) + } + other_message => { + // TODO: We might want to not error in this case, and just ignore the cmsg. + // If so, we should loop over the iterator instead of taking the first elem. + bail!("Unhandled control message: {other_message:?}"); + } + }; + + log::debug!("Got a probe response, we are leaking!"); + timeout_at.get_or_insert_with(|| Instant::now() + RECV_GRACE_TIME); + reachable_nodes.push(error_source); + } + + debug_assert!(!reachable_nodes.is_empty()); + + Ok(LeakStatus::LeakDetected( + LeakInfo::NodeReachableOnInterface { + reachable_nodes, + interface: interface.clone(), + }, + )) +} + +struct RecvPacket<'a, S> { + pub source_addr: S, + pub packet: &'a [u8], + pub control_message: ControlMessageOwned, +} + +impl<'a, S> RecvPacket<'a, S> { + /// Convert the type of [RecvPacket::source_addr], e.g. from [SockaddrIn6] to [IpAddr]. + fn map_source_addr(self, f: impl FnOnce(S) -> T) -> RecvPacket<'a, T> { + RecvPacket { + source_addr: f(self.source_addr), + packet: self.packet, + control_message: self.control_message, + } + } +} + +/// Call recvmsg and expect exactly one control message. +/// +/// See [ControlMessageOwned] for details on control messages. +/// Returns `Ok(None)` on `EWOULDBLOCK`, or if recvmsg returns no control message. +fn recvmsg_with_control_message<'a, S: SockaddrLike + Copy>( + socket: RawFd, + io_vec: &'a mut [IoSliceMut<'_>; 1], + control_buf: &mut Vec, +) -> anyhow::Result>> { + // MSG_ERRQUEUE asks linux to tell us if we get any ICMP error replies to + // our Echo packets. + let flags = MsgFlags::MSG_ERRQUEUE; + + let result = recvmsg::(socket.as_raw_fd(), io_vec, Some(control_buf), flags); + + let recv = match result { + Ok(recv) => recv, + Err(Errno::EWOULDBLOCK) => return Ok(None), + Err(e) => return Err(anyhow!("Failed to read from socket: {e}")), + }; + + let source_addr = recv.address.unwrap(); + + let mut control_messages = recv + .cmsgs() + .context("Failed to decode cmsgs from recvmsg")?; + + let Some(control_message) = control_messages.next() else { + // We're looking for EHOSTUNREACH errors. No errors means skip. + log::debug!("Skipping recvmsg that produced no control messages."); + return Ok(None); + }; + + let Some(packet) = recv.iovs().next() else { + log::debug!("Skipping recvmsg that produced no data."); + return Ok(None); + }; + + Ok(Some(RecvPacket { + source_addr, + packet, + control_message, + })) +} diff --git a/leak-checker/src/traceroute/unix/macos.rs b/leak-checker/src/traceroute/unix/macos.rs new file mode 100644 index 000000000000..1fd6e31d41d1 --- /dev/null +++ b/leak-checker/src/traceroute/unix/macos.rs @@ -0,0 +1,285 @@ +use std::ascii::escape_default; +use std::future::pending; +use std::io; +use std::net::IpAddr; +use std::num::NonZero; +use std::os::fd::{FromRawFd, IntoRawFd}; + +use anyhow::{anyhow, bail, ensure, Context}; +use nix::net::if_::if_nametoindex; +use pnet_packet::{ + icmp::{self, time_exceeded::TimeExceededPacket, IcmpPacket, IcmpTypes}, + icmpv6::{Icmpv6Packet, Icmpv6Types}, + ip::IpNextHeaderProtocols, + ipv4::Ipv4Packet, + ipv6::Ipv6Packet, + udp::UdpPacket, + Packet, +}; +use socket2::Socket; +use tokio::{ + select, + time::{sleep_until, Instant}, +}; + +use crate::traceroute::{Ip, TracerouteOpt, RECV_GRACE_TIME}; +use crate::{Interface, LeakInfo, LeakStatus}; + +use super::{parse_icmp_probe, too_small, AsyncIcmpSocket, Traceroute, PROBE_PAYLOAD}; + +pub struct TracerouteMacos; + +pub struct AsyncIcmpSocketImpl(tokio::net::UdpSocket); + +impl Traceroute for TracerouteMacos { + type AsyncIcmpSocket = AsyncIcmpSocketImpl; + + fn bind_socket_to_interface( + socket: &Socket, + interface: &Interface, + ip_version: Ip, + ) -> anyhow::Result<()> { + // can't use the same method as desktop-linux here beacuse reasons + bind_socket_to_interface(socket, interface, ip_version) + } + + fn configure_icmp_socket( + _socket: &socket2::Socket, + _opt: &TracerouteOpt, + ) -> anyhow::Result<()> { + Ok(()) + } +} + +impl AsyncIcmpSocket for AsyncIcmpSocketImpl { + fn from_socket2(socket: Socket) -> Self { + let raw_socket = socket.into_raw_fd(); + let std_socket = unsafe { std::net::UdpSocket::from_raw_fd(raw_socket) }; + let tokio_socket = tokio::net::UdpSocket::from_std(std_socket).unwrap(); + AsyncIcmpSocketImpl(tokio_socket) + } + + fn set_ttl(&self, ttl: u32) -> anyhow::Result<()> { + self.0 + .set_ttl(ttl) + .context("Failed to set TTL value for socket") + } + + async fn send_to(&self, packet: &[u8], destination: impl Into) -> io::Result { + self.0.send_to(packet, (destination.into(), 0)).await + } + + async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, IpAddr)> { + self.0 + .recv_from(buf) + .await + .map(|(n, source)| (n, source.ip())) + } + + async fn recv_ttl_responses(&self, opt: &TracerouteOpt) -> anyhow::Result { + recv_ttl_responses(self, &opt.interface).await + } +} + +fn bind_socket_to_interface( + socket: &Socket, + interface: &Interface, + ip_version: Ip, +) -> anyhow::Result<()> { + let Interface::Name(interface) = interface; + + log::info!("Binding socket to {interface:?}"); + + let interface_index = if_nametoindex(interface.as_str()) + .map_err(anyhow::Error::from) + .and_then(|code| NonZero::new(code).ok_or(anyhow!("Non-zero error code"))) + .context("Failed to get interface index")?; + + match ip_version { + Ip::V4(..) => socket.bind_device_by_index_v4(Some(interface_index))?, + Ip::V6(..) => socket.bind_device_by_index_v6(Some(interface_index))?, + } + Ok(()) +} + +async fn recv_ttl_responses( + socket: &impl AsyncIcmpSocket, + interface: &Interface, +) -> anyhow::Result { + // the list of node IP addresses from which we received a response to our probe packets. + let mut reachable_nodes = vec![]; + + // A time at which this function should exit. This is set when we receive the first probe + // response, and allows us to wait a while to collect any additional probe responses before + // returning. + let mut timeout_at = None; + + let mut read_buf = vec![0u8; usize::from(u16::MAX)].into_boxed_slice(); + loop { + let timer = async { + match timeout_at { + // resolve future at the timeout, if it's set + Some(time) => sleep_until(time).await, + + // otherwise, never resolve + None => pending().await, + } + }; + + log::debug!("Reading from ICMP socket"); + + let (n, source) = select! { + result = socket.recv_from(&mut read_buf[..]) => result + .context("Failed to read from raw socket")?, + + _timeout = timer => { + return Ok(LeakStatus::LeakDetected(LeakInfo::NodeReachableOnInterface { + reachable_nodes, + interface: interface.clone(), + })); + } + }; + + let packet = &read_buf[..n]; + let result = parse_ip(packet) + .and_then(|packet| match packet { + Ip::V4(ip_packet) => parse_icmp4_time_exceeded(&ip_packet), + Ip::V6(ip_packet) => parse_icmp6_time_exceeded(&ip_packet), + }) + .map_err(|e| { + anyhow!("Ignoring packet: (len={n}, ip.src={source}) {e} ({packet:02x?})") + }); + + match result { + Ok(ip) => { + log::debug!("Got a probe response, we are leaking!"); + timeout_at.get_or_insert_with(|| Instant::now() + RECV_GRACE_TIME); + if !reachable_nodes.contains(&ip) { + reachable_nodes.push(ip); + } + } + + // an error means the packet wasn't the ICMP/TimeExceeded we're listening for. + Err(e) => log::debug!("{e}"), + } + } +} + +/// Try to parse the bytes as an IPv4 or IPv6 packet. +/// +/// This only valdiates the IP header, not the payload. +fn parse_ip(packet: &[u8]) -> anyhow::Result, Ipv6Packet<'_>>> { + let ipv4_packet = Ipv4Packet::new(packet).ok_or_else(too_small)?; + + // ipv4-packets are smaller than ipv6, so we use an Ipv4Packet to check the version. + Ok(match ipv4_packet.get_version() { + 4 => Ip::V4(ipv4_packet), + 6 => { + let ipv6_packet = Ipv6Packet::new(packet).ok_or_else(too_small)?; + Ip::V6(ipv6_packet) + } + _ => bail!("Not a valid IP header"), + }) +} + +/// Try to parse an [Ipv4Packet] as an ICMP/TimeExceeded response to a packet sent by +/// [send_udp_probes] or [send_icmp_probes]. If successful, returns the [Ipv4Addr] of the packet +/// source. +/// +/// If the packet fails to parse, or is not a reply to a packet sent by us, this function returns +/// an error. +fn parse_icmp4_time_exceeded(ip_packet: &Ipv4Packet<'_>) -> anyhow::Result { + let ip_protocol = ip_packet.get_next_level_protocol(); + ensure!(ip_protocol == IpNextHeaderProtocols::Icmp, "Not ICMP"); + parse_icmp_time_exceeded_raw(Ip::V4(ip_packet.payload()))?; + Ok(ip_packet.get_source().into()) +} + +/// Try to parse an [Ipv6Packet] as an ICMP6/TimeExceeded response to a packet sent by +/// [send_udp_probes] or [send_icmp_probes]. If successful, returns the [Ipv6Addr] of the packet +/// source. +/// +/// If the packet fails to parse, or is not a reply to a packet sent by us, this function returns +/// an error. +fn parse_icmp6_time_exceeded(ip_packet: &Ipv6Packet<'_>) -> anyhow::Result { + let ip_protocol = ip_packet.get_next_header(); + ensure!(ip_protocol == IpNextHeaderProtocols::Icmpv6, "Not ICMP6"); + parse_icmp_time_exceeded_raw(Ip::V6(ip_packet.payload()))?; + Ok(ip_packet.get_source().into()) +} + +/// Try to parse some bytes into an ICMP or ICMP6 TimeExceeded response to a probe packet sent by +/// [send_udp_probes] or [send_icmp_probes]. +/// +/// If the packet fails to parse, or is not a reply to a packet sent by us, this function returns +/// an error. +fn parse_icmp_time_exceeded_raw(ip_payload: Ip<&[u8], &[u8]>) -> anyhow::Result<()> { + let icmpv4_packet; + let icmpv6_packet; + let icmp_packet: &[u8] = match ip_payload { + Ip::V4(ipv4_payload) => { + icmpv4_packet = IcmpPacket::new(ipv4_payload).ok_or(anyhow!("Too small"))?; + + let correct_type = icmpv4_packet.get_icmp_type() == IcmpTypes::TimeExceeded; + ensure!(correct_type, "Not ICMP/TimeExceeded"); + + icmpv4_packet.packet() + } + Ip::V6(ipv6_payload) => { + icmpv6_packet = Icmpv6Packet::new(ipv6_payload).ok_or(anyhow!("Too small"))?; + + let correct_type = icmpv6_packet.get_icmpv6_type() == Icmpv6Types::TimeExceeded; + ensure!(correct_type, "Not ICMP6/TimeExceeded"); + + icmpv6_packet.packet() + } + }; + + // TimeExceededPacket looks the same for both ICMP and ICMP6. + let time_exceeded = TimeExceededPacket::new(icmp_packet).ok_or_else(too_small)?; + ensure!( + time_exceeded.get_icmp_code() + == icmp::time_exceeded::IcmpCodes::TimeToLiveExceededInTransit, + "Not TTL Exceeded", + ); + + let original_ip_packet = parse_ip(time_exceeded.payload()).context("ICMP-wrapped IP packet")?; + + let (original_ip_protocol, original_ip_payload) = match &original_ip_packet { + Ip::V4(ipv4_packet) => (ipv4_packet.get_next_level_protocol(), ipv4_packet.payload()), + Ip::V6(ipv6_packet) => (ipv6_packet.get_next_header(), ipv6_packet.payload()), + }; + + match original_ip_protocol { + IpNextHeaderProtocols::Udp => { + let original_udp_packet = UdpPacket::new(original_ip_payload).ok_or_else(too_small)?; + + // check if payload looks right + // some network nodes will strip the payload, that's fine. + if !original_udp_packet.payload().is_empty() { + let udp_len = usize::from(original_udp_packet.get_length()); + let udp_payload = udp_len + .checked_sub(UdpPacket::minimum_packet_size()) + .and_then(|len| original_udp_packet.payload().get(..len)) + .ok_or(anyhow!("Invalid UDP length"))?; + if udp_payload != PROBE_PAYLOAD { + let udp_payload: String = udp_payload + .iter() + .copied() + .flat_map(escape_default) + .map(char::from) + .collect(); + bail!("Wrong UDP payload: {udp_payload:?}"); + } + } + + Ok(()) + } + + IpNextHeaderProtocols::Icmp => parse_icmp_probe(Ip::V4(original_ip_payload)), + + IpNextHeaderProtocols::Icmpv6 => parse_icmp_probe(Ip::V6(original_ip_payload)), + + _ => bail!("Not UDP/ICMP"), + } +} diff --git a/leak-checker/src/traceroute/unix/mod.rs b/leak-checker/src/traceroute/unix/mod.rs new file mode 100644 index 000000000000..8ab877dbd781 --- /dev/null +++ b/leak-checker/src/traceroute/unix/mod.rs @@ -0,0 +1,356 @@ +use std::{ + ascii::escape_default, + convert::Infallible, + io, + net::{IpAddr, SocketAddr}, + ops::RangeFrom, + os::fd::{FromRawFd, IntoRawFd}, +}; + +use crate::traceroute::{ + Ip, TracerouteOpt, DEFAULT_TTL_RANGE, LEAK_TIMEOUT, PROBE_INTERVAL, SEND_TIMEOUT, +}; +use crate::{Interface, LeakStatus}; + +use anyhow::{anyhow, bail, ensure, Context}; +use common::get_interface_ip; +use futures::{future::pending, select, stream, FutureExt, StreamExt, TryStreamExt}; +use pnet_packet::{ + icmp::{self, IcmpCode, IcmpTypes}, + icmpv6::{self, Icmpv6Code, Icmpv6Types}, + Packet, +}; +use socket2::{Domain, Protocol, Socket, Type}; +use tokio::time::{sleep, timeout}; + +#[cfg(any(target_os = "linux", target_os = "android"))] +pub mod android; + +#[cfg(any(target_os = "linux", target_os = "android"))] +pub mod linux; + +#[cfg(target_os = "macos")] +pub mod macos; + +pub mod common; + +/// Type of the UDP payload of the probe packets +type ProbePayload = [u8; 32]; + +/// Value of the UDP payload of the probe packets +const PROBE_PAYLOAD: ProbePayload = *b"ABCDEFGHIJKLMNOPQRSTUVWXYZ123456"; + +/// Default range of ports for the UDP probe packets. Stolen from `traceroute`. +const DEFAULT_PORT_RANGE: RangeFrom = 33434..; + +/// Private trait that let's us define the platform-specific implementations and types required for +/// tracerouting. +pub trait Traceroute { + type AsyncIcmpSocket: AsyncIcmpSocket; + + fn bind_socket_to_interface( + socket: &socket2::Socket, + interface: &Interface, + ip_version: Ip, + ) -> anyhow::Result<()>; + + /// Configure an ICMP socket to allow reception of ICMP/TimeExceeded errors. + // TODO: consider moving into AsyncIcmpSocket constructor + fn configure_icmp_socket(socket: &socket2::Socket, opt: &TracerouteOpt) -> anyhow::Result<()>; +} + +pub trait AsyncIcmpSocket { + fn from_socket2(socket: socket2::Socket) -> Self; + + fn set_ttl(&self, ttl: u32) -> anyhow::Result<()>; + + /// Send an ICMP packet to the destination. + // TODO: anyhow? + async fn send_to(&self, packet: &[u8], destination: impl Into) -> io::Result; + + /// Receive an ICMP packet + async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, IpAddr)>; + + /// Try to read ICMP/TimeExceeded error packets. + // TODO: this should be renamed, or not return a LeakStatus + async fn recv_ttl_responses(&self, opt: &TracerouteOpt) -> anyhow::Result; +} + +struct AsyncUdpSocket(tokio::net::UdpSocket); + +pub async fn try_run_leak_test( + opt: &TracerouteOpt, +) -> anyhow::Result { + // If we ever change this to support windows, this probably needs to be Type::DGRAM. + let icmp_socket_type = Type::RAW; + + let (ip_version, domain, icmp_protocol) = match opt.destination { + IpAddr::V4(..) => (Ip::V4(()), Domain::IPV4, Protocol::ICMPV4), + IpAddr::V6(..) => (Ip::V6(()), Domain::IPV6, Protocol::ICMPV6), + }; + + // create the socket used for receiving the ICMP/TimeExceeded responses + let icmp_socket = Socket::new(domain, icmp_socket_type, Some(icmp_protocol)) + .context("Failed to open ICMP socket")?; + + icmp_socket + .set_nonblocking(true) + .context("Failed to set icmp_socket to nonblocking")?; + + Impl::bind_socket_to_interface(&icmp_socket, &opt.interface, ip_version)?; + Impl::configure_icmp_socket(&icmp_socket, opt)?; + + let icmp_socket = Impl::AsyncIcmpSocket::from_socket2(icmp_socket); + + let send_probes = async { + if opt.icmp { + send_icmp_probes::(opt, &icmp_socket).await?; + } else { + // create the socket used for sending the UDP probing packets + let udp_socket = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP)) + .context("Failed to open UDP socket")?; + + Impl::bind_socket_to_interface(&udp_socket, &opt.interface, ip_version) + .context("Failed to bind UDP socket to interface")?; + + udp_socket + .set_nonblocking(true) + .context("Failed to set udp_socket to nonblocking")?; + + let mut udp_socket = AsyncUdpSocket::from_socket2(udp_socket); + + send_udp_probes(opt, &mut udp_socket).await?; + } + + anyhow::Ok(()) + }; + + let send_probes = async { + timeout(SEND_TIMEOUT, send_probes) + .await + .map_err(|_timeout| anyhow!("Timed out while trying to send probe packet"))??; + Ok(pending::().await) + }; + + let recv_probe_responses = icmp_socket.recv_ttl_responses(opt); + + // wait until either future returns, or the timeout is reached + // friendship ended with tokio::select. now futures::select is my best friend! + let leak_status = select! { + result = recv_probe_responses.fuse() => result?, + Err(e) = send_probes.fuse() => return Err(e), + _ = sleep(LEAK_TIMEOUT).fuse() => LeakStatus::NoLeak, + }; + + Ok(leak_status) +} + +async fn send_icmp_probes( + opt: &TracerouteOpt, + socket: &impl AsyncIcmpSocket, +) -> anyhow::Result<()> { + for ttl in DEFAULT_TTL_RANGE { + log::debug!("sending probe packet (ttl={ttl})"); + + socket + .set_ttl(ttl.into()) + .context("Failed to set TTL on socket")?; + + // the first packet will sometimes get dropped on MacOS, thus we send two packets + let number_of_sends = if cfg!(target_os = "macos") { 2 } else { 1 }; + + // construct ICMP/ICMP6 echo request packet + let mut packet_v4; + let mut packet_v6; + let packet_bytes; + const ECHO_REQUEST_HEADER_LEN: usize = 8; + match opt.destination { + IpAddr::V4(..) => { + let echo = icmp::echo_request::EchoRequest { + icmp_type: IcmpTypes::EchoRequest, + icmp_code: IcmpCode(0), + checksum: 0, + identifier: 1, + sequence_number: 1, + payload: PROBE_PAYLOAD.to_vec(), + }; + + let len = ECHO_REQUEST_HEADER_LEN + PROBE_PAYLOAD.len(); + packet_v4 = + icmp::echo_request::MutableEchoRequestPacket::owned(vec![0u8; len]).unwrap(); + packet_v4.populate(&echo); + packet_v4.set_checksum(icmp::checksum( + &icmp::IcmpPacket::new(packet_v4.packet()).unwrap(), + )); + packet_bytes = packet_v4.packet(); + } + IpAddr::V6(destination) => { + let IpAddr::V6(source) = get_interface_ip(&opt.interface, Ip::V6(()))? else { + bail!("Tried to send IPv6 on IPv4 interface"); + }; + + let echo = icmpv6::echo_request::EchoRequest { + icmpv6_type: Icmpv6Types::EchoRequest, + icmpv6_code: Icmpv6Code(0), + checksum: 0, + identifier: 1, + sequence_number: 1, + payload: PROBE_PAYLOAD.to_vec(), + }; + + let len = ECHO_REQUEST_HEADER_LEN + PROBE_PAYLOAD.len(); + packet_v6 = + icmpv6::echo_request::MutableEchoRequestPacket::owned(vec![0u8; len]).unwrap(); + packet_v6.populate(&echo); + packet_v6.set_checksum(icmpv6::checksum( + &icmpv6::Icmpv6Packet::new(packet_v6.packet()).unwrap(), + &source, + &destination, + )); + packet_bytes = packet_v6.packet(); + } + } + + let result: io::Result<()> = stream::iter(0..number_of_sends) + // call `send_to` `number_of_sends` times + .then(|_| socket.send_to(packet_bytes, opt.destination)) + .map_ok(drop) + .try_collect() // abort on the first error + .await; + + // if there was an error, handle it, otherwise continue probing. + let Err(e) = result else { + sleep(PROBE_INTERVAL).await; + continue; + }; + + match e.kind() { + io::ErrorKind::PermissionDenied | io::ErrorKind::ConnectionRefused => { + // Linux returns one of these errors if our packet was rejected by nftables. + log::debug!("send_to failed, was probably caught by firewall"); + break; + } + _ => return Err(e).context("Failed to send packet")?, + } + } + + Ok(()) +} + +impl AsyncUdpSocket { + pub fn from_socket2(socket: socket2::Socket) -> Self { + // HACK: Wrap the socket in a tokio::net::UdpSocket to be able to use it async + // SAFETY: `into_raw_fd()` consumes the socket and returns an owned & open file descriptor. + let udp_socket = unsafe { std::net::UdpSocket::from_raw_fd(socket.into_raw_fd()) }; + let udp_socket = tokio::net::UdpSocket::from_std(udp_socket).unwrap(); + AsyncUdpSocket(udp_socket) + } + + pub fn set_ttl(&self, ttl: u32) -> anyhow::Result<()> { + self.0 + .set_ttl(ttl) + .context("Failed to set TTL value for UDP socket") + } + + pub async fn send_to( + &self, + packet: &[u8], + destination: impl Into, + ) -> std::io::Result { + self.0.send_to(packet, destination.into()).await + } +} + +/// Send ICMP/Echo packets with a very low TTL to `opt.destination`. +/// +/// Use [AsyncIcmpSocket::recv_ttl_responses] to receive replies. +/// Send UDP packets with a very low TTL to `opt.destination`. +/// +/// Use [Impl::recv_ttl_responses] to receive replies. +async fn send_udp_probes(opt: &TracerouteOpt, socket: &mut AsyncUdpSocket) -> anyhow::Result<()> { + // ensure we don't send anything to `opt.exclude_port` + let ports = DEFAULT_PORT_RANGE + // skip the excluded port + .filter(|&p| Some(p) != opt.exclude_port) + // `opt.port` overrides the default port range + .map(|port| opt.port.unwrap_or(port)); + + for (port, ttl) in ports.zip(DEFAULT_TTL_RANGE) { + log::debug!("sending probe packet (ttl={ttl})"); + + socket + .set_ttl(ttl.into()) + .context("Failed to set TTL on socket")?; + + // the first packet will sometimes get dropped on MacOS, thus we send two packets + let number_of_sends = if cfg!(target_os = "macos") { 2 } else { 1 }; + + let result: io::Result<()> = stream::iter(0..number_of_sends) + // call `send_to` `number_of_sends` times + .then(|_| socket.send_to(&PROBE_PAYLOAD, (opt.destination, port))) + .map_ok(drop) + .try_collect() // abort on the first error + .await; + + let Err(e) = result else { continue }; + match e.kind() { + io::ErrorKind::PermissionDenied => { + // Linux returns this error if our packet was rejected by nftables. + log::debug!("send_to failed with 'permission denied'"); + } + _ => return Err(e).context("Failed to send packet")?, + } + } + + Ok(()) +} + +/// Try to parse bytes as an ICMP/ICMP6 Echo Request matching the probe packets send by +/// [send_icmp_probes]. +fn parse_icmp_probe(icmp_bytes: Ip<&[u8], &[u8]>) -> anyhow::Result<()> { + let echo_packet_v4; + let echo_packet_v6; + let echo_payload = match icmp_bytes { + Ip::V4(icmpv4_bytes) => { + echo_packet_v4 = + icmp::echo_request::EchoRequestPacket::new(icmpv4_bytes).ok_or_else(too_small)?; + + ensure!( + echo_packet_v4.get_icmp_type() == IcmpTypes::EchoRequest, + "Not ICMP/EchoRequest" + ); + + echo_packet_v4.payload() + } + Ip::V6(icmpv6_bytes) => { + echo_packet_v6 = + icmpv6::echo_request::EchoRequestPacket::new(icmpv6_bytes).ok_or_else(too_small)?; + + ensure!( + echo_packet_v6.get_icmpv6_type() == Icmpv6Types::EchoRequest, + "Not ICMP6/EchoRequest" + ); + + echo_packet_v6.payload() + } + }; + + // check if payload looks right + // some network nodes will strip the payload. + // some network nodes will add a bunch of zeros at the end. + if !echo_payload.is_empty() && !echo_payload.starts_with(&PROBE_PAYLOAD) { + let echo_payload: String = echo_payload + .iter() + .copied() + .flat_map(escape_default) + .map(char::from) + .collect(); + bail!("Wrong ICMP6/Echo payload: {echo_payload:?}"); + } + + Ok(()) +} + +fn too_small() -> anyhow::Error { + anyhow!("Too small") +} diff --git a/leak-checker/src/traceroute/windows.rs b/leak-checker/src/traceroute/windows.rs new file mode 100644 index 000000000000..066714dd6d05 --- /dev/null +++ b/leak-checker/src/traceroute/windows.rs @@ -0,0 +1,253 @@ +use std::{net::IpAddr, str}; + +use anyhow::{anyhow, Context}; +use futures::{select, stream::FuturesUnordered, FutureExt, StreamExt}; + +use talpid_windows::net::{get_ip_address_for_interface, luid_from_alias, AddressFamily}; +use tokio::time::sleep; + +use crate::{ + traceroute::{ + Ip, TracerouteOpt, DEFAULT_TTL_RANGE, LEAK_TIMEOUT, PROBE_INTERVAL, SEND_TIMEOUT, + }, + Interface, LeakInfo, LeakStatus, +}; + +/// Implementation of traceroute using `ping.exe` +/// +/// This monstrosity exists because the Windows firewall is not helpful enough to allow us to +/// permit a process (the daemon) to receive ICMP TimeExceeded packets. We can get around this by +/// using `ping.exe`, which does work for some reason. My best guess is that it has special kernel +/// access to be able to do this. +pub async fn traceroute_using_ping(opt: &TracerouteOpt) -> anyhow::Result { + let ip_version = match opt.destination { + IpAddr::V4(..) => Ip::V4(()), + IpAddr::V6(..) => Ip::V6(()), + }; + + let interface_ip = get_interface_ip(&opt.interface, ip_version)?; + + let mut ping_tasks = FuturesUnordered::new(); + + for (i, ttl) in DEFAULT_TTL_RANGE.enumerate() { + // Don't send all pings at once, wait a bit in between + // each one to avoid sending more than necessary + let probe_delay = PROBE_INTERVAL * i as u32; + + ping_tasks.push(async move { + sleep(probe_delay).await; + + log::debug!("sending probe packet (ttl={ttl})"); + + // ping.exe will send ICMP Echo packets to the destination, and since it's running in + // the kernel it will be able to receive TimeExceeded responses. + let ping_path = r"C:\Windows\System32\ping.exe"; + let output = tokio::process::Command::new(ping_path) + .args(["-i", &ttl.to_string()]) + .args(["-n", "1"]) // number of pings + .args(["-w", &SEND_TIMEOUT.as_millis().to_string()]) + .args(["-S", &interface_ip.to_string()]) // bind to interface IP + .arg(opt.destination.to_string()) + .kill_on_drop(true) + .output() + .await + .context(anyhow!("Failed to execute {ping_path}"))?; + + let output_err = || anyhow!("Unexpected output from `ping.exe`"); + + let stdout = str::from_utf8(&output.stdout).with_context(output_err)?; + let _stderr = str::from_utf8(&output.stderr).with_context(output_err)?; + + log::trace!("ping stdout: {stdout}"); + log::trace!("ping stderr: {_stderr}"); + + // Dumbly parse stdout for a line that looks like this: + // "Reply from : TTL expired" + + if !stdout.contains("TTL expired") { + // No "TTL expired" means we did not receive any TimeExceeded replies. + return Ok(None); + } + let (ip, ..) = stdout + .split_once("Reply from ") + .and_then(|(.., s)| s.split_once(": TTL expired")) + .with_context(output_err)?; + + let ip: IpAddr = ip.parse().unwrap(); + + anyhow::Ok(Some(ip)) + }); + } + + let wait_for_first_leak = async move { + while let Some(result) = ping_tasks.next().await { + let Some(ip) = result? else { continue }; + + return Ok(LeakStatus::LeakDetected( + LeakInfo::NodeReachableOnInterface { + reachable_nodes: vec![ip], + interface: opt.interface.clone(), + }, + )); + } + + anyhow::Ok(LeakStatus::NoLeak) + }; + + select! { + _ = sleep(LEAK_TIMEOUT).fuse() => Ok(LeakStatus::NoLeak), + result = wait_for_first_leak.fuse() => result, + } +} + +pub fn get_interface_ip(interface: &Interface, ip_version: Ip) -> anyhow::Result { + let interface_luid = match interface { + Interface::Name(name) => luid_from_alias(name)?, + Interface::Luid(luid) => *luid, + }; + + let address_family = match ip_version { + Ip::V4(..) => AddressFamily::Ipv4, + Ip::V6(..) => AddressFamily::Ipv6, + }; + + get_ip_address_for_interface(address_family, interface_luid) + .with_context(|| anyhow!("Failed to get IP for interface {interface:?}"))? + .ok_or(anyhow!("No IP for interface {interface:?}")) +} + +/* +use std::{ + ffi::c_void, + io, mem, + net::{IpAddr, SocketAddr}, + os::windows::io::{AsRawSocket, AsSocket, FromRawSocket, IntoRawSocket}, + ptr::null_mut, + str, +}; + +use anyhow::{anyhow, bail, Context}; +use futures::{select, stream::FuturesUnordered, FutureExt, StreamExt}; +use socket2::Socket; +use talpid_windows::net::{get_ip_address_for_interface, luid_from_alias, AddressFamily}; + +use tokio::time::sleep; +use windows_sys::Win32::Networking::WinSock::{ + WSAGetLastError, WSAIoctl, SIO_RCVALL, SOCKET, SOCKET_ERROR, +}; + +use crate::{ + traceroute::{ + Ip, TracerouteOpt, DEFAULT_TTL_RANGE, LEAK_TIMEOUT, PROBE_INTERVAL, SEND_TIMEOUT, + }, + Interface, LeakInfo, LeakStatus, +}; +use super::{common, AsyncIcmpSocket, AsyncUdpSocket, Traceroute}; + +pub struct TracerouteWindows; + +pub struct AsyncIcmpSocketImpl(tokio::net::UdpSocket); + +pub struct AsyncUdpSocketWindows(tokio::net::UdpSocket); + +impl Traceroute for TracerouteWindows { + type AsyncIcmpSocket = AsyncIcmpSocketImpl; + type AsyncUdpSocket = AsyncUdpSocketWindows; + + fn bind_socket_to_interface( + socket: &Socket, + interface: &Interface, + ip_version: Ip, + ) -> anyhow::Result<()> { + common::bind_socket_to_interface::(socket, interface, ip_version) + } + + fn get_interface_ip(interface: &Interface, ip_version: Ip) -> anyhow::Result { + get_interface_ip(interface, ip_version) + } + + fn configure_icmp_socket(socket: &socket2::Socket, _opt: &TracerouteOpt) -> anyhow::Result<()> { + configure_icmp_socket(socket) + } +} + +impl AsyncIcmpSocket for AsyncIcmpSocketImpl { + fn from_socket2(socket: Socket) -> Self { + let raw_socket = socket.as_socket().as_raw_socket(); + mem::forget(socket); + let std_socket = unsafe { std::net::UdpSocket::from_raw_socket(raw_socket) }; + let tokio_socket = tokio::net::UdpSocket::from_std(std_socket).unwrap(); + AsyncIcmpSocketImpl(tokio_socket) + } + + fn set_ttl(&self, ttl: u32) -> anyhow::Result<()> { + self.0 + .set_ttl(ttl) + .context("Failed to set TTL value for ICMP socket") + } + + async fn send_to(&self, packet: &[u8], destination: impl Into) -> io::Result { + self.0.send_to(packet, (destination.into(), 0)).await + } + + async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, std::net::IpAddr)> { + let (n, source) = self.0.recv_from(buf).await?; + Ok((n, source.ip())) + } + + async fn recv_ttl_responses(&self, opt: &TracerouteOpt) -> anyhow::Result { + common::recv_ttl_responses(self, &opt.interface).await + } +} + +impl AsyncUdpSocket for AsyncUdpSocketWindows { + fn from_socket2(socket: socket2::Socket) -> Self { + // HACK: Wrap the socket in a tokio::net::UdpSocket to be able to use it async + let udp_socket = unsafe { std::net::UdpSocket::from_raw_socket(socket.into_raw_socket()) }; + let udp_socket = tokio::net::UdpSocket::from_std(udp_socket).unwrap(); + AsyncUdpSocketWindows(udp_socket) + } + + fn set_ttl(&self, ttl: u32) -> anyhow::Result<()> { + self.0 + .set_ttl(ttl) + .context("Failed to set TTL value for UDP socket") + } + + async fn send_to( + &self, + packet: &[u8], + destination: impl Into, + ) -> std::io::Result { + self.0.send_to(packet, destination.into()).await + } +} + +/// Configure the raw socket we use for listening to ICMP responses. +/// +/// This will set the `SIO_RCVALL`-option. +pub fn configure_icmp_socket(socket: &Socket) -> anyhow::Result<()> { + let j = 1; + let mut _in: u32 = 0; + let result = unsafe { + WSAIoctl( + socket.as_raw_socket() as SOCKET, + SIO_RCVALL, + &j as *const _ as *const c_void, + size_of_val(&j) as u32, + null_mut(), + 0, + &mut _in as *mut u32, + null_mut(), + None, + ) + }; + + if result == SOCKET_ERROR { + let code = unsafe { WSAGetLastError() }; + bail!("Failed to call WSAIoctl(listen_socket, SIO_RCVALL, ...), code = {code}"); + } + + Ok(()) +} +*/ diff --git a/leak-checker/src/util.rs b/leak-checker/src/util.rs new file mode 100644 index 000000000000..656e948c6af0 --- /dev/null +++ b/leak-checker/src/util.rs @@ -0,0 +1,39 @@ +// TODO: Remove this file + +use match_cfg::match_cfg; + +#[cfg(any(target_os = "windows", target_os = "macos", target_os = "android"))] +use std::net::IpAddr; + +match_cfg! { + #[cfg(target_os = "windows")] => { + pub fn get_interface_ip(interface: &str) -> anyhow::Result { + use anyhow::anyhow; + + use talpid_windows::net::{get_ip_address_for_interface, luid_from_alias, AddressFamily}; + + let interface_luid = luid_from_alias(interface)?; + + // TODO: ipv6 + let interface_ip = get_ip_address_for_interface(AddressFamily::Ipv4, interface_luid)? + .ok_or(anyhow!("No IP for interface {interface:?}"))?; + + Ok(interface_ip) + } + } + #[cfg(any(target_os = "macos", target_os = "android"))] => { + pub fn get_interface_ip(interface: &str) -> anyhow::Result { + for interface_address in nix::ifaddrs::getifaddrs()? { + if interface_address.interface_name != interface { continue }; + let Some(address) = interface_address.address else { continue }; + let Some(address) = address.as_sockaddr_in() else { continue }; + // TODO: ipv6 + //let Some(address) = address.as_sockaddr_in6() else { continue }; + + return Ok(address.ip().into()); + } + + anyhow::bail!("Interface {interface:?} has no valid IP to bind to"); + } + } +} diff --git a/mullvad-cli/Cargo.toml b/mullvad-cli/Cargo.toml index 9997531d53f7..a489123847fd 100644 --- a/mullvad-cli/Cargo.toml +++ b/mullvad-cli/Cargo.toml @@ -15,7 +15,7 @@ name = "mullvad" path = "src/main.rs" [dependencies] -anyhow = "1.0" +anyhow = { workspace = true } chrono = { workspace = true } clap = { workspace = true } thiserror = { workspace = true } diff --git a/mullvad-daemon/Cargo.toml b/mullvad-daemon/Cargo.toml index d8b1a2b1c605..ee7b3c901b57 100644 --- a/mullvad-daemon/Cargo.toml +++ b/mullvad-daemon/Cargo.toml @@ -15,6 +15,7 @@ workspace = true api-override = ["mullvad-api/api-override"] [dependencies] +anyhow = { workspace = true } chrono = { workspace = true } thiserror = { workspace = true } either = "1.11" @@ -27,6 +28,7 @@ serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } tokio = { workspace = true, features = ["fs", "io-util", "rt-multi-thread", "sync", "time"] } tokio-stream = "0.1" +socket2 = { workspace = true } mullvad-relay-selector = { path = "../mullvad-relay-selector" } mullvad-types = { path = "../mullvad-types" } @@ -40,6 +42,9 @@ talpid-future = { path = "../talpid-future" } talpid-platform-metadata = { path = "../talpid-platform-metadata" } talpid-time = { path = "../talpid-time" } talpid-types = { path = "../talpid-types" } +talpid-routing = { path = "../talpid-routing" } + +leak-checker = { path = "../leak-checker" } clap = { workspace = true } log-panics = "2.0.0" diff --git a/mullvad-daemon/src/leak_checker/mod.rs b/mullvad-daemon/src/leak_checker/mod.rs new file mode 100644 index 000000000000..a0414c6e3e1d --- /dev/null +++ b/mullvad-daemon/src/leak_checker/mod.rs @@ -0,0 +1,254 @@ +use anyhow::{anyhow, Context}; +use futures::{select, FutureExt}; +use leak_checker::traceroute::TracerouteOpt; +pub use leak_checker::LeakInfo; +use std::time::Duration; +use talpid_routing::RouteManagerHandle; +use talpid_types::{net::Endpoint, tunnel::TunnelStateTransition}; +use tokio::sync::mpsc; + +/// An actor that tries to leak traffic outside the tunnel while we are connected. +pub struct LeakChecker { + task_event_tx: mpsc::UnboundedSender, +} + +/// [LeakChecker] internal task state. +struct Task { + events_rx: mpsc::UnboundedReceiver, + route_manager: RouteManagerHandle, + callbacks: Vec>, +} + +enum TaskEvent { + NewTunnelState(TunnelStateTransition), + AddCallback(Box), +} + +pub enum CallbackResult { + /// Callback completed successfully + Ok, + + /// Callback is no longer valid and should be dropped. + Drop, +} + +pub trait LeakCheckerCallback: Send + 'static { + fn on_leak(&mut self, info: LeakInfo) -> CallbackResult; +} + +impl LeakChecker { + pub fn new(route_manager: RouteManagerHandle) -> Self { + let (task_event_tx, events_rx) = mpsc::unbounded_channel(); + + let task = Task { + events_rx, + route_manager, + callbacks: vec![], + }; + + tokio::task::spawn(task.run()); + // TODO: remove this if the above compiles on macos and android + //tokio::task::spawn_blocking(|| Handle::current().block_on(task.run())); + + LeakChecker { task_event_tx } + } + + /// Call when we transition to a new tunnel state. + pub fn on_tunnel_state_transition(&mut self, tunnel_state: TunnelStateTransition) { + self.send(TaskEvent::NewTunnelState(tunnel_state)) + } + + /// Call `callback` if a leak is detected. + pub fn add_leak_callback(&mut self, callback: impl LeakCheckerCallback) { + self.send(TaskEvent::AddCallback(Box::new(callback))) + } + + /// Send a [TaskEvent] to the running [Task]; + fn send(&mut self, event: TaskEvent) { + if self.task_event_tx.send(event).is_err() { + panic!("LeakChecker unexpectedly closed"); + } + } +} + +impl Task { + async fn run(mut self) { + loop { + let Some(event) = self.events_rx.recv().await else { + break; // All LeakChecker handles dropped. + }; + + match event { + TaskEvent::NewTunnelState(s) => self.on_new_tunnel_state(s).await, + TaskEvent::AddCallback(c) => self.on_add_callback(c), + } + } + } + + fn on_add_callback(&mut self, c: Box) { + self.callbacks.push(c); + } + + async fn on_new_tunnel_state(&mut self, mut tunnel_state: TunnelStateTransition) { + 'leak_test: loop { + let TunnelStateTransition::Connected(tunnel) = &tunnel_state else { + break 'leak_test; + }; + + let ping_destination = tunnel.endpoint; + let route_manager = self.route_manager.clone(); + let leak_test = async { + // Give the connection a little time to settle before starting the test. + // TODO: is this necessary? is there some better way? + // TODO: ether remove this or add some concrete motivation. + tokio::time::sleep(Duration::from_millis(5000)).await; + + check_for_leaks(&route_manager, ping_destination).await + }; + + // Make sure the tunnel state doesn't change while we're doing the leak test. + // If that happens, then our results might be invalid. + let another_tunnel_state = async { + 'listen_for_events: while let Some(event) = self.events_rx.recv().await { + let new_state = match event { + TaskEvent::NewTunnelState(tunnel_state) => tunnel_state, + TaskEvent::AddCallback(c) => { + self.on_add_callback(c); + continue 'listen_for_events; + } + }; + + if let TunnelStateTransition::Connected(..) = new_state { + // Still connected, all is well... + } else { + // Tunnel state changed! We have to discard the leak test and try again. + tunnel_state = new_state; + break 'listen_for_events; + } + } + }; + + let leak_result = select! { + // If tunnel state changes, restart the test. + _ = another_tunnel_state.fuse() => continue 'leak_test, + + leak_result = leak_test.fuse() => leak_result, + }; + + let leak_info = match leak_result { + Ok(Some(leak_info)) => leak_info, + Ok(None) => { + log::debug!("No leak detected"); + break 'leak_test; + } + Err(e) => { + log::debug!("Leak check errored: {e:#?}"); + break 'leak_test; + } + }; + + log::debug!("Leak detected: {leak_info:?}"); + + for callback in &mut self.callbacks { + callback.on_leak(leak_info.clone()); + } + + break 'leak_test; + } + } +} + +async fn check_for_leaks( + route_manager: &RouteManagerHandle, + destination: Endpoint, +) -> anyhow::Result> { + #[cfg(target_os = "linux")] + let interface = { + // By setting FWMARK, we are effectively getting the same route as when using split tunneling. + let route = route_manager + .get_destination_route(destination.address.ip(), Some(mullvad_types::TUNNEL_FWMARK)) + .await + .context("Failed to get route to relay")? + .ok_or(anyhow!("No route to relay"))?; + + route + .get_node() + .get_device() + .context("No device for default route")? + .to_string() + .into() + }; + + // TODO (android): + // Maybe connectivity monitor? + // It should be possible somehow. `ifconfig` can print interfaces. + // needs further investigation + #[cfg(target_os = "android")] + let interface = todo!("get default interface"); + + #[cfg(target_os = "macos")] + let interface = { + let (v4_route, v6_route) = route_manager + .get_default_routes() + .await + .context("Failed to get default interface")?; + if destination.address.is_ipv4() { + let v4_route = v4_route.context("Missing IPv4 default interface")?; + v4_route.interface.into() + } else { + let v6_route = v6_route.context("Missing IPv6 default interface")?; + v6_route.interface.into() + } + // TODO: use route.interface_index? + }; + + #[cfg(target_os = "windows")] + let interface = { + use std::net::IpAddr; + use talpid_windows::net::AddressFamily; + + let _ = route_manager; // don't need this on windows + + let family = match destination.address.ip() { + IpAddr::V4(..) => AddressFamily::Ipv4, + IpAddr::V6(..) => AddressFamily::Ipv6, + }; + + let route = talpid_routing::get_best_default_route(family) + .context("Failed to get best default route")? + .ok_or_else(|| anyhow!("No default route found"))?; + + leak_checker::Interface::Luid(route.iface) + }; + + log::debug!("attempting to leak traffic on interface {interface:?} to {destination}"); + + // TODO: use UDP on windows + leak_checker::traceroute::try_run_leak_test(&TracerouteOpt { + interface, + destination: destination.address.ip(), + port: None, + + exclude_port: cfg!(target_os = "windows").then_some(destination.address.port()), + icmp: cfg!(not(target_os = "windows")), + }) + .await + .map_err(|e| anyhow!("{e:#}")) + .map(|status| match status { + leak_checker::LeakStatus::NoLeak => None, + leak_checker::LeakStatus::LeakDetected(info) => Some(info), + }) +} + +impl LeakCheckerCallback for T +where + T: FnMut(LeakInfo) -> bool + Send + 'static, +{ + fn on_leak(&mut self, info: LeakInfo) -> CallbackResult { + if self(info) { + CallbackResult::Ok + } else { + CallbackResult::Drop + } + } +} diff --git a/mullvad-daemon/src/lib.rs b/mullvad-daemon/src/lib.rs index b313e274bc2c..69c319440e0f 100644 --- a/mullvad-daemon/src/lib.rs +++ b/mullvad-daemon/src/lib.rs @@ -13,6 +13,7 @@ pub mod device; mod dns; pub mod exception_logging; mod geoip; +mod leak_checker; pub mod logging; #[cfg(target_os = "macos")] mod macos; @@ -38,6 +39,7 @@ use futures::{ StreamExt, }; use geoip::GeoIpHandler; +use leak_checker::{LeakChecker, LeakInfo}; use management_interface::ManagementInterfaceServer; use mullvad_api::ApiEndpoint; use mullvad_relay_selector::{RelaySelector, SelectorConfig}; @@ -82,6 +84,7 @@ use talpid_core::{ split_tunnel, tunnel_state_machine::{self, TunnelCommand, TunnelStateMachineHandle}, }; +use talpid_routing::RouteManagerHandle; #[cfg(target_os = "android")] use talpid_types::android::AndroidContext; #[cfg(target_os = "windows")] @@ -181,6 +184,10 @@ pub enum Error { #[error("Tunnel state machine error")] TunnelError(#[source] tunnel_state_machine::Error), + /// Errors from [talpid_routing::RouteManagerHandle]. + #[error("Route manager error")] + RouteManager(#[source] talpid_routing::Error), + /// Custom list already exists #[error("Custom list error: {0}")] CustomListError(#[source] mullvad_types::custom_list::Error), @@ -413,6 +420,8 @@ pub(crate) enum InternalDaemonEvent { /// The split tunnel paths or state were updated. #[cfg(any(windows, target_os = "android", target_os = "macos"))] ExcludedPathsEvent(ExcludedPathsUpdate, oneshot::Sender>), + /// A network leak was detected. + LeakDetected(LeakInfo), } #[cfg(any(windows, target_os = "android", target_os = "macos"))] @@ -587,6 +596,7 @@ pub struct Daemon { #[cfg(target_os = "windows")] volume_update_tx: mpsc::UnboundedSender<()>, location_handler: GeoIpHandler, + leak_checker: LeakChecker, } pub struct DaemonConfig { pub log_dir: Option, @@ -775,6 +785,15 @@ impl Daemon { let _ = settings_changed_event_sender.send(InternalDaemonEvent::SettingsChanged); }); + let route_manager = RouteManagerHandle::spawn( + #[cfg(target_os = "linux")] + mullvad_types::TUNNEL_FWMARK, + #[cfg(target_os = "linux")] + mullvad_types::TUNNEL_TABLE_ID, + ) + .await + .map_err(Error::RouteManager)?; + let (offline_state_tx, offline_state_rx) = mpsc::unbounded(); #[cfg(target_os = "windows")] let (volume_update_tx, volume_update_rx) = mpsc::unbounded(); @@ -798,6 +817,7 @@ impl Daemon { config.resource_dir.clone(), internal_event_tx.to_specialized_sender(), offline_state_tx, + route_manager.clone(), #[cfg(target_os = "windows")] volume_update_rx, #[cfg(target_os = "android")] @@ -849,6 +869,17 @@ impl Daemon { internal_event_tx.clone().to_specialized_sender(), ); + let leak_checker = { + let mut leak_checker = LeakChecker::new(route_manager); + let internal_event_tx = internal_event_tx.clone(); + leak_checker.add_leak_callback(move |info| { + internal_event_tx + .send(InternalDaemonEvent::LeakDetected(info)) + .is_ok() + }); + leak_checker + }; + let daemon = Daemon { tunnel_state: TunnelState::Disconnected { location: None, @@ -879,6 +910,7 @@ impl Daemon { #[cfg(target_os = "windows")] volume_update_tx, location_handler, + leak_checker, }; api_availability.unsuspend(); @@ -977,7 +1009,7 @@ impl Daemon { let mut should_stop = false; match event { TunnelStateTransition(transition) => { - self.handle_tunnel_state_transition(transition).await + self.handle_tunnel_state_transition(transition).await; } Command(command) => self.handle_command(command).await, TriggerShutdown(user_init_shutdown) => { @@ -999,6 +1031,9 @@ impl Daemon { } #[cfg(any(windows, target_os = "android", target_os = "macos"))] ExcludedPathsEvent(update, tx) => self.handle_new_excluded_paths(update, tx).await, + LeakDetected(leak_info) => { + log::warn!("LEAK DETECTED! AAAH: {leak_info:?}"); + } } should_stop } @@ -1007,6 +1042,9 @@ impl Daemon { &mut self, tunnel_state_transition: TunnelStateTransition, ) { + self.leak_checker + .on_tunnel_state_transition(tunnel_state_transition.clone()); + self.reset_rpc_sockets_on_tunnel_state_transition(&tunnel_state_transition); self.device_checker .handle_state_transition(&tunnel_state_transition); diff --git a/talpid-core/src/firewall/macos.rs b/talpid-core/src/firewall/macos.rs index 953c4abfe0cf..71899f2292ea 100644 --- a/talpid-core/src/firewall/macos.rs +++ b/talpid-core/src/firewall/macos.rs @@ -295,11 +295,6 @@ impl Firewall { peer_endpoint, tunnel, .. - } - | FirewallPolicy::Connecting { - peer_endpoint, - tunnel: Some(tunnel), - .. }) = policy else { return Ok(vec![]); @@ -329,7 +324,7 @@ impl Firewall { // no nat to [vpn ip] let no_nat_to_vpn_server = pfctl::NatRuleBuilder::default() .action(pfctl::NatRuleAction::NoNat) - .to(peer_endpoint.endpoint.address.ip()) + .to(peer_endpoint.endpoint.address) .build()?; rules.push(no_nat_to_vpn_server); @@ -432,6 +427,9 @@ impl Firewall { rules.push(self.get_allow_relay_rule(peer_endpoint)?); + // TODO: do we need this? + //rules.push(self.get_block_relay_rule(peer_endpoint)?); + // Important to block DNS *before* we allow the tunnel and allow LAN. So DNS // can't leak to the wrong IPs in the tunnel or on the LAN. rules.append(&mut self.get_block_dns_rules()?); @@ -577,6 +575,7 @@ impl Firewall { Ok(rules) } + /// Allow traffic to relay_endpoint on the correct ip/port/protocol, for the root-user only. fn get_allow_relay_rule(&self, relay_endpoint: &AllowedEndpoint) -> Result { let pfctl_proto = as_pfctl_proto(relay_endpoint.endpoint.protocol); @@ -595,6 +594,17 @@ impl Firewall { builder.build() } + /// Block traffic to relay_endpoint ip. Should come after [Self::get_allow_relay_rule]. + fn get_block_relay_rule(&self, relay_endpoint: &AllowedEndpoint) -> Result { + let mut builder = self.create_rule_builder(FilterRuleAction::Drop(DropAction::Return)); + builder + .direction(pfctl::Direction::Out) + .to(relay_endpoint.endpoint.address.ip()) + .quick(true); + + builder.build() + } + /// Produces a rule that allows traffic to flow to the API. Allows the app (or other apps if /// configured) to reach the API in blocked states. fn get_allowed_endpoint_rule( diff --git a/talpid-core/src/tunnel_state_machine/mod.rs b/talpid-core/src/tunnel_state_machine/mod.rs index e8bd4ed64980..cae7f2384bf5 100644 --- a/talpid-core/src/tunnel_state_machine/mod.rs +++ b/talpid-core/src/tunnel_state_machine/mod.rs @@ -131,6 +131,7 @@ pub async fn spawn( resource_dir: PathBuf, state_change_listener: impl Sender + Send + 'static, offline_state_listener: mpsc::UnboundedSender, + route_manager: RouteManagerHandle, #[cfg(target_os = "windows")] volume_update_rx: mpsc::UnboundedReceiver<()>, #[cfg(target_os = "android")] android_context: AndroidContext, #[cfg(target_os = "android")] connectivity_listener: ConnectivityListener, @@ -158,6 +159,7 @@ pub async fn spawn( log_dir, resource_dir, commands_rx: command_rx, + route_manager, #[cfg(target_os = "windows")] volume_update_rx, #[cfg(target_os = "android")] @@ -258,6 +260,7 @@ struct TunnelStateMachineInitArgs { log_dir: Option, resource_dir: PathBuf, commands_rx: mpsc::UnboundedReceiver, + route_manager: RouteManagerHandle, #[cfg(target_os = "windows")] volume_update_rx: mpsc::UnboundedReceiver<()>, #[cfg(target_os = "android")] @@ -280,28 +283,19 @@ impl TunnelStateMachine { #[cfg(target_os = "macos")] let filtering_resolver = crate::resolver::start_resolver().await?; - let route_manager = RouteManagerHandle::spawn( - #[cfg(target_os = "linux")] - args.linux_ids.fwmark, - #[cfg(target_os = "linux")] - args.linux_ids.table_id, - ) - .await - .map_err(Error::InitRouteManagerError)?; - #[cfg(windows)] let split_tunnel = split_tunnel::SplitTunnel::new( runtime.clone(), args.resource_dir.clone(), args.command_tx.clone(), volume_update_rx, - route_manager.clone(), + args.route_manager.clone(), ) .map_err(Error::InitSplitTunneling)?; #[cfg(target_os = "macos")] let split_tunnel = - split_tunnel::SplitTunnel::spawn(args.command_tx.clone(), route_manager.clone()); + split_tunnel::SplitTunnel::spawn(args.command_tx.clone(), args.route_manager.clone()); let fw_args = FirewallArguments { #[cfg(not(target_os = "android"))] @@ -326,7 +320,7 @@ impl TunnelStateMachine { #[cfg(target_os = "linux")] runtime.clone(), #[cfg(target_os = "linux")] - route_manager.clone(), + args.route_manager.clone(), ) .map_err(Error::InitDnsMonitorError)?; @@ -345,7 +339,7 @@ impl TunnelStateMachine { let offline_monitor = offline::spawn_monitor( offline_tx, #[cfg(not(target_os = "android"))] - route_manager.clone(), + args.route_manager.clone(), #[cfg(target_os = "linux")] Some(args.linux_ids.fwmark), #[cfg(target_os = "android")] @@ -385,7 +379,7 @@ impl TunnelStateMachine { runtime, firewall, dns_monitor, - route_manager, + route_manager: args.route_manager, _offline_monitor: offline_monitor, allow_lan: args.settings.allow_lan, #[cfg(not(target_os = "android"))] diff --git a/talpid-net/Cargo.toml b/talpid-net/Cargo.toml index aa30ed1b5b6a..861e1765cc60 100644 --- a/talpid-net/Cargo.toml +++ b/talpid-net/Cargo.toml @@ -13,5 +13,5 @@ workspace = true [target.'cfg(unix)'.dependencies] libc = "0.2" talpid-types = { path = "../talpid-types" } -socket2 = { version = "0.5.3", features = ["all"] } +socket2 = { workspace = true, features = ["all"] } log = { workspace = true } diff --git a/talpid-routing/src/unix/mod.rs b/talpid-routing/src/unix/mod.rs index d257140f7e3c..34d2570137c6 100644 --- a/talpid-routing/src/unix/mod.rs +++ b/talpid-routing/src/unix/mod.rs @@ -34,7 +34,7 @@ mod imp; pub use imp::Error as PlatformError; -/// Errors that can be encountered whilst initializing route manager +/// Errors that can be encountered whilst interacting with a [RouteManagerHandle]. #[derive(thiserror::Error, Debug)] pub enum Error { /// Route manager thread may have panicked diff --git a/talpid-windows/Cargo.toml b/talpid-windows/Cargo.toml index a44229b61d07..0b9e1d267217 100644 --- a/talpid-windows/Cargo.toml +++ b/talpid-windows/Cargo.toml @@ -12,7 +12,7 @@ workspace = true [target.'cfg(windows)'.dependencies] thiserror = { workspace = true } -socket2 = { version = "0.5.3" } +socket2 = { workspace = true } futures = { workspace = true } talpid-types = { path = "../talpid-types" } diff --git a/talpid-wireguard/Cargo.toml b/talpid-wireguard/Cargo.toml index e02bf874d253..620cbab5cc26 100644 --- a/talpid-wireguard/Cargo.toml +++ b/talpid-wireguard/Cargo.toml @@ -39,7 +39,7 @@ duct = "0.13" [target.'cfg(not(target_os="android"))'.dependencies] byteorder = "1" internet-checksum = "0.2" -socket2 = { version = "0.5.3", features = ["all"] } +socket2 = { workspace = true, features = ["all"] } tokio-stream = { version = "0.1", features = ["io-util"] } [target.'cfg(unix)'.dependencies] diff --git a/test/test-runner/Cargo.toml b/test/test-runner/Cargo.toml index fd53f4b7cb79..af84ef4daeb0 100644 --- a/test/test-runner/Cargo.toml +++ b/test/test-runner/Cargo.toml @@ -33,7 +33,7 @@ test-rpc = { path = "../test-rpc" } mullvad-paths = { path = "../../mullvad-paths" } talpid-platform-metadata = { path = "../../talpid-platform-metadata", default-features = false } -socket2 = { version = "0.5.4", features = ["all"] } +socket2 = { workspace = true, features = ["all"] } [target."cfg(target_os=\"windows\")".dependencies] talpid-windows = { path = "../../talpid-windows" } diff --git a/windows-installer/Cargo.toml b/windows-installer/Cargo.toml index 5c09cf560c70..518fe2d82d53 100644 --- a/windows-installer/Cargo.toml +++ b/windows-installer/Cargo.toml @@ -13,11 +13,11 @@ workspace = true [target.'cfg(all(target_os = "windows", target_arch = "x86_64"))'.dependencies] windows-sys = { version = "0.52.0", features = ["Win32_System", "Win32_System_LibraryLoader", "Win32_System_SystemInformation", "Win32_System_Threading"] } tempfile = "3.10" -anyhow = "1.0" +anyhow.workspace = true [build-dependencies] winres = "0.1" -anyhow = "1.0" +anyhow.workspace = true windows-sys = { version = "0.52.0", features = ["Win32_System", "Win32_System_LibraryLoader", "Win32_System_SystemServices"] } mullvad-version = { path = "../mullvad-version" } diff --git a/wireguard-go-rs/Cargo.toml b/wireguard-go-rs/Cargo.toml index cfaef554cc40..cdea4afdad1d 100644 --- a/wireguard-go-rs/Cargo.toml +++ b/wireguard-go-rs/Cargo.toml @@ -5,7 +5,7 @@ edition = "2021" license.workspace = true [build-dependencies] -anyhow = "1.0" +anyhow.workspace = true [target.'cfg(unix)'.dependencies] thiserror.workspace = true