Skip to content

Commit

Permalink
fix: 修复proxy替换流程;优化随机账号管理逻辑;修复并发处理多实例下架元数据删除问题 #7821
Browse files Browse the repository at this point in the history
  • Loading branch information
yksitu authored and iSecloud committed Nov 11, 2024
1 parent d9ed174 commit 9e98974
Show file tree
Hide file tree
Showing 10 changed files with 234 additions and 179 deletions.
4 changes: 3 additions & 1 deletion dbm-services/common/go-pubpkg/validate/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,9 @@ func translate(ut ut.Translator, fe validator.FieldError) string {

func validCharSet(f validator.FieldLevel) bool {
v := f.Field().String()
return cmutil.HasElem(v, []string{"default", "utf8mb4", "utf8", "latin1", "gb2312", "gbk", "binary", "gb18030"})
return cmutil.HasElem(
v, []string{"default", "utf8mb4", "utf8", "latin1", "gb2312", "gbk", "binary", "gb18030", "utf8mb3"},
)
}

// validateCrontabExpr 验证Liunx crontab表达式
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,12 @@ func (c *ClusterRoleSwitchComp) Init() error {
c.Params.MasterPort,
); err != nil {
// 如果主实例连接失败,且不属于强制切换,则退出异常
if c.Params.Force {
if !c.Params.Force {
return fmt.Errorf("connenct by [%s:%d] failed,err:%s",
c.Params.MasterHost, c.Params.MasterPort, err.Error())
}
logger.Warn("connenct by [%s:%d] failed,err:%s",
c.Params.MasterHost, c.Params.MasterPort, err.Error())
return err
}
if SdbWork, err = sqlserver.NewDbWorker(
c.GeneralParam.RuntimeAccountParam.SAUser,
Expand Down Expand Up @@ -104,19 +103,20 @@ func (c *ClusterRoleSwitchComp) PreCheck() error {
if c.Params.SyncMode != cst.MIRRORING && c.Params.SyncMode != cst.ALWAYSON {
return fmt.Errorf("the sync-mode [%d] is not supported", c.Params.SyncMode)
}
// 检查是否空实例(没有业务数据库)
var checkDBS []string
if err := c.MasterDB.Queryx(&checkDBS, cst.GET_BUSINESS_DATABASE); err != nil {
return fmt.Errorf("get db list failed %v", err)
}
if len(checkDBS) == 0 {
// 空时候代表不需要进行下面逻辑,打tag
c.isEmpty = true
logger.Warn("this cluster is an empty cluster")
return nil
}
// 检验逻辑,不同数据同步模式检测方式不一样,分开处理, 强制切换不做检测处理
if c.Params.Force {
if !c.Params.Force {
// 检查是否空实例(没有业务数据库)
var checkDBS []string
if err := c.MasterDB.Queryx(&checkDBS, cst.GET_BUSINESS_DATABASE); err != nil {
return fmt.Errorf("get db list failed %v", err)
}
if len(checkDBS) == 0 {
// 空时候代表不需要进行下面逻辑,打tag
c.isEmpty = true
logger.Warn("this cluster is an empty cluster")
return nil
}

switch c.Params.SyncMode {
case cst.MIRRORING:
c.MirroringPreCheck()
Expand Down
6 changes: 6 additions & 0 deletions dbm-ui/backend/db_meta/api/cluster/tendbha/decommission.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
def decommission(cluster: Cluster):
cc_manage = CcManage(cluster.bk_biz_id, cluster.cluster_type)
for proxy in cluster.proxyinstance_set.all():
# 先做加锁处理,避免出现同机器同时回收实例出现判断异常的问题
proxy.machine.proxyinstance_set.select_for_update().all()

proxy.delete(keep_parents=True)
if not proxy.machine.proxyinstance_set.exists():
# 这个 api 不需要检查返回值, 转移主机到空闲模块,转移模块这里会把服务实例删除
Expand All @@ -38,6 +41,9 @@ def decommission(cluster: Cluster):
cc_manage.delete_service_instance(bk_instance_ids=[proxy.bk_instance_id])

for storage in cluster.storageinstance_set.all():
# 先做加锁处理,避免出现同机器同时回收实例出现判断异常的问题
storage.machine.proxyinstance_set.select_for_update().all()

# 删除存储在密码服务的密码元信息
DBPrivManagerApi.delete_password(
{
Expand Down
4 changes: 4 additions & 0 deletions dbm-ui/backend/db_meta/api/cluster/tendbha/switch_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ def reduce_proxy(cluster_ids: list, origin_proxy_ip: str):
"""
clusters = Cluster.objects.filter(id__in=cluster_ids)
for cluster in clusters:
# 先对同机的proxy实例加锁, 避免多单据执行出现脏读情况
ProxyInstance.objects.select_for_update().filter(
machine__bk_cloud_id=cluster.bk_cloud_id, machine__ip=origin_proxy_ip
)
proxy = cluster.proxyinstance_set.get(machine__ip=origin_proxy_ip)
# 删除实例元数据
proxy.delete(keep_parents=True)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,16 @@ def switch_mysql_cluster_proxy_flow(self):
sub_flow_context["proxy_ports"] = self.__get_proxy_install_ports(cluster_ids=info["cluster_ids"])
sub_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(sub_flow_context))

# 初始化同机替换的proxy集群信息
clusters = [
self.__get_switch_cluster_info(
cluster_id=cluster_id,
origin_proxy_ip=info["origin_proxy_ip"]["ip"],
target_proxy_ip=info["target_proxy_ip"]["ip"],
)
for cluster_id in info["cluster_ids"]
]

# 拼接执行原子任务活动节点需要的通用的私有参数结构体, 减少代码重复率,但引用时注意内部参数值传递的问题
exec_act_kwargs = ExecActuatorKwargs(
cluster_type=ClusterType.TenDBHA,
Expand Down Expand Up @@ -180,24 +190,15 @@ def switch_mysql_cluster_proxy_flow(self):
act_component_code=ExecuteDBActuatorScriptComponent.code,
kwargs=asdict(exec_act_kwargs),
)
# 后续流程需要在这里加一个暂停节点,让用户在合适的时间执行切换
sub_pipeline.add_act(act_name=_("人工确认"), act_component_code=PauseComponent.code, kwargs={})

# 阶段2 根据需要替换的proxy的集群,依次添加
switch_proxy_sub_list = []
for cluster_id in info["cluster_ids"]:
add_proxy_sub_list = []
for cluster in clusters:

# 拼接子流程需要全局参数
sub_sub_flow_context = copy.deepcopy(self.data)
sub_sub_flow_context.pop("infos")

# 获取集群的实例信息
cluster = self.__get_switch_cluster_info(
cluster_id=cluster_id,
target_proxy_ip=info["target_proxy_ip"]["ip"],
origin_proxy_ip=info["origin_proxy_ip"]["ip"],
)

# 针对集群维度声明替换子流程
switch_proxy_sub_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(sub_sub_flow_context))

Expand Down Expand Up @@ -252,6 +253,28 @@ def switch_mysql_cluster_proxy_flow(self):
)
switch_proxy_sub_pipeline.add_parallel_acts(acts_list=acts_list)

add_proxy_sub_list.append(
switch_proxy_sub_pipeline.build_sub_process(sub_name=_("{}集群添加proxy实例").format(cluster["name"]))
)

sub_pipeline.add_parallel_sub_pipeline(sub_flow_list=add_proxy_sub_list)

# 后续流程需要在这里加一个暂停节点,让用户在合适的时间执行切换
sub_pipeline.add_act(act_name=_("人工确认"), act_component_code=PauseComponent.code, kwargs={})

# 阶段3 根据集群维度切换域名
switch_dns_sub_list = []
for cluster in clusters:

# 拼接子流程需要全局参数
sub_sub_flow_context = copy.deepcopy(self.data)
sub_sub_flow_context.pop("infos")

# 针对集群维度声明替换子流程
switch_cluster_dns_pipeline = SubBuilder(
root_id=self.root_id, data=copy.deepcopy(sub_sub_flow_context)
)

acts_list = []
for dns_name in cluster["add_domain_list"]:
# 这里的添加域名的方式根据目前集群对应proxy dns域名进行循环添加,这样保证某个域名添加异常时其他域名添加成功
Expand All @@ -269,9 +292,9 @@ def switch_mysql_cluster_proxy_flow(self):
),
}
)
switch_proxy_sub_pipeline.add_parallel_acts(acts_list=acts_list)
switch_cluster_dns_pipeline.add_parallel_acts(acts_list=acts_list)

switch_proxy_sub_pipeline.add_act(
switch_cluster_dns_pipeline.add_act(
act_name=_("回收旧proxy集群映射"),
act_component_code=MySQLDnsManageComponent.code,
kwargs=asdict(
Expand All @@ -283,11 +306,11 @@ def switch_mysql_cluster_proxy_flow(self):
),
)

switch_proxy_sub_list.append(
switch_proxy_sub_pipeline.build_sub_process(sub_name=_("{}集群替换proxy实例").format(cluster["name"]))
switch_dns_sub_list.append(
switch_cluster_dns_pipeline.build_sub_process(sub_name=_("{}集群切换proxy域名").format(cluster["name"]))
)

sub_pipeline.add_parallel_sub_pipeline(sub_flow_list=switch_proxy_sub_list)
sub_pipeline.add_parallel_sub_pipeline(sub_flow_list=switch_dns_sub_list)

# 先把新的节点数据写入
sub_pipeline.add_act(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ def run_flow(self):
# 定义主流程
main_pipeline = Builder(root_id=self.root_id, data=self.data)
sub_pipelines = []
# 定义切换名称
sub_name = _("集群主故障切换") if self.data["force"] else _("集群互切")

for info in self.data["infos"]:
# 声明子流程
Expand Down Expand Up @@ -199,7 +201,7 @@ def run_flow(self):
kwargs=asdict(DropRandomJobUserKwargs(cluster_ids=[cluster.id])),
)

act_list.append(cluster_pipeline.build_sub_process(sub_name=_("{}集群互切".format(cluster.name))))
act_list.append(cluster_pipeline.build_sub_process(sub_name=_("{}{}".format(cluster.name, sub_name))))

# 拼接集群维度的子流程
sub_pipeline.add_parallel_sub_pipeline(sub_flow_list=act_list)
Expand All @@ -222,7 +224,7 @@ def run_flow(self):
root_id=self.root_id,
bk_biz_id=int(self.data["bk_biz_id"]),
bk_cloud_id=0,
master_host=[Host(**info["master"])],
master_host=[] if self.data["force"] else [Host(**info["master"])],
slave_host=[Host(**info["slave"])],
cluster_domain_list=[i.immute_domain for i in Cluster.objects.filter(id__in=info["cluster_ids"])],
is_install_backup_client=False,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,15 @@
from pipeline.component_framework.component import Component

from backend.components.mysql_priv_manager.client import DBPrivManagerApi
from backend.db_meta.enums import ClusterType, InstanceStatus
from backend.db_meta.enums import InstanceStatus
from backend.db_meta.exceptions import ClusterNotExistException
from backend.db_meta.models import Cluster
from backend.flow.consts import MachinePrivRoleMap, PrivRole
from backend.flow.plugins.components.collections.common.base_service import BaseService
from backend.flow.utils.mysql.common.random_job_with_ticket_map import get_instance_with_random_job
from backend.flow.utils.mysql.get_mysql_sys_user import generate_mysql_tmp_user
from backend.ticket.constants import TicketType

logger = logging.getLogger("flow")

# 定义控制节点的异常控制,理论上所有单据执行这个活动节点应该所有实例权限成功才能显示成功,但是部分单据不一定完全成功地,比如故障替换场景。
allow_list = [
TicketType.MYSQL_MASTER_FAIL_OVER.value,
TicketType.MYSQL_RESTORE_SLAVE.value,
TicketType.TENDBCLUSTER_MASTER_FAIL_OVER.value,
]


class AddTempUserForClusterService(BaseService):
"""
Expand All @@ -53,41 +45,6 @@ def __add_priv(self, params):

return True

@staticmethod
def _get_instance_for_cluster(cluster: Cluster) -> list:
"""
根据cluster对象获取所有的cluster需要实例信息
"""
inst_list = []
for inst in cluster.storageinstance_set.all():
inst_list.append(
{
"instance": inst.ip_port,
"priv_role": MachinePrivRoleMap.get(inst.machine_type),
"cmdb_status": inst.status,
}
)
if cluster.cluster_type == ClusterType.TenDBCluster:
# 获取tendb cluster集群所有spider实例
for inst in cluster.proxyinstance_set.all():
inst_list.append(
{
"instance": inst.ip_port,
"priv_role": MachinePrivRoleMap.get(inst.machine_type),
"cmdb_status": inst.status,
}
)
# 获取tendb cluster集群所有tdbctl实例,只给中控primary授权,权限同步到每个节点
inst_list.append(
{
"instance": cluster.tendbcluster_ctl_primary_address(),
"priv_role": PrivRole.TDBCTL.value,
"cmdb_status": InstanceStatus.RUNNING.value,
}
)

return inst_list

def _execute(self, data, parent_data, callback=None) -> bool:
kwargs = data.get_one_of_inputs("kwargs")
global_data = data.get_one_of_inputs("global_data")
Expand Down Expand Up @@ -121,7 +78,9 @@ def _execute(self, data, parent_data, callback=None) -> bool:
common_param["bk_cloud_id"] = cluster.bk_cloud_id

# 获取每套集群的所有需要添加临时的账号
instance_list = self._get_instance_for_cluster(cluster=cluster)
instance_list = get_instance_with_random_job(
cluster=cluster, ticket_type=global_data.get("ticket_type", "test")
)

# 开始遍历集群每个实例,添加临时账号
for inst in instance_list:
Expand Down
Loading

0 comments on commit 9e98974

Please sign in to comment.