Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement dynamic status #667

Draft
wants to merge 30 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
3e94fc5
Implement safe DB session management
gschwind Sep 21, 2023
6907477
Reduce the use of WPSRequest.http_request toward removing it
gschwind Nov 7, 2022
074fbcb
Move WPSResponse.template_env to pywps.response.TEMPLATE_ENV module c…
gschwind Nov 7, 2022
74984e0
Move RelEnvironment from pywps.response to pywps.response.basic
gschwind Nov 9, 2022
9636cf0
Split WPSRequest into WPSExecuteRequest
gschwind Nov 7, 2022
352aa83
Ensure WPSExecuteRequest.inputs['id'] to be python list
gschwind Nov 7, 2022
ae1b896
Split pywps.response.execute.ExecuteResponse into WPSExecuteResponse
gschwind Nov 7, 2022
98edcd4
Improve tests/test_processing.py with realistic WPSRequest
gschwind Nov 7, 2022
9baf3aa
Split ExecuteResponse into ExecuteRawResponse
gschwind Nov 7, 2022
bfb7ec6
Update comments in WPSExecuteRequest to clarify the intend of the code
gschwind Nov 7, 2022
c9691b9
Update comments in WPSExecuteResponse to clarify the intend of the code
gschwind Nov 7, 2022
8c4268e
Parse XML request more accuratly
gschwind Nov 8, 2022
2977c90
Update execute template to safely export xml attribute if they are set
gschwind Nov 8, 2022
350d780
Apply preprocess_response within WPSExecuteResponse
gschwind Nov 8, 2022
f5ce9c8
Implement pywps.dblog.StatusRecord
gschwind Jul 5, 2022
5f8e04b
Generate status file from stored data in the database
gschwind Nov 8, 2022
78bba0a
Transform LiteralOutput in the same way as LiteralInput
gschwind Nov 9, 2022
dea9896
Replace ExecuteResponse by StatusResponse that use stored status
gschwind Nov 9, 2022
0ef9d24
Remove pywps.response.get_response
gschwind Nov 9, 2022
3121aa3
Move closure within WPSRequest._get_request_parser
gschwind Nov 9, 2022
bf580f4
Add custom request=status operation that provide the Process status f…
gschwind Nov 9, 2022
c31a6ba
Replace status url by request status url
gschwind Nov 9, 2022
1783927
Implement NoAsyncProcessing to fake async processing
gschwind Nov 9, 2022
a8aeac5
Implement a test for statusLocation and status request
gschwind Nov 9, 2022
9e7366b
Update test_assync.py to properly read status
gschwind Sep 15, 2023
0c53662
Check if a process has crashed upon status query
gschwind Sep 20, 2023
b30239d
Add option to keep old status file
gschwind Feb 7, 2024
a95128c
Implement sub-path for status request
gschwind Feb 7, 2024
dd29242
Fix relative import in pywps/response/basic.py
gschwind Feb 9, 2024
667aea8
Fix pywps.dblog import
gschwind Feb 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions default-sample.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ maxprocesses=30
parallelprocesses=2
storagetype=file

# Keep old status file for backward compatibility, pywps does not need it.
keep_status_file=false

# hardcoded default : tempfile.gettempdir()
#temp_path=/tmp

Expand Down
21 changes: 12 additions & 9 deletions pywps/app/Process.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
import sys
import traceback

from pywps.app.WPSExecuteRequest import WPSExecuteRequest
from pywps.app.WPSExecuteResponse import WPSExecuteResponse
from pywps.inout.inputs import input_from_json
from pywps.inout.outputs import output_from_json
import pywps.configuration as config
from pywps import dblog
from pywps.app.exceptions import ProcessError
Expand All @@ -24,8 +28,6 @@
)
from pywps.inout.outputs import ComplexOutput
from pywps.inout.storage.builder import StorageBuilder
from pywps.response import get_response
from pywps.response.execute import ExecuteResponse
from pywps.response.status import WPS_STATUS
from pywps.translations import lower_case_dict

Expand Down Expand Up @@ -126,10 +128,10 @@ def from_json(cls, value):

def execute(self, wps_request, uuid):
self._set_uuid(uuid)
self._setup_status_storage()
if config.get_config_value('server', 'keep_status_file'):
self._setup_status_storage()
self.async_ = False
response_cls = get_response("execute")
wps_response = response_cls(wps_request, process=self, uuid=self.uuid)
wps_response = WPSExecuteResponse(self, wps_request, self.uuid)

LOGGER.debug('Check if status storage and updating are supported by this process')
if wps_request.store_execute == 'true':
Expand Down Expand Up @@ -181,7 +183,8 @@ def status_filename(self):

@property
def status_url(self):
return self.status_store.url(self.status_filename)
base_url = config.get_config_value('server', 'url')
return f"{base_url}/status?uuid={self.uuid}"

def _execute_process(self, async_, wps_request, wps_response):
"""
Expand Down Expand Up @@ -220,7 +223,7 @@ def _execute_process(self, async_, wps_request, wps_response):
if stored >= maxprocesses and maxprocesses != -1:
raise ServerBusy('Maximum number of processes in queue reached. Please try later.')
LOGGER.debug("Store process in job queue, uuid={}".format(self.uuid))
dblog.store_process(self.uuid, wps_request)
dblog.store_process(self.uuid, wps_request.wps_request)
wps_response._update_status(WPS_STATUS.ACCEPTED, 'PyWPS Process stored in job queue', 0)

# not async
Expand Down Expand Up @@ -320,11 +323,11 @@ def launch_next_process(self):
new_wps_request.json = json.loads(request_json)
process_identifier = new_wps_request.identifier
process = self.service.prepare_process_for_execution(process_identifier)
new_wps_request = WPSExecuteRequest(process, new_wps_request)
process._set_uuid(uuid)
process._setup_status_storage()
process.async_ = True
process.setup_outputs_from_wps_request(new_wps_request)
new_wps_response = ExecuteResponse(new_wps_request, process=process, uuid=uuid)
new_wps_response = WPSExecuteResponse(process, new_wps_request, uuid)
new_wps_response.store_status_file = True
process._run_async(new_wps_request, new_wps_response)
except Exception as e:
Expand Down
228 changes: 60 additions & 168 deletions pywps/app/Service.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@
from urllib.parse import urlparse

from werkzeug.exceptions import HTTPException
from werkzeug.wrappers import Request

from werkzeug.wrappers import Request, Response
from pywps.app.WPSExecuteRequest import WPSExecuteRequest
from pywps.response import CapabilitiesResponse, DescribeResponse, ExecuteRawResponse, StatusResponse
import pywps.configuration as config
from pywps import response
from pywps.app.WPSRequest import WPSRequest
from pywps.dblog import log_request, store_status
from pywps.exceptions import (
Expand All @@ -28,6 +28,7 @@
)
from pywps.inout.inputs import BoundingBoxInput, ComplexInput, LiteralInput
from pywps.response.status import WPS_STATUS
from pywps.app.basic import get_response_type, get_default_response_mimetype

LOGGER = logging.getLogger("PYWPS")

Expand Down Expand Up @@ -61,16 +62,21 @@ def __init__(self, processes: Sequence = [], cfgfiles=None, preprocessors: Optio
if not LOGGER.handlers:
LOGGER.addHandler(logging.NullHandler())

def get_capabilities(self, wps_request, uuid):
def get_status(self, http_request):
try:
_, mimetype = get_response_type(http_request.accept_mimetypes,
"text/xml")
except Exception:
mimetype = get_default_response_mimetype()
from urllib.parse import parse_qs
request = parse_qs(http_request.environ["QUERY_STRING"])
return StatusResponse(request.get("version", ["1.0.0"])[0], request["uuid"][0], mimetype)

response_cls = response.get_response("capabilities")
return response_cls(wps_request, uuid, version=wps_request.version, processes=self.processes)
def get_capabilities(self, wps_request, uuid):
return CapabilitiesResponse(wps_request, uuid, version=wps_request.version, processes=self.processes)

def describe(self, wps_request, uuid, identifiers):

response_cls = response.get_response("describe")
return response_cls(wps_request, uuid, processes=self.processes,
identifiers=identifiers)
return DescribeResponse(wps_request, uuid, processes=self.processes, identifiers=identifiers)

def execute(self, identifier, wps_request, uuid):
"""Parse and perform Execute WPS request call
Expand Down Expand Up @@ -104,116 +110,18 @@ def _parse_and_execute(self, process, wps_request, uuid):
"""Parse and execute request
"""

LOGGER.debug('Checking if all mandatory inputs have been passed')
data_inputs = {}
for inpt in process.inputs:
# Replace the dicts with the dict of Literal/Complex inputs
# set the input to the type defined in the process.

request_inputs = None
if inpt.identifier in wps_request.inputs:
request_inputs = wps_request.inputs[inpt.identifier]

if not request_inputs:
if inpt._default is not None:
if not inpt.data_set and isinstance(inpt, ComplexInput):
inpt._set_default_value()

data_inputs[inpt.identifier] = [inpt.clone()]
else:

if isinstance(inpt, ComplexInput):
data_inputs[inpt.identifier] = self.create_complex_inputs(
inpt, request_inputs)
elif isinstance(inpt, LiteralInput):
data_inputs[inpt.identifier] = self.create_literal_inputs(
inpt, request_inputs)
elif isinstance(inpt, BoundingBoxInput):
data_inputs[inpt.identifier] = self.create_bbox_inputs(
inpt, request_inputs)

for inpt in process.inputs:

if inpt.identifier not in data_inputs:
if inpt.min_occurs > 0:
LOGGER.error('Missing parameter value: {}'.format(inpt.identifier))
raise MissingParameterValue(
inpt.identifier, inpt.identifier)

wps_request.inputs = data_inputs

process.setup_outputs_from_wps_request(wps_request)
wps_request = WPSExecuteRequest(process, wps_request)

wps_response = process.execute(wps_request, uuid)
return wps_response

def create_complex_inputs(self, source, inputs):
"""Create new ComplexInput as clone of original ComplexInput
because of inputs can be more than one, take it just as Prototype.

:param source: The process's input definition.
:param inputs: The request input data.
:return collections.deque:
"""

outinputs = deque(maxlen=source.max_occurs)

for inpt in inputs:
data_input = source.clone()
frmt = data_input.supported_formats[0]
if 'mimeType' in inpt:
if inpt['mimeType']:
frmt = data_input.get_format(inpt['mimeType'])
else:
frmt = data_input.data_format

if frmt:
data_input.data_format = frmt
else:
raise InvalidParameterValue(
'Invalid mimeType value {} for input {}'.format(inpt.get('mimeType'), source.identifier),
'mimeType')

data_input.method = inpt.get('method', 'GET')
data_input.process(inpt)
outinputs.append(data_input)

if len(outinputs) < source.min_occurs:
description = "At least {} inputs are required. You provided {}.".format(
source.min_occurs,
len(outinputs),
)
raise MissingParameterValue(description=description, locator=source.identifier)
return outinputs

def create_literal_inputs(self, source, inputs):
""" Takes the http_request and parses the input to objects
:return collections.deque:
"""

outinputs = deque(maxlen=source.max_occurs)

for inpt in inputs:
newinpt = source.clone()
# set the input to the type defined in the process
newinpt.uom = inpt.get('uom')
data_type = inpt.get('datatype')
if data_type:
newinpt.data_type = data_type

# get the value of the field
newinpt.data = inpt.get('data')

outinputs.append(newinpt)

if len(outinputs) < source.min_occurs:
description = "At least {} inputs are required. You provided {}.".format(
source.min_occurs,
len(outinputs),
)
raise MissingParameterValue(description, locator=source.identifier)

return outinputs
if wps_request.wps_request.raw:
return ExecuteRawResponse(wps_response)
else:
# FIXME: this try-except has no pratical meaning, just allow to pass some test.
try:
_, mimetype = get_response_type(wps_request.http_request.accept_mimetypes, wps_request.default_mimetype)
except Exception:
mimetype = get_default_response_mimetype()
return StatusResponse(wps_request.version, wps_response.uuid, mimetype)

def _set_grass(self):
"""Set environment variables needed for GRASS GIS support
Expand Down Expand Up @@ -241,30 +149,6 @@ def _set_grass(self):
os.putenv('PYTHONPATH', os.environ.get('PYTHONPATH'))
sys.path.insert(0, python_path)

def create_bbox_inputs(self, source, inputs):
""" Takes the http_request and parses the input to objects
:return collections.deque:
"""

outinputs = deque(maxlen=source.max_occurs)

for inpt in inputs:
newinpt = source.clone()
newinpt.data = inpt.get('data')
LOGGER.debug(f'newinpt bbox data={newinpt.data}')
newinpt.crs = inpt.get('crs')
newinpt.dimensions = inpt.get('dimensions')
outinputs.append(newinpt)

if len(outinputs) < source.min_occurs:
description = "At least {} inputs are required. You provided {}.".format(
source.min_occurs,
len(outinputs),
)
raise MissingParameterValue(description=description, locator=source.identifier)

return outinputs

# May not raise exceptions, this function must return a valid werkzeug.wrappers.Response.
def call(self, http_request):

Expand All @@ -286,36 +170,44 @@ def call(self, http_request):
LOGGER.debug('Setting PYWPS_CFG to {}'.format(environ_cfg))
os.environ['PYWPS_CFG'] = environ_cfg

wps_request = WPSRequest(http_request, self.preprocessors)
LOGGER.info('Request: {}'.format(wps_request.operation))
if wps_request.operation in ['getcapabilities',
'describeprocess',
'execute']:
log_request(request_uuid, wps_request)
if http_request.environ["PATH_INFO"] == "/status":
try:
response = None
if wps_request.operation == 'getcapabilities':
response = self.get_capabilities(wps_request, request_uuid)
response._update_status(WPS_STATUS.SUCCEEDED, '', 100)

elif wps_request.operation == 'describeprocess':
response = self.describe(wps_request, request_uuid, wps_request.identifiers)
response._update_status(WPS_STATUS.SUCCEEDED, '', 100)

elif wps_request.operation == 'execute':
response = self.execute(
wps_request.identifier,
wps_request,
request_uuid
)
return response
return self.get_status(http_request)
except Exception as e:
# This ensure that logged request get terminated in case of exception while the request is not
# accepted
store_status(request_uuid, WPS_STATUS.FAILED, 'Request rejected due to exception', 100)
store_status(request_uuid, WPS_STATUS.FAILED,
'Request rejected due to exception', 100)
raise e
else:
raise RuntimeError("Unknown operation {}".format(wps_request.operation))
wps_request = WPSRequest(http_request, self.preprocessors)
LOGGER.info('Request: {}'.format(wps_request.operation))
if wps_request.operation in ['getcapabilities',
'describeprocess',
'execute']:
log_request(request_uuid, wps_request)
try:
response = None
if wps_request.operation == 'getcapabilities':
response = self.get_capabilities(wps_request, request_uuid)
response._update_status(WPS_STATUS.SUCCEEDED, '', 100)

elif wps_request.operation == 'describeprocess':
response = self.describe(wps_request, request_uuid, wps_request.identifiers)
response._update_status(WPS_STATUS.SUCCEEDED, '', 100)

elif wps_request.operation == 'execute':
response = self.execute(
wps_request.identifier,
wps_request,
request_uuid
)
return response
except Exception as e:
# This ensure that logged request get terminated in case of exception while the request is not
# accepted
store_status(request_uuid, WPS_STATUS.FAILED, 'Request rejected due to exception', 100)
raise e
else:
raise RuntimeError("Unknown operation {}".format(wps_request.operation))

except NoApplicableCode as e:
return e
Expand Down
Loading
Loading