Skip to content

Commit

Permalink
Add New Telemetry markers for each stage (#685)
Browse files Browse the repository at this point in the history
  • Loading branch information
pahearn73 authored Nov 26, 2024
1 parent 5b82547 commit 1099087
Show file tree
Hide file tree
Showing 21 changed files with 298 additions and 153 deletions.
4 changes: 2 additions & 2 deletions core/main/src/bootstrap/extn/load_session_step.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ impl Bootstep<BootstrapState> for LoadDistributorValuesStep {
}

async fn setup(&self, s: BootstrapState) -> RippleResponse {
let ps = s.platform_state.clone();
let mut ps = s.platform_state.clone();
tokio::spawn(async move {
MetricsState::initialize(&ps).await;
MetricsState::initialize(&mut ps).await;
});

MainContextProcessor::remove_expired_and_inactive_entries(&s.platform_state);
Expand Down
12 changes: 7 additions & 5 deletions core/main/src/broker/broker_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,16 @@ impl BrokerUtils {
}

pub async fn process_internal_main_request<'a>(
state: &'a PlatformState,
state: &mut PlatformState,
method: &'a str,
params: Option<Value>,
) -> RpcResult<Value> {
match state
.internal_rpc_request(&RpcRequest::internal(method).with_params(params))
.await
{
let rpc_request = RpcRequest::internal(method).with_params(params);
state
.metrics
.add_api_stats(&rpc_request.ctx.request_id, method);

match state.internal_rpc_request(&rpc_request).await {
Ok(res) => match res.as_value() {
Some(v) => Ok(v),
None => Err(JsonRpcApiError::default()
Expand Down
52 changes: 37 additions & 15 deletions core/main/src/broker/endpoint_broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ use ripple_sdk::{
api::{
firebolt::fb_capabilities::JSON_RPC_STANDARD_ERROR_INVALID_PARAMS,
gateway::rpc_gateway_api::{
ApiMessage, ApiProtocol, ApiStats, CallContext, JsonRpcApiRequest, JsonRpcApiResponse,
RpcRequest,
ApiMessage, ApiProtocol, CallContext, JsonRpcApiRequest, JsonRpcApiResponse, RpcRequest,
},
session::AccountSession,
},
Expand All @@ -47,9 +46,9 @@ use crate::{
broker::broker_utils::BrokerUtils,
firebolt::firebolt_gateway::{FireboltGatewayCommand, JsonRpcError},
service::extn::ripple_client::RippleClient,
state::platform_state::PlatformState,
state::{metrics_state::MetricsState, platform_state::PlatformState},
utils::router_utils::{
add_telemetry_status_code, get_rpc_header, return_api_message_for_transport,
add_telemetry_status_code, capture_stage, get_rpc_header, return_api_message_for_transport,
return_extn_response,
},
};
Expand Down Expand Up @@ -308,6 +307,7 @@ pub struct EndpointBrokerState {
rule_engine: RuleEngine,
cleaner_list: Arc<RwLock<Vec<BrokerCleaner>>>,
reconnect_tx: Sender<BrokerConnectRequest>,
metrics_state: MetricsState,
}
impl Default for EndpointBrokerState {
fn default() -> Self {
Expand All @@ -319,12 +319,14 @@ impl Default for EndpointBrokerState {
rule_engine: RuleEngine::default(),
cleaner_list: Arc::new(RwLock::new(Vec::new())),
reconnect_tx: mpsc::channel(2).0,
metrics_state: MetricsState::default(),
}
}
}

impl EndpointBrokerState {
pub fn new(
metrics_state: MetricsState,
tx: Sender<BrokerOutput>,
rule_engine: RuleEngine,
ripple_client: RippleClient,
Expand All @@ -338,6 +340,7 @@ impl EndpointBrokerState {
rule_engine,
cleaner_list: Arc::new(RwLock::new(Vec::new())),
reconnect_tx,
metrics_state,
};
state.reconnect_thread(rec_tr, ripple_client);
state
Expand Down Expand Up @@ -532,6 +535,9 @@ impl EndpointBrokerState {
data.result = Some(jv);
data.id = Some(id);
let output = BrokerOutput { data };

let metrics_state = self.metrics_state.clone();
capture_stage(&metrics_state, &rpc_request, "static_rule_request");
tokio::spawn(async move { callback.sender.send(output).await });
}

Expand Down Expand Up @@ -581,6 +587,8 @@ impl EndpointBrokerState {
let broker = broker_sender.unwrap();
let (_, updated_request) =
self.update_request(&rpc_request, rule, extn_message, requestor_callback);
let metrics_state = self.metrics_state.clone();
capture_stage(&metrics_state, &rpc_request, "broker_request");
tokio::spawn(async move {
if let Err(e) = broker.send(updated_request.clone()).await {
callback.send_error(updated_request, e).await
Expand Down Expand Up @@ -721,7 +729,7 @@ pub trait EndpointBroker {
pub struct BrokerOutputForwarder;

impl BrokerOutputForwarder {
pub fn start_forwarder(platform_state: PlatformState, mut rx: Receiver<BrokerOutput>) {
pub fn start_forwarder(mut platform_state: PlatformState, mut rx: Receiver<BrokerOutput>) {
// set up the event utility
let event_utility = Arc::new(EventManagementUtility::new());
event_utility.register_custom_functions();
Expand Down Expand Up @@ -808,7 +816,7 @@ impl BrokerOutputForwarder {
let message = ApiMessage::new(
protocol,
serde_json::to_string(&response).unwrap(),
request_id.to_string(),
rpc_request.ctx.request_id.clone(),
);

if let Some(session) = platform_state_c
Expand Down Expand Up @@ -875,7 +883,7 @@ impl BrokerOutputForwarder {
let mut message = ApiMessage::new(
rpc_request.ctx.protocol.clone(),
serde_json::to_string(&response).unwrap(),
request_id.to_string(),
rpc_request.ctx.request_id.clone(),
);
let mut status_code: i64 = 1;
if let Some(e) = &response.error {
Expand All @@ -886,13 +894,26 @@ impl BrokerOutputForwarder {
}
}

message.stats = Some(ApiStats {
stats_ref: add_telemetry_status_code(
platform_state.metrics.update_api_stats_ref(
&rpc_request.ctx.request_id,
add_telemetry_status_code(
&tm_str,
status_code.to_string().as_str(),
),
stats: rpc_request.stats,
});
);

if let Some(api_stats) = platform_state
.metrics
.get_api_stats(&rpc_request.ctx.request_id)
{
message.stats = Some(api_stats);

if rpc_request.ctx.app_id.eq_ignore_ascii_case("internal") {
platform_state
.metrics
.remove_api_stats(&rpc_request.ctx.request_id);
}
}

// Step 3: Handle Non Extension
if matches!(rpc_request.ctx.protocol, ApiProtocol::Extn) {
Expand Down Expand Up @@ -948,10 +969,10 @@ impl BrokerOutputForwarder {
let session_id = rpc_request.ctx.get_id();
let request_id = rpc_request.ctx.call_id;
let protocol = rpc_request.ctx.protocol.clone();
let platform_state_c = &platform_state;
let mut platform_state_c = platform_state.clone();

if let Ok(res) =
BrokerUtils::process_internal_main_request(platform_state_c, method.as_str(), None)
BrokerUtils::process_internal_main_request(&mut platform_state_c, method.as_str(), None)
.await
{
response.result = Some(serde_json::to_value(res.clone()).unwrap());
Expand All @@ -961,7 +982,7 @@ impl BrokerOutputForwarder {
let message = ApiMessage::new(
protocol,
serde_json::to_string(&response).unwrap(),
request_id.to_string(),
rpc_request.ctx.request_id.clone(),
);

if let Some(session) = platform_state_c
Expand Down Expand Up @@ -1198,7 +1219,7 @@ mod tests {
endpoint_broker::tests::RippleClient,
rules_engine::{Rule, RuleEngine, RuleSet, RuleTransform},
},
state::bootstrap_state::ChannelsState,
state::{bootstrap_state::ChannelsState, metrics_state::MetricsState},
};

use super::EndpointBrokerState;
Expand All @@ -1208,6 +1229,7 @@ mod tests {
let (tx, _) = channel(2);
let client = RippleClient::new(ChannelsState::new());
let state = EndpointBrokerState::new(
MetricsState::default(),
tx,
RuleEngine {
rules: RuleSet::default(),
Expand Down
12 changes: 8 additions & 4 deletions core/main/src/broker/event_management_utility.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::{collections::HashMap, pin::Pin};

use futures::Future;
use ripple_sdk::{
api::gateway::rpc_gateway_api::{ApiProtocol, CallContext, RpcRequest, RpcStats},
api::gateway::rpc_gateway_api::{ApiProtocol, CallContext, RpcRequest},
extn::extn_client_message::ExtnResponse,
log::info,
utils::error::RippleError,
Expand Down Expand Up @@ -58,8 +58,9 @@ impl EventManagementUtility {
self.register_function(
"AdvertisingPolicyEventDecorator".to_string(),
Arc::new(|state, ctx, value| {
let s = state.clone();
Box::pin(EventManagementUtility::advertising_policy_event_decorator(
state, ctx, value,
s, ctx, value,
))
}),
);
Expand All @@ -73,7 +74,7 @@ impl EventManagementUtility {
}

pub async fn advertising_policy_event_decorator(
platform_state: PlatformState,
mut platform_state: PlatformState,
ctx: CallContext,
value: Option<Value>,
) -> Result<Option<Value>, RippleError> {
Expand All @@ -85,10 +86,13 @@ impl EventManagementUtility {
let rpc_request = RpcRequest {
ctx: new_ctx.clone(),
method: "advertising.policy".into(),
stats: RpcStats::default(),
params_json: RpcRequest::prepend_ctx(None, &new_ctx),
};

platform_state
.metrics
.add_api_stats(&ctx.request_id, "advertising.policy");

let resp = platform_state
.get_client()
.get_extn_client()
Expand Down
42 changes: 30 additions & 12 deletions core/main/src/firebolt/firebolt_gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ use ripple_sdk::{
},
gateway::{
rpc_error::RpcError,
rpc_gateway_api::{ApiMessage, ApiProtocol, ApiStats, RpcRequest},
rpc_gateway_api::{ApiMessage, ApiProtocol, RpcRequest},
},
observability::metrics_util::ApiStats,
},
extn::extn_client_message::ExtnMessage,
log::{debug, error, info, trace, warn},
Expand Down Expand Up @@ -178,7 +179,7 @@ impl FireboltGateway {
}
}
}
let platform_state = self.state.platform_state.clone();
let mut platform_state = self.state.platform_state.clone();

/*
* The reason for spawning a new thread is that when request-1 comes, and it waits for
Expand All @@ -189,6 +190,11 @@ impl FireboltGateway {
*/
let mut request_c = request.clone();
request_c.method = FireboltOpenRpcMethod::name_with_lowercase_module(&request.method);

platform_state
.metrics
.add_api_stats(&request_c.ctx.request_id, &request_c.method);

let metrics_timer = TelemetryBuilder::start_firebolt_metrics_timer(
&platform_state.get_client().get_extn_client(),
request_c.method.clone(),
Expand All @@ -205,7 +211,7 @@ impl FireboltGateway {
let open_rpc_state = self.state.platform_state.open_rpc_state.clone();

tokio::spawn(async move {
capture_stage(&mut request_c, "context_ready");
capture_stage(&platform_state.metrics, &request_c, "context_ready");
// Validate incoming request parameters.
if let Err(error_string) = validate_request(open_rpc_state, &request_c, fail_open) {
TelemetryBuilder::stop_and_send_firebolt_metrics_timer(
Expand All @@ -221,18 +227,20 @@ impl FireboltGateway {
data: None,
};

send_json_rpc_error(&platform_state, &request, json_rpc_error).await;
send_json_rpc_error(&mut platform_state, &request, json_rpc_error).await;
return;
}
capture_stage(&mut request_c, "openrpc_val");

capture_stage(&platform_state.metrics, &request_c, "openrpc_val");

let result = if extn_request {
// extn protocol means it's an internal Ripple request, so skip permissions.
Ok(())
} else {
FireboltGatekeeper::gate(platform_state.clone(), request_c.clone()).await
};
capture_stage(&mut request_c, "permission");

capture_stage(&platform_state.metrics, &request_c, "permission");

match result {
Ok(_) => {
Expand Down Expand Up @@ -298,7 +306,7 @@ impl FireboltGateway {
data: None,
};

send_json_rpc_error(&platform_state, &request, json_rpc_error).await;
send_json_rpc_error(&mut platform_state, &request, json_rpc_error).await;
}
}
});
Expand Down Expand Up @@ -373,7 +381,7 @@ fn validate_request(
}

async fn send_json_rpc_error(
platform_state: &PlatformState,
platform_state: &mut PlatformState,
request: &RpcRequest,
json_rpc_error: JsonRpcError,
) {
Expand All @@ -396,10 +404,20 @@ async fn send_json_rpc_error(
request.clone().ctx.request_id,
);

api_message.stats = Some(ApiStats {
stats_ref: get_rpc_header_with_status(request, status_code),
stats: request.stats.clone(),
});
if let Some(api_stats) = platform_state
.metrics
.get_api_stats(&request.ctx.request_id)
{
api_message.stats = Some(ApiStats {
api: request.method.clone(),
stats_ref: get_rpc_header_with_status(request, status_code),
stats: api_stats.stats.clone(),
});
}
platform_state.metrics.update_api_stats_ref(
&request.ctx.request_id,
get_rpc_header_with_status(request, status_code),
);

match session.get_transport() {
EffectiveTransport::Websocket => {
Expand Down
14 changes: 10 additions & 4 deletions core/main/src/firebolt/firebolt_ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,22 +229,28 @@ impl FireboltWs {
}

let (mut sender, mut receiver) = ws_stream.split();
let mut platform_state = state.clone();
tokio::spawn(async move {
while let Some(rs) = resp_rx.recv().await {
let send_result = sender.send(Message::Text(rs.jsonrpc_msg.clone())).await;
match send_result {
Ok(_) => {
if let Some(stats) = rs.stats {
platform_state
.metrics
.update_api_stage(&rs.request_id, "response");

if let Some(stats) = platform_state.metrics.get_api_stats(&rs.request_id) {
info!(
"Sending Firebolt response: {},{}",
"Sending Firebolt response: {:?},{}",
stats.stats_ref,
stats.stats.get_total_time()
);
debug!(
"Full Firebolt Split: {},{}",
"Full Firebolt Split: {:?},{}",
stats.stats_ref,
stats.stats.get_stage_durations()
)
);
platform_state.metrics.remove_api_stats(&rs.request_id);
}
info!(
"Sent Firebolt response cid={} msg={}",
Expand Down
Loading

0 comments on commit 1099087

Please sign in to comment.