From 8676f9634883e284358ed6bc6abbd8b5edb2822a Mon Sep 17 00:00:00 2001 From: CEbbinghaus Date: Wed, 25 Oct 2023 14:00:18 +0000 Subject: [PATCH 1/2] Added long polling to backend --- backend/src/api.rs | 11 ++++++++++- backend/src/main.rs | 11 ++++++++--- backend/src/watch.rs | 5 ++++- 3 files changed, 22 insertions(+), 5 deletions(-) diff --git a/backend/src/api.rs b/backend/src/api.rs index 0dc52ad..73fa7e6 100644 --- a/backend/src/api.rs +++ b/backend/src/api.rs @@ -7,9 +7,11 @@ use crate::{ use actix_web::{delete, get, post, web, HttpResponse, Responder, Result}; use serde::Deserialize; use std::{ops::Deref, sync::Arc}; +use tokio::sync::broadcast::Sender; pub(crate) fn config(cfg: &mut web::ServiceConfig) { - cfg.service(get_current_card) + cfg.service(listen) + .service(get_current_card) .service(get_current_card_id) .service(get_current_card_and_games) .service(create_card) @@ -35,6 +37,13 @@ pub(crate) async fn health() -> impl Responder { HttpResponse::Ok() } +#[get("/listen")] +pub(crate) async fn listen(sender: web::Data>) -> Result { + sender.subscribe().recv().await.map_err(|_| Error::from_str("Unable to retrieve update"))?; + Ok(HttpResponse::Ok()) +} + + #[get("/ListCardsWithGames")] pub(crate) async fn list_cards_with_games(datastore: web::Data>) -> impl Responder { web::Json(datastore.list_cards_with_games()) diff --git a/backend/src/main.rs b/backend/src/main.rs index a923f52..74ff57b 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -21,6 +21,7 @@ use err::Error; use futures::{pin_mut, select, FutureExt}; use once_cell::sync::Lazy; use simplelog::LevelFilter; +use tokio::sync::broadcast::{self, Sender}; use std::path::PathBuf; use std::process::exit; use std::sync::Arc; @@ -39,7 +40,7 @@ pub fn init() -> Result<(), ::log::SetLoggerError> { type MainResult = Result<(), Error>; -async fn run_server(datastore: Arc) -> MainResult { +async fn run_server(datastore: Arc, sender: Sender<()>) -> MainResult { // let log_filepath = format!("/tmp/{}.log", PACKAGE_NAME); // WriteLogger::init( // #[cfg(debug_assertions)] @@ -58,6 +59,7 @@ async fn run_server(datastore: Arc) -> MainResult { info!("Starting HTTP server..."); HttpServer::new(move || { + let cors = Cors::default() .allow_any_header() .allow_any_method() @@ -68,6 +70,7 @@ async fn run_server(datastore: Arc) -> MainResult { .wrap(cors) // .app_data(web::Data::new(api::AppState{datastore: datastore.clone()})) .app_data(web::Data::new(datastore.clone())) + .app_data(web::Data::new(sender.clone())) .configure(config) }) .workers(1) @@ -119,9 +122,11 @@ async fn main() { info!("Starting Program..."); - let server_future = run_server(store.clone()).fuse(); + let (txtx, _) = broadcast::channel::<()>(1); + + let server_future = run_server(store.clone(), txtx.clone()).fuse(); - let watch_future = start_watch(store.clone()).fuse(); + let watch_future = start_watch(store.clone(), txtx.clone()).fuse(); pin_mut!(server_future, watch_future); diff --git a/backend/src/watch.rs b/backend/src/watch.rs index 041df99..0bf31d8 100644 --- a/backend/src/watch.rs +++ b/backend/src/watch.rs @@ -5,6 +5,7 @@ use std::fs::DirEntry; use std::hash::{Hash, Hasher}; use std::{borrow::Borrow, collections::HashMap, fs, sync::Arc, time::Duration}; use tokio::time::interval; +use tokio::sync::broadcast::Sender; const STEAM_LIB_FILE: &'static str = "/run/media/mmcblk0p1/libraryfolder.vdf"; const STEAM_LIB_FOLDER: &'static str = "/run/media/mmcblk0p1/steamapps/"; @@ -120,7 +121,7 @@ fn read_msd_directory(datastore: &Store) -> Result<(), Error> { Ok(()) } -pub async fn start_watch(datastore: Arc) -> Result<(), Error> { +pub async fn start_watch(datastore: Arc, sender: Sender<()>) -> Result<(), Error> { let mut interval = interval(Duration::from_secs(5)); let mut changeset = ChangeSet::new(); @@ -162,5 +163,7 @@ pub async fn start_watch(datastore: Arc) -> Result<(), Error> { // commit update changeset.update(&cid, hash); + + let _ = sender.send(()); } } From 9faf1c1adfd1e227f6a08e0198ca5cbe38fdf46d Mon Sep 17 00:00:00 2001 From: CEbbinghaus Date: Thu, 26 Oct 2023 03:30:27 +1100 Subject: [PATCH 2/2] Finished implementing Long Polling --- backend/Cargo.lock | 2 +- backend/Cargo.toml | 2 +- backend/src/watch.rs | 22 ++++++++- package.json | 2 +- src/hooks/backend.ts | 88 ++++++++++++++++++++++++++-------- src/index.tsx | 10 +++- src/state/MicoSDeckManager.tsx | 74 +++++++++++++++++++++++++--- 7 files changed, 166 insertions(+), 34 deletions(-) diff --git a/backend/Cargo.lock b/backend/Cargo.lock index 66abeb9..bf57d7d 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -311,7 +311,7 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] name = "backend" -version = "0.9.3" +version = "0.9.4" dependencies = [ "actix-cors", "actix-web", diff --git a/backend/Cargo.toml b/backend/Cargo.toml index 7ea1174..c97f56a 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "backend" -version = "0.9.3" +version = "0.9.4" edition = "2021" license = "GPL-2.0" authors = ["Christopher-Robin Ebbinghaus "] diff --git a/backend/src/watch.rs b/backend/src/watch.rs index 0bf31d8..b381697 100644 --- a/backend/src/watch.rs +++ b/backend/src/watch.rs @@ -4,8 +4,8 @@ use std::collections::hash_map::DefaultHasher; use std::fs::DirEntry; use std::hash::{Hash, Hasher}; use std::{borrow::Borrow, collections::HashMap, fs, sync::Arc, time::Duration}; -use tokio::time::interval; use tokio::sync::broadcast::Sender; +use tokio::time::interval; const STEAM_LIB_FILE: &'static str = "/run/media/mmcblk0p1/libraryfolder.vdf"; const STEAM_LIB_FOLDER: &'static str = "/run/media/mmcblk0p1/steamapps/"; @@ -126,14 +126,26 @@ pub async fn start_watch(datastore: Arc, sender: Sender<()>) -> Result<() let mut changeset = ChangeSet::new(); + let mut card_inserted = false; + loop { interval.tick().await; // No card no worries. if !is_card_inserted() { + // The card has been removed since the last check + if card_inserted { + let _ = sender.send(()); + } + + card_inserted = false; continue; } + // was the card inserted since the last check. + let card_changed = !card_inserted; + card_inserted = true; + // There is no steam directory so it hasn't been formatted. if !is_card_steam_formatted() { continue; @@ -149,7 +161,13 @@ pub async fn start_watch(datastore: Arc, sender: Sender<()>) -> Result<() // Do we have changes in the steam directory. This should only occur when something has been added/deleted let hash = match changeset.is_changed(&cid) { - None => continue, + None => { + // A new card has been inserted but no content on it changed. + if card_changed { + let _ = sender.send(()); + } + continue; + } Some(v) => v, }; diff --git a/package.json b/package.json index a65ddc4..2e95454 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "microsdeck", - "version": "0.9.3", + "version": "0.9.4", "description": "A SteamDeck plugin to track games across MicroSD cards", "scripts": { "build": "shx rm -rf dist && rollup -c", diff --git a/src/hooks/backend.ts b/src/hooks/backend.ts index f9685bd..deefba4 100644 --- a/src/hooks/backend.ts +++ b/src/hooks/backend.ts @@ -12,7 +12,7 @@ export async function SetNameForMicroSDCard(CardId: string, Name: string) { body: JSON.stringify({ id: CardId, name: Name }), referrerPolicy: "unsafe-url", }) - .catch(Error => Logger.Error("There was a critical error: \"{Error}\"", { Error })); + .catch(Error => Logger.Error("There was a critical error: \"{Error}\"", { Error })); } export function GetCardsForGame(appId: string) { @@ -38,35 +38,81 @@ export function GetCardsForGame(appId: string) { } } +export async function fetchEventPoll({signal}: {signal: AbortSignal}): Promise { + try { + const response = await fetch(`${API_URL}/listen`, { + keepalive: true, + referrerPolicy: "unsafe-url", + signal + }); + + if (response.ok) { + return true; + } + + Logger.Log("Poll timed out...") + return false; + } catch (error) { + Logger.Error("Lost contact with server.."); + return undefined; + } +} + export async function fetchDeleteCard(card: MicroSDCard) { - await fetch(`${API_URL}/card/${card.uid}`, { - method: "DELETE", - referrerPolicy: "unsafe-url", - }) - .catch(Error => Logger.Error("There was a critical error: \"{Error}\"", { Error })); + try { + await fetch(`${API_URL}/card/${card.uid}`, { + method: "DELETE", + referrerPolicy: "unsafe-url", + }); + } catch (error) { + Logger.Error("There was a critical error: \"{Error}\"", { Error }); + } } export async function fetchUpdateCard(card: MicroSDCard) { - await fetch(`${API_URL}/card/${card.uid}`, { - method: "POST", - headers: { - "Content-Type": "application/json", - }, - body: JSON.stringify(card), - referrerPolicy: "unsafe-url", - }) - .catch(Error => Logger.Error("There was a critical error: \"{Error}\"", { Error })); + try { + await fetch(`${API_URL}/card/${card.uid}`, { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify(card), + referrerPolicy: "unsafe-url", + }); + } catch (error) { + Logger.Error("There was a critical error: \"{Error}\"", { Error }); + } } export async function fetchCurrentCardAndGames(): Promise { - return await fetch(`${API_URL}/current`, { referrerPolicy: "unsafe-url", }) - .then(res => res.json()) - .catch(Error => Logger.Error("There was a critical error: \"{Error}\"", { Error })); + try { + let result = await fetch(`${API_URL}/current`, { referrerPolicy: "unsafe-url", }); + + if(!result.ok) { + Logger.Warn("Fetch returned non 200 code {status} status, {statusText}", {status: result.status, statusText: result.statusText}) + return undefined; + } + + return await result.json(); + } catch (error) { + Logger.Error("There was a critical error: \"{Error}\"", { Error }); + return undefined; + } } export async function fetchCardsAndGames(): Promise { - return await fetch(`${API_URL}/ListCardsWithGames`, { referrerPolicy: "unsafe-url", }) - .then(res => res.json()) - .catch(Error => Logger.Error("There was a critical error: \"{Error}\"", { Error })); + try { + let result = await fetch(`${API_URL}/ListCardsWithGames`, { referrerPolicy: "unsafe-url", }); + + if(!result.ok) { + Logger.Warn("Fetch returned non 200 code {status} status, {statusText}", {status: result.status, statusText: result.statusText}) + return undefined; + } + + return await result.json(); + } catch (error) { + Logger.Error("There was a critical error: \"{Error}\"", { Error }); + return undefined; + } } export function GetCardsAndGames() { diff --git a/src/index.tsx b/src/index.tsx index 6a8b5f3..6d90df6 100755 --- a/src/index.tsx +++ b/src/index.tsx @@ -162,8 +162,10 @@ export default definePlugin((serverApi: ServerAPI) => { }); const microSDeckManager = new MicroSDeckManager(); - //@ts-ignore sssshhhhh - window.microSDeckManager = microSDeckManager; + + //@ts-ignore ssshhhh 🤫 + window.MicroSDeck = microSDeckManager; + microSDeckManager.init(); DeckyAPI.SetApi(serverApi); @@ -182,6 +184,10 @@ export default definePlugin((serverApi: ServerAPI) => { onDismount() { serverApi.routerHook.removeRoute(DOCUMENTATION_PATH); patch && serverApi.routerHook.removePatch('/library/app/:appid', patch); + + //@ts-ignore + window.MicroSDeck = null; + microSDeckManager.deinit(); }, }; }); diff --git a/src/state/MicoSDeckManager.tsx b/src/state/MicoSDeckManager.tsx index 8e0f8ed..5ed4e30 100644 --- a/src/state/MicoSDeckManager.tsx +++ b/src/state/MicoSDeckManager.tsx @@ -1,16 +1,32 @@ import { Logger } from "../Logging"; -import { fetchCardsAndGames, fetchCurrentCardAndGames, fetchDeleteCard, fetchUpdateCard } from "../hooks/backend"; +import { fetchCardsAndGames, fetchCurrentCardAndGames, fetchDeleteCard, fetchEventPoll, fetchUpdateCard } from "../hooks/backend"; import { CardAndGames, CardsAndGames, MicroSDCard } from "../lib/Types" +function sleep(ms: number): Promise { + return new Promise(resolve => setTimeout(() => resolve(), ms)); +} + export class MicroSDeckManager { + private abortController = new AbortController(); + public eventBus = new EventTarget(); - private currentCardAndGames: CardAndGames | undefined; + private currentCardAndGames: CardAndGames | undefined; private cardsAndGames: CardsAndGames = []; + private pollLock: any | undefined; + init() { + Logger.Log("Initializing MicroSDeckManager"); + this.init = () => { throw "Do Not call init more than once"; }; this.fetch(); + this.subscribeToUpdates(); + } + + deinit() { + Logger.Log("Deinitializing MicroSDeckManager"); + this.abortController.abort("deinit"); } async fetch() { @@ -18,7 +34,7 @@ export class MicroSDeckManager { this.cardsAndGames = await fetchCardsAndGames() || []; this.eventBus.dispatchEvent(new Event("stateUpdate")); } - + getCardsAndGames() { return { cardsAndGames: this.cardsAndGames @@ -31,18 +47,64 @@ export class MicroSDeckManager { } } + async subscribeToUpdates() { + let signal = this.abortController.signal; + + let sleepDelay = 500; + Logger.Debug("Starting poll"); + + while (true) { + if(signal.aborted) { + Logger.Debug("Aborting poll") + return; + } + + if (this.pollLock !== undefined) { + Logger.Error("Tried Polling twice at the same time..."); + return; + } + + this.pollLock = {}; + Logger.Debug("Poll"); + + let result = await fetchEventPoll({signal}); + + Logger.Debug("Result was: " + (result === undefined ? "undefined" : result) , {result}); + + switch(result) { + // Server is down. Lets try again but back off a bit + case undefined: + Logger.Warn("Unable to contact Server. Backing off and waiting {sleepDelay}ms", {sleepDelay}); + await sleep(sleepDelay *= 1.5); + break; + + // We got an update. Time to refresh. + case true: + Logger.Debug("Card detected an update."); + await this.fetch(); + + // Request must have timed out + case false: + sleepDelay = 100; + break; + } + + this.pollLock = undefined; + } + } + async updateCard(card: MicroSDCard) { Logger.Debug("Updating card {uid}", card); await fetchUpdateCard(card); await this.fetch() } - + async deleteCard(card: MicroSDCard) { - Logger.Log("Deleting Card {uid}", card); + Logger.Debug("Deleting Card {uid}", card); await fetchDeleteCard(card); await this.fetch(); } - + async hideCard(card: MicroSDCard) { card.hidden = true; //TODO: Implement