Skip to content

Commit

Permalink
fix(mysql): 优化tdbctl中控集群的同步账号处理逻辑 #7905
Browse files Browse the repository at this point in the history
  • Loading branch information
yksitu authored and iSecloud committed Nov 18, 2024
1 parent ef3d745 commit 532afef
Show file tree
Hide file tree
Showing 6 changed files with 272 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from backend.constants import IP_PORT_DIVIDER
from backend.db_meta.enums import ClusterType, InstanceStatus, MachineType, TenDBClusterSpiderRole
from backend.db_meta.models import Cluster
from backend.flow.consts import AUTH_ADDRESS_DIVIDER, DBA_ROOT_USER, TDBCTL_USER
from backend.flow.consts import AUTH_ADDRESS_DIVIDER, DBA_ROOT_USER, TDBCTL_USER, PrivRole
from backend.flow.engine.bamboo.scene.common.builder import SubBuilder
from backend.flow.engine.bamboo.scene.common.get_file_list import GetFileList
from backend.flow.engine.bamboo.scene.mysql.common.common_sub_flow import (
Expand All @@ -30,12 +30,14 @@
from backend.flow.plugins.components.collections.mysql.clone_user import CloneUserComponent
from backend.flow.plugins.components.collections.mysql.dns_manage import MySQLDnsManageComponent
from backend.flow.plugins.components.collections.mysql.exec_actuator_script import ExecuteDBActuatorScriptComponent
from backend.flow.plugins.components.collections.mysql.sync_master import SyncMasterComponent
from backend.flow.plugins.components.collections.mysql.trans_flies import TransFileComponent
from backend.flow.plugins.components.collections.spider.add_spider_routing import AddSpiderRoutingComponent
from backend.flow.plugins.components.collections.spider.ctl_drop_routing import CtlDropRoutingComponent
from backend.flow.plugins.components.collections.spider.ctl_switch_to_slave import CtlSwitchToSlaveComponent
from backend.flow.plugins.components.collections.spider.remote_migrate_cut_over import RemoteMigrateCutOverComponent
from backend.flow.plugins.components.collections.spider.spider_db_meta import SpiderDBMetaComponent
from backend.flow.utils.base.base_dataclass import Instance
from backend.flow.utils.common_act_dataclass import DownloadBackupClientKwargs
from backend.flow.utils.mysql.mysql_act_dataclass import (
CreateDnsKwargs,
Expand All @@ -44,6 +46,7 @@
DownloadMediaKwargs,
ExecActuatorKwargs,
InstanceUserCloneKwargs,
MysqlSyncMasterKwargs,
)
from backend.flow.utils.mysql.mysql_act_playload import MysqlActPayload
from backend.flow.utils.spider.get_spider_incr import get_spider_master_incr
Expand Down Expand Up @@ -91,6 +94,7 @@ def add_spider_slaves_sub_flow(

# 机器系统初始化
exec_ips = [ip_info["ip"] for ip_info in add_spider_slaves]
bk_host_ids = [ip_info["bk_host_id"] for ip_info in add_spider_slaves]
# 初始新机器
sub_pipeline.add_sub_pipeline(
sub_flow=init_machine_sub_flow(
Expand All @@ -100,6 +104,7 @@ def add_spider_slaves_sub_flow(
sys_init_ips=exec_ips,
init_check_ips=exec_ips,
yum_install_perl_ips=exec_ips,
bk_host_ids=bk_host_ids,
)
)

Expand Down Expand Up @@ -239,6 +244,7 @@ def add_spider_masters_sub_flow(
# 机器系统初始化

exec_ips = [ip_info["ip"] for ip_info in add_spider_masters]
bk_host_ids = [ip_info["bk_host_id"] for ip_info in add_spider_masters]
# 初始新机器
sub_pipeline.add_sub_pipeline(
sub_flow=init_machine_sub_flow(
Expand All @@ -248,6 +254,7 @@ def add_spider_masters_sub_flow(
sys_init_ips=exec_ips,
init_check_ips=exec_ips,
yum_install_perl_ips=exec_ips,
bk_host_ids=bk_host_ids,
)
)
# 阶段1 下发spider安装介质包
Expand Down Expand Up @@ -356,15 +363,26 @@ def add_spider_masters_sub_flow(

if not is_add_spider_mnt:
# 阶段8 待添加中控实例建立主从数据同步关系
sub_pipeline.add_sub_pipeline(
sub_flow=build_ctl_replication_with_gtid(
root_id=root_id,
parent_global_data=parent_global_data,
bk_cloud_id=cluster.bk_cloud_id,
ctl_primary=cluster.tendbcluster_ctl_primary_address(),
ctl_secondary_list=add_spider_masters,
)
ctl_master = cluster.tendbcluster_ctl_primary_address()
ctl_master_ip = ctl_master.split(IP_PORT_DIVIDER)[0]
ctl_master_port = ctl_master.split(IP_PORT_DIVIDER)[1]
sub_pipeline.add_act(
act_name=_("构建spider中控集群同步"),
act_component_code=SyncMasterComponent.code,
kwargs=asdict(
MysqlSyncMasterKwargs(
bk_biz_id=cluster.bk_biz_id,
bk_cloud_id=cluster.bk_cloud_id,
priv_role=PrivRole.TDBCTL.value,
master=Instance(host=ctl_master_ip, port=ctl_master_port),
slaves=[Instance(host=s["ip"], port=ctl_master_port) for s in add_spider_masters],
is_gtid=True,
is_add_any=True,
is_master_add_priv=False,
)
),
)

# 阶段8 添加域名映射关系
sub_pipeline.add_act(
act_name=_("添加集群域名"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,33 +17,32 @@
from django.utils.translation import ugettext as _

from backend.configuration.constants import DBType
from backend.constants import IP_PORT_DIVIDER
from backend.db_meta.enums import ClusterType, TenDBClusterSpiderRole
from backend.flow.consts import TDBCTL_USER
from backend.flow.consts import TDBCTL_USER, PrivRole
from backend.flow.engine.bamboo.scene.common.builder import Builder, SubBuilder
from backend.flow.engine.bamboo.scene.common.get_file_list import GetFileList
from backend.flow.engine.bamboo.scene.mysql.common.common_sub_flow import (
build_repl_by_manual_input_sub_flow,
build_surrounding_apps_sub_flow,
init_machine_sub_flow,
)
from backend.flow.engine.bamboo.scene.spider.common.common_sub_flow import (
build_apps_for_spider_sub_flow,
build_ctl_replication_with_gtid,
)
from backend.flow.engine.bamboo.scene.spider.common.common_sub_flow import build_apps_for_spider_sub_flow
from backend.flow.plugins.components.collections.mysql.dns_manage import MySQLDnsManageComponent
from backend.flow.plugins.components.collections.mysql.exec_actuator_script import ExecuteDBActuatorScriptComponent
from backend.flow.plugins.components.collections.mysql.sync_master import SyncMasterComponent
from backend.flow.plugins.components.collections.mysql.trans_flies import TransFileComponent
from backend.flow.plugins.components.collections.spider.add_system_user_in_cluster import (
AddSystemUserInClusterComponent,
)
from backend.flow.plugins.components.collections.spider.spider_db_meta import SpiderDBMetaComponent
from backend.flow.utils.base.base_dataclass import Instance
from backend.flow.utils.mysql.mysql_act_dataclass import (
AddSpiderSystemUserKwargs,
CreateDnsKwargs,
DBMetaOPKwargs,
DownloadMediaKwargs,
ExecActuatorKwargs,
MysqlSyncMasterKwargs,
)
from backend.flow.utils.mysql.mysql_act_playload import MysqlActPayload
from backend.flow.utils.mysql.mysql_context_dataclass import SpiderApplyManualContext
Expand Down Expand Up @@ -341,15 +340,20 @@ def deploy_cluster(self):
)
deploy_pipeline.add_parallel_sub_pipeline(sub_flow_list=sub_flow_list)

# 阶段5 构建spider中控集群
deploy_pipeline.add_sub_pipeline(
sub_flow=build_ctl_replication_with_gtid(
root_id=self.root_id,
parent_global_data=self.data,
bk_cloud_id=int(self.data["bk_cloud_id"]),
ctl_primary=f"{ctl_master['ip']}{IP_PORT_DIVIDER}{self.data['ctl_port']}",
ctl_secondary_list=ctl_slaves,
)
deploy_pipeline.add_act(
act_name=_("构建spider中控集群同步"),
act_component_code=SyncMasterComponent.code,
kwargs=asdict(
MysqlSyncMasterKwargs(
bk_biz_id=int(self.data["bk_biz_id"]),
bk_cloud_id=int(self.data["bk_cloud_id"]),
priv_role=PrivRole.TDBCTL.value,
master=Instance(host=ctl_master["ip"], port=self.data["ctl_port"]),
slaves=[Instance(host=s["ip"], port=self.data["ctl_port"]) for s in ctl_slaves],
is_gtid=True,
is_add_any=True,
)
),
)

# 阶段6 内部集群节点之间授权
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
"""
TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available.
Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at https://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""

from django.utils.translation import ugettext as _
from pipeline.component_framework.component import Component

from backend.components import DBConfigApi, DBPrivManagerApi, DRSApi
from backend.components.dbconfig.constants import FormatType, LevelName
from backend.constants import IP_PORT_DIVIDER
from backend.flow.consts import ConfigTypeEnum, NameSpaceEnum, PrivRole
from backend.flow.engine.bamboo.scene.mysql.common.exceptions import NormalTenDBFlowException
from backend.flow.plugins.components.collections.common.base_service import BaseService


class SyncMasterService(BaseService):
def _get_repl_user(self):
data = DBConfigApi.query_conf_item(
{
"bk_biz_id": "0",
"level_name": LevelName.PLAT,
"level_value": "0",
"conf_file": "mysql#user",
"conf_type": ConfigTypeEnum.InitUser,
"namespace": NameSpaceEnum.TenDB.value,
"format": FormatType.MAP,
}
)["content"]
self.log_info("get repl_user successfully")
return data["repl_user"], data["repl_pwd"]

def _add_repl_user(
self,
address_list: list,
bk_cloud_id: int,
bk_biz_id: int,
priv_role: PrivRole,
priv_hosts: list,
repl_user: str,
repl_pwd: str,
):
"""
@param address_list: 授权实例列表
@param bk_cloud_id: 云区域ID
@param bk_biz_id: 业务ID
@param priv_role: 授权角色
@param priv_hosts: 授权host
@param repl_user: 同步账号
@param repl_pwd: 账号pwd
"""
# 远程授权
for address in address_list:
DBPrivManagerApi.add_priv_without_account_rule(
{
"bk_cloud_id": bk_cloud_id,
"bk_biz_id": bk_biz_id,
"operator": "",
"user": repl_user,
"psw": repl_pwd,
"hosts": priv_hosts,
"dbname": "%",
"dml_ddl_priv": "",
"global_priv": "REPLICATION SLAVE, REPLICATION CLIENT",
"address": address,
"role": priv_role,
}
)
self.log_info(_("在[{}]创建添加同步账号成功, priv_hosts:{}").format(address, priv_hosts))
return True

def get_bin_position(self, address: str, bk_cloud_id: int) -> (str, str):
"""
获取位点信息
"""
res = DRSApi.rpc(
{
"addresses": [address],
"cmds": ["show master status;"],
"force": False,
"bk_cloud_id": bk_cloud_id,
}
)
if res[0]["error_msg"]:
raise NormalTenDBFlowException(message=_(f"exec show master status failed: {res[0]['error_msg']}"))
self.log_info("get bin position successfully")
return res[0]["cmd_results"][1]["table_data"][0]["File"], res[0]["cmd_results"][1]["table_data"][0]["Position"]

def _execute(self, data, parent_data) -> bool:
"""
用rds来处理主从同步的建立过程,处理步骤如下:
1:先在master创建同步账号,保证待同步的slave有权限同步,并返回当前master位点信息
2:根据不同场景,拼接建立同步sql,通过drs执行
"""
kwargs = data.get_one_of_inputs("kwargs")
repl_user, repl_pwd = self._get_repl_user()
master_address = f"{kwargs['master']['host']}{IP_PORT_DIVIDER}{kwargs['master']['port']}"
if kwargs["is_add_any"]:
# 是否用%全匹配.全实例处理开权限,一步到位
priv_hosts = ["%"]
priv_instance_list = [f"{s['host']}{IP_PORT_DIVIDER}{s['port']}" for s in kwargs["slaves"]]
if kwargs["is_master_add_priv"]:
priv_instance_list += [master_address]

else:
# 不是全匹配,则每次只对master实例开权限
priv_instance_list = [master_address]
priv_hosts = [s["host"] for s in kwargs["slaves"]]

self._add_repl_user(
address_list=priv_instance_list,
bk_biz_id=kwargs["bk_biz_id"],
bk_cloud_id=kwargs["bk_cloud_id"],
priv_role=kwargs["priv_role"],
priv_hosts=priv_hosts,
repl_user=repl_user,
repl_pwd=repl_pwd,
)
if not kwargs["is_gtid"]:
# 普通位点模式
file, position = self.get_bin_position(address=master_address, bk_cloud_id=kwargs["bk_cloud_id"])
repl_sql = (
f"CHANGE MASTER TO "
f"MASTER_HOST ='{kwargs['master']['host']}',"
f"MASTER_PORT={kwargs['master']['port']},"
f"MASTER_USER ='{repl_user}',"
f"MASTER_PASSWORD='{repl_pwd}',"
f"MASTER_LOG_FILE = '{file}',"
f"MASTER_LOG_POS = {position};"
)
else:
# GTID模式
repl_sql = (
f"CHANGE MASTER TO "
f"MASTER_HOST ='{kwargs['master']['host']}',"
f"MASTER_PORT={kwargs['master']['port']},"
f"MASTER_USER ='{repl_user}',"
f"MASTER_PASSWORD='{repl_pwd}',"
"MASTER_AUTO_POSITION = 1;"
)

# 建立同步
for secondary in kwargs["slaves"]:
res = DRSApi.rpc(
{
"addresses": [f"{secondary['host']}{IP_PORT_DIVIDER}{secondary['port']}"],
"cmds": [repl_sql, "start slave;"],
"force": False,
"bk_cloud_id": kwargs["bk_cloud_id"],
}
)
if res[0]["error_msg"]:
raise NormalTenDBFlowException(message=_(f"exec change master failed: {res[0]['error_msg']}"))
return True


class SyncMasterComponent(Component):
name = __name__
code = "mysql_sync_master"
bound_service = SyncMasterService
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@
from django.utils.translation import ugettext_lazy as _
from pipeline.component_framework.component import Component

from backend.components import DBConfigApi, DBPrivManagerApi, DRSApi
from backend.components import DBConfigApi, DRSApi
from backend.components.dbconfig.constants import FormatType, LevelName
from backend.constants import IP_PORT_DIVIDER
from backend.db_meta.enums import TenDBClusterSpiderRole
from backend.db_meta.models import Cluster
from backend.flow.consts import TDBCTL_USER, ConfigTypeEnum, NameSpaceEnum, PrivRole
from backend.flow.consts import TDBCTL_USER, ConfigTypeEnum, NameSpaceEnum
from backend.flow.engine.bamboo.scene.spider.common.exceptions import CtlSwitchToSlaveFailedException
from backend.flow.plugins.components.collections.common.base_service import BaseService

Expand Down Expand Up @@ -114,7 +114,7 @@ def _stop_slave(self, cluster: Cluster, ctl_set):
"bk_cloud_id": cluster.bk_cloud_id,
}
for ctl in ctl_set:
self.log_info(f"exec stop slave in instance[{ctl.machine.ip}{IP_PORT_DIVIDER}{ctl.admin_port}")
self.log_info(f"exec stop slave in instance[{ctl.machine.ip}{IP_PORT_DIVIDER}{ctl.admin_port}]")
rpc_params["addresses"] = [f"{ctl.machine.ip}{IP_PORT_DIVIDER}{ctl.admin_port}"]
res = DRSApi.rpc(rpc_params)

Expand Down Expand Up @@ -190,24 +190,6 @@ def _sync_to_new_master(self, cluster: Cluster, new_primary, other_secondary):
}
)["content"]

# 远程授权
DBPrivManagerApi.add_priv_without_account_rule(
{
"bk_cloud_id": cluster.bk_cloud_id,
"bk_biz_id": cluster.bk_biz_id,
"operator": "",
"user": data["repl_user"],
"psw": data["repl_pwd"],
"hosts": [slave.machine.ip for slave in other_secondary],
"dbname": "%",
"dml_ddl_priv": "",
"global_priv": "REPLICATION SLAVE, REPLICATION CLIENT",
"address": new_primary,
"role": PrivRole.TDBCTL.value,
}
)
self.log_info(_("在[{}]创建添加同步账号成功").format(new_primary))

# 基于GTID建立同步
for secondary in other_secondary:
repl_sql = (
Expand Down
Loading

0 comments on commit 532afef

Please sign in to comment.