Skip to content

Commit

Permalink
feat: 新增集群slave重建和主从迁移flow TencentBlueKing#3840
Browse files Browse the repository at this point in the history
  • Loading branch information
zfrendo authored and iSecloud committed Apr 16, 2024
1 parent 42ec814 commit 67d4b06
Show file tree
Hide file tree
Showing 8 changed files with 507 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@

class TenDBRemoteSlaveLocalRecoverFlow(object):
"""
TenDB 后端从节点恢复: 迁移机器恢复,指定实例的本地恢复
TenDB Cluster 后端从节点实例数据修复
"""

def __init__(self, root_id: str, ticket_data: Optional[Dict]):
Expand All @@ -47,6 +47,7 @@ def __init__(self, root_id: str, ticket_data: Optional[Dict]):
self.root_id = root_id
self.ticket_data = ticket_data
self.data = {}
self.backup_target_path = f"/data/dbbak/{self.root_id}"

def tendb_remote_slave_local_recover(self):
"""
Expand All @@ -68,7 +69,6 @@ def tendb_remote_slave_local_recover(self):
self.data["root_id"] = self.root_id
self.data["uid"] = self.ticket_data["uid"]
self.data["ticket_type"] = self.ticket_data["ticket_type"]
self.data["bk_biz_id"] = self.ticket_data["bk_biz_id"]
self.data["bk_biz_id"] = cluster_class.bk_biz_id
self.data["db_module_id"] = cluster_class.db_module_id
self.data["cluster_type"] = cluster_class.cluster_type
Expand All @@ -90,6 +90,7 @@ def tendb_remote_slave_local_recover(self):
)
),
)
# todo 怎么获取 shard_ids
sync_data_sub_pipeline_list = []
for shard_id in self.data["shard_ids"]:
shard = cluster_class.tendbclusterstorageset_set.get(shard_id=shard_id)
Expand Down Expand Up @@ -141,7 +142,7 @@ def tendb_remote_slave_local_recover(self):
"new_slave_ip": target_slave.machine.ip,
"new_slave_port": target_slave.port,
"bk_cloud_id": cluster_class.bk_cloud_id,
"file_target_path": f"/data/dbbak/{self.root_id}/{master.port}",
"file_target_path": f"{self.backup_target_path}/{master.port}",
"charset": self.data["charset"],
"change_master_force": True,
"cluster_type": cluster_class.cluster_type,
Expand Down Expand Up @@ -172,6 +173,7 @@ def tendb_remote_slave_local_recover(self):
)
)
tendb_migrate_pipeline.add_parallel_sub_pipeline(sub_flow_list=sync_data_sub_pipeline_list)

# 安装周边
tendb_migrate_pipeline.add_sub_pipeline(
sub_flow=build_surrounding_apps_sub_flow(
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@

class TenDBRemoteSlaveRecoverFlow(object):
"""
TenDB 后端从节点恢复: 迁移机器恢复,指定实例的本地恢复
TenDB Cluster 后端从节点恢复: 迁移机器恢复,指定实例的本地恢复
"""

def __init__(self, root_id: str, ticket_data: Optional[Dict]):
Expand All @@ -59,6 +59,7 @@ def __init__(self, root_id: str, ticket_data: Optional[Dict]):
self.root_id = root_id
self.ticket_data = ticket_data
self.data = {}
self.backup_target_path = f"/data/dbbak/{self.root_id}"

def tendb_remote_slave_recover(self):
"""
Expand All @@ -85,21 +86,22 @@ def tendb_remote_slave_recover(self):
self.data["db_module_id"] = cluster_class.db_module_id
self.data["cluster_type"] = cluster_class.cluster_type
self.data["force"] = True
self.data["target_ip"] = self.data["target_slave"]["ip"]
self.data["source_ip"] = self.data["source_slave"]["ip"]
self.data["target_ip"] = self.data["new_slave_ip"]
self.data["source_ip"] = self.data["old_slave_ip"]
self.data["charset"], self.data["db_version"] = get_version_and_charset(
bk_biz_id=cluster_class.bk_biz_id,
db_module_id=cluster_class.db_module_id,
cluster_type=cluster_class.cluster_type,
)

tendb_migrate_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(self.data))
cluster_info = get_slave_recover_info(self.data["cluster_id"], self.data["source_ip"])
cluster_info = get_slave_recover_info(cluster_class.id, self.data["source_ip"])
cluster_info["ports"] = []
for shard_id, shard in cluster_info["my_shards"].items():
slave = {
"ip": self.data["target_ip"],
"port": shard["slave"]["port"],
"bk_cloud_id": self.data["bk_cloud_id"],
"bk_cloud_id": cluster_class.bk_cloud_id,
"instance": "{}{}{}".format(self.data["target_ip"], IP_PORT_DIVIDER, shard["slave"]["port"]),
}
cluster_info["my_shards"][shard_id]["new_slave"] = slave
Expand All @@ -114,6 +116,7 @@ def tendb_remote_slave_recover(self):
cluster=cluster_class,
new_mysql_list=[self.data["target_ip"]],
install_ports=cluster_info["ports"],
bk_host_ids=[self.data["bk_new_slave"]["bk_host_id"]],
)
)
cluster = {
Expand Down Expand Up @@ -171,7 +174,7 @@ def tendb_remote_slave_recover(self):
"new_slave_ip": node["new_slave"]["ip"],
"new_slave_port": node["new_slave"]["port"],
"bk_cloud_id": cluster_class.bk_cloud_id,
"file_target_path": f'/data/dbbak/{self.root_id}/{node["master"]["port"]}',
"file_target_path": f'{self.backup_target_path}/{node["master"]["port"]}',
"change_master_force": True,
"charset": self.data["charset"],
"cluster_type": cluster_class.cluster_type,
Expand All @@ -185,7 +188,7 @@ def tendb_remote_slave_recover(self):
)
)
sync_data_sub_pipeline.add_act(
act_name=_("同步数据完毕,写入数据节点tuple相关元数据"),
act_name=_("同步数据完毕,写入数据节点的主从关系相关元数据"),
act_component_code=SpiderDBMetaComponent.code,
kwargs=asdict(
DBMetaOPKwargs(
Expand All @@ -196,11 +199,10 @@ def tendb_remote_slave_recover(self):
),
)
sync_data_sub_pipeline_list.append(sync_data_sub_pipeline.build_sub_process(sub_name=_("恢复实例数据")))
# 阶段4 切换 todo 等待从库切换接口
# 阶段4 切换
switch_sub_pipeline_list = []
# 切换后写入元数据
switch_sub_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(self.data))

tdbctl_pass = get_random_string(length=10)
switch_slave_class = SwitchRemoteSlaveRoutingKwargs(
cluster_id=cluster_class.id, switch_remote_instance_pairs=[]
Expand Down Expand Up @@ -234,6 +236,39 @@ def tendb_remote_slave_recover(self):
)
switch_sub_pipeline_list.append(switch_sub_pipeline.build_sub_process(sub_name=_("切换SLAVE节点")))

# 阶段5: 新机器安装周边组件
surrounding_sub_pipeline_list = []
re_surrounding_sub_pipeline_list = []
surrounding_sub_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(self.data))
surrounding_sub_pipeline.add_sub_pipeline(
sub_flow=build_surrounding_apps_sub_flow(
bk_cloud_id=cluster_class.bk_cloud_id,
slave_ip_list=[self.data["target_ip"]],
root_id=self.root_id,
parent_global_data=copy.deepcopy(self.data),
is_init=True,
collect_sysinfo=True,
cluster_type=ClusterType.TenDBCluster.value,
is_install_backup=False,
)
)
surrounding_sub_pipeline_list.append(surrounding_sub_pipeline.build_sub_process(sub_name=_("新机器安装周边组件")))

re_surrounding_sub_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(self.data))
re_surrounding_sub_pipeline.add_sub_pipeline(
sub_flow=build_surrounding_apps_sub_flow(
bk_cloud_id=cluster_class.bk_cloud_id,
slave_ip_list=[self.data["target_ip"]],
root_id=self.root_id,
parent_global_data=copy.deepcopy(self.data),
is_init=True,
cluster_type=ClusterType.TenDBCluster.value,
)
)
re_surrounding_sub_pipeline_list.append(
re_surrounding_sub_pipeline.build_sub_process(sub_name=_("切换后重新安装周边组件"))
)

# 阶段6 卸载
uninstall_svr_sub_pipeline_list = []
uninstall_svr_sub_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(self.data))
Expand Down Expand Up @@ -267,39 +302,19 @@ def tendb_remote_slave_recover(self):
uninstall_svr_sub_pipeline_list.append(
uninstall_svr_sub_pipeline.build_sub_process(sub_name=_("卸载remote节点{}".format(self.data["source_ip"])))
)

# 安装实例
tendb_migrate_pipeline.add_parallel_sub_pipeline(sub_flow_list=install_sub_pipeline_list)
# 数据同步
tendb_migrate_pipeline.add_parallel_sub_pipeline(sub_flow_list=sync_data_sub_pipeline_list)
# 数据同步完毕 安装周边
tendb_migrate_pipeline.add_sub_pipeline(
sub_flow=build_surrounding_apps_sub_flow(
bk_cloud_id=cluster_class.bk_cloud_id,
master_ip_list=None,
slave_ip_list=[self.data["target_ip"]],
root_id=self.root_id,
parent_global_data=copy.deepcopy(self.data),
is_init=True,
cluster_type=ClusterType.TenDBCluster.value,
is_install_backup=False,
)
)
tendb_migrate_pipeline.add_parallel_sub_pipeline(sub_flow_list=surrounding_sub_pipeline_list)
# 人工确认切换迁移实例
tendb_migrate_pipeline.add_act(act_name=_("人工确认切换"), act_component_code=PauseComponent.code, kwargs={})
# 切换迁移实例
tendb_migrate_pipeline.add_parallel_sub_pipeline(sub_flow_list=switch_sub_pipeline_list)
# 安装周边组件
tendb_migrate_pipeline.add_sub_pipeline(
sub_flow=build_surrounding_apps_sub_flow(
bk_cloud_id=cluster_class.bk_cloud_id,
master_ip_list=None,
slave_ip_list=[self.data["target_ip"]],
root_id=self.root_id,
parent_global_data=copy.deepcopy(self.data),
is_init=True,
cluster_type=ClusterType.TenDBCluster.value,
)
)
# 实例切换完毕 安装周边
tendb_migrate_pipeline.add_parallel_sub_pipeline(sub_flow_list=re_surrounding_sub_pipeline_list)
# 卸载流程人工确认
tendb_migrate_pipeline.add_act(act_name=_("人工确认卸载实例"), act_component_code=PauseComponent.code, kwargs={})
# 卸载remote节点
Expand Down
8 changes: 8 additions & 0 deletions dbm-ui/backend/flow/engine/controller/spider.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from backend.flow.engine.bamboo.scene.spider.import_sqlfile_flow import ImportSQLFlow
from backend.flow.engine.bamboo.scene.spider.remote_local_slave_recover import TenDBRemoteSlaveLocalRecoverFlow
from backend.flow.engine.bamboo.scene.spider.remote_master_fail_over import RemoteMasterFailOverFlow
from backend.flow.engine.bamboo.scene.spider.remote_master_slave_migrate import TendbClusterMigrateRemoteFlow
from backend.flow.engine.bamboo.scene.spider.remote_master_slave_swtich import RemoteMasterSlaveSwitchFlow
from backend.flow.engine.bamboo.scene.spider.remote_slave_recover import TenDBRemoteSlaveRecoverFlow
from backend.flow.engine.bamboo.scene.spider.spider_add_mnt import TenDBClusterAddSpiderMNTFlow
Expand Down Expand Up @@ -177,6 +178,13 @@ def tendb_cluster_remote_rebalance(self):
flow = TenDBRemoteRebalanceFlow(root_id=self.root_id, ticket_data=self.ticket_data)
flow.tendb_migrate()

def tendb_cluster_remote_migrate(self):
"""
remote 节点主从成对迁移
"""
flow = TendbClusterMigrateRemoteFlow(root_id=self.root_id, ticket_data=self.ticket_data)
flow.migrate_master_slave_flow()

def tendb_cluster_remote_slave_recover(self):
"""
remote 远程slave节点恢复
Expand Down
2 changes: 2 additions & 0 deletions dbm-ui/backend/flow/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@
from backend.flow.views.tbinlogdumper_switch import SwitchTBinlogDumperSceneApiView
from backend.flow.views.tendb_cluster_remote_fail_over import RemoteFailOverSceneApiView
from backend.flow.views.tendb_cluster_remote_local_recover import RemoteLocalRecoverSceneApiView
from backend.flow.views.tendb_cluster_remote_migrate import RemoteMigrateSceneApiView
from backend.flow.views.tendb_cluster_remote_rebalance import RemoteRebalanceSceneApiView
from backend.flow.views.tendb_cluster_remote_slave_recover import RemoteSlaveRecoverSceneApiView
from backend.flow.views.tendb_cluster_remote_switch import RemoteSwitchSceneApiView
Expand Down Expand Up @@ -448,6 +449,7 @@
url(r"^scene/tendb_cluster_remote_fail_over$", RemoteFailOverSceneApiView.as_view()),
# remote 节点扩缩容
url(r"^scene/tendb_cluster_remote_rebalance$", RemoteRebalanceSceneApiView.as_view()),
url(r"^scene/tendb_cluster_remote_migrate$", RemoteMigrateSceneApiView.as_view()),
url(r"^scene/tendb_cluster_rollback_data$", TendbClusterRollbackDataSceneApiView.as_view()),
url("^scene/destroy_tendb_slave_cluster$", DestroySpiderSlaveClusterSceneApiView.as_view()),
url("^scene/reduce_spider_mnt$", ReduceSpiderMNTSceneApiView.as_view()),
Expand Down
27 changes: 18 additions & 9 deletions dbm-ui/backend/flow/utils/spider/spider_db_meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@
from typing import Optional

from django.db import transaction
from django.db.transaction import atomic

from backend.db_meta.api.cluster.tendbcluster.handler import TenDBClusterClusterHandler
from backend.db_meta.api.cluster.tendbcluster.remotedb_node_migrate import TenDBClusterMigrateRemoteDb
from backend.db_meta.enums import ClusterEntryRole, MachineType, TenDBClusterSpiderRole
from backend.db_meta.enums import ClusterEntryRole, InstanceStatus, MachineType, TenDBClusterSpiderRole
from backend.db_meta.models import Cluster, StorageInstance
from backend.flow.utils.dict_to_dataclass import dict_to_dataclass
from backend.flow.utils.spider.spider_act_dataclass import ShardInfo
Expand Down Expand Up @@ -252,14 +253,22 @@ def tendb_slave_recover_add_nodes(self):
return True

def tendb_slave_recover_add_tuple(self):
new_slave_to_old_master = {
"master": {"ip": self.cluster["master_ip"], "port": self.cluster["master_port"]},
"slave": {"ip": self.cluster["new_slave_ip"], "port": self.cluster["new_slave_port"]},
}
TenDBClusterMigrateRemoteDb.add_storage_tuple(
cluster_id=self.cluster["cluster_id"], storage=new_slave_to_old_master
)
# todo 是否修改new_master角色为中继状态
with atomic():
new_slave_to_old_master = {
"master": {"ip": self.cluster["master_ip"], "port": self.cluster["master_port"]},
"slave": {"ip": self.cluster["new_slave_ip"], "port": self.cluster["new_slave_port"]},
}
TenDBClusterMigrateRemoteDb.add_storage_tuple(
cluster_id=self.cluster["cluster_id"], storage=new_slave_to_old_master
)
StorageInstance.objects.filter(
machine__ip=self.cluster["new_slave_port"], machine__bk_cloud_id=self.cluster["bk_cloud_id"]
).update(status=InstanceStatus.RUNNING.value)
# slave_storages = StorageInstance.objects.filter(machine__ip=self.cluster["new_slave_port"],
# machine__bk_cloud_id=self.cluster["bk_cloud_id"])
# for slave_storage in slave_storages:
# slave_storage.status=InstanceStatus.RUNNING.value
# slave_storage.save()

def tendb_modify_storage_status(self):
storage = StorageInstance.objects.get(self.cluster["storage_id"])
Expand Down
10 changes: 10 additions & 0 deletions dbm-ui/backend/flow/utils/spider/tendb_cluster_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,16 @@ def get_slave_recover_info(cluster_id: int, ip: str):
return cluster_info


def get_master_slave_recover_info(cluster_id: int, master_ip: str, slave_ip: str):
cluster_info = get_cluster_info(cluster_id)
cluster_info["my_shards"] = {}
if master_ip in cluster_info["masters"] and slave_ip in cluster_info["slaves"]:
for key, val in cluster_info["shards"].items():
if val["master"]["ip"] == master_ip and val["slave"]["ip"] == slave_ip:
cluster_info["my_shards"][key] = val
return cluster_info


def get_slave_local_recover_info(cluster_id: int, storage_id: int):
cluster_info = get_cluster_info(cluster_id)
cluster_info["my_shards"] = {}
Expand Down
32 changes: 32 additions & 0 deletions dbm-ui/backend/flow/views/tendb_cluster_remote_migrate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""
TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available.
Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at https://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""

import logging

from rest_framework.response import Response

from backend.flow.engine.controller.spider import SpiderController
from backend.flow.views.base import FlowTestView
from backend.utils.basic import generate_root_id

logger = logging.getLogger("root")


class RemoteMigrateSceneApiView(FlowTestView):
"""
api: /apis/v1/flow/scene/tendb_cluster_remote_migrate
params:
"""

def post(self, request):
root_id = generate_root_id()
test = SpiderController(root_id=root_id, ticket_data=request.data)
test.tendb_cluster_remote_migrate()
return Response({"root_id": root_id})

0 comments on commit 67d4b06

Please sign in to comment.