Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
iambriccardo committed Nov 26, 2024
1 parent 3c62526 commit d7ccd72
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 18 deletions.
6 changes: 3 additions & 3 deletions relay-server/src/services/buffer/envelope_buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ impl PolymorphicEnvelopeBuffer {
/// Creates either a memory-based or a disk-based envelope buffer,
/// depending on the given configuration.
pub async fn from_config(
partition_id: u32,
partition_id: u8,
config: &Config,
memory_checker: MemoryChecker,
) -> Result<Self, EnvelopeBufferError> {
let buffer = if config.spool_envelopes_path().is_some() {
let buffer = if config.spool_envelopes_path(partition_id).is_some() {
relay_log::trace!("PolymorphicEnvelopeBuffer: initializing sqlite envelope buffer");
let buffer = EnvelopeBuffer::<SqliteStackProvider>::new(partition_id, config).await?;
Self::Sqlite(buffer)
Expand Down Expand Up @@ -242,7 +242,7 @@ impl EnvelopeBuffer<MemoryStackProvider> {
#[allow(dead_code)]
impl EnvelopeBuffer<SqliteStackProvider> {
/// Creates an empty sqlite-based buffer.
pub async fn new(partition_id: u32, config: &Config) -> Result<Self, EnvelopeBufferError> {
pub async fn new(partition_id: u8, config: &Config) -> Result<Self, EnvelopeBufferError> {
Ok(Self {
stacks_by_project: Default::default(),
priority_queue: Default::default(),
Expand Down
74 changes: 60 additions & 14 deletions relay-server/src/services/buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

use std::error::Error;
use std::hash::{DefaultHasher, Hash, Hasher};
use std::num::NonZeroU8;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
Expand All @@ -10,6 +11,7 @@ use std::time::SystemTime;

use chrono::DateTime;
use chrono::Utc;
use fnv::FnvHasher;
use relay_base_schema::project::ProjectKey;
use relay_config::Config;
use relay_system::ServiceRunner;
Expand Down Expand Up @@ -90,17 +92,54 @@ impl FromMessage<Self> for EnvelopeBuffer {
#[derive(Debug, Clone)]
pub struct PartitionedEnvelopeBuffer {
buffers: Arc<Vec<ObservableEnvelopeBuffer>>,
partitions: u32,
}

impl PartitionedEnvelopeBuffer {
/// Creates a new [`PartitionedEnvelopeBuffer`] with already instantiated buffers and the number
/// of partitions.
pub fn new(buffers: Vec<ObservableEnvelopeBuffer>, partitions: u32) -> Self {
debug_assert!(buffers.len() == partitions as usize);
/// Creates a [`PartitionedEnvelopeBuffer`] with no partitions.
#[cfg(test)]
pub fn empty() -> Self {
Self {
buffers: Arc::new(buffers),
partitions,
buffers: Arc::new(Vec::new()),
}
}

/// Creates a new [`PartitionedEnvelopeBuffer`] by instantiating inside all the necessary
/// [`ObservableEnvelopeBuffer`]s.
#[allow(clippy::too_many_arguments)]
pub fn create(
partitions: NonZeroU8,
config: Arc<Config>,
memory_stat: MemoryStat,
global_config_rx: watch::Receiver<global_config::Status>,
envelopes_tx: mpsc::Sender<legacy::DequeuedEnvelope>,
project_cache_handle: ProjectCacheHandle,
outcome_aggregator: Addr<TrackOutcome>,
test_store: Addr<TestStore>,
runner: &mut ServiceRunner,
) -> Self {
let mut envelope_buffers = Vec::with_capacity(partitions.get() as usize);
for partition_id in 0..partitions.get() {
let envelope_buffer = EnvelopeBufferService::new(
partition_id,
config.clone(),
memory_stat.clone(),
global_config_rx.clone(),
Services {
envelopes_tx: envelopes_tx.clone(),
project_cache_handle: project_cache_handle.clone(),
outcome_aggregator: outcome_aggregator.clone(),
test_store: test_store.clone(),
},
)
.map(|b| b.start_in(runner));

if let Some(envelope_buffer) = envelope_buffer {
envelope_buffers.push(envelope_buffer);
}
}

Self {
buffers: Arc::new(envelope_buffers),
}
}

Expand All @@ -110,16 +149,23 @@ impl PartitionedEnvelopeBuffer {
/// The rationale of using this partitioning strategy is to reduce memory usage across buffers
/// since each individual buffer will only take care of a subset of projects.
pub fn buffer(&self, project_key_pair: ProjectKeyPair) -> Option<&ObservableEnvelopeBuffer> {
let mut hasher = DefaultHasher::new();
let mut hasher = FnvHasher::default();
project_key_pair.hash(&mut hasher);
let buffer_index = (hasher.finish() % self.partitions as u64) as usize;
let buffer_index = (hasher.finish() % self.buffers.len() as u64) as usize;
let buffer = self.buffers.get(buffer_index);
buffer
}

/// Returns all the [`ObservableEnvelopeBuffer`]s.
pub fn buffers(&self) -> &Vec<ObservableEnvelopeBuffer> {
&self.buffers
/// Returns `true` if all [`ObservableEnvelopeBuffer`]s have capacity to get new [`Envelope`]s.
///
/// If no buffers are specified, the function returns `true`, assuming that there is capacity
/// if the buffer is not setup.
pub fn has_capacity(&self) -> bool {
if self.buffers.is_empty() {
return true;
}

self.buffers.iter().all(|buffer| buffer.has_capacity())
}
}

Expand Down Expand Up @@ -161,7 +207,7 @@ pub struct Services {
/// Spool V2 service which buffers envelopes and forwards them to the project cache when a project
/// becomes ready.
pub struct EnvelopeBufferService {
partition_id: u32,
partition_id: u8,
config: Arc<Config>,
memory_stat: MemoryStat,
global_config_rx: watch::Receiver<global_config::Status>,
Expand All @@ -182,7 +228,7 @@ impl EnvelopeBufferService {
/// NOTE: until the V1 spooler implementation is removed, this function returns `None`
/// if V2 spooling is not configured.
pub fn new(
partition_id: u32,
partition_id: u8,
config: Arc<Config>,
memory_stat: MemoryStat,
global_config_rx: watch::Receiver<global_config::Status>,
Expand Down
2 changes: 1 addition & 1 deletion tools/buffer-load-test/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::time::Duration;
use tokio::sync::Mutex;
use tokio::time::Instant;

const NUM_PROJECTS: usize = 10;
const NUM_PROJECTS: usize = 100000;
const DEFAULT_DURATION_SECS: u64 = 60;
const CONCURRENT_TASKS: usize = 10;
const ENVELOPE_POOL_SIZE: usize = 100;
Expand Down

0 comments on commit d7ccd72

Please sign in to comment.