From 10990873bfaf50635e09a8bb5e9a80a7054ef23f Mon Sep 17 00:00:00 2001 From: pahearn73 Date: Tue, 26 Nov 2024 13:26:06 -0500 Subject: [PATCH] Add New Telemetry markers for each stage (#685) --- .../src/bootstrap/extn/load_session_step.rs | 4 +- core/main/src/broker/broker_utils.rs | 12 ++-- core/main/src/broker/endpoint_broker.rs | 52 ++++++++++----- .../src/broker/event_management_utility.rs | 12 ++-- core/main/src/firebolt/firebolt_gateway.rs | 42 +++++++++---- core/main/src/firebolt/firebolt_ws.rs | 14 +++-- .../main/src/firebolt/handlers/account_rpc.rs | 9 ++- .../src/firebolt/handlers/advertising_rpc.rs | 7 ++- core/main/src/firebolt/handlers/device_rpc.rs | 11 ++-- .../src/firebolt/handlers/parameters_rpc.rs | 3 +- .../main/src/firebolt/handlers/privacy_rpc.rs | 10 +-- .../firebolt/handlers/second_screen_rpc.rs | 3 +- core/main/src/firebolt/rpc_router.rs | 30 +++++---- core/main/src/processor/settings_processor.rs | 23 ++++--- core/main/src/state/metrics_state.rs | 55 +++++++++++++++- core/main/src/state/platform_state.rs | 10 ++- core/main/src/utils/router_utils.rs | 20 +++--- core/sdk/src/api/gateway/rpc_gateway_api.rs | 62 ++---------------- .../sdk/src/api/observability/metrics_util.rs | 63 +++++++++++++++++++ core/sdk/src/extn/client/extn_client.rs | 5 +- .../rpc_extn/src/rpc/legacy_jsonrpsee_extn.rs | 4 +- 21 files changed, 298 insertions(+), 153 deletions(-) diff --git a/core/main/src/bootstrap/extn/load_session_step.rs b/core/main/src/bootstrap/extn/load_session_step.rs index cc3697692..4d34cd1f3 100644 --- a/core/main/src/bootstrap/extn/load_session_step.rs +++ b/core/main/src/bootstrap/extn/load_session_step.rs @@ -33,9 +33,9 @@ impl Bootstep for LoadDistributorValuesStep { } async fn setup(&self, s: BootstrapState) -> RippleResponse { - let ps = s.platform_state.clone(); + let mut ps = s.platform_state.clone(); tokio::spawn(async move { - MetricsState::initialize(&ps).await; + MetricsState::initialize(&mut ps).await; }); MainContextProcessor::remove_expired_and_inactive_entries(&s.platform_state); diff --git a/core/main/src/broker/broker_utils.rs b/core/main/src/broker/broker_utils.rs index 9b5f81833..a752fce7d 100644 --- a/core/main/src/broker/broker_utils.rs +++ b/core/main/src/broker/broker_utils.rs @@ -70,14 +70,16 @@ impl BrokerUtils { } pub async fn process_internal_main_request<'a>( - state: &'a PlatformState, + state: &mut PlatformState, method: &'a str, params: Option, ) -> RpcResult { - match state - .internal_rpc_request(&RpcRequest::internal(method).with_params(params)) - .await - { + let rpc_request = RpcRequest::internal(method).with_params(params); + state + .metrics + .add_api_stats(&rpc_request.ctx.request_id, method); + + match state.internal_rpc_request(&rpc_request).await { Ok(res) => match res.as_value() { Some(v) => Ok(v), None => Err(JsonRpcApiError::default() diff --git a/core/main/src/broker/endpoint_broker.rs b/core/main/src/broker/endpoint_broker.rs index 1adc6c824..f76b41534 100644 --- a/core/main/src/broker/endpoint_broker.rs +++ b/core/main/src/broker/endpoint_broker.rs @@ -19,8 +19,7 @@ use ripple_sdk::{ api::{ firebolt::fb_capabilities::JSON_RPC_STANDARD_ERROR_INVALID_PARAMS, gateway::rpc_gateway_api::{ - ApiMessage, ApiProtocol, ApiStats, CallContext, JsonRpcApiRequest, JsonRpcApiResponse, - RpcRequest, + ApiMessage, ApiProtocol, CallContext, JsonRpcApiRequest, JsonRpcApiResponse, RpcRequest, }, session::AccountSession, }, @@ -47,9 +46,9 @@ use crate::{ broker::broker_utils::BrokerUtils, firebolt::firebolt_gateway::{FireboltGatewayCommand, JsonRpcError}, service::extn::ripple_client::RippleClient, - state::platform_state::PlatformState, + state::{metrics_state::MetricsState, platform_state::PlatformState}, utils::router_utils::{ - add_telemetry_status_code, get_rpc_header, return_api_message_for_transport, + add_telemetry_status_code, capture_stage, get_rpc_header, return_api_message_for_transport, return_extn_response, }, }; @@ -308,6 +307,7 @@ pub struct EndpointBrokerState { rule_engine: RuleEngine, cleaner_list: Arc>>, reconnect_tx: Sender, + metrics_state: MetricsState, } impl Default for EndpointBrokerState { fn default() -> Self { @@ -319,12 +319,14 @@ impl Default for EndpointBrokerState { rule_engine: RuleEngine::default(), cleaner_list: Arc::new(RwLock::new(Vec::new())), reconnect_tx: mpsc::channel(2).0, + metrics_state: MetricsState::default(), } } } impl EndpointBrokerState { pub fn new( + metrics_state: MetricsState, tx: Sender, rule_engine: RuleEngine, ripple_client: RippleClient, @@ -338,6 +340,7 @@ impl EndpointBrokerState { rule_engine, cleaner_list: Arc::new(RwLock::new(Vec::new())), reconnect_tx, + metrics_state, }; state.reconnect_thread(rec_tr, ripple_client); state @@ -532,6 +535,9 @@ impl EndpointBrokerState { data.result = Some(jv); data.id = Some(id); let output = BrokerOutput { data }; + + let metrics_state = self.metrics_state.clone(); + capture_stage(&metrics_state, &rpc_request, "static_rule_request"); tokio::spawn(async move { callback.sender.send(output).await }); } @@ -581,6 +587,8 @@ impl EndpointBrokerState { let broker = broker_sender.unwrap(); let (_, updated_request) = self.update_request(&rpc_request, rule, extn_message, requestor_callback); + let metrics_state = self.metrics_state.clone(); + capture_stage(&metrics_state, &rpc_request, "broker_request"); tokio::spawn(async move { if let Err(e) = broker.send(updated_request.clone()).await { callback.send_error(updated_request, e).await @@ -721,7 +729,7 @@ pub trait EndpointBroker { pub struct BrokerOutputForwarder; impl BrokerOutputForwarder { - pub fn start_forwarder(platform_state: PlatformState, mut rx: Receiver) { + pub fn start_forwarder(mut platform_state: PlatformState, mut rx: Receiver) { // set up the event utility let event_utility = Arc::new(EventManagementUtility::new()); event_utility.register_custom_functions(); @@ -808,7 +816,7 @@ impl BrokerOutputForwarder { let message = ApiMessage::new( protocol, serde_json::to_string(&response).unwrap(), - request_id.to_string(), + rpc_request.ctx.request_id.clone(), ); if let Some(session) = platform_state_c @@ -875,7 +883,7 @@ impl BrokerOutputForwarder { let mut message = ApiMessage::new( rpc_request.ctx.protocol.clone(), serde_json::to_string(&response).unwrap(), - request_id.to_string(), + rpc_request.ctx.request_id.clone(), ); let mut status_code: i64 = 1; if let Some(e) = &response.error { @@ -886,13 +894,26 @@ impl BrokerOutputForwarder { } } - message.stats = Some(ApiStats { - stats_ref: add_telemetry_status_code( + platform_state.metrics.update_api_stats_ref( + &rpc_request.ctx.request_id, + add_telemetry_status_code( &tm_str, status_code.to_string().as_str(), ), - stats: rpc_request.stats, - }); + ); + + if let Some(api_stats) = platform_state + .metrics + .get_api_stats(&rpc_request.ctx.request_id) + { + message.stats = Some(api_stats); + + if rpc_request.ctx.app_id.eq_ignore_ascii_case("internal") { + platform_state + .metrics + .remove_api_stats(&rpc_request.ctx.request_id); + } + } // Step 3: Handle Non Extension if matches!(rpc_request.ctx.protocol, ApiProtocol::Extn) { @@ -948,10 +969,10 @@ impl BrokerOutputForwarder { let session_id = rpc_request.ctx.get_id(); let request_id = rpc_request.ctx.call_id; let protocol = rpc_request.ctx.protocol.clone(); - let platform_state_c = &platform_state; + let mut platform_state_c = platform_state.clone(); if let Ok(res) = - BrokerUtils::process_internal_main_request(platform_state_c, method.as_str(), None) + BrokerUtils::process_internal_main_request(&mut platform_state_c, method.as_str(), None) .await { response.result = Some(serde_json::to_value(res.clone()).unwrap()); @@ -961,7 +982,7 @@ impl BrokerOutputForwarder { let message = ApiMessage::new( protocol, serde_json::to_string(&response).unwrap(), - request_id.to_string(), + rpc_request.ctx.request_id.clone(), ); if let Some(session) = platform_state_c @@ -1198,7 +1219,7 @@ mod tests { endpoint_broker::tests::RippleClient, rules_engine::{Rule, RuleEngine, RuleSet, RuleTransform}, }, - state::bootstrap_state::ChannelsState, + state::{bootstrap_state::ChannelsState, metrics_state::MetricsState}, }; use super::EndpointBrokerState; @@ -1208,6 +1229,7 @@ mod tests { let (tx, _) = channel(2); let client = RippleClient::new(ChannelsState::new()); let state = EndpointBrokerState::new( + MetricsState::default(), tx, RuleEngine { rules: RuleSet::default(), diff --git a/core/main/src/broker/event_management_utility.rs b/core/main/src/broker/event_management_utility.rs index 29540dda9..877c43022 100644 --- a/core/main/src/broker/event_management_utility.rs +++ b/core/main/src/broker/event_management_utility.rs @@ -19,7 +19,7 @@ use std::{collections::HashMap, pin::Pin}; use futures::Future; use ripple_sdk::{ - api::gateway::rpc_gateway_api::{ApiProtocol, CallContext, RpcRequest, RpcStats}, + api::gateway::rpc_gateway_api::{ApiProtocol, CallContext, RpcRequest}, extn::extn_client_message::ExtnResponse, log::info, utils::error::RippleError, @@ -58,8 +58,9 @@ impl EventManagementUtility { self.register_function( "AdvertisingPolicyEventDecorator".to_string(), Arc::new(|state, ctx, value| { + let s = state.clone(); Box::pin(EventManagementUtility::advertising_policy_event_decorator( - state, ctx, value, + s, ctx, value, )) }), ); @@ -73,7 +74,7 @@ impl EventManagementUtility { } pub async fn advertising_policy_event_decorator( - platform_state: PlatformState, + mut platform_state: PlatformState, ctx: CallContext, value: Option, ) -> Result, RippleError> { @@ -85,10 +86,13 @@ impl EventManagementUtility { let rpc_request = RpcRequest { ctx: new_ctx.clone(), method: "advertising.policy".into(), - stats: RpcStats::default(), params_json: RpcRequest::prepend_ctx(None, &new_ctx), }; + platform_state + .metrics + .add_api_stats(&ctx.request_id, "advertising.policy"); + let resp = platform_state .get_client() .get_extn_client() diff --git a/core/main/src/firebolt/firebolt_gateway.rs b/core/main/src/firebolt/firebolt_gateway.rs index e1602271c..98aec6b65 100644 --- a/core/main/src/firebolt/firebolt_gateway.rs +++ b/core/main/src/firebolt/firebolt_gateway.rs @@ -25,8 +25,9 @@ use ripple_sdk::{ }, gateway::{ rpc_error::RpcError, - rpc_gateway_api::{ApiMessage, ApiProtocol, ApiStats, RpcRequest}, + rpc_gateway_api::{ApiMessage, ApiProtocol, RpcRequest}, }, + observability::metrics_util::ApiStats, }, extn::extn_client_message::ExtnMessage, log::{debug, error, info, trace, warn}, @@ -178,7 +179,7 @@ impl FireboltGateway { } } } - let platform_state = self.state.platform_state.clone(); + let mut platform_state = self.state.platform_state.clone(); /* * The reason for spawning a new thread is that when request-1 comes, and it waits for @@ -189,6 +190,11 @@ impl FireboltGateway { */ let mut request_c = request.clone(); request_c.method = FireboltOpenRpcMethod::name_with_lowercase_module(&request.method); + + platform_state + .metrics + .add_api_stats(&request_c.ctx.request_id, &request_c.method); + let metrics_timer = TelemetryBuilder::start_firebolt_metrics_timer( &platform_state.get_client().get_extn_client(), request_c.method.clone(), @@ -205,7 +211,7 @@ impl FireboltGateway { let open_rpc_state = self.state.platform_state.open_rpc_state.clone(); tokio::spawn(async move { - capture_stage(&mut request_c, "context_ready"); + capture_stage(&platform_state.metrics, &request_c, "context_ready"); // Validate incoming request parameters. if let Err(error_string) = validate_request(open_rpc_state, &request_c, fail_open) { TelemetryBuilder::stop_and_send_firebolt_metrics_timer( @@ -221,10 +227,11 @@ impl FireboltGateway { data: None, }; - send_json_rpc_error(&platform_state, &request, json_rpc_error).await; + send_json_rpc_error(&mut platform_state, &request, json_rpc_error).await; return; } - capture_stage(&mut request_c, "openrpc_val"); + + capture_stage(&platform_state.metrics, &request_c, "openrpc_val"); let result = if extn_request { // extn protocol means its an internal Ripple request skip permissions. @@ -232,7 +239,8 @@ impl FireboltGateway { } else { FireboltGatekeeper::gate(platform_state.clone(), request_c.clone()).await }; - capture_stage(&mut request_c, "permission"); + + capture_stage(&platform_state.metrics, &request_c, "permission"); match result { Ok(_) => { @@ -298,7 +306,7 @@ impl FireboltGateway { data: None, }; - send_json_rpc_error(&platform_state, &request, json_rpc_error).await; + send_json_rpc_error(&mut platform_state, &request, json_rpc_error).await; } } }); @@ -373,7 +381,7 @@ fn validate_request( } async fn send_json_rpc_error( - platform_state: &PlatformState, + platform_state: &mut PlatformState, request: &RpcRequest, json_rpc_error: JsonRpcError, ) { @@ -396,10 +404,20 @@ async fn send_json_rpc_error( request.clone().ctx.request_id, ); - api_message.stats = Some(ApiStats { - stats_ref: get_rpc_header_with_status(request, status_code), - stats: request.stats.clone(), - }); + if let Some(api_stats) = platform_state + .metrics + .get_api_stats(&request.ctx.request_id) + { + api_message.stats = Some(ApiStats { + api: request.method.clone(), + stats_ref: get_rpc_header_with_status(request, status_code), + stats: api_stats.stats.clone(), + }); + } + platform_state.metrics.update_api_stats_ref( + &request.ctx.request_id, + get_rpc_header_with_status(request, status_code), + ); match session.get_transport() { EffectiveTransport::Websocket => { diff --git a/core/main/src/firebolt/firebolt_ws.rs b/core/main/src/firebolt/firebolt_ws.rs index 759fd67d2..cd915142e 100644 --- a/core/main/src/firebolt/firebolt_ws.rs +++ b/core/main/src/firebolt/firebolt_ws.rs @@ -229,22 +229,28 @@ impl FireboltWs { } let (mut sender, mut receiver) = ws_stream.split(); + let mut platform_state = state.clone(); tokio::spawn(async move { while let Some(rs) = resp_rx.recv().await { let send_result = sender.send(Message::Text(rs.jsonrpc_msg.clone())).await; match send_result { Ok(_) => { - if let Some(stats) = rs.stats { + platform_state + .metrics + .update_api_stage(&rs.request_id, "response"); + + if let Some(stats) = platform_state.metrics.get_api_stats(&rs.request_id) { info!( - "Sending Firebolt response: {},{}", + "Sending Firebolt response: {:?},{}", stats.stats_ref, stats.stats.get_total_time() ); debug!( - "Full Firebolt Split: {},{}", + "Full Firebolt Split: {:?},{}", stats.stats_ref, stats.stats.get_stage_durations() - ) + ); + platform_state.metrics.remove_api_stats(&rs.request_id); } info!( "Sent Firebolt response cid={} msg={}", diff --git a/core/main/src/firebolt/handlers/account_rpc.rs b/core/main/src/firebolt/handlers/account_rpc.rs index 1f8d50cbe..c77ea1af8 100644 --- a/core/main/src/firebolt/handlers/account_rpc.rs +++ b/core/main/src/firebolt/handlers/account_rpc.rs @@ -22,7 +22,7 @@ use jsonrpsee::{ }; use ripple_sdk::{ api::{ - gateway::rpc_gateway_api::{ApiProtocol, CallContext, RpcRequest, RpcStats}, + gateway::rpc_gateway_api::{ApiProtocol, CallContext, RpcRequest}, session::AccountSessionTokenRequest, }, extn::extn_client_message::{ExtnMessage, ExtnResponse}, @@ -63,6 +63,12 @@ impl AccountServer for AccountImpl { .session_state .insert_session_token(a_t_r.token.clone()); _ctx.protocol = ApiProtocol::Extn; + + let mut platform_state = self.platform_state.clone(); + platform_state + .metrics + .add_api_stats(&_ctx.request_id, "account.setServiceAccessToken"); + let success = rpc_request_setter( self.platform_state .get_client() @@ -74,7 +80,6 @@ impl AccountServer for AccountImpl { Some(json!({"token": a_t_r.token, "expires": a_t_r.expires_in})), &_ctx, ), - stats: RpcStats::default(), }) .await, ); diff --git a/core/main/src/firebolt/handlers/advertising_rpc.rs b/core/main/src/firebolt/handlers/advertising_rpc.rs index b98d32a2d..a51c0cacd 100644 --- a/core/main/src/firebolt/handlers/advertising_rpc.rs +++ b/core/main/src/firebolt/handlers/advertising_rpc.rs @@ -211,9 +211,10 @@ impl AdvertisingServer for AdvertisingImpl { Some(r) => r.options, None => None, }; + let mut platform_state = self.state.clone(); let payload = AdvertisingRequest::GetAdIdObject(AdIdRequestParams { privacy_data: privacy_rpc::get_allow_app_content_ad_targeting_settings( - &self.state, + &mut platform_state, opts.as_ref(), &ctx.app_id, &ctx, @@ -282,8 +283,10 @@ impl AdvertisingServer for AdvertisingImpl { let ad_opt_out = !PrivacyImpl::get_allow_app_content_ad_targeting(&self.state).await; + let mut platform_state = self.state.clone(); + let mut privacy_data = privacy_rpc::get_allow_app_content_ad_targeting_settings( - &self.state, + &mut platform_state, None, &durable_app_id, &ctx, diff --git a/core/main/src/firebolt/handlers/device_rpc.rs b/core/main/src/firebolt/handlers/device_rpc.rs index ad28eee0c..3a60eb880 100644 --- a/core/main/src/firebolt/handlers/device_rpc.rs +++ b/core/main/src/firebolt/handlers/device_rpc.rs @@ -41,7 +41,7 @@ use ripple_sdk::{ device_request::{AudioProfile, DeviceVersionResponse, HdcpProfile}, }, firebolt::fb_general::{ListenRequest, ListenerResponse}, - gateway::rpc_gateway_api::{ApiProtocol, CallContext, RpcRequest, RpcStats}, + gateway::rpc_gateway_api::{ApiProtocol, CallContext, RpcRequest}, session::ProvisionRequest, storage_property::{EVENT_DEVICE_DEVICE_NAME_CHANGED, EVENT_DEVICE_NAME_CHANGED}, }, @@ -527,6 +527,12 @@ impl DeviceServer for DeviceImpl { )); }; _ctx.protocol = ApiProtocol::Extn; + + let mut platform_state = self.state.clone(); + platform_state + .metrics + .add_api_stats(&_ctx.request_id, "account.setServiceAccountId"); + let success = rpc_request_setter( self.state .get_client() @@ -538,7 +544,6 @@ impl DeviceServer for DeviceImpl { Some(json!({"serviceAccountId": provision_request.account_id})), &_ctx, ), - stats: RpcStats::default(), }) .await, ) && rpc_request_setter( @@ -552,7 +557,6 @@ impl DeviceServer for DeviceImpl { Some(json!({"xDeviceId": provision_request.device_id})), &_ctx, ), - stats: RpcStats::default(), }) .await, ) && rpc_request_setter( @@ -566,7 +570,6 @@ impl DeviceServer for DeviceImpl { Some(json!({"partnerId": provision_request.distributor_id })), &_ctx, ), - stats: RpcStats::default(), }) .await, ); diff --git a/core/main/src/firebolt/handlers/parameters_rpc.rs b/core/main/src/firebolt/handlers/parameters_rpc.rs index 1f5e458fe..7c15f5560 100644 --- a/core/main/src/firebolt/handlers/parameters_rpc.rs +++ b/core/main/src/firebolt/handlers/parameters_rpc.rs @@ -81,8 +81,9 @@ impl ParametersServer for ParametersImpl { AppMethod::GetLaunchRequest(ctx.app_id.to_owned()), app_resp_tx, ); + let mut platform_state = self.platform_state.clone(); let privacy_data = privacy_rpc::get_allow_app_content_ad_targeting_settings( - &self.platform_state, + &mut platform_state, None, &ctx.app_id.to_string(), &ctx, diff --git a/core/main/src/firebolt/handlers/privacy_rpc.rs b/core/main/src/firebolt/handlers/privacy_rpc.rs index e45e61192..b257d36d7 100644 --- a/core/main/src/firebolt/handlers/privacy_rpc.rs +++ b/core/main/src/firebolt/handlers/privacy_rpc.rs @@ -27,7 +27,6 @@ use jsonrpsee::{ types::error::CallError, RpcModule, }; -use ripple_sdk::api::gateway::rpc_gateway_api::RpcStats; use ripple_sdk::{ api::{ device::device_peristence::SetBoolProperty, @@ -83,17 +82,20 @@ impl AllowAppContentAdTargetingSettings { pub async fn get_allow_app_content_ad_targeting_settings( &self, - platform_state: &PlatformState, + platform_state: &mut PlatformState, ctx: &CallContext, ) -> HashMap { let mut new_ctx = ctx.clone(); new_ctx.protocol = ApiProtocol::Extn; + platform_state + .metrics + .add_api_stats(&ctx.request_id, "localization.countryCode"); + let rpc_request = RpcRequest { ctx: new_ctx.clone(), method: "localization.countryCode".into(), params_json: RpcRequest::prepend_ctx(None, &new_ctx), - stats: RpcStats::default(), }; let resp = platform_state @@ -320,7 +322,7 @@ pub trait Privacy { } pub async fn get_allow_app_content_ad_targeting_settings( - platform_state: &PlatformState, + platform_state: &mut PlatformState, scope_option: Option<&ScopeOption>, caller_app: &String, ctx: &CallContext, diff --git a/core/main/src/firebolt/handlers/second_screen_rpc.rs b/core/main/src/firebolt/handlers/second_screen_rpc.rs index f984538b8..145699573 100644 --- a/core/main/src/firebolt/handlers/second_screen_rpc.rs +++ b/core/main/src/firebolt/handlers/second_screen_rpc.rs @@ -78,8 +78,9 @@ impl SecondScreenServer for SecondScreenImpl { } async fn friendly_name(&self, _ctx: CallContext) -> RpcResult { + let mut s = self.state.clone(); rpc_value_result_to_string_result( - BrokerUtils::process_internal_main_request(&self.state, "device.name", None).await, + BrokerUtils::process_internal_main_request(&mut s, "device.name", None).await, None, ) } diff --git a/core/main/src/firebolt/rpc_router.rs b/core/main/src/firebolt/rpc_router.rs index 0c63fe930..892a56b87 100644 --- a/core/main/src/firebolt/rpc_router.rs +++ b/core/main/src/firebolt/rpc_router.rs @@ -30,7 +30,7 @@ use jsonrpsee::{ use ripple_sdk::{ api::{ firebolt::fb_metrics::Timer, - gateway::rpc_gateway_api::{ApiMessage, ApiStats, RpcRequest}, + gateway::rpc_gateway_api::{ApiMessage, RpcRequest}, }, chrono::Utc, extn::extn_client_message::ExtnMessage, @@ -83,13 +83,14 @@ impl Default for RouterState { } async fn resolve_route( + platform_state: &mut PlatformState, methods: Methods, resources: Resources, req: RpcRequest, ) -> Result { info!("Routing {}", req.method); let id = Id::Number(req.ctx.call_id); - let mut request_c = req.clone(); + let request_c = req.clone(); let (sink_tx, mut sink_rx) = futures_channel::mpsc::unbounded::(); let sink = MethodSink::new_with_limit(sink_tx, TEN_MB_SIZE_BYTES); let mut method_executors = Vec::new(); @@ -143,13 +144,18 @@ async fn resolve_route( } else { 1 }; - capture_stage(&mut request_c, "routing"); - let mut msg = ApiMessage::new(protocol, r, request_id); - msg.stats = Some(ApiStats { - stats_ref: add_telemetry_status_code(&rpc_header, status_code.to_string().as_str()), - stats: request_c.stats, - }); + capture_stage(&platform_state.metrics, &request_c, "routing"); + + platform_state.metrics.update_api_stats_ref( + &request_id, + add_telemetry_status_code(&rpc_header, status_code.to_string().as_str()), + ); + + let mut msg = ApiMessage::new(protocol, r, request_id.clone()); + if let Some(api_stats) = platform_state.metrics.get_api_stats(&request_id) { + msg.stats = Some(api_stats); + } return Ok(msg); } @@ -158,7 +164,7 @@ async fn resolve_route( impl RpcRouter { pub async fn route( - state: PlatformState, + mut state: PlatformState, mut req: RpcRequest, session: Session, timer: Option, @@ -172,7 +178,7 @@ impl RpcRouter { tokio::spawn(async move { let start = Utc::now().timestamp_millis(); - let resp = resolve_route(methods, resources, req.clone()).await; + let resp = resolve_route(&mut state, methods, resources, req.clone()).await; let status = match resp.clone() { Ok(msg) => { @@ -204,8 +210,10 @@ impl RpcRouter { ) { let methods = state.router_state.get_methods(); let resources = state.router_state.resources.clone(); + + let mut platform_state = state.clone(); tokio::spawn(async move { - if let Ok(msg) = resolve_route(methods, resources, req).await { + if let Ok(msg) = resolve_route(&mut platform_state, methods, resources, req).await { return_extn_response(msg, extn_msg); } }); diff --git a/core/main/src/processor/settings_processor.rs b/core/main/src/processor/settings_processor.rs index 0e2399714..8da9f2855 100644 --- a/core/main/src/processor/settings_processor.rs +++ b/core/main/src/processor/settings_processor.rs @@ -126,16 +126,19 @@ impl SettingsProcessor { AllowPersonalization => Some(SettingValue::bool(cp.enable_recommendations)), AllowWatchHistory => Some(SettingValue::bool(cp.remember_watched_programs)), ShareWatchHistory => Some(SettingValue::bool(cp.share_watch_history)), - DeviceName => Some(SettingValue::string( - broker_utils::BrokerUtils::process_internal_main_request( - state, - "device.name", - None, - ) - .await - .unwrap_or_else(|_| "".into()) - .to_string(), - )), + DeviceName => { + let mut s = state.clone(); + Some(SettingValue::string( + broker_utils::BrokerUtils::process_internal_main_request( + &mut s, + "device.name", + None, + ) + .await + .unwrap_or_else(|_| "".into()) + .to_string(), + )) + } PowerSaving => Some(SettingValue::bool(true)), LegacyMiniGuide => Some(SettingValue::bool(false)), }; diff --git a/core/main/src/state/metrics_state.rs b/core/main/src/state/metrics_state.rs index ba239528c..f1fbd86f9 100644 --- a/core/main/src/state/metrics_state.rs +++ b/core/main/src/state/metrics_state.rs @@ -16,7 +16,7 @@ // use std::{ - collections::HashSet, + collections::{HashMap, HashSet}, sync::{Arc, RwLock}, }; @@ -32,11 +32,12 @@ use ripple_sdk::{ }, gateway::rpc_gateway_api::rpc_value_result_to_string_result, manifest::device_manifest::DataGovernanceConfig, + observability::metrics_util::ApiStats, storage_property::StorageProperty, }, chrono::{DateTime, Utc}, extn::extn_client_message::ExtnResponse, - log::error, + log::{error, warn}, utils::error::RippleError, }; @@ -62,11 +63,14 @@ const PERSISTENT_STORAGE_ACCOUNT_DETAIL_TYPE: &str = "detailType"; const PERSISTENT_STORAGE_ACCOUNT_DEVICE_TYPE: &str = "deviceType"; const PERSISTENT_STORAGE_ACCOUNT_DEVICE_MANUFACTURER: &str = "deviceManufacturer"; +const API_STATS_MAP_SIZE_WARNING: usize = 10; + #[derive(Debug, Clone, Default)] pub struct MetricsState { pub start_time: DateTime, pub context: Arc>, operational_telemetry_listeners: Arc>>, + api_stats_map: Arc>>, } impl MetricsState { @@ -212,7 +216,7 @@ impl MetricsState { format!("{}{}", s, ".unset") } - pub async fn initialize(state: &PlatformState) { + pub async fn initialize(state: &mut PlatformState) { let metrics_percentage = state .get_device_manifest() .configuration @@ -507,4 +511,49 @@ impl MetricsState { } Self::send_context_update_request(&platform_state); } + + pub fn add_api_stats(&mut self, request_id: &str, api: &str) { + let mut api_stats_map = self.api_stats_map.write().unwrap(); + api_stats_map.insert(request_id.to_string(), ApiStats::new(api.into())); + + let size = api_stats_map.len(); + if size >= API_STATS_MAP_SIZE_WARNING { + warn!("add_api_stats: api_stats_map size warning: {}", size); + } + } + + pub fn remove_api_stats(&mut self, request_id: &str) { + let mut api_stats_map = self.api_stats_map.write().unwrap(); + api_stats_map.remove(request_id); + } + + pub fn update_api_stats_ref(&mut self, request_id: &str, stats_ref: Option) { + let mut api_stats_map = self.api_stats_map.write().unwrap(); + if let Some(stats) = api_stats_map.get_mut(request_id) { + stats.stats_ref = stats_ref; + } else { + println!( + "update_api_stats_ref: request_id not found: request_id={}", + request_id + ); + } + } + + pub fn update_api_stage(&mut self, request_id: &str, stage: &str) -> i64 { + let mut api_stats_map = self.api_stats_map.write().unwrap(); + if let Some(stats) = api_stats_map.get_mut(request_id) { + stats.stats.update_stage(stage) + } else { + error!( + "update_api_stage: request_id not found: request_id={}", + request_id + ); + -1 + } + } + + pub fn get_api_stats(&self, request_id: &str) -> Option { + let api_stats_map = self.api_stats_map.read().unwrap(); + api_stats_map.get(request_id).cloned() + } } diff --git a/core/main/src/state/platform_state.rs b/core/main/src/state/platform_state.rs index 5b504dbd0..634e5a249 100644 --- a/core/main/src/state/platform_state.rs +++ b/core/main/src/state/platform_state.rs @@ -125,6 +125,7 @@ impl PlatformState { let rule_engine = RuleEngine::build(&extn_manifest); let extn_sdks = extn_manifest.extn_sdks.clone(); let provider_registations = extn_manifest.provider_registrations.clone(); + let metrics_state = MetricsState::default(); Self { extn_manifest, cap_state: CapState::new(manifest.clone()), @@ -138,11 +139,16 @@ impl PlatformState { open_rpc_state: OpenRpcState::new(Some(exclusory), extn_sdks, provider_registations), router_state: RouterState::new(), data_governance: DataGovernanceState::default(), - metrics: MetricsState::default(), + metrics: metrics_state.clone(), device_session_id: DeviceSessionIdentifier::default(), ripple_cache: RippleCache::default(), version, - endpoint_state: EndpointBrokerState::new(broker_sender, rule_engine, client), + endpoint_state: EndpointBrokerState::new( + metrics_state, + broker_sender, + rule_engine, + client, + ), } } diff --git a/core/main/src/utils/router_utils.rs b/core/main/src/utils/router_utils.rs index 4eff51c29..bd50e5ac7 100644 --- a/core/main/src/utils/router_utils.rs +++ b/core/main/src/utils/router_utils.rs @@ -25,7 +25,9 @@ use ripple_sdk::{ utils::error::RippleError, }; -use crate::state::{platform_state::PlatformState, session_state::Session}; +use crate::state::{ + metrics_state::MetricsState, platform_state::PlatformState, session_state::Session, +}; pub async fn return_api_message_for_transport( session: Session, @@ -77,23 +79,25 @@ pub fn return_extn_response(msg: ApiMessage, extn_msg: ExtnMessage) { } } -pub fn get_rpc_header_with_status(request: &RpcRequest, status_code: i32) -> String { - format!( +pub fn get_rpc_header_with_status(request: &RpcRequest, status_code: i32) -> Option { + Some(format!( "{},{},{}", request.ctx.app_id, request.ctx.method, status_code - ) + )) } pub fn get_rpc_header(request: &RpcRequest) -> String { format!("{},{}", request.ctx.app_id, request.ctx.method) } -pub fn add_telemetry_status_code(original_ref: &str, status_code: &str) -> String { - format!("{},{}", original_ref, status_code) +pub fn add_telemetry_status_code(original_ref: &str, status_code: &str) -> Option { + Some(format!("{},{}", original_ref, status_code)) } -pub fn capture_stage(request: &mut RpcRequest, stage: &str) { - let duration = request.stats.update_stage(stage); +pub fn capture_stage(metrics_state: &MetricsState, request: &RpcRequest, stage: &str) { + let mut state = metrics_state.clone(); + let duration = state.update_api_stage(&request.ctx.request_id, stage); + trace!( "Firebolt processing stage: {},{},{},{}", request.ctx.app_id, diff --git a/core/sdk/src/api/gateway/rpc_gateway_api.rs b/core/sdk/src/api/gateway/rpc_gateway_api.rs index ef6624845..0dcacf4a4 100644 --- a/core/sdk/src/api/gateway/rpc_gateway_api.rs +++ b/core/sdk/src/api/gateway/rpc_gateway_api.rs @@ -15,7 +15,6 @@ // SPDX-License-Identifier: Apache-2.0 // -use chrono::Utc; use log::debug; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; @@ -23,7 +22,10 @@ use tokio::sync::{mpsc, oneshot}; use uuid::Uuid; use crate::{ - api::firebolt::{fb_general::ListenRequest, fb_openrpc::FireboltOpenRpcMethod}, + api::{ + firebolt::{fb_general::ListenRequest, fb_openrpc::FireboltOpenRpcMethod}, + observability::metrics_util::ApiStats, + }, extn::extn_client_message::{ExtnPayload, ExtnPayloadProvider, ExtnRequest}, framework::ripple_contract::RippleContract, }; @@ -143,12 +145,6 @@ pub struct ApiMessage { pub stats: Option, } -#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)] -pub struct ApiStats { - pub stats_ref: String, - pub stats: RpcStats, -} - /// Holds a message in jsonrpc protocol format and the protocol that it should be converted into /// The protocol of the request is passed in context and then when /// a response is being written, that protocol is passed here so the transport layers know which protocol @@ -388,56 +384,11 @@ impl crate::Mockable for JsonRpcApiResponse { } } -#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)] -pub struct RpcStats { - pub start_time: i64, - pub last_stage: i64, - stage_durations: String, -} - -impl Default for RpcStats { - fn default() -> Self { - Self { - start_time: Utc::now().timestamp_millis(), - last_stage: 0, - stage_durations: String::new(), - } - } -} - -impl RpcStats { - pub fn update_stage(&mut self, stage: &str) -> i64 { - let current_time = Utc::now().timestamp_millis(); - let mut last_stage = self.last_stage; - if last_stage == 0 { - last_stage = self.start_time; - } - self.last_stage = current_time; - let duration = current_time - last_stage; - if self.stage_durations.is_empty() { - self.stage_durations = format!("{}={}", stage, duration); - } else { - self.stage_durations = format!("{},{}={}", self.stage_durations, stage, duration); - } - duration - } - - pub fn get_total_time(&self) -> i64 { - let current_time = Utc::now().timestamp_millis(); - current_time - self.start_time - } - - pub fn get_stage_durations(&self) -> String { - self.stage_durations.clone() - } -} - #[derive(Clone, PartialEq, Debug, Serialize, Deserialize, Default)] pub struct RpcRequest { pub method: String, pub params_json: String, pub ctx: CallContext, - pub stats: RpcStats, } impl RpcRequest { pub fn internal(method: &str) -> Self { @@ -446,7 +397,6 @@ impl RpcRequest { params_json: Self::prepend_ctx(None, &ctx), ctx, method: method.to_owned(), - stats: RpcStats::default(), } } pub fn with_params(mut self, params: Option) -> Self { @@ -477,7 +427,6 @@ impl crate::Mockable for RpcRequest { method: "module.method".to_owned(), params_json: "{}".to_owned(), ctx: CallContext::mock(), - stats: RpcStats::default(), } } } @@ -491,7 +440,6 @@ impl RpcRequest { method, params_json, ctx, - stats: RpcStats::default(), } } /// Serializes a parameter so that the given ctx becomes the first list in a json array of @@ -600,7 +548,6 @@ impl RpcRequest { params_json: Self::prepend_ctx(Some(request), &ctx), ctx, method, - stats: RpcStats::default(), } } } @@ -878,7 +825,6 @@ mod tests { method: "some_method".to_string(), params_json: r#"{"key": "value"}"#.to_string(), ctx: call_context, - stats: RpcStats::default(), }; let contract_type: RippleContract = RippleContract::Rpc; test_extn_payload_provider(rpc_request, contract_type); diff --git a/core/sdk/src/api/observability/metrics_util.rs b/core/sdk/src/api/observability/metrics_util.rs index 45e7baef2..ddf680134 100644 --- a/core/sdk/src/api/observability/metrics_util.rs +++ b/core/sdk/src/api/observability/metrics_util.rs @@ -10,8 +10,71 @@ use crate::{ utils::error::RippleError, }; +use chrono::Utc; #[cfg(not(test))] use log::{debug, error}; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)] +pub struct RpcStats { + pub start_time: i64, + pub last_stage: i64, + stage_durations: String, +} + +impl Default for RpcStats { + fn default() -> Self { + Self { + start_time: Utc::now().timestamp_millis(), + last_stage: 0, + stage_durations: String::new(), + } + } +} + +impl RpcStats { + pub fn update_stage(&mut self, stage: &str) -> i64 { + let current_time = Utc::now().timestamp_millis(); + let mut last_stage = self.last_stage; + if last_stage == 0 { + last_stage = self.start_time; + } + self.last_stage = current_time; + let duration = current_time - last_stage; + if self.stage_durations.is_empty() { + self.stage_durations = format!("{}={}", stage, duration); + } else { + self.stage_durations = format!("{},{}={}", self.stage_durations, stage, duration); + } + duration + } + + pub fn get_total_time(&self) -> i64 { + let current_time = Utc::now().timestamp_millis(); + current_time - self.start_time + } + + pub fn get_stage_durations(&self) -> String { + self.stage_durations.clone() + } +} + +#[derive(Clone, PartialEq, Default, Debug, Serialize, Deserialize)] +pub struct ApiStats { + pub api: String, + pub stats_ref: Option, + pub stats: RpcStats, +} + +impl ApiStats { + pub fn new(api: String) -> Self { + Self { + api, + stats_ref: None, + stats: RpcStats::default(), + } + } +} #[cfg(test)] use {println as debug, println as error}; diff --git a/core/sdk/src/extn/client/extn_client.rs b/core/sdk/src/extn/client/extn_client.rs index 6e1d24eb3..648f108bd 100644 --- a/core/sdk/src/extn/client/extn_client.rs +++ b/core/sdk/src/extn/client/extn_client.rs @@ -860,7 +860,7 @@ pub mod tests { device_info_request::DeviceInfoRequest, device_request::{AccountToken, DeviceRequest}, }, - gateway::rpc_gateway_api::{ApiProtocol, CallContext, RpcRequest, RpcStats}, + gateway::rpc_gateway_api::{ApiProtocol, CallContext, RpcRequest}, session::SessionAdjective, }, extn::{ @@ -1247,6 +1247,7 @@ pub mod tests { #[tokio::test(flavor = "multi_thread")] async fn test_main_internal_request(cap: ExtnId, exp_response: &str) { // test case: main <=> main + let (mock_sender, mock_rx) = ExtnSender::mock_with_params(cap.clone(), Vec::new(), Vec::new(), Some(HashMap::new())); let mut extn_client = ExtnClient::new(mock_rx.clone(), mock_sender.clone()); @@ -1267,11 +1268,11 @@ pub mod tests { true, ); let new_ctx = ctx.clone(); + let rpc_request = RpcRequest { ctx: new_ctx.clone(), method: "some.method".into(), params_json: RpcRequest::prepend_ctx(None, &new_ctx), - stats: RpcStats::default(), }; tokio::spawn(async move { diff --git a/examples/rpc_extn/src/rpc/legacy_jsonrpsee_extn.rs b/examples/rpc_extn/src/rpc/legacy_jsonrpsee_extn.rs index 1f146ed22..e748ccd54 100644 --- a/examples/rpc_extn/src/rpc/legacy_jsonrpsee_extn.rs +++ b/examples/rpc_extn/src/rpc/legacy_jsonrpsee_extn.rs @@ -17,7 +17,7 @@ use jsonrpsee::{core::RpcResult, proc_macros::rpc}; use ripple_sdk::{ - api::gateway::rpc_gateway_api::{ApiProtocol, CallContext, RpcRequest, RpcStats}, + api::gateway::rpc_gateway_api::{ApiProtocol, CallContext, RpcRequest}, async_trait::async_trait, extn::{client::extn_client::ExtnClient, extn_client_message::ExtnResponse}, tokio::runtime::Runtime, @@ -57,7 +57,6 @@ impl LegacyServer for LegacyImpl { let rpc_request = RpcRequest { ctx: new_ctx.clone(), method: "device.make".into(), - stats: RpcStats::default(), params_json: RpcRequest::prepend_ctx(Some(serde_json::Value::Null), &new_ctx), }; if let Ok(Ok(ExtnResponse::Value(v))) = self @@ -80,7 +79,6 @@ impl LegacyServer for LegacyImpl { let rpc_request = RpcRequest { ctx: new_ctx.clone(), method: "device.model".into(), - stats: RpcStats::default(), params_json: RpcRequest::prepend_ctx(Some(serde_json::Value::Null), &new_ctx), }; if let Ok(msg) = client.request(rpc_request).await {