From 22c815316d2da755d2129e4a0e76a4a53a12139b Mon Sep 17 00:00:00 2001 From: nullchinchilla Date: Thu, 26 Dec 2024 12:11:54 -0500 Subject: [PATCH 1/8] Update dependencies and add task limit feature to geph5-exit module --- Cargo.lock | 7 +++++-- binaries/geph5-exit/Cargo.toml | 3 +++ binaries/geph5-exit/src/listen.rs | 2 ++ binaries/geph5-exit/src/main.rs | 8 ++++++++ 4 files changed, 18 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4e77a9a..fcc9107 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1921,9 +1921,9 @@ dependencies = [ [[package]] name = "crossbeam-queue" -version = "0.3.11" +version = "0.3.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df0346b5d5e76ac2fe4e327c5fd1118d6be7c51dfb18f9b7922923f287471e35" +checksum = "0f58bbc28f91df819d0aa2a2c00cd19754769c2fad90579b3592b1c9ba7a3115" dependencies = [ "crossbeam-utils", ] @@ -3241,6 +3241,7 @@ name = "geph5-exit" version = "0.1.0" dependencies = [ "anyhow", + "async-event", "async-io 2.4.0", "async-io-bufpool", "async-trait", @@ -3248,6 +3249,7 @@ dependencies = [ "blake3", "bytes", "clap", + "crossbeam-queue", "dashmap 6.1.0", "ed25519-dalek", "fastrand 2.3.0", @@ -3269,6 +3271,7 @@ dependencies = [ "quanta", "rand", "reqwest", + "scopeguard", "serde", "serde_json", "serde_yaml", diff --git a/binaries/geph5-exit/Cargo.toml b/binaries/geph5-exit/Cargo.toml index 8cb182b..097a4c3 100644 --- a/binaries/geph5-exit/Cargo.toml +++ b/binaries/geph5-exit/Cargo.toml @@ -56,3 +56,6 @@ bytes = "1.8.0" simple-dns = "0.9.0" dashmap = "6.1.0" globset = "0.4.15" +crossbeam-queue = "0.3.12" +scopeguard = "1.2.0" +async-event = "0.2.1" diff --git a/binaries/geph5-exit/src/listen.rs b/binaries/geph5-exit/src/listen.rs index 9232586..82a9580 100644 --- a/binaries/geph5-exit/src/listen.rs +++ b/binaries/geph5-exit/src/listen.rs @@ -26,6 +26,7 @@ use crate::{ broker::{broker_loop, ACCEPT_FREE}, proxy::proxy_stream, ratelimit::{get_ratelimiter, RateLimiter}, + tasklimit::new_task_until_death, CONFIG_FILE, SIGNING_SECRET, }; @@ -194,6 +195,7 @@ async fn handle_client(mut client: impl Pipe) -> anyhow::Result<()> { let sess_metadata = sess_metadata.clone(); smolscale::spawn( proxy_stream(sess_metadata.clone(), ratelimit.clone(), stream, is_free) + .race(new_task_until_death(Duration::from_secs(30))) .map_err(|e| tracing::trace!(metadata = display(metadata), "stream died with {e}")), ) .detach(); diff --git a/binaries/geph5-exit/src/main.rs b/binaries/geph5-exit/src/main.rs index 809456a..3543bed 100644 --- a/binaries/geph5-exit/src/main.rs +++ b/binaries/geph5-exit/src/main.rs @@ -1,5 +1,6 @@ mod asn; mod dns; +mod tasklimit; use clap::Parser; use ed25519_dalek::SigningKey; @@ -57,6 +58,9 @@ struct ConfigFile { #[serde(default = "default_free_port_whitelist")] free_port_whitelist: Vec, + + #[serde(default = "default_task_limit")] + task_limit: usize, } fn default_free_ratelimit() -> u32 { @@ -71,6 +75,10 @@ fn default_total_ratelimit() -> u32 { 125000 } +fn default_task_limit() -> usize { + 1_000_000 +} + fn default_free_port_whitelist() -> Vec { vec![80, 443, 8080, 8443, 22, 53] } From 83ca615879fd53eabe047a9030dbb54d3964cd40 Mon Sep 17 00:00:00 2001 From: nullchinchilla Date: Thu, 26 Dec 2024 12:12:00 -0500 Subject: [PATCH 2/8] Add task limiting mechanism to geph5-exit with async handling --- binaries/geph5-exit/src/tasklimit.rs | 35 ++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100644 binaries/geph5-exit/src/tasklimit.rs diff --git a/binaries/geph5-exit/src/tasklimit.rs b/binaries/geph5-exit/src/tasklimit.rs new file mode 100644 index 0000000..7f9efe5 --- /dev/null +++ b/binaries/geph5-exit/src/tasklimit.rs @@ -0,0 +1,35 @@ +use std::{ + sync::{atomic::AtomicUsize, LazyLock}, + time::Duration, +}; + +use crate::CONFIG_FILE; + +static TASK_COUNT: AtomicUsize = AtomicUsize::new(0); +static TASK_KILLER: LazyLock = LazyLock::new(async_event::Event::new); + +/// Adds a task to the limited task pool, then waits for the death signal. +pub async fn new_task_until_death(protected_period: Duration) -> anyhow::Result<()> { + let task_limit = CONFIG_FILE.wait().task_limit; + let count = TASK_COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + tracing::debug!(count, task_limit, "making a task death handle"); + scopeguard::defer!({ + TASK_COUNT.fetch_sub(1, std::sync::atomic::Ordering::Relaxed); + }); + TASK_KILLER.notify_one(); + + smol::Timer::after(protected_period).await; + + // wait until something horrible happens + TASK_KILLER + .wait_until(|| { + if TASK_COUNT.load(std::sync::atomic::Ordering::Relaxed) > task_limit { + Some(()) + } else { + None + } + }) + .await; + tracing::warn!(task_limit, "a task is killed due to overflow"); + anyhow::bail!("too many tasks") +} From 0e2644fb01932334a71b2008e039089b88283ba1 Mon Sep 17 00:00:00 2001 From: nullchinchilla Date: Fri, 27 Dec 2024 16:12:22 -0500 Subject: [PATCH 3/8] Update dependencies, integrate IPv6 support, and adjust CPU load calculation --- Cargo.lock | 16 ++++++++++------ binaries/geph5-exit/Cargo.toml | 4 ++++ binaries/geph5-exit/src/listen.rs | 17 +++++++++++++---- binaries/geph5-exit/src/main.rs | 9 +++++++++ binaries/geph5-exit/src/proxy.rs | 8 +++++--- binaries/geph5-exit/src/ratelimit.rs | 2 +- deploy-exit.sh | 1 + 7 files changed, 43 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fcc9107..0c0f269 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3077,7 +3077,7 @@ dependencies = [ "nanorpc", "serde", "serde_json", - "serde_with 3.11.0", + "serde_with 3.12.0", "stdcode", "thiserror 1.0.69", "tracing", @@ -3254,12 +3254,14 @@ dependencies = [ "ed25519-dalek", "fastrand 2.3.0", "flate2", + "futures-concurrency", "futures-util", "geph5-broker-protocol", "geph5-misc-rpc", "globset", "governor", "hex", + "ipnet", "isocountry", "mizaru2", "moka", @@ -3274,6 +3276,7 @@ dependencies = [ "scopeguard", "serde", "serde_json", + "serde_with 3.12.0", "serde_yaml", "sillad", "sillad-sosistab3", @@ -3281,6 +3284,7 @@ dependencies = [ "smol 2.0.2", "smol-timeout2", "smolscale", + "socket2 0.5.8", "stdcode", "sysinfo", "tachyonix", @@ -6272,9 +6276,9 @@ dependencies = [ [[package]] name = "serde_with" -version = "3.11.0" +version = "3.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e28bdad6db2b8340e449f7108f020b3b092e8583a9e3fb82713e1d4e71fe817" +checksum = "d6b6f7f2fcb69f747921f79f3926bd1e203fce4fef62c268dd3abfb6d86029aa" dependencies = [ "base64 0.22.1", "chrono", @@ -6284,7 +6288,7 @@ dependencies = [ "serde", "serde_derive", "serde_json", - "serde_with_macros 3.11.0", + "serde_with_macros 3.12.0", "time", ] @@ -6302,9 +6306,9 @@ dependencies = [ [[package]] name = "serde_with_macros" -version = "3.11.0" +version = "3.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d846214a9854ef724f3da161b426242d8de7c1fc7de2f89bb1efcb154dca79d" +checksum = "8d00caa5193a3c8362ac2b73be6b9e768aa5a4b2f721d8f4b339600c3cb51f8e" dependencies = [ "darling 0.20.10", "proc-macro2", diff --git a/binaries/geph5-exit/Cargo.toml b/binaries/geph5-exit/Cargo.toml index 097a4c3..05b4a6c 100644 --- a/binaries/geph5-exit/Cargo.toml +++ b/binaries/geph5-exit/Cargo.toml @@ -59,3 +59,7 @@ globset = "0.4.15" crossbeam-queue = "0.3.12" scopeguard = "1.2.0" async-event = "0.2.1" +ipnet = "2.10.1" +socket2 = "0.5.8" +serde_with = "3.12.0" +futures-concurrency = "7.6.2" diff --git a/binaries/geph5-exit/src/listen.rs b/binaries/geph5-exit/src/listen.rs index 82a9580..a48dea7 100644 --- a/binaries/geph5-exit/src/listen.rs +++ b/binaries/geph5-exit/src/listen.rs @@ -24,6 +24,7 @@ use crate::{ asn::ip_to_asn_country, auth::verify_user, broker::{broker_loop, ACCEPT_FREE}, + ipv6::{configure_ipv6_routing, EyeballDialer}, proxy::proxy_stream, ratelimit::{get_ratelimiter, RateLimiter}, tasklimit::new_task_until_death, @@ -31,6 +32,7 @@ use crate::{ }; pub async fn listen_main() -> anyhow::Result<()> { + configure_ipv6_routing().await?; let c2e = c2e_loop(); let b2e = b2e_loop(); let broker = broker_loop(); @@ -180,7 +182,7 @@ async fn handle_client(mut client: impl Pipe) -> anyhow::Result<()> { let mux = PicoMux::new(client_read, client_write); let mut sess_metadata = Arc::new(serde_json::Value::Null); - + let dialer = EyeballDialer::new(); loop { let stream = mux.accept().await?; let metadata = String::from_utf8_lossy(stream.metadata()).to_string(); @@ -193,10 +195,17 @@ async fn handle_client(mut client: impl Pipe) -> anyhow::Result<()> { continue; } let sess_metadata = sess_metadata.clone(); + let dialer = dialer.clone(); smolscale::spawn( - proxy_stream(sess_metadata.clone(), ratelimit.clone(), stream, is_free) - .race(new_task_until_death(Duration::from_secs(30))) - .map_err(|e| tracing::trace!(metadata = display(metadata), "stream died with {e}")), + proxy_stream( + dialer, + sess_metadata.clone(), + ratelimit.clone(), + stream, + is_free, + ) + .race(new_task_until_death(Duration::from_secs(30))) + .map_err(|e| tracing::trace!(metadata = display(metadata), "stream died with {e}")), ) .detach(); } diff --git a/binaries/geph5-exit/src/main.rs b/binaries/geph5-exit/src/main.rs index 3543bed..5306818 100644 --- a/binaries/geph5-exit/src/main.rs +++ b/binaries/geph5-exit/src/main.rs @@ -1,14 +1,18 @@ mod asn; mod dns; +mod ipv6; mod tasklimit; use clap::Parser; use ed25519_dalek::SigningKey; +use ipnet::Ipv6Net; +use ipv6::configure_ipv6_routing; use isocountry::CountryCode; use listen::listen_main; use once_cell::sync::{Lazy, OnceCell}; use rand::Rng; use serde::Deserialize; +use serde_with::{serde_as, DisplayFromStr}; use std::{ net::{IpAddr, SocketAddr}, path::PathBuf, @@ -32,6 +36,7 @@ use crate::ratelimit::update_load_loop; static CONFIG_FILE: OnceCell = OnceCell::new(); /// This struct defines the structure of our configuration file +#[serde_as] #[derive(Deserialize)] struct ConfigFile { signing_secret: PathBuf, @@ -61,6 +66,10 @@ struct ConfigFile { #[serde(default = "default_task_limit")] task_limit: usize, + + #[serde_as(as = "DisplayFromStr")] + #[serde(default)] + ipv6_subnet: Ipv6Net, } fn default_free_ratelimit() -> u32 { diff --git a/binaries/geph5-exit/src/proxy.rs b/binaries/geph5-exit/src/proxy.rs index 34c41cd..bd5c1f6 100644 --- a/binaries/geph5-exit/src/proxy.rs +++ b/binaries/geph5-exit/src/proxy.rs @@ -15,6 +15,7 @@ use smol::{future::FutureExt as _, io::BufWriter, net::UdpSocket}; use crate::{ allow::proxy_allowed, dns::{dns_resolve, raw_dns_respond, FilterOptions}, + ipv6::EyeballDialer, ratelimit::RateLimiter, }; @@ -22,6 +23,7 @@ use smol_timeout2::TimeoutExt; #[tracing::instrument(skip_all)] pub async fn proxy_stream( + dialer: EyeballDialer, sess_metadata: Arc, ratelimit: RateLimiter, stream: picomux::Stream, @@ -45,10 +47,10 @@ pub async fn proxy_stream( match protocol { "tcp" => { let start = Instant::now(); - let dest_tcp = HappyEyeballsTcpDialer(dest_addrs) - .dial() + let dest_tcp = dialer + .connect(dest_addrs) .await - .context("failed to dial")?; + .inspect_err(|err| tracing::warn!(err = debug(err), dest_host, "fail to dial"))?; tracing::trace!( protocol, dest_host = display(dest_host), diff --git a/binaries/geph5-exit/src/ratelimit.rs b/binaries/geph5-exit/src/ratelimit.rs index cca8c59..1fa84a1 100644 --- a/binaries/geph5-exit/src/ratelimit.rs +++ b/binaries/geph5-exit/src/ratelimit.rs @@ -37,7 +37,7 @@ static CURRENT_SPEED: Lazy = Lazy::new(|| AtomicF32::new(0.0)); pub fn get_load() -> f32 { // we weigh CPU usage lower until it's really close to massively overloading - let cpu = CPU_USAGE.load(Ordering::Relaxed).powi(4); + let cpu = CPU_USAGE.load(Ordering::Relaxed).powi(2); let speed = CURRENT_SPEED.load(Ordering::Relaxed) / (CONFIG_FILE.wait().total_ratelimit as f32 * 1000.0); cpu.max(speed) diff --git a/deploy-exit.sh b/deploy-exit.sh index 536e1e7..5cc6be7 100644 --- a/deploy-exit.sh +++ b/deploy-exit.sh @@ -16,6 +16,7 @@ net.ipv4.tcp_congestion_control=bbr net.core.default_qdisc = fq net.ipv4.tcp_slow_start_after_idle = 0 net.ipv4.tcp_syncookies=1 +net.ipv4.ip_local_port_range = 1024 65535 EOF sysctl -p From 6f27a03c53be922e83a3dbf15f788f1baaa7c0d0 Mon Sep 17 00:00:00 2001 From: nullchinchilla Date: Fri, 27 Dec 2024 16:12:28 -0500 Subject: [PATCH 4/8] Add IPv6 support with EyeballDialer for happy-eyeballs dialing --- binaries/geph5-exit/src/ipv6.rs | 212 ++++++++++++++++++++++++++++++++ 1 file changed, 212 insertions(+) create mode 100644 binaries/geph5-exit/src/ipv6.rs diff --git a/binaries/geph5-exit/src/ipv6.rs b/binaries/geph5-exit/src/ipv6.rs new file mode 100644 index 0000000..87aaba9 --- /dev/null +++ b/binaries/geph5-exit/src/ipv6.rs @@ -0,0 +1,212 @@ +use std::{ + io::ErrorKind, + net::{Ipv6Addr, SocketAddr}, + time::Duration, +}; + +use anyhow::Context; +use futures_concurrency::prelude::{ConcurrentStream, IntoConcurrentStream}; +use ipnet::Ipv6Net; +use rand::Rng; +use smol::{net::TcpStream, process::Command, Async}; +use socket2::{Domain, Protocol, SockAddr, Socket, Type}; + +use crate::CONFIG_FILE; + +/// Something that can be used for happy-eyeballs dialing, with its own IPv6 address. +#[derive(Clone, Debug)] +pub struct EyeballDialer { + inner: Option, +} + +impl EyeballDialer { + /// Create a new eyeball dialer. + pub fn new() -> Self { + let subnet = CONFIG_FILE.wait().ipv6_subnet; + if subnet == Ipv6Net::default() { + Self { inner: None } + } else { + Self { + inner: Some(random_ipv6_in_net(subnet)), + } + } + } + + /// Connect to a given remote. + pub async fn connect(&self, addrs: Vec) -> anyhow::Result { + let my_addr = self.inner; + let mut streams: Vec<_> = addrs + .into_co_stream() + .enumerate() + .map(|(idx, addr)| async move { + if idx > 0 { + smol::Timer::after(Duration::from_millis(200 * idx as u64)).await; + } + if addr.is_ipv6() { + if let Some(my_addr) = my_addr { + return connect_from(my_addr, addr).await; + } + } + Ok(TcpStream::connect(addr).await?) + }) + .take(1) + .collect() + .await; + streams.swap_remove(0) + } +} + +/// Given an `Ipv6Net`, generate a random IPv6 address within that subnet. +fn random_ipv6_in_net(net: Ipv6Net) -> Ipv6Addr { + let prefix_len = net.prefix_len(); + + // The number of bits we can randomize: + let host_bits = 128 - prefix_len; + + // Convert the network address to a u128 (big-endian). + let network_u128 = u128::from_be_bytes(net.network().octets()); + + // Maximum number of addresses in this subnet = 2^(host_bits). + // We'll generate a random offset in [0, 2^host_bits). + let max_offset = if host_bits == 0 { + // If the prefix is /128, there's only one address in the subnet (no offset). + 0 + } else { + (1u128 << host_bits) - 1 + }; + let random_offset = if max_offset == 0 { + 0 + } else { + rand::thread_rng().gen_range(0..=max_offset) + }; + + // Our randomly chosen address is (network_address + random_offset). + let addr_u128 = network_u128 + random_offset; + let addr_octets = addr_u128.to_be_bytes(); + Ipv6Addr::from(addr_octets) +} + +/// Connect to a remote IPv6 address using the given IPv6 address. +async fn connect_from(from: Ipv6Addr, remote: SocketAddr) -> anyhow::Result { + tracing::debug!( + from = display(from), + remote = display(remote), + "connecting from an ephemeral IPv6" + ); + let socket = Socket::new(Domain::IPV6, Type::STREAM, Some(Protocol::TCP))?; + socket.set_reuse_address(true)?; + socket.set_reuse_port(true)?; + let local_addr = SocketAddr::new(std::net::IpAddr::V6(from), 0); + socket + .bind(&SockAddr::from(local_addr)) + .context("cannot bind")?; + + let async_socket = Async::new(socket).context("cannot build Async")?; + let _ = async_socket.get_ref().connect(&SockAddr::from(remote)); + async_socket + .writable() + .await + .context("cannot wait until socket is writable")?; + match async_socket.get_ref().connect(&SockAddr::from(remote)) { + Ok(_) => {} + Err(e) if e.kind() == ErrorKind::AlreadyExists => {} + Err(e) => anyhow::bail!("cannot finish connect: {:?}", e), + } + let socket: std::net::TcpStream = async_socket.into_inner()?.into(); + let socket: TcpStream = socket.try_into()?; + Ok(socket) +} + +pub async fn configure_ipv6_routing() -> anyhow::Result<()> { + let range = CONFIG_FILE.wait().ipv6_subnet; + if range != Ipv6Net::default() { + let iface = detect_ipv6_interface().await?; + Command::new("ip") + .arg("-6") + .arg("route") + .arg("del") + .arg("local") + .arg(format!("{}", range)) + .spawn()? + .output() + .await?; + Command::new("ip") + .arg("-6") + .arg("route") + .arg("add") + .arg("local") + .arg(format!("{}", range)) + .arg("dev") + .arg(iface) + .spawn()? + .output() + .await?; + } + Ok(()) +} + +async fn detect_ipv6_interface() -> anyhow::Result { + let output = Command::new("ip").args(&["-6", "route"]).output().await?; + + let stdout = String::from_utf8(output.stdout)?; + + // Find the default route + let default_route = stdout + .lines() + .find(|line| line.starts_with("default")) + .context("No default IPv6 route found")?; + + // Extract the interface name + let iface = default_route + .split_whitespace() + .nth(4) + .context("Unable to parse interface name from default route")?; + + Ok(iface.to_string()) +} + +#[cfg(test)] +mod tests { + use super::*; + use ipnet::Ipv6Net; + + #[test] + fn test_random_ipv6_in_net_basic() { + // Given a /64 + let cidr_str = "2001:db8::/64"; + let net: Ipv6Net = cidr_str.parse().expect("Failed to parse IPv6 CIDR"); + + // When we generate a random address + let addr = random_ipv6_in_net(net); + + // Then the address should be contained within that net + // ipnet’s `contains()` checks if the address is in the subnet range + assert!(net.contains(&addr), "Generated address not in the subnet"); + } + + #[test] + fn test_random_ipv6_in_net_small_prefix() { + // Given a /120 (just 8 host bits) + let cidr_str = "2001:db8::/120"; + let net: Ipv6Net = cidr_str.parse().expect("Failed to parse IPv6 CIDR"); + + let addr = random_ipv6_in_net(net); + assert!(net.contains(&addr), "Generated address not in the subnet"); + } + + #[test] + fn test_random_ipv6_in_net_full_address() { + // Given /128 => only one valid host in the subnet + let cidr_str = "2001:db8::/128"; + let net: Ipv6Net = cidr_str.parse().expect("Failed to parse IPv6 CIDR"); + + let addr = random_ipv6_in_net(net); + + // There's only one possible address: 2001:db8:: + assert_eq!( + addr, + net.network(), + "Should be exactly the single /128 address" + ); + } +} From f09d4f4194fd3fadbcf9d83cbe59e4de4fb9555e Mon Sep 17 00:00:00 2001 From: nullchinchilla Date: Fri, 27 Dec 2024 18:26:55 -0500 Subject: [PATCH 5/8] Optimize DNS blocklist handling and remove unused global allocator code --- binaries/geph5-exit/src/dns.rs | 48 +++++++++++++++++----------- binaries/geph5-exit/src/main.rs | 6 ++-- binaries/geph5-exit/src/tasklimit.rs | 4 +-- 3 files changed, 35 insertions(+), 23 deletions(-) diff --git a/binaries/geph5-exit/src/dns.rs b/binaries/geph5-exit/src/dns.rs index 06c07a7..342ebf7 100644 --- a/binaries/geph5-exit/src/dns.rs +++ b/binaries/geph5-exit/src/dns.rs @@ -16,6 +16,7 @@ use simple_dns::Packet; use smol::{ channel::{Receiver, Sender}, future::FutureExt as _, + lock::Semaphore, net::UdpSocket, }; @@ -40,26 +41,31 @@ impl FilterOptions { .time_to_live(Duration::from_secs(86400)) .build() }); - let nsfw_list = NSFW_LIST - .try_get_with((), async move { - anyhow::Ok(Arc::new( - parse_oisd("https://nsfw.oisd.nl/domainswild").await?, - )) - }) - .await - .map_err(|e| anyhow::anyhow!(e))?; - let ads_list = ADS_LIST - .try_get_with((), async move { - anyhow::Ok(Arc::new( - parse_oisd("https://big.oisd.nl/domainswild").await?, - )) - }) - .await - .map_err(|e| anyhow::anyhow!(e))?; - if self.nsfw && nsfw_list.is_match(name) { + + if self.nsfw + && NSFW_LIST + .try_get_with((), async move { + anyhow::Ok(Arc::new( + parse_oisd("https://nsfw.oisd.nl/domainswild").await?, + )) + }) + .await + .map_err(|e| anyhow::anyhow!(e))? + .is_match(name) + { anyhow::bail!("blocking NSFW domain") } - if self.ads && ads_list.is_match(name) { + if self.ads + && ADS_LIST + .try_get_with((), async move { + anyhow::Ok(Arc::new( + parse_oisd("https://small.oisd.nl/domainswild").await?, + )) + }) + .await + .map_err(|e| anyhow::anyhow!(e))? + .is_match(name) + { anyhow::bail!("blocking ads domain") } Ok(()) @@ -68,6 +74,8 @@ impl FilterOptions { async fn parse_oisd(url: &str) -> anyhow::Result { let raw = reqwest::get(url).await?.bytes().await?; + tracing::info!(url, "STARTING TO BUILD an oisd blocklist"); + let mut builder = GlobSet::builder(); let mut count = 0; for line in String::from_utf8_lossy(&raw) @@ -77,6 +85,10 @@ async fn parse_oisd(url: &str) -> anyhow::Result { builder.add(Glob::from_str(line)?); builder.add(Glob::from_str(&line.replace("*.", ""))?); count += 1; + if fastrand::f32() < 0.01 { + tracing::info!(url, count, "LOADING an oisd blocklist"); + smol::future::yield_now().await; + } } tracing::info!(url, count, "LOADED an oisd blocklist"); Ok(builder.build()?) diff --git a/binaries/geph5-exit/src/main.rs b/binaries/geph5-exit/src/main.rs index 5306818..ad509e6 100644 --- a/binaries/geph5-exit/src/main.rs +++ b/binaries/geph5-exit/src/main.rs @@ -6,7 +6,7 @@ mod tasklimit; use clap::Parser; use ed25519_dalek::SigningKey; use ipnet::Ipv6Net; -use ipv6::configure_ipv6_routing; + use isocountry::CountryCode; use listen::listen_main; use once_cell::sync::{Lazy, OnceCell}; @@ -27,8 +27,8 @@ mod proxy; mod ratelimit; mod schedlag; -#[global_allocator] -static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc; +// #[global_allocator] +// static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc; use crate::ratelimit::update_load_loop; diff --git a/binaries/geph5-exit/src/tasklimit.rs b/binaries/geph5-exit/src/tasklimit.rs index 7f9efe5..422b276 100644 --- a/binaries/geph5-exit/src/tasklimit.rs +++ b/binaries/geph5-exit/src/tasklimit.rs @@ -12,7 +12,7 @@ static TASK_KILLER: LazyLock = LazyLock::new(async_event::Ev pub async fn new_task_until_death(protected_period: Duration) -> anyhow::Result<()> { let task_limit = CONFIG_FILE.wait().task_limit; let count = TASK_COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - tracing::debug!(count, task_limit, "making a task death handle"); + // tracing::debug!(count, task_limit, "making a task death handle"); scopeguard::defer!({ TASK_COUNT.fetch_sub(1, std::sync::atomic::Ordering::Relaxed); }); @@ -30,6 +30,6 @@ pub async fn new_task_until_death(protected_period: Duration) -> anyhow::Result< } }) .await; - tracing::warn!(task_limit, "a task is killed due to overflow"); + // tracing::warn!(task_limit, "a task is killed due to overflow"); anyhow::bail!("too many tasks") } From bcb06acffbebdab57298f529eb0407ff3088e8c4 Mon Sep 17 00:00:00 2001 From: nullchinchilla Date: Fri, 27 Dec 2024 18:27:07 -0500 Subject: [PATCH 6/8] Enable global allocator using tikv_jemallocator in main.rs --- binaries/geph5-exit/src/main.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/binaries/geph5-exit/src/main.rs b/binaries/geph5-exit/src/main.rs index ad509e6..4fa2161 100644 --- a/binaries/geph5-exit/src/main.rs +++ b/binaries/geph5-exit/src/main.rs @@ -27,8 +27,8 @@ mod proxy; mod ratelimit; mod schedlag; -// #[global_allocator] -// static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc; +#[global_allocator] +static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc; use crate::ratelimit::update_load_loop; From d18a1cd6240751f0a83a715651bd0ba39d5147d7 Mon Sep 17 00:00:00 2001 From: nullchinchilla Date: Fri, 27 Dec 2024 18:27:42 -0500 Subject: [PATCH 7/8] Add Musl target-specific Jemalloc allocator configuration --- binaries/geph5-exit/src/main.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/binaries/geph5-exit/src/main.rs b/binaries/geph5-exit/src/main.rs index 4fa2161..9f2f859 100644 --- a/binaries/geph5-exit/src/main.rs +++ b/binaries/geph5-exit/src/main.rs @@ -27,6 +27,7 @@ mod proxy; mod ratelimit; mod schedlag; +#[cfg(target_env = "musl")] #[global_allocator] static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc; From 3d45c1919daaaab8236d3bfd3a9dd333f970f401 Mon Sep 17 00:00:00 2001 From: nullchinchilla Date: Sat, 28 Dec 2024 08:57:38 -0500 Subject: [PATCH 8/8] bound queues to limit memory usage --- binaries/geph5-client/src/vpn.rs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/binaries/geph5-client/src/vpn.rs b/binaries/geph5-client/src/vpn.rs index 8fd0abc..6e9e3f2 100644 --- a/binaries/geph5-client/src/vpn.rs +++ b/binaries/geph5-client/src/vpn.rs @@ -2,7 +2,7 @@ #[cfg(target_os = "linux")] mod linux; use bytes::Bytes; -use crossbeam_queue::SegQueue; +use crossbeam_queue::{ArrayQueue, SegQueue}; use dashmap::DashMap; use ipstack_geph::{IpStack, IpStackConfig}; @@ -65,13 +65,13 @@ pub async fn recv_vpn_packet(ctx: &AnyCtx) -> Bytes { static VPN_EVENT: CtxField = |_| async_event::Event::new(); -static VPN_CAPTURE: CtxField> = |_| SegQueue::new(); +static VPN_CAPTURE: CtxField> = |_| ArrayQueue::new(100); -static VPN_INJECT: CtxField> = |_| SegQueue::new(); +static VPN_INJECT: CtxField> = |_| ArrayQueue::new(100); pub async fn vpn_loop(ctx: &AnyCtx) -> anyhow::Result<()> { - let (send_captured, recv_captured) = smol::channel::unbounded(); - let (send_injected, recv_injected) = smol::channel::unbounded(); + let (send_captured, recv_captured) = smol::channel::bounded(100); + let (send_injected, recv_injected) = smol::channel::bounded(100); let ipstack = IpStack::new( #[cfg(target_os = "ios")] @@ -115,8 +115,6 @@ pub async fn vpn_loop(ctx: &AnyCtx) -> anyhow::Result<()> { tracing::trace!(len = bts.len(), "vpn shuffling down"); ctx.get(VPN_INJECT).push(bts); ctx.get(VPN_EVENT).notify_all(); - - smol::future::yield_now().await; } }; up_loop.race(dn_loop).await