Skip to content

Commit

Permalink
feat(backend): 聚合数据修复单据 #7846
Browse files Browse the repository at this point in the history
  • Loading branch information
iSecloud committed Nov 12, 2024
1 parent 4604fcb commit e94bbe1
Show file tree
Hide file tree
Showing 10 changed files with 61 additions and 50 deletions.
5 changes: 2 additions & 3 deletions dbm-ui/backend/core/storages/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import zipfile
from typing import Any, Dict, List

import chardet
from bkstorages.exceptions import RequestError as BKStorageError
from django.http import StreamingHttpResponse
from rest_framework.status import HTTP_200_OK
Expand Down Expand Up @@ -53,8 +52,8 @@ def batch_fetch_file_content(self, file_path_list: List[str]) -> List[Dict[str,
# 如果文件只有一个,则返回的是文件本身
if len(file_path_list) == 1:
file_path = file_path_list[0]
encoding = chardet.detect(zip_content[:100])["encoding"]
zip_content = zip_content.decode(encoding, errors="replace")
# encoding = chardet.detect(zip_content[:100])["encoding"]
zip_content = zip_content.decode("utf-8", errors="replace")
return [{"path": file_path, "content": zip_content, "url": self.storage.url(file_path)}]

# 如果是有多个文件,则制品库会打包文件,返回一个zip流
Expand Down
3 changes: 2 additions & 1 deletion dbm-ui/backend/db_periodic_task/local_tasks/ticket.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ def auto_retry_exclusive_inner_flow():
TicketTask.retry_exclusive_inner_flow()


@register_periodic_task(run_every=crontab(minute=0, hour=6))
# 数据修复跳过周一,因为周一取得checksum是上周五记录,会造成修复失效
@register_periodic_task(run_every=crontab(minute=0, hour=6, day_of_week="0,2,3,4,5,6"))
def auto_create_data_repair_ticket():
    """Periodic task: auto-create data-repair tickets daily at 06:00.

    Monday (day_of_week=1) is deliberately excluded: per the note above,
    Monday's checksum reflects last Friday's records, so a repair created
    then would be ineffective. Delegates to TicketTask.
    """
    TicketTask.auto_create_data_repair_ticket()

Expand Down
5 changes: 2 additions & 3 deletions dbm-ui/backend/db_services/mysql/sql_import/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import time
from typing import Any, Dict, List, Optional, Union

import chardet
from django.core.files.uploadedfile import InMemoryUploadedFile
from django.utils.translation import ugettext as _

Expand Down Expand Up @@ -96,8 +95,8 @@ def upload_sql_file(
else:
content_bytes = file.read()
# chardet.detect预测性非100%,这里非强制UnicodeDecodeError,选择replace模式忽略
encoding = chardet.detect(content_bytes[:100])["encoding"]
sql_content = content_bytes.decode(encoding=encoding, errors="replace")
# encoding = chardet.detect(content_bytes[:100])["encoding"]
sql_content = content_bytes.decode("utf-8", errors="replace")

sql_file_info.update(sql_path=sql_path, sql_content=sql_content, raw_file_name=file.name)

Expand Down
10 changes: 5 additions & 5 deletions dbm-ui/backend/ticket/builders/common/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,16 +228,16 @@ def validate_instance_related_clusters(
return inst["bk_host_id"] in intersected_host_ids

@classmethod
def validate_duplicate_cluster_name(cls, bk_biz_id, ticket_type, cluster_name, db_module_id=0):
    """Validate that no same-type, same-name cluster already exists.

    :param bk_biz_id: business ID the cluster belongs to
    :param ticket_type: ticket type; mapped to a cluster type via BuilderFactory
    :param cluster_name: proposed cluster name
    :param db_module_id: DB module scope for the uniqueness check (0 = default),
        so identical names are allowed across different DB modules
    :raises serializers.ValidationError: if a duplicate cluster exists
    """
    # Imported locally to avoid a circular import with the builders package.
    from backend.ticket.builders import BuilderFactory

    # Fall back to the ticket type itself when no cluster-type mapping exists.
    cluster_type = BuilderFactory.ticket_type__cluster_type.get(ticket_type, ticket_type)
    if Cluster.objects.filter(
        bk_biz_id=bk_biz_id, cluster_type=cluster_type, name=cluster_name, db_module_id=db_module_id
    ).exists():
        # BUGFIX: the message must contain a "{}" placeholder, otherwise the
        # trailing .format(bk_biz_id) silently discards the business ID.
        raise serializers.ValidationError(_("业务{}下已经存在同类型同名集群,请重新命名").format(bk_biz_id))

@classmethod
def _validate_domain_valid(cls, domain):
Expand Down
2 changes: 1 addition & 1 deletion dbm-ui/backend/ticket/builders/mysql/mysql_single_apply.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def validate(self, attrs):
# 校验集群名是否重复
for domain in attrs["domains"]:
CommonValidate.validate_duplicate_cluster_name(
self.context["bk_biz_id"], self.context["ticket_type"], domain["key"]
self.context["bk_biz_id"], self.context["ticket_type"], domain["key"], attrs["db_module_id"]
)
# 校验域名是否重复
# TODO 校验存量的域名是否存在重复
Expand Down
2 changes: 1 addition & 1 deletion dbm-ui/backend/ticket/builders/riak/riak_apply.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def get_city_name(self, obj):
def validate(self, attrs):
# 校验集群名是否重复
CommonValidate.validate_duplicate_cluster_name(
self.context["bk_biz_id"], self.context["ticket_type"], attrs["cluster_name"]
self.context["bk_biz_id"], self.context["ticket_type"], attrs["cluster_name"], attrs["db_module_id"]
)
return attrs

Expand Down
2 changes: 1 addition & 1 deletion dbm-ui/backend/ticket/builders/riak/riak_migrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def get_city_name(self, obj):
def validate(self, attrs):
# 校验集群名是否重复
CommonValidate.validate_duplicate_cluster_name(
self.context["bk_biz_id"], self.context["ticket_type"], attrs["cluster_name"]
self.context["bk_biz_id"], self.context["ticket_type"], attrs["cluster_name"], attrs["db_module_id"]
)
return attrs

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def validate(self, attrs):
# 校验集群名是否重复
for domain in attrs["domains"]:
CommonValidate.validate_duplicate_cluster_name(
self.context["bk_biz_id"], self.context["ticket_type"], domain["key"]
self.context["bk_biz_id"], self.context["ticket_type"], domain["key"], attrs["db_module_id"]
)

# 校验域名是否重复 TODO: 校验存量的域名是否存在重复
Expand Down
2 changes: 1 addition & 1 deletion dbm-ui/backend/ticket/builders/tendbcluster/tendb_apply.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def validate(self, attrs):
# TODO: spider集群部署校验
# 校验集群名是否重复
CommonValidate.validate_duplicate_cluster_name(
self.context["bk_biz_id"], self.context["ticket_type"], attrs["cluster_name"]
self.context["bk_biz_id"], self.context["ticket_type"], attrs["cluster_name"], attrs["db_module_id"]
)
# 校验分片数合法
TendbBaseOperateDetailSerializer.validate_cluster_shard_num(attrs)
Expand Down
78 changes: 45 additions & 33 deletions dbm-ui/backend/ticket/tasks/ticket_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from backend import env
from backend.components import BKLogApi, ItsmApi
from backend.components.cmsi.handler import CmsiHandler
from backend.configuration.constants import PLAT_BIZ_ID
from backend.configuration.constants import PLAT_BIZ_ID, DBType
from backend.constants import DEFAULT_SYSTEM_USER
from backend.db_meta.enums import ClusterType, InstanceInnerRole
from backend.db_meta.models import AppCache, Cluster, StorageInstance
Expand Down Expand Up @@ -114,15 +114,19 @@ def auto_create_data_repair_ticket(cls):
checksum_log = json.loads(hit["_source"]["log"])
cluster__checksum_logs_map[checksum_log["cluster_id"]].append(checksum_log)

cluster_map = {c.id: c for c in Cluster.objects.filter(id__in=list(cluster__checksum_logs_map.keys()))}
biz__db_type__repair_infos: Dict[int, Dict[DBType, List]] = defaultdict(lambda: defaultdict(list))

# 为每个待修复的集群生成修复单据
for cluster_id, checksum_logs in cluster__checksum_logs_map.items():
try:
cluster = Cluster.objects.get(id=cluster_id)
except Cluster.DoesNotExist:
# 忽略不在dbm meta信息中的集群
# 忽略不在dbm meta信息中的集群
if cluster_id not in cluster_map:
logger.error(_("无法在dbm meta中查询到集群{}的相关信息,请排查该集群的状态".format(cluster_id)))
continue

cluster = cluster_map[cluster_id]

# 根据logs获取ip:port和实例的映射
inst_filter_list = [
(
Q(
Expand Down Expand Up @@ -197,34 +201,42 @@ def auto_create_data_repair_ticket(cls):
logger.info(_("集群{}数据校验正确,不需要进行数据修复".format(cluster_id)))
continue

# 构造修复单据
ticket_details = {
# "非innodb表是否修复"这个参数与校验保持一致,默认为false
"is_sync_non_innodb": False,
"is_ticket_consistent": False,
"checksum_table": MYSQL_CHECKSUM_TABLE,
"trigger_type": MySQLDataRepairTriggerMode.ROUTINE.value,
"start_time": date2str(start_time),
"end_time": date2str(end_time),
"infos": [
{
"cluster_id": cluster_id,
"master": data_info["master"],
"slaves": data_info["slaves"],
}
for data_info in data_repair_infos
],
}
ticket_type = TicketType.MYSQL_DATA_REPAIR
if cluster.cluster_type == ClusterType.TenDBCluster:
ticket_type = TicketType.TENDBCLUSTER_DATA_REPAIR
cls._create_ticket(
ticket_type=ticket_type,
creator=DEFAULT_SYSTEM_USER,
bk_biz_id=cluster.bk_biz_id,
remark=_("集群{}存在数据不一致,自动创建的数据修复单据").format(cluster.name),
details=ticket_details,
)
# 获取修复单据详情信息
ticket_infos = [
{"cluster_id": cluster_id, "master": data_info["master"], "slaves": data_info["slaves"]}
for data_info in data_repair_infos
]
db_type = ClusterType.cluster_type_to_db_type(cluster.cluster_type)
biz__db_type__repair_infos[cluster.bk_biz_id][db_type].extend(ticket_infos)

# 构造修复单据
for biz, db_type__repair_infos in biz__db_type__repair_infos.items():
for db_type, repair_infos in db_type__repair_infos.items():
ticket_details = {
# "非innodb表是否修复"这个参数与校验保持一致,默认为false
"is_sync_non_innodb": False,
"is_ticket_consistent": False,
"checksum_table": MYSQL_CHECKSUM_TABLE,
"trigger_type": MySQLDataRepairTriggerMode.ROUTINE.value,
"start_time": date2str(start_time),
"end_time": date2str(end_time),
"infos": [
{
"cluster_id": data_info["cluster_id"],
"master": data_info["master"],
"slaves": data_info["slaves"],
}
for data_info in repair_infos
],
}
ticket_type = getattr(TicketType, f"{db_type.upper()}_DATA_REPAIR")
cls._create_ticket(
ticket_type=ticket_type,
creator=DEFAULT_SYSTEM_USER,
bk_biz_id=biz,
remark=_("集群存在数据不一致,自动创建的数据修复单据"),
details=ticket_details,
)

@classmethod
def auto_clear_expire_flow(cls):
Expand Down

0 comments on commit e94bbe1

Please sign in to comment.