From cb507c17838c2f3a89da39ca2bf47382e5f89da0 Mon Sep 17 00:00:00 2001 From: xiepaup Date: Tue, 12 Nov 2024 10:50:02 +0800 Subject: [PATCH] =?UTF-8?q?feat(redis):=20RedisCache=E6=9C=AC=E5=9C=B0?= =?UTF-8?q?=E9=87=8D=E5=BB=BAslvae=20#7104?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../atomjobs/atomredis/redis_local_redo_dr.go | 15 +++++++- .../dbactuator/pkg/jobmanager/jobmanager.go | 2 + dbm-ui/backend/flow/consts.py | 1 + .../atom_jobs/redis_cluster_master_rep.py | 38 +++++++++++++++++++ .../flow/utils/redis/redis_act_playload.py | 16 ++++++++ 5 files changed, 70 insertions(+), 2 deletions(-) diff --git a/dbm-services/redis/db-tools/dbactuator/pkg/atomjobs/atomredis/redis_local_redo_dr.go b/dbm-services/redis/db-tools/dbactuator/pkg/atomjobs/atomredis/redis_local_redo_dr.go index 0e243eef20..4230917f3f 100644 --- a/dbm-services/redis/db-tools/dbactuator/pkg/atomjobs/atomredis/redis_local_redo_dr.go +++ b/dbm-services/redis/db-tools/dbactuator/pkg/atomjobs/atomredis/redis_local_redo_dr.go @@ -30,6 +30,17 @@ type RedisLocalDoDR struct { runtime *jobruntime.JobGenericRuntime } +/*{ + "bk_biz_id":0, + "cluster_id":111, + "cluster_type":"TwemproxyRedisInstance", + "immute_domain":"x.1.x.db", + "instances":[ + "master_ip":"","master_port":30000, + "slave_ip":"","slave_port":30000, + ] +}*/ + // Init implements jobruntime.JobRunner. func (r *RedisLocalDoDR) Init(runtime *jobruntime.JobGenericRuntime) error { // 获取安装参数 @@ -70,7 +81,7 @@ func (r *RedisLocalDoDR) Init(runtime *jobruntime.JobGenericRuntime) error { */ // Run implements jobruntime.JobRunner. func (r *RedisLocalDoDR) Run() error { - if r.params.ClusterType != consts.TendisTypeTwemproxyRedisInstance || + if r.params.ClusterType != consts.TendisTypeTwemproxyRedisInstance && r.params.ClusterType != consts.TendisTypeRedisInstance { r.runtime.Logger.Error("Unexpected clustertype <%s>", r.params.ClusterType) return fmt.Errorf("X-%s-X", r.params.ClusterType) @@ -309,7 +320,7 @@ func NewRedisLocalDoDR() jobruntime.JobRunner { // Name implements jobruntime.JobRunner. func (r *RedisLocalDoDR) Name() string { - return "redis_local_do_dr" + return "redis_local_redo_dr" } // Retry implements jobruntime.JobRunner. diff --git a/dbm-services/redis/db-tools/dbactuator/pkg/jobmanager/jobmanager.go b/dbm-services/redis/db-tools/dbactuator/pkg/jobmanager/jobmanager.go index 755fa6a47f..3581ad5b71 100644 --- a/dbm-services/redis/db-tools/dbactuator/pkg/jobmanager/jobmanager.go +++ b/dbm-services/redis/db-tools/dbactuator/pkg/jobmanager/jobmanager.go @@ -206,6 +206,8 @@ func (m *JobGenericManager) atomjobsMapperLoading() { m.atomJobMapper[atomredis.NewRedisReshape().Name()] = atomredis.NewRedisReshape m.atomJobMapper[atomredis.NewClusterResetFlushMeet().Name()] = atomredis.NewClusterResetFlushMeet m.atomJobMapper[atomredis.NewReplicasForceResync().Name()] = atomredis.NewReplicasForceResync + m.atomJobMapper[atomredis.NewRedisLocalDoDR().Name()] = atomredis.NewRedisLocalDoDR + // 老备份系统 // m.atomJobMapper[atomredis.NewRedisDataRecover().Name()] = atomredis.NewRedisDataRecover m.atomJobMapper[atomredis.NewRedisDataStructure().Name()] = atomredis.NewRedisDataStructure diff --git a/dbm-ui/backend/flow/consts.py b/dbm-ui/backend/flow/consts.py index ca523673a7..10b48db6dd 100644 --- a/dbm-ui/backend/flow/consts.py +++ b/dbm-ui/backend/flow/consts.py @@ -451,6 +451,7 @@ class RedisActuatorActionEnum(str, StructuredEnum): Install = EnumField("install", _("install")) REPLICA_BATCH = EnumField("replica_batch", _("replica_batch")) Replicaof = EnumField("replicaof", _("replicaof")) + LocalReDoDR = EnumField("local_redo_dr", _("local_redo_dr")) CLUSTER_MEET = EnumField("clustermeet_slotsassign", _("clustermeet_slotsassign")) KEYS_PATTERN = EnumField("keyspattern", _("keyspattern")) KEYS_DELETE_REGEX = EnumField("keysdelete_regex", _("keysdelete_regex")) diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/redis/atom_jobs/redis_cluster_master_rep.py b/dbm-ui/backend/flow/engine/bamboo/scene/redis/atom_jobs/redis_cluster_master_rep.py index 0927c7a2d1..cedc877421 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/redis/atom_jobs/redis_cluster_master_rep.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/redis/atom_jobs/redis_cluster_master_rep.py @@ -221,6 +221,44 @@ def TwemproxyClusterMasterReplaceJob( kwargs=asdict(act_kwargs), ) + # 这里高版本的Redis Slave 还需要重新同步一下数据 + if act_kwargs.cluster["cluster_type"] in [ + ClusterType.TendisRedisInstance.value, + ClusterType.TendisTwemproxyRedisInstance.value, + ]: + sub_pipelines, resync_args = [], deepcopy(act_kwargs) + resync_args.cluster = { + "bk_biz_id": int(act_kwargs.cluster["bk_biz_id"]), + "cluster_id": int(act_kwargs.cluster["cluster_id"]), + "cluster_type": act_kwargs.cluster["cluster_type"], + "immute_domain": act_kwargs.cluster["immute_domain"], + "bk_cloud_id": int(act_kwargs.cluster["bk_cloud_id"]), + "instances": [], + } + for sync_link in sync_relations: + one_resync_args = deepcopy(resync_args) + new_master, new_slave = sync_link["sync_dst1"], sync_link["sync_dst2"] + for instances in sync_link["ins_link"]: + master_port, slave_port = instances["sync_dst1"], instances["sync_dst2"] + one_resync_args.cluster["instances"].append( + { + "master_ip": new_master, + "master_port": int(master_port), + "slave_ip": new_slave, + "slave_port": int(slave_port), + } + ) + one_resync_args.exec_ip = new_slave + one_resync_args.get_redis_payload_func = RedisActPayload.redis_local_redo_dr.__name__ + sub_pipelines.append( + { + "act_name": _("{}-本地重建Slave").format(new_slave), + "act_component_code": ExecuteDBActuatorScriptComponent.code, + "kwargs": asdict(one_resync_args), + } + ) + redis_pipeline.add_parallel_acts(acts_list=sub_pipelines) + # #### 下架旧实例 ############################################################################# sub_pipelines, shutdown_kwargs = [], deepcopy(act_kwargs) for replace_info in master_replace_detail: diff --git a/dbm-ui/backend/flow/utils/redis/redis_act_playload.py b/dbm-ui/backend/flow/utils/redis/redis_act_playload.py index 0422b62436..6461b4f0d8 100644 --- a/dbm-ui/backend/flow/utils/redis/redis_act_playload.py +++ b/dbm-ui/backend/flow/utils/redis/redis_act_playload.py @@ -1310,6 +1310,22 @@ def redis_init_batch_replicate(self, **kwargs) -> dict: "payload": {"replica_pairs": params["replica_pairs"]}, } + # redis批量建立主从关系(本地强制重建slave) + def redis_local_redo_dr(self, **kwargs) -> dict: + """ + 适用场景: + 1. 整机替换 + 2. 扩缩容 + 3. 故障复用 + """ + params = kwargs["params"] + + return { + "db_type": DBActuatorTypeEnum.Redis.value, + "action": DBActuatorTypeEnum.Redis.value + "_" + RedisActuatorActionEnum.LocalReDoDR.value, + "payload": params, + } + # redis批量建立主从关系 (兼容单实例) def get_redis_batch_replicate(self, **kwargs) -> dict: """