Skip to content

Commit

Permalink
feat: tracing for rust components
Browse files Browse the repository at this point in the history
  • Loading branch information
AH-dark committed Apr 3, 2024
1 parent 07d5192 commit 2677355
Show file tree
Hide file tree
Showing 8 changed files with 164 additions and 72 deletions.
17 changes: 0 additions & 17 deletions .idea/runConfigurations/Compose.xml

This file was deleted.

105 changes: 71 additions & 34 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ members = [
]

[workspace.dependencies]
teloxide = { version = "0.12", features = ["macros", "redis-storage"] }
teloxide = { version = "0.12", features = ["macros", "redis-storage"], git = "https://github.com/AH-dark/teloxide.git", branch = "master" }
log = "0.4"
pretty_env_logger = "0.5"
tokio = "1"
Expand Down
20 changes: 13 additions & 7 deletions rust-common/src/bot/channel.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use std::sync::Arc;

use futures::{FutureExt, StreamExt};
use lapin::options::BasicCancelOptions;
use lapin::protocol::constants::REPLY_SUCCESS;
use opentelemetry::trace::Span;
use teloxide::prelude::Update;
use teloxide::stop::{mk_stop_token, StopFlag, StopToken};
use teloxide::update_listeners::{AsUpdateStream, UpdateListener};

use crate::bot::utils::extract_span_from_delivery;
use crate::settings::Settings;

pub struct MqUpdateListener {
Expand All @@ -23,17 +23,23 @@ impl<'a> AsUpdateStream<'a> for MqUpdateListener {
Box<dyn futures::Stream<Item = Result<Update, Self::StreamErr>> + Unpin + Send + 'a>;

fn as_stream(&'a mut self) -> Self::Stream {
let flag = Arc::new(&self.flag);

let flag = self.flag.clone();
let stream = self.consumer.clone().filter_map(move |delivery| {
if flag.is_stopped() {
return async { None }.boxed();
assert!(!flag.is_stopped(), "Update listener stopped");
if self.consumer.state() != lapin::ConsumerState::Active
&& self.consumer.state() != lapin::ConsumerState::ActiveWithDelegate
{
panic!("Consumer state is not Active.");
}

async move {
match delivery {
Ok(delivery) => match serde_json::from_slice::<Update>(&delivery.data) {
Ok(update) => Some(Ok(update)),
Ok(mut update) => {
let cx = extract_span_from_delivery(&delivery);
update.cx = Some(cx);
Some(Ok(update))
}
Err(e) => {
log::error!("Error deserializing message: {}", e);
None
Expand Down
1 change: 1 addition & 0 deletions rust-common/src/bot/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::settings::TelegramBot;

pub mod channel;
pub mod state;
mod utils;

pub fn new_bot(settings: &TelegramBot) -> Bot {
let api_url = settings
Expand Down
46 changes: 46 additions & 0 deletions rust-common/src/bot/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
use std::collections::HashMap;

use lapin::types::AMQPValue;
use opentelemetry::trace::{SpanKind, TraceContextExt, Tracer};
use opentelemetry::{global, Context};

pub fn extract_span_from_delivery(delivery: &lapin::message::Delivery) -> Context {
let headers = delivery
.properties
.headers()
.as_ref()
.unwrap_or(&Default::default())
.inner()
.clone();

let parent_cx = global::get_text_map_propagator(|propagator| {
let trace_data = match headers.get("x-trace") {
Some(AMQPValue::FieldTable(t)) => t.clone(),
_ => Default::default(),
};
let mut trace_data_map = HashMap::new();
for x in &trace_data {
let s = match x.1 {
AMQPValue::ShortString(s) => s.to_string(),
AMQPValue::LongString(s) => s.to_string(),
_ => {
continue;
}
};

trace_data_map.insert(x.0.to_string(), s);
}

propagator.extract(&trace_data_map)
});

let tracer = global::tracer("pegasus/rust-common/bot/channel");
let span = tracer
.span_builder("UpdateListener::as_stream")
.with_kind(SpanKind::Consumer)
.start_with_context(&tracer, &parent_cx);

let cx = parent_cx.with_span(span);

cx
}
Loading

0 comments on commit 2677355

Please sign in to comment.