Skip to content

Commit

Permalink
backend/http: add "intelligent" graph locking behavior
Browse files Browse the repository at this point in the history
  • Loading branch information
RuthgerD committed Jan 10, 2022
1 parent 5e8c057 commit ab48146
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 31 deletions.
9 changes: 4 additions & 5 deletions backend/crates/eiffelvis_core/src/domain/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ impl Key for Uuid {}

pub trait EiffelVisApp: EiffelGraph {
/// Inserts a new eiffel event into storage
fn push(&mut self, event: BaseEvent);
fn push(&mut self, event: BaseEvent) -> bool;

/// Looks up the event of given id
fn get_event(&self, id: Uuid) -> Option<&BaseEvent>;
Expand All @@ -29,15 +29,14 @@ pub trait EiffelVisApp: EiffelGraph {
}

impl<G: EiffelGraph> EiffelVisApp for G {
fn push(&mut self, event: BaseEvent) {
fn push(&mut self, event: BaseEvent) -> bool {
let links = event.links.clone();
self.add_node_with_edges(
event.meta.id,
event,
links.into_iter().map(|link| (link.target, link.link_type)),
);

println!("Graph size: {}", self.node_count());
)
.is_some()
}

fn get_event(&self, id: Uuid) -> Option<&BaseEvent> {
Expand Down
15 changes: 10 additions & 5 deletions backend/crates/eiffelvis_core/src/graph_storage/chunked_storage.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::graph;
use crate::graph::{self, Indexable};
use ahash::RandomState;
use indexmap::IndexMap;
use std::{
Expand Down Expand Up @@ -98,7 +98,12 @@ impl<K: graph::Key, N, E> ChunkedGraph<K, N, E> {
}
}

fn add_node(&mut self, key: K, data: N) -> ChunkedIndex {
fn add_node(&mut self, key: K, data: N) -> Option<ChunkedIndex> {
// TODO: decide if we want to be correct or not :/
if self.get(key).is_some() {
return None;
}

if self.store[self.head_chunk()].len() >= self.max_elements() {
self.newest_generation += 1;
if self.chunks() < self.max_chunks() {
Expand All @@ -116,10 +121,10 @@ impl<K: graph::Key, N, E> ChunkedGraph<K, N, E> {

self.store[head_chunk].insert(key, Element(NodeData { data }, Vec::default()));

ChunkedIndex::new(
Some(ChunkedIndex::new(
self.newest_generation,
(self.store[head_chunk].len() - 1) as u32,
)
))
}

fn add_edge(&mut self, a: K, b: K, data: E) {
Expand Down Expand Up @@ -321,7 +326,7 @@ impl<'a, K: graph::Key, N, E> graph::ItemIter for ChunkedGraph<K, N, E> {

impl<'a, K: graph::Key, N, E> graph::Graph for ChunkedGraph<K, N, E> {
fn add_node(&mut self, key: K, data: N) -> Option<ChunkedIndex> {
Some(self.add_node(key, data))
self.add_node(key, data)
}

fn add_edge(&mut self, a: K, b: K, data: E) {
Expand Down
53 changes: 42 additions & 11 deletions backend/crates/eiffelvis_http/src/handlers.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::{collections::VecDeque, ops::Add};

use crate::*;
use serde::Serialize;

Expand Down Expand Up @@ -32,7 +34,7 @@ pub(crate) fn make_service<T: EiffelVisHttpApp>(app: App<T>) -> Router {
pub async fn event_dump<T: EiffelVisHttpApp>(
Extension(app): Extension<App<T>>,
) -> impl IntoResponse {
let lk = app.read().await;
let lk = app.graph.read().await;

let dump = lk.dump::<&BaseEvent>();

Expand All @@ -44,7 +46,7 @@ pub async fn get_event<T: EiffelVisHttpApp>(
Path(find_id): Path<Uuid>,
Extension(app): Extension<App<T>>,
) -> impl IntoResponse {
let lk = app.read().await;
let lk = app.graph.read().await;
if let Some(event) = lk.get_event(find_id) {
Json(event).into_response()
} else {
Expand All @@ -61,7 +63,7 @@ pub async fn events_with_root<T: EiffelVisHttpApp>(
Path(find_id): Path<Uuid>,
Extension(app): Extension<App<T>>,
) -> impl IntoResponse {
let lk = app.read().await;
let lk = app.graph.read().await;
Json(lk.get_subgraph_with_roots::<&BaseEvent>(&[find_id])).into_response()
}

Expand All @@ -86,6 +88,15 @@ pub async fn establish_websocket<T: EiffelVisHttpApp>(
ws.on_upgrade(move |mut socket| async move {
let mut interval = tokio::time::interval(std::time::Duration::from_millis(500));

interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

let hist_max = 8;
let mut delta_hist = VecDeque::with_capacity(hist_max);
let mut last_heurstic = 0;
let mut heuristic_changed = false;

delta_hist.push_back(0);

let mut req_handler: Option<TrackedQuery<_>> = None;

while let Ok(()) = tokio::select! {
Expand All @@ -101,22 +112,42 @@ pub async fn establish_websocket<T: EiffelVisHttpApp>(
};
let res = QueryRes { repr: msg.clone(), error: res };
println!("Request {:?}", res);
heuristic_changed = true;
socket.send(Message::Text(serde_json::to_string(&res).unwrap())).await.map_err(|_| ())
},
_ => Err(())
}
},
_ = interval.tick() => {
if let Some(handler) = req_handler.as_mut() {
let events: Vec<LeanEvent> = handler.handle(&*app.read().await);
if !events.is_empty() {
socket.send(Message::Text(serde_json::to_string(&events).unwrap())).await.map_err(|_| ())
} else {
Ok(())
let heuristic = app.heuristic.load(std::sync::atomic::Ordering::Relaxed);
let average = delta_hist.iter().fold(0, Add::add) / delta_hist.len() as u64;
let delta = heuristic - last_heurstic;

if last_heurstic != heuristic {
// the delta might be too high for us to lock the graph so we need to remember
heuristic_changed = true;
}

let mut res = Ok(());
if heuristic_changed && delta <= average {
if let Some(handler) = req_handler.as_mut() {
info!("locking graph!");
let events: Vec<LeanEvent> = handler.handle(&*app.graph.read().await);
if !events.is_empty() {
res = socket.send(Message::Text(serde_json::to_string(&events).unwrap())).await.map_err(|_| ())
}
}
} else {
Ok(())

heuristic_changed = false;
}

if delta_hist.len() >= hist_max {
delta_hist.pop_front();
}
delta_hist.push_back(delta);
last_heurstic = heuristic;

res
}
} {}

Expand Down
12 changes: 10 additions & 2 deletions backend/crates/eiffelvis_http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use tracing::info;
use std::{
io,
net::{IpAddr, SocketAddr},
sync::Arc,
sync::{atomic::AtomicU64, Arc},
};

use tower_http::cors::{any, CorsLayer};
Expand All @@ -26,7 +26,15 @@ use eiffelvis_core::domain::{app::EiffelVisApp, types::BaseEvent};
pub trait EiffelVisHttpApp: EiffelVisApp + Send + Sync + 'static {}
impl<T> EiffelVisHttpApp for T where T: EiffelVisApp + Send + Sync + 'static {}

type App<T> = Arc<tokio::sync::RwLock<T>>;
pub struct AppData<T> {
/// Graph to serve to the web
pub graph: tokio::sync::RwLock<T>,
/// Heuristic used to determine when locking of graph should occur,
/// library consumers should increment this counter on a succesful push to the graph.
pub heuristic: AtomicU64,
}

type App<T> = Arc<AppData<T>>;

/// Takes an eiffelvis app and binds the http server on the given address.
/// This is likely the only function you'll ever need to call.
Expand Down
25 changes: 17 additions & 8 deletions backend/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@
use std::{sync::Arc, time::Duration};

use eiffelvis_core::{domain::app::EiffelVisApp, graph_storage::ChunkedGraph};
use eiffelvis_http::AppData;
use structopt::StructOpt;
use tracing::info;
use tracing::{info, warn};

/// Command line options
#[derive(StructOpt, Debug)]
Expand Down Expand Up @@ -61,10 +62,10 @@ async fn main() {

let cli = Cli::from_args();

let graph = Arc::new(tokio::sync::RwLock::new(ChunkedGraph::new(
cli.max_chunks,
cli.chunk_size,
)));
let graph = Arc::new(AppData {
heuristic: 0.into(),
graph: tokio::sync::RwLock::new(ChunkedGraph::new(cli.max_chunks, cli.chunk_size)),
});

let http_server_handle = eiffelvis_http::Handle::new();
let http_server = tokio::spawn(eiffelvis_http::app(
Expand All @@ -88,12 +89,20 @@ async fn main() {
loop {
if let Some(bytes) = event_parser.next().await {
if let Ok(des) = serde_json::from_slice(&bytes) {
EiffelVisApp::push(&mut *graph.write().await, des);
let mut lk = graph.graph.write().await;
if EiffelVisApp::push(&mut *lk, des) {
graph
.heuristic
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
info!("size: {}", lk.node_count());
} else {
warn!("Failed to push graph! maybe a duplicate event?")
}
} else {
info!("Received new message but failed to deserialize");
warn!("Received new message but failed to deserialize");
}
} else {
info!("Event stream failed, sleeping for 5 seconds to retry");
warn!("Event stream failed, sleeping for 5 seconds to retry");
tokio::time::sleep(Duration::from_secs(timeout)).await;
}
}
Expand Down

0 comments on commit ab48146

Please sign in to comment.