diff --git a/binaries/geph5-bridge/src/listen_forward.rs b/binaries/geph5-bridge/src/listen_forward.rs index 63804a3..034b3ce 100644 --- a/binaries/geph5-bridge/src/listen_forward.rs +++ b/binaries/geph5-bridge/src/listen_forward.rs @@ -160,7 +160,10 @@ async fn dial_pooled(b2e_dest: SocketAddr, metadata: &[u8]) -> anyhow::Result anyhow::Result, oneshot::Sender)>, - + live_count: Arc, _tasks: Vec>, } impl SinglePool { pub async fn create(dest: SocketAddr) -> anyhow::Result { let (send, recv) = async_channel::bounded(100); + let live_count = Arc::new(AtomicUsize::new(0)); let mut tasks = vec![]; for _ in 0..32 { let recv = recv.clone(); + let live_count = live_count.clone(); let task = smolscale::spawn(async move { loop { let conn = sillad::tcp::TcpDialer { dest_addr: dest }.dial().await; @@ -192,6 +199,10 @@ impl SinglePool { let (read, write) = conn.split(); let mux = PicoMux::new(read, write); let recv = recv.clone(); + live_count.fetch_add(1, Ordering::Relaxed); + scopeguard::defer!({ + live_count.fetch_sub(1, Ordering::Relaxed); + }); if let Err(err) = remote_once(recv.clone(), &mux).await { tracing::error!(dest = display(dest), "remote_once error: {}", err); } @@ -203,12 +214,15 @@ impl SinglePool { } Ok(Self { send, - + live_count, _tasks: tasks, }) } pub async fn connect(&self, metadata: &[u8]) -> anyhow::Result { + if self.live_count.load(Ordering::Relaxed) == 0 { + anyhow::bail!("no live workers") + } let (back, front) = oneshot::channel(); self.send .send((metadata.to_vec(), back))