From 15b96293fe37b365bdd434c87ff6ac8c66dc9fe9 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Tue, 26 Nov 2024 15:59:40 +0100 Subject: [PATCH] feat(spooler): Add partition_id tag to more metrics --- relay-server/benches/benches.rs | 5 +- .../services/buffer/envelope_stack/sqlite.rs | 74 ++++++++++++------- .../services/buffer/envelope_store/sqlite.rs | 66 +++++++++++------ .../services/buffer/stack_provider/sqlite.rs | 16 +++- 4 files changed, 108 insertions(+), 53 deletions(-) diff --git a/relay-server/benches/benches.rs b/relay-server/benches/benches.rs index f5a738af82..cc47193b8f 100644 --- a/relay-server/benches/benches.rs +++ b/relay-server/benches/benches.rs @@ -75,7 +75,7 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) { let temp_dir = TempDir::new().unwrap(); let db_path = temp_dir.path().join("test.db"); let db = setup_db(&db_path); - let envelope_store = SqliteEnvelopeStore::new(db.clone(), Duration::from_millis(100)); + let envelope_store = SqliteEnvelopeStore::new(0, db.clone(), Duration::from_millis(100)); let runtime = Runtime::new().unwrap(); @@ -99,6 +99,7 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) { }); let stack = SqliteEnvelopeStack::new( + 0, envelope_store.clone(), disk_batch_size, ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(), @@ -135,6 +136,7 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) { reset_db(db.clone()).await; let mut stack = SqliteEnvelopeStack::new( + 0, envelope_store.clone(), disk_batch_size, ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(), @@ -175,6 +177,7 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) { }); let stack = SqliteEnvelopeStack::new( + 0, envelope_store.clone(), disk_batch_size, ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(), diff --git a/relay-server/src/services/buffer/envelope_stack/sqlite.rs b/relay-server/src/services/buffer/envelope_stack/sqlite.rs index 617f0f3dc6..f3435242aa 100644 --- a/relay-server/src/services/buffer/envelope_stack/sqlite.rs +++ b/relay-server/src/services/buffer/envelope_stack/sqlite.rs @@ -40,11 +40,14 @@ pub struct SqliteEnvelopeStack { /// Boolean representing whether calls to `push()` and `peek()` check disk in case not enough /// elements are available in the `batches_buffer`. check_disk: bool, + /// The tag value of this partition which is used for reporting purposes. + partition_tag: String, } impl SqliteEnvelopeStack { /// Creates a new empty [`SqliteEnvelopeStack`]. pub fn new( + partition_id: u8, envelope_store: SqliteEnvelopeStore, batch_size_bytes: usize, own_key: ProjectKey, @@ -59,6 +62,7 @@ impl SqliteEnvelopeStack { sampling_key, batch: vec![], check_disk, + partition_tag: partition_id.to_string(), } } @@ -78,18 +82,25 @@ impl SqliteEnvelopeStack { return Ok(()); }; - relay_statsd::metric!(counter(RelayCounters::BufferSpooledEnvelopes) += batch.len() as u64); + relay_statsd::metric!( + counter(RelayCounters::BufferSpooledEnvelopes) += batch.len() as u64, + partition_id = &self.partition_tag + ); // When early return here, we are acknowledging that the elements that we popped from // the buffer are lost in case of failure. We are doing this on purposes, since if we were // to have a database corruption during runtime, and we were to put the values back into // the buffer we will end up with an infinite cycle. - relay_statsd::metric!(timer(RelayTimers::BufferSpool), { - self.envelope_store - .insert_batch(batch) - .await - .map_err(SqliteEnvelopeStackError::EnvelopeStoreError)?; - }); + relay_statsd::metric!( + timer(RelayTimers::BufferSpool), + partition_id = &self.partition_tag, + { + self.envelope_store + .insert_batch(batch) + .await + .map_err(SqliteEnvelopeStackError::EnvelopeStoreError)?; + } + ); // If we successfully spooled to disk, we know that data should be there. self.check_disk = true; @@ -106,12 +117,16 @@ impl SqliteEnvelopeStack { /// envelope will not be unspooled and unspooling will continue with the remaining envelopes. async fn unspool_from_disk(&mut self) -> Result<(), SqliteEnvelopeStackError> { debug_assert!(self.batch.is_empty()); - let batch = relay_statsd::metric!(timer(RelayTimers::BufferUnspool), { - self.envelope_store - .delete_batch(self.own_key, self.sampling_key) - .await - .map_err(SqliteEnvelopeStackError::EnvelopeStoreError)? - }); + let batch = relay_statsd::metric!( + timer(RelayTimers::BufferUnspool), + partition_id = &self.partition_tag, + { + self.envelope_store + .delete_batch(self.own_key, self.sampling_key) + .await + .map_err(SqliteEnvelopeStackError::EnvelopeStoreError)? + } + ); match batch { Some(batch) => { @@ -121,7 +136,8 @@ impl SqliteEnvelopeStack { } relay_statsd::metric!( - counter(RelayCounters::BufferUnspooledEnvelopes) += self.batch.len() as u64 + counter(RelayCounters::BufferUnspooledEnvelopes) += self.batch.len() as u64, + partition_id = &self.partition_tag ); Ok(()) } @@ -146,10 +162,11 @@ impl EnvelopeStack for SqliteEnvelopeStack { self.spool_to_disk().await?; } - let encoded_envelope = - relay_statsd::metric!(timer(RelayTimers::BufferEnvelopesSerialization), { - DatabaseEnvelope::try_from(envelope.as_ref())? - }); + let encoded_envelope = relay_statsd::metric!( + timer(RelayTimers::BufferEnvelopesSerialization), + partition_id = &self.partition_tag, + { DatabaseEnvelope::try_from(envelope.as_ref())? } + ); self.batch.push(encoded_envelope); Ok(()) @@ -208,8 +225,9 @@ mod tests { #[should_panic] async fn test_push_with_mismatching_project_keys() { let db = setup_db(false).await; - let envelope_store = SqliteEnvelopeStore::new(db, Duration::from_millis(100)); + let envelope_store = SqliteEnvelopeStore::new(0, db, Duration::from_millis(100)); let mut stack = SqliteEnvelopeStack::new( + 0, envelope_store, 10, ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), @@ -226,13 +244,14 @@ mod tests { #[tokio::test] async fn test_push_when_db_is_not_valid() { let db = setup_db(false).await; - let envelope_store = SqliteEnvelopeStore::new(db, Duration::from_millis(100)); + let envelope_store = SqliteEnvelopeStore::new(0, db, Duration::from_millis(100)); // Create envelopes first so we can calculate actual size let envelopes = mock_envelopes(4); let threshold_size = calculate_compressed_size(&envelopes) - 1; let mut stack = SqliteEnvelopeStack::new( + 0, envelope_store, threshold_size, ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), @@ -271,8 +290,9 @@ mod tests { #[tokio::test] async fn test_pop_when_db_is_not_valid() { let db = setup_db(false).await; - let envelope_store = SqliteEnvelopeStore::new(db, Duration::from_millis(100)); + let envelope_store = SqliteEnvelopeStore::new(0, db, Duration::from_millis(100)); let mut stack = SqliteEnvelopeStack::new( + 0, envelope_store, 2, ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), @@ -290,8 +310,9 @@ mod tests { #[tokio::test] async fn test_pop_when_stack_is_empty() { let db = setup_db(true).await; - let envelope_store = SqliteEnvelopeStore::new(db, Duration::from_millis(100)); + let envelope_store = SqliteEnvelopeStore::new(0, db, Duration::from_millis(100)); let mut stack = SqliteEnvelopeStack::new( + 0, envelope_store, 2, ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), @@ -307,8 +328,9 @@ mod tests { #[tokio::test] async fn test_push_below_threshold_and_pop() { let db = setup_db(true).await; - let envelope_store = SqliteEnvelopeStore::new(db, Duration::from_millis(100)); + let envelope_store = SqliteEnvelopeStore::new(0, db, Duration::from_millis(100)); let mut stack = SqliteEnvelopeStack::new( + 0, envelope_store, 9999, ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), @@ -346,7 +368,7 @@ mod tests { #[tokio::test] async fn test_push_above_threshold_and_pop() { let db = setup_db(true).await; - let envelope_store = SqliteEnvelopeStore::new(db, Duration::from_millis(100)); + let envelope_store = SqliteEnvelopeStore::new(0, db, Duration::from_millis(100)); // Create envelopes first so we can calculate actual size let envelopes = mock_envelopes(7); @@ -354,6 +376,7 @@ mod tests { // Create stack with threshold just below the size of first 5 envelopes let mut stack = SqliteEnvelopeStack::new( + 0, envelope_store, threshold_size, ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), @@ -419,8 +442,9 @@ mod tests { #[tokio::test] async fn test_drain() { let db = setup_db(true).await; - let envelope_store = SqliteEnvelopeStore::new(db, Duration::from_millis(100)); + let envelope_store = SqliteEnvelopeStore::new(0, db, Duration::from_millis(100)); let mut stack = SqliteEnvelopeStack::new( + 0, envelope_store.clone(), 10 * COMPRESSED_ENVELOPE_SIZE, ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), diff --git a/relay-server/src/services/buffer/envelope_store/sqlite.rs b/relay-server/src/services/buffer/envelope_store/sqlite.rs index 7bffd8dba5..63739f0545 100644 --- a/relay-server/src/services/buffer/envelope_store/sqlite.rs +++ b/relay-server/src/services/buffer/envelope_store/sqlite.rs @@ -199,27 +199,29 @@ struct DiskUsage { db: Pool, last_known_usage: Arc, refresh_frequency: Duration, + partition_tag: String, } impl DiskUsage { /// Creates a new empty [`DiskUsage`]. - fn new(db: Pool, refresh_frequency: Duration) -> Self { + fn new(partition_id: u8, db: Pool, refresh_frequency: Duration) -> Self { Self { db, last_known_usage: Arc::new(AtomicU64::new(0)), refresh_frequency, + partition_tag: partition_id.to_string(), } } /// Prepares a [`DiskUsage`] instance with an initial reading of the database usage and fails /// if not reading can be made. pub async fn prepare( + partition_id: u8, db: Pool, refresh_frequency: Duration, ) -> Result { - let usage = Self::estimate_usage(&db).await?; - - let disk_usage = Self::new(db, refresh_frequency); + let disk_usage = Self::new(partition_id, db.clone(), refresh_frequency); + let usage = Self::estimate_usage(&disk_usage.partition_tag, &db).await?; disk_usage.last_known_usage.store(usage, Ordering::Relaxed); disk_usage.start_background_refresh(); @@ -240,6 +242,7 @@ impl DiskUsage { let last_known_usage_weak = Arc::downgrade(&self.last_known_usage); let refresh_frequency = self.refresh_frequency; + let partition_tag = self.partition_tag.clone(); relay_system::spawn!(async move { loop { // When our `Weak` reference can't be upgraded to an `Arc`, it means that the value @@ -248,7 +251,7 @@ impl DiskUsage { break; }; - let usage = Self::estimate_usage(&db).await; + let usage = Self::estimate_usage(&partition_tag, &db).await; let Ok(usage) = usage else { relay_log::error!("failed to update the disk usage asynchronously"); return; @@ -268,14 +271,20 @@ impl DiskUsage { } /// Estimates the disk usage of the SQLite database. - async fn estimate_usage(db: &Pool) -> Result { + async fn estimate_usage( + partition_tag: &str, + db: &Pool, + ) -> Result { let usage: i64 = build_estimate_size() .fetch_one(db) .await .and_then(|r| r.try_get(0)) .map_err(SqliteEnvelopeStoreError::FileSizeReadFailed)?; - relay_statsd::metric!(gauge(RelayGauges::BufferDiskUsed) = usage as u64); + relay_statsd::metric!( + gauge(RelayGauges::BufferDiskUsed) = usage as u64, + partition_id = partition_tag + ); Ok(usage as u64) } @@ -289,14 +298,16 @@ impl DiskUsage { pub struct SqliteEnvelopeStore { db: Pool, disk_usage: DiskUsage, + partition_tag: String, } impl SqliteEnvelopeStore { /// Initializes the [`SqliteEnvelopeStore`] with a supplied [`Pool`]. - pub fn new(db: Pool, refresh_frequency: Duration) -> Self { + pub fn new(partition_id: u8, db: Pool, refresh_frequency: Duration) -> Self { Self { db: db.clone(), - disk_usage: DiskUsage::new(db, refresh_frequency), + disk_usage: DiskUsage::new(partition_id, db, refresh_frequency), + partition_tag: partition_id.to_string(), } } @@ -350,8 +361,13 @@ impl SqliteEnvelopeStore { Ok(SqliteEnvelopeStore { db: db.clone(), - disk_usage: DiskUsage::prepare(db, config.spool_disk_usage_refresh_frequency_ms()) - .await?, + disk_usage: DiskUsage::prepare( + partition_id, + db, + config.spool_disk_usage_refresh_frequency_ms(), + ) + .await?, + partition_tag: partition_id.to_string(), }) } @@ -429,12 +445,16 @@ impl SqliteEnvelopeStore { .bind(count as u16) .bind(encoded); - relay_statsd::metric!(timer(RelayTimers::BufferSqlWrite), { - query - .execute(&self.db) - .await - .map_err(SqliteEnvelopeStoreError::WriteError)?; - }); + relay_statsd::metric!( + timer(RelayTimers::BufferSqlWrite), + partition_id = &self.partition_tag, + { + query + .execute(&self.db) + .await + .map_err(SqliteEnvelopeStoreError::WriteError)?; + } + ); Ok(()) } @@ -650,7 +670,7 @@ mod tests { #[tokio::test] async fn test_insert_and_delete_envelopes() { let db = setup_db(true).await; - let mut envelope_store = SqliteEnvelopeStore::new(db, Duration::from_millis(100)); + let mut envelope_store = SqliteEnvelopeStore::new(0, db, Duration::from_millis(100)); let own_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(); let sampling_key = ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(); @@ -710,7 +730,7 @@ mod tests { #[tokio::test] async fn test_insert_and_delete_single() { let db = setup_db(true).await; - let mut envelope_store = SqliteEnvelopeStore::new(db, Duration::from_millis(100)); + let mut envelope_store = SqliteEnvelopeStore::new(0, db, Duration::from_millis(100)); let own_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(); let sampling_key = ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(); @@ -754,7 +774,7 @@ mod tests { #[tokio::test] async fn test_insert_and_get_project_keys_pairs() { let db = setup_db(true).await; - let mut envelope_store = SqliteEnvelopeStore::new(db, Duration::from_millis(100)); + let mut envelope_store = SqliteEnvelopeStore::new(0, db, Duration::from_millis(100)); let own_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(); let sampling_key = ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(); @@ -786,8 +806,8 @@ mod tests { #[tokio::test] async fn test_estimate_disk_usage() { let db = setup_db(true).await; - let mut store = SqliteEnvelopeStore::new(db.clone(), Duration::from_millis(1)); - let disk_usage = DiskUsage::prepare(db, Duration::from_millis(1)) + let mut store = SqliteEnvelopeStore::new(0, db.clone(), Duration::from_millis(1)); + let disk_usage = DiskUsage::prepare(0, db, Duration::from_millis(1)) .await .unwrap(); @@ -820,7 +840,7 @@ mod tests { #[tokio::test] async fn test_total_count() { let db = setup_db(true).await; - let mut store = SqliteEnvelopeStore::new(db.clone(), Duration::from_millis(1)); + let mut store = SqliteEnvelopeStore::new(0, db.clone(), Duration::from_millis(1)); let envelopes = mock_envelopes(10); store diff --git a/relay-server/src/services/buffer/stack_provider/sqlite.rs b/relay-server/src/services/buffer/stack_provider/sqlite.rs index 310eaf2024..51a548d6de 100644 --- a/relay-server/src/services/buffer/stack_provider/sqlite.rs +++ b/relay-server/src/services/buffer/stack_provider/sqlite.rs @@ -18,6 +18,7 @@ pub struct SqliteStackProvider { envelope_store: SqliteEnvelopeStore, batch_size_bytes: usize, max_disk_size: usize, + partition_id: u8, } #[warn(dead_code)] @@ -29,6 +30,7 @@ impl SqliteStackProvider { envelope_store, batch_size_bytes: config.spool_envelopes_batch_size_bytes(), max_disk_size: config.spool_envelopes_max_disk_size(), + partition_id, }) } @@ -60,6 +62,7 @@ impl StackProvider for SqliteStackProvider { project_key_pair: ProjectKeyPair, ) -> Self::Stack { let inner = SqliteEnvelopeStack::new( + self.partition_id, self.envelope_store.clone(), self.batch_size_bytes, project_key_pair.own_key, @@ -100,11 +103,16 @@ impl StackProvider for SqliteStackProvider { async fn flush(&mut self, envelope_stacks: impl IntoIterator) { relay_log::trace!("Flushing sqlite envelope buffer"); - relay_statsd::metric!(timer(RelayTimers::BufferDrain), { - for envelope_stack in envelope_stacks { - envelope_stack.flush().await; + let partition_tag = self.partition_id.to_string(); + relay_statsd::metric!( + timer(RelayTimers::BufferDrain), + partition_id = &partition_tag, + { + for envelope_stack in envelope_stacks { + envelope_stack.flush().await; + } } - }); + ); } }