Merge pull request #333 from HSF/dev
fix transform orm/core functions to add items
wguanicedew authored Aug 28, 2024
Commit d34447f (parents 53c0df5 + 5048f5c)
Showing 6 changed files with 76 additions and 11 deletions.
main/lib/idds/agents/carrier/plugins/base.py (9 additions, 6 deletions)
@@ -98,12 +98,15 @@ def get_task_params(self, work):
task_param_map['maxAttempt'] = work.max_attempt
if task_param_map['maxFailure'] < work.max_attempt:
task_param_map['maxFailure'] = work.max_attempt
task_param_map['log'] = {"dataset": "PandaJob_iworkflow/", # "PandaJob_#{pandaid}/"
"destination": "local",
"param_type": "log",
"token": "local",
"type": "template",
"value": "log.tgz"}

if work.enable_separate_log:
task_param_map['log'] = {"dataset": "PandaJob_iworkflow/", # "PandaJob_#{pandaid}/"
"destination": "local",
"param_type": "log",
"token": "local",
"type": "template",
"value": "log.tgz"}

task_param_map['jobParameters'] = [
{'type': 'constant',
'value': executable, # noqa: E501
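
The hunk above drops the unconditional log-dataset entry and re-adds it behind the new work.enable_separate_log flag, so a separate log dataset is only requested when the workflow asks for one. A minimal sketch of the resulting behaviour; the helper name build_log_params is illustrative and not part of the plugin:

def build_log_params(work, task_param_map):
    # Attach a separate log dataset only when the workflow requested one;
    # getattr keeps the sketch safe for work objects without the attribute.
    if getattr(work, 'enable_separate_log', False):
        task_param_map['log'] = {"dataset": "PandaJob_iworkflow/",  # "PandaJob_#{pandaid}/"
                                 "destination": "local",
                                 "param_type": "log",
                                 "token": "local",
                                 "type": "template",
                                 "value": "log.tgz"}
    return task_param_map

With the flag left at its default of False, get_task_params no longer emits a 'log' section in the task parameters at all.
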
main/lib/idds/core/transforms.py (7 additions, 0 deletions)
@@ -34,6 +34,8 @@ def add_transform(request_id, workload_id, transform_type, transform_tag=None, p
new_poll_period=1, update_poll_period=10, retries=0, expired_at=None, transform_metadata=None,
new_retries=0, update_retries=0, max_new_retries=3, max_update_retries=0,
parent_transform_id=None, previous_transform_id=None, current_processing_id=None,
internal_id=None, has_previous_conditions=None, loop_index=None,
cloned_from=None, triggered_conditions=None, untriggered_conditions=None,
workprogress_id=None, session=None):
"""
Add a transform.
@@ -67,6 +69,11 @@ def add_transform(request_id, workload_id, transform_type, transform_tag=None, p
previous_transform_id=previous_transform_id,
current_processing_id=current_processing_id,
expired_at=expired_at, transform_metadata=transform_metadata,
internal_id=internal_id,
has_previous_conditions=has_previous_conditions,
loop_index=loop_index, cloned_from=cloned_from,
triggered_conditions=triggered_conditions,
untriggered_conditions=untriggered_conditions,
workprogress_id=workprogress_id, session=session)
return transform_id

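
core/transforms.py only threads the new workflow-condition bookkeeping fields through to the ORM layer; nothing is interpreted here. A hedged sketch of a call that exercises the new keyword arguments; the concrete values and the TransformType import are illustrative assumptions, not something this commit defines:

from idds.common.constants import TransformType
from idds.core import transforms as core_transforms

transform_id = core_transforms.add_transform(
    request_id=1234,                    # illustrative ids
    workload_id=5678,
    transform_type=TransformType.Workflow,
    internal_id='work_1',               # id of this work inside its workflow
    has_previous_conditions=False,      # no upstream condition gates this work
    loop_index=0,                       # first iteration of an enclosing loop
    cloned_from=None,                   # not cloned from another transform
    triggered_conditions=[],            # conditions that have already fired
    untriggered_conditions=[])          # conditions still pending
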
main/lib/idds/orm/transforms.py (14 additions, 0 deletions)
@@ -32,6 +32,8 @@ def create_transform(request_id, workload_id, transform_type, transform_tag=None
new_poll_period=1, update_poll_period=10,
new_retries=0, update_retries=0, max_new_retries=3, max_update_retries=0,
parent_transform_id=None, previous_transform_id=None, current_processing_id=None,
internal_id=None, has_previous_conditions=None, loop_index=None,
cloned_from=None, triggered_conditions=None, untriggered_conditions=None,
retries=0, expired_at=None, transform_metadata=None):
"""
Create a transform.
@@ -58,6 +60,11 @@ def create_transform(request_id, workload_id, transform_type, transform_tag=None
parent_transform_id=parent_transform_id,
previous_transform_id=previous_transform_id,
current_processing_id=current_processing_id,
internal_id=internal_id,
has_previous_conditions=has_previous_conditions,
loop_index=loop_index, cloned_from=cloned_from,
triggered_conditions=triggered_conditions,
untriggered_conditions=untriggered_conditions,
transform_metadata=transform_metadata)
if new_poll_period:
new_poll_period = datetime.timedelta(seconds=new_poll_period)
@@ -74,6 +81,8 @@ def add_transform(request_id, workload_id, transform_type, transform_tag=None, p
new_poll_period=1, update_poll_period=10, retries=0, expired_at=None,
new_retries=0, update_retries=0, max_new_retries=3, max_update_retries=0,
parent_transform_id=None, previous_transform_id=None, current_processing_id=None,
internal_id=None, has_previous_conditions=None, loop_index=None,
cloned_from=None, triggered_conditions=None, untriggered_conditions=None,
transform_metadata=None, workprogress_id=None, session=None):
"""
Add a transform.
@@ -106,6 +115,11 @@ def add_transform(request_id, workload_id, transform_type, transform_tag=None, p
parent_transform_id=parent_transform_id,
previous_transform_id=previous_transform_id,
current_processing_id=current_processing_id,
internal_id=internal_id,
has_previous_conditions=has_previous_conditions,
loop_index=loop_index, cloned_from=cloned_from,
triggered_conditions=triggered_conditions,
untriggered_conditions=untriggered_conditions,
transform_metadata=transform_metadata)
new_transform.save(session=session)
transform_id = new_transform.transform_id
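
The ORM mirrors the core signature: add_transform builds the model through create_transform and then persists it in the caller's session, so the new fields only have to be threaded through once. A condensed sketch of that flow, with the function name shortened for illustration:

def add_transform_sketch(session=None, **transform_kwargs):
    # create_transform assembles a models.Transform, including the new
    # internal_id / loop / condition fields passed in transform_kwargs ...
    new_transform = create_transform(**transform_kwargs)
    # ... and this wrapper only has to save it and hand back the id.
    new_transform.save(session=session)
    return new_transform.transform_id
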
main/lib/idds/tests/panda_test.py (7 additions, 2 deletions)
@@ -14,8 +14,11 @@
# os.environ['PANDA_URL_SSL'] = 'https://panda-doma-k8s-panda.cern.ch/server/panda'
# os.environ['PANDA_URL'] = 'http://panda-doma-k8s-panda.cern.ch:25080/server/panda'

# os.environ['PANDA_URL'] = 'https://usdf-panda-server.slac.stanford.edu:8443/server/panda'
# os.environ['PANDA_URL_SSL'] = 'https://usdf-panda-server.slac.stanford.edu:8443/server/panda'
os.environ['PANDA_URL'] = 'https://usdf-panda-server.slac.stanford.edu:8443/server/panda'
os.environ['PANDA_URL_SSL'] = 'https://usdf-panda-server.slac.stanford.edu:8443/server/panda'

# os.environ['PANDA_URL_SSL'] = 'https://pandaserver01.sdcc.bnl.gov:25443/server/panda'
# os.environ['PANDA_URL'] = 'https://pandaserver01.sdcc.bnl.gov:25443/server/panda'

from pandaclient import Client # noqa E402

@@ -75,6 +78,8 @@
task_ids = [i for i in range(10173, 10204)]
task_ids = [108, 109, 106, 107, 112]
task_ids = [i for i in range(2921, 2927)]
task_ids = [124, 68, 75, 78, 79]
task_ids = [19654]
for task_id in task_ids:
print("Killing %s" % task_id)
ret = Client.killTask(task_id, verbose=True)
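
The test-script change is pure configuration: the SLAC USDF endpoints become the active ones, the BNL endpoints are kept as a commented alternative, and the list of task ids to kill is updated. The environment variables are exported before pandaclient is imported (hence the noqa E402 on the import), since the client picks up its server configuration at import time. A trimmed sketch of that pattern, reusing the endpoint and the last task id from the script:

import os

# Select the PanDA server before pandaclient is imported.
os.environ['PANDA_URL'] = 'https://usdf-panda-server.slac.stanford.edu:8443/server/panda'
os.environ['PANDA_URL_SSL'] = 'https://usdf-panda-server.slac.stanford.edu:8443/server/panda'

from pandaclient import Client  # noqa: E402

for task_id in [19654]:
    print("Killing %s" % task_id)
    ret = Client.killTask(task_id, verbose=True)
    print(ret)
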
workflow/lib/idds/iworkflow/work.py (16 additions, 0 deletions)
@@ -236,6 +236,14 @@ def processing_id(self):
def processing_id(self, value):
self._processing_id = value

@property
def enable_separate_log(self):
return self._workflow_context.enable_separate_log

@enable_separate_log.setter
def enable_separate_log(self, value):
self._workflow_context.enable_separate_log = value

@property
def brokers(self):
return self._workflow_context.brokers
@@ -574,6 +582,14 @@ def workload_id(self, value):
def get_workload_id(self):
return self.workload_id

@property
def enable_separate_log(self):
return self._context.enable_separate_log

@enable_separate_log.setter
def enable_separate_log(self, value):
self._context.enable_separate_log = value

@property
def token(self):
return self._context.token
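
Both classes touched in work.py expose the new flag the same way: a property pair that proxies the value held by the underlying context object, so the setting lives in one place and travels with the context. A condensed sketch of the pattern; the class name is illustrative:

class WorkSketch:
    def __init__(self, workflow_context):
        self._workflow_context = workflow_context

    @property
    def enable_separate_log(self):
        # read-through to the shared context
        return self._workflow_context.enable_separate_log

    @enable_separate_log.setter
    def enable_separate_log(self, value):
        # write-through, so every holder of the same context sees the change
        self._workflow_context.enable_separate_log = value
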
workflow/lib/idds/iworkflow/workflow.py (23 additions, 3 deletions)
@@ -55,7 +55,8 @@ def get_current_workflow(cls):


class WorkflowContext(Context):
def __init__(self, name=None, service='panda', source_dir=None, workflow_type=WorkflowType.iWorkflow, distributed=True, max_walltime=24 * 3600, init_env=None, exclude_source_files=[], clean_env=None):
def __init__(self, name=None, service='panda', source_dir=None, workflow_type=WorkflowType.iWorkflow, distributed=True,
max_walltime=24 * 3600, init_env=None, exclude_source_files=[], clean_env=None, enable_separate_log=False):
super(WorkflowContext, self).__init__()
self._service = service # panda, idds, sharefs
self._request_id = None
@@ -119,6 +120,8 @@ def __init__(self, name=None, service='panda', source_dir=None, workflow_type=Wo
else:
self._exclude_source_files = [exclude_source_files]

self._enable_separate_log = enable_separate_log

@property
def logger(self):
return logging.getLogger(self.__class__.__name__)
@@ -287,6 +290,14 @@ def workload_id(self):
def workload_id(self, value):
self._workload_id = value

@property
def enable_separate_log(self):
return self._enable_separate_log

@enable_separate_log.setter
def enable_separate_log(self, value):
self._enable_separate_log = value

@property
def brokers(self):
return self._brokers
@@ -719,7 +730,7 @@ class Workflow(Base):
def __init__(self, func=None, service='panda', context=None, source_dir=None, local=False, distributed=True,
pre_kwargs={}, args=None, kwargs={}, multi_jobs_kwargs_list=[], current_job_kwargs=None, name=None,
init_env=None, is_unique_func_name=False, max_walltime=24 * 3600, source_dir_parent_level=None,
exclude_source_files=[], clean_env=None):
enable_separate_log=False, exclude_source_files=[], clean_env=None):
"""
Init a workflow.
"""
@@ -754,7 +765,8 @@ def __init__(self, func=None, service='panda', context=None, source_dir=None, lo
else:
self._context = WorkflowContext(name=self._name, service=service, workflow_type=workflow_type, source_dir=source_dir,
distributed=distributed, init_env=init_env, max_walltime=max_walltime,
exclude_source_files=exclude_source_files, clean_env=clean_env)
exclude_source_files=exclude_source_files, clean_env=clean_env,
enable_separate_log=enable_separate_log)

@property
def service(self):
@@ -929,6 +941,14 @@ def token(self):
def token(self, value):
self._context.token = value

@property
def enable_separate_log(self):
return self._context.enable_separate_log

@enable_separate_log.setter
def enable_separate_log(self, value):
self._context.enable_separate_log = value

def get_work_tag(self):
return self._context.workflow_type.name

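
Putting the pieces together: Workflow.__init__ now accepts enable_separate_log, stores it on its WorkflowContext, and the carrier plugin reads it back through work.enable_separate_log when it builds the PanDA task parameters. A hedged usage sketch; the user function and the surrounding PanDA/iDDS environment setup are placeholders, not part of this commit:

from idds.iworkflow.workflow import Workflow


def my_func(name):
    # placeholder user function
    return "processed %s" % name


wf = Workflow(func=my_func, service='panda', enable_separate_log=True)
print(wf.enable_separate_log)   # True, proxied from the WorkflowContext
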
