Skip to content

Commit

Permalink
Add timeout and live count handling to SinglePool creation and connec…
Browse files Browse the repository at this point in the history
…tion
  • Loading branch information
nullchinchilla committed Jan 4, 2025
1 parent c1f6ec7 commit a29c745
Showing 1 changed file with 17 additions and 3 deletions.
20 changes: 17 additions & 3 deletions binaries/geph5-bridge/src/listen_forward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,38 +160,49 @@ async fn dial_pooled(b2e_dest: SocketAddr, metadata: &[u8]) -> anyhow::Result<pi
});
let pool = POOLS
.try_get_with(b2e_dest, async {
let pool = SinglePool::create(b2e_dest).await?;
let pool = SinglePool::create(b2e_dest)
.timeout(Duration::from_secs(1))
.await
.context("timed out creating pool")??;
tracing::info!(b2e_dest = display(b2e_dest), "**** created a NEW pool ****");
anyhow::Ok(Arc::new(pool))
})
.await
.map_err(|e| anyhow::anyhow!(e))?;
let stream = pool
.connect(metadata)
.timeout(Duration::from_secs(1))
.await
.context("timeout connecting through pool")?
.context(format!("cannot open through mux, b2e_dest={b2e_dest}"))?;
Ok(stream)
}

struct SinglePool {
send: Sender<(Vec<u8>, oneshot::Sender<Stream>)>,

live_count: Arc<AtomicUsize>,
_tasks: Vec<smol::Task<()>>,
}

impl SinglePool {
pub async fn create(dest: SocketAddr) -> anyhow::Result<Self> {
let (send, recv) = async_channel::bounded(100);
let live_count = Arc::new(AtomicUsize::new(0));
let mut tasks = vec![];
for _ in 0..32 {
let recv = recv.clone();
let live_count = live_count.clone();
let task = smolscale::spawn(async move {
loop {
let conn = sillad::tcp::TcpDialer { dest_addr: dest }.dial().await;
if let Ok(conn) = conn {
let (read, write) = conn.split();
let mux = PicoMux::new(read, write);
let recv = recv.clone();
live_count.fetch_add(1, Ordering::Relaxed);
scopeguard::defer!({
live_count.fetch_sub(1, Ordering::Relaxed);
});
if let Err(err) = remote_once(recv.clone(), &mux).await {
tracing::error!(dest = display(dest), "remote_once error: {}", err);
}
Expand All @@ -203,12 +214,15 @@ impl SinglePool {
}
Ok(Self {
send,

live_count,
_tasks: tasks,
})
}

pub async fn connect(&self, metadata: &[u8]) -> anyhow::Result<Stream> {
if self.live_count.load(Ordering::Relaxed) == 0 {
anyhow::bail!("no live workers")
}
let (back, front) = oneshot::channel();
self.send
.send((metadata.to_vec(), back))
Expand Down

0 comments on commit a29c745

Please sign in to comment.