Skip to content

Commit

Permalink
Merge pull request #391 from systemaccounting/381-graphql-subscriptio…
Browse files Browse the repository at this point in the history
…n-delay

381 graphql subscription delay
  • Loading branch information
mxfactorial authored Nov 17, 2024
2 parents d08e5b2 + 21cb268 commit cbcd813
Show file tree
Hide file tree
Showing 9 changed files with 290 additions and 248 deletions.
392 changes: 197 additions & 195 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,11 @@ removing financial appeasement guides the freedom of speech by recalibrating exp
**q.** how does systemaccounting affect setting public policy?
**a.** solving problems one at a time depends on 1) researching facts, 2) designing a solution, 3) applying the solution, 4) measuring its input and output and 5) holding contributors accountable. empowering sincere fact finders and problem solvers with more convenient access to public data science reduces the risk of luring them into the idle game of assigning blame between fictional social groups

**q.** how does systemaccounting create jobs?
**a.** the demand for labor increases when capital is allocated. for example, when an investor can quickly and confidently find a profitable grocery store chain and supply the owner capital to open more stores, the owner will hire more labor. systemaccounting accelerates the movement of capital by setting the measure of its demand to the conveniently discoverable and empirical supply of return

on the other hand, systemaccounting will eliminate a lot of archaic labor dependent on market friction in the short term and manual labor dependent on highly repetitive work in the long term (automation attracts capital). [be patient](https://gist.github.com/mxfactorial/e127115f4e9d240dd992cfc0920d1527)

**q.** do you have any demos?
**a.** watch the [economic policy as code](https://mxfactorial.video/) video series

Expand Down
47 changes: 27 additions & 20 deletions client/src/routes/measure/+page.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
import { onMount } from 'svelte';
import b64 from 'base-64';
import { createClient as createWSClient } from 'graphql-ws';
import type { Client } from 'graphql-ws';
let searchQuery = '';
let price = 0.0;
let price: string;
let priceTag: HTMLDivElement;
let map: google.maps.Map;
let markers = [] as any[]; // todo: google.maps.marker.AdvancedMarkerElement does not expect setMap method
let messageCount = 0;
let wsClient: any = null; // todo: graphql-ws Client type
let wsClient: Client;
let stop = false;
let defaultCoordinates = { lat: 39.8283, lng: -98.5795 }; // usa
Expand All @@ -20,39 +20,36 @@
region: string,
municipality: string | null
) {
if (wsClient) {
stop = true;
} else {
wsClient = createWSClient({
if (messageCount) {
await resetWebsocket();
}
wsClient = createWSClient({
url: b64.decode(process.env.GRAPHQL_SUBSCRIPTIONS_URI as string),
lazy: false
});
}
const variables: any = { date, country, region };
if (municipality) {
variables.municipality = municipality;
}
let subscription = wsClient.iterate({
const subscription = wsClient.iterate({
query: `subscription QueryGdp($date: String!, $country: String, $region: String, $municipality: String) {
queryGdp(date: $date, country: $country, region: $region, municipality: $municipality)
}`,
variables
});
messageCount = 0;
for await (const { data } of subscription) {
if (stop) {
// console.log('stopping subscription');
stop = false;
break;
}
if (messageCount == 0) {
// console.log('starting subscription');
if (data && typeof data.queryGdp === 'number') {
price = data.queryGdp.toFixed(3);
messageCount++;
}
messageCount++;
price = data.queryGdp.toFixed(3);
}
}
Expand All @@ -66,7 +63,7 @@
event.preventDefault();
if (searchQuery.trim() === '') {
resetMap();
await resetMap();
return;
}
Expand All @@ -89,6 +86,15 @@
await subscribeGdp(queryVars.time, queryVars.country, queryVars.region, queryVars.municipality);
}
async function resetWebsocket() {
stop = true;
// wait for subscription loop to break
while (stop) {
await new Promise((resolve) => setTimeout(resolve, 100));
}
messageCount = 0;
}
function toTitleCase(str: string) {
return str.replace(/\w\S*/g, function (txt) {
return txt.charAt(0).toUpperCase() + txt.substr(1).toLowerCase();
Expand Down Expand Up @@ -138,8 +144,9 @@
}, 100); // check every 100 milliseconds
}
function resetMap() {
async function resetMap() {
if (map) {
await resetWebsocket();
markers.forEach((marker) => marker.setMap(null));
markers = [];
map.setCenter(defaultCoordinates);
Expand Down Expand Up @@ -210,13 +217,13 @@
priceTag.appendChild(pseudoElement);
const market = new AdvancedMarkerElement({
const marker = new AdvancedMarkerElement({
map,
position: location,
content: priceTag
});
markers.push(market);
markers.push(marker);
waitForMessages();
} else {
Expand Down
5 changes: 4 additions & 1 deletion project.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,7 @@ services:
- GRAPHQL_PORT
- MEASURE_URL
- MEASURE_RESOURCE
- RUST_LOG
request-create:
runtime: rust1.x
min_code_cov: null
Expand Down Expand Up @@ -518,7 +519,8 @@ services:
default: localhost
RUST_LOG:
ssm: null
default: info
# https://docs.rs/env_logger/latest/env_logger/#enabling-logging
default: off
RUST_BACKTRACE:
ssm: null
default: 1
Expand Down Expand Up @@ -724,6 +726,7 @@ services:
- REDIS_PASSWORD
- MEASURE_PORT
- READINESS_CHECK_PATH
- RUST_LOG
auto-confirm:
runtime: rust1.x
min_code_cov: null
Expand Down
2 changes: 2 additions & 0 deletions services/event/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,5 @@ serde = { version = "1.0.152", features = ["derive"] }
serde_json = "1.0.93"
fred = { version = "9.0.3", features = ["i-scripts"] }
rust_decimal = "1.36.0"
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
23 changes: 15 additions & 8 deletions services/event/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use fred::prelude::*;
use futures_channel::mpsc;
use futures_util::{stream, FutureExt, StreamExt, TryStreamExt};
use serde::Deserialize;
use std::env;
use tokio_postgres::{AsyncMessage, NoTls};
mod events;

Expand Down Expand Up @@ -46,6 +47,12 @@ struct Event {

#[tokio::main]
async fn main() {
if let Ok(level) = env::var("RUST_LOG") {
tracing_subscriber::fmt().with_env_filter(level).init();
} else {
tracing_subscriber::fmt().init();
}

let pg_uri = pg_conn_uri();
let redis_uri = redis_conn_uri();
let redis_config = RedisConfig::from_url(&redis_uri).unwrap();
Expand All @@ -54,11 +61,11 @@ async fn main() {
loop {
match redis_client.init().await {
Ok(_) => {
// println!("event connected to redis");
tracing::info!("event connected to redis");
break;
}
Err(_e) => {
// println!("failed to connect to redis: {}", _e);
tracing::info!("failed to connect to redis: {}", _e);
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
}
Expand All @@ -68,7 +75,7 @@ async fn main() {
let (client, mut connection) = match tokio_postgres::connect(pg_uri.as_str(), NoTls).await {
Ok(conn) => conn,
Err(_e) => {
// println!("failed to connect to postgres: {}", _e);
tracing::info!("failed to connect to postgres: {}", _e);
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
continue;
}
Expand All @@ -82,7 +89,7 @@ async fn main() {
let handler = tokio::spawn(connection);

if let Err(e) = client.batch_execute("LISTEN event;").await {
println!("failed to execute LISTEN command: {}", e);
tracing::info!("failed to execute LISTEN command: {}", e);
handler.abort();
continue;
}
Expand All @@ -91,7 +98,7 @@ async fn main() {
let message = match rx.next().await {
Some(message) => message,
None => {
// println!("connection terminated. attempting to reconnect...");
tracing::info!("connection terminated. attempting to reconnect...");
handler.abort();
break;
}
Expand All @@ -105,15 +112,15 @@ async fn main() {
events::redis_incrby_gdp(&redis_client, gdp_map).await;
}
_ => {
println!("unknown event: {}", event.name);
tracing::info!("unknown event: {}", event.name);
}
},
Err(e) => {
println!("failed to parse event: {}", e);
tracing::info!("failed to parse event: {}", e);
}
},
_ => {
println!("unhandled message: {:?}", message);
tracing::info!("unhandled message: {:?}", message);
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
continue;
}
Expand Down
1 change: 0 additions & 1 deletion services/graphql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ futures-util = "0.3.30"
wsclient = { path = "../../crates/wsclient" }
tungstenite = { version = "0.24.0", default-features = false }
async-stream = "0.3.5"
log = "0.4.22"

[target.x86_64-unknown-linux-musl.dependencies]
# https://github.com/cross-rs/cross/wiki/Recipes#vendored
Expand Down
27 changes: 16 additions & 11 deletions services/graphql/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use axum::{
};
use futures_util::stream::Stream;
use httpclient::HttpClient as Client;
use log::debug;
use serde_json::json;
use shutdown::shutdown_signal;
use std::{env, net::ToSocketAddrs, result::Result};
Expand Down Expand Up @@ -202,42 +201,43 @@ impl Subscription {
let resource = env::var("MEASURE_RESOURCE").unwrap();
let uri = format!("{}/{}", base_uri, resource);
let ws_client = WsClient::new(uri, "gdp".to_string(), date, country, region, municipality);

stream! {
let mut measure_socket = match ws_client.connect() {
Ok(ws) => {
debug!("measure websocket connection created");
tracing::info!("graphql websocket connection created with measure");
ws
}
Err(_e) => {
debug!("measure webSocket connection failure: {:?}", _e);
tracing::info!("graphql failed to create webSocket with measure: {:?}", _e);
return;
}
};

loop {
match measure_socket.read() {
Ok(msg) => {
match msg {
tungstenite::Message::Text(text) => {
let gdp: f64 = serde_json::from_str(&text).unwrap();
tracing::info!("sending gdp from measure: {}", gdp);
yield gdp;
}
_ => {
debug!("received non-text message: {:?}", msg);
tracing::info!("received non-text message from measure: {:?}", msg);
}
}
}
Err(WsError::ConnectionClosed) => {
measure_socket.close(None).unwrap();
debug!("measure websocket closed");
tracing::info!("graphql received closed message from measure");
break;
}
Err(e) => {
debug!("measure message receipt failure: {:?}", e);
tracing::info!("graphql message receipt failure from measure: {:?}", e);
break;
}
}
// throttle reads from measure service to avoid blocking close messages from client
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
}
}
}
Expand Down Expand Up @@ -305,17 +305,22 @@ async fn graphql_subscription(
) -> impl IntoResponse {
ws.protocols(http::ALL_WEBSOCKET_PROTOCOLS)
.on_upgrade(move |socket| async move {
// println!("connection opened");
tracing::info!("graphql subscription started");
GraphQLWebSocket::new(socket, schema, protocol)
.keepalive_timeout(None)
.serve()
.await;
// println!("connection closed");
tracing::info!("graphql subscription closed");
})
}

#[tokio::main]
async fn main() {
tracing_subscriber::fmt().with_ansi(false).init();
if let Ok(level) = env::var("RUST_LOG") {
tracing_subscriber::fmt().with_env_filter(level).init();
} else {
tracing_subscriber::fmt().init();
}

let readiness_check_path = env::var(READINESS_CHECK_PATH)
.unwrap_or_else(|_| panic!("{READINESS_CHECK_PATH} variable assignment"));
Expand Down
Loading

0 comments on commit cbcd813

Please sign in to comment.