Skip to content

Commit

Permalink
Add async-channel dependency and integrate async_channel::Sender/Rece…
Browse files Browse the repository at this point in the history
…iver in PicoMux
  • Loading branch information
nullchinchilla committed Mar 18, 2024
1 parent 3e678fd commit 92d8067
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 4 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions libraries/picomux/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ serde_json = "1.0.114"
async-io = "2.3.1"
sillad={version="0.1", path="../sillad"}
futures-intrusive = "0.5.0"
async-channel = "2.2.0"

[dev-dependencies]
argh = "0.1"
Expand Down
8 changes: 4 additions & 4 deletions libraries/picomux/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ pub struct PicoMux {
task: Shared<Task<Arc<std::io::Result<Infallible>>>>,
send_open_req: Sender<(Bytes, oneshot::Sender<Stream>)>,
last_forced_ping: Mutex<Instant>,
recv_accepted: Receiver<Stream>,
recv_accepted: async_channel::Receiver<Stream>,
send_liveness: Sender<LivenessConfig>,
liveness: LivenessConfig,
}
Expand All @@ -73,7 +73,7 @@ impl PicoMux {
write: impl AsyncWrite + Send + Unpin + 'static,
) -> Self {
let (send_open_req, recv_open_req) = tachyonix::channel(1);
let (send_accepted, recv_accepted) = tachyonix::channel(10000);
let (send_accepted, recv_accepted) = async_channel::bounded(10000);
let (send_liveness, recv_liveness) = tachyonix::channel(1000);
let liveness = LivenessConfig::default();
send_liveness.try_send(liveness).unwrap();
Expand Down Expand Up @@ -103,7 +103,7 @@ impl PicoMux {
}

/// Accepts a new stream from the peer.
pub async fn accept(&mut self) -> std::io::Result<Stream> {
pub async fn accept(&self) -> std::io::Result<Stream> {
let err = self.wait_error();
async {
if let Ok(val) = self.recv_accepted.recv().await {
Expand Down Expand Up @@ -161,7 +161,7 @@ static MUX_ID_CTR: AtomicU64 = AtomicU64::new(0);
async fn picomux_inner(
read: impl AsyncRead + 'static + Send + Unpin,
mut write: impl AsyncWrite + Send + Unpin + 'static,
send_accepted: Sender<Stream>,
send_accepted: async_channel::Sender<Stream>,
mut recv_open_req: Receiver<(Bytes, oneshot::Sender<Stream>)>,
mut recv_liveness: Receiver<LivenessConfig>,
) -> Result<Infallible, std::io::Error> {
Expand Down

0 comments on commit 92d8067

Please sign in to comment.