
Merge
iambriccardo committed Nov 26, 2024
2 parents 07fb227 + 2be6c60 commit 3c62526
Showing 9 changed files with 52 additions and 85 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -12,6 +12,7 @@

- Implement zstd http encoding for Relay to Relay communication. ([#4266](https://github.com/getsentry/relay/pull/4266))
- Support empty branches in Pattern alternations. ([#4283](https://github.com/getsentry/relay/pull/4283))
+- Add support for partitioning of the `EnvelopeBufferService`. ([#4291](https://github.com/getsentry/relay/pull/4291))

**Internal**:

29 changes: 22 additions & 7 deletions relay-config/src/config.rs
@@ -2,6 +2,7 @@ use std::collections::{BTreeMap, HashMap};
use std::error::Error;
use std::io::Write;
use std::net::{IpAddr, SocketAddr, ToSocketAddrs};
+use std::num::NonZeroU8;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::time::Duration;
@@ -912,8 +913,8 @@ fn spool_max_backpressure_memory_percent() -> f32 {
}

/// Default number of partitions for the buffer.
-fn spool_envelopes_partitions() -> u32 {
-    1
+fn spool_envelopes_partitions() -> NonZeroU8 {
+    NonZeroU8::new(1).unwrap()
}

/// Persistent buffering configuration for incoming envelopes.
@@ -973,7 +974,7 @@ pub struct EnvelopeSpool {
max_backpressure_memory_percent: f32,
/// Number of partitions of the buffer.
#[serde(default = "spool_envelopes_partitions")]
-    partitions: u32,
+    partitions: NonZeroU8,
/// Version of the spooler.
#[serde(default)]
version: EnvelopeSpoolVersion,
@@ -2151,13 +2152,27 @@ impl Config {
}

/// Returns the path of the buffer file if the `cache.persistent_envelope_buffer.path` is configured.
-    pub fn spool_envelopes_path(&self) -> Option<PathBuf> {
-        self.values
+    ///
+    /// In case a partition with id > 0 is supplied, the filename of the envelopes path will be
+    /// suffixed with `.partition_id`.
+    pub fn spool_envelopes_path(&self, partition_id: u8) -> Option<PathBuf> {
+        let mut path = self
+            .values
.spool
.envelopes
.path
.as_ref()
-            .map(|path| path.to_owned())
+            .map(|path| path.to_owned())?;
+
+        if partition_id == 0 {
+            return Some(path);
+        }
+
+        let file_name = path.file_name().and_then(|f| f.to_str())?;
+        let new_file_name = format!("{}.{}", file_name, partition_id,);
+        path.set_file_name(new_file_name);
+
+        Some(path)
}

/// Maximum number of connections to create to buffer file.
@@ -2220,7 +2235,7 @@ impl Config {
}

/// Returns the number of partitions for the buffer.
-    pub fn spool_partitions(&self) -> u32 {
+    pub fn spool_partitions(&self) -> NonZeroU8 {
self.values.spool.envelopes.partitions
}

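For readers following the config change above: the partition count is the `partitions` field of the `EnvelopeSpool` struct (so presumably `spool.envelopes.partitions` in the YAML config, defaulting to 1), and `Config::spool_envelopes_path` now derives one spool file per partition. Below is a minimal standalone sketch of that suffixing behaviour; the `partitioned_spool_path` helper and the example paths are illustrative, not part of the commit.

```rust
use std::path::PathBuf;

/// Illustrative helper mirroring the new `Config::spool_envelopes_path` logic:
/// partition 0 keeps the configured path unchanged, any other partition id is
/// appended to the file name as a `.{partition_id}` suffix.
fn partitioned_spool_path(base: Option<PathBuf>, partition_id: u8) -> Option<PathBuf> {
    let mut path = base?;
    if partition_id == 0 {
        return Some(path);
    }
    let file_name = path.file_name().and_then(|f| f.to_str())?;
    let new_file_name = format!("{}.{}", file_name, partition_id);
    path.set_file_name(new_file_name);
    Some(path)
}

fn main() {
    let base = Some(PathBuf::from("/var/lib/relay/spool.db"));
    // Partition 0 maps to the configured file itself.
    assert_eq!(
        partitioned_spool_path(base.clone(), 0).unwrap(),
        PathBuf::from("/var/lib/relay/spool.db")
    );
    // Higher partitions get their own file next to it.
    assert_eq!(
        partitioned_spool_path(base, 2).unwrap(),
        PathBuf::from("/var/lib/relay/spool.db.2")
    );
}
```

Note that the new scheme appends after the extension (`spool.db.2`), whereas the code removed from `envelope_store/sqlite.rs` further down inserted the partition before it (`spool_2.db`).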
49 changes: 17 additions & 32 deletions relay-server/src/service.rs
@@ -4,8 +4,7 @@ use std::time::Duration;

use crate::metrics::{MetricOutcomes, MetricStats};
use crate::services::buffer::{
-    self, EnvelopeBufferService, ObservableEnvelopeBuffer, PartitionedEnvelopeBuffer,
-    ProjectKeyPair,
+    ObservableEnvelopeBuffer, PartitionedEnvelopeBuffer, ProjectKeyPair,
};
use crate::services::cogs::{CogsService, CogsServiceRecorder};
use crate::services::global_config::{GlobalConfigManager, GlobalConfigService};
@@ -66,7 +65,7 @@ pub struct Registry {
pub global_config: Addr<GlobalConfigManager>,
pub legacy_project_cache: Addr<legacy::ProjectCache>,
pub upstream_relay: Addr<UpstreamRelay>,
-    pub partitioned_buffer: PartitionedEnvelopeBuffer,
+    pub envelope_buffer: PartitionedEnvelopeBuffer,

pub project_cache_handle: ProjectCacheHandle,
}
@@ -264,32 +263,21 @@ impl ServiceState {

let (envelopes_tx, envelopes_rx) = mpsc::channel(config.spool_max_backpressure_envelopes());

-        let mut envelope_buffers = Vec::with_capacity(config.spool_partitions() as usize);
-        for partition_id in 0..config.spool_partitions() {
-            let envelope_buffer = EnvelopeBufferService::new(
-                partition_id,
-                config.clone(),
-                memory_stat.clone(),
-                global_config_rx.clone(),
-                buffer::Services {
-                    envelopes_tx: envelopes_tx.clone(),
-                    project_cache_handle: project_cache_handle.clone(),
-                    outcome_aggregator: outcome_aggregator.clone(),
-                    test_store: test_store.clone(),
-                },
-            )
-            .map(|b| b.start_in(&mut runner));
-
-            if let Some(envelope_buffer) = envelope_buffer {
-                envelope_buffers.push(envelope_buffer);
-            }
-        }
-        let partitioned_buffer =
-            PartitionedEnvelopeBuffer::new(envelope_buffers, config.spool_partitions());
+        let envelope_buffer = PartitionedEnvelopeBuffer::create(
+            config.spool_partitions(),
+            config.clone(),
+            memory_stat.clone(),
+            global_config_rx.clone(),
+            envelopes_tx.clone(),
+            project_cache_handle.clone(),
+            outcome_aggregator.clone(),
+            test_store.clone(),
+            &mut runner,
+        );

// Keep all the services in one context.
let project_cache_services = legacy::Services {
-            partitioned_buffer: partitioned_buffer.clone(),
+            envelope_buffer: envelope_buffer.clone(),
aggregator: aggregator.clone(),
envelope_processor: processor.clone(),
outcome_aggregator: outcome_aggregator.clone(),
@@ -314,7 +302,7 @@
MemoryChecker::new(memory_stat.clone(), config.clone()),
aggregator_handle,
upstream_relay.clone(),
-            partitioned_buffer.clone(),
+            envelope_buffer.clone(),
));

runner.start(RelayStats::new(
@@ -340,7 +328,7 @@
legacy_project_cache,
project_cache_handle,
upstream_relay,
-            partitioned_buffer,
+            envelope_buffer,
};

let state = StateInner {
@@ -374,10 +362,7 @@ impl ServiceState {
&self,
project_key_pair: ProjectKeyPair,
) -> Option<&ObservableEnvelopeBuffer> {
-        self.inner
-            .registry
-            .partitioned_buffer
-            .buffer(project_key_pair)
+        self.inner.registry.envelope_buffer.buffer(project_key_pair)
}

/// Returns the address of the [`legacy::ProjectCache`] service.
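The internals of `PartitionedEnvelopeBuffer::create` and of its `buffer(project_key_pair)` lookup are not part of this diff, so the following is only a rough mental model, not the commit's implementation: one plausible scheme is to pick a partition deterministically by hashing the project key pair, so envelopes for the same pair always reach the same underlying buffer. All type and field names below are hypothetical stand-ins.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical stand-in for the real `ProjectKeyPair`.
#[derive(Hash)]
struct ProjectKeyPair {
    own_key: String,
    sampling_key: String,
}

/// Hypothetical stand-in holding one handle per partition.
struct PartitionRouterSketch {
    partitions: Vec<String>,
}

impl PartitionRouterSketch {
    /// Route a key pair to a partition by hashing it modulo the partition count.
    fn buffer(&self, pair: &ProjectKeyPair) -> Option<&String> {
        if self.partitions.is_empty() {
            return None;
        }
        let mut hasher = DefaultHasher::new();
        pair.hash(&mut hasher);
        let index = (hasher.finish() % self.partitions.len() as u64) as usize;
        self.partitions.get(index)
    }
}

fn main() {
    let router = PartitionRouterSketch {
        partitions: vec!["partition-0".into(), "partition-1".into(), "partition-2".into()],
    };
    let pair = ProjectKeyPair {
        own_key: "a94ae32be2584e0bbd7a4cbb95971fee".into(),
        sampling_key: "a94ae32be2584e0bbd7a4cbb95971fee".into(),
    };
    // The same pair always resolves to the same partition.
    assert_eq!(router.buffer(&pair), router.buffer(&pair));
}
```

Whatever the real scheme, the property visible in the diff is that `buffer()` hands back a per-partition `ObservableEnvelopeBuffer` for a given `ProjectKeyPair`.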
27 changes: 2 additions & 25 deletions relay-server/src/services/buffer/envelope_store/sqlite.rs
@@ -181,9 +181,6 @@ pub enum SqliteEnvelopeStoreError {
#[error("no file path for the spool was provided")]
NoFilePath,

#[error("no file name for the spool was provided")]
NoFileName,

#[error("failed to migrate the database: {0}")]
MigrationError(MigrateError),

@@ -310,34 +307,14 @@ impl SqliteEnvelopeStore {
/// Prepares the [`SqliteEnvelopeStore`] by running all the necessary migrations and preparing
/// the folders where data will be stored.
pub async fn prepare(
-        partition_id: u32,
+        partition_id: u8,
config: &Config,
) -> Result<SqliteEnvelopeStore, SqliteEnvelopeStoreError> {
// If no path is provided, we can't do disk spooling.
-        let Some(mut path) = config.spool_envelopes_path() else {
+        let Some(path) = config.spool_envelopes_path(partition_id) else {
return Err(SqliteEnvelopeStoreError::NoFilePath);
};

-        // Modify the filename to include the partition_id
-        let file_name = path
-            .file_name()
-            .and_then(|f| f.to_str())
-            .ok_or(SqliteEnvelopeStoreError::NoFileName)?;
-        if let Some(extension) = path.extension().and_then(|e| e.to_str()) {
-            let new_file_name = format!(
-                "{}_{}.{}",
-                file_name
-                    .strip_suffix(&format!(".{}", extension))
-                    .unwrap_or(file_name),
-                partition_id,
-                extension
-            );
-            path.set_file_name(new_file_name);
-        } else {
-            let new_file_name = format!("{}_{}", file_name, partition_id);
-            path.set_file_name(new_file_name);
-        }
-
relay_log::info!("buffer file {}", path.to_string_lossy());

Self::setup(&path).await?;
2 changes: 1 addition & 1 deletion relay-server/src/services/buffer/stack_provider/sqlite.rs
@@ -23,7 +23,7 @@ pub struct SqliteStackProvider {
#[warn(dead_code)]
impl SqliteStackProvider {
/// Creates a new [`SqliteStackProvider`] from the provided [`Config`].
-    pub async fn new(partition_id: u32, config: &Config) -> Result<Self, SqliteEnvelopeStoreError> {
+    pub async fn new(partition_id: u8, config: &Config) -> Result<Self, SqliteEnvelopeStoreError> {
let envelope_store = SqliteEnvelopeStore::prepare(partition_id, config).await?;
Ok(Self {
envelope_store,
19 changes: 4 additions & 15 deletions relay-server/src/services/health_check.rs
@@ -86,7 +86,7 @@ pub struct HealthCheckService {
memory_checker: MemoryChecker,
aggregator: RouterHandle,
upstream_relay: Addr<UpstreamRelay>,
-    partitioned_buffer: PartitionedEnvelopeBuffer,
+    envelope_buffer: PartitionedEnvelopeBuffer,
}

impl HealthCheckService {
@@ -96,14 +96,14 @@ impl HealthCheckService {
memory_checker: MemoryChecker,
aggregator: RouterHandle,
upstream_relay: Addr<UpstreamRelay>,
-        partitioned_buffer: PartitionedEnvelopeBuffer,
+        envelope_buffer: PartitionedEnvelopeBuffer,
) -> Self {
Self {
config,
memory_checker,
aggregator,
upstream_relay,
-            partitioned_buffer,
+            envelope_buffer,
}
}

@@ -149,18 +149,7 @@ impl HealthCheckService {
}

async fn spool_health_probe(&self) -> Status {
-        let buffers = self.partitioned_buffer.buffers();
-        // If no buffer is supplied, we assume it's healthy.
-        let all_have_capacity = if buffers.is_empty() {
-            true
-        } else {
-            self.partitioned_buffer
-                .buffers()
-                .iter()
-                .all(|buffer| buffer.has_capacity())
-        };
-
-        match all_have_capacity {
+        match self.envelope_buffer.has_capacity() {
true => Status::Healthy,
false => Status::Unhealthy,
}
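The health check now delegates to `PartitionedEnvelopeBuffer::has_capacity()`. That method's body is not shown in this diff, but judging from the deleted code it plausibly aggregates the per-partition checks the same way: no partitions means healthy, otherwise every partition must report capacity. A hedged sketch follows; the `HasCapacity` trait and both structs are stand-ins, not the real types.

```rust
/// Stand-in for the capacity check exposed by `ObservableEnvelopeBuffer`.
trait HasCapacity {
    fn has_capacity(&self) -> bool;
}

/// Stand-in for `PartitionedEnvelopeBuffer`.
struct PartitionedBufferSketch<B> {
    buffers: Vec<B>,
}

impl<B: HasCapacity> PartitionedBufferSketch<B> {
    /// Mirrors the deleted health-check logic: an empty set of partitions is
    /// treated as healthy, otherwise all partitions need spare capacity.
    fn has_capacity(&self) -> bool {
        if self.buffers.is_empty() {
            return true;
        }
        self.buffers.iter().all(|buffer| buffer.has_capacity())
    }
}

struct FixedCapacity(bool);

impl HasCapacity for FixedCapacity {
    fn has_capacity(&self) -> bool {
        self.0
    }
}

fn main() {
    let empty: PartitionedBufferSketch<FixedCapacity> = PartitionedBufferSketch { buffers: vec![] };
    assert!(empty.has_capacity()); // no partitions => healthy

    let mixed = PartitionedBufferSketch {
        buffers: vec![FixedCapacity(true), FixedCapacity(false)],
    };
    assert!(!mixed.has_capacity()); // one exhausted partition reports the spool as unhealthy
}
```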
6 changes: 3 additions & 3 deletions relay-server/src/services/projects/cache/legacy.rs
@@ -134,7 +134,7 @@ impl FromMessage<FlushBuckets> for ProjectCache {
/// Holds the addresses of all services required for [`ProjectCache`].
#[derive(Debug, Clone)]
pub struct Services {
-    pub partitioned_buffer: PartitionedEnvelopeBuffer,
+    pub envelope_buffer: PartitionedEnvelopeBuffer,
pub aggregator: Addr<Aggregator>,
pub envelope_processor: Addr<EnvelopeProcessor>,
pub outcome_aggregator: Addr<TrackOutcome>,
@@ -634,7 +634,7 @@ impl ProjectCacheBroker {
let project_key_pair = ProjectKeyPair::from_envelope(&dequeued_envelope.0);
let envelope_buffer = self
.services
-            .partitioned_buffer
+            .envelope_buffer
.clone()
.buffer(project_key_pair)
.map(|b| b.addr())
@@ -837,7 +837,7 @@ mod tests {
let (test_store, _) = mock_service("test_store", (), |&mut (), _| {});

Services {
-            partitioned_buffer: PartitionedEnvelopeBuffer::new(vec![], 0),
+            envelope_buffer: PartitionedEnvelopeBuffer::empty(),
aggregator,
envelope_processor,
project_cache,
2 changes: 1 addition & 1 deletion relay-server/src/services/spooler/mod.rs
@@ -1009,7 +1009,7 @@ impl BufferService {
memory_checker: MemoryChecker,
) -> Result<Option<OnDisk>, BufferError> {
// Only if persistent envelopes buffer file path provided, we create the pool and set the config.
-        let Some(path) = config.spool_envelopes_path() else {
+        let Some(path) = config.spool_envelopes_path(0) else {
return Ok(None);
};

2 changes: 1 addition & 1 deletion relay/src/cli.rs
@@ -331,7 +331,7 @@ pub fn manage_spool(config: &Config, matches: &ArgMatches) -> Result<()> {
let path = match matches.get_one::<PathBuf>("path") {
Some(path) => path.to_owned(),
None => config
-            .spool_envelopes_path()
+            .spool_envelopes_path(0)
.context("Config file does not contain the path to the spool file.")?,
};

