From 2a19d883c84a74c22fcf249e6027cb084b7c03b1 Mon Sep 17 00:00:00 2001 From: yunchao Date: Mon, 18 Nov 2024 16:53:05 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E8=B6=85=E5=A4=A7=E8=AE=A2=E9=98=85?= =?UTF-8?q?=E6=96=B9=E6=A1=88=20(closed=20#2429)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/backend/components/collections/plugin.py | 37 ++++++++++- .../engine/core/handlers/service_activity.py | 23 +++++-- pipeline/engine/core/schedule.py | 61 ++++++++++++++----- 3 files changed, 100 insertions(+), 21 deletions(-) diff --git a/apps/backend/components/collections/plugin.py b/apps/backend/components/collections/plugin.py index d87312bd2..046ec2402 100644 --- a/apps/backend/components/collections/plugin.py +++ b/apps/backend/components/collections/plugin.py @@ -31,7 +31,11 @@ GseDataErrCode, ) from apps.backend.api.job import process_parms -from apps.backend.components.collections.base import BaseService, CommonData +from apps.backend.components.collections.base import ( + BaseService, + CommonData, + RedisCommonData, +) from apps.backend.components.collections.common.script_content import INITIALIZE_SCRIPT from apps.backend.components.collections.job import ( JobExecuteScriptService, @@ -86,6 +90,34 @@ def __post_init__(self): self.plugin_name = self.policy_step_adapter.plugin_name +class RedisPluginCommonData(RedisCommonData): + + # 进程状态列表 + @property + def process_statuses(self) -> List[models.ProcessStatus]: + return self._get_attr_from_redis("process_statuses") + + # 目标主机列表,用于远程采集场景 + @property + def target_host_objs(self) -> Optional[List[models.Host]]: + return self._get_attr_from_redis("target_host_objs") + + # PluginStep 适配器,用于屏蔽不同类型的插件操作类订阅差异 + @property + def policy_step_adapter(self) -> PolicyStepAdapter: + return self._get_attr_from_redis("policy_step_adapter") + + # group_id - 订阅实例记录映射关系 + @property + def group_id_instance_map(self) -> Dict[str, models.SubscriptionInstanceRecord]: + return self._get_attr_from_redis("group_id_instance_map") + + # 插件名称 + @property + def plugin_name(self) -> str: + return self._get_attr_from_redis("policy_step_adapter").plugin_name + + class PluginBaseService(BaseService, metaclass=abc.ABCMeta): """ 插件原子基类,提供一些常用的数据获取方法 @@ -134,7 +166,8 @@ def get_common_data(cls, data): process_statuses = models.ProcessStatus.objects.filter( name=policy_step_adapter.plugin_name, group_id__in=group_id_instance_map.keys() ) - return PluginCommonData( + plugin_common_data_cls = PluginCommonData if isinstance(common_data, CommonData) else RedisPluginCommonData + return plugin_common_data_cls( bk_host_ids=common_data.bk_host_ids, host_id_obj_map=common_data.host_id_obj_map, ap_id_obj_map=common_data.ap_id_obj_map, diff --git a/pipeline/engine/core/handlers/service_activity.py b/pipeline/engine/core/handlers/service_activity.py index f6916aa8a..f14e0a5ab 100644 --- a/pipeline/engine/core/handlers/service_activity.py +++ b/pipeline/engine/core/handlers/service_activity.py @@ -19,7 +19,7 @@ from pipeline.core.flow.activity import ServiceActivity from pipeline.core.flow.gateway import ConvergeGateway from pipeline.django_signal_valve import valve -from pipeline.engine import signals +from pipeline.engine import signals, states from pipeline.engine.models import Data, ScheduleService, Status from .base import FlowElementHandler @@ -35,6 +35,24 @@ def element_cls(): return ServiceActivity def handle(self, process, element, status): + + is_multi_paralle_gateway: bool = isinstance(element, ServiceActivity) and element.data.get_one_of_inputs( + "is_multi_paralle_gateway" + ) + + if is_multi_paralle_gateway and element.incoming: + incoming_service = element.incoming.flows[0].source.service + act_id = incoming_service.id + if all( + [incoming_service.need_schedule(), not Status.objects.filter(id=act_id, state=states.FINISHED).exists()] + ): + next_node = element.next() + while is_multi_paralle_gateway: + # 跳过所有未执行的ServiceActivity寻找最近的ConvergeGateway + if isinstance(next_node, ConvergeGateway): + return self.HandleResult(next_node=next_node, should_return=False, should_sleep=False) + next_node = next_node.next() + success = False exception_occurred = False monitoring = False @@ -114,9 +132,6 @@ def handle(self, process, element, status): ) next_node = element.next() - is_multi_paralle_gateway: bool = isinstance(element, ServiceActivity) and element.data.get_one_of_inputs( - "is_multi_paralle_gateway" - ) while is_multi_paralle_gateway: # 跳过所有未执行的ServiceActivity寻找最近的ConvergeGateway if isinstance(next_node, ConvergeGateway): diff --git a/pipeline/engine/core/schedule.py b/pipeline/engine/core/schedule.py index a4eb82975..8f7186606 100644 --- a/pipeline/engine/core/schedule.py +++ b/pipeline/engine/core/schedule.py @@ -19,8 +19,18 @@ from pipeline.django_signal_valve import valve from pipeline.engine import exceptions, signals, states -from pipeline.engine.core.data import delete_parent_data, get_schedule_parent_data, set_schedule_data -from pipeline.engine.models import Data, MultiCallbackData, PipelineProcess, ScheduleService, Status +from pipeline.engine.core.data import ( + delete_parent_data, + get_schedule_parent_data, + set_schedule_data, +) +from pipeline.engine.models import ( + Data, + MultiCallbackData, + PipelineProcess, + ScheduleService, + Status, +) logger = logging.getLogger("celery") @@ -31,7 +41,7 @@ def schedule_exception_handler(process_id, schedule_id): yield except Exception as e: activity_id = schedule_id[: ScheduleService.SCHEDULE_ID_SPLIT_DIVISION] - version = schedule_id[ScheduleService.SCHEDULE_ID_SPLIT_DIVISION:] + version = schedule_id[ScheduleService.SCHEDULE_ID_SPLIT_DIVISION :] if Status.objects.filter(id=activity_id, version=version).exists(): logger.error(traceback.format_exc()) process = PipelineProcess.objects.get(id=process_id) @@ -158,13 +168,16 @@ def schedule(process_id, schedule_id, data_id=None): Data.objects.write_node_data(service_act, ex_data=ex_data) + is_multi_paralle_gateway = service_act.data.get_one_of_inputs("is_multi_paralle_gateway") + with transaction.atomic(): process = PipelineProcess.objects.select_for_update().get(id=sched_service.process_id) if not process.is_alive: logger.info("pipeline %s has been revoked, status adjust failed." % process.root_pipeline_id) return - process.adjust_status() + if not is_multi_paralle_gateway: + process.adjust_status() # send activity error signal try: @@ -172,18 +185,36 @@ def schedule(process_id, schedule_id, data_id=None): except Exception: logger.error("schedule_fail handler fail: %s" % traceback.format_exc()) - signals.service_schedule_fail.send( - sender=ScheduleService, activity_shell=service_act, schedule_service=sched_service, ex_data=ex_data - ) + if is_multi_paralle_gateway: + + signals.service_schedule_success.send( + sender=ScheduleService, activity_shell=service_act, schedule_service=sched_service + ) + + valve.send( + signals, + "wake_from_schedule", + sender=ScheduleService, + process_id=sched_service.process_id, + activity_id=sched_service.activity_id, + ) + else: + signals.service_schedule_fail.send( + sender=ScheduleService, + activity_shell=service_act, + schedule_service=sched_service, + ex_data=ex_data, + ) + + valve.send( + signals, + "activity_failed", + sender=process.root_pipeline, + pipeline_id=process.root_pipeline_id, + pipeline_activity_id=service_act.id, + subprocess_id_stack=process.subprocess_stack, + ) - valve.send( - signals, - "activity_failed", - sender=process.root_pipeline, - pipeline_id=process.root_pipeline_id, - pipeline_activity_id=service_act.id, - subprocess_id_stack=process.subprocess_stack, - ) return # schedule execute finished or one time callback finished