Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
iambriccardo committed Nov 26, 2024
1 parent 8fd4a2b commit 5ebb0f6
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 25 deletions.
3 changes: 0 additions & 3 deletions relay-server/src/services/buffer/envelope_store/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,6 @@ pub enum SqliteEnvelopeStoreError {
#[error("no file path for the spool was provided")]
NoFilePath,

#[error("no file name for the spool was provided")]
NoFileName,

#[error("failed to migrate the database: {0}")]
MigrationError(MigrateError),

Expand Down
137 changes: 115 additions & 22 deletions relay-server/src/services/buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,19 +255,22 @@ impl EnvelopeBufferService {
/// Wait for the configured amount of time and make sure the project cache is ready to receive.
async fn ready_to_pop(
&mut self,
partition_tag: &str,
buffer: &PolymorphicEnvelopeBuffer,
dequeue: bool,
) -> Option<Permit<legacy::DequeuedEnvelope>> {
relay_statsd::metric!(
counter(RelayCounters::BufferReadyToPop) += 1,
status = "checking"
status = "checking",
partition_id = partition_tag
);

self.system_ready(buffer, dequeue).await;

relay_statsd::metric!(
counter(RelayCounters::BufferReadyToPop) += 1,
status = "system_ready"
status = "system_ready",
partition_id = partition_tag
);

if self.sleep > Duration::ZERO {
Expand All @@ -276,14 +279,16 @@ impl EnvelopeBufferService {

relay_statsd::metric!(
counter(RelayCounters::BufferReadyToPop) += 1,
status = "slept"
status = "slept",
partition_id = partition_tag
);

let permit = self.services.envelopes_tx.reserve().await.ok();

relay_statsd::metric!(
counter(RelayCounters::BufferReadyToPop) += 1,
status = "checked"
status = "checked",
partition_id = partition_tag
);

permit
Expand Down Expand Up @@ -315,6 +320,7 @@ impl EnvelopeBufferService {

/// Tries to pop an envelope for a ready project.
async fn try_pop<'a>(
partition_tag: &str,
config: &Config,
buffer: &mut PolymorphicEnvelopeBuffer,
services: &Services,
Expand All @@ -324,7 +330,8 @@ impl EnvelopeBufferService {
Peek::Empty => {
relay_statsd::metric!(
counter(RelayCounters::BufferTryPop) += 1,
peek_result = "empty"
peek_result = "empty",
partition_id = partition_tag
);

Duration::MAX // wait for reset by `handle_message`.
Expand All @@ -346,7 +353,8 @@ impl EnvelopeBufferService {
relay_log::trace!("EnvelopeBufferService: popping envelope");
relay_statsd::metric!(
counter(RelayCounters::BufferTryPop) += 1,
peek_result = "ready"
peek_result = "ready",
partition_id = partition_tag
);
let envelope = buffer
.pop()
Expand All @@ -364,7 +372,8 @@ impl EnvelopeBufferService {
relay_log::trace!("EnvelopeBufferService: project(s) of envelope not ready");
relay_statsd::metric!(
counter(RelayCounters::BufferTryPop) += 1,
peek_result = "not_ready"
peek_result = "not_ready",
partition_id = partition_tag
);

// We want to fetch the configs again, only if some time passed between the last
Expand Down Expand Up @@ -406,6 +415,7 @@ impl EnvelopeBufferService {
}

async fn handle_message(
partition_tag: &str,
buffer: &mut PolymorphicEnvelopeBuffer,
services: &Services,
message: EnvelopeBuffer,
Expand All @@ -424,7 +434,10 @@ impl EnvelopeBufferService {
"EnvelopeBufferService: received project not ready message for project key {}",
&project_key
);
relay_statsd::metric!(counter(RelayCounters::BufferEnvelopesReturned) += 1);
relay_statsd::metric!(
counter(RelayCounters::BufferEnvelopesReturned) += 1,
partition_id = partition_tag
);
Self::push(buffer, envelope).await;
let project = services.project_cache_handle.get(project_key);
buffer.mark_ready(&project_key, !project.state().is_pending());
Expand Down Expand Up @@ -494,6 +507,9 @@ impl Service for EnvelopeBufferService {

buffer.initialize().await;

// We convert the partition id to string to use it as a tag for all the metrics.
let partition_tag = self.partition_id.to_string();

let mut shutdown = Controller::shutdown_handle();
let mut project_changes = self.services.project_cache_handle.changes();

Expand All @@ -510,12 +526,13 @@ impl Service for EnvelopeBufferService {
}
});

relay_log::info!("EnvelopeBufferService: starting");
relay_log::info!("EnvelopeBufferService {}: starting", self.partition_id);
loop {
let used_capacity =
self.services.envelopes_tx.max_capacity() - self.services.envelopes_tx.capacity();
relay_statsd::metric!(
histogram(RelayHistograms::BufferBackpressureEnvelopesCount) = used_capacity as u64
histogram(RelayHistograms::BufferBackpressureEnvelopesCount) = used_capacity as u64,
partition_id = &partition_tag,
);

let mut sleep = Duration::MAX;
Expand All @@ -525,10 +542,10 @@ impl Service for EnvelopeBufferService {
// On the one hand, we might want to prioritize dequeuing over enqueuing
// so we do not exceed the buffer capacity by starving the dequeue.
// on the other hand, prioritizing old messages violates the LIFO design.
Some(permit) = self.ready_to_pop(&buffer, dequeue.load(Ordering::Relaxed)) => {
relay_statsd::metric!(timer(RelayTimers::BufferIdle) = start.elapsed(), input = "pop");
relay_statsd::metric!(timer(RelayTimers::BufferBusy), input = "pop", {
match Self::try_pop(&config, &mut buffer, &services, permit).await {
Some(permit) = self.ready_to_pop(&partition_tag, &buffer, dequeue.load(Ordering::Relaxed)) => {
relay_statsd::metric!(timer(RelayTimers::BufferIdle) = start.elapsed(), input = "pop", partition_id = &partition_tag);
relay_statsd::metric!(timer(RelayTimers::BufferBusy), input = "pop", partition_id = &partition_tag, {
match Self::try_pop(&partition_tag, &config, &mut buffer, &services, permit).await {
Ok(new_sleep) => {
sleep = new_sleep;
}
Expand All @@ -541,25 +558,25 @@ impl Service for EnvelopeBufferService {
}});
}
change = project_changes.recv() => {
relay_statsd::metric!(timer(RelayTimers::BufferIdle) = start.elapsed(), input = "project_change");
relay_statsd::metric!(timer(RelayTimers::BufferBusy), input = "project_change", {
relay_statsd::metric!(timer(RelayTimers::BufferIdle) = start.elapsed(), input = "project_change", partition_id = &partition_tag);
relay_statsd::metric!(timer(RelayTimers::BufferBusy), input = "project_change", partition_id = &partition_tag, {
if let Ok(ProjectChange::Ready(project_key)) = change {
buffer.mark_ready(&project_key, true);
}
sleep = Duration::ZERO;
});
}
Some(message) = rx.recv() => {
relay_statsd::metric!(timer(RelayTimers::BufferIdle) = start.elapsed(), input = "handle_message");
relay_statsd::metric!(timer(RelayTimers::BufferIdle) = start.elapsed(), input = "handle_message", partition_id = &partition_tag);
let message_name = message.name();
relay_statsd::metric!(timer(RelayTimers::BufferBusy), input = message_name, {
relay_statsd::metric!(timer(RelayTimers::BufferBusy), input = message_name, partition_id = &partition_tag, {
Self::handle_message(&mut buffer, &services, message).await;
sleep = Duration::ZERO;
});
}
shutdown = shutdown.notified() => {
relay_statsd::metric!(timer(RelayTimers::BufferIdle) = start.elapsed(), input = "shutdown");
relay_statsd::metric!(timer(RelayTimers::BufferBusy), input = "shutdown", {
relay_statsd::metric!(timer(RelayTimers::BufferIdle) = start.elapsed(), input = "shutdown", partition_id = &partition_tag);
relay_statsd::metric!(timer(RelayTimers::BufferBusy), input = "shutdown", partition_id = &partition_tag, {
// In case the shutdown was handled, we break out of the loop signaling that
// there is no need to process anymore envelopes.
if Self::handle_shutdown(&mut buffer, shutdown).await {
Expand All @@ -568,7 +585,7 @@ impl Service for EnvelopeBufferService {
});
}
Ok(()) = global_config_rx.changed() => {
relay_statsd::metric!(timer(RelayTimers::BufferIdle) = start.elapsed(), input = "global_config_change");
relay_statsd::metric!(timer(RelayTimers::BufferIdle) = start.elapsed(), input = "global_config_change", partition_id = &partition_tag);
sleep = Duration::ZERO;

}
Expand All @@ -579,7 +596,7 @@ impl Service for EnvelopeBufferService {
self.update_observable_state(&mut buffer);
}

relay_log::info!("EnvelopeBufferService: stopping");
relay_log::info!("EnvelopeBufferService {}: stopping", self.partition_id);
}
}

Expand Down Expand Up @@ -867,4 +884,80 @@ mod tests {
5
);
}

#[tokio::test(start_paused = true)]
async fn test_partitioned_buffer() {
let mut runner = ServiceRunner::new();
let (_global_tx, global_rx) = watch::channel(global_config::Status::Ready(Arc::new(
GlobalConfig::default(),
)));
let (envelopes_tx, mut envelopes_rx) = mpsc::channel(10);
let (outcome_aggregator, _outcome_rx) = Addr::custom();
let project_cache_handle = ProjectCacheHandle::for_test();

// Create common services for both buffers
let services = Services {
envelopes_tx,
project_cache_handle: project_cache_handle.clone(),
outcome_aggregator,
test_store: Addr::dummy(),
};

// Create two buffer services
let config = Arc::new(
Config::from_json_value(serde_json::json!({
"spool": {
"envelopes": {
"version": "experimental"
}
}
}))
.unwrap(),
);

let buffer1 = EnvelopeBufferService::new(
0,
config.clone(),
MemoryStat::default(),
global_rx.clone(),
services.clone(),
)
.unwrap();

let buffer2 = EnvelopeBufferService::new(
1,
config.clone(),
MemoryStat::default(),
global_rx,
services,
)
.unwrap();

// Start both services and create partitioned buffer
let observable1 = buffer1.start_in(&mut runner);
let observable2 = buffer2.start_in(&mut runner);

let partitioned = PartitionedEnvelopeBuffer {
buffers: Arc::new(vec![observable1, observable2]),
};

// Create two envelopes with different project keys
let envelope1 = new_envelope(false, "foo");
let envelope2 = new_envelope(false, "bar");

// Send envelopes to their respective buffers
let buffer1 = &partitioned.buffers[0];
let buffer2 = &partitioned.buffers[1];

buffer1.addr().send(EnvelopeBuffer::Push(envelope1));
buffer2.addr().send(EnvelopeBuffer::Push(envelope2));

// Wait for processing
tokio::time::sleep(Duration::from_millis(100)).await;

// Verify both envelopes were received
let mut received = vec![];
envelopes_rx.recv_many(&mut received, 2).await;
assert_eq!(received.len(), 2);
}
}

0 comments on commit 5ebb0f6

Please sign in to comment.