Skip to content

Commit

Permalink
Changed to Server Events
Browse files Browse the repository at this point in the history
  • Loading branch information
CEbbinghaus committed Nov 15, 2023
1 parent b4da848 commit e266b8a
Show file tree
Hide file tree
Showing 13 changed files with 118 additions and 95 deletions.
29 changes: 14 additions & 15 deletions backend/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions backend/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,29 +1,29 @@
[package]
name = "backend"
version = "0.9.6"
version = "0.9.7"
edition = "2021"
license = "GPL-2.0"
authors = ["Christopher-Robin Ebbinghaus <[email protected]>"]

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
actix-web = "4.4.0"
actix-cors = "0.6.4"
actix-web = "4.4.0"
log = "0.4"
simplelog = "0.12"
once_cell = "1.18.0"
chrono = "0.4.24"
serde_json = "1.0"
tokio = { version = "1.26.0", features = ["full"] }
tokio-stream = { version = "0.1.14", features = ["sync"] }
futures = "0.3.26"
keyvalues-serde = "0.1.0" # Key Value format that Steam uses
keyvalues-serde = "0.1.0" # Key Value format that Steam uses
async-trait = "0.1.68"
serde = { version = "1.0.145", features = ["derive", "rc"] }
slotmap = { version = "1.0.6", features = ["serde"] }
glob = "0.3.1"
semver = { version = "1.0.20", features = ["serde"] }
actix-ws = "0.2.5"
either = "1.9.0"

[dev-dependencies]
Expand Down
45 changes: 36 additions & 9 deletions backend/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,16 @@ use crate::{
err::Error,
sdcard::{get_card_cid, is_card_inserted},
};
use actix_web::{delete, get, post, web, HttpResponse, Responder, Result, http::StatusCode, Either, HttpResponseBuilder};
use actix_ws::Message;
use actix_web::{
delete, get, http::StatusCode, post, web::{self, Bytes}, Either, HttpResponse, HttpResponseBuilder, Responder,
Result,
};
use futures::StreamExt;
use log::debug;
use serde::Deserialize;
use std::{ops::Deref, sync::Arc};
use tokio::sync::broadcast::Sender;
use tokio_stream::wrappers::BroadcastStream;

pub(crate) fn config(cfg: &mut web::ServiceConfig) {
cfg //
Expand Down Expand Up @@ -52,10 +56,25 @@ pub(crate) async fn health() -> impl Responder {
}

#[get("/listen")]
pub(crate) async fn listen(sender: web::Data<Sender<CardEvent>>) -> Result<impl Responder> {
Ok(web::Json(sender.subscribe().recv().await.map_err(
|_| Error::from_str("Unable to retrieve update"),
)?))
pub(crate) async fn listen(sender: web::Data<Sender<CardEvent>>) -> Result<HttpResponse> {
let event_stream = BroadcastStream::new(sender.subscribe()).map(|res|
{
debug!("Streaming Event {:?}", res);
let bytes = match res {
Err(_) => return Err(Error::from_str("Subscriber Closed")),
Ok(value) => {
let data = format!("data: {}\n\n", serde_json::to_string(&value)?);
Bytes::from(data)
}
};

Ok::<actix_web::web::Bytes, Error>(bytes)
}
);
Ok(HttpResponse::Ok()
.content_type("text/event-stream")
.streaming(event_stream)
)
}

#[get("/list")]
Expand Down Expand Up @@ -90,12 +109,20 @@ pub(crate) async fn get_current_card_and_games(
datastore: web::Data<Arc<Store>>,
) -> Result<Either<impl Responder, impl Responder>> {
if !is_card_inserted() {
return Ok(Either::Right(HttpResponseBuilder::new(StatusCode::NO_CONTENT).reason("No Card inserted").finish()));
return Ok(Either::Right(
HttpResponseBuilder::new(StatusCode::NO_CONTENT)
.reason("No Card inserted")
.finish(),
));
}

match get_card_cid() {
Some(uid) => Ok(Either::Left(web::Json(datastore.get_card_and_games(&uid)?))),
None => Ok(Either::Right(HttpResponseBuilder::new(StatusCode::NO_CONTENT).reason("Card Id could not be resolved").finish()))
None => Ok(Either::Right(
HttpResponseBuilder::new(StatusCode::NO_CONTENT)
.reason("Card Id could not be resolved")
.finish(),
)),
}
}

Expand All @@ -105,7 +132,7 @@ pub(crate) async fn get_current_card(datastore: web::Data<Arc<Store>>) -> Result
return Err(Error::from_str("No card is inserted").into());
}

let uid = get_card_cid().ok_or(Error::Error("Unable to evaluate Card Id".into()))?;
let uid = get_card_cid().ok_or(Error::from_str("Unable to evaluate Card Id"))?;

Ok(web::Json(datastore.get_card(&uid)?))
}
Expand Down
5 changes: 3 additions & 2 deletions backend/src/ds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,13 @@ impl StoreData {
let node = self
.node_ids
.get(card_id)
.ok_or(Error::Error("Card Id not present".into()))?;
.ok_or(Error::from_str("Card Id not present"))?;

match self.nodes.get_mut(*node).unwrap().element {
StoreElement::Card(ref mut card) => {
func(card)?;
}
StoreElement::Game(_) => return Err(Error::Error("Expected Card, got Game".into())),
StoreElement::Game(_) => return Err(Error::from_str("Expected Card, got Game")),
}

Ok(())
Expand Down Expand Up @@ -427,6 +427,7 @@ impl Store {
}

pub fn remove_element(&self, id: &str) -> Result<(), Error> {
// these two operations have to happen within a single scope otherwise the try_write_to_file causes a deadlock
{
let mut lock = self.data.write().unwrap();
lock.remove_item(id)?;
Expand Down
37 changes: 24 additions & 13 deletions backend/src/err.rs
Original file line number Diff line number Diff line change
@@ -1,42 +1,53 @@
#![allow(dead_code)]

use std::error;

use std::fmt;

use actix_web::ResponseError;

#[derive(Debug)]
pub enum Error {
Error(String),
struct StdErr;

impl std::error::Error for StdErr{}

impl fmt::Display for StdErr {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "StdErr")
}
}

#[derive(Debug)]
pub struct Error(String);

impl Error {
pub fn new_boxed(value: &str) -> Box<Error> {
Box::new(Error::Error(value.to_string()))
Box::new(Error(value.to_string()))
}

pub fn from_str(value: &str) -> Self {
Error::Error(value.to_string())
Error(value.to_string())
}

pub fn new_res<T>(value: &str) -> Result<T, Self> {
Err(Error::Error(value.to_string()))
Err(Error(value.to_string()))
}
}

impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
// Both underlying errors already impl `Display`, so we defer to
// their implementations.
Error::Error(err) => write!(f, "Error: {}", err),
}
write!(f, "Error: {}", self)
}
}

impl<T: error::Error + Send + Sync + 'static> From<T> for Error {
impl Into<Box<dyn std::error::Error>> for Error {
fn into(self) -> Box<dyn std::error::Error> {
Box::new(StdErr)
}
}

impl<T: std::error::Error + Send + Sync + 'static> From<T> for Error {
fn from(e: T) -> Self {
Error::Error(e.to_string())
Error(e.to_string())
}
}

Expand Down
12 changes: 9 additions & 3 deletions backend/src/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,15 @@ impl Logger {
.ok()?;

let max_level = env::var("LOG_LEVEL")
.map_err(|e| Error::from(e))
.and_then(|v| Level::from_str(&v).map_err(|e| Error::from(e)))
.unwrap_or(Level::Info);
.map_err(Error::from)
.and_then(|v| Level::from_str(&v).map_err(Error::from))
.unwrap_or({
if cfg!(debug_assertions) {
Level::Debug
} else {
Level::Info
}
});

println!("Logging enabled to {file_path} with level {max_level}");

Expand Down
2 changes: 1 addition & 1 deletion backend/src/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ fn read_msd_directory(datastore: &Store, mount: &Option<String>) -> Result<(), E
libid: library.contentid.clone(),
mount: mount.clone(),
name: library.label,
position: 0,
position: u32::MAX,
hidden: false,
},
);
Expand Down
2 changes: 1 addition & 1 deletion lib/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@cebbinghaus/microsdeck",
"version": "0.9.6",
"version": "0.9.7",
"description": "",
"keywords": [],
"author": "CEbbinghaus",
Expand Down
45 changes: 22 additions & 23 deletions lib/src/state/MicoSDeckManager.ts → lib/src/MicoSDeckManager.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { fetchCardsAndGames, fetchCardsForGame, fetchCurrentCardAndGames, fetchDeleteCard, fetchEventPoll, fetchHealth, fetchUpdateCard, fetchVersion } from "../backend.js";
import { fetchCardsAndGames, fetchCardsForGame, fetchCurrentCardAndGames, fetchDeleteCard, fetchEventPoll, fetchHealth, fetchUpdateCard, fetchVersion } from "./backend.js";
import Logger from "lipe";
import { CardAndGames, CardsAndGames, MicroSDCard } from "../types.js"
import { CardAndGames, CardsAndGames, MicroSDCard } from "./types.js"

function sleep(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(() => resolve(), ms));
Expand Down Expand Up @@ -44,6 +44,7 @@ export class MicroSDeckManager {
}

destruct() {
this.logger?.Debug("Deconstruct Called");
if (this.isDestructed) return;
this.isDestructed = true;
this.logger?.Log("Deinitializing MicroSDeckManager");
Expand Down Expand Up @@ -81,6 +82,8 @@ export class MicroSDeckManager {
let sleepDelay = 500;
this.logger?.Debug("Starting poll");



while (true) {
if (signal.aborted) {
this.logger?.Debug("Aborting poll")
Expand All @@ -95,27 +98,23 @@ export class MicroSDeckManager {
this.pollLock = {};
this.logger?.Debug("Poll");

let result = await fetchEventPoll({ ...this.fetchProps, signal });

this.logger?.Debug("Backend detected an update: {result}", { result });

switch (result) {
// Server is down. Lets try again but back off a bit
case undefined:
this.logger?.Warn("Unable to contact Server. Backing off and waiting {sleepDelay}ms", { sleepDelay });
await sleep(sleepDelay = Math.min(sleepDelay * 1.5, 1000 * 60));
break;

// Request must have timed out
case false:
sleepDelay = 100;
break;

// We got an update. Time to refresh.
default:
this.eventBus.dispatchEvent(new Event(result));
await this.fetch();
}
await new Promise((res, rej) => {
const source = new EventSource(`${this.fetchProps.url}/listen`);
this.abortController.signal.addEventListener("abort", () => {
this.logger?.Debug("Abort was called. Trying to close the EventSource");
source.close();
})

source.onopen = () => this.logger?.Debug("Successfully subscribed to events");
source.onmessage = async (message) => {
this.logger?.Debug("Recieved message {data}", {message, data: message.data});
let data = message.data && JSON.parse(message.data);

this.eventBus.dispatchEvent(new Event(data));
await this.fetch();
}
source.onerror = rej;
})

this.pollLock = undefined;
}
Expand Down
20 changes: 0 additions & 20 deletions lib/src/backend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,26 +32,6 @@ async function wrapFetch({ url, logger }: FetchProps, init?: RequestInit): Promi
return undefined;
}

export async function fetchEventPoll({ url, logger, signal }: FetchProps & { signal: AbortSignal }): Promise<string | false | undefined> {
try {
const result = await fetch(`${url}/listen`, {
keepalive: true,
signal
});

if (!result.ok) {
logger?.Log("Poll timed out...");
return false;
}

return await result.json();

} catch (err) {
logger?.Error("Fetch failed with error {err}", { err });
}
return undefined;
}

export async function fetchHealth({url, logger}: FetchProps): Promise<boolean> {
return await wrapFetch({url: `${url}/health`, logger}) !== undefined;
}
Expand Down
Loading

0 comments on commit e266b8a

Please sign in to comment.