diff --git a/backend/Cargo.lock b/backend/Cargo.lock index 8bd501a..15266ec 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -197,19 +197,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "actix-ws" -version = "0.2.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "535aec173810be3ca6f25dd5b4d431ae7125d62000aa3cbae1ec739921b02cf3" -dependencies = [ - "actix-codec", - "actix-http", - "actix-web", - "futures-core", - "tokio", -] - [[package]] name = "addr2line" version = "0.21.0" @@ -312,11 +299,10 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] name = "backend" -version = "0.9.6" +version = "0.9.7" dependencies = [ "actix-cors", "actix-web", - "actix-ws", "async-trait", "chrono", "criterion", @@ -332,6 +318,7 @@ dependencies = [ "simplelog", "slotmap", "tokio", + "tokio-stream", ] [[package]] @@ -1693,6 +1680,18 @@ dependencies = [ "syn 2.0.29", ] +[[package]] +name = "tokio-stream" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", + "tokio-util", +] + [[package]] name = "tokio-util" version = "0.7.8" diff --git a/backend/Cargo.toml b/backend/Cargo.toml index 29ad0a2..2683a25 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "backend" -version = "0.9.6" +version = "0.9.7" edition = "2021" license = "GPL-2.0" authors = ["Christopher-Robin Ebbinghaus "] @@ -8,22 +8,22 @@ authors = ["Christopher-Robin Ebbinghaus "] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -actix-web = "4.4.0" actix-cors = "0.6.4" +actix-web = "4.4.0" log = "0.4" simplelog = "0.12" once_cell = "1.18.0" chrono = "0.4.24" serde_json = "1.0" tokio = { version = "1.26.0", features = ["full"] } +tokio-stream = { version = "0.1.14", features = ["sync"] } futures = "0.3.26" -keyvalues-serde = "0.1.0" # Key Value format that Steam uses +keyvalues-serde = "0.1.0" # Key Value format that Steam uses async-trait = "0.1.68" serde = { version = "1.0.145", features = ["derive", "rc"] } slotmap = { version = "1.0.6", features = ["serde"] } glob = "0.3.1" semver = { version = "1.0.20", features = ["serde"] } -actix-ws = "0.2.5" either = "1.9.0" [dev-dependencies] diff --git a/backend/src/api.rs b/backend/src/api.rs index d37ff5c..2b23f4a 100644 --- a/backend/src/api.rs +++ b/backend/src/api.rs @@ -5,12 +5,16 @@ use crate::{ err::Error, sdcard::{get_card_cid, is_card_inserted}, }; -use actix_web::{delete, get, post, web, HttpResponse, Responder, Result, http::StatusCode, Either, HttpResponseBuilder}; -use actix_ws::Message; +use actix_web::{ + delete, get, http::StatusCode, post, web::{self, Bytes}, Either, HttpResponse, HttpResponseBuilder, Responder, + Result, +}; use futures::StreamExt; +use log::debug; use serde::Deserialize; use std::{ops::Deref, sync::Arc}; use tokio::sync::broadcast::Sender; +use tokio_stream::wrappers::BroadcastStream; pub(crate) fn config(cfg: &mut web::ServiceConfig) { cfg // @@ -52,10 +56,25 @@ pub(crate) async fn health() -> impl Responder { } #[get("/listen")] -pub(crate) async fn listen(sender: web::Data>) -> Result { - Ok(web::Json(sender.subscribe().recv().await.map_err( - |_| Error::from_str("Unable to retrieve update"), - )?)) +pub(crate) async fn listen(sender: web::Data>) -> Result { + let event_stream = BroadcastStream::new(sender.subscribe()).map(|res| + { + debug!("Streaming Event {:?}", res); + let bytes = match res { + Err(_) => return Err(Error::from_str("Subscriber Closed")), + Ok(value) => { + let data = format!("data: {}\n\n", serde_json::to_string(&value)?); + Bytes::from(data) + } + }; + + Ok::(bytes) + } + ); + Ok(HttpResponse::Ok() + .content_type("text/event-stream") + .streaming(event_stream) + ) } #[get("/list")] @@ -90,12 +109,20 @@ pub(crate) async fn get_current_card_and_games( datastore: web::Data>, ) -> Result> { if !is_card_inserted() { - return Ok(Either::Right(HttpResponseBuilder::new(StatusCode::NO_CONTENT).reason("No Card inserted").finish())); + return Ok(Either::Right( + HttpResponseBuilder::new(StatusCode::NO_CONTENT) + .reason("No Card inserted") + .finish(), + )); } match get_card_cid() { Some(uid) => Ok(Either::Left(web::Json(datastore.get_card_and_games(&uid)?))), - None => Ok(Either::Right(HttpResponseBuilder::new(StatusCode::NO_CONTENT).reason("Card Id could not be resolved").finish())) + None => Ok(Either::Right( + HttpResponseBuilder::new(StatusCode::NO_CONTENT) + .reason("Card Id could not be resolved") + .finish(), + )), } } @@ -105,7 +132,7 @@ pub(crate) async fn get_current_card(datastore: web::Data>) -> Result return Err(Error::from_str("No card is inserted").into()); } - let uid = get_card_cid().ok_or(Error::Error("Unable to evaluate Card Id".into()))?; + let uid = get_card_cid().ok_or(Error::from_str("Unable to evaluate Card Id"))?; Ok(web::Json(datastore.get_card(&uid)?)) } diff --git a/backend/src/ds.rs b/backend/src/ds.rs index f1ef3bd..8b7d92d 100644 --- a/backend/src/ds.rs +++ b/backend/src/ds.rs @@ -94,13 +94,13 @@ impl StoreData { let node = self .node_ids .get(card_id) - .ok_or(Error::Error("Card Id not present".into()))?; + .ok_or(Error::from_str("Card Id not present"))?; match self.nodes.get_mut(*node).unwrap().element { StoreElement::Card(ref mut card) => { func(card)?; } - StoreElement::Game(_) => return Err(Error::Error("Expected Card, got Game".into())), + StoreElement::Game(_) => return Err(Error::from_str("Expected Card, got Game")), } Ok(()) @@ -427,6 +427,7 @@ impl Store { } pub fn remove_element(&self, id: &str) -> Result<(), Error> { + // these two operations have to happen within a single scope otherwise the try_write_to_file causes a deadlock { let mut lock = self.data.write().unwrap(); lock.remove_item(id)?; diff --git a/backend/src/err.rs b/backend/src/err.rs index 05ba25b..5e39568 100644 --- a/backend/src/err.rs +++ b/backend/src/err.rs @@ -1,42 +1,53 @@ #![allow(dead_code)] -use std::error; + use std::fmt; use actix_web::ResponseError; #[derive(Debug)] -pub enum Error { - Error(String), +struct StdErr; + +impl std::error::Error for StdErr{} + +impl fmt::Display for StdErr { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "StdErr") + } } +#[derive(Debug)] +pub struct Error(String); + impl Error { pub fn new_boxed(value: &str) -> Box { - Box::new(Error::Error(value.to_string())) + Box::new(Error(value.to_string())) } pub fn from_str(value: &str) -> Self { - Error::Error(value.to_string()) + Error(value.to_string()) } pub fn new_res(value: &str) -> Result { - Err(Error::Error(value.to_string())) + Err(Error(value.to_string())) } } impl fmt::Display for Error { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match self { - // Both underlying errors already impl `Display`, so we defer to - // their implementations. - Error::Error(err) => write!(f, "Error: {}", err), - } + write!(f, "Error: {}", self) } } -impl From for Error { +impl Into> for Error { + fn into(self) -> Box { + Box::new(StdErr) + } +} + +impl From for Error { fn from(e: T) -> Self { - Error::Error(e.to_string()) + Error(e.to_string()) } } diff --git a/backend/src/log.rs b/backend/src/log.rs index 19bbfea..2f30982 100644 --- a/backend/src/log.rs +++ b/backend/src/log.rs @@ -30,9 +30,15 @@ impl Logger { .ok()?; let max_level = env::var("LOG_LEVEL") - .map_err(|e| Error::from(e)) - .and_then(|v| Level::from_str(&v).map_err(|e| Error::from(e))) - .unwrap_or(Level::Info); + .map_err(Error::from) + .and_then(|v| Level::from_str(&v).map_err(Error::from)) + .unwrap_or({ + if cfg!(debug_assertions) { + Level::Debug + } else { + Level::Info + } + }); println!("Logging enabled to {file_path} with level {max_level}"); diff --git a/backend/src/watch.rs b/backend/src/watch.rs index 2be33ba..15615a4 100644 --- a/backend/src/watch.rs +++ b/backend/src/watch.rs @@ -28,7 +28,7 @@ fn read_msd_directory(datastore: &Store, mount: &Option) -> Result<(), E libid: library.contentid.clone(), mount: mount.clone(), name: library.label, - position: 0, + position: u32::MAX, hidden: false, }, ); diff --git a/lib/package.json b/lib/package.json index ccb0a82..76cecda 100644 --- a/lib/package.json +++ b/lib/package.json @@ -1,6 +1,6 @@ { "name": "@cebbinghaus/microsdeck", - "version": "0.9.6", + "version": "0.9.7", "description": "", "keywords": [], "author": "CEbbinghaus", diff --git a/lib/src/state/MicoSDeckManager.ts b/lib/src/MicoSDeckManager.ts similarity index 78% rename from lib/src/state/MicoSDeckManager.ts rename to lib/src/MicoSDeckManager.ts index a440182..5c20cfd 100644 --- a/lib/src/state/MicoSDeckManager.ts +++ b/lib/src/MicoSDeckManager.ts @@ -1,6 +1,6 @@ -import { fetchCardsAndGames, fetchCardsForGame, fetchCurrentCardAndGames, fetchDeleteCard, fetchEventPoll, fetchHealth, fetchUpdateCard, fetchVersion } from "../backend.js"; +import { fetchCardsAndGames, fetchCardsForGame, fetchCurrentCardAndGames, fetchDeleteCard, fetchEventPoll, fetchHealth, fetchUpdateCard, fetchVersion } from "./backend.js"; import Logger from "lipe"; -import { CardAndGames, CardsAndGames, MicroSDCard } from "../types.js" +import { CardAndGames, CardsAndGames, MicroSDCard } from "./types.js" function sleep(ms: number): Promise { return new Promise(resolve => setTimeout(() => resolve(), ms)); @@ -44,6 +44,7 @@ export class MicroSDeckManager { } destruct() { + this.logger?.Debug("Deconstruct Called"); if (this.isDestructed) return; this.isDestructed = true; this.logger?.Log("Deinitializing MicroSDeckManager"); @@ -81,6 +82,8 @@ export class MicroSDeckManager { let sleepDelay = 500; this.logger?.Debug("Starting poll"); + + while (true) { if (signal.aborted) { this.logger?.Debug("Aborting poll") @@ -95,27 +98,23 @@ export class MicroSDeckManager { this.pollLock = {}; this.logger?.Debug("Poll"); - let result = await fetchEventPoll({ ...this.fetchProps, signal }); - - this.logger?.Debug("Backend detected an update: {result}", { result }); - - switch (result) { - // Server is down. Lets try again but back off a bit - case undefined: - this.logger?.Warn("Unable to contact Server. Backing off and waiting {sleepDelay}ms", { sleepDelay }); - await sleep(sleepDelay = Math.min(sleepDelay * 1.5, 1000 * 60)); - break; - - // Request must have timed out - case false: - sleepDelay = 100; - break; - - // We got an update. Time to refresh. - default: - this.eventBus.dispatchEvent(new Event(result)); - await this.fetch(); - } + await new Promise((res, rej) => { + const source = new EventSource(`${this.fetchProps.url}/listen`); + this.abortController.signal.addEventListener("abort", () => { + this.logger?.Debug("Abort was called. Trying to close the EventSource"); + source.close(); + }) + + source.onopen = () => this.logger?.Debug("Successfully subscribed to events"); + source.onmessage = async (message) => { + this.logger?.Debug("Recieved message {data}", {message, data: message.data}); + let data = message.data && JSON.parse(message.data); + + this.eventBus.dispatchEvent(new Event(data)); + await this.fetch(); + } + source.onerror = rej; + }) this.pollLock = undefined; } diff --git a/lib/src/backend.ts b/lib/src/backend.ts index 708e050..504236b 100644 --- a/lib/src/backend.ts +++ b/lib/src/backend.ts @@ -32,26 +32,6 @@ async function wrapFetch({ url, logger }: FetchProps, init?: RequestInit): Promi return undefined; } -export async function fetchEventPoll({ url, logger, signal }: FetchProps & { signal: AbortSignal }): Promise { - try { - const result = await fetch(`${url}/listen`, { - keepalive: true, - signal - }); - - if (!result.ok) { - logger?.Log("Poll timed out..."); - return false; - } - - return await result.json(); - - } catch (err) { - logger?.Error("Fetch failed with error {err}", { err }); - } - return undefined; -} - export async function fetchHealth({url, logger}: FetchProps): Promise { return await wrapFetch({url: `${url}/health`, logger}) !== undefined; } diff --git a/lib/src/state/MicroSDeckContext.tsx b/lib/src/components/MicroSDeckContext.tsx similarity index 95% rename from lib/src/state/MicroSDeckContext.tsx rename to lib/src/components/MicroSDeckContext.tsx index 210cdb6..c358450 100644 --- a/lib/src/state/MicroSDeckContext.tsx +++ b/lib/src/components/MicroSDeckContext.tsx @@ -1,5 +1,5 @@ import { createContext, useContext, useEffect, useState } from "react"; -import { MicroSDeckManager } from "./MicoSDeckManager.js"; +import { MicroSDeckManager } from "../MicoSDeckManager.js"; import { CardAndGames, CardsAndGames } from "../types.js"; const MicroSDeckContext = createContext(null as any); diff --git a/lib/src/index.ts b/lib/src/index.ts index 0c42cf9..a152cd3 100644 --- a/lib/src/index.ts +++ b/lib/src/index.ts @@ -1,4 +1,4 @@ export * from "./types.js"; export * from "./hooks.js"; -export * from "./state/MicoSDeckManager.js" -export * from "./state/MicroSDeckContext.js" \ No newline at end of file +export * from "./MicoSDeckManager.js" +export * from "./components/MicroSDeckContext.js" \ No newline at end of file diff --git a/package.json b/package.json index 9fcb5ac..024adb1 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "microsdeck", - "version": "0.9.6", + "version": "0.9.7", "description": "A SteamDeck plugin to track games across MicroSD cards", "keywords": [ "decky",