Skip to content

Commit

Permalink
bench/flamegraph updates
Browse files Browse the repository at this point in the history
  • Loading branch information
rob-maron committed Feb 12, 2024
1 parent 6508a6d commit 962e22c
Show file tree
Hide file tree
Showing 7 changed files with 78 additions and 131 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ members = ["proto", "broker", "marshal", "client"]

package.description = "The PushCDN is a distributed, scalable, and fault-tolerant pub/sub messaging system"

[profile.release]
debug = true

[workspace.dependencies]
tokio = { version = "1.35.1", features = ["full"] }
jf-primitives.git = "https://github.com/EspressoSystems/jellyfish.git"
Expand All @@ -13,4 +16,3 @@ tracing = "0.1.40"
tracing-subscriber = "0.3.18"
clap = { version = "4.4.18", features = ["derive"] }
async-trait = "0.1.77"
paste = "1.0.14"
5 changes: 4 additions & 1 deletion broker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ edition = "2021"

description = "Defines the broker server, which is responsible for routing messages from clients"

[profile.release]
debug = true

[dependencies]
jf-primitives.workspace = true
proto.path = "../proto"
Expand All @@ -17,4 +20,4 @@ local-ip-address = "0.5.7"
slotmap = "1.0.7"
delegate = "0.12.0"
async-trait.workspace = true
paste.workspace = true
paste = "1.0.14"
3 changes: 2 additions & 1 deletion client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@ ark-serialize.workspace = true
tokio.workspace = true
tracing-subscriber.workspace = true
rand.workspace = true
tracing.workspace = true
tracing.workspace = true
clap.workspace = true
178 changes: 54 additions & 124 deletions client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,143 +2,73 @@
//! We spawn two clients. In a single-broker run, this lets them connect
//! cross-broker.
use std::{marker::PhantomData, sync::Arc};
use std::{marker::PhantomData, time::Duration};

use clap::Parser;
use client::{Client, Config};
use proto::{
connection::protocols::quic::Quic,
crypto::{self, KeyPair},
crypto::{self, DeterministicRng, KeyPair},
error::Result,
message::{Message, Topic},
};

use jf_primitives::signatures::bls_over_bn254::BLSOverBN254CurveSignatureScheme as BLS;
use rand::{rngs::StdRng, SeedableRng};
use tokio::{join, spawn};
use tokio::time::sleep;

#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
/// The main component of the push CDN.
struct Args {
/// The node's identifier (for deterministically creating keys)
#[arg(short, long)]
id: u64,
}

#[tokio::main]
async fn main() -> Result<()> {
// Get command-line args
let args = Args::parse();

// Initialize tracing
tracing_subscriber::fmt::init();

// Generate two random keypairs, one for each client
let (signing_key_1, verification_key_1) =
crypto::generate_random_keypair::<BLS, StdRng>(StdRng::from_entropy())?;

let (signing_key_2, verification_key_2) =
crypto::generate_random_keypair::<BLS, StdRng>(StdRng::from_entropy())?;

// Create our first client
// TODO: constructors for config
let client1 = Arc::new(
// We are running with the `BLS` key signing algorithm
// and `Quic` as a networking protocol.
Client::<BLS, Quic>::new(Config {
// Our marshal address, locally running on port 8082
endpoint: "127.0.0.1:8082".to_string(),
keypair: KeyPair {
signing_key: signing_key_1,
verification_key: verification_key_1,
},

// The topics we want to subscribe to initially
subscribed_topics: vec![Topic::DA, Topic::Global],

// TODO: remove this via means of constructor
pd: PhantomData,
})
.await?,
);

// Create our second client
let client2 = Arc::new(
Client::<BLS, Quic>::new(Config {
// This is the same marshal, but a possibly different broker.
endpoint: "127.0.0.1:8082".to_string(),
keypair: KeyPair {
signing_key: signing_key_2,
verification_key: verification_key_2,
},
subscribed_topics: vec![Topic::DA, Topic::Global],
pd: PhantomData,
})
.await?,
);

// Run our first client, which sends a message to our second.
let client1 = spawn(async move {
// Clone our client
let client1_ = client1.clone();

// The sending side
let jh1 = spawn(async move {
// Send a message to client 2
let message = "hello client2";
client1_
.send_direct_message(&verification_key_2, b"hello client2".to_vec())
.expect("failed to send message");

println!("client 1 sent \"{message}\"");
});

// The receiving side
let jh2 = spawn(async move {
let message = client1
.receive_message()
.await
.expect("failed to receive message");

if let Message::Direct(direct) = message {
println!(
"client 1 received {}",
String::from_utf8(direct.message).expect("failed to deserialize message")
);
} else {
panic!("received wrong message type");
}
});

let _ = tokio::join!(jh1, jh2);
});

// Run our second client, which sends a message to our first.
let client2 = spawn(async move {
// Clone our client
let client2_ = client2.clone();

// The sending side
let jh1 = spawn(async move {
// Send a message to client 2
let message = "hello client1";
client2_
.send_direct_message(&verification_key_1, b"hello client1".to_vec())
.expect("failed to send message");

println!("client 2 sent \"{message}\"");
});

// The receiving side
let jh2 = spawn(async move {
let message = client2
.receive_message()
.await
.expect("failed to receive message");

if let Message::Direct(direct) = message {
println!(
"client 2 received {}",
String::from_utf8(direct.message).expect("failed to deserialize message")
);
} else {
panic!("received wrong message type")
}
});

let _ = tokio::join!(jh1, jh2);
});

// Wait for both to finish
let _ = join!(client1, client2);

Ok(())
let (signing_key, verification_key) =
crypto::generate_random_keypair::<BLS, DeterministicRng>(DeterministicRng(args.id))?;

let client = Client::<BLS, Quic>::new(Config {
endpoint: "127.0.0.1:8082".to_string(),
keypair: KeyPair {
verification_key,
signing_key,
},
subscribed_topics: vec![],
pd: PhantomData,
})
.await?;

// We want the first node to send to the second
if args.id != 0 {
// Generate two random keypairs, one for each client
let (_, other_verification_key) =
crypto::generate_random_keypair::<BLS, DeterministicRng>(DeterministicRng(0))?;

loop {
// Create a big 512MB message
let m = vec![0u8; 256000000];

if let Err(err) = client.send_direct_message(&other_verification_key, m) {
tracing::error!("failed to send message: {}", err);
};

sleep(Duration::from_secs(1)).await;
}
} else {
loop {
if let Err(err) = client.receive_message().await {
tracing::error!("failed to receive message: {}", err);
continue;
};
}
}
}
12 changes: 9 additions & 3 deletions process-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,25 @@ processes:
condition: process_started

broker_0:
command: cargo run --package broker -- -b 8082 -u 8083
command: cargo run --release --package broker -- -b 8082 -u 8083
depends_on:
redis:
condition: process_started

broker_1:
command: cargo run --package broker -- -b 8084 -u 8085
command: cargo run --release --package broker -- -b 8084 -u 8085
depends_on:
redis:
condition: process_started

client_0:
command: cargo run --bin client
command: cargo run --release --bin client -- --id 0
depends_on:
marshal_0:
condition: process_started

client_1:
command: cargo run --release --bin client -- --id 1
depends_on:
marshal_0:
condition: process_started
Expand Down
6 changes: 5 additions & 1 deletion proto/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::{
self, authenticate_response, authenticate_with_key, authenticate_with_permit, broadcast,
direct,
},
MAX_MESSAGE_SIZE,
};

/// This is a helper macro for serializing `CapnProto` values.
Expand Down Expand Up @@ -236,7 +237,10 @@ impl Message {
pub fn deserialize(bytes: &[u8]) -> Result<Self> {
// Create reader
let reader = bail!(
serialize::read_message(bytes, ReaderOptions::new()),
serialize::read_message(
bytes,
*ReaderOptions::new().traversal_limit_in_words(Some(MAX_MESSAGE_SIZE as usize))
),
Deserialize,
"failed to create reader"
);
Expand Down

0 comments on commit 962e22c

Please sign in to comment.