Skip to content

Commit

Permalink
feat(backend): 单据状态细化 #6755
Browse files Browse the repository at this point in the history
  • Loading branch information
iSecloud committed Nov 1, 2024
1 parent 6def9e2 commit 3d1ab12
Show file tree
Hide file tree
Showing 24 changed files with 712 additions and 403 deletions.
4 changes: 2 additions & 2 deletions dbm-ui/backend/db_services/bigdata/resources/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from backend.db_proxy.models import ClusterExtension
from backend.db_services.dbbase.resources import query
from backend.db_services.ipchooser.query.resource import ResourceQueryHelper
from backend.ticket.constants import TicketFlowStatus
from backend.ticket.constants import TICKET_RUNNING_STATUS
from backend.ticket.models import InstanceOperateRecord
from backend.utils.time import datetime2str

Expand Down Expand Up @@ -65,7 +65,7 @@ def _filter_instance_hook(cls, bk_biz_id, query_params, instances, **kwargs):

# 获取实例的操作与实例记录
records = InstanceOperateRecord.objects.filter(
instance_id__in=instance_ids, ticket__status=TicketFlowStatus.RUNNING
instance_id__in=instance_ids, ticket__status__in=TICKET_RUNNING_STATUS
)
instance_operate_records_map: Dict[int, List] = defaultdict(list)
for record in records:
Expand Down
4 changes: 2 additions & 2 deletions dbm-ui/backend/db_services/mysql/dumper/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from backend.db_meta.enums import InstanceInnerRole
from backend.db_meta.models import Cluster
from backend.db_services.mysql.dumper.models import DumperSubscribeConfig
from backend.ticket.constants import FlowType, TicketFlowStatus, TicketStatus, TicketType
from backend.ticket.constants import TICKET_RUNNING_STATUS, FlowType, TicketFlowStatus, TicketStatus, TicketType
from backend.ticket.models import Flow, Ticket


Expand Down Expand Up @@ -66,7 +66,7 @@ def patch_dumper_list_info(cls, dumper_results: List[Dict], bk_biz_id: int = 0,
dumper_ticket_types.remove(TicketType.TBINLOGDUMPER_INSTALL)
dumper_ticket_types.extend([TicketType.MYSQL_MASTER_SLAVE_SWITCH, TicketType.MYSQL_MASTER_FAIL_OVER])
active_tickets = Ticket.objects.filter(
bk_biz_id=bk_biz_id, status=TicketStatus.RUNNING, ticket_type__in=dumper_ticket_types
bk_biz_id=bk_biz_id, status__in=TICKET_RUNNING_STATUS, ticket_type__in=dumper_ticket_types
)
# 获取每个dumper单据状态与id的映射
dumper_inst_id__ticket: Dict[int, str] = {}
Expand Down
6 changes: 3 additions & 3 deletions dbm-ui/backend/tests/ticket/test_doris_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@
SCALEUP_POOL_TICKET_DATA,
)
from backend.tests.ticket.server_base import TestFlowBase
from backend.ticket.constants import TicketFlowStatus, TicketStatus
from backend.ticket.constants import TicketFlowStatus

logger = logging.getLogger("test")
pytestmark = pytest.mark.django_db
client = APIClient()

INITIAL_FLOW_FINISHED_STATUS = [TicketFlowStatus.SKIPPED, TicketStatus.SUCCEEDED]
CHANGED_MOCK_STATUS = [TicketFlowStatus.SKIPPED, TicketStatus.SUCCEEDED, TicketFlowStatus.RUNNING]
INITIAL_FLOW_FINISHED_STATUS = [TicketFlowStatus.SKIPPED, TicketFlowStatus.SUCCEEDED]
CHANGED_MOCK_STATUS = [TicketFlowStatus.SKIPPED, TicketFlowStatus.SUCCEEDED, TicketFlowStatus.RUNNING]


@pytest.fixture(autouse=True) # autouse=True 会自动应用这个fixture到所有的测试中
Expand Down
2 changes: 1 addition & 1 deletion dbm-ui/backend/tests/ticket/test_mongodb_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
pytestmark = pytest.mark.django_db
client = APIClient()

INITIAL_FLOW_FINISHED_STATUS = [TicketFlowStatus.SKIPPED, TicketStatus.SUCCEEDED]
INITIAL_FLOW_FINISHED_STATUS = [TicketFlowStatus.SKIPPED, TicketFlowStatus.SUCCEEDED]
CHANGED_MOCK_STATUS = [TicketFlowStatus.SKIPPED, TicketStatus.SUCCEEDED, TicketFlowStatus.RUNNING]


Expand Down
225 changes: 160 additions & 65 deletions dbm-ui/backend/ticket/constants.py

Large diffs are not rendered by default.

42 changes: 41 additions & 1 deletion dbm-ui/backend/ticket/filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@
from django_filters import rest_framework as filters

from backend.db_meta.models import Cluster
from backend.ticket.models import ClusterOperateRecord, Ticket
from backend.ticket.constants import TODO_RUNNING_STATUS
from backend.ticket.models import ClusterOperateRecord, InstanceOperateRecord, Ticket


class TicketListFilter(filters.FilterSet):
remark = filters.CharFilter(field_name="remark", lookup_expr="icontains", label=_("备注"))
cluster = filters.CharFilter(field_name="cluster", method="filter_cluster", label=_("集群域名"))
ids = filters.CharFilter(field_name="ids", method="filter_ids", label=_("单据ID列表"))
todo = filters.CharFilter(field_name="todo", method="filter_todo", label=_("代办状态"))

class Meta:
model = Ticket
Expand All @@ -40,3 +42,41 @@ def filter_cluster(self, queryset, name, value):
def filter_ids(self, queryset, name, value):
ids = list(map(int, value.split(",")))
return queryset.filter(id__in=ids)

def filter_todo(self, queryset, name, value):
user = self.request.user.username
if value == "running":
return queryset.filter(
todo_of_ticket__operators__contains=user, todo_of_ticket__status__in=TODO_RUNNING_STATUS
)
else:
return queryset.filter(todo_of_ticket__done_by=user)


class OpRecordListFilter(filters.FilterSet):
start_time = filters.DateTimeFilter(field_name="create_at", lookup_expr="gte", label=_("开始时间"))
end_time = filters.DateTimeFilter(field_name="create_at", lookup_expr="lte", label=_("开始时间"))
op_type = filters.CharFilter(field_name="op_type", method="filter_op_type", label=_("操作类型"))
op_status = filters.CharFilter(field_name="op_status", method="filter_op_status", label=_("操作状态"))

def filter_op_type(self, queryset, name, value):
return queryset.filter(ticket__ticket_type=value)

def filter_op_status(self, queryset, name, value):
return queryset.filter(ticket__status=value)


class ClusterOpRecordListFilter(OpRecordListFilter):
cluster_id = filters.NumberFilter(field_name="cluster_id", lookup_expr="exact", label=_("集群ID"))

class Meta:
model = ClusterOperateRecord
fields = ["start_time", "end_time", "op_type", "op_status"]


class InstanceOpRecordListFilter(OpRecordListFilter):
instance_id = filters.NumberFilter(field_name="instance_id", lookup_expr="exact", label=_("实例ID"))

class Meta:
model = InstanceOperateRecord
fields = ["start_time", "end_time", "op_type", "op_status"]
20 changes: 15 additions & 5 deletions dbm-ui/backend/ticket/flow_manager/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@
FLOW_FINISHED_STATUS,
FLOW_NOT_EXECUTE_STATUS,
FLOW_TYPE__EXPIRE_TYPE_CONFIG,
TICKET_EXPIRE_DEFAULT_CONFIG,
FlowContext,
FlowErrCode,
FlowTypeConfig,
TicketFlowStatus,
)
from backend.ticket.models import ClusterOperateRecord, Flow, InstanceOperateRecord, TicketFlowsConfig
from backend.ticket.models import ClusterOperateRecord, Flow, InstanceOperateRecord, TicketFlowsConfig, Todo

logger = logging.getLogger("root")

Expand Down Expand Up @@ -70,7 +71,7 @@ def status(self) -> str:

if not self.flow_obj.flow_obj_id:
# 任务流程未创建时未PENDING状态
return constants.TicketStatus.PENDING
return constants.TicketFlowStatus.PENDING

# 其他情况暂时认为在PENDING状态
return TicketFlowStatus.PENDING
Expand Down Expand Up @@ -144,10 +145,11 @@ def flush_error_status_handler(self):
def flush_revoke_status_handler(self, operator):
"""终止节点,更新相关状态和错误信息"""
self.flow_obj.status = TicketFlowStatus.TERMINATED
self.flow_obj.err_code = FlowErrCode.GENERAL_ERROR
if operator == DEFAULT_SYSTEM_USER:
self.flow_obj.err_code = FlowErrCode.SYSTEM_TERMINATED_ERROR
self.flow_obj.context = {FlowContext.EXPIRE_TIME: self.get_current_config_expire_time()}
else:
self.flow_obj.err_code = FlowErrCode.GENERAL_ERROR
self.flow_obj.save(update_fields=["status", "err_code", "context", "update_at"])
# 更新操作者
self.ticket.updater = operator
Expand All @@ -157,8 +159,9 @@ def get_current_config_expire_time(self):
"""获取当前配置的flow过期时间"""
if self.flow_obj.flow_type not in FLOW_TYPE__EXPIRE_TYPE_CONFIG:
return -1
config = TicketFlowsConfig.get_config(ticket_type=self.ticket.ticket_type)
expire_time = config[FlowTypeConfig.EXPIRE_CONFIG][FLOW_TYPE__EXPIRE_TYPE_CONFIG[self.flow_obj.flow_type]]
config = TicketFlowsConfig.get_config(ticket_type=self.ticket.ticket_type).configs
expire_config = config.get(FlowTypeConfig.EXPIRE_CONFIG, TICKET_EXPIRE_DEFAULT_CONFIG)
expire_time = expire_config[FLOW_TYPE__EXPIRE_TYPE_CONFIG[self.flow_obj.flow_type]]
return expire_time

def create_operate_records(self, object_key, record_model, object_ids):
Expand Down Expand Up @@ -259,4 +262,11 @@ def _retry(self) -> Any:
self.run()

def _revoke(self, operator) -> Any:
# 停止相关联的todo
from backend.ticket.todos import ActionType, TodoActorFactory

todos = Todo.objects.filter(ticket=self.ticket, flow=self.flow_obj)
for todo in todos:
TodoActorFactory.actor(todo).process(operator, ActionType.TERMINATE, params={})
# 刷新flow和单据状态 --> 终止
self.flush_revoke_status_handler(operator)
9 changes: 5 additions & 4 deletions dbm-ui/backend/ticket/flow_manager/delivery.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,14 @@ def _end_time(self) -> Optional[str]:
return datetime2str(self.pre_flow_tree.updated_at)

@property
def _summary(self) -> str:
def _summary(self) -> dict:
if self.pre_flow_tree.status in FAILED_STATES:
return _("失败后继续提交")
return {"status": TicketFlowStatus.FAILED, "message": _("失败后继续提交")}
elif self.pre_flow_tree.status in SUCCEED_STATES:
return _("执行成功")
return {"status": TicketFlowStatus.SUCCEEDED, "message": _("执行成功")}
else:
return _("执行{}".format(StateType.get_choice_label(self.pre_flow_tree.status)))
state = StateType.get_choice_label(self.pre_flow_tree.status)
return {"status": TicketFlowStatus.RUNNING, "message": _("执行{}".format(state))}

@property
def _url(self) -> str:
Expand Down
61 changes: 46 additions & 15 deletions dbm-ui/backend/ticket/flow_manager/inner.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,17 @@
from backend.flow.models import FlowTree
from backend.ticket import constants
from backend.ticket.builders.common.base import fetch_cluster_ids
from backend.ticket.constants import BAMBOO_STATE__TICKET_STATE_MAP, FlowCallbackType, TicketType
from backend.ticket.constants import (
BAMBOO_STATE__TICKET_STATE_MAP,
FlowCallbackType,
TicketFlowStatus,
TicketType,
TodoStatus,
TodoType,
)
from backend.ticket.flow_manager.base import BaseTicketFlow
from backend.ticket.models import Flow
from backend.ticket.models import Flow, Todo
from backend.ticket.todos import BaseTodoContext
from backend.utils.basic import generate_root_id
from backend.utils.time import datetime2str

Expand Down Expand Up @@ -82,22 +90,39 @@ def _end_time(self) -> Union[str, datetime]:
@property
def _summary(self) -> str:
# TODO 可以给出具体失败的节点和原因
return _("任务{status_display}").format(status_display=constants.TicketStatus.get_choice_label(self.status))
return _("任务{status_display}").format(status_display=constants.TicketFlowStatus.get_choice_label(self.status))

@property
def _status(self) -> str:
# 如果未找到流程树,则直接取flow_obj的status
# 查询流程树状态,如果未找到则直接取flow_obj的status
if not self.flow_tree:
return self.flow_obj.status
status = BAMBOO_STATE__TICKET_STATE_MAP.get(self.flow_tree.status, constants.TicketFlowStatus.RUNNING)

status = BAMBOO_STATE__TICKET_STATE_MAP.get(self.flow_tree.status, constants.TicketStatus.RUNNING)
self.flow_obj.update_status(status)
return status
# 如果任务失败,则变更todo状态
todo_status = TodoStatus.TODO if status == TicketFlowStatus.FAILED else TodoStatus.DONE_SUCCESS
fail_todo = self.flow_obj.todo_of_flow.filter(type=TodoType.INNER_FAILED)
if fail_todo.exists() and fail_todo.first().status != todo_status:
fail_todo.update(status=todo_status)

return self.flow_obj.update_status(status)

@property
def _url(self) -> str:
return f"{env.BK_SAAS_HOST}/{self.ticket.bk_biz_id}/task-history/detail/{self.root_id}"

def create_inner_todo(self):
# 创建一条todo记录,在失败时变更为TODO状态
Todo.objects.create(
name=_("【{}】单据任务执行失败,待处理").format(self.ticket.get_ticket_type_display()),
flow=self.flow_obj,
ticket=self.ticket,
type=TodoType.INNER_FAILED,
operators=[self.ticket.creator],
context=BaseTodoContext(self.flow_obj.id, self.ticket.id).to_dict(),
status=TodoStatus.DONE_SUCCESS,
)

def check_exclusive_operations(self):
"""判断执行互斥"""
# TODO: 目前来说,执行互斥对于同时提单或者同时重试的操作是防不住的。
Expand All @@ -122,10 +147,6 @@ def check_exclusive_operations(self):
cluster_ids=cluster_ids, ticket_type=ticket_type, exclude_ticket_ids=[self.ticket.id]
)

def handle_exclusive_error(self):
"""处理执行互斥后重试的逻辑"""
pass

def callback(self, callback_type: FlowCallbackType) -> None:
"""
inner节点独有的钩子函数,执行前置/后继流程节点动作
Expand All @@ -141,6 +162,7 @@ def run(self) -> None:
# 获取or生成inner flow的root id
root_id = self.flow_obj.flow_obj_id or generate_root_id()
try:
self.create_inner_todo()
# 由于 _run 执行后可能会触发信号,导致 current_flow 的误判,因此需提前写入 flow_obj_id
self.run_status_handler(root_id)
# 判断执行互斥
Expand Down Expand Up @@ -184,6 +206,15 @@ def _retry(self) -> Any:
)
super()._retry()

def _revoke(self, operator) -> Any:
# 终止运行的pipeline
from backend.db_services.taskflow.handlers import TaskFlowHandler

if FlowTree.objects.filter(root_id=self.flow_obj.flow_obj_id).exists():
TaskFlowHandler(self.flow_obj.flow_obj_id).revoke_pipeline()
# 流转flow的终止状态
super()._revoke(operator)


class QuickInnerFlow(InnerFlow):
"""
Expand All @@ -193,7 +224,7 @@ class QuickInnerFlow(InnerFlow):

@property
def _status(self) -> str:
return constants.TicketStatus.SUCCEEDED
return constants.TicketFlowStatus.SUCCEEDED

@property
def _summary(self) -> str:
Expand All @@ -218,7 +249,7 @@ class IgnoreResultInnerFlow(InnerFlow):
@property
def _summary(self) -> str:
return _("(执行结果可忽略)任务状态: {status_display}").format(
status_display=constants.TicketStatus.get_choice_label(self._raw_status)
status_display=constants.TicketFlowStatus.get_choice_label(self._raw_status)
)

@property
Expand All @@ -228,7 +259,7 @@ def _raw_status(self) -> str:
@property
def _status(self) -> str:
status = self._raw_status
if status in [constants.TicketStatus.SUCCEEDED, constants.TicketStatus.REVOKED, constants.TicketStatus.FAILED]:
return constants.TicketStatus.SUCCEEDED
if status in [constants.TicketFlowStatus.SUCCEEDED, *constants.TICKET_FAILED_STATUS]:
return constants.TicketFlowStatus.SUCCEEDED

return status
Loading

0 comments on commit 3d1ab12

Please sign in to comment.