diff --git a/.travis.yml b/.travis.yml
index cd46fab..7aa03da 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -3,12 +3,14 @@ python:
 - '2.7'
 before_install:
 - sudo apt-get update -qq
+- pip install toil[all]==3.16.0
 - pip install . --process-dependency-links
 - pip install -r dev-requirements.txt
-- pip install toil[all]==3.16.0
 script:
 - flake8 wes_service wes_client
 - pytest
+services:
+  - docker
 deploy:
   provider: pypi
   on:
diff --git a/test/test_integration.py b/test/test_integration.py
index 52e6c54..b10290b 100644
--- a/test/test_integration.py
+++ b/test/test_integration.py
@@ -8,11 +8,13 @@ import signal
 import requests
 import shutil
 
+import logging
+
+logging.basicConfig(level=logging.INFO)
 
 
 class IntegrationTest(unittest.TestCase):
     """A baseclass that's inherited for use with different cwl backends."""
-
     def setUp(self):
         """Start a (local) wes-service server to make requests against."""
         raise NotImplementedError
@@ -27,42 +29,40 @@ def tearDown(self):
             time.sleep(3)
         except OSError as e:
             print(e)
-
+        if os.path.exists('workflows'):
+            shutil.rmtree('workflows')
         unittest.TestCase.tearDown(self)
 
     def test_dockstore_md5sum(self):
         """Fetch the md5sum cwl from dockstore, run it on the wes-service server, and check for the correct output."""
         cwl_dockstore_url = 'https://dockstore.org:8443/api/ga4gh/v2/tools/quay.io%2Fbriandoconnor%2Fdockstore-tool-md5sum/versions/master/plain-CWL/descriptor/%2FDockstore.cwl'
-        output_filepath, _ = run_md5sum(cwl_input=cwl_dockstore_url)
+        output_filepath, _ = run_cwl_md5sum(cwl_input=cwl_dockstore_url)
 
         self.assertTrue(check_for_file(output_filepath), 'Output file was not found: ' + str(output_filepath))
-        shutil.rmtree('workflows')
 
     def test_local_md5sum(self):
         """Pass a local md5sum cwl to the wes-service server, and check for the correct output."""
         cwl_local_path = os.path.abspath('testdata/md5sum.cwl')
-        output_filepath, _ = run_md5sum(cwl_input='file://' + cwl_local_path)
+        output_filepath, _ = run_cwl_md5sum(cwl_input='file://' + cwl_local_path)
 
         self.assertTrue(check_for_file(output_filepath), 'Output file was not found: ' + str(output_filepath))
-        shutil.rmtree('workflows')
 
     def test_multipart_upload(self):
         """Pass a local md5sum cwl to the wes-service server, and check for uploaded file in service."""
         cwl_local_path = os.path.abspath('testdata/md5sum.cwl')
-        _, run_id = run_md5sum(cwl_input='file://' + cwl_local_path)
+        _, run_id = run_cwl_md5sum(cwl_input='file://' + cwl_local_path)
 
         get_response = get_log_request(run_id)["request"]
         self.assertTrue(check_for_file(get_response["workflow_url"][7:]),
                         'Output file was not found: ' + get_response["workflow_url"][:7])
-        shutil.rmtree('workflows')
 
 
-def run_md5sum(cwl_input):
+def run_cwl_md5sum(cwl_input):
     """Pass a local md5sum cwl to the wes-service server, and return the path of the output file that was created."""
     endpoint = 'http://localhost:8080/ga4gh/wes/v1/runs'
     params = {'output_file': {'path': '/tmp/md5sum.txt', 'class': 'File'},
-              'input_file': {'path': '../../testdata/md5sum.input', 'class': 'File'}}
+              'input_file': {'path': os.path.abspath('testdata/md5sum.input'), 'class': 'File'}}
 
     parts = [("workflow_params", json.dumps(params)),
              ("workflow_type", "CWL"),
             ("workflow_type_version", "v1.0")]
     if cwl_input.startswith("file://"):
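The tests above wait on the filesystem via check_for_file; the same wait could be done against the service itself. A minimal sketch, assuming the server from setUp is listening on localhost:8080 and reporting the state names used by the Toil backend below (wait_for_run is a hypothetical helper, not part of this patch):

import time
import requests


def wait_for_run(run_id, seconds=40):
    """Poll the WES run-log endpoint until the run leaves RUNNING, or time out."""
    endpoint = 'http://localhost:8080/ga4gh/wes/v1/runs/{}'.format(run_id)
    for _ in range(seconds):
        state = requests.get(endpoint).json()['state']
        if state != 'RUNNING':
            return state  # e.g. COMPLETE or EXECUTOR_ERROR
        time.sleep(1)
    return 'RUNNING'

check_for_file would then only need to run once wait_for_run returns COMPLETE, instead of racing the workflow.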
@@ -75,6 +75,25 @@ def run_md5sum(cwl_input):
     return os.path.join(output_dir, 'md5sum.txt'), response['run_id']
 
 
+def run_wdl_md5sum(wdl_input):
+    """Pass a local md5sum wdl to the wes-service server, and return the path of the output file that was created."""
+    endpoint = 'http://localhost:8080/ga4gh/wes/v1/workflows'
+    params = '{"ga4ghMd5.inputFile": "' + os.path.abspath('testdata/md5sum.input') + '"}'
+    parts = [("workflow_params", params),
+             ("workflow_type", "WDL"),
+             ("workflow_type_version", "v1.0"),
+             ("workflow_url", wdl_input)]
+    response = requests.post(endpoint, files=parts).json()
+    output_dir = os.path.abspath(os.path.join('workflows', response['workflow_id'], 'outdir'))
+    check_travis_log = os.path.join(output_dir, 'stderr')
+    with open(check_travis_log, 'r') as f:
+        logging.info(f.read())
+    logging.info(subprocess.check_output(['ls', os.path.join('workflows', response['workflow_id'])]))
+    logging.info('\n')
+    logging.info(subprocess.check_output(['ls', output_dir]))
+    return os.path.join(output_dir, 'md5sum.txt'), response['workflow_id']
+
+
 def get_log_request(run_id):
     endpoint = 'http://localhost:8080/ga4gh/wes/v1/runs/{}'.format(run_id)
     return requests.get(endpoint).json()
@@ -88,7 +107,7 @@ def get_server_pids():
     return pids
 
 
-def check_for_file(filepath, seconds=20):
+def check_for_file(filepath, seconds=40):
     """Return True if a file exists within a certain amount of time."""
     wait_counter = 0
     while not os.path.exists(filepath):
@@ -117,14 +136,12 @@ def setUp(self):
 
 class ToilTest(IntegrationTest):
     """Test using Toil."""
-
    def setUp(self):
         """
         Start a (local) wes-service server to make requests against.
         Use toil as the wes-service server 'backend'.
         """
-        self.wes_server_process = subprocess.Popen('python {} '
-                                                   '--opt runner=cwltoil --opt extra=--logLevel=CRITICAL'
+        self.wes_server_process = subprocess.Popen('python {} --backend=wes_service.toil_wes --opt="extra=--logLevel=CRITICAL"'
                                                    ''.format(os.path.abspath('wes_service/wes_service_main.py')),
                                                    shell=True)
         time.sleep(5)
diff --git a/testdata/md5sum.wdl b/testdata/md5sum.wdl
new file mode 100644
index 0000000..bab0ab7
--- /dev/null
+++ b/testdata/md5sum.wdl
@@ -0,0 +1,22 @@
+task md5 {
+  File inputFile
+
+  command {
+    /bin/my_md5sum ${inputFile}
+  }
+
+  output {
+    File value = "md5sum.txt"
+  }
+
+  runtime {
+    docker: "quay.io/briandoconnor/dockstore-tool-md5sum:1.0.4"
+    cpu: 1
+    memory: "512 MB"
+  }
+}
+
+workflow ga4ghMd5 {
+  File inputFile
+  call md5 { input: inputFile=inputFile }
+}
diff --git a/testdata/md5sum.wdl.json b/testdata/md5sum.wdl.json
new file mode 100644
index 0000000..c56730e
--- /dev/null
+++ b/testdata/md5sum.wdl.json
@@ -0,0 +1 @@
+{"ga4ghMd5.inputFile": "md5sum.input"}
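The new fixtures can be smoke-tested outside the service with the same entry point the Toil backend shells out to. A sketch, assuming toil[all]==3.16.0 from .travis.yml is on the PATH and Docker is available for the pinned md5sum image:

import subprocess

# toil-wdl-runner takes the WDL file and its inputs JSON as positional
# arguments, exactly as write_workflow invokes it in toil_wes.py below.
subprocess.check_call(['toil-wdl-runner',
                       'testdata/md5sum.wdl',
                       'testdata/md5sum.wdl.json'])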
diff --git a/wes_client/wes_client_main.py b/wes_client/wes_client_main.py
index 1b66d69..9d68df8 100644
--- a/wes_client/wes_client_main.py
+++ b/wes_client/wes_client_main.py
@@ -10,7 +10,7 @@ import logging
 import schema_salad.ref_resolver
 import requests
-from requests.exceptions import InvalidSchema
+from requests.exceptions import InvalidSchema, MissingSchema
 from wes_service.util import visit
 from bravado.client import SwaggerClient
 from bravado.requests_client import RequestsClient
@@ -81,6 +81,13 @@ def main(argv=sys.argv[1:]):
         json.dump(response.result(), sys.stdout, indent=4)
         return 0
 
+    if args.workflow_url.lower().endswith('wdl'):
+        wf_type = 'WDL'
+    elif args.workflow_url.lower().endswith('cwl'):
+        wf_type = 'CWL'
+    elif args.workflow_url.lower().endswith('py'):
+        wf_type = 'PY'
+
     if not args.job_order:
         logging.error("Missing job order")
         return 1
@@ -116,7 +123,7 @@ def fixpaths(d):
 
     parts = [
         ("workflow_params", json.dumps(input_dict)),
-        ("workflow_type", "CWL"),
+        ("workflow_type", wf_type),
         ("workflow_type_version", "v1.0")
     ]
     if workflow_url.startswith("file://"):
@@ -167,6 +174,8 @@ def fixpaths(d):
             logging.info("Workflow log:\n" + logs)
         except InvalidSchema:
             logging.info("Workflow log:\n" + str(s["workflow_log"]["stderr"]))
+        except MissingSchema:
+            logging.info("Workflow log:\n" + str(s["workflow_log"]["stderr"]))
 
     if "fields" in s["outputs"] and s["outputs"]["fields"] is None:
         del s["outputs"]["fields"]
diff --git a/wes_service/toil_wes.py b/wes_service/toil_wes.py
new file mode 100644
index 0000000..3dbe789
--- /dev/null
+++ b/wes_service/toil_wes.py
@@ -0,0 +1,292 @@
+from __future__ import print_function
+import json
+import os
+import subprocess
+import time
+import logging
+import urllib
+import uuid
+
+from multiprocessing import Process
+from wes_service.util import WESBackend
+
+logging.basicConfig(level=logging.INFO)
+
+
+class ToilWorkflow(object):
+    def __init__(self, run_id):
+        super(ToilWorkflow, self).__init__()
+        self.run_id = run_id
+
+        self.workdir = os.path.join(os.getcwd(), 'workflows', self.run_id)
+        self.outdir = os.path.join(self.workdir, 'outdir')
+        if not os.path.exists(self.outdir):
+            os.makedirs(self.outdir)
+
+        self.outfile = os.path.join(self.workdir, 'stdout')
+        self.errfile = os.path.join(self.workdir, 'stderr')
+        self.starttime = os.path.join(self.workdir, 'starttime')
+        self.endtime = os.path.join(self.workdir, 'endtime')
+        self.pidfile = os.path.join(self.workdir, 'pid')
+        self.cmdfile = os.path.join(self.workdir, 'cmd')
+        self.request_json = os.path.join(self.workdir, 'request.json')
+        self.output_json = os.path.join(self.workdir, "output.json")
+        self.input_wf_filename = os.path.join(self.workdir, "workflow.cwl")
+        self.input_json = os.path.join(self.workdir, "input.json")
+
+    def write_workflow(self, request, opts, wftype='cwl'):
+        """Write a cwl, wdl, or python file as appropriate from the request dictionary."""
+        self.input_wf_filename = os.path.join(self.workdir, 'workflow.' + wftype)
+
+        if request.get("workflow_descriptor"):
+            workflow_descriptor = request.get('workflow_descriptor')
+            with open(self.input_wf_filename, "w") as f:
+                f.write(workflow_descriptor)
+            workflow_url = urllib.pathname2url(self.input_wf_filename)
+        else:
+            workflow_url = request.get("workflow_url")
+
+        extra = opts.getoptlist("extra")
+        if wftype == 'cwl':
+            command_args = ['toil-cwl-runner'] + extra + [workflow_url, self.input_json]
+        elif wftype == 'wdl':
+            if workflow_url.startswith('http://') or workflow_url.startswith('https://'):
+                subprocess.check_call(['wget', workflow_url])
+                workflow_url = os.path.abspath(workflow_url.split('/')[-1])
+            command_args = ['toil-wdl-runner'] + extra + [workflow_url, self.input_json]
+            assert os.path.exists(workflow_url), workflow_url
+            with open(workflow_url, 'r') as f:
+                logging.info(f.read())
+            assert os.path.exists(self.input_json), self.input_json
+            with open(self.input_json, 'r') as f:
+                logging.info(f.read())
+        elif wftype == 'py':
+            command_args = ['python'] + extra + [self.input_wf_filename]
+        else:
+            raise RuntimeError('workflow_type is not "cwl", "wdl", or "py": ' + str(wftype))
+
+        return command_args
+
+    def write_json(self, request_dict):
+        input_json = os.path.join(self.workdir, 'input.json')
+        with open(input_json, 'w') as inputtemp:
+            json.dump(request_dict['workflow_params'], inputtemp)
+        return input_json
+
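To illustrate write_workflow's dispatch: for a CWL request it simply prepends any --opt extra=... values to the runner invocation. A sketch (FakeOpts is a hypothetical stand-in for the WESBackend options object; only getoptlist is exercised):

from wes_service.toil_wes import ToilWorkflow


class FakeOpts(object):
    """Hypothetical stub exposing the one method write_workflow needs."""
    def getoptlist(self, name):
        return ['--logLevel=CRITICAL'] if name == 'extra' else []


wf = ToilWorkflow('0123456789abcdef')  # creates workflows/<run_id>/outdir
request = {'workflow_url': 'file:///tmp/md5sum.cwl', 'workflow_params': {}}
# -> ['toil-cwl-runner', '--logLevel=CRITICAL',
#     'file:///tmp/md5sum.cwl', '<workdir>/input.json']
print(wf.write_workflow(request, FakeOpts(), wftype='cwl'))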
+    def call_cmd(self, cmd):
+        """
+        Calls a command with Popen.
+        Writes stdout, stderr, and the command to separate files.
+
+        :param cmd: A string or array of strings.
+        :return: The pid of the command.
+        """
+        with open(self.cmdfile, 'w') as f:
+            f.write(str(cmd))
+        stdout = open(self.outfile, 'w')
+        stderr = open(self.errfile, 'w')
+        logging.info('Calling: ' + ' '.join(cmd))
+        process = subprocess.Popen(cmd,
+                                   stdout=stdout,
+                                   stderr=stderr,
+                                   close_fds=True,
+                                   cwd=self.outdir)
+        stdout.close()
+        stderr.close()
+        return process.pid
+
+    def cancel(self):
+        pass
+
+    def fetch(self, filename):
+        if os.path.exists(filename):
+            with open(filename, "r") as f:
+                return f.read()
+        return ''
+
+    def getlog(self):
+        state, exit_code = self.getstate()
+
+        with open(self.request_json, "r") as f:
+            request = json.load(f)
+
+        stderr = self.fetch(self.errfile)
+        starttime = self.fetch(self.starttime)
+        endtime = self.fetch(self.endtime)
+        # cmd = self.fetch(self.cmdfile)
+
+        outputobj = {}
+        if state == "COMPLETE":
+            with open(self.output_json, "r") as outputtemp:
+                outputobj = json.load(outputtemp)
+
+        return {
+            "run_id": self.run_id,
+            "request": request,
+            "state": state,
+            "workflow_log": {
+                "cmd": [""],
+                "start_time": starttime,
+                "end_time": endtime,
+                "stdout": "",
+                "stderr": stderr,
+                "exit_code": exit_code
+            },
+            "task_logs": [],
+            "outputs": outputobj
+        }
+
+    def run(self, request, opts):
+        """
+        Constructs a command to run a cwl/json from requests and opts,
+        runs it, and deposits the outputs in outdir.
+
+        Runner:
+        opts.getopt("runner", default="cwl-runner")
+
+        CWL (url):
+        request["workflow_url"] == a url to a cwl file
+        or
+        request["workflow_descriptor"] == input cwl text (written to a file and a url constructed for that file)
+
+        JSON File:
+        request["workflow_params"] == input json text (to be written to a file)
+
+        :param dict request: A dictionary containing the cwl/json information.
+        :param wes_service.util.WESBackend opts: contains the user's arguments;
+                                                 specifically the runner and runner options
+        :return: {"run_id": self.run_id, "state": state}
+        """
+        wftype = request['workflow_type'].lower().strip()
+        version = request['workflow_type_version']
+
+        if version != 'v1.0' and wftype in ('cwl', 'wdl'):
+            raise RuntimeError('workflow_type "cwl", "wdl" requires '
+                               '"workflow_type_version" to be "v1.0": ' + str(version))
+        if version != '2.7' and wftype == 'py':
+            raise RuntimeError('workflow_type "py" requires '
+                               '"workflow_type_version" to be "2.7": ' + str(version))
+
+        logging.info('Beginning Toil Workflow ID: ' + str(self.run_id))
+
+        with open(self.starttime, 'w') as f:
+            f.write(str(time.time()))
+        with open(self.request_json, 'w') as f:
+            json.dump(request, f)
+        with open(self.input_json, "w") as inputtemp:
+            json.dump(request["workflow_params"], inputtemp)
+
+        command_args = self.write_workflow(request, opts, wftype=wftype)
+        pid = self.call_cmd(command_args)
+
+        with open(self.endtime, 'w') as f:
+            f.write(str(time.time()))
+        with open(self.pidfile, 'w') as f:
+            f.write(str(pid))
+
+        return self.getstatus()
+
+    def getstate(self):
+        """
+        Returns RUNNING, -1
+                COMPLETE, 0
+                or
+                EXECUTOR_ERROR, 255
+        """
+        state = "RUNNING"
+        exit_code = -1
+
+        # TODO: This section gets a pid that finishes before the workflow exits unless it is
+        # very quick, like md5sum
+        exitcode_file = os.path.join(self.workdir, "exit_code")
+
+        if os.path.exists(exitcode_file):
+            with open(exitcode_file) as f:
+                exit_code = int(f.read())
+        elif os.path.exists(self.pidfile):
+            with open(self.pidfile, "r") as pid:
+                pid = int(pid.read())
+            try:
+                (_pid, exit_status) = os.waitpid(pid, os.WNOHANG)
+                if _pid != 0:
+                    exit_code = exit_status >> 8
+                    with open(exitcode_file, "w") as f:
+                        f.write(str(exit_code))
+                    os.unlink(self.pidfile)
+            except OSError:
+                os.unlink(self.pidfile)
+                exit_code = 255
+
+        if exit_code == 0:
+            state = "COMPLETE"
+        elif exit_code != -1:
+            state = "EXECUTOR_ERROR"
+
+        return state, exit_code
+
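A note on the decoding above: exit_status >> 8 is only meaningful when the child exited normally, so a runner killed by a signal would be misreported. A more explicit alternative using the standard os macros (a sketch, not what getstate currently does):

import os


def decode_exit(pid):
    """Return the child's exit code, or None while it is still running."""
    child, status = os.waitpid(pid, os.WNOHANG)
    if child == 0:
        return None                       # child has not exited yet
    if os.WIFEXITED(status):
        return os.WEXITSTATUS(status)     # normal termination
    if os.WIFSIGNALED(status):
        return 128 + os.WTERMSIG(status)  # killed by a signal
    return 255                            # stopped/unknown: treat as error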
+    def getstatus(self):
+        state, exit_code = self.getstate()
+
+        return {
+            "run_id": self.run_id,
+            "state": state
+        }
+
+
+class ToilBackend(WESBackend):
+    processes = {}
+
+    def GetServiceInfo(self):
+        return {
+            'workflow_type_versions': {
+                'CWL': {'workflow_type_version': ['v1.0']},
+                'WDL': {'workflow_type_version': ['v1.0']},
+                'py': {'workflow_type_version': ['2.7']}
+            },
+            'supported_wes_versions': '0.3.0',
+            'supported_filesystem_protocols': ['file', 'http', 'https'],
+            'engine_versions': ['3.16.0'],
+            'system_state_counts': {},
+            'key_values': {}
+        }
+
+    def ListRuns(self):
+        # FIXME #15 results don't page
+        wf = []
+        for l in os.listdir(os.path.join(os.getcwd(), "workflows")):
+            if os.path.isdir(os.path.join(os.getcwd(), "workflows", l)):
+                wf.append(ToilWorkflow(l))
+
+        workflows = [{"run_id": w.run_id, "state": w.getstate()[0]} for w in wf]  # NOQA
+        return {
+            "workflows": workflows,
+            "next_page_token": ""
+        }
+
+    def RunWorkflow(self):
+        tempdir, body = self.collect_attachments()
+
+        run_id = uuid.uuid4().hex
+        job = ToilWorkflow(run_id)
+        p = Process(target=job.run, args=(body, self))
+        p.start()
+        self.processes[run_id] = p
+        return {"run_id": run_id}
+
+    def GetRunLog(self, run_id):
+        job = ToilWorkflow(run_id)
+        return job.getlog()
+
+    def CancelRun(self, run_id):
+        # should this block with `p.is_alive()`?
+        if run_id in self.processes:
+            self.processes[run_id].terminate()
+        return {'run_id': run_id}
+
+    def GetRunStatus(self, run_id):
+        job = ToilWorkflow(run_id)
+        return job.getstatus()
+
+
+def create_backend(app, opts):
+    return ToilBackend(opts)
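On the question left in CancelRun: terminate() only sends SIGTERM and returns immediately. A blocking variant would join the process and escalate if it survives; a sketch, assuming Python 2.7's multiprocessing (which lacks Process.kill()):

import os
import signal


def cancel_run_blocking(self, run_id, timeout=5):
    """Terminate a run's worker process and wait for it to exit."""
    p = self.processes.get(run_id)
    if p is not None:
        p.terminate()      # SIGTERM; gives the runner a chance to clean up
        p.join(timeout)    # block until exit, or give up after `timeout`
        if p.is_alive():   # still running: escalate to SIGKILL
            os.kill(p.pid, signal.SIGKILL)
            p.join()
    return {'run_id': run_id}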