Skip to content

Commit

Permalink
feat: 提供服务探活接口以支持发现有问题的节点,可支持集群自动移除或重启问题节点; #171
Browse files Browse the repository at this point in the history
  • Loading branch information
heqingpan committed Nov 17, 2024
1 parent b89ba20 commit 438b1a3
Show file tree
Hide file tree
Showing 10 changed files with 434 additions and 0 deletions.
2 changes: 2 additions & 0 deletions src/common/appdata.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::common::AppSysConfig;
use crate::config::core::ConfigActor;
use crate::grpc::bistream_manage::BiStreamManage;
use crate::health::core::HealthManager;
use crate::metrics::core::MetricsManager;
use crate::namespace::NamespaceActor;
use crate::naming::cluster::node_manage::{InnerNodeManage, NodeManage};
Expand Down Expand Up @@ -46,4 +47,5 @@ pub struct AppShareData {
pub raft_request_route: Arc<RaftRequestRoute>,
pub transfer_writer_manager: Addr<TransferWriterManager>,
pub transfer_import_manager: Addr<TransferImportManager>,
pub health_manager: Addr<HealthManager>,
}
197 changes: 197 additions & 0 deletions src/health/core.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
use crate::config::core::ConfigActor;
use crate::health::model::{
CheckHealthResult, HealthBackRequest, HealthCheckItem, HealthCheckRequest, HealthCheckType,
HealthManagerRequest, HealthManagerResponse,
};
use crate::naming::core::NamingActor;
use crate::now_millis;
use crate::raft::cache::CacheManager;
use crate::raft::filestore::raftapply::StateApplyManager;
use crate::raft::filestore::raftindex::RaftIndexManager;
use crate::raft::filestore::raftlog::RaftLogManager;
use crate::raft::NacosRaft;
use crate::user::UserManager;
use actix::prelude::*;
use bean_factory::{bean, BeanFactory, FactoryData, Inject};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

#[bean(inject)]
#[derive(Clone, Default)]
pub struct HealthManager {
config_actor: Option<Addr<ConfigActor>>,
naming_actor: Option<Addr<NamingActor>>,
user_actor: Option<Addr<UserManager>>,
cache_actor: Option<Addr<CacheManager>>,
raft_index_manager: Option<Addr<RaftIndexManager>>,
raft_log_manager: Option<Addr<RaftLogManager>>,
raft_apply_manager: Option<Addr<StateApplyManager>>,
raft: Option<Arc<NacosRaft>>,
health_item_map: HashMap<HealthCheckType, HealthCheckItem>,
}

impl HealthManager {
pub fn new() -> Self {
let mut health_item_map = HashMap::new();
let now = now_millis();
let common_timeout = 5500;
health_item_map.insert(
HealthCheckType::Config,
HealthCheckItem::new(HealthCheckType::Config, common_timeout, now),
);
health_item_map.insert(
HealthCheckType::Naming,
HealthCheckItem::new(HealthCheckType::Naming, common_timeout, now),
);
health_item_map.insert(
HealthCheckType::User,
HealthCheckItem::new(HealthCheckType::User, common_timeout, now),
);
health_item_map.insert(
HealthCheckType::Cache,
HealthCheckItem::new(HealthCheckType::Cache, common_timeout, now),
);
health_item_map.insert(
HealthCheckType::RaftStoreLog,
HealthCheckItem::new(HealthCheckType::RaftStoreLog, common_timeout, now),
);
health_item_map.insert(
HealthCheckType::RaftStoreIndex,
HealthCheckItem::new(HealthCheckType::RaftStoreIndex, common_timeout, now),
);
health_item_map.insert(
HealthCheckType::RaftStoreApply,
HealthCheckItem::new(HealthCheckType::RaftStoreApply, common_timeout, now),
);
health_item_map.insert(
HealthCheckType::RaftCluster,
HealthCheckItem::new(HealthCheckType::RaftCluster, 12500, now),
);
Self {
config_actor: None,
naming_actor: None,
user_actor: None,
cache_actor: None,
raft_index_manager: None,
raft_log_manager: None,
raft_apply_manager: None,
raft: None,
health_item_map,
}
}

/// 测试健康状态成功后更新最新成功的时间
pub fn update_success_status(&mut self, check_type: HealthCheckType) {
//for debug
//log::info!("Health check success: {:?}", check_type);
if let Some(v) = self.health_item_map.get_mut(&check_type) {
v.last_success_time = now_millis();
}
}

fn check_raft(&self) -> bool {
if let Some(raft) = &self.raft {
let metrics = raft.metrics().borrow().clone();
metrics.state.is_leader() || metrics.state.is_follower()
} else {
false
}
}

fn do_check(&mut self, ctx: &mut Context<Self>) -> anyhow::Result<()> {
let self_addr = ctx.address();
if let Some(config) = self.config_actor.as_ref() {
config.do_send(HealthCheckRequest::Ping(self_addr.clone()));
}
if let Some(naming) = self.naming_actor.as_ref() {
naming.do_send(HealthCheckRequest::Ping(self_addr.clone()));
}
if let Some(cache) = self.cache_actor.as_ref() {
cache.do_send(HealthCheckRequest::Ping(self_addr.clone()));
}
if let Some(user) = self.user_actor.as_ref() {
user.do_send(HealthCheckRequest::Ping(self_addr.clone()));
}
// raft集群
if let Some(raft_index) = self.raft_index_manager.as_ref() {
raft_index.do_send(HealthCheckRequest::Ping(self_addr.clone()));
}
if let Some(raft_log) = self.raft_log_manager.as_ref() {
raft_log.do_send(HealthCheckRequest::Ping(self_addr.clone()));
}
if let Some(raft_apply) = self.raft_apply_manager.as_ref() {
raft_apply.do_send(HealthCheckRequest::Ping(self_addr.clone()));
}
if self.check_raft() {
self.update_success_status(HealthCheckType::RaftCluster);
}
Ok(())
}

fn heartbeat(&mut self, ctx: &mut Context<Self>) {
ctx.run_later(Duration::from_millis(2000), |act, ctx| {
act.do_check(ctx).ok();
act.heartbeat(ctx);
});
}
}

impl Actor for HealthManager {
type Context = Context<Self>;

fn started(&mut self, _ctx: &mut Self::Context) {
log::info!("HealthManager started");
}
}

impl Inject for HealthManager {
type Context = Context<Self>;

fn inject(
&mut self,
factory_data: FactoryData,
_factory: BeanFactory,
ctx: &mut Self::Context,
) {
self.config_actor = factory_data.get_actor();
self.naming_actor = factory_data.get_actor();
self.user_actor = factory_data.get_actor();
self.cache_actor = factory_data.get_actor();
self.raft_apply_manager = factory_data.get_actor();
self.raft_log_manager = factory_data.get_actor();
self.raft_index_manager = factory_data.get_actor();
self.raft = factory_data.get_bean();
self.heartbeat(ctx);
}
}

impl Handler<HealthBackRequest> for HealthManager {
type Result = anyhow::Result<()>;

fn handle(&mut self, msg: HealthBackRequest, _ctx: &mut Self::Context) -> Self::Result {
match msg {
HealthBackRequest::Pong(check_type) => {
self.update_success_status(check_type);
}
}
Ok(())
}
}

impl Handler<HealthManagerRequest> for HealthManager {
type Result = anyhow::Result<HealthManagerResponse>;

fn handle(&mut self, _msg: HealthManagerRequest, _ctx: &mut Self::Context) -> Self::Result {
let now = now_millis();
for item in self.health_item_map.values() {
let result = item.check(now);
if !result.is_success() {
return Ok(HealthManagerResponse::StatusResult(result));
}
}
Ok(HealthManagerResponse::StatusResult(
CheckHealthResult::Success,
))
}
}
98 changes: 98 additions & 0 deletions src/health/health_handler_impl.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
use crate::config::core::ConfigActor;
use crate::health::model::{HealthBackRequest, HealthCheckRequest, HealthCheckType};
use crate::naming::core::NamingActor;
use crate::raft::cache::CacheManager;
use crate::raft::filestore::raftapply::StateApplyManager;
use crate::raft::filestore::raftindex::RaftIndexManager;
use crate::raft::filestore::raftlog::RaftLogManager;
use crate::user::UserManager;
use actix::Handler;

impl Handler<HealthCheckRequest> for ConfigActor {
type Result = anyhow::Result<()>;

fn handle(&mut self, msg: HealthCheckRequest, _ctx: &mut Self::Context) -> Self::Result {
match msg {
HealthCheckRequest::Ping(addr) => {
addr.do_send(HealthBackRequest::Pong(HealthCheckType::Config))
}
}
Ok(())
}
}

impl Handler<HealthCheckRequest> for NamingActor {
type Result = anyhow::Result<()>;

fn handle(&mut self, msg: HealthCheckRequest, _ctx: &mut Self::Context) -> Self::Result {
match msg {
HealthCheckRequest::Ping(addr) => {
addr.do_send(HealthBackRequest::Pong(HealthCheckType::Naming))
}
}
Ok(())
}
}

impl Handler<HealthCheckRequest> for UserManager {
type Result = anyhow::Result<()>;

fn handle(&mut self, msg: HealthCheckRequest, _ctx: &mut Self::Context) -> Self::Result {
match msg {
HealthCheckRequest::Ping(addr) => {
addr.do_send(HealthBackRequest::Pong(HealthCheckType::User))
}
}
Ok(())
}
}

impl Handler<HealthCheckRequest> for CacheManager {
type Result = anyhow::Result<()>;

fn handle(&mut self, msg: HealthCheckRequest, _ctx: &mut Self::Context) -> Self::Result {
match msg {
HealthCheckRequest::Ping(addr) => {
addr.do_send(HealthBackRequest::Pong(HealthCheckType::Cache))
}
}
Ok(())
}
}

impl Handler<HealthCheckRequest> for RaftIndexManager {
type Result = anyhow::Result<()>;

fn handle(&mut self, msg: HealthCheckRequest, _ctx: &mut Self::Context) -> Self::Result {
match msg {
HealthCheckRequest::Ping(addr) => {
addr.do_send(HealthBackRequest::Pong(HealthCheckType::RaftStoreIndex))
}
}
Ok(())
}
}
impl Handler<HealthCheckRequest> for RaftLogManager {
type Result = anyhow::Result<()>;

fn handle(&mut self, msg: HealthCheckRequest, _ctx: &mut Self::Context) -> Self::Result {
match msg {
HealthCheckRequest::Ping(addr) => {
addr.do_send(HealthBackRequest::Pong(HealthCheckType::RaftStoreLog))
}
}
Ok(())
}
}
impl Handler<HealthCheckRequest> for StateApplyManager {
type Result = anyhow::Result<()>;

fn handle(&mut self, msg: HealthCheckRequest, _ctx: &mut Self::Context) -> Self::Result {
match msg {
HealthCheckRequest::Ping(addr) => {
addr.do_send(HealthBackRequest::Pong(HealthCheckType::RaftStoreApply))
}
}
Ok(())
}
}
3 changes: 3 additions & 0 deletions src/health/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub mod core;
pub mod health_handler_impl;
pub mod model;
Loading

0 comments on commit 438b1a3

Please sign in to comment.