Skip to content

Commit

Permalink
Fixed problem with no thread avaliability & made SSE spec compliant
Browse files Browse the repository at this point in the history
  • Loading branch information
CEbbinghaus committed Nov 16, 2023
1 parent e266b8a commit 1ae8e9b
Show file tree
Hide file tree
Showing 7 changed files with 268 additions and 60 deletions.
32 changes: 13 additions & 19 deletions backend/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,15 @@ use crate::{
dto::{CardEvent, Game, MicroSDCard},
env::PACKAGE_VERSION,
err::Error,
event::Event,
sdcard::{get_card_cid, is_card_inserted},
};
use actix_web::{
delete, get, http::StatusCode, post, web::{self, Bytes}, Either, HttpResponse, HttpResponseBuilder, Responder,
Result,
delete, get,
http::StatusCode,
post,
web,
Either, HttpResponse, HttpResponseBuilder, Responder, Result,
};
use futures::StreamExt;
use log::debug;
Expand Down Expand Up @@ -57,24 +61,13 @@ pub(crate) async fn health() -> impl Responder {

#[get("/listen")]
pub(crate) async fn listen(sender: web::Data<Sender<CardEvent>>) -> Result<HttpResponse> {
let event_stream = BroadcastStream::new(sender.subscribe()).map(|res|
{
debug!("Streaming Event {:?}", res);
let bytes = match res {
Err(_) => return Err(Error::from_str("Subscriber Closed")),
Ok(value) => {
let data = format!("data: {}\n\n", serde_json::to_string(&value)?);
Bytes::from(data)
}
};

Ok::<actix_web::web::Bytes, Error>(bytes)
}
);
let event_stream = BroadcastStream::new(sender.subscribe()).map(|res| match res {
Err(_) => Err(Error::from_str("Subscriber Closed")),
Ok(value) => Ok(Event::new(value).into()),
});
Ok(HttpResponse::Ok()
.content_type("text/event-stream")
.streaming(event_stream)
)
.streaming(event_stream))
}

#[get("/list")]
Expand Down Expand Up @@ -211,6 +204,7 @@ pub(crate) async fn get_card(
pub(crate) async fn update_cards(
body: web::Json<Vec<MicroSDCard>>,
datastore: web::Data<Arc<Store>>,
sender: web::Data<Sender<CardEvent>>,
) -> Result<impl Responder> {
for card in body.iter() {
let card = card.to_owned();
Expand All @@ -226,6 +220,7 @@ pub(crate) async fn update_cards(
}
}

_ = sender.send(CardEvent::Updated);
Ok(HttpResponse::Ok())
}

Expand All @@ -251,7 +246,6 @@ pub(crate) async fn create_game(
}

datastore.add_game(id.into_inner(), game);

Ok(HttpResponse::Ok())
}

Expand Down
19 changes: 18 additions & 1 deletion backend/src/dto.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::err::Error;
use crate::{err::Error, event::EventTrait};
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Copy, Clone, Debug)]
Expand All @@ -8,6 +8,23 @@ pub enum CardEvent {
Updated
}

impl EventTrait for CardEvent {
fn get_event(&self) -> Option<&'static str> {
Some(match self {
CardEvent::Inserted => "Inserted",
CardEvent::Removed => "Removed",
CardEvent::Updated => "Updated",
})
}
// fn get_data(&self) -> Option<&'static str> {
// match self {
// CardEvent::Inserted => None,
// CardEvent::Removed => None,
// CardEvent::Updated => Some("Hello, World!"),
// }
// }
}

fn default_true() -> bool {
true
}
Expand Down
93 changes: 93 additions & 0 deletions backend/src/event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
use actix_web::web::Bytes;

pub(crate) struct Event<T : EventTrait> {
val: T
}

pub(crate) trait EventTrait {
fn get_id(&self) -> Option<&'static str> {
None
}
fn get_event(&self) -> Option<&'static str> {
None
}
fn get_data(&self) -> Option<&'static str> {
None
}
}

impl<T: EventTrait> Event<T> {
pub fn new(val: T) -> Self {
Event { val }
}
}

impl<T : EventTrait> ToString for Event<T> {
fn to_string(&self) -> String {
let mut output = "".to_string();

if let Some(value) = self.val.get_id() {
output += &format!("id: {}\n", value);
}
if let Some(value) = self.val.get_event() {
output += &format!("event: {}\n", value);
}
if let Some(value) = self.val.get_data() {
output += &format!("data: {}\n", value);
}

if output != "" {
output += "\n";
}

return output;
}
}

impl<T: EventTrait> Into<Bytes> for Event<T> {
fn into(self) -> Bytes {
Bytes::from(self.to_string())
}
}

impl<T: EventTrait> From<T> for Event<T> {
fn from(value: T) -> Self {
Event { val: value }
}
}

pub(crate) struct EventBuilder {
id: Option<&'static str>,
event: Option<&'static str>,
data: Option<&'static str>,
}

impl EventBuilder {
pub fn new() -> Self {
EventBuilder { id: None, event: None, data: None }
}
pub fn with_id(mut self, id: &'static str) -> Self {
self.id = Some(id);
self
}
pub fn with_event(mut self, event: &'static str) -> Self {
self.event = Some(event);
self
}
pub fn with_data(mut self, data: &'static str) -> Self {
self.data = Some(data);
self
}
}

impl EventTrait for EventBuilder {
fn get_data(&self) -> Option<&'static str> {
self.data
}
fn get_event(&self) -> Option<&'static str> {
self.event
}
fn get_id(&self) -> Option<&'static str> {
self.id
}
}
3 changes: 2 additions & 1 deletion backend/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ mod ds;
mod dto;
mod env;
mod err;
mod event;
mod log;
mod sdcard;
mod steam;
Expand Down Expand Up @@ -52,7 +53,7 @@ async fn run_server(datastore: Arc<Store>, sender: Sender<CardEvent>) -> MainRes
.app_data(web::Data::new(sender.clone()))
.configure(config)
})
.workers(1)
.workers(2)
.bind(("0.0.0.0", PORT))?
.run()
.await
Expand Down
1 change: 1 addition & 0 deletions backend/src/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ pub async fn start_watch(datastore: Arc<Store>, sender: Sender<CardEvent>) -> Re
}

if !card_inserted {
debug!("Card was inserted");
let _ = sender.send(CardEvent::Inserted);
mount = None;
}
Expand Down
62 changes: 31 additions & 31 deletions lib/src/MicoSDeckManager.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { fetchCardsAndGames, fetchCardsForGame, fetchCurrentCardAndGames, fetchDeleteCard, fetchEventPoll, fetchHealth, fetchUpdateCard, fetchVersion } from "./backend.js";
import { Event, fetchCardsAndGames, fetchCardsForGame, fetchCurrentCardAndGames, fetchDeleteCard, fetchEventTarget, fetchHealth, fetchUpdateCard, fetchVersion } from "./backend.js";
import Logger from "lipe";
import { CardAndGames, CardsAndGames, MicroSDCard } from "./types.js"

Expand Down Expand Up @@ -79,45 +79,45 @@ export class MicroSDeckManager {
private async subscribeToUpdates() {
let signal = this.abortController.signal;

let sleepDelay = 500;
this.logger?.Debug("Starting poll");



while (true) {
if (signal.aborted) {
this.logger?.Debug("Aborting poll")
return;
}
const handleCallback = async (event: string, data: Event) => {
await this.fetch();
this.eventBus.dispatchEvent(new CustomEvent(event, { detail: data }));
}

if (this.pollLock !== undefined) {
this.logger?.Error("Tried Polling twice at the same time...");
return;
}
let sleepDelay = 5000;

this.pollLock = {};
this.logger?.Debug("Poll");
if (this.pollLock !== undefined) {
this.logger?.Error("Tried Polling twice at the same time...");
return;
}

await new Promise((res, rej) => {
const source = new EventSource(`${this.fetchProps.url}/listen`);
this.abortController.signal.addEventListener("abort", () => {
this.logger?.Debug("Abort was called. Trying to close the EventSource");
source.close();
})
this.pollLock = {};

source.onopen = () => this.logger?.Debug("Successfully subscribed to events");
source.onmessage = async (message) => {
this.logger?.Debug("Recieved message {data}", {message, data: message.data});
let data = message.data && JSON.parse(message.data);
this.logger?.Debug("Starting poll");

this.eventBus.dispatchEvent(new Event(data));
await this.fetch();
try {
while (true) {
if (signal.aborted) {
this.logger?.Debug("Aborting poll")
return;
}
source.onerror = rej;
})

this.logger?.Debug("Poll listen");

await new Promise(async (res, rej) => {
await sleep(sleepDelay);

fetchEventTarget({ ...this.fetchProps, callback: handleCallback }, { signal })
.catch((reason) => {
this.logger?.Warn(`Listen was aborted with reason "${reason}"`);
res(0);
});
})
}
} finally {
this.pollLock = undefined;
}

}

async updateCard(card: MicroSDCard) {
Expand Down
Loading

0 comments on commit 1ae8e9b

Please sign in to comment.