Skip to content

Commit

Permalink
Refactor bridge descriptor passing and improve async handling in brok…
Browse files Browse the repository at this point in the history
…er; add new Fronted broker source; update dependencies
  • Loading branch information
nullchinchilla committed Mar 28, 2024
1 parent 6892373 commit d519f9e
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 13 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion binaries/geph5-broker/src/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::{
};

pub async fn bridge_to_leaf_route(
bridge: &BridgeDescriptor,
bridge: BridgeDescriptor,
exit_b2e: SocketAddr,
) -> anyhow::Result<RouteDescriptor> {
static CACHE: Lazy<Cache<(SocketAddr, SocketAddr), RouteDescriptor>> = Lazy::new(|| {
Expand Down
21 changes: 12 additions & 9 deletions binaries/geph5-broker/src/rpc_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ use std::{net::SocketAddr, ops::Deref, sync::Arc, time::Duration};

use async_trait::async_trait;
use bytes::Bytes;
use ed25519_dalek::{VerifyingKey};
use ed25519_dalek::VerifyingKey;
use futures_util::future::join_all;
use geph5_broker_protocol::{
AccountLevel, AuthError, BridgeDescriptor, BrokerProtocol, Credential, ExitDescriptor,
ExitList, GenericError, Mac, RouteDescriptor, Signed, DOMAIN_EXIT_DESCRIPTOR,
Expand All @@ -12,7 +13,6 @@ use mizaru2::{BlindedClientToken, BlindedSignature, ClientToken, UnblindedSignat
use moka::future::Cache;
use once_cell::sync::Lazy;


use crate::{
auth::{new_auth_token, valid_auth_token, validate_username_pwd},
database::{insert_exit, query_bridges, ExitRow, POSTGRES},
Expand Down Expand Up @@ -143,18 +143,21 @@ impl BrokerProtocol for BrokerImpl {

let raw_descriptors = query_bridges(&format!("{:?}", token)).await?;
let mut routes = vec![];
for desc in raw_descriptors {
match bridge_to_leaf_route(&desc, exit).await {
for route in join_all(
raw_descriptors
.into_iter()
.map(|desc| bridge_to_leaf_route(desc, exit)),
)
.await
{
match route {
Ok(route) => routes.push(route),
Err(err) => {
tracing::warn!(
err = debug(err),
bridge = debug(desc),
"could not communicate"
)
tracing::warn!(err = debug(err), "could not communicate")
}
}
}

Ok(RouteDescriptor::Race(routes))
}

Expand Down
1 change: 1 addition & 0 deletions binaries/geph5-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,4 @@ hex = "0.4.3"
nursery_macro = "0.1.0"
event-listener = "5.2.0"
blind-rsa-signatures = "0.15.0"
tap = "1.0.1"
27 changes: 24 additions & 3 deletions binaries/geph5-client/src/broker.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::net::SocketAddr;

use anyctx::AnyCtx;
use anyhow::Context;
use async_trait::async_trait;
Expand All @@ -8,13 +6,16 @@ use nanorpc::{DynRpcTransport, JrpcRequest, JrpcResponse, RpcTransport};
use reqwest::Method;
use serde::{Deserialize, Serialize};
use sillad::tcp::TcpDialer;
use std::net::SocketAddr;
use tap::Pipe;

use crate::client::{Config, CtxField};

#[derive(Serialize, Deserialize, Clone)]
#[serde(rename_all = "snake_case")]
pub enum BrokerSource {
Direct(String),
Fronted { front: String, host: String },
DirectTcp(SocketAddr),
}

Expand All @@ -24,13 +25,19 @@ impl BrokerSource {
match self {
BrokerSource::Direct(s) => DynRpcTransport::new(HttpRpcTransport {
url: s.clone(),
host: None,
client: reqwest::Client::new(),
}),
BrokerSource::DirectTcp(dest_addr) => {
DynRpcTransport::new(nanorpc_sillad::DialerTransport(TcpDialer {
dest_addr: *dest_addr,
}))
}
BrokerSource::Fronted { front, host } => DynRpcTransport::new(HttpRpcTransport {
url: front.clone(),
host: Some(host.clone()),
client: reqwest::Client::new(),
}),
}
}
}
Expand All @@ -50,6 +57,7 @@ static BROKER_CLIENT: CtxField<Option<BrokerClient>> = |ctx| {

struct HttpRpcTransport {
url: String,
host: Option<String>,
client: reqwest::Client,
}

Expand All @@ -62,9 +70,22 @@ impl RpcTransport for HttpRpcTransport {
.client
.request(Method::POST, &self.url)
.header("content-type", "application/json")
.pipe(|s| {
if let Some(host) = self.host.as_ref() {
s.header("host", host)
} else {
s
}
})
.body(serde_json::to_vec(&req).unwrap())
.send()
.await?;
Ok(serde_json::from_slice(&resp.bytes().await?)?)
let resp = resp.bytes().await?;
tracing::trace!(
req = serde_json::to_string(&req).unwrap(),
resp = debug(&resp),
"response got"
);
Ok(serde_json::from_slice(&resp)?)
}
}

0 comments on commit d519f9e

Please sign in to comment.