Skip to content

Commit

Permalink
feat(mongodb): 禁用/开启MongoDB TencentBlueKing#3864
Browse files Browse the repository at this point in the history
  • Loading branch information
yyhenryyy committed Apr 8, 2024
1 parent d9af4da commit 6c50348
Show file tree
Hide file tree
Showing 17 changed files with 533 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type DeInstallConfParams struct {
NodeInfo []string `json:"nodeInfo" validate:"required"` // []string ip,ip 如果为复制集节点,则为复制集所有节点的ip;如果为mongos,则为mongos的ip
InstanceType string `json:"instanceType" validate:"required"` // mongod mongos
Force bool `json:"force"` // 不检查连接,强制卸载
RenameDir bool `json:"renameDir"` // 关闭进程后是否重命名目录 true 重命名目录,false 不重命名目录
}

// DeInstall 添加分片到集群
Expand Down Expand Up @@ -199,6 +200,10 @@ func (d *DeInstall) shutdownProcess() error {
// DirRename 打包数据目录
func (d *DeInstall) DirRename() error {
// renameDb数据目录
// 关闭进程后不重命名目录
if d.ConfParams.RenameDir == false {
return nil
}
flag := util.FileExists(d.PortDir)
if flag == true {
d.runtime.Logger.Info("start to rename db directory")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package atommongodb

import (
"dbm-services/mongodb/db-tools/dbactuator/pkg/common"
"dbm-services/mongodb/db-tools/dbactuator/pkg/consts"
"dbm-services/mongodb/db-tools/dbactuator/pkg/jobruntime"
"encoding/json"
"fmt"
"path/filepath"
"strconv"

"github.com/go-playground/validator/v10"
)

// StartProcessConfParams 开启进程参数
type StartProcessConfParams struct {
IP string `json:"ip" validate:"required"`
Port int `json:"port" validate:"required"`
InstanceType string `json:"instanceType" validate:"required"` // mongos mongod
Auth bool `json:"auth"` // true->auth false->noauth
}

// MongoStartProcess 开启mongo进程
type MongoStartProcess struct {
BaseJob
runtime *jobruntime.JobGenericRuntime
BinDir string
DataDir string
DbpathDir string
Mongo string
OsUser string // MongoDB安装在哪个用户下
OsGroup string
ConfParams *StartProcessConfParams
AuthConfFilePath string
NoAuthConfFilePath string
}

// NewMongoStartProcess 实例化结构体
func NewMongoStartProcess() jobruntime.JobRunner {
return &MongoStartProcess{}
}

// Name 获取原子任务的名字
func (s *MongoStartProcess) Name() string {
return "mongo_start"
}

// Run 运行原子任务
func (s *MongoStartProcess) Run() error {
// 启动服务
if err := s.startup(); err != nil {
return err
}

return nil
}

// Retry 重试
func (s *MongoStartProcess) Retry() uint {
return 2
}

// Rollback 回滚
func (s *MongoStartProcess) Rollback() error {
return nil
}

// Init 初始化
func (s *MongoStartProcess) Init(runtime *jobruntime.JobGenericRuntime) error {
// 获取安装参数
s.runtime = runtime
s.runtime.Logger.Info("start to init")
s.BinDir = consts.UsrLocal
s.DataDir = consts.GetMongoDataDir()
s.OsUser = consts.GetProcessUser()
s.OsGroup = consts.GetProcessUserGroup()
s.Mongo = filepath.Join(s.BinDir, "mongodb", "bin", "mongo")

// 获取MongoDB配置文件参数
if err := json.Unmarshal([]byte(s.runtime.PayloadDecoded), &s.ConfParams); err != nil {
s.runtime.Logger.Error(fmt.Sprintf(
"get parameters of mongo restart fail by json.Unmarshal, error:%s", err))
return fmt.Errorf("get parameters of mongo restart fail by json.Unmarshal, error:%s", err)
}

// 设置各种路径
strPort := strconv.Itoa(s.ConfParams.Port)
s.DbpathDir = filepath.Join(s.DataDir, "mongodata", strPort, "db")
s.AuthConfFilePath = filepath.Join(s.DataDir, "mongodata", strPort, "mongo.conf")
s.NoAuthConfFilePath = filepath.Join(s.DataDir, "mongodata", strPort, "noauth.conf")
s.runtime.Logger.Info("init successfully")

// 安装前进行校验
if err := s.checkParams(); err != nil {
return err
}

return nil
}

// checkParams 校验参数
func (s *MongoStartProcess) checkParams() error {
// 校验重启配置参数
validate := validator.New()
s.runtime.Logger.Info("start to validate parameters of restart")
if err := validate.Struct(s.ConfParams); err != nil {
s.runtime.Logger.Error(fmt.Sprintf("validate parameters of restart fail, error:%s", err))
return fmt.Errorf("validate parameters of restart fail, error:%s", err)
}
s.runtime.Logger.Info("validate parameters of restart successfully")
return nil
}

// startup 开启服务
func (s *MongoStartProcess) startup() error {
// 检查服务是否存在
s.runtime.Logger.Info("start to check %s service", s.ConfParams.InstanceType)
result, _, err := common.CheckMongoService(s.ConfParams.Port)
if err != nil {
s.runtime.Logger.Error("check %s service fail, error:%s", s.ConfParams.InstanceType, err)
return fmt.Errorf("check %s service fail, error:%s", s.ConfParams.InstanceType, err)
}
if result == true {
s.runtime.Logger.Info("%s service has been open", s.ConfParams.InstanceType)
return nil
}
s.runtime.Logger.Info("check %s service successfully", s.ConfParams.InstanceType)

// 开启服务
s.runtime.Logger.Info("start to startup %s", s.ConfParams.InstanceType)
if err = common.StartMongoProcess(s.BinDir, s.ConfParams.Port, s.OsUser, s.ConfParams.Auth); err != nil {
s.runtime.Logger.Error("startup %s fail, error:%s", s.ConfParams.InstanceType, err)
return fmt.Errorf("startup %s fail, error:%s", s.ConfParams.InstanceType, err)
}
s.runtime.Logger.Info("startup %s successfully", s.ConfParams.InstanceType)
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ func (m *JobGenericManager) RegisterAtomJob() {
atommongodb.NewPitrRecoverJob,
atommongodb.NewRemoveNsJob,
atommongodb.NewInstallDbmonJob,
atommongodb.NewMongoStartProcess,
} {
m.atomJobMapper[f().Name()] = f
}
Expand Down
1 change: 1 addition & 0 deletions dbm-ui/backend/flow/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,7 @@ class MongoDBActuatorActionEnum(str, StructuredEnum):
MongoDReplace = EnumField("mongod_replace", _("mongod_replace"))
MongoDeInstall = EnumField("mongo_deinstall", _("mongo_deinstall"))
InstallDBMon = EnumField("install_dbmon", _("install_dbmon"))
MongoStart = EnumField("mongo_start", _("mongo_start"))


class EsActuatorActionEnum(str, StructuredEnum):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
# -*- coding: utf-8 -*-
"""
TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available.
Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at https://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
import logging.config
from typing import Dict, Optional

from django.utils.translation import ugettext as _

from backend.flow.engine.bamboo.scene.common.builder import Builder
from backend.flow.engine.bamboo.scene.mongodb.sub_task import cluster_enable_disable
from backend.flow.plugins.components.collections.mongodb.exec_actuator_job import ExecuteDBActuatorJobComponent
from backend.flow.plugins.components.collections.mongodb.send_media import ExecSendMediaOperationComponent
from backend.flow.utils.mongodb.mongodb_dataclass import ActKwargs

logger = logging.getLogger("flow")


class MongoEnableDisableFlow(object):
"""MongoDB禁用启用flow"""

def __init__(self, root_id: str, data: Optional[Dict]):
"""
传入参数
@param root_id : 任务流程定义的root_id
@param data : 单据传递过来的参数列表,是dict格式
"""

self.root_id = root_id
self.data = data
self.get_kwargs = ActKwargs()
self.get_kwargs.payload = data
self.get_kwargs.get_file_path()

def prepare_job(self, pipeline: Builder):
"""
准备工作
"""

# 介质下发——job的api可以多个IP并行执行
kwargs = self.get_kwargs.get_send_media_kwargs(media_type="actuator")
pipeline.add_act(
act_name=_("MongoDB-介质下发"), act_component_code=ExecSendMediaOperationComponent.code, kwargs=kwargs
)

# 创建原子任务执行目录
kwargs = self.get_kwargs.get_create_dir_kwargs()
pipeline.add_act(
act_name=_("MongoDB-创建原子任务执行目录"), act_component_code=ExecuteDBActuatorJobComponent.code, kwargs=kwargs
)

def multi_cluster_flow(self, enable: bool):
"""
multi replicaset create/delete user流程
enable True:启用
enable False:禁用
"""

# 创建流程实例
pipeline = Builder(root_id=self.root_id, data=self.data)

# 获取所有的cluster主机信息
self.get_kwargs.get_hosts_deinstall()

# 下发介质
self.prepare_job(pipeline=pipeline)

# cluster启用/禁用——子流程并行
sub_pipelines = []
for cluster_id in self.data["cluster_ids"]:
sub_pipline = cluster_enable_disable(
root_id=self.root_id,
ticket_data=self.data,
sub_kwargs=self.get_kwargs,
cluster_id=cluster_id,
enable=enable,
)
sub_pipelines.append(sub_pipline)
pipeline.add_parallel_sub_pipeline(sub_flow_list=sub_pipelines)

# 运行流程
pipeline.run_pipeline()
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
"""


from .cluster_enable_disable import cluster_enable_disable
from .cluster_increase_node import cluster_increase_node
from .cluster_reduce_node import cluster_reduce_node
from .cluster_replace import cluster_replace
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
# -*- coding: utf-8 -*-
"""
TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available.
Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at https://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""

from copy import deepcopy
from typing import Dict, Optional

from django.utils.translation import ugettext as _

from backend.db_meta.enums.cluster_type import ClusterType
from backend.flow.consts import MongoDBInstanceType
from backend.flow.engine.bamboo.scene.common.builder import SubBuilder
from backend.flow.plugins.components.collections.mongodb.enable_disable_mongodb import (
EnableDisableMongoDBOperationComponent,
)
from backend.flow.plugins.components.collections.mongodb.exec_actuator_job import ExecuteDBActuatorJobComponent
from backend.flow.utils.mongodb.mongodb_dataclass import ActKwargs


def cluster_enable_disable(
root_id: str, ticket_data: Optional[Dict], sub_kwargs: ActKwargs, cluster_id: int, enable: bool
) -> SubBuilder:
"""
cluster禁用启用流程
"""

# 获取变量
sub_get_kwargs = deepcopy(sub_kwargs)

# 创建子流程
sub_pipeline = SubBuilder(root_id=root_id, data=ticket_data)

# 设置参数
sub_get_kwargs.payload["app"] = sub_get_kwargs.payload["bk_app_abbr"]

# 获取集群信息
sub_get_kwargs.get_cluster_info_deinstall(cluster_id=cluster_id)

acts_list = []
name = ""
# 启用
if enable:
name = "enable"
if sub_get_kwargs.payload["cluster_type"] == ClusterType.MongoReplicaSet.value:
for node in sub_get_kwargs.payload["nodes"]:
kwargs = sub_get_kwargs.get_mongo_start_kwargs(
node_info=node,
instance_type=MongoDBInstanceType.MongoD.value,
)
acts_list.append(
{
"act_name": _("MongoDB-{}:{}-mongod开启进程".format(node["ip"], str(node["port"]))),
"act_component_code": ExecuteDBActuatorJobComponent.code,
"kwargs": kwargs,
}
)
elif sub_get_kwargs.payload["cluster_type"] == ClusterType.MongoShardedCluster.value:
for mongos in sub_get_kwargs.payload["mongos_nodes"]:
kwargs = sub_get_kwargs.get_mongo_start_kwargs(
node_info=mongos,
instance_type=MongoDBInstanceType.MongoS.value,
)
acts_list.append(
{
"act_name": _("MongoDB-{}:{}-mongos开启进程".format(mongos["ip"], str(mongos["port"]))),
"act_component_code": ExecuteDBActuatorJobComponent.code,
"kwargs": kwargs,
}
)
# 禁用
else:
name = "disable"
if sub_get_kwargs.payload["cluster_type"] == ClusterType.MongoReplicaSet.value:
for node in sub_get_kwargs.payload["nodes"]:
kwargs = sub_get_kwargs.get_mongo_deinstall_kwargs(
node_info=node,
instance_type=MongoDBInstanceType.MongoD.value,
nodes_info=sub_get_kwargs.payload["nodes"],
force=True,
rename_dir=False,
)
acts_list.append(
{
"act_name": _("MongoDB-{}:{}-mongod关闭进程".format(node["ip"], node["port"])),
"act_component_code": ExecuteDBActuatorJobComponent.code,
"kwargs": kwargs,
}
)
elif sub_get_kwargs.payload["cluster_type"] == ClusterType.MongoShardedCluster.value:
for mongos in sub_get_kwargs.payload["mongos_nodes"]:
print("=" * 100)
print(mongos)
kwargs = sub_get_kwargs.get_mongo_deinstall_kwargs(
node_info=mongos,
instance_type=MongoDBInstanceType.MongoS.value,
nodes_info=[mongos],
force=True,
rename_dir=False,
)
acts_list.append(
{
"act_name": _("MongoDB-{}:{}-mongos关闭进程".format(mongos["ip"], mongos["port"])),
"act_component_code": ExecuteDBActuatorJobComponent.code,
"kwargs": kwargs,
}
)
sub_pipeline.add_parallel_acts(acts_list=acts_list)

# 修改cluster状态
kwargs = {"cluster_id": cluster_id, "enable": enable}
sub_pipeline.add_act(
act_name=_("修改meta"),
act_component_code=EnableDisableMongoDBOperationComponent.code,
kwargs=kwargs,
)

return sub_pipeline.build_sub_process(sub_name=_("MongoDB--cluster-{}".format(name)))
Loading

0 comments on commit 6c50348

Please sign in to comment.