Skip to content

Commit

Permalink
feat: tendbHa数据恢复流程加入权限恢复节点 #7860
Browse files Browse the repository at this point in the history
  • Loading branch information
zfrendo authored and iSecloud committed Nov 29, 2024
1 parent cfe0b20 commit d6b265d
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ def query_backup_log_from_bklog(self, start_time: datetime, end_time: datetime,
else:
return self.aggregate_tendb_dbbackup_logs(backup_logs)

def query_instance_backup_logs(self, end_time: datetime, **kwargs) -> Dict:
def query_instance_backup_priv_logs(self, end_time: datetime, **kwargs) -> Dict:
"""
通过日志平台查询集群实例一周内最近的权限备份日志
:param end_time: 结束时间
Expand Down
1 change: 1 addition & 0 deletions dbm-ui/backend/flow/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@ class DBActuatorActionEnum(str, StructuredEnum):
CheckTdbctlWithSpiderSchema = EnumField("check-tdbctl-with-spider-schema", _("icheck-tdbctl-with-spider-schema"))
GetBackupFile = EnumField("find-local-backup", _("find-local-backup"))
RestoreSlave = EnumField("restore-dr", _("restore-dr"))
FastExecuteSqlFile = EnumField("fast-execute-sql-file", _("fast-execute-sql-file"))
RecoverBinlog = EnumField("recover-binlog", _("recover-binlog"))
GrantRepl = EnumField("grant-repl", _("grant-repl"))
ChangeMaster = EnumField("change-master", _("change-master"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
"""
import copy
import logging
import os.path
from dataclasses import asdict
from datetime import datetime

Expand All @@ -18,6 +19,7 @@

from backend.configuration.constants import MYSQL_DATA_RESTORE_TIME, MYSQL_USUAL_JOB_TIME, DBType
from backend.db_meta.enums import ClusterType
from backend.db_meta.models import Cluster
from backend.db_services.mysql.fixpoint_rollback.handlers import FixPointRollbackHandler
from backend.flow.consts import MysqlChangeMasterType
from backend.flow.engine.bamboo.scene.common.builder import SubBuilder
Expand Down Expand Up @@ -169,3 +171,82 @@ def slave_recover_sub_flow(root_id: str, ticket_data: dict, cluster_info: dict):
return sub_pipeline.build_sub_process(
sub_name=_("{}:{}从节点重建".format(cluster["new_slave_ip"], cluster["new_slave_port"]))
)


def priv_recover_sub_flow(root_id: str, ticket_data: dict, cluster_info: dict, ips: list):
"""
tendb privilege recover 指定实例权限恢复。
@param root_id: flow流程的root_id
@param ticket_data: 关联单据 ticket对象
@param cluster_info: 关联的cluster对象
@param ips: 实例ip
"""
cluster_model = Cluster.objects.get(id=cluster_info["cluster_id"])
sub_pipeline = SubBuilder(root_id=root_id, data=ticket_data)
# 查询备份
rollback_time = datetime.now(timezone.utc)
rollback_handler = FixPointRollbackHandler(cluster_id=cluster_model.id)
backup_info = rollback_handler.query_instance_backup_priv_logs(rollback_time)
if backup_info is None:
return None
# 查询不到权限备份,则不添加权限备份节点。
# logger.error("cluster {} backup privilege not exists".format(cluster_model.id))
# raise TendbGetBackupInfoFailedException(message=_("获取集群 {} 的权限备份信息失败".format(cluster_model.id)))

priv_file_task_ids = [file["task_id"] for file in backup_info["file_list"].values()]
priv_files = [os.path.basename(file["file_name"]) for file in backup_info["file_list"].values()]
storages = cluster_model.storageinstance_set.filter(machine__ip__in=ips)
priv_sub_pipeline_list = []
for storage in storages:
priv_sub_pipeline = SubBuilder(root_id=root_id, data=ticket_data)
cluster = {
"cluster_id": cluster_model.id,
"file_target_path": f"/data/dbbak/{root_id}/{storage.port}",
"sql_files": priv_files,
"port": storage.port,
"force": False,
}
exec_act_kwargs = ExecActuatorKwargs(
bk_cloud_id=cluster_model.bk_cloud_id,
cluster_type=cluster_model.cluster_type,
cluster=copy.deepcopy(cluster),
)
exec_act_kwargs.get_mysql_payload_func = MysqlActPayload.mysql_mkdir_dir.__name__
exec_act_kwargs.exec_ip = storage.machine.ip
sub_pipeline.add_act(
act_name=_("创建目录 {}".format(cluster["file_target_path"])),
act_component_code=ExecuteDBActuatorScriptComponent.code,
kwargs=asdict(exec_act_kwargs),
)

download_kwargs = DownloadBackupFileKwargs(
bk_cloud_id=cluster_model.bk_cloud_id,
task_ids=priv_file_task_ids,
dest_ip=storage.machine.ip,
dest_dir=cluster["file_target_path"],
reason="privilege recover",
)

priv_sub_pipeline.add_act(
act_name=_("下载权限备份介质到 {}".format(storage.machine.ip)),
act_component_code=MySQLDownloadBackupfileComponent.code,
kwargs=asdict(download_kwargs),
)

exec_act_kwargs.job_timeout = MYSQL_USUAL_JOB_TIME
exec_act_kwargs.get_mysql_payload_func = MysqlActPayload.tendb_restore_priv_payload.__name__
priv_sub_pipeline.add_act(
act_name=_("权限恢复 {}".format(storage.ip_port)),
act_component_code=ExecuteDBActuatorScriptComponent.code,
kwargs=asdict(exec_act_kwargs),
)
priv_sub_pipeline_list.append(
priv_sub_pipeline.build_sub_process(sub_name=_(_("{}权限恢复").format(storage.ip_port)))
)

if len(priv_sub_pipeline_list) > 0:
sub_pipeline.add_parallel_sub_pipeline(sub_flow_list=priv_sub_pipeline_list)
return sub_pipeline.build_sub_process(sub_name=_("{}恢复权限".format(cluster_model.id)))
else:
return None
# 如果流程未空,如何?
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
from backend.flow.engine.bamboo.scene.mysql.common.mysql_resotre_data_sub_flow import (
mysql_restore_master_slave_sub_flow,
)
from backend.flow.engine.bamboo.scene.mysql.common.recover_slave_instance import priv_recover_sub_flow
from backend.flow.engine.bamboo.scene.mysql.common.uninstall_instance import uninstall_instance_sub_flow
from backend.flow.engine.bamboo.scene.spider.common.exceptions import TendbGetBackupInfoFailedException
from backend.flow.engine.bamboo.scene.spider.spider_remote_node_migrate import remote_instance_migrate_sub_flow
Expand Down Expand Up @@ -278,6 +279,14 @@ def migrate_cluster_flow(self, use_for_upgrade=False):
root_id=self.root_id, ticket_data=copy.deepcopy(self.data), cluster_info=cluster
)
)
priv_sub_flow = priv_recover_sub_flow(
root_id=self.root_id,
ticket_data=copy.deepcopy(self.data),
cluster_info=cluster,
ips=[self.data["new_master_ip"], self.data["new_slave_ip"]],
)
if priv_sub_flow:
sync_data_sub_pipeline.add_sub_pipeline(sub_flow=priv_sub_flow)

sync_data_sub_pipeline.add_act(
act_name=_("数据恢复完毕,写入新主节点和旧主节点的关系链元数据"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@
)
from backend.flow.engine.bamboo.scene.mysql.common.get_master_config import get_instance_config
from backend.flow.engine.bamboo.scene.mysql.common.mysql_resotre_data_sub_flow import mysql_restore_data_sub_flow
from backend.flow.engine.bamboo.scene.mysql.common.recover_slave_instance import slave_recover_sub_flow
from backend.flow.engine.bamboo.scene.mysql.common.recover_slave_instance import (
priv_recover_sub_flow,
slave_recover_sub_flow,
)
from backend.flow.engine.bamboo.scene.mysql.common.slave_recover_switch import slave_migrate_switch_sub_flow
from backend.flow.engine.bamboo.scene.mysql.common.uninstall_instance import uninstall_instance_sub_flow
from backend.flow.plugins.components.collections.common.download_backup_client import DownloadBackupClientComponent
Expand Down Expand Up @@ -237,6 +240,15 @@ def tendb_ha_restore_slave_flow(self):
)
)

priv_sub_flow = priv_recover_sub_flow(
root_id=self.root_id,
ticket_data=copy.deepcopy(self.data),
cluster_info=cluster,
ips=[self.data["new_slave_ip"]],
)
if priv_sub_flow:
sync_data_sub_pipeline.add_sub_pipeline(sub_flow=priv_sub_flow)

sync_data_sub_pipeline.add_act(
act_name=_("同步完毕,写入主从关系,设置节点为running状态"),
act_component_code=MySQLDBMetaComponent.code,
Expand Down Expand Up @@ -544,6 +556,14 @@ def restore_local_slave_flow(self):
root_id=self.root_id, ticket_data=copy.deepcopy(self.data), cluster_info=cluster
)
)
priv_sub_flow = priv_recover_sub_flow(
root_id=self.root_id,
ticket_data=copy.deepcopy(self.data),
cluster_info=cluster,
ips=[target_slave.machine.ip],
)
if priv_sub_flow:
tendb_migrate_pipeline.add_sub_pipeline(sub_flow=priv_sub_flow)

# 卸载流程人工确认
tendb_migrate_pipeline.add_act(act_name=_("人工确认"), act_component_code=PauseComponent.code, kwargs={})
Expand Down
22 changes: 22 additions & 0 deletions dbm-ui/backend/flow/utils/mysql/mysql_act_playload.py
Original file line number Diff line number Diff line change
Expand Up @@ -2071,6 +2071,28 @@ def tendb_restore_remotedb_payload(self, **kwargs):
}
return payload

def tendb_restore_priv_payload(self, **kwargs):
"""
tendb 恢复权限
"""
payload = {
"db_type": DBActuatorTypeEnum.MySQL.value,
"action": DBActuatorActionEnum.FastExecuteSqlFile.value,
"payload": {
"general": {"runtime_account": self.account},
"extend": {
"socket": "",
"host": kwargs["ip"],
"port": self.cluster["port"],
"database": "information_schema",
"force": self.cluster["force"],
"file_dir": self.cluster["file_target_path"],
"sql_files": self.cluster["sql_files"],
},
},
}
return payload

def tendb_grant_remotedb_repl_user(self, **kwargs) -> dict:
"""
拼接创建repl账号的payload参数(在master节点执行)
Expand Down

0 comments on commit d6b265d

Please sign in to comment.