diff --git a/common/src/stream/metrics.rs b/common/src/stream/metrics.rs index 7009591..1afd4ee 100644 --- a/common/src/stream/metrics.rs +++ b/common/src/stream/metrics.rs @@ -1,7 +1,7 @@ use std::{ fmt, net::SocketAddr, - sync::{Arc, Mutex}, + sync::Mutex, time::{Duration, Instant, SystemTime, UNIX_EPOCH}, }; @@ -13,7 +13,9 @@ use monitor_table::{ }; use tokio_throughput::GaugeHandle; -use super::addr::StreamAddr; +use crate::addr::InternetAddrHdv; + +use super::addr::{StreamAddr, StreamAddrHdv}; pub type StreamSessionTable = Table>; @@ -96,13 +98,13 @@ impl ValueDisplay for Session { #[derive(Debug, HdvSerde)] struct SessionView { - pub destination: Option>, + pub destination: Option, pub duration: u64, pub start_ms: u64, pub end_ms: Option, - pub upstream_local: Option>, - pub upstream_remote: Arc, - pub downstream_remote: Option>, + pub upstream_local: Option, + pub upstream_remote: StreamAddrHdv, + pub downstream_remote: Option, pub up: Option, pub dn: Option, } @@ -119,15 +121,15 @@ impl SessionView { None => now_unix.saturating_sub(start_unix), }; - let destination = s.destination.as_ref().map(|d| d.to_string().into()); + let destination = s.destination.as_ref().map(|d| d.into()); let duration = duration.as_millis() as u64; let start_ms = start_unix.as_millis() as u64; let end_ms = s .end .map(|e| e.duration_since(UNIX_EPOCH).unwrap().as_millis() as u64); - let upstream_local = s.upstream_local.map(|x| x.to_string().into()); - let upstream_remote = s.upstream_remote.to_string().into(); - let downstream_remote = s.downstream_remote.map(|x| x.to_string().into()); + let upstream_local = s.upstream_local.map(|x| x.into()); + let upstream_remote = (&s.upstream_remote).into(); + let downstream_remote = s.downstream_remote.map(|x| x.into()); let now = Instant::now(); let up = s .up_gauge diff --git a/common/src/udp/metrics.rs b/common/src/udp/metrics.rs index c97627d..8fb8522 100644 --- a/common/src/udp/metrics.rs +++ b/common/src/udp/metrics.rs @@ -1,6 +1,6 @@ use std::{ net::SocketAddr, - sync::{Arc, Mutex}, + sync::Mutex, time::{Duration, Instant, SystemTime, UNIX_EPOCH}, }; @@ -12,7 +12,7 @@ use monitor_table::{ }; use tokio_throughput::GaugeHandle; -use crate::addr::InternetAddr; +use crate::addr::{InternetAddr, InternetAddrHdv}; pub type UdpSessionTable = Table; @@ -29,11 +29,11 @@ pub struct Session { } impl TableRow for Session { fn schema() -> Vec<(String, LiteralType)> { - ::schema() + ::schema() } fn fields(&self) -> Vec> { - let view = SessionView::from_session(self); + let view = SessionHdv::from_session(self); TableRow::fields(&view) } } @@ -93,18 +93,18 @@ impl ValueDisplay for Session { } #[derive(Debug, HdvSerde)] -struct SessionView { - pub destination: Option>, +struct SessionHdv { + pub destination: Option, pub duration: u64, pub start_ms: u64, pub end_ms: Option, - pub upstream_local: Option>, - pub upstream_remote: Arc, - pub downstream_remote: Arc, - pub up: GaugeView, - pub dn: GaugeView, + pub upstream_local: Option, + pub upstream_remote: InternetAddrHdv, + pub downstream_remote: InternetAddrHdv, + pub up: GaugeHdv, + pub dn: GaugeHdv, } -impl SessionView { +impl SessionHdv { pub fn from_session(s: &Session) -> Self { let start_unix = s.start.duration_since(UNIX_EPOCH).unwrap(); let now_unix = SystemTime::now().duration_since(UNIX_EPOCH).unwrap(); @@ -117,18 +117,18 @@ impl SessionView { None => now_unix.saturating_sub(start_unix), }; - let destination = s.destination.as_ref().map(|d| d.to_string().into()); + let destination = s.destination.as_ref().map(|d| d.into()); let duration = duration.as_millis() as u64; let start_ms = start_unix.as_millis() as u64; let end_ms = s .end .map(|e| e.duration_since(UNIX_EPOCH).unwrap().as_millis() as u64); - let upstream_local = s.upstream_local.map(|x| x.to_string().into()); - let upstream_remote = s.upstream_remote.to_string().into(); - let downstream_remote = s.downstream_remote.to_string().into(); + let upstream_local = s.upstream_local.map(|x| x.into()); + let upstream_remote = (&s.upstream_remote).into(); + let downstream_remote = s.downstream_remote.into(); let now = Instant::now(); - let up = GaugeView::from_gauge_handle(&s.up_gauge, now); - let dn = GaugeView::from_gauge_handle(&s.dn_gauge, now); + let up = GaugeHdv::from_gauge_handle(&s.up_gauge, now); + let dn = GaugeHdv::from_gauge_handle(&s.dn_gauge, now); Self { destination, @@ -145,11 +145,11 @@ impl SessionView { } #[derive(Debug, HdvSerde)] -struct GaugeView { +struct GaugeHdv { pub thruput: f64, pub bytes: u64, } -impl GaugeView { +impl GaugeHdv { pub fn from_gauge_handle(g: &Mutex, now: Instant) -> Self { let mut g = g.lock().unwrap(); g.update(now); diff --git a/server/src/monitor.rs b/server/src/monitor.rs index 5245e2c..551250e 100644 --- a/server/src/monitor.rs +++ b/server/src/monitor.rs @@ -58,14 +58,24 @@ pub fn monitor_router() -> (SessionTables, Router) { (session_tables, router) } -fn default_sql() -> String { - const SQL: &str = "sort start_ms select destination duration upstream_remote"; +fn stream_default_sql() -> String { + const SQL: &str = r#" +sort start_ms +select (col "destination.addr.host") (col "destination.addr.port") duration (col "upstream_remote.addr.host") (col "upstream_remote.addr.port") +"#; + SQL.to_string() +} +fn udp_default_sql() -> String { + const SQL: &str = r#" +sort start_ms +select (col "destination.host") (col "destination.port") duration (col "upstream_remote.host") (col "upstream_remote.port") +"#; SQL.to_string() } #[derive(Debug, Deserialize)] struct SessionsParams { - #[serde(default = "default_sql")] + #[serde(default = "stream_default_sql")] stream_sql: String, - #[serde(default = "default_sql")] + #[serde(default = "udp_default_sql")] udp_sql: String, }