feat(redis): RedisCache local slave rebuild #7104
xiepaup authored and iSecloud committed Nov 15, 2024
1 parent 6608664 commit cb507c1
Showing 5 changed files with 70 additions and 2 deletions.
@@ -30,6 +30,17 @@ type RedisLocalDoDR struct {
runtime *jobruntime.JobGenericRuntime
}

/*{
	"bk_biz_id":0,
	"cluster_id":111,
	"cluster_type":"TwemproxyRedisInstance",
	"immute_domain":"x.1.x.db",
	"instances":[
		{"master_ip":"","master_port":30000,
		 "slave_ip":"","slave_port":30000}
	]
}*/

// Init implements jobruntime.JobRunner.
func (r *RedisLocalDoDR) Init(runtime *jobruntime.JobGenericRuntime) error {
// Get the install parameters
@@ -70,7 +81,7 @@ func (r *RedisLocalDoDR) Init(runtime *jobruntime.JobGenericRuntime) error {
*/
// Run implements jobruntime.JobRunner.
func (r *RedisLocalDoDR) Run() error {
-	if r.params.ClusterType != consts.TendisTypeTwemproxyRedisInstance ||
+	if r.params.ClusterType != consts.TendisTypeTwemproxyRedisInstance &&
r.params.ClusterType != consts.TendisTypeRedisInstance {
r.runtime.Logger.Error("Unexpected clustertype <%s>", r.params.ClusterType)
return fmt.Errorf("X-%s-X", r.params.ClusterType)
@@ -309,7 +320,7 @@ func NewRedisLocalDoDR() jobruntime.JobRunner {

// Name implements jobruntime.JobRunner.
func (r *RedisLocalDoDR) Name() string {
return "redis_local_do_dr"
return "redis_local_redo_dr"
}

// Retry implements jobruntime.JobRunner.
@@ -206,6 +206,8 @@ func (m *JobGenericManager) atomjobsMapperLoading() {
m.atomJobMapper[atomredis.NewRedisReshape().Name()] = atomredis.NewRedisReshape
m.atomJobMapper[atomredis.NewClusterResetFlushMeet().Name()] = atomredis.NewClusterResetFlushMeet
m.atomJobMapper[atomredis.NewReplicasForceResync().Name()] = atomredis.NewReplicasForceResync
m.atomJobMapper[atomredis.NewRedisLocalDoDR().Name()] = atomredis.NewRedisLocalDoDR

// Legacy backup system
// m.atomJobMapper[atomredis.NewRedisDataRecover().Name()] = atomredis.NewRedisDataRecover
m.atomJobMapper[atomredis.NewRedisDataStructure().Name()] = atomredis.NewRedisDataStructure
1 change: 1 addition & 0 deletions dbm-ui/backend/flow/consts.py
@@ -451,6 +451,7 @@ class RedisActuatorActionEnum(str, StructuredEnum):
Install = EnumField("install", _("install"))
REPLICA_BATCH = EnumField("replica_batch", _("replica_batch"))
Replicaof = EnumField("replicaof", _("replicaof"))
LocalReDoDR = EnumField("local_redo_dr", _("local_redo_dr"))
CLUSTER_MEET = EnumField("clustermeet_slotsassign", _("clustermeet_slotsassign"))
KEYS_PATTERN = EnumField("keyspattern", _("keyspattern"))
KEYS_DELETE_REGEX = EnumField("keysdelete_regex", _("keysdelete_regex"))
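
The actuator action string is composed on the flow side as DBActuatorTypeEnum.Redis.value + "_" + RedisActuatorActionEnum.LocalReDoDR.value, so the local_redo_dr value added above must line up with the redis_local_redo_dr name returned by the Go job's Name(). A minimal Python sketch of that composition, assuming DBActuatorTypeEnum.Redis.value is "redis" (implied by the resulting job name):

    # Sketch only: how the enum value added above becomes the actuator action name.
    db_type = "redis"          # assumed value of DBActuatorTypeEnum.Redis.value
    action = "local_redo_dr"   # RedisActuatorActionEnum.LocalReDoDR.value
    actuator_action = db_type + "_" + action
    assert actuator_action == "redis_local_redo_dr"  # must match RedisLocalDoDR.Name() in the Go actuator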
@@ -221,6 +221,44 @@ def TwemproxyClusterMasterReplaceJob(
kwargs=asdict(act_kwargs),
)

# For higher Redis versions, the new slave still needs to resynchronize its data here
if act_kwargs.cluster["cluster_type"] in [
ClusterType.TendisRedisInstance.value,
ClusterType.TendisTwemproxyRedisInstance.value,
]:
sub_pipelines, resync_args = [], deepcopy(act_kwargs)
resync_args.cluster = {
"bk_biz_id": int(act_kwargs.cluster["bk_biz_id"]),
"cluster_id": int(act_kwargs.cluster["cluster_id"]),
"cluster_type": act_kwargs.cluster["cluster_type"],
"immute_domain": act_kwargs.cluster["immute_domain"],
"bk_cloud_id": int(act_kwargs.cluster["bk_cloud_id"]),
"instances": [],
}
for sync_link in sync_relations:
one_resync_args = deepcopy(resync_args)
new_master, new_slave = sync_link["sync_dst1"], sync_link["sync_dst2"]
for instances in sync_link["ins_link"]:
master_port, slave_port = instances["sync_dst1"], instances["sync_dst2"]
one_resync_args.cluster["instances"].append(
{
"master_ip": new_master,
"master_port": int(master_port),
"slave_ip": new_slave,
"slave_port": int(slave_port),
}
)
one_resync_args.exec_ip = new_slave
one_resync_args.get_redis_payload_func = RedisActPayload.redis_local_redo_dr.__name__
sub_pipelines.append(
{
"act_name": _("{}-本地重建Slave").format(new_slave),
"act_component_code": ExecuteDBActuatorScriptComponent.code,
"kwargs": asdict(one_resync_args),
}
)
redis_pipeline.add_parallel_acts(acts_list=sub_pipelines)
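
For reference, a short sketch of how one sync_link above is translated into the instances payload; the concrete values are hypothetical and the shape of sync_relations is inferred from the keys read in the loop (sync_dst1 = new master, sync_dst2 = new slave):

    # Hypothetical sync_link entry; IPs at the top level, ports inside ins_link.
    sync_link = {
        "sync_dst1": "1.1.1.1",  # new master IP
        "sync_dst2": "2.2.2.2",  # new slave IP
        "ins_link": [
            {"sync_dst1": "30000", "sync_dst2": "30000"},
            {"sync_dst1": "30001", "sync_dst2": "30001"},
        ],
    }

    instances = []
    new_master, new_slave = sync_link["sync_dst1"], sync_link["sync_dst2"]
    for ins in sync_link["ins_link"]:
        instances.append(
            {
                "master_ip": new_master,
                "master_port": int(ins["sync_dst1"]),
                "slave_ip": new_slave,
                "slave_port": int(ins["sync_dst2"]),
            }
        )
    # instances is what ends up in cluster["instances"] for redis_local_redo_dr,
    # and the act is executed on new_slave (exec_ip).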

# #### Decommission the old instances #############################################################################
sub_pipelines, shutdown_kwargs = [], deepcopy(act_kwargs)
for replace_info in master_replace_detail:
16 changes: 16 additions & 0 deletions dbm-ui/backend/flow/utils/redis/redis_act_playload.py
@@ -1310,6 +1310,22 @@ def redis_init_batch_replicate(self, **kwargs) -> dict:
"payload": {"replica_pairs": params["replica_pairs"]},
}

# Redis batch master/slave replication setup (force local slave rebuild)
def redis_local_redo_dr(self, **kwargs) -> dict:
"""
Applicable scenarios:
1. Whole-machine replacement
2. Scale-out / scale-in
3. Reuse of a host after failure recovery
"""
params = kwargs["params"]

return {
"db_type": DBActuatorTypeEnum.Redis.value,
"action": DBActuatorTypeEnum.Redis.value + "_" + RedisActuatorActionEnum.LocalReDoDR.value,
"payload": params,
}
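
A hedged example of the dict this method returns when params is shaped like the JSON comment in the Go job above (values are illustrative; "redis" is the assumed DBActuatorTypeEnum.Redis.value):

    # Illustrative return value of redis_local_redo_dr(params=...):
    example_payload = {
        "db_type": "redis",               # DBActuatorTypeEnum.Redis.value (assumed)
        "action": "redis_local_redo_dr",  # "redis" + "_" + "local_redo_dr"
        "payload": {
            "bk_biz_id": 0,
            "cluster_id": 111,
            "cluster_type": "TwemproxyRedisInstance",
            "immute_domain": "x.1.x.db",
            "instances": [
                {"master_ip": "1.1.1.1", "master_port": 30000,
                 "slave_ip": "2.2.2.2", "slave_port": 30000},
            ],
        },
    }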

# Redis batch master/slave replication setup (compatible with single instances)
def get_redis_batch_replicate(self, **kwargs) -> dict:
"""
