From 438b1a3c1a9d44bca8b3826a1f12a08e05eec1d8 Mon Sep 17 00:00:00 2001 From: heqingpan Date: Mon, 18 Nov 2024 00:32:14 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=8F=90=E4=BE=9B=E6=9C=8D=E5=8A=A1?= =?UTF-8?q?=E6=8E=A2=E6=B4=BB=E6=8E=A5=E5=8F=A3=E4=BB=A5=E6=94=AF=E6=8C=81?= =?UTF-8?q?=E5=8F=91=E7=8E=B0=E6=9C=89=E9=97=AE=E9=A2=98=E7=9A=84=E8=8A=82?= =?UTF-8?q?=E7=82=B9=EF=BC=8C=E5=8F=AF=E6=94=AF=E6=8C=81=E9=9B=86=E7=BE=A4?= =?UTF-8?q?=E8=87=AA=E5=8A=A8=E7=A7=BB=E9=99=A4=E6=88=96=E9=87=8D=E5=90=AF?= =?UTF-8?q?=E9=97=AE=E9=A2=98=E8=8A=82=E7=82=B9;=20#171?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/common/appdata.rs | 2 + src/health/core.rs | 197 ++++++++++++++++++++++++++++++ src/health/health_handler_impl.rs | 98 +++++++++++++++ src/health/mod.rs | 3 + src/health/model.rs | 97 +++++++++++++++ src/lib.rs | 1 + src/openapi/health.rs | 28 +++++ src/openapi/mod.rs | 1 + src/starter.rs | 4 + src/web_config.rs | 3 + 10 files changed, 434 insertions(+) create mode 100644 src/health/core.rs create mode 100644 src/health/health_handler_impl.rs create mode 100644 src/health/mod.rs create mode 100644 src/health/model.rs create mode 100644 src/openapi/health.rs diff --git a/src/common/appdata.rs b/src/common/appdata.rs index 9c91e7fb..af6f2e1f 100644 --- a/src/common/appdata.rs +++ b/src/common/appdata.rs @@ -1,6 +1,7 @@ use crate::common::AppSysConfig; use crate::config::core::ConfigActor; use crate::grpc::bistream_manage::BiStreamManage; +use crate::health::core::HealthManager; use crate::metrics::core::MetricsManager; use crate::namespace::NamespaceActor; use crate::naming::cluster::node_manage::{InnerNodeManage, NodeManage}; @@ -46,4 +47,5 @@ pub struct AppShareData { pub raft_request_route: Arc, pub transfer_writer_manager: Addr, pub transfer_import_manager: Addr, + pub health_manager: Addr, } diff --git a/src/health/core.rs b/src/health/core.rs new file mode 100644 index 00000000..77547ff7 --- /dev/null +++ b/src/health/core.rs @@ -0,0 +1,197 @@ +use crate::config::core::ConfigActor; +use crate::health::model::{ + CheckHealthResult, HealthBackRequest, HealthCheckItem, HealthCheckRequest, HealthCheckType, + HealthManagerRequest, HealthManagerResponse, +}; +use crate::naming::core::NamingActor; +use crate::now_millis; +use crate::raft::cache::CacheManager; +use crate::raft::filestore::raftapply::StateApplyManager; +use crate::raft::filestore::raftindex::RaftIndexManager; +use crate::raft::filestore::raftlog::RaftLogManager; +use crate::raft::NacosRaft; +use crate::user::UserManager; +use actix::prelude::*; +use bean_factory::{bean, BeanFactory, FactoryData, Inject}; +use std::collections::HashMap; +use std::sync::Arc; +use std::time::Duration; + +#[bean(inject)] +#[derive(Clone, Default)] +pub struct HealthManager { + config_actor: Option>, + naming_actor: Option>, + user_actor: Option>, + cache_actor: Option>, + raft_index_manager: Option>, + raft_log_manager: Option>, + raft_apply_manager: Option>, + raft: Option>, + health_item_map: HashMap, +} + +impl HealthManager { + pub fn new() -> Self { + let mut health_item_map = HashMap::new(); + let now = now_millis(); + let common_timeout = 5500; + health_item_map.insert( + HealthCheckType::Config, + HealthCheckItem::new(HealthCheckType::Config, common_timeout, now), + ); + health_item_map.insert( + HealthCheckType::Naming, + HealthCheckItem::new(HealthCheckType::Naming, common_timeout, now), + ); + health_item_map.insert( + HealthCheckType::User, + HealthCheckItem::new(HealthCheckType::User, common_timeout, now), + ); + health_item_map.insert( + HealthCheckType::Cache, + HealthCheckItem::new(HealthCheckType::Cache, common_timeout, now), + ); + health_item_map.insert( + HealthCheckType::RaftStoreLog, + HealthCheckItem::new(HealthCheckType::RaftStoreLog, common_timeout, now), + ); + health_item_map.insert( + HealthCheckType::RaftStoreIndex, + HealthCheckItem::new(HealthCheckType::RaftStoreIndex, common_timeout, now), + ); + health_item_map.insert( + HealthCheckType::RaftStoreApply, + HealthCheckItem::new(HealthCheckType::RaftStoreApply, common_timeout, now), + ); + health_item_map.insert( + HealthCheckType::RaftCluster, + HealthCheckItem::new(HealthCheckType::RaftCluster, 12500, now), + ); + Self { + config_actor: None, + naming_actor: None, + user_actor: None, + cache_actor: None, + raft_index_manager: None, + raft_log_manager: None, + raft_apply_manager: None, + raft: None, + health_item_map, + } + } + + /// 测试健康状态成功后更新最新成功的时间 + pub fn update_success_status(&mut self, check_type: HealthCheckType) { + //for debug + //log::info!("Health check success: {:?}", check_type); + if let Some(v) = self.health_item_map.get_mut(&check_type) { + v.last_success_time = now_millis(); + } + } + + fn check_raft(&self) -> bool { + if let Some(raft) = &self.raft { + let metrics = raft.metrics().borrow().clone(); + metrics.state.is_leader() || metrics.state.is_follower() + } else { + false + } + } + + fn do_check(&mut self, ctx: &mut Context) -> anyhow::Result<()> { + let self_addr = ctx.address(); + if let Some(config) = self.config_actor.as_ref() { + config.do_send(HealthCheckRequest::Ping(self_addr.clone())); + } + if let Some(naming) = self.naming_actor.as_ref() { + naming.do_send(HealthCheckRequest::Ping(self_addr.clone())); + } + if let Some(cache) = self.cache_actor.as_ref() { + cache.do_send(HealthCheckRequest::Ping(self_addr.clone())); + } + if let Some(user) = self.user_actor.as_ref() { + user.do_send(HealthCheckRequest::Ping(self_addr.clone())); + } + // raft集群 + if let Some(raft_index) = self.raft_index_manager.as_ref() { + raft_index.do_send(HealthCheckRequest::Ping(self_addr.clone())); + } + if let Some(raft_log) = self.raft_log_manager.as_ref() { + raft_log.do_send(HealthCheckRequest::Ping(self_addr.clone())); + } + if let Some(raft_apply) = self.raft_apply_manager.as_ref() { + raft_apply.do_send(HealthCheckRequest::Ping(self_addr.clone())); + } + if self.check_raft() { + self.update_success_status(HealthCheckType::RaftCluster); + } + Ok(()) + } + + fn heartbeat(&mut self, ctx: &mut Context) { + ctx.run_later(Duration::from_millis(2000), |act, ctx| { + act.do_check(ctx).ok(); + act.heartbeat(ctx); + }); + } +} + +impl Actor for HealthManager { + type Context = Context; + + fn started(&mut self, _ctx: &mut Self::Context) { + log::info!("HealthManager started"); + } +} + +impl Inject for HealthManager { + type Context = Context; + + fn inject( + &mut self, + factory_data: FactoryData, + _factory: BeanFactory, + ctx: &mut Self::Context, + ) { + self.config_actor = factory_data.get_actor(); + self.naming_actor = factory_data.get_actor(); + self.user_actor = factory_data.get_actor(); + self.cache_actor = factory_data.get_actor(); + self.raft_apply_manager = factory_data.get_actor(); + self.raft_log_manager = factory_data.get_actor(); + self.raft_index_manager = factory_data.get_actor(); + self.raft = factory_data.get_bean(); + self.heartbeat(ctx); + } +} + +impl Handler for HealthManager { + type Result = anyhow::Result<()>; + + fn handle(&mut self, msg: HealthBackRequest, _ctx: &mut Self::Context) -> Self::Result { + match msg { + HealthBackRequest::Pong(check_type) => { + self.update_success_status(check_type); + } + } + Ok(()) + } +} + +impl Handler for HealthManager { + type Result = anyhow::Result; + + fn handle(&mut self, _msg: HealthManagerRequest, _ctx: &mut Self::Context) -> Self::Result { + let now = now_millis(); + for item in self.health_item_map.values() { + let result = item.check(now); + if !result.is_success() { + return Ok(HealthManagerResponse::StatusResult(result)); + } + } + Ok(HealthManagerResponse::StatusResult( + CheckHealthResult::Success, + )) + } +} diff --git a/src/health/health_handler_impl.rs b/src/health/health_handler_impl.rs new file mode 100644 index 00000000..03305963 --- /dev/null +++ b/src/health/health_handler_impl.rs @@ -0,0 +1,98 @@ +use crate::config::core::ConfigActor; +use crate::health::model::{HealthBackRequest, HealthCheckRequest, HealthCheckType}; +use crate::naming::core::NamingActor; +use crate::raft::cache::CacheManager; +use crate::raft::filestore::raftapply::StateApplyManager; +use crate::raft::filestore::raftindex::RaftIndexManager; +use crate::raft::filestore::raftlog::RaftLogManager; +use crate::user::UserManager; +use actix::Handler; + +impl Handler for ConfigActor { + type Result = anyhow::Result<()>; + + fn handle(&mut self, msg: HealthCheckRequest, _ctx: &mut Self::Context) -> Self::Result { + match msg { + HealthCheckRequest::Ping(addr) => { + addr.do_send(HealthBackRequest::Pong(HealthCheckType::Config)) + } + } + Ok(()) + } +} + +impl Handler for NamingActor { + type Result = anyhow::Result<()>; + + fn handle(&mut self, msg: HealthCheckRequest, _ctx: &mut Self::Context) -> Self::Result { + match msg { + HealthCheckRequest::Ping(addr) => { + addr.do_send(HealthBackRequest::Pong(HealthCheckType::Naming)) + } + } + Ok(()) + } +} + +impl Handler for UserManager { + type Result = anyhow::Result<()>; + + fn handle(&mut self, msg: HealthCheckRequest, _ctx: &mut Self::Context) -> Self::Result { + match msg { + HealthCheckRequest::Ping(addr) => { + addr.do_send(HealthBackRequest::Pong(HealthCheckType::User)) + } + } + Ok(()) + } +} + +impl Handler for CacheManager { + type Result = anyhow::Result<()>; + + fn handle(&mut self, msg: HealthCheckRequest, _ctx: &mut Self::Context) -> Self::Result { + match msg { + HealthCheckRequest::Ping(addr) => { + addr.do_send(HealthBackRequest::Pong(HealthCheckType::Cache)) + } + } + Ok(()) + } +} + +impl Handler for RaftIndexManager { + type Result = anyhow::Result<()>; + + fn handle(&mut self, msg: HealthCheckRequest, _ctx: &mut Self::Context) -> Self::Result { + match msg { + HealthCheckRequest::Ping(addr) => { + addr.do_send(HealthBackRequest::Pong(HealthCheckType::RaftStoreIndex)) + } + } + Ok(()) + } +} +impl Handler for RaftLogManager { + type Result = anyhow::Result<()>; + + fn handle(&mut self, msg: HealthCheckRequest, _ctx: &mut Self::Context) -> Self::Result { + match msg { + HealthCheckRequest::Ping(addr) => { + addr.do_send(HealthBackRequest::Pong(HealthCheckType::RaftStoreLog)) + } + } + Ok(()) + } +} +impl Handler for StateApplyManager { + type Result = anyhow::Result<()>; + + fn handle(&mut self, msg: HealthCheckRequest, _ctx: &mut Self::Context) -> Self::Result { + match msg { + HealthCheckRequest::Ping(addr) => { + addr.do_send(HealthBackRequest::Pong(HealthCheckType::RaftStoreApply)) + } + } + Ok(()) + } +} diff --git a/src/health/mod.rs b/src/health/mod.rs new file mode 100644 index 00000000..94517d17 --- /dev/null +++ b/src/health/mod.rs @@ -0,0 +1,3 @@ +pub mod core; +pub mod health_handler_impl; +pub mod model; diff --git a/src/health/model.rs b/src/health/model.rs new file mode 100644 index 00000000..becd6801 --- /dev/null +++ b/src/health/model.rs @@ -0,0 +1,97 @@ +use crate::health::core::HealthManager; +use actix::{Addr, Message}; + +#[derive(Debug, Clone)] +pub enum CheckHealthResult { + Success, + //Warning(String), + Error(String), +} + +impl CheckHealthResult { + pub fn is_success(&self) -> bool { + match self { + CheckHealthResult::Success => true, + CheckHealthResult::Error(_) => false, + } + } +} + +#[derive(Debug, Eq, PartialEq, Clone, Hash)] +pub enum HealthCheckType { + Config, + Naming, + User, + Cache, + RaftStoreIndex, + RaftStoreLog, + RaftStoreApply, + RaftCluster, +} + +impl HealthCheckType { + pub fn name(&self) -> &'static str { + match self { + HealthCheckType::Config => "Config", + HealthCheckType::Naming => "Naming", + HealthCheckType::User => "User", + HealthCheckType::Cache => "Cache", + HealthCheckType::RaftStoreIndex => "RaftStoreIndex", + HealthCheckType::RaftStoreLog => "RaftStoreLog", + HealthCheckType::RaftStoreApply => "RaftStoreApply", + HealthCheckType::RaftCluster => "RaftCluster", + } + } +} + +#[derive(Message, Debug, Clone)] +#[rtype(result = "anyhow::Result<()>")] +pub enum HealthCheckRequest { + Ping(Addr), +} + +#[derive(Message, Debug, Clone)] +#[rtype(result = "anyhow::Result<()>")] +pub enum HealthBackRequest { + Pong(HealthCheckType), +} + +#[derive(Message, Debug, Clone)] +#[rtype(result = "anyhow::Result")] +pub enum HealthManagerRequest { + Status, +} + +#[derive(Debug, Clone)] +pub enum HealthManagerResponse { + StatusResult(CheckHealthResult), +} + +#[derive(Debug, Clone)] +pub struct HealthCheckItem { + pub last_success_time: u64, + pub timeout: u64, + pub check_type: HealthCheckType, +} + +impl HealthCheckItem { + pub fn new( + check_type: HealthCheckType, + timeout: u64, + last_success_time: u64, + ) -> HealthCheckItem { + Self { + check_type, + timeout, + last_success_time, + } + } + + pub fn check(&self, now: u64) -> CheckHealthResult { + if now > self.last_success_time + self.timeout { + CheckHealthResult::Error(format!("{} module ill.", self.check_type.name())) + } else { + CheckHealthResult::Success + } + } +} diff --git a/src/lib.rs b/src/lib.rs index 2f3ca1e6..9c42a428 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,6 +12,7 @@ pub mod user; pub mod utils; pub mod web_config; +pub mod health; pub mod transfer; pub use inner_mem_cache::TimeoutSet; diff --git a/src/openapi/health.rs b/src/openapi/health.rs new file mode 100644 index 00000000..00199b3f --- /dev/null +++ b/src/openapi/health.rs @@ -0,0 +1,28 @@ +use crate::common::appdata::AppShareData; +use crate::health::model::{CheckHealthResult, HealthManagerRequest, HealthManagerResponse}; +use actix_web::{web, HttpResponse, Responder}; +use std::sync::Arc; + +pub(crate) async fn health_info(appdata: web::Data>) -> impl Responder { + if let Ok(Ok(HealthManagerResponse::StatusResult(v))) = appdata + .health_manager + .send(HealthManagerRequest::Status) + .await + { + match v { + CheckHealthResult::Success => HttpResponse::Ok().body("success"), + CheckHealthResult::Error(msg) => { + HttpResponse::ServiceUnavailable().body(format!("error: {}", msg)) + } + } + } else { + HttpResponse::InternalServerError().body("request health_manager error") + } +} + +pub fn health_config(config: &mut web::ServiceConfig) { + config + .service(web::resource("/health").route(web::get().to(health_info))) + .service(web::resource("/nacos/health").route(web::get().to(health_info))) + .service(web::resource("/rnacos/health").route(web::get().to(health_info))); +} diff --git a/src/openapi/mod.rs b/src/openapi/mod.rs index edea7f69..c7d8a386 100644 --- a/src/openapi/mod.rs +++ b/src/openapi/mod.rs @@ -6,6 +6,7 @@ use crate::openapi::constant::NACOS_PREFIX; pub(crate) mod auth; pub(crate) mod config; mod constant; +pub(crate) mod health; pub(crate) mod metrics; pub mod middle; pub(crate) mod naming; diff --git a/src/starter.rs b/src/starter.rs index f3821516..9ca190c1 100644 --- a/src/starter.rs +++ b/src/starter.rs @@ -2,6 +2,7 @@ use std::{collections::HashSet, sync::Arc, time::Duration}; use crate::common::actor_utils::{create_actor_at_thread, create_actor_at_thread2}; use crate::grpc::handler::RAFT_ROUTE_REQUEST; +use crate::health::core::HealthManager; use crate::metrics::core::MetricsManager; use crate::namespace::NamespaceActor; use crate::raft::cluster::route::RaftRequestRoute; @@ -193,6 +194,8 @@ pub async fn config_factory(sys_config: Arc) -> anyhow::Result anyhow::Result impl FnOnce(&mut ServiceConfig) { ); login_config(config); metrics_config(config); + health_config(config); raft_config(config); nacos_console_api_config(config); config.configure(openapi_config(conf_data)); } else { login_config(config); metrics_config(config); + health_config(config); raft_config(config); nacos_console_api_config(config); config.configure(openapi_config(conf_data));