Skip to content

Commit

Permalink
network protocol benchmarks
Browse files Browse the repository at this point in the history
  • Loading branch information
rob-maron committed Feb 25, 2024
1 parent 35d38ff commit dea04a2
Show file tree
Hide file tree
Showing 10 changed files with 311 additions and 85 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,9 @@ prometheus = { version = "0.13.3" }
lazy_static = "1.4.0"
derive_builder = "0.13.1"
async-std = { version = "1.12.0", features = ["tokio1", "attributes"] }
rkyv = { version = "0.7.44", features = ["validation"] }
rkyv = { version = "0.7.44", features = ["validation"] }

# Dev dependencies (can't be defined explicitly in the workspace)
# TODO: figure out if this actually builds on non-test targets
criterion = { version = "0.5.1", features = ["html_reports", "async_tokio"] }
pprof = { version = "0.13.0", features = ["flamegraph", "criterion"] }
4 changes: 2 additions & 2 deletions broker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ runtime-tokio = ["proto/runtime-tokio"]
runtime-async-std = ["dep:async-std", "proto/runtime-async-std"]

[dev-dependencies]
criterion = { version = "0.5.1", features = ["html_reports", "async_tokio"] }
pprof = { version = "0.13.0", features = ["flamegraph", "criterion"] }
criterion.workspace = true
pprof.workspace = true

# Benchmark direct messages (non-networked)
[[bench]]
Expand Down
18 changes: 14 additions & 4 deletions proto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,16 @@ insecure = ["rustls/dangerous_configuration"]
runtime-tokio = []
runtime-async-std = ["dep:async-std"]

[dev-dependencies]
portpicker = "0.1.1"
criterion.workspace = true
pprof.workspace = true

# Network protocol (tcp/quic) benchmarks
[[bench]]
name = "protocols"
harness = false


[dependencies]
# TODO: make this optional if local not specified
Expand All @@ -33,7 +43,10 @@ prometheus = { workspace = true, optional = true }
lazy_static = { workspace = true, optional = true }

tokio = { workspace = true }
async-std = { version = "1.12.0", features = ["tokio1", "attributes"], optional = true }
async-std = { version = "1.12.0", features = [
"tokio1",
"attributes",
], optional = true }

capnp = "0.19.1"
thiserror = "1.0.56"
Expand All @@ -53,6 +66,3 @@ anyhow = "1.0.79"
kanal = "0.1.0-pre8"
rkyv.workspace = true
mnemonic = "1.0.1"

[dev-dependencies]
portpicker = "0.1.1"
164 changes: 164 additions & 0 deletions proto/benches/protocols.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
//! Benchmarks for network protocols [Quic/TCP]
use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use pprof::criterion::{Output, PProfProfiler};
use proto::{
connection::{
protocols::{
quic::Quic, tcp::Tcp, Listener, Protocol, Receiver, Sender, UnfinalizedConnection,
},
Bytes,
},
message::{Broadcast, Message},
};
use tokio::{join, runtime::Runtime, spawn};

/// Transfer a message `raw_message` from `conn1` to `conn2.` This is the primary
/// function used for testing network protocol speed.
async fn transfer<Proto: Protocol>(
conn1: (Proto::Sender, Proto::Receiver),
conn2: (Proto::Sender, Proto::Receiver),
raw_message: Bytes,
) {
// Send from the first connection
let conn1_jh = spawn(async move {
conn1
.0
.send_message_raw(raw_message.clone())
.await
.expect("failed to send message");
});

// Receive from the second connection
let conn2_jh = spawn(async move {
conn2
.1
.recv_message_raw()
.await
.expect("failed to receive message");
});

// Wait for both to finish
let _ = join!(conn1_jh, conn2_jh);
}

/// Set up our protocol benchmarks, including async runtime, given the message size
/// to test.
fn set_up_bench<Proto: Protocol>(
message_size: usize,
) -> (
Runtime,
(Proto::Sender, Proto::Receiver),
(Proto::Sender, Proto::Receiver),
Bytes,
) {
// Create new tokio runtime
let benchmark_runtime = tokio::runtime::Runtime::new().expect("failed to create Tokio runtime");

// Set up our protocol under test
let (conn1, conn2, message) = benchmark_runtime.block_on(async move {
// Find random, open port to use
let port = portpicker::pick_unused_port().expect("no ports available");

// Create listener, bind to port
let listener = Proto::bind(&format!("127.0.0.1:{}", port), None, None)
.await
.expect("failed to listen on port");

// Spawn future that resolves to our inbound connection
let listener_jh = spawn(async move {
// Accept connection
let unfinalized_connection =
listener.accept().await.expect("failed to open connection");

// Finalize the connection
unfinalized_connection
.finalize()
.await
.expect("failed to finalize connection")
});

// Attempt to connect
let conn1 = Proto::connect(&format!("127.0.0.1:{}", port))
.await
.expect("failed to connect to listener");

// Wait for listener to resolve
let conn2 = listener_jh.await.expect("failed to join listener task");

// Create message of particular size
let message = Bytes::from(
Message::Broadcast(Broadcast {
topics: vec![],
message: vec![0; message_size],
})
.serialize()
.expect("failed to serialize message"),
);

(conn1, conn2, message)
});

(benchmark_runtime, conn1, conn2, message)
}

/// Bench the `QUIC` protocol implementation
fn bench_quic(c: &mut Criterion) {
static KB: usize = 1024;
static MB: usize = 1024 * 1024;
let mut group = c.benchmark_group("quic_transfer");
// The message sizes we want to test
for size in [100, 1 * KB, 100 * KB, 10 * MB, 100 * MB].iter() {
// Set up our bench
let (runtime, conn1, conn2, message) = set_up_bench::<Quic>(*size);

// Run with variable throughput
group.throughput(Throughput::Bytes(*size as u64));
group.bench_function(BenchmarkId::from_parameter(size), |b| {
b.to_async(&runtime).iter(|| {
transfer::<Quic>(
black_box(conn1.clone()),
black_box(conn2.clone()),
black_box(message.clone()),
)
});
});
}

group.finish();
}

/// Bench the `TCP` protocol implementation
fn bench_tcp(c: &mut Criterion) {
static KB: usize = 1024;
static MB: usize = 1024 * 1024;
let mut group = c.benchmark_group("tcp_transfer");
// The message sizes we want to test
for size in [100, 1 * KB, 100 * KB, 10 * MB, 100 * MB].iter() {
// Set up our bench
let (runtime, conn1, conn2, message) = set_up_bench::<Tcp>(*size);

// Run with variable throughput
group.throughput(Throughput::Bytes(*size as u64));
group.bench_function(BenchmarkId::from_parameter(size), |b| {
b.to_async(&runtime).iter(|| {
transfer::<Tcp>(
black_box(conn1.clone()),
black_box(conn2.clone()),
black_box(message.clone()),
)
});
});
}

group.finish();
}

// Set up the benchmnark with the optional flamegraph profiler
criterion_group! {
name = benches;
config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None)));
targets = bench_quic, bench_tcp
}

criterion_main!(benches);
25 changes: 19 additions & 6 deletions proto/src/connection/protocols/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,24 @@ impl Receiver for MemoryReceiver {
/// - If the other side of the channel is closed
/// - If we fail deserialization
async fn recv_message(&self) -> Result<Message> {
// Receive raw message
let raw_message = self.recv_message_raw().await?;

// Deserialize and return the message
Ok(bail!(
Message::deserialize(&raw_message),
Deserialize,
"failed to deserialize message"
))
}

/// Receives a single message from our channel without
/// deserializing≥
///
/// # Errors
/// - If the other side of the channel is closed
/// - If we fail deserialization
async fn recv_message_raw(&self) -> Result<Bytes> {
// Receive a message from the channel
let raw_message = bail!(
self.0 .0.recv().await,
Expand All @@ -180,12 +198,7 @@ impl Receiver for MemoryReceiver {
#[cfg(feature = "metrics")]
BYTES_RECV.add(raw_message.len() as f64);

// Deserialize and return the message
Ok(bail!(
Message::deserialize(&raw_message),
Deserialize,
"failed to deserialize message"
))
Ok(raw_message)
}
}

Expand Down
60 changes: 60 additions & 0 deletions proto/src/connection/protocols/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ pub trait Receiver {
/// - if we fail to receive the message
/// - if we fail deserialization
async fn recv_message(&self) -> Result<Message>;

/// Receives a message or message[s] over the stream without deserializing.
///
/// # Errors
/// - if we fail to receive the message
async fn recv_message_raw(&self) -> Result<Bytes>;
}

#[automock]
Expand Down Expand Up @@ -170,3 +176,57 @@ pub mod tests {
Ok(())
}
}

/// A macro to read a length-delimited (serialized) message from a stream.
/// Has a bounds check for if the message is too big
#[macro_export]
macro_rules! read_length_delimited {
($stream: expr) => {{
// Read the message size from the stream
let Ok(message_size) = $stream.read_u64().await else {
return;
};

// Make sure the message isn't too big
if message_size > MAX_MESSAGE_SIZE {
return;
}

// Create buffer of the proper size
let mut buffer = vec![0; usize::try_from(message_size).expect("64 bit system")];

// Read the message from the stream
if $stream.read_exact(&mut buffer).await.is_err() {
return;
}

// Add to our metrics, if desired
#[cfg(feature = "metrics")]
metrics::BYTES_RECV.add(message_size as f64);

buffer
}};
}

/// A macro to write a length-delimited (serialized) message to a stream.
#[macro_export]
macro_rules! write_length_delimited {
($stream: expr, $message:expr) => {
// Get the length of the message
let message_len = $message.len() as u64;

// Write the message size to the stream
if $stream.write_u64(message_len).await.is_err() {
return;
}

// Write the message to the stream
if $stream.write_all(&$message).await.is_err() {
return;
}

// Increment the number of bytes we've sent by this amount
#[cfg(feature = "metrics")]
metrics::BYTES_SENT.add(message_len as f64);
};
}
27 changes: 20 additions & 7 deletions proto/src/connection/protocols/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,26 @@ impl Receiver for QuicReceiver {
/// # Errors
/// - if we fail to accept an incoming stream
/// - if we fail to receive the message
/// - if we fail deserialization
/// - if we fail to deserialize the message
async fn recv_message(&self) -> Result<Message> {
// Receive the raw message
let raw_message = self.recv_message_raw().await?;

// Deserialize and return the message
Ok(bail!(
Message::deserialize(&raw_message),
Deserialize,
"failed to deserialize message"
))
}

/// Receives a single message over the stream and deserializes
/// it.
///
/// # Errors
/// - if we fail to accept an incoming stream
/// - if we fail to receive the message
async fn recv_message_raw(&self) -> Result<Bytes> {
// Accept an incoming unidirectional stream
let mut recv_stream = bail!(
self.0 .0.accept_uni().await,
Expand All @@ -235,12 +253,7 @@ impl Receiver for QuicReceiver {
#[cfg(feature = "metrics")]
BYTES_RECV.add(raw_message.len() as f64);

// Deserialize and return the message
Ok(bail!(
Message::deserialize(&raw_message),
Deserialize,
"failed to deserialize message"
))
Ok(raw_message)
}
}

Expand Down
Loading

0 comments on commit dea04a2

Please sign in to comment.