Skip to content

Commit

Permalink
feat!: Updated OutgoingMessage to have multiple values for a payload.
Browse files Browse the repository at this point in the history
  • Loading branch information
halzy committed Mar 2, 2020
1 parent 1a46c6d commit 47d7f59
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 24 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "stream_multiplexer"
version = "0.6.0"
version = "0.7.0"
authors = ["Benjamin Halsted <[email protected]>"]
edition = "2018"
license = "MIT OR Apache-2.0"
Expand All @@ -21,6 +21,7 @@ tokio = { version = "0.2.13", features = ["full"] }
tokio-util = { version = "0.2", features = ["codec"] }
tracing = { version = "0.1", features = ["log"] }
tracing-futures = "0.2"
tinyvec = { version = "0.3", features = ["alloc"] }

[dev-dependencies]
futures = { version = "0.3", default-features = false, features = ["alloc","std"] }
Expand Down
12 changes: 9 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ use send_all_own::*;
use sender::*;
use stream_mover::*;

use std::iter::FromIterator;

type StreamId = usize;

/// Produced by the incoming stream
Expand Down Expand Up @@ -227,12 +229,16 @@ impl<V> std::fmt::Debug for IncomingPacket<V> {
#[derive(Clone)]
pub struct OutgoingMessage<V> {
stream_ids: Vec<StreamId>,
value: V,
value: tinyvec::TinyVec<[Option<V>; 16]>,
}
impl<V> OutgoingMessage<V> {
/// Creates a new message that is to be delivered to streams with `ids`.
pub fn new(stream_ids: Vec<StreamId>, value: V) -> Self {
Self { stream_ids, value }
pub fn new(stream_ids: Vec<StreamId>, values: impl IntoIterator<Item = V>) -> Self {
let values = tinyvec::TinyVec::from_iter(values.into_iter().map(Some));
Self {
stream_ids,
value: values,
}
}
}

Expand Down
47 changes: 29 additions & 18 deletions src/multiplexer_senders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,30 +138,41 @@ where
(futures_len, sender_pairs_len)
}

fn handle_new_message(&mut self, message: OutgoingMessage<Item>) {
fn handle_new_message(&mut self, mut message: OutgoingMessage<Item>) {
for stream_id in message.stream_ids {
match self.sender_pairs.entry(stream_id) {
Entry::Vacant(_) => {
tracing::warn!(stream_id, "Tring to send message to non-existent stream.");
}
Entry::Occupied(mut sender_pair_entry) => {
let sender_pair = sender_pair_entry.get_mut();
match sender_pair.try_send(message.value.clone()) {
Ok(()) => {
// Enqueue the message and move the sender into the FuturesUnordered
tracing::trace!(stream_id, "Enqueued message.");
if let Some(sender) = sender_pair.take() {
self.senders_stream.push(sender.into_future());
let mut should_remove = false;
for value in message.value.drain(..).flatten() {
let sender_pair = sender_pair_entry.get_mut();
match sender_pair.try_send(value.clone()) {
Ok(()) => {
// Enqueue the message and move the sender into the FuturesUnordered
tracing::trace!(stream_id, "Enqueued message.");
if let Some(sender) = sender_pair.take() {
self.senders_stream.push(sender.into_future());
}
}
}
Err(TrySendError::Full(_)) => {
tracing::error!(stream_id, "Stream is full, shutting down sender.");
sender_pair_entry.remove_entry();
}
Err(TrySendError::Closed(_)) => {
tracing::error!(stream_id, "Stream is closed, shutting down sender.");
sender_pair_entry.remove_entry();
}
Err(TrySendError::Full(_)) => {
tracing::error!(stream_id, "Stream is full, shutting down sender.");
should_remove = true;
break;
}
Err(TrySendError::Closed(_)) => {
tracing::error!(
stream_id,
"Stream is closed, shutting down sender."
);
should_remove = true;
break;
}
};
}
if should_remove {
sender_pair_entry.remove_entry();
}
}
};
Expand Down Expand Up @@ -298,7 +309,7 @@ mod tests {

// Send some data to the stream
for x in 0_u8..10 {
let message = OutgoingMessage::new(vec![stream_id], x);
let message = OutgoingMessage::new(vec![stream_id], vec![x]);
message_channel.send(message).unwrap();
}

Expand Down
4 changes: 2 additions & 2 deletions tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ async fn write_packets() {
// send a message
let data = Bytes::from("a message");
data_write
.send(OutgoingMessage::new(vec![client1_id], data.clone()).into())
.send(OutgoingMessage::new(vec![client1_id], vec![data.clone()]).into())
.unwrap();

let mut read_data = BytesMut::new();
Expand Down Expand Up @@ -250,7 +250,7 @@ async fn change_channel() {
// send a message to the client (so that the client waits and we can change channels)
let data = Bytes::from("a message from the server");
data_write
.send(OutgoingMessage::new(vec![client1_id], data.clone()).into())
.send(OutgoingMessage::new(vec![client1_id], vec![data.clone()]).into())
.unwrap();

// client reads data
Expand Down

0 comments on commit 47d7f59

Please sign in to comment.