Skip to content

Commit

Permalink
feat(mongodb): 保存cachesize,oplosize和replsetname TencentBlueKing#4256
Browse files Browse the repository at this point in the history
  • Loading branch information
yyhenryyy committed May 10, 2024
1 parent a5100cf commit 1a709a2
Show file tree
Hide file tree
Showing 17 changed files with 137 additions and 134 deletions.
79 changes: 10 additions & 69 deletions dbm-services/go.work.sum

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
type InitConfParams struct {
IP string `json:"ip" validate:"required"`
Port int `json:"port" validate:"required"`
App string `json:"app" validate:"required"`
SetId string `json:"setId" validate:"required"`
ConfigSvr bool `json:"configSvr"` // shardsvr configsvr
Ips []string `json:"ips" validate:"required"` // ip:port
Expand Down Expand Up @@ -107,7 +106,7 @@ func (i *InitiateReplicaset) Init(runtime *jobruntime.JobGenericRuntime) error {
"get parameters of initiateReplicaset fail by json.Unmarshal, error:%s", err))
return fmt.Errorf("get parameters of initiateReplicaset fail by json.Unmarshal, error:%s", err)
}
i.ClusterId = strings.Join([]string{i.ConfParams.App, i.ConfParams.SetId}, "-")
i.ClusterId = i.ConfParams.SetId
i.runtime.Logger.Info("init successfully")

// 进行校验
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
type DeInstallConfParams struct {
IP string `json:"ip" validate:"required"`
Port int `json:"port" validate:"required"`
App string `json:"app" validate:"required"`
SetId string `json:"setId" validate:"required"`
NodeInfo []string `json:"nodeInfo" validate:"required"` // []string ip,ip 如果为复制集节点,则为复制集所有节点的ip;如果为mongos,则为mongos的ip
InstanceType string `json:"instanceType" validate:"required"` // mongod mongos
Expand Down Expand Up @@ -112,12 +111,12 @@ func (d *DeInstall) Init(runtime *jobruntime.JobGenericRuntime) error {
strPort := strconv.Itoa(d.ConfParams.Port)
d.PortDir = filepath.Join(d.DataDir, "mongodata", strPort)
d.DbpathDir = filepath.Join(d.DataDir, "mongodata", strPort, "db")
d.DbPathRenameDir = filepath.Join(d.DataDir, "mongodata", fmt.Sprintf("%s_%s_%s_%d",
d.ConfParams.InstanceType, d.ConfParams.App, d.ConfParams.SetId, d.ConfParams.Port))
d.DbPathRenameDir = filepath.Join(d.DataDir, "mongodata", fmt.Sprintf("%s_%s_%d",
d.ConfParams.InstanceType, d.ConfParams.SetId, d.ConfParams.Port))
d.IPInfo = strings.Join(d.ConfParams.NodeInfo, "|")
d.LogPortDir = filepath.Join(d.BackupDir, "mongolog", strPort)
d.LogPathRenameDir = filepath.Join(d.BackupDir, "mongolog", fmt.Sprintf("%s_%s_%s_%d",
d.ConfParams.InstanceType, d.ConfParams.App, d.ConfParams.SetId, d.ConfParams.Port))
d.LogPathRenameDir = filepath.Join(d.BackupDir, "mongolog", fmt.Sprintf("%s_%s_%d",
d.ConfParams.InstanceType, d.ConfParams.SetId, d.ConfParams.Port))

// 进行校验
if err := d.checkParams(); err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,10 @@ type MongoDBConfParams struct {
Port int `json:"port" validate:"required"`
DbVersion string `json:"dbVersion" validate:"required"`
InstanceType string `json:"instanceType" validate:"required"` // mongos mongod
App string `json:"app" validate:"required"`
SetId string `json:"setId" validate:"required"` // 复制集为集群id,cluster为集群id+序号
KeyFile string `json:"keyFile" validate:"required"` // keyFile的内容 app-setId
Auth bool `json:"auth"` // true:以验证方式启动mongod false:以非验证方式启动mongod
ClusterRole string `json:"clusterRole"` // 部署cluster时填写,shardsvr configsvr;部署复制集时为空
SetId string `json:"setId" validate:"required"` // 复制集为集群id,cluster为集群id+序号
KeyFile string `json:"keyFile" validate:"required"` // keyFile的内容 app-setId
Auth bool `json:"auth"` // true:以验证方式启动mongod false:以非验证方式启动mongod
ClusterRole string `json:"clusterRole"` // 部署cluster时填写,shardsvr configsvr;部署复制集时为空
DbConfig struct {
SlowOpThresholdMs int `json:"slowOpThresholdMs"`
CacheSizeGB int `json:"cacheSizeGB"`
Expand Down Expand Up @@ -199,8 +198,7 @@ func (m *MongoDBInstall) makeConfContent() error {
conf.Storage.Engine = "wiredTiger"
conf.Storage.WiredTiger.EngineConfig.CacheSizeGB = m.ConfParams.DbConfig.CacheSizeGB
conf.Replication.OplogSizeMB = m.ConfParams.DbConfig.OplogSizeMB
conf.Replication.ReplSetName = strings.Join([]string{m.ConfParams.App, m.ConfParams.SetId},
"-")
conf.Replication.ReplSetName = m.ConfParams.SetId
conf.SystemLog.LogAppend = true
conf.SystemLog.Path = m.LogPath
conf.SystemLog.Destination = m.ConfParams.DbConfig.Destination
Expand Down Expand Up @@ -238,8 +236,7 @@ func (m *MongoDBInstall) makeConfContent() error {
m.runtime.Logger.Info("start to make mongodb config file content")
NoAuthConf := common.IniNoAuthMongoDBConf
AuthConf := common.IniAuthMongoDBConf
replSet := strings.Join([]string{m.ConfParams.App, m.ConfParams.SetId},
"-")
replSet := m.ConfParams.SetId
NoAuthConf = strings.Replace(NoAuthConf, "{{replSet}}", replSet, -1)
AuthConf = strings.Replace(AuthConf, "{{replSet}}", replSet, -1)
NoAuthConf = strings.Replace(NoAuthConf, "{{dbpath}}", m.DbpathDir, -1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ type MongoSConfParams struct {
Port int `json:"port" validate:"required"`
DbVersion string `json:"dbVersion" validate:"required"`
InstanceType string `json:"instanceType" validate:"required"` // mongos mongod
App string `json:"app" validate:"required"`
SetId string `json:"setId" validate:"required"`
KeyFile string `json:"keyFile" validate:"required"` // keyFile的内容 app-setId
Auth bool `json:"auth"` // true:以验证方式启动mongos false:以非验证方式启动mongos
Expand Down Expand Up @@ -163,7 +162,7 @@ func (s *MongoSInstall) makeConfContent() error {
"get %s version fail, error:%s", s.ConfParams.InstanceType, err)
return fmt.Errorf("get %s version fail, error:%s", s.ConfParams.InstanceType, err)
}
clusterId := strings.Join([]string{s.ConfParams.App, s.ConfParams.SetId, "conf"}, "-")
clusterId := s.ConfParams.SetId
IpConfigDB := strings.Join(s.ConfParams.ConfigDB, ",")
configDB := strings.Join([]string{clusterId, IpConfigDB}, "/")

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Package common TODO
// Package common 公共函数
package common

import (
Expand All @@ -7,6 +7,7 @@ import (
"encoding/hex"
"encoding/json"
"fmt"
"net/url"
"os"
"path/filepath"
"strconv"
Expand Down Expand Up @@ -394,7 +395,8 @@ func GetNodeInfo26(ip string, port int, username string, password string) (
var statusSlice bson.A
var confSlice bson.A
// 设置mongodb连接参数
clientOptions := options.Client().ApplyURI(fmt.Sprintf("mongodb://%s:%s@%s:%d", username, password, ip, port))
clientOptions := options.Client().ApplyURI(fmt.Sprintf("mongodb://%s:%s@%s:%d",
username, url.QueryEscape(password), ip, port))
// 连接mongodb
client, err := mongo.Connect(context.TODO(), clientOptions)
if err != nil {
Expand Down Expand Up @@ -430,15 +432,18 @@ func GetCurrentNodeInfo(mainDbVersion float64, statusSlice bson.A, confSlice bso
flag := false
for _, key := range statusSlice {
var statusInfo map[string]interface{}
var nodeState int
if mainDbVersion < 2.6 {
statusInfo = key.(map[string]interface{})
nodeState, _ = strconv.Atoi(fmt.Sprintf("%1.0f", statusInfo["state"]))
} else {
infoMap := map[string]interface{}(key.(bson.M))
statusInfo = infoMap
nodeState = int(statusInfo["state"].(int32))
}
if statusInfo["name"].(string) == source {
id, _ = strconv.Atoi(fmt.Sprintf("%1.0f", statusInfo["_id"]))
state, _ = strconv.Atoi(fmt.Sprintf("%1.0f", statusInfo["state"]))
state = nodeState
flag = true
break
}
Expand Down Expand Up @@ -501,12 +506,14 @@ func GetNodeInfo(mongoBin string, ip string, port int, username string, password
var statusInfo map[string]interface{}
if mainDbVersion < 2.6 {
statusInfo = v.(map[string]interface{})
member["state"] = fmt.Sprintf("%1.0f", statusInfo["state"])
} else {
infoMap := map[string]interface{}(v.(bson.M))
statusInfo = infoMap
member["state"] = fmt.Sprintf("%d", statusInfo["state"])
}
member["name"] = statusInfo["name"].(string)
member["state"] = fmt.Sprintf("%1.0f", statusInfo["state"])

for _, k := range confSlice {
var confInfo map[string]interface{}
if mainDbVersion < 2.6 {
Expand Down
13 changes: 11 additions & 2 deletions dbm-ui/backend/flow/engine/bamboo/scene/mongodb/mongodb_install.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

from django.utils.translation import ugettext as _

from backend.db_meta.enums.cluster_type import ClusterType
from backend.flow.consts import MongoDBClusterRole
from backend.flow.engine.bamboo.scene.common.builder import Builder
from backend.flow.engine.bamboo.scene.mongodb.mongodb_install_dbmon import add_install_dbmon
Expand Down Expand Up @@ -135,8 +136,16 @@ def cluster_install_flow(self):
self.prepare_job(pipeline=pipeline)

# 保存keyfile到dbconfig
self.get_kwargs.cluster_save_key_file()

namespace = ClusterType.MongoShardedCluster.value
cluster_name = self.data["cluster_id"]
self.get_kwargs.save_key_file(namespace=namespace, cluster_name=cluster_name, key_file=self.data["key_file"])
# cachesize和oplogsize到dbconfig
self.get_kwargs.save_cache_size(
namespace=namespace, cluster_name=cluster_name, cache_size=str(self.data["shards"][0]["cacheSizeGB"])
)
self.get_kwargs.save_oplog_size(
namespace=namespace, cluster_name=cluster_name, oplog_size=str(self.data["shards"][0]["oplogSizeMB"])
)
# 密码服务获取管理用户密码 shard,config的密码保持一致
kwargs = self.get_kwargs.get_get_manager_password_kwargs()
pipeline.add_act(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@ def cluster_increase_node(root_id: str, ticket_data: Optional[Dict], sub_kwargs:

# 获取key_file
sub_get_kwargs.cluster_type = sub_get_kwargs.payload["cluster_type"]
sub_get_kwargs.payload["key_file"] = sub_get_kwargs.get_key_file(
cluster_name=sub_get_kwargs.payload["cluster_name"]
)
sub_get_kwargs.payload["key_file"] = sub_get_kwargs.get_conf(cluster_name=sub_get_kwargs.payload["cluster_name"])[
"key_file"
]

# 以IP为维度增加node——子流程并行
sub_pipelines = []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ def increase_mongod(

# 获取key_file
sub_get_kwargs.cluster_type = sub_get_kwargs.payload["cluster_type"]
sub_get_kwargs.replicaset_info["key_file"] = sub_get_kwargs.get_key_file(
sub_get_kwargs.replicaset_info["key_file"] = sub_get_kwargs.get_conf(
cluster_name=sub_get_kwargs.payload["cluster_name"]
)
)["key_file"]

# 设置参数
sub_get_kwargs.replicaset_info["set_id"] = sub_get_kwargs.payload["set_id"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def increase_mongos(root_id: str, ticket_data: Optional[Dict], sub_kwargs: ActKw
cluster_name = info["cluster_name"]
sub_get_kwargs.mongos_info["set_id"] = cluster_name
sub_get_kwargs.db_main_version = str(info["db_version"].split(".")[0])
sub_get_kwargs.payload["key_file"] = sub_get_kwargs.get_key_file(cluster_name=cluster_name)
sub_get_kwargs.payload["key_file"] = sub_get_kwargs.get_conf(cluster_name=cluster_name)["key_file"]
sub_get_kwargs.payload["nodes"] = []
sub_get_kwargs.payload["nodes"].append(sub_get_kwargs.payload["mongos_nodes"][0])
sub_get_kwargs.payload["mongos"] = {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,18 @@ def mongod_replace(
# 获取参数
new_node = info["target"]
sub_sub_get_kwargs.payload["app"] = sub_sub_get_kwargs.payload["bk_app_abbr"]
if cluster_role != MongoDBClusterRole.ConfigSvr.value or not cluster_role:
sub_sub_get_kwargs.replicaset_info = {}
sub_sub_get_kwargs.replicaset_info["port"] = sub_sub_get_kwargs.db_instance["port"]
force = True
if cluster_role:
sub_sub_get_kwargs.cluster_type = ClusterType.MongoShardedCluster.value
cluster_name = sub_sub_get_kwargs.db_instance["seg_range"]
sub_sub_get_kwargs.payload["cluster_type"] = ClusterType.MongoShardedCluster.value
sub_sub_get_kwargs.payload["set_id"] = sub_sub_get_kwargs.db_instance["seg_range"]
sub_sub_get_kwargs.payload["key_file"] = sub_sub_get_kwargs.get_key_file(
sub_sub_get_kwargs.payload["key_file"] = sub_sub_get_kwargs.get_conf(
cluster_name=sub_sub_get_kwargs.db_instance["cluster_name"]
)
)["key_file"]
sub_sub_get_kwargs.payload["config_nodes"] = []
sub_sub_get_kwargs.payload["shards_nodes"] = []
sub_sub_get_kwargs.payload["mongos_nodes"] = []
Expand All @@ -86,14 +88,21 @@ def mongod_replace(
}
]
}

sub_sub_get_kwargs.payload["shards_nodes"].append(shard_nodes)
# shard直接获取配置
conf = sub_sub_get_kwargs.get_conf(cluster_name=sub_sub_get_kwargs.db_instance["cluster_name"])
sub_sub_get_kwargs.replicaset_info["cacheSizeGB"] = conf["cacheSizeGB"]
sub_sub_get_kwargs.replicaset_info["oplogSizeMB"] = conf["oplogSizeMB"]
else:
sub_sub_get_kwargs.cluster_type = ClusterType.MongoReplicaSet.value
cluster_name = sub_sub_get_kwargs.db_instance["cluster_name"]
sub_sub_get_kwargs.payload["cluster_type"] = ClusterType.MongoReplicaSet.value
sub_sub_get_kwargs.payload["set_id"] = cluster_name
sub_sub_get_kwargs.replicaset_info["key_file"] = sub_sub_get_kwargs.get_key_file(cluster_name=cluster_name)
# 副本集直接获取配置
conf = sub_sub_get_kwargs.get_conf(cluster_name=cluster_name)
sub_sub_get_kwargs.replicaset_info["key_file"] = conf["key_file"]
sub_sub_get_kwargs.replicaset_info["cacheSizeGB"] = conf["cacheSizeGB"]
sub_sub_get_kwargs.replicaset_info["oplogSizeMB"] = conf["oplogSizeMB"]
sub_sub_get_kwargs.replicaset_info["set_id"] = cluster_name
sub_sub_get_kwargs.replicaset_info["nodes"] = [
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,16 @@ def mongos_replace(root_id: str, ticket_data: Optional[Dict], sub_sub_kwargs: Ac
sub_sub_get_kwargs.payload["shards_nodes"] = []
sub_sub_get_kwargs.payload["app"] = sub_sub_get_kwargs.payload["bk_app_abbr"]
sub_sub_get_kwargs.mongos_info["port"] = sub_sub_get_kwargs.db_instance["port"]
cluster_name = sub_sub_get_kwargs.db_instance["cluster_name"]
sub_sub_get_kwargs.mongos_info["set_id"] = cluster_name
cluster_id = sub_sub_get_kwargs.db_instance["cluster_id"]
sub_sub_get_kwargs.mongos_info["conf_set_id"] = sub_sub_get_kwargs.get_config_set_name_replace(
cluster_id=cluster_id
)
sub_sub_get_kwargs.cluster_type = ClusterType.MongoShardedCluster.value
sub_sub_get_kwargs.payload["key_file"] = sub_sub_get_kwargs.get_key_file(cluster_name=cluster_name)
sub_sub_get_kwargs.payload["key_file"] = sub_sub_get_kwargs.get_conf(
cluster_name=sub_sub_get_kwargs.db_instance["cluster_name"]
)["key_file"]
node = info["target"]
node["cluster_id"] = sub_sub_get_kwargs.db_instance["cluster_id"]
node["cluster_id"] = cluster_id
sub_sub_get_kwargs.payload["mongos"] = {}
sub_sub_get_kwargs.payload["mongos"]["nodes"] = [
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from django.utils.translation import ugettext as _

from backend.db_meta.enums.cluster_type import ClusterType
from backend.flow.consts import MongoDBManagerUser, MongoDBTask
from backend.flow.engine.bamboo.scene.common.builder import SubBuilder
from backend.flow.plugins.components.collections.mongodb.add_domain_to_dns import ExecAddDomainToDnsOperationComponent
Expand Down Expand Up @@ -67,7 +68,22 @@ def replicaset_install(

if not cluster:
# 保存keyfile到dbconfig
sub_get_kwargs.set_save_key_file()
namespace = ClusterType.MongoReplicaSet.value
cluster_name = sub_get_kwargs.replicaset_info["set_id"]
sub_get_kwargs.save_key_file(
namespace=namespace, cluster_name=cluster_name, key_file=sub_get_kwargs.replicaset_info["key_file"]
)
# cachesize和oplogsize到dbconfig
sub_get_kwargs.save_cache_size(
namespace=namespace,
cluster_name=cluster_name,
cache_size=str(sub_get_kwargs.replicaset_info["cacheSizeGB"]),
)
sub_get_kwargs.save_oplog_size(
namespace=namespace,
cluster_name=cluster_name,
oplog_size=str(sub_get_kwargs.replicaset_info["oplogSizeMB"]),
)
# 密码服务获取管理用户密码
kwargs = sub_get_kwargs.get_get_manager_password_kwargs()
sub_pipeline.add_act(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@ def replicaset_replace(
act_name=_("MongoDB-机器初始化"), act_component_code=ExecuteDBActuatorJobComponent.code, kwargs=kwargs
)

# 计算参数
sub_get_kwargs.calc_param_replace(info=info, instance_num=0)
# config计算参数
if cluster_role == MongoDBClusterRole.ConfigSvr.value:
sub_get_kwargs.calc_param_replace(info=info, instance_num=0)
# 进行替换——并行 以ip为维度
sub_sub_pipelines = []
for mongodb_instance in info["instances"]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def decommission_proxies(self, cluster: Cluster, proxies: List[Dict]):
if _t:
raise Exception(_("{} 存在不是本集群的实例下架列表").format(_t))

machine_obj, cc_manage = defaultdict(dict), CcManage(cluster.bk_biz_id, DBType.MongoDB.value)
machine_obj, cc_manage = defaultdict(dict), CcManage(cluster.bk_biz_id, cluster.cluster_type)
cc_manage.delete_service_instance(bk_instance_ids=[obj.bk_instance_id for obj in proxy_objs])
for proxy_obj in proxy_objs:
logger.info("cluster proxy {} for cluster {}".format(proxy_obj, cluster.immute_domain))
Expand Down Expand Up @@ -91,7 +91,7 @@ def decommission_proxies(self, cluster: Cluster, proxies: List[Dict]):
@transaction.atomic
def decommission_backends(self, cluster: Cluster, backends: List[Dict]):
logger.info("user request decmmission backends {} {}".format(cluster.immute_domain, backends))
cc_manage = CcManage(cluster.bk_biz_id, DBType.MongoDB.value)
cc_manage = CcManage(cluster.bk_biz_id, cluster.cluster_type)
try:
storage_objs = common.filter_out_instance_obj(backends, cluster.storageinstance_set.all())
_t = common.not_exists(backends, storage_objs)
Expand Down Expand Up @@ -149,7 +149,7 @@ def _execute(self, data, parent_data) -> bool:
cluster_entry_obj.delete()
logger.info("cluster detail {}".format(cluster.__dict__))

CcManage(cluster.bk_biz_id, DBType.MongoDB.value).delete_cluster_modules(
CcManage(cluster.bk_biz_id, cluster.cluster_type).delete_cluster_modules(
db_type=DBType.MongoDB.value, cluster=cluster
)
cluster.delete()
Expand Down
4 changes: 2 additions & 2 deletions dbm-ui/backend/flow/utils/mongodb/calculate_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ def cluster_calc(payload: dict, payload_clusters: dict, app: str) -> dict:
nodes = [{"ip": machine["ip"], "bk_cloud_id": machine["bk_cloud_id"]} for machine in machines]
replica_set["nodes"] = nodes
shards.append(replica_set)
add_shards["{}-{}".format(app, replica_set["set_id"])] = ",".join(
add_shards[replica_set["set_id"]] = ",".join(
["{}:{}".format(node["ip"], str(replica_set["port"])) for node in nodes[0:-1]]
)

Expand All @@ -225,7 +225,7 @@ def cluster_calc(payload: dict, payload_clusters: dict, app: str) -> dict:
# mongos
mongos = {}
mongos["port"] = payload["proxy_port"] # 默认27021
mongos["set_id"] = payload["cluster_name"]
mongos["conf_set_id"] = config["set_id"]
mongos["domain"] = "mongos.{}.{}.db".format(payload["cluster_name"], app)
nodes = [{"ip": machine["ip"], "bk_cloud_id": machine["bk_cloud_id"]} for machine in payload["nodes"]["mongos"]]
mongos["nodes"] = nodes
Expand Down
Loading

0 comments on commit 1a709a2

Please sign in to comment.