diff --git a/binaries/geph5-broker/src/auth_token.rs b/binaries/geph5-broker/src/auth_token.rs new file mode 100644 index 00000000..1b254f28 --- /dev/null +++ b/binaries/geph5-broker/src/auth_token.rs @@ -0,0 +1,32 @@ +use std::ops::Deref as _; + +use rand::Rng as _; + +use crate::database::POSTGRES; + +pub async fn new_auth_token(user_id: i64) -> anyhow::Result { + let token: String = std::iter::repeat(()) + .map(|()| rand::thread_rng().sample(rand::distributions::Alphanumeric)) + .map(char::from) + .take(30) + .collect(); + + match sqlx::query("INSERT INTO auth_tokens (token, user_id) VALUES ($1, $2)") + .bind(&token) + .bind(user_id) + .execute(POSTGRES.deref()) + .await + { + Ok(_) => Ok(token), + Err(e) => anyhow::bail!("database failed {e}"), // If insertion fails, return RateLimited error + } +} + +pub async fn valid_auth_token(token: &str) -> anyhow::Result { + let row: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM auth_tokens WHERE token = $1") + .bind(token) + .fetch_one(POSTGRES.deref()) + .await?; + + Ok(row.0 > 0) +} diff --git a/binaries/geph5-client/src/auth.rs b/binaries/geph5-client/src/auth.rs new file mode 100644 index 00000000..20a42475 --- /dev/null +++ b/binaries/geph5-client/src/auth.rs @@ -0,0 +1,82 @@ +use std::time::Duration; + +use anyctx::AnyCtx; +use anyhow::Context as _; +use blind_rsa_signatures as brs; +use geph5_broker_protocol::{AccountLevel, AuthError, Credential}; +use mizaru2::ClientToken; +use stdcode::StdcodeSerializeExt; + +use crate::{ + broker::broker_client, + client::Config, + database::{db_read, db_write}, +}; + +// Basic workflow, we have a maintenance task that, given an auth token, refreshes the connection token for this and the next epoch, every 24 hours. + +pub async fn auth_loop(ctx: &AnyCtx) -> anyhow::Result<()> { + // Dummy authentication for now! + let auth_token = if let Some(token) = db_read(ctx, "auth_token").await? { + String::from_utf8_lossy(&token).to_string() + } else { + let auth_token = broker_client(ctx)? + .get_auth_token(Credential::TestDummy) + .await??; + db_write(ctx, "auth_token", auth_token.as_bytes()).await?; + auth_token + }; + loop { + refresh_conn_token(ctx, &auth_token).await?; + smol::Timer::after(Duration::from_secs(10)).await; + } +} + +#[tracing::instrument(skip_all)] +async fn refresh_conn_token(ctx: &AnyCtx, auth_token: &str) -> anyhow::Result<()> { + let epoch = mizaru2::current_epoch(); + let broker_client = broker_client(ctx)?; + for epoch in [epoch, epoch + 1] { + if db_read(ctx, &format!("conn_token_{epoch}")) + .await? + .is_none() + { + let token = ClientToken::random(); + for level in [AccountLevel::Plus, AccountLevel::Free] { + tracing::debug!(epoch, level = debug(level), "refreshing conn token"); + let subkey = broker_client + .get_mizaru_subkey(level, epoch) + .await + .context("cannot get subkey")?; + tracing::debug!(epoch, subkey_len = subkey.len(), "got subkey"); + let subkey: brs::PublicKey = + brs::PublicKey::from_der(&subkey).context("cannot decode subkey")?; + let (blind_token, secret) = token.blind(&subkey); + let conn_token = broker_client + .get_connect_token(auth_token.to_string(), level, epoch, blind_token) + .await + .context("cannot get connect token")?; + match conn_token { + Ok(res) => { + let u_sig = res + .unblind(&secret, token) + .context("cannot unblind response")?; + db_write( + ctx, + &format!("conn_token_{epoch}"), + &(token, u_sig).stdcode(), + ) + .await?; + break; + } + Err(AuthError::WrongLevel) => { + tracing::debug!(epoch, level = debug(level), "switching to next level"); + continue; + } + Err(e) => anyhow::bail!("cannot get token: {e}"), + } + } + } + } + anyhow::Ok(()) +} diff --git a/binaries/geph5-client/src/client_inner.rs b/binaries/geph5-client/src/client_inner.rs new file mode 100644 index 00000000..b2fd58a0 --- /dev/null +++ b/binaries/geph5-client/src/client_inner.rs @@ -0,0 +1,176 @@ +use anyctx::AnyCtx; +use anyhow::Context; +use ed25519_dalek::VerifyingKey; +use futures_util::AsyncReadExt as _; +use geph5_misc_rpc::{ + exit::{ClientCryptHello, ClientExitCryptPipe, ClientHello, ExitHello, ExitHelloInner}, + read_prepend_length, write_prepend_length, +}; +use nursery_macro::nursery; +use picomux::{LivenessConfig, PicoMux}; +use sillad::{dialer::Dialer as _, EitherPipe, Pipe}; +use smol::future::FutureExt as _; +use smol_timeout::TimeoutExt; +use std::{ + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, + time::{Duration, Instant}, +}; + +use stdcode::StdcodeSerializeExt; + +use crate::{client::CtxField, route::get_dialer}; + +use super::Config; + +pub async fn open_conn(ctx: &AnyCtx, dest_addr: &str) -> anyhow::Result { + let (send, recv) = oneshot::channel(); + let elem = (dest_addr.to_string(), send); + let _ = ctx.get(CONN_REQ_CHAN).0.send(elem).await; + Ok(recv.await?) +} + +type ChanElem = (String, oneshot::Sender); + +static CONN_REQ_CHAN: CtxField<( + smol::channel::Sender, + smol::channel::Receiver, +)> = |_| smol::channel::unbounded(); + +static COUNTER: AtomicU64 = AtomicU64::new(0); + +#[tracing::instrument(skip_all, fields(instance=COUNTER.fetch_add(1, Ordering::Relaxed)))] +pub async fn client_once(ctx: AnyCtx) -> anyhow::Result<()> { + tracing::info!("(re)starting main logic"); + let start = Instant::now(); + let authed_pipe = async { + let (pubkey, raw_dialer) = get_dialer(&ctx).await?; + tracing::debug!(elapsed = debug(start.elapsed()), "raw dialer constructed"); + let raw_pipe = raw_dialer.dial().await?; + tracing::debug!( + elapsed = debug(start.elapsed()), + protocol = raw_pipe.protocol(), + "dial completed" + ); + let authed_pipe = client_auth(raw_pipe, pubkey).await?; + tracing::debug!( + elapsed = debug(start.elapsed()), + "authentication done, starting mux system" + ); + anyhow::Ok(authed_pipe) + } + .timeout(Duration::from_secs(60)) + .await + .context("overall dial/mux/auth timeout")??; + client_inner(ctx, authed_pipe).await +} + +#[tracing::instrument(skip_all, fields(remote=display(authed_pipe.remote_addr().unwrap_or("(none)"))))] +async fn client_inner(ctx: AnyCtx, authed_pipe: impl Pipe) -> anyhow::Result<()> { + let (read, write) = authed_pipe.split(); + let mut mux = PicoMux::new(read, write); + mux.set_liveness(LivenessConfig { + ping_interval: Duration::from_secs(600), + timeout: Duration::from_secs(10), + }); + let mux = Arc::new(mux); + + let (send_stop, mut recv_stop) = tachyonix::channel(1); + // run a socks5 loop + async { + let err: std::io::Error = recv_stop.recv().await?; + Err(err.into()) + } + .race(async { + nursery!({ + loop { + let mux = mux.clone(); + let send_stop = send_stop.clone(); + let ctx = ctx.clone(); + let (remote_addr, send_back) = ctx.get(CONN_REQ_CHAN).1.recv().await?; + smol::future::yield_now().await; + spawn!(async move { + tracing::debug!(remote_addr = display(&remote_addr), "connecting to remote"); + let stream = mux.open(remote_addr.as_bytes()).await; + match stream { + Ok(stream) => { + let _ = send_back.send(stream); + } + Err(err) => { + tracing::warn!(remote_addr = display(&remote_addr), err = debug(&err), "session is dead, hot-potatoing the connection request to somebody else"); + let _ = send_stop.try_send(err); + let _ = ctx.get(CONN_REQ_CHAN).0.try_send((remote_addr, send_back)); + } + } + anyhow::Ok(()) + }) + .detach(); + } + }) + }) + .await +} + +#[tracing::instrument(skip_all, fields(pubkey = hex::encode(pubkey.as_bytes())))] +async fn client_auth(mut pipe: impl Pipe, pubkey: VerifyingKey) -> anyhow::Result { + match pipe.shared_secret().map(|s| s.to_owned()) { + Some(ss) => { + tracing::debug!("using shared secret for authentication"); + let challenge = rand::random(); + let client_hello = ClientHello { + credentials: Default::default(), // no authentication support yet + crypt_hello: ClientCryptHello::SharedSecretChallenge(challenge), + }; + write_prepend_length(&client_hello.stdcode(), &mut pipe).await?; + + let mac = blake3::keyed_hash(&challenge, &ss); + let exit_response: ExitHello = + stdcode::deserialize(&read_prepend_length(&mut pipe).await?)?; + match exit_response.inner { + ExitHelloInner::SharedSecretResponse(response_mac) => { + if mac == response_mac { + tracing::debug!("authentication successful with shared secret"); + Ok(EitherPipe::Left(pipe)) + } else { + anyhow::bail!("authentication failed with shared secret"); + } + } + _ => anyhow::bail!("unexpected response from server"), + } + } + None => { + tracing::debug!("requiring full authentication"); + let my_esk = x25519_dalek::EphemeralSecret::random_from_rng(rand::thread_rng()); + let client_hello = ClientHello { + credentials: Default::default(), // no authentication support yet + crypt_hello: ClientCryptHello::X25519((&my_esk).into()), + }; + write_prepend_length(&client_hello.stdcode(), &mut pipe).await?; + let exit_hello: ExitHello = + stdcode::deserialize(&read_prepend_length(&mut pipe).await?)?; + // verify the exit hello + let signed_value = (&client_hello, &exit_hello.inner).stdcode(); + pubkey + .verify_strict(&signed_value, &exit_hello.signature) + .context("exit hello failed validation")?; + match exit_hello.inner { + ExitHelloInner::Reject(reason) => { + anyhow::bail!("exit rejected our authentication attempt: {reason}") + } + ExitHelloInner::SharedSecretResponse(_) => anyhow::bail!( + "exit sent a shared-secret response to our full authentication request" + ), + ExitHelloInner::X25519(their_epk) => { + let shared_secret = my_esk.diffie_hellman(&their_epk); + let read_key = blake3::derive_key("e2c", shared_secret.as_bytes()); + let write_key = blake3::derive_key("c2e", shared_secret.as_bytes()); + Ok(EitherPipe::Right(ClientExitCryptPipe::new( + pipe, read_key, write_key, + ))) + } + } + } + } +} diff --git a/binaries/geph5-client/src/database.rs b/binaries/geph5-client/src/database.rs new file mode 100644 index 00000000..ea00609f --- /dev/null +++ b/binaries/geph5-client/src/database.rs @@ -0,0 +1,67 @@ +use anyctx::AnyCtx; +use event_listener::Event; +use sqlx::Row; +use sqlx::{sqlite::SqliteConnectOptions, Pool, SqlitePool}; +use std::str::FromStr; + +use crate::client::{Config, CtxField}; + +static DATABASE: CtxField = |ctx| { + let db_path = ctx + .init() + .cache + .as_ref() + .map(|s| s.to_string_lossy().to_string()) + .unwrap_or_else(|| ":memory:".into()); + tracing::debug!("INITIALIZING DATABASE"); + let options = SqliteConnectOptions::from_str(&db_path) + .unwrap() + .create_if_missing(true); + + smol::future::block_on(async move { + let pool = Pool::connect_with(options).await.unwrap(); + sqlx::query( + "CREATE TABLE IF NOT EXISTS misc ( + key TEXT PRIMARY KEY, + value BLOB NOT NULL + );", + ) + .execute(&pool) + .await + .unwrap(); + + pool + }) +}; + +static EVENT: CtxField = |_| Event::new(); + +pub async fn db_write(ctx: &AnyCtx, key: &str, value: &[u8]) -> Result<(), sqlx::Error> { + sqlx::query("INSERT INTO misc (key, value) VALUES (?, ?) ON CONFLICT(key) DO UPDATE SET value = excluded.value") + .bind(key) + .bind(value) + .execute(ctx.get(DATABASE)) + .await?; + ctx.get(EVENT).notify(usize::MAX); + Ok(()) +} + +pub async fn db_read(ctx: &AnyCtx, key: &str) -> Result>, sqlx::Error> { + let result = sqlx::query("SELECT value FROM misc WHERE key = ?") + .bind(key) + .fetch_optional(ctx.get(DATABASE)) + .await? + .map(|row| row.get("value")); + Ok(result) +} + +pub async fn db_read_or_wait(ctx: &AnyCtx, key: &str) -> Result, sqlx::Error> { + loop { + let event = ctx.get(EVENT).listen(); + let result = db_read(ctx, key).await?; + match result { + Some(val) => return Ok(val), + None => event.await, + } + } +} diff --git a/binaries/geph5-client/src/route.rs b/binaries/geph5-client/src/route.rs new file mode 100644 index 00000000..77171956 --- /dev/null +++ b/binaries/geph5-client/src/route.rs @@ -0,0 +1,148 @@ +use std::time::Duration; + +use anyctx::AnyCtx; +use anyhow::Context; + +use ed25519_dalek::VerifyingKey; +use geph5_broker_protocol::{RouteDescriptor, DOMAIN_EXIT_DESCRIPTOR}; +use isocountry::CountryCode; +use rand::seq::SliceRandom; +use serde::{Deserialize, Serialize}; +use sillad::{ + dialer::{DialerExt, DynDialer, FailingDialer}, + tcp::TcpDialer, +}; +use sillad_sosistab3::{dialer::SosistabDialer, Cookie}; + +use crate::{broker::broker_client, client::Config}; + +#[derive(Serialize, Deserialize, Clone, Debug)] +#[serde(rename_all = "snake_case")] +pub enum ExitConstraint { + Auto, + Direct(String), + Country(CountryCode), + CountryCity(CountryCode, String), +} + +/// Gets a sillad Dialer that produces a single, pre-authentication pipe, as well as the public key. +pub async fn get_dialer(ctx: &AnyCtx) -> anyhow::Result<(VerifyingKey, DynDialer)> { + let mut country_constraint = None; + let mut city_constraint = None; + match &ctx.init().exit_constraint { + ExitConstraint::Direct(dir) => { + let (dir, pubkey) = dir + .split_once('/') + .context("did not find / in a direct constraint")?; + let pubkey = VerifyingKey::from_bytes( + hex::decode(pubkey) + .context("cannot decode pubkey as hex")? + .as_slice() + .try_into() + .context("pubkey wrong length")?, + )?; + return Ok(( + pubkey, + TcpDialer { + dest_addr: *smol::net::resolve(dir) + .await? + .choose(&mut rand::thread_rng()) + .context("could not resolve destination for direct exit connection")?, + } + .dynamic(), + )); + } + ExitConstraint::Country(country) => country_constraint = Some(*country), + ExitConstraint::CountryCity(country, city) => { + country_constraint = Some(*country); + city_constraint = Some(city.clone()) + } + ExitConstraint::Auto => {} + } + tracing::debug!( + country_constraint = debug(country_constraint), + city_constraint = debug(&city_constraint), + "created dialer" + ); + + let broker = broker_client(ctx)?; + let exits = broker + .get_exits() + .await? + .map_err(|e| anyhow::anyhow!("broker refused to serve exits: {e}"))?; + // TODO we need to ACTUALLY verify the response!!! + let exits = exits.verify(DOMAIN_EXIT_DESCRIPTOR, |_| true)?; + // filter for things that fit + let (pubkey, exit) = exits + .all_exits + .into_iter() + .filter(|(_, exit)| { + let country_pass = if let Some(country) = &country_constraint { + exit.country == *country + } else { + true + }; + let city_pass = if let Some(city) = &city_constraint { + &exit.city == city + } else { + true + }; + country_pass && city_pass + }) + .min_by_key(|e| (e.1.load * 1000.0) as u64) + .context("no exits that fit the criterion")?; + tracing::debug!(exit = debug(&exit), "narrowed down choice of exit"); + let direct_dialer = TcpDialer { + dest_addr: exit.c2e_listen, + }; + + // Also obtain the bridges + let bridge_routes = broker + .get_routes(todo!(), todo!(), exit.b2e_listen) + .await? + .map_err(|e| anyhow::anyhow!("broker refused to serve bridge routes: {e}"))?; + tracing::debug!( + bridge_routes = debug(&bridge_routes), + "bridge routes obtained too" + ); + + let bridge_dialer = route_to_dialer(&bridge_routes); + + Ok((pubkey, direct_dialer.race(bridge_dialer).dynamic())) +} + +fn route_to_dialer(route: &RouteDescriptor) -> DynDialer { + match route { + RouteDescriptor::Tcp(addr) => TcpDialer { dest_addr: *addr }.dynamic(), + RouteDescriptor::Sosistab3 { cookie, lower } => { + let inner = route_to_dialer(lower); + SosistabDialer { + inner, + cookie: Cookie::new(cookie), + } + .dynamic() + } + RouteDescriptor::Race(inside) => inside + .iter() + .map(route_to_dialer) + .reduce(|a, b| a.race(b).dynamic()) + .unwrap_or_else(|| FailingDialer.dynamic()), + RouteDescriptor::Fallback(a) => a + .iter() + .map(route_to_dialer) + .reduce(|a, b| a.fallback(b).dynamic()) + .unwrap_or_else(|| FailingDialer.dynamic()), + RouteDescriptor::Timeout { + milliseconds, + lower, + } => route_to_dialer(lower) + .timeout(Duration::from_millis(*milliseconds as _)) + .dynamic(), + RouteDescriptor::Delay { + milliseconds, + lower, + } => route_to_dialer(lower) + .delay(Duration::from_millis((*milliseconds).into())) + .dynamic(), + } +} diff --git a/binaries/geph5-client/src/socks5.rs b/binaries/geph5-client/src/socks5.rs new file mode 100644 index 00000000..0d636487 --- /dev/null +++ b/binaries/geph5-client/src/socks5.rs @@ -0,0 +1,59 @@ +use crate::client_inner::open_conn; +use anyctx::AnyCtx; +use futures_util::AsyncReadExt as _; +use nursery_macro::nursery; +use sillad::listener::Listener as _; +use smol::future::FutureExt as _; +use socksv5::v5::{ + read_handshake, read_request, write_auth_method, write_request_status, SocksV5AuthMethod, + SocksV5Host, SocksV5RequestStatus, +}; +use std::net::Ipv4Addr; + +use super::Config; + +#[tracing::instrument(skip_all)] +pub async fn socks5_loop(ctx: AnyCtx) -> anyhow::Result<()> { + let mut listener = sillad::tcp::TcpListener::bind(ctx.init().socks5_listen).await?; + nursery!({ + loop { + let client = listener.accept().await?; + spawn!(async { + tracing::trace!("socks5 connection accepted"); + let (mut read_client, mut write_client) = client.split(); + let _handshake = read_handshake(&mut read_client).await?; + write_auth_method(&mut write_client, SocksV5AuthMethod::Noauth).await?; + let request = read_request(&mut read_client).await?; + let port = request.port; + let domain: String = match &request.host { + SocksV5Host::Domain(dom) => String::from_utf8_lossy(dom).parse()?, + SocksV5Host::Ipv4(v4) => { + let v4addr = Ipv4Addr::new(v4[0], v4[1], v4[2], v4[3]); + v4addr.to_string() + } + _ => anyhow::bail!("IPv6 not supported"), + }; + let remote_addr = format!("{domain}:{port}"); + tracing::trace!( + remote_addr = display(&remote_addr), + "socks5 request received" + ); + let stream = open_conn(&ctx, &remote_addr).await?; + write_request_status( + &mut write_client, + SocksV5RequestStatus::Success, + request.host, + port, + ) + .await?; + tracing::trace!(remote_addr = display(&remote_addr), "connection opened"); + let (read_stream, write_stream) = stream.split(); + smol::io::copy(read_stream, write_client) + .race(smol::io::copy(read_client, write_stream)) + .await?; + anyhow::Ok(()) + }) + .detach(); + } + }) +}