Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

backend/http: add "intelligent" graph locking behavior #84

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions backend/crates/eiffelvis_core/src/domain/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ impl Key for Uuid {}

pub trait EiffelVisApp: EiffelGraph {
/// Inserts a new eiffel event into storage
fn push(&mut self, event: BaseEvent);
fn push(&mut self, event: BaseEvent) -> bool;

/// Looks up the event of given id
fn get_event(&self, id: Uuid) -> Option<&BaseEvent>;
Expand All @@ -29,15 +29,14 @@ pub trait EiffelVisApp: EiffelGraph {
}

impl<G: EiffelGraph> EiffelVisApp for G {
fn push(&mut self, event: BaseEvent) {
fn push(&mut self, event: BaseEvent) -> bool {
let links = event.links.clone();
self.add_node_with_edges(
event.meta.id,
event,
links.into_iter().map(|link| (link.target, link.link_type)),
);

println!("Graph size: {}", self.node_count());
)
.is_some()
}

fn get_event(&self, id: Uuid) -> Option<&BaseEvent> {
Expand Down
15 changes: 10 additions & 5 deletions backend/crates/eiffelvis_core/src/graph_storage/chunked_storage.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::graph;
use crate::graph::{self, Indexable};
use ahash::RandomState;
use indexmap::IndexMap;
use std::{
Expand Down Expand Up @@ -98,7 +98,12 @@ impl<K: graph::Key, N, E> ChunkedGraph<K, N, E> {
}
}

fn add_node(&mut self, key: K, data: N) -> ChunkedIndex {
fn add_node(&mut self, key: K, data: N) -> Option<ChunkedIndex> {
// TODO: decide if we want to be correct or not :/
if self.get(key).is_some() {
return None;
}

if self.store[self.head_chunk()].len() >= self.max_elements() {
self.newest_generation += 1;
if self.chunks() < self.max_chunks() {
Expand All @@ -116,10 +121,10 @@ impl<K: graph::Key, N, E> ChunkedGraph<K, N, E> {

self.store[head_chunk].insert(key, Element(NodeData { data }, Vec::default()));

ChunkedIndex::new(
Some(ChunkedIndex::new(
self.newest_generation,
(self.store[head_chunk].len() - 1) as u32,
)
))
}

fn add_edge(&mut self, a: K, b: K, data: E) {
Expand Down Expand Up @@ -321,7 +326,7 @@ impl<'a, K: graph::Key, N, E> graph::ItemIter for ChunkedGraph<K, N, E> {

impl<'a, K: graph::Key, N, E> graph::Graph for ChunkedGraph<K, N, E> {
fn add_node(&mut self, key: K, data: N) -> Option<ChunkedIndex> {
Some(self.add_node(key, data))
self.add_node(key, data)
}

fn add_edge(&mut self, a: K, b: K, data: E) {
Expand Down
53 changes: 42 additions & 11 deletions backend/crates/eiffelvis_http/src/handlers.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::{collections::VecDeque, ops::Add};

use crate::*;
use serde::Serialize;

Expand Down Expand Up @@ -32,7 +34,7 @@ pub(crate) fn make_service<T: EiffelVisHttpApp>(app: App<T>) -> Router {
pub async fn event_dump<T: EiffelVisHttpApp>(
Extension(app): Extension<App<T>>,
) -> impl IntoResponse {
let lk = app.read().await;
let lk = app.graph.read().await;

let dump = lk.dump::<&BaseEvent>();

Expand All @@ -44,7 +46,7 @@ pub async fn get_event<T: EiffelVisHttpApp>(
Path(find_id): Path<Uuid>,
Extension(app): Extension<App<T>>,
) -> impl IntoResponse {
let lk = app.read().await;
let lk = app.graph.read().await;
if let Some(event) = lk.get_event(find_id) {
Json(event).into_response()
} else {
Expand All @@ -61,7 +63,7 @@ pub async fn events_with_root<T: EiffelVisHttpApp>(
Path(find_id): Path<Uuid>,
Extension(app): Extension<App<T>>,
) -> impl IntoResponse {
let lk = app.read().await;
let lk = app.graph.read().await;
Json(lk.get_subgraph_with_roots::<&BaseEvent>(&[find_id])).into_response()
}

Expand All @@ -86,6 +88,15 @@ pub async fn establish_websocket<T: EiffelVisHttpApp>(
ws.on_upgrade(move |mut socket| async move {
let mut interval = tokio::time::interval(std::time::Duration::from_millis(500));

interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

let hist_max = 8;
let mut delta_hist = VecDeque::with_capacity(hist_max);
let mut last_heurstic = 0;
let mut heuristic_changed = false;

delta_hist.push_back(0);

let mut req_handler: Option<TrackedQuery<_>> = None;

while let Ok(()) = tokio::select! {
Expand All @@ -101,22 +112,42 @@ pub async fn establish_websocket<T: EiffelVisHttpApp>(
};
let res = QueryRes { repr: msg.clone(), error: res };
println!("Request {:?}", res);
heuristic_changed = true;
socket.send(Message::Text(serde_json::to_string(&res).unwrap())).await.map_err(|_| ())
},
_ => Err(())
}
},
_ = interval.tick() => {
if let Some(handler) = req_handler.as_mut() {
let events: Vec<LeanEvent> = handler.handle(&*app.read().await);
if !events.is_empty() {
socket.send(Message::Text(serde_json::to_string(&events).unwrap())).await.map_err(|_| ())
} else {
Ok(())
let heuristic = app.heuristic.load(std::sync::atomic::Ordering::Relaxed);
let average = delta_hist.iter().fold(0, Add::add) / delta_hist.len() as u64;
let delta = heuristic - last_heurstic;

if last_heurstic != heuristic {
// the delta might be too high for us to lock the graph so we need to remember
heuristic_changed = true;
}

let mut res = Ok(());
if heuristic_changed && delta <= average {
if let Some(handler) = req_handler.as_mut() {
info!("locking graph!");
let events: Vec<LeanEvent> = handler.handle(&*app.graph.read().await);
if !events.is_empty() {
res = socket.send(Message::Text(serde_json::to_string(&events).unwrap())).await.map_err(|_| ())
}
}
} else {
Ok(())

heuristic_changed = false;
}

if delta_hist.len() >= hist_max {
delta_hist.pop_front();
}
delta_hist.push_back(delta);
last_heurstic = heuristic;

res
}
} {}

Expand Down
12 changes: 10 additions & 2 deletions backend/crates/eiffelvis_http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use tracing::info;
use std::{
io,
net::{IpAddr, SocketAddr},
sync::Arc,
sync::{atomic::AtomicU64, Arc},
};

use tower_http::cors::{any, CorsLayer};
Expand All @@ -26,7 +26,15 @@ use eiffelvis_core::domain::{app::EiffelVisApp, types::BaseEvent};
pub trait EiffelVisHttpApp: EiffelVisApp + Send + Sync + 'static {}
impl<T> EiffelVisHttpApp for T where T: EiffelVisApp + Send + Sync + 'static {}

type App<T> = Arc<tokio::sync::RwLock<T>>;
pub struct AppData<T> {
/// Graph to serve to the web
pub graph: tokio::sync::RwLock<T>,
/// Heuristic used to determine when locking of graph should occur,
/// library consumers should increment this counter on a succesful push to the graph.
pub heuristic: AtomicU64,
}

type App<T> = Arc<AppData<T>>;

/// Takes an eiffelvis app and binds the http server on the given address.
/// This is likely the only function you'll ever need to call.
Expand Down
25 changes: 17 additions & 8 deletions backend/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@
use std::{sync::Arc, time::Duration};

use eiffelvis_core::{domain::app::EiffelVisApp, graph_storage::ChunkedGraph};
use eiffelvis_http::AppData;
use structopt::StructOpt;
use tracing::info;
use tracing::{info, warn};

/// Command line options
#[derive(StructOpt, Debug)]
Expand Down Expand Up @@ -61,10 +62,10 @@ async fn main() {

let cli = Cli::from_args();

let graph = Arc::new(tokio::sync::RwLock::new(ChunkedGraph::new(
cli.max_chunks,
cli.chunk_size,
)));
let graph = Arc::new(AppData {
heuristic: 0.into(),
graph: tokio::sync::RwLock::new(ChunkedGraph::new(cli.max_chunks, cli.chunk_size)),
});

let http_server_handle = eiffelvis_http::Handle::new();
let http_server = tokio::spawn(eiffelvis_http::app(
Expand All @@ -88,12 +89,20 @@ async fn main() {
loop {
if let Some(bytes) = event_parser.next().await {
if let Ok(des) = serde_json::from_slice(&bytes) {
EiffelVisApp::push(&mut *graph.write().await, des);
let mut lk = graph.graph.write().await;
if EiffelVisApp::push(&mut *lk, des) {
graph
.heuristic
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
info!("size: {}", lk.node_count());
} else {
warn!("Failed to push graph! maybe a duplicate event?")
}
} else {
info!("Received new message but failed to deserialize");
warn!("Received new message but failed to deserialize");
}
} else {
info!("Event stream failed, sleeping for 5 seconds to retry");
warn!("Event stream failed, sleeping for 5 seconds to retry");
tokio::time::sleep(Duration::from_secs(timeout)).await;
}
}
Expand Down