From dc6f953c116b6e70b4b680bddd279f2faf9c65b3 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Leonard=20G=C3=B6hrs?=
Date: Wed, 28 Aug 2024 22:03:03 +0200
Subject: [PATCH] WIP: ingres: webhook: use hyper instead of the ad-hoc http implementation

In the future we will likely want to do more things via HTTP, namely
serving prometheus metrics, providing a status page or allowing the
jobs inside the machines to upload artifacts. Growing a custom web
server to do all of these things would be possible, but not really a
reasonable thing to do. Instead, use the hyper crate as an HTTP server
implementation for the webhook endpoint.

Using hyper as a server crate has the benefit of already being in our
dependencies via the octocrab crate. As far as I can tell this is,
however, the only benefit of hyper (in our low-traffic use case at
least). The interface seems to be designed by a committee of drunk
lawyers who are paid by the number of crates they split their
completely bonkers traits into; as a result it is about as ergonomic
as a designer chair. The documentation is strewn all over the place
between all these crates, and the hyper doc writers have, in all their
wisdom, included `#[doc(hidden)]`s in their documentation, making it
even less navigable. Maybe what's hidden is proprietary knowledge that
they can bill extra for.

I have hated every minute of working with hyper so far. We are in for
a good time.
---
 Cargo.lock            |  10 ++
 Cargo.toml            |   9 ++
 src/ingres/webhook.rs | 285 +++++++++++++++++++-----------------------
 3 files changed, 145 insertions(+), 159 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 029b13a..333aaa3 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -292,6 +292,9 @@ dependencies = [
  "fatfs",
  "hex",
  "hmac",
+ "http-body-util",
+ "hyper",
+ "hyper-util",
  "jsonwebtoken",
  "log",
  "octocrab",
@@ -503,6 +506,12 @@ version = "1.9.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "0fcc0b4a115bf80b728eb8ea024ad5bd707b615bfed49e0665b6e0f86fd082d9"
 
+[[package]]
+name = "httpdate"
+version = "1.0.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9"
+
 [[package]]
 name = "humantime"
 version = "2.1.0"
@@ -521,6 +530,7 @@ dependencies = [
  "http",
  "http-body",
  "httparse",
+ "httpdate",
  "itoa",
  "pin-project-lite",
  "smallvec",
diff --git a/Cargo.toml b/Cargo.toml
index cf10900..42a846d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -13,6 +13,7 @@ chrono = "0.4"
 fatfs = "0.3"
 hex = "0.4"
 hmac = "0.12"
+http-body-util = "0.1"
 jsonwebtoken = "9.3"
 log = "0.4"
 octocrab = "0.39"
@@ -25,6 +26,14 @@ serde_json = "1.0"
 serde_yml = "0.0.12"
 sha2 = "0.10"
 
+[dependencies.hyper]
+version = "1.4"
+features = ["http1", "server"]
+
+[dependencies.hyper-util]
+version = "0.1"
+features = ["tokio"]
+
 [dependencies.tokio]
 version = "1.38"
 features = ["io-util", "process", "rt", "macros"]
diff --git a/src/ingres/webhook.rs b/src/ingres/webhook.rs
index ec8859d..a0a8477 100644
--- a/src/ingres/webhook.rs
+++ b/src/ingres/webhook.rs
@@ -1,38 +1,26 @@
 use std::fs::Permissions;
 use std::os::unix::fs::PermissionsExt;
 use std::sync::Arc;
-use std::time::Duration;
 
 use hmac::{Hmac, Mac};
-use log::{error, info, trace, warn};
+use http_body_util::BodyExt;
+use hyper::body::Incoming;
+use hyper::server::conn::http1::Builder as HttpConnectionBuilder;
+use hyper::service::service_fn;
+use hyper::{Method, Request, Response, StatusCode};
+use hyper_util::rt::TokioIo;
+use log::{error, info, trace};
 use octocrab::models::webhook_events::EventInstallation;
 use octocrab::models::webhook_events::{WebhookEvent, WebhookEventPayload};
 use octocrab::models::workflows::Job;
 use sha2::Sha256;
-use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
-use tokio::net::unix::ReadHalf;
-use tokio::net::{UnixListener, UnixStream};
-use tokio::time::timeout;
+use tokio::net::UnixListener;
 
 use crate::auth::Auth;
 use crate::config::{Config, ConfigFile};
 use crate::jobs::Manager as JobManager;
 use crate::machines::OwnerAndRepo;
 
-const WEBHOOK_TIMEOUT: Duration = Duration::from_secs(5);
-const WEBHOOK_SIZE_LIMIT: u64 = 4 * 1024 * 1024;
-const ERROR_RESPONSE: &[u8] = b"HTTP/1.1 400 Bad Request\r
-Server: Forrest\r
-Content-Length: 35\r
-\r
-Your request could not be processed
-";
-const OK_RESPONSE: &[u8] = b"HTTP/1.1 204 No Content\r
-Server: Forrest\r
-Content-Length: 0\r
-\r
-";
-
 pub struct WebhookHandler {
     config: Config,
     auth: Arc<Auth>,
@@ -72,143 +60,104 @@ impl WebhookHandler {
             let auth = self.auth.clone();
             let job_manager = self.job_manager.clone();
 
+            // Wrap the tokio socket in something that hyper understands.
+            let sock = TokioIo::new(sock);
+
             tokio::task::spawn(async move {
-                let timeout_error = Err(std::io::Error::new(
-                    std::io::ErrorKind::TimedOut,
-                    "Handler function took too long to run",
-                ));
-
-                let res = timeout(
-                    WEBHOOK_TIMEOUT,
-                    webook_handler(sock, &config, &auth, job_manager),
-                )
-                .await
-                .or(timeout_error);
-
-                if let Err(err) = res {
-                    warn!("Webhook handler failed due to: {err}");
-                }
+                // Wrap our handler function in something that hyper understands.
+                let service =
+                    service_fn(|conn| webhook_handler(conn, &config, &auth, &job_manager));
+
+                HttpConnectionBuilder::new()
+                    .serve_connection(sock, service)
+                    .await
             });
         }
     }
 }
 
-async fn webook_handler(
-    mut sock: UnixStream,
+async fn webhook_handler(
+    request: Request<Incoming>,
     config: &ConfigFile,
     auth: &Auth,
-    job_manager: JobManager,
-) -> std::io::Result<()> {
-    let (read, mut write) = sock.split();
-
-    let secret = config.github.webhook_secret.as_bytes();
-
-    let response = match read_req(secret, read).await {
-        Ok(res) => {
-            workflow_job_handler(res, config, auth, job_manager).await;
+    job_manager: &JobManager,
+) -> anyhow::Result<Response<String>> {
+    let (parts, body) = request.into_parts();
+
+    if parts.uri.path() != "/webhook" {
+        return Ok(Response::builder()
+            .status(StatusCode::NOT_FOUND)
+            .body("Not found".into())
+            .unwrap());
+    }
 
-            OK_RESPONSE
-        }
-        Err(e) => {
-            error!("Got malformed webhook request: {e}");
+    if parts.method != Method::POST {
+        return Ok(Response::builder()
+            .status(StatusCode::METHOD_NOT_ALLOWED)
+            .body("Only HTTP POST is allowed".into())
+            .unwrap());
+    }
 
-            ERROR_RESPONSE
+    let event_type = match parts.headers.get("X-GitHub-Event") {
+        Some(et) => et,
+        None => {
+            return Ok(Response::builder()
+                .status(StatusCode::BAD_REQUEST)
+                .body("Request is missing an X-GitHub-Event Header".into())
+                .unwrap());
        }
     };
 
-    write.write_all(response).await
-}
-
-async fn read_req<'a>(secret: &[u8], read: ReadHalf<'a>) -> std::io::Result<WebhookEvent> {
-    // Limit the maximum request size and buffer the stream so we can read
-    // individual bytes like when searching for a '\n'.
-    let mut read = BufReader::new(read.take(WEBHOOK_SIZE_LIMIT));
-
-    let mut line = String::new();
-
-    read.read_line(&mut line).await?;
-
-    // This is where I hide my custom webserver.
-    // Don't tell anyone though.
-    // The webserver crates in the tokio universe all looked a bit over the top.
-    // This one is very minimalistic and assumes that the only client it ever
-    // has to interact with is an nginx reverse proxy.
-
-    if line.trim_end() != "POST /webhook HTTP/1.1" {
-        return Err(std::io::Error::new(
-            std::io::ErrorKind::InvalidData,
-            "Got unexpected request",
-        ));
-    }
-
-    let mut content_length: Option<usize> = None;
-    let mut event_type: Option<String> = None;
-    let mut signature: Option<Vec<u8>> = None;
-
-    loop {
-        line.clear();
-        read.read_line(&mut line).await?;
-        line.make_ascii_lowercase();
-
-        if line.trim().is_empty() {
-            // We are done with the headers
-            break;
+    let event_type = match event_type.to_str() {
+        Ok(et) => et,
+        Err(_) => {
+            return Ok(Response::builder()
+                .status(StatusCode::BAD_REQUEST)
+                .body("Failed to decode X-GitHub-Event Header".into())
+                .unwrap());
         }
+    };
 
-        if let Some(cl) = line.strip_prefix("content-length:") {
-            content_length = cl.trim().parse().ok();
+    let signature = match parts.headers.get("X-Hub-Signature-256") {
+        Some(sig) => sig,
+        None => {
+            return Ok(Response::builder()
+                .status(StatusCode::BAD_REQUEST)
+                .body("Request is missing an X-Hub-Signature-256 Header".into())
+                .unwrap());
         }
+    };
 
-        if let Some(et) = line.strip_prefix("x-github-event:") {
-            event_type = Some(et.trim().to_owned());
-        }
+    let signature = signature
+        .to_str()
+        .ok()
+        .and_then(|sig| sig.strip_prefix("sha256="))
+        .and_then(|sig| hex::decode(sig).ok());
 
-        if let Some(sig) = line
-            .strip_prefix("x-hub-signature-256:")
-            .and_then(|sig| sig.trim().strip_prefix("sha256="))
-        {
-            signature = hex::decode(sig).ok();
+    let signature = match signature {
+        Some(sig) => sig,
+        None => {
+            return Ok(Response::builder()
+                .status(StatusCode::BAD_REQUEST)
+                .body("Failed to decode X-Hub-Signature-256 Header".into())
+                .unwrap());
         }
-    }
+    };
 
-    let content_length = content_length.ok_or_else(|| {
-        std::io::Error::new(
-            std::io::ErrorKind::InvalidData,
-            "Missing Content-Length header",
-        )
-    })?;
-
-    let event_type = event_type.ok_or_else(|| {
-        std::io::Error::new(
-            std::io::ErrorKind::InvalidData,
-            "Missing X-GitHub-Event header",
-        )
-    })?;
-
-    let signature = signature.ok_or_else(|| {
-        std::io::Error::new(
-            std::io::ErrorKind::InvalidData,
-            "Missing X-Hub-Signature-256 header",
-        )
-    })?;
-
-    if (content_length as u64) > WEBHOOK_SIZE_LIMIT {
-        Err(std::io::Error::other("Content-Length is too large"))?;
-    }
+    let secret = config.github.webhook_secret.as_bytes();
 
     let content = {
-        let mut content = vec![0; content_length];
-        read.read_exact(&mut content).await?;
+        let content = body.collect().await?.to_bytes();
 
         let mut hmac: Hmac<Sha256> = Hmac::new_from_slice(secret).unwrap();
         hmac.update(&content);
 
         let content_valid = hmac.verify_slice(&signature);
 
         if content_valid.is_err() {
-            return Err(std::io::Error::new(
-                std::io::ErrorKind::InvalidData,
-                "HMAC signature does not match",
-            ));
+            return Ok(Response::builder()
+                .status(StatusCode::BAD_REQUEST)
+                .body("Signature validation failed".into())
+                .unwrap());
         }
 
         content
@@ -216,23 +165,24 @@ async fn read_req<'a>(secret: &[u8], read: ReadHalf<'a>) -> std::io::Result<WebhookEvent> {
+    let event = match WebhookEvent::try_from_header_and_body(event_type, &content) {
+        Ok(ev) => ev,
+        Err(_) => {
+            return Ok(Response::builder()
+                .status(StatusCode::BAD_REQUEST)
+                .body("Failed to parse request body".into())
+                .unwrap());
+        }
+    };
 
-async fn workflow_job_handler(
-    event: WebhookEvent,
-    config: &ConfigFile,
-    auth: &Auth,
-    job_manager: JobManager,
-) {
     let job = match event.specific {
         WebhookEventPayload::WorkflowJob(job) => job,
-        _ => return,
+        _ => {
+            return Ok(Response::builder()
+                .status(StatusCode::NO_CONTENT)
+                .body("".into())
+                .unwrap())
+        }
     };
 
     let oar = {
@@ -240,7 +190,10 @@ async fn workflow_job_handler(
             Some(repo) => repo,
             None => {
                 error!("Got workflow_job webhook event without repository field");
-                return;
+                return Ok(Response::builder()
+                    .status(StatusCode::BAD_REQUEST)
+                    .body("Workflow job is missing a repository field".into())
+                    .unwrap());
             }
         };
 
@@ -248,7 +201,10 @@ async fn workflow_job_handler(
             Some(owner) => owner.login,
             None => {
                 error!("Got workflow_job webhook event without user in repository field");
-                return;
+                return Ok(Response::builder()
+                    .status(StatusCode::BAD_REQUEST)
+                    .body("Workflow job repository is missing an owner field".into())
+                    .unwrap());
             }
         };
 
@@ -263,7 +219,10 @@ async fn workflow_job_handler(
 
     if !exists {
         info!("Refusing to service webhook from unlisted user/repo {oar}");
-        return;
+        return Ok(Response::builder()
+            .status(StatusCode::UNAUTHORIZED)
+            .body("Unauthorized user/repo combination".into())
+            .unwrap());
     }
 
     let installation_id = match event.installation {
@@ -271,7 +230,10 @@ async fn workflow_job_handler(
         Some(EventInstallation::Minimal(inst)) => inst.id,
         None => {
             error!("Got webhook event that was not sent by an installation");
-            return;
+            return Ok(Response::builder()
+                .status(StatusCode::BAD_REQUEST)
+                .body("The webhook event is missing an installation id".into())
+                .unwrap());
         }
     };
 
@@ -279,7 +241,10 @@ async fn workflow_job_handler(
         Ok(workflow_job) => workflow_job,
         Err(err) => {
            error!("Could not parse workflow job received from webhook: {err}");
-            return;
+            return Ok(Response::builder()
+                .status(StatusCode::BAD_REQUEST)
+                .body("Failed to parse workflow job".into())
+                .unwrap());
         }
     };
 
@@ -292,16 +257,18 @@ async fn workflow_job_handler(
     // requests on their behalf later.
     auth.update_user(oar.owner(), installation_id);
 
-    let triplet = match oar.into_triplet_via_labels(&workflow_job.labels) {
-        Some(triplet) => triplet,
-        None => return,
-    };
+    if let Some(triplet) = oar.into_triplet_via_labels(&workflow_job.labels) {
+        job_manager.status_feedback(
+            &triplet,
+            workflow_job.id,
+            workflow_job.run_id,
+            workflow_job.status,
+            workflow_job.runner_name.as_deref(),
+        );
+    }
 
-    job_manager.status_feedback(
-        &triplet,
-        workflow_job.id,
-        workflow_job.run_id,
-        workflow_job.status,
-        workflow_job.runner_name.as_deref(),
-    );
+    Ok(Response::builder()
+        .status(StatusCode::NO_CONTENT)
+        .body("".into())
+        .unwrap())
 }