Skip to content

Commit

Permalink
new
Browse files Browse the repository at this point in the history
  • Loading branch information
nullchinchilla committed Mar 25, 2024
1 parent f32361d commit 0d16ee7
Show file tree
Hide file tree
Showing 6 changed files with 564 additions and 0 deletions.
32 changes: 32 additions & 0 deletions binaries/geph5-broker/src/auth_token.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use std::ops::Deref as _;

use rand::Rng as _;

use crate::database::POSTGRES;

pub async fn new_auth_token(user_id: i64) -> anyhow::Result<String> {
let token: String = std::iter::repeat(())
.map(|()| rand::thread_rng().sample(rand::distributions::Alphanumeric))
.map(char::from)
.take(30)
.collect();

match sqlx::query("INSERT INTO auth_tokens (token, user_id) VALUES ($1, $2)")
.bind(&token)
.bind(user_id)
.execute(POSTGRES.deref())
.await
{
Ok(_) => Ok(token),
Err(e) => anyhow::bail!("database failed {e}"), // If insertion fails, return RateLimited error
}
}

pub async fn valid_auth_token(token: &str) -> anyhow::Result<bool> {
let row: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM auth_tokens WHERE token = $1")
.bind(token)
.fetch_one(POSTGRES.deref())
.await?;

Ok(row.0 > 0)
}
82 changes: 82 additions & 0 deletions binaries/geph5-client/src/auth.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
use std::time::Duration;

use anyctx::AnyCtx;
use anyhow::Context as _;
use blind_rsa_signatures as brs;
use geph5_broker_protocol::{AccountLevel, AuthError, Credential};
use mizaru2::ClientToken;
use stdcode::StdcodeSerializeExt;

use crate::{
broker::broker_client,
client::Config,
database::{db_read, db_write},
};

// Basic workflow, we have a maintenance task that, given an auth token, refreshes the connection token for this and the next epoch, every 24 hours.

pub async fn auth_loop(ctx: &AnyCtx<Config>) -> anyhow::Result<()> {
// Dummy authentication for now!
let auth_token = if let Some(token) = db_read(ctx, "auth_token").await? {
String::from_utf8_lossy(&token).to_string()
} else {
let auth_token = broker_client(ctx)?
.get_auth_token(Credential::TestDummy)
.await??;
db_write(ctx, "auth_token", auth_token.as_bytes()).await?;
auth_token
};
loop {
refresh_conn_token(ctx, &auth_token).await?;
smol::Timer::after(Duration::from_secs(10)).await;
}
}

#[tracing::instrument(skip_all)]
async fn refresh_conn_token(ctx: &AnyCtx<Config>, auth_token: &str) -> anyhow::Result<()> {
let epoch = mizaru2::current_epoch();
let broker_client = broker_client(ctx)?;
for epoch in [epoch, epoch + 1] {
if db_read(ctx, &format!("conn_token_{epoch}"))
.await?
.is_none()
{
let token = ClientToken::random();
for level in [AccountLevel::Plus, AccountLevel::Free] {
tracing::debug!(epoch, level = debug(level), "refreshing conn token");
let subkey = broker_client
.get_mizaru_subkey(level, epoch)
.await
.context("cannot get subkey")?;
tracing::debug!(epoch, subkey_len = subkey.len(), "got subkey");
let subkey: brs::PublicKey =
brs::PublicKey::from_der(&subkey).context("cannot decode subkey")?;
let (blind_token, secret) = token.blind(&subkey);
let conn_token = broker_client
.get_connect_token(auth_token.to_string(), level, epoch, blind_token)
.await
.context("cannot get connect token")?;
match conn_token {
Ok(res) => {
let u_sig = res
.unblind(&secret, token)
.context("cannot unblind response")?;
db_write(
ctx,
&format!("conn_token_{epoch}"),
&(token, u_sig).stdcode(),
)
.await?;
break;
}
Err(AuthError::WrongLevel) => {
tracing::debug!(epoch, level = debug(level), "switching to next level");
continue;
}
Err(e) => anyhow::bail!("cannot get token: {e}"),
}
}
}
}
anyhow::Ok(())
}
176 changes: 176 additions & 0 deletions binaries/geph5-client/src/client_inner.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
use anyctx::AnyCtx;
use anyhow::Context;
use ed25519_dalek::VerifyingKey;
use futures_util::AsyncReadExt as _;
use geph5_misc_rpc::{
exit::{ClientCryptHello, ClientExitCryptPipe, ClientHello, ExitHello, ExitHelloInner},
read_prepend_length, write_prepend_length,
};
use nursery_macro::nursery;
use picomux::{LivenessConfig, PicoMux};
use sillad::{dialer::Dialer as _, EitherPipe, Pipe};
use smol::future::FutureExt as _;
use smol_timeout::TimeoutExt;
use std::{
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
time::{Duration, Instant},
};

use stdcode::StdcodeSerializeExt;

use crate::{client::CtxField, route::get_dialer};

use super::Config;

pub async fn open_conn(ctx: &AnyCtx<Config>, dest_addr: &str) -> anyhow::Result<picomux::Stream> {
let (send, recv) = oneshot::channel();
let elem = (dest_addr.to_string(), send);
let _ = ctx.get(CONN_REQ_CHAN).0.send(elem).await;
Ok(recv.await?)
}

type ChanElem = (String, oneshot::Sender<picomux::Stream>);

static CONN_REQ_CHAN: CtxField<(
smol::channel::Sender<ChanElem>,
smol::channel::Receiver<ChanElem>,
)> = |_| smol::channel::unbounded();

static COUNTER: AtomicU64 = AtomicU64::new(0);

#[tracing::instrument(skip_all, fields(instance=COUNTER.fetch_add(1, Ordering::Relaxed)))]
pub async fn client_once(ctx: AnyCtx<Config>) -> anyhow::Result<()> {
tracing::info!("(re)starting main logic");
let start = Instant::now();
let authed_pipe = async {
let (pubkey, raw_dialer) = get_dialer(&ctx).await?;
tracing::debug!(elapsed = debug(start.elapsed()), "raw dialer constructed");
let raw_pipe = raw_dialer.dial().await?;
tracing::debug!(
elapsed = debug(start.elapsed()),
protocol = raw_pipe.protocol(),
"dial completed"
);
let authed_pipe = client_auth(raw_pipe, pubkey).await?;
tracing::debug!(
elapsed = debug(start.elapsed()),
"authentication done, starting mux system"
);
anyhow::Ok(authed_pipe)
}
.timeout(Duration::from_secs(60))
.await
.context("overall dial/mux/auth timeout")??;
client_inner(ctx, authed_pipe).await
}

#[tracing::instrument(skip_all, fields(remote=display(authed_pipe.remote_addr().unwrap_or("(none)"))))]
async fn client_inner(ctx: AnyCtx<Config>, authed_pipe: impl Pipe) -> anyhow::Result<()> {
let (read, write) = authed_pipe.split();
let mut mux = PicoMux::new(read, write);
mux.set_liveness(LivenessConfig {
ping_interval: Duration::from_secs(600),
timeout: Duration::from_secs(10),
});
let mux = Arc::new(mux);

let (send_stop, mut recv_stop) = tachyonix::channel(1);
// run a socks5 loop
async {
let err: std::io::Error = recv_stop.recv().await?;
Err(err.into())
}
.race(async {
nursery!({
loop {
let mux = mux.clone();
let send_stop = send_stop.clone();
let ctx = ctx.clone();
let (remote_addr, send_back) = ctx.get(CONN_REQ_CHAN).1.recv().await?;
smol::future::yield_now().await;
spawn!(async move {
tracing::debug!(remote_addr = display(&remote_addr), "connecting to remote");
let stream = mux.open(remote_addr.as_bytes()).await;
match stream {
Ok(stream) => {
let _ = send_back.send(stream);
}
Err(err) => {
tracing::warn!(remote_addr = display(&remote_addr), err = debug(&err), "session is dead, hot-potatoing the connection request to somebody else");
let _ = send_stop.try_send(err);
let _ = ctx.get(CONN_REQ_CHAN).0.try_send((remote_addr, send_back));
}
}
anyhow::Ok(())
})
.detach();
}
})
})
.await
}

#[tracing::instrument(skip_all, fields(pubkey = hex::encode(pubkey.as_bytes())))]
async fn client_auth(mut pipe: impl Pipe, pubkey: VerifyingKey) -> anyhow::Result<impl Pipe> {
match pipe.shared_secret().map(|s| s.to_owned()) {
Some(ss) => {
tracing::debug!("using shared secret for authentication");
let challenge = rand::random();
let client_hello = ClientHello {
credentials: Default::default(), // no authentication support yet
crypt_hello: ClientCryptHello::SharedSecretChallenge(challenge),
};
write_prepend_length(&client_hello.stdcode(), &mut pipe).await?;

let mac = blake3::keyed_hash(&challenge, &ss);
let exit_response: ExitHello =
stdcode::deserialize(&read_prepend_length(&mut pipe).await?)?;
match exit_response.inner {
ExitHelloInner::SharedSecretResponse(response_mac) => {
if mac == response_mac {
tracing::debug!("authentication successful with shared secret");
Ok(EitherPipe::Left(pipe))
} else {
anyhow::bail!("authentication failed with shared secret");
}
}
_ => anyhow::bail!("unexpected response from server"),
}
}
None => {
tracing::debug!("requiring full authentication");
let my_esk = x25519_dalek::EphemeralSecret::random_from_rng(rand::thread_rng());
let client_hello = ClientHello {
credentials: Default::default(), // no authentication support yet
crypt_hello: ClientCryptHello::X25519((&my_esk).into()),
};
write_prepend_length(&client_hello.stdcode(), &mut pipe).await?;
let exit_hello: ExitHello =
stdcode::deserialize(&read_prepend_length(&mut pipe).await?)?;
// verify the exit hello
let signed_value = (&client_hello, &exit_hello.inner).stdcode();
pubkey
.verify_strict(&signed_value, &exit_hello.signature)
.context("exit hello failed validation")?;
match exit_hello.inner {
ExitHelloInner::Reject(reason) => {
anyhow::bail!("exit rejected our authentication attempt: {reason}")
}
ExitHelloInner::SharedSecretResponse(_) => anyhow::bail!(
"exit sent a shared-secret response to our full authentication request"
),
ExitHelloInner::X25519(their_epk) => {
let shared_secret = my_esk.diffie_hellman(&their_epk);
let read_key = blake3::derive_key("e2c", shared_secret.as_bytes());
let write_key = blake3::derive_key("c2e", shared_secret.as_bytes());
Ok(EitherPipe::Right(ClientExitCryptPipe::new(
pipe, read_key, write_key,
)))
}
}
}
}
}
67 changes: 67 additions & 0 deletions binaries/geph5-client/src/database.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
use anyctx::AnyCtx;
use event_listener::Event;
use sqlx::Row;
use sqlx::{sqlite::SqliteConnectOptions, Pool, SqlitePool};
use std::str::FromStr;

use crate::client::{Config, CtxField};

static DATABASE: CtxField<SqlitePool> = |ctx| {
let db_path = ctx
.init()
.cache
.as_ref()
.map(|s| s.to_string_lossy().to_string())
.unwrap_or_else(|| ":memory:".into());
tracing::debug!("INITIALIZING DATABASE");
let options = SqliteConnectOptions::from_str(&db_path)
.unwrap()
.create_if_missing(true);

smol::future::block_on(async move {
let pool = Pool::connect_with(options).await.unwrap();
sqlx::query(
"CREATE TABLE IF NOT EXISTS misc (
key TEXT PRIMARY KEY,
value BLOB NOT NULL
);",
)
.execute(&pool)
.await
.unwrap();

pool
})
};

static EVENT: CtxField<Event> = |_| Event::new();

pub async fn db_write(ctx: &AnyCtx<Config>, key: &str, value: &[u8]) -> Result<(), sqlx::Error> {
sqlx::query("INSERT INTO misc (key, value) VALUES (?, ?) ON CONFLICT(key) DO UPDATE SET value = excluded.value")
.bind(key)
.bind(value)
.execute(ctx.get(DATABASE))
.await?;
ctx.get(EVENT).notify(usize::MAX);
Ok(())
}

pub async fn db_read(ctx: &AnyCtx<Config>, key: &str) -> Result<Option<Vec<u8>>, sqlx::Error> {
let result = sqlx::query("SELECT value FROM misc WHERE key = ?")
.bind(key)
.fetch_optional(ctx.get(DATABASE))
.await?
.map(|row| row.get("value"));
Ok(result)
}

pub async fn db_read_or_wait(ctx: &AnyCtx<Config>, key: &str) -> Result<Vec<u8>, sqlx::Error> {
loop {
let event = ctx.get(EVENT).listen();
let result = db_read(ctx, key).await?;
match result {
Some(val) => return Ok(val),
None => event.await,
}
}
}
Loading

0 comments on commit 0d16ee7

Please sign in to comment.