From 92d8067e03cc0814cada52d21556ae281e68a05c Mon Sep 17 00:00:00 2001 From: nullchinchilla Date: Mon, 18 Mar 2024 15:35:38 -0400 Subject: [PATCH] Add async-channel dependency and integrate async_channel::Sender/Receiver in PicoMux --- Cargo.lock | 1 + libraries/picomux/Cargo.toml | 1 + libraries/picomux/src/lib.rs | 8 ++++---- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 82bc144f..d11b7f5d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2527,6 +2527,7 @@ dependencies = [ "ahash", "anyhow", "argh", + "async-channel 2.2.0", "async-dup", "async-event", "async-io 2.3.2", diff --git a/libraries/picomux/Cargo.toml b/libraries/picomux/Cargo.toml index 103c29fa..dbfd0f73 100644 --- a/libraries/picomux/Cargo.toml +++ b/libraries/picomux/Cargo.toml @@ -39,6 +39,7 @@ serde_json = "1.0.114" async-io = "2.3.1" sillad={version="0.1", path="../sillad"} futures-intrusive = "0.5.0" +async-channel = "2.2.0" [dev-dependencies] argh = "0.1" diff --git a/libraries/picomux/src/lib.rs b/libraries/picomux/src/lib.rs index 160a464c..63670e6c 100644 --- a/libraries/picomux/src/lib.rs +++ b/libraries/picomux/src/lib.rs @@ -61,7 +61,7 @@ pub struct PicoMux { task: Shared>>>, send_open_req: Sender<(Bytes, oneshot::Sender)>, last_forced_ping: Mutex, - recv_accepted: Receiver, + recv_accepted: async_channel::Receiver, send_liveness: Sender, liveness: LivenessConfig, } @@ -73,7 +73,7 @@ impl PicoMux { write: impl AsyncWrite + Send + Unpin + 'static, ) -> Self { let (send_open_req, recv_open_req) = tachyonix::channel(1); - let (send_accepted, recv_accepted) = tachyonix::channel(10000); + let (send_accepted, recv_accepted) = async_channel::bounded(10000); let (send_liveness, recv_liveness) = tachyonix::channel(1000); let liveness = LivenessConfig::default(); send_liveness.try_send(liveness).unwrap(); @@ -103,7 +103,7 @@ impl PicoMux { } /// Accepts a new stream from the peer. - pub async fn accept(&mut self) -> std::io::Result { + pub async fn accept(&self) -> std::io::Result { let err = self.wait_error(); async { if let Ok(val) = self.recv_accepted.recv().await { @@ -161,7 +161,7 @@ static MUX_ID_CTR: AtomicU64 = AtomicU64::new(0); async fn picomux_inner( read: impl AsyncRead + 'static + Send + Unpin, mut write: impl AsyncWrite + Send + Unpin + 'static, - send_accepted: Sender, + send_accepted: async_channel::Sender, mut recv_open_req: Receiver<(Bytes, oneshot::Sender)>, mut recv_liveness: Receiver, ) -> Result {