Skip to content

Commit

Permalink
improve quic perf 40% by using owned tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
rob-maron committed Feb 25, 2024
1 parent d1d5e5f commit a61f251
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 62 deletions.
9 changes: 8 additions & 1 deletion broker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,18 @@ use proto::{
error::{Error, Result},
parse_socket_address, DiscoveryClientType,
};
use tokio::{select, spawn};
use tokio::{select, spawn, sync::Semaphore};
use tracing::info;

use crate::metrics::RUNNING_SINCE;

/// The maximum number of in-flight bytes.
/// 5GB right now
const MAX_QUEUE_SIZE: usize = 1024 * 1024 * 1024 * 5;

/// Create a permit that accounts for each byte
static IN_FLIGHT_BYTES: Semaphore = Semaphore::const_new(MAX_QUEUE_SIZE);

/// The broker's configuration. We need this when we create a new one.
#[derive(Builder)]
pub struct Config<BrokerScheme: SignatureScheme> {
Expand Down
169 changes: 109 additions & 60 deletions proto/src/connection/protocols/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,13 @@
//! logic.
use async_trait::async_trait;
use quinn::{ClientConfig, Connecting, Endpoint, ServerConfig, VarInt};
use kanal::{unbounded_async, AsyncReceiver, AsyncSender};
use quinn::{ClientConfig, Connecting, Endpoint, ServerConfig};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::spawn;

#[cfg(feature = "metrics")]
use crate::connection::metrics;

#[cfg(feature = "insecure")]
use crate::crypto::tls::SkipServerVerification;
Expand All @@ -13,18 +19,16 @@ use crate::{
crypto,
error::{Error, Result},
message::Message,
parse_socket_address, MAX_MESSAGE_SIZE,
parse_socket_address, read_length_delimited, write_length_delimited, MAX_MESSAGE_SIZE,
};
use std::{
net::{SocketAddr, ToSocketAddrs},
result::Result as StdResult,
sync::Arc,
};

use super::{Listener, Protocol, Receiver, Sender, UnfinalizedConnection};

#[cfg(feature = "metrics")]
use crate::connection::metrics::{BYTES_RECV, BYTES_SENT};

/// The `Quic` protocol. We use this to define commonalities between QUIC
/// listeners, connections, etc.
#[derive(Clone, PartialEq, Eq)]
Expand Down Expand Up @@ -94,10 +98,25 @@ impl Protocol for Quic {
"failed quic connect to remote address"
);

Ok((
QuicSender(Arc::from(QuicSenderRef(connection.clone()))),
QuicReceiver(Arc::from(QuicReceiverRef(connection))),
))
// Open an outgoing bidirectional stream
let (mut sender, receiver) = bail!(
connection.open_bi().await,
Connection,
"failed to accept bidirectional stream"
);

// Write a `u8` to bootstrap the connection (make the sender aware of our
// outbound stream request)
bail!(
sender.write_u8(0).await,
Connection,
"failed to bootstrap connection"
);

// Convert to owned channel implementation
let (sender, receiver) = into_channels(sender, receiver);

Ok((sender, receiver))
}

/// Binds to a local endpoint. Uses `maybe_tls_cert_path` and `maybe_tls_cert_key`
Expand Down Expand Up @@ -146,56 +165,86 @@ impl Protocol for Quic {
pub struct QuicSender(Arc<QuicSenderRef>);

#[derive(Clone)]
pub struct QuicSenderRef(quinn::Connection);
pub struct QuicSenderRef(AsyncSender<Bytes>);

/// Convert a Quic `SendStream` and `RecvStream` to use dedicated tasks and channels
/// under the hood.
/// TODO: this is almost the same as with TCP, figure out how to combine them
fn into_channels(
mut write_half: quinn::SendStream,
mut read_half: quinn::RecvStream,
) -> (QuicSender, QuicReceiver) {
// Create a channel for sending messages to the task
let (send_to_task, receive_as_task) = unbounded_async();

// Create a channel for receiving messages from the task
let (send_as_task, receive_from_task) = unbounded_async();

// Start the sending task
spawn(async move {
loop {
// Receive a message from our code
let Ok(message): StdResult<Bytes, _> = receive_as_task.recv().await else {
// If the channel is closed, stop.
return;
};

// Send a message over the real connection
write_length_delimited!(write_half, message);
}
})
.abort_handle();

// Start the receiving task
spawn(async move {
loop {
// Receive a message from the real connection
let message = Bytes::from(read_length_delimited!(read_half));

// Send a message to our code
if send_as_task.send(message).await.is_err() {
// If the channel is closed, stop
return;
};
}
})
.abort_handle();

(
QuicSender(Arc::from(QuicSenderRef(send_to_task))),
QuicReceiver(Arc::from(QuicReceiverRef(receive_from_task))),
)
}

#[async_trait]
impl Sender for QuicSender {
/// Send an (unserialized) message over the stream.
/// Send an unserialized message over the stream.
///
/// # Errors
/// If we fail to send or serialize the message
async fn send_message(&self, message: Message) -> Result<()> {
// Serialize the message
// Serialize our message
let raw_message = Bytes::from(bail!(
message.serialize(),
Serialize,
"failed to serialize message"
));

// Send the now-raw message
// Send the message in its raw form
self.send_message_raw(raw_message).await
}

/// Send a pre-serialized message over the connection.
///
/// # Errors
/// - If we fail to deliver the message. This usually means a connection problem.
async fn send_message_raw(&self, raw_message: Bytes) -> Result<()> {
// Open an outgoing `SendStream`
let mut send_stream = bail!(
self.0 .0.open_uni().await,
Connection,
"failed to open outgoing stream"
);

// Send the serialized message over the stream
// Send the message over our channel
bail!(
send_stream.write_all(&raw_message).await,
self.0 .0.send(raw_message).await,
Connection,
"failed to write message to stream"
"failed to send message: connection closed"
);

// Close the stream, indiciating all data has been sent
bail!(
send_stream.finish().await,
Connection,
"failed to finish sending stream"
);

// If enabled, write to our metrics
#[cfg(feature = "metrics")]
BYTES_SENT.add(raw_message.len() as f64);

Ok(())
}
}
Expand All @@ -204,15 +253,14 @@ impl Sender for QuicSender {
pub struct QuicReceiver(Arc<QuicReceiverRef>);

#[derive(Clone)]
pub struct QuicReceiverRef(quinn::Connection);
pub struct QuicReceiverRef(AsyncReceiver<Bytes>);

#[async_trait]
impl Receiver for QuicReceiver {
/// Receives a single message over the stream and deserializes
/// it.
///
/// # Errors
/// - if we fail to accept an incoming stream
/// - if we fail to receive the message
/// - if we fail to deserialize the message
async fn recv_message(&self) -> Result<Message> {
Expand All @@ -231,28 +279,15 @@ impl Receiver for QuicReceiver {
/// it.
///
/// # Errors
/// - if we fail to accept an incoming stream
/// - if we fail to receive the message
async fn recv_message_raw(&self) -> Result<Bytes> {
// Accept an incoming unidirectional stream
let mut recv_stream = bail!(
self.0 .0.accept_uni().await,
// Receive the message
let raw_message = bail!(
self.0 .0.recv().await,
Connection,
"failed to accept incoming stream"
"failed to receive message: connection closed"
);

// Read until the end, when the sender has indicated it's done reading
// TODO: serverside, set time out here
let raw_message = Bytes::from(bail!(
recv_stream.read_to_end(MAX_MESSAGE_SIZE as usize).await,
Connection,
"failed to read bytes from receiving stream"
));

// If enabled, write to our metrics
#[cfg(feature = "metrics")]
BYTES_RECV.add(raw_message.len() as f64);

Ok(raw_message)
}
}
Expand All @@ -271,11 +306,25 @@ impl UnfinalizedConnection<QuicSender, QuicReceiver> for UnfinalizedQuicConnecti
// Await on the `Connecting` to obtain `Connection`
let connection = bail!(self.0.await, Connection, "failed to finalize connection");

// Accept an incoming bidirectional stream
let (sender, mut receiver) = bail!(
connection.accept_bi().await,
Connection,
"failed to accept bidirectional stream"
);

// Read the `u8` required to bootstrap the connection
bail!(
receiver.read_u8().await,
Connection,
"failed to bootstrap connection"
);

// Convert to owned channel implementation
let (sender, receiver) = into_channels(sender, receiver);

// Clone and return the connection
Ok((
QuicSender(Arc::from(QuicSenderRef(connection.clone()))),
QuicReceiver(Arc::from(QuicReceiverRef(connection))),
))
Ok((sender, receiver))
}
}

Expand Down Expand Up @@ -309,7 +358,7 @@ impl Listener<UnfinalizedQuicConnection> for QuicListener {
impl Drop for QuicSenderRef {
fn drop(&mut self) {
// Close the connection with no reason
self.0.close(VarInt::from_u32(0), b"");
self.0.close();
}
}

Expand All @@ -318,7 +367,7 @@ impl Drop for QuicSenderRef {
impl Drop for QuicReceiverRef {
fn drop(&mut self) {
// Close the connection with no reason
self.0.close(VarInt::from_u32(0), b"");
self.0.close();
}
}

Expand Down
8 changes: 8 additions & 0 deletions proto/src/connection/protocols/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,10 @@ fn into_split(connection: TcpStream) -> (TcpSender, TcpReceiver) {

#[async_trait]
impl Sender for TcpSender {
/// Send an unserialized message over the stream.
///
/// # Errors
/// If we fail to send or serialize the message
async fn send_message(&self, message: Message) -> Result<()> {
// Serialize our message
let raw_message = Bytes::from(bail!(
Expand All @@ -170,6 +174,10 @@ impl Sender for TcpSender {
self.send_message_raw(raw_message).await
}

/// Send a raw (already serialized) message over the stream.
///
/// # Errors
/// If we fail to send the message
async fn send_message_raw(&self, raw_message: Bytes) -> Result<()> {
// Send the message over our channel
bail!(
Expand Down
2 changes: 1 addition & 1 deletion proto/src/crypto/signature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use super::rng::DeterministicRng;

/// This trait defines a generic signature scheme, wherein we can sign and verify messages
/// with the associated public and private keys.
pub trait SignatureScheme: Send + Sync + Clone + 'static {
pub trait SignatureScheme: Send + Sync + Clone {
/// The signing key type
type PrivateKey: Clone + Send + Sync;
/// The verification key type
Expand Down

0 comments on commit a61f251

Please sign in to comment.