Skip to content

Commit

Permalink
Merge pull request #329 from HSF/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
wguanicedew authored Aug 6, 2024
2 parents 31e9644 + 947ab98 commit 66c0fac
Show file tree
Hide file tree
Showing 50 changed files with 1,564 additions and 221 deletions.
2 changes: 1 addition & 1 deletion atlas/lib/idds/atlas/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@
# - Wen Guan, <wen.guan@cern.ch>, 2019 - 2021


release_version = "2.0.9"
release_version = "2.1.30"
4 changes: 2 additions & 2 deletions atlas/tools/env/environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ dependencies:
- panda-client-light # panda client
- rucio-clients
- rucio-clients-atlas
- idds-common==2.0.9
- idds-workflow==2.0.9
- idds-common==2.1.30
- idds-workflow==2.1.30
46 changes: 26 additions & 20 deletions client/lib/idds/client/catalogclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
# http://www.apache.org/licenses/LICENSE-2.0OA
#
# Authors:
# - Wen Guan, <wen.guan@cern.ch>, 2019 - 2020
# - Wen Guan, <wen.guan@cern.ch>, 2019 - 2024


"""
Expand Down Expand Up @@ -35,60 +35,66 @@ def __init__(self, host=None, auth=None, timeout=None):
"""
super(CatalogClient, self).__init__(host=host, auth=auth, timeout=timeout)

def get_collections(self, scope=None, name=None, request_id=None, workload_id=None, relation_type=None):
def get_collections(self, request_id=None, transform_id=None, workload_id=None, scope=None, name=None, relation_type=None):
"""
Get collections from the Head service.
:param scope: the collection scope.
:param name: the collection name, can be wildcard.
:param request_id: the request id.
:param transform_id: the transform id.
:param workload_id: the workload id.
:param scope: the collection scope.
:param name: the collection name, can be wildcard.
:param relation_type: The relation_type of the request (input/output/log).
:raise exceptions if it's not got successfully.
"""
path = os.path.join(self.CATALOG_BASEURL, 'collections')
if scope is None:
scope = 'null'
if name is None:
name = 'null'
if request_id is None:
request_id = 'null'
if transform_id is None:
transform_id = 'null'
if workload_id is None:
workload_id = 'null'
if scope is None:
scope = 'null'
if name is None:
name = 'null'
if relation_type is None:
relation_type = 'null'
elif isinstance(relation_type, Enum):
relation_type = relation_type.value

url = self.build_url(self.host, path=os.path.join(path, scope, name, str(request_id), str(workload_id),
str(relation_type)))
url = self.build_url(self.host, path=os.path.join(path, str(request_id), str(transform_id), str(workload_id),
scope, name, str(relation_type)))

collections = self.get_request_response(url, type='GET')
return collections

def get_contents(self, coll_scope=None, coll_name=None, request_id=None, workload_id=None,
relation_type=None, status=None):
def get_contents(self, request_id=None, transform_id=None, workload_id=None,
coll_scope=None, coll_name=None, relation_type=None, status=None):
"""
Get contents from the Head service.
:param coll_scope: the collection scope.
:param coll_name: the collection name, can be wildcard.
:param request_id: the request id.
:param transform_id: the transform id.
:param workload_id: the workload id.
:param coll_scope: the collection scope.
:param coll_name: the collection name, can be wildcard.
:param relation_type: the relation between the collection and the transform(input, output, log)
:param status: The content status.
:raise exceptions if it's not got successfully.
"""
path = os.path.join(self.CATALOG_BASEURL, 'contents')
if coll_scope is None:
coll_scope = 'null'
if coll_name is None:
coll_name = 'null'
if request_id is None:
request_id = 'null'
if transform_id is None:
transform_id = 'null'
if workload_id is None:
workload_id = 'null'
if coll_scope is None:
coll_scope = 'null'
if coll_name is None:
coll_name = 'null'
if relation_type is None:
relation_type = 'null'
elif isinstance(relation_type, Enum):
Expand All @@ -98,8 +104,8 @@ def get_contents(self, coll_scope=None, coll_name=None, request_id=None, workloa
elif isinstance(status, Enum):
status = status.value

url = self.build_url(self.host, path=os.path.join(path, coll_scope, coll_name, str(request_id),
str(workload_id), str(relation_type), str(status)))
url = self.build_url(self.host, path=os.path.join(path, str(request_id), str(transform_id), str(workload_id), coll_scope,
coll_name, str(relation_type), str(status)))

contents = self.get_request_response(url, type='GET')
return contents
Expand Down
79 changes: 70 additions & 9 deletions client/lib/idds/client/clientmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -761,10 +761,10 @@ def download_logs(self, request_id=None, workload_id=None, dest_dir='./', filena
filename = self.client.download_logs(request_id=request_id, workload_id=workload_id, dest_dir=dest_dir, filename=filename)
if filename:
logging.info("Logs are downloaded to %s" % filename)
return (0, "Logs are downloaded to %s" % filename)
return (True, "Logs are downloaded to %s" % filename)
else:
logging.info("Failed to download logs for workload_id(%s) and request_id(%s)" % (workload_id, request_id))
return (-1, "Failed to download logs for workload_id(%s) and request_id(%s)" % (workload_id, request_id))
return (False, "Failed to download logs for workload_id(%s) and request_id(%s)" % (workload_id, request_id))

@exception_handler
def upload_to_cacher(self, filename):
Expand Down Expand Up @@ -817,7 +817,26 @@ def update_hyperparameter(self, workload_id, request_id, id, loss):
return self.client.update_hyperparameter(workload_id=workload_id, request_id=request_id, id=id, loss=loss)

@exception_handler
def get_messages(self, request_id=None, workload_id=None):
def send_messages(self, request_id=None, workload_id=None, transform_id=None, internal_id=None, msgs=None):
"""
Send messages.
:param workload_id: the workload id.
:param request_id: the request.
"""
self.setup_client()

if request_id is None and workload_id is None:
logging.error("Both request_id and workload_id are None. One of them should not be None")
return (-1, "Both request_id and workload_id are None. One of them should not be None")

logging.info("Retrieving messages for request_id: %s, workload_id: %s" % (request_id, workload_id))
self.client.send_messages(request_id=request_id, workload_id=workload_id, transform_id=transform_id, internal_id=internal_id, msgs=msgs)
logging.info("Sent %s messages for request_id: %s, workload_id: %s" % (len(msgs), request_id, workload_id))
return True, None

@exception_handler
def get_messages(self, request_id=None, workload_id=None, transform_id=None, internal_id=None):
"""
Get messages.
Expand All @@ -831,9 +850,49 @@ def get_messages(self, request_id=None, workload_id=None):
return (-1, "Both request_id and workload_id are None. One of them should not be None")

logging.info("Retrieving messages for request_id: %s, workload_id: %s" % (request_id, workload_id))
msgs = self.client.get_messages(request_id=request_id, workload_id=workload_id)
msgs = self.client.get_messages(request_id=request_id, workload_id=workload_id, transform_id=transform_id, internal_id=internal_id)
logging.info("Retrieved %s messages for request_id: %s, workload_id: %s" % (len(msgs), request_id, workload_id))
return (0, msgs)
return (True, msgs)

@exception_handler
def get_collections(self, request_id=None, transform_id=None, workload_id=None, scope=None, name=None, relation_type=None):
"""
Get collections from the Head service.
:param request_id: the request id.
:param transform_id: the transform id.
:param workload_id: the workload id.
:param scope: the scope.
:param name: the name.
:param relation_type: the relation type (input, output and log).
:raise exceptions if it's not got successfully.
"""
self.setup_client()

colls = self.client.get_collections(request_id=request_id, transform_id=transform_id, workload_id=workload_id,
scope=scope, name=name, relation_type=relation_type)
return True, colls

def get_contents(self, request_id=None, transform_id=None, workload_id=None, coll_scope=None, coll_name=None, relation_type=None, status=None):
"""
Get contents from the Head service.
:param request_id: the request id.
:param transform_id: the transform id.
:param workload_id: the workload id.
:param coll_scope: the scope of the related collection.
:param coll_name: the name of the related collection.
:param relation_type: the relation type (input, output and log).
:param status: the status of related contents.
:raise exceptions if it's not got successfully.
"""
self.setup_client()

contents = self.client.get_contents(request_id=request_id, transform_id=transform_id, workload_id=workload_id,
coll_scope=coll_scope, coll_name=coll_name, relation_type=relation_type, status=status)
return True, contents

@exception_handler
def get_contents_output_ext(self, request_id=None, workload_id=None, transform_id=None, group_by_jedi_task_id=False):
Expand All @@ -848,8 +907,9 @@ def get_contents_output_ext(self, request_id=None, workload_id=None, transform_i
"""
self.setup_client()

return self.client.get_contents_output_ext(workload_id=workload_id, request_id=request_id, transform_id=transform_id,
group_by_jedi_task_id=group_by_jedi_task_id)
contents = self.client.get_contents_output_ext(workload_id=workload_id, request_id=request_id, transform_id=transform_id,
group_by_jedi_task_id=group_by_jedi_task_id)
return True, contents

@exception_handler
def update_build_request(self, request_id, signature, workflow):
Expand All @@ -864,7 +924,8 @@ def update_build_request(self, request_id, signature, workflow):
"""
self.setup_client()

return self.client.update_build_request(request_id=request_id, signature=signature, workflow=workflow)
ret = self.client.update_build_request(request_id=request_id, signature=signature, workflow=workflow)
return True, ret

@exception_handler
def get_metainfo(self, name):
Expand All @@ -878,4 +939,4 @@ def get_metainfo(self, name):
logging.info("Retrieving meta info for %s" % (name))
ret = self.client.get_metainfo(name=name)
logging.info("Retrieved meta info for %s: %s" % (name, idds_mask(ret)))
return ret
return True, ret
25 changes: 19 additions & 6 deletions client/lib/idds/client/messageclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,15 @@ def __init__(self, host=None, auth=None, timeout=None):
"""
super(MessageClient, self).__init__(host=host, auth=auth, timeout=timeout)

def send_message(self, request_id=None, workload_id=None, msg=None):
def send_messages(self, request_id=None, workload_id=None, transform_id=None, internal_id=None, msgs=None):
"""
Send messages to the Head service.
:param request_id: the request id.
:param workload_id: the workload id.
:param transform_id: the transform id.
:param internal_id: the internal id.
:param msg: the list of message contents.
:raise exceptions if it's not got successfully.
"""
Expand All @@ -49,20 +52,29 @@ def send_message(self, request_id=None, workload_id=None, msg=None):
request_id = 'null'
if workload_id is None:
workload_id = 'null'
url = self.build_url(self.host, path=os.path.join(path, str(request_id), str(workload_id)))
if transform_id is None:
transform_id = 'null'
if internal_id is None:
internal_id = 'null'
url = self.build_url(self.host, path=os.path.join(path, str(request_id), str(workload_id), str(transform_id), str(internal_id)))

if msg is None:
if msgs is None:
raise Exception("Message is None")
self.get_request_response(url, type='POST', data=msg)

if type(msgs) not in (list, tuple):
msgs = [msgs]
self.get_request_response(url, type='POST', data=msgs)

return None

def get_messages(self, request_id=None, workload_id=None):
def get_messages(self, request_id=None, workload_id=None, transform_id=None, internal_id=None):
"""
Get message from the Head service.
:param request_id: the request id.
:param workload_id: the workload id.
:param transform_id: the transform id.
:param internal_id: the internal id.
:raise exceptions if it's not got successfully.
"""
Expand All @@ -71,7 +83,8 @@ def get_messages(self, request_id=None, workload_id=None):
request_id = 'null'
if workload_id is None:
workload_id = 'null'
url = self.build_url(self.host, path=os.path.join(path, str(request_id), str(workload_id)))

url = self.build_url(self.host, path=os.path.join(path, str(request_id), str(workload_id), str(transform_id), str(internal_id)))

msgs = self.get_request_response(url, type='GET')

Expand Down
2 changes: 1 addition & 1 deletion client/lib/idds/client/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@
# - Wen Guan, <wen.guan@cern.ch>, 2019 - 2021


release_version = "2.0.9"
release_version = "2.1.30"
4 changes: 2 additions & 2 deletions client/tools/env/environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ dependencies:
- urllib3 # url connections
- tabulate
- argcomplete
- idds-common==2.0.9
- idds-workflow==2.0.9
- idds-common==2.1.30
- idds-workflow==2.1.30
16 changes: 16 additions & 0 deletions common/lib/idds/common/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ class RequestType(IDDSEnum):
Derivation = 5
iWorkflow = 6
iWorkflowLocal = 7
GenericWorkflow = 8
Other = 99


Expand All @@ -214,6 +215,8 @@ class TransformType(IDDSEnum):
Data = 8
iWorkflow = 9
iWork = 10
GenericWorkflow = 11
GenericWork = 12
Other = 99


Expand Down Expand Up @@ -241,13 +244,22 @@ class TransformStatus(IDDSEnum):
Terminating = 20
Building = 21
Built = 22
Queue = 23
Throttling = 24
WaitForTrigger = 25


class TransformLocking(IDDSEnum):
Idle = 0
Locking = 1


class ConditionStatus(IDDSEnum):
New = 0
WaitForTrigger = 1
Triggered = 2


class CollectionType(IDDSEnum):
Container = 0
Dataset = 1
Expand Down Expand Up @@ -401,6 +413,7 @@ class MessageType(IDDSEnum):
HealthHeartbeat = 12
IDDSCommunication = 13
ContentExt = 14
AsyncResult = 15
UnknownFile = 97
UnknownCollection = 98
UnknownWork = 99
Expand All @@ -425,6 +438,7 @@ class MessageTypeStr(IDDSEnum):
UnknownCollection = 'collection_unknown'
UnknownWork = 'work_unknown'
ContentExt = 'content_ext'
AsyncResult = 'async_result'


TransformType2MessageTypeMap = {
Expand Down Expand Up @@ -462,6 +476,7 @@ class MessageStatus(IDDSEnum):
Delivered = 2
Failed = 3
ConfirmDelivered = 4
NoNeedDelivery = 5


class MessageLocking(IDDSEnum):
Expand All @@ -486,6 +501,7 @@ class MessageDestination(IDDSEnum):
Conductor = 4
Outside = 5
ContentExt = 6
AsyncResult = 7


class CommandType(IDDSEnum):
Expand Down
1 change: 1 addition & 0 deletions common/lib/idds/common/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class EventType(IDDSEnum):
UpdateTransform = 21
AbortTransform = 22
ResumeTransform = 23
QueueTransform = 24

NewProcessing = 30
UpdateProcessing = 31
Expand Down
Loading

0 comments on commit 66c0fac

Please sign in to comment.