Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
iambriccardo committed Nov 26, 2024
1 parent 5ebb0f6 commit 2be6c60
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 38 deletions.
115 changes: 78 additions & 37 deletions relay-server/src/services/buffer/envelope_buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl PolymorphicEnvelopeBuffer {
Self::Sqlite(buffer)
} else {
relay_log::trace!("PolymorphicEnvelopeBuffer: initializing memory envelope buffer");
let buffer = EnvelopeBuffer::<MemoryStackProvider>::new(memory_checker);
let buffer = EnvelopeBuffer::<MemoryStackProvider>::new(partition_id, memory_checker);
Self::InMemory(buffer)
};

Expand All @@ -83,38 +83,57 @@ impl PolymorphicEnvelopeBuffer {
pub async fn push(&mut self, envelope: Box<Envelope>) -> Result<(), EnvelopeBufferError> {
relay_statsd::metric!(
histogram(RelayHistograms::BufferEnvelopeBodySize) =
envelope.items().map(Item::len).sum::<usize>() as u64
envelope.items().map(Item::len).sum::<usize>() as u64,
partition_id = self.partition_tag()
);

relay_statsd::metric!(timer(RelayTimers::BufferPush), {
match self {
Self::Sqlite(buffer) => buffer.push(envelope).await,
Self::InMemory(buffer) => buffer.push(envelope).await,
}?;
});
relay_statsd::metric!(counter(RelayCounters::BufferEnvelopesWritten) += 1);
relay_statsd::metric!(
timer(RelayTimers::BufferPush),
partition_id = self.partition_tag(),
{
match self {
Self::Sqlite(buffer) => buffer.push(envelope).await,
Self::InMemory(buffer) => buffer.push(envelope).await,
}?;
}
);
relay_statsd::metric!(
counter(RelayCounters::BufferEnvelopesWritten) += 1,
partition_id = self.partition_tag()
);
Ok(())
}

/// Returns a reference to the next-in-line envelope.
pub async fn peek(&mut self) -> Result<Peek, EnvelopeBufferError> {
relay_statsd::metric!(timer(RelayTimers::BufferPeek), {
match self {
Self::Sqlite(buffer) => buffer.peek().await,
Self::InMemory(buffer) => buffer.peek().await,
relay_statsd::metric!(
timer(RelayTimers::BufferPeek),
partition_id = self.partition_tag(),
{
match self {
Self::Sqlite(buffer) => buffer.peek().await,
Self::InMemory(buffer) => buffer.peek().await,
}
}
})
)
}

/// Pops the next-in-line envelope.
pub async fn pop(&mut self) -> Result<Option<Box<Envelope>>, EnvelopeBufferError> {
let envelope = relay_statsd::metric!(timer(RelayTimers::BufferPop), {
match self {
Self::Sqlite(buffer) => buffer.pop().await,
Self::InMemory(buffer) => buffer.pop().await,
}?
});
relay_statsd::metric!(counter(RelayCounters::BufferEnvelopesRead) += 1);
let envelope = relay_statsd::metric!(
timer(RelayTimers::BufferPop),
partition_id = self.partition_tag(),
{
match self {
Self::Sqlite(buffer) => buffer.pop().await,
Self::InMemory(buffer) => buffer.pop().await,
}?
}
);
relay_statsd::metric!(
counter(RelayCounters::BufferEnvelopesRead) += 1,
partition_id = self.partition_tag()
);
Ok(envelope)
}

Expand Down Expand Up @@ -167,6 +186,14 @@ impl PolymorphicEnvelopeBuffer {

true
}

/// Returns the partition tag for this [`PolymorphicEnvelopeBuffer`].
fn partition_tag(&self) -> &str {
match self {
PolymorphicEnvelopeBuffer::InMemory(buffer) => &buffer.partition_tag,
PolymorphicEnvelopeBuffer::Sqlite(buffer) => &buffer.partition_tag,
}
}
}

/// Error that occurs while interacting with the envelope buffer.
Expand Down Expand Up @@ -215,17 +242,20 @@ struct EnvelopeBuffer<P: StackProvider> {
/// This boolean is just used for tagging the metric that tracks the total count of envelopes
/// in the buffer.
total_count_initialized: bool,
/// The tag value of this partition which is used for reporting purposes.
partition_tag: String,
}

impl EnvelopeBuffer<MemoryStackProvider> {
/// Creates an empty memory-based buffer.
pub fn new(memory_checker: MemoryChecker) -> Self {
pub fn new(partition_id: u8, memory_checker: MemoryChecker) -> Self {
Self {
stacks_by_project: Default::default(),
priority_queue: Default::default(),
stack_provider: MemoryStackProvider::new(memory_checker),
total_count: Arc::new(AtomicI64::new(0)),
total_count_initialized: false,
partition_tag: partition_id.to_string(),
}
}
}
Expand All @@ -240,6 +270,7 @@ impl EnvelopeBuffer<SqliteStackProvider> {
stack_provider: SqliteStackProvider::new(partition_id, config).await?,
total_count: Arc::new(AtomicI64::new(0)),
total_count_initialized: false,
partition_tag: partition_id.to_string(),
})
}
}
Expand All @@ -251,12 +282,16 @@ where
/// Initializes the [`EnvelopeBuffer`] given the initialization state from the
/// [`StackProvider`].
pub async fn initialize(&mut self) {
relay_statsd::metric!(timer(RelayTimers::BufferInitialization), {
let initialization_state = self.stack_provider.initialize().await;
self.load_stacks(initialization_state.project_key_pairs)
.await;
self.load_store_total_count().await;
});
relay_statsd::metric!(
timer(RelayTimers::BufferInitialization),
partition_id = &self.partition_tag,
{
let initialization_state = self.stack_provider.initialize().await;
self.load_stacks(initialization_state.project_key_pairs)
.await;
self.load_store_total_count().await;
}
);
}

/// Pushes an envelope to the appropriate envelope stack and re-prioritizes the stack.
Expand Down Expand Up @@ -342,7 +377,10 @@ where

match last_received_at {
None => {
relay_statsd::metric!(counter(RelayCounters::BufferEnvelopeStacksPopped) += 1);
relay_statsd::metric!(
counter(RelayCounters::BufferEnvelopeStacksPopped) += 1,
partition_id = &self.partition_tag
);
self.pop_stack(project_key_pair);
}
Some(last_received_at) => {
Expand Down Expand Up @@ -456,7 +494,8 @@ where
.insert(project_key_pair);
}
relay_statsd::metric!(
gauge(RelayGauges::BufferStackCount) = self.priority_queue.len() as u64
gauge(RelayGauges::BufferStackCount) = self.priority_queue.len() as u64,
partition_id = &self.partition_tag
);

Ok(())
Expand All @@ -473,7 +512,8 @@ where
self.priority_queue.remove(&project_key_pair);

relay_statsd::metric!(
gauge(RelayGauges::BufferStackCount) = self.priority_queue.len() as u64
gauge(RelayGauges::BufferStackCount) = self.priority_queue.len() as u64,
partition_id = &self.partition_tag
);
}

Expand Down Expand Up @@ -523,7 +563,8 @@ where
relay_statsd::metric!(
histogram(RelayHistograms::BufferEnvelopesCount) = total_count,
initialized = initialized,
stack_type = self.stack_provider.stack_type()
stack_type = self.stack_provider.stack_type(),
partition_id = &self.partition_tag
);
}
}
Expand Down Expand Up @@ -728,7 +769,7 @@ mod tests {

#[tokio::test]
async fn test_insert_pop() {
let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new(mock_memory_checker());
let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new(0, mock_memory_checker());

let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
Expand Down Expand Up @@ -796,7 +837,7 @@ mod tests {

#[tokio::test]
async fn test_project_internal_order() {
let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new(mock_memory_checker());
let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new(0, mock_memory_checker());

let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();

Expand All @@ -823,7 +864,7 @@ mod tests {

#[tokio::test]
async fn test_sampling_projects() {
let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new(mock_memory_checker());
let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new(0, mock_memory_checker());

let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fef").unwrap();
Expand Down Expand Up @@ -898,7 +939,7 @@ mod tests {

assert_ne!(project_key_pair1, project_key_pair2);

let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new(mock_memory_checker());
let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new(0, mock_memory_checker());
buffer
.push(new_envelope(project_key1, Some(project_key2), None))
.await
Expand Down Expand Up @@ -930,7 +971,7 @@ mod tests {

#[tokio::test]
async fn test_last_peek_internal_order() {
let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new(mock_memory_checker());
let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new(0, mock_memory_checker());

let project_key_1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
let event_id_1 = EventId::new();
Expand Down
2 changes: 1 addition & 1 deletion relay-server/src/services/buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ impl Service for EnvelopeBufferService {
relay_statsd::metric!(timer(RelayTimers::BufferIdle) = start.elapsed(), input = "handle_message", partition_id = &partition_tag);
let message_name = message.name();
relay_statsd::metric!(timer(RelayTimers::BufferBusy), input = message_name, partition_id = &partition_tag, {
Self::handle_message(&mut buffer, &services, message).await;
Self::handle_message(&partition_tag, &mut buffer, &services, message).await;
sleep = Duration::ZERO;
});
}
Expand Down

0 comments on commit 2be6c60

Please sign in to comment.