Skip to content

Commit

Permalink
more info
Browse files Browse the repository at this point in the history
  • Loading branch information
nullchinchilla committed Mar 14, 2024
1 parent fcf3788 commit 3c32761
Show file tree
Hide file tree
Showing 10 changed files with 59 additions and 15 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions binaries/geph5-broker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ blake3 = "1.5.0"
isocountry = "0.3.2"
ed25519-dalek = "2.1.1"
tokio = { version = "1.0", features = ["full"] }
tracing-subscriber = {version="0.3.18", features=["fmt"]}
tracing-subscriber = {version="0.3.18", features=["fmt", "env-filter"]}
nanorpc-sillad={path="../../libraries/nanorpc-sillad"}
sillad={path="../../libraries/sillad"}
sillad-sosistab3={path="../../libraries/sillad-sosistab3"}
sillad-sosistab3={path="../../libraries/sillad-sosistab3"}
smol-timeout = "0.6.0"
15 changes: 9 additions & 6 deletions binaries/geph5-broker/src/routes.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
use std::{
net::SocketAddr,
time::{Duration, SystemTime},
};

use anyhow::Context;
use geph5_broker_protocol::{BridgeDescriptor, RouteDescriptor};
use geph5_misc_rpc::bridge::{B2eMetadata, BridgeControlClient, ObfsProtocol};
use moka::future::Cache;
use nanorpc_sillad::DialerTransport;
use once_cell::sync::Lazy;
use sillad::tcp::TcpDialer;
use sillad_sosistab3::{dialer::SosistabDialer, Cookie};
use smol_timeout::TimeoutExt;
use std::{
net::SocketAddr,
time::{Duration, SystemTime},
};

pub async fn bridge_to_leaf_route(
bridge: &BridgeDescriptor,
Expand Down Expand Up @@ -41,7 +42,9 @@ pub async fn bridge_to_leaf_route(
expiry: SystemTime::now() + Duration::from_secs(86400),
},
)
.await?;
.timeout(Duration::from_secs(5))
.await
.context("timeout ")??;
anyhow::Ok(RouteDescriptor::Sosistab3 {
cookie,
lower: RouteDescriptor::Tcp(forwarded_listen).into(),
Expand Down
4 changes: 2 additions & 2 deletions binaries/geph5-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use smolscale::immortal::{Immortal, RespawnStrategy};

use crate::{broker::BrokerSource, exit::ExitConstraint};

use self::{inner::client_inner, socks5::socks5_loop};
use self::{inner::client_once, socks5::socks5_loop};

#[derive(Serialize, Deserialize, Clone)]
pub struct Config {
Expand Down Expand Up @@ -44,7 +44,7 @@ async fn client_main(ctx: AnyCtx<Config>) -> anyhow::Result<()> {
.map(|_| {
Immortal::respawn(
RespawnStrategy::JitterDelay(Duration::from_secs(1), Duration::from_secs(5)),
clone!([ctx], move || client_inner(ctx.clone())
clone!([ctx], move || client_once(ctx.clone())
.inspect_err(|e| tracing::warn!("client_inner died: {:?}", e))),
)
})
Expand Down
6 changes: 5 additions & 1 deletion binaries/geph5-client/src/client/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ static CONN_REQ_CHAN: CtxField<(
static COUNTER: AtomicU64 = AtomicU64::new(0);

#[tracing::instrument(skip_all, fields(instance=COUNTER.fetch_add(1, Ordering::Relaxed)))]
pub async fn client_inner(ctx: AnyCtx<Config>) -> anyhow::Result<()> {
pub async fn client_once(ctx: AnyCtx<Config>) -> anyhow::Result<()> {
tracing::info!("(re)starting main logic");
let start = Instant::now();
let authed_pipe = async {
Expand All @@ -62,6 +62,10 @@ pub async fn client_inner(ctx: AnyCtx<Config>) -> anyhow::Result<()> {
.timeout(Duration::from_secs(60))
.await
.context("overall dial/mux/auth timeout")??;
client_inner(ctx, authed_pipe).await
}
#[tracing::instrument(skip_all, fields(remote=authed_pipe.remote_addr().unwrap_or("(none)")))]
async fn client_inner(ctx: AnyCtx<Config>, authed_pipe: impl Pipe) -> anyhow::Result<()> {
let (read, write) = authed_pipe.split();
let mut mux = PicoMux::new(read, write);
mux.set_liveness(LivenessConfig {
Expand Down
9 changes: 9 additions & 0 deletions libraries/geph5-misc-rpc/src/exit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ pub struct ClientExitCryptPipe {
#[pin]
write_outgoing: BipeWriter,
_write_task: Task<()>,

addr: Option<String>,
}

impl AsyncRead for ClientExitCryptPipe {
Expand Down Expand Up @@ -108,6 +110,7 @@ impl AsyncWrite for ClientExitCryptPipe {
impl ClientExitCryptPipe {
/// Creates a new pipe, given read and write keys
pub fn new(pipe: impl Pipe, read_key: [u8; 32], write_key: [u8; 32]) -> Self {
let addr = pipe.remote_addr().map(|s| s.to_string());
let (mut pipe_read, mut pipe_write) = pipe.split();
let (mut write_incoming, read_incoming) = bipe::bipe(32768);
let (write_outgoing, mut read_outgoing) = bipe::bipe(32768);
Expand Down Expand Up @@ -154,6 +157,8 @@ impl ClientExitCryptPipe {
_read_task,
write_outgoing,
_write_task,

addr,
}
}
}
Expand All @@ -162,4 +167,8 @@ impl Pipe for ClientExitCryptPipe {
fn protocol(&self) -> &str {
"client-exit"
}

fn remote_addr(&self) -> Option<&str> {
self.addr.as_deref()
}
}
4 changes: 4 additions & 0 deletions libraries/picomux/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,10 @@ impl sillad::Pipe for Stream {
fn protocol(&self) -> &str {
"sillad-stream"
}

fn remote_addr(&self) -> Option<&str> {
None
}
}

#[cfg(test)]
Expand Down
4 changes: 4 additions & 0 deletions libraries/sillad-sosistab3/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,4 +189,8 @@ impl<P: Pipe> Pipe for SosistabPipe<P> {
fn protocol(&self) -> &str {
"sosistab3"
}

fn remote_addr(&self) -> Option<&str> {
self.lower.remote_addr()
}
}
14 changes: 14 additions & 0 deletions libraries/sillad/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ pub trait Pipe: AsyncRead + AsyncWrite + Send + Unpin + 'static {

/// This must return a string that uniquely identifies the protocol type.
fn protocol(&self) -> &str;

/// This might return a string that is some sort of human-readable identifier of the remote address.
fn remote_addr(&self) -> Option<&str>;
}

impl Pipe for Box<dyn Pipe> {
Expand All @@ -26,6 +29,10 @@ impl Pipe for Box<dyn Pipe> {
fn protocol(&self) -> &str {
(**self).protocol()
}

fn remote_addr(&self) -> Option<&str> {
(**self).remote_addr()
}
}

/// EitherPipe is a pipe that is either left or right.
Expand Down Expand Up @@ -95,4 +102,11 @@ impl<L: Pipe, R: Pipe> Pipe for EitherPipe<L, R> {
EitherPipe::Right(r) => r.protocol(),
}
}

fn remote_addr(&self) -> Option<&str> {
match self {
EitherPipe::Left(l) => l.remote_addr(),
EitherPipe::Right(r) => r.remote_addr(),
}
}
}
12 changes: 8 additions & 4 deletions libraries/sillad/src/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ impl Listener for TcpListener {
async fn accept(&mut self) -> std::io::Result<Self::P> {
let (conn, _) = self.inner.accept().await?;
set_tcp_options(&conn)?;

Ok(TcpPipe(conn))
let addr = conn.as_ref().peer_addr()?.to_string();
Ok(TcpPipe(conn, addr))
}
}

Expand Down Expand Up @@ -68,12 +68,12 @@ impl Dialer for TcpDialer {
async fn dial(&self) -> std::io::Result<Self::P> {
let inner = Async::<TcpStream>::connect(self.dest_addr).await?;
set_tcp_options(&inner)?;
Ok(TcpPipe(inner))
Ok(TcpPipe(inner, self.dest_addr.to_string()))
}
}

#[pin_project]
pub struct TcpPipe(#[pin] Async<TcpStream>);
pub struct TcpPipe(#[pin] Async<TcpStream>, String);

impl AsyncRead for TcpPipe {
fn poll_read(
Expand Down Expand Up @@ -113,4 +113,8 @@ impl Pipe for TcpPipe {
fn protocol(&self) -> &str {
"tcp"
}

fn remote_addr(&self) -> Option<&str> {
Some(&self.1)
}
}

0 comments on commit 3c32761

Please sign in to comment.