Skip to content

Commit

Permalink
feat: 超大订阅方案 (closed #2429)
Browse files Browse the repository at this point in the history
  • Loading branch information
wyyalt committed Nov 18, 2024
1 parent 03825ce commit 2a19d88
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 21 deletions.
37 changes: 35 additions & 2 deletions apps/backend/components/collections/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@
GseDataErrCode,
)
from apps.backend.api.job import process_parms
from apps.backend.components.collections.base import BaseService, CommonData
from apps.backend.components.collections.base import (
BaseService,
CommonData,
RedisCommonData,
)
from apps.backend.components.collections.common.script_content import INITIALIZE_SCRIPT
from apps.backend.components.collections.job import (
JobExecuteScriptService,
Expand Down Expand Up @@ -86,6 +90,34 @@ def __post_init__(self):
self.plugin_name = self.policy_step_adapter.plugin_name


class RedisPluginCommonData(RedisCommonData):
    """Redis-backed variant of the plugin common-data container.

    Every attribute is fetched lazily from Redis through the
    ``_get_attr_from_redis`` helper inherited from ``RedisCommonData``,
    instead of being held in process memory. Used for very large
    subscription tasks where materializing the common data up front is
    too expensive.
    """

    @property
    def process_statuses(self) -> List[models.ProcessStatus]:
        """Process status records of the plugin on the target hosts."""
        return self._get_attr_from_redis("process_statuses")

    @property
    def target_host_objs(self) -> Optional[List[models.Host]]:
        """Target host objects; used in remote-collection scenarios."""
        return self._get_attr_from_redis("target_host_objs")

    @property
    def policy_step_adapter(self) -> PolicyStepAdapter:
        """PluginStep adapter that hides differences between plugin
        operation subscription types."""
        return self._get_attr_from_redis("policy_step_adapter")

    @property
    def group_id_instance_map(self) -> Dict[str, models.SubscriptionInstanceRecord]:
        """Mapping of group_id -> subscription instance record."""
        return self._get_attr_from_redis("group_id_instance_map")

    @property
    def plugin_name(self) -> str:
        """Plugin name, derived from the step adapter.

        Reuses the ``policy_step_adapter`` property so the Redis lookup
        key is defined in exactly one place.
        """
        return self.policy_step_adapter.plugin_name


class PluginBaseService(BaseService, metaclass=abc.ABCMeta):
"""
插件原子基类,提供一些常用的数据获取方法
Expand Down Expand Up @@ -134,7 +166,8 @@ def get_common_data(cls, data):
process_statuses = models.ProcessStatus.objects.filter(
name=policy_step_adapter.plugin_name, group_id__in=group_id_instance_map.keys()
)
return PluginCommonData(
plugin_common_data_cls = PluginCommonData if isinstance(common_data, CommonData) else RedisPluginCommonData
return plugin_common_data_cls(
bk_host_ids=common_data.bk_host_ids,
host_id_obj_map=common_data.host_id_obj_map,
ap_id_obj_map=common_data.ap_id_obj_map,
Expand Down
23 changes: 19 additions & 4 deletions pipeline/engine/core/handlers/service_activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from pipeline.core.flow.activity import ServiceActivity
from pipeline.core.flow.gateway import ConvergeGateway
from pipeline.django_signal_valve import valve
from pipeline.engine import signals
from pipeline.engine import signals, states
from pipeline.engine.models import Data, ScheduleService, Status

from .base import FlowElementHandler
Expand All @@ -35,6 +35,24 @@ def element_cls():
return ServiceActivity

def handle(self, process, element, status):

is_multi_paralle_gateway: bool = isinstance(element, ServiceActivity) and element.data.get_one_of_inputs(
"is_multi_paralle_gateway"
)

if is_multi_paralle_gateway and element.incoming:
incoming_service = element.incoming.flows[0].source.service
act_id = incoming_service.id
if all(
[incoming_service.need_schedule(), not Status.objects.filter(id=act_id, state=states.FINISHED).exists()]
):
next_node = element.next()
while is_multi_paralle_gateway:
# 跳过所有未执行的ServiceActivity寻找最近的ConvergeGateway
if isinstance(next_node, ConvergeGateway):
return self.HandleResult(next_node=next_node, should_return=False, should_sleep=False)
next_node = next_node.next()

success = False
exception_occurred = False
monitoring = False
Expand Down Expand Up @@ -114,9 +132,6 @@ def handle(self, process, element, status):
)

next_node = element.next()
is_multi_paralle_gateway: bool = isinstance(element, ServiceActivity) and element.data.get_one_of_inputs(
"is_multi_paralle_gateway"
)
while is_multi_paralle_gateway:
# 跳过所有未执行的ServiceActivity寻找最近的ConvergeGateway
if isinstance(next_node, ConvergeGateway):
Expand Down
61 changes: 46 additions & 15 deletions pipeline/engine/core/schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,18 @@

from pipeline.django_signal_valve import valve
from pipeline.engine import exceptions, signals, states
from pipeline.engine.core.data import delete_parent_data, get_schedule_parent_data, set_schedule_data
from pipeline.engine.models import Data, MultiCallbackData, PipelineProcess, ScheduleService, Status
from pipeline.engine.core.data import (
delete_parent_data,
get_schedule_parent_data,
set_schedule_data,
)
from pipeline.engine.models import (
Data,
MultiCallbackData,
PipelineProcess,
ScheduleService,
Status,
)

logger = logging.getLogger("celery")

Expand All @@ -31,7 +41,7 @@ def schedule_exception_handler(process_id, schedule_id):
yield
except Exception as e:
activity_id = schedule_id[: ScheduleService.SCHEDULE_ID_SPLIT_DIVISION]
version = schedule_id[ScheduleService.SCHEDULE_ID_SPLIT_DIVISION:]
version = schedule_id[ScheduleService.SCHEDULE_ID_SPLIT_DIVISION :]
if Status.objects.filter(id=activity_id, version=version).exists():
logger.error(traceback.format_exc())
process = PipelineProcess.objects.get(id=process_id)
Expand Down Expand Up @@ -158,32 +168,53 @@ def schedule(process_id, schedule_id, data_id=None):

Data.objects.write_node_data(service_act, ex_data=ex_data)

is_multi_paralle_gateway = service_act.data.get_one_of_inputs("is_multi_paralle_gateway")

with transaction.atomic():
process = PipelineProcess.objects.select_for_update().get(id=sched_service.process_id)
if not process.is_alive:
logger.info("pipeline %s has been revoked, status adjust failed." % process.root_pipeline_id)
return

process.adjust_status()
if not is_multi_paralle_gateway:
process.adjust_status()

# send activity error signal
try:
service_act.schedule_fail()
except Exception:
logger.error("schedule_fail handler fail: %s" % traceback.format_exc())

signals.service_schedule_fail.send(
sender=ScheduleService, activity_shell=service_act, schedule_service=sched_service, ex_data=ex_data
)
if is_multi_paralle_gateway:

signals.service_schedule_success.send(
sender=ScheduleService, activity_shell=service_act, schedule_service=sched_service
)

valve.send(
signals,
"wake_from_schedule",
sender=ScheduleService,
process_id=sched_service.process_id,
activity_id=sched_service.activity_id,
)
else:
signals.service_schedule_fail.send(
sender=ScheduleService,
activity_shell=service_act,
schedule_service=sched_service,
ex_data=ex_data,
)

valve.send(
signals,
"activity_failed",
sender=process.root_pipeline,
pipeline_id=process.root_pipeline_id,
pipeline_activity_id=service_act.id,
subprocess_id_stack=process.subprocess_stack,
)

valve.send(
signals,
"activity_failed",
sender=process.root_pipeline,
pipeline_id=process.root_pipeline_id,
pipeline_activity_id=service_act.id,
subprocess_id_stack=process.subprocess_stack,
)
return

# schedule execute finished or one time callback finished
Expand Down

0 comments on commit 2a19d88

Please sign in to comment.