Skip to content

Commit

Permalink
Added first working version of websockets
Browse files Browse the repository at this point in the history
  • Loading branch information
CEbbinghaus committed Feb 25, 2024
1 parent 1ec022e commit b4e8788
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 6 deletions.
20 changes: 18 additions & 2 deletions backend/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions backend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ slotmap = { version = "1.0.6", features = ["serde"] }
glob = "0.3.1"
semver = { version = "1.0.20", features = ["serde"] }
either = "1.9.0"
actix-ws = "0.2.5"
actix-rt = "2.9.0"

[dev-dependencies]
criterion = { version = "0.4", features = ["html_reports"] }
Expand Down
3 changes: 2 additions & 1 deletion backend/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{
env::PACKAGE_VERSION,
err::Error,
event::Event,
sdcard::{get_card_cid, is_card_inserted},
sdcard::{get_card_cid, is_card_inserted}, ws::ws,
};
use actix_web::{
delete, get,
Expand All @@ -21,6 +21,7 @@ use tokio_stream::wrappers::BroadcastStream;

pub(crate) fn config(cfg: &mut web::ServiceConfig) {
cfg //
.service(ws)
.service(health)
.service(version)
.service(listen)
Expand Down
6 changes: 3 additions & 3 deletions backend/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@ mod log;
mod sdcard;
mod steam;
mod watch;
mod ws;

use crate::{api::config, dto::CardEvent};
use crate::ds::Store;
use crate::env::*;
use crate::log::Logger;
use crate::watch::start_watch;
use crate::{api::config, dto::CardEvent};
use ::log::{debug, error, info};
use actix_cors::Cors;
use actix_web::{web, App, HttpServer};
Expand All @@ -36,7 +37,6 @@ pub fn init() -> Result<(), ::log::SetLoggerError> {
type MainResult = Result<(), Error>;

async fn run_server(datastore: Arc<Store>, sender: Sender<CardEvent>) -> MainResult {

info!("Starting HTTP server...");

HttpServer::new(move || {
Expand Down Expand Up @@ -73,7 +73,7 @@ async fn main() {
}
Ok(()) => debug!("Initialized..."),
}

info!(
"{}@{} by {}",
PACKAGE_NAME, PACKAGE_VERSION, PACKAGE_AUTHORS
Expand Down
135 changes: 135 additions & 0 deletions backend/src/ws.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
use std::{sync::Arc, time::{Duration, Instant}};

use actix_web::{get, web, Error, HttpRequest, HttpResponse};
use actix_ws::Message;
use futures::StreamExt;
use tokio::{select, sync::broadcast::*, time::interval};

use crate::{dto::CardEvent, event::Event};

/// How often heartbeat pings are sent.
///
/// Should be half (or less) of the acceptable client timeout.
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);

/// How long before lack of client response causes a timeout.
const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);


#[get("/ws-test")]
async fn ws(req: HttpRequest, body: web::Payload, sender: web::Data<Sender<CardEvent>>) -> Result<HttpResponse, Error> {
let (response, session, msg_stream) = actix_ws::handle(&req, body)?;

actix_rt::spawn(async move {
handle_ws(session, msg_stream, sender.into_inner()).await;
});

Ok(response)
}


/// Broadcast text & binary messages received from a client, respond to ping messages, and monitor
/// connection health to detect network issues and free up resources.
pub async fn handle_ws(
mut session: actix_ws::Session,
mut msg_stream: actix_ws::MessageStream,
channel: Arc<Sender<CardEvent>>,
) {
log::info!("connected");

let mut last_heartbeat = Instant::now();
let mut interval = interval(HEARTBEAT_INTERVAL);

let mut reciever = channel.subscribe();

let reason = loop {
// waits for either `msg_stream` to receive a message from the client, the broadcast channel
// to send a message, or the heartbeat interval timer to tick, yielding the value of
// whichever one is ready first
select! {
broadcast_msg = reciever.recv() => {
let msg = match broadcast_msg {
Ok(msg) => msg,
Err(error::RecvError::Closed) => break None,
Err(error::RecvError::Lagged(_)) => continue,
};

let res = session.text(Event::new(msg)).await;

if let Err(err) = res {
log::error!("{err}");
break None;
}
}

// heartbeat interval ticked
_tick = interval.tick() => {
// if no heartbeat ping/pong received recently, close the connection
if Instant::now().duration_since(last_heartbeat) > CLIENT_TIMEOUT {
log::info!(
"client has not sent heartbeat in over {CLIENT_TIMEOUT:?}; disconnecting"
);

break None;
}

// send heartbeat ping
let _ = session.ping(b"").await;
},

msg = msg_stream.next() => {
let msg = match msg {
// received message from WebSocket client
Some(Ok(msg)) => msg,

// client WebSocket stream error
Some(Err(err)) => {
log::error!("{err}");
break None;
}

// client WebSocket stream ended
None => break None
};

log::debug!("msg: {msg:?}");

match msg {
Message::Text(_) => {
channel.send(CardEvent::Updated);
// drop client's text messages
}

Message::Binary(_) => {
// drop client's binary messages
}

Message::Close(reason) => {
break reason;
}

Message::Ping(bytes) => {
last_heartbeat = Instant::now();
let _ = session.pong(&bytes).await;
}

Message::Pong(_) => {
last_heartbeat = Instant::now();
}

Message::Continuation(_) => {
log::warn!("no support for continuation frames");
}

// no-op; ignore
Message::Nop => {}
};
}
}
};

// attempt to close connection gracefully
let _ = session.close(reason).await;

log::info!("disconnected");
}

0 comments on commit b4e8788

Please sign in to comment.