Skip to content

Commit

Permalink
Merge pull request #43 from common-workflow-language/addToilBackend
Browse files Browse the repository at this point in the history
Add a toil backend.
  • Loading branch information
DailyDreaming authored Jul 26, 2018
2 parents 6e960bf + f3c93b8 commit 51424b9
Show file tree
Hide file tree
Showing 6 changed files with 360 additions and 17 deletions.
4 changes: 3 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ python:
- '2.7'
before_install:
- sudo apt-get update -qq
- pip install toil[all]==3.16.0
- pip install . --process-dependency-links
- pip install -r dev-requirements.txt
- pip install toil[all]==3.16.0
script:
- flake8 wes_service wes_client
- pytest
services:
- docker
deploy:
provider: pypi
on:
Expand Down
45 changes: 31 additions & 14 deletions test/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@
import signal
import requests
import shutil
import logging

logging.basicConfig(level=logging.INFO)


class IntegrationTest(unittest.TestCase):
"""A baseclass that's inherited for use with different cwl backends."""

def setUp(self):
"""Start a (local) wes-service server to make requests against."""
raise NotImplementedError
Expand All @@ -27,42 +29,40 @@ def tearDown(self):
time.sleep(3)
except OSError as e:
print(e)

if os.path.exists('workflows'):
shutil.rmtree('workflows')
unittest.TestCase.tearDown(self)

def test_dockstore_md5sum(self):
"""Fetch the md5sum cwl from dockstore, run it on the wes-service server, and check for the correct output."""
cwl_dockstore_url = 'https://dockstore.org:8443/api/ga4gh/v2/tools/quay.io%2Fbriandoconnor%2Fdockstore-tool-md5sum/versions/master/plain-CWL/descriptor/%2FDockstore.cwl'
output_filepath, _ = run_md5sum(cwl_input=cwl_dockstore_url)
output_filepath, _ = run_cwl_md5sum(cwl_input=cwl_dockstore_url)

self.assertTrue(check_for_file(output_filepath), 'Output file was not found: ' + str(output_filepath))
shutil.rmtree('workflows')

def test_local_md5sum(self):
"""Pass a local md5sum cwl to the wes-service server, and check for the correct output."""
cwl_local_path = os.path.abspath('testdata/md5sum.cwl')
output_filepath, _ = run_md5sum(cwl_input='file://' + cwl_local_path)
output_filepath, _ = run_cwl_md5sum(cwl_input='file://' + cwl_local_path)

self.assertTrue(check_for_file(output_filepath), 'Output file was not found: ' + str(output_filepath))
shutil.rmtree('workflows')

def test_multipart_upload(self):
    """Pass a local md5sum cwl to the wes-service server, and check for the uploaded file in the service.

    The workflow is submitted as a ``file://`` URL; the run's log request is then
    fetched and the ``workflow_url`` it reports is checked for existence on disk.
    """
    cwl_local_path = os.path.abspath('testdata/md5sum.cwl')
    _, run_id = run_cwl_md5sum(cwl_input='file://' + cwl_local_path)

    get_response = get_log_request(run_id)["request"]

    # Drop the 7-character 'file://' scheme prefix to get the filesystem path.
    # The failure message previously used [:7], which printed only the
    # 'file://' prefix instead of the path that was actually checked.
    uploaded_path = get_response["workflow_url"][7:]
    self.assertTrue(check_for_file(uploaded_path), 'Output file was not found: '
                    + uploaded_path)
    # The 'workflows' directory is removed in tearDown, so no cleanup is needed here.


def run_md5sum(cwl_input):
def run_cwl_md5sum(cwl_input):
"""Pass a local md5sum cwl to the wes-service server, and return the path of the output file that was created."""
endpoint = 'http://localhost:8080/ga4gh/wes/v1/runs'
params = {'output_file': {'path': '/tmp/md5sum.txt', 'class': 'File'},
'input_file': {'path': '../../testdata/md5sum.input', 'class': 'File'}}
'input_file': {'path': os.path.abspath('testdata/md5sum.input'), 'class': 'File'}}

parts = [("workflow_params", json.dumps(params)), ("workflow_type", "CWL"), ("workflow_type_version", "v1.0")]
if cwl_input.startswith("file://"):
Expand All @@ -75,6 +75,25 @@ def run_md5sum(cwl_input):
return os.path.join(output_dir, 'md5sum.txt'), response['run_id']


def run_wdl_md5sum(wdl_input):
    """Pass a local md5sum wdl to the wes-service server, and return the path of the output file that was created.

    Args:
        wdl_input: Value sent as the ``workflow_url`` form field.

    Returns:
        A ``(output_filepath, run_id)`` tuple: the expected location of
        ``md5sum.txt`` inside the run's ``outdir`` and the id returned by the server.
    """
    # Use the same '/runs' endpoint and 'run_id' response key as
    # run_cwl_md5sum and get_log_request, which talk to the same server.
    endpoint = 'http://localhost:8080/ga4gh/wes/v1/runs'
    # json.dumps escapes quotes and backslashes in the path; hand-built
    # string concatenation produced invalid JSON for such paths.
    params = json.dumps({'ga4ghMd5.inputFile': os.path.abspath('testdata/md5sum.input')})
    parts = [("workflow_params", params),
             ("workflow_type", "WDL"),
             ("workflow_type_version", "v1.0"),
             ("workflow_url", wdl_input)]
    response = requests.post(endpoint, files=parts).json()
    run_id = response['run_id']
    run_dir = os.path.join('workflows', run_id)
    output_dir = os.path.abspath(os.path.join(run_dir, 'outdir'))

    # Diagnostic output for CI logs: dump the run's stderr and directory listings.
    # NOTE(review): this reads 'stderr' right after the POST returns and will
    # raise if the server has not created it yet -- confirm the timing is safe.
    check_travis_log = os.path.join(output_dir, 'stderr')
    with open(check_travis_log, 'r') as f:
        logging.info(f.read())
    logging.info(subprocess.check_output(['ls', run_dir]))
    logging.info('\n')
    logging.info(subprocess.check_output(['ls', output_dir]))
    return os.path.join(output_dir, 'md5sum.txt'), run_id


def get_log_request(run_id):
    """Fetch the run record for ``run_id`` from the local wes-service server and return the decoded JSON body."""
    run_url = 'http://localhost:8080/ga4gh/wes/v1/runs/{}'.format(run_id)
    run_response = requests.get(run_url)
    return run_response.json()
Expand All @@ -88,7 +107,7 @@ def get_server_pids():
return pids


def check_for_file(filepath, seconds=20):
def check_for_file(filepath, seconds=40):
"""Return True if a file exists within a certain amount of time."""
wait_counter = 0
while not os.path.exists(filepath):
Expand Down Expand Up @@ -117,14 +136,12 @@ def setUp(self):

class ToilTest(IntegrationTest):
"""Test using Toil."""

def setUp(self):
"""
Start a (local) wes-service server to make requests against.
Use toil as the wes-service server 'backend'.
"""
self.wes_server_process = subprocess.Popen('python {} '
'--opt runner=cwltoil --opt extra=--logLevel=CRITICAL'
self.wes_server_process = subprocess.Popen('python {} --backend=wes_service.toil_wes --opt="extra=--logLevel=CRITICAL"'
''.format(os.path.abspath('wes_service/wes_service_main.py')),
shell=True)
time.sleep(5)
Expand Down
22 changes: 22 additions & 0 deletions testdata/md5sum.wdl
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Task: run /bin/my_md5sum on a single input file inside the
# dockstore-tool-md5sum container and expose "md5sum.txt" as the output.
task md5 {
# The file passed as the only argument to the command.
File inputFile

command {
/bin/my_md5sum ${inputFile}
}

# NOTE(review): "md5sum.txt" is presumably written by /bin/my_md5sum into the
# task's working directory -- confirm against the container image.
output {
File value = "md5sum.txt"
}

# Container image and resource requests for the task.
runtime {
docker: "quay.io/briandoconnor/dockstore-tool-md5sum:1.0.4"
cpu: 1
memory: "512 MB"
}
}

# Workflow entry point: takes one File input (supplied as
# "ga4ghMd5.inputFile" in the inputs JSON) and forwards it to the md5 task.
workflow ga4ghMd5 {
File inputFile
call md5 { input: inputFile=inputFile }
}
1 change: 1 addition & 0 deletions testdata/md5sum.wdl.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"ga4ghMd5.inputFile": "md5sum.input"}
13 changes: 11 additions & 2 deletions wes_client/wes_client_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import logging
import schema_salad.ref_resolver
import requests
from requests.exceptions import InvalidSchema
from requests.exceptions import InvalidSchema, MissingSchema
from wes_service.util import visit
from bravado.client import SwaggerClient
from bravado.requests_client import RequestsClient
Expand Down Expand Up @@ -81,6 +81,13 @@ def main(argv=sys.argv[1:]):
json.dump(response.result(), sys.stdout, indent=4)
return 0

if args.workflow_url.lower().endswith('wdl'):
wf_type = 'WDL'
elif args.workflow_url.lower().endswith('cwl'):
wf_type = 'CWL'
elif args.workflow_url.lower().endswith('py'):
wf_type = 'PY'

if not args.job_order:
logging.error("Missing job order")
return 1
Expand Down Expand Up @@ -116,7 +123,7 @@ def fixpaths(d):

parts = [
("workflow_params", json.dumps(input_dict)),
("workflow_type", "CWL"),
("workflow_type", wf_type),
("workflow_type_version", "v1.0")
]
if workflow_url.startswith("file://"):
Expand Down Expand Up @@ -167,6 +174,8 @@ def fixpaths(d):
logging.info("Workflow log:\n" + logs)
except InvalidSchema:
logging.info("Workflow log:\n" + str(s["workflow_log"]["stderr"]))
except MissingSchema:
logging.info("Workflow log:\n" + str(s["workflow_log"]["stderr"]))

if "fields" in s["outputs"] and s["outputs"]["fields"] is None:
del s["outputs"]["fields"]
Expand Down
Loading

0 comments on commit 51424b9

Please sign in to comment.