Skip to content

Commit

Permalink
Let service handle it
Browse files Browse the repository at this point in the history
  • Loading branch information
jjbayer committed Nov 21, 2024
1 parent a7fe76f commit f66f835
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 15 deletions.
2 changes: 1 addition & 1 deletion relay-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ pub fn run(config: Config) -> anyhow::Result<()> {
tokio::select! {
_ = runner.join(|e| {
if e.is_panic() {
state.report_crash();
state.health_check().send(e);
}
// NOTE: when every service implements a shutdown listener,
// awaiting on `finished` becomes unnecessary: We can simply join() and guarantee
Expand Down
51 changes: 37 additions & 14 deletions relay-server/src/services/health_check.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use std::sync::Arc;

use relay_config::Config;
use relay_system::{Addr, AsyncResponse, Controller, FromMessage, Interface, Sender, Service};
use relay_system::{
Addr, AsyncResponse, Controller, FromMessage, Interface, NoResponse, Sender, Service,
};
use std::future::Future;
use tokio::sync::watch;
use tokio::task::JoinError;
use tokio::time::{timeout, Instant};

use crate::services::buffer::ObservableEnvelopeBuffer;
Expand Down Expand Up @@ -51,16 +54,29 @@ impl FromIterator<Status> for Status {
}
}

/// Service interface for the [`IsHealthy`] message.
pub struct HealthCheck(IsHealthy, Sender<Status>);
/// Service interface for the health check service.
///
/// Each variant is one kind of message that can be delivered to the service.
pub enum HealthCheck {
/// Query whether relay is healthy.
///
/// Carries the [`IsHealthy`] probe together with the [`Sender`] through which
/// the resulting [`Status`] is returned to the caller.
IsHealthy(IsHealthy, Sender<Status>),
/// Report a service crash.
///
/// Carries the [`JoinError`] that is being reported. This message has no
/// response channel.
ReportCrash(JoinError),
}

// Marks `HealthCheck` as a service message interface (`relay_system::Interface`).
impl Interface for HealthCheck {}

impl FromMessage<IsHealthy> for HealthCheck {
type Response = AsyncResponse<Status>;

fn from_message(message: IsHealthy, sender: Sender<Status>) -> Self {
Self(message, sender)
Self::IsHealthy(message, sender)
}
}

/// Allows a [`JoinError`] to be sent to the health check service as a crash report.
impl FromMessage<JoinError> for HealthCheck {
// Crash reports are one-way: the sender of the message gets no reply.
type Response = NoResponse;

// With `NoResponse` the second argument is the unit value rather than a
// response sender, so it is ignored.
fn from_message(message: JoinError, _: ()) -> Self {
Self::ReportCrash(message)
}
}

Expand Down Expand Up @@ -197,6 +213,7 @@ impl Service for HealthCheckService {
// Add 10% buffer to the internal timeouts to avoid race conditions.
let status_timeout = (check_interval + self.config.health_probe_timeout()).mul_f64(1.1);

let update_tx2 = update_tx.clone();
relay_system::spawn!(async move {
let shutdown = Controller::shutdown_handle();

Expand All @@ -214,16 +231,22 @@ impl Service for HealthCheckService {
update_tx.send(StatusUpdate::new(Status::Unhealthy)).ok();
});

while let Some(HealthCheck(message, sender)) = rx.recv().await {
let update = update_rx.borrow();

sender.send(if matches!(message, IsHealthy::Liveness) {
Status::Healthy
} else if update.instant.elapsed() >= status_timeout {
Status::Unhealthy
} else {
update.status
});
while let Some(message) = rx.recv().await {
match message {
HealthCheck::IsHealthy(message, sender) => {
let update = update_rx.borrow();
sender.send(if matches!(message, IsHealthy::Liveness) {
Status::Healthy
} else if update.instant.elapsed() >= status_timeout {
Status::Unhealthy
} else {
update.status
});
}
HealthCheck::ReportCrash(_) => {
update_tx2.send(StatusUpdate::new(Status::Unhealthy)).ok();
}
}
}
}
}
Expand Down

0 comments on commit f66f835

Please sign in to comment.