Skip to content

Commit

Permalink
feat(mongodb): 从旧系统迁移元数据到新系统 TencentBlueKing#5994
Browse files Browse the repository at this point in the history
  • Loading branch information
yyhenryyy committed Aug 19, 2024
1 parent 6d6f076 commit 080236e
Show file tree
Hide file tree
Showing 18 changed files with 960 additions and 18 deletions.
5 changes: 5 additions & 0 deletions dbm-ui/backend/components/dns/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ def __init__(self):
url="/api/v1/dns/domain/all",
description=_("获取所有ip、域名关系"),
)
self.update_domain_belong_app = self.generate_data_api(
method="POST",
url="/api/v1/dns/domain/app",
description=_("更新域名所属业务关系"),
)


DnsApi = _DnsApi()
12 changes: 7 additions & 5 deletions dbm-ui/backend/db_meta/models/spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,18 @@ class Spec(AuditedModel):
max_length=64, choices=ClusterType.get_choices(), help_text=_("集群类型:MySQL、Proxy、Spider")
)
spec_machine_type = models.CharField(max_length=64, choices=MachineType.get_choices(), help_text=_("机器规格类型"))
cpu = models.JSONField(null=True, help_text=_("cpu规格描述:{'min':1,'max':10}"))
mem = models.JSONField(null=True, help_text=_("mem规格描述:{'min':100,'max':1000}"))
device_class = models.JSONField(null=True, help_text=_("实际机器机型: ['class1','class2'] "))
storage_spec = models.JSONField(null=True, help_text=_("存储磁盘需求配置:{'mount_point':'/data','size':500,'type':'ssd'}"))
cpu = models.JSONField(null=True, help_text=_('cpu规格描述:{"min":1,"max":10}'))
mem = models.JSONField(null=True, help_text=_('mem规格描述:{"min":100,"max":1000}'))
device_class = models.JSONField(null=True, help_text=_('实际机器机型: ["class1","class2"]'))
storage_spec = models.JSONField(
null=True, help_text=_('存储磁盘需求配置:[{"mount_point":"/data","size":500,"type":"ssd"}]')
)
desc = models.TextField(help_text=_("资源规格描述"), null=True, blank=True)
enable = models.BooleanField(help_text=_("是否启用"), default=True)
# es专属
instance_num = models.IntegerField(default=0, help_text=_("实例数(es专属)"))
# spider,redis集群专属
qps = models.JSONField(default=dict, help_text=_("qps规格描述:{'min': 1, 'max': 100}"))
qps = models.JSONField(default=dict, help_text=_('qps规格描述:{"min": 1, "max": 100}'))

class Meta:
verbose_name = verbose_name_plural = _("资源规格(Spec)")
Expand Down
4 changes: 2 additions & 2 deletions dbm-ui/backend/flow/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -1137,10 +1137,10 @@ class RequestResultCode(int, StructuredEnum):

class MongoDBPasswordRule(str, StructuredEnum):
"""
mongodb密码规则
mongodb密码规则名
"""

RULE = EnumField("password", _("密码规则"))
RULE = EnumField("mongo_password", _("MongoDB密码规则"))


class MongoDBClusterRole(str, StructuredEnum):
Expand Down
70 changes: 70 additions & 0 deletions dbm-ui/backend/flow/engine/bamboo/scene/mongodb/mongodb_migrate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# -*- coding: utf-8 -*-
"""
TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available.
Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at https://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
import logging.config
from typing import Dict, Optional

from backend.flow.engine.bamboo.scene.common.builder import Builder
from backend.flow.engine.bamboo.scene.mongodb.sub_task.migrate_meta import cluster_migrate
from backend.flow.engine.bamboo.scene.mongodb.sub_task.multi_replicaset_migrate_meta import multi_replicaset_migrate
from backend.flow.utils.mongodb.mongodb_migrate_dataclass import MigrateActKwargs

logger = logging.getLogger("flow")


class MongoDBMigrateMetaFlow(object):
"""元数据迁移flow"""

def __init__(self, root_id: str, data: Optional[Dict]):
"""
传入参数
@param root_id : 任务流程定义的root_id
@param data : 单据传递过来的参数列表,是dict格式
"""

self.root_id = root_id
self.data = data
self.get_kwargs = MigrateActKwargs()
self.get_kwargs.payload = data
self.get_kwargs.bk_biz_id = data.get("bk_biz_id")

def multi_cluster_migrate_flow(self):
"""
cluster migrate流程
"""

# 创建流程实例
pipeline = Builder(root_id=self.root_id, data=self.data)

# 副本集与分片集群并行
# 副本集子流程串行
sub_pipelines = []
if self.data["MongoReplicaSet"]:
self.get_kwargs.multi_replicaset_info = self.data["MongoReplicaSet"]
sub_pipline = multi_replicaset_migrate(
root_id=self.root_id,
ticket_data=self.data,
sub_kwargs=self.get_kwargs,
)
sub_pipelines.append(sub_pipline)
# 分片集群并行
for cluster_info in self.data["MongoShardedCluster"]:
self.get_kwargs.source_cluster_info = cluster_info
sub_pipline = cluster_migrate(
root_id=self.root_id,
ticket_data=self.data,
sub_kwargs=self.get_kwargs,
cluster=True,
)
sub_pipelines.append(sub_pipline)
pipeline.add_parallel_sub_pipeline(sub_flow_list=sub_pipelines)

# 运行流程
pipeline.run_pipeline()
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
# -*- coding: utf-8 -*-
"""
TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available.
Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at https://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""

from copy import deepcopy
from typing import Dict, Optional

from django.utils.translation import ugettext as _

from backend.db_meta.enums.cluster_type import ClusterType
from backend.flow.engine.bamboo.scene.common.builder import SubBuilder
from backend.flow.plugins.components.collections.mongodb.add_relationship_to_meta import (
ExecAddRelationshipOperationComponent,
)
from backend.flow.plugins.components.collections.mongodb.migrate_meta import MongoDBMigrateMetaComponent
from backend.flow.utils.mongodb.mongodb_migrate_dataclass import MigrateActKwargs


def cluster_migrate(
root_id: str, ticket_data: Optional[Dict], sub_kwargs: MigrateActKwargs, cluster: bool
) -> SubBuilder:
"""
单个replicaset迁移元数
"""

# 获取变量
sub_get_kwargs = deepcopy(sub_kwargs)
# 创建子流程
sub_pipeline = SubBuilder(root_id=root_id, data=ticket_data)

# 获取副本集机器是否复用
sub_get_kwargs.skip_machine()

# 检查 是否已经迁移 从目标环境检查迁移ip是否复用
kwargs = sub_get_kwargs.get_check_dest_cluster_info(
cluster_name=sub_get_kwargs.source_cluster_info.get("replsetname")
)
sub_pipeline.add_act(
act_name=_("检查cluster目标端是否存在"), act_component_code=MongoDBMigrateMetaComponent.code, kwargs=kwargs
)

# 检查机器规格是否在目标端存在
kwargs = sub_get_kwargs.get_check_spec_info()
sub_pipeline.add_act(act_name=_("检查目标端机器规格"), act_component_code=MongoDBMigrateMetaComponent.code, kwargs=kwargs)

# dbconfig保存oplogsize cachesize
if cluster:
namespace = ClusterType.MongoShardedCluster.value
else:
namespace = ClusterType.MongoReplicaSet.value
kwargs = sub_get_kwargs.get_save_conf_info(namespace=namespace)
sub_pipeline.add_act(act_name=_("保存配置"), act_component_code=MongoDBMigrateMetaComponent.code, kwargs=kwargs)

# 目标业务更新dba 检查目标业务的dba,不一致则更新
kwargs = sub_get_kwargs.get_dba_info()
sub_pipeline.add_act(act_name=_("更新dba"), act_component_code=MongoDBMigrateMetaComponent.code, kwargs=kwargs)

# 迁移数据
kwargs = sub_get_kwargs.get_migrate_info()
sub_pipeline.add_act(
act_name=_("迁移meta"),
act_component_code=ExecAddRelationshipOperationComponent.code,
kwargs=kwargs,
)

# node保存密码到密码服务
kwargs = sub_get_kwargs.get_save_password_info()
sub_pipeline.add_act(act_name=_("保存密码"), act_component_code=MongoDBMigrateMetaComponent.code, kwargs=kwargs)

# 修改dns的app字段
kwargs = sub_get_kwargs.get_change_dns_app_info()
sub_pipeline.add_act(act_name=_("更新dns的app"), act_component_code=MongoDBMigrateMetaComponent.code, kwargs=kwargs)

if cluster:
name = "cluster"
# cluster删除shard的域名 迁移完,运行无误后再删
# kwargs = sub_get_kwargs.get_shard_delete_doamin_info()
# sub_pipeline.add_act(
# act_name=_("删除shard的domain"),
# act_component_code=MongoDBMigrateMetaComponent.code,
# kwargs=kwargs
# )
cluster_name = sub_get_kwargs.source_cluster_info["cluster_id"]
# 添加clb
if sub_get_kwargs.source_cluster_info["clb"]:
kwargs = sub_get_kwargs.get_clb_info()
sub_pipeline.add_act(
act_name=_("添加clb到meta"), act_component_code=MongoDBMigrateMetaComponent.code, kwargs=kwargs
)
else:
name = "replicaset"
cluster_name = sub_get_kwargs.source_cluster_info["replsetname"]

return sub_pipeline.build_sub_process(
sub_name=_("MongoDB--{}-meta迁移-{}-{}".format(name, sub_get_kwargs.payload["app"], cluster_name))
)
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ def mongod_replace(
# 获取参数
new_node = info["target"]
sub_sub_get_kwargs.payload["app"] = sub_sub_get_kwargs.payload["bk_app_abbr"]
if cluster_role != MongoDBClusterRole.ConfigSvr.value or not cluster_role:
sub_sub_get_kwargs.replicaset_info = {}
sub_sub_get_kwargs.replicaset_info = {}
sub_sub_get_kwargs.replicaset_info["port"] = sub_sub_get_kwargs.db_instance["port"]
force = True
if cluster_role:
Expand All @@ -68,6 +67,8 @@ def mongod_replace(
sub_sub_get_kwargs.payload["config_nodes"] = []
sub_sub_get_kwargs.payload["shards_nodes"] = []
sub_sub_get_kwargs.payload["mongos_nodes"] = []
# 获取配置
conf = sub_sub_get_kwargs.get_conf(cluster_name=sub_sub_get_kwargs.db_instance["cluster_name"])
if cluster_role == MongoDBClusterRole.ConfigSvr.value:
sub_sub_get_kwargs.payload["config_nodes"] = [
{
Expand All @@ -77,6 +78,8 @@ def mongod_replace(
"bk_cloud_id": info["bk_cloud_id"],
}
]
sub_sub_get_kwargs.replicaset_info["cacheSizeGB"] = conf["config_cacheSizeGB"]
sub_sub_get_kwargs.replicaset_info["oplogSizeMB"] = conf["config_oplogSizeMB"]
elif cluster_role == MongoDBClusterRole.ShardSvr.value:
shard_nodes = {
"nodes": [
Expand All @@ -90,7 +93,6 @@ def mongod_replace(
}
sub_sub_get_kwargs.payload["shards_nodes"].append(shard_nodes)
# shard直接获取配置
conf = sub_sub_get_kwargs.get_conf(cluster_name=sub_sub_get_kwargs.db_instance["cluster_name"])
sub_sub_get_kwargs.replicaset_info["cacheSizeGB"] = conf["cacheSizeGB"]
sub_sub_get_kwargs.replicaset_info["oplogSizeMB"] = conf["oplogSizeMB"]
else:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# -*- coding: utf-8 -*-
"""
TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available.
Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at https://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""

from copy import deepcopy
from typing import Dict, Optional

from django.utils.translation import ugettext as _

from backend.flow.engine.bamboo.scene.common.builder import SubBuilder
from backend.flow.engine.bamboo.scene.mongodb.sub_task.migrate_meta import cluster_migrate
from backend.flow.utils.mongodb.mongodb_migrate_dataclass import MigrateActKwargs


def multi_replicaset_migrate(
root_id: str,
ticket_data: Optional[Dict],
sub_kwargs: MigrateActKwargs,
) -> SubBuilder:
"""
多个replicaset迁移元数据
"""

# 获取变量
sub_get_kwargs = deepcopy(sub_kwargs)
# 创建子流程
sub_pipeline = SubBuilder(root_id=root_id, data=ticket_data)

# 多个replicaset迁移元数据——串行
for replicaset_info in sub_get_kwargs.multi_replicaset_info:
sub_get_kwargs.source_cluster_info = replicaset_info
flow = cluster_migrate(
root_id=root_id,
ticket_data=ticket_data,
sub_kwargs=sub_get_kwargs,
cluster=False,
)
sub_pipeline.add_sub_pipeline(sub_flow=flow)

return sub_pipeline.build_sub_process(sub_name=_("MongoDB--复制集迁移元数据"))
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ def replicaset_replace(
)

# config计算参数
if cluster_role == MongoDBClusterRole.ConfigSvr.value:
sub_get_kwargs.calc_param_replace(info=info, instance_num=0)
# if cluster_role == MongoDBClusterRole.ConfigSvr.value:
# sub_get_kwargs.calc_param_replace(info=info, instance_num=0)
# 进行替换——并行 以ip为维度
sub_sub_pipelines = []
for mongodb_instance in info["instances"]:
Expand Down
9 changes: 9 additions & 0 deletions dbm-ui/backend/flow/engine/controller/mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from backend.flow.engine.bamboo.scene.mongodb.mongodb_install import MongoDBInstallFlow
from backend.flow.engine.bamboo.scene.mongodb.mongodb_install_dbmon import MongoInstallDBMon
from backend.flow.engine.bamboo.scene.mongodb.mongodb_instance_restart import MongoRestartInstanceFlow
from backend.flow.engine.bamboo.scene.mongodb.mongodb_migrate import MongoDBMigrateMetaFlow
from backend.flow.engine.bamboo.scene.mongodb.mongodb_remove_ns import MongoRemoveNsFlow
from backend.flow.engine.bamboo.scene.mongodb.mongodb_replace import MongoReplaceFlow
from backend.flow.engine.bamboo.scene.mongodb.mongodb_restore import MongoRestoreFlow
Expand Down Expand Up @@ -181,3 +182,11 @@ def disable_cluster(self):

flow = MongoEnableDisableFlow(root_id=self.root_id, data=self.ticket_data)
flow.multi_cluster_flow(enable=False)

def migrate_meta(self):
"""
迁移元数据
"""

flow = MongoDBMigrateMetaFlow(root_id=self.root_id, data=self.ticket_data)
flow.multi_cluster_migrate_flow()
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def _execute(self, data, parent_data) -> bool:
result = MongoDBPassword().save_password_to_db(
instances=kwargs["nodes"], username=username, password=password, operator=kwargs["operator"]
)
if result is not None:
if result:
self.log_error("add password of user:{} to db fail, error:{}".format(username, result))
return False
self.log_info("add password of users:{} to db successfully".format(",".join(usernames)))
Expand Down
Loading

0 comments on commit 080236e

Please sign in to comment.