
ref(buffer): Remove spool V1 implementation #4303

Merged
merged 13 commits on Nov 29, 2024
Changes from 9 commits
5 changes: 4 additions & 1 deletion CHANGELOG.md
@@ -3,7 +3,9 @@
## 24.11.1

**Breaking Changes**:
- Flatten Linux distribution fields into `os.context`([#4292](https://github.com/getsentry/relay/pull/4292))

- Remove `spool.envelopes.{min_connections,max_connections,unspool_interval,max_memory_size}` config options. ([#4303](https://github.com/getsentry/relay/pull/4303))
- Flatten Linux distribution fields into `os.context`. ([#4292](https://github.com/getsentry/relay/pull/4292))

**Bug Fixes**:

@@ -13,6 +15,7 @@

**Features**:

- Remove old disk spooling logic, default to new version. ([#4303](https://github.com/getsentry/relay/pull/4303))
- Implement zstd http encoding for Relay to Relay communication. ([#4266](https://github.com/getsentry/relay/pull/4266))
- Support empty branches in Pattern alternations. ([#4283](https://github.com/getsentry/relay/pull/4283))
- Add support for partitioning of the `EnvelopeBufferService`. ([#4291](https://github.com/getsentry/relay/pull/4291))
89 changes: 0 additions & 89 deletions relay-config/src/config.rs
@@ -873,21 +873,6 @@ fn spool_envelopes_max_disk_size() -> ByteSize {
ByteSize::mebibytes(500)
}

/// Default for min connections to keep open in the pool.
fn spool_envelopes_min_connections() -> u32 {
1
}

/// Default for max connections to keep open in the pool.
fn spool_envelopes_max_connections() -> u32 {
1
}

/// Default interval to unspool buffered envelopes, 100ms.
fn spool_envelopes_unspool_interval() -> u64 {
100
}

/// Default number of encoded envelope bytes to cache before writing to disk.
fn spool_envelopes_batch_size_bytes() -> ByteSize {
ByteSize::kibibytes(10)
@@ -924,12 +909,6 @@ pub struct EnvelopeSpool {
///
/// If set, this will enable the buffering for incoming envelopes.
path: Option<PathBuf>,
/// Maximum number of connections, which will be maintained by the pool.
#[serde(default = "spool_envelopes_max_connections")]
max_connections: u32,
/// Minimal number of connections, which will be maintained by the pool.
#[serde(default = "spool_envelopes_min_connections")]
min_connections: u32,
/// The maximum size of the buffer to keep, in bytes.
///
/// If not set the default is 524288000 bytes (500MB).
@@ -940,9 +919,6 @@ pub struct EnvelopeSpool {
/// This is a hard upper bound and defaults to 524288000 bytes (500MB).
#[serde(default = "spool_envelopes_max_memory_size")]
max_memory_size: ByteSize,
/// The interval in milliseconds to trigger unspool.
#[serde(default = "spool_envelopes_unspool_interval")]
unspool_interval: u64,
/// Number of encoded envelope bytes that are spooled to disk at once.
///
/// Defaults to 10 KiB.
@@ -975,45 +951,20 @@ pub struct EnvelopeSpool {
/// Number of partitions of the buffer.
#[serde(default = "spool_envelopes_partitions")]
partitions: NonZeroU8,
/// Version of the spooler.
#[serde(default)]
version: EnvelopeSpoolVersion,
}

/// Version of the envelope buffering mechanism.
#[derive(Debug, Default, Deserialize, Serialize)]
pub enum EnvelopeSpoolVersion {
/// Use the spooler service, which only buffers envelopes for unloaded projects and
/// switches between an in-memory mode and a disk mode on-demand.
///
/// This mode will be removed soon.
#[default]
#[serde(rename = "1")]
V1,
/// Use the envelope buffer, through which all envelopes pass before getting unspooled.
/// Can be either disk based or memory based.
///
/// This mode has not yet been stress-tested, do not use in production environments.
#[serde(rename = "experimental")]
V2,
}

impl Default for EnvelopeSpool {
fn default() -> Self {
Self {
path: None,
max_connections: spool_envelopes_max_connections(),
min_connections: spool_envelopes_min_connections(),
max_disk_size: spool_envelopes_max_disk_size(),
max_memory_size: spool_envelopes_max_memory_size(),
unspool_interval: spool_envelopes_unspool_interval(),
batch_size_bytes: spool_envelopes_batch_size_bytes(),
max_envelope_delay_secs: spool_envelopes_max_envelope_delay_secs(),
disk_usage_refresh_frequency_ms: spool_disk_usage_refresh_frequency_ms(),
max_backpressure_envelopes: spool_max_backpressure_envelopes(),
max_backpressure_memory_percent: spool_max_backpressure_memory_percent(),
partitions: spool_envelopes_partitions(),
version: EnvelopeSpoolVersion::default(),
}
}
}
@@ -2175,45 +2126,17 @@ impl Config {
Some(path)
}

/// Maximum number of connections to create to buffer file.
pub fn spool_envelopes_max_connections(&self) -> u32 {
self.values.spool.envelopes.max_connections
}

/// Minimum number of connections to create to buffer file.
pub fn spool_envelopes_min_connections(&self) -> u32 {
self.values.spool.envelopes.min_connections
}

/// Unspool interval in milliseconds.
pub fn spool_envelopes_unspool_interval(&self) -> Duration {
Duration::from_millis(self.values.spool.envelopes.unspool_interval)
}

/// The maximum size of the buffer, in bytes.
pub fn spool_envelopes_max_disk_size(&self) -> usize {
self.values.spool.envelopes.max_disk_size.as_bytes()
}

/// The maximum size of the memory buffer, in bytes.
pub fn spool_envelopes_max_memory_size(&self) -> usize {
self.values.spool.envelopes.max_memory_size.as_bytes()
}

/// Number of encoded envelope bytes that need to be accumulated before
/// flushing one batch to disk.
pub fn spool_envelopes_batch_size_bytes(&self) -> usize {
self.values.spool.envelopes.batch_size_bytes.as_bytes()
}

/// Returns `true` if version 2 of the spooling mechanism is used.
pub fn spool_v2(&self) -> bool {
matches!(
&self.values.spool.envelopes.version,
EnvelopeSpoolVersion::V2
)
}

/// Returns the time after which we drop envelopes as a [`Duration`] object.
pub fn spool_envelopes_max_age(&self) -> Duration {
Duration::from_secs(self.values.spool.envelopes.max_envelope_delay_secs)
@@ -2654,16 +2577,4 @@ cache:
fn test_emit_outcomes_invalid() {
assert!(serde_json::from_str::<EmitOutcomes>("asdf").is_err());
}

#[test]
fn test_spool_defaults_to_v1() {
let config: ConfigValues = serde_json::from_str("{}").unwrap();
assert!(matches!(
config.spool.envelopes.version,
EnvelopeSpoolVersion::V1
));

let config = Config::from_json_value(serde_json::json!({})).unwrap();
assert!(!config.spool_v2());
}
}
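Everything that remains in `EnvelopeSpool` keeps falling back to a per-field serde default function, which is why each removed option takes its `spool_envelopes_*` helper with it. Below is a minimal, self-contained sketch of that pattern, assuming `serde` (with the `derive` feature) and `serde_json` as dependencies; `Spool`, its field set, and the default values are illustrative stand-ins, not Relay's real types.

```rust
// Sketch of the per-field serde default pattern used by `EnvelopeSpool`.
// `Spool`, `default_batch_size`, and `default_partitions` are illustrative names.
use serde::Deserialize;
use std::num::NonZeroU8;

fn default_batch_size() -> u64 {
    10 * 1024 // 10 KiB, matching the batch_size_bytes default in the diff
}

fn default_partitions() -> NonZeroU8 {
    NonZeroU8::new(1).expect("1 is non-zero") // illustrative value
}

#[derive(Debug, Deserialize)]
struct Spool {
    /// Optional path; disk buffering is enabled only when this is set.
    path: Option<std::path::PathBuf>,
    /// Falls back to `default_batch_size` when the key is absent.
    #[serde(default = "default_batch_size")]
    batch_size_bytes: u64,
    /// Falls back to `default_partitions` when the key is absent.
    #[serde(default = "default_partitions")]
    partitions: NonZeroU8,
}

fn main() {
    // An empty config deserializes entirely from the default functions, which is
    // why deleting a config option also deletes its default helper and accessor.
    let spool: Spool = serde_json::from_str("{}").unwrap();
    assert!(spool.path.is_none());
    assert_eq!(spool.batch_size_bytes, 10 * 1024);
    assert_eq!(spool.partitions.get(), 1);
}
```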
32 changes: 11 additions & 21 deletions relay-server/src/endpoints/common.rs
@@ -13,7 +13,6 @@ use crate::service::ServiceState;
use crate::services::buffer::{EnvelopeBuffer, ProjectKeyPair};
use crate::services::outcome::{DiscardReason, Outcome};
use crate::services::processor::{BucketSource, MetricData, ProcessMetrics, ProcessingGroup};
use crate::services::projects::cache::legacy::ValidateEnvelope;
use crate::statsd::{RelayCounters, RelayHistograms};
use crate::utils::{self, ApiErrorResponse, FormDataIter, ManagedEnvelope};

Expand Down Expand Up @@ -294,27 +293,18 @@ fn queue_envelope(
envelope.scope(scoping);

let project_key_pair = ProjectKeyPair::from_envelope(envelope.envelope());
match state.envelope_buffer(project_key_pair) {
Some(buffer) => {
if !buffer.has_capacity() {
return Err(BadStoreRequest::QueueFailed);
}

// NOTE: This assumes that a `prefetch` has already been scheduled for both the
// envelope's projects. See `handle_check_envelope`.
relay_log::trace!("Pushing envelope to V2 buffer");

buffer
.addr()
.send(EnvelopeBuffer::Push(envelope.into_envelope()));
}
None => {
relay_log::trace!("Sending envelope to project cache for V1 buffer");
state
.legacy_project_cache()
.send(ValidateEnvelope::new(envelope));
}
let buffer = state.envelope_buffer(project_key_pair);
if !buffer.has_capacity() {
return Err(BadStoreRequest::QueueFailed);
}

// NOTE: This assumes that a `prefetch` has already been scheduled for both the
// envelope's projects. See `handle_check_envelope`.
relay_log::trace!("Pushing envelope to V2 buffer");

buffer
.addr()
.send(EnvelopeBuffer::Push(envelope.into_envelope()));
}
// The entire envelope is taken for a split above, and it's empty at this point, we can just
// accept it without additional checks.
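With the V1 fallback gone, `queue_envelope` always pushes to the partitioned buffer and only rejects the request when the selected partition reports no capacity. A rough, self-contained sketch of that capacity-gated enqueue pattern follows; the `has_capacity` flag mirrors the `Arc<AtomicBool>` the buffer service keeps, while `Buffer`, `QueueFull`, and `push` are made up for illustration.

```rust
// Sketch of a capacity-gated enqueue, loosely modeled on the new queue_envelope path.
// `Buffer`, `QueueFull`, and the String payload are illustrative, not Relay's types.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{mpsc, Arc};

#[derive(Debug)]
struct QueueFull;

struct Buffer {
    // Updated by the buffer service; read lock-free on the request path.
    has_capacity: Arc<AtomicBool>,
    sender: mpsc::Sender<String>,
}

impl Buffer {
    fn push(&self, envelope: String) -> Result<(), QueueFull> {
        // Reject early instead of queueing into a buffer that reported itself full.
        if !self.has_capacity.load(Ordering::Relaxed) {
            return Err(QueueFull);
        }
        self.sender.send(envelope).map_err(|_| QueueFull)
    }
}

fn main() {
    let (sender, receiver) = mpsc::channel();
    let buffer = Buffer {
        has_capacity: Arc::new(AtomicBool::new(true)),
        sender,
    };

    buffer.push("envelope-1".to_owned()).expect("buffer has capacity");

    // Once the service flips the flag, the request path starts rejecting.
    buffer.has_capacity.store(false, Ordering::Relaxed);
    assert!(buffer.push("envelope-2".to_owned()).is_err());
    assert_eq!(receiver.recv().unwrap(), "envelope-1");
}
```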
1 change: 0 additions & 1 deletion relay-server/src/lib.rs
@@ -270,7 +270,6 @@ pub use self::envelope::Envelope; // pub for benchmarks
pub use self::services::buffer::{
EnvelopeStack, PolymorphicEnvelopeBuffer, SqliteEnvelopeStack, SqliteEnvelopeStore,
}; // pub for benchmarks
pub use self::services::spooler::spool_utils;
pub use self::utils::{MemoryChecker, MemoryStat}; // pub for benchmarks

#[cfg(test)]
8 changes: 1 addition & 7 deletions relay-server/src/service.rs
@@ -286,14 +286,11 @@ impl ServiceState {
aggregator: aggregator.clone(),
envelope_processor: processor.clone(),
outcome_aggregator: outcome_aggregator.clone(),
project_cache: legacy_project_cache.clone(),
test_store: test_store.clone(),
};

runner.start_with(
legacy::ProjectCacheService::new(
config.clone(),
MemoryChecker::new(memory_stat.clone(), config.clone()),
project_cache_handle.clone(),
project_cache_services,
global_config_rx,
@@ -363,10 +360,7 @@ impl ServiceState {
}

/// Returns the V2 envelope buffer, if present.
pub fn envelope_buffer(
&self,
project_key_pair: ProjectKeyPair,
) -> Option<&ObservableEnvelopeBuffer> {
pub fn envelope_buffer(&self, project_key_pair: ProjectKeyPair) -> &ObservableEnvelopeBuffer {
self.inner.registry.envelope_buffer.buffer(project_key_pair)
}

4 changes: 2 additions & 2 deletions relay-server/src/services/buffer/envelope_store/sqlite.rs
@@ -353,8 +353,8 @@ impl SqliteEnvelopeStore {
.shared_cache(true);

let db = SqlitePoolOptions::new()
.max_connections(config.spool_envelopes_max_connections())
.min_connections(config.spool_envelopes_min_connections())
.max_connections(1)
.min_connections(1)
.connect_with(options)
.await
.map_err(SqliteEnvelopeStoreError::SqlxSetupFailed)?;
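The pool is now pinned to a single SQLite connection instead of reading the removed `min_connections`/`max_connections` options. The construction roughly follows the standalone sketch below, which approximates the pool setup in `SqliteEnvelopeStore` shown above using sqlx and tokio; the filename, table, and error handling are illustrative.

```rust
// Sketch: building a single-connection SQLite pool with sqlx.
// Requires the `sqlx` crate with the `runtime-tokio` and `sqlite` features, plus `tokio`.
use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    let options = SqliteConnectOptions::new()
        .filename("/tmp/relay-spool.db") // illustrative path
        .create_if_missing(true)
        .shared_cache(true);

    // One connection is both the minimum and the maximum, so all reads and
    // writes are serialized through the same SQLite connection.
    let db = SqlitePoolOptions::new()
        .max_connections(1)
        .min_connections(1)
        .connect_with(options)
        .await?;

    // Illustrative query to show the pool is usable like any other sqlx pool.
    sqlx::query("CREATE TABLE IF NOT EXISTS envelopes (id INTEGER PRIMARY KEY)")
        .execute(&db)
        .await?;
    Ok(())
}
```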
43 changes: 12 additions & 31 deletions relay-server/src/services/buffer/mod.rs
@@ -94,14 +94,6 @@ pub struct PartitionedEnvelopeBuffer {
}

impl PartitionedEnvelopeBuffer {
/// Creates a [`PartitionedEnvelopeBuffer`] with no partitions.
#[cfg(test)]
pub fn empty() -> Self {
Self {
buffers: Arc::new(Vec::new()),
}
}

/// Creates a new [`PartitionedEnvelopeBuffer`] by instantiating all the necessary
/// [`ObservableEnvelopeBuffer`]s.
#[allow(clippy::too_many_arguments)]
@@ -130,11 +122,9 @@ impl PartitionedEnvelopeBuffer {
test_store: test_store.clone(),
},
)
.map(|b| b.start_in(runner));
.start_in(runner);

if let Some(envelope_buffer) = envelope_buffer {
envelope_buffers.push(envelope_buffer);
}
envelope_buffers.push(envelope_buffer);
}

Self {
@@ -147,16 +137,13 @@
///
/// The rationale for using this partitioning strategy is to reduce memory usage across buffers
/// since each individual buffer will only take care of a subset of projects.
pub fn buffer(&self, project_key_pair: ProjectKeyPair) -> Option<&ObservableEnvelopeBuffer> {
if self.buffers.is_empty() {
return None;
}

pub fn buffer(&self, project_key_pair: ProjectKeyPair) -> &ObservableEnvelopeBuffer {
let mut hasher = FnvHasher::default();
project_key_pair.own_key.hash(&mut hasher);
let buffer_index = (hasher.finish() % self.buffers.len() as u64) as usize;
let buffer = self.buffers.get(buffer_index);
buffer
self.buffers
.get(buffer_index)
.expect("buffers should not be empty")
}

/// Returns `true` if all [`ObservableEnvelopeBuffer`]s have capacity to get new [`Envelope`]s.
@@ -227,25 +214,22 @@ const DEFAULT_SLEEP: Duration = Duration::from_secs(1);

impl EnvelopeBufferService {
/// Creates a memory or disk based [`EnvelopeBufferService`], depending on the given config.
///
/// NOTE: until the V1 spooler implementation is removed, this function returns `None`
/// if V2 spooling is not configured.
pub fn new(
partition_id: u8,
config: Arc<Config>,
memory_stat: MemoryStat,
global_config_rx: watch::Receiver<global_config::Status>,
services: Services,
) -> Option<Self> {
config.spool_v2().then(|| Self {
) -> Self {
Self {
partition_id,
config,
memory_stat,
global_config_rx,
services,
has_capacity: Arc::new(AtomicBool::new(true)),
sleep: Duration::ZERO,
})
}
}

/// Returns both the [`Addr`] to this service, and a reference to the capacity flag.
@@ -659,8 +643,7 @@ mod tests {
outcome_aggregator,
test_store: Addr::dummy(),
},
)
.unwrap();
);

EnvelopeBufferServiceResult {
service: envelope_buffer_service,
@@ -925,17 +908,15 @@ mod tests {
MemoryStat::default(),
global_rx.clone(),
services.clone(),
)
.unwrap();
);

let buffer2 = EnvelopeBufferService::new(
1,
config.clone(),
MemoryStat::default(),
global_rx,
services,
)
.unwrap();
);

// Start both services and create partitioned buffer
let observable1 = buffer1.start_in(&mut runner);
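`PartitionedEnvelopeBuffer::buffer` now always resolves to a partition: it FNV-hashes the envelope's own project key and takes the hash modulo the partition count. Below is a standalone sketch of that selection logic using the `fnv` crate; `Partition`, the string key, and `select_partition` are stand-ins for the real buffer types.

```rust
// Sketch of hash-based partition selection, mirroring PartitionedEnvelopeBuffer::buffer.
// `Partition` and the string project key are illustrative stand-ins.
use fnv::FnvHasher;
use std::hash::{Hash, Hasher};

#[derive(Debug)]
struct Partition {
    id: usize,
}

/// Picks a partition by hashing the project's own key, so the same project
/// is always routed to the same buffer.
fn select_partition<'a>(partitions: &'a [Partition], own_key: &str) -> &'a Partition {
    let mut hasher = FnvHasher::default();
    own_key.hash(&mut hasher);
    let index = (hasher.finish() % partitions.len() as u64) as usize;
    // The constructor always creates at least one partition, so indexing is safe.
    partitions
        .get(index)
        .expect("partitions should not be empty")
}

fn main() {
    let partitions: Vec<Partition> = (0..3).map(|id| Partition { id }).collect();
    let first = select_partition(&partitions, "project-a");
    let second = select_partition(&partitions, "project-a");
    // Stable routing: the same key selects the same partition every time.
    assert_eq!(first.id, second.id);
}
```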
1 change: 0 additions & 1 deletion relay-server/src/services/mod.rs
@@ -38,7 +38,6 @@ pub mod processor;
pub mod projects;
pub mod relays;
pub mod server;
pub mod spooler;
pub mod stats;
pub mod test_store;
pub mod upstream;