Skip to content

Commit

Permalink
implement sending batches of messages
Browse files Browse the repository at this point in the history
  • Loading branch information
rob-maron committed Feb 6, 2024
1 parent 749d63f commit e7080d5
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 45 deletions.
3 changes: 1 addition & 2 deletions broker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,8 +245,6 @@ where
verify_broker!(connection, inner);
authenticate_with_broker!(connection, inner)
};


}

/// This function handles a user (public) connection. We take the following steps:
Expand Down Expand Up @@ -310,6 +308,7 @@ where
Ok(brokers) => {
// Calculate the difference, spawn tasks to connect to them
for broker in brokers.difference(&inner.brokers_connected.read()) {
// TODO: make this into a separate function
// Extrapolate the address to connect to
let to_connect_address = broker.broker_advertise_address.clone();

Expand Down
34 changes: 0 additions & 34 deletions broker/src/state/mod.rs

This file was deleted.

Empty file removed broker/src/state/user.rs
Empty file.
4 changes: 2 additions & 2 deletions client/src/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,11 +244,11 @@ where
/// - If the message sending failed
pub async fn send_message(&self, message: Message) -> Result<()> {
// Serialize the message
let message = bail!(
let message = Arc::from(bail!(
message.serialize(),
Serialize,
"failed to serialize message"
);
));

// Try to send the message, reconnecting if needed
Ok(try_with_reconnect!(self, send_message_raw, message,))
Expand Down
11 changes: 9 additions & 2 deletions proto/src/connection/protocols/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! This module defines connections, listeners, and their implementations.
use std::net::SocketAddr;
use std::{net::SocketAddr, sync::Arc};

use async_trait::async_trait;

Expand Down Expand Up @@ -36,7 +36,14 @@ pub trait Connection: Send + Sync + 'static {
///
/// # Errors
/// - If we fail to deliver the message. This usually means a connection problem.
async fn send_message_raw(&self, message: Vec<u8>) -> Result<()>;
async fn send_message_raw(&self, message: Arc<Vec<u8>>) -> Result<()>;

/// Send a vector of pre-formed messages over the connection.
///
/// # Errors
/// - If we fail to deliver any of the messages. This usually means a connection problem.
async fn send_messages_raw(&self, messages: Vec<Arc<Vec<u8>>>) -> Result<()>;


/// Connect to a remote address, returning an instance of `Self`.
///
Expand Down
26 changes: 23 additions & 3 deletions proto/src/connection/protocols/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::{
MAX_MESSAGE_SIZE,
};
use core::hash::Hash;
use std::net::ToSocketAddrs;
use std::{net::ToSocketAddrs, sync::Arc};

use super::{Connection, Listener, Protocol};

Expand Down Expand Up @@ -114,14 +114,14 @@ impl Connection for QuicConnection {
);

// Send the message
self.send_message_raw(message_bytes).await
self.send_message_raw(Arc::from(message_bytes)).await
}

/// Send a pre-formed message over the connection.
///
/// # Errors
/// - If we fail to deliver the message. This usually means a connection problem.
async fn send_message_raw(&self, message: Vec<u8>) -> Result<()> {
async fn send_message_raw(&self, message: Arc<Vec<u8>>) -> Result<()> {
// Open the outgoing unidirectional stream
let mut stream = bail!(
self.0.open_uni().await,
Expand All @@ -145,6 +145,26 @@ impl Connection for QuicConnection {
))
}

/// Send a vector of pre-formed message over the connection.
///
/// TODO: FIGURE OUT IF WE WANT TO FRAME LIKE THIS. it may be more performant with batching
/// to not do it this way.
///
/// # Errors
/// - If we fail to deliver any of the messages. This usually means a connection problem.
async fn send_messages_raw(&self, messages: Vec<Arc<Vec<u8>>>) -> Result<()> {
// Send each message over the connection
for message in messages {
bail!(
self.send_message_raw(message).await,
Connection,
"failed to send message"
);
}

Ok(())
}

/// Connect to a remote endpoint, returning an instance of `Self`. With QUIC,
/// this requires creating an endpoint, binding to it, and then attempting
/// a connection.
Expand Down
34 changes: 32 additions & 2 deletions proto/src/connection/protocols/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,14 @@ impl Connection for TcpConnection {
);

// Send the serialized message
self.send_message_raw(serialized_message).await
self.send_message_raw(Arc::from(serialized_message)).await
}

/// Send a pre-formed message over the connection.
///
/// # Errors
/// - If we fail to deliver the message. This usually means a connection problem.
async fn send_message_raw(&self, message: Vec<u8>) -> Result<()> {
async fn send_message_raw(&self, message: Arc<Vec<u8>>) -> Result<()> {
// Lock the stream so we don't send message/message sizes interleaved
let mut sender_guard = self.sender.lock().await;

Expand All @@ -130,6 +130,36 @@ impl Connection for TcpConnection {
Ok(())
}

/// Send a vector pre-formed messages over the connection.
///
/// # Errors
/// - If we fail to deliver the message. This usually means a connection problem.
async fn send_messages_raw(&self, messages: Vec<Arc<Vec<u8>>>) -> Result<()> {
// Lock the stream so we don't send message/message sizes interleaved
let mut sender_guard = self.sender.lock().await;

// For each message:
for message in messages {
// Write the message size to the stream
bail!(
sender_guard.write_u64(message.len() as u64).await,
Connection,
"failed to send message size"
);

// Write the message to the stream
bail!(
sender_guard.write_all(&message).await,
Connection,
"failed to send message"
);
}

drop(sender_guard);

Ok(())
}

/// Connect to a remote endpoint, returning an instance of `Self`.
/// With TCP, this requires just connecting to the remote endpoint.
///
Expand Down

0 comments on commit e7080d5

Please sign in to comment.