Skip to content

Commit

Permalink
feat: doris缩容替换flow TencentBlueKing#3991
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangrq5 authored and zhangzhw8 committed May 6, 2024
1 parent edf6c5f commit a5100cf
Show file tree
Hide file tree
Showing 19 changed files with 1,038 additions and 82 deletions.
3 changes: 1 addition & 2 deletions dbm-ui/backend/db_meta/api/cluster/doris/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,4 @@
from .disable import disable
from .enable import enable
from .scale_up import scale_up

# from .shrink import shrink
from .shrink import shrink
2 changes: 1 addition & 1 deletion dbm-ui/backend/db_meta/api/cluster/doris/destroy.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def destroy(cluster_id: int):
"""

cluster = Cluster.objects.get(id=cluster_id)
cc_manage = CcManage(bk_biz_id=cluster.bk_biz_id, db_type=DBType.Doris.value)
cc_manage = CcManage(bk_biz_id=cluster.bk_biz_id, cluster_type=cluster.cluster_type)

# 删除storage instance
for storage in cluster.storageinstance_set.all():
Expand Down
61 changes: 61 additions & 0 deletions dbm-ui/backend/db_meta/api/cluster/doris/shrink.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# -*- coding: utf-8 -*-
"""
TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available.
Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at https://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
import logging
from typing import List, Optional

from django.db import transaction

from backend.db_meta import request_validator
from backend.db_meta.api import common
from backend.db_meta.enums import InstanceRole
from backend.db_meta.models import Cluster, ClusterEntry, StorageInstance
from backend.flow.utils.cc_manage import CcManage

logger = logging.getLogger("root")


@transaction.atomic
def shrink(
    cluster_id: int,
    storages: Optional[List] = None,
):
    """
    Clean up DBMeta records after a Doris cluster scale-down.

    Deletes the given storage instances, recycles and deletes any machine that
    no longer hosts a storage instance, detaches the instances from the cluster
    and from its cluster entry, and re-populates the entry if it ends up with
    no instances.

    :param cluster_id: primary key of the cluster being shrunk
    :param storages: storage descriptors to remove; validated by
        ``request_validator.validated_storage_list`` (must be non-empty, non-null)
    """
    cluster = Cluster.objects.get(id=cluster_id)
    cluster_entry = ClusterEntry.objects.get(cluster=cluster)

    storages = request_validator.validated_storage_list(storages, allow_empty=False, allow_null=False)
    removed_instances = common.filter_out_instance_obj(storages, StorageInstance.objects.all())
    for instance in removed_instances:
        instance.delete(keep_parents=True)
        host = instance.machine
        if host.storageinstance_set.exists():
            # The machine still carries other instances; keep it.
            continue
        # Move the host to the "pending recycle" module, then drop its record.
        CcManage(instance.bk_biz_id, cluster.cluster_type).recycle_host([host.bk_host_id])
        host.delete(keep_parents=True)

    cluster.storageinstance_set.remove(*removed_instances)
    cluster.save()
    cluster_entry.storageinstance_set.remove(*removed_instances)
    cluster_entry.save()

    # Only when the cluster entry (domain) has no instance left does it need refreshing.
    if cluster_entry.storageinstance_set.exists():
        return

    observers = cluster.storageinstance_set.filter(instance_role=InstanceRole.DORIS_OBSERVER)
    followers = cluster.storageinstance_set.filter(instance_role=InstanceRole.DORIS_FOLLOWER)

    # Observers take precedence; followers are used only when no observer remains.
    if observers.exists():
        replacements = observers
    elif followers.exists():
        replacements = followers
    else:
        return

    for replacement in replacements:
        cluster_entry.storageinstance_set.add(replacement)
1 change: 1 addition & 0 deletions dbm-ui/backend/flow/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,7 @@ class DorisActuatorActionEnum(str, StructuredEnum):
RestartProcess = EnumField("restart_process", _("restart_process"))
CleanData = EnumField("clean_data", _("clean_data"))
UpdateMetadata = EnumField("update_metadata", _("update_metadata"))
CheckDecommission = EnumField("check_decommission", "check_decommission")


class RiakModuleId(int, StructuredEnum):
Expand Down
49 changes: 45 additions & 4 deletions dbm-ui/backend/flow/engine/bamboo/scene/doris/doris_apply_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,17 @@
from backend.flow.consts import DnsOpType, DorisRoleEnum, ManagerOpType, ManagerServiceType
from backend.flow.engine.bamboo.scene.common.builder import Builder
from backend.flow.engine.bamboo.scene.common.get_file_list import GetFileList
from backend.flow.engine.bamboo.scene.doris.doris_base_flow import DorisBaseFlow, get_all_node_ips_in_ticket
from backend.flow.engine.bamboo.scene.doris.doris_base_flow import (
DorisBaseFlow,
get_all_node_ips_in_ticket,
get_node_ips_in_ticket_by_role,
make_meta_host_map,
)
from backend.flow.engine.bamboo.scene.doris.exceptions import (
BeMachineCountException,
RoleMachineCountException,
RoleMachineCountMustException,
)
from backend.flow.plugins.components.collections.common.bigdata_manager_service import BigdataManagerComponent
from backend.flow.plugins.components.collections.doris.doris_db_meta import DorisMetaComponent
from backend.flow.plugins.components.collections.doris.doris_dns_manage import DorisDnsManageComponent
Expand All @@ -29,6 +39,11 @@
from backend.flow.plugins.components.collections.doris.get_doris_resource import GetDorisResourceComponent
from backend.flow.plugins.components.collections.doris.rewrite_doris_config import WriteBackDorisConfigComponent
from backend.flow.plugins.components.collections.doris.trans_files import TransFileComponent
from backend.flow.utils.doris.consts import (
DORIS_BACKEND_NOT_COUNT,
DORIS_FOLLOWER_MUST_COUNT,
DORIS_OBSERVER_NOT_COUNT,
)
from backend.flow.utils.doris.doris_act_payload import DorisActPayload
from backend.flow.utils.doris.doris_context_dataclass import DnsKwargs, DorisActKwargs, DorisApplyContext
from backend.flow.utils.extension_manage import BigdataManagerKwargs
Expand Down Expand Up @@ -56,7 +71,7 @@ def __get_flow_data(self) -> dict:

master_ip = self.nodes[DorisRoleEnum.FOLLOWER][0]["ip"]
flow_data["master_fe_ip"] = master_ip
host_map = self.make_meta_host_map(flow_data)
host_map = make_meta_host_map(flow_data)
flow_data["host_meta_map"] = host_map

return flow_data
Expand All @@ -68,6 +83,9 @@ def deploy_doris_flow(self):
"""

doris_deploy_data = self.__get_flow_data()
# 检查单据传参
self.check_apply_role_ip_count(doris_deploy_data)

doris_pipeline = Builder(root_id=self.root_id, data=doris_deploy_data)

trans_files = GetFileList(db_type=DBType.Doris)
Expand Down Expand Up @@ -115,7 +133,7 @@ def deploy_doris_flow(self):
kwargs=asdict(act_kwargs),
)

act_kwargs.get_doris_payload_func = DorisActPayload.get_add_nodes_metadata_payload.__name__
act_kwargs.get_doris_payload_func = DorisActPayload.get_add_metadata_payload.__name__
doris_pipeline.add_act(
act_name=_("集群元数据更新"),
act_component_code=ExecuteDorisActuatorScriptComponent.code,
Expand All @@ -125,7 +143,7 @@ def deploy_doris_flow(self):
sub_new_fe_pipelines = self.new_fe_sub_flows(act_kwargs=act_kwargs, data=doris_deploy_data)
doris_pipeline.add_parallel_sub_pipeline(sub_flow_list=sub_new_fe_pipelines)
# 扩容BE节点子流程
sub_new_be_acts = self.new_bew_sub_acts(act_kwargs=act_kwargs, data=doris_deploy_data)
sub_new_be_acts = self.new_be_sub_acts(act_kwargs=act_kwargs, data=doris_deploy_data)
doris_pipeline.add_parallel_acts(acts_list=sub_new_be_acts)

# 插入Doris WebUI实例信息
Expand Down Expand Up @@ -167,3 +185,26 @@ def deploy_doris_flow(self):
)

doris_pipeline.run_pipeline()

@staticmethod
def check_apply_role_ip_count(data: dict):
    """
    Validate the per-role machine counts of a Doris apply (deploy) ticket.

    The log messages are translated first and formatted afterwards, so the
    gettext lookup sees the constant template rather than an interpolated string.

    :param data: ticket data; node IPs per role are read through
        ``get_node_ips_in_ticket_by_role``
    :raises RoleMachineCountMustException: follower count differs from
        ``DORIS_FOLLOWER_MUST_COUNT``
    :raises RoleMachineCountException: observer count equals
        ``DORIS_OBSERVER_NOT_COUNT``
    :raises BeMachineCountException: hot + cold count equals
        ``DORIS_BACKEND_NOT_COUNT``
    """
    # Check the follower count.
    follower_count = len(get_node_ips_in_ticket_by_role(data, DorisRoleEnum.FOLLOWER))
    if follower_count != DORIS_FOLLOWER_MUST_COUNT:
        logger.error(_("DorisFollower主机数不为{},当前选择数量为{}").format(DORIS_FOLLOWER_MUST_COUNT, follower_count))
        raise RoleMachineCountMustException(
            doris_role=DorisRoleEnum.FOLLOWER, must_count=DORIS_FOLLOWER_MUST_COUNT
        )

    # Check the observer count.
    observer_count = len(get_node_ips_in_ticket_by_role(data, DorisRoleEnum.OBSERVER))
    if observer_count == DORIS_OBSERVER_NOT_COUNT:
        logger.error(_("DorisObserver主机数不能为{}").format(DORIS_OBSERVER_NOT_COUNT))
        raise RoleMachineCountException(doris_role=DorisRoleEnum.OBSERVER, machine_count=DORIS_OBSERVER_NOT_COUNT)

    # Check the data-node count (hot + cold).
    # NOTE(review): the original comment said "hot + cold > 1", but the code only
    # rejects the single value DORIS_BACKEND_NOT_COUNT - confirm the intended rule.
    hot_count = len(get_node_ips_in_ticket_by_role(data, DorisRoleEnum.HOT))
    cold_count = len(get_node_ips_in_ticket_by_role(data, DorisRoleEnum.COLD))
    if hot_count + cold_count == DORIS_BACKEND_NOT_COUNT:
        logger.error(_("Doris数据节点(hot+cold)数量不能为{}").format(DORIS_BACKEND_NOT_COUNT))
        # NOTE(review): a forbidden count is passed as must_count - verify against
        # BeMachineCountException's message template.
        raise BeMachineCountException(must_count=DORIS_BACKEND_NOT_COUNT)
144 changes: 128 additions & 16 deletions dbm-ui/backend/flow/engine/bamboo/scene/doris/doris_base_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
import copy
import logging.config
from dataclasses import asdict
from typing import Dict, Optional
Expand All @@ -23,6 +24,7 @@
from backend.flow.plugins.components.collections.doris.exec_doris_actuator_script import (
ExecuteDorisActuatorScriptComponent,
)
from backend.flow.plugins.components.collections.doris.get_doris_payload import GetDorisActPayloadComponent
from backend.flow.utils.base.payload_handler import PayloadHandler
from backend.flow.utils.doris.consts import DorisConfigEnum
from backend.flow.utils.doris.doris_act_payload import DorisActPayload
Expand All @@ -32,6 +34,33 @@
logger = logging.getLogger("flow")


def make_fe_map_from_ticket(data: dict) -> dict:
    """
    Build a ``{role: [ip, ...]}`` map restricted to frontend roles.

    Only the follower and observer entries of ``data["nodes"]`` are kept; each
    node's ``"ip"`` value is collected in ticket order.
    """
    fe_roles = [DorisRoleEnum.FOLLOWER.value, DorisRoleEnum.OBSERVER.value]
    return {
        role: [node["ip"] for node in role_nodes]
        for role, role_nodes in data["nodes"].items()
        if role in fe_roles
    }


def make_be_map_from_ticket(data: dict) -> dict:
    """
    Build a ``{role: [ip, ...]}`` map restricted to backend roles.

    Only the hot and cold entries of ``data["nodes"]`` are kept; each node's
    ``"ip"`` value is collected in ticket order.
    """
    be_roles = [DorisRoleEnum.HOT.value, DorisRoleEnum.COLD.value]
    return {
        role: [node["ip"] for node in role_nodes]
        for role, role_nodes in data["nodes"].items()
        if role in be_roles
    }


def make_meta_host_map(data: dict) -> dict:
    """
    Build a ``{role: [ip, ...]}`` map for every role in ``data["nodes"]``.

    Each node's ``"ip"`` value is collected in ticket order; a role with no
    nodes maps to an empty list.
    """
    return {role: [node["ip"] for node in role_nodes] for role, role_nodes in data["nodes"].items()}


class DorisBaseFlow(object):
"""
Doris Flow基类
Expand Down Expand Up @@ -109,14 +138,13 @@ def __init__(self, root_id: str, data: Optional[Dict]):
self.doris_config = dbconfig["content"]
self.be_conf = self.doris_config[DorisConfigEnum.Backend]
self.fe_conf = self.doris_config[DorisConfigEnum.Frontend]
self.http_port = self.doris_config[DorisConfigEnum.Frontend]["http_port"]
self.query_port = self.doris_config[DorisConfigEnum.Frontend]["query_port"]
# dbconfig 默认返回字符串类型,需要转int
self.http_port = int(self.doris_config[DorisConfigEnum.Frontend]["http_port"])
self.query_port = int(self.doris_config[DorisConfigEnum.Frontend]["query_port"])

auth_info = PayloadHandler.get_bigdata_auth_by_cluster(cluster, 0)
self.username = auth_info["username"]
self.password = auth_info["password"]
self.username = "username"
self.password = "password"
self.master_ips = [master.machine.ip for master in masters]

def get_flow_base_data(self) -> dict:
Expand Down Expand Up @@ -144,19 +172,16 @@ def get_flow_base_data(self) -> dict:
def __get_flow_data(self) -> dict:
pass

def make_meta_host_map(self, data: dict) -> dict:
host_map = {}
for role in self.nodes:
ips = [node["ip"] for node in self.nodes[role]]
host_map[role] = ips

return host_map

def get_all_node_ips_in_dbmeta(self) -> list:
    """Return the distinct machine IPs of all storage instances recorded for this cluster."""
    cluster = Cluster.objects.get(id=self.cluster_id)
    ip_values = StorageInstance.objects.filter(cluster=cluster).values_list("machine__ip", flat=True)
    # Passing through a set removes duplicate IPs; the resulting order is unspecified.
    unique_ips = set(ip_values)
    return list(unique_ips)

def get_role_ips_in_dbmeta(self, role: InstanceRole) -> list:
    """Return the machine IPs of this cluster's storage instances that have the given role."""
    cluster = Cluster.objects.get(id=self.cluster_id)
    role_instances = cluster.storageinstance_set.filter(instance_role=role)
    return list(role_instances.values_list("machine__ip", flat=True))

def new_common_sub_flows(self, act_kwargs: DorisActKwargs, data: dict) -> list:
# """
# 新增节点common操作sub_flow 数组
Expand All @@ -168,7 +193,7 @@ def new_common_sub_flows(self, act_kwargs: DorisActKwargs, data: dict) -> list:
# 操作
# """
sub_pipelines = []
for role, role_nodes in self.nodes.items():
for role, role_nodes in data["nodes"].items():
for node in role_nodes:
sub_pipeline = SubBuilder(root_id=self.root_id, data=data)
ip = node["ip"]
Expand Down Expand Up @@ -210,7 +235,7 @@ def new_common_sub_flows(self, act_kwargs: DorisActKwargs, data: dict) -> list:
# 新加入frontend(follower/observer)节点子流程(不包括元数据更新)
def new_fe_sub_flows(self, act_kwargs: DorisActKwargs, data: dict) -> list:
sub_pipelines = []
for role, role_nodes in self.nodes.items():
for role, role_nodes in data["nodes"].items():
if role in [DorisRoleEnum.FOLLOWER.value, DorisRoleEnum.OBSERVER.value]:
for fe_node in role_nodes:
fe_ip = fe_node["ip"]
Expand Down Expand Up @@ -241,9 +266,10 @@ def new_fe_sub_flows(self, act_kwargs: DorisActKwargs, data: dict) -> list:
return sub_pipelines

# 新加入backend(hot/cold)节点子流程(不包括元数据更新)
def new_bew_sub_acts(self, act_kwargs: DorisActKwargs, data: dict) -> list:
@staticmethod
def new_be_sub_acts(act_kwargs: DorisActKwargs, data: dict) -> list:
be_acts = []
for role, role_nodes in self.nodes.items():
for role, role_nodes in data["nodes"].items():
if role in [DorisRoleEnum.COLD.value, DorisRoleEnum.HOT.value]:
for be_node in role_nodes:
act_kwargs.exec_ip = be_node["ip"]
Expand All @@ -257,6 +283,92 @@ def new_bew_sub_acts(self, act_kwargs: DorisActKwargs, data: dict) -> list:
be_acts.append(be_act)
return be_acts

def build_del_fe_sub_flow(self, data: dict) -> SubBuilder:
    """
    Build the sub-flow that removes frontend (follower/observer) nodes.

    The sub-flow only covers operations on the Doris cluster itself: fetch the
    payload, stop every FE process listed in the ticket in parallel, then drop
    those FEs from the cluster metadata via the master FE. Cleaning data
    directories and DBMeta is not part of it.

    :param data: ticket data; reads ``data["nodes"]`` and ``data["master_fe_ip"]``
    :return: the assembled ``SubBuilder``
    """
    fe_roles = [DorisRoleEnum.FOLLOWER.value, DorisRoleEnum.OBSERVER.value]

    # Work on a copy so the FE-only host map does not leak into the caller's data.
    fe_ticket = copy.deepcopy(data)
    fe_ticket["host_meta_map"] = make_fe_map_from_ticket(fe_ticket)
    sub_flow = SubBuilder(root_id=self.root_id, data=fe_ticket)
    act_kwargs = DorisActKwargs(bk_cloud_id=self.bk_cloud_id)

    sub_flow.add_act(
        act_name=_("获取集群Payload"),
        act_component_code=GetDorisActPayloadComponent.code,
        kwargs=asdict(act_kwargs),
    )

    # act_kwargs is reused for every node; asdict() snapshots its state per act.
    stop_acts = []
    for role, role_nodes in data["nodes"].items():
        if role not in fe_roles:
            continue
        for node in role_nodes:
            act_kwargs.exec_ip = node["ip"]
            act_kwargs.doris_role = role
            act_kwargs.get_doris_payload_func = DorisActPayload.get_stop_process_payload.__name__
            stop_acts.append(
                {
                    "act_name": _("停止DorisFE-{}-{}").format(role, node["ip"]),
                    "act_component_code": ExecuteDorisActuatorScriptComponent.code,
                    "kwargs": asdict(act_kwargs),
                }
            )
    sub_flow.add_parallel_acts(stop_acts)

    # Update metadata: drop the FE nodes, executed on the master FE.
    act_kwargs.exec_ip = fe_ticket["master_fe_ip"]
    act_kwargs.get_doris_payload_func = DorisActPayload.get_drop_metadata_payload.__name__
    sub_flow.add_act(
        act_name=_("集群元数据更新-drop-fe"),
        act_component_code=ExecuteDorisActuatorScriptComponent.code,
        kwargs=asdict(act_kwargs),
    )
    return sub_flow

def build_del_be_sub_flow(self, data: dict) -> SubBuilder:
    """
    Build the sub-flow that removes backend (hot/cold) nodes.

    The sub-flow only covers operations on the Doris cluster itself: fetch the
    payload, then on the master FE decommission the BEs, check that they are
    decommissioned, and drop them from metadata; finally stop every BE process
    listed in the ticket in parallel. Cleaning data directories and DBMeta is
    not part of it.

    :param data: ticket data; reads ``data["nodes"]`` and ``data["master_fe_ip"]``
    :return: the assembled ``SubBuilder``
    """
    be_roles = [DorisRoleEnum.HOT.value, DorisRoleEnum.COLD.value]

    # Work on a copy so the BE-only host map does not leak into the caller's data.
    be_ticket = copy.deepcopy(data)
    be_ticket["host_meta_map"] = make_be_map_from_ticket(be_ticket)
    sub_flow = SubBuilder(root_id=self.root_id, data=be_ticket)
    act_kwargs = DorisActKwargs(bk_cloud_id=self.bk_cloud_id)

    sub_flow.add_act(
        act_name=_("获取集群Payload"),
        act_component_code=GetDorisActPayloadComponent.code,
        kwargs=asdict(act_kwargs),
    )

    # Metadata steps, all executed on the master FE, strictly in this order:
    # decommission the BEs, wait for data migration, then drop the BEs.
    act_kwargs.exec_ip = be_ticket["master_fe_ip"]
    metadata_steps = [
        (DorisActPayload.get_decommission_metadata_payload.__name__, _("集群元数据更新-退役-BE")),
        (DorisActPayload.get_check_decommission_payload.__name__, _("检查数据节点是否退役")),
        (DorisActPayload.get_force_drop_metadata_payload.__name__, _("集群元数据更新-删除-BE")),
    ]
    for payload_func, step_name in metadata_steps:
        act_kwargs.get_doris_payload_func = payload_func
        sub_flow.add_act(
            act_name=step_name,
            act_component_code=ExecuteDorisActuatorScriptComponent.code,
            kwargs=asdict(act_kwargs),
        )

    # act_kwargs is reused for every node; asdict() snapshots its state per act.
    stop_acts = []
    for role, role_nodes in data["nodes"].items():
        if role not in be_roles:
            continue
        for node in role_nodes:
            act_kwargs.exec_ip = node["ip"]
            act_kwargs.doris_role = role
            act_kwargs.get_doris_payload_func = DorisActPayload.get_stop_process_payload.__name__
            stop_acts.append(
                {
                    "act_name": _("停止DorisBE-{}-{}").format(role, node["ip"]),
                    "act_component_code": ExecuteDorisActuatorScriptComponent.code,
                    "kwargs": asdict(act_kwargs),
                }
            )

    sub_flow.add_parallel_acts(stop_acts)
    return sub_flow


def get_node_ips_in_ticket_by_role(data: dict, role: str) -> list:
if role not in data.get("nodes"):
Expand Down
Loading

0 comments on commit a5100cf

Please sign in to comment.