From 89496a062bfb4f4ccd54c34ebede00e6613fac4a Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Wed, 28 Aug 2024 11:33:17 +0200 Subject: [PATCH 1/2] fix transform orm/core functions to add items --- main/lib/idds/core/transforms.py | 7 +++++++ main/lib/idds/orm/transforms.py | 14 ++++++++++++++ main/lib/idds/tests/panda_test.py | 9 +++++++-- 3 files changed, 28 insertions(+), 2 deletions(-) diff --git a/main/lib/idds/core/transforms.py b/main/lib/idds/core/transforms.py index 3deac47b..e814d719 100644 --- a/main/lib/idds/core/transforms.py +++ b/main/lib/idds/core/transforms.py @@ -34,6 +34,8 @@ def add_transform(request_id, workload_id, transform_type, transform_tag=None, p new_poll_period=1, update_poll_period=10, retries=0, expired_at=None, transform_metadata=None, new_retries=0, update_retries=0, max_new_retries=3, max_update_retries=0, parent_transform_id=None, previous_transform_id=None, current_processing_id=None, + internal_id=None, has_previous_conditions=None, loop_index=None, + cloned_from=None, triggered_conditions=None, untriggered_conditions=None, workprogress_id=None, session=None): """ Add a transform. @@ -67,6 +69,11 @@ def add_transform(request_id, workload_id, transform_type, transform_tag=None, p previous_transform_id=previous_transform_id, current_processing_id=current_processing_id, expired_at=expired_at, transform_metadata=transform_metadata, + internal_id=internal_id, + has_previous_conditions=has_previous_conditions, + loop_index=loop_index, cloned_from=cloned_from, + triggered_conditions=triggered_conditions, + untriggered_conditions=untriggered_conditions, workprogress_id=workprogress_id, session=session) return transform_id diff --git a/main/lib/idds/orm/transforms.py b/main/lib/idds/orm/transforms.py index e2b4fdcb..995dc8e9 100644 --- a/main/lib/idds/orm/transforms.py +++ b/main/lib/idds/orm/transforms.py @@ -32,6 +32,8 @@ def create_transform(request_id, workload_id, transform_type, transform_tag=None new_poll_period=1, update_poll_period=10, new_retries=0, update_retries=0, max_new_retries=3, max_update_retries=0, parent_transform_id=None, previous_transform_id=None, current_processing_id=None, + internal_id=None, has_previous_conditions=None, loop_index=None, + cloned_from=None, triggered_conditions=None, untriggered_conditions=None, retries=0, expired_at=None, transform_metadata=None): """ Create a transform. @@ -58,6 +60,11 @@ def create_transform(request_id, workload_id, transform_type, transform_tag=None parent_transform_id=parent_transform_id, previous_transform_id=previous_transform_id, current_processing_id=current_processing_id, + internal_id=internal_id, + has_previous_conditions=has_previous_conditions, + loop_index=loop_index, cloned_from=cloned_from, + triggered_conditions=triggered_conditions, + untriggered_conditions=untriggered_conditions, transform_metadata=transform_metadata) if new_poll_period: new_poll_period = datetime.timedelta(seconds=new_poll_period) @@ -74,6 +81,8 @@ def add_transform(request_id, workload_id, transform_type, transform_tag=None, p new_poll_period=1, update_poll_period=10, retries=0, expired_at=None, new_retries=0, update_retries=0, max_new_retries=3, max_update_retries=0, parent_transform_id=None, previous_transform_id=None, current_processing_id=None, + internal_id=None, has_previous_conditions=None, loop_index=None, + cloned_from=None, triggered_conditions=None, untriggered_conditions=None, transform_metadata=None, workprogress_id=None, session=None): """ Add a transform. @@ -106,6 +115,11 @@ def add_transform(request_id, workload_id, transform_type, transform_tag=None, p parent_transform_id=parent_transform_id, previous_transform_id=previous_transform_id, current_processing_id=current_processing_id, + internal_id=internal_id, + has_previous_conditions=has_previous_conditions, + loop_index=loop_index, cloned_from=cloned_from, + triggered_conditions=triggered_conditions, + untriggered_conditions=untriggered_conditions, transform_metadata=transform_metadata) new_transform.save(session=session) transform_id = new_transform.transform_id diff --git a/main/lib/idds/tests/panda_test.py b/main/lib/idds/tests/panda_test.py index 2e8c44e4..82e9188c 100644 --- a/main/lib/idds/tests/panda_test.py +++ b/main/lib/idds/tests/panda_test.py @@ -14,8 +14,11 @@ # os.environ['PANDA_URL_SSL'] = 'https://panda-doma-k8s-panda.cern.ch/server/panda' # os.environ['PANDA_URL'] = 'http://panda-doma-k8s-panda.cern.ch:25080/server/panda' -# os.environ['PANDA_URL'] = 'https://usdf-panda-server.slac.stanford.edu:8443/server/panda' -# os.environ['PANDA_URL_SSL'] = 'https://usdf-panda-server.slac.stanford.edu:8443/server/panda' +os.environ['PANDA_URL'] = 'https://usdf-panda-server.slac.stanford.edu:8443/server/panda' +os.environ['PANDA_URL_SSL'] = 'https://usdf-panda-server.slac.stanford.edu:8443/server/panda' + +# os.environ['PANDA_URL_SSL'] = 'https://pandaserver01.sdcc.bnl.gov:25443/server/panda' +# os.environ['PANDA_URL'] = 'https://pandaserver01.sdcc.bnl.gov:25443/server/panda' from pandaclient import Client # noqa E402 @@ -75,6 +78,8 @@ task_ids = [i for i in range(10173, 10204)] task_ids = [108, 109, 106, 107, 112] task_ids = [i for i in range(2921, 2927)] +task_ids = [124, 68, 75, 78, 79] +task_ids = [19654] for task_id in task_ids: print("Killing %s" % task_id) ret = Client.killTask(task_id, verbose=True) From e8de06e926c36c980bd830791d67515f1480bb6e Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Wed, 28 Aug 2024 11:52:04 +0200 Subject: [PATCH 2/2] add workflow operations to disable separate logs --- main/lib/idds/agents/carrier/plugins/base.py | 15 ++++++----- workflow/lib/idds/iworkflow/work.py | 16 ++++++++++++ workflow/lib/idds/iworkflow/workflow.py | 26 +++++++++++++++++--- 3 files changed, 48 insertions(+), 9 deletions(-) diff --git a/main/lib/idds/agents/carrier/plugins/base.py b/main/lib/idds/agents/carrier/plugins/base.py index 4994d165..84f10124 100644 --- a/main/lib/idds/agents/carrier/plugins/base.py +++ b/main/lib/idds/agents/carrier/plugins/base.py @@ -98,12 +98,15 @@ def get_task_params(self, work): task_param_map['maxAttempt'] = work.max_attempt if task_param_map['maxFailure'] < work.max_attempt: task_param_map['maxFailure'] = work.max_attempt - task_param_map['log'] = {"dataset": "PandaJob_iworkflow/", # "PandaJob_#{pandaid}/" - "destination": "local", - "param_type": "log", - "token": "local", - "type": "template", - "value": "log.tgz"} + + if work.enable_separate_log: + task_param_map['log'] = {"dataset": "PandaJob_iworkflow/", # "PandaJob_#{pandaid}/" + "destination": "local", + "param_type": "log", + "token": "local", + "type": "template", + "value": "log.tgz"} + task_param_map['jobParameters'] = [ {'type': 'constant', 'value': executable, # noqa: E501 diff --git a/workflow/lib/idds/iworkflow/work.py b/workflow/lib/idds/iworkflow/work.py index 4c14a46d..7d8e4d99 100644 --- a/workflow/lib/idds/iworkflow/work.py +++ b/workflow/lib/idds/iworkflow/work.py @@ -236,6 +236,14 @@ def processing_id(self): def processing_id(self, value): self._processing_id = value + @property + def enable_separate_log(self): + return self._workflow_context.enable_separate_log + + @enable_separate_log.setter + def enable_separate_log(self, value): + self._workflow_context.enable_separate_log = value + @property def brokers(self): return self._workflow_context.brokers @@ -574,6 +582,14 @@ def workload_id(self, value): def get_workload_id(self): return self.workload_id + @property + def enable_separate_log(self): + return self._context.enable_separate_log + + @enable_separate_log.setter + def enable_separate_log(self, value): + self._context.enable_separate_log = value + @property def token(self): return self._context.token diff --git a/workflow/lib/idds/iworkflow/workflow.py b/workflow/lib/idds/iworkflow/workflow.py index 4930870e..376b77f2 100644 --- a/workflow/lib/idds/iworkflow/workflow.py +++ b/workflow/lib/idds/iworkflow/workflow.py @@ -55,7 +55,8 @@ def get_current_workflow(cls): class WorkflowContext(Context): - def __init__(self, name=None, service='panda', source_dir=None, workflow_type=WorkflowType.iWorkflow, distributed=True, max_walltime=24 * 3600, init_env=None, exclude_source_files=[], clean_env=None): + def __init__(self, name=None, service='panda', source_dir=None, workflow_type=WorkflowType.iWorkflow, distributed=True, + max_walltime=24 * 3600, init_env=None, exclude_source_files=[], clean_env=None, enable_separate_log=False): super(WorkflowContext, self).__init__() self._service = service # panda, idds, sharefs self._request_id = None @@ -119,6 +120,8 @@ def __init__(self, name=None, service='panda', source_dir=None, workflow_type=Wo else: self._exclude_source_files = [exclude_source_files] + self._enable_separate_log = enable_separate_log + @property def logger(self): return logging.getLogger(self.__class__.__name__) @@ -287,6 +290,14 @@ def workload_id(self): def workload_id(self, value): self._workload_id = value + @property + def enable_separate_log(self): + return self._enable_separate_log + + @enable_separate_log.setter + def enable_separate_log(self, value): + self._enable_separate_log = value + @property def brokers(self): return self._brokers @@ -719,7 +730,7 @@ class Workflow(Base): def __init__(self, func=None, service='panda', context=None, source_dir=None, local=False, distributed=True, pre_kwargs={}, args=None, kwargs={}, multi_jobs_kwargs_list=[], current_job_kwargs=None, name=None, init_env=None, is_unique_func_name=False, max_walltime=24 * 3600, source_dir_parent_level=None, - exclude_source_files=[], clean_env=None): + enable_separate_log=False, exclude_source_files=[], clean_env=None): """ Init a workflow. """ @@ -754,7 +765,8 @@ def __init__(self, func=None, service='panda', context=None, source_dir=None, lo else: self._context = WorkflowContext(name=self._name, service=service, workflow_type=workflow_type, source_dir=source_dir, distributed=distributed, init_env=init_env, max_walltime=max_walltime, - exclude_source_files=exclude_source_files, clean_env=clean_env) + exclude_source_files=exclude_source_files, clean_env=clean_env, + enable_separate_log=enable_separate_log) @property def service(self): @@ -929,6 +941,14 @@ def token(self): def token(self, value): self._context.token = value + @property + def enable_separate_log(self): + return self._context.enable_separate_log + + @enable_separate_log.setter + def enable_separate_log(self, value): + self._context.enable_separate_log = value + def get_work_tag(self): return self._context.workflow_type.name