From 2be6c60179192e3cfce5bcd58d455ada3d75a184 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Tue, 26 Nov 2024 12:14:20 +0100 Subject: [PATCH] Fix --- .../services/buffer/envelope_buffer/mod.rs | 115 ++++++++++++------ relay-server/src/services/buffer/mod.rs | 2 +- 2 files changed, 79 insertions(+), 38 deletions(-) diff --git a/relay-server/src/services/buffer/envelope_buffer/mod.rs b/relay-server/src/services/buffer/envelope_buffer/mod.rs index 779551c125..bca28533b7 100644 --- a/relay-server/src/services/buffer/envelope_buffer/mod.rs +++ b/relay-server/src/services/buffer/envelope_buffer/mod.rs @@ -64,7 +64,7 @@ impl PolymorphicEnvelopeBuffer { Self::Sqlite(buffer) } else { relay_log::trace!("PolymorphicEnvelopeBuffer: initializing memory envelope buffer"); - let buffer = EnvelopeBuffer::::new(memory_checker); + let buffer = EnvelopeBuffer::::new(partition_id, memory_checker); Self::InMemory(buffer) }; @@ -83,38 +83,57 @@ impl PolymorphicEnvelopeBuffer { pub async fn push(&mut self, envelope: Box) -> Result<(), EnvelopeBufferError> { relay_statsd::metric!( histogram(RelayHistograms::BufferEnvelopeBodySize) = - envelope.items().map(Item::len).sum::() as u64 + envelope.items().map(Item::len).sum::() as u64, + partition_id = self.partition_tag() ); - relay_statsd::metric!(timer(RelayTimers::BufferPush), { - match self { - Self::Sqlite(buffer) => buffer.push(envelope).await, - Self::InMemory(buffer) => buffer.push(envelope).await, - }?; - }); - relay_statsd::metric!(counter(RelayCounters::BufferEnvelopesWritten) += 1); + relay_statsd::metric!( + timer(RelayTimers::BufferPush), + partition_id = self.partition_tag(), + { + match self { + Self::Sqlite(buffer) => buffer.push(envelope).await, + Self::InMemory(buffer) => buffer.push(envelope).await, + }?; + } + ); + relay_statsd::metric!( + counter(RelayCounters::BufferEnvelopesWritten) += 1, + partition_id = self.partition_tag() + ); Ok(()) } /// Returns a reference to the next-in-line envelope. pub async fn peek(&mut self) -> Result { - relay_statsd::metric!(timer(RelayTimers::BufferPeek), { - match self { - Self::Sqlite(buffer) => buffer.peek().await, - Self::InMemory(buffer) => buffer.peek().await, + relay_statsd::metric!( + timer(RelayTimers::BufferPeek), + partition_id = self.partition_tag(), + { + match self { + Self::Sqlite(buffer) => buffer.peek().await, + Self::InMemory(buffer) => buffer.peek().await, + } } - }) + ) } /// Pops the next-in-line envelope. pub async fn pop(&mut self) -> Result>, EnvelopeBufferError> { - let envelope = relay_statsd::metric!(timer(RelayTimers::BufferPop), { - match self { - Self::Sqlite(buffer) => buffer.pop().await, - Self::InMemory(buffer) => buffer.pop().await, - }? - }); - relay_statsd::metric!(counter(RelayCounters::BufferEnvelopesRead) += 1); + let envelope = relay_statsd::metric!( + timer(RelayTimers::BufferPop), + partition_id = self.partition_tag(), + { + match self { + Self::Sqlite(buffer) => buffer.pop().await, + Self::InMemory(buffer) => buffer.pop().await, + }? + } + ); + relay_statsd::metric!( + counter(RelayCounters::BufferEnvelopesRead) += 1, + partition_id = self.partition_tag() + ); Ok(envelope) } @@ -167,6 +186,14 @@ impl PolymorphicEnvelopeBuffer { true } + + /// Returns the partition tag for this [`PolymorphicEnvelopeBuffer`]. + fn partition_tag(&self) -> &str { + match self { + PolymorphicEnvelopeBuffer::InMemory(buffer) => &buffer.partition_tag, + PolymorphicEnvelopeBuffer::Sqlite(buffer) => &buffer.partition_tag, + } + } } /// Error that occurs while interacting with the envelope buffer. @@ -215,17 +242,20 @@ struct EnvelopeBuffer { /// This boolean is just used for tagging the metric that tracks the total count of envelopes /// in the buffer. total_count_initialized: bool, + /// The tag value of this partition which is used for reporting purposes. + partition_tag: String, } impl EnvelopeBuffer { /// Creates an empty memory-based buffer. - pub fn new(memory_checker: MemoryChecker) -> Self { + pub fn new(partition_id: u8, memory_checker: MemoryChecker) -> Self { Self { stacks_by_project: Default::default(), priority_queue: Default::default(), stack_provider: MemoryStackProvider::new(memory_checker), total_count: Arc::new(AtomicI64::new(0)), total_count_initialized: false, + partition_tag: partition_id.to_string(), } } } @@ -240,6 +270,7 @@ impl EnvelopeBuffer { stack_provider: SqliteStackProvider::new(partition_id, config).await?, total_count: Arc::new(AtomicI64::new(0)), total_count_initialized: false, + partition_tag: partition_id.to_string(), }) } } @@ -251,12 +282,16 @@ where /// Initializes the [`EnvelopeBuffer`] given the initialization state from the /// [`StackProvider`]. pub async fn initialize(&mut self) { - relay_statsd::metric!(timer(RelayTimers::BufferInitialization), { - let initialization_state = self.stack_provider.initialize().await; - self.load_stacks(initialization_state.project_key_pairs) - .await; - self.load_store_total_count().await; - }); + relay_statsd::metric!( + timer(RelayTimers::BufferInitialization), + partition_id = &self.partition_tag, + { + let initialization_state = self.stack_provider.initialize().await; + self.load_stacks(initialization_state.project_key_pairs) + .await; + self.load_store_total_count().await; + } + ); } /// Pushes an envelope to the appropriate envelope stack and re-prioritizes the stack. @@ -342,7 +377,10 @@ where match last_received_at { None => { - relay_statsd::metric!(counter(RelayCounters::BufferEnvelopeStacksPopped) += 1); + relay_statsd::metric!( + counter(RelayCounters::BufferEnvelopeStacksPopped) += 1, + partition_id = &self.partition_tag + ); self.pop_stack(project_key_pair); } Some(last_received_at) => { @@ -456,7 +494,8 @@ where .insert(project_key_pair); } relay_statsd::metric!( - gauge(RelayGauges::BufferStackCount) = self.priority_queue.len() as u64 + gauge(RelayGauges::BufferStackCount) = self.priority_queue.len() as u64, + partition_id = &self.partition_tag ); Ok(()) @@ -473,7 +512,8 @@ where self.priority_queue.remove(&project_key_pair); relay_statsd::metric!( - gauge(RelayGauges::BufferStackCount) = self.priority_queue.len() as u64 + gauge(RelayGauges::BufferStackCount) = self.priority_queue.len() as u64, + partition_id = &self.partition_tag ); } @@ -523,7 +563,8 @@ where relay_statsd::metric!( histogram(RelayHistograms::BufferEnvelopesCount) = total_count, initialized = initialized, - stack_type = self.stack_provider.stack_type() + stack_type = self.stack_provider.stack_type(), + partition_id = &self.partition_tag ); } } @@ -728,7 +769,7 @@ mod tests { #[tokio::test] async fn test_insert_pop() { - let mut buffer = EnvelopeBuffer::::new(mock_memory_checker()); + let mut buffer = EnvelopeBuffer::::new(0, mock_memory_checker()); let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap(); let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(); @@ -796,7 +837,7 @@ mod tests { #[tokio::test] async fn test_project_internal_order() { - let mut buffer = EnvelopeBuffer::::new(mock_memory_checker()); + let mut buffer = EnvelopeBuffer::::new(0, mock_memory_checker()); let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap(); @@ -823,7 +864,7 @@ mod tests { #[tokio::test] async fn test_sampling_projects() { - let mut buffer = EnvelopeBuffer::::new(mock_memory_checker()); + let mut buffer = EnvelopeBuffer::::new(0, mock_memory_checker()); let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap(); let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fef").unwrap(); @@ -898,7 +939,7 @@ mod tests { assert_ne!(project_key_pair1, project_key_pair2); - let mut buffer = EnvelopeBuffer::::new(mock_memory_checker()); + let mut buffer = EnvelopeBuffer::::new(0, mock_memory_checker()); buffer .push(new_envelope(project_key1, Some(project_key2), None)) .await @@ -930,7 +971,7 @@ mod tests { #[tokio::test] async fn test_last_peek_internal_order() { - let mut buffer = EnvelopeBuffer::::new(mock_memory_checker()); + let mut buffer = EnvelopeBuffer::::new(0, mock_memory_checker()); let project_key_1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap(); let event_id_1 = EventId::new(); diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index 477c9417c7..5f212d1dd4 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -570,7 +570,7 @@ impl Service for EnvelopeBufferService { relay_statsd::metric!(timer(RelayTimers::BufferIdle) = start.elapsed(), input = "handle_message", partition_id = &partition_tag); let message_name = message.name(); relay_statsd::metric!(timer(RelayTimers::BufferBusy), input = message_name, partition_id = &partition_tag, { - Self::handle_message(&mut buffer, &services, message).await; + Self::handle_message(&partition_tag, &mut buffer, &services, message).await; sleep = Duration::ZERO; }); }