From bbb65067918209e27f534214eea5a84609f9c675 Mon Sep 17 00:00:00 2001 From: DailyDreaming Date: Fri, 20 Jul 2018 12:33:13 -0700 Subject: [PATCH 01/18] Add a toil backend. --- test/test_integration.py | 19 +-- wes_service/toil_wes.py | 304 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 311 insertions(+), 12 deletions(-) create mode 100644 wes_service/toil_wes.py diff --git a/test/test_integration.py b/test/test_integration.py index e04ec81..21ea622 100644 --- a/test/test_integration.py +++ b/test/test_integration.py @@ -12,7 +12,6 @@ class IntegrationTest(unittest.TestCase): """A baseclass that's inherited for use with different cwl backends.""" - def setUp(self): """Start a (local) wes-service server to make requests against.""" raise NotImplementedError @@ -28,41 +27,39 @@ def tearDown(self): except OSError as e: print(e) + shutil.rmtree('workflows') unittest.TestCase.tearDown(self) def test_dockstore_md5sum(self): """Fetch the md5sum cwl from dockstore, run it on the wes-service server, and check for the correct output.""" cwl_dockstore_url = 'https://dockstore.org:8443/api/ga4gh/v2/tools/quay.io%2Fbriandoconnor%2Fdockstore-tool-md5sum/versions/master/plain-CWL/descriptor/%2FDockstore.cwl' - output_filepath, _ = run_md5sum(cwl_input=cwl_dockstore_url) + output_filepath, _ = run_cwl_md5sum(cwl_input=cwl_dockstore_url) self.assertTrue(check_for_file(output_filepath), 'Output file was not found: ' + str(output_filepath)) - shutil.rmtree('workflows') def test_local_md5sum(self): """Pass a local md5sum cwl to the wes-service server, and check for the correct output.""" cwl_local_path = os.path.abspath('testdata/md5sum.cwl') - output_filepath, _ = run_md5sum(cwl_input='file://' + cwl_local_path) + output_filepath, _ = run_cwl_md5sum(cwl_input='file://' + cwl_local_path) self.assertTrue(check_for_file(output_filepath), 'Output file was not found: ' + str(output_filepath)) - shutil.rmtree('workflows') def test_multipart_upload(self): """Pass a local md5sum cwl to the wes-service server, and check for uploaded file in service.""" cwl_local_path = os.path.abspath('testdata/md5sum.cwl') - _, run_id = run_md5sum(cwl_input='file://' + cwl_local_path) + _, run_id = run_cwl_md5sum(cwl_input='file://' + cwl_local_path) get_response = get_log_request(run_id)["request"] self.assertTrue(check_for_file(get_response["workflow_url"][7:]), 'Output file was not found: ' + get_response["workflow_url"][:7]) - shutil.rmtree('workflows') -def run_md5sum(cwl_input): +def run_cwl_md5sum(cwl_input): """Pass a local md5sum cwl to the wes-service server, and return the path of the output file that was created.""" endpoint = 'http://localhost:8080/ga4gh/wes/v1/workflows' params = {'output_file': {'path': '/tmp/md5sum.txt', 'class': 'File'}, - 'input_file': {'path': '../../testdata/md5sum.input', 'class': 'File'}} + 'input_file': {'path': os.path.abspath('testdata/md5sum.input'), 'class': 'File'}} parts = [("workflow_params", json.dumps(params)), ("workflow_type", "CWL"), ("workflow_type_version", "v1.0")] if cwl_input.startswith("file://"): @@ -117,14 +114,12 @@ def setUp(self): class ToilTest(IntegrationTest): """Test using Toil.""" - def setUp(self): """ Start a (local) wes-service server to make requests against. Use toil as the wes-service server 'backend'. """ - self.wes_server_process = subprocess.Popen('python {} ' - '--opt runner=cwltoil --opt extra=--logLevel=CRITICAL' + self.wes_server_process = subprocess.Popen('python {} --backend=wes_service.toil_wes --opt="extra=--logLevel=CRITICAL"' ''.format(os.path.abspath('wes_service/wes_service_main.py')), shell=True) time.sleep(5) diff --git a/wes_service/toil_wes.py b/wes_service/toil_wes.py new file mode 100644 index 0000000..2d43609 --- /dev/null +++ b/wes_service/toil_wes.py @@ -0,0 +1,304 @@ +from __future__ import print_function +import json +import os +import subprocess +import tempfile +import time +import logging +import urllib +import uuid + +import connexion +from multiprocessing import Process +from werkzeug.utils import secure_filename +from wes_service.util import WESBackend + +logging.basicConfig(level=logging.INFO) + + +class ToilWorkflow(object): + def __init__(self, workflow_id): + super(ToilWorkflow, self).__init__() + self.workflow_id = workflow_id + + self.workdir = os.path.join(os.getcwd(), 'workflows', self.workflow_id) + self.outdir = os.path.join(self.workdir, 'outdir') + if not os.path.exists(self.outdir): + os.makedirs(self.outdir) + + self.outfile = os.path.join(self.workdir, 'stdout') + self.errfile = os.path.join(self.workdir, 'stderr') + self.starttime = os.path.join(self.workdir, 'starttime') + self.endtime = os.path.join(self.workdir, 'endtime') + self.pidfile = os.path.join(self.workdir, 'pid') + self.cmdfile = os.path.join(self.workdir, 'cmd') + self.request_json = os.path.join(self.workdir, 'request.json') + self.output_json = os.path.join(self.workdir, "output.json") + self.input_wf_filename = os.path.join(self.workdir, "workflow.cwl") + self.input_json = os.path.join(self.workdir, "input.json") + + def write_workflow(self, request, opts, wftype='cwl'): + """Writes a cwl, wdl, or python file as appropriate from the request dictionary.""" + self.input_wf_filename = os.path.join(self.workdir, 'workflow.' + wftype) + + if request.get("workflow_descriptor"): + workflow_descriptor = request.get('workflow_descriptor') + with open(self.input_wf_filename, "w") as f: + f.write(workflow_descriptor) + workflow_url = urllib.pathname2url(self.input_wf_filename) + else: + workflow_url = request.get("workflow_url") + + extra = opts.getoptlist("extra") + if wftype == 'cwl': + command_args = ['toil-cwl-runner'] + extra + [workflow_url, self.input_json] + elif wftype == 'wdl': + command_args = ['toil-wdl-runner'] + extra + [workflow_url, self.input_json] + elif wftype == 'py': + command_args = ['python'] + extra + [workflow_url] + else: + raise RuntimeError('workflow_type is not "cwl", "wdl", or "py": ' + str(wftype)) + + return command_args + + def write_json(self, request_dict): + input_json = os.path.join(self.workdir, 'input.json') + with open(input_json, 'w') as inputtemp: + json.dump(request_dict['workflow_params'], inputtemp) + return input_json + + def call_cmd(self, cmd): + """ + Calls a command with Popen. + Writes stdout, stderr, and the command to separate files. + + :param cmd: A string or array of strings. + :return: The pid of the command. + """ + with open(self.cmdfile, 'w') as f: + f.write(str(cmd)) + stdout = open(self.outfile, 'w') + stderr = open(self.errfile, 'w') + logging.info('Calling: ' + ' '.join(cmd)) + process = subprocess.Popen(cmd, + stdout=stdout, + stderr=stderr, + close_fds=True, + cwd=self.outdir) + stdout.close() + stderr.close() + return process.pid + + def cancel(self): + pass + + def fetch(self, filename): + if os.path.exists(filename): + with open(filename, "r") as f: + return f.read() + return '' + + def getlog(self): + state, exit_code = self.getstate() + + with open(self.request_json, "r") as f: + request = json.load(f) + + stderr = self.fetch(self.errfile) + starttime = self.fetch(self.starttime) + endtime = self.fetch(self.endtime) + cmd = self.fetch(self.cmdfile) + + + outputobj = {} + if state == "COMPLETE": + with open(self.output_json, "r") as outputtemp: + outputobj = json.load(outputtemp) + + return { + "workflow_id": self.workflow_id, + "request": request, + "state": state, + "workflow_log": { + "cmd": cmd, # array? + "start_time": starttime, + "end_time": endtime, + "stdout": "", + "stderr": stderr, + "exit_code": exit_code + }, + "task_logs": [], + "outputs": outputobj + } + + def run(self, request, opts): + """ + Constructs a command to run a cwl/json from requests and opts, + runs it, and deposits the outputs in outdir. + + Runner: + opts.getopt("runner", default="cwl-runner") + + CWL (url): + request["workflow_url"] == a url to a cwl file + or + request["workflow_descriptor"] == input cwl text (written to a file and a url constructed for that file) + + JSON File: + request["workflow_params"] == input json text (to be written to a file) + + :param dict request: A dictionary containing the cwl/json information. + :param wes_service.util.WESBackend opts: contains the user's arguments; + specifically the runner and runner options + :return: {"workflow_id": self.workflow_id, "state": state} + """ + wftype = request['workflow_type'].lower().strip() + version = request['workflow_type_version'] + + if version != 'v1.0' and wftype in ('cwl', 'wdl'): + raise RuntimeError('workflow_type "cwl", "wdl" requires ' + '"workflow_type_version" to be "v1.0": ' + str(version)) + if version != '2.7' and wftype == 'py': + raise RuntimeError('workflow_type "py" requires ' + '"workflow_type_version" to be "2.7": ' + str(version)) + + logging.info('Beginning Toil Workflow ID: ' + str(self.workflow_id)) + + with open(self.starttime, 'w') as f: + f.write(str(time.time())) + with open(self.request_json, 'w') as f: + json.dump(request, f) + with open(self.input_json, "w") as inputtemp: + json.dump(request["workflow_params"], inputtemp) + + command_args = self.write_workflow(request, opts, wftype=wftype) + pid = self.call_cmd(command_args) + + with open(self.endtime, 'w') as f: + f.write(str(time.time())) + with open(self.pidfile, 'w') as f: + f.write(str(pid)) + + return self.getstatus() + + def getstate(self): + """ + Returns RUNNING, -1 + COMPLETE, 0 + or + EXECUTOR_ERROR, 255 + """ + state = "RUNNING" + exit_code = -1 + + exitcode_file = os.path.join(self.workdir, "exit_code") + + if os.path.exists(exitcode_file): + with open(exitcode_file) as f: + exit_code = int(f.read()) + elif os.path.exists(self.pidfile): + with open(self.pidfile, "r") as pid: + pid = int(pid.read()) + try: + (_pid, exit_status) = os.waitpid(pid, os.WNOHANG) + if _pid != 0: + exit_code = exit_status >> 8 + with open(exitcode_file, "w") as f: + f.write(str(exit_code)) + os.unlink(self.pidfile) + except OSError: + os.unlink(self.pidfile) + exit_code = 255 + + if exit_code == 0: + state = "COMPLETE" + elif exit_code != -1: + state = "EXECUTOR_ERROR" + + return state, exit_code + + def getstatus(self): + state, exit_code = self.getstate() + + return { + "workflow_id": self.workflow_id, + "state": state + } + + +class ToilBackend(WESBackend): + processes = {} + + def GetServiceInfo(self): + return { + 'workflow_type_versions': { + 'CWL': {'workflow_type_version': ['v1.0']}, + 'WDL': {'workflow_type_version': ['v1.0']}, + 'py': {'workflow_type_version': ['2.7']} + }, + 'supported_wes_versions': '0.3.0', + 'supported_filesystem_protocols': ['file', 'http', 'https'], + 'engine_versions': ['3.16.0'], + 'system_state_counts': {}, + 'key_values': {} + } + + def ListWorkflows(self): + # FIXME #15 results don't page + wf = [] + for l in os.listdir(os.path.join(os.getcwd(), "workflows")): + if os.path.isdir(os.path.join(os.getcwd(), "workflows", l)): + wf.append(ToilWorkflow(l)) + + workflows = [{"workflow_id": w.workflow_id, "state": w.getstate()[0]} for w in wf] # NOQA + return { + "workflows": workflows, + "next_page_token": "" + } + + def RunWorkflow(self): + tempdir = tempfile.mkdtemp() + body = {} + for k, ls in connexion.request.files.iterlists(): + for v in ls: + if k == "workflow_descriptor": + filename = secure_filename(v.filename) + v.save(os.path.join(tempdir, filename)) + elif k in ("workflow_params", "tags", "workflow_engine_parameters"): + body[k] = json.loads(v.read()) + else: + body[k] = v.read() + + if body['workflow_type'] != "CWL" or \ + body['workflow_type_version'] != "v1.0": + return + + body["workflow_url"] = "file:///%s/%s" % (tempdir, body["workflow_url"]) + index = body["workflow_url"].find("http") + if index > 0: + body["workflow_url"] = body["workflow_url"][index:] + + workflow_id = uuid.uuid4().hex + job = ToilWorkflow(workflow_id) + p = Process(target=job.run, args=(body, self)) + p.start() + self.processes[workflow_id] = p + return {"workflow_id": workflow_id} + + def GetWorkflowLog(self, workflow_id): + job = ToilWorkflow(workflow_id) + return job.getlog() + + def CancelJob(self, workflow_id): + # should this block with `p.is_alive()`? + if workflow_id in self.processes: + self.processes[workflow_id].terminate() + return {'workflow_id': workflow_id} + + def GetWorkflowStatus(self, workflow_id): + job = ToilWorkflow(workflow_id) + return job.getstatus() + + +def create_backend(app, opts): + return ToilBackend(opts) From 5ab72eba9878ec172ba136a4ea748097523fbf46 Mon Sep 17 00:00:00 2001 From: DailyDreaming Date: Fri, 20 Jul 2018 12:35:00 -0700 Subject: [PATCH 02/18] flake8. --- wes_service/toil_wes.py | 1 - 1 file changed, 1 deletion(-) diff --git a/wes_service/toil_wes.py b/wes_service/toil_wes.py index 2d43609..91db78d 100644 --- a/wes_service/toil_wes.py +++ b/wes_service/toil_wes.py @@ -109,7 +109,6 @@ def getlog(self): endtime = self.fetch(self.endtime) cmd = self.fetch(self.cmdfile) - outputobj = {} if state == "COMPLETE": with open(self.output_json, "r") as outputtemp: From 0b7a7077386b937f270d24a677fb9277b189f54d Mon Sep 17 00:00:00 2001 From: DailyDreaming Date: Fri, 20 Jul 2018 14:33:34 -0700 Subject: [PATCH 03/18] Remove exception. --- wes_service/toil_wes.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/wes_service/toil_wes.py b/wes_service/toil_wes.py index 91db78d..38f47e4 100644 --- a/wes_service/toil_wes.py +++ b/wes_service/toil_wes.py @@ -268,10 +268,6 @@ def RunWorkflow(self): else: body[k] = v.read() - if body['workflow_type'] != "CWL" or \ - body['workflow_type_version'] != "v1.0": - return - body["workflow_url"] = "file:///%s/%s" % (tempdir, body["workflow_url"]) index = body["workflow_url"].find("http") if index > 0: From 71e973d64fd64df1334f8c83c9977cc6d10411f9 Mon Sep 17 00:00:00 2001 From: DailyDreaming Date: Fri, 20 Jul 2018 15:13:57 -0700 Subject: [PATCH 04/18] WDL test. --- test/test_integration.py | 24 ++++++++++++++++++++++-- testdata/md5sum.wdl | 22 ++++++++++++++++++++++ testdata/md5sum.wdl.json | 1 + wes_service/toil_wes.py | 6 +++--- 4 files changed, 48 insertions(+), 5 deletions(-) create mode 100644 testdata/md5sum.wdl create mode 100644 testdata/md5sum.wdl.json diff --git a/test/test_integration.py b/test/test_integration.py index 21ea622..40a1e88 100644 --- a/test/test_integration.py +++ b/test/test_integration.py @@ -26,8 +26,8 @@ def tearDown(self): time.sleep(3) except OSError as e: print(e) - - shutil.rmtree('workflows') + if os.path.exists('workflows'): + shutil.rmtree('workflows') unittest.TestCase.tearDown(self) def test_dockstore_md5sum(self): @@ -72,6 +72,19 @@ def run_cwl_md5sum(cwl_input): return os.path.join(output_dir, 'md5sum.txt'), response['workflow_id'] +def run_wdl_md5sum(cwl_input): + """Pass a local md5sum wdl to the wes-service server, and return the path of the output file that was created.""" + endpoint = 'http://localhost:8080/ga4gh/wes/v1/workflows' + params = '{"ga4ghMd5.inputFile": "' + os.path.abspath('testdata/md5sum.input') + '"}' + parts = [("workflow_params", params), + ("workflow_type", "WDL"), + ("workflow_type_version", "v1.0"), + ("workflow_url", cwl_input)] + response = requests.post(endpoint, files=parts).json() + output_dir = os.path.abspath(os.path.join('workflows', response['workflow_id'], 'outdir')) + return os.path.join(output_dir, 'md5sum.txt'), response['workflow_id'] + + def get_log_request(run_id): endpoint = 'http://localhost:8080/ga4gh/wes/v1/workflows/{}'.format(run_id) return requests.get(endpoint).json() @@ -124,6 +137,13 @@ def setUp(self): shell=True) time.sleep(5) + def test_wdl_md5sum(self): + """Pass a local md5sum cwl to the wes-service server, and check for the correct output.""" + cwl_local_path = os.path.abspath('testdata/md5sum.wdl') + output_filepath, _ = run_wdl_md5sum(cwl_input=cwl_local_path) + + self.assertTrue(check_for_file(output_filepath), 'Output file was not found: ' + str(output_filepath)) + # Prevent pytest/unittest's discovery from attempting to discover the base test class. del IntegrationTest diff --git a/testdata/md5sum.wdl b/testdata/md5sum.wdl new file mode 100644 index 0000000..bab0ab7 --- /dev/null +++ b/testdata/md5sum.wdl @@ -0,0 +1,22 @@ +task md5 { + File inputFile + + command { + /bin/my_md5sum ${inputFile} + } + + output { + File value = "md5sum.txt" + } + + runtime { + docker: "quay.io/briandoconnor/dockstore-tool-md5sum:1.0.4" + cpu: 1 + memory: "512 MB" + } +} + +workflow ga4ghMd5 { + File inputFile + call md5 { input: inputFile=inputFile } +} diff --git a/testdata/md5sum.wdl.json b/testdata/md5sum.wdl.json new file mode 100644 index 0000000..c56730e --- /dev/null +++ b/testdata/md5sum.wdl.json @@ -0,0 +1 @@ +{"ga4ghMd5.inputFile": "md5sum.input"} diff --git a/wes_service/toil_wes.py b/wes_service/toil_wes.py index 38f47e4..30cf746 100644 --- a/wes_service/toil_wes.py +++ b/wes_service/toil_wes.py @@ -55,7 +55,7 @@ def write_workflow(self, request, opts, wftype='cwl'): elif wftype == 'wdl': command_args = ['toil-wdl-runner'] + extra + [workflow_url, self.input_json] elif wftype == 'py': - command_args = ['python'] + extra + [workflow_url] + command_args = ['python'] + extra + [self.input_wf_filename] else: raise RuntimeError('workflow_type is not "cwl", "wdl", or "py": ' + str(wftype)) @@ -261,14 +261,14 @@ def RunWorkflow(self): for k, ls in connexion.request.files.iterlists(): for v in ls: if k == "workflow_descriptor": - filename = secure_filename(v.filename) + filename = secure_filename(os.path.basename(v.filename)) v.save(os.path.join(tempdir, filename)) + body["workflow_url"] = "file:///%s/%s" % (tempdir, filename) elif k in ("workflow_params", "tags", "workflow_engine_parameters"): body[k] = json.loads(v.read()) else: body[k] = v.read() - body["workflow_url"] = "file:///%s/%s" % (tempdir, body["workflow_url"]) index = body["workflow_url"].find("http") if index > 0: body["workflow_url"] = body["workflow_url"][index:] From a66bf9f42798d3909cc2ea9cb21fa03ec5ec2fb5 Mon Sep 17 00:00:00 2001 From: DailyDreaming Date: Fri, 20 Jul 2018 15:16:11 -0700 Subject: [PATCH 05/18] Naming. --- test/test_integration.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/test/test_integration.py b/test/test_integration.py index 40a1e88..2cf4085 100644 --- a/test/test_integration.py +++ b/test/test_integration.py @@ -72,14 +72,14 @@ def run_cwl_md5sum(cwl_input): return os.path.join(output_dir, 'md5sum.txt'), response['workflow_id'] -def run_wdl_md5sum(cwl_input): +def run_wdl_md5sum(wdl_input): """Pass a local md5sum wdl to the wes-service server, and return the path of the output file that was created.""" endpoint = 'http://localhost:8080/ga4gh/wes/v1/workflows' params = '{"ga4ghMd5.inputFile": "' + os.path.abspath('testdata/md5sum.input') + '"}' parts = [("workflow_params", params), ("workflow_type", "WDL"), ("workflow_type_version", "v1.0"), - ("workflow_url", cwl_input)] + ("workflow_url", wdl_input)] response = requests.post(endpoint, files=parts).json() output_dir = os.path.abspath(os.path.join('workflows', response['workflow_id'], 'outdir')) return os.path.join(output_dir, 'md5sum.txt'), response['workflow_id'] @@ -139,8 +139,8 @@ def setUp(self): def test_wdl_md5sum(self): """Pass a local md5sum cwl to the wes-service server, and check for the correct output.""" - cwl_local_path = os.path.abspath('testdata/md5sum.wdl') - output_filepath, _ = run_wdl_md5sum(cwl_input=cwl_local_path) + wdl_local_path = os.path.abspath('testdata/md5sum.wdl') + output_filepath, _ = run_wdl_md5sum(wdl_input=wdl_local_path) self.assertTrue(check_for_file(output_filepath), 'Output file was not found: ' + str(output_filepath)) From 04b94542b4af874d0770a00405f135861a0eb7a7 Mon Sep 17 00:00:00 2001 From: DailyDreaming Date: Fri, 20 Jul 2018 15:24:47 -0700 Subject: [PATCH 06/18] Debug travis paths. --- wes_service/toil_wes.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/wes_service/toil_wes.py b/wes_service/toil_wes.py index 30cf746..83663e8 100644 --- a/wes_service/toil_wes.py +++ b/wes_service/toil_wes.py @@ -54,6 +54,12 @@ def write_workflow(self, request, opts, wftype='cwl'): command_args = ['toil-cwl-runner'] + extra + [workflow_url, self.input_json] elif wftype == 'wdl': command_args = ['toil-wdl-runner'] + extra + [workflow_url, self.input_json] + assert(os.path.exists(workflow_url), workflow_url) + with open(workflow_url, 'r') as f: + logging.info(f.read()) + assert(os.path.exists(self.input_json), self.input_json) + with open(self.input_json, 'r') as f: + logging.info(f.read()) elif wftype == 'py': command_args = ['python'] + extra + [self.input_wf_filename] else: From 7c3d45e0368ddb4fb60583e043fe2e85f014aabe Mon Sep 17 00:00:00 2001 From: DailyDreaming Date: Fri, 20 Jul 2018 15:35:06 -0700 Subject: [PATCH 07/18] Wait a bit longer for file generation. --- test/test_integration.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/test_integration.py b/test/test_integration.py index 2cf4085..6678959 100644 --- a/test/test_integration.py +++ b/test/test_integration.py @@ -98,7 +98,7 @@ def get_server_pids(): return pids -def check_for_file(filepath, seconds=20): +def check_for_file(filepath, seconds=40): """Return True if a file exists within a certain amount of time.""" wait_counter = 0 while not os.path.exists(filepath): From 1984819f0bac0863a86e1f82326cb397a68125d6 Mon Sep 17 00:00:00 2001 From: DailyDreaming Date: Fri, 20 Jul 2018 15:56:40 -0700 Subject: [PATCH 08/18] Type error. --- wes_service/toil_wes.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/wes_service/toil_wes.py b/wes_service/toil_wes.py index 83663e8..5f1d82f 100644 --- a/wes_service/toil_wes.py +++ b/wes_service/toil_wes.py @@ -125,7 +125,7 @@ def getlog(self): "request": request, "state": state, "workflow_log": { - "cmd": cmd, # array? + "cmd": [""], "start_time": starttime, "end_time": endtime, "stdout": "", From fa314d2a713ce49c7f8bb1184012d42dd0804a74 Mon Sep 17 00:00:00 2001 From: DailyDreaming Date: Fri, 20 Jul 2018 16:39:56 -0700 Subject: [PATCH 09/18] Rearrange dependencies. --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index cd46fab..3f3451b 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,9 +3,9 @@ python: - '2.7' before_install: - sudo apt-get update -qq +- pip install toil[all]==3.16.0 - pip install . --process-dependency-links - pip install -r dev-requirements.txt -- pip install toil[all]==3.16.0 script: - flake8 wes_service wes_client - pytest From afbe0026d5f459c717bdbd4825943156bad9222a Mon Sep 17 00:00:00 2001 From: DailyDreaming Date: Fri, 20 Jul 2018 17:59:15 -0700 Subject: [PATCH 10/18] WDL support. --- wes_client/wes_client_main.py | 13 +++++++++++-- wes_service/toil_wes.py | 3 +++ 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/wes_client/wes_client_main.py b/wes_client/wes_client_main.py index bc9658d..1088248 100644 --- a/wes_client/wes_client_main.py +++ b/wes_client/wes_client_main.py @@ -10,7 +10,7 @@ import logging import schema_salad.ref_resolver import requests -from requests.exceptions import InvalidSchema +from requests.exceptions import InvalidSchema, MissingSchema from wes_service.util import visit from bravado.client import SwaggerClient from bravado.requests_client import RequestsClient @@ -81,6 +81,13 @@ def main(argv=sys.argv[1:]): json.dump(response.result(), sys.stdout, indent=4) return 0 + if args.workflow_url.lower().endswith('wdl'): + wf_type = 'WDL' + elif args.workflow_url.lower().endswith('cwl'): + wf_type = 'CWL' + elif args.workflow_url.lower().endswith('py'): + wf_type = 'PY' + loader = schema_salad.ref_resolver.Loader({ "location": {"@type": "@id"}, "path": {"@type": "@id"} @@ -112,7 +119,7 @@ def fixpaths(d): parts = [ ("workflow_params", json.dumps(input_dict)), - ("workflow_type", "CWL"), + ("workflow_type", wf_type), ("workflow_type_version", "v1.0") ] if workflow_url.startswith("file://"): @@ -163,6 +170,8 @@ def fixpaths(d): logging.info("Workflow log:\n" + logs) except InvalidSchema: logging.info("Workflow log:\n" + str(s["workflow_log"]["stderr"])) + except MissingSchema: + logging.info("Workflow log:\n" + str(s["workflow_log"]["stderr"])) if "fields" in s["outputs"] and s["outputs"]["fields"] is None: del s["outputs"]["fields"] diff --git a/wes_service/toil_wes.py b/wes_service/toil_wes.py index 5f1d82f..447e008 100644 --- a/wes_service/toil_wes.py +++ b/wes_service/toil_wes.py @@ -53,6 +53,9 @@ def write_workflow(self, request, opts, wftype='cwl'): if wftype == 'cwl': command_args = ['toil-cwl-runner'] + extra + [workflow_url, self.input_json] elif wftype == 'wdl': + if workflow_url.startswith('http://') or workflow_url.startswith('https://'): + subprocess.check_call(['wget', workflow_url]) + workflow_url = os.path.abspath(workflow_url.split('/')[-1]) command_args = ['toil-wdl-runner'] + extra + [workflow_url, self.input_json] assert(os.path.exists(workflow_url), workflow_url) with open(workflow_url, 'r') as f: From bb7dcd9c6dad5f471245614ff0e96d7aa47a8f89 Mon Sep 17 00:00:00 2001 From: DailyDreaming Date: Fri, 20 Jul 2018 19:49:31 -0700 Subject: [PATCH 11/18] Temporarily disable status. --- wes_service/toil_wes.py | 46 ++++++++++++++++++++--------------------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/wes_service/toil_wes.py b/wes_service/toil_wes.py index 447e008..a16446e 100644 --- a/wes_service/toil_wes.py +++ b/wes_service/toil_wes.py @@ -199,29 +199,29 @@ def getstate(self): state = "RUNNING" exit_code = -1 - exitcode_file = os.path.join(self.workdir, "exit_code") - - if os.path.exists(exitcode_file): - with open(exitcode_file) as f: - exit_code = int(f.read()) - elif os.path.exists(self.pidfile): - with open(self.pidfile, "r") as pid: - pid = int(pid.read()) - try: - (_pid, exit_status) = os.waitpid(pid, os.WNOHANG) - if _pid != 0: - exit_code = exit_status >> 8 - with open(exitcode_file, "w") as f: - f.write(str(exit_code)) - os.unlink(self.pidfile) - except OSError: - os.unlink(self.pidfile) - exit_code = 255 - - if exit_code == 0: - state = "COMPLETE" - elif exit_code != -1: - state = "EXECUTOR_ERROR" + # exitcode_file = os.path.join(self.workdir, "exit_code") + # + # if os.path.exists(exitcode_file): + # with open(exitcode_file) as f: + # exit_code = int(f.read()) + # elif os.path.exists(self.pidfile): + # with open(self.pidfile, "r") as pid: + # pid = int(pid.read()) + # try: + # (_pid, exit_status) = os.waitpid(pid, os.WNOHANG) + # if _pid != 0: + # exit_code = exit_status >> 8 + # with open(exitcode_file, "w") as f: + # f.write(str(exit_code)) + # os.unlink(self.pidfile) + # except OSError: + # os.unlink(self.pidfile) + # exit_code = 255 + # + # if exit_code == 0: + # state = "COMPLETE" + # elif exit_code != -1: + # state = "EXECUTOR_ERROR" return state, exit_code From 3f2c6e28110d39e3e22e7e0994875c7ab2c3125a Mon Sep 17 00:00:00 2001 From: DailyDreaming Date: Fri, 20 Jul 2018 20:07:50 -0700 Subject: [PATCH 12/18] Travis docker. --- .travis.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.travis.yml b/.travis.yml index 3f3451b..7aa03da 100644 --- a/.travis.yml +++ b/.travis.yml @@ -9,6 +9,8 @@ before_install: script: - flake8 wes_service wes_client - pytest +services: + - docker deploy: provider: pypi on: From adc318661051595fd17f42aaec61702a6eeded9c Mon Sep 17 00:00:00 2001 From: DailyDreaming Date: Fri, 20 Jul 2018 20:13:15 -0700 Subject: [PATCH 13/18] More debugging on travis. --- test/test_integration.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/test/test_integration.py b/test/test_integration.py index 6678959..affc3bf 100644 --- a/test/test_integration.py +++ b/test/test_integration.py @@ -82,6 +82,12 @@ def run_wdl_md5sum(wdl_input): ("workflow_url", wdl_input)] response = requests.post(endpoint, files=parts).json() output_dir = os.path.abspath(os.path.join('workflows', response['workflow_id'], 'outdir')) + check_travis_log = os.path.join(output_dir, 'stderr') + with open(check_travis_log, 'r') as f: + print(f.read()) + print(subprocess.check_output(['ls', os.path.join('workflows', response['workflow_id'])])) + print('\n') + print(subprocess.check_output(['ls', output_dir])) return os.path.join(output_dir, 'md5sum.txt'), response['workflow_id'] From 9b90803500f3e466bd1cccd2c3859d9145ce3e52 Mon Sep 17 00:00:00 2001 From: DailyDreaming Date: Fri, 20 Jul 2018 20:22:59 -0700 Subject: [PATCH 14/18] More debugging on travis. --- test/test_integration.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/test/test_integration.py b/test/test_integration.py index affc3bf..58041bd 100644 --- a/test/test_integration.py +++ b/test/test_integration.py @@ -8,6 +8,9 @@ import signal import requests import shutil +import logging + +logging.basicConfig(level=logging.INFO) class IntegrationTest(unittest.TestCase): @@ -84,10 +87,10 @@ def run_wdl_md5sum(wdl_input): output_dir = os.path.abspath(os.path.join('workflows', response['workflow_id'], 'outdir')) check_travis_log = os.path.join(output_dir, 'stderr') with open(check_travis_log, 'r') as f: - print(f.read()) - print(subprocess.check_output(['ls', os.path.join('workflows', response['workflow_id'])])) - print('\n') - print(subprocess.check_output(['ls', output_dir])) + logging.info(f.read()) + logging.info(subprocess.check_output(['ls', os.path.join('workflows', response['workflow_id'])])) + logging.info('\n') + logging.info(subprocess.check_output(['ls', output_dir])) return os.path.join(output_dir, 'md5sum.txt'), response['workflow_id'] From 40518a349fc28c69aa957c8c6ce683ff8e9e2272 Mon Sep 17 00:00:00 2001 From: DailyDreaming Date: Thu, 26 Jul 2018 06:00:17 -0700 Subject: [PATCH 15/18] Update. --- wes_service/toil_wes.py | 112 +++++++++++++++++----------------------- 1 file changed, 48 insertions(+), 64 deletions(-) diff --git a/wes_service/toil_wes.py b/wes_service/toil_wes.py index a16446e..764d35c 100644 --- a/wes_service/toil_wes.py +++ b/wes_service/toil_wes.py @@ -2,26 +2,23 @@ import json import os import subprocess -import tempfile import time import logging import urllib import uuid -import connexion from multiprocessing import Process -from werkzeug.utils import secure_filename from wes_service.util import WESBackend logging.basicConfig(level=logging.INFO) class ToilWorkflow(object): - def __init__(self, workflow_id): + def __init__(self, run_id): super(ToilWorkflow, self).__init__() - self.workflow_id = workflow_id + self.run_id = run_id - self.workdir = os.path.join(os.getcwd(), 'workflows', self.workflow_id) + self.workdir = os.path.join(os.getcwd(), 'runs', self.run_id) self.outdir = os.path.join(self.workdir, 'outdir') if not os.path.exists(self.outdir): os.makedirs(self.outdir) @@ -124,7 +121,7 @@ def getlog(self): outputobj = json.load(outputtemp) return { - "workflow_id": self.workflow_id, + "run_id": self.run_id, "request": request, "state": state, "workflow_log": { @@ -158,7 +155,7 @@ def run(self, request, opts): :param dict request: A dictionary containing the cwl/json information. :param wes_service.util.WESBackend opts: contains the user's arguments; specifically the runner and runner options - :return: {"workflow_id": self.workflow_id, "state": state} + :return: {"run_id": self.run_id, "state": state} """ wftype = request['workflow_type'].lower().strip() version = request['workflow_type_version'] @@ -170,7 +167,7 @@ def run(self, request, opts): raise RuntimeError('workflow_type "py" requires ' '"workflow_type_version" to be "2.7": ' + str(version)) - logging.info('Beginning Toil Workflow ID: ' + str(self.workflow_id)) + logging.info('Beginning Toil Workflow ID: ' + str(self.run_id)) with open(self.starttime, 'w') as f: f.write(str(time.time())) @@ -199,29 +196,31 @@ def getstate(self): state = "RUNNING" exit_code = -1 - # exitcode_file = os.path.join(self.workdir, "exit_code") - # - # if os.path.exists(exitcode_file): - # with open(exitcode_file) as f: - # exit_code = int(f.read()) - # elif os.path.exists(self.pidfile): - # with open(self.pidfile, "r") as pid: - # pid = int(pid.read()) - # try: - # (_pid, exit_status) = os.waitpid(pid, os.WNOHANG) - # if _pid != 0: - # exit_code = exit_status >> 8 - # with open(exitcode_file, "w") as f: - # f.write(str(exit_code)) - # os.unlink(self.pidfile) - # except OSError: - # os.unlink(self.pidfile) - # exit_code = 255 - # - # if exit_code == 0: - # state = "COMPLETE" - # elif exit_code != -1: - # state = "EXECUTOR_ERROR" + # TODO: This sections gets a pid that finishes before the workflow exits unless it is + # very quick, like md5sum + exitcode_file = os.path.join(self.workdir, "exit_code") + + if os.path.exists(exitcode_file): + with open(exitcode_file) as f: + exit_code = int(f.read()) + elif os.path.exists(self.pidfile): + with open(self.pidfile, "r") as pid: + pid = int(pid.read()) + try: + (_pid, exit_status) = os.waitpid(pid, os.WNOHANG) + if _pid != 0: + exit_code = exit_status >> 8 + with open(exitcode_file, "w") as f: + f.write(str(exit_code)) + os.unlink(self.pidfile) + except OSError: + os.unlink(self.pidfile) + exit_code = 255 + + if exit_code == 0: + state = "COMPLETE" + elif exit_code != -1: + state = "EXECUTOR_ERROR" return state, exit_code @@ -229,7 +228,7 @@ def getstatus(self): state, exit_code = self.getstate() return { - "workflow_id": self.workflow_id, + "run_id": self.run_id, "state": state } @@ -251,56 +250,41 @@ def GetServiceInfo(self): 'key_values': {} } - def ListWorkflows(self): + def ListRuns(self): # FIXME #15 results don't page wf = [] for l in os.listdir(os.path.join(os.getcwd(), "workflows")): if os.path.isdir(os.path.join(os.getcwd(), "workflows", l)): wf.append(ToilWorkflow(l)) - workflows = [{"workflow_id": w.workflow_id, "state": w.getstate()[0]} for w in wf] # NOQA + workflows = [{"run_id": w.run_id, "state": w.getstate()[0]} for w in wf] # NOQA return { "workflows": workflows, "next_page_token": "" } def RunWorkflow(self): - tempdir = tempfile.mkdtemp() - body = {} - for k, ls in connexion.request.files.iterlists(): - for v in ls: - if k == "workflow_descriptor": - filename = secure_filename(os.path.basename(v.filename)) - v.save(os.path.join(tempdir, filename)) - body["workflow_url"] = "file:///%s/%s" % (tempdir, filename) - elif k in ("workflow_params", "tags", "workflow_engine_parameters"): - body[k] = json.loads(v.read()) - else: - body[k] = v.read() - - index = body["workflow_url"].find("http") - if index > 0: - body["workflow_url"] = body["workflow_url"][index:] - - workflow_id = uuid.uuid4().hex - job = ToilWorkflow(workflow_id) + tempdir, body = self.collect_attachments() + + run_id = uuid.uuid4().hex + job = ToilWorkflow(run_id) p = Process(target=job.run, args=(body, self)) p.start() - self.processes[workflow_id] = p - return {"workflow_id": workflow_id} + self.processes[run_id] = p + return {"run_id": run_id} - def GetWorkflowLog(self, workflow_id): - job = ToilWorkflow(workflow_id) + def GetRunLog(self, run_id): + job = ToilWorkflow(run_id) return job.getlog() - def CancelJob(self, workflow_id): + def CancelRun(self, run_id): # should this block with `p.is_alive()`? - if workflow_id in self.processes: - self.processes[workflow_id].terminate() - return {'workflow_id': workflow_id} + if run_id in self.processes: + self.processes[run_id].terminate() + return {'run_id': run_id} - def GetWorkflowStatus(self, workflow_id): - job = ToilWorkflow(workflow_id) + def GetRunStatus(self, run_id): + job = ToilWorkflow(run_id) return job.getstatus() From 89e89b0b4559bbfc48370068fd1eddbecfb3a8b5 Mon Sep 17 00:00:00 2001 From: DailyDreaming Date: Thu, 26 Jul 2018 06:01:54 -0700 Subject: [PATCH 16/18] Separate test to different PR. --- test/test_integration.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/test/test_integration.py b/test/test_integration.py index 58041bd..e2f3e5c 100644 --- a/test/test_integration.py +++ b/test/test_integration.py @@ -146,13 +146,6 @@ def setUp(self): shell=True) time.sleep(5) - def test_wdl_md5sum(self): - """Pass a local md5sum cwl to the wes-service server, and check for the correct output.""" - wdl_local_path = os.path.abspath('testdata/md5sum.wdl') - output_filepath, _ = run_wdl_md5sum(wdl_input=wdl_local_path) - - self.assertTrue(check_for_file(output_filepath), 'Output file was not found: ' + str(output_filepath)) - # Prevent pytest/unittest's discovery from attempting to discover the base test class. del IntegrationTest From 6ebb3a8ad734972dcb9d176062fef093c8c223f6 Mon Sep 17 00:00:00 2001 From: DailyDreaming Date: Thu, 26 Jul 2018 06:07:02 -0700 Subject: [PATCH 17/18] Workflows folder. --- wes_service/toil_wes.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/wes_service/toil_wes.py b/wes_service/toil_wes.py index 764d35c..8fe62e7 100644 --- a/wes_service/toil_wes.py +++ b/wes_service/toil_wes.py @@ -18,7 +18,7 @@ def __init__(self, run_id): super(ToilWorkflow, self).__init__() self.run_id = run_id - self.workdir = os.path.join(os.getcwd(), 'runs', self.run_id) + self.workdir = os.path.join(os.getcwd(), 'workflows', self.run_id) self.outdir = os.path.join(self.workdir, 'outdir') if not os.path.exists(self.outdir): os.makedirs(self.outdir) From f3c93b8e1ff6a48a74066c82db96412a0dbf1800 Mon Sep 17 00:00:00 2001 From: DailyDreaming Date: Thu, 26 Jul 2018 06:22:31 -0700 Subject: [PATCH 18/18] flake8. --- wes_service/toil_wes.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/wes_service/toil_wes.py b/wes_service/toil_wes.py index 8fe62e7..3dbe789 100644 --- a/wes_service/toil_wes.py +++ b/wes_service/toil_wes.py @@ -54,10 +54,10 @@ def write_workflow(self, request, opts, wftype='cwl'): subprocess.check_call(['wget', workflow_url]) workflow_url = os.path.abspath(workflow_url.split('/')[-1]) command_args = ['toil-wdl-runner'] + extra + [workflow_url, self.input_json] - assert(os.path.exists(workflow_url), workflow_url) + assert(os.path.exists(workflow_url), workflow_url) # noqa with open(workflow_url, 'r') as f: logging.info(f.read()) - assert(os.path.exists(self.input_json), self.input_json) + assert(os.path.exists(self.input_json), self.input_json) # noqa with open(self.input_json, 'r') as f: logging.info(f.read()) elif wftype == 'py': @@ -113,7 +113,7 @@ def getlog(self): stderr = self.fetch(self.errfile) starttime = self.fetch(self.starttime) endtime = self.fetch(self.endtime) - cmd = self.fetch(self.cmdfile) + # cmd = self.fetch(self.cmdfile) outputobj = {} if state == "COMPLETE":