diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/spider/remote_local_slave_recover.py b/dbm-ui/backend/flow/engine/bamboo/scene/spider/remote_local_slave_recover.py index 05f91c62a3..ed9fdfe14e 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/spider/remote_local_slave_recover.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/spider/remote_local_slave_recover.py @@ -36,7 +36,7 @@ class TenDBRemoteSlaveLocalRecoverFlow(object): """ - TenDB 后端从节点恢复: 迁移机器恢复,指定实例的本地恢复 + TenDB Cluster 后端从节点实例数据修复 """ def __init__(self, root_id: str, ticket_data: Optional[Dict]): @@ -47,6 +47,7 @@ def __init__(self, root_id: str, ticket_data: Optional[Dict]): self.root_id = root_id self.ticket_data = ticket_data self.data = {} + self.backup_target_path = f"/data/dbbak/{self.root_id}" def tendb_remote_slave_local_recover(self): """ @@ -68,7 +69,6 @@ def tendb_remote_slave_local_recover(self): self.data["root_id"] = self.root_id self.data["uid"] = self.ticket_data["uid"] self.data["ticket_type"] = self.ticket_data["ticket_type"] - self.data["bk_biz_id"] = self.ticket_data["bk_biz_id"] self.data["bk_biz_id"] = cluster_class.bk_biz_id self.data["db_module_id"] = cluster_class.db_module_id self.data["cluster_type"] = cluster_class.cluster_type @@ -90,6 +90,7 @@ def tendb_remote_slave_local_recover(self): ) ), ) + # todo 怎么获取 shard_ids sync_data_sub_pipeline_list = [] for shard_id in self.data["shard_ids"]: shard = cluster_class.tendbclusterstorageset_set.get(shard_id=shard_id) @@ -141,7 +142,7 @@ def tendb_remote_slave_local_recover(self): "new_slave_ip": target_slave.machine.ip, "new_slave_port": target_slave.port, "bk_cloud_id": cluster_class.bk_cloud_id, - "file_target_path": f"/data/dbbak/{self.root_id}/{master.port}", + "file_target_path": f"{self.backup_target_path}/{master.port}", "charset": self.data["charset"], "change_master_force": True, "cluster_type": cluster_class.cluster_type, @@ -172,6 +173,7 @@ def tendb_remote_slave_local_recover(self): ) ) tendb_migrate_pipeline.add_parallel_sub_pipeline(sub_flow_list=sync_data_sub_pipeline_list) + # 安装周边 tendb_migrate_pipeline.add_sub_pipeline( sub_flow=build_surrounding_apps_sub_flow( diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/spider/remote_master_slave_migrate.py b/dbm-ui/backend/flow/engine/bamboo/scene/spider/remote_master_slave_migrate.py new file mode 100644 index 0000000000..8221a7f848 --- /dev/null +++ b/dbm-ui/backend/flow/engine/bamboo/scene/spider/remote_master_slave_migrate.py @@ -0,0 +1,384 @@ +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available. +Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" + +import copy +import logging.config +from dataclasses import asdict +from datetime import datetime +from typing import Dict, Optional + +from django.utils import timezone +from django.utils.translation import ugettext as _ + +from backend.constants import IP_PORT_DIVIDER +from backend.db_meta.enums import ClusterType +from backend.db_meta.models import Cluster +from backend.db_services.mysql.fixpoint_rollback.handlers import FixPointRollbackHandler +from backend.flow.engine.bamboo.scene.common.builder import Builder, SubBuilder +from backend.flow.engine.bamboo.scene.mysql.common.common_sub_flow import ( + build_surrounding_apps_sub_flow, + install_mysql_in_cluster_sub_flow, +) +from backend.flow.engine.bamboo.scene.spider.common.common_sub_flow import remote_migrate_switch_sub_flow +from backend.flow.engine.bamboo.scene.spider.common.exceptions import TendbGetBackupInfoFailedException +from backend.flow.engine.bamboo.scene.spider.spider_remote_node_migrate import ( + remote_instance_migrate_sub_flow, + remote_node_uninstall_sub_flow, +) +from backend.flow.plugins.components.collections.common.download_backup_client import DownloadBackupClientComponent +from backend.flow.plugins.components.collections.common.pause import PauseComponent +from backend.flow.plugins.components.collections.mysql.clear_machine import MySQLClearMachineComponent +from backend.flow.plugins.components.collections.mysql.exec_actuator_script import ExecuteDBActuatorScriptComponent +from backend.flow.plugins.components.collections.spider.spider_db_meta import SpiderDBMetaComponent +from backend.flow.utils.common_act_dataclass import DownloadBackupClientKwargs +from backend.flow.utils.mysql.common.mysql_cluster_info import get_version_and_charset +from backend.flow.utils.mysql.mysql_act_dataclass import ClearMachineKwargs, DBMetaOPKwargs, ExecActuatorKwargs +from backend.flow.utils.mysql.mysql_act_playload import MysqlActPayload +from backend.flow.utils.mysql.mysql_context_dataclass import ClusterInfoContext +from backend.flow.utils.spider.spider_db_meta import SpiderDBMeta +from backend.flow.utils.spider.tendb_cluster_info import get_master_slave_recover_info + +logger = logging.getLogger("flow") + + +class TendbClusterMigrateRemoteFlow(object): + """ + tendb cluster 后端remote节点主从成对迁移 + """ + + def __init__(self, root_id: str, ticket_data: Optional[Dict]): + """ + @param root_id : 任务流程定义的root_id + @param ticket_data : 单据传递参数 + """ + self.root_id = root_id + self.ticket_data = ticket_data + self.data = {} + + # 定义备份文件存放到目标机器目录位置 + self.backup_target_path = f"/data/dbbak/{self.root_id}" + + def migrate_master_slave_flow(self): + """ + 成对迁移集群主从节点。 + 元数据信息修改顺序: + 1 mysql_migrate_cluster_add_instance + 2 mysql_migrate_cluster_add_tuple + 3 mysql_migrate_cluster_switch_storage + """ + # 构建流程 + cluster_ids = [] + for i in self.ticket_data["infos"]: + cluster_ids.extend(i["cluster_ids"]) + + tendb_migrate_pipeline_all = Builder( + root_id=self.root_id, + data=copy.deepcopy(self.ticket_data), + need_random_pass_cluster_ids=list(set(cluster_ids)), + ) + + # 按照传入的infos信息,循环拼接子流程 + tendb_migrate_pipeline_all_list = [] + for info in self.ticket_data["infos"]: + # 阶段1 获取集群所有信息。计算端口,构建数据。 + self.data = copy.deepcopy(info) + cluster_class = Cluster.objects.get(id=self.data["cluster_id"]) + self.data["bk_cloud_id"] = cluster_class.bk_cloud_id + self.data["root_id"] = self.root_id + self.data["uid"] = self.ticket_data["uid"] + self.data["created_by"] = self.ticket_data["created_by"] + self.data["ticket_type"] = self.ticket_data["ticket_type"] + self.data["bk_biz_id"] = cluster_class.bk_biz_id + self.data["db_module_id"] = cluster_class.db_module_id + self.data["cluster_type"] = cluster_class.cluster_type + self.data["force"] = True + self.data["charset"], self.data["db_version"] = get_version_and_charset( + bk_biz_id=cluster_class.bk_biz_id, + db_module_id=cluster_class.db_module_id, + cluster_type=cluster_class.cluster_type, + ) + + # todo 验证2个ip为主从关系 + # 先查询备份,如果备份不存在则退出,不安装实例 + # restore_time = datetime.strptime("2023-07-31 17:40:00", "%Y-%m-%d %H:%M:%S") + backup_handler = FixPointRollbackHandler(cluster_class.id) + restore_time = datetime.now(timezone.utc) + backup_info = backup_handler.query_latest_backup_log(restore_time) + if backup_info is None: + logger.error("cluster {} backup info not exists".format(self.data["cluster_id"])) + raise TendbGetBackupInfoFailedException(message=_("获取集群 {} 的备份信息失败".format(cluster_class.id))) + logger.debug(backup_info) + tendb_migrate_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(self.data)) + + cluster_info = get_master_slave_recover_info( + cluster_class.id, self.data["old_master_ip"], self.data["old_slave_ip"] + ) + cluster_info["charset"] = self.data["charset"] + cluster_info["db_version"] = self.data["db_version"] + cluster_info["ports"] = [] + for shard_id, shard in cluster_info["my_shards"].items(): + master = { + "ip": self.data["new_master_ip"], + "port": shard["master"]["port"], + "bk_cloud_id": self.data["bk_cloud_id"], + "instance": "{}{}{}".format(self.data["new_master_ip"], IP_PORT_DIVIDER, shard["master"]["port"]), + } + + slave = { + "ip": self.data["new_slave_ip"], + "port": shard["slave"]["port"], + "bk_cloud_id": self.data["bk_cloud_id"], + "instance": "{}{}{}".format(self.data["new_slave_ip"], IP_PORT_DIVIDER, shard["slave"]["port"]), + } + + cluster_info["my_shards"][shard_id]["new_slave"] = slave + cluster_info["my_shards"][shard_id]["new_master"] = master + cluster_info["ports"].append(shard["master"]["port"]) + + # 阶段2 安装实例 + install_sub_pipeline_list = [] + install_sub_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(self.data)) + install_sub_pipeline.add_sub_pipeline( + sub_flow=install_mysql_in_cluster_sub_flow( + uid=self.data["uid"], + root_id=self.root_id, + cluster=cluster_class, + new_mysql_list=[self.data["new_master_ip"], self.data["new_slave_ip"]], + install_ports=cluster_info["ports"], + bk_host_ids=[self.data["bk_new_master"]["bk_host_id"], self.data["bk_new_slave"]["bk_host_id"]], + ) + ) + + cluster = { + "new_master_ip": self.data["new_master_ip"], + "new_slave_ip": self.data["new_slave_ip"], + "cluster_id": cluster_class.id, + "bk_cloud_id": cluster_class.bk_cloud_id, + "bk_biz_id": cluster_class.bk_biz_id, + "ports": cluster_info["ports"], + "version": cluster_class.major_version, + } + install_sub_pipeline.add_act( + act_name=_("写入初始化实例的db_meta元信息"), + act_component_code=SpiderDBMetaComponent.code, + kwargs=asdict( + DBMetaOPKwargs( + db_meta_class_func=SpiderDBMeta.remotedb_migrate_add_install_nodes.__name__, + cluster=copy.deepcopy(cluster), + is_update_trans_data=False, + ) + ), + ) + + install_sub_pipeline.add_act( + act_name=_("安装backup-client工具"), + act_component_code=DownloadBackupClientComponent.code, + kwargs=asdict( + DownloadBackupClientKwargs( + bk_cloud_id=cluster_class.bk_cloud_id, + bk_biz_id=int(cluster_class.bk_biz_id), + download_host_list=[cluster["new_master_ip"], cluster["new_slave_ip"]], + ) + ), + ) + + # 安装临时备份程序 + exec_act_kwargs = ExecActuatorKwargs( + cluster=cluster, + bk_cloud_id=cluster_class.bk_cloud_id, + cluster_type=cluster_class.cluster_type, + get_mysql_payload_func=MysqlActPayload.get_install_tmp_db_backup_payload.__name__, + ) + exec_act_kwargs.exec_ip = [cluster["new_master_ip"], cluster["new_slave_ip"]] + install_sub_pipeline.add_act( + act_name=_("安装临时备份程序"), + act_component_code=ExecuteDBActuatorScriptComponent.code, + kwargs=asdict(exec_act_kwargs), + ) + install_sub_pipeline_list.append(install_sub_pipeline.build_sub_process(sub_name=_("安装remote主从节点"))) + + # 阶段3 逐个实例同步数据到新主从库 + sync_data_sub_pipeline_list = [] + for shard_id, node in cluster_info["my_shards"].items(): + ins_cluster = copy.deepcopy(cluster_info["cluster"]) + ins_cluster["charset"] = cluster_info["charset"] + ins_cluster["new_master_ip"] = node["new_master"]["ip"] + ins_cluster["new_slave_ip"] = node["new_slave"]["ip"] + ins_cluster["new_master_port"] = node["new_master"]["port"] + ins_cluster["new_slave_port"] = node["new_slave"]["port"] + ins_cluster["master_ip"] = node["master"]["ip"] + ins_cluster["slave_ip"] = node["slave"]["ip"] + ins_cluster["master_port"] = node["master"]["port"] + ins_cluster["slave_port"] = node["slave"]["port"] + ins_cluster["file_target_path"] = f"{self.backup_target_path}/{ins_cluster['master_port']}" + ins_cluster["shard_id"] = shard_id + ins_cluster["change_master_force"] = False + ins_cluster["backupinfo"] = backup_info["remote_node"].get(shard_id, {}) + # 判断 remote_node 下每个分片的备份信息是否正常 + if ( + len(ins_cluster["backupinfo"]) == 0 + or len(ins_cluster["backupinfo"].get("file_list_details", {})) == 0 + ): + logger.error( + "cluster {} shard {} backup info not exists".format(self.data["cluster_id"], shard_id) + ) + raise TendbGetBackupInfoFailedException( + message=_("获取集群分片 {} shard {} 的备份信息失败".format(self.data["cluster_id"], shard_id)) + ) + sync_data_sub_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(self.data)) + sync_data_sub_pipeline.add_sub_pipeline( + sub_flow=remote_instance_migrate_sub_flow( + root_id=self.root_id, ticket_data=copy.deepcopy(self.data), cluster_info=ins_cluster + ) + ) + sync_data_sub_pipeline.add_act( + act_name=_("同步数据完毕,写入数据节点的主从关系相关元数据"), + act_component_code=SpiderDBMetaComponent.code, + kwargs=asdict( + DBMetaOPKwargs( + db_meta_class_func=SpiderDBMeta.remotedb_migrate_add_storage_tuple.__name__, + cluster=ins_cluster, + is_update_trans_data=True, + ) + ), + ) + sync_data_sub_pipeline_list.append(sync_data_sub_pipeline.build_sub_process(sub_name=_("恢复实例数据"))) + + # 阶段4 切换remotedb + switch_sub_pipeline_list = [] + shard_list = [] + for shard_id, node in cluster_info["my_shards"].items(): + shard_cluster = { + "old_master": node["master"]["instance"], + "old_slave": node["slave"]["instance"], + "new_master": node["new_master"]["instance"], + "new_slave": node["new_slave"]["instance"], + } + shard_list.append(shard_cluster) + + switch_sub_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(self.data)) + switch_sub_pipeline.add_sub_pipeline( + sub_flow=remote_migrate_switch_sub_flow( + uid=self.data["uid"], + root_id=self.root_id, + cluster=cluster_class, + migrate_tuples=shard_list, + created_by=self.data["created_by"], + ) + ) + switch_sub_pipeline.add_act( + act_name=_("remote机器切换完毕后修改元数据指向"), + act_component_code=SpiderDBMetaComponent.code, + kwargs=asdict( + DBMetaOPKwargs( + db_meta_class_func=SpiderDBMeta.tendb_remotedb_rebalance_switch.__name__, + cluster=cluster_info, + is_update_trans_data=True, + ) + ), + ) + switch_sub_pipeline_list.append(switch_sub_pipeline.build_sub_process(sub_name=_("切换remote node 节点"))) + + # 阶段5: 新机器安装周边组件 + surrounding_sub_pipeline_list = [] + re_surrounding_sub_pipeline_list = [] + surrounding_sub_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(self.data)) + surrounding_sub_pipeline.add_sub_pipeline( + sub_flow=build_surrounding_apps_sub_flow( + bk_cloud_id=cluster_class.bk_cloud_id, + master_ip_list=[self.data["new_master_ip"]], + slave_ip_list=[self.data["new_slave_ip"]], + root_id=self.root_id, + parent_global_data=copy.deepcopy(self.data), + is_init=True, + collect_sysinfo=True, + cluster_type=ClusterType.TenDBCluster.value, + is_install_backup=False, + ) + ) + surrounding_sub_pipeline_list.append(surrounding_sub_pipeline.build_sub_process(sub_name=_("新机器安装周边组件"))) + + re_surrounding_sub_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(self.data)) + re_surrounding_sub_pipeline.add_sub_pipeline( + sub_flow=build_surrounding_apps_sub_flow( + bk_cloud_id=cluster_class.bk_cloud_id, + master_ip_list=[self.data["new_master_ip"]], + slave_ip_list=[self.data["new_slave_ip"]], + root_id=self.root_id, + parent_global_data=copy.deepcopy(self.data), + is_init=True, + cluster_type=ClusterType.TenDBCluster.value, + ) + ) + re_surrounding_sub_pipeline_list.append( + re_surrounding_sub_pipeline.build_sub_process(sub_name=_("切换后重新安装周边组件")) + ) + + # 阶段6: 主机级别卸载实例,卸载指定ip下的所有实例 + uninstall_svr_sub_pipeline_list = [] + for ip in [self.data["new_master_ip"], self.data["new_slave_ip"]]: + uninstall_svr_sub_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(self.data)) + ins_cluster = {"uninstall_ip": ip, "cluster_id": cluster_class.id} + uninstall_svr_sub_pipeline.add_act( + act_name=_("整机卸载成功后删除元数据"), + act_component_code=SpiderDBMetaComponent.code, + kwargs=asdict( + DBMetaOPKwargs( + db_meta_class_func=SpiderDBMeta.remotedb_migrate_remove_storage.__name__, + cluster=ins_cluster, + is_update_trans_data=True, + ) + ), + ) + uninstall_svr_sub_pipeline.add_act( + act_name=_("清理机器配置"), + act_component_code=MySQLClearMachineComponent.code, + kwargs=asdict( + ClearMachineKwargs( + exec_ip=ip, + bk_cloud_id=cluster_class.bk_biz_id, + ) + ), + ) + uninstall_svr_sub_pipeline.add_sub_pipeline( + sub_flow=remote_node_uninstall_sub_flow( + root_id=self.root_id, ticket_data=copy.deepcopy(self.data), ip=ip + ) + ) + uninstall_svr_sub_pipeline_list.append( + uninstall_svr_sub_pipeline.build_sub_process(sub_name=_("卸载remote节点{}".format(ip))) + ) + + # 安装实例 + tendb_migrate_pipeline.add_parallel_sub_pipeline(sub_flow_list=install_sub_pipeline_list) + # 数据同步 + tendb_migrate_pipeline.add_parallel_sub_pipeline(sub_flow_list=sync_data_sub_pipeline_list) + # 数据同步完毕 安装周边 + tendb_migrate_pipeline.add_parallel_sub_pipeline(sub_flow_list=surrounding_sub_pipeline_list) + # 人工确认切换迁移实例 + tendb_migrate_pipeline.add_act(act_name=_("人工确认切换"), act_component_code=PauseComponent.code, kwargs={}) + # 切换迁移实例 + tendb_migrate_pipeline.add_parallel_sub_pipeline(sub_flow_list=switch_sub_pipeline_list) + # 实例切换完毕 安装周边 + tendb_migrate_pipeline.add_parallel_sub_pipeline(sub_flow_list=re_surrounding_sub_pipeline_list) + # 卸载流程人工确认 + tendb_migrate_pipeline.add_act(act_name=_("人工确认卸载实例"), act_component_code=PauseComponent.code, kwargs={}) + # 卸载remote节点 + tendb_migrate_pipeline.add_parallel_sub_pipeline(sub_flow_list=uninstall_svr_sub_pipeline_list) + tendb_migrate_pipeline_all_list.append( + tendb_migrate_pipeline.build_sub_process(_("集群迁移{}").format(cluster_class.id)) + ) + + # 运行流程 + tendb_migrate_pipeline_all.add_parallel_sub_pipeline(tendb_migrate_pipeline_all_list) + tendb_migrate_pipeline_all.run_pipeline( + init_trans_data_class=ClusterInfoContext(), + is_drop_random_user=True, + ) diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/spider/remote_slave_recover.py b/dbm-ui/backend/flow/engine/bamboo/scene/spider/remote_slave_recover.py index ea83b551c1..8ba3efc823 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/spider/remote_slave_recover.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/spider/remote_slave_recover.py @@ -48,7 +48,7 @@ class TenDBRemoteSlaveRecoverFlow(object): """ - TenDB 后端从节点恢复: 迁移机器恢复,指定实例的本地恢复 + TenDB Cluster 后端从节点恢复: 迁移机器恢复,指定实例的本地恢复 """ def __init__(self, root_id: str, ticket_data: Optional[Dict]): @@ -59,6 +59,7 @@ def __init__(self, root_id: str, ticket_data: Optional[Dict]): self.root_id = root_id self.ticket_data = ticket_data self.data = {} + self.backup_target_path = f"/data/dbbak/{self.root_id}" def tendb_remote_slave_recover(self): """ @@ -85,21 +86,22 @@ def tendb_remote_slave_recover(self): self.data["db_module_id"] = cluster_class.db_module_id self.data["cluster_type"] = cluster_class.cluster_type self.data["force"] = True - self.data["target_ip"] = self.data["target_slave"]["ip"] - self.data["source_ip"] = self.data["source_slave"]["ip"] + self.data["target_ip"] = self.data["new_slave_ip"] + self.data["source_ip"] = self.data["old_slave_ip"] self.data["charset"], self.data["db_version"] = get_version_and_charset( bk_biz_id=cluster_class.bk_biz_id, db_module_id=cluster_class.db_module_id, cluster_type=cluster_class.cluster_type, ) + tendb_migrate_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(self.data)) - cluster_info = get_slave_recover_info(self.data["cluster_id"], self.data["source_ip"]) + cluster_info = get_slave_recover_info(cluster_class.id, self.data["source_ip"]) cluster_info["ports"] = [] for shard_id, shard in cluster_info["my_shards"].items(): slave = { "ip": self.data["target_ip"], "port": shard["slave"]["port"], - "bk_cloud_id": self.data["bk_cloud_id"], + "bk_cloud_id": cluster_class.bk_cloud_id, "instance": "{}{}{}".format(self.data["target_ip"], IP_PORT_DIVIDER, shard["slave"]["port"]), } cluster_info["my_shards"][shard_id]["new_slave"] = slave @@ -114,6 +116,7 @@ def tendb_remote_slave_recover(self): cluster=cluster_class, new_mysql_list=[self.data["target_ip"]], install_ports=cluster_info["ports"], + bk_host_ids=[self.data["bk_new_slave"]["bk_host_id"]], ) ) cluster = { @@ -171,7 +174,7 @@ def tendb_remote_slave_recover(self): "new_slave_ip": node["new_slave"]["ip"], "new_slave_port": node["new_slave"]["port"], "bk_cloud_id": cluster_class.bk_cloud_id, - "file_target_path": f'/data/dbbak/{self.root_id}/{node["master"]["port"]}', + "file_target_path": f'{self.backup_target_path}/{node["master"]["port"]}', "change_master_force": True, "charset": self.data["charset"], "cluster_type": cluster_class.cluster_type, @@ -185,7 +188,7 @@ def tendb_remote_slave_recover(self): ) ) sync_data_sub_pipeline.add_act( - act_name=_("同步数据完毕,写入数据节点tuple相关元数据"), + act_name=_("同步数据完毕,写入数据节点的主从关系相关元数据"), act_component_code=SpiderDBMetaComponent.code, kwargs=asdict( DBMetaOPKwargs( @@ -196,11 +199,10 @@ def tendb_remote_slave_recover(self): ), ) sync_data_sub_pipeline_list.append(sync_data_sub_pipeline.build_sub_process(sub_name=_("恢复实例数据"))) - # 阶段4 切换 todo 等待从库切换接口 + # 阶段4 切换 switch_sub_pipeline_list = [] # 切换后写入元数据 switch_sub_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(self.data)) - tdbctl_pass = get_random_string(length=10) switch_slave_class = SwitchRemoteSlaveRoutingKwargs( cluster_id=cluster_class.id, switch_remote_instance_pairs=[] @@ -234,6 +236,39 @@ def tendb_remote_slave_recover(self): ) switch_sub_pipeline_list.append(switch_sub_pipeline.build_sub_process(sub_name=_("切换SLAVE节点"))) + # 阶段5: 新机器安装周边组件 + surrounding_sub_pipeline_list = [] + re_surrounding_sub_pipeline_list = [] + surrounding_sub_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(self.data)) + surrounding_sub_pipeline.add_sub_pipeline( + sub_flow=build_surrounding_apps_sub_flow( + bk_cloud_id=cluster_class.bk_cloud_id, + slave_ip_list=[self.data["target_ip"]], + root_id=self.root_id, + parent_global_data=copy.deepcopy(self.data), + is_init=True, + collect_sysinfo=True, + cluster_type=ClusterType.TenDBCluster.value, + is_install_backup=False, + ) + ) + surrounding_sub_pipeline_list.append(surrounding_sub_pipeline.build_sub_process(sub_name=_("新机器安装周边组件"))) + + re_surrounding_sub_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(self.data)) + re_surrounding_sub_pipeline.add_sub_pipeline( + sub_flow=build_surrounding_apps_sub_flow( + bk_cloud_id=cluster_class.bk_cloud_id, + slave_ip_list=[self.data["target_ip"]], + root_id=self.root_id, + parent_global_data=copy.deepcopy(self.data), + is_init=True, + cluster_type=ClusterType.TenDBCluster.value, + ) + ) + re_surrounding_sub_pipeline_list.append( + re_surrounding_sub_pipeline.build_sub_process(sub_name=_("切换后重新安装周边组件")) + ) + # 阶段6 卸载 uninstall_svr_sub_pipeline_list = [] uninstall_svr_sub_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(self.data)) @@ -267,39 +302,19 @@ def tendb_remote_slave_recover(self): uninstall_svr_sub_pipeline_list.append( uninstall_svr_sub_pipeline.build_sub_process(sub_name=_("卸载remote节点{}".format(self.data["source_ip"]))) ) + # 安装实例 tendb_migrate_pipeline.add_parallel_sub_pipeline(sub_flow_list=install_sub_pipeline_list) # 数据同步 tendb_migrate_pipeline.add_parallel_sub_pipeline(sub_flow_list=sync_data_sub_pipeline_list) # 数据同步完毕 安装周边 - tendb_migrate_pipeline.add_sub_pipeline( - sub_flow=build_surrounding_apps_sub_flow( - bk_cloud_id=cluster_class.bk_cloud_id, - master_ip_list=None, - slave_ip_list=[self.data["target_ip"]], - root_id=self.root_id, - parent_global_data=copy.deepcopy(self.data), - is_init=True, - cluster_type=ClusterType.TenDBCluster.value, - is_install_backup=False, - ) - ) + tendb_migrate_pipeline.add_parallel_sub_pipeline(sub_flow_list=surrounding_sub_pipeline_list) # 人工确认切换迁移实例 tendb_migrate_pipeline.add_act(act_name=_("人工确认切换"), act_component_code=PauseComponent.code, kwargs={}) # 切换迁移实例 tendb_migrate_pipeline.add_parallel_sub_pipeline(sub_flow_list=switch_sub_pipeline_list) - # 安装周边组件 - tendb_migrate_pipeline.add_sub_pipeline( - sub_flow=build_surrounding_apps_sub_flow( - bk_cloud_id=cluster_class.bk_cloud_id, - master_ip_list=None, - slave_ip_list=[self.data["target_ip"]], - root_id=self.root_id, - parent_global_data=copy.deepcopy(self.data), - is_init=True, - cluster_type=ClusterType.TenDBCluster.value, - ) - ) + # 实例切换完毕 安装周边 + tendb_migrate_pipeline.add_parallel_sub_pipeline(sub_flow_list=re_surrounding_sub_pipeline_list) # 卸载流程人工确认 tendb_migrate_pipeline.add_act(act_name=_("人工确认卸载实例"), act_component_code=PauseComponent.code, kwargs={}) # 卸载remote节点 diff --git a/dbm-ui/backend/flow/engine/controller/spider.py b/dbm-ui/backend/flow/engine/controller/spider.py index a3a03dc365..bef5a969a9 100644 --- a/dbm-ui/backend/flow/engine/controller/spider.py +++ b/dbm-ui/backend/flow/engine/controller/spider.py @@ -13,6 +13,7 @@ from backend.flow.engine.bamboo.scene.spider.import_sqlfile_flow import ImportSQLFlow from backend.flow.engine.bamboo.scene.spider.remote_local_slave_recover import TenDBRemoteSlaveLocalRecoverFlow from backend.flow.engine.bamboo.scene.spider.remote_master_fail_over import RemoteMasterFailOverFlow +from backend.flow.engine.bamboo.scene.spider.remote_master_slave_migrate import TendbClusterMigrateRemoteFlow from backend.flow.engine.bamboo.scene.spider.remote_master_slave_swtich import RemoteMasterSlaveSwitchFlow from backend.flow.engine.bamboo.scene.spider.remote_slave_recover import TenDBRemoteSlaveRecoverFlow from backend.flow.engine.bamboo.scene.spider.spider_add_mnt import TenDBClusterAddSpiderMNTFlow @@ -177,6 +178,13 @@ def tendb_cluster_remote_rebalance(self): flow = TenDBRemoteRebalanceFlow(root_id=self.root_id, ticket_data=self.ticket_data) flow.tendb_migrate() + def tendb_cluster_remote_migrate(self): + """ + remote 节点主从成对迁移 + """ + flow = TendbClusterMigrateRemoteFlow(root_id=self.root_id, ticket_data=self.ticket_data) + flow.migrate_master_slave_flow() + def tendb_cluster_remote_slave_recover(self): """ remote 远程slave节点恢复 diff --git a/dbm-ui/backend/flow/urls.py b/dbm-ui/backend/flow/urls.py index 772c69b594..e98068f7b9 100644 --- a/dbm-ui/backend/flow/urls.py +++ b/dbm-ui/backend/flow/urls.py @@ -235,6 +235,7 @@ from backend.flow.views.tbinlogdumper_switch import SwitchTBinlogDumperSceneApiView from backend.flow.views.tendb_cluster_remote_fail_over import RemoteFailOverSceneApiView from backend.flow.views.tendb_cluster_remote_local_recover import RemoteLocalRecoverSceneApiView +from backend.flow.views.tendb_cluster_remote_migrate import RemoteMigrateSceneApiView from backend.flow.views.tendb_cluster_remote_rebalance import RemoteRebalanceSceneApiView from backend.flow.views.tendb_cluster_remote_slave_recover import RemoteSlaveRecoverSceneApiView from backend.flow.views.tendb_cluster_remote_switch import RemoteSwitchSceneApiView @@ -448,6 +449,7 @@ url(r"^scene/tendb_cluster_remote_fail_over$", RemoteFailOverSceneApiView.as_view()), # remote 节点扩缩容 url(r"^scene/tendb_cluster_remote_rebalance$", RemoteRebalanceSceneApiView.as_view()), + url(r"^scene/tendb_cluster_remote_migrate$", RemoteMigrateSceneApiView.as_view()), url(r"^scene/tendb_cluster_rollback_data$", TendbClusterRollbackDataSceneApiView.as_view()), url("^scene/destroy_tendb_slave_cluster$", DestroySpiderSlaveClusterSceneApiView.as_view()), url("^scene/reduce_spider_mnt$", ReduceSpiderMNTSceneApiView.as_view()), diff --git a/dbm-ui/backend/flow/utils/spider/spider_db_meta.py b/dbm-ui/backend/flow/utils/spider/spider_db_meta.py index 5cfdca1398..4cc17ef2ac 100644 --- a/dbm-ui/backend/flow/utils/spider/spider_db_meta.py +++ b/dbm-ui/backend/flow/utils/spider/spider_db_meta.py @@ -11,10 +11,11 @@ from typing import Optional from django.db import transaction +from django.db.transaction import atomic from backend.db_meta.api.cluster.tendbcluster.handler import TenDBClusterClusterHandler from backend.db_meta.api.cluster.tendbcluster.remotedb_node_migrate import TenDBClusterMigrateRemoteDb -from backend.db_meta.enums import ClusterEntryRole, MachineType, TenDBClusterSpiderRole +from backend.db_meta.enums import ClusterEntryRole, InstanceStatus, MachineType, TenDBClusterSpiderRole from backend.db_meta.models import Cluster, StorageInstance from backend.flow.utils.dict_to_dataclass import dict_to_dataclass from backend.flow.utils.spider.spider_act_dataclass import ShardInfo @@ -252,14 +253,22 @@ def tendb_slave_recover_add_nodes(self): return True def tendb_slave_recover_add_tuple(self): - new_slave_to_old_master = { - "master": {"ip": self.cluster["master_ip"], "port": self.cluster["master_port"]}, - "slave": {"ip": self.cluster["new_slave_ip"], "port": self.cluster["new_slave_port"]}, - } - TenDBClusterMigrateRemoteDb.add_storage_tuple( - cluster_id=self.cluster["cluster_id"], storage=new_slave_to_old_master - ) - # todo 是否修改new_master角色为中继状态 + with atomic(): + new_slave_to_old_master = { + "master": {"ip": self.cluster["master_ip"], "port": self.cluster["master_port"]}, + "slave": {"ip": self.cluster["new_slave_ip"], "port": self.cluster["new_slave_port"]}, + } + TenDBClusterMigrateRemoteDb.add_storage_tuple( + cluster_id=self.cluster["cluster_id"], storage=new_slave_to_old_master + ) + StorageInstance.objects.filter( + machine__ip=self.cluster["new_slave_port"], machine__bk_cloud_id=self.cluster["bk_cloud_id"] + ).update(status=InstanceStatus.RUNNING.value) + # slave_storages = StorageInstance.objects.filter(machine__ip=self.cluster["new_slave_port"], + # machine__bk_cloud_id=self.cluster["bk_cloud_id"]) + # for slave_storage in slave_storages: + # slave_storage.status=InstanceStatus.RUNNING.value + # slave_storage.save() def tendb_modify_storage_status(self): storage = StorageInstance.objects.get(self.cluster["storage_id"]) diff --git a/dbm-ui/backend/flow/utils/spider/tendb_cluster_info.py b/dbm-ui/backend/flow/utils/spider/tendb_cluster_info.py index 668237e6f8..ef3a613ad2 100644 --- a/dbm-ui/backend/flow/utils/spider/tendb_cluster_info.py +++ b/dbm-ui/backend/flow/utils/spider/tendb_cluster_info.py @@ -131,6 +131,16 @@ def get_slave_recover_info(cluster_id: int, ip: str): return cluster_info +def get_master_slave_recover_info(cluster_id: int, master_ip: str, slave_ip: str): + cluster_info = get_cluster_info(cluster_id) + cluster_info["my_shards"] = {} + if master_ip in cluster_info["masters"] and slave_ip in cluster_info["slaves"]: + for key, val in cluster_info["shards"].items(): + if val["master"]["ip"] == master_ip and val["slave"]["ip"] == slave_ip: + cluster_info["my_shards"][key] = val + return cluster_info + + def get_slave_local_recover_info(cluster_id: int, storage_id: int): cluster_info = get_cluster_info(cluster_id) cluster_info["my_shards"] = {} diff --git a/dbm-ui/backend/flow/views/tendb_cluster_remote_migrate.py b/dbm-ui/backend/flow/views/tendb_cluster_remote_migrate.py new file mode 100644 index 0000000000..5905158c7d --- /dev/null +++ b/dbm-ui/backend/flow/views/tendb_cluster_remote_migrate.py @@ -0,0 +1,32 @@ +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available. +Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" + +import logging + +from rest_framework.response import Response + +from backend.flow.engine.controller.spider import SpiderController +from backend.flow.views.base import FlowTestView +from backend.utils.basic import generate_root_id + +logger = logging.getLogger("root") + + +class RemoteMigrateSceneApiView(FlowTestView): + """ + api: /apis/v1/flow/scene/tendb_cluster_remote_migrate + params: + """ + + def post(self, request): + root_id = generate_root_id() + test = SpiderController(root_id=root_id, ticket_data=request.data) + test.tendb_cluster_remote_migrate() + return Response({"root_id": root_id})