Skip to content

Commit

Permalink
Merge branch 'main' into 20230824_merge_dcl10
Browse files Browse the repository at this point in the history
Improved and finished the migration and integration of proposed changes from @dcl10
  • Loading branch information
jmfernandez committed Aug 29, 2023
2 parents 3d9a030 + 2063728 commit 06c46d1
Show file tree
Hide file tree
Showing 6 changed files with 267 additions and 127 deletions.
10 changes: 7 additions & 3 deletions wfexs_backend/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -591,11 +591,14 @@ def identifyWorkflow(

@abc.abstractmethod
def materializeWorkflow(
self, matWorfklowEngine: "MaterializedWorkflowEngine", offline: "bool" = False
self,
matWorfklowEngine: "MaterializedWorkflowEngine",
consolidatedWorkflowDir: "AbsPath",
offline: "bool" = False,
) -> "Tuple[MaterializedWorkflowEngine, Sequence[ContainerTaggedName]]":
"""
Method to ensure the workflow has been materialized. It returns the
localWorkflow directory, as well as the list of containers
Method to ensure the workflow has been materialized. It returns a
possibly updated materialized workflow engine, as well as the list of containers
For Nextflow it is usually a no-op, but for CWL it requires resolution
"""
Expand Down Expand Up @@ -704,6 +707,7 @@ class StagedSetup(NamedTuple):
raw_work_dir: "AbsPath"
work_dir: "Optional[AbsPath]"
workflow_dir: "Optional[AbsPath]"
consolidated_workflow_dir: "Optional[AbsPath]"
inputs_dir: "Optional[AbsPath]"
extrapolated_inputs_dir: "Optional[AbsPath]"
outputs_dir: "Optional[AbsPath]"
Expand Down
106 changes: 62 additions & 44 deletions wfexs_backend/cwl_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,10 @@
from .engine import WORKDIR_STDOUT_FILE, WORKDIR_STDERR_FILE, STATS_DAG_DOT_FILE
from .engine import WorkflowEngine, WorkflowEngineException

from .utils.contents import CWLClass2WfExS
from .utils.contents import (
CWLClass2WfExS,
link_or_copy,
)


# Next methods are borrowed from
Expand Down Expand Up @@ -657,13 +660,16 @@ def _enrichWorkflowDeps(
return localWf

def materializeWorkflow(
self, matWorkflowEngine: "MaterializedWorkflowEngine", offline: "bool" = False
self,
matWorkflowEngine: "MaterializedWorkflowEngine",
consolidatedWorkflowDir: "AbsPath",
offline: "bool" = False,
) -> "Tuple[MaterializedWorkflowEngine, Sequence[ContainerTaggedName]]":
"""
Method to ensure the workflow has been materialized. It returns the
localWorkflow directory, as well as the list of containers.
For Nextflow it is usually a no-op, but for CWL it requires resolution.
Method to ensure the workflow has been materialized. In the case
of CWL, it returns a new materialized workflow engine, which points
to a newly genereated LocalWorkflow instance pointing to the
consolidated workflow. Also, it returns the list of containers.
"""
localWf = matWorkflowEngine.workflow
localWorkflowDir = localWf.dir
Expand Down Expand Up @@ -698,54 +704,65 @@ def materializeWorkflow(
os.path.join(localWorkflowUsedHashes_head, localWorkflowUsedHashes_tail)
+ ".cwl"
).replace("/", "_")
packedLocalWorkflowFile = os.path.join(
self.cacheWorkflowPackDir, localWorkflowPackedName
)

# TODO: check whether the repo is newer than the packed file

consolidatedPackedWorkflowFile = cast(
"AbsPath", os.path.join(consolidatedWorkflowDir, localWorkflowPackedName)
)
if (
not os.path.isfile(packedLocalWorkflowFile)
or os.path.getsize(packedLocalWorkflowFile) == 0
not os.path.isfile(consolidatedPackedWorkflowFile)
or os.path.getsize(consolidatedPackedWorkflowFile) == 0
):
if offline:
raise WorkflowEngineException(
"Cannot allow to materialize packed CWL workflow in offline mode. Risk to access external content."
)

# Execute cwltool --pack
with open(packedLocalWorkflowFile, mode="wb") as packedH:
with tempfile.NamedTemporaryFile() as cwltool_pack_stderr:
# Writing straight to the file
retVal = subprocess.Popen(
[
f"{cwltool_install_dir}/bin/cwltool",
"--no-doc-cache",
"--pack",
localWorkflowFile,
],
stdout=packedH,
stderr=cwltool_pack_stderr,
cwd=cwltool_install_dir,
).wait()
packedLocalWorkflowFile = cast(
"AbsPath",
os.path.join(self.cacheWorkflowPackDir, localWorkflowPackedName),
)
if (
not os.path.isfile(packedLocalWorkflowFile)
or os.path.getsize(packedLocalWorkflowFile) == 0
):
if offline:
raise WorkflowEngineException(
"Cannot allow to materialize packed CWL workflow in offline mode. Risk to access external content."
)

# Proper error handling
if retVal != 0:
# Reading the output and error for the report
with open(cwltool_pack_stderr.name, "r") as c_stF:
cwltool_pack_stderr_v = c_stF.read()
# Execute cwltool --pack
with open(packedLocalWorkflowFile, mode="wb") as packedH:
with tempfile.NamedTemporaryFile() as cwltool_pack_stderr:
# Writing straight to the file
retVal = subprocess.Popen(
[
f"{cwltool_install_dir}/bin/cwltool",
"--no-doc-cache",
"--pack",
localWorkflowFile,
],
stdout=packedH,
stderr=cwltool_pack_stderr,
cwd=cwltool_install_dir,
).wait()

# Proper error handling
if retVal != 0:
# Reading the output and error for the report
with open(cwltool_pack_stderr.name, "r") as c_stF:
cwltool_pack_stderr_v = c_stF.read()

errstr = "Could not pack CWL running cwltool --pack {}. Retval {}\n======\nSTDERR\n======\n{}".format(
engineVersion, retVal, cwltool_pack_stderr_v
)
raise WorkflowEngineException(errstr)

errstr = "Could not pack CWL running cwltool --pack {}. Retval {}\n======\nSTDERR\n======\n{}".format(
engineVersion, retVal, cwltool_pack_stderr_v
)
raise WorkflowEngineException(errstr)
# Last, deploy a copy of this packed workflow in the working directory
link_or_copy(packedLocalWorkflowFile, consolidatedPackedWorkflowFile)

containerTags: "Set[str]" = set()

# Getting the identifiers
cwlVersion = None
# TODO: collect conda hints
with open(packedLocalWorkflowFile, encoding="utf-8") as pLWH:
with open(consolidatedPackedWorkflowFile, encoding="utf-8") as pLWH:
wf_yaml = yaml.safe_load(pLWH) # parse packed CWL
cwlVersion = wf_yaml.get("cwlVersion", "v1.0")
dockerExprParser = jsonpath_ng.ext.parse(
Expand All @@ -764,11 +781,12 @@ def materializeWorkflow(
containerTags.add(dockerPullId)

newLocalWf = LocalWorkflow(
dir=localWf.dir,
relPath=cast("RelPath", packedLocalWorkflowFile),
dir=consolidatedWorkflowDir,
relPath=cast("RelPath", localWorkflowPackedName),
effectiveCheckout=localWf.effectiveCheckout,
langVersion=cwlVersion,
relPathFiles=localWf.relPathFiles,
# No file should be needed
relPathFiles=[],
)
newWfEngine = MaterializedWorkflowEngine(
instance=matWorkflowEngine.instance,
Expand Down
9 changes: 7 additions & 2 deletions wfexs_backend/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@
WORKDIR_OUTPUTS_RELDIR = "outputs"
WORKDIR_ENGINE_TWEAKS_RELDIR = "engineTweaks"
WORKDIR_WORKFLOW_RELDIR = "workflow"
WORKDIR_CONSOLIDATED_WORKFLOW_RELDIR = "consolidated-workflow"
WORKDIR_CONTAINERS_RELDIR = "containers"

WORKDIR_STDOUT_FILE = "stdout.txt"
Expand Down Expand Up @@ -516,7 +517,10 @@ def materializeEngine(

@abc.abstractmethod
def materializeWorkflow(
self, matWorfklowEngine: "MaterializedWorkflowEngine", offline: "bool" = False
self,
matWorfklowEngine: "MaterializedWorkflowEngine",
consolidatedWorkflowDir: "AbsPath",
offline: "bool" = False,
) -> "Tuple[MaterializedWorkflowEngine, Sequence[ContainerTaggedName]]":
"""
Method to ensure the workflow has been materialized. It returns the
Expand Down Expand Up @@ -622,10 +626,11 @@ def MaterializeWorkflowAndContainers(
cls,
matWfEng: "MaterializedWorkflowEngine",
containersDir: "AbsPath",
consolidatedWorkflowDir: "AbsPath",
offline: "bool" = False,
) -> "Tuple[MaterializedWorkflowEngine, ContainerEngineVersionStr, ContainerOperatingSystem, ProcessorArchitecture]":
matWfEngV2, listOfContainerTags = matWfEng.instance.materializeWorkflow(
matWfEng, offline=offline
matWfEng, consolidatedWorkflowDir, offline=offline
)

(
Expand Down
12 changes: 7 additions & 5 deletions wfexs_backend/nextflow_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -1302,13 +1302,15 @@ def _genDockSingContainerTaggedName(
return None

def materializeWorkflow(
self, matWorkflowEngine: "MaterializedWorkflowEngine", offline: "bool" = False
self,
matWorkflowEngine: "MaterializedWorkflowEngine",
consolidatedWorkflowDir: "AbsPath",
offline: "bool" = False,
) -> "Tuple[MaterializedWorkflowEngine, Sequence[ContainerTaggedName]]":
"""
Method to ensure the workflow has been materialized. It returns the
localWorkflow directory, as well as the list of containers
For Nextflow it is usually a no-op, but for CWL it requires resolution
Method to ensure the workflow has been materialized. In the case
of Nextflow, it returns the input matWorkflowEngine after the checks,
as well as the list of containers
"""

# Default nextflow profile is 'standard'
Expand Down
Loading

0 comments on commit 06c46d1

Please sign in to comment.