diff --git a/setup.py b/setup.py index 26ec0bb..949f5eb 100644 --- a/setup.py +++ b/setup.py @@ -25,7 +25,6 @@ install_requires=[ 'future', 'connexion==1.4.2', - 'bravado==10.1.0', 'ruamel.yaml >= 0.12.4, < 0.15', 'cwlref-runner==1.0', 'schema-salad>=2.6, <3', diff --git a/test/test_client_util.py b/test/test_client_util.py new file mode 100644 index 0000000..9a57b91 --- /dev/null +++ b/test/test_client_util.py @@ -0,0 +1,39 @@ +from __future__ import absolute_import + +import unittest +import os +import logging +import subprocess + +from wes_client.util import expand_globs + +logging.basicConfig(level=logging.INFO) + + +class IntegrationTest(unittest.TestCase): + def setUp(self): + dirname, filename = os.path.split(os.path.abspath(__file__)) + self.testdata_dir = dirname + 'data' + + def tearDown(self): + unittest.TestCase.tearDown(self) + + def test_expand_globs(self): + """Asserts that wes_client.expand_globs() sees the same files in the cwd as 'ls'.""" + files = subprocess.check_output(['ls', '-1', '.']) + + # python 2/3 bytestring/utf-8 compatibility + if isinstance(files, str): + files = files.split('\n') + else: + files = files.decode('utf-8').split('\n') + + if '' in files: + files.remove('') + files = ['file://' + os.path.abspath(f) for f in files] + glob_files = expand_globs('*') + assert set(files) == glob_files, '\n' + str(set(files)) + '\n' + str(glob_files) + + +if __name__ == '__main__': + unittest.main() # run all tests diff --git a/test/test_integration.py b/test/test_integration.py index a233125..47bdd7c 100644 --- a/test/test_integration.py +++ b/test/test_integration.py @@ -5,11 +5,10 @@ import os import subprocess32 as subprocess import signal -import requests import shutil import logging -from wes_client.util import build_wes_request +from wes_client.util import WESClient logging.basicConfig(level=logging.INFO) @@ -28,7 +27,10 @@ def setUpClass(cls): cls.wdl_local_path = os.path.abspath('testdata/md5sum.wdl') cls.wdl_json_input = "file://" + os.path.abspath('testdata/md5sum.wdl.json') cls.wdl_attachments = ['file://' + os.path.abspath('testdata/md5sum.input')] - + + # client for the swagger API methods + cls.client = WESClient({'auth': '', 'proto': 'http', 'host': 'localhost:8080'}) + # manual test (wdl only working locally atm) cls.manual = False @@ -52,44 +54,68 @@ def tearDown(self): def test_dockstore_md5sum(self): """HTTP md5sum cwl (dockstore), run it on the wes-service server, and check for the correct output.""" - outfile_path, _ = run_md5sum(wf_input=self.cwl_dockstore_url, - json_input=self.cwl_json_input, - workflow_attachment=self.cwl_attachments) + outfile_path, _ = self.run_md5sum(wf_input=self.cwl_dockstore_url, + json_input=self.cwl_json_input, + workflow_attachment=self.cwl_attachments) self.assertTrue(check_for_file(outfile_path), 'Output file was not found: ' + str(outfile_path)) - + def test_local_md5sum(self): """LOCAL md5sum cwl to the wes-service server, and check for the correct output.""" - outfile_path, run_id = run_md5sum(wf_input=self.cwl_local_path, - json_input=self.cwl_json_input, - workflow_attachment=self.cwl_attachments) + outfile_path, run_id = self.run_md5sum(wf_input=self.cwl_local_path, + json_input=self.cwl_json_input, + workflow_attachment=self.cwl_attachments) self.assertTrue(check_for_file(outfile_path), 'Output file was not found: ' + str(outfile_path)) - + def test_run_attachments(self): """LOCAL md5sum cwl to the wes-service server, check for attachments.""" - outfile_path, run_id = run_md5sum(wf_input=self.cwl_local_path, - json_input=self.cwl_json_input, - workflow_attachment=self.cwl_attachments) - get_response = get_log_request(run_id)["request"] + outfile_path, run_id = self.run_md5sum(wf_input=self.cwl_local_path, + json_input=self.cwl_json_input, + workflow_attachment=self.cwl_attachments) + get_response = self.client.get_run_log(run_id)["request"] self.assertTrue(check_for_file(outfile_path), 'Output file was not found: ' + get_response["workflow_attachment"]) attachment_tool_path = get_response["workflow_attachment"][7:] + "/dockstore-tool-md5sum.cwl" self.assertTrue(check_for_file(attachment_tool_path), 'Attachment file was not found: ' + get_response["workflow_attachment"]) + def test_get_service_info(self): + """ + Test wes_client.util.WESClient.get_service_info() + + This method will exit(1) if the response is not 200. + """ + r = self.client.get_service_info() + assert 'workflow_type_versions' in r + assert 'supported_wes_versions' in r + assert 'supported_filesystem_protocols' in r + assert 'engine_versions' in r + + def test_list_runs(self): + """ + Test wes_client.util.WESClient.list_runs() -def run_md5sum(wf_input, json_input, workflow_attachment=None): - """Pass a local md5sum cwl to the wes-service server, and return the path of the output file that was created.""" - endpoint = 'http://localhost:8080/ga4gh/wes/v1/runs' - parts = build_wes_request(wf_input, - json_input, - attachments=workflow_attachment) - response = requests.post(endpoint, files=parts).json() - assert 'run_id' in response, str(response.json()) - output_dir = os.path.abspath(os.path.join('workflows', response['run_id'], 'outdir')) - return os.path.join(output_dir, 'md5sum.txt'), response['run_id'] + This method will exit(1) if the response is not 200. + """ + r = self.client.list_runs() + assert 'workflows' in r + def test_get_run_status(self): + """ + Test wes_client.util.WESClient.run_status() -def get_log_request(run_id): - endpoint = 'http://localhost:8080/ga4gh/wes/v1/runs/{}'.format(run_id) - return requests.get(endpoint).json() + This method will exit(1) if the response is not 200. + """ + outfile_path, run_id = self.run_md5sum(wf_input=self.cwl_local_path, + json_input=self.cwl_json_input, + workflow_attachment=self.cwl_attachments) + r = self.client.get_run_status(run_id) + assert 'state' in r + assert 'run_id' in r + + def run_md5sum(self, wf_input, json_input, workflow_attachment=None): + """Pass a local md5sum cwl to the wes-service server, and return the path of the output file that was created.""" + response = self.client.run(wf_input, json_input, workflow_attachment) + assert 'run_id' in response, str(response.json()) + output_dir = os.path.abspath(os.path.join('workflows', response['run_id'], 'outdir')) + return os.path.join(output_dir, 'md5sum.txt'), response['run_id'] def get_server_pids(): @@ -143,14 +169,15 @@ def test_local_wdl(self): """LOCAL md5sum wdl to the wes-service server, and check for the correct output.""" # Working locally but not on travis... >.<; if self.manual: - outfile_path, run_id = run_md5sum(wf_input=self.wdl_local_path, - json_input=self.wdl_json_input, - workflow_attachment=self.wdl_attachments) + outfile_path, run_id = self.run_md5sum(wf_input=self.wdl_local_path, + json_input=self.wdl_json_input, + workflow_attachment=self.wdl_attachments) self.assertTrue(check_for_file(outfile_path), 'Output file was not found: ' + str(outfile_path)) # Prevent pytest/unittest's discovery from attempting to discover the base test class. del IntegrationTest + if __name__ == '__main__': unittest.main() # run all tests diff --git a/wes_client/util.py b/wes_client/util.py index d044d71..721f060 100644 --- a/wes_client/util.py +++ b/wes_client/util.py @@ -1,7 +1,7 @@ import os import json -import urlparse -from bravado.client import SwaggerClient +import glob +import requests import urllib import logging import schema_salad.ref_resolver @@ -63,14 +63,6 @@ def build_wes_request(workflow_file, json_path, attachments=None): return parts -def wes_client(http_client, auth, proto, host): - split = urlparse.urlsplit("%s://%s/" % (proto, host)) - http_client.set_api_key(split.hostname, auth, param_name="Authorization", param_in="header") - client = SwaggerClient.from_url("%s://%s/ga4gh/wes/v1/swagger.json" % (proto, host), - http_client=http_client, config={"use_models": False}) - return client.WorkflowExecutionService - - def modify_jsonyaml_paths(jsonyaml_file): """ Changes relative paths in a json/yaml file to be relative @@ -99,125 +91,124 @@ def fixpaths(d): visit(input_dict, fixpaths) -def run_wf(workflow_file, jsonyaml, attachments, http_client, auth, proto, host): - """ - Composes and sends a post request that signals the wes server to run a workflow. - - :param str workflow_file: A local/http/https path to a cwl/wdl/python workflow file. - :param str jsonyaml: A local path to a json or yaml file. - :param list attachments: A list of local paths to files that will be uploaded to the server. - :param object http_client: bravado.requests_client.RequestsClient - :param str auth: String to send in the auth header. - :param proto: Schema where the server resides (http, https) - :param host: Port where the post request will be sent and the wes server listens at (default 8080) - - :return: The body of the post result as a dictionary. - """ - parts = build_wes_request(workflow_file, jsonyaml, attachments) - postresult = http_client.session.post("%s://%s/ga4gh/wes/v1/runs" % (proto, host), - files=parts, - headers={"Authorization": auth}) - if postresult.status_code != 200: - logging.error("%s", json.loads(postresult.text)) - exit(1) - return json.loads(postresult.text) - - -def cancel_wf(run_id, http_client, auth, proto, host): - """ - Cancel a running workflow. - - :param run_id: - :param object http_client: bravado.requests_client.RequestsClient - :param str auth: String to send in the auth header. - :param proto: Schema where the server resides (http, https) - :param host: Port where the post request will be sent and the wes server listens at (default 8080) - :return: The body of the delete result as a dictionary. - """ - postresult = http_client.session.delete("%s://%s/ga4gh/wes/v1/runs/%s" % (proto, host, run_id), - headers={"Authorization": auth}) - if postresult.status_code != 200: - logging.error("%s", json.loads(postresult.text)) - exit(1) - return json.loads(postresult.text) - - -def get_status(run_id, http_client, auth, proto, host): - """ - Get quick status info about a running workflow. - - :param run_id: - :param object http_client: bravado.requests_client.RequestsClient - :param str auth: String to send in the auth header. - :param proto: Schema where the server resides (http, https) - :param host: Port where the post request will be sent and the wes server listens at (default 8080) - :return: The body of the get result as a dictionary. - """ - postresult = http_client.session.get("%s://%s/ga4gh/wes/v1/runs/%s/status" % (proto, host, run_id), - headers={"Authorization": auth}) - if postresult.status_code != 200: - logging.error("%s", json.loads(postresult.text)) - exit(1) - return json.loads(postresult.text) - - -def get_wf_details(run_id, http_client, auth, proto, host): - """ - Get detailed info about a running workflow. - - :param run_id: - :param object http_client: bravado.requests_client.RequestsClient - :param str auth: String to send in the auth header. - :param proto: Schema where the server resides (http, https) - :param host: Port where the post request will be sent and the wes server listens at (default 8080) - :return: The body of the get result as a dictionary. - """ - postresult = http_client.session.get("%s://%s/ga4gh/wes/v1/runs/%s" % (proto, host, run_id), - headers={"Authorization": auth}) - if postresult.status_code != 200: - logging.error("%s", json.loads(postresult.text)) - exit(1) - return json.loads(postresult.text) +def expand_globs(attachments): + expanded_list = [] + for filepath in attachments: + if 'file://' in filepath: + for f in glob.glob(filepath[7:]): + expanded_list += ['file://' + os.path.abspath(f)] + elif ':' not in filepath: + for f in glob.glob(filepath): + expanded_list += ['file://' + os.path.abspath(f)] + else: + expanded_list += [filepath] + return set(expanded_list) -def get_wf_list(http_client, auth, proto, host): - """ - List the workflows, this endpoint will list the workflows - in order of oldest to newest. There is no guarantee of - live updates as the user traverses the pages, the behavior - should be decided (and documented) by each implementation. - - :param object http_client: bravado.requests_client.RequestsClient - :param str auth: String to send in the auth header. - :param proto: Schema where the server resides (http, https) - :param host: Port where the post request will be sent and the wes server listens at (default 8080) - :return: The body of the get result as a dictionary. - """ - postresult = http_client.session.get("%s://%s/ga4gh/wes/v1/runs" % (proto, host), - headers={"Authorization": auth}) +def wes_reponse(postresult): if postresult.status_code != 200: logging.error("%s", json.loads(postresult.text)) exit(1) return json.loads(postresult.text) -def get_service_info(http_client, auth, proto, host): - """ - Get information about Workflow Execution Service. May - include information related (but not limited to) the - workflow descriptor formats, versions supported, the - WES API versions supported, and information about general - the service availability. - - :param object http_client: bravado.requests_client.RequestsClient - :param str auth: String to send in the auth header. - :param proto: Schema where the server resides (http, https) - :param host: Port where the post request will be sent and the wes server listens at (default 8080) - :return: The body of the get result as a dictionary. - """ - postresult = http_client.session.get("%s://%s/ga4gh/wes/v1/service-info" % (proto, host), - headers={"Authorization": auth}) - if postresult.status_code != 200: - logging.error("%s", json.loads(postresult.text)) - exit(1) - return json.loads(postresult.text) +class WESClient(object): + def __init__(self, service): + self.auth = service['auth'] + self.proto = service['proto'] + self.host = service['host'] + + def get_service_info(self): + """ + Get information about Workflow Execution Service. May + include information related (but not limited to) the + workflow descriptor formats, versions supported, the + WES API versions supported, and information about general + the service availability. + + :param str auth: String to send in the auth header. + :param proto: Schema where the server resides (http, https) + :param host: Port where the post request will be sent and the wes server listens at (default 8080) + :return: The body of the get result as a dictionary. + """ + postresult = requests.get("%s://%s/ga4gh/wes/v1/service-info" % (self.proto, self.host), + headers={"Authorization": self.auth}) + return wes_reponse(postresult) + + def list_runs(self): + """ + List the workflows, this endpoint will list the workflows + in order of oldest to newest. There is no guarantee of + live updates as the user traverses the pages, the behavior + should be decided (and documented) by each implementation. + + :param str auth: String to send in the auth header. + :param proto: Schema where the server resides (http, https) + :param host: Port where the post request will be sent and the wes server listens at (default 8080) + :return: The body of the get result as a dictionary. + """ + postresult = requests.get("%s://%s/ga4gh/wes/v1/runs" % (self.proto, self.host), + headers={"Authorization": self.auth}) + return wes_reponse(postresult) + + def run(self, wf, jsonyaml, attachments): + """ + Composes and sends a post request that signals the wes server to run a workflow. + + :param str workflow_file: A local/http/https path to a cwl/wdl/python workflow file. + :param str jsonyaml: A local path to a json or yaml file. + :param list attachments: A list of local paths to files that will be uploaded to the server. + :param str auth: String to send in the auth header. + :param proto: Schema where the server resides (http, https) + :param host: Port where the post request will be sent and the wes server listens at (default 8080) + + :return: The body of the post result as a dictionary. + """ + attachments = list(expand_globs(attachments)) + parts = build_wes_request(wf, jsonyaml, attachments) + postresult = requests.post("%s://%s/ga4gh/wes/v1/runs" % (self.proto, self.host), + files=parts, + headers={"Authorization": self.auth}) + return wes_reponse(postresult) + + def cancel(self, run_id): + """ + Cancel a running workflow. + + :param run_id: String (typically a uuid) identifying the run. + :param str auth: String to send in the auth header. + :param proto: Schema where the server resides (http, https) + :param host: Port where the post request will be sent and the wes server listens at (default 8080) + :return: The body of the delete result as a dictionary. + """ + postresult = requests.delete("%s://%s/ga4gh/wes/v1/runs/%s" % (self.proto, self.host, run_id), + headers={"Authorization": self.auth}) + return wes_reponse(postresult) + + def get_run_log(self, run_id): + """ + Get detailed info about a running workflow. + + :param run_id: String (typically a uuid) identifying the run. + :param str auth: String to send in the auth header. + :param proto: Schema where the server resides (http, https) + :param host: Port where the post request will be sent and the wes server listens at (default 8080) + :return: The body of the get result as a dictionary. + """ + postresult = requests.get("%s://%s/ga4gh/wes/v1/runs/%s" % (self.proto, self.host, run_id), + headers={"Authorization": self.auth}) + return wes_reponse(postresult) + + def get_run_status(self, run_id): + """ + Get quick status info about a running workflow. + + :param run_id: String (typically a uuid) identifying the run. + :param str auth: String to send in the auth header. + :param proto: Schema where the server resides (http, https) + :param host: Port where the post request will be sent and the wes server listens at (default 8080) + :return: The body of the get result as a dictionary. + """ + postresult = requests.get("%s://%s/ga4gh/wes/v1/runs/%s/status" % (self.proto, self.host, run_id), + headers={"Authorization": self.auth}) + return wes_reponse(postresult) diff --git a/wes_client/wes_client_main.py b/wes_client/wes_client_main.py index b68b06b..5fd869d 100644 --- a/wes_client/wes_client_main.py +++ b/wes_client/wes_client_main.py @@ -8,10 +8,7 @@ import logging import requests from requests.exceptions import InvalidSchema, MissingSchema -from wes_client.util import (run_wf, - wes_client, - modify_jsonyaml_paths) -from bravado.requests_client import RequestsClient +from wes_client.util import modify_jsonyaml_paths, WESClient def main(argv=sys.argv[1:]): @@ -52,27 +49,26 @@ def main(argv=sys.argv[1:]): print(u"%s %s" % (sys.argv[0], pkg[0].version)) exit(0) - http_client = RequestsClient() - client = wes_client(http_client, args.auth, args.proto, args.host) + client = WESClient({'auth': args.auth, 'proto': args.proto, 'host': args.host}) if args.list: - response = client.ListRuns(page_token=args.page, page_size=args.page_size) - json.dump(response.result(), sys.stdout, indent=4) + response = client.list_runs() # how to include: page_token=args.page, page_size=args.page_size ? + json.dump(response, sys.stdout, indent=4) return 0 if args.log: - response = client.GetRunLog(run_id=args.log) - sys.stdout.write(response.result()["workflow_log"]["stderr"]) + response = client.get_run_log(run_id=args.log) + sys.stdout.write(response["workflow_log"]["stderr"]) return 0 if args.get: - response = client.GetRunLog(run_id=args.get) - json.dump(response.result(), sys.stdout, indent=4) + response = client.get_run_log(run_id=args.get) + json.dump(response, sys.stdout, indent=4) return 0 if args.info: - response = client.GetServiceInfo() - json.dump(response.result(), sys.stdout, indent=4) + response = client.get_service_info() + json.dump(response, sys.stdout, indent=4) return 0 if not args.workflow_url: @@ -90,14 +86,8 @@ def main(argv=sys.argv[1:]): else: logging.basicConfig(level=logging.INFO) - args.attachments = args.attachments if not args.attachments else args.attachments.split(',') - r = run_wf(args.workflow_url, - args.job_order, - args.attachments, - http_client, - args.auth, - args.proto, - args.host) + args.attachments = "" if not args.attachments else args.attachments.split(',') + r = client.run(args.workflow_url, args.job_order, args.attachments) if args.wait: logging.info("Workflow run id is %s", r["run_id"]) @@ -105,14 +95,14 @@ def main(argv=sys.argv[1:]): sys.stdout.write(r["run_id"] + "\n") exit(0) - r = client.GetRunStatus(run_id=r["run_id"]).result() + r = client.get_run_status(run_id=r["run_id"]) while r["state"] in ("QUEUED", "INITIALIZING", "RUNNING"): time.sleep(8) - r = client.GetRunStatus(run_id=r["run_id"]).result() + r = client.get_run_status(run_id=r["run_id"]) logging.info("State is %s", r["state"]) - s = client.GetRunLog(run_id=r["run_id"]).result() + s = client.get_run_log(run_id=r["run_id"]) try: # TODO: Only works with Arvados atm diff --git a/wes_service/cwl_runner.py b/wes_service/cwl_runner.py index 9a47124..eaef36e 100644 --- a/wes_service/cwl_runner.py +++ b/wes_service/cwl_runner.py @@ -173,6 +173,8 @@ def GetServiceInfo(self): def ListRuns(self, page_size=None, page_token=None, state_search=None): # FIXME #15 results don't page + if not os.path.exists(os.path.join(os.getcwd(), "workflows")): + return {"workflows": [], "next_page_token": ""} wf = [] for l in os.listdir(os.path.join(os.getcwd(), "workflows")): if os.path.isdir(os.path.join(os.getcwd(), "workflows", l)): diff --git a/wes_service/toil_wes.py b/wes_service/toil_wes.py index 9528dcd..1485629 100644 --- a/wes_service/toil_wes.py +++ b/wes_service/toil_wes.py @@ -298,6 +298,8 @@ def GetServiceInfo(self): def ListRuns(self, page_size=None, page_token=None, state_search=None): # FIXME #15 results don't page + if not os.path.exists(os.path.join(os.getcwd(), "workflows")): + return {"workflows": [], "next_page_token": ""} wf = [] for l in os.listdir(os.path.join(os.getcwd(), "workflows")): if os.path.isdir(os.path.join(os.getcwd(), "workflows", l)): diff --git a/wes_service/util.py b/wes_service/util.py index 5195bc7..7942f65 100644 --- a/wes_service/util.py +++ b/wes_service/util.py @@ -2,7 +2,7 @@ import json import os -from six import itervalues +from six import itervalues, iterlists import connexion from werkzeug.utils import secure_filename @@ -45,7 +45,7 @@ def getoptlist(self, p): def collect_attachments(self): tempdir = tempfile.mkdtemp() body = {} - for k, ls in connexion.request.files.iterlists(): + for k, ls in iterlists(connexion.request.files): for v in ls: if k == "workflow_attachment": filename = secure_filename(v.filename)