Commit ad949f1

Rebase.
DailyDreaming committed Jul 26, 2018
2 parents 6ebb3a8 + 6e960bf commit ad949f1
Showing 7 changed files with 182 additions and 169 deletions.
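
Taken together, the changes below rename the WES HTTP endpoint from /ga4gh/wes/v1/workflows to /ga4gh/wes/v1/runs and the workflow_id field to run_id, with matching operation renames (ListWorkflows → ListRuns, GetWorkflowLog → GetRunLog, GetWorkflowStatus → GetRunStatus, CancelJob → CancelRun), and bump the package version to 2.5. A minimal sketch of what the renamed listing endpoint looks like from a client, assuming a local server as in the integration tests below:

```python
import requests

# Assumes a wes-service server on localhost:8080, as test/test_integration.py does.
base = "http://localhost:8080/ga4gh/wes/v1"

# Before this commit the collection lived at /workflows and entries carried "workflow_id".
listing = requests.get(base + "/runs").json()
for entry in listing["workflows"]:  # the list itself is still keyed "workflows" in this release
    print(entry["run_id"], entry["state"])
```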
2 changes: 1 addition & 1 deletion setup.py
@@ -11,7 +11,7 @@
long_description = readmeFile.read()

setup(name='wes-service',
- version='2.4',
+ version='2.5',
description='GA4GH Workflow Execution Service reference implementation',
long_description=long_description,
author='GA4GH Containers and Workflows task team',
8 changes: 4 additions & 4 deletions test/test_integration.py
@@ -60,7 +60,7 @@ def test_multipart_upload(self):

def run_cwl_md5sum(cwl_input):
"""Pass a local md5sum cwl to the wes-service server, and return the path of the output file that was created."""
- endpoint = 'http://localhost:8080/ga4gh/wes/v1/workflows'
+ endpoint = 'http://localhost:8080/ga4gh/wes/v1/runs'
params = {'output_file': {'path': '/tmp/md5sum.txt', 'class': 'File'},
'input_file': {'path': os.path.abspath('testdata/md5sum.input'), 'class': 'File'}}

@@ -71,8 +71,8 @@ def run_cwl_md5sum(cwl_input):
else:
parts.append(("workflow_url", cwl_input))
response = requests.post(endpoint, files=parts).json()
- output_dir = os.path.abspath(os.path.join('workflows', response['workflow_id'], 'outdir'))
- return os.path.join(output_dir, 'md5sum.txt'), response['workflow_id']
+ output_dir = os.path.abspath(os.path.join('workflows', response['run_id'], 'outdir'))
+ return os.path.join(output_dir, 'md5sum.txt'), response['run_id']


def run_wdl_md5sum(wdl_input):
@@ -95,7 +95,7 @@ def run_wdl_md5sum(wdl_input):


def get_log_request(run_id):
- endpoint = 'http://localhost:8080/ga4gh/wes/v1/workflows/{}'.format(run_id)
+ endpoint = 'http://localhost:8080/ga4gh/wes/v1/runs/{}'.format(run_id)
return requests.get(endpoint).json()


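For comparison, a hedged sketch of the request the updated test now makes, reconstructed from the visible lines of run_cwl_md5sum(); the multipart fields hidden in the collapsed part of the diff may differ slightly:

```python
import json
import os

import requests

endpoint = "http://localhost:8080/ga4gh/wes/v1/runs"  # was .../workflows before this commit

params = {"output_file": {"path": "/tmp/md5sum.txt", "class": "File"},
          "input_file": {"path": os.path.abspath("testdata/md5sum.input"), "class": "File"}}

parts = [("workflow_params", json.dumps(params)),
         ("workflow_type", "CWL"),             # assumed fields; the test builds its
         ("workflow_type_version", "v1.0"),    # parts list in a collapsed hunk
         ("workflow_url", "testdata/md5sum.cwl")]

response = requests.post(endpoint, files=parts).json()
run_id = response["run_id"]  # renamed from response["workflow_id"]
output_file = os.path.join("workflows", run_id, "outdir", "md5sum.txt")
```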
24 changes: 14 additions & 10 deletions wes_client/wes_client_main.py
@@ -62,17 +62,17 @@ def main(argv=sys.argv[1:]):
http_client=http_client, config={"use_models": False})

if args.list:
- response = client.WorkflowExecutionService.ListWorkflows(page_token=args.page, page_size=args.page_size)
+ response = client.WorkflowExecutionService.ListRuns(page_token=args.page, page_size=args.page_size)
json.dump(response.result(), sys.stdout, indent=4)
return 0

if args.log:
- response = client.WorkflowExecutionService.GetWorkflowLog(workflow_id=args.log)
+ response = client.WorkflowExecutionService.GetRunLog(workflow_id=args.log)
sys.stdout.write(response.result()["workflow_log"]["stderr"])
return 0

if args.get:
- response = client.WorkflowExecutionService.GetWorkflowLog(workflow_id=args.get)
+ response = client.WorkflowExecutionService.GetRunLog(workflow_id=args.get)
json.dump(response.result(), sys.stdout, indent=4)
return 0

@@ -88,6 +88,10 @@ def main(argv=sys.argv[1:]):
elif args.workflow_url.lower().endswith('py'):
wf_type = 'PY'

+ if not args.job_order:
+ logging.error("Missing job order")
+ return 1

loader = schema_salad.ref_resolver.Loader({
"location": {"@type": "@id"},
"path": {"@type": "@id"}
@@ -109,7 +113,7 @@ def fixpaths(d):
visit(input_dict, fixpaths)

workflow_url = args.workflow_url
- if not workflow_url.startswith("/") and ":" not in workflow_url:
+ if ":" not in workflow_url:
workflow_url = "file://" + os.path.abspath(workflow_url)

if args.quiet:
@@ -138,7 +142,7 @@ def fixpaths(d):
else:
parts.append(("workflow_url", workflow_url))

- postresult = http_client.session.post("%s://%s/ga4gh/wes/v1/workflows" % (args.proto, args.host),
+ postresult = http_client.session.post("%s://%s/ga4gh/wes/v1/runs" % (args.proto, args.host),
files=parts,
headers={"Authorization": args.auth})

@@ -149,19 +153,19 @@ def fixpaths(d):
exit(1)

if args.wait:
logging.info("Workflow id is %s", r["workflow_id"])
logging.info("Workflow run id is %s", r["run_id"])
else:
sys.stdout.write(r["workflow_id"] + "\n")
sys.stdout.write(r["run_id"] + "\n")
exit(0)

- r = client.WorkflowExecutionService.GetWorkflowStatus(workflow_id=r["workflow_id"]).result()
+ r = client.WorkflowExecutionService.GetRunStatus(run_id=r["run_id"]).result()
while r["state"] in ("QUEUED", "INITIALIZING", "RUNNING"):
time.sleep(8)
- r = client.WorkflowExecutionService.GetWorkflowStatus(workflow_id=r["workflow_id"]).result()
+ r = client.WorkflowExecutionService.GetRunStatus(run_id=r["run_id"]).result()

logging.info("State is %s", r["state"])

- s = client.WorkflowExecutionService.GetWorkflowLog(workflow_id=r["workflow_id"]).result()
+ s = client.WorkflowExecutionService.GetRunLog(run_id=r["run_id"]).result()

try:
# TODO: Only works with Arvados atm
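Condensed, the client's polling logic after this change looks like the sketch below; `client` is the bravado SwaggerClient that main() builds earlier (its construction is outside the visible hunks) and `run_id` is the value returned by the POST to /runs above:

```python
import json
import sys
import time

# Poll the renamed status operation until the run leaves an active state,
# then dump the full run log (GetWorkflowStatus/GetWorkflowLog -> GetRunStatus/GetRunLog).
r = client.WorkflowExecutionService.GetRunStatus(run_id=run_id).result()
while r["state"] in ("QUEUED", "INITIALIZING", "RUNNING"):
    time.sleep(8)
    r = client.WorkflowExecutionService.GetRunStatus(run_id=r["run_id"]).result()

log = client.WorkflowExecutionService.GetRunLog(run_id=r["run_id"]).result()
json.dump(log, sys.stdout, indent=4)
```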
56 changes: 21 additions & 35 deletions wes_service/arvados_wes.py
@@ -13,7 +13,6 @@
import shutil

from wes_service.util import visit, WESBackend
- from werkzeug.utils import secure_filename


class MissingAuthorization(Exception):
@@ -81,7 +80,7 @@ def GetServiceInfo(self):
}

@catch_exceptions
- def ListWorkflows(self, page_size=None, page_token=None, tag_search=None, state_search=None):
+ def ListRuns(self, page_size=None, page_token=None, state_search=None):
api = get_api()

paging = []
@@ -100,13 +99,13 @@ def ListWorkflows(self, page_size=None, page_token=None, tag_search=None, state_

uuidmap = {c["uuid"]: statemap[c["state"]] for c in containers}

workflow_list = [{"workflow_id": cr["uuid"],
workflow_list = [{"run_id": cr["uuid"],
"state": uuidmap.get(cr["container_uuid"])}
for cr in requests
if cr["command"] and cr["command"][0] == "arvados-cwl-runner"]
return {
"workflows": workflow_list,
"next_page_token": workflow_list[-1]["workflow_id"] if workflow_list else ""
"next_page_token": workflow_list[-1]["run_id"] if workflow_list else ""
}

def invoke_cwl_runner(self, cr_uuid, workflow_url, workflow_params,
@@ -156,23 +155,10 @@ def invoke_cwl_runner(self, cr_uuid, workflow_url, workflow_params,
workflow_descriptor_file.close()

@catch_exceptions
- def RunWorkflow(self, workflow_params, workflow_type, workflow_type_version,
- workflow_url, workflow_descriptor, workflow_engine_parameters=None, tags=None):
- tempdir = tempfile.mkdtemp()
- body = {}
- for k, ls in connexion.request.files.iterlists():
- for v in ls:
- if k == "workflow_descriptor":
- filename = secure_filename(v.filename)
- v.save(os.path.join(tempdir, filename))
- elif k in ("workflow_params", "tags", "workflow_engine_parameters"):
- body[k] = json.loads(v.read())
- else:
- body[k] = v.read()
- body["workflow_url"] = "file:///%s/%s" % (tempdir, body["workflow_url"])
-
- if body["workflow_type"] != "CWL" or body["workflow_type_version"] != "v1.0": # NOQA
- return
+ def RunWorkflow(self, **args):
+ tempdir, body = self.collect_attachments()
+
+ print(body)

if not connexion.request.headers.get('Authorization'):
raise MissingAuthorization()
@@ -215,13 +201,13 @@ def RunWorkflow(self, workflow_params, workflow_type, workflow_type_version,
project_uuid,
tempdir)).start()

return {"workflow_id": cr["uuid"]}
return {"run_id": cr["uuid"]}

@catch_exceptions
- def GetWorkflowLog(self, workflow_id):
+ def GetRunLog(self, run_id):
api = get_api()

- request = api.container_requests().get(uuid=workflow_id).execute()
+ request = api.container_requests().get(uuid=run_id).execute()
if request["container_uuid"]:
container = api.containers().get(uuid=request["container_uuid"]).execute() # NOQA
task_reqs = arvados.util.list_all(api.container_requests().list, filters=[["requesting_container_uuid", "=", container["uuid"]]])
@@ -273,7 +259,7 @@ def log_object(cr):
return r

r = {
"workflow_id": request["uuid"],
"run_id": request["uuid"],
"request": {
"workflow_url": "",
"workflow_params": request["mounts"].get("/var/lib/cwl/cwl.input.json", {}).get("content", {})
Expand All @@ -287,30 +273,30 @@ def log_object(cr):
return r

@catch_exceptions
- def CancelJob(self, workflow_id): # NOQA
+ def CancelRun(self, run_id): # NOQA
api = get_api()
- request = api.container_requests().update(uuid=workflow_id, body={"priority": 0}).execute() # NOQA
- return {"workflow_id": request["uuid"]}
+ request = api.container_requests().update(uuid=run_id, body={"priority": 0}).execute() # NOQA
+ return {"run_id": request["uuid"]}

@catch_exceptions
- def GetWorkflowStatus(self, workflow_id):
+ def GetRunStatus(self, run_id):
api = get_api()
- request = api.container_requests().get(uuid=workflow_id).execute()
+ request = api.container_requests().get(uuid=run_id).execute()
if request["container_uuid"]:
container = api.containers().get(uuid=request["container_uuid"]).execute() # NOQA
elif request["priority"] == 0:
container = {"state": "Cancelled"}
else:
container = {"state": "Queued"}
return {"workflow_id": request["uuid"],
return {"run_id": request["uuid"],
"state": statemap[container["state"]]}


- def dynamic_logs(workflow_id, logstream):
+ def dynamic_logs(run_id, logstream):
api = get_api()
- cr = api.container_requests().get(uuid=workflow_id).execute()
+ cr = api.container_requests().get(uuid=run_id).execute()
l1 = [t["properties"]["text"]
- for t in api.logs().list(filters=[["object_uuid", "=", workflow_id],
+ for t in api.logs().list(filters=[["object_uuid", "=", run_id],
["event_type", "=", logstream]],
order="created_at desc",
limit=100).execute()["items"]]
@@ -327,5 +313,5 @@ def dynamic_logs(workflow_id, logstream):

def create_backend(app, opts):
ab = ArvadosBackend(opts)
- app.app.route('/ga4gh/wes/v1/workflows/<workflow_id>/x-dynamic-logs/<logstream>')(dynamic_logs)
+ app.app.route('/ga4gh/wes/v1/runs/<run_id>/x-dynamic-logs/<logstream>')(dynamic_logs)
return ab
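
Both backends now delegate multipart parsing to self.collect_attachments(), which lives on WESBackend in wes_service/util.py and is not part of the rendered diff. The sketch below is a reconstruction from the code deleted above, not the actual helper, and may differ in detail:

```python
import json
import os
import tempfile

import connexion
from werkzeug.utils import secure_filename


def collect_attachments(self):
    """Stage uploaded files into a temp dir and decode the multipart form fields."""
    tempdir = tempfile.mkdtemp()
    body = {}
    for k, ls in connexion.request.files.iterlists():
        for v in ls:
            if k == "workflow_descriptor":
                filename = secure_filename(v.filename)
                v.save(os.path.join(tempdir, filename))
            elif k in ("workflow_params", "tags", "workflow_engine_parameters"):
                body[k] = json.loads(v.read())
            else:
                body[k] = v.read()
    if ":" not in body["workflow_url"]:
        # Assumption: only rewrite plain relative paths. The deleted code prepended
        # file:///<tempdir>/ unconditionally and then stripped it again for http URLs.
        body["workflow_url"] = "file:///%s/%s" % (tempdir, body["workflow_url"])
    return tempdir, body
```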
65 changes: 21 additions & 44 deletions wes_service/cwl_runner.py
@@ -2,21 +2,17 @@
import json
import os
import subprocess
- import tempfile
- import urllib
import uuid

- import connexion
- from werkzeug.utils import secure_filename

from wes_service.util import WESBackend


class Workflow(object):
- def __init__(self, workflow_id):
+ def __init__(self, run_id):
super(Workflow, self).__init__()
- self.workflow_id = workflow_id
- self.workdir = os.path.join(os.getcwd(), "workflows", self.workflow_id)
+ self.run_id = run_id
+ self.workdir = os.path.join(os.getcwd(), "workflows", self.run_id)

def run(self, request, opts):
"""
@@ -37,7 +33,7 @@ def run(self, request, opts):
:param dict request: A dictionary containing the cwl/json information.
:param wes_service.util.WESBackend opts: contains the user's arguments;
specifically the runner and runner options
:return: {"workflow_id": self.workflow_id, "state": state}
:return: {"run_id": self.run_id, "state": state}
"""
os.makedirs(self.workdir)
outdir = os.path.join(self.workdir, "outdir")
@@ -117,7 +113,7 @@ def getstatus(self):
state, exit_code = self.getstate()

return {
"workflow_id": self.workflow_id,
"run_id": self.run_id,
"state": state
}

@@ -137,7 +133,7 @@ def getlog(self):
outputobj = json.load(outputtemp)

return {
"workflow_id": self.workflow_id,
"run_id": self.run_id,
"request": request,
"state": state,
"workflow_log": {
@@ -169,58 +165,39 @@ def GetServiceInfo(self):
"key_values": {}
}

- def ListWorkflows(self):
+ def ListRuns(self, page_size=None, page_token=None, state_search=None):
# FIXME #15 results don't page
wf = []
for l in os.listdir(os.path.join(os.getcwd(), "workflows")):
if os.path.isdir(os.path.join(os.getcwd(), "workflows", l)):
wf.append(Workflow(l))

workflows = [{"workflow_id": w.workflow_id, "state": w.getstate()[0]} for w in wf] # NOQA
workflows = [{"run_id": w.run_id, "state": w.getstate()[0]} for w in wf] # NOQA
return {
"workflows": workflows,
"next_page_token": ""
}

- def RunWorkflow(self):
- tempdir = tempfile.mkdtemp()
- body = {}
- for k, ls in connexion.request.files.iterlists():
- for v in ls:
- if k == "workflow_descriptor":
- filename = secure_filename(v.filename)
- v.save(os.path.join(tempdir, filename))
- elif k in ("workflow_params", "tags", "workflow_engine_parameters"):
- body[k] = json.loads(v.read())
- else:
- body[k] = v.read()
-
- if body['workflow_type'] != "CWL" or \
- body['workflow_type_version'] != "v1.0":
- return
-
- body["workflow_url"] = "file:///%s/%s" % (tempdir, body["workflow_url"])
- index = body["workflow_url"].find("http")
- if index > 0:
- body["workflow_url"] = body["workflow_url"][index:]
-
- workflow_id = uuid.uuid4().hex
- job = Workflow(workflow_id)
+ def RunWorkflow(self, **args):
+ tempdir, body = self.collect_attachments()
+
+ run_id = uuid.uuid4().hex
+ job = Workflow(run_id)

job.run(body, self)
return {"workflow_id": workflow_id}
return {"run_id": run_id}

- def GetWorkflowLog(self, workflow_id):
- job = Workflow(workflow_id)
+ def GetRunLog(self, run_id):
+ job = Workflow(run_id)
return job.getlog()

- def CancelJob(self, workflow_id):
- job = Workflow(workflow_id)
+ def CancelRun(self, run_id):
+ job = Workflow(run_id)
job.cancel()
return {"workflow_id": workflow_id}
return {"run_id": run_id}

- def GetWorkflowStatus(self, workflow_id):
- job = Workflow(workflow_id)
+ def GetRunStatus(self, run_id):
+ job = Workflow(run_id)
return job.getstatus()


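Against the cwl_runner backend the renamed operations keep the same per-run directory layout (./workflows/<run_id>/). A short client-side sketch, assuming a local server and a run_id from an earlier submission:

```python
import requests

base = "http://localhost:8080/ga4gh/wes/v1"
run_url = "{}/runs/{}".format(base, run_id)  # run_id from a previous POST to /runs

log = requests.get(run_url).json()   # served by GetRunLog() above
print(log["state"])                  # terminal state once the run has finished
print(log["workflow_log"]["stderr"]) # what the client's --log option prints
```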
(The remaining 2 changed files are not rendered in this view.)
