diff --git a/CHANGELOG.md b/CHANGELOG.md index 1c345a1465..f85941c0be 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## Unreleased + +- Report unhealthy when one of the services crashes. ([#4255](https://github.com/getsentry/relay/pull/4255)) + ## 24.11.1 **Bug Fixes**: diff --git a/relay-server/src/lib.rs b/relay-server/src/lib.rs index d675c220bf..ed1f87eb97 100644 --- a/relay-server/src/lib.rs +++ b/relay-server/src/lib.rs @@ -305,11 +305,15 @@ pub fn run(config: Config) -> anyhow::Result<()> { runner.start(HttpServer::new(config, state.clone())?); tokio::select! { - _ = runner.join() => {}, - // NOTE: when every service implements a shutdown listener, - // awaiting on `finished` becomes unnecessary: We can simply join() and guarantee - // that every service finished its main task. - // See also https://github.com/getsentry/relay/issues/4050. + _ = runner.join(|e| { + if e.is_panic() { + state.health_check().send(e); + } + // NOTE: when every service implements a shutdown listener, + // awaiting on `finished` becomes unnecessary: We can simply join() and guarantee + // that every service finished its main task. + // See also https://github.com/getsentry/relay/issues/4050. + }) => {}, _ = Controller::shutdown_handle().finished() => {} } diff --git a/relay-server/src/services/health_check.rs b/relay-server/src/services/health_check.rs index b586d22429..d9d03590ba 100644 --- a/relay-server/src/services/health_check.rs +++ b/relay-server/src/services/health_check.rs @@ -1,9 +1,12 @@ use std::sync::Arc; use relay_config::Config; -use relay_system::{Addr, AsyncResponse, Controller, FromMessage, Interface, Sender, Service}; +use relay_system::{ + Addr, AsyncResponse, Controller, FromMessage, Interface, NoResponse, Sender, Service, +}; use std::future::Future; use tokio::sync::watch; +use tokio::task::JoinError; use tokio::time::{timeout, Instant}; use crate::services::buffer::ObservableEnvelopeBuffer; @@ -51,8 +54,13 @@ impl FromIterator for Status { } } -/// Service interface for the [`IsHealthy`] message. -pub struct HealthCheck(IsHealthy, Sender); +/// Service interface for the health check service. +pub enum HealthCheck { + /// Query whether relay is healthy. + IsHealthy(IsHealthy, Sender), + /// Report a service crash. + ReportCrash(JoinError), +} impl Interface for HealthCheck {} @@ -60,7 +68,15 @@ impl FromMessage for HealthCheck { type Response = AsyncResponse; fn from_message(message: IsHealthy, sender: Sender) -> Self { - Self(message, sender) + Self::IsHealthy(message, sender) + } +} + +impl FromMessage for HealthCheck { + type Response = NoResponse; + + fn from_message(message: JoinError, _: ()) -> Self { + Self::ReportCrash(message) } } @@ -197,6 +213,7 @@ impl Service for HealthCheckService { // Add 10% buffer to the internal timeouts to avoid race conditions. let status_timeout = (check_interval + self.config.health_probe_timeout()).mul_f64(1.1); + let update_tx2 = update_tx.clone(); relay_system::spawn!(async move { let shutdown = Controller::shutdown_handle(); @@ -214,16 +231,22 @@ impl Service for HealthCheckService { update_tx.send(StatusUpdate::new(Status::Unhealthy)).ok(); }); - while let Some(HealthCheck(message, sender)) = rx.recv().await { - let update = update_rx.borrow(); - - sender.send(if matches!(message, IsHealthy::Liveness) { - Status::Healthy - } else if update.instant.elapsed() >= status_timeout { - Status::Unhealthy - } else { - update.status - }); + while let Some(message) = rx.recv().await { + match message { + HealthCheck::IsHealthy(message, sender) => { + let update = update_rx.borrow(); + sender.send(if matches!(message, IsHealthy::Liveness) { + Status::Healthy + } else if update.instant.elapsed() >= status_timeout { + Status::Unhealthy + } else { + update.status + }); + } + HealthCheck::ReportCrash(_) => { + update_tx2.send(StatusUpdate::new(Status::Unhealthy)).ok(); + } + } } } } diff --git a/relay-system/src/service.rs b/relay-system/src/service.rs index 063b23bc82..5f60453bdb 100644 --- a/relay-system/src/service.rs +++ b/relay-system/src/service.rs @@ -11,7 +11,7 @@ use futures::future::Shared; use futures::stream::FuturesUnordered; use futures::{FutureExt, StreamExt}; use tokio::sync::{mpsc, oneshot}; -use tokio::task::JoinHandle; +use tokio::task::{JoinError, JoinHandle}; use tokio::time::MissedTickBehavior; use crate::statsd::SystemGauges; @@ -1050,13 +1050,10 @@ impl ServiceRunner { /// Awaits until all services have finished. /// /// Panics if one of the spawned services has panicked. - pub async fn join(&mut self) { + pub async fn join(&mut self, error_handler: F) { while let Some(res) = self.0.next().await { if let Err(e) = res { - if e.is_panic() { - // Re-trigger panic to terminate the process: - std::panic::resume_unwind(e.into_panic()); - } + error_handler(e); } } }