fix(mysql): adjust mysql backup tickets #7929
xfwduke committed Nov 18, 2024
1 parent 6b6e150 commit e883254
Showing 22 changed files with 805 additions and 417 deletions.
@@ -8,18 +8,15 @@
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
import collections
import logging
import uuid
from dataclasses import asdict
from typing import Dict, Optional

from django.core.exceptions import ObjectDoesNotExist
from django.utils.translation import ugettext as _

from backend.configuration.constants import DBType
from backend.db_meta.enums import ClusterType, InstanceInnerRole
from backend.db_meta.exceptions import ClusterNotExistException, DBMetaBaseException
from backend.db_meta.enums import ClusterType, InstanceInnerRole, InstanceStatus
from backend.db_meta.models import Cluster
from backend.flow.consts import DBA_SYSTEM_USER
from backend.flow.engine.bamboo.scene.common.builder import Builder, SubBuilder
@@ -42,7 +39,7 @@
logger = logging.getLogger("flow")


class MySQLHADBTableBackupFlow(object):
class MySQLDBTableBackupFlow(object):
"""
支持跨云操作
"""
@@ -58,7 +55,7 @@ def backup_flow(self):
"uid": "2022051612120001",
"created_by": "xxx",
"bk_biz_id": "152",
"ticket_type": "MYSQL_HA_DB_TABLE_BACKUP",
"ticket_type": "MYSQL_DB_TABLE_BACKUP",
"infos": [
{
"cluster_id": int,
@@ -74,30 +71,19 @@ def backup_flow(self)
增加单据临时ADMIN账号的添加和删除逻辑
"""
cluster_ids = [job["cluster_id"] for job in self.data["infos"]]
dup_cluster_ids = [item for item, count in collections.Counter(cluster_ids).items() if count > 1]
if dup_cluster_ids:
raise DBMetaBaseException(message="duplicate clusters found: {}".format(dup_cluster_ids))

backup_pipeline = Builder(
root_id=self.root_id, data=self.data, need_random_pass_cluster_ids=list(set(cluster_ids))
)
sub_pipes = []
for job in self.data["infos"]:
try:
cluster_obj = Cluster.objects.get(
pk=job["cluster_id"], bk_biz_id=self.data["bk_biz_id"], cluster_type=ClusterType.TenDBHA.value
)
except ObjectDoesNotExist:
raise ClusterNotExistException(
cluster_type=ClusterType.TenDBHA.value, cluster_id=job["cluster_id"], immute_domain=""
)
cluster_obj = Cluster.objects.get(pk=job["cluster_id"], bk_biz_id=self.data["bk_biz_id"])

try:
if cluster_obj.cluster_type == ClusterType.TenDBHA:
instance_obj = cluster_obj.storageinstance_set.get(
instance_inner_role=InstanceInnerRole.SLAVE.value, is_stand_by=True
instance_inner_role=InstanceInnerRole.SLAVE.value, is_stand_by=True, status=InstanceStatus.RUNNING
)
except ObjectDoesNotExist:
raise DBMetaBaseException(message=_("{} standby slave 不存在".format(cluster_obj.immute_domain)))
else:
instance_obj = cluster_obj.storageinstance_set.filter(status=InstanceStatus.RUNNING).first()

sub_pipe = SubBuilder(
root_id=self.root_id,
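A minimal sketch of the instance-selection rule used in MySQLDBTableBackupFlow above, assuming the backend.db_meta models and enums shown in the imports are available; the helper name pick_backup_instance is illustrative only and not part of the codebase:

from backend.db_meta.enums import ClusterType, InstanceInnerRole, InstanceStatus
from backend.db_meta.models import Cluster


def pick_backup_instance(cluster_obj: Cluster):
    """Illustrative helper: where a db-table backup runs for one cluster."""
    if cluster_obj.cluster_type == ClusterType.TenDBHA:
        # TenDBHA: the running stand-by slave; .get() raises ObjectDoesNotExist if it is missing
        return cluster_obj.storageinstance_set.get(
            instance_inner_role=InstanceInnerRole.SLAVE.value,
            is_stand_by=True,
            status=InstanceStatus.RUNNING,
        )
    # otherwise (presumably TenDBSingle): the first running storage instance, or None
    return cluster_obj.storageinstance_set.filter(status=InstanceStatus.RUNNING).first()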
@@ -13,12 +13,10 @@
from dataclasses import asdict
from typing import Dict, Optional

from django.core.exceptions import ObjectDoesNotExist
from django.utils.translation import ugettext as _

from backend.configuration.constants import DBType
from backend.db_meta.enums import ClusterType, InstanceInnerRole
from backend.db_meta.exceptions import ClusterNotExistException, DBMetaBaseException
from backend.db_meta.enums import ClusterType
from backend.db_meta.models import Cluster
from backend.flow.consts import DBA_SYSTEM_USER, LONG_JOB_TIMEOUT
from backend.flow.engine.bamboo.scene.common.builder import Builder, SubBuilder
@@ -36,7 +34,7 @@
logger = logging.getLogger("flow")


class MySQLHAFullBackupFlow(object):
class MySQLFullBackupFlow(object):
"""
mysql 库表备份流程
支持跨云管理
@@ -53,47 +51,37 @@ def full_backup_flow(self):
"uid": "398346234",
"created_type": "xxx",
"bk_biz_id": "152",
"ticket_type": "MYSQL_HA_FULL_BACKUP",
"infos": {
"backup_type": enum of backend.flow.consts.MySQLBackupTypeEnum
"file_tag": enum of backend.flow.consts.MySQLBackupFileTagEnum
"clusters": [{"cluster_id":"", "backup_local": enum []}]
}
"ticket_type": "MYSQL_HA_FULL_BACKUP|MYSQL_SINGLE_FULL_BACKUP",
"backup_type": enum of backend.flow.consts.MySQLBackupTypeEnum
"file_tag": enum of backend.flow.consts.MySQLBackupFileTagEnum
"infos": [
{
"cluster_id": int,
"backup_local": enum
}
]
}
增加单据临时ADMIN账号的添加和删除逻辑
"""
clusters = self.data["infos"]["clusters"]
cluster_ids = [cluster["cluster_id"] for cluster in clusters]
cluster_ids = [job["cluster_id"] for job in self.data["infos"]]

backup_pipeline = Builder(
root_id=self.root_id, data=self.data, need_random_pass_cluster_ids=list(set(cluster_ids))
)

sub_pipes = []
for cluster in clusters:
cluster_id = cluster["cluster_id"]
backup_local = cluster["backup_local"]

try:
cluster_obj = Cluster.objects.get(
pk=cluster_id, bk_biz_id=self.data["bk_biz_id"], cluster_type=ClusterType.TenDBHA.value
)
except ObjectDoesNotExist:
raise ClusterNotExistException(
cluster_type=ClusterType.TenDBHA.value, cluster_id=cluster_id, immute_domain=""
)

if backup_local == InstanceInnerRole.MASTER.value:
backend_obj = cluster_obj.storageinstance_set.get(instance_inner_role=InstanceInnerRole.MASTER.value)
elif backup_local == InstanceInnerRole.SLAVE.value:
try:
backend_obj = cluster_obj.storageinstance_set.get(
instance_inner_role=InstanceInnerRole.SLAVE.value, is_stand_by=True
)
except ObjectDoesNotExist:
raise DBMetaBaseException(message=_("{} standby slave 不存在".format(cluster_obj.immute_domain)))
for job in self.data["infos"]:
cluster_id = job["cluster_id"]
backup_local = job["backup_local"]

cluster_obj = Cluster.objects.get(pk=cluster_id, bk_biz_id=self.data["bk_biz_id"])

if cluster_obj.cluster_type == ClusterType.TenDBSingle:
backend_obj = cluster_obj.storageinstance_set.first()
elif cluster_obj.cluster_type == ClusterType.TenDBHA:
backend_obj = cluster_obj.storageinstance_set.get(instance_inner_role=backup_local, is_stand_by=True)
else:
raise MySQLBackupLocalException(msg=_("不支持的备份位置 {}".format(backup_local)))
raise MySQLBackupLocalException(_("不支持的集群类型 {}".format(cluster_obj.cluster_type)))

sub_pipe = SubBuilder(
root_id=self.root_id,
@@ -104,8 +92,8 @@ def full_backup_flow(self):
"ticket_type": self.data["ticket_type"],
"ip": backend_obj.machine.ip,
"port": backend_obj.port,
"file_tag": self.data["infos"]["file_tag"],
"backup_type": self.data["infos"]["backup_type"],
"file_tag": self.data["file_tag"],
"backup_type": self.data["backup_type"],
"backup_id": uuid.uuid1(),
"backup_gsd": ["all"],
"role": backend_obj.instance_role,
@@ -136,7 +124,6 @@ def full_backup_flow(self):
get_mysql_payload_func=MysqlActPayload.mysql_backup_demand_payload.__name__,
)
),
# write_payload_var="backup_report_response",
)

sub_pipe.add_act(
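A condensed sketch of how MySQLFullBackupFlow above resolves the backup target per cluster type, assuming the project package is importable; resolve_backup_target is an illustrative name, not a function in the flow:

from backend.db_meta.enums import ClusterType
from backend.db_meta.models import Cluster
from backend.flow.engine.exceptions import MySQLBackupLocalException


def resolve_backup_target(cluster_obj: Cluster, backup_local: str):
    """Illustrative helper mirroring the branch in the flow."""
    if cluster_obj.cluster_type == ClusterType.TenDBSingle:
        # single-node cluster: its only storage instance
        return cluster_obj.storageinstance_set.first()
    if cluster_obj.cluster_type == ClusterType.TenDBHA:
        # HA cluster: the stand-by instance holding the requested inner role (master or slave)
        return cluster_obj.storageinstance_set.get(instance_inner_role=backup_local, is_stand_by=True)
    # any other cluster type is rejected
    raise MySQLBackupLocalException(msg="unsupported cluster type {}".format(cluster_obj.cluster_type))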
@@ -11,17 +11,15 @@
import copy
import logging
import uuid
from collections import Counter, defaultdict
from collections import defaultdict
from dataclasses import asdict
from typing import Dict, List, Optional

from django.core.exceptions import ObjectDoesNotExist
from django.utils.translation import ugettext as _

from backend.configuration.constants import DBType
from backend.constants import IP_PORT_DIVIDER
from backend.db_meta.enums import ClusterType, InstanceInnerRole, TenDBClusterSpiderRole
from backend.db_meta.exceptions import ClusterNotExistException, DBMetaBaseException
from backend.db_meta.models import Cluster, StorageInstanceTuple
from backend.flow.consts import DBA_SYSTEM_USER
from backend.flow.engine.bamboo.scene.common.builder import Builder, SubBuilder, SubProcess
@@ -68,31 +66,21 @@ def backup_flow(self):
"table_patterns": ["tb_role%", "tb_mail%", "*"],
"ignore_tables": ["tb_role1", "tb_mail10"],
},
...
...
]
}
增加单据临时ADMIN账号的添加和删除逻辑
"""
cluster_ids = [job["cluster_id"] for job in self.data["infos"]]
dup_cluster_ids = [item for item, count in Counter(cluster_ids).items() if count > 1]
if dup_cluster_ids:
raise DBMetaBaseException(message="duplicate clusters found: {}".format(dup_cluster_ids))

backup_pipeline = Builder(
root_id=self.root_id, data=self.data, need_random_pass_cluster_ids=list(set(cluster_ids))
)

cluster_pipes = []
for job in self.data["infos"]:
try:
cluster_obj = Cluster.objects.get(
pk=job["cluster_id"], bk_biz_id=self.data["bk_biz_id"], cluster_type=ClusterType.TenDBCluster.value
)
except ObjectDoesNotExist:
raise ClusterNotExistException(
cluster_type=ClusterType.TenDBCluster.value, cluster_id=job["cluster_id"], immute_domain=""
)
cluster_obj = Cluster.objects.get(
pk=job["cluster_id"], bk_biz_id=self.data["bk_biz_id"], cluster_type=ClusterType.TenDBCluster.value
)

backup_id = uuid.uuid1()

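The Builder call above dedupes cluster ids for the temporary ADMIN password step while each entry in infos still gets its own sub-pipeline. A small self-contained illustration with made-up ids:

# Made-up ids: duplicate cluster_id entries each produce a sub-pipeline,
# but the temporary password is only provisioned once per cluster.
infos = [{"cluster_id": 11}, {"cluster_id": 11}, {"cluster_id": 12}]

cluster_ids = [job["cluster_id"] for job in infos]
need_random_pass_cluster_ids = list(set(cluster_ids))

print(cluster_ids)  # [11, 11, 12]
print(sorted(need_random_pass_cluster_ids))  # [11, 12]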
@@ -15,18 +15,16 @@
from dataclasses import asdict
from typing import Dict, List, Optional

from django.core.exceptions import ObjectDoesNotExist
from django.utils.translation import ugettext as _

from backend.configuration.constants import DBType
from backend.constants import IP_PORT_DIVIDER
from backend.db_meta.enums import ClusterType, InstanceInnerRole, TenDBClusterSpiderRole
from backend.db_meta.exceptions import ClusterNotExistException
from backend.db_meta.enums import InstanceInnerRole, TenDBClusterSpiderRole
from backend.db_meta.models import Cluster, StorageInstanceTuple
from backend.flow.consts import DBA_SYSTEM_USER, LONG_JOB_TIMEOUT
from backend.flow.engine.bamboo.scene.common.builder import Builder, SubBuilder, SubProcess
from backend.flow.engine.bamboo.scene.common.get_file_list import GetFileList
from backend.flow.engine.exceptions import IncompatibleBackupTypeAndLocal, MySQLBackupLocalException
from backend.flow.engine.exceptions import MySQLBackupLocalException
from backend.flow.plugins.components.collections.mysql.exec_actuator_script import ExecuteDBActuatorScriptComponent
from backend.flow.plugins.components.collections.mysql.mysql_link_backup_id_bill_id import (
MySQLLinkBackupIdBillIdComponent,
@@ -51,49 +49,51 @@ def full_backup_flow(self):
"created_type": "xxx",
"bk_biz_id": "152",
"ticket_type": "TENDBCLUSTER_FULL_BACKUP",
"infos": {
"backup_type": enum of backend.flow.consts.MySQLBackupTypeEnum,
"file_tag": enum of backend.flow.consts.MySQLBackupFileTagEnum,
"clusters": [
"backup_type": enum of backend.flow.consts.MySQLBackupTypeEnum,
"file_tag": enum of backend.flow.consts.MySQLBackupFileTagEnum,
"infos": [
{
"cluster_id": int,
"backup_local": enum [backend.db_meta.enum.InstanceInnerRole, SPIDER_MNT],
"spider_mnt_address": "x.x.x.x:y" # 如果 backup_local 是 spider_mnt
},
...
],
}
]
}
增加单据临时ADMIN账号的添加和删除逻辑
"""

clusters = self.data["infos"]["clusters"]
cluster_ids = [i["cluster_id"] for i in self.data["infos"]["clusters"]]
cluster_ids = [job["cluster_id"] for job in self.data["infos"]]

backup_pipeline = Builder(
root_id=self.root_id, data=self.data, need_random_pass_cluster_ids=list(set(cluster_ids))
)

cluster_pipes = []
for cluster in clusters:
if (
self.data["infos"]["backup_type"] == "physical"
and cluster["backup_local"] == TenDBClusterSpiderRole.SPIDER_MNT.value
):
IncompatibleBackupTypeAndLocal(
backup_type=self.data["infos"]["backup_type"], backup_local=cluster["backup_local"]
)

try:
cluster_obj = Cluster.objects.get(
pk=cluster["cluster_id"],
bk_biz_id=self.data["bk_biz_id"],
cluster_type=ClusterType.TenDBCluster.value,
)
except ObjectDoesNotExist:
raise ClusterNotExistException(
cluster_type=ClusterType.TenDBCluster.value, cluster_id=cluster["cluster_id"], immute_domain=""
)
# for cluster in clusters:
# if (
# self.data["infos"]["backup_type"] == "physical"
# and cluster["backup_local"] == TenDBClusterSpiderRole.SPIDER_MNT.value
# ):
# IncompatibleBackupTypeAndLocal(
# backup_type=self.data["infos"]["backup_type"], backup_local=cluster["backup_local"]
# )
for job in self.data["infos"]:
cluster_id = job["cluster_id"]
backup_local = job["backup_local"]

cluster_obj = Cluster.objects.get(pk=cluster_id, bk_biz_id=self.data["bk_biz_id"])

# try:
# cluster_obj = Cluster.objects.get(
# pk=cluster["cluster_id"],
# bk_biz_id=self.data["bk_biz_id"],
# cluster_type=ClusterType.TenDBCluster.value,
# )
# except ObjectDoesNotExist:
# raise ClusterNotExistException(
# cluster_type=ClusterType.TenDBCluster.value, cluster_id=cluster["cluster_id"], immute_domain=""
# )

backup_id = uuid.uuid1()
cluster_pipe = SubBuilder(
@@ -113,22 +113,22 @@
sub_flow=self.backup_on_spider_ctl(backup_id=backup_id, cluster_obj=cluster_obj)
)

if cluster["backup_local"] == InstanceInnerRole.SLAVE.value: # "remote":
if backup_local == InstanceInnerRole.SLAVE.value: # "remote":
cluster_pipe.add_parallel_sub_pipeline(
sub_flow_list=self.backup_on_remote_slave(backup_id=backup_id, cluster_obj=cluster_obj)
)
elif cluster["backup_local"] == InstanceInnerRole.MASTER.value:
elif backup_local == InstanceInnerRole.MASTER.value:
cluster_pipe.add_parallel_sub_pipeline(
sub_flow_list=self.backup_on_remote_master(backup_id=backup_id, cluster_obj=cluster_obj)
)
elif cluster["backup_local"] == TenDBClusterSpiderRole.SPIDER_MNT.value:
elif backup_local == TenDBClusterSpiderRole.SPIDER_MNT.value:
cluster_pipe.add_sub_pipeline(
sub_flow=self.backup_on_spider_mnt(
backup_id=backup_id, cluster_obj=cluster_obj, spider_mnt_address=cluster["spider_mnt_address"]
backup_id=backup_id, cluster_obj=cluster_obj, spider_mnt_address=job["spider_mnt_address"]
)
)
else:
raise MySQLBackupLocalException(msg=_("不支持的备份位置 {}".format(cluster["backup_local"])))
raise MySQLBackupLocalException(msg=_("不支持的备份位置 {}".format(backup_local)))

cluster_pipe.add_act(
act_name=_("关联备份id"),
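A condensed sketch of the backup_local dispatch in the TenDBCluster full-backup flow above, assuming the project enums and exceptions are importable; add_backup_sub_flows is an illustrative wrapper, with flow standing in for the flow instance (self):

from backend.db_meta.enums import InstanceInnerRole, TenDBClusterSpiderRole
from backend.flow.engine.exceptions import MySQLBackupLocalException


def add_backup_sub_flows(flow, cluster_pipe, cluster_obj, backup_id, job):
    """Illustrative condensation of the if/elif chain in the flow."""
    backup_local = job["backup_local"]
    if backup_local == InstanceInnerRole.SLAVE.value:
        # one sub-flow per remote slave shard, run in parallel
        cluster_pipe.add_parallel_sub_pipeline(
            sub_flow_list=flow.backup_on_remote_slave(backup_id=backup_id, cluster_obj=cluster_obj)
        )
    elif backup_local == InstanceInnerRole.MASTER.value:
        cluster_pipe.add_parallel_sub_pipeline(
            sub_flow_list=flow.backup_on_remote_master(backup_id=backup_id, cluster_obj=cluster_obj)
        )
    elif backup_local == TenDBClusterSpiderRole.SPIDER_MNT.value:
        # spider_mnt needs the concrete address carried in the ticket entry
        cluster_pipe.add_sub_pipeline(
            sub_flow=flow.backup_on_spider_mnt(
                backup_id=backup_id, cluster_obj=cluster_obj, spider_mnt_address=job["spider_mnt_address"]
            )
        )
    else:
        raise MySQLBackupLocalException(msg="unsupported backup_local {}".format(backup_local))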
