Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

p2p channels using iroh-net #7

Merged
merged 5 commits into from
Feb 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,7 @@ jobs:
- run: cargo build --verbose
- run: cargo test --verbose
- run: cargo clippy -- -Dwarnings
- run: bash ./simulate.sh
- run: cargo test -- --nocapture
working-directory: ./examples/http-channels
- run: cargo test -- --nocapture
working-directory: ./examples/iroh-channels
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
workspace = { members = ["examples/http-channels"] }
workspace = { members = ["examples/http-channels", "examples/iroh-channels"] }
[package]
name = "parlay"
version = "0.1.0"
Expand Down
2 changes: 1 addition & 1 deletion examples/http-channels/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[package]
name = "http-channels"
name = "parlay-http"
version = "0.1.0"
edition = "2021"

Expand Down
17 changes: 0 additions & 17 deletions examples/http-channels/simulate.sh

This file was deleted.

2 changes: 1 addition & 1 deletion examples/http-channels/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ enum Commands {
/// A name that uniquely identifies the MPC session on the server.
#[arg(short, long)]
session: String,
#[arg(long)]
/// The path to the Garble program to execute.
#[arg(long)]
program: PathBuf,
/// The index of the party (0 for the first participant, 1 for the second, etc).
#[arg(long)]
Expand Down
86 changes: 86 additions & 0 deletions examples/http-channels/tests/cli.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
use std::{
io::{BufRead, BufReader},
process::{Command, Stdio},
thread,
};

const ENDPOINT: &str = "http://127.0.0.1:8000";

#[test]
fn simulate() {
Command::new("cargo")
.args(["run", "--", "serve"])
.spawn()
.unwrap();
let mut cmd = Command::new("cargo")
.args([
"run",
"--",
"pre",
ENDPOINT,
"--session=test",
"--parties=3",
])
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.unwrap();
let mut stdout = BufReader::new(cmd.stdout.take().unwrap()).lines();
let mut stderr = BufReader::new(cmd.stderr.take().unwrap()).lines();
thread::spawn(move || {
while let Some(Ok(line)) = stdout.next() {
println!("pre> {line}");
}
});
thread::spawn(move || {
while let Some(Ok(line)) = stderr.next() {
eprintln!("pre> {line}");
}
});
for p in [1, 2] {
let party_arg = format!("--party={p}");
let args = vec![
"run",
"--",
"party",
ENDPOINT,
"--session=test",
"--program=.add.garble.rs",
"--input=2u32",
&party_arg,
];
let mut cmd = Command::new("cargo")
.args(args)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.unwrap();
let mut stdout = BufReader::new(cmd.stdout.take().unwrap()).lines().skip(4);
let mut stderr = BufReader::new(cmd.stderr.take().unwrap()).lines();
thread::spawn(move || {
while let Some(Ok(line)) = stdout.next() {
println!("party{p}> {line}");
}
});
thread::spawn(move || {
while let Some(Ok(line)) = stderr.next() {
eprintln!("party{p}> {line}");
}
});
}
let args = vec![
"run",
"--",
"party",
ENDPOINT,
"--session=test",
"--program=.add.garble.rs",
"--input=2u32",
"--party=0",
];
let out = Command::new("cargo").args(args).output().unwrap();
eprintln!("{}", String::from_utf8(out.stderr).unwrap());
let out = String::from_utf8(out.stdout).unwrap();
let out = out.lines().last().unwrap_or_default();
assert_eq!("The result is 6u32", out);
}
3 changes: 3 additions & 0 deletions examples/iroh-channels/.add.garble.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub fn main(x: u32, y: u32, z: u32) -> u32 {
x + y + z
}
17 changes: 17 additions & 0 deletions examples/iroh-channels/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
[package]
name = "parlay-iroh"
version = "0.1.0"
edition = "2021"

[dependencies]
anyhow = "1.0.79"
bincode = "1.3.3"
clap = { version = "4.4.18", features = ["derive"] }
iroh-net = "0.12.0"
parlay = { path = "../../", version = "0.1.0" }
quinn = "0.10.2"
serde = { version = "1.0.195", features = ["derive"] }
tokio = { version = "1.35.1", features = ["macros", "rt", "rt-multi-thread"] }
tracing = "0.1.40"
tracing-subscriber = "0.3.18"
url = "2.5.0"
37 changes: 37 additions & 0 deletions examples/iroh-channels/src/iroh_channel.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use parlay::channel::Channel;
use quinn::Connection;
use tracing::info;

pub(crate) struct IrohChannel {
pub(crate) conns: Vec<Option<Connection>>,
pub(crate) max_msg_bytes: usize,
}

impl IrohChannel {
pub(crate) fn new(conns: Vec<Option<Connection>>, max_msg_bytes: usize) -> Self {
Self {
conns,
max_msg_bytes,
}
}
}

impl Channel for IrohChannel {
type SendError = anyhow::Error;
type RecvError = anyhow::Error;

async fn send_bytes_to(&mut self, p: usize, msg: Vec<u8>) -> Result<(), Self::SendError> {
info!("sending {} bytes to {p}", msg.len());
let mut send = self.conns[p].as_ref().unwrap().open_uni().await?;
send.write_all(&msg).await?;
send.finish().await?;
Ok(())
}

async fn recv_bytes_from(&mut self, p: usize) -> Result<Vec<u8>, Self::RecvError> {
let mut recv = self.conns[p].as_ref().unwrap().accept_uni().await?;
let msg = recv.read_to_end(self.max_msg_bytes).await?;
info!("received {} bytes from {p}", msg.len());
Ok(msg)
}
}
Loading
Loading