From 3e94fc50710f3f0054aa7c19bb8bff4bddb386a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Beno=C3=AEt=20Gschwind?= Date: Thu, 21 Sep 2023 20:03:09 +0200 Subject: [PATCH 01/30] Implement safe DB session management --- pywps/dblog.py | 194 ++++++++++++++++++++++++++++--------------------- 1 file changed, 110 insertions(+), 84 deletions(-) diff --git a/pywps/dblog.py b/pywps/dblog.py index 2882ea9f2..1f9fe823f 100644 --- a/pywps/dblog.py +++ b/pywps/dblog.py @@ -40,6 +40,42 @@ lock = Lock() +class SessionManager: + """Implement ref counted session, and close session automaticaly + + This SessionManager allow to call all dblog function recursively + + Usage: + + >>> with current_session as session: + ... [...] + ... pass + + Nested with statement will reuse the session and only one session.close() + will be issued and it will be issued in any case, even on return or + exception within 'with' statement + """ + def __init__(self): + self._session = None + self._handler_count = 0 + + def __enter__(self): + self._handler_count += 1 + if self._session is None: + self._session = get_session() + return self._session + + def __exit__(self, exc_type, exc_val, exc_tb): + self._handler_count -= 1 + if self._handler_count <= 0: + self._session.close() + self._session = None + return False + + +current_session = SessionManager() + + class ProcessInstance(Base): __tablename__ = '{}requests'.format(_tableprefix) @@ -73,14 +109,12 @@ def log_request(uuid, request): time_start = datetime.datetime.now() identifier = _get_identifier(request) - session = get_session() - request = ProcessInstance( - uuid=str(uuid), pid=pid, operation=operation, version=version, - time_start=time_start, identifier=identifier) - - session.add(request) - session.commit() - session.close() + with current_session as session: + request = ProcessInstance( + uuid=str(uuid), pid=pid, operation=operation, version=version, + time_start=time_start, identifier=identifier) + session.add(request) + session.commit() # NoApplicableCode("Could commit to database: {}".format(e.message)) @@ -88,63 +122,58 @@ def get_process_counts(): """Returns running and stored process counts and """ - session = get_session() - stored_query = session.query(RequestInstance.uuid) - running_count = ( - session.query(ProcessInstance) - .filter(ProcessInstance.percent_done < 100) - .filter(ProcessInstance.percent_done > -1) - .filter(~ProcessInstance.uuid.in_(stored_query)) - .count() - ) - stored_count = stored_query.count() - session.close() - return running_count, stored_count + with current_session as session: + stored_query = session.query(RequestInstance.uuid) + running_count = ( + session.query(ProcessInstance) + .filter(ProcessInstance.percent_done < 100) + .filter(ProcessInstance.percent_done > -1) + .filter(~ProcessInstance.uuid.in_(stored_query)) + .count() + ) + stored_count = stored_query.count() + return running_count, stored_count def pop_first_stored(): """Gets the first stored process and delete it from the stored_requests table """ - session = get_session() - request = session.query(RequestInstance).first() + with current_session as session: + request = session.query(RequestInstance).first() - if request: - delete_count = session.query(RequestInstance).filter_by(uuid=request.uuid).delete() - if delete_count == 0: - LOGGER.debug("Another thread or process took the same stored request") - request = None + if request: + delete_count = session.query(RequestInstance).filter_by(uuid=request.uuid).delete() + if delete_count == 0: + LOGGER.debug("Another thread or process took the same stored request") + request = None - session.commit() - return request + session.commit() + return request def store_status(uuid, wps_status, message=None, status_percentage=None): """Writes response to database """ - session = get_session() - - requests = session.query(ProcessInstance).filter_by(uuid=str(uuid)) - if requests.count(): - request = requests.one() - request.time_end = datetime.datetime.now() - request.message = str(message) - request.percent_done = status_percentage - request.status = wps_status - session.commit() - session.close() + with current_session as session: + requests = session.query(ProcessInstance).filter_by(uuid=str(uuid)) + if requests.count(): + request = requests.one() + request.time_end = datetime.datetime.now() + request.message = str(message) + request.percent_done = status_percentage + request.status = wps_status + session.commit() def update_pid(uuid, pid): """Update actual pid for the uuid processing """ - session = get_session() - - requests = session.query(ProcessInstance).filter_by(uuid=str(uuid)) - if requests.count(): - request = requests.one() - request.pid = pid - session.commit() - session.close() + with current_session as session: + requests = session.query(ProcessInstance).filter_by(uuid=str(uuid)) + if requests.count(): + request = requests.one() + request.pid = pid + session.commit() def cleanup_crashed_process(): @@ -152,34 +181,32 @@ def cleanup_crashed_process(): if sys.platform != "linux": return - session = get_session() - stored_query = session.query(RequestInstance.uuid) - running_cur = ( - session.query(ProcessInstance) - .filter(ProcessInstance.status.in_([WPS_STATUS.STARTED, WPS_STATUS.PAUSED])) - .filter(~ProcessInstance.uuid.in_(stored_query)) - ) - - failed = [] - running = [(p.uuid, p.pid) for p in running_cur] - for uuid, pid in running: - # No process with this pid, the process has crashed - if not os.path.exists(os.path.join("/proc", str(pid))): - failed.append(uuid) - continue - - # If we can't read the environ, that mean the process belong another user - # which mean that this is not our process, thus our process has crashed - # this not work because root is the user for the apache - # if not os.access(os.path.join("/proc", str(pid), "environ"), os.R_OK): - # failed.append(uuid) - # continue - pass - - for uuid in failed: - store_status(uuid, WPS_STATUS.FAILED, "Process crashed", 100) - - session.close() + with current_session as session: + stored_query = session.query(RequestInstance.uuid) + running_cur = ( + session.query(ProcessInstance) + .filter(ProcessInstance.status.in_([WPS_STATUS.STARTED, WPS_STATUS.PAUSED])) + .filter(~ProcessInstance.uuid.in_(stored_query)) + ) + + failed = [] + running = [(p.uuid, p.pid) for p in running_cur] + for uuid, pid in running: + # No process with this pid, the process has crashed + if not os.path.exists(os.path.join("/proc", str(pid))): + failed.append(uuid) + continue + + # If we can't read the environ, that mean the process belong another user + # which mean that this is not our process, thus our process has crashed + # this not work because root is the user for the apache + # if not os.access(os.path.join("/proc", str(pid), "environ"), os.R_OK): + # failed.append(uuid) + # continue + pass + + for uuid in failed: + store_status(uuid, WPS_STATUS.FAILED, "Process crashed", 100) def _get_identifier(request): @@ -249,11 +276,10 @@ def store_process(uuid, request): """Save given request under given UUID for later usage """ - session = get_session() - request_json = request.json - # the BLOB type requires bytes on Python 3 - request_json = request_json.encode('utf-8') - request = RequestInstance(uuid=str(uuid), request=request_json) - session.add(request) - session.commit() - session.close() + with current_session as session: + request_json = request.json + # the BLOB type requires bytes on Python 3 + request_json = request_json.encode('utf-8') + request = RequestInstance(uuid=str(uuid), request=request_json) + session.add(request) + session.commit() From 6907477fe8cdc992a885cd86edf829ee9f23fc23 Mon Sep 17 00:00:00 2001 From: Benoit Gschwind Date: Mon, 7 Nov 2022 10:51:44 +0100 Subject: [PATCH 02/30] Reduce the use of WPSRequest.http_request toward removing it --- pywps/app/WPSRequest.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/pywps/app/WPSRequest.py b/pywps/app/WPSRequest.py index 7ffaad873..12d734648 100644 --- a/pywps/app/WPSRequest.py +++ b/pywps/app/WPSRequest.py @@ -64,7 +64,7 @@ def __init__(self, http_request=None, preprocessors=None): self.api = d.get('api') self.default_mimetype = d.get('default_mimetype') request_parser = self._get_request_parser_method(http_request.method) - request_parser() + request_parser(http_request) def _get_request_parser_method(self, method): @@ -75,11 +75,11 @@ def _get_request_parser_method(self, method): else: raise MethodNotAllowed() - def _get_request(self): + def _get_request(self, http_request): """HTTP GET request parser.""" # service shall be WPS - service = _get_get_param(self.http_request, 'service', None if wps_strict else 'wps') + service = _get_get_param(http_request, 'service', None if wps_strict else 'wps') if service: if str(service).lower() != 'wps': raise InvalidParameterValue( @@ -87,28 +87,28 @@ def _get_request(self): else: raise MissingParameterValue('service', 'service') - self.operation = _get_get_param(self.http_request, 'request', self.operation) + self.operation = _get_get_param(http_request, 'request', self.operation) - language = _get_get_param(self.http_request, 'language') + language = _get_get_param(http_request, 'language') self.check_and_set_language(language) request_parser = self._get_request_parser(self.operation) - request_parser(self.http_request) + request_parser(http_request) - def _post_request(self): + def _post_request(self, http_request): """HTTP POST request parser.""" # check if input file size was not exceeded maxsize = configuration.get_config_value('server', 'maxrequestsize') maxsize = configuration.get_size_mb(maxsize) * 1024 * 1024 - if self.http_request.content_length > maxsize: + if http_request.content_length > maxsize: raise FileSizeExceeded('File size for input exceeded.' ' Maximum request size allowed: {} megabytes'.format(maxsize / 1024 / 1024)) - content_type = self.http_request.content_type or [] # or self.http_request.mimetype + content_type = http_request.content_type or [] # or self.http_request.mimetype json_input = 'json' in content_type if not json_input: try: - doc = etree.fromstring(self.http_request.get_data()) + doc = etree.fromstring(http_request.get_data()) except Exception as e: raise NoApplicableCode(str(e)) operation = doc.tag @@ -122,7 +122,7 @@ def _post_request(self): request_parser(doc) else: try: - jdoc = json.loads(self.http_request.get_data()) + jdoc = json.loads(http_request.get_data()) except Exception as e: raise NoApplicableCode(str(e)) if self.identifier is not None: @@ -144,7 +144,7 @@ def _post_request(self): jdoc['default_mimetype'] = self.default_mimetype if self.preprocess_request is not None: - jdoc = self.preprocess_request(jdoc, http_request=self.http_request) + jdoc = self.preprocess_request(jdoc, http_request=http_request) self.json = jdoc version = jdoc.get('version') From 074fbcbb41ac937ca07dc8af1b1b290edc04e268 Mon Sep 17 00:00:00 2001 From: Benoit Gschwind Date: Mon, 7 Nov 2022 11:19:22 +0100 Subject: [PATCH 03/30] Move WPSResponse.template_env to pywps.response.TEMPLATE_ENV module constant --- pywps/inout/outputs.py | 11 ++--------- pywps/response/__init__.py | 11 ++++++++++- pywps/response/basic.py | 16 +++------------- 3 files changed, 15 insertions(+), 23 deletions(-) diff --git a/pywps/inout/outputs.py b/pywps/inout/outputs.py index cafacc1d5..dbeb412a8 100644 --- a/pywps/inout/outputs.py +++ b/pywps/inout/outputs.py @@ -524,15 +524,8 @@ def url(self): def _load_template(self): from jinja2 import PackageLoader - - from pywps.response import RelEnvironment - - template_env = RelEnvironment( - loader=PackageLoader('pywps', 'templates'), - trim_blocks=True, lstrip_blocks=True, - autoescape=True, ) - - self._template = template_env.get_template(self._xml_template) + from pywps.response import TEMPLATE_ENV + self._template = TEMPLATE_ENV.get_template(self._xml_template) class MetaLink4(MetaLink): diff --git a/pywps/response/__init__.py b/pywps/response/__init__.py index e7211084b..3599e89ff 100644 --- a/pywps/response/__init__.py +++ b/pywps/response/__init__.py @@ -1,7 +1,8 @@ import os -from jinja2 import Environment +from jinja2 import Environment, PackageLoader +from pywps.translations import get_translation class RelEnvironment(Environment): @@ -10,6 +11,14 @@ def join_path(self, template, parent): return os.path.dirname(parent) + '/' + template +TEMPLATE_ENV = RelEnvironment( + loader=PackageLoader('pywps', 'templates'), + trim_blocks=True, lstrip_blocks=True, + autoescape=True, +) +TEMPLATE_ENV.globals.update(get_translation=get_translation) + + def get_response(operation): from .capabilities import CapabilitiesResponse diff --git a/pywps/response/basic.py b/pywps/response/basic.py index 05b6d4b2d..fcae08358 100644 --- a/pywps/response/basic.py +++ b/pywps/response/basic.py @@ -4,15 +4,10 @@ if TYPE_CHECKING: from pywps import WPSRequest -import os - -from jinja2 import Environment, PackageLoader - from pywps.dblog import store_status -from pywps.translations import get_translation - -from . import RelEnvironment +from pywps.response import TEMPLATE_ENV from .status import WPS_STATUS +import os class WPSResponse(object): @@ -27,12 +22,7 @@ def __init__(self, wps_request: 'WPSRequest', uuid=None, version="1.0.0"): self.doc = None self.content_type = None self.version = version - self.template_env = RelEnvironment( - loader=PackageLoader('pywps', 'templates'), - trim_blocks=True, lstrip_blocks=True, - autoescape=True, - ) - self.template_env.globals.update(get_translation=get_translation) + self.template_env = TEMPLATE_ENV def _update_status(self, status, message, status_percentage): """ From 74984e0a5a83edca881b071f871cd48b9e142336 Mon Sep 17 00:00:00 2001 From: Benoit Gschwind Date: Wed, 9 Nov 2022 10:46:33 +0100 Subject: [PATCH 04/30] Move RelEnvironment from pywps.response to pywps.response.basic --- pywps/inout/outputs.py | 2 +- pywps/response/__init__.py | 19 ------------------- pywps/response/basic.py | 18 +++++++++++++++++- 3 files changed, 18 insertions(+), 21 deletions(-) diff --git a/pywps/inout/outputs.py b/pywps/inout/outputs.py index dbeb412a8..ed1316304 100644 --- a/pywps/inout/outputs.py +++ b/pywps/inout/outputs.py @@ -524,7 +524,7 @@ def url(self): def _load_template(self): from jinja2 import PackageLoader - from pywps.response import TEMPLATE_ENV + from pywps.response.basic import TEMPLATE_ENV self._template = TEMPLATE_ENV.get_template(self._xml_template) diff --git a/pywps/response/__init__.py b/pywps/response/__init__.py index 3599e89ff..ccdcb831f 100644 --- a/pywps/response/__init__.py +++ b/pywps/response/__init__.py @@ -1,23 +1,4 @@ -import os - -from jinja2 import Environment, PackageLoader -from pywps.translations import get_translation - - -class RelEnvironment(Environment): - """Override join_path() to enable relative template paths.""" - def join_path(self, template, parent): - return os.path.dirname(parent) + '/' + template - - -TEMPLATE_ENV = RelEnvironment( - loader=PackageLoader('pywps', 'templates'), - trim_blocks=True, lstrip_blocks=True, - autoescape=True, -) -TEMPLATE_ENV.globals.update(get_translation=get_translation) - def get_response(operation): diff --git a/pywps/response/basic.py b/pywps/response/basic.py index fcae08358..4cfc9dc0e 100644 --- a/pywps/response/basic.py +++ b/pywps/response/basic.py @@ -5,10 +5,26 @@ from pywps import WPSRequest from pywps.dblog import store_status -from pywps.response import TEMPLATE_ENV from .status import WPS_STATUS import os +from jinja2 import Environment, PackageLoader +from pywps.translations import get_translation + + +class RelEnvironment(Environment): + """Override join_path() to enable relative template paths.""" + def join_path(self, template, parent): + return os.path.dirname(parent) + '/' + template + + +TEMPLATE_ENV = RelEnvironment( + loader=PackageLoader('pywps', 'templates'), + trim_blocks=True, lstrip_blocks=True, + autoescape=True, +) +TEMPLATE_ENV.globals.update(get_translation=get_translation) + class WPSResponse(object): From 9636cf0cea1a5279bc90f9753eaa963214c8d5ed Mon Sep 17 00:00:00 2001 From: Benoit Gschwind Date: Mon, 7 Nov 2022 15:37:57 +0100 Subject: [PATCH 05/30] Split WPSRequest into WPSExecuteRequest The WPS request contain the pre-parsed WPS data in json, i.e. an intermediate form of the request. We can store this form for later processing. In other hand the WPSExecuteRequest is the final WPS request form to run the process, it is used into the handler function as wps_request parameter. This simplify the data stored for later use. This also avoid that serialization of pywps.inout.inputs.*Input is used in very diferent context. --- pywps/app/Process.py | 6 +- pywps/app/Service.py | 133 +--------------------------- pywps/app/WPSExecuteRequest.py | 157 +++++++++++++++++++++++++++++++++ pywps/app/WPSRequest.py | 22 +---- pywps/app/__init__.py | 1 + pywps/processing/job.py | 3 +- tests/test_execute.py | 7 +- tests/test_wpsrequest.py | 29 +++--- 8 files changed, 187 insertions(+), 171 deletions(-) create mode 100644 pywps/app/WPSExecuteRequest.py diff --git a/pywps/app/Process.py b/pywps/app/Process.py index 41e4b7910..6287613f9 100644 --- a/pywps/app/Process.py +++ b/pywps/app/Process.py @@ -11,6 +11,9 @@ import sys import traceback +from pywps.app.WPSExecuteRequest import WPSExecuteRequest +from pywps.inout.inputs import input_from_json +from pywps.inout.outputs import output_from_json import pywps.configuration as config from pywps import dblog from pywps.app.exceptions import ProcessError @@ -220,7 +223,7 @@ def _execute_process(self, async_, wps_request, wps_response): if stored >= maxprocesses and maxprocesses != -1: raise ServerBusy('Maximum number of processes in queue reached. Please try later.') LOGGER.debug("Store process in job queue, uuid={}".format(self.uuid)) - dblog.store_process(self.uuid, wps_request) + dblog.store_process(self.uuid, wps_request.wps_request) wps_response._update_status(WPS_STATUS.ACCEPTED, 'PyWPS Process stored in job queue', 0) # not async @@ -320,6 +323,7 @@ def launch_next_process(self): new_wps_request.json = json.loads(request_json) process_identifier = new_wps_request.identifier process = self.service.prepare_process_for_execution(process_identifier) + new_wps_request = WPSExecuteRequest(process, new_wps_request) process._set_uuid(uuid) process._setup_status_storage() process.async_ = True diff --git a/pywps/app/Service.py b/pywps/app/Service.py index 7d3b2b239..2a076af7a 100755 --- a/pywps/app/Service.py +++ b/pywps/app/Service.py @@ -14,7 +14,8 @@ from urllib.parse import urlparse from werkzeug.exceptions import HTTPException -from werkzeug.wrappers import Request +from werkzeug.wrappers import Request, Response +from pywps.app.WPSExecuteRequest import WPSExecuteRequest import pywps.configuration as config from pywps import response @@ -104,117 +105,13 @@ def _parse_and_execute(self, process, wps_request, uuid): """Parse and execute request """ - LOGGER.debug('Checking if all mandatory inputs have been passed') - data_inputs = {} - for inpt in process.inputs: - # Replace the dicts with the dict of Literal/Complex inputs - # set the input to the type defined in the process. - - request_inputs = None - if inpt.identifier in wps_request.inputs: - request_inputs = wps_request.inputs[inpt.identifier] - - if not request_inputs: - if inpt._default is not None: - if not inpt.data_set and isinstance(inpt, ComplexInput): - inpt._set_default_value() - - data_inputs[inpt.identifier] = [inpt.clone()] - else: - - if isinstance(inpt, ComplexInput): - data_inputs[inpt.identifier] = self.create_complex_inputs( - inpt, request_inputs) - elif isinstance(inpt, LiteralInput): - data_inputs[inpt.identifier] = self.create_literal_inputs( - inpt, request_inputs) - elif isinstance(inpt, BoundingBoxInput): - data_inputs[inpt.identifier] = self.create_bbox_inputs( - inpt, request_inputs) - - for inpt in process.inputs: - - if inpt.identifier not in data_inputs: - if inpt.min_occurs > 0: - LOGGER.error('Missing parameter value: {}'.format(inpt.identifier)) - raise MissingParameterValue( - inpt.identifier, inpt.identifier) - - wps_request.inputs = data_inputs + wps_request = WPSExecuteRequest(process, wps_request) process.setup_outputs_from_wps_request(wps_request) wps_response = process.execute(wps_request, uuid) return wps_response - def create_complex_inputs(self, source, inputs): - """Create new ComplexInput as clone of original ComplexInput - because of inputs can be more than one, take it just as Prototype. - - :param source: The process's input definition. - :param inputs: The request input data. - :return collections.deque: - """ - - outinputs = deque(maxlen=source.max_occurs) - - for inpt in inputs: - data_input = source.clone() - frmt = data_input.supported_formats[0] - if 'mimeType' in inpt: - if inpt['mimeType']: - frmt = data_input.get_format(inpt['mimeType']) - else: - frmt = data_input.data_format - - if frmt: - data_input.data_format = frmt - else: - raise InvalidParameterValue( - 'Invalid mimeType value {} for input {}'.format(inpt.get('mimeType'), source.identifier), - 'mimeType') - - data_input.method = inpt.get('method', 'GET') - data_input.process(inpt) - outinputs.append(data_input) - - if len(outinputs) < source.min_occurs: - description = "At least {} inputs are required. You provided {}.".format( - source.min_occurs, - len(outinputs), - ) - raise MissingParameterValue(description=description, locator=source.identifier) - return outinputs - - def create_literal_inputs(self, source, inputs): - """ Takes the http_request and parses the input to objects - :return collections.deque: - """ - - outinputs = deque(maxlen=source.max_occurs) - - for inpt in inputs: - newinpt = source.clone() - # set the input to the type defined in the process - newinpt.uom = inpt.get('uom') - data_type = inpt.get('datatype') - if data_type: - newinpt.data_type = data_type - - # get the value of the field - newinpt.data = inpt.get('data') - - outinputs.append(newinpt) - - if len(outinputs) < source.min_occurs: - description = "At least {} inputs are required. You provided {}.".format( - source.min_occurs, - len(outinputs), - ) - raise MissingParameterValue(description, locator=source.identifier) - - return outinputs - def _set_grass(self): """Set environment variables needed for GRASS GIS support """ @@ -241,30 +138,6 @@ def _set_grass(self): os.putenv('PYTHONPATH', os.environ.get('PYTHONPATH')) sys.path.insert(0, python_path) - def create_bbox_inputs(self, source, inputs): - """ Takes the http_request and parses the input to objects - :return collections.deque: - """ - - outinputs = deque(maxlen=source.max_occurs) - - for inpt in inputs: - newinpt = source.clone() - newinpt.data = inpt.get('data') - LOGGER.debug(f'newinpt bbox data={newinpt.data}') - newinpt.crs = inpt.get('crs') - newinpt.dimensions = inpt.get('dimensions') - outinputs.append(newinpt) - - if len(outinputs) < source.min_occurs: - description = "At least {} inputs are required. You provided {}.".format( - source.min_occurs, - len(outinputs), - ) - raise MissingParameterValue(description=description, locator=source.identifier) - - return outinputs - # May not raise exceptions, this function must return a valid werkzeug.wrappers.Response. def call(self, http_request): diff --git a/pywps/app/WPSExecuteRequest.py b/pywps/app/WPSExecuteRequest.py new file mode 100644 index 000000000..1bd94aa6a --- /dev/null +++ b/pywps/app/WPSExecuteRequest.py @@ -0,0 +1,157 @@ +################################################################## +# Copyright 2018 Open Source Geospatial Foundation and others # +# licensed under MIT, Please consult LICENSE.txt for details # +################################################################## + +import logging + +from pywps.exceptions import MissingParameterValue, InvalidParameterValue +from pywps.inout.inputs import ComplexInput, LiteralInput, BoundingBoxInput +from collections import deque + +LOGGER = logging.getLogger("PYWPS") + + +class WPSExecuteRequest(object): + + def __init__(self, process, wps_request): + self.wps_request = wps_request + + LOGGER.debug('Checking if all mandatory inputs have been passed') + self.inputs = dict() + for inpt in process.inputs: + # Replace the dicts with the dict of Literal/Complex inputs + # set the input to the type defined in the process. + + request_inputs = None + if inpt.identifier in wps_request.inputs: + request_inputs = wps_request.inputs[inpt.identifier] + + if not request_inputs: + if inpt._default is not None: + if not inpt.data_set and isinstance(inpt, ComplexInput): + inpt._set_default_value() + + self.inputs[inpt.identifier] = [inpt.clone()] + else: + + if isinstance(inpt, ComplexInput): + self.inputs[inpt.identifier] = self.create_complex_inputs( + inpt, request_inputs) + elif isinstance(inpt, LiteralInput): + self.inputs[inpt.identifier] = self.create_literal_inputs( + inpt, request_inputs) + elif isinstance(inpt, BoundingBoxInput): + self.inputs[inpt.identifier] = self.create_bbox_inputs( + inpt, request_inputs) + + for inpt in process.inputs: + + if inpt.identifier not in self.inputs: + if inpt.min_occurs > 0: + LOGGER.error('Missing parameter value: {}'.format(inpt.identifier)) + raise MissingParameterValue( + inpt.identifier, inpt.identifier) + + @property + def json(self): + raise NotImplementedError("WPSExectuteRequest is not serialisable to json") + + # Fall back to attibute of WPSRequest + def __getattr__(self, item): + return getattr(self.wps_request, item) + + @staticmethod + def create_complex_inputs(source, inputs): + """Create new ComplexInput as clone of original ComplexInput + because of inputs can be more than one, take it just as Prototype. + + :param source: The process's input definition. + :param inputs: The request input data. + :return collections.deque: + """ + + outinputs = deque(maxlen=source.max_occurs) + + for inpt in inputs: + data_input = source.clone() + frmt = data_input.supported_formats[0] + if 'mimeType' in inpt: + if inpt['mimeType']: + frmt = data_input.get_format(inpt['mimeType']) + else: + frmt = data_input.data_format + + if frmt: + data_input.data_format = frmt + else: + raise InvalidParameterValue( + 'Invalid mimeType value {} for input {}'.format(inpt.get('mimeType'), source.identifier), + 'mimeType') + + data_input.method = inpt.get('method', 'GET') + data_input.process(inpt) + outinputs.append(data_input) + + if len(outinputs) < source.min_occurs: + description = "At least {} inputs are required. You provided {}.".format( + source.min_occurs, + len(outinputs), + ) + raise MissingParameterValue(description=description, locator=source.identifier) + return outinputs + + @staticmethod + def create_literal_inputs(source, inputs): + """ Takes the http_request and parses the input to objects + :return collections.deque: + """ + + outinputs = deque(maxlen=source.max_occurs) + + for inpt in inputs: + newinpt = source.clone() + # set the input to the type defined in the process + newinpt.uom = inpt.get('uom') + data_type = inpt.get('datatype') + if data_type: + newinpt.data_type = data_type + + # get the value of the field + newinpt.data = inpt.get('data') + + outinputs.append(newinpt) + + if len(outinputs) < source.min_occurs: + description = "At least {} inputs are required. You provided {}.".format( + source.min_occurs, + len(outinputs), + ) + raise MissingParameterValue(description, locator=source.identifier) + + return outinputs + + @staticmethod + def create_bbox_inputs(source, inputs): + """ Takes the http_request and parses the input to objects + :return collections.deque: + """ + + outinputs = deque(maxlen=source.max_occurs) + + for inpt in inputs: + newinpt = source.clone() + newinpt.data = inpt.get('data') + LOGGER.debug(f'newinpt bbox data={newinpt.data}') + newinpt.crs = inpt.get('crs') + newinpt.dimensions = inpt.get('dimensions') + outinputs.append(newinpt) + + if len(outinputs) < source.min_occurs: + description = "At least {} inputs are required. You provided {}.".format( + source.min_occurs, + len(outinputs), + ) + raise MissingParameterValue(description=description, locator=source.identifier) + + return outinputs diff --git a/pywps/app/WPSRequest.py b/pywps/app/WPSRequest.py index 12d734648..facce379c 100644 --- a/pywps/app/WPSRequest.py +++ b/pywps/app/WPSRequest.py @@ -443,7 +443,7 @@ def default(self, obj): 'store_execute': self.store_execute, 'status': self.status, 'lineage': self.lineage, - 'inputs': dict((i, [inpt.json for inpt in self.inputs[i]]) for i in self.inputs), + 'inputs': self.inputs, 'outputs': self.outputs, 'raw': self.raw } @@ -469,25 +469,7 @@ def json(self, value): self.lineage = value.get('lineage', False) self.outputs = value.get('outputs') self.raw = value.get('raw', False) - self.inputs = {} - - for identifier in value.get('inputs', []): - inpt_defs = value['inputs'][identifier] - if not isinstance(inpt_defs, (list, tuple)): - inpt_defs = [inpt_defs] - self.inputs[identifier] = [] - for inpt_def in inpt_defs: - if not isinstance(inpt_def, dict): - inpt_def = {"data": inpt_def} - if 'identifier' not in inpt_def: - inpt_def['identifier'] = identifier - try: - inpt = input_from_json(inpt_def) - self.inputs[identifier].append(inpt) - except Exception as e: - LOGGER.warning(e) - LOGGER.warning(f'skipping input: {identifier}') - pass + self.inputs = value.get('inputs', []) def get_inputs_from_xml(doc): diff --git a/pywps/app/__init__.py b/pywps/app/__init__.py index 3cefbd934..a33cccfd3 100644 --- a/pywps/app/__init__.py +++ b/pywps/app/__init__.py @@ -8,3 +8,4 @@ from pywps.app.WPSRequest import WPSRequest # noqa: F401 from pywps.app.WPSRequest import get_inputs_from_xml # noqa: F401 from pywps.app.WPSRequest import get_output_from_xml # noqa: F401 +from pywps.app.WPSExecuteRequest import WPSExecuteRequest diff --git a/pywps/processing/job.py b/pywps/processing/job.py index d6ec87c7a..93a9f7468 100644 --- a/pywps/processing/job.py +++ b/pywps/processing/job.py @@ -11,6 +11,7 @@ import pywps.configuration as config from pywps import Process, WPSRequest from pywps.response.execute import ExecuteResponse +from pywps.app.WPSExecuteRequest import WPSExecuteRequest LOGGER = logging.getLogger("PYWPS") @@ -84,7 +85,7 @@ def load(cls, filename): return job def run(self): - getattr(self.process, self.method)(self.wps_request, self.wps_response) + getattr(self.process, self.method)(WPSExecuteRequest(self.process, self.wps_request), self.wps_response) class JobLauncher(object): diff --git a/tests/test_execute.py b/tests/test_execute.py index bcc2f0a30..40a92acf0 100644 --- a/tests/test_execute.py +++ b/tests/test_execute.py @@ -19,6 +19,7 @@ from pywps.app.basic import get_xpath_ns from pywps.tests import client_for, assert_response_success, assert_response_success_json from pywps import configuration +from pywps.app.WPSExecuteRequest import WPSExecuteRequest from io import StringIO @@ -319,7 +320,7 @@ class FakeRequest(): self.assertEqual(e.locator, 'mimeType') request.inputs['complex'][0]['mimeType'] = 'application/gml' - parsed_inputs = service.create_complex_inputs(my_process.inputs[0], + parsed_inputs = WPSExecuteRequest.create_complex_inputs(my_process.inputs[0], request.inputs['complex']) # TODO parse outputs and their validators too @@ -328,7 +329,7 @@ class FakeRequest(): request.inputs['complex'][0]['mimeType'] = 'application/xml+gml' try: - parsed_inputs = service.create_complex_inputs(my_process.inputs[0], + parsed_inputs = WPSExecuteRequest.create_complex_inputs(my_process.inputs[0], request.inputs['complex']) except InvalidParameterValue as e: self.assertEqual(e.locator, 'mimeType') @@ -343,7 +344,7 @@ class FakeRequest(): my_process.inputs[0].supported_formats = [frmt] my_process.inputs[0].data_format = Format(mime_type='application/xml+gml') - parsed_inputs = service.create_complex_inputs(my_process.inputs[0], + parsed_inputs = WPSExecuteRequest.create_complex_inputs(my_process.inputs[0], request.inputs['complex']) self.assertEqual(parsed_inputs[0].data_format.validate, validategml) diff --git a/tests/test_wpsrequest.py b/tests/test_wpsrequest.py index cc5868295..011a54075 100644 --- a/tests/test_wpsrequest.py +++ b/tests/test_wpsrequest.py @@ -60,10 +60,7 @@ def test_json_in(self): self.request = WPSRequest() self.request.json = obj - self.assertEqual(self.request.inputs['myliteral'][0].data, 1, 'Data are in the file') - self.assertEqual(self.request.inputs['myin'][0].data, 'ahoj', 'Data are in the file') - self.assertListEqual(self.request.inputs['myliteral'][0].allowed_values, [AnyValue()], 'Any value not set') - self.assertTrue(self.request.inputs['myliteral'][0].any_value, 'Any value set') + self.assertEqual(self.request.inputs['myliteral'][0]['data'], 1, 'Data are in the file') def test_json_inout_datetime(self): obj = { @@ -105,17 +102,17 @@ def test_json_inout_datetime(self): self.request = WPSRequest() self.request.json = obj - self.assertEqual(self.request.inputs['datetime'][0].data, datetime.datetime(2017, 4, 20, 12), 'Datatime set') - self.assertEqual(self.request.inputs['date'][0].data, datetime.date(2017, 4, 20), 'Data set') - self.assertEqual(self.request.inputs['time'][0].data, datetime.time(9, 0, 0), 'Time set') + self.assertEqual(self.request.inputs['datetime'][0]['data'], '2017-04-20T12:00:00', 'Datatime set') + self.assertEqual(self.request.inputs['date'][0]['data'], '2017-04-20', 'Data set') + self.assertEqual(self.request.inputs['time'][0]['data'], '09:00:00', 'Time set') # dump to json and reload dump = self.request.json self.request.json = json.loads(dump) - self.assertEqual(self.request.inputs['datetime'][0].data, datetime.datetime(2017, 4, 20, 12), 'Datatime set') - self.assertEqual(self.request.inputs['date'][0].data, datetime.date(2017, 4, 20), 'Data set') - self.assertEqual(self.request.inputs['time'][0].data, datetime.time(9, 0, 0), 'Time set') + self.assertEqual(self.request.inputs['datetime'][0]['data'], '2017-04-20T12:00:00', 'Datatime set') + self.assertEqual(self.request.inputs['date'][0]['data'], '2017-04-20', 'Data set') + self.assertEqual(self.request.inputs['time'][0]['data'], '09:00:00', 'Time set') def test_json_inout_bbox(self): obj = { @@ -145,17 +142,17 @@ def test_json_inout_bbox(self): self.request = WPSRequest() self.request.json = obj - self.assertEqual(self.request.inputs['bbox'][0].data, [6.117602, 46.176194, 6.22283, 46.275832], 'BBox data set') - self.assertTrue(isinstance(self.request.inputs['bbox'][0].crs, str), 'CRS is a string') - self.assertEqual(self.request.inputs['bbox'][0].crs, 'epsg:4326', 'CRS data correctly parsed') + self.assertEqual(self.request.inputs['bbox'][0]['bbox'], '6.117602,46.176194,6.22283,46.275832', 'BBox data set') + self.assertTrue(isinstance(self.request.inputs['bbox'][0]['crs'], str), 'CRS is a string') + self.assertEqual(self.request.inputs['bbox'][0]['crs'], 'urn:ogc:def:crs:EPSG::4326', 'CRS data correctly parsed') # dump to json and reload dump = self.request.json self.request.json = json.loads(dump) - self.assertEqual(self.request.inputs['bbox'][0].data, [6.117602, 46.176194, 6.22283, 46.275832], 'BBox data set') - self.assertTrue(isinstance(self.request.inputs['bbox'][0].crs, str), 'CRS is a string') - self.assertEqual(self.request.inputs['bbox'][0].crs, 'epsg:4326', 'CRS data correctly parsed') + self.assertEqual(self.request.inputs['bbox'][0]['bbox'], '6.117602,46.176194,6.22283,46.275832', 'BBox data set') + self.assertTrue(isinstance(self.request.inputs['bbox'][0]['crs'], str), 'CRS is a string') + self.assertEqual(self.request.inputs['bbox'][0]['crs'], 'urn:ogc:def:crs:EPSG::4326', 'CRS data correctly parsed') def load_tests(loader=None, tests=None, pattern=None): From 352aa83f2a81210e5705ad12106c24bcb6bee72a Mon Sep 17 00:00:00 2001 From: Benoit Gschwind Date: Mon, 7 Nov 2022 15:55:31 +0100 Subject: [PATCH 06/30] Ensure WPSExecuteRequest.inputs['id'] to be python list WPSExecuteRequest.inputs['id'] is collections deque, this patch convert them to regular python list. --- pywps/app/WPSExecuteRequest.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pywps/app/WPSExecuteRequest.py b/pywps/app/WPSExecuteRequest.py index 1bd94aa6a..4686a9223 100644 --- a/pywps/app/WPSExecuteRequest.py +++ b/pywps/app/WPSExecuteRequest.py @@ -99,7 +99,7 @@ def create_complex_inputs(source, inputs): len(outinputs), ) raise MissingParameterValue(description=description, locator=source.identifier) - return outinputs + return list(outinputs) @staticmethod def create_literal_inputs(source, inputs): @@ -129,7 +129,7 @@ def create_literal_inputs(source, inputs): ) raise MissingParameterValue(description, locator=source.identifier) - return outinputs + return list(outinputs) @staticmethod def create_bbox_inputs(source, inputs): @@ -154,4 +154,4 @@ def create_bbox_inputs(source, inputs): ) raise MissingParameterValue(description=description, locator=source.identifier) - return outinputs + return list(outinputs) From ae1b89673e079cd3942b7351d20f521354d5e017 Mon Sep 17 00:00:00 2001 From: Benoit Gschwind Date: Mon, 7 Nov 2022 17:03:23 +0100 Subject: [PATCH 07/30] Split pywps.response.execute.ExecuteResponse into WPSExecuteResponse The ExecuteResponse is responsible to render the output XML and this is not mandatory for running process, in particular background process. This patch split the part that are required to run a given execute request from the part that is required to render the execute output. --- pywps/app/Process.py | 7 +- pywps/app/Service.py | 6 +- pywps/app/WPSExecuteResponse.py | 249 ++++++++++++++++++++++++++++++++ pywps/processing/job.py | 9 +- pywps/response/execute.py | 166 ++------------------- tests/test_processing.py | 13 +- 6 files changed, 276 insertions(+), 174 deletions(-) create mode 100644 pywps/app/WPSExecuteResponse.py diff --git a/pywps/app/Process.py b/pywps/app/Process.py index 6287613f9..16fad90ee 100644 --- a/pywps/app/Process.py +++ b/pywps/app/Process.py @@ -12,6 +12,7 @@ import traceback from pywps.app.WPSExecuteRequest import WPSExecuteRequest +from pywps.app.WPSExecuteResponse import WPSExecuteResponse from pywps.inout.inputs import input_from_json from pywps.inout.outputs import output_from_json import pywps.configuration as config @@ -131,8 +132,7 @@ def execute(self, wps_request, uuid): self._set_uuid(uuid) self._setup_status_storage() self.async_ = False - response_cls = get_response("execute") - wps_response = response_cls(wps_request, process=self, uuid=self.uuid) + wps_response = WPSExecuteResponse(self, wps_request, self.uuid) LOGGER.debug('Check if status storage and updating are supported by this process') if wps_request.store_execute == 'true': @@ -327,8 +327,7 @@ def launch_next_process(self): process._set_uuid(uuid) process._setup_status_storage() process.async_ = True - process.setup_outputs_from_wps_request(new_wps_request) - new_wps_response = ExecuteResponse(new_wps_request, process=process, uuid=uuid) + new_wps_response = WPSExecuteResponse(process, new_wps_request, uuid) new_wps_response.store_status_file = True process._run_async(new_wps_request, new_wps_response) except Exception as e: diff --git a/pywps/app/Service.py b/pywps/app/Service.py index 2a076af7a..a12227e42 100755 --- a/pywps/app/Service.py +++ b/pywps/app/Service.py @@ -16,7 +16,7 @@ from werkzeug.exceptions import HTTPException from werkzeug.wrappers import Request, Response from pywps.app.WPSExecuteRequest import WPSExecuteRequest - +from pywps.response.execute import ExecuteResponse import pywps.configuration as config from pywps import response from pywps.app.WPSRequest import WPSRequest @@ -107,10 +107,8 @@ def _parse_and_execute(self, process, wps_request, uuid): wps_request = WPSExecuteRequest(process, wps_request) - process.setup_outputs_from_wps_request(wps_request) - wps_response = process.execute(wps_request, uuid) - return wps_response + return ExecuteResponse(wps_response) def _set_grass(self): """Set environment variables needed for GRASS GIS support diff --git a/pywps/app/WPSExecuteResponse.py b/pywps/app/WPSExecuteResponse.py new file mode 100644 index 000000000..e16cdc85d --- /dev/null +++ b/pywps/app/WPSExecuteResponse.py @@ -0,0 +1,249 @@ +################################################################## +# Copyright 2018 Open Source Geospatial Foundation and others # +# licensed under MIT, Please consult LICENSE.txt for details # +################################################################## +import logging + +LOGGER = logging.getLogger("PYWPS") + +import logging +import time +from werkzeug.wrappers import Request +from pywps import get_ElementMakerForVersion +from pywps.app.basic import get_response_type, get_json_indent, get_default_response_mimetype +import pywps.configuration as config +from werkzeug.wrappers import Response +from pywps.dblog import store_status +from pywps.inout.array_encode import ArrayEncoder +from pywps.response.status import WPS_STATUS +from pywps.inout.formats import FORMATS +from pywps.inout.outputs import ComplexOutput +from pywps.response.execute import ExecuteResponse +from pywps.exceptions import (StorageNotSupported, OperationNotSupported, + ServerBusy, NoApplicableCode, + InvalidParameterValue) + +import urllib.parse as urlparse +from urllib.parse import urlencode + +LOGGER = logging.getLogger("PYWPS") + +WPS, OWS = get_ElementMakerForVersion("1.0.0") + + +class WPSExecuteResponse(object): + + def __init__(self, process, wps_request, uuid, **kwargs): + """constructor + + :param pywps.app.WPSRequest.WPSRequest wps_request: + :param pywps.app.Process.Process process: + :param uuid: string this request uuid + """ + self.uuid = uuid + self.wps_request = wps_request + self.process = process + self.outputs = {o.identifier: o for o in self.process.outputs} + self.store_status_file = False + + self.setup_outputs_from_wps_request(process, wps_request) + + def setup_outputs_from_wps_request(self, process, wps_request): + # set as_reference to True for all the outputs specified as reference + # if the output is not required to be raw + if not wps_request.raw: + for wps_outpt in wps_request.outputs: + + is_reference = wps_request.outputs[wps_outpt].get('asReference', 'false') + mimetype = wps_request.outputs[wps_outpt].get('mimetype', '') + if not isinstance(mimetype, str): + mimetype = '' + + if is_reference.lower() == 'true': + if process.store_supported == 'false': + raise StorageNotSupported( + 'The storage of data is not supported for this process.') + + is_reference = True + else: + is_reference = False + + for outpt in self.outputs.values(): + if outpt.identifier == wps_outpt: + outpt.as_reference = is_reference + if isinstance(outpt, ComplexOutput) and mimetype: + data_format = [f for f in outpt.supported_formats if f.mime_type == mimetype] + if len(data_format) == 0: + raise InvalidParameterValue( + f"MimeType {mimetype} not valid") + outpt.data_format = data_format[0] + + def _update_stored_status(self, status, message, status_percentage): + """ + Update status report of currently running process instance + + :param str message: Message you need to share with the client + :param int status_percentage: Percent done (number betwen <0-100>) + :param pywps.response.status.WPS_STATUS status: process status - user should usually + ommit this parameter + """ + self.message = message + self.status = status + self.status_percentage = status_percentage + store_status(self.uuid, self.status, self.message, self.status_percentage) + + # override WPSResponse._update_status + def _update_status(self, status, message, status_percentage, clean=True): + """ + Updates status report of currently running process instance: + + * Updates the status document. + * Updates the status file (if requested). + * Cleans the working directory when process has finished. + + This method is *only* called by pywps internally. + """ + self._update_stored_status(status, message, status_percentage) + + LOGGER.debug("_update_status: status={}, clean={}".format(status, clean)) + self._update_status_doc() + if self.store_status_file: + self._update_status_file() + if clean: + if self.status == WPS_STATUS.SUCCEEDED or self.status == WPS_STATUS.FAILED: + LOGGER.debug("clean workdir: status={}".format(status)) + self.process.clean() + + def update_status(self, message, status_percentage=None): + """ + Update status report of currently running process instance. + + This method is *only* called by the user provided process. + The status is handled internally in pywps. + + :param str message: Message you need to share with the client + :param int status_percentage: Percent done (number betwen <0-100>) + """ + if status_percentage is None: + status_percentage = self.status_percentage + self._update_status(self.status, message, status_percentage, False) + + def _update_status_doc(self): + try: + # rebuild the doc + response = ExecuteResponse(self) + self.doc, self.content_type = response._construct_doc() + except Exception as e: + raise NoApplicableCode('Building Response Document failed with : {}'.format(e)) + + def _update_status_file(self): + # TODO: check if file/directory is still present, maybe deleted in mean time + try: + # update the status xml file + self.process.status_store.write( + self.doc, + self.process.status_filename, + data_format=FORMATS.XML) + except Exception as e: + raise NoApplicableCode('Writing Response Document failed with : {}'.format(e)) + + def _process_accepted(self): + percent = int(self.status_percentage) + if percent > 99: + percent = 99 + return { + "status": "accepted", + "time": time.strftime('%Y-%m-%dT%H:%M:%SZ', time.localtime()), + "percent_done": str(percent), + "message": self.message + } + + def _process_started(self): + data = self._process_accepted() + data.update({ + "status": "started", + }) + return data + + def _process_paused(self): + data = self._process_accepted() + data.update({ + "status": "paused", + }) + return data + + def _process_succeeded(self): + data = self._process_accepted() + data.update({ + "status": "succeeded", + "percent_done": "100" + }) + return data + + def _process_failed(self): + data = self._process_accepted() + data.update({ + "status": "failed", + "code": "NoApplicableCode", + "locator": "None", + }) + return data + + def _get_serviceinstance(self): + + url = config.get_config_value("server", "url") + params = {'request': 'GetCapabilities', 'service': 'WPS'} + + url_parts = list(urlparse.urlparse(url)) + query = dict(urlparse.parse_qsl(url_parts[4])) + query.update(params) + + url_parts[4] = urlencode(query) + return urlparse.urlunparse(url_parts).replace("&", "&") + + # Kept as guard to avoid misleading implementation + @property + def json(self): + raise NotImplementedError("Use WPSExecuteResponse.as_json_for_execute_template instead") + + # Used only to render the status outputs. + def as_json_for_execute_template(self): + data = { + "language": self.wps_request.language, + "service_instance": self._get_serviceinstance(), + "process": self.process.json + } + + if self.store_status_file: + if self.process.status_location: + data["status_location"] = self.process.status_url + + if self.status == WPS_STATUS.ACCEPTED: + self.message = 'PyWPS Process {} accepted'.format(self.process.identifier) + data["status"] = self._process_accepted() + elif self.status == WPS_STATUS.STARTED: + data["status"] = self._process_started() + elif self.status == WPS_STATUS.FAILED: + # check if process failed and display fail message + data["status"] = self._process_failed() + elif self.status == WPS_STATUS.PAUSED: + # TODO: handle paused status + data["status"] = self._process_paused() + elif self.status == WPS_STATUS.SUCCEEDED: + data["status"] = self._process_succeeded() + # Process outputs XML + data["outputs"] = [self.outputs[o].json for o in self.outputs] + # lineage: add optional lineage when process has finished + if self.status in [WPS_STATUS.SUCCEEDED, WPS_STATUS.FAILED]: + # DataInputs and DataOutputs definition XML if lineage=true + if self.wps_request.lineage == 'true': + data["lineage"] = True + try: + # TODO: stored process has ``pywps.inout.basic.LiteralInput`` + # instead of a ``pywps.inout.inputs.LiteralInput``. + data["input_definitions"] = [self.wps_request.inputs[i][0].json for i in self.wps_request.inputs] + except Exception as e: + LOGGER.error("Failed to update lineage for input parameter. {}".format(e)) + + data["output_definitions"] = [self.outputs[o].json for o in self.outputs] + return data diff --git a/pywps/processing/job.py b/pywps/processing/job.py index 93a9f7468..dd8e9673c 100644 --- a/pywps/processing/job.py +++ b/pywps/processing/job.py @@ -12,6 +12,7 @@ from pywps import Process, WPSRequest from pywps.response.execute import ExecuteResponse from pywps.app.WPSExecuteRequest import WPSExecuteRequest +from pywps.app.WPSExecuteResponse import WPSExecuteResponse LOGGER = logging.getLogger("PYWPS") @@ -58,10 +59,10 @@ def from_json(cls, value): process = Process.from_json(value['process']) wps_request = WPSRequest() wps_request.json = json.loads(value['wps_request']) - wps_response = ExecuteResponse( - wps_request=wps_request, - uuid=process.uuid, - process=process) + wps_response = WPSExecuteResponse( + process, + wps_request, + process.uuid) wps_response.store_status_file = True new_job = Job( process=Process.from_json(value['process']), diff --git a/pywps/response/execute.py b/pywps/response/execute.py index 48c0afe9b..de8ec16dc 100755 --- a/pywps/response/execute.py +++ b/pywps/response/execute.py @@ -32,7 +32,7 @@ class ExecuteResponse(WPSResponse): - def __init__(self, wps_request, uuid, **kwargs): + def __init__(self, wps_execute_response): """constructor :param pywps.app.WPSRequest.WPSRequest wps_request: @@ -40,160 +40,13 @@ def __init__(self, wps_request, uuid, **kwargs): :param uuid: string this request uuid """ - super(ExecuteResponse, self).__init__(wps_request, uuid) + super(ExecuteResponse, self).__init__(wps_execute_response.wps_request, wps_execute_response.uuid) - self.process = kwargs["process"] - self.outputs = {o.identifier: o for o in self.process.outputs} - self.store_status_file = False + self.wps_execute_response = wps_execute_response - # override WPSResponse._update_status - def _update_status(self, status, message, status_percentage, clean=True): - """ - Updates status report of currently running process instance: - - * Updates the status document. - * Updates the status file (if requested). - * Cleans the working directory when process has finished. - - This method is *only* called by pywps internally. - """ - super(ExecuteResponse, self)._update_status(status, message, status_percentage) - LOGGER.debug("_update_status: status={}, clean={}".format(status, clean)) - self._update_status_doc() - if self.store_status_file: - self._update_status_file() - if clean: - if self.status == WPS_STATUS.SUCCEEDED or self.status == WPS_STATUS.FAILED: - LOGGER.debug("clean workdir: status={}".format(status)) - self.process.clean() - - def update_status(self, message, status_percentage=None): - """ - Update status report of currently running process instance. - - This method is *only* called by the user provided process. - The status is handled internally in pywps. - - :param str message: Message you need to share with the client - :param int status_percentage: Percent done (number betwen <0-100>) - """ - if status_percentage is None: - status_percentage = self.status_percentage - self._update_status(self.status, message, status_percentage, False) - - def _update_status_doc(self): - try: - # rebuild the doc - self.doc, self.content_type = self._construct_doc() - except Exception as e: - raise NoApplicableCode('Building Response Document failed with : {}'.format(e)) - - def _update_status_file(self): - # TODO: check if file/directory is still present, maybe deleted in mean time - try: - # update the status xml file - self.process.status_store.write( - self.doc, - self.process.status_filename, - data_format=FORMATS.XML) - except Exception as e: - raise NoApplicableCode('Writing Response Document failed with : {}'.format(e)) - - def _process_accepted(self): - percent = int(self.status_percentage) - if percent > 99: - percent = 99 - return { - "status": "accepted", - "time": time.strftime('%Y-%m-%dT%H:%M:%SZ', time.localtime()), - "percent_done": str(percent), - "message": self.message - } - - def _process_started(self): - data = self._process_accepted() - data.update({ - "status": "started", - }) - return data - - def _process_paused(self): - data = self._process_accepted() - data.update({ - "status": "paused", - }) - return data - - def _process_succeeded(self): - data = self._process_accepted() - data.update({ - "status": "succeeded", - "percent_done": "100" - }) - return data - - def _process_failed(self): - data = self._process_accepted() - data.update({ - "status": "failed", - "code": "NoApplicableCode", - "locator": "None", - }) - return data - - def _get_serviceinstance(self): - - url = config.get_config_value("server", "url") - params = {'request': 'GetCapabilities', 'service': 'WPS'} - - url_parts = list(urlparse.urlparse(url)) - query = dict(urlparse.parse_qsl(url_parts[4])) - query.update(params) - - url_parts[4] = urlencode(query) - return urlparse.urlunparse(url_parts).replace("&", "&") - - @property - def json(self): - data = { - "language": self.wps_request.language, - "service_instance": self._get_serviceinstance(), - "process": self.process.json - } - - if self.store_status_file: - if self.process.status_location: - data["status_location"] = self.process.status_url - - if self.status == WPS_STATUS.ACCEPTED: - self.message = 'PyWPS Process {} accepted'.format(self.process.identifier) - data["status"] = self._process_accepted() - elif self.status == WPS_STATUS.STARTED: - data["status"] = self._process_started() - elif self.status == WPS_STATUS.FAILED: - # check if process failed and display fail message - data["status"] = self._process_failed() - elif self.status == WPS_STATUS.PAUSED: - # TODO: handle paused status - data["status"] = self._process_paused() - elif self.status == WPS_STATUS.SUCCEEDED: - data["status"] = self._process_succeeded() - # Process outputs XML - data["outputs"] = [self.outputs[o].json for o in self.outputs] - # lineage: add optional lineage when process has finished - if self.status in [WPS_STATUS.SUCCEEDED, WPS_STATUS.FAILED]: - # DataInputs and DataOutputs definition XML if lineage=true - if self.wps_request.lineage == 'true': - data["lineage"] = True - try: - # TODO: stored process has ``pywps.inout.basic.LiteralInput`` - # instead of a ``pywps.inout.inputs.LiteralInput``. - data["input_definitions"] = [self.wps_request.inputs[i][0].json for i in self.wps_request.inputs] - except Exception as e: - LOGGER.error("Failed to update lineage for input parameter. {}".format(e)) - - data["output_definitions"] = [self.outputs[o].json for o in self.outputs] - return data + # Fallback to self.wps_execute_response attribute + def __getattr__(self, item): + return getattr(self.wps_execute_response, item) @staticmethod def _render_json_response(jdoc): @@ -219,7 +72,7 @@ def _construct_doc(self): self.outputs = self.wps_request.preprocess_response(self.outputs, request=self.wps_request, http_request=self.wps_request.http_request) - doc = self.json + doc = self.wps_execute_response.as_json_for_execute_template() try: json_response, mimetype = get_response_type( self.wps_request.http_request.accept_mimetypes, self.wps_request.default_mimetype) @@ -278,6 +131,7 @@ def __call__(self, request): headers={'Content-Disposition': 'attachment; filename="{}"' .format(wps_output_identifier + suffix)}) else: - if not self.doc: + doc, mimetypes = self._construct_doc() + if not doc: return NoApplicableCode("Output was not generated") - return Response(self.doc, mimetype=accepted_mimetype) + return Response(doc, mimetype=accepted_mimetype) diff --git a/tests/test_processing.py b/tests/test_processing.py index c7fa2f046..b482c61ed 100644 --- a/tests/test_processing.py +++ b/tests/test_processing.py @@ -17,6 +17,7 @@ from pywps.processing.basic import MultiProcessing from pywps.app import WPSRequest from pywps.response.execute import ExecuteResponse +from pywps.app.WPSExecuteResponse import WPSExecuteResponse from processes import Greeter, InOut, BBox @@ -34,8 +35,8 @@ def setUp(self): self.dummy_process._set_uuid(self.uuid) self.dummy_process.set_workdir(self.workdir) self.wps_request = WPSRequest() - self.wps_response = ExecuteResponse(self.wps_request, self.uuid, - process=self.dummy_process) + self.wps_response = ExecuteResponse(WPSExecuteResponse(self.dummy_process, + self.wps_request, self.uuid)) self.job = Job( process=self.dummy_process, wps_request=self.wps_request, @@ -81,8 +82,8 @@ def setUp(self): self.dummy_process._set_uuid(self.uuid) self.dummy_process.set_workdir(self.workdir) self.wps_request = WPSRequest() - self.wps_response = ExecuteResponse(self.wps_request, self.uuid, - process=self.dummy_process) + self.wps_response = ExecuteResponse(WPSExecuteResponse(self.dummy_process, + self.wps_request, self.uuid)) self.job = Job( process=self.dummy_process, wps_request=self.wps_request, @@ -118,8 +119,8 @@ def setUp(self): self.dummy_process._set_uuid(self.uuid) self.dummy_process.set_workdir(self.workdir) self.wps_request = WPSRequest() - self.wps_response = ExecuteResponse(self.wps_request, self.uuid, - process=self.dummy_process) + self.wps_response = ExecuteResponse(WPSExecuteResponse(self.dummy_process, + self.wps_request, self.uuid)) self.job = Job( process=self.dummy_process, wps_request=self.wps_request, From 98edcd4fd65a57ed799c1c372e7b7a01ef945e11 Mon Sep 17 00:00:00 2001 From: Benoit Gschwind Date: Mon, 7 Nov 2022 18:52:14 +0100 Subject: [PATCH 08/30] Improve tests/test_processing.py with realistic WPSRequest --- tests/test_processing.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/tests/test_processing.py b/tests/test_processing.py index b482c61ed..da7c4ccf8 100644 --- a/tests/test_processing.py +++ b/tests/test_processing.py @@ -119,6 +119,23 @@ def setUp(self): self.dummy_process._set_uuid(self.uuid) self.dummy_process.set_workdir(self.workdir) self.wps_request = WPSRequest() + self.wps_request.json = { + 'operation': 'execute', + 'version': '1.0.0', + 'language': 'eng', + 'identifier': 'bbox_test', + 'store_execute': True, + 'status': True, + 'lineage': False, + 'inputs': { + 'area': [{ + 'identifier': 'area', + 'data': [0, 1, 2, 3] + }] + }, + 'outputs': { }, + 'raw': False + } self.wps_response = ExecuteResponse(WPSExecuteResponse(self.dummy_process, self.wps_request, self.uuid)) self.job = Job( From 9baf3aa5488a0f8f7aaff88ee8890cc950f5d26d Mon Sep 17 00:00:00 2001 From: Benoit Gschwind Date: Mon, 7 Nov 2022 19:20:56 +0100 Subject: [PATCH 09/30] Split ExecuteResponse into ExecuteRawResponse --- pywps/app/Service.py | 7 ++- pywps/response/execute.py | 112 +++++++++++++++++++++++--------------- 2 files changed, 72 insertions(+), 47 deletions(-) diff --git a/pywps/app/Service.py b/pywps/app/Service.py index a12227e42..484a34916 100755 --- a/pywps/app/Service.py +++ b/pywps/app/Service.py @@ -16,7 +16,7 @@ from werkzeug.exceptions import HTTPException from werkzeug.wrappers import Request, Response from pywps.app.WPSExecuteRequest import WPSExecuteRequest -from pywps.response.execute import ExecuteResponse +from pywps.response.execute import ExecuteResponse, ExecuteRawResponse import pywps.configuration as config from pywps import response from pywps.app.WPSRequest import WPSRequest @@ -108,7 +108,10 @@ def _parse_and_execute(self, process, wps_request, uuid): wps_request = WPSExecuteRequest(process, wps_request) wps_response = process.execute(wps_request, uuid) - return ExecuteResponse(wps_response) + if wps_request.wps_request.raw: + return ExecuteRawResponse(wps_response) + else: + return ExecuteResponse(wps_response) def _set_grass(self): """Set environment variables needed for GRASS GIS support diff --git a/pywps/response/execute.py b/pywps/response/execute.py index de8ec16dc..70204b569 100755 --- a/pywps/response/execute.py +++ b/pywps/response/execute.py @@ -30,6 +30,69 @@ WPS, OWS = get_ElementMakerForVersion("1.0.0") +class ExecuteRawResponse(WPSResponse): + + def __init__(self, wps_execute_response): + """constructor + + :param pywps.app.WPSRequest.WPSRequest wps_request: + :param pywps.app.Process.Process process: + :param uuid: string this request uuid + """ + + super(ExecuteRawResponse, self).__init__(wps_execute_response.wps_request, wps_execute_response.uuid) + + self.wps_execute_response = wps_execute_response + + # Fallback to self.wps_execute_response attribute + def __getattr__(self, item): + return getattr(self.wps_execute_response, item) + + @Request.application + def __call__(self, request): + accept_json_response, accepted_mimetype = get_response_type( + self.wps_request.http_request.accept_mimetypes, self.wps_request.default_mimetype) + if self.status == WPS_STATUS.FAILED: + return NoApplicableCode(self.message) + else: + wps_output_identifier = next(iter(self.wps_request.outputs)) # get the first key only + wps_output_value = self.outputs[wps_output_identifier] + response = wps_output_value.data + if response is None: + return NoApplicableCode("Expected output was not generated") + suffix = '' + # if isinstance(wps_output_value, ComplexOutput): + data_format = None + if hasattr(wps_output_value, 'output_format'): + # this is set in the response, thus should be more precise + data_format = wps_output_value.output_format + elif hasattr(wps_output_value, 'data_format'): + # this is set in the process' response _handler function, thus could have a few supported formats + data_format = wps_output_value.data_format + if data_format is not None: + mimetype = data_format.mime_type + if data_format.extension is not None: + suffix = data_format.extension + else: + # like LitearlOutput + mimetype = self.wps_request.outputs[wps_output_identifier].get('mimetype', None) + if not isinstance(response, (str, bytes, bytearray)): + if not mimetype: + mimetype = accepted_mimetype + json_response = mimetype and 'json' in mimetype + if json_response: + mimetype = 'application/json' + suffix = '.json' + response = json.dumps(response, cls=ArrayEncoder, indent=get_json_indent()) + else: + response = str(response) + if not mimetype: + mimetype = None + return Response(response, mimetype=mimetype, + headers={'Content-Disposition': 'attachment; filename="{}"' + .format(wps_output_identifier + suffix)}) + + class ExecuteResponse(WPSResponse): def __init__(self, wps_execute_response): @@ -90,48 +153,7 @@ def _construct_doc(self): def __call__(self, request): accept_json_response, accepted_mimetype = get_response_type( self.wps_request.http_request.accept_mimetypes, self.wps_request.default_mimetype) - if self.wps_request.raw: - if self.status == WPS_STATUS.FAILED: - return NoApplicableCode(self.message) - else: - wps_output_identifier = next(iter(self.wps_request.outputs)) # get the first key only - wps_output_value = self.outputs[wps_output_identifier] - response = wps_output_value.data - if response is None: - return NoApplicableCode("Expected output was not generated") - suffix = '' - # if isinstance(wps_output_value, ComplexOutput): - data_format = None - if hasattr(wps_output_value, 'output_format'): - # this is set in the response, thus should be more precise - data_format = wps_output_value.output_format - elif hasattr(wps_output_value, 'data_format'): - # this is set in the process' response _handler function, thus could have a few supported formats - data_format = wps_output_value.data_format - if data_format is not None: - mimetype = data_format.mime_type - if data_format.extension is not None: - suffix = data_format.extension - else: - # like LitearlOutput - mimetype = self.wps_request.outputs[wps_output_identifier].get('mimetype', None) - if not isinstance(response, (str, bytes, bytearray)): - if not mimetype: - mimetype = accepted_mimetype - json_response = mimetype and 'json' in mimetype - if json_response: - mimetype = 'application/json' - suffix = '.json' - response = json.dumps(response, cls=ArrayEncoder, indent=get_json_indent()) - else: - response = str(response) - if not mimetype: - mimetype = None - return Response(response, mimetype=mimetype, - headers={'Content-Disposition': 'attachment; filename="{}"' - .format(wps_output_identifier + suffix)}) - else: - doc, mimetypes = self._construct_doc() - if not doc: - return NoApplicableCode("Output was not generated") - return Response(doc, mimetype=accepted_mimetype) + doc, mimetypes = self._construct_doc() + if not doc: + return NoApplicableCode("Output was not generated") + return Response(doc, mimetype=accepted_mimetype) From bfb7ec642ca2cad1af919c85574bd8c690c3689b Mon Sep 17 00:00:00 2001 From: Benoit Gschwind Date: Mon, 7 Nov 2022 19:50:48 +0100 Subject: [PATCH 10/30] Update comments in WPSExecuteRequest to clarify the intend of the code --- pywps/app/WPSExecuteRequest.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/pywps/app/WPSExecuteRequest.py b/pywps/app/WPSExecuteRequest.py index 4686a9223..e6df764f4 100644 --- a/pywps/app/WPSExecuteRequest.py +++ b/pywps/app/WPSExecuteRequest.py @@ -12,6 +12,13 @@ LOGGER = logging.getLogger("PYWPS") +# The WPSExecuteRequest is used a user interface within the Process._handler to +# allow user to get WPS request inputs using +# WPSExecuteRequest.inputs['identifier']. It is also used to get some meta data +# from the requested inputs. +# +# It is not intended to be serializable to json. and json proporties is disable +# on purpose. class WPSExecuteRequest(object): def __init__(self, process, wps_request): @@ -55,6 +62,7 @@ def __init__(self, process, wps_request): @property def json(self): + # Raise exception on purpose. Do not implement this function ! raise NotImplementedError("WPSExectuteRequest is not serialisable to json") # Fall back to attibute of WPSRequest From c9691b9cd3ba6d85e44d07f9c5595cf84c5a7b44 Mon Sep 17 00:00:00 2001 From: Benoit Gschwind Date: Mon, 7 Nov 2022 19:51:03 +0100 Subject: [PATCH 11/30] Update comments in WPSExecuteResponse to clarify the intend of the code --- pywps/app/WPSExecuteResponse.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/pywps/app/WPSExecuteResponse.py b/pywps/app/WPSExecuteResponse.py index e16cdc85d..733410e5c 100644 --- a/pywps/app/WPSExecuteResponse.py +++ b/pywps/app/WPSExecuteResponse.py @@ -31,6 +31,13 @@ WPS, OWS = get_ElementMakerForVersion("1.0.0") +# WPSExecuteResponse is crafted as an interface for the user. The user can +# provide outputs within Process._handler using +# WPSExecuteResponse.outputs['identifier']. The structure also provide output +# metadata such as the requested mimetype for ComplexOutput +# +# This structure is not expect to be serialized. The json property is used as +# input for the execute template and should stay like this. class WPSExecuteResponse(object): def __init__(self, process, wps_request, uuid, **kwargs): From 8c4268eab6da6a06f6fe376d6a6f4647d0c9c96a Mon Sep 17 00:00:00 2001 From: Benoit Gschwind Date: Tue, 8 Nov 2022 15:37:05 +0100 Subject: [PATCH 12/30] Parse XML request more accuratly --- pywps/app/WPSRequest.py | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/pywps/app/WPSRequest.py b/pywps/app/WPSRequest.py index facce379c..352418ea4 100644 --- a/pywps/app/WPSRequest.py +++ b/pywps/app/WPSRequest.py @@ -489,8 +489,8 @@ def get_inputs_from_xml(doc): inpt = { 'identifier': identifier_el.text, 'data': str(value_el.text), - 'uom': value_el.attrib.get('uom', ''), - 'datatype': value_el.attrib.get('datatype', '') + 'uom': value_el.attrib.get('uom', None), + 'datatype': value_el.attrib.get('datatype', None) } the_inputs[identifier].append(inpt) continue @@ -501,10 +501,12 @@ def get_inputs_from_xml(doc): inpt = { 'identifier': identifier_el.text, 'mimeType': complex_data_el.attrib.get('mimeType', None), - 'encoding': complex_data_el.attrib.get('encoding', '').lower(), - 'schema': complex_data_el.attrib.get('schema', ''), - 'method': complex_data_el.attrib.get('method', 'GET') + 'encoding': complex_data_el.attrib.get('encoding', None), + 'schema': complex_data_el.attrib.get('schema', None), + 'method': complex_data_el.attrib.get('method', None) } + if inpt['encoding'] is not None: + inpt['encoding'] = inpt['encoding'].lower() if len(complex_data_el.getchildren()) > 0: value_el = complex_data_el[0] @@ -523,10 +525,10 @@ def get_inputs_from_xml(doc): 'identifier': identifier_el.text, identifier_el.text: reference_data_el.text, 'href': reference_data_el.attrib.get( - '{http://www.w3.org/1999/xlink}href', '' + '{http://www.w3.org/1999/xlink}href', None ), 'mimeType': reference_data_el.attrib.get('mimeType', None), - 'method': reference_data_el.attrib.get('method', 'GET') + 'method': reference_data_el.attrib.get('method', None) } header_element = xpath_ns(reference_data_el, './wps:Header') if header_element: @@ -572,9 +574,9 @@ def get_output_from_xml(doc): outpt = { identifier_el.text: '', 'mimetype': output_el.attrib.get('mimeType', None), - 'encoding': output_el.attrib.get('encoding', ''), - 'schema': output_el.attrib.get('schema', ''), - 'uom': output_el.attrib.get('uom', ''), + 'encoding': output_el.attrib.get('encoding', None), + 'schema': output_el.attrib.get('schema', None), + 'uom': output_el.attrib.get('uom', None), 'asReference': output_el.attrib.get('asReference', 'false') } the_output[identifier_el.text] = outpt @@ -585,9 +587,9 @@ def get_output_from_xml(doc): outpt = { identifier_el.text: '', 'mimetype': output_el.attrib.get('mimeType', None), - 'encoding': output_el.attrib.get('encoding', ''), - 'schema': output_el.attrib.get('schema', ''), - 'uom': output_el.attrib.get('uom', '') + 'encoding': output_el.attrib.get('encoding', None), + 'schema': output_el.attrib.get('schema', None), + 'uom': output_el.attrib.get('uom', None) } the_output[identifier_el.text] = outpt From 2977c90c5579ef99e233f517529239a8146e6a69 Mon Sep 17 00:00:00 2001 From: Benoit Gschwind Date: Tue, 8 Nov 2022 15:42:44 +0100 Subject: [PATCH 13/30] Update execute template to safely export xml attribute if they are set --- pywps/templates/1.0.0/execute/main.xml | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/pywps/templates/1.0.0/execute/main.xml b/pywps/templates/1.0.0/execute/main.xml index 047b39aab..8d83fb040 100644 --- a/pywps/templates/1.0.0/execute/main.xml +++ b/pywps/templates/1.0.0/execute/main.xml @@ -1,4 +1,7 @@ +{% macro safe_attr(name, value) -%} +{% if value %}{{ name }}="{{value}}"{% endif %} +{%- endmacro %} {{ process.identifier }} @@ -40,7 +43,7 @@ {{ get_translation(input, "abstract", language) }} {% if input.type == "complex" %} - {{ input.data | safe }} + {{ input.data | safe }} {% elif input.type == "literal" %} @@ -48,13 +51,13 @@ {% elif input.type == "bbox" %} - + {% for c in input.ll %} {{ c }} {% endfor %} {% for c in input.ur %} {{ c }} {% endfor %} {% elif input.type == "reference" %} - + {% endif %} {% endfor %} @@ -64,7 +67,7 @@ {% for output in output_definitions %} {% if output.type in ["complex", "reference"] %} - + {% else %} {% endif %} @@ -84,10 +87,10 @@ {{ get_translation(output, "title", language) }} {{ get_translation(output, "abstract", language) }} {% if output.type == "reference" %} - + {% elif output.type == "complex" %} - {{ output.data | safe }} + {{ output.data | safe }} {% elif output.type == "literal" %} @@ -95,7 +98,7 @@ {% elif output.type == "bbox" %} - + {% for c in output.ll %} {{ c }} {% endfor %} {% for c in output.ur %} {{ c }} {% endfor %} From 350d780a665fd713499257116ba7741168ec4a26 Mon Sep 17 00:00:00 2001 From: Benoit Gschwind Date: Tue, 8 Nov 2022 16:08:54 +0100 Subject: [PATCH 14/30] Apply preprocess_response within WPSExecuteResponse --- pywps/app/WPSExecuteResponse.py | 8 ++++++++ pywps/response/execute.py | 6 ------ 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/pywps/app/WPSExecuteResponse.py b/pywps/app/WPSExecuteResponse.py index 733410e5c..319e9d39a 100644 --- a/pywps/app/WPSExecuteResponse.py +++ b/pywps/app/WPSExecuteResponse.py @@ -112,6 +112,14 @@ def _update_status(self, status, message, status_percentage, clean=True): """ self._update_stored_status(status, message, status_percentage) + if self.status == WPS_STATUS.SUCCEEDED and \ + getattr(self.wps_request, 'preprocess_response', None): + self.outputs = self.wps_request.preprocess_response(self.outputs, + request=self.wps_request, + http_request=self.wps_request.http_request) + # Avoid multiple apply of preprocess_response + self.wps_request.preprocess_response = None + LOGGER.debug("_update_status: status={}, clean={}".format(status, clean)) self._update_status_doc() if self.store_status_file: diff --git a/pywps/response/execute.py b/pywps/response/execute.py index 70204b569..d54a51b89 100755 --- a/pywps/response/execute.py +++ b/pywps/response/execute.py @@ -129,12 +129,6 @@ def _render_json_response(jdoc): return response def _construct_doc(self): - if self.status == WPS_STATUS.SUCCEEDED and \ - hasattr(self.wps_request, 'preprocess_response') and \ - self.wps_request.preprocess_response: - self.outputs = self.wps_request.preprocess_response(self.outputs, - request=self.wps_request, - http_request=self.wps_request.http_request) doc = self.wps_execute_response.as_json_for_execute_template() try: json_response, mimetype = get_response_type( From f5ce9c83c2af63b52d03e58a635a54000883b88b Mon Sep 17 00:00:00 2001 From: Benoit Gschwind Date: Tue, 5 Jul 2022 11:00:25 +0200 Subject: [PATCH 15/30] Implement pywps.dblog.StatusRecord --- pywps/dblog.py | 48 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/pywps/dblog.py b/pywps/dblog.py index 1f9fe823f..b8366e5a7 100644 --- a/pywps/dblog.py +++ b/pywps/dblog.py @@ -11,6 +11,9 @@ import logging import os import sys +import json + +from types import SimpleNamespace from multiprocessing import Lock import sqlalchemy @@ -98,6 +101,17 @@ class RequestInstance(Base): request = Column(LargeBinary, nullable=False) +class StatusRecord(Base): + __tablename__ = '{}status_records'.format(_tableprefix) + + # Process uuid + uuid = Column(VARCHAR(255), primary_key=True, nullable=False) + # Time stamp for creation time + timestamp = Column(DateTime(), nullable=False) + # json data used in template + data = Column(LargeBinary, nullable=False) + + def log_request(uuid, request): """Write OGC WPS request (only the necessary parts) to database logging system @@ -165,6 +179,39 @@ def store_status(uuid, wps_status, message=None, status_percentage=None): session.commit() +# Update or create a store instance +def update_status_record(uuid, data): + with current_session as session: + r = session.query(StatusRecord).filter_by(uuid=str(uuid)) + if r.count(): + status_record = r.one() + status_record.timestamp = datetime.datetime.now() + status_record.data = json.dumps(data).encode("utf-8") + else: + status_record = StatusRecord( + uuid=str(uuid), + timestamp=datetime.datetime.now(), + data=json.dumps(data).encode("utf-8") + ) + session.add(status_record) + session.commit() + + +# Get store instance data from uuid +def get_status_record(uuid): + with current_session as session: + r = session.query(StatusRecord).filter_by(uuid=str(uuid)) + if r.count(): + status_record = r.one() + # Ensure new item to avoid change in database + # FIXME: There is a better solution ? + attrs = ["uuid", "timestamp", "data"] + status_record = SimpleNamespace(**{k: getattr(status_record, k) for k in attrs}) + status_record.data = json.loads(status_record.data.decode("utf-8")) + return status_record + return None + + def update_pid(uuid, pid): """Update actual pid for the uuid processing """ @@ -265,6 +312,7 @@ def get_session(): Session = sessionmaker(bind=engine) ProcessInstance.metadata.create_all(engine) RequestInstance.metadata.create_all(engine) + StatusRecord.metadata.create_all(engine) _SESSION_MAKER_DATABASE = database _SESSION_MAKER = Session From 5f8e04b9409b01c5f39a0ae8ad4ccf33d73a74a1 Mon Sep 17 00:00:00 2001 From: Benoit Gschwind Date: Tue, 8 Nov 2022 17:56:37 +0100 Subject: [PATCH 16/30] Generate status file from stored data in the database --- pywps/app/WPSExecuteResponse.py | 3 ++- pywps/response/execute.py | 6 +++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/pywps/app/WPSExecuteResponse.py b/pywps/app/WPSExecuteResponse.py index 319e9d39a..5047c35ac 100644 --- a/pywps/app/WPSExecuteResponse.py +++ b/pywps/app/WPSExecuteResponse.py @@ -13,7 +13,7 @@ from pywps.app.basic import get_response_type, get_json_indent, get_default_response_mimetype import pywps.configuration as config from werkzeug.wrappers import Response -from pywps.dblog import store_status +from pywps.dblog import store_status, update_status_record from pywps.inout.array_encode import ArrayEncoder from pywps.response.status import WPS_STATUS from pywps.inout.formats import FORMATS @@ -111,6 +111,7 @@ def _update_status(self, status, message, status_percentage, clean=True): This method is *only* called by pywps internally. """ self._update_stored_status(status, message, status_percentage) + update_status_record(self.uuid, self.as_json_for_execute_template()) if self.status == WPS_STATUS.SUCCEEDED and \ getattr(self.wps_request, 'preprocess_response', None): diff --git a/pywps/response/execute.py b/pywps/response/execute.py index d54a51b89..4621ffb79 100755 --- a/pywps/response/execute.py +++ b/pywps/response/execute.py @@ -129,7 +129,11 @@ def _render_json_response(jdoc): return response def _construct_doc(self): - doc = self.wps_execute_response.as_json_for_execute_template() + from pywps.dblog import get_status_record + r = get_status_record(self.wps_execute_response.uuid) + if not r: + return NoApplicableCode("Output was not generated") + doc = r.data try: json_response, mimetype = get_response_type( self.wps_request.http_request.accept_mimetypes, self.wps_request.default_mimetype) From 78bba0a81a8aa3f32cc933f511a7da7d941e007f Mon Sep 17 00:00:00 2001 From: Benoit Gschwind Date: Wed, 9 Nov 2022 10:27:23 +0100 Subject: [PATCH 17/30] Transform LiteralOutput in the same way as LiteralInput --- pywps/inout/outputs.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pywps/inout/outputs.py b/pywps/inout/outputs.py index ed1316304..8d6c05f91 100644 --- a/pywps/inout/outputs.py +++ b/pywps/inout/outputs.py @@ -295,13 +295,14 @@ def json(self): "title": self.title, "abstract": self.abstract, "keywords": self.keywords, - "data": self.data, "data_type": self.data_type, "type": "literal", "uoms": [u.json for u in self.uoms], "translations": self.translations, } + if self.data is not None: + data['data'] = str(self.data) if self.uom: data["uom"] = self.uom.json From dea9896185d05b40eb41ba4cb12aa96becdd2f5a Mon Sep 17 00:00:00 2001 From: Benoit Gschwind Date: Wed, 9 Nov 2022 10:29:26 +0100 Subject: [PATCH 18/30] Replace ExecuteResponse by StatusResponse that use stored status --- pywps/app/Process.py | 1 - pywps/app/Service.py | 10 +++++-- pywps/app/WPSExecuteResponse.py | 7 ++--- pywps/processing/job.py | 1 - pywps/response/__init__.py | 3 --- pywps/response/execute.py | 48 +++++++++------------------------ tests/test_execute.py | 9 ++++--- tests/test_processing.py | 13 +++++---- 8 files changed, 37 insertions(+), 55 deletions(-) diff --git a/pywps/app/Process.py b/pywps/app/Process.py index 16fad90ee..b3ae482f7 100644 --- a/pywps/app/Process.py +++ b/pywps/app/Process.py @@ -29,7 +29,6 @@ from pywps.inout.outputs import ComplexOutput from pywps.inout.storage.builder import StorageBuilder from pywps.response import get_response -from pywps.response.execute import ExecuteResponse from pywps.response.status import WPS_STATUS from pywps.translations import lower_case_dict diff --git a/pywps/app/Service.py b/pywps/app/Service.py index 484a34916..6a1b3f4ff 100755 --- a/pywps/app/Service.py +++ b/pywps/app/Service.py @@ -16,7 +16,7 @@ from werkzeug.exceptions import HTTPException from werkzeug.wrappers import Request, Response from pywps.app.WPSExecuteRequest import WPSExecuteRequest -from pywps.response.execute import ExecuteResponse, ExecuteRawResponse +from pywps.response.execute import StatusResponse, ExecuteRawResponse import pywps.configuration as config from pywps import response from pywps.app.WPSRequest import WPSRequest @@ -29,6 +29,7 @@ ) from pywps.inout.inputs import BoundingBoxInput, ComplexInput, LiteralInput from pywps.response.status import WPS_STATUS +from pywps.app.basic import get_response_type, get_default_response_mimetype LOGGER = logging.getLogger("PYWPS") @@ -111,7 +112,12 @@ def _parse_and_execute(self, process, wps_request, uuid): if wps_request.wps_request.raw: return ExecuteRawResponse(wps_response) else: - return ExecuteResponse(wps_response) + # FIXME: this try-except has no pratical meaning, just allow to pass some test. + try: + _, mimetype = get_response_type(wps_request.http_request.accept_mimetypes, wps_request.default_mimetype) + except Exception: + mimetype = get_default_response_mimetype() + return StatusResponse(wps_request.version, wps_response.uuid, mimetype) def _set_grass(self): """Set environment variables needed for GRASS GIS support diff --git a/pywps/app/WPSExecuteResponse.py b/pywps/app/WPSExecuteResponse.py index 5047c35ac..c7ed7e7a2 100644 --- a/pywps/app/WPSExecuteResponse.py +++ b/pywps/app/WPSExecuteResponse.py @@ -18,7 +18,7 @@ from pywps.response.status import WPS_STATUS from pywps.inout.formats import FORMATS from pywps.inout.outputs import ComplexOutput -from pywps.response.execute import ExecuteResponse +from pywps.response.execute import StatusResponse from pywps.exceptions import (StorageNotSupported, OperationNotSupported, ServerBusy, NoApplicableCode, InvalidParameterValue) @@ -147,8 +147,9 @@ def update_status(self, message, status_percentage=None): def _update_status_doc(self): try: # rebuild the doc - response = ExecuteResponse(self) - self.doc, self.content_type = response._construct_doc() + self.doc = StatusResponse(self.wps_request.version, self.uuid, + get_default_response_mimetype()).get_data(as_text=True) + self.content_type = get_default_response_mimetype() except Exception as e: raise NoApplicableCode('Building Response Document failed with : {}'.format(e)) diff --git a/pywps/processing/job.py b/pywps/processing/job.py index dd8e9673c..f5b44e6a0 100644 --- a/pywps/processing/job.py +++ b/pywps/processing/job.py @@ -10,7 +10,6 @@ import pywps.configuration as config from pywps import Process, WPSRequest -from pywps.response.execute import ExecuteResponse from pywps.app.WPSExecuteRequest import WPSExecuteRequest from pywps.app.WPSExecuteResponse import WPSExecuteResponse diff --git a/pywps/response/__init__.py b/pywps/response/__init__.py index ccdcb831f..151ec60a0 100644 --- a/pywps/response/__init__.py +++ b/pywps/response/__init__.py @@ -4,11 +4,8 @@ def get_response(operation): from .capabilities import CapabilitiesResponse from .describe import DescribeResponse - from .execute import ExecuteResponse if operation == "capabilities": return CapabilitiesResponse elif operation == "describe": return DescribeResponse - elif operation == "execute": - return ExecuteResponse diff --git a/pywps/response/execute.py b/pywps/response/execute.py index 4621ffb79..50ab46b98 100755 --- a/pywps/response/execute.py +++ b/pywps/response/execute.py @@ -21,6 +21,7 @@ from pywps.exceptions import NoApplicableCode from pywps.inout.array_encode import ArrayEncoder from pywps.inout.formats import FORMATS +from pywps.response.basic import TEMPLATE_ENV from pywps.response.status import WPS_STATUS from .basic import WPSResponse @@ -93,9 +94,9 @@ def __call__(self, request): .format(wps_output_identifier + suffix)}) -class ExecuteResponse(WPSResponse): +class StatusResponse(Response): - def __init__(self, wps_execute_response): + def __init__(self, version, uuid, mimetype): """constructor :param pywps.app.WPSRequest.WPSRequest wps_request: @@ -103,13 +104,18 @@ def __init__(self, wps_execute_response): :param uuid: string this request uuid """ - super(ExecuteResponse, self).__init__(wps_execute_response.wps_request, wps_execute_response.uuid) + from pywps.dblog import get_status_record + r = get_status_record(uuid) + if not r: + return NoApplicableCode("Output was not generated") - self.wps_execute_response = wps_execute_response + if 'json' in mimetype: + doc = json.dumps(self._render_json_response(r.data), cls=ArrayEncoder, indent=get_json_indent()) + else: + template = TEMPLATE_ENV.get_template(version + '/execute/main.xml') + doc = template.render(**r.data) - # Fallback to self.wps_execute_response attribute - def __getattr__(self, item): - return getattr(self.wps_execute_response, item) + super().__init__(doc, mimetype=mimetype) @staticmethod def _render_json_response(jdoc): @@ -127,31 +133,3 @@ def _render_json_response(jdoc): d[id] = val[key] response['outputs'] = d return response - - def _construct_doc(self): - from pywps.dblog import get_status_record - r = get_status_record(self.wps_execute_response.uuid) - if not r: - return NoApplicableCode("Output was not generated") - doc = r.data - try: - json_response, mimetype = get_response_type( - self.wps_request.http_request.accept_mimetypes, self.wps_request.default_mimetype) - except Exception: - mimetype = get_default_response_mimetype() - json_response = 'json' in mimetype - if json_response: - doc = json.dumps(self._render_json_response(doc), cls=ArrayEncoder, indent=get_json_indent()) - else: - template = self.template_env.get_template(self.version + '/execute/main.xml') - doc = template.render(**doc) - return doc, mimetype - - @Request.application - def __call__(self, request): - accept_json_response, accepted_mimetype = get_response_type( - self.wps_request.http_request.accept_mimetypes, self.wps_request.default_mimetype) - doc, mimetypes = self._construct_doc() - if not doc: - return NoApplicableCode("Output was not generated") - return Response(doc, mimetype=accepted_mimetype) diff --git a/tests/test_execute.py b/tests/test_execute.py index 40a92acf0..67ba2c641 100644 --- a/tests/test_execute.py +++ b/tests/test_execute.py @@ -370,7 +370,8 @@ class FakeRequest(): language = "en-US" request = FakeRequest() - response = service.execute('my_complex_process', request, 'fakeuuid') + request = WPSExecuteRequest(my_process, request) + response = my_process.execute(request, 'fakeuuid') self.assertEqual(response.outputs['complex'].data, 'DEFAULT COMPLEX DATA') def test_output_mimetype(self): @@ -401,13 +402,15 @@ def __init__(self, mimetype): # valid mimetype request = FakeRequest('text/plain+test') - response = service.execute('get_mimetype_process', request, 'fakeuuid') + request = WPSExecuteRequest(my_process, request) + response = my_process.execute(request, 'fakeuuid') self.assertEqual(response.outputs['mimetype'].data, 'text/plain+test') # non valid mimetype request = FakeRequest('text/xml') with self.assertRaises(InvalidParameterValue): - response = service.execute('get_mimetype_process', request, 'fakeuuid') + request = WPSExecuteRequest(my_process, request) + response = my_process.execute(request, 'fakeuuid') def test_metalink(self): client = client_for(Service(processes=[create_metalink_process()])) diff --git a/tests/test_processing.py b/tests/test_processing.py index da7c4ccf8..cfce5184d 100644 --- a/tests/test_processing.py +++ b/tests/test_processing.py @@ -16,7 +16,6 @@ from pywps.processing.job import Job from pywps.processing.basic import MultiProcessing from pywps.app import WPSRequest -from pywps.response.execute import ExecuteResponse from pywps.app.WPSExecuteResponse import WPSExecuteResponse from processes import Greeter, InOut, BBox @@ -35,8 +34,8 @@ def setUp(self): self.dummy_process._set_uuid(self.uuid) self.dummy_process.set_workdir(self.workdir) self.wps_request = WPSRequest() - self.wps_response = ExecuteResponse(WPSExecuteResponse(self.dummy_process, - self.wps_request, self.uuid)) + self.wps_response = WPSExecuteResponse(self.dummy_process, + self.wps_request, self.uuid) self.job = Job( process=self.dummy_process, wps_request=self.wps_request, @@ -82,8 +81,8 @@ def setUp(self): self.dummy_process._set_uuid(self.uuid) self.dummy_process.set_workdir(self.workdir) self.wps_request = WPSRequest() - self.wps_response = ExecuteResponse(WPSExecuteResponse(self.dummy_process, - self.wps_request, self.uuid)) + self.wps_response = WPSExecuteResponse(self.dummy_process, + self.wps_request, self.uuid) self.job = Job( process=self.dummy_process, wps_request=self.wps_request, @@ -136,8 +135,8 @@ def setUp(self): 'outputs': { }, 'raw': False } - self.wps_response = ExecuteResponse(WPSExecuteResponse(self.dummy_process, - self.wps_request, self.uuid)) + self.wps_response = WPSExecuteResponse(self.dummy_process, + self.wps_request, self.uuid) self.job = Job( process=self.dummy_process, wps_request=self.wps_request, From 0ef9d245ad895dab05886ea172405c3f84a571ac Mon Sep 17 00:00:00 2001 From: Benoit Gschwind Date: Wed, 9 Nov 2022 11:53:25 +0100 Subject: [PATCH 19/30] Remove pywps.response.get_response --- pywps/app/Process.py | 1 - pywps/app/Service.py | 12 +++--------- pywps/response/__init__.py | 14 +++----------- 3 files changed, 6 insertions(+), 21 deletions(-) diff --git a/pywps/app/Process.py b/pywps/app/Process.py index b3ae482f7..65a2f6bd8 100644 --- a/pywps/app/Process.py +++ b/pywps/app/Process.py @@ -28,7 +28,6 @@ ) from pywps.inout.outputs import ComplexOutput from pywps.inout.storage.builder import StorageBuilder -from pywps.response import get_response from pywps.response.status import WPS_STATUS from pywps.translations import lower_case_dict diff --git a/pywps/app/Service.py b/pywps/app/Service.py index 6a1b3f4ff..0467a4495 100755 --- a/pywps/app/Service.py +++ b/pywps/app/Service.py @@ -16,9 +16,8 @@ from werkzeug.exceptions import HTTPException from werkzeug.wrappers import Request, Response from pywps.app.WPSExecuteRequest import WPSExecuteRequest -from pywps.response.execute import StatusResponse, ExecuteRawResponse +from pywps.response import CapabilitiesResponse, DescribeResponse, ExecuteRawResponse, StatusResponse import pywps.configuration as config -from pywps import response from pywps.app.WPSRequest import WPSRequest from pywps.dblog import log_request, store_status from pywps.exceptions import ( @@ -64,15 +63,10 @@ def __init__(self, processes: Sequence = [], cfgfiles=None, preprocessors: Optio LOGGER.addHandler(logging.NullHandler()) def get_capabilities(self, wps_request, uuid): - - response_cls = response.get_response("capabilities") - return response_cls(wps_request, uuid, version=wps_request.version, processes=self.processes) + return CapabilitiesResponse(wps_request, uuid, version=wps_request.version, processes=self.processes) def describe(self, wps_request, uuid, identifiers): - - response_cls = response.get_response("describe") - return response_cls(wps_request, uuid, processes=self.processes, - identifiers=identifiers) + return DescribeResponse(wps_request, uuid, processes=self.processes, identifiers=identifiers) def execute(self, identifier, wps_request, uuid): """Parse and perform Execute WPS request call diff --git a/pywps/response/__init__.py b/pywps/response/__init__.py index 151ec60a0..d32bfbbe9 100644 --- a/pywps/response/__init__.py +++ b/pywps/response/__init__.py @@ -1,11 +1,3 @@ - - -def get_response(operation): - - from .capabilities import CapabilitiesResponse - from .describe import DescribeResponse - - if operation == "capabilities": - return CapabilitiesResponse - elif operation == "describe": - return DescribeResponse +from pywps.response.capabilities import CapabilitiesResponse +from pywps.response.describe import DescribeResponse +from pywps.response.execute import ExecuteRawResponse, StatusResponse From 3121aa3291b000b1f8a8f5b26767afd0a617d108 Mon Sep 17 00:00:00 2001 From: Benoit Gschwind Date: Wed, 9 Nov 2022 14:57:06 +0100 Subject: [PATCH 20/30] Move closure within WPSRequest._get_request_parser --- pywps/app/WPSRequest.py | 110 +++++++++++++++++++--------------------- 1 file changed, 53 insertions(+), 57 deletions(-) diff --git a/pywps/app/WPSRequest.py b/pywps/app/WPSRequest.py index 352418ea4..8375a880f 100644 --- a/pywps/app/WPSRequest.py +++ b/pywps/app/WPSRequest.py @@ -156,63 +156,59 @@ def _post_request(self, http_request): request_parser = self._post_json_request_parser() request_parser(jdoc) + def parse_get_getcapabilities(self, http_request): + """Parse GET GetCapabilities request.""" + acceptedversions = _get_get_param(http_request, 'acceptversions') + self.check_accepted_versions(acceptedversions) + self.default_mimetype = _get_get_param(http_request, 'f', self.default_mimetype) + + def parse_get_describeprocess(self, http_request): + """Parse GET DescribeProcess request.""" + version = _get_get_param(http_request, 'version') + self.check_and_set_version(version) + + self.identifiers = _get_get_param( + http_request, 'identifier', self.identifiers, aslist=True) + if self.identifiers is None and self.identifier is not None: + self.identifiers = [self.identifier] + self.default_mimetype = _get_get_param(http_request, 'f', self.default_mimetype) + + def parse_get_execute(self, http_request): + """Parse GET Execute request.""" + version = _get_get_param(http_request, 'version') + self.check_and_set_version(version) + + self.identifier = _get_get_param(http_request, 'identifier', self.identifier) + self.store_execute = _get_get_param( + http_request, 'storeExecuteResponse', 'false') + self.status = _get_get_param(http_request, 'status', 'false') + self.lineage = _get_get_param( + http_request, 'lineage', 'false') + self.inputs = get_data_from_kvp( + _get_get_param(http_request, 'DataInputs'), 'DataInputs') + if self.inputs is None: + self.inputs = {} + + # take responseDocument preferably + raw, output_ids = False, _get_get_param(http_request, 'ResponseDocument') + if output_ids is None: + raw, output_ids = True, _get_get_param(http_request, 'RawDataOutput') + if output_ids is not None: + self.raw, self.output_ids = raw, output_ids + elif self.raw is None: + self.raw = self.output_ids is not None + + self.default_mimetype = _get_get_param(http_request, 'f', self.default_mimetype) + self.outputs = get_data_from_kvp(self.output_ids) or {} + if self.raw: + # executeResponse XML will not be stored and no updating of + # status + self.store_execute = 'false' + self.status = 'false' + def _get_request_parser(self, operation): """Factory function returning proper parsing function.""" - wpsrequest = self - - def parse_get_getcapabilities(http_request): - """Parse GET GetCapabilities request.""" - - acceptedversions = _get_get_param(http_request, 'acceptversions') - wpsrequest.check_accepted_versions(acceptedversions) - wpsrequest.default_mimetype = _get_get_param(http_request, 'f', wpsrequest.default_mimetype) - - def parse_get_describeprocess(http_request): - """Parse GET DescribeProcess request - """ - version = _get_get_param(http_request, 'version') - wpsrequest.check_and_set_version(version) - - wpsrequest.identifiers = _get_get_param( - http_request, 'identifier', wpsrequest.identifiers, aslist=True) - if wpsrequest.identifiers is None and self.identifier is not None: - wpsrequest.identifiers = [wpsrequest.identifier] - wpsrequest.default_mimetype = _get_get_param(http_request, 'f', wpsrequest.default_mimetype) - - def parse_get_execute(http_request): - """Parse GET Execute request.""" - version = _get_get_param(http_request, 'version') - wpsrequest.check_and_set_version(version) - - wpsrequest.identifier = _get_get_param(http_request, 'identifier', wpsrequest.identifier) - wpsrequest.store_execute = _get_get_param( - http_request, 'storeExecuteResponse', 'false') - wpsrequest.status = _get_get_param(http_request, 'status', 'false') - wpsrequest.lineage = _get_get_param( - http_request, 'lineage', 'false') - wpsrequest.inputs = get_data_from_kvp( - _get_get_param(http_request, 'DataInputs'), 'DataInputs') - if self.inputs is None: - self.inputs = {} - - # take responseDocument preferably - raw, output_ids = False, _get_get_param(http_request, 'ResponseDocument') - if output_ids is None: - raw, output_ids = True, _get_get_param(http_request, 'RawDataOutput') - if output_ids is not None: - wpsrequest.raw, wpsrequest.output_ids = raw, output_ids - elif wpsrequest.raw is None: - wpsrequest.raw = wpsrequest.output_ids is not None - - wpsrequest.default_mimetype = _get_get_param(http_request, 'f', wpsrequest.default_mimetype) - wpsrequest.outputs = get_data_from_kvp(wpsrequest.output_ids) or {} - if wpsrequest.raw: - # executeResponse XML will not be stored and no updating of - # status - wpsrequest.store_execute = 'false' - wpsrequest.status = 'false' - if operation: self.operation = operation.lower() else: @@ -221,11 +217,11 @@ def parse_get_execute(http_request): self.operation = 'execute' if self.operation == 'getcapabilities': - return parse_get_getcapabilities + return self.parse_get_getcapabilities elif self.operation == 'describeprocess': - return parse_get_describeprocess + return self.parse_get_describeprocess elif self.operation == 'execute': - return parse_get_execute + return self.parse_get_execute else: raise OperationNotSupported( 'Unknown request {}'.format(self.operation), operation) From bf580f48626f23290ade597c08bee34ffb0766c7 Mon Sep 17 00:00:00 2001 From: Benoit Gschwind Date: Wed, 9 Nov 2022 15:04:49 +0100 Subject: [PATCH 21/30] Add custom request=status operation that provide the Process status from his uuid --- pywps/app/Service.py | 13 ++++++++++++- pywps/app/WPSRequest.py | 12 ++++++++++++ 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/pywps/app/Service.py b/pywps/app/Service.py index 0467a4495..cdd440155 100755 --- a/pywps/app/Service.py +++ b/pywps/app/Service.py @@ -62,6 +62,14 @@ def __init__(self, processes: Sequence = [], cfgfiles=None, preprocessors: Optio if not LOGGER.handlers: LOGGER.addHandler(logging.NullHandler()) + def get_status(self, wps_request, uuid): + try: + _, mimetype = get_response_type(wps_request.http_request.accept_mimetypes, + wps_request.default_mimetype) + except Exception: + mimetype = get_default_response_mimetype() + return StatusResponse(wps_request.version, wps_request.requested_status_uuid, mimetype) + def get_capabilities(self, wps_request, uuid): return CapabilitiesResponse(wps_request, uuid, version=wps_request.version, processes=self.processes) @@ -164,7 +172,8 @@ def call(self, http_request): LOGGER.info('Request: {}'.format(wps_request.operation)) if wps_request.operation in ['getcapabilities', 'describeprocess', - 'execute']: + 'execute', + 'status']: log_request(request_uuid, wps_request) try: response = None @@ -182,6 +191,8 @@ def call(self, http_request): wps_request, request_uuid ) + elif wps_request.operation == 'status': + response = self.get_status(wps_request, request_uuid) return response except Exception as e: # This ensure that logged request get terminated in case of exception while the request is not diff --git a/pywps/app/WPSRequest.py b/pywps/app/WPSRequest.py index 8375a880f..ec9c53c55 100644 --- a/pywps/app/WPSRequest.py +++ b/pywps/app/WPSRequest.py @@ -55,6 +55,7 @@ def __init__(self, http_request=None, preprocessors=None): self.preprocessors = preprocessors or dict() self.preprocess_request = None self.preprocess_response = None + self.requested_status_uuid = None if http_request: d = parse_http_url(http_request) @@ -156,6 +157,15 @@ def _post_request(self, http_request): request_parser = self._post_json_request_parser() request_parser(jdoc) + def parse_get_status(self, http_request): + """Parse GET GetCapabilities request + """ + + # Required to store the request in database + self.set_version('1.0.0') + + self.requested_status_uuid = _get_get_param(http_request, 'uuid') + def parse_get_getcapabilities(self, http_request): """Parse GET GetCapabilities request.""" acceptedversions = _get_get_param(http_request, 'acceptversions') @@ -222,6 +232,8 @@ def _get_request_parser(self, operation): return self.parse_get_describeprocess elif self.operation == 'execute': return self.parse_get_execute + elif self.operation == 'status': + return self.parse_get_status else: raise OperationNotSupported( 'Unknown request {}'.format(self.operation), operation) From c31a6ba1546bcae89043def1f6ffe6f74b481835 Mon Sep 17 00:00:00 2001 From: Benoit Gschwind Date: Wed, 9 Nov 2022 15:14:38 +0100 Subject: [PATCH 22/30] Replace status url by request status url --- pywps/app/Process.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pywps/app/Process.py b/pywps/app/Process.py index 65a2f6bd8..9cb67ce70 100644 --- a/pywps/app/Process.py +++ b/pywps/app/Process.py @@ -182,7 +182,8 @@ def status_filename(self): @property def status_url(self): - return self.status_store.url(self.status_filename) + base_url = config.get_config_value('server', 'url') + return f"{base_url}?service=WPS&request=status&uuid={self.uuid}" def _execute_process(self, async_, wps_request, wps_response): """ From 17839273bb47dfde5345d8b90a18dba9f62fe548 Mon Sep 17 00:00:00 2001 From: Benoit Gschwind Date: Wed, 9 Nov 2022 16:54:57 +0100 Subject: [PATCH 23/30] Implement NoAsyncProcessing to fake async processing Within test the database is in memory database, thus upon fork the database is also forked and all async process will update a copy of the database instead of actually updating the main database, making actual test of async request wrong. --- pywps/processing/__init__.py | 5 ++++- pywps/processing/basic.py | 9 +++++++++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/pywps/processing/__init__.py b/pywps/processing/__init__.py index 78915f5ce..a132f6024 100644 --- a/pywps/processing/__init__.py +++ b/pywps/processing/__init__.py @@ -9,7 +9,7 @@ # api only from pywps.processing.basic import Processing # noqa: F401 -from pywps.processing.basic import DetachProcessing, MultiProcessing +from pywps.processing.basic import DetachProcessing, MultiProcessing, NoAsyncProcessing from pywps.processing.job import Job # noqa: F401 from pywps.processing.scheduler import Scheduler @@ -17,6 +17,7 @@ MULTIPROCESSING = 'multiprocessing' DETACHPROCESSING = 'detachprocessing' +NOASYNCPROCESSING = 'noasyncprocessing' SCHEDULER = 'scheduler' DEFAULT = MULTIPROCESSING @@ -34,6 +35,8 @@ def Process(process, wps_request, wps_response): process = Scheduler(process, wps_request, wps_response) elif mode == DETACHPROCESSING: process = DetachProcessing(process, wps_request, wps_response) + elif mode == NOASYNCPROCESSING: + process = NoAsyncProcessing(process, wps_request, wps_response) else: process = MultiProcessing(process, wps_request, wps_response) diff --git a/pywps/processing/basic.py b/pywps/processing/basic.py index 2bc17f7bd..b6164b671 100644 --- a/pywps/processing/basic.py +++ b/pywps/processing/basic.py @@ -64,3 +64,12 @@ def start(self): pass # Ensure to stop ourself here what ever append. os._exit(0) + + +class NoAsyncProcessing(Processing): + """ + :class:`NoAsyncProcessing` Faking multiprocessing for testing purpose + """ + + def start(self): + getattr(self.job.process, self.job.method)(self.job.wps_request, self.job.wps_response) From a8aeac5617107343a7b4aa4ecc2f97fd6e3b1705 Mon Sep 17 00:00:00 2001 From: Benoit Gschwind Date: Wed, 9 Nov 2022 16:58:47 +0100 Subject: [PATCH 24/30] Implement a test for statusLocation and status request --- tests/processes/__init__.py | 2 ++ tests/test_wps_status.py | 57 +++++++++++++++++++++++++++++++++++++ 2 files changed, 59 insertions(+) create mode 100644 tests/test_wps_status.py diff --git a/tests/processes/__init__.py b/tests/processes/__init__.py index 5882a9ef1..a2c5050e9 100644 --- a/tests/processes/__init__.py +++ b/tests/processes/__init__.py @@ -33,6 +33,8 @@ class Greeter(Process): def __init__(self): super(Greeter, self).__init__( self.greeter, + store_supported="true", + status_supported="true", identifier='greeter', title='Greeter', inputs=[LiteralInput('name', 'Input name', data_type='string')], diff --git a/tests/test_wps_status.py b/tests/test_wps_status.py new file mode 100644 index 000000000..1ed358efd --- /dev/null +++ b/tests/test_wps_status.py @@ -0,0 +1,57 @@ +################################################################## +# Copyright 2018 Open Source Geospatial Foundation and others # +# licensed under MIT, Please consult LICENSE.txt for details # +################################################################## + +from basic import TestBase +from pywps import Service +from pywps import get_ElementMakerForVersion +from pywps.tests import client_for, assert_response_accepted, assert_response_success + +from pywps import configuration +from processes import Greeter +from urllib.parse import urlparse + +from time import sleep + +VERSION = "1.0.0" + +WPS, OWS = get_ElementMakerForVersion(VERSION) + + +class ExecuteTest(TestBase): + + def setUp(self) -> None: + super().setUp() + # Use fake async processing for this test + configuration.CONFIG.set('processing', 'mode', 'noasyncprocessing') + + def test_wps_status(self): + client = client_for(Service(processes=[Greeter()])) + request_doc = WPS.Execute( + OWS.Identifier('greeter'), + WPS.DataInputs( + WPS.Input( + OWS.Identifier('name'), + WPS.Data( + WPS.LiteralData( + "SomeName" + ) + ) + ) + ), + WPS.ResponseForm( + WPS.ResponseDocument(storeExecuteResponse='true', status='true') + ), + version="1.0.0" + ) + resp = client.post_xml(doc=request_doc) + assert_response_accepted(resp) + + url = resp.xml.xpath("//@statusLocation")[0] + + # Parse url because we do not have real server + url = urlparse(url) + resp = client.get(f'{url.path}?{url.query}') + assert_response_success(resp) + From 9e7366b5d9da8273541be5d1a9651da57e4baf1c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Beno=C3=AEt=20Gschwind?= Date: Fri, 15 Sep 2023 15:40:49 +0200 Subject: [PATCH 25/30] Update test_assync.py to properly read status --- tests/test_assync.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/tests/test_assync.py b/tests/test_assync.py index 296bc2c07..6d23a9d7e 100644 --- a/tests/test_assync.py +++ b/tests/test_assync.py @@ -11,9 +11,7 @@ from pywps.tests import client_for, assert_response_accepted, assert_response_success from processes import Sleep from owslib.wps import WPSExecution -from pathlib import Path -from tempfile import TemporaryDirectory -from pywps import dblog +from urllib.parse import urlparse VERSION = "1.0.0" @@ -50,13 +48,13 @@ def test_async(self): print(url) # OWSlib only reads from URLs, not local files. So we need to read the response manually. - p = Path(configuration.get_config_value('server', 'outputpath')) / url.split('/')[-1] + url = urlparse(url) # Poll the process until it completes total_time = 0 sleep_time = .01 while wps.status not in ["ProcessSucceeded", "ProcessFailed"]: - resp = p.read_bytes() + resp = client.get(f'{url.path}?{url.query}').data if resp: wps.checkStatus(response=resp, sleepSecs=0.01) else: From 0c53662bb1231f39f6fb85f11c69d13b383c9ad8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Beno=C3=AEt=20Gschwind?= Date: Wed, 20 Sep 2023 18:01:49 +0200 Subject: [PATCH 26/30] Check if a process has crashed upon status query --- pywps/dblog.py | 48 +++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 39 insertions(+), 9 deletions(-) diff --git a/pywps/dblog.py b/pywps/dblog.py index b8366e5a7..4b0468e18 100644 --- a/pywps/dblog.py +++ b/pywps/dblog.py @@ -12,6 +12,7 @@ import os import sys import json +import time from types import SimpleNamespace from multiprocessing import Lock @@ -197,19 +198,48 @@ def update_status_record(uuid, data): session.commit() +def force_failed_status(uuid): + with current_session as session: + r = session.query(StatusRecord).filter_by(uuid=str(uuid)) + if r.count() < 1: + return + status_record = r.one() + data = json.loads(status_record.data.decode("utf-8")) + data['status'] = { + "status": "failed", + "time": time.strftime('%Y-%m-%dT%H:%M:%SZ', time.localtime()), + "message": "Process has crashed" + } + status_record.timestamp = datetime.datetime.now() + status_record.data = json.dumps(data).encode("utf-8") + session.commit() + + # Get store instance data from uuid def get_status_record(uuid): with current_session as session: + if sys.platform == "linux": + # Check if the current process is fail + r = session.query(ProcessInstance).filter_by(uuid=str(uuid)) + # If no process instance is found then there is no status records + if r.count() < 1: + return None + process_record = r.one() + if process_record.status not in {WPS_STATUS.FAILED, WPS_STATUS.SUCCEEDED}: + if not os.path.exists(os.path.join("/proc", str(process_record.pid))): + store_status(process_record.uuid, WPS_STATUS.FAILED, "Process crashed", 100) + force_failed_status(process_record.uuid) + r = session.query(StatusRecord).filter_by(uuid=str(uuid)) - if r.count(): - status_record = r.one() - # Ensure new item to avoid change in database - # FIXME: There is a better solution ? - attrs = ["uuid", "timestamp", "data"] - status_record = SimpleNamespace(**{k: getattr(status_record, k) for k in attrs}) - status_record.data = json.loads(status_record.data.decode("utf-8")) - return status_record - return None + if r.count() < 1: + return None + status_record = r.one() + # Ensure new item to avoid change in database + # FIXME: There is a better solution ? + attrs = ["uuid", "timestamp", "data"] + status_record = SimpleNamespace(**{k: getattr(status_record, k) for k in attrs}) + status_record.data = json.loads(status_record.data.decode("utf-8")) + return status_record def update_pid(uuid, pid): From b30239d2dc6f27d709afc776f38a7bf95265245f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Beno=C3=AEt=20Gschwind?= Date: Wed, 7 Feb 2024 11:14:58 +0100 Subject: [PATCH 27/30] Add option to keep old status file --- default-sample.cfg | 3 +++ pywps/app/Process.py | 3 ++- pywps/app/WPSExecuteResponse.py | 10 +++++----- pywps/configuration.py | 1 + 4 files changed, 11 insertions(+), 6 deletions(-) diff --git a/default-sample.cfg b/default-sample.cfg index 19908d040..3f04b3a7a 100644 --- a/default-sample.cfg +++ b/default-sample.cfg @@ -61,6 +61,9 @@ maxprocesses=30 parallelprocesses=2 storagetype=file +# Keep old status file for backward compatibility, pywps does not need it. +keep_status_file=false + # hardcoded default : tempfile.gettempdir() #temp_path=/tmp diff --git a/pywps/app/Process.py b/pywps/app/Process.py index 9cb67ce70..a1051133f 100644 --- a/pywps/app/Process.py +++ b/pywps/app/Process.py @@ -128,7 +128,8 @@ def from_json(cls, value): def execute(self, wps_request, uuid): self._set_uuid(uuid) - self._setup_status_storage() + if config.get_config_value('server', 'keep_status_file'): + self._setup_status_storage() self.async_ = False wps_response = WPSExecuteResponse(self, wps_request, self.uuid) diff --git a/pywps/app/WPSExecuteResponse.py b/pywps/app/WPSExecuteResponse.py index c7ed7e7a2..5c44552d0 100644 --- a/pywps/app/WPSExecuteResponse.py +++ b/pywps/app/WPSExecuteResponse.py @@ -122,9 +122,10 @@ def _update_status(self, status, message, status_percentage, clean=True): self.wps_request.preprocess_response = None LOGGER.debug("_update_status: status={}, clean={}".format(status, clean)) - self._update_status_doc() - if self.store_status_file: - self._update_status_file() + if config.get_config_value('server', 'keep_status_file'): + self._update_status_doc() + if self.store_status_file: + self._update_status_file() if clean: if self.status == WPS_STATUS.SUCCEEDED or self.status == WPS_STATUS.FAILED: LOGGER.debug("clean workdir: status={}".format(status)) @@ -232,8 +233,7 @@ def as_json_for_execute_template(self): } if self.store_status_file: - if self.process.status_location: - data["status_location"] = self.process.status_url + data["status_location"] = self.process.status_url if self.status == WPS_STATUS.ACCEPTED: self.message = 'PyWPS Process {} accepted'.format(self.process.identifier) diff --git a/pywps/configuration.py b/pywps/configuration.py index ec77e30be..b3ff8ecfe 100755 --- a/pywps/configuration.py +++ b/pywps/configuration.py @@ -118,6 +118,7 @@ def load_hardcoded_configuration(): CONFIG.set('server', 'storage_copy_function', 'copy') CONFIG.set("server", "default_mimetype", "text/xml") CONFIG.set("server", "json_indent", "2") + CONFIG.set("server", "keep_status_file", "false") CONFIG.add_section('processing') CONFIG.set('processing', 'mode', 'default') From a95128c570874fea0b635e5215f0facbe86407b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Beno=C3=AEt=20Gschwind?= Date: Wed, 7 Feb 2024 14:48:13 +0100 Subject: [PATCH 28/30] Implement sub-path for status request --- pywps/app/Process.py | 2 +- pywps/app/Service.py | 73 ++++++++++++++++++++++------------------ tests/test_assync.py | 2 +- tests/test_wps_status.py | 2 +- 4 files changed, 43 insertions(+), 36 deletions(-) diff --git a/pywps/app/Process.py b/pywps/app/Process.py index a1051133f..e226d6838 100644 --- a/pywps/app/Process.py +++ b/pywps/app/Process.py @@ -184,7 +184,7 @@ def status_filename(self): @property def status_url(self): base_url = config.get_config_value('server', 'url') - return f"{base_url}?service=WPS&request=status&uuid={self.uuid}" + return f"{base_url}/status?uuid={self.uuid}" def _execute_process(self, async_, wps_request, wps_response): """ diff --git a/pywps/app/Service.py b/pywps/app/Service.py index cdd440155..a2c6ea7b8 100755 --- a/pywps/app/Service.py +++ b/pywps/app/Service.py @@ -62,13 +62,15 @@ def __init__(self, processes: Sequence = [], cfgfiles=None, preprocessors: Optio if not LOGGER.handlers: LOGGER.addHandler(logging.NullHandler()) - def get_status(self, wps_request, uuid): + def get_status(self, http_request): try: - _, mimetype = get_response_type(wps_request.http_request.accept_mimetypes, - wps_request.default_mimetype) + _, mimetype = get_response_type(http_request.accept_mimetypes, + "text/xml") except Exception: mimetype = get_default_response_mimetype() - return StatusResponse(wps_request.version, wps_request.requested_status_uuid, mimetype) + from urllib.parse import parse_qs + request = parse_qs(http_request.environ["QUERY_STRING"]) + return StatusResponse(request.get("version", ["1.0.0"])[0], request["uuid"][0], mimetype) def get_capabilities(self, wps_request, uuid): return CapabilitiesResponse(wps_request, uuid, version=wps_request.version, processes=self.processes) @@ -168,39 +170,44 @@ def call(self, http_request): LOGGER.debug('Setting PYWPS_CFG to {}'.format(environ_cfg)) os.environ['PYWPS_CFG'] = environ_cfg - wps_request = WPSRequest(http_request, self.preprocessors) - LOGGER.info('Request: {}'.format(wps_request.operation)) - if wps_request.operation in ['getcapabilities', - 'describeprocess', - 'execute', - 'status']: - log_request(request_uuid, wps_request) + if http_request.environ["PATH_INFO"] == "/status": try: - response = None - if wps_request.operation == 'getcapabilities': - response = self.get_capabilities(wps_request, request_uuid) - response._update_status(WPS_STATUS.SUCCEEDED, '', 100) - - elif wps_request.operation == 'describeprocess': - response = self.describe(wps_request, request_uuid, wps_request.identifiers) - response._update_status(WPS_STATUS.SUCCEEDED, '', 100) - - elif wps_request.operation == 'execute': - response = self.execute( - wps_request.identifier, - wps_request, - request_uuid - ) - elif wps_request.operation == 'status': - response = self.get_status(wps_request, request_uuid) - return response + return self.get_status(http_request) except Exception as e: - # This ensure that logged request get terminated in case of exception while the request is not - # accepted - store_status(request_uuid, WPS_STATUS.FAILED, 'Request rejected due to exception', 100) + store_status(request_uuid, WPS_STATUS.FAILED, + 'Request rejected due to exception', 100) raise e else: - raise RuntimeError("Unknown operation {}".format(wps_request.operation)) + wps_request = WPSRequest(http_request, self.preprocessors) + LOGGER.info('Request: {}'.format(wps_request.operation)) + if wps_request.operation in ['getcapabilities', + 'describeprocess', + 'execute']: + log_request(request_uuid, wps_request) + try: + response = None + if wps_request.operation == 'getcapabilities': + response = self.get_capabilities(wps_request, request_uuid) + response._update_status(WPS_STATUS.SUCCEEDED, '', 100) + + elif wps_request.operation == 'describeprocess': + response = self.describe(wps_request, request_uuid, wps_request.identifiers) + response._update_status(WPS_STATUS.SUCCEEDED, '', 100) + + elif wps_request.operation == 'execute': + response = self.execute( + wps_request.identifier, + wps_request, + request_uuid + ) + return response + except Exception as e: + # This ensure that logged request get terminated in case of exception while the request is not + # accepted + store_status(request_uuid, WPS_STATUS.FAILED, 'Request rejected due to exception', 100) + raise e + else: + raise RuntimeError("Unknown operation {}".format(wps_request.operation)) except NoApplicableCode as e: return e diff --git a/tests/test_assync.py b/tests/test_assync.py index 6d23a9d7e..81b817460 100644 --- a/tests/test_assync.py +++ b/tests/test_assync.py @@ -54,7 +54,7 @@ def test_async(self): total_time = 0 sleep_time = .01 while wps.status not in ["ProcessSucceeded", "ProcessFailed"]: - resp = client.get(f'{url.path}?{url.query}').data + resp = client.open(base_url='/wps', path='/status', method='GET', query_string=url.query).data if resp: wps.checkStatus(response=resp, sleepSecs=0.01) else: diff --git a/tests/test_wps_status.py b/tests/test_wps_status.py index 1ed358efd..6bf724e4d 100644 --- a/tests/test_wps_status.py +++ b/tests/test_wps_status.py @@ -52,6 +52,6 @@ def test_wps_status(self): # Parse url because we do not have real server url = urlparse(url) - resp = client.get(f'{url.path}?{url.query}') + resp = client.open(base_url='/wps', path='/status', method='GET', query_string=url.query) assert_response_success(resp) From dd2924245ff52bf2d884a863214c74be4905f650 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Beno=C3=AEt=20Gschwind?= Date: Fri, 9 Feb 2024 11:32:41 +0100 Subject: [PATCH 29/30] Fix relative import in pywps/response/basic.py --- pywps/response/basic.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pywps/response/basic.py b/pywps/response/basic.py index 4cfc9dc0e..5e8e3c803 100644 --- a/pywps/response/basic.py +++ b/pywps/response/basic.py @@ -5,7 +5,7 @@ from pywps import WPSRequest from pywps.dblog import store_status -from .status import WPS_STATUS +from pywps.response.status import WPS_STATUS import os from jinja2 import Environment, PackageLoader From 667aea8ef2769c43fea03c55824626732cfa6f80 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Beno=C3=AEt=20Gschwind?= Date: Tue, 6 Feb 2024 13:37:28 +0100 Subject: [PATCH 30/30] Fix pywps.dblog import --- pywps/response/basic.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pywps/response/basic.py b/pywps/response/basic.py index 5e8e3c803..cb2701c24 100644 --- a/pywps/response/basic.py +++ b/pywps/response/basic.py @@ -4,7 +4,6 @@ if TYPE_CHECKING: from pywps import WPSRequest -from pywps.dblog import store_status from pywps.response.status import WPS_STATUS import os @@ -49,6 +48,7 @@ def _update_status(self, status, message, status_percentage): :param pywps.response.status.WPS_STATUS status: process status - user should usually omit this parameter """ + from pywps.dblog import store_status self.message = message self.status = status self.status_percentage = status_percentage