From 9e989745e7295479cd9c744c7ef1def9dbc0a185 Mon Sep 17 00:00:00 2001 From: yksitu <1297650644@qq.com> Date: Fri, 8 Nov 2024 23:06:00 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8Dproxy=E6=9B=BF?= =?UTF-8?q?=E6=8D=A2=E6=B5=81=E7=A8=8B=EF=BC=9B=E4=BC=98=E5=8C=96=E9=9A=8F?= =?UTF-8?q?=E6=9C=BA=E8=B4=A6=E5=8F=B7=E7=AE=A1=E7=90=86=E9=80=BB=E8=BE=91?= =?UTF-8?q?=EF=BC=9B=E4=BF=AE=E5=A4=8D=E5=B9=B6=E5=8F=91=E5=A4=84=E7=90=86?= =?UTF-8?q?=E5=A4=9A=E5=AE=9E=E4=BE=8B=E4=B8=8B=E6=9E=B6=E5=85=83=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E5=88=A0=E9=99=A4=E9=97=AE=E9=A2=98=20#7821?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/go-pubpkg/validate/validate.go | 4 +- .../sqlserver/cluster_role_switch.go | 28 ++-- .../api/cluster/tendbha/decommission.py | 6 + .../api/cluster/tendbha/switch_proxy.py | 4 + .../scene/mysql/mysql_proxy_cluster_switch.py | 55 +++++-- .../sqlserver_master_slave_switch.py | 6 +- .../common/create_random_job_user.py | 51 +----- .../common/drop_random_job_user.py | 150 ++++++------------ .../sqlserver/drop_random_job_user.py | 3 +- .../common/random_job_with_ticket_map.py | 106 +++++++++++++ 10 files changed, 234 insertions(+), 179 deletions(-) create mode 100644 dbm-ui/backend/flow/utils/mysql/common/random_job_with_ticket_map.py diff --git a/dbm-services/common/go-pubpkg/validate/validate.go b/dbm-services/common/go-pubpkg/validate/validate.go index 5f5a3298fc..38157e10b4 100644 --- a/dbm-services/common/go-pubpkg/validate/validate.go +++ b/dbm-services/common/go-pubpkg/validate/validate.go @@ -156,7 +156,9 @@ func translate(ut ut.Translator, fe validator.FieldError) string { func validCharSet(f validator.FieldLevel) bool { v := f.Field().String() - return cmutil.HasElem(v, []string{"default", "utf8mb4", "utf8", "latin1", "gb2312", "gbk", "binary", "gb18030"}) + return cmutil.HasElem( + v, []string{"default", "utf8mb4", "utf8", "latin1", "gb2312", "gbk", "binary", "gb18030", "utf8mb3"}, + ) } // validateCrontabExpr 验证Liunx crontab表达式 diff --git a/dbm-services/sqlserver/db-tools/dbactuator/pkg/components/sqlserver/cluster_role_switch.go b/dbm-services/sqlserver/db-tools/dbactuator/pkg/components/sqlserver/cluster_role_switch.go index 90263b7de9..946fd19eca 100644 --- a/dbm-services/sqlserver/db-tools/dbactuator/pkg/components/sqlserver/cluster_role_switch.go +++ b/dbm-services/sqlserver/db-tools/dbactuator/pkg/components/sqlserver/cluster_role_switch.go @@ -57,13 +57,12 @@ func (c *ClusterRoleSwitchComp) Init() error { c.Params.MasterPort, ); err != nil { // 如果主实例连接失败,且不属于强制切换,则退出异常 - if c.Params.Force { + if !c.Params.Force { return fmt.Errorf("connenct by [%s:%d] failed,err:%s", c.Params.MasterHost, c.Params.MasterPort, err.Error()) } logger.Warn("connenct by [%s:%d] failed,err:%s", c.Params.MasterHost, c.Params.MasterPort, err.Error()) - return err } if SdbWork, err = sqlserver.NewDbWorker( c.GeneralParam.RuntimeAccountParam.SAUser, @@ -104,19 +103,20 @@ func (c *ClusterRoleSwitchComp) PreCheck() error { if c.Params.SyncMode != cst.MIRRORING && c.Params.SyncMode != cst.ALWAYSON { return fmt.Errorf("the sync-mode [%d] is not supported", c.Params.SyncMode) } - // 检查是否空实例(没有业务数据库) - var checkDBS []string - if err := c.MasterDB.Queryx(&checkDBS, cst.GET_BUSINESS_DATABASE); err != nil { - return fmt.Errorf("get db list failed %v", err) - } - if len(checkDBS) == 0 { - // 空时候代表不需要进行下面逻辑,打tag - c.isEmpty = true - logger.Warn("this cluster is an empty cluster") - return nil - } // 检验逻辑,不同数据同步模式检测方式不一样,分开处理, 强制切换不做检测处理 - if c.Params.Force { + if !c.Params.Force { + // 检查是否空实例(没有业务数据库) + var checkDBS []string + if err := c.MasterDB.Queryx(&checkDBS, cst.GET_BUSINESS_DATABASE); err != nil { + return fmt.Errorf("get db list failed %v", err) + } + if len(checkDBS) == 0 { + // 空时候代表不需要进行下面逻辑,打tag + c.isEmpty = true + logger.Warn("this cluster is an empty cluster") + return nil + } + switch c.Params.SyncMode { case cst.MIRRORING: c.MirroringPreCheck() diff --git a/dbm-ui/backend/db_meta/api/cluster/tendbha/decommission.py b/dbm-ui/backend/db_meta/api/cluster/tendbha/decommission.py index 8c7b1ebc1a..f38eac4a2a 100644 --- a/dbm-ui/backend/db_meta/api/cluster/tendbha/decommission.py +++ b/dbm-ui/backend/db_meta/api/cluster/tendbha/decommission.py @@ -28,6 +28,9 @@ def decommission(cluster: Cluster): cc_manage = CcManage(cluster.bk_biz_id, cluster.cluster_type) for proxy in cluster.proxyinstance_set.all(): + # 先做加锁处理,避免出现同机器同时回收实例出现判断异常的问题 + proxy.machine.proxyinstance_set.select_for_update().all() + proxy.delete(keep_parents=True) if not proxy.machine.proxyinstance_set.exists(): # 这个 api 不需要检查返回值, 转移主机到空闲模块,转移模块这里会把服务实例删除 @@ -38,6 +41,9 @@ def decommission(cluster: Cluster): cc_manage.delete_service_instance(bk_instance_ids=[proxy.bk_instance_id]) for storage in cluster.storageinstance_set.all(): + # 先做加锁处理,避免出现同机器同时回收实例出现判断异常的问题 + storage.machine.proxyinstance_set.select_for_update().all() + # 删除存储在密码服务的密码元信息 DBPrivManagerApi.delete_password( { diff --git a/dbm-ui/backend/db_meta/api/cluster/tendbha/switch_proxy.py b/dbm-ui/backend/db_meta/api/cluster/tendbha/switch_proxy.py index f2e969a82e..4e573c86a5 100644 --- a/dbm-ui/backend/db_meta/api/cluster/tendbha/switch_proxy.py +++ b/dbm-ui/backend/db_meta/api/cluster/tendbha/switch_proxy.py @@ -72,6 +72,10 @@ def reduce_proxy(cluster_ids: list, origin_proxy_ip: str): """ clusters = Cluster.objects.filter(id__in=cluster_ids) for cluster in clusters: + # 先对同机的proxy实例加锁, 避免多单据执行出现脏读情况 + ProxyInstance.objects.select_for_update().filter( + machine__bk_cloud_id=cluster.bk_cloud_id, machine__ip=origin_proxy_ip + ) proxy = cluster.proxyinstance_set.get(machine__ip=origin_proxy_ip) # 删除实例元数据 proxy.delete(keep_parents=True) diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_proxy_cluster_switch.py b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_proxy_cluster_switch.py index 127f109794..021bae366a 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_proxy_cluster_switch.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_proxy_cluster_switch.py @@ -133,6 +133,16 @@ def switch_mysql_cluster_proxy_flow(self): sub_flow_context["proxy_ports"] = self.__get_proxy_install_ports(cluster_ids=info["cluster_ids"]) sub_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(sub_flow_context)) + # 初始化同机替换的proxy集群信息 + clusters = [ + self.__get_switch_cluster_info( + cluster_id=cluster_id, + origin_proxy_ip=info["origin_proxy_ip"]["ip"], + target_proxy_ip=info["target_proxy_ip"]["ip"], + ) + for cluster_id in info["cluster_ids"] + ] + # 拼接执行原子任务活动节点需要的通用的私有参数结构体, 减少代码重复率,但引用时注意内部参数值传递的问题 exec_act_kwargs = ExecActuatorKwargs( cluster_type=ClusterType.TenDBHA, @@ -180,24 +190,15 @@ def switch_mysql_cluster_proxy_flow(self): act_component_code=ExecuteDBActuatorScriptComponent.code, kwargs=asdict(exec_act_kwargs), ) - # 后续流程需要在这里加一个暂停节点,让用户在合适的时间执行切换 - sub_pipeline.add_act(act_name=_("人工确认"), act_component_code=PauseComponent.code, kwargs={}) # 阶段2 根据需要替换的proxy的集群,依次添加 - switch_proxy_sub_list = [] - for cluster_id in info["cluster_ids"]: + add_proxy_sub_list = [] + for cluster in clusters: # 拼接子流程需要全局参数 sub_sub_flow_context = copy.deepcopy(self.data) sub_sub_flow_context.pop("infos") - # 获取集群的实例信息 - cluster = self.__get_switch_cluster_info( - cluster_id=cluster_id, - target_proxy_ip=info["target_proxy_ip"]["ip"], - origin_proxy_ip=info["origin_proxy_ip"]["ip"], - ) - # 针对集群维度声明替换子流程 switch_proxy_sub_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(sub_sub_flow_context)) @@ -252,6 +253,28 @@ def switch_mysql_cluster_proxy_flow(self): ) switch_proxy_sub_pipeline.add_parallel_acts(acts_list=acts_list) + add_proxy_sub_list.append( + switch_proxy_sub_pipeline.build_sub_process(sub_name=_("{}集群添加proxy实例").format(cluster["name"])) + ) + + sub_pipeline.add_parallel_sub_pipeline(sub_flow_list=add_proxy_sub_list) + + # 后续流程需要在这里加一个暂停节点,让用户在合适的时间执行切换 + sub_pipeline.add_act(act_name=_("人工确认"), act_component_code=PauseComponent.code, kwargs={}) + + # 阶段3 根据集群维度切换域名 + switch_dns_sub_list = [] + for cluster in clusters: + + # 拼接子流程需要全局参数 + sub_sub_flow_context = copy.deepcopy(self.data) + sub_sub_flow_context.pop("infos") + + # 针对集群维度声明替换子流程 + switch_cluster_dns_pipeline = SubBuilder( + root_id=self.root_id, data=copy.deepcopy(sub_sub_flow_context) + ) + acts_list = [] for dns_name in cluster["add_domain_list"]: # 这里的添加域名的方式根据目前集群对应proxy dns域名进行循环添加,这样保证某个域名添加异常时其他域名添加成功 @@ -269,9 +292,9 @@ def switch_mysql_cluster_proxy_flow(self): ), } ) - switch_proxy_sub_pipeline.add_parallel_acts(acts_list=acts_list) + switch_cluster_dns_pipeline.add_parallel_acts(acts_list=acts_list) - switch_proxy_sub_pipeline.add_act( + switch_cluster_dns_pipeline.add_act( act_name=_("回收旧proxy集群映射"), act_component_code=MySQLDnsManageComponent.code, kwargs=asdict( @@ -283,11 +306,11 @@ def switch_mysql_cluster_proxy_flow(self): ), ) - switch_proxy_sub_list.append( - switch_proxy_sub_pipeline.build_sub_process(sub_name=_("{}集群替换proxy实例").format(cluster["name"])) + switch_dns_sub_list.append( + switch_cluster_dns_pipeline.build_sub_process(sub_name=_("{}集群切换proxy域名").format(cluster["name"])) ) - sub_pipeline.add_parallel_sub_pipeline(sub_flow_list=switch_proxy_sub_list) + sub_pipeline.add_parallel_sub_pipeline(sub_flow_list=switch_dns_sub_list) # 先把新的节点数据写入 sub_pipeline.add_act( diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/sqlserver/sqlserver_master_slave_switch.py b/dbm-ui/backend/flow/engine/bamboo/scene/sqlserver/sqlserver_master_slave_switch.py index 1c382a3678..a3f24bda7c 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/sqlserver/sqlserver_master_slave_switch.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/sqlserver/sqlserver_master_slave_switch.py @@ -70,6 +70,8 @@ def run_flow(self): # 定义主流程 main_pipeline = Builder(root_id=self.root_id, data=self.data) sub_pipelines = [] + # 定义切换名称 + sub_name = _("集群主故障切换") if self.data["force"] else _("集群互切") for info in self.data["infos"]: # 声明子流程 @@ -199,7 +201,7 @@ def run_flow(self): kwargs=asdict(DropRandomJobUserKwargs(cluster_ids=[cluster.id])), ) - act_list.append(cluster_pipeline.build_sub_process(sub_name=_("{}集群互切".format(cluster.name)))) + act_list.append(cluster_pipeline.build_sub_process(sub_name=_("{}{}".format(cluster.name, sub_name)))) # 拼接集群维度的子流程 sub_pipeline.add_parallel_sub_pipeline(sub_flow_list=act_list) @@ -222,7 +224,7 @@ def run_flow(self): root_id=self.root_id, bk_biz_id=int(self.data["bk_biz_id"]), bk_cloud_id=0, - master_host=[Host(**info["master"])], + master_host=[] if self.data["force"] else [Host(**info["master"])], slave_host=[Host(**info["slave"])], cluster_domain_list=[i.immute_domain for i in Cluster.objects.filter(id__in=info["cluster_ids"])], is_install_backup_client=False, diff --git a/dbm-ui/backend/flow/plugins/components/collections/common/create_random_job_user.py b/dbm-ui/backend/flow/plugins/components/collections/common/create_random_job_user.py index b17147a864..3b90350021 100644 --- a/dbm-ui/backend/flow/plugins/components/collections/common/create_random_job_user.py +++ b/dbm-ui/backend/flow/plugins/components/collections/common/create_random_job_user.py @@ -15,23 +15,15 @@ from pipeline.component_framework.component import Component from backend.components.mysql_priv_manager.client import DBPrivManagerApi -from backend.db_meta.enums import ClusterType, InstanceStatus +from backend.db_meta.enums import InstanceStatus from backend.db_meta.exceptions import ClusterNotExistException from backend.db_meta.models import Cluster -from backend.flow.consts import MachinePrivRoleMap, PrivRole from backend.flow.plugins.components.collections.common.base_service import BaseService +from backend.flow.utils.mysql.common.random_job_with_ticket_map import get_instance_with_random_job from backend.flow.utils.mysql.get_mysql_sys_user import generate_mysql_tmp_user -from backend.ticket.constants import TicketType logger = logging.getLogger("flow") -# 定义控制节点的异常控制,理论上所有单据执行这个活动节点应该所有实例权限成功才能显示成功,但是部分单据不一定完全成功地,比如故障替换场景。 -allow_list = [ - TicketType.MYSQL_MASTER_FAIL_OVER.value, - TicketType.MYSQL_RESTORE_SLAVE.value, - TicketType.TENDBCLUSTER_MASTER_FAIL_OVER.value, -] - class AddTempUserForClusterService(BaseService): """ @@ -53,41 +45,6 @@ def __add_priv(self, params): return True - @staticmethod - def _get_instance_for_cluster(cluster: Cluster) -> list: - """ - 根据cluster对象获取所有的cluster需要实例信息 - """ - inst_list = [] - for inst in cluster.storageinstance_set.all(): - inst_list.append( - { - "instance": inst.ip_port, - "priv_role": MachinePrivRoleMap.get(inst.machine_type), - "cmdb_status": inst.status, - } - ) - if cluster.cluster_type == ClusterType.TenDBCluster: - # 获取tendb cluster集群所有spider实例 - for inst in cluster.proxyinstance_set.all(): - inst_list.append( - { - "instance": inst.ip_port, - "priv_role": MachinePrivRoleMap.get(inst.machine_type), - "cmdb_status": inst.status, - } - ) - # 获取tendb cluster集群所有tdbctl实例,只给中控primary授权,权限同步到每个节点 - inst_list.append( - { - "instance": cluster.tendbcluster_ctl_primary_address(), - "priv_role": PrivRole.TDBCTL.value, - "cmdb_status": InstanceStatus.RUNNING.value, - } - ) - - return inst_list - def _execute(self, data, parent_data, callback=None) -> bool: kwargs = data.get_one_of_inputs("kwargs") global_data = data.get_one_of_inputs("global_data") @@ -121,7 +78,9 @@ def _execute(self, data, parent_data, callback=None) -> bool: common_param["bk_cloud_id"] = cluster.bk_cloud_id # 获取每套集群的所有需要添加临时的账号 - instance_list = self._get_instance_for_cluster(cluster=cluster) + instance_list = get_instance_with_random_job( + cluster=cluster, ticket_type=global_data.get("ticket_type", "test") + ) # 开始遍历集群每个实例,添加临时账号 for inst in instance_list: diff --git a/dbm-ui/backend/flow/plugins/components/collections/common/drop_random_job_user.py b/dbm-ui/backend/flow/plugins/components/collections/common/drop_random_job_user.py index 48736ba669..0d2cfd176b 100644 --- a/dbm-ui/backend/flow/plugins/components/collections/common/drop_random_job_user.py +++ b/dbm-ui/backend/flow/plugins/components/collections/common/drop_random_job_user.py @@ -15,10 +15,11 @@ from pipeline.component_framework.component import Component from backend.components import DRSApi -from backend.db_meta.enums import ClusterType, InstanceStatus +from backend.db_meta.enums import InstanceStatus from backend.db_meta.exceptions import ClusterNotExistException from backend.db_meta.models import Cluster from backend.flow.plugins.components.collections.common.base_service import BaseService +from backend.flow.utils.mysql.common.random_job_with_ticket_map import get_instance_with_random_job from backend.flow.utils.mysql.get_mysql_sys_user import generate_mysql_tmp_user from backend.flow.utils.mysql.mysql_version_parse import mysql_version_parse from backend.ticket.constants import TicketType @@ -26,109 +27,64 @@ logger = logging.getLogger("flow") -# 定义控制节点的跳过行为,相关单据不需要执行具体回收单据内容,比如集群下架单据 -skip_list = [ - TicketType.MYSQL_HA_DESTROY.value, - TicketType.MYSQL_SINGLE_DESTROY.value, - TicketType.TENDBCLUSTER_DESTROY.value, - TicketType.TENDBCLUSTER_TEMPORARY_DESTROY.value, - TicketType.TENDBCLUSTER_SPIDER_MNT_DESTROY.value, - TicketType.TENDBCLUSTER_SPIDER_REDUCE_NODES.value, - TicketType.TENDBCLUSTER_SPIDER_MNT_DESTROY.value, -] - - class DropTempUserForClusterService(BaseService): """ 为单据删除job的临时本地账号,操作目标实例 单据是以集群维度来删除 """ - @staticmethod - def _get_instance_for_cluster(cluster: Cluster) -> list: - """ - 根据cluster对象获取所有的cluster需要实例信息 - """ - objs = [ - {"ip_port": i.ip_port, "is_tdbctl": False, "cmdb_status": i.status} - for i in list(cluster.storageinstance_set.all()) - ] - - if cluster.cluster_type == ClusterType.TenDBCluster: - # 如果是TenDB cluster集群,获取所有spider实例 - objs += [ - {"ip_port": i.ip_port, "is_tdbctl": False, "cmdb_status": i.status} - for i in list(cluster.proxyinstance_set.all()) - ] - - # 如果是tenDB cluster 集群类型,需要获取中控实例primary - objs.append( - { - "ip_port": cluster.tendbcluster_ctl_primary_address(), - "is_tdbctl": True, - "cmdb_status": InstanceStatus.RUNNING.value, - } - ) - return objs - - def drop_jor_user(self, cluster: Cluster, root_id: str): + def drop_jor_user(self, cluster: Cluster, root_id: str, ticket_type: TicketType): """ 集群维度删除job的临时用户 """ # 拼接临时用户的名称 user = generate_mysql_tmp_user(root_id) err_num = 0 - try: - # 删除localhost和 local_ip用户 - for instance in self._get_instance_for_cluster(cluster=cluster): - # 默认先关闭binlog记录, 最后统一打开 - cmd = ["set session sql_log_bin = 0 ;"] - - self.log_info(f"the cluster version is {cluster.major_version}") - if mysql_version_parse(cluster.major_version) >= mysql_version_parse("5.7"): - cmd += [ - f"drop user if exists `{user}`@`localhost`;", - f"drop user if exists `{user}`@`{instance['ip_port'].split(':')[0]}`;", - ] - else: - cmd += [ - f"drop user `{user}`@`localhost`;", - f"drop user `{user}`@`{instance['ip_port'].split(':')[0]}`;", - ] - # 最后统一打开binlog, 避免复用异常 - cmd.append("set session sql_log_bin = 1 ;") - resp = DRSApi.rpc( - { - "addresses": [instance["ip_port"]], - "cmds": cmd, - "force": True, # 中间出错也要执行下去,保证重新打开binlog - "bk_cloud_id": cluster.bk_cloud_id, - } - ) - for info in resp[0]["cmd_results"]: - # 其实只是一行 - if info["error_msg"]: - if instance["cmdb_status"] == InstanceStatus.RUNNING.value: - # 如果实例是running状态,应该记录错误,并且返回异常 - self.log_error( - f"The result [drop user `{user}`] in {instance['ip_port']}" - f" is [{info['error_msg']}]" - ) - err_num = err_num + 1 - else: - # 如果是非running状态,标记warning信息,但不作异常处理 - self.log_warning(info["error_msg"]) - self.log_warning( - f"[{instance['ip_port']} is not running in dbm [{instance['cmdb_status']}],ignore]" - ) - continue - - if err_num == 0: - self.log_info(f"The result [drop user if exists `{user}`] in {instance['ip_port']} is [success]") - - except Exception as e: # pylint: disable=broad-except - self.log_exception(f"drop user error in cluster [{cluster.name}]: {e}") - return False + # 删除localhost和 local_ip用户 + for instance in get_instance_with_random_job(cluster=cluster, ticket_type=ticket_type): + # 默认先关闭binlog记录, 最后统一打开 + cmd = ["set session sql_log_bin = 0 ;"] + + self.log_info(f"the cluster version is {cluster.major_version}") + if mysql_version_parse(cluster.major_version) >= mysql_version_parse("5.7"): + cmd += [ + f"drop user if exists `{user}`@`localhost`;", + f"drop user if exists `{user}`@`{instance['instance'].split(':')[0]}`;", + ] + else: + cmd += [ + f"drop user `{user}`@`localhost`;", + f"drop user `{user}`@`{instance['instance'].split(':')[0]}`;", + ] + # 最后统一打开binlog, 避免复用异常 + cmd.append("set session sql_log_bin = 1 ;") + resp = DRSApi.rpc( + { + "addresses": [instance["instance"]], + "cmds": cmd, + "force": True, # 中间出错也要执行下去,保证重新打开binlog + "bk_cloud_id": cluster.bk_cloud_id, + } + ) + for info in resp[0]["cmd_results"]: + # 其实只是一行 + if info["error_msg"]: + if instance["cmdb_status"] == InstanceStatus.RUNNING.value: + # 如果实例是running状态,应该记录错误,并且返回异常 + self.log_error( + f"The result [drop user `{user}`] in {instance['instance']}" f" is [{info['error_msg']}]" + ) + err_num = err_num + 1 + else: + # 如果是非running状态,标记warning信息,但不作异常处理 + self.log_warning(info["error_msg"]) + self.log_warning( + f"[{instance['instance']} is not running in dbm [{instance['cmdb_status']}],ignore]" + ) + continue + + if err_num == 0: + self.log_info(f"The result [drop user if exists `{user}`] in {instance['instance']} is [success]") if err_num > 0: self.log_error(f"drop user error in cluster [{cluster.name}]") @@ -141,7 +97,6 @@ def _execute(self, data, parent_data, callback=None) -> bool: kwargs = data.get_one_of_inputs("kwargs") global_data = data.get_one_of_inputs("global_data") - err_num = 0 for cluster_id in kwargs["cluster_ids"]: # 获取每个cluster_id对应的对象 try: @@ -150,12 +105,9 @@ def _execute(self, data, parent_data, callback=None) -> bool: raise ClusterNotExistException( cluster_id=cluster_id, bk_biz_id=global_data["bk_biz_id"], message=_("集群不存在") ) - if not self.drop_jor_user(cluster=cluster, root_id=global_data["job_root_id"]): - err_num = err_num + 1 - - if err_num > 0: - self.log_error("drop user error") - return False + self.drop_jor_user( + cluster=cluster, root_id=global_data["job_root_id"], ticket_type=global_data.get("ticket_type", "test") + ) return True diff --git a/dbm-ui/backend/flow/plugins/components/collections/sqlserver/drop_random_job_user.py b/dbm-ui/backend/flow/plugins/components/collections/sqlserver/drop_random_job_user.py index 41bc4404a3..f2087977d8 100644 --- a/dbm-ui/backend/flow/plugins/components/collections/sqlserver/drop_random_job_user.py +++ b/dbm-ui/backend/flow/plugins/components/collections/sqlserver/drop_random_job_user.py @@ -45,7 +45,8 @@ def _drop_job_user(self, job_root_id: str, bk_cloud_id: int, storages: QuerySet, inst_status = storages.get( machine__ip=info["address"].split(":")[0], port=int(info["address"].split(":")[1]) - ) + ).status + if inst_status == InstanceStatus.UNAVAILABLE: # 如果实例的状态本身是unavailable,则失败可以忽略 self.log_warning(f"the instance [{info['address']}] is already unavailable, ignore") diff --git a/dbm-ui/backend/flow/utils/mysql/common/random_job_with_ticket_map.py b/dbm-ui/backend/flow/utils/mysql/common/random_job_with_ticket_map.py new file mode 100644 index 0000000000..edeb6615b7 --- /dev/null +++ b/dbm-ui/backend/flow/utils/mysql/common/random_job_with_ticket_map.py @@ -0,0 +1,106 @@ +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available. +Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" + +from dataclasses import dataclass, field + +from django.db.models import Q + +from backend.db_meta.enums import ClusterType, InstanceRole +from backend.db_meta.models import Cluster +from backend.flow.consts import InstanceStatus, MachinePrivRoleMap, PrivRole +from backend.ticket.constants import TicketType + + +@dataclass() +class RuleDict: + """ + 定义执行通用的规则结构体 + @attributes exec_storage_instance_role_list: 添加的storage角色 + @attributes ignore_storage_instance_role_list: 忽略的storage角色 + @attributes exec_proxy_instance_role_list: 需要添加的proxy角色 + @attributes ignore_proxy_instance_role_list: 忽略的proxy角色 + @attributes is_tdbctl_primary_add: 是否给tdbctl primary添加账号 + @attributes is_tdbctl_slave_add: 是否给tdbctl slave添加账号 + """ + + exec_storage_instance_role_list: list = field(default_factory=list) + ignore_storage_instance_role_list: list = field(default_factory=list) + exec_proxy_instance_role_list: list = field(default_factory=list) + ignore_proxy_instance_role_list: list = field(default_factory=list) + is_tdbctl_primary_add: bool = False + + +# 定义的单据类型对哪些实例角色来添加随机账号 +random_job_with_ticket_map = { + # mysql 变更SQL执行添加账号规则 + TicketType.MYSQL_IMPORT_SQLFILE: RuleDict(exec_storage_instance_role_list=[InstanceRole.BACKEND_MASTER]), + # tendb_cluster集群SQL执行添加账号规则 + TicketType.TENDBCLUSTER_IMPORT_SQLFILE: RuleDict(is_tdbctl_primary_add=True), +} + + +def get_instance_with_random_job(cluster: Cluster, ticket_type: TicketType): + """ + 根据单据类型以及集群信息获取到需要添加的实例 + @param cluster: 集群信息 + @param ticket_type: 单据类型 + """ + tdbctl_list = [] + proxy_instances = [] + storge_instances = [] + rule_dict = random_job_with_ticket_map.get(ticket_type, None) + if not rule_dict: + # 表示这类单据类型没有命中规则,默认返回所有 + storge_instances = cluster.storageinstance_set.all() + if cluster.cluster_type == ClusterType.TenDBCluster: + proxy_instances = cluster.proxyinstance_set.all() + tdbctl_list.append( + { + "instance": cluster.tendbcluster_ctl_primary_address(), + "priv_role": PrivRole.TDBCTL.value, + "cmdb_status": InstanceStatus.RUNNING.value, + } + ) + else: + # 进入匹配环节 + storage_filter_query = Q() + storage_exclude_query = Q() + proxy_filter_query = Q() + proxy_exclude_query = Q() + if rule_dict.exec_storage_instance_role_list: + storage_filter_query &= Q(instance_role__in=rule_dict.exec_storage_instance_role_list) + if rule_dict.ignore_storage_instance_role_list: + storage_exclude_query |= Q(instance_role__in=rule_dict.ignore_storage_instance_role_list) + if rule_dict.exec_proxy_instance_role_list: + proxy_filter_query &= Q(instance_role__in=rule_dict.exec_proxy_instance_role_list) + if rule_dict.ignore_proxy_instance_role_list: + proxy_exclude_query |= Q(instance_role__in=rule_dict.ignore_proxy_instance_role_list) + + if storage_filter_query.children or storage_exclude_query.children: + storge_instances = cluster.storageinstance_set.filter(storage_filter_query).exclude(storage_exclude_query) + if proxy_filter_query.children or proxy_exclude_query.children: + proxy_instances = cluster.proxyinstance_set.filter(proxy_filter_query).exclude(proxy_exclude_query) + if rule_dict.is_tdbctl_primary_add: + tdbctl_list.append( + { + "instance": cluster.tendbcluster_ctl_primary_address(), + "priv_role": PrivRole.TDBCTL.value, + "cmdb_status": InstanceStatus.RUNNING.value, + } + ) + + return [ + { + "instance": inst.ip_port, + "priv_role": MachinePrivRoleMap.get(inst.machine_type), + "cmdb_status": inst.status, + } + for inst in list(storge_instances) + list(proxy_instances) + ] + tdbctl_list