diff --git a/Cargo.lock b/Cargo.lock index 20fc6459..32250174 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1476,6 +1476,7 @@ dependencies = [ "serde_yaml", "sillad", "sillad-sosistab3", + "smol-timeout", "smolscale", "sqlx", "thiserror", diff --git a/binaries/geph5-broker/Cargo.toml b/binaries/geph5-broker/Cargo.toml index aa8e4a9a..c752374a 100644 --- a/binaries/geph5-broker/Cargo.toml +++ b/binaries/geph5-broker/Cargo.toml @@ -30,7 +30,8 @@ blake3 = "1.5.0" isocountry = "0.3.2" ed25519-dalek = "2.1.1" tokio = { version = "1.0", features = ["full"] } -tracing-subscriber = {version="0.3.18", features=["fmt"]} +tracing-subscriber = {version="0.3.18", features=["fmt", "env-filter"]} nanorpc-sillad={path="../../libraries/nanorpc-sillad"} sillad={path="../../libraries/sillad"} -sillad-sosistab3={path="../../libraries/sillad-sosistab3"} \ No newline at end of file +sillad-sosistab3={path="../../libraries/sillad-sosistab3"} +smol-timeout = "0.6.0" diff --git a/binaries/geph5-broker/src/routes.rs b/binaries/geph5-broker/src/routes.rs index 3a9386c2..5a3ce055 100644 --- a/binaries/geph5-broker/src/routes.rs +++ b/binaries/geph5-broker/src/routes.rs @@ -1,8 +1,4 @@ -use std::{ - net::SocketAddr, - time::{Duration, SystemTime}, -}; - +use anyhow::Context; use geph5_broker_protocol::{BridgeDescriptor, RouteDescriptor}; use geph5_misc_rpc::bridge::{B2eMetadata, BridgeControlClient, ObfsProtocol}; use moka::future::Cache; @@ -10,6 +6,11 @@ use nanorpc_sillad::DialerTransport; use once_cell::sync::Lazy; use sillad::tcp::TcpDialer; use sillad_sosistab3::{dialer::SosistabDialer, Cookie}; +use smol_timeout::TimeoutExt; +use std::{ + net::SocketAddr, + time::{Duration, SystemTime}, +}; pub async fn bridge_to_leaf_route( bridge: &BridgeDescriptor, @@ -41,7 +42,9 @@ pub async fn bridge_to_leaf_route( expiry: SystemTime::now() + Duration::from_secs(86400), }, ) - .await?; + .timeout(Duration::from_secs(5)) + .await + .context("timeout ")??; anyhow::Ok(RouteDescriptor::Sosistab3 { cookie, lower: RouteDescriptor::Tcp(forwarded_listen).into(), diff --git a/binaries/geph5-client/src/client.rs b/binaries/geph5-client/src/client.rs index 67984654..caf5db3f 100644 --- a/binaries/geph5-client/src/client.rs +++ b/binaries/geph5-client/src/client.rs @@ -10,7 +10,7 @@ use smolscale::immortal::{Immortal, RespawnStrategy}; use crate::{broker::BrokerSource, exit::ExitConstraint}; -use self::{inner::client_inner, socks5::socks5_loop}; +use self::{inner::client_once, socks5::socks5_loop}; #[derive(Serialize, Deserialize, Clone)] pub struct Config { @@ -44,7 +44,7 @@ async fn client_main(ctx: AnyCtx) -> anyhow::Result<()> { .map(|_| { Immortal::respawn( RespawnStrategy::JitterDelay(Duration::from_secs(1), Duration::from_secs(5)), - clone!([ctx], move || client_inner(ctx.clone()) + clone!([ctx], move || client_once(ctx.clone()) .inspect_err(|e| tracing::warn!("client_inner died: {:?}", e))), ) }) diff --git a/binaries/geph5-client/src/client/inner.rs b/binaries/geph5-client/src/client/inner.rs index a1aafa1b..c27aedf6 100644 --- a/binaries/geph5-client/src/client/inner.rs +++ b/binaries/geph5-client/src/client/inner.rs @@ -40,7 +40,7 @@ static CONN_REQ_CHAN: CtxField<( static COUNTER: AtomicU64 = AtomicU64::new(0); #[tracing::instrument(skip_all, fields(instance=COUNTER.fetch_add(1, Ordering::Relaxed)))] -pub async fn client_inner(ctx: AnyCtx) -> anyhow::Result<()> { +pub async fn client_once(ctx: AnyCtx) -> anyhow::Result<()> { tracing::info!("(re)starting main logic"); let start = Instant::now(); let authed_pipe = async { @@ -62,6 +62,10 @@ pub async fn client_inner(ctx: AnyCtx) -> anyhow::Result<()> { .timeout(Duration::from_secs(60)) .await .context("overall dial/mux/auth timeout")??; + client_inner(ctx, authed_pipe).await +} +#[tracing::instrument(skip_all, fields(remote=authed_pipe.remote_addr().unwrap_or("(none)")))] +async fn client_inner(ctx: AnyCtx, authed_pipe: impl Pipe) -> anyhow::Result<()> { let (read, write) = authed_pipe.split(); let mut mux = PicoMux::new(read, write); mux.set_liveness(LivenessConfig { diff --git a/libraries/geph5-misc-rpc/src/exit.rs b/libraries/geph5-misc-rpc/src/exit.rs index 561197a1..2c1a5c7e 100644 --- a/libraries/geph5-misc-rpc/src/exit.rs +++ b/libraries/geph5-misc-rpc/src/exit.rs @@ -69,6 +69,8 @@ pub struct ClientExitCryptPipe { #[pin] write_outgoing: BipeWriter, _write_task: Task<()>, + + addr: Option, } impl AsyncRead for ClientExitCryptPipe { @@ -108,6 +110,7 @@ impl AsyncWrite for ClientExitCryptPipe { impl ClientExitCryptPipe { /// Creates a new pipe, given read and write keys pub fn new(pipe: impl Pipe, read_key: [u8; 32], write_key: [u8; 32]) -> Self { + let addr = pipe.remote_addr().map(|s| s.to_string()); let (mut pipe_read, mut pipe_write) = pipe.split(); let (mut write_incoming, read_incoming) = bipe::bipe(32768); let (write_outgoing, mut read_outgoing) = bipe::bipe(32768); @@ -154,6 +157,8 @@ impl ClientExitCryptPipe { _read_task, write_outgoing, _write_task, + + addr, } } } @@ -162,4 +167,8 @@ impl Pipe for ClientExitCryptPipe { fn protocol(&self) -> &str { "client-exit" } + + fn remote_addr(&self) -> Option<&str> { + self.addr.as_deref() + } } diff --git a/libraries/picomux/src/lib.rs b/libraries/picomux/src/lib.rs index 3e13a024..bc135c1d 100644 --- a/libraries/picomux/src/lib.rs +++ b/libraries/picomux/src/lib.rs @@ -533,6 +533,10 @@ impl sillad::Pipe for Stream { fn protocol(&self) -> &str { "sillad-stream" } + + fn remote_addr(&self) -> Option<&str> { + None + } } #[cfg(test)] diff --git a/libraries/sillad-sosistab3/src/lib.rs b/libraries/sillad-sosistab3/src/lib.rs index 72ce6489..04adf967 100644 --- a/libraries/sillad-sosistab3/src/lib.rs +++ b/libraries/sillad-sosistab3/src/lib.rs @@ -189,4 +189,8 @@ impl Pipe for SosistabPipe

{ fn protocol(&self) -> &str { "sosistab3" } + + fn remote_addr(&self) -> Option<&str> { + self.lower.remote_addr() + } } diff --git a/libraries/sillad/src/lib.rs b/libraries/sillad/src/lib.rs index 4f772031..66c591bd 100644 --- a/libraries/sillad/src/lib.rs +++ b/libraries/sillad/src/lib.rs @@ -16,6 +16,9 @@ pub trait Pipe: AsyncRead + AsyncWrite + Send + Unpin + 'static { /// This must return a string that uniquely identifies the protocol type. fn protocol(&self) -> &str; + + /// This might return a string that is some sort of human-readable identifier of the remote address. + fn remote_addr(&self) -> Option<&str>; } impl Pipe for Box { @@ -26,6 +29,10 @@ impl Pipe for Box { fn protocol(&self) -> &str { (**self).protocol() } + + fn remote_addr(&self) -> Option<&str> { + (**self).remote_addr() + } } /// EitherPipe is a pipe that is either left or right. @@ -95,4 +102,11 @@ impl Pipe for EitherPipe { EitherPipe::Right(r) => r.protocol(), } } + + fn remote_addr(&self) -> Option<&str> { + match self { + EitherPipe::Left(l) => l.remote_addr(), + EitherPipe::Right(r) => r.remote_addr(), + } + } } diff --git a/libraries/sillad/src/tcp.rs b/libraries/sillad/src/tcp.rs index beb725c1..008f289a 100644 --- a/libraries/sillad/src/tcp.rs +++ b/libraries/sillad/src/tcp.rs @@ -31,8 +31,8 @@ impl Listener for TcpListener { async fn accept(&mut self) -> std::io::Result { let (conn, _) = self.inner.accept().await?; set_tcp_options(&conn)?; - - Ok(TcpPipe(conn)) + let addr = conn.as_ref().peer_addr()?.to_string(); + Ok(TcpPipe(conn, addr)) } } @@ -68,12 +68,12 @@ impl Dialer for TcpDialer { async fn dial(&self) -> std::io::Result { let inner = Async::::connect(self.dest_addr).await?; set_tcp_options(&inner)?; - Ok(TcpPipe(inner)) + Ok(TcpPipe(inner, self.dest_addr.to_string())) } } #[pin_project] -pub struct TcpPipe(#[pin] Async); +pub struct TcpPipe(#[pin] Async, String); impl AsyncRead for TcpPipe { fn poll_read( @@ -113,4 +113,8 @@ impl Pipe for TcpPipe { fn protocol(&self) -> &str { "tcp" } + + fn remote_addr(&self) -> Option<&str> { + Some(&self.1) + } }