feat(server): Report unhealthy instead of terminating on panic #4255

Status: Closed (wants to merge 30 commits)

Changes from all commits:
- 3fab72e: rm start_in (jjbayer, Nov 13, 2024)
- ff88466: wip: easy cases (jjbayer, Nov 14, 2024)
- 45f580a: spawn (jjbayer, Nov 14, 2024)
- 4b2c4f9: clean (jjbayer, Nov 14, 2024)
- 20e403c: wip: service runner (jjbayer, Nov 14, 2024)
- d246af0: update usage (jjbayer, Nov 14, 2024)
- ffa83ca: fix remaining 2 (jjbayer, Nov 14, 2024)
- 97cd873: lint (jjbayer, Nov 14, 2024)
- ced514b: Merge remote-tracking branch 'origin/master' into joris/join (jjbayer, Nov 14, 2024)
- a9a5b6d: doc (jjbayer, Nov 14, 2024)
- 0a28e44: lint (jjbayer, Nov 14, 2024)
- d6bbdcc: health check (jjbayer, Nov 14, 2024)
- d0b4af9: changelog (jjbayer, Nov 14, 2024)
- e885bda: Update relay-server/src/services/projects/source/mod.rs (jjbayer, Nov 14, 2024)
- 2260462: start_with (jjbayer, Nov 14, 2024)
- 61a8c14: naming (jjbayer, Nov 14, 2024)
- 5803f9e: Merge remote-tracking branch 'origin/master' into joris/panic-unhealthy (jjbayer, Nov 14, 2024)
- ad28282: merge (jjbayer, Nov 14, 2024)
- 82729a8: Revert "health check" (jjbayer, Nov 14, 2024)
- f92a26d: Merge remote-tracking branch 'origin/joris/join' into joris/join (jjbayer, Nov 14, 2024)
- 740bc33: push (jjbayer, Nov 14, 2024)
- f07042f: Revert "Revert "health check"" (jjbayer, Nov 15, 2024)
- d46f370: Revert "changelog" (jjbayer, Nov 15, 2024)
- 0092ece: Merge branch 'joris/join' into joris/panic-unhealthy (jjbayer, Nov 15, 2024)
- 63a88f9: Revert "Revert "changelog"" (jjbayer, Nov 15, 2024)
- a7fe76f: Merge remote-tracking branch 'origin/master' into joris/panic-unhealthy (jjbayer, Nov 20, 2024)
- f66f835: Let service handle it (jjbayer, Nov 21, 2024)
- e49dae0: Merge branch 'master' into joris/panic-unhealthy (jjbayer, Nov 26, 2024)
- 43eca3f: cleanup (jjbayer, Nov 26, 2024)
- 04ac05f: fix changelog (jjbayer, Nov 26, 2024)
CHANGELOG.md (4 additions, 0 deletions)
@@ -1,5 +1,9 @@
# Changelog

## Unreleased

- Report unhealthy when one of the services crashes. ([#4255](https://github.com/getsentry/relay/pull/4255))

## 24.11.1

**Bug Fixes**:
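The changelog entry above describes the user-visible effect: after a service panic, Relay keeps running but starts reporting itself as not ready, so an orchestrator can take it out of rotation instead of killing it. Below is a hedged sketch of checking that behavior from outside; the port, the endpoint paths, and the use of the `reqwest` crate are assumptions based on Relay's documented health checks, not part of this diff.

```rust
// Hedged sketch: observing the new behavior from outside. Assumes a Relay
// instance on its default port and the `reqwest` crate; endpoint paths follow
// Relay's documented health checks but are not shown in this PR.
#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let base = "http://127.0.0.1:3000";

    // Liveness keeps answering OK even after a service crash, so the
    // orchestrator does not kill the process.
    let live = reqwest::get(format!("{base}/api/relay/healthcheck/live/")).await?;
    println!("live:  {}", live.status());

    // Readiness is the probe expected to turn unhealthy once a service has
    // crashed, taking this instance out of rotation.
    let ready = reqwest::get(format!("{base}/api/relay/healthcheck/ready/")).await?;
    println!("ready: {}", ready.status());

    Ok(())
}
```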
relay-server/src/lib.rs (9 additions, 5 deletions)
@@ -305,11 +305,15 @@ pub fn run(config: Config) -> anyhow::Result<()> {
runner.start(HttpServer::new(config, state.clone())?);

tokio::select! {
_ = runner.join() => {},
// NOTE: when every service implements a shutdown listener,
// awaiting on `finished` becomes unnecessary: We can simply join() and guarantee
// that every service finished its main task.
// See also https://github.com/getsentry/relay/issues/4050.
_ = runner.join(|e| {
if e.is_panic() {
state.health_check().send(e);
}
// NOTE: when every service implements a shutdown listener,
// awaiting on `finished` becomes unnecessary: We can simply join() and guarantee
// that every service finished its main task.
// See also https://github.com/getsentry/relay/issues/4050.
}) => {},
_ = Controller::shutdown_handle().finished() => {}
}

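The closure passed to `runner.join` only forwards panics: a `JoinError` can also represent a cancelled task, and `is_panic()` is what distinguishes the two. A small self-contained sketch of that distinction, illustrative only and using plain Tokio tasks rather than Relay services:

```rust
use tokio::task;

#[tokio::main]
async fn main() {
    // A panicking task: awaiting its handle yields a JoinError for which
    // is_panic() is true. This is the condition the select! arm above checks
    // before notifying the health check service.
    let handle = task::spawn(async {
        panic!("simulated service crash");
    });
    let err = handle.await.expect_err("the task should have panicked");
    assert!(err.is_panic());

    // An aborted task also yields a JoinError, but it is a cancellation,
    // not a panic, and would not be reported.
    let handle = task::spawn(std::future::pending::<()>());
    handle.abort();
    let err = handle.await.expect_err("the task was aborted");
    assert!(err.is_cancelled());
    assert!(!err.is_panic());
}
```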
relay-server/src/services/health_check.rs (37 additions, 14 deletions)
@@ -1,9 +1,12 @@
use std::sync::Arc;

use relay_config::Config;
use relay_system::{Addr, AsyncResponse, Controller, FromMessage, Interface, Sender, Service};
use relay_system::{
Addr, AsyncResponse, Controller, FromMessage, Interface, NoResponse, Sender, Service,
};
use std::future::Future;
use tokio::sync::watch;
use tokio::task::JoinError;
use tokio::time::{timeout, Instant};

use crate::services::buffer::ObservableEnvelopeBuffer;
@@ -51,16 +54,29 @@ impl FromIterator<Status> for Status {
}
}

/// Service interface for the [`IsHealthy`] message.
pub struct HealthCheck(IsHealthy, Sender<Status>);
/// Service interface for the health check service.
pub enum HealthCheck {
/// Query whether relay is healthy.
IsHealthy(IsHealthy, Sender<Status>),
/// Report a service crash.
ReportCrash(JoinError),
}

impl Interface for HealthCheck {}

impl FromMessage<IsHealthy> for HealthCheck {
type Response = AsyncResponse<Status>;

fn from_message(message: IsHealthy, sender: Sender<Status>) -> Self {
Self(message, sender)
Self::IsHealthy(message, sender)
}
}

impl FromMessage<JoinError> for HealthCheck {
type Response = NoResponse;

fn from_message(message: JoinError, _: ()) -> Self {
Self::ReportCrash(message)
}
}

@@ -197,6 +213,7 @@ impl Service for HealthCheckService {
// Add 10% buffer to the internal timeouts to avoid race conditions.
let status_timeout = (check_interval + self.config.health_probe_timeout()).mul_f64(1.1);

let update_tx2 = update_tx.clone();
relay_system::spawn!(async move {
let shutdown = Controller::shutdown_handle();

@@ -214,16 +231,22 @@
update_tx.send(StatusUpdate::new(Status::Unhealthy)).ok();
});

while let Some(HealthCheck(message, sender)) = rx.recv().await {
let update = update_rx.borrow();

sender.send(if matches!(message, IsHealthy::Liveness) {
Status::Healthy
} else if update.instant.elapsed() >= status_timeout {
Status::Unhealthy
} else {
update.status
});
while let Some(message) = rx.recv().await {
match message {
HealthCheck::IsHealthy(message, sender) => {
let update = update_rx.borrow();
sender.send(if matches!(message, IsHealthy::Liveness) {
Status::Healthy
Review comment by jjbayer (PR author) on the line above:

We considered failing the Liveness check once a service has crashed, but IIUC this would mean that Kubernetes would kill the process immediately. We want to keep the process alive so other services still have a chance to finish their work.

> If the liveness probe fails, the kubelet kills the container

https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#types-of-probe

} else if update.instant.elapsed() >= status_timeout {
Status::Unhealthy
} else {
update.status
});
}
HealthCheck::ReportCrash(_) => {
update_tx2.send(StatusUpdate::new(Status::Unhealthy)).ok();
}
}
}
}
}
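Internally, the crash report reuses the `tokio::sync::watch` channel that the periodic probes already write to: the cloned sender (`update_tx2`) overwrites the most recent status, and the next readiness query observes it through `borrow()`. A minimal standalone sketch of that pattern, with string statuses standing in for the service's `StatusUpdate` type:

```rust
use tokio::sync::watch;

fn main() {
    // Latest-value channel: receivers only ever observe the most recent send.
    let (update_tx, update_rx) = watch::channel("healthy");

    // Cloning the sender mirrors `update_tx2` above (watch senders are
    // clonable in the tokio version this PR builds against).
    let crash_tx = update_tx.clone();

    // A periodic check reports healthy...
    update_tx.send("healthy").ok();
    // ...and a crash report later overrides it.
    crash_tx.send("unhealthy").ok();

    // A readiness query reads whatever was written last.
    assert_eq!(*update_rx.borrow(), "unhealthy");
}
```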
relay-system/src/service.rs (3 additions, 6 deletions)
@@ -11,7 +11,7 @@ use futures::future::Shared;
use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt};
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;
use tokio::task::{JoinError, JoinHandle};
use tokio::time::MissedTickBehavior;

use crate::statsd::SystemGauges;
@@ -1050,13 +1050,10 @@ impl ServiceRunner {
/// Awaits until all services have finished.
///
/// Panics if one of the spawned services has panicked.
pub async fn join(&mut self) {
pub async fn join<F: Fn(JoinError)>(&mut self, error_handler: F) {
while let Some(res) = self.0.next().await {
if let Err(e) = res {
if e.is_panic() {
// Re-trigger panic to terminate the process:
std::panic::resume_unwind(e.into_panic());
}
error_handler(e);
}
}
}
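The new `join` signature moves the policy decision to the caller: `run()` forwards panics to the health check, but nothing stops a caller from restoring the old fail-fast behavior. A hypothetical sketch follows; the helper name and the `relay_system::ServiceRunner` import path are assumptions, not shown in this diff.

```rust
use relay_system::ServiceRunner;
use tokio::task::JoinError;

/// Hypothetical helper: awaits all services and re-raises panics, which is
/// what the previous `join()` implementation did unconditionally.
async fn join_and_resume_panics(runner: &mut ServiceRunner) {
    runner
        .join(|e: JoinError| {
            if e.is_panic() {
                // Re-trigger the panic to terminate the process, matching
                // the behavior removed by this PR.
                std::panic::resume_unwind(e.into_panic());
            }
        })
        .await;
}
```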