diff --git a/examples/postgres-integration/Cargo.toml b/examples/postgres-integration/Cargo.toml index 7d2beaa..69db645 100644 --- a/examples/postgres-integration/Cargo.toml +++ b/examples/postgres-integration/Cargo.toml @@ -8,6 +8,7 @@ anyhow = "1.0.79" axum = "0.7.4" blake3 = "1.5.1" clap = { version = "4.4.18", features = ["derive"] } +handlebars = "5.1.2" parlay = { path = "../../", version = "0.1.0" } reqwest = { version = "0.11.23", features = ["json"] } serde = "1.0.197" diff --git a/examples/postgres-integration/policies0.json b/examples/postgres-integration/policies0.json index 6c4e9af..0eaad77 100644 --- a/examples/postgres-integration/policies0.json +++ b/examples/postgres-integration/policies0.json @@ -16,6 +16,22 @@ "setup": "TRUNCATE results", "output": "INSERT INTO results (name) VALUES ($1)", "output_db": "postgres://postgres:test@localhost:5555/postgres" + }, + { + "participants": [ + "http://localhost:8000", + "http://localhost:8001", + "http://localhost:8002" + ], + "program": "does_not_exist.garble.rs", + "leader": 1, + "party": 0, + "input": "SELECT id, name, age FROM residents", + "input_db": "postgres://postgres:test@localhost:5550/postgres", + "max_rows": 4, + "setup": "TRUNCATE results", + "output": "INSERT INTO results (name) VALUES ($1)", + "output_db": "postgres://postgres:test@localhost:5555/postgres" } ] } diff --git a/examples/postgres-integration/src/main.rs b/examples/postgres-integration/src/main.rs index 1cf2721..ad75859 100644 --- a/examples/postgres-integration/src/main.rs +++ b/examples/postgres-integration/src/main.rs @@ -2,10 +2,12 @@ use anyhow::{anyhow, bail, Context, Error}; use axum::{ body::Bytes, extract::{DefaultBodyLimit, Path, State}, - routing::post, + response::Html, + routing::{get, post}, Json, Router, }; use clap::Parser; +use handlebars::Handlebars; use parlay::{ channel::Channel, fpre::fpre, @@ -18,6 +20,7 @@ use parlay::{ }; use reqwest::StatusCode; use serde::{Deserialize, Serialize}; +use serde_json::json; use sqlx::{postgres::PgPoolOptions, Row}; use std::{ borrow::BorrowMut, net::SocketAddr, path::PathBuf, process::exit, result::Result, sync::Arc, @@ -51,12 +54,12 @@ struct Cli { config: PathBuf, } -#[derive(Debug, Clone, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] struct Policies { accepted: Vec, } -#[derive(Debug, Clone, Deserialize, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)] struct Policy { participants: Vec, program: PathBuf, @@ -83,9 +86,9 @@ type MpcState = Arc>>>>; async fn main() -> Result<(), Error> { tracing_subscriber::fmt::init(); let Cli { port, config } = Cli::parse(); - let policies = load_policies(config).await?; - if policies.accepted.len() == 1 { - let policy = &policies.accepted[0]; + let local_policies = load_policies(config).await?; + if local_policies.accepted.len() == 1 { + let policy = &local_policies.accepted[0]; if policy.input.is_empty() { info!("Running as preprocessor..."); return run_fpre(port, policy.participants.clone()).await; @@ -97,7 +100,9 @@ async fn main() -> Result<(), Error> { let app = Router::new() .route("/run", post(run)) .route("/msg/:from", post(msg)) - .with_state((policies.clone(), Arc::clone(&state))) + .route("/policies", get(policies)) + .route("/policies/:id", get(policy)) + .with_state((local_policies.clone(), Arc::clone(&state))) .layer(DefaultBodyLimit::disable()) .layer(TraceLayer::new_for_http()); @@ -105,9 +110,9 @@ async fn main() -> Result<(), Error> { info!("listening on {}", addr); let listener = tokio::net::TcpListener::bind(&addr).await?; tokio::spawn(async move { axum::serve(listener, app).await.unwrap() }); - info!("Found {} active policies", policies.accepted.len()); + info!("Found {} active policies", local_policies.accepted.len()); loop { - for policy in &policies.accepted { + for policy in &local_policies.accepted { if policy.leader == policy.party { info!( "Acting as leader (party {}) for program {}", @@ -125,24 +130,32 @@ async fn main() -> Result<(), Error> { leader: policy.leader, program_hash: hash, }; + let mut participant_missing = false; for party in policy.participants.iter().rev().skip(1).rev() { if party != &policy.participants[policy.party] { info!("Waiting for confirmation from party {party}"); let url = format!("{party}run"); - match client - .post(&url) - .json(&policy_request) - .send() - .await? - .status() - { + let Ok(res) = client.post(&url).json(&policy_request).send().await else { + error!("Could not reach {url}"); + participant_missing = true; + continue; + }; + match res.status() { StatusCode::OK => {} code => { error!("Unexpected response while trying to trigger execution for {url}: {code}"); + participant_missing = true; } } } } + if participant_missing { + error!( + "Some participants of program {} are missing, skipping execution...", + policy.program.display() + ); + continue; + } info!("All participants have accepted the session, starting calculation now..."); fn decode_literal(l: Literal) -> Result>, String> { let Literal::Array(rows) = l else { @@ -321,7 +334,7 @@ async fn execute_mpc( let field = if let Ok(s) = row.try_get::(c) { let mut fixed_str = vec![Literal::NumUnsigned(0, UnsignedNumType::U8); STR_LEN_BYTES]; - for (i, b) in s.as_bytes().into_iter().enumerate() { + for (i, b) in s.as_bytes().iter().enumerate() { if i < STR_LEN_BYTES { fixed_str[i] = Literal::NumUnsigned(*b as u64, UnsignedNumType::U8); } else { @@ -426,6 +439,56 @@ async fn msg(State((_, state)): State<(Policies, MpcState)>, Path(from): Path, +) -> Result, axum::http::StatusCode> { + let mut accepted = vec![]; + for (i, p) in policies.accepted.into_iter().enumerate() { + accepted.push(json!({ + "id": i, + "num_participants": p.participants.len(), + "program": p.program.to_str().unwrap_or(""), + "leader": p.leader, + "party": p.party, + })); + } + let params = json!({ + "accepted": accepted + }); + render_template(include_str!("../templates/policies.html"), ¶ms) +} + +async fn policy( + State((policies, _)): State<(Policies, MpcState)>, + Path(id): Path, +) -> Result, axum::http::StatusCode> { + let Some(p) = policies.accepted.get(id) else { + return Err(axum::http::StatusCode::NOT_FOUND); + }; + let params = json!({ + "policy": { + "num_participants": p.participants.len(), + "participants": p.participants, + "program": p.program.to_str().unwrap_or(""), + "code": fs::read_to_string(&p.program).await.unwrap_or("Program not found".to_string()), + "leader": p.leader, + "party": p.party, + } + }); + render_template(include_str!("../templates/policy.html"), ¶ms) +} + +fn render_template( + template: &str, + params: &serde_json::Value, +) -> Result, axum::http::StatusCode> { + let h = Handlebars::new(); + let Ok(html) = h.render_template(template, ¶ms) else { + return Err(axum::http::StatusCode::INTERNAL_SERVER_ERROR); + }; + Ok(Html(html)) +} + struct HttpChannel { urls: Vec, party: usize, diff --git a/examples/postgres-integration/templates/policies.html b/examples/postgres-integration/templates/policies.html new file mode 100644 index 0000000..4030824 --- /dev/null +++ b/examples/postgres-integration/templates/policies.html @@ -0,0 +1,28 @@ + + + + + + Policies + + + +
+

Policies

+ {{#each accepted}} +
+ +
{{num_participants}} participants
+
Own Role: Party {{party}}
+
Leader: Party {{leader}}
+
+ {{/each}} +
+ + diff --git a/examples/postgres-integration/templates/policy.html b/examples/postgres-integration/templates/policy.html new file mode 100644 index 0000000..30a3ef4 --- /dev/null +++ b/examples/postgres-integration/templates/policy.html @@ -0,0 +1,32 @@ + + + {{#with policy}} + + + + Policy {{program}} + + + +
+

+ Policies + > {{program}} +

+
+
{{num_participants}} participants
+
    + {{#each participants}} +
  1. {{this}}
  2. + {{/each}} +
+
Own Role: Party {{party}}
+
Leader: Party {{leader}}
+
{{code}}
+
+
+ + {{/with}} +