Skip to content

Commit

Permalink
batched sender backpressure
Browse files Browse the repository at this point in the history
  • Loading branch information
rob-maron committed Feb 14, 2024
1 parent 4d5a8b5 commit 7b44ed3
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 68 deletions.
1 change: 1 addition & 0 deletions broker/src/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ macro_rules! send_or_remove_many {
if connection
.1
.queue_message($message.clone(), $position)
.await
.is_err()
{
// If it fails, remove the connection.
Expand Down
16 changes: 8 additions & 8 deletions client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,17 @@ impl<Scheme: SignatureScheme, ProtocolType: Protocol> Client<Scheme, ProtocolTyp
///
/// # Errors
/// If the connection or serialization has failed
pub fn send_broadcast_message(&self, topics: Vec<Topic>, message: Vec<u8>) -> Result<()> {
pub async fn send_broadcast_message(&self, topics: Vec<Topic>, message: Vec<u8>) -> Result<()> {
// Form and send the single message
self.send_message(&Message::Broadcast(Broadcast { topics, message }))
self.send_message(&Message::Broadcast(Broadcast { topics, message })).await
}

/// Sends a pre-serialized message to the server, denoting interest in delivery
/// to a single recipient.
///
/// # Errors
/// If the connection or serialization has failed
pub fn send_direct_message(
pub async fn send_direct_message(
&self,
recipient: &Scheme::PublicKey,
message: Vec<u8>,
Expand All @@ -85,7 +85,7 @@ impl<Scheme: SignatureScheme, ProtocolType: Protocol> Client<Scheme, ProtocolTyp
self.send_message(&Message::Direct(Direct {
recipient: recipient_bytes,
message,
}))
})).await
}

/// Sends a message to the server that asserts that this client is interested in
Expand All @@ -105,7 +105,7 @@ impl<Scheme: SignatureScheme, ProtocolType: Protocol> Client<Scheme, ProtocolTyp

// Send the topics
bail!(
self.send_message(&Message::Subscribe(topics_to_send.clone())),
self.send_message(&Message::Subscribe(topics_to_send.clone())).await,
Connection,
"failed to send subscription message"
);
Expand Down Expand Up @@ -138,7 +138,7 @@ impl<Scheme: SignatureScheme, ProtocolType: Protocol> Client<Scheme, ProtocolTyp

// Send the topics
bail!(
self.send_message(&Message::Unsubscribe(topics_to_send.clone())),
self.send_message(&Message::Unsubscribe(topics_to_send.clone())).await,
Connection,
"failed to send unsubscription message"
);
Expand All @@ -159,7 +159,7 @@ impl<Scheme: SignatureScheme, ProtocolType: Protocol> Client<Scheme, ProtocolTyp
///
/// # Errors
/// - if the downstream message sending fails.
pub fn send_message(&self, message: &Message) -> Result<()> {
self.0.send_message(message)
pub async fn send_message(&self, message: &Message) -> Result<()> {
self.0.send_message(message).await
}
}
24 changes: 10 additions & 14 deletions client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//! We spawn two clients. In a single-broker run, this lets them connect
//! cross-broker.
use std::time::{Duration, Instant};
use std::time::Instant;

use clap::Parser;
use client::{Client, ConfigBuilder, KeyPair};
Expand All @@ -16,8 +16,6 @@ use proto::{
use jf_primitives::signatures::{
bls_over_bn254::BLSOverBN254CurveSignatureScheme as BLS, SignatureScheme,
};
use tokio::time::sleep;
use tracing::info;

#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
Expand Down Expand Up @@ -58,28 +56,26 @@ async fn main() -> Result<()> {
// We want the first node to send to the second
if args.id != 0 {
// Generate two random keypairs, one for each client
let (_, other_public_key) = BLS::key_gen(&(), &mut DeterministicRng(args.id)).unwrap();
let (_, other_public_key) = BLS::key_gen(&(), &mut DeterministicRng(0)).unwrap();

loop {
let now = Instant::now();
for _ in 0..250000 {
// Create a big 512MB message
let m = vec![0u8; 256_000_000];
let m = vec![0u8; 1000];

let now = Instant::now();

if let Err(err) = client.send_direct_message(&other_public_key, m) {
if let Err(err) = client.send_direct_message(&other_public_key, m).await {
tracing::error!("failed to send message: {}", err);
};

info!("in {:?}", now.elapsed());

sleep(Duration::from_secs(1)).await;
}
println!("{:?}", now.elapsed());
} else {
loop {
for _ in 0..250000 {
if let Err(err) = client.receive_message().await {
tracing::error!("failed to receive message: {}", err);
continue;
};
}
}

Ok(())
}
13 changes: 11 additions & 2 deletions client/src/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ impl<Scheme: SignatureScheme, ProtocolType: Protocol> Retry<Scheme, ProtocolType
/// If the message sending fails. For example:
/// - If we are reconnecting
/// - If we are disconnected
pub fn send_message(&self, message: &Message) -> Result<()> {
pub async fn send_message(&self, message: &Message) -> Result<()> {
// Serialize the message
let message = bail!(
message.serialize(),
Expand All @@ -216,7 +216,9 @@ impl<Scheme: SignatureScheme, ProtocolType: Protocol> Retry<Scheme, ProtocolType
// Try to acquire the read lock. If we can't, we are reconnecting.
if let Ok(send_lock) = self.inner.sender.try_read() {
// Continue if we were able to acquire the lock
let out = send_lock.queue_message(Arc::from(message), Position::Back);
let out = send_lock
.queue_message(Arc::from(message), Position::Back)
.await;
Ok(try_with_reconnect!(self, send_lock, out))
} else {
// Return an error if we're reconnecting
Expand Down Expand Up @@ -245,6 +247,13 @@ impl<Scheme: SignatureScheme, ProtocolType: Protocol> Retry<Scheme, ProtocolType
}
}

/// Connect and authenticate to the marshal and then broker at the given endpoint
/// and with the given keypair.
///
/// Subscribe to the topics laid out herein.
///
/// # Errors
/// If we failed to connect or authenticate to the marshal or broker.
async fn connect_and_authenticate<Scheme: SignatureScheme, ProtocolType: Protocol>(
marshal_endpoint: &str,
keypair: &KeyPair<Scheme>,
Expand Down
10 changes: 5 additions & 5 deletions process-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,31 @@ processes:
command: echo 'requirepass changeme!' | keydb-server - --save "" --appendonly no

marshal_0:
command: cargo run --package marshal --features insecure --no-default-features -- -d "redis://:[email protected]:6379"
command: cargo run --package marshal --release --features insecure --no-default-features -- -d "redis://:[email protected]:6379"
depends_on:
redis:
condition: process_started

broker_0:
command: cargo run --package broker --features insecure --no-default-features -- --public-advertise-address 127.0.0.1:8083 --private-bind-port 8082 --public-bind-port 8083 -d "redis://:[email protected]:6379"
command: cargo run --package broker --release --features insecure --no-default-features -- --public-advertise-address 127.0.0.1:8083 --private-bind-port 8082 --public-bind-port 8083 -d "redis://:[email protected]:6379"
depends_on:
redis:
condition: process_started

broker_1:
command: cargo run --package broker --features insecure --no-default-features -- --metrics-port=9091 --public-advertise-address 127.0.0.1:8085 --private-bind-port 8084 --public-bind-port 8085 -d "redis://:[email protected]:6379"
command: cargo run --package broker --release --features insecure --no-default-features -- --metrics-port=9091 --public-advertise-address 127.0.0.1:8085 --private-bind-port 8084 --public-bind-port 8085 -d "redis://:[email protected]:6379"
depends_on:
redis:
condition: process_started

client_0:
command: cargo run --bin client --features insecure --features runtime-tokio --no-default-features -- --id 0
command: cargo run --bin client --release --features insecure --features runtime-tokio --no-default-features -- --id 0
depends_on:
marshal_0:
condition: process_started

client_1:
command: cargo run --bin client --features insecure --features runtime-tokio --no-default-features -- --id 1
command: cargo run --bin client --release --features insecure --features runtime-tokio --no-default-features -- --id 1
depends_on:
marshal_0:
condition: process_started
Expand Down
76 changes: 37 additions & 39 deletions proto/src/connection/batch.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
//! This crate defines a batching system for sending messages, wherein


//! This crate defines a batching system for sending messages, wherein
//! we spawn a task that owns the sender and have a handle to a channel it's
//! listening on.
//!
Expand All @@ -7,8 +9,9 @@
use std::{collections::VecDeque, marker::PhantomData, sync::Arc, time::Duration};

use tokio::{
select, spawn,
sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
spawn,
sync::mpsc::{channel, Receiver as BoundedReceiver, Sender as BoundedSender},
time::timeout,
};
use tracing::error;

Expand Down Expand Up @@ -55,7 +58,7 @@ enum Control {
/// to queue messages for sending with a minimum time or size. Is clonable through an `Arc`.
pub struct BatchedSender<ProtocolType: Protocol> {
/// The underlying channel that we receive messages over.
channel: UnboundedSender<QueueMessage>,
channel: BoundedSender<QueueMessage>,
/// The `PhantomData` we need to use a generic protocol type.
pd: PhantomData<ProtocolType>,
}
Expand Down Expand Up @@ -99,10 +102,10 @@ impl<ProtocolType: Protocol> BatchedSender<ProtocolType> {
///
/// # Errors
/// - If the send-side is closed.
pub fn freeze(&self) -> Result<()> {
pub async fn freeze(&self) -> Result<()> {
// Send a control message to freeze the queue
bail!(
self.channel.send(QueueMessage::Control(Control::Freeze)),
self.channel.send(QueueMessage::Control(Control::Freeze)).await,
Connection,
"connection closed"
);
Expand All @@ -114,10 +117,10 @@ impl<ProtocolType: Protocol> BatchedSender<ProtocolType> {
///
/// # Errors
/// - If the send-side is closed.
pub fn unfreeze(&self) -> Result<()> {
pub async fn unfreeze(&self) -> Result<()> {
// Send a control message to unfreeze the queue
bail!(
self.channel.send(QueueMessage::Control(Control::Unfreeze)),
self.channel.send(QueueMessage::Control(Control::Unfreeze)).await,
Connection,
"connection closed"
);
Expand All @@ -129,10 +132,10 @@ impl<ProtocolType: Protocol> BatchedSender<ProtocolType> {
///
/// # Errors
/// - If the send-side is closed.
pub fn queue_message(&self, message: Arc<Vec<u8>>, position: Position) -> Result<()> {
pub async fn queue_message(&self, message: Arc<Vec<u8>>, position: Position) -> Result<()> {
// Send a data message
bail!(
self.channel.send(QueueMessage::Data(message, position)),
self.channel.send(QueueMessage::Data(message, position)).await,
Connection,
"connection closed"
);
Expand All @@ -148,7 +151,7 @@ impl<ProtocolType: Protocol> BatchedSender<ProtocolType> {
max_size_in_bytes: u64,
) -> Self {
// Create the send and receive sides of a channel.
let (send_side, receive_side) = unbounded_channel();
let (send_side, receive_side) = channel(50);

// Create a new queue from our parameters and defaults
let batch_params = Queue {
Expand All @@ -174,32 +177,29 @@ impl<ProtocolType: Protocol> BatchedSender<ProtocolType> {
/// data and control messages.
async fn batch_loop(
mut sender: ProtocolType::Sender,
mut receiver: UnboundedReceiver<QueueMessage>,
mut receiver: BoundedReceiver<QueueMessage>,
mut queue: Queue,
) {
// Create a timer that ticks every max interval. We reset it if we actually send a message.
let mut timer = tokio::time::interval(queue.max_duration);

loop {
// Select on either a new message or a timer event
select! {
// Receive a message. Will return `None` if the send side is closed.
possible_message = receiver.recv() => {
let Some(message) = possible_message else {
let possible_message = timeout(queue.max_duration, receiver.recv()).await;

if let Ok(message) = possible_message {
// We didn't time out
let Some(message) = message else {
// If the send-side is closed, drop everything and stop.
return
return;
};

// See what type of message we have
match message{
match message {
// A data message. This is a message that we actually want to add
// to the queue.
QueueMessage::Data(data, position) => {
// Get the current length of data.
let data_len = data.len();

// See in which position we wanted to add to the queue
match position{
match position {
Position::Front => {
queue.inner.push_front(data);
}
Expand All @@ -212,12 +212,12 @@ impl<ProtocolType: Protocol> BatchedSender<ProtocolType> {
queue.current_size += data_len as u64;

// If we're frozen, don't continue to any sending logic
if queue.frozen{
continue
if queue.frozen {
continue;
}

// Bounds check to see if we should send
if queue.current_size >= queue.max_size_in_bytes{
if queue.current_size >= queue.max_size_in_bytes {
// Flush the queue, sending all in-flight messages.
flush_queue!(queue, sender);
}
Expand All @@ -226,30 +226,28 @@ impl<ProtocolType: Protocol> BatchedSender<ProtocolType> {
// We got a control message; a message we don't actually want to send.
QueueMessage::Control(control) => {
match control {
Control::Freeze => {queue.frozen = true}
Control::Unfreeze => {queue.frozen = false}
Control::Freeze => queue.frozen = true,
Control::Unfreeze => queue.frozen = false,
// Return if we see a shutdown message
Control::Shutdown => return
Control::Shutdown => return,
}
}
}
}
// We hit this when the timer expires without having sent a message.
_ = timer.tick() => {
// Don't do anything if the queue is currently frozen
if queue.frozen{
continue
} else {
// We timed out
if queue.frozen {
continue;
}

// Flush the queue, sending all in-flight messages.
flush_queue!(queue, sender);
}
}

// Reset the timer when we are done sending the message.
timer.reset();
}
}




}

// When we drop, we want to send the shutdown message to the sender.
Expand Down

0 comments on commit 7b44ed3

Please sign in to comment.