From 9c8121c8b30a587ca6f019c0e1b61a17ba12ebc2 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Mon, 1 Jul 2024 10:39:20 +0200 Subject: [PATCH 01/21] update version --- atlas/lib/idds/atlas/version.py | 2 +- atlas/tools/env/environment.yml | 4 ++-- client/lib/idds/client/version.py | 2 +- client/tools/env/environment.yml | 4 ++-- common/lib/idds/common/version.py | 2 +- common/tools/env/environment.yml | 2 +- doma/lib/idds/doma/version.py | 2 +- doma/tools/env/environment.yml | 4 ++-- main/lib/idds/version.py | 2 +- main/tools/env/environment.yml | 2 +- monitor/lib/idds/monitor/version.py | 2 +- website/lib/idds/website/version.py | 2 +- workflow/lib/idds/workflow/version.py | 2 +- workflow/tools/env/environment.yml | 2 +- 14 files changed, 17 insertions(+), 17 deletions(-) diff --git a/atlas/lib/idds/atlas/version.py b/atlas/lib/idds/atlas/version.py index 777f1776..7cbd5099 100644 --- a/atlas/lib/idds/atlas/version.py +++ b/atlas/lib/idds/atlas/version.py @@ -9,4 +9,4 @@ # - Wen Guan, , 2019 - 2021 -release_version = "2.0.9" +release_version = "2.1.30" diff --git a/atlas/tools/env/environment.yml b/atlas/tools/env/environment.yml index 0fb1a1df..c5288409 100644 --- a/atlas/tools/env/environment.yml +++ b/atlas/tools/env/environment.yml @@ -13,5 +13,5 @@ dependencies: - panda-client-light # panda client - rucio-clients - rucio-clients-atlas - - idds-common==2.0.9 - - idds-workflow==2.0.9 \ No newline at end of file + - idds-common==2.1.30 + - idds-workflow==2.1.30 \ No newline at end of file diff --git a/client/lib/idds/client/version.py b/client/lib/idds/client/version.py index 777f1776..7cbd5099 100644 --- a/client/lib/idds/client/version.py +++ b/client/lib/idds/client/version.py @@ -9,4 +9,4 @@ # - Wen Guan, , 2019 - 2021 -release_version = "2.0.9" +release_version = "2.1.30" diff --git a/client/tools/env/environment.yml b/client/tools/env/environment.yml index c48c842c..5fcbebb4 100644 --- a/client/tools/env/environment.yml +++ b/client/tools/env/environment.yml @@ -7,5 +7,5 @@ dependencies: - urllib3 # url connections - tabulate - argcomplete - - idds-common==2.0.9 - - idds-workflow==2.0.9 \ No newline at end of file + - idds-common==2.1.30 + - idds-workflow==2.1.30 \ No newline at end of file diff --git a/common/lib/idds/common/version.py b/common/lib/idds/common/version.py index 777f1776..7cbd5099 100644 --- a/common/lib/idds/common/version.py +++ b/common/lib/idds/common/version.py @@ -9,4 +9,4 @@ # - Wen Guan, , 2019 - 2021 -release_version = "2.0.9" +release_version = "2.1.30" diff --git a/common/tools/env/environment.yml b/common/tools/env/environment.yml index fd136187..d5c95495 100644 --- a/common/tools/env/environment.yml +++ b/common/tools/env/environment.yml @@ -5,4 +5,4 @@ dependencies: - pip: - packaging - requests - - dogpile.cache + - dogpile.cache \ No newline at end of file diff --git a/doma/lib/idds/doma/version.py b/doma/lib/idds/doma/version.py index a123f19a..8ef3715c 100644 --- a/doma/lib/idds/doma/version.py +++ b/doma/lib/idds/doma/version.py @@ -9,4 +9,4 @@ # - Wen Guan, , 2020 - 2021 -release_version = "2.0.9" +release_version = "2.1.30" diff --git a/doma/tools/env/environment.yml b/doma/tools/env/environment.yml index 19e0b9f3..0a59b767 100644 --- a/doma/tools/env/environment.yml +++ b/doma/tools/env/environment.yml @@ -5,5 +5,5 @@ dependencies: - pip: - futures # multiple process/threads - panda-client-light # panda client - - idds-common==2.0.9 - - idds-workflow==2.0.9 \ No newline at end of file + - idds-common==2.1.30 + - idds-workflow==2.1.30 \ No newline 
at end of file diff --git a/main/lib/idds/version.py b/main/lib/idds/version.py index 777f1776..7cbd5099 100644 --- a/main/lib/idds/version.py +++ b/main/lib/idds/version.py @@ -9,4 +9,4 @@ # - Wen Guan, , 2019 - 2021 -release_version = "2.0.9" +release_version = "2.1.30" diff --git a/main/tools/env/environment.yml b/main/tools/env/environment.yml index c27e9ad0..3aa948ea 100644 --- a/main/tools/env/environment.yml +++ b/main/tools/env/environment.yml @@ -30,4 +30,4 @@ dependencies: - alembic - deepdiff - pyzmq - - oic + - oic \ No newline at end of file diff --git a/monitor/lib/idds/monitor/version.py b/monitor/lib/idds/monitor/version.py index 777f1776..7cbd5099 100644 --- a/monitor/lib/idds/monitor/version.py +++ b/monitor/lib/idds/monitor/version.py @@ -9,4 +9,4 @@ # - Wen Guan, , 2019 - 2021 -release_version = "2.0.9" +release_version = "2.1.30" diff --git a/website/lib/idds/website/version.py b/website/lib/idds/website/version.py index 777f1776..7cbd5099 100644 --- a/website/lib/idds/website/version.py +++ b/website/lib/idds/website/version.py @@ -9,4 +9,4 @@ # - Wen Guan, , 2019 - 2021 -release_version = "2.0.9" +release_version = "2.1.30" diff --git a/workflow/lib/idds/workflow/version.py b/workflow/lib/idds/workflow/version.py index 777f1776..7cbd5099 100644 --- a/workflow/lib/idds/workflow/version.py +++ b/workflow/lib/idds/workflow/version.py @@ -9,4 +9,4 @@ # - Wen Guan, , 2019 - 2021 -release_version = "2.0.9" +release_version = "2.1.30" diff --git a/workflow/tools/env/environment.yml b/workflow/tools/env/environment.yml index c2ba6e1a..d221ac1d 100644 --- a/workflow/tools/env/environment.yml +++ b/workflow/tools/env/environment.yml @@ -6,4 +6,4 @@ dependencies: - anytree - networkx - stomp.py<=8.0.1 - - idds-common==2.0.9 + - idds-common==2.1.30 \ No newline at end of file From 131dca2f512897403479f57850866db6e2070d62 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Mon, 1 Jul 2024 10:43:11 +0200 Subject: [PATCH 02/21] optimize client api to get contents information --- client/lib/idds/client/catalogclient.py | 46 ++++++++++++++----------- client/lib/idds/client/clientmanager.py | 38 ++++++++++++++++++++ main/lib/idds/core/catalog.py | 6 ++-- main/lib/idds/orm/collections.py | 11 +++--- main/lib/idds/orm/contents.py | 6 ++-- main/lib/idds/rest/v1/catalog.py | 38 ++++++++++++-------- 6 files changed, 100 insertions(+), 45 deletions(-) diff --git a/client/lib/idds/client/catalogclient.py b/client/lib/idds/client/catalogclient.py index 32741935..74d894c5 100644 --- a/client/lib/idds/client/catalogclient.py +++ b/client/lib/idds/client/catalogclient.py @@ -6,7 +6,7 @@ # http://www.apache.org/licenses/LICENSE-2.0OA # # Authors: -# - Wen Guan, , 2019 - 2020 +# - Wen Guan, , 2019 - 2024 """ @@ -35,60 +35,66 @@ def __init__(self, host=None, auth=None, timeout=None): """ super(CatalogClient, self).__init__(host=host, auth=auth, timeout=timeout) - def get_collections(self, scope=None, name=None, request_id=None, workload_id=None, relation_type=None): + def get_collections(self, request_id=None, transform_id=None, workload_id=None, scope=None, name=None, relation_type=None): """ Get collections from the Head service. - :param scope: the collection scope. - :param name: the collection name, can be wildcard. :param request_id: the request id. + :param transform_id: the transform id. :param workload_id: the workload id. + :param scope: the collection scope. + :param name: the collection name, can be wildcard. :param relation_type: The relation_type of the request (input/output/log). 
:raise exceptions if it's not got successfully. """ path = os.path.join(self.CATALOG_BASEURL, 'collections') - if scope is None: - scope = 'null' - if name is None: - name = 'null' if request_id is None: request_id = 'null' + if transform_id is None: + transform_id = 'null' if workload_id is None: workload_id = 'null' + if scope is None: + scope = 'null' + if name is None: + name = 'null' if relation_type is None: relation_type = 'null' elif isinstance(relation_type, Enum): relation_type = relation_type.value - url = self.build_url(self.host, path=os.path.join(path, scope, name, str(request_id), str(workload_id), - str(relation_type))) + url = self.build_url(self.host, path=os.path.join(path, str(request_id), str(transform_id), str(workload_id), + scope, name, str(relation_type))) collections = self.get_request_response(url, type='GET') return collections - def get_contents(self, coll_scope=None, coll_name=None, request_id=None, workload_id=None, - relation_type=None, status=None): + def get_contents(self, request_id=None, transform_id=None, workload_id=None, + coll_scope=None, coll_name=None, relation_type=None, status=None): """ Get contents from the Head service. - :param coll_scope: the collection scope. - :param coll_name: the collection name, can be wildcard. :param request_id: the request id. + :param transform_id: the transform id. :param workload_id: the workload id. + :param coll_scope: the collection scope. + :param coll_name: the collection name, can be wildcard. :param relation_type: the relation between the collection and the transform(input, output, log) :param status: The content status. :raise exceptions if it's not got successfully. """ path = os.path.join(self.CATALOG_BASEURL, 'contents') - if coll_scope is None: - coll_scope = 'null' - if coll_name is None: - coll_name = 'null' if request_id is None: request_id = 'null' + if transform_id is None: + transform_id = 'null' if workload_id is None: workload_id = 'null' + if coll_scope is None: + coll_scope = 'null' + if coll_name is None: + coll_name = 'null' if relation_type is None: relation_type = 'null' elif isinstance(relation_type, Enum): @@ -98,8 +104,8 @@ def get_contents(self, coll_scope=None, coll_name=None, request_id=None, workloa elif isinstance(status, Enum): status = status.value - url = self.build_url(self.host, path=os.path.join(path, coll_scope, coll_name, str(request_id), - str(workload_id), str(relation_type), str(status))) + url = self.build_url(self.host, path=os.path.join(path, str(request_id), str(transform_id), str(workload_id), coll_scope, + coll_name, str(relation_type), str(status))) contents = self.get_request_response(url, type='GET') return contents diff --git a/client/lib/idds/client/clientmanager.py b/client/lib/idds/client/clientmanager.py index da467ec0..0a0bf602 100644 --- a/client/lib/idds/client/clientmanager.py +++ b/client/lib/idds/client/clientmanager.py @@ -835,6 +835,44 @@ def get_messages(self, request_id=None, workload_id=None): logging.info("Retrieved %s messages for request_id: %s, workload_id: %s" % (len(msgs), request_id, workload_id)) return (0, msgs) + @exception_handler + def get_collections(self, request_id=None, transform_id=None, workload_id=None, scope=None, name=None, relation_type=None): + """ + Get collections from the Head service. + + :param request_id: the request id. + :param transform_id: the transform id. + :param workload_id: the workload id. + :param scope: the scope. + :param name: the name. + :param relation_type: the relation type (input, output and log). 
+ + :raise exceptions if it's not got successfully. + """ + self.setup_client() + + return self.client.get_collections(request_id=request_id, transform_id=transform_id, workload_id=workload_id, + scope=scope, name=name, relation_type=relation_type) + + def get_contents(self, request_id=None, transform_id=None, workload_id=None, coll_scope=None, coll_name=None, relation_type=None, status=None): + """ + Get contents from the Head service. + + :param request_id: the request id. + :param transform_id: the transform id. + :param workload_id: the workload id. + :param coll_scope: the scope of the related collection. + :param coll_name: the name of the related collection. + :param relation_type: the relation type (input, output and log). + :param status: the status of related contents. + + :raise exceptions if it's not got successfully. + """ + self.setup_client() + + return self.client.get_contents(request_id=request_id, transform_id=transform_id, workload_id=workload_id, + coll_scope=coll_scope, coll_name=coll_name, relation_type=relation_type, status=status) + @exception_handler def get_contents_output_ext(self, request_id=None, workload_id=None, transform_id=None, group_by_jedi_task_id=False): """ diff --git a/main/lib/idds/core/catalog.py b/main/lib/idds/core/catalog.py index f78b67e3..4adf1561 100644 --- a/main/lib/idds/core/catalog.py +++ b/main/lib/idds/core/catalog.py @@ -6,7 +6,7 @@ # http://www.apache.org/licenses/LICENSE-2.0OA # # Authors: -# - Wen Guan, , 2019 - 2023 +# - Wen Guan, , 2019 - 2024 """ @@ -324,8 +324,8 @@ def get_contents(coll_scope=None, coll_name=None, request_id=None, workload_id=N content_relation_type = ContentRelationType.Input elif relation_type == CollectionRelationType.Log: content_relation_type = ContentRelationType.Log - rets = orm_contents.get_contents(coll_id=coll_ids, status=status, to_json=to_json, - relation_type=content_relation_type, session=session) + rets = orm_contents.get_contents(request_id=request_id, transform_id=transform_id, coll_id=coll_ids, status=status, + to_json=to_json, relation_type=content_relation_type, session=session) else: rets = [] return rets diff --git a/main/lib/idds/orm/collections.py b/main/lib/idds/orm/collections.py index 9d17dec5..ddf5f8f8 100644 --- a/main/lib/idds/orm/collections.py +++ b/main/lib/idds/orm/collections.py @@ -6,7 +6,7 @@ # http://www.apache.org/licenses/LICENSE-2.0OA # # Authors: -# - Wen Guan, , 2019 - 2020 +# - Wen Guan, , 2019 - 2024 """ @@ -285,10 +285,6 @@ def get_collections(scope=None, name=None, request_id=None, workload_id=None, tr transform_id = [transform_id] query = session.query(models.Collection) - if scope: - query = query.filter(models.Collection.scope == scope) - if name: - query = query.filter(models.Collection.name.like(name.replace('*', '%'))) if request_id: query = query.filter(models.Collection.request_id == request_id) if workload_id: @@ -298,6 +294,11 @@ def get_collections(scope=None, name=None, request_id=None, workload_id=None, tr if relation_type is not None: query = query.filter(models.Collection.relation_type == relation_type) + if scope: + query = query.filter(models.Collection.scope == scope) + if name: + query = query.filter(models.Collection.name.like(name.replace('*', '%'))) + query = query.order_by(asc(models.Collection.updated_at)) tmp = query.all() diff --git a/main/lib/idds/orm/contents.py b/main/lib/idds/orm/contents.py index eec653dd..d4524616 100644 --- a/main/lib/idds/orm/contents.py +++ b/main/lib/idds/orm/contents.py @@ -6,7 +6,7 @@ # 
http://www.apache.org/licenses/LICENSE-2.0OA # # Authors: -# - Wen Guan, , 2019 - 2023 +# - Wen Guan, , 2019 - 2024 """ @@ -279,7 +279,7 @@ def get_match_contents(coll_id, scope, name, content_type=None, min_id=None, max @read_session -def get_contents(scope=None, name=None, request_id=None, transform_id=None, coll_id=None, status=None, +def get_contents(scope=None, name=None, request_id=None, transform_id=None, workload_id=None, coll_id=None, status=None, relation_type=None, to_json=False, session=None): """ Get content or raise a NoObject exception. @@ -314,6 +314,8 @@ def get_contents(scope=None, name=None, request_id=None, transform_id=None, coll query = query.filter(models.Content.request_id == request_id) if transform_id: query = query.filter(models.Content.transform_id == transform_id) + if workload_id: + query = query.filter(models.Content.workload_id == workload_id) if coll_id: query = query.filter(models.Content.coll_id.in_(coll_id)) if scope: diff --git a/main/lib/idds/rest/v1/catalog.py b/main/lib/idds/rest/v1/catalog.py index 26a006bd..4592606f 100644 --- a/main/lib/idds/rest/v1/catalog.py +++ b/main/lib/idds/rest/v1/catalog.py @@ -6,7 +6,7 @@ # http://www.apache.org/licenses/LICENSE-2.0OA # # Authors: -# - Wen Guan, , 2019 - 2022 +# - Wen Guan, , 2019 - 2024 import traceback @@ -22,8 +22,8 @@ class Collections(IDDSController): """ Catalog """ - def get(self, scope, name, request_id, workload_id, relation_type): - """ Get collections by scope, name, request_id and workload_id. + def get(self, request_id, transform_id, workload_id, scope, name, relation_type): + """ Get collections by request_id, transform_id, workload_id, scope and name HTTP Success: 200 OK HTTP Error: @@ -33,25 +33,29 @@ def get(self, scope, name, request_id, workload_id, relation_type): """ try: - if scope in ['null', 'None']: - scope = None - if name in ['null', 'None']: - name = None if request_id in ['null', 'None']: request_id = None else: request_id = int(request_id) + if transform_id in ['null', 'None']: + transform_id = None + else: + transform_id = int(transform_id) if workload_id in ['null', 'None']: workload_id = None else: workload_id = int(workload_id) + if scope in ['null', 'None']: + scope = None + if name in ['null', 'None']: + name = None if relation_type in ['null', 'None']: relation_type = None else: relation_type = int(relation_type) - rets = get_collections(scope=scope, name=name, request_id=request_id, workload_id=workload_id, - relation_type=relation_type, to_json=False) + rets = get_collections(request_id=request_id, transform_id=transform_id, workload_id=workload_id, + scope=scope, name=name, relation_type=relation_type, to_json=False) except exceptions.NoObject as error: return self.generate_http_response(HTTP_STATUS_CODE.NotFound, exc_cls=error.__class__.__name__, exc_msg=error) except exceptions.IDDSException as error: @@ -67,8 +71,8 @@ def get(self, scope, name, request_id, workload_id, relation_type): class Contents(IDDSController): """ Catalog """ - def get(self, coll_scope, coll_name, request_id, workload_id, relation_type, status): - """ Get contents by coll_scope, coll_name, request_id, workload_id and relation_type. + def get(self, request_id, transform_id, workload_id, coll_scope, coll_name, relation_type, status): + """ Get contents by request_id, transform_id, workload_id coll_scope, coll_name, relation_type and status. 
HTTP Success: 200 OK HTTP Error: @@ -86,6 +90,10 @@ def get(self, coll_scope, coll_name, request_id, workload_id, relation_type, sta request_id = None else: request_id = int(request_id) + if transform_id in ['null', 'None']: + transform_id = None + else: + transform_id = int(transform_id) if workload_id in ['null', 'None']: workload_id = None else: @@ -99,8 +107,8 @@ def get(self, coll_scope, coll_name, request_id, workload_id, relation_type, sta else: status = int(status) - rets = get_contents(coll_scope=coll_scope, coll_name=coll_name, request_id=request_id, - workload_id=workload_id, relation_type=relation_type, status=status, to_json=False) + rets = get_contents(request_id=request_id, transform_id=transform_id, workload_id=workload_id, coll_scope=coll_scope, + coll_name=coll_name, relation_type=relation_type, status=status, to_json=False) except exceptions.NoObject as error: return self.generate_http_response(HTTP_STATUS_CODE.NotFound, exc_cls=error.__class__.__name__, exc_msg=error) except exceptions.IDDSException as error: @@ -196,11 +204,11 @@ def get_blueprint(): # catalog_view = Catalog.as_view('catalog') collections_view = Collections.as_view('collections') - bp.add_url_rule('/catalog/collections/////', + bp.add_url_rule('/catalog/collections//////', view_func=collections_view, methods=['get', ]) # get collections contents_view = Contents.as_view('contents') - bp.add_url_rule('/catalog/contents//////', + bp.add_url_rule('/catalog/contents///////', view_func=contents_view, methods=['get', ]) # get contents contents_ext_view = ContentsOutputExt.as_view('contents_output_ext') From 4bd131e0b03f69ebcbcf399df8f0bd80b8ef2d13 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Mon, 5 Aug 2024 17:39:59 +0200 Subject: [PATCH 03/21] add condition status --- client/lib/idds/client/clientmanager.py | 49 ++++++++++++++++++------- common/lib/idds/common/constants.py | 16 ++++++++ 2 files changed, 52 insertions(+), 13 deletions(-) diff --git a/client/lib/idds/client/clientmanager.py b/client/lib/idds/client/clientmanager.py index 0a0bf602..f5e344b8 100644 --- a/client/lib/idds/client/clientmanager.py +++ b/client/lib/idds/client/clientmanager.py @@ -761,10 +761,10 @@ def download_logs(self, request_id=None, workload_id=None, dest_dir='./', filena filename = self.client.download_logs(request_id=request_id, workload_id=workload_id, dest_dir=dest_dir, filename=filename) if filename: logging.info("Logs are downloaded to %s" % filename) - return (0, "Logs are downloaded to %s" % filename) + return (True, "Logs are downloaded to %s" % filename) else: logging.info("Failed to download logs for workload_id(%s) and request_id(%s)" % (workload_id, request_id)) - return (-1, "Failed to download logs for workload_id(%s) and request_id(%s)" % (workload_id, request_id)) + return (False, "Failed to download logs for workload_id(%s) and request_id(%s)" % (workload_id, request_id)) @exception_handler def upload_to_cacher(self, filename): @@ -817,7 +817,26 @@ def update_hyperparameter(self, workload_id, request_id, id, loss): return self.client.update_hyperparameter(workload_id=workload_id, request_id=request_id, id=id, loss=loss) @exception_handler - def get_messages(self, request_id=None, workload_id=None): + def send_messages(self, request_id=None, workload_id=None, transform_id=None, internal_id=None, msgs=None): + """ + Send messages. + + :param workload_id: the workload id. + :param request_id: the request. 
+ """ + self.setup_client() + + if request_id is None and workload_id is None: + logging.error("Both request_id and workload_id are None. One of them should not be None") + return (-1, "Both request_id and workload_id are None. One of them should not be None") + + logging.info("Retrieving messages for request_id: %s, workload_id: %s" % (request_id, workload_id)) + self.client.send_messages(request_id=request_id, workload_id=workload_id, transform_id=transform_id, internal_id=internal_id, msgs=msgs) + logging.info("Sent %s messages for request_id: %s, workload_id: %s" % (len(msgs), request_id, workload_id)) + return True, None + + @exception_handler + def get_messages(self, request_id=None, workload_id=None, transform_id=None, internal_id=None): """ Get messages. @@ -831,9 +850,9 @@ def get_messages(self, request_id=None, workload_id=None): return (-1, "Both request_id and workload_id are None. One of them should not be None") logging.info("Retrieving messages for request_id: %s, workload_id: %s" % (request_id, workload_id)) - msgs = self.client.get_messages(request_id=request_id, workload_id=workload_id) + msgs = self.client.get_messages(request_id=request_id, workload_id=workload_id, transform_id=transform_id, internal_id=internal_id) logging.info("Retrieved %s messages for request_id: %s, workload_id: %s" % (len(msgs), request_id, workload_id)) - return (0, msgs) + return (True, msgs) @exception_handler def get_collections(self, request_id=None, transform_id=None, workload_id=None, scope=None, name=None, relation_type=None): @@ -851,8 +870,9 @@ def get_collections(self, request_id=None, transform_id=None, workload_id=None, """ self.setup_client() - return self.client.get_collections(request_id=request_id, transform_id=transform_id, workload_id=workload_id, - scope=scope, name=name, relation_type=relation_type) + colls = self.client.get_collections(request_id=request_id, transform_id=transform_id, workload_id=workload_id, + scope=scope, name=name, relation_type=relation_type) + return True, colls def get_contents(self, request_id=None, transform_id=None, workload_id=None, coll_scope=None, coll_name=None, relation_type=None, status=None): """ @@ -870,8 +890,9 @@ def get_contents(self, request_id=None, transform_id=None, workload_id=None, col """ self.setup_client() - return self.client.get_contents(request_id=request_id, transform_id=transform_id, workload_id=workload_id, - coll_scope=coll_scope, coll_name=coll_name, relation_type=relation_type, status=status) + contents = self.client.get_contents(request_id=request_id, transform_id=transform_id, workload_id=workload_id, + coll_scope=coll_scope, coll_name=coll_name, relation_type=relation_type, status=status) + return True, contents @exception_handler def get_contents_output_ext(self, request_id=None, workload_id=None, transform_id=None, group_by_jedi_task_id=False): @@ -886,8 +907,9 @@ def get_contents_output_ext(self, request_id=None, workload_id=None, transform_i """ self.setup_client() - return self.client.get_contents_output_ext(workload_id=workload_id, request_id=request_id, transform_id=transform_id, - group_by_jedi_task_id=group_by_jedi_task_id) + contents = self.client.get_contents_output_ext(workload_id=workload_id, request_id=request_id, transform_id=transform_id, + group_by_jedi_task_id=group_by_jedi_task_id) + return True, contents @exception_handler def update_build_request(self, request_id, signature, workflow): @@ -902,7 +924,8 @@ def update_build_request(self, request_id, signature, workflow): """ self.setup_client() - 
return self.client.update_build_request(request_id=request_id, signature=signature, workflow=workflow) + ret = self.client.update_build_request(request_id=request_id, signature=signature, workflow=workflow) + return True, ret @exception_handler def get_metainfo(self, name): @@ -916,4 +939,4 @@ def get_metainfo(self, name): logging.info("Retrieving meta info for %s" % (name)) ret = self.client.get_metainfo(name=name) logging.info("Retrieved meta info for %s: %s" % (name, idds_mask(ret))) - return ret + return True, ret diff --git a/common/lib/idds/common/constants.py b/common/lib/idds/common/constants.py index a57d24ec..62167c10 100644 --- a/common/lib/idds/common/constants.py +++ b/common/lib/idds/common/constants.py @@ -199,6 +199,7 @@ class RequestType(IDDSEnum): Derivation = 5 iWorkflow = 6 iWorkflowLocal = 7 + GenericWorkflow = 8 Other = 99 @@ -214,6 +215,8 @@ class TransformType(IDDSEnum): Data = 8 iWorkflow = 9 iWork = 10 + GenericWorkflow = 11 + GenericWork = 12 Other = 99 @@ -241,6 +244,9 @@ class TransformStatus(IDDSEnum): Terminating = 20 Building = 21 Built = 22 + Queue = 23 + Throttling = 24 + WaitForTrigger = 25 class TransformLocking(IDDSEnum): @@ -248,6 +254,12 @@ class TransformLocking(IDDSEnum): Locking = 1 +class ConditionStatus(IDDSEnum): + New = 0 + WaitForTrigger = 1 + Triggered = 2 + + class CollectionType(IDDSEnum): Container = 0 Dataset = 1 @@ -401,6 +413,7 @@ class MessageType(IDDSEnum): HealthHeartbeat = 12 IDDSCommunication = 13 ContentExt = 14 + AsyncResult = 15 UnknownFile = 97 UnknownCollection = 98 UnknownWork = 99 @@ -425,6 +438,7 @@ class MessageTypeStr(IDDSEnum): UnknownCollection = 'collection_unknown' UnknownWork = 'work_unknown' ContentExt = 'content_ext' + AsyncResult = 'async_result' TransformType2MessageTypeMap = { @@ -462,6 +476,7 @@ class MessageStatus(IDDSEnum): Delivered = 2 Failed = 3 ConfirmDelivered = 4 + NoNeedDelivery = 5 class MessageLocking(IDDSEnum): @@ -486,6 +501,7 @@ class MessageDestination(IDDSEnum): Conductor = 4 Outside = 5 ContentExt = 6 + AsyncResult = 7 class CommandType(IDDSEnum): From a295ac5933b66a1ebb505414c8b5aad2e59bf898 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Mon, 5 Aug 2024 17:40:34 +0200 Subject: [PATCH 04/21] add queue transform status --- common/lib/idds/common/event.py | 1 + 1 file changed, 1 insertion(+) diff --git a/common/lib/idds/common/event.py b/common/lib/idds/common/event.py index b598334c..9c107aec 100644 --- a/common/lib/idds/common/event.py +++ b/common/lib/idds/common/event.py @@ -42,6 +42,7 @@ class EventType(IDDSEnum): UpdateTransform = 21 AbortTransform = 22 ResumeTransform = 23 + QueueTransform = 24 NewProcessing = 30 UpdateProcessing = 31 From 882e9957e9e394a0315edf2775ea6568359140e4 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Mon, 5 Aug 2024 17:41:13 +0200 Subject: [PATCH 05/21] add log information during import functions --- common/lib/idds/common/imports.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/common/lib/idds/common/imports.py b/common/lib/idds/common/imports.py index 45422370..5382bb74 100644 --- a/common/lib/idds/common/imports.py +++ b/common/lib/idds/common/imports.py @@ -9,6 +9,7 @@ # - Wen Guan, , 2019 - 2024 +import logging import importlib import inspect import os @@ -79,6 +80,9 @@ def import_func(name: str) -> Callable[..., Any]: Any: An attribute (normally a Callable) """ with add_cwd_path(): + logging.info(f"import_func cwd: {os.getcwd()}") + logging.info(f"import_func sys.path: {sys.path}") + logging.info(f"import_func PYTHONPATH: 
{os.environ.get('PYTHONPATH', None)}")
         filename, module_name_bits, attribute_bits = name.split(':')
         # module_name_bits, attribute_bits = name_bits[:-1], [name_bits[-1]]
         if module_name_bits == '__main__':
@@ -92,7 +96,8 @@ def import_func(name: str) -> Callable[..., Any]:
             module_name = '.'.join(module_name_bits)
             module = importlib.import_module(module_name)
             break
-        except ImportError:
+        except ImportError as ex:
+            logging.warn(f"import_func import error: {ex}")
             attribute_bits.insert(0, module_name_bits.pop())
 
     if module is None:

From 0264f19cbd1dba781fd184c558ce3633927e948d Mon Sep 17 00:00:00 2001
From: Wen Guan
Date: Mon, 5 Aug 2024 17:41:56 +0200
Subject: [PATCH 06/21] improve create archive file to exclude files

---
 common/lib/idds/common/utils.py | 23 ++++++++++++++++-------
 1 file changed, 16 insertions(+), 7 deletions(-)

diff --git a/common/lib/idds/common/utils.py b/common/lib/idds/common/utils.py
index 2247f45c..c7755b18 100644
--- a/common/lib/idds/common/utils.py
+++ b/common/lib/idds/common/utils.py
@@ -607,11 +607,11 @@ def new_funct(*args, **kwargs):
         except IDDSException as ex:
             logging.error(ex)
             # print(traceback.format_exc())
-            return str(ex)
+            return False, str(ex)
         except Exception as ex:
             logging.error(ex)
             # print(traceback.format_exc())
-            return str(ex)
+            return False, str(ex)
     return new_funct
@@ -925,14 +925,23 @@ def create_archive_file(work_dir, archive_filename, files, exclude_files=[]):
     with tarfile.open(archive_filename, "w:gz", dereference=True) as tar:
         for local_file in files:
             if os.path.isfile(local_file):
+                if is_execluded_file(local_file, exclude_files):
+                    continue
                 # base_name = os.path.basename(local_file)
                 tar.add(local_file, arcname=os.path.basename(local_file))
             elif os.path.isdir(local_file):
-                for root, dirs, fs in os.walk(local_file):
-                    for f in fs:
-                        if not is_execluded_file(f, exclude_files):
-                            file_path = os.path.join(root, f)
-                            tar.add(file_path, arcname=os.path.relpath(file_path, local_file))
+                for filename in os.listdir(local_file):
+                    if is_execluded_file(filename, exclude_files):
+                        continue
+                    file_path = os.path.join(local_file, filename)
+                    if os.path.isfile(file_path):
+                        tar.add(file_path, arcname=os.path.relpath(file_path, local_file))
+                    elif os.path.isdir(file_path):
+                        for root, dirs, fs in os.walk(file_path):
+                            for f in fs:
+                                if not is_execluded_file(f, exclude_files):
+                                    file_path = os.path.join(root, f)
+                                    tar.add(file_path, arcname=os.path.relpath(file_path, local_file))
     return archive_filename

From 0509a3c29aa847ff3af6fcd930f7fd46802f6e79 Mon Sep 17 00:00:00 2001
From: Wen Guan
Date: Mon, 5 Aug 2024 17:44:05 +0200
Subject: [PATCH 07/21] improve message client and rest to store async result messages

---
 client/lib/idds/client/messageclient.py | 25 +++++++++----
 main/lib/idds/rest/v1/messages.py | 48 ++++++++++++++++++-------
 2 files changed, 55 insertions(+), 18 deletions(-)

diff --git a/client/lib/idds/client/messageclient.py b/client/lib/idds/client/messageclient.py
index 6f4ed541..eb03be36 100644
--- a/client/lib/idds/client/messageclient.py
+++ b/client/lib/idds/client/messageclient.py
@@ -35,12 +35,15 @@ def __init__(self, host=None, auth=None, timeout=None):
         """
         super(MessageClient, self).__init__(host=host, auth=auth, timeout=timeout)
 
-    def send_message(self, request_id=None, workload_id=None, msg=None):
+    def send_messages(self, request_id=None, workload_id=None, transform_id=None, internal_id=None, msgs=None):
         """
         Send messages to the Head service.
 
         :param request_id: the request id.
         :param workload_id: the workload id.
+ :param transform_id: the transform id. + :param internal_id: the internal id. + :param msg: the list of message contents. :raise exceptions if it's not got successfully. """ @@ -49,20 +52,29 @@ def send_message(self, request_id=None, workload_id=None, msg=None): request_id = 'null' if workload_id is None: workload_id = 'null' - url = self.build_url(self.host, path=os.path.join(path, str(request_id), str(workload_id))) + if transform_id is None: + transform_id = 'null' + if internal_id is None: + internal_id = 'null' + url = self.build_url(self.host, path=os.path.join(path, str(request_id), str(workload_id), str(transform_id), str(internal_id))) - if msg is None: + if msgs is None: raise Exception("Message is None") - self.get_request_response(url, type='POST', data=msg) + + if type(msgs) not in (list, tuple): + msgs = [msgs] + self.get_request_response(url, type='POST', data=msgs) return None - def get_messages(self, request_id=None, workload_id=None): + def get_messages(self, request_id=None, workload_id=None, transform_id=None, internal_id=None): """ Get message from the Head service. :param request_id: the request id. :param workload_id: the workload id. + :param transform_id: the transform id. + :param internal_id: the internal id. :raise exceptions if it's not got successfully. """ @@ -71,7 +83,8 @@ def get_messages(self, request_id=None, workload_id=None): request_id = 'null' if workload_id is None: workload_id = 'null' - url = self.build_url(self.host, path=os.path.join(path, str(request_id), str(workload_id))) + + url = self.build_url(self.host, path=os.path.join(path, str(request_id), str(workload_id), str(transform_id), str(internal_id))) msgs = self.get_request_response(url, type='GET') diff --git a/main/lib/idds/rest/v1/messages.py b/main/lib/idds/rest/v1/messages.py index 26639d58..e41248cb 100644 --- a/main/lib/idds/rest/v1/messages.py +++ b/main/lib/idds/rest/v1/messages.py @@ -14,6 +14,7 @@ from flask import Blueprint from idds.common import exceptions +from idds.common.authentication import authenticate_is_super_user from idds.common.constants import (HTTP_STATUS_CODE, MessageType, MessageStatus, MessageSource, MessageDestination, CommandType, RequestStatus) @@ -27,7 +28,7 @@ class Message(IDDSController): """ Get message """ - def get(self, request_id, workload_id): + def get(self, request_id, workload_id, transform_id, internal_id): """ Get messages with given id. 
HTTP Success: 200 OK @@ -42,6 +43,10 @@ def get(self, request_id, workload_id): request_id = None if workload_id == 'null': workload_id = None + if transform_id == 'null': + transform_id = None + if internal_id == 'null': + internal_id = None if request_id is None: raise Exception("request_id should not be None") @@ -54,8 +59,8 @@ def get(self, request_id, workload_id): username = self.get_username() reqs = get_requests(request_id=request_id, workload_id=workload_id, with_request=True) for req in reqs: - if req['username'] and req['username'] != username: - raise exceptions.AuthenticationNoPermission("User %s has no permission to update request %s" % (username, req['request_id'])) + if req['username'] and req['username'] != username and not authenticate_is_super_user(username): + raise exceptions.AuthenticationNoPermission("User %s has no permission to get messages from request %s" % (username, req['request_id'])) except exceptions.AuthenticationNoPermission as error: return self.generate_http_response(HTTP_STATUS_CODE.InternalError, exc_cls=error.__class__.__name__, exc_msg=error) except Exception as error: @@ -64,10 +69,15 @@ def get(self, request_id, workload_id): return self.generate_http_response(HTTP_STATUS_CODE.InternalError, exc_cls=exceptions.CoreException.__name__, exc_msg=error) try: - msgs = retrieve_messages(request_id=request_id, workload_id=workload_id) + msgs = retrieve_messages(request_id=request_id, workload_id=workload_id, transform_id=transform_id, internal_id=internal_id) rets = [] for msg in msgs: - rets.append(msg['msg_content']) + msg_content = msg['msg_content'] + if type(msg_content) in (list, tuple): + for msg_content_item in msg_content: + rets.append(msg_content_item) + else: + rets.append(msg_content) except exceptions.NoObject as error: return self.generate_http_response(HTTP_STATUS_CODE.NotFound, exc_cls=error.__class__.__name__, exc_msg=error) except exceptions.IDDSException as error: @@ -79,7 +89,7 @@ def get(self, request_id, workload_id): return self.generate_http_response(HTTP_STATUS_CODE.OK, data=rets) - def post(self, request_id, workload_id): + def post(self, request_id, workload_id, transform_id, internal_id): """ Create Request. 
HTTP Success: 200 OK @@ -92,6 +102,10 @@ def post(self, request_id, workload_id): request_id = None if workload_id == 'null': workload_id = None + if transform_id == 'null': + transform_id = None + if internal_id == 'null': + internal_id = None if request_id is None: raise Exception("request_id should not be None") except Exception as error: @@ -103,8 +117,8 @@ def post(self, request_id, workload_id): username = self.get_username() reqs = get_requests(request_id=request_id, workload_id=workload_id, with_request=True) for req in reqs: - if req['username'] and req['username'] != username: - raise exceptions.AuthenticationNoPermission("User %s has no permission to update request %s" % (username, req['request_id'])) + if req['username'] and req['username'] != username and not authenticate_is_super_user(username): + raise exceptions.AuthenticationNoPermission("User %s has no permission to send messages to request %s" % (username, req['request_id'])) except exceptions.AuthenticationNoPermission as error: return self.generate_http_response(HTTP_STATUS_CODE.InternalError, exc_cls=error.__class__.__name__, exc_msg=error) except Exception as error: @@ -114,9 +128,19 @@ def post(self, request_id, workload_id): try: msg = self.get_request().data and json_loads(self.get_request().data) - # command = msg['command'] - # parameters = msg['parameters'] - if 'command' in msg and msg['command'] in ['update_request', 'update_processing']: + if 'msg_type' in msg and msg['msg_type'] in ['async_result']: + msg['from_idds'] = 'true' + add_message(msg_type=MessageType.AsyncResult, + status=MessageStatus.New, + destination=MessageDestination.AsyncResult, + source=MessageSource.Rest, + request_id=request_id, + workload_id=workload_id, + transform_id=transform_id, + internal_id=internal_id, + num_contents=1, + msg_content=msg) + elif 'command' in msg and msg['command'] in ['update_request', 'update_processing']: status = msg['parameters']['status'] if status in [RequestStatus.ToCancel, RequestStatus.ToSuspend]: add_command(request_id=request_id, cmd_type=CommandType.AbortRequest, @@ -162,5 +186,5 @@ def get_blueprint(): bp = Blueprint('message', __name__) view = Message.as_view('message') - bp.add_url_rule('/message//', view_func=view, methods=['get', 'post']) + bp.add_url_rule('/message////', view_func=view, methods=['get', 'post']) return bp From 95eb239388191e93794cc7089aae5e64dc73d80c Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Mon, 5 Aug 2024 17:45:15 +0200 Subject: [PATCH 08/21] improve receiver to receive async result messages --- main/lib/idds/agents/carrier/receiver.py | 31 ++++++++++++++++++++---- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/main/lib/idds/agents/carrier/receiver.py b/main/lib/idds/agents/carrier/receiver.py index 86d2b158..612bda5e 100644 --- a/main/lib/idds/agents/carrier/receiver.py +++ b/main/lib/idds/agents/carrier/receiver.py @@ -30,6 +30,7 @@ TriggerProcessingEvent) from .utils import handle_messages_processing +from .iutils import handle_messages_asyncresult setup_logging(__name__) @@ -100,7 +101,7 @@ def get_num_queued_messages(self): def get_output_messages(self): with self._lock: - msgs = [] + msgs = {} try: msg_size = 0 while not self.message_queue.empty(): @@ -108,7 +109,11 @@ def get_output_messages(self): if msg: if msg_size < 10: self.logger.debug("Received message(only log first 10 messages): %s" % str(msg)) - msgs.append(msg) + name = msg['name'] + body = msg['body'] + if name not in msgs: + msgs[name] = [] + msgs[name].append(body) msg_size += 1 if 
msg_size >= self.bulk_message_size: break @@ -116,7 +121,10 @@ def get_output_messages(self): self.logger.error("Failed to get output messages: %s, %s" % (error, traceback.format_exc())) if msgs: total_msgs = self.get_num_queued_messages() - self.logger.info("process_messages: Get %s messages, left %s messages" % (len(msgs), total_msgs)) + got_msgs = 0 + for name in msgs: + got_msgs += len(msgs[name]) + self.logger.info("process_messages: Get %s messages, left %s messages" % (got_msgs, total_msgs)) return msgs def is_selected(self): @@ -186,12 +194,25 @@ def handle_messages(self, output_messages, log_prefix): event.set_terminating() self.event_bus.send(event) + def handle_messages_asyncresult(self, output_messages, log_prefix): + handle_messages_asyncresult(output_messages, + logger=self.logger, + log_prefix=log_prefix, + update_processing_interval=self.update_processing_interval) + + def handle_messages_channels(self, output_messages, log_prefix): + for channel in output_messages: + if channel in ['asyncresult']: + self.handle_messages_asyncresult(output_messages[channel], log_prefix) + else: + self.handle_messages(output_messages[channel], log_prefix) + def process_messages(self, log_prefix=None): output_messages = self.get_output_messages() has_messages = False if output_messages: self.logger.info("process_messages: Received %s messages" % (len(output_messages))) - self.handle_messages(output_messages, log_prefix=log_prefix) + self.handle_messages_channels(output_messages, log_prefix=log_prefix) self.logger.info("process_messages: Handled %s messages" % len(output_messages)) has_messages = True return has_messages @@ -219,7 +240,7 @@ def process_messages_event(self, event): output_messages = event.get_message() if output_messages: self.logger.info("process_messages: Received %s messages" % (len(output_messages))) - self.handle_messages(output_messages, log_prefix=self.log_prefix) + self.handle_messages_channels(output_messages, log_prefix=self.log_prefix) self.logger.info("process_messages: Handled %s messages" % len(output_messages)) except Exception as ex: self.logger.error(ex) From 099b010e2e3a9d38149f60148d2e768607fa1fdc Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Mon, 5 Aug 2024 17:46:18 +0200 Subject: [PATCH 09/21] functions to convert async result messages to contents --- main/lib/idds/agents/carrier/iutils.py | 98 +++++++++++++++++++++++++- 1 file changed, 96 insertions(+), 2 deletions(-) diff --git a/main/lib/idds/agents/carrier/iutils.py b/main/lib/idds/agents/carrier/iutils.py index 0eff63d5..9f1ac167 100644 --- a/main/lib/idds/agents/carrier/iutils.py +++ b/main/lib/idds/agents/carrier/iutils.py @@ -8,12 +8,16 @@ # Authors: # - Wen Guan, , 2024 + import datetime import logging -from idds.common.constants import ProcessingStatus, CollectionStatus +from idds.common.constants import (ProcessingStatus, CollectionStatus, ContentStatus, + MessageType, MessageStatus, MessageSource, MessageDestination, + ContentType, ContentRelationType) from idds.common.utils import setup_logging -from idds.core import catalog as core_catalog +from idds.core import (catalog as core_catalog, messages as core_messages) +from idds.agents.common.cache.redis import get_redis_cache setup_logging(__name__) @@ -127,3 +131,93 @@ def sync_iprocessing(processing, agent_attributes, terminate=False, abort=False, processing['status'] = processing['substatus'] return processing, u_colls, None + + +def get_request_id_transform_id_collection_id_map(request_id, transform_id): + cache = get_redis_cache() + 
coll_tf_id_map_key = "req_id_trf_id_coll_id_map" + coll_tf_id_map = cache.get(coll_tf_id_map_key, default={}) + + if request_id is not None and transform_id is not None: + if request_id not in coll_tf_id_map or transform_id not in coll_tf_id_map[request_id]: + colls = core_catalog.get_collections_by_request_ids([request_id]) + for coll in colls: + if coll['request_id'] not in coll_tf_id_map: + coll_tf_id_map[coll['request_id']] = {} + if coll['transform_id'] not in coll_tf_id_map[coll['request_id']]: + coll_tf_id_map[coll['request_id']][coll['transform_id']] = {} + if coll['relation_type'].value not in coll_tf_id_map[coll['request_id']][coll['transform_id']]: + coll_tf_id_map[coll['request_id']][coll['transform_id']][coll['relation_type'].value] = [] + if coll['coll_id'] not in coll_tf_id_map[coll['request_id']][coll['transform_id']][coll['relation_type'].value]: + coll_tf_id_map[coll['request_id']][coll['transform_id']][coll['relation_type'].value].append[coll['coll_id']] + + cache.set(coll_tf_id_map_key, coll_tf_id_map) + + return coll_tf_id_map[request_id][transform_id] + return None + + +def get_new_asyncresult_content(request_id, transform_id, name, path, workload_id=0, coll_id=0, map_id=0, scope='asyncresult', + status=ContentStatus.Available, content_relation_type=ContentRelationType.Output): + content = {'transform_id': transform_id, + 'coll_id': coll_id, + 'request_id': request_id, + 'workload_id': workload_id, + 'map_id': map_id, + 'scope': scope, + 'name': name, + 'min_id': 0, + 'max_id': 0, + 'status': status, + 'substatus': status, + 'path': path, + 'content_type': ContentType.PseudoContent, + 'content_relation_type': content_relation_type, + 'bytes': 0} + return content + + +def handle_messages_asyncresult(messages, logger=None, log_prefix='', update_processing_interval=300): + logger = get_logger(logger) + if not log_prefix: + log_prefix = "" + + req_msgs = {} + + for msg in messages: + if 'from_idds' in msg and msg['from_idds']: + continue + + # ret = msg['ret'] + # key = msg['key'] + # internal_id = msg['internal_id'] + # msg_type = msg['type'] + request_id = msg['request_id'] + transform_id = msg.get('transform_id', 0) + internal_id = msg.get('internal_id', None) + # if msg_type in ['iworkflow']: + + if request_id not in req_msgs: + req_msgs[request_id] = {} + if transform_id not in req_msgs[request_id]: + req_msgs[request_id][transform_id] = {} + if internal_id not in req_msgs[request_id][transform_id]: + req_msgs[request_id][transform_id][internal_id] = [] + req_msgs[request_id][transform_id][internal_id].append(msg) + + for request_id in req_msgs: + for transform_id in req_msgs[request_id]: + for internal_id in req_msgs[request_id][transform_id]: + msgs = req_msgs[request_id][transform_id][internal_id] + core_messages.add_message(msg_type=MessageType.AsyncResult, + status=MessageStatus.NoNeedDelivery, + destination=MessageDestination.AsyncResult, + source=MessageSource.Outside, + request_id=request_id, + workload_id=None, + transform_id=transform_id, + internal_id=internal_id, + num_contents=len(msgs), + msg_content=msgs) + + logger.debug(f"{log_prefix} handle_messages_asyncresult, add {len(msgs)} for request_id {request_id} transform_id {transform_id} internal_id {internal_id}") From 751f5d592ef3ac3151dec5f0cb9fadc55d12f604 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Mon, 5 Aug 2024 17:48:39 +0200 Subject: [PATCH 10/21] clerk to handle conditions and transforms in table --- main/lib/idds/agents/clerk/clerk.py | 140 +++++++++++++++++- 
.../idds/agents/common/plugins/messaging.py | 22 +-- 2 files changed, 150 insertions(+), 12 deletions(-) diff --git a/main/lib/idds/agents/clerk/clerk.py b/main/lib/idds/agents/clerk/clerk.py index 2d67d7ed..0850f4f6 100644 --- a/main/lib/idds/agents/clerk/clerk.py +++ b/main/lib/idds/agents/clerk/clerk.py @@ -16,7 +16,7 @@ from idds.common import exceptions from idds.common.constants import (Sections, ReturnCode, RequestType, RequestStatus, RequestLocking, - TransformType, WorkflowType, + TransformType, WorkflowType, ConditionStatus, TransformStatus, ProcessingStatus, ContentStatus, ContentRelationType, CommandType, CommandStatus, CommandLocking) @@ -447,15 +447,50 @@ def generate_transform(self, req, work, build=False, iworkflow=False): transform_type = TransformType.iWorkflow elif work_type in [WorkflowType.iWork]: transform_type = TransformType.iWork + elif work_type in [WorkflowType.GenericWorkflow]: + transform_type = TransformType.GenericWorkflow + elif work_type in [WorkflowType.GenericWork]: + transform_type = TransformType.GenericWork except Exception: pass + has_previous_conditions = None + try: + if hasattr(work, 'get_previous_conditions'): + work_previous_conditions = work.get_previous_conditions() + if work_previous_conditions: + has_previous_conditions = len(work_previous_conditions) + except Exception: + pass + + triggered_conditions = [] + untriggered_conditions = [] + try: + if hasattr(work, 'get_following_conditions'): + following_conditions = work.get_following_conditions() + for cond in following_conditions: + untriggered_conditions.append(cond) + except Exception: + pass + + loop_index = None + try: + if hasattr(work, 'get_loop_index'): + loop_index = work.get_loop_index() + except Exception: + pass + + # transform_status = TransformStatus.New + transform_status = TransformStatus.Queue + if has_previous_conditions: + transform_status = TransformStatus.WaitForTrigger + new_transform = {'request_id': req['request_id'], 'workload_id': req['workload_id'], 'transform_type': transform_type, 'transform_tag': work.get_work_tag(), 'priority': req['priority'], - 'status': TransformStatus.New, + 'status': transform_status, 'retries': 0, 'parent_transform_id': None, 'previous_transform_id': None, @@ -466,6 +501,12 @@ def generate_transform(self, req, work, build=False, iworkflow=False): 'max_update_retries': max_update_retries, # 'expired_at': req['expired_at'], 'expired_at': None, + 'internal_id': work.internal_id, + 'has_previous_conditions': has_previous_conditions, + 'triggered_conditions': triggered_conditions, + 'untriggered_conditions': untriggered_conditions, + 'loop_index': loop_index, + 'site': req['site'], 'transform_metadata': {'internal_id': work.get_internal_id(), 'template_work_id': work.get_template_work_id(), 'sequence_id': work.get_sequence_id(), @@ -478,6 +519,26 @@ def generate_transform(self, req, work, build=False, iworkflow=False): return new_transform + def generate_condition(self, req, cond): + previous_works = cond.previous_works + following_works = cond.following_works + previous_transforms, following_transforms = [], [] + previous_transforms = previous_works + following_transforms = following_works + + new_condition = {'request_id': req['request_id'], + 'internal_id': cond.internal_id, + 'status': ConditionStatus.WaitForTrigger, + 'substatus': None, + 'is_loop': False, + 'loop_index': None, + 'cloned_from': None, + 'evaluate_result': None, + 'previous_transforms': previous_transforms, + 'following_transforms': following_transforms, + 'condition': 
{'condition': cond}} + return new_condition + def get_num_active_requests(self, site_name): cache = get_redis_cache() num_requests = cache.get("num_requests", default=None) @@ -788,6 +849,71 @@ def handle_new_irequest(self, req): self.logger.warn(log_pre + "Handle new irequest error result: %s" % str(ret_req)) return ret_req + def handle_new_generic_request(self, req): + try: + log_pre = self.get_log_prefix(req) + self.logger.info(log_pre + "Handle new generic request") + to_throttle = self.whether_to_throttle(req) + if to_throttle: + ret_req = {'request_id': req['request_id'], + 'parameters': {'status': RequestStatus.Throttling, + 'locking': RequestLocking.Idle}} + ret_req['parameters'] = self.load_poll_period(req, ret_req['parameters'], throttling=True) + self.logger.info(log_pre + "Throttle new generic request result: %s" % str(ret_req)) + else: + workflow = req['request_metadata']['workflow'] + + transforms = [] + works = workflow.get_works() + for w in works: + # todo + # set has_previous_conditions, has_conditions, all_conditions_triggered(False), cloned_from(None) + transform = self.generate_transform(req, w) + if transform: + transforms.append(transform) + self.logger.debug(log_pre + f"Processing request({req['request_id']}): new transforms: {transforms}") + + conds = workflow.get_conditions() + conditions = [] + for cond in conds: + condition = self.generate_condition(req, cond) + if condition: + conditions.append(condition) + self.logger.debug(log_pre + f"Processing request({req['request_id']}), new conditions: {conditions}") + + ret_req = {'request_id': req['request_id'], + 'parameters': {'status': RequestStatus.Transforming, + 'locking': RequestLocking.Idle}, + 'new_transforms': transforms, + 'new_conditions': conditions} + ret_req['parameters'] = self.load_poll_period(req, ret_req['parameters']) + self.logger.info(log_pre + "Handle new generic request result: %s" % str(ret_req)) + except Exception as ex: + self.logger.error(ex) + self.logger.error(traceback.format_exc()) + retries = req['new_retries'] + 1 + if not req['max_new_retries'] or retries < req['max_new_retries']: + req_status = req['status'] + else: + req_status = RequestStatus.Failed + + # increase poll period + new_poll_period = int(req['new_poll_period'].total_seconds() * self.poll_period_increase_rate) + if new_poll_period > self.max_new_poll_period: + new_poll_period = self.max_new_poll_period + + error = {'submit_err': {'msg': truncate_string('%s' % (ex), length=200)}} + + ret_req = {'request_id': req['request_id'], + 'parameters': {'status': req_status, + 'locking': RequestLocking.Idle, + 'new_retries': retries, + 'new_poll_period': new_poll_period, + 'errors': req['errors'] if req['errors'] else {}}} + ret_req['parameters']['errors'].update(error) + self.logger.warn(log_pre + "Handle new irequest error result: %s" % str(ret_req)) + return ret_req + def has_to_build_work(self, req): try: if req['status'] in [RequestStatus.New] and 'build_workflow' in req['request_metadata']: @@ -872,6 +998,11 @@ def update_request(self, req): else: update_transforms = {} + if 'new_conditions' in req: + new_conditions = req['new_conditions'] + else: + new_conditions = [] + retry = True retry_num = 0 while retry: @@ -880,7 +1011,8 @@ def update_request(self, req): try: _, new_tf_ids, update_tf_ids = core_requests.update_request_with_transforms(req['request_id'], req['parameters'], new_transforms=new_transforms, - update_transforms=update_transforms) + update_transforms=update_transforms, + new_conditions=new_conditions) except 
exceptions.DatabaseException as ex:
                     if 'ORA-00060' in str(ex):
                         self.logger.warn("(cx_Oracle.DatabaseError) ORA-00060: deadlock detected while waiting for resource")
                     else:
                         # raise ex
                         self.logger.error(ex)
                         self.logger.error(traceback.format_exc())
@@ -933,6 +1065,8 @@ def process_new_request(self, event):
                 ret = self.handle_build_request(req)
             elif req['request_type'] in [RequestType.iWorkflow, RequestType.iWorkflowLocal]:
                 ret = self.handle_new_irequest(req)
+            elif req['request_type'] in [RequestType.GenericWorkflow]:
+                ret = self.handle_new_generic_request(req)
             else:
                 ret = self.handle_new_request(req)
             new_tf_ids, update_tf_ids = self.update_request(ret)
diff --git a/main/lib/idds/agents/common/plugins/messaging.py b/main/lib/idds/agents/common/plugins/messaging.py
index ace76953..a594a479 100644
--- a/main/lib/idds/agents/common/plugins/messaging.py
+++ b/main/lib/idds/agents/common/plugins/messaging.py
@@ -29,11 +29,14 @@ class MessagingListener(stomp.ConnectionListener):
     '''
     Messaging Listener
     '''
-    def __init__(self, broker, output_queue, logger=None):
+    def __init__(self, broker, output_queue, name=None, logger=None):
         '''
         __init__
         '''
-        self.name = "MessagingListener"
+        # self.name = "MessagingListener"
+        self.name = name
+        if not self.name:
+            self.name = 'default'
         self.__broker = broker
         self.__output_queue = output_queue
         # self.logger = logging.getLogger(self.__class__.__name__)
@@ -49,8 +52,8 @@ def on_error(self, frame):
         self.logger.error('[broker] [%s]: %s', self.__broker, frame.body)
 
     def on_message(self, frame):
-        self.logger.debug('[broker] [%s]: %s', self.__broker, frame.body)
-        self.__output_queue.put(frame.body)
+        self.logger.debug('[broker] %s [%s]: %s', self.name, self.__broker, frame.body)
+        self.__output_queue.put({'name': self.name, 'body': frame.body})
         pass
 
@@ -251,12 +254,13 @@ class MessagingReceiver(MessagingSender):
 class MessagingReceiver(MessagingSender):
     def __init__(self, name="MessagingReceiver", logger=None, **kwargs):
         super(MessagingReceiver, self).__init__(name=name, logger=logger, **kwargs)
-        self.listener = None
+        self.listener = {}
        self.receiver_conns = []
 
-    def get_listener(self, broker):
+    def get_listener(self, broker, name):
         if self.listener is None:
-            self.listener = MessagingListener(broker, self.output_queue, logger=self.logger)
+            self.listener = {}
+        self.listener[name] = MessagingListener(broker, self.output_queue, name=name, logger=self.logger)
-        return self.listener
+        return self.listener[name]
 
     def subscribe(self):
@@ -264,8 +268,8 @@ def subscribe(self):
 
         for name in self.receiver_conns:
             for conn in self.receiver_conns[name]:
-                self.logger.info('connecting to %s' % conn.transport._Transport__host_and_ports[0][0])
-                conn.set_listener('message-receiver', self.get_listener(conn.transport._Transport__host_and_ports[0]))
+                self.logger.info(f'connecting to {name}: {conn.transport._Transport__host_and_ports[0][0]}')
+                conn.set_listener('message-receiver', self.get_listener(conn.transport._Transport__host_and_ports[0], name))
                 conn.connect(self.channels[name]['username'], self.channels[name]['password'], wait=True)
                 conn.subscribe(destination=self.channels[name]['destination'], id='atlas-idds-messaging', ack='auto')

From cc52470d05cc4d54d9e646e13a77826b276d6f43 Mon Sep 17 00:00:00 2001
From: Wen Guan
Date: Mon, 5 Aug 2024 17:49:46 +0200
Subject: [PATCH 11/21] transformer to handle condition triggering

---
 .../idds/agents/transformer/transformer.py | 275 +++++++++++++++++-
 1 file changed, 273 insertions(+), 2 deletions(-)

diff --git a/main/lib/idds/agents/transformer/transformer.py b/main/lib/idds/agents/transformer/transformer.py
index da6a014d..a9e0e7fa 100644
--- 
a/main/lib/idds/agents/transformer/transformer.py +++ b/main/lib/idds/agents/transformer/transformer.py @@ -20,11 +20,14 @@ CollectionType, CollectionStatus, CollectionRelationType, CommandType, ProcessingStatus, WorkflowType, + ConditionStatus, get_processing_type_from_transform_type, get_transform_status_from_processing_status) from idds.common.utils import setup_logging, truncate_string from idds.core import (transforms as core_transforms, - processings as core_processings) + processings as core_processings, + throttlers as core_throttlers, + conditions as core_conditions) from idds.agents.common.baseagent import BaseAgent from idds.agents.common.eventbus.event import (EventType, NewTransformEvent, @@ -35,6 +38,8 @@ NewProcessingEvent, UpdateProcessingEvent) +from idds.agents.common.cache.redis import get_redis_cache + setup_logging(__name__) @@ -106,6 +111,114 @@ def show_queue_size(self): q_str = "number of transforms: %s, max number of transforms: %s" % (self.number_workers, self.max_number_workers) self.logger.debug(q_str) + def get_throttlers(self): + cache = get_redis_cache() + throttlers = cache.get("throttlers", default=None) + if throttlers is None: + throttler_items = core_throttlers.get_throttlers() + throttlers = {} + for item in throttler_items: + throttlers[item['site']] = {'num_requests': item['num_requests'], + 'num_transforms': item['num_transforms'], + 'num_processings': item['num_processings'], + 'new_contents': item['new_contents'], + 'queue_contents': item['queue_contents'], + 'others': item['others'], + 'status': item['status']} + cache.set("throttlers", throttlers, expire_seconds=self.cache_expire_seconds) + return throttlers + + def whether_to_throttle(self, transform): + try: + site = transform['site'] + if site is None: + site = 'Default' + throttlers = self.get_throttlers() + num_transforms = self.get_num_active_transforms(site) + num_processings, active_transforms = self.get_num_active_processings(site) + num_input_contents, num_output_contents = self.get_num_active_contents(site, active_transforms) + self.logger.info("throttler(site: %s): transforms(%s), processings(%s)" % (site, num_transforms, num_processings)) + self.logger.info("throttler(site: %s): active input contents(%s), output contents(%s)" % (site, num_input_contents, num_output_contents)) + + throttle_transforms = throttlers.get(site, {}).get('num_transforms', None) + throttle_processings = throttlers.get(site, {}).get('num_processings', None) + throttle_new_jobs = throttlers.get(site, {}).get('new_contents', None) + throttle_queue_jobs = throttlers.get(site, {}).get('queue_contents', None) + self.logger.info("throttler(site: %s): throttle_transforms: %s, throttle_processings: %s" % (site, throttle_transforms, throttle_processings)) + if throttle_transforms: + if num_transforms['processing'] >= throttle_transforms: + self.logger.info("throttler(site: %s): num of processing transforms (%s) is bigger than throttle_transforms (%s), set throttling" % (site, num_transforms['processing'], throttle_transforms)) + return True + if throttle_processings: + if num_processings['processing'] >= throttle_processings: + self.logger.info("throttler(site: %s): num of processing processings (%s) is bigger than throttle_processings (%s), set throttling" % (site, num_processings['processing'], throttle_processings)) + return True + + new_jobs = num_input_contents['new'] + released_jobs = num_input_contents['processed'] + terminated_jobs = num_output_contents['processed'] + queue_jobs = released_jobs - 
terminated_jobs + + self.logger.info("throttler(site: %s): throttle_new_jobs: %s, throttle_queue_jobs: %s" % (site, throttle_new_jobs, throttle_queue_jobs)) + self.logger.info("throttler(site: %s): new_jobs: %s, queue_jobs: %s" % (site, new_jobs, queue_jobs)) + if throttle_new_jobs: + if new_jobs >= throttle_new_jobs: + self.logger.info("throttler(site: %s): num of new jobs(not released) (%s) is bigger than throttle_new_jobs (%s), set throttling" % (site, new_jobs, throttle_new_jobs)) + return True + if throttle_queue_jobs: + if queue_jobs >= throttle_queue_jobs: + self.logger.info("throttler(site: %s): num of queue jobs(released but not terminated) (%s) is bigger than throttle_queue_jobs (%s), set throttling" % (site, queue_jobs, throttle_queue_jobs)) + return True + + return False + except Exception as ex: + self.logger.error("whether_to_throttle: %s" % str(ex)) + self.logger.error(traceback.format_exc()) + return False + + def get_queue_transforms(self): + """ + Get queue transforms to set them to new if the throttler is ok. + """ + try: + if not self.is_ok_to_run_more_transforms(): + return [] + + self.show_queue_size() + + if BaseAgent.min_request_id is None: + return [] + + transform_status = [TransformStatus.Queue, TransformStatus.Throttling] + # next_poll_at = datetime.datetime.utcnow() + datetime.timedelta(seconds=self.poll_period) + transforms_q = core_transforms.get_transforms_by_status(status=transform_status, locking=True, + not_lock=True, order_by_fifo=True, + new_poll=True, only_return_id=True, + min_request_id=BaseAgent.min_request_id, + bulk_size=self.retrieve_bulk_size) + + # self.logger.debug("Main thread get %s New+Ready+Extend transforms to process" % len(transforms_new)) + if transforms_q: + self.logger.info("Main thread get queued transforms to process: %s" % str(transforms_q)) + for tf in transforms_q: + to_throttle = self.whether_to_throttle(tf) + transform_parameters = {'locking': TransformLocking.Idle} + parameters = self.load_poll_period(tf, transform_parameters) + if to_throttle: + parameters['status'] = TransformStatus.Throttling + else: + parameters['status'] = TransformStatus.New + core_transforms.update_transform(transform_id=tf['transform_id'], parameters=parameters) + + except exceptions.DatabaseException as ex: + if 'ORA-00060' in str(ex): + self.logger.warn("(cx_Oracle.DatabaseError) ORA-00060: deadlock detected while waiting for resource") + else: + # raise ex + self.logger.error(ex) + self.logger.error(traceback.format_exc()) + return [] + def get_new_transforms(self): """ Get new transforms to process @@ -122,7 +235,7 @@ def get_new_transforms(self): transform_status = [TransformStatus.New, TransformStatus.Ready, TransformStatus.Extend] # next_poll_at = datetime.datetime.utcnow() + datetime.timedelta(seconds=self.poll_period) transforms_new = core_transforms.get_transforms_by_status(status=transform_status, locking=True, - not_lock=True, + not_lock=True, order_by_fifo=True, new_poll=True, only_return_id=True, min_request_id=BaseAgent.min_request_id, bulk_size=self.retrieve_bulk_size) @@ -212,6 +325,83 @@ def load_poll_period(self, transform, parameters): parameters['update_poll_period'] = self.update_poll_period return parameters + def trigger_condition(self, request_id, condition): + update_condition = {} + update_transforms = [] + cond = core_conditions.load_condition(condition) + is_triggered, is_updated, ret = cond.evaluate() + if is_triggered or is_updated: + update_condition['condition_id'] = cond.condition_id + 
update_condition['previous_works'] = cond.previous_works  # previous_works = {'internal_id': <>, 'status': }
+
+        if is_triggered:
+            update_condition['condition_id'] = cond.condition_id
+            update_condition['status'] = ConditionStatus.Triggered
+            update_condition['evaluate_result'] = cond.result
+
+            triggered_works = cond.get_triggered_works(ret)
+            if triggered_works:
+                internal_ids = [w['internal_id'] for w in triggered_works]
+                triggered_transforms = core_transforms.get_transforms(request_id=request_id,
+                                                                      loop_index=condition['loop_index'],
+                                                                      internal_ids=internal_ids)
+                for tf in triggered_transforms:
+                    if tf['status'] in [TransformStatus.WaitForTrigger]:
+                        # change transform status from WaitForTrigger to New
+                        u_transform = {'transform_id': tf['transform_id'], 'status': TransformStatus.New}
+                        update_transforms.append(u_transform)
+        return is_triggered, is_updated, update_condition, update_transforms
+
+    def evaluate_conditions(self, transform):
+        if not transform['has_conditions']:
+            return
+
+        update_conditions = []
+        all_update_triggered_transforms = []
+        update_current_transform = None
+
+        loop_index = transform['loop_index']
+        triggered_conditions = transform['triggered_conditions']
+        untriggered_conditions = transform['untriggered_conditions']
+
+        new_triggered_conditions = []
+        u_cond_ids = [u_cond['internal_id'] for u_cond in untriggered_conditions]
+        conditions = core_conditions.get_conditions(request_id=transform['request_id'], internal_ids=u_cond_ids, loop_index=loop_index)
+        cond_dict = {}
+        for cond in conditions:
+            if (loop_index is None and cond['loop_index'] is None) or (loop_index == cond['loop_index']):
+                cond_dict[cond['internal_id']] = cond
+        for u_cond in untriggered_conditions:
+            cond = cond_dict[u_cond['internal_id']]
+            if cond['status'] not in [ConditionStatus.WaitForTrigger]:
+                ret = self.trigger_condition(request_id=transform['request_id'], condition=cond)
+                is_triggered, is_updated, update_condition, update_triggered_transforms = ret
+                if is_triggered or is_updated:
+                    # is_triggered: the condition is triggered
+                    # is_updated: the condition has multiple previous items. The item related to the current transform is updated to ok,
+                    #             waiting for the other items to be ok.
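# --- Editorial sketch (not part of this patch): a minimal illustration of the
# (is_triggered, is_updated) contract that trigger_condition relies on above.
# The class and field names below are hypothetical, for explanation only;
# cond.evaluate() in the real code returns a third value as well.
#
#     class ExampleCondition:
#         def __init__(self, previous_works):
#             # previous_works: [{'internal_id': ..., 'status': 'ok' | 'pending'}, ...]
#             self.previous_works = previous_works
#
#         def evaluate(self):
#             statuses = [w['status'] for w in self.previous_works]
#             is_triggered = all(s == 'ok' for s in statuses)  # every input finished
#             # some, but not all, inputs finished: record progress, keep waiting
#             is_updated = any(s == 'ok' for s in statuses) and not is_triggered
#             return is_triggered, is_updated, statuses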
+ new_triggered_conditions.append(u_cond) + if update_condition: + update_conditions.append(update_condition) + if update_triggered_transforms: + all_update_triggered_transforms = all_update_triggered_transforms + update_triggered_transforms + else: + new_triggered_conditions.append(u_cond) + if new_triggered_conditions: + new_triggered_conditions_dict = {new_cond['internal_id']: new_cond for new_cond in new_triggered_conditions} + untriggered_conditions_copy = copy.deepcopy(untriggered_conditions) + untriggered_conditions = [] + for u_cond in untriggered_conditions_copy: + if u_cond['internal_id'] in new_triggered_conditions_dict: + triggered_conditions.append(u_cond) + else: + untriggered_conditions.append(u_cond) + update_current_transform = {'transform_id': transform['transform_id'], + 'triggered_conditions': triggered_conditions, + 'untriggered_conditions': untriggered_conditions} + # return new_triggered_conditions, triggered_conditions, untriggered_conditions + return update_current_transform, update_conditions, all_update_triggered_transforms + def generate_processing_model(self, transform): new_processing_model = {} new_processing_model['transform_id'] = transform['transform_id'] @@ -416,6 +606,43 @@ def handle_new_itransform(self, transform): self.logger.info(log_pre + "handle_new_itransform exception result: %s" % str(ret)) return ret + def handle_new_generic_transform(self, transform): + """ + Process new transform + """ + try: + log_pre = self.get_log_prefix(transform) + if transform['transform_type'] in [TransformType.GenericWorkflow]: + ret = self.handle_new_generic_transform_real(transform) + elif transform['transform_type'] in [TransformType.GenericWork]: + ret = self.handle_new_generic_transform_real(transform) + self.logger.info(log_pre + "handle_new_generic_transform result: %s" % str(ret)) + except Exception as ex: + self.logger.error(ex) + self.logger.error(traceback.format_exc()) + retries = transform['new_retries'] + 1 + if not transform['max_new_retries'] or retries < transform['max_new_retries']: + tf_status = transform['status'] + else: + tf_status = TransformStatus.Failed + + # increase poll period + new_poll_period = int(transform['new_poll_period'].total_seconds() * self.poll_period_increase_rate) + if new_poll_period > self.max_new_poll_period: + new_poll_period = self.max_new_poll_period + + error = {'submit_err': {'msg': truncate_string('%s' % (ex), length=200)}} + + transform_parameters = {'status': tf_status, + 'new_retries': retries, + 'new_poll_period': new_poll_period, + 'errors': transform['errors'] if transform['errors'] else {}, + 'locking': TransformLocking.Idle} + transform_parameters['errors'].update(error) + ret = {'transform': transform, 'transform_parameters': transform_parameters} + self.logger.info(log_pre + "handle_new_generic_transform exception result: %s" % str(ret)) + return ret + def update_transform(self, ret): new_pr_ids, update_pr_ids = [], [] try: @@ -502,6 +729,8 @@ def process_new_transform(self, event): self.logger.info(log_pre + "process_new_transform") if tf['transform_type'] in [TransformType.iWorkflow, TransformType.iWork]: ret = self.handle_new_itransform(tf) + elif tf['transform_type'] in [TransformType.GenericWorkflow, TransformType.GenericWork]: + ret = self.handle_new_generic_transform(tf) else: ret = self.handle_new_transform(tf) self.logger.info(log_pre + "process_new_transform result: %s" % str(ret)) @@ -731,6 +960,44 @@ def handle_update_itransform(self, transform, event): self.logger.warn(log_pre + 
"handle_update_itransform exception result: %s" % str(ret)) return ret, False, None + def handle_update_generic_transform(self, transform, event): + """ + Process running transform + """ + try: + log_pre = self.get_log_prefix(transform) + + self.logger.info(log_pre + "handle_update_generic_transform: %s" % transform) + ret, is_terminated, ret_processing_id = self.handle_update_generic_transform_real(transform, event) + self.logger.info(log_pre + "handle_update_generic_transform result: %s" % str(ret)) + return ret, is_terminated, ret_processing_id + except Exception as ex: + self.logger.error(ex) + self.logger.error(traceback.format_exc()) + + retries = transform['update_retries'] + 1 + if not transform['max_update_retries'] or retries < transform['max_update_retries']: + tf_status = transform['status'] + else: + tf_status = TransformStatus.Failed + error = {'submit_err': {'msg': truncate_string('%s' % (ex), length=200)}} + + # increase poll period + update_poll_period = int(transform['update_poll_period'].total_seconds() * self.poll_period_increase_rate) + if update_poll_period > self.max_update_poll_period: + update_poll_period = self.max_update_poll_period + + transform_parameters = {'status': tf_status, + 'update_retries': retries, + 'update_poll_period': update_poll_period, + 'errors': transform['errors'] if transform['errors'] else {}, + 'locking': TransformLocking.Idle} + transform_parameters['errors'].update(error) + + ret = {'transform': transform, 'transform_parameters': transform_parameters} + self.logger.warn(log_pre + "handle_update_generic_transform exception result: %s" % str(ret)) + return ret, False, None + def process_update_transform(self, event): self.number_workers += 1 pro_ret = ReturnCode.Ok.value @@ -752,6 +1019,8 @@ def process_update_transform(self, event): if tf['transform_type'] in [TransformType.iWorkflow, TransformType.iWork]: ret, is_terminated, ret_processing_id = self.handle_update_itransform(tf, event) + elif tf['transform_type'] in [TransformType.GenericWorkflow, TransformType.GenericWork]: + ret, is_terminated, ret_processing_id = self.handle_update_generic_transform(tf, event) else: ret, is_terminated, ret_processing_id = self.handle_update_transform(tf, event) new_pr_ids, update_pr_ids = self.update_transform(ret) @@ -1073,6 +1342,8 @@ def run(self): self.init_event_function_map() + task = self.create_task(task_func=self.get_queue_transforms, task_output_queue=None, task_args=tuple(), task_kwargs={}, delay_time=10, priority=1) + self.add_task(task) task = self.create_task(task_func=self.get_new_transforms, task_output_queue=None, task_args=tuple(), task_kwargs={}, delay_time=10, priority=1) self.add_task(task) task = self.create_task(task_func=self.get_running_transforms, task_output_queue=None, task_args=tuple(), task_kwargs={}, delay_time=10, priority=1) From 696526a384d67249700651feddc9b7a4f6d415ac Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Mon, 5 Aug 2024 17:51:12 +0200 Subject: [PATCH 12/21] transformer to handle condition triggering and queue transforms --- main/lib/idds/agents/transformer/transformer.py | 1 + 1 file changed, 1 insertion(+) diff --git a/main/lib/idds/agents/transformer/transformer.py b/main/lib/idds/agents/transformer/transformer.py index a9e0e7fa..6addd172 100644 --- a/main/lib/idds/agents/transformer/transformer.py +++ b/main/lib/idds/agents/transformer/transformer.py @@ -1220,6 +1220,7 @@ def handle_resume_itransform(self, transform, event): if pr['processing_id'] == transform['current_processing_id']: pr_found = pr break + 
if pr_found:
                self.logger.info(log_pre + "ResumeProcessingEvent(processing_id: %s)" % pr['processing_id'])
                event = ResumeProcessingEvent(publisher_id=self.id,

From bc7cfb18b75026a2e0b8afc7398120bbb1bc97eb Mon Sep 17 00:00:00 2001
From: Wen Guan
Date: Tue, 6 Aug 2024 10:17:41 +0200
Subject: [PATCH 13/21] update models: add conditions and campaign

---
 main/lib/idds/orm/base/models.py | 39 ++++++++++++++++++++++++++++++--
 1 file changed, 37 insertions(+), 2 deletions(-)

diff --git a/main/lib/idds/orm/base/models.py b/main/lib/idds/orm/base/models.py
index 5699ddae..fb228d8a 100644
--- a/main/lib/idds/orm/base/models.py
+++ b/main/lib/idds/orm/base/models.py
@@ -160,6 +160,9 @@ class Request(BASE, ModelBase):
     new_poll_period = Column(Interval(), default=datetime.timedelta(seconds=1))
     update_poll_period = Column(Interval(), default=datetime.timedelta(seconds=10))
     site = Column(String(50))
+    campaign = Column(String(50))
+    campaign_group = Column(String(250))
+    campaign_tag = Column(String(20))
     errors = Column(JSONString(1024))
     _request_metadata = Column('request_metadata', JSON())
     _processing_metadata = Column('processing_metadata', JSON())
@@ -290,6 +293,7 @@ class Transform(BASE, ModelBase):
     workload_id = Column(Integer())
     transform_type = Column(EnumWithValue(TransformType), nullable=False)
     transform_tag = Column(String(20))
+    internal_id = Column(String(20))
     priority = Column(Integer())
     safe2get_output_from_input = Column(Integer())
     status = Column(EnumWithValue(TransformStatus), nullable=False)
@@ -314,6 +318,11 @@ class Transform(BASE, ModelBase):
     update_poll_period = Column(Interval(), default=datetime.timedelta(seconds=10))
     site = Column(String(50))
     name = Column(String(NAME_LENGTH))
+    has_previous_conditions = Column(Integer())
+    loop_index = Column(Integer())
+    cloned_from = Column(BigInteger())
+    triggered_conditions = Column('triggered_conditions', JSON())
+    untriggered_conditions = Column('untriggered_conditions', JSON())
     errors = Column(JSONString(1024))
     _transform_metadata = Column('transform_metadata', JSON())
     _running_metadata = Column('running_metadata', JSON())
@@ -729,6 +738,7 @@ class Message(BASE, ModelBase):
     workload_id = Column(Integer())
     transform_id = Column(Integer())
     processing_id = Column(Integer())
+    internal_id = Column(String(20))
     num_contents = Column(Integer())
     retries = Column(Integer(), default=0)
     fetching_id = Column(Integer())
@@ -943,6 +953,31 @@ class MetaInfo(BASE, ModelBase):
                       UniqueConstraint('name', name='METAINFO_NAME_UQ'))
 
 
+class Condition(BASE, ModelBase):
+    """Represents the conditions"""
+    __tablename__ = 'conditions'
+    condition_id = Column(BigInteger().with_variant(Integer, "sqlite"),
+                          Sequence('CONDITION_ID_SEQ', schema=DEFAULT_SCHEMA_NAME),
+                          primary_key=True)
+    request_id = Column(BigInteger().with_variant(Integer, "sqlite"), nullable=False)
+    internal_id = Column(String(20))
+    name = Column(String(250))
+    status = Column(EnumWithValue(ConditionStatus), nullable=False)
+    substatus = Column(Integer())
+    is_loop = Column(Integer())
+    loop_index = Column(Integer())
+    cloned_from = Column(BigInteger().with_variant(Integer, "sqlite"))
+    created_at = Column("created_at", DateTime, default=datetime.datetime.utcnow, nullable=False)
+    updated_at = Column("updated_at", DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow, nullable=False)
+    evaluate_result = Column(String(1000))
+    previous_transforms = Column(JSON())
+    following_transforms = Column(JSON())
+    condition = Column("condition", JSON())
+
+    __table_args__ = 
(PrimaryKeyConstraint('condition_id', name='CONDITION_PK'), + UniqueConstraint('request_id', 'internal_id', name='CONDITION_ID_UQ')) + + def create_trigger(): func = DDL(""" SET search_path TO %s; @@ -1086,7 +1121,7 @@ def register_models(engine): """ # models = (Request, Workprogress, Transform, Workprogress2transform, Processing, Collection, Content, Health, Message) - models = (Request, Transform, Processing, Collection, Content, Content_update, Content_ext, Health, Message, Command, Throttler) + models = (Request, Transform, Processing, Collection, Content, Content_update, Content_ext, Health, Message, Command, Throttler, MetaInfo, Condition) create_proc_to_update_contents() @@ -1101,7 +1136,7 @@ def unregister_models(engine): """ # models = (Request, Workprogress, Transform, Workprogress2transform, Processing, Collection, Content, Health, Message) - models = (Request, Transform, Processing, Collection, Content, Content_update, Content_ext, Health, Message, Command, Throttler) + models = (Request, Transform, Processing, Collection, Content, Content_update, Content_ext, Health, Message, Command, Throttler, MetaInfo, Condition) drop_proc_to_update_contents() From c4ef575d42937aae2261db8ba5203586bc9a4420 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Tue, 6 Aug 2024 10:18:38 +0200 Subject: [PATCH 14/21] add internal_id in messages --- main/lib/idds/core/messages.py | 5 +++-- main/lib/idds/orm/messages.py | 4 ++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/main/lib/idds/core/messages.py b/main/lib/idds/core/messages.py index f33a0176..457d6803 100644 --- a/main/lib/idds/core/messages.py +++ b/main/lib/idds/core/messages.py @@ -22,7 +22,7 @@ @transactional_session def add_message(msg_type, status, source, request_id, workload_id, transform_id, - num_contents, msg_content, bulk_size=None, processing_id=0, + num_contents, msg_content, internal_id=None, bulk_size=None, processing_id=0, destination=MessageDestination.Outside, session=None): """ Add a message to be submitted asynchronously to a message broker. @@ -37,7 +37,8 @@ def add_message(msg_type, status, source, request_id, workload_id, transform_id, request_id=request_id, workload_id=workload_id, transform_id=transform_id, num_contents=num_contents, destination=destination, processing_id=processing_id, - bulk_size=bulk_size, msg_content=msg_content, session=session) + internal_id=internal_id, bulk_size=bulk_size, + msg_content=msg_content, session=session) @transactional_session diff --git a/main/lib/idds/orm/messages.py b/main/lib/idds/orm/messages.py index 0412e0a7..da619921 100644 --- a/main/lib/idds/orm/messages.py +++ b/main/lib/idds/orm/messages.py @@ -29,7 +29,7 @@ @transactional_session def add_message(msg_type, status, source, request_id, workload_id, transform_id, - num_contents, msg_content, bulk_size=None, processing_id=None, + num_contents, msg_content, internal_id=None, bulk_size=None, processing_id=None, destination=MessageDestination.Outside, session=None): """ Add a message to be submitted asynchronously to a message broker. 
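# --- Editorial sketch (not part of this patch): with internal_id accepted by
# the core/orm add_message functions and stored on the Message row, a message
# can be tied to a single AsyncResult or work. All values below, including the
# enum members shown, are illustrative assumptions.
#
#     from idds.core import messages as core_messages
#
#     core_messages.add_message(msg_type=msg_type, status=status, source=source,
#                               request_id=1234, workload_id=None, transform_id=0,
#                               internal_id='a1b2c3d4', num_contents=1,
#                               msg_content={'ret': 'ok'})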
@@ -69,7 +69,7 @@ def add_message(msg_type, status, source, request_id, workload_id, transform_id,
     for msg_content, num_contents in zip(msg_content_list, num_contents_list):
         new_message = {'msg_type': msg_type, 'status': status,
                        'request_id': request_id, 'workload_id': workload_id, 'transform_id': transform_id,
-                       'source': source, 'num_contents': num_contents,
+                       'internal_id': internal_id, 'source': source, 'num_contents': num_contents,
                        'destination': destination, 'processing_id': processing_id,
                        'locking': 0, 'msg_content': msg_content}
         msgs.append(new_message)

From 64ecb7f00b5fe173ce25b6f39a41543b0d81c954 Mon Sep 17 00:00:00 2001
From: Wen Guan
Date: Tue, 6 Aug 2024 10:19:48 +0200
Subject: [PATCH 15/21] add campaign and condition in core request functions

---
 main/lib/idds/core/requests.py | 16 +++++++++++++---
 main/lib/idds/orm/requests.py  |  4 ++++
 2 files changed, 17 insertions(+), 3 deletions(-)

diff --git a/main/lib/idds/core/requests.py b/main/lib/idds/core/requests.py
index d13c3199..4b4c3189 100644
--- a/main/lib/idds/core/requests.py
+++ b/main/lib/idds/core/requests.py
@@ -6,7 +6,7 @@
 # http://www.apache.org/licenses/LICENSE-2.0OA
 #
 # Authors:
-# - Wen Guan, , 2019 - 2023
+# - Wen Guan, , 2019 - 2024
 
 
 """
@@ -24,6 +24,7 @@
 from idds.orm import transforms as orm_transforms
 from idds.orm import workprogress as orm_workprogresses
 from idds.orm import collections as orm_collections
+from idds.orm import conditions as orm_conditions
 from idds.orm import messages as orm_messages
 from idds.orm import meta as orm_meta
 from idds.core import messages as core_messages
@@ -35,6 +36,7 @@ def create_request(scope=None, name=None, requester=None, request_type=None,
                    lifetime=None, workload_id=None, request_metadata=None,
                    new_poll_period=1, update_poll_period=10, site=None,
                    new_retries=0, update_retries=0, max_new_retries=3, max_update_retries=0,
+                   campaign=None, campaign_group=None, campaign_tag=None,
                    processing_metadata=None):
     """
     Add a request.
@@ -65,7 +67,8 @@ def create_request(scope=None, name=None, requester=None, request_type=None,
               'new_poll_period': new_poll_period, 'update_poll_period': update_poll_period,
               'new_retries': new_retries, 'update_retries': update_retries,
               'max_new_retries': max_new_retries, 'max_update_retries': max_update_retries,
-              'site': site,
+              'site': site, 'campaign': campaign, 'campaign_group': campaign_group,
+              'campaign_tag': campaign_tag,
               'request_metadata': request_metadata,
               'processing_metadata': processing_metadata}
     return orm_requests.create_request(**kwargs)
@@ -77,6 +80,7 @@ def add_request(scope=None, name=None, requester=None, request_type=None,
                 lifetime=None, workload_id=None, request_metadata=None,
                 new_poll_period=1, update_poll_period=10, site=None,
                 new_retries=0, update_retries=0, max_new_retries=3, max_update_retries=0,
+                campaign=None, campaign_group=None, campaign_tag=None,
                 processing_metadata=None, session=None):
     """
     Add a request. 
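# --- Editorial sketch (not part of this patch): passing the new campaign
# fields when creating a request; all values below are hypothetical.
#
#     req_id = create_request(scope='user', name='run-1',
#                             requester='panda', request_type=request_type,
#                             campaign='Run2024', campaign_group='groupA',
#                             campaign_tag='v1',
#                             request_metadata={'workflow': workflow})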
@@ -113,6 +117,7 @@ def add_request(scope=None, name=None, requester=None, request_type=None, 'priority': priority, 'lifetime': lifetime, 'workload_id': workload_id, 'new_poll_period': new_poll_period, 'update_poll_period': update_poll_period, 'new_retries': new_retries, 'update_retries': update_retries, + 'campaign': campaign, 'campaign_group': campaign_group, 'campaign_tag': campaign_tag, 'max_new_retries': max_new_retries, 'max_update_retries': max_update_retries, 'request_metadata': request_metadata, 'processing_metadata': processing_metadata, 'session': session} @@ -290,7 +295,8 @@ def generate_collections(transform): @transactional_session def update_request_with_transforms(request_id, parameters, new_transforms=None, update_transforms=None, - new_messages=None, update_messages=None, session=None): + new_messages=None, update_messages=None, + new_conditions=None, session=None): """ update an request. @@ -349,6 +355,10 @@ def update_request_with_transforms(request_id, parameters, orm_messages.add_messages(new_messages, session=session) if update_messages: orm_messages.update_messages(update_messages, session=session) + + if new_conditions: + orm_conditions.add_conditions(new_conditions, session=session) + return orm_requests.update_request(request_id, parameters, session=session), new_tf_ids, update_tf_ids diff --git a/main/lib/idds/orm/requests.py b/main/lib/idds/orm/requests.py index c9a999e9..7cff7b89 100644 --- a/main/lib/idds/orm/requests.py +++ b/main/lib/idds/orm/requests.py @@ -33,6 +33,7 @@ def create_request(scope=None, name=None, requester=None, request_type=None, lifetime=None, workload_id=None, request_metadata=None, new_poll_period=1, update_poll_period=10, site=None, new_retries=0, update_retries=0, max_new_retries=3, max_update_retries=0, + campaign=None, campaign_group=None, campaign_tag=None, processing_metadata=None): """ Create a request. @@ -83,6 +84,7 @@ def create_request(scope=None, name=None, requester=None, request_type=None, expired_at=expired_at, site=site, new_retries=new_retries, update_retries=update_retries, max_new_retries=max_new_retries, max_update_retries=max_update_retries, + campaign=campaign, campaign_group=campaign_group, campaign_tag=campaign_tag, request_metadata=request_metadata, processing_metadata=processing_metadata) if new_poll_period: new_poll_period = datetime.timedelta(seconds=new_poll_period) @@ -100,6 +102,7 @@ def add_request(scope=None, name=None, requester=None, request_type=None, lifetime=None, workload_id=None, request_metadata=None, new_poll_period=1, update_poll_period=10, site=None, new_retries=0, update_retries=0, max_new_retries=3, max_update_retries=0, + campaign=None, campaign_group=None, campaign_tag=None, processing_metadata=None, session=None): """ Add a request. 
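# --- Editorial sketch (not part of this patch): the clerk hands the request
# update, the new transforms and the new condition rows to
# update_request_with_transforms, which persists them in one transactional
# session (see the new_conditions branch added above); names are illustrative.
#
#     _, new_tf_ids, update_tf_ids = update_request_with_transforms(
#         request_id, {'status': RequestStatus.Transforming},
#         new_transforms=transforms, new_conditions=conditions)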
@@ -132,6 +135,7 @@ def add_request(scope=None, name=None, requester=None, request_type=None, update_poll_period=update_poll_period, new_retries=new_retries, update_retries=update_retries, max_new_retries=max_new_retries, max_update_retries=max_update_retries, + campaign=campaign, campaign_group=campaign_group, campaign_tag=campaign_tag, request_metadata=request_metadata, processing_metadata=processing_metadata) new_request.save(session=session) request_id = new_request.request_id From 3bdd8608e6bf5c1de714aa3be3fb5a04d876c81b Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Tue, 6 Aug 2024 10:21:51 +0200 Subject: [PATCH 16/21] core transforms: order by priority, add loop_index and internal_ids --- main/lib/idds/core/transforms.py | 11 ++++++++--- main/lib/idds/orm/transforms.py | 17 ++++++++++++++--- 2 files changed, 22 insertions(+), 6 deletions(-) diff --git a/main/lib/idds/core/transforms.py b/main/lib/idds/core/transforms.py index f6643b40..3deac47b 100644 --- a/main/lib/idds/core/transforms.py +++ b/main/lib/idds/core/transforms.py @@ -135,7 +135,7 @@ def get_transform_ids(workprogress_id, request_id=None, workload_id=None, transf @read_session -def get_transforms(request_id=None, workload_id=None, transform_id=None, to_json=False, session=None): +def get_transforms(request_id=None, workload_id=None, transform_id=None, loop_index=None, internal_ids=None, to_json=False, session=None): """ Get transforms or raise a NoObject exception. @@ -150,13 +150,15 @@ def get_transforms(request_id=None, workload_id=None, transform_id=None, to_json return orm_transforms.get_transforms(request_id=request_id, workload_id=workload_id, transform_id=transform_id, + loop_index=loop_index, + internal_ids=internal_ids, to_json=to_json, session=session) @transactional_session def get_transforms_by_status(status, period=None, locking=False, bulk_size=None, to_json=False, by_substatus=False, new_poll=False, update_poll=False, only_return_id=False, min_request_id=None, - not_lock=False, next_poll_at=None, session=None): + order_by_fifo=False, not_lock=False, next_poll_at=None, session=None): """ Get transforms or raise a NoObject exception. 
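# --- Editorial sketch (not part of this patch): with order_by_fifo=True the
# transformer drains queued transforms by priority first and then by oldest
# transform_id, instead of the default updated_at ordering:
#
#     tfs = get_transforms_by_status(status=[TransformStatus.Queue,
#                                            TransformStatus.Throttling],
#                                    locking=True, not_lock=True,
#                                    order_by_fifo=True, new_poll=True,
#                                    only_return_id=True,
#                                    min_request_id=min_request_id,
#                                    bulk_size=bulk_size)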
@@ -177,6 +179,7 @@ def get_transforms_by_status(status, period=None, locking=False, bulk_size=None,
                                                                bulk_size=bulk_size * 2, locking_for_update=False,
                                                                to_json=False, only_return_id=True,
                                                                min_request_id=min_request_id,
+                                                               order_by_fifo=order_by_fifo,
                                                                new_poll=new_poll, update_poll=update_poll,
                                                                by_substatus=by_substatus, session=session)
         if tf_ids:
@@ -185,6 +188,7 @@ def get_transforms_by_status(status, period=None, locking=False, bulk_size=None,
                                                            to_json=to_json, transform_ids=tf_ids,
                                                            new_poll=new_poll, update_poll=update_poll,
                                                            min_request_id=min_request_id,
+                                                           order_by_fifo=order_by_fifo,
                                                            by_substatus=by_substatus, session=session)
             if transform2s:
                 # reqs = req2s[:bulk_size]
@@ -204,7 +208,7 @@ def get_transforms_by_status(status, period=None, locking=False, bulk_size=None,
                 transforms = []
         else:
             transforms = orm_transforms.get_transforms_by_status(status=status, period=period, locking=locking,
-                                                                 locking_for_update=False,
+                                                                 locking_for_update=False, order_by_fifo=order_by_fifo,
                                                                  bulk_size=bulk_size, to_json=to_json,
                                                                  new_poll=new_poll, update_poll=update_poll,
                                                                  only_return_id=only_return_id,
@@ -228,6 +232,7 @@ def get_transforms_by_status(status, period=None, locking=False, bulk_size=None,
                                                          bulk_size=bulk_size, to_json=to_json,
                                                          new_poll=new_poll, update_poll=update_poll,
                                                          only_return_id=only_return_id,
+                                                         order_by_fifo=order_by_fifo,
                                                          min_request_id=min_request_id,
                                                          by_substatus=by_substatus, session=session)
     return transforms
diff --git a/main/lib/idds/orm/transforms.py b/main/lib/idds/orm/transforms.py
index f2d6e367..e2b4fdcb 100644
--- a/main/lib/idds/orm/transforms.py
+++ b/main/lib/idds/orm/transforms.py
@@ -315,7 +315,7 @@ def get_transform_ids(workprogress_id=None, request_id=None, workload_id=None, transform_id=None,
 
 @read_session
-def get_transforms(request_id=None, workload_id=None, transform_id=None,
+def get_transforms(request_id=None, workload_id=None, transform_id=None, loop_index=None, internal_ids=None,
                    to_json=False, session=None):
     """
     Get transforms or raise a NoObject exception.
@@ -337,6 +337,14 @@ def get_transforms(request_id=None, workload_id=None, transform_id=None,
         query = query.filter(models.Transform.workload_id == workload_id)
     if transform_id:
         query = query.filter(models.Transform.transform_id == transform_id)
+    if loop_index is not None:
+        query = query.filter(models.Transform.loop_index == loop_index)
+    if internal_ids:
+        if not isinstance(internal_ids, (list, tuple)):
+            internal_ids = [internal_ids]
+        if len(internal_ids) == 1:
+            internal_ids = [internal_ids[0], internal_ids[0]]
+        query = query.filter(models.Transform.internal_id.in_(internal_ids))
 
     tmp = query.all()
     rets = []
@@ -357,7 +365,7 @@ def get_transforms(request_id=None, workload_id=None, transform_id=None,
 @transactional_session
 def get_transforms_by_status(status, period=None, transform_ids=[], locking=False,
                              locking_for_update=False, bulk_size=None, to_json=False, by_substatus=False, only_return_id=False,
-                             min_request_id=None, new_poll=False, update_poll=False, session=None):
+                             order_by_fifo=False, min_request_id=None, new_poll=False, update_poll=False, session=None):
     """
     Get transforms or raise a NoObject exception. 
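# --- Editorial sketch (not part of this patch): the condition-trigger path
# uses the new filters to fetch only the transforms bound to the works a
# condition releases; the ids below are hypothetical.
#
#     tfs = get_transforms(request_id=1234, loop_index=None,
#                          internal_ids=['w1', 'w2'])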
@@ -406,7 +414,10 @@ def get_transforms_by_status(status, period=None, transform_ids=[], locking=Fals if locking_for_update: query = query.with_for_update(skip_locked=True) else: - query = query.order_by(asc(models.Transform.updated_at)).order_by(desc(models.Transform.priority)) + if order_by_fifo: + query = query.order_by(desc(models.Transform.priority)).order_by(asc(models.Transform.transform_id)) + else: + query = query.order_by(asc(models.Transform.updated_at)).order_by(desc(models.Transform.priority)) if bulk_size: query = query.limit(bulk_size) From 565c99372c7e22414e69aa872524450185379964 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Tue, 6 Aug 2024 10:22:59 +0200 Subject: [PATCH 17/21] add oracle sql updates --- main/etc/sql/oracle_update.sql | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/main/etc/sql/oracle_update.sql b/main/etc/sql/oracle_update.sql index f738de11..a71bfa54 100644 --- a/main/etc/sql/oracle_update.sql +++ b/main/etc/sql/oracle_update.sql @@ -476,3 +476,29 @@ alter table PROCESSINGS add (processing_type NUMBER(2)); --- 20240327 alter table requests modify (transform_tag VARCHAR2(20)); + +--- 20240703 +alter table requests add (campaign VARCHAR2(100), campaign_group VARCHAR2(250), campaign_tag VARCHAR2(20)); +alter table transforms add (internal_id VARCHAR2(20), has_previous_conditions NUMBER(3), loop_index NUMBER(3), cloned_from NUMBER(12), triggered_conditions CLOB, untriggered_conditions CLOB); + +alter table messages add (internal_id VARCHAR2(20)); + +CREATE SEQUENCE CONDITION_ID_SEQ MINVALUE 1 INCREMENT BY 1 START WITH 1 NOCACHE ORDER NOCYCLE GLOBAL; +CREATE TABLE conditions +( + condition_id NUMBER(12) DEFAULT ON NULL CONDITION_ID_SEQ.NEXTVAL constraint CONDITION_ID_NN NOT NULL, + request_id NUMBER(12), + internal_id VARCHAR2(20), + status NUMBER(2), + is_loop NUMBER(2), + loop_index NUMBER(2), + cloned_from NUMBER(12), + created_at DATE DEFAULT SYS_EXTRACT_UTC(systimestamp(0)), + updated_at DATE DEFAULT SYS_EXTRACT_UTC(systimestamp(0)), + evaluate_result VARCHAR2(200), + previous_transforms CLOB, + following_transforms CLOB, + condition CLOB, + CONSTRAINT CONDITION_PK PRIMARY KEY (condition_id), -- USING INDEX LOCAL, + CONSTRAINT CONDITION_ID_UQ UNIQUE (request_id, internal_id) +); From df957404bef37e207ad1da20fc52d52949a6fa28 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Tue, 6 Aug 2024 10:24:40 +0200 Subject: [PATCH 18/21] workflow prerun to check asyncresult is ok --- workflow/lib/idds/iworkflow/work.py | 37 ++++++++------ workflow/lib/idds/iworkflow/workflow.py | 65 +++++++++++++++---------- 2 files changed, 60 insertions(+), 42 deletions(-) diff --git a/workflow/lib/idds/iworkflow/work.py b/workflow/lib/idds/iworkflow/work.py index c3710819..49d2471e 100644 --- a/workflow/lib/idds/iworkflow/work.py +++ b/workflow/lib/idds/iworkflow/work.py @@ -341,6 +341,9 @@ def setup(self): ret = init_env return ret + def get_clean_env(self): + return self._workflow_context.get_clean_env() + class Work(Base): @@ -917,6 +920,12 @@ def setup(self): """ return self._context.setup() + def get_clean_env(self): + """ + :returns command: `str` to clean the workflow. + """ + return self._context.get_clean_env() + def load_func(self, func_name): """ Load the function from the source files. 
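# --- Editorial sketch (not part of this patch): the net effect of the pre_run
# rewrite below is that the old inline publish/wait self-test collapses into a
# single AsyncResult.is_ok() round trip:
#
#     def pre_run(self):
#         workflow_context = self._context
#         if workflow_context.distributed:
#             return AsyncResult(workflow_context, wait_num=1, timeout=30).is_ok()
#         return True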
@@ -934,28 +943,19 @@ def pre_run(self): if workflow_context.distributed: logging.info("Test AsyncResult") a_ret = AsyncResult(workflow_context, wait_num=1, timeout=30) - a_ret.subscribe() - - async_ret = AsyncResult(workflow_context, internal_id=a_ret.internal_id) - test_result = "AsyncResult test (request_id: %s, transform_id: %s)" % (workflow_context.request_id, workflow_context.transform_id) - logging.info("AsyncResult publish: %s" % test_result) - async_ret.publish(test_result) - - ret_q = a_ret.wait_result(force_return_results=True) - logging.info("AsyncResult results: %s" % str(ret_q)) - if ret_q and ret_q == test_result: - logging.info("AsyncResult test succeeded") - return True - else: - logging.info("AsyncResult test failed (published: %s, received: %s)" % (test_result, ret_q)) - return False + ret = a_ret.is_ok() + logging.info(f"pre_run asyncresult test is_ok: {ret}") + return ret return True def run(self): """ Run the work. """ - self.pre_run() + is_ok = self.pre_run() + if not is_ok: + logging.error(f"pre_run is_ok: {is_ok}, will exit.") + raise Exception("work pre_run failed") func_name, pre_kwargs, args, kwargs = self._func_name_and_args multi_jobs_kwargs_list = self.multi_jobs_kwargs_list @@ -1074,6 +1074,11 @@ def get_runner(self): cmd = cmd + " " + run_command else: cmd = run_command + + clean_env = self.get_clean_env() + if clean_env: + cmd = cmd + "; " + clean_env + return cmd diff --git a/workflow/lib/idds/iworkflow/workflow.py b/workflow/lib/idds/iworkflow/workflow.py index 34df8c88..09210382 100644 --- a/workflow/lib/idds/iworkflow/workflow.py +++ b/workflow/lib/idds/iworkflow/workflow.py @@ -55,7 +55,7 @@ def get_current_workflow(cls): class WorkflowContext(Context): - def __init__(self, name=None, service='panda', source_dir=None, workflow_type=WorkflowType.iWorkflow, distributed=True, max_walltime=24 * 3600, init_env=None, exclude_source_files=[]): + def __init__(self, name=None, service='panda', source_dir=None, workflow_type=WorkflowType.iWorkflow, distributed=True, max_walltime=24 * 3600, init_env=None, exclude_source_files=[], clean_env=None): super(WorkflowContext, self).__init__() self._service = service # panda, idds, sharefs self._request_id = None @@ -110,6 +110,7 @@ def __init__(self, name=None, service='panda', source_dir=None, workflow_type=Wo self.init_idds() self._init_env = init_env + self._clean_env = clean_env self._exclude_source_files = [] if exclude_source_files: @@ -148,6 +149,16 @@ def init_env(self, value): if self._init_env: self._init_env = self._init_env + " " + @property + def clean_env(self): + return self._clean_env + + @clean_env.setter + def clean_env(self, value): + self._clean_env = value + if self._clean_env: + self._clean_env = self._clean_env + " " + @property def vo(self): return self._vo @@ -421,6 +432,7 @@ def init_idds(self): def init_panda(self): if not self._panda_initialized: self._panda_initialized = True + self._panda_env = self.get_panda_env() if not self.site: self.site = os.environ.get("PANDA_SITE", None) if not self.queue: @@ -478,6 +490,9 @@ def setup(self): ret = init_env return ret + def get_clean_env(self): + return self.clean_env + def setup_source_files(self): """ Setup source files. 
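# --- Editorial sketch (not part of this patch): with clean_env configured, the
# runner command produced by get_runner becomes "<setup> <run_command>; <clean_env>":
#
#     cmd = setup + " " + run_command
#     clean_env = self.get_clean_env()
#     if clean_env:
#         cmd = cmd + "; " + clean_env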
@@ -691,7 +706,7 @@ class Workflow(Base): def __init__(self, func=None, service='panda', context=None, source_dir=None, local=False, distributed=True, pre_kwargs={}, args=None, kwargs={}, multi_jobs_kwargs_list=[], current_job_kwargs=None, name=None, init_env=None, is_unique_func_name=False, max_walltime=24 * 3600, source_dir_parent_level=None, - exclude_source_files=[]): + exclude_source_files=[], clean_env=None): """ Init a workflow. """ @@ -726,7 +741,7 @@ def __init__(self, func=None, service='panda', context=None, source_dir=None, lo else: self._context = WorkflowContext(name=self._name, service=service, workflow_type=workflow_type, source_dir=source_dir, distributed=distributed, init_env=init_env, max_walltime=max_walltime, - exclude_source_files=exclude_source_files) + exclude_source_files=exclude_source_files, clean_env=clean_env) @property def service(self): @@ -1102,6 +1117,12 @@ def setup(self): """ return self._context.setup() + def get_clean_env(self): + """ + :returns command: `str` to clean the workflow. + """ + return self._context.get_clean_env() + def setup_source_files(self): """ Setup location of source files @@ -1125,25 +1146,9 @@ def pre_run(self): if workflow_context.distributed: logging.info("Test AsyncResult") a_ret = AsyncResult(workflow_context, wait_num=1, timeout=30) - a_ret.subscribe() - - async_ret = AsyncResult(workflow_context, internal_id=a_ret.internal_id) - test_result = "AsyncResult test (request_id: %s)" % workflow_context.request_id - logging.info("AsyncResult publish: %s" % test_result) - async_ret.publish(test_result) - - ret_q = a_ret.wait_result(force_return_results=True) - logging.info("AsyncResult results: %s" % str(ret_q)) - if ret_q: - if ret_q == test_result: - logging.info("AsyncResult test succeeded") - return True - else: - logging.info("AsyncResult test failed (published: %s, received: %s)" % (test_result, ret_q)) - return False - else: - logging.info("Not received results") - return False + ret = a_ret.is_ok() + logging.info(f"pre_run asyncresult test is_ok: {ret}") + return ret return True def run(self): @@ -1152,7 +1157,10 @@ def run(self): """ # with self: if True: - self.pre_run() + is_ok = self.pre_run() + if not is_ok: + logging.error(f"pre_run is_ok: {is_ok}, will exit.") + raise Exception("workflow pre_run failed") func_name, pre_kwargs, args, kwargs = self._func_name_and_args multi_jobs_kwargs_list = self.multi_jobs_kwargs_list @@ -1205,6 +1213,11 @@ def get_runner(self): cmd = cmd + " " + run_command else: cmd = run_command + + clean_env = self.get_clean_env() + if clean_env: + cmd = cmd + "; " + clean_env + return cmd def get_func_name(self): @@ -1215,12 +1228,12 @@ def get_func_name(self): # foo = workflow(arg)(foo) def workflow(func=None, *, local=False, service='idds', source_dir=None, primary=False, queue=None, site=None, cloud=None, max_walltime=24 * 3600, distributed=True, init_env=None, pre_kwargs={}, return_workflow=False, no_wraps=False, - source_dir_parent_level=None, exclude_source_files=[]): + source_dir_parent_level=None, exclude_source_files=[], clean_env=None): if func is None: return functools.partial(workflow, local=local, service=service, source_dir=source_dir, primary=primary, queue=queue, site=site, cloud=cloud, max_walltime=max_walltime, distributed=distributed, init_env=init_env, pre_kwargs=pre_kwargs, no_wraps=no_wraps, return_workflow=return_workflow, source_dir_parent_level=source_dir_parent_level, - exclude_source_files=exclude_source_files) + exclude_source_files=exclude_source_files, 
clean_env=clean_env) if 'IDDS_IGNORE_WORKFLOW_DECORATOR' in os.environ: return func @@ -1230,7 +1243,7 @@ def wrapper(*args, **kwargs): try: f = Workflow(func, service=service, source_dir=source_dir, local=local, max_walltime=max_walltime, distributed=distributed, pre_kwargs=pre_kwargs, args=args, kwargs=kwargs, init_env=init_env, source_dir_parent_level=source_dir_parent_level, - exclude_source_files=exclude_source_files) + exclude_source_files=exclude_source_files, clean_env=clean_env) f.queue = queue f.site = site From 6da59b203970371bc688e9976ba44c3f59f85b9b Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Tue, 6 Aug 2024 10:26:08 +0200 Subject: [PATCH 19/21] split connections to listener and active, functions to fallback to https --- workflow/lib/idds/iworkflow/asyncresult.py | 312 +++++++++++++++++++-- 1 file changed, 284 insertions(+), 28 deletions(-) diff --git a/workflow/lib/idds/iworkflow/asyncresult.py b/workflow/lib/idds/iworkflow/asyncresult.py index f8a132cb..ae55569e 100644 --- a/workflow/lib/idds/iworkflow/asyncresult.py +++ b/workflow/lib/idds/iworkflow/asyncresult.py @@ -9,15 +9,23 @@ # - Wen Guan, , 2023 - 2024 import logging +import os import random import socket -import stomp import threading import time import traceback +import uuid from queue import Queue +try: + import stomp + with_stomp = True +except Exception as ex: + print(f"Failed to import stomp, with_stomp is False: {ex}") + with_stomp = False + from idds.common.constants import WorkflowType, GracefulEvent from idds.common.utils import json_dumps, json_loads, setup_logging, get_unique_id_for_dict from .base import Base @@ -149,12 +157,18 @@ def __init__(self, work_context, name=None, wait_num=1, wait_keys=[], multi_jobs if internal_id: self.internal_id = internal_id self._work_context = work_context - self._work_context.init_brokers() + try: + self._work_context.init_brokers() + self._broker_initialized = True + except Exception as ex: + logging.warn(f"Failed to initialize messaging broker, will use Rest: {ex}") + self._broker_initialized = False self._name = name self._queue = Queue() self._connections = [] + self._subscribe_connections = [] self._graceful_stop = False self._subscribe_thread = None self._subscribed = False @@ -179,6 +193,28 @@ def __init__(self, work_context, name=None, wait_num=1, wait_keys=[], multi_jobs self._nologs = False + self._num_stomp_failures = 0 + self._max_stomp_failures = 5 + try: + max_stomp_failures = os.environ.get("AYNC_RESULT_MAX_STOMP_FAILURES", None) + if max_stomp_failures: + max_stomp_failures = int(max_stomp_failures) + self._max_stomp_failures = max_stomp_failures + except Exception: + pass + + self._poll_period = 300 + try: + poll_period = os.environ.get("AYNC_RESULT_POLL_PERIOD", None) + if poll_period: + poll_period = int(poll_period) + self._poll_period = poll_period + except Exception: + pass + + self._is_messaging_ok = True + self._is_polling_ok = True + @property def logger(self): return logging.getLogger(self.__class__.__name__) @@ -340,8 +376,42 @@ def disconnect(self): con.disconnect() except Exception: pass + self._connections = [] + + def disconnect_subscribe(self): + for con in self._subscribe_connections: + try: + if con.is_connected(): + con.disconnect() + except Exception: + pass + self._subscribe_connections = [] + + def has_connections(self, conns): + if conns: + for con in conns: + try: + if con.is_connected(): + return True + except Exception: + pass + return False + + def get_connections(self, conns): + if conns: + for con in conns: + try: + if 
con.is_connected(): + return con + except Exception: + pass + return None def connect_to_messaging_broker(self): + conn = self.get_connections(self._connections) + if conn: + return conn + workflow_context = self._work_context brokers = workflow_context.brokers @@ -362,7 +432,10 @@ def connect_to_messaging_broker(self): self._connections = [conn] return conn - def subscribe_to_messaging_brokers(self): + def subscribe_to_messaging_brokers(self, force=False): + if self._subscribed and not force and self._subscribe_connections: + return self._subscribe_connections + workflow_context = self._work_context brokers = workflow_context.brokers conns = [] @@ -384,7 +457,7 @@ def subscribe_to_messaging_brokers(self): timeout = workflow_context.broker_timeout - self.disconnect() + self.disconnect_subscribe() listener = MessagingListener(brokers, self._queue, logger=self.logger) conns = [] @@ -416,11 +489,11 @@ def subscribe_to_messaging_brokers(self): ack='auto', headers=subscribe_selector) self.logger.info("subscribe to %s:%s with selector: %s" % (broker, port, subscribe_selector)) conns.append(conn) - self._connections = conns + self._subscribe_connections = conns return conns - def publish(self, ret, key=None): - conn = self.connect_to_messaging_broker() + def get_message(self, ret, key=None): + message = {} workflow_context = self._work_context if key is None: if self._current_job_kwargs: @@ -433,42 +506,208 @@ def publish(self, ret, key=None): 'type': 'iworkflow', 'internal_id': str(self.internal_id), 'request_id': workflow_context.request_id} - body = json_dumps({'ret': ret, 'key': key, 'internal_id': self.internal_id}) + body = json_dumps({'ret': ret, 'key': key, 'internal_id': self.internal_id, 'type': 'iworkflow', + 'request_id': workflow_context.request_id}) + message = {"headers": headers, "body": body} + elif workflow_context.workflow_type == WorkflowType.iWork: + headers = {'persistent': 'true', + 'type': 'iwork', + 'internal_id': str(self.internal_id), + 'request_id': workflow_context.request_id, + 'transform_id': workflow_context.transform_id} + body = json_dumps({'ret': ret, 'key': key, 'internal_id': self.internal_id, 'type': 'iwork', + 'request_id': workflow_context.request_id, + 'transform_id': workflow_context.transform_id}) + message = {"headers": headers, "body": body} + return message + + def publish_message(self, ret, key=None): + message = self.get_message(ret=ret, key=key) + headers = message['headers'] + body = message['body'] + conn = self.connect_to_messaging_broker() + workflow_context = self._work_context + if workflow_context.workflow_type in [WorkflowType.iWorkflow, WorkflowType.iWorkflowLocal]: conn.send(body=body, destination=workflow_context.broker_destination, id='idds-iworkflow_%s' % self.internal_id, ack='auto', headers=headers ) - self.logger.info("publish header: %s, body: %s" % (str(headers), str(body))) + self.logger.info(f"published header: {headers}, body: {body}") elif workflow_context.workflow_type == WorkflowType.iWork: - headers = {'persistent': 'true', - 'type': 'iwork', - 'internal_id': str(self.internal_id), - 'request_id': workflow_context.request_id, - 'transform_id': workflow_context.transform_id} - body = json_dumps({'ret': ret, 'key': key, 'internal_id': self.internal_id}) conn.send(body=body, destination=workflow_context.broker_destination, id='idds-iwork_%s' % self.internal_id, ack='auto', headers=headers ) - self.logger.info("publish header: %s, body: %s" % (str(headers), str(body))) - self.disconnect() + self.logger.info(f"published header: 
{headers}, body: {body}") + # self.disconnect() - def run_subscriber(self): + def get_request_id_internal_id(self): + workflow_context = self._work_context + request_id, transform_id, internal_id = None, None, None + if workflow_context.workflow_type in [WorkflowType.iWorkflow, WorkflowType.iWorkflowLocal]: + request_id = workflow_context.request_id + transform_id = 0 + internal_id = self.internal_id + elif workflow_context.workflow_type == WorkflowType.iWork: + request_id = workflow_context.request_id + transform_id = workflow_context.transform_id + internal_id = self.internal_id + else: + request_id = workflow_context.request_id + transform_id = 0 + internal_id = self.internal_id + return request_id, transform_id, internal_id + + def publish_through_panda_server(self, message): + request_id, transform_id, internal_id = self.get_request_id_internal_id() + + import idds.common.utils as idds_utils + import pandaclient.idds_api as idds_api + idds_server = self._work_context.get_idds_server() + # request_id = self._context.request_id + client = idds_api.get_api(idds_utils.json_dumps, + idds_host=idds_server, + compress=True, + manager=True) + status, ret = client.send_messages(request_id=request_id, transform_id=transform_id, internal_id=internal_id, msgs=[message]) + if status: + self.logger.info(f"published message through panda server: {message}") + else: + self.logger.error(f"failed to publish message through panda server, status: {status}, ret: {ret}") + + def publish_through_idds_server(self, message): + request_id, transform_id, internal_id = self.get_request_id_internal_id() + + from idds.client.clientmanager import ClientManager + client = ClientManager(host=self._work_context.get_idds_server()) + status, ret = client.send_messages(request_id=request_id, transform_id=transform_id, internal_id=internal_id, msgs=[message]) + if status: + self.logger.info(f"published message through idds server: {message}") + else: + self.logger.error(f"failed to publish message through idds server, status: {status}, ret: {ret}") + + def publish_through_api(self, ret, key=None, force=False): + message = self.get_message(ret=ret, key=key) + # headers = message['headers'] + # body = message['body'] + message['msg_type'] = 'async_result' + + try: + request_id, transform_id, internal_id = self.get_request_id_internal_id() + if request_id is None: + if force: + request_id = 0 + else: + self.logger.warn("Not to publish message through API since the request id is None") + return + + if self._work_context.service == 'panda': + self.publish_through_panda_server(message) + else: + self.publish_through_idds_server(message) + except Exception as ex: + self.logger.error(f"Failed to publish message through API: {ex}") + + def publish(self, ret, key=None, force=False): + stomp_failed = False + if with_stomp and self._broker_initialized: + try: + self.logger.info("publishing results through messaging brokers") + self.publish_message(ret=ret, key=key) + except Exception as ex: + self.logger.warn(f"Failed to publish result through messaging brokers: {ex}") + stomp_failed = True + + if not with_stomp or not self._broker_initialized or stomp_failed: + self.logger.info("publishing results through http API") + self.publish_through_api(ret=ret, key=key, force=force) + + def poll_messages_through_panda_server(self, request_id, transform_id, internal_id): + if request_id is None: + self.logger.warn("Not to poll message through panda server, since the request_id is None") + return [] + + import idds.common.utils as idds_utils + 
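# Editorial note: the panda client is imported lazily here so environments
# without stomp (or with the messaging brokers unreachable) still work; in
# that case AsyncResult degrades to polling messages over the https API of
# the panda or idds server, as wired up in run_subscriber below.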
import pandaclient.idds_api as idds_api + idds_server = self._work_context.get_idds_server() + # request_id = self._work_context.request_id + client = idds_api.get_api(idds_utils.json_dumps, + idds_host=idds_server, + compress=True, + manager=True) + status, messages = client.get_messages(request_id=request_id, transform_id=transform_id, internal_id=internal_id) + if status: + self.logger.info(f"poll message through panda server, number of messages: {len(messages)}") + return messages + else: + self.logger.error(f"failed to poll messages through panda server, error: {messages}") + return [] + + def poll_messages_through_idds_server(self, request_id, transform_id, internal_id): + if request_id is None: + self.logger.warn("Not to poll message through idds server, since the request_id is None") + return [] + + from idds.client.clientmanager import ClientManager + client = ClientManager(host=self._work_context.get_idds_server()) + status, messages = client.get_messages(request_id=request_id, transform_id=transform_id, internal_id=internal_id) + if status: + self.logger.info(f"poll message through idds server, number of messages: {len(messages)}") + return messages + else: + self.logger.error(f"failed to poll messages through idds server, error: {messages}") + return [] + + def poll_messages(self, force=False): + try: + request_id, transform_id, internal_id = self.get_request_id_internal_id() + if request_id is None: + if force: + request_id = 0 + else: + self.logger.warn("Not to poll message, since the request_id is None") + return + + if self._work_context.service == 'panda': + messages = self.poll_messages_through_panda_server(request_id=request_id, transform_id=transform_id, internal_id=internal_id) + else: + messages = self.poll_messages_through_idds_server(request_id=request_id, transform_id=transform_id, internal_id=internal_id) + + for message in messages: + self._queue.put(message) + except Exception as ex: + self.logger.error(f"Failed to poll message: {ex}") + + def run_subscriber(self, force=False): try: self.logger.info("run subscriber") - self.subscribe_to_messaging_brokers() + if with_stomp and self._broker_initialized and self._num_stomp_failures < self._max_stomp_failures: + try: + self.subscribe_to_messaging_brokers(force=True) + except Exception as ex: + self.logger.warn(f"run subscriber fails to subscribe to message broker: {ex}") + self._num_stomp_failures += 1 + self._is_messaging_ok = False + self._broker_initialized = False + while self._graceful_stop and not self._graceful_stop.is_set(): - has_failed_conns = False - for conn in self._connections: - if not conn.is_connected(): - has_failed_conns = True - if has_failed_conns: - self.subscribe_to_messaging_brokers() - time.sleep(1) + if with_stomp and self._broker_initialized and self._is_messaging_ok and self._num_stomp_failures < self._max_stomp_failures: + has_failed_conns = False + for conn in self._subscribe_connections: + if not conn.is_connected(): + has_failed_conns = True + if has_failed_conns: + self.subscribe_to_messaging_brokers(force=True) + time.sleep(1) + else: + self.poll_messages(force=force) + time.sleep(self._poll_period) + + self.poll_messages(force=force) self.stop() except Exception as ex: self.logger.error("run subscriber failed with error: %s" % str(ex)) @@ -492,10 +731,10 @@ def get_results(self, nologs=True): def get_results_percentage(self): return self._results_percentage - def subscribe(self): + def subscribe(self, force=False): if not self._subscribed: self._graceful_stop = GracefulEvent() - 
+            thread = threading.Thread(target=self.run_subscriber, kwargs={'force': force}, name="RunSubscriber")
             thread.start()
             time.sleep(1)
             self._subscribed = True
@@ -559,3 +798,20 @@ def wait_result(self, timeout=None, force_return_results=False):
         self.wait_results(timeout=timeout, force_return_results=force_return_results)
         results = self.results
         return results
+
+    def is_ok(self):
+        try:
+            self.subscribe(force=True)
+            test_id = str(uuid.uuid4())
+            self.publish(test_id, force=True)
+            ret = self.wait_result(force_return_results=True)
+            self.logger.info(f"AsyncResult: published: {test_id}, received: {ret}")
+            if test_id == ret:
+                self.logger.info("AsyncResult is ok")
+                return True
+            else:
+                self.logger.info("AsyncResult is not ok")
+                return False
+        except Exception as ex:
+            self.logger.error(f"AsyncResult is not ok: {ex}")
+            return False

From 6f06782f15bbb4c5d9c99b2a3e631ddee003952f Mon Sep 17 00:00:00 2001
From: Wen Guan
Date: Tue, 6 Aug 2024 10:26:43 +0200
Subject: [PATCH 20/21] optimize run workflow tools

---
 main/lib/idds/tests/panda_test.py |  1 +
 main/tools/env/install_idds.sh    |  4 +--
 main/tools/env/setup_panda.sh     | 45 ++++++++++++++++++++++++++-----
 monitor/data/conf.js              | 12 ++++-----
 workflow/bin/run_workflow         |  6 ++---
 workflow/tools/make/zipheader     |  3 +++
 6 files changed, 53 insertions(+), 18 deletions(-)

diff --git a/main/lib/idds/tests/panda_test.py b/main/lib/idds/tests/panda_test.py
index c7311c3c..2e8c44e4 100644
--- a/main/lib/idds/tests/panda_test.py
+++ b/main/lib/idds/tests/panda_test.py
@@ -74,6 +74,7 @@
 task_ids = [169786, 169787]
 task_ids = [i for i in range(10173, 10204)]
 task_ids = [108, 109, 106, 107, 112]
+task_ids = [i for i in range(2921, 2927)]
 for task_id in task_ids:
     print("Killing %s" % task_id)
     ret = Client.killTask(task_id, verbose=True)
diff --git a/main/tools/env/install_idds.sh b/main/tools/env/install_idds.sh
index 9ef9ff46..0b0b94f7 100644
--- a/main/tools/env/install_idds.sh
+++ b/main/tools/env/install_idds.sh
@@ -17,7 +17,7 @@ cp workflow/bin/run_workflow_wrapper ~/www/wiscgroup/
 echo cp workflow/bin/run_workflow_wrapper /eos/user/w/wguan/www/
 cp workflow/bin/run_workflow_wrapper /eos/user/w/wguan/www/
 
-echo scp workflow/bin/run_workflow_wrapper root@ai-idds-04:/data/iddssv1/srv/var/trf/user/
-scp workflow/bin/run_workflow_wrapper root@ai-idds-04:/data/iddssv1/srv/var/trf/user/
+# echo scp workflow/bin/run_workflow_wrapper root@ai-idds-04:/data/iddssv1/srv/var/trf/user/
+# scp workflow/bin/run_workflow_wrapper root@ai-idds-04:/data/iddssv1/srv/var/trf/user/
 
 rm -fr workflow/bin/run_workflow_wrapper
diff --git a/main/tools/env/setup_panda.sh b/main/tools/env/setup_panda.sh
index 5f81d1cc..31e1b70c 100644
--- a/main/tools/env/setup_panda.sh
+++ b/main/tools/env/setup_panda.sh
@@ -27,6 +27,31 @@ if [ "$instance" == "k8s" ]; then
     export PANDA_SYS=/afs/cern.ch/user/w/wguan/workdisk/iDDS/.conda/iDDS/
     # export PANDA_CONFIG_ROOT=/afs/cern.ch/user/w/wguan/workdisk/iDDS/main/etc/panda/
     export PANDA_CONFIG_ROOT=~/.panda/
+elif [ "$instance" == "bnl" ]; then
+    export PANDA_AUTH=oidc
+
+    export PANDA_URL_SSL=https://pandaserver01.sdcc.bnl.gov:25443/server/panda
+    export PANDA_URL=https://pandaserver01.sdcc.bnl.gov:25443/server/panda
+    export PANDACACHE_URL=https://pandaserver01.sdcc.bnl.gov:25443/server/panda
+    export PANDAMON_URL=https://pandamon01.sdcc.bnl.gov
+    export PANDA_AUTH=oidc
+    export PANDA_AUTH_VO=EIC
+    export PANDA_USE_NATIVE_HTTPLIB=1
+    export PANDA_BEHIND_REAL_LB=1
+
+    # export PANDA_AUTH_VO=panda_dev
+    # export PANDA_AUTH_VO=Rubin:production
+
+    export PANDACACHE_URL=$PANDA_URL_SSL
+    export PANDA_SYS=/afs/cern.ch/user/w/wguan/workdisk/iDDS/.conda/iDDS/
+
+    export PANDA_BEHIND_REAL_LB=true
+    export PANDA_VERIFY_HOST=off
+    export PANDA_CONFIG_ROOT=~/.panda/
+
+    unset IDDS_HOST
+    # doma
+    # export IDDS_HOST=https://aipanda105.cern.ch:443/idds
 elif [ "$instance" == "usdf_dev" ]; then
     export PANDA_AUTH=oidc
     export PANDA_BEHIND_REAL_LB=true
@@ -97,14 +122,19 @@ elif [ "$instance" == "new" ]; then
     PANDA_WORKING_GROUP=EIC
     PANDA_VO=wlcg
 elif [ "$instance" == "doma_k8s" ]; then
-    export PANDA_URL_SSL=https://panda-doma-k8s-panda.cern.ch/server/panda
-    export PANDA_URL=http://panda-doma-k8s-panda.cern.ch:25080/server/panda
-    export PANDACACHE_URL=https://panda-doma-k8s-panda.cern.ch/server/panda
-    export PANDAMON_URL=https://panda-doma-bigmon.cern.ch
+    # export PANDA_URL_SSL=https://panda-doma-k8s-panda.cern.ch/server/panda
+    # export PANDA_URL=http://panda-doma-k8s-panda.cern.ch:25080/server/panda
+    # export PANDACACHE_URL=https://panda-doma-k8s-panda.cern.ch/server/panda
+    # export PANDAMON_URL=https://panda-doma-bigmon.cern.ch
+    export PANDA_URL_SSL=https://pandaserver-doma.cern.ch/server/panda
+    export PANDA_URL=http://pandaserver-doma.cern.ch:25080/server/panda
+    export PANDACACHE_URL=https://pandaserver-doma.cern.ch/server/panda
+    export PANDAMON_URL=https://panda-doma.cern.ch
+
+    export PANDA_AUTH=oidc
     export PANDA_AUTH_VO=Rubin.production
-    export PANDA_AUTH_VO=EIC
-    export PANDA_AUTH_VO=Rubin
+    # export PANDA_AUTH_VO=EIC
+    # export PANDA_AUTH_VO=Rubin
     export PANDA_USE_NATIVE_HTTPLIB=1
     export PANDA_BEHIND_REAL_LB=true
@@ -119,7 +149,8 @@ elif [ "$instance" == "doma_k8s" ]; then
     export IDDS_OIDC_TOKEN_FILE=~/.idds/.token_doma_k8s
 else
     export PANDA_AUTH=oidc
-    export PANDA_URL_SSL=https://pandaserver-doma.cern.ch:25443/server/panda
+    # export PANDA_URL_SSL=https://pandaserver-doma.cern.ch:25443/server/panda
+    export PANDA_URL_SSL=https://pandaserver-doma.cern.ch:443/server/panda
     export PANDA_URL=http://pandaserver-doma.cern.ch:25080/server/panda
     export PANDAMON_URL=https://panda-doma.cern.ch
     # export PANDA_AUTH_VO=panda_dev
diff --git a/monitor/data/conf.js b/monitor/data/conf.js
index 2cd33a39..698f534c 100644
--- a/monitor/data/conf.js
+++ b/monitor/data/conf.js
@@ -1,9 +1,9 @@
 var appConfig = {
-    'iddsAPI_request': "https://lxplus952.cern.ch:443/idds/monitor_request/null/null",
-    'iddsAPI_transform': "https://lxplus952.cern.ch:443/idds/monitor_transform/null/null",
-    'iddsAPI_processing': "https://lxplus952.cern.ch:443/idds/monitor_processing/null/null",
-    'iddsAPI_request_detail': "https://lxplus952.cern.ch:443/idds/monitor/null/null/true/false/false",
-    'iddsAPI_transform_detail': "https://lxplus952.cern.ch:443/idds/monitor/null/null/false/true/false",
-    'iddsAPI_processing_detail': "https://lxplus952.cern.ch:443/idds/monitor/null/null/false/false/true"
+    'iddsAPI_request': "https://lxplus994.cern.ch:443/idds/monitor_request/null/null",
+    'iddsAPI_transform': "https://lxplus994.cern.ch:443/idds/monitor_transform/null/null",
+    'iddsAPI_processing': "https://lxplus994.cern.ch:443/idds/monitor_processing/null/null",
+    'iddsAPI_request_detail': "https://lxplus994.cern.ch:443/idds/monitor/null/null/true/false/false",
+    'iddsAPI_transform_detail': "https://lxplus994.cern.ch:443/idds/monitor/null/null/false/true/false",
+    'iddsAPI_processing_detail': "https://lxplus994.cern.ch:443/idds/monitor/null/null/false/false/true"
 }
diff --git a/workflow/bin/run_workflow b/workflow/bin/run_workflow
index 4ae4ee9f..7663b907 100644
--- a/workflow/bin/run_workflow
+++ b/workflow/bin/run_workflow
@@ -1,4 +1,4 @@
-#!/usr/bin/env python3
+#!/usr/bin/env python
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # You may not use this file except in compliance with the License.
@@ -16,7 +16,7 @@ Run workflow.
 from __future__ import print_function
 
 import argparse
-import argcomplete
+# import argcomplete
 import base64
 # import json
 import logging
@@ -210,7 +210,7 @@ if __name__ == '__main__':
     arguments = sys.argv[1:]
     oparser = get_parser()
 
-    argcomplete.autocomplete(oparser)
+    # argcomplete.autocomplete(oparser)
 
     args = oparser.parse_args(arguments)
 
diff --git a/workflow/tools/make/zipheader b/workflow/tools/make/zipheader
index 0561725c..3f4cc815 100644
--- a/workflow/tools/make/zipheader
+++ b/workflow/tools/make/zipheader
@@ -77,6 +77,9 @@ cat <<- EOF > ${current_dir}/$cmdfile
 
 echo "current dir: " \$PWD
 
+which python
+which python3
+
 # cd ${current_dir}
 current_dir=\$PWD
 

From 19c9c527677655d6ff989ff334c893abf4901bde Mon Sep 17 00:00:00 2001
From: Wen Guan
Date: Tue, 6 Aug 2024 10:27:43 +0200
Subject: [PATCH 21/21] core and orm conditions

---
 main/lib/idds/core/conditions.py | 104 +++++++++++++++++++
 main/lib/idds/orm/conditions.py  | 167 +++++++++++++++++++++++++++++++
 2 files changed, 271 insertions(+)
 create mode 100644 main/lib/idds/core/conditions.py
 create mode 100644 main/lib/idds/orm/conditions.py

diff --git a/main/lib/idds/core/conditions.py b/main/lib/idds/core/conditions.py
new file mode 100644
index 00000000..6df389b5
--- /dev/null
+++ b/main/lib/idds/core/conditions.py
@@ -0,0 +1,104 @@
+#!/usr/bin/env python
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# You may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Authors:
+# - Wen Guan, , 2024
+
+
+"""
+Operations related to Conditions.
+"""
+
+
+from idds.common.constants import ConditionStatus
+from idds.orm.base.session import read_session, transactional_session
+from idds.orm import conditions as orm_conditions
+
+
+@transactional_session
+def add_condition(request_id, internal_id, status=ConditionStatus.WaitForTrigger,
+                  substatus=None, is_loop=False, loop_index=None, cloned_from=None,
+                  evaluate_result=None, previous_transforms=None, following_transforms=None,
+                  condition=None, session=None):
+    """
+    Add a condition.
+
+    :param request_id: The request id.
+    :param internal_id: The internal id.
+    :param status: The status about the condition.
+    :param substatus: The substatus about the condition.
+    :param is_loop: Whether it's a loop condition.
+    :param loop_index: The loop index if it's a loop.
+    :param cloned_from: The original condition if it's a loop.
+    :param evaluate_result: The condition's evaluated result.
+    :param previous_transforms: The previous transforms which can trigger this condition.
+    :param following_transforms: The following transforms which will be triggered.
+    :param condition: The condition function.
+    :param session: The database session.
+ """ + + cond_id = orm_conditions.add_condition(request_id=request_id, internal_id=internal_id, + status=status, substatus=substatus, is_loop=is_loop, + loop_index=loop_index, cloned_from=cloned_from, + evaluate_result=evaluate_result, + previous_transforms=previous_transforms, + following_transforms=following_transforms, + condition=condition, + session=session) + return cond_id + + +@transactional_session +def update_condition(condition_id, parameters, session=None): + """ + Update condition. + + :param condition_id: The condition id. + :param parameters: Parameters as a dict. + :param session: The database session. + """ + orm_conditions.update_condition(condition_id=condition_id, parameters=parameters, session=session) + + +@transactional_session +def update_conditions(conditions, session=None): + """ + Update conditions. + + :param conditions: Condtions as a list of dict. + :param session: The database session. + """ + orm_conditions.update_conditions(conditions=conditions, session=session) + + +@read_session +def retrieve_conditions(request_id, internal_id=None, status=None, session=None): + """ + Retrieve conditions + + :param request_id: The request id. + :param intenal_id: The internal id. + :param status: The status about the condition. + :param session: The database session. + + :returns command: List of conditions + """ + conds = orm_conditions.retrieve_conditions(request_id=request_id, internal_id=internal_id, + status=status, session=session) + return conds + + +@transactional_session +def delete_conditions(request_id=None, internal_id=None, session=None): + """ + Delete all conditions with the given IDs. + + :param request_id: The request id. + :param intenal_id: The internal id. + :param session: The database session. + """ + orm_conditions.delete_condtions(request_id=request_id, internal_id=internal_id, session=session) diff --git a/main/lib/idds/orm/conditions.py b/main/lib/idds/orm/conditions.py new file mode 100644 index 00000000..e471e9fb --- /dev/null +++ b/main/lib/idds/orm/conditions.py @@ -0,0 +1,167 @@ +#!/usr/bin/env python +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0OA +# +# Authors: +# - Wen Guan, , 2024 + + +""" +operations related to Conditions. +""" + +import re +import datetime + +from sqlalchemy.exc import DatabaseError, IntegrityError + +from idds.common import exceptions +from idds.common.constants import ConditionStatus +from idds.orm.base import models +from idds.orm.base.session import read_session, transactional_session + + +@transactional_session +def add_condition(request_id, internal_id, status=ConditionStatus.WaitForTrigger, + substatus=None, is_loop=False, loop_index=None, cloned_from=None, + evaluate_result=None, previous_transforms=None, following_transforms=None, + condition=None, session=None): + """ + Add a condition. + + :param request_id: The request id. + :param intenal_id: The internal id. + :param status: The status about the condition. + :param substatus: The substatus about the condition. + :param is_loop: Whether it's a loop condition. + :param loop_index: The loop index if it's a loop. + :param cloned_from: The original condition if it's a loop. + :param evaluate_result: The condition's evaluated result. + :param previous_transforms: The previous transforms which can trigger this condition. 
+    :param following_transforms: The following transforms which will be triggered.
+    :param condition: The condition function.
+    :param session: The database session.
+    """
+
+    try:
+        cond = models.Condition(request_id=request_id, internal_id=internal_id,
+                                status=status, substatus=substatus, is_loop=is_loop,
+                                loop_index=loop_index, cloned_from=cloned_from,
+                                evaluate_result=evaluate_result,
+                                previous_transforms=previous_transforms,
+                                following_transforms=following_transforms,
+                                condition=condition)
+
+        cond.save(session=session)
+        cond_id = cond.condition_id
+        return cond_id
+    except TypeError as e:
+        raise exceptions.DatabaseException('Invalid JSON for condition: %s' % str(e))
+    except DatabaseError as e:
+        if re.match('.*ORA-12899.*', e.args[0]) \
+           or re.match('.*1406.*', e.args[0]):
+            raise exceptions.DatabaseException('Could not persist condition, condition too large: %s' % str(e))
+        else:
+            raise exceptions.DatabaseException('Could not persist condition: %s' % str(e))
+
+
+@transactional_session
+def update_condition(condition_id, parameters, session=None):
+    """
+    Update condition.
+
+    :param condition_id: The condition id.
+    :param parameters: Parameters as a dict.
+    :param session: The database session.
+    """
+
+    try:
+        parameters['updated_at'] = datetime.datetime.utcnow()
+        session.query(models.Condition).filter_by(condition_id=condition_id)\
+               .update(parameters, synchronize_session=False)
+    except TypeError as e:
+        raise exceptions.DatabaseException('Invalid JSON for condition: %s' % str(e))
+    except DatabaseError as e:
+        if re.match('.*ORA-12899.*', e.args[0]) \
+           or re.match('.*1406.*', e.args[0]):
+            raise exceptions.DatabaseException('Could not persist condition, condition too large: %s' % str(e))
+        else:
+            raise exceptions.DatabaseException('Could not persist condition: %s' % str(e))
+
+
+@transactional_session
+def update_conditions(conditions, session=None):
+    """
+    Update conditions.
+
+    :param conditions: Conditions as a list of dicts.
+    :param session: The database session.
+    """
+
+    try:
+        session.bulk_update_mappings(models.Condition, conditions)
+    except TypeError as e:
+        raise exceptions.DatabaseException('Invalid JSON for condition: %s' % str(e))
+    except DatabaseError as e:
+        if re.match('.*ORA-12899.*', e.args[0]) \
+           or re.match('.*1406.*', e.args[0]):
+            raise exceptions.DatabaseException('Could not persist condition, condition too large: %s' % str(e))
+        else:
+            raise exceptions.DatabaseException('Could not persist condition: %s' % str(e))
+
+
+@read_session
+def retrieve_conditions(request_id, internal_id=None, status=None, session=None):
+    """
+    Retrieve conditions.
+
+    :param request_id: The request id.
+    :param internal_id: The internal id.
+    :param status: The status about the condition.
+    :param session: The database session.
+
+    :returns: List of conditions.
+    """
+    conditions = []
+    try:
+        query = session.query(models.Condition)
+
+        if request_id is not None:
+            query = query.filter_by(request_id=request_id)
+        if internal_id is not None:
+            query = query.filter_by(internal_id=internal_id)
+        if status is not None:
+            query = query.filter_by(status=status)
+
+        tmp = query.all()
+        if tmp:
+            for t in tmp:
+                conditions.append(t.to_dict())
+        return conditions
+    except IntegrityError as e:
+        raise exceptions.DatabaseException(e.args)
+
+
+@transactional_session
+def delete_conditions(request_id=None, internal_id=None, session=None):
+    """
+    Delete all conditions with the given IDs.
+
+    :param request_id: The request id.
+    :param internal_id: The internal id.
+    :param session: The database session.
+    """
+    try:
+        query = session.query(models.Condition)
+
+        if request_id is not None:
+            query = query.filter_by(request_id=request_id)
+        if internal_id is not None:
+            query = query.filter_by(internal_id=internal_id)
+
+        query.delete(synchronize_session=False)
+    except IntegrityError as e:
+        raise exceptions.DatabaseException(e.args)
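
A quick way for reviewers to exercise the AsyncResult changes above is the new is_ok() self-test: it subscribes with force=True, publishes a random uuid4 test id (through the stomp brokers when they are initialized, otherwise through the http API fallback in publish()), then waits for the value to come back and compares it. A minimal sketch, assuming a work context built by iWorkflow is already available; the import path and the workflow_context variable are my reading of the iDDS workflow package layout and should be treated as assumptions:

    from idds.iworkflow.asyncresult import AsyncResult

    # workflow_context is a placeholder for a context object obtained from iWorkflow
    async_ret = AsyncResult(workflow_context)

    # is_ok() round-trips a uuid4 test id through whichever messaging path is usable
    if async_ret.is_ok():
        print("AsyncResult messaging round-trip works")
    else:
        print("neither the stomp brokers nor the http API round-trip worked")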
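
The new conditions layer in PATCH 21 follows the usual iDDS pattern of a thin core module delegating to the orm module. A minimal usage sketch of the core API, assuming a configured iDDS database; the request_id and internal_id values are made up for illustration:

    from idds.common.constants import ConditionStatus
    from idds.core import conditions as core_conditions

    # create a condition waiting to be triggered (IDs are hypothetical)
    cond_id = core_conditions.add_condition(request_id=12345, internal_id='cond_1',
                                            status=ConditionStatus.WaitForTrigger,
                                            previous_transforms=[1], following_transforms=[2])

    # read it back, optionally filtering by status
    conds = core_conditions.retrieve_conditions(request_id=12345, internal_id='cond_1',
                                                status=ConditionStatus.WaitForTrigger)

    # record the evaluation result of the condition function
    core_conditions.update_condition(condition_id=cond_id,
                                     parameters={'evaluate_result': True})

    # drop all conditions of the request once it is finished
    core_conditions.delete_conditions(request_id=12345)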
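
One detail worth noting for update_conditions(): SQLAlchemy's bulk_update_mappings() matches rows by primary key, so each dict in the list must carry condition_id (assumed here to be the primary key, which matches the filter used by update_condition()) alongside the columns to change. Unlike update_condition(), the bulk path does not stamp updated_at itself, so callers may want to include it. A hedged sketch with made-up IDs:

    import datetime

    core_conditions.update_conditions([
        {'condition_id': 1, 'evaluate_result': True, 'updated_at': datetime.datetime.utcnow()},
        {'condition_id': 2, 'loop_index': 3, 'updated_at': datetime.datetime.utcnow()},
    ])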