From b4e8788b157f521686d942aab7c714ae46ba25d2 Mon Sep 17 00:00:00 2001 From: CEbbinghaus Date: Sun, 25 Feb 2024 20:06:33 +1100 Subject: [PATCH] Added first working version of websockets --- backend/Cargo.lock | 20 ++++++- backend/Cargo.toml | 2 + backend/src/api.rs | 3 +- backend/src/main.rs | 6 +- backend/src/ws.rs | 135 ++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 160 insertions(+), 6 deletions(-) create mode 100644 backend/src/ws.rs diff --git a/backend/Cargo.lock b/backend/Cargo.lock index 0bd16ca..411d795 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -98,10 +98,11 @@ dependencies = [ [[package]] name = "actix-rt" -version = "2.8.0" +version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "15265b6b8e2347670eb363c47fc8c75208b4a4994b27192f345fcbe707804f3e" +checksum = "28f32d40287d3f402ae0028a9d54bef51af15c8769492826a69d28f81893151d" dependencies = [ + "actix-macros", "futures-core", "tokio", ] @@ -197,6 +198,19 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "actix-ws" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "535aec173810be3ca6f25dd5b4d431ae7125d62000aa3cbae1ec739921b02cf3" +dependencies = [ + "actix-codec", + "actix-http", + "actix-web", + "futures-core", + "tokio", +] + [[package]] name = "addr2line" version = "0.21.0" @@ -302,7 +316,9 @@ name = "backend" version = "0.0.0" dependencies = [ "actix-cors", + "actix-rt", "actix-web", + "actix-ws", "async-trait", "chrono", "criterion", diff --git a/backend/Cargo.toml b/backend/Cargo.toml index e047738..718d864 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -26,6 +26,8 @@ slotmap = { version = "1.0.6", features = ["serde"] } glob = "0.3.1" semver = { version = "1.0.20", features = ["serde"] } either = "1.9.0" +actix-ws = "0.2.5" +actix-rt = "2.9.0" [dev-dependencies] criterion = { version = "0.4", features = ["html_reports"] } diff --git a/backend/src/api.rs b/backend/src/api.rs index 5f4e144..953d3a2 100644 --- a/backend/src/api.rs +++ b/backend/src/api.rs @@ -4,7 +4,7 @@ use crate::{ env::PACKAGE_VERSION, err::Error, event::Event, - sdcard::{get_card_cid, is_card_inserted}, + sdcard::{get_card_cid, is_card_inserted}, ws::ws, }; use actix_web::{ delete, get, @@ -21,6 +21,7 @@ use tokio_stream::wrappers::BroadcastStream; pub(crate) fn config(cfg: &mut web::ServiceConfig) { cfg // + .service(ws) .service(health) .service(version) .service(listen) diff --git a/backend/src/main.rs b/backend/src/main.rs index 52bf68e..2a431e7 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -8,12 +8,13 @@ mod log; mod sdcard; mod steam; mod watch; +mod ws; -use crate::{api::config, dto::CardEvent}; use crate::ds::Store; use crate::env::*; use crate::log::Logger; use crate::watch::start_watch; +use crate::{api::config, dto::CardEvent}; use ::log::{debug, error, info}; use actix_cors::Cors; use actix_web::{web, App, HttpServer}; @@ -36,7 +37,6 @@ pub fn init() -> Result<(), ::log::SetLoggerError> { type MainResult = Result<(), Error>; async fn run_server(datastore: Arc, sender: Sender) -> MainResult { - info!("Starting HTTP server..."); HttpServer::new(move || { @@ -73,7 +73,7 @@ async fn main() { } Ok(()) => debug!("Initialized..."), } - + info!( "{}@{} by {}", PACKAGE_NAME, PACKAGE_VERSION, PACKAGE_AUTHORS diff --git a/backend/src/ws.rs b/backend/src/ws.rs new file mode 100644 index 0000000..d1824b6 --- /dev/null +++ b/backend/src/ws.rs @@ -0,0 +1,135 @@ +use std::{sync::Arc, time::{Duration, Instant}}; + +use actix_web::{get, web, Error, HttpRequest, HttpResponse}; +use actix_ws::Message; +use futures::StreamExt; +use tokio::{select, sync::broadcast::*, time::interval}; + +use crate::{dto::CardEvent, event::Event}; + +/// How often heartbeat pings are sent. +/// +/// Should be half (or less) of the acceptable client timeout. +const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); + +/// How long before lack of client response causes a timeout. +const CLIENT_TIMEOUT: Duration = Duration::from_secs(10); + + +#[get("/ws-test")] +async fn ws(req: HttpRequest, body: web::Payload, sender: web::Data>) -> Result { + let (response, session, msg_stream) = actix_ws::handle(&req, body)?; + + actix_rt::spawn(async move { + handle_ws(session, msg_stream, sender.into_inner()).await; + }); + + Ok(response) +} + + +/// Broadcast text & binary messages received from a client, respond to ping messages, and monitor +/// connection health to detect network issues and free up resources. +pub async fn handle_ws( + mut session: actix_ws::Session, + mut msg_stream: actix_ws::MessageStream, + channel: Arc>, +) { + log::info!("connected"); + + let mut last_heartbeat = Instant::now(); + let mut interval = interval(HEARTBEAT_INTERVAL); + + let mut reciever = channel.subscribe(); + + let reason = loop { + // waits for either `msg_stream` to receive a message from the client, the broadcast channel + // to send a message, or the heartbeat interval timer to tick, yielding the value of + // whichever one is ready first + select! { + broadcast_msg = reciever.recv() => { + let msg = match broadcast_msg { + Ok(msg) => msg, + Err(error::RecvError::Closed) => break None, + Err(error::RecvError::Lagged(_)) => continue, + }; + + let res = session.text(Event::new(msg)).await; + + if let Err(err) = res { + log::error!("{err}"); + break None; + } + } + + // heartbeat interval ticked + _tick = interval.tick() => { + // if no heartbeat ping/pong received recently, close the connection + if Instant::now().duration_since(last_heartbeat) > CLIENT_TIMEOUT { + log::info!( + "client has not sent heartbeat in over {CLIENT_TIMEOUT:?}; disconnecting" + ); + + break None; + } + + // send heartbeat ping + let _ = session.ping(b"").await; + }, + + msg = msg_stream.next() => { + let msg = match msg { + // received message from WebSocket client + Some(Ok(msg)) => msg, + + // client WebSocket stream error + Some(Err(err)) => { + log::error!("{err}"); + break None; + } + + // client WebSocket stream ended + None => break None + }; + + log::debug!("msg: {msg:?}"); + + match msg { + Message::Text(_) => { + channel.send(CardEvent::Updated); + // drop client's text messages + } + + Message::Binary(_) => { + // drop client's binary messages + } + + Message::Close(reason) => { + break reason; + } + + Message::Ping(bytes) => { + last_heartbeat = Instant::now(); + let _ = session.pong(&bytes).await; + } + + Message::Pong(_) => { + last_heartbeat = Instant::now(); + } + + Message::Continuation(_) => { + log::warn!("no support for continuation frames"); + } + + // no-op; ignore + Message::Nop => {} + }; + } + } + }; + + // attempt to close connection gracefully + let _ = session.close(reason).await; + + log::info!("disconnected"); +}