From 0d4e555cc7db17f969096143c4c2654a2eed9a54 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20M=C2=AA=20Fern=C3=A1ndez?= Date: Thu, 24 Aug 2023 04:19:07 +0200 Subject: [PATCH] Implemented container deployment phase on workflow execution. When a staged working directory is moved from one machine to another, and docker containers are used, the destination machine might not have the proper containers deployed there. The container deployment phase on workflow execution fixes that issue, just loading the frozen image obtained when the container was materialized. --- wfexs_backend/common.py | 14 + wfexs_backend/container.py | 88 ++++- wfexs_backend/docker_container.py | 488 ++++++++++++++++++------- wfexs_backend/engine.py | 34 ++ wfexs_backend/podman_container.py | 152 ++++++-- wfexs_backend/singularity_container.py | 13 + 6 files changed, 610 insertions(+), 179 deletions(-) diff --git a/wfexs_backend/common.py b/wfexs_backend/common.py index 1b843eda..8c77a236 100644 --- a/wfexs_backend/common.py +++ b/wfexs_backend/common.py @@ -559,6 +559,20 @@ def materialize_containers( ) -> "Tuple[ContainerEngineVersionStr, Sequence[Container], ContainerOperatingSystem, ProcessorArchitecture]": pass + @abc.abstractmethod + def deploy_containers( + self, + containers_list: "Sequence[Container]", + containersDir: "Optional[AnyPath]" = None, + force: "bool" = False, + ) -> "Sequence[Container]": + pass + + @property + @abc.abstractmethod + def staged_containers_dir(self) -> "AnyPath": + pass + @abc.abstractmethod def materializeEngine( self, localWf: "LocalWorkflow", engineVersion: "Optional[EngineVersion]" = None diff --git a/wfexs_backend/container.py b/wfexs_backend/container.py index bae002a9..b9f7a970 100644 --- a/wfexs_backend/container.py +++ b/wfexs_backend/container.py @@ -17,6 +17,7 @@ # limitations under the License. from __future__ import absolute_import +import copy import os import tempfile import atexit @@ -50,6 +51,7 @@ ) from typing_extensions import ( + TypeAlias, TypedDict, Final, ) @@ -68,13 +70,18 @@ RelPath, ) + DockerLikeManifest: TypeAlias = Mapping[str, Any] + MutableDockerLikeManifest: TypeAlias = MutableMapping[str, Any] + class DockerManifestMetadata(TypedDict): + image_id: "Fingerprint" image_signature: "Fingerprint" manifests_signature: "Fingerprint" - manifests: "Sequence[Mapping[str, Any]]" + manifests: "Sequence[DockerLikeManifest]" from . import common +from .utils.digests import ComputeDigestFromObject class ContainerFactoryException(AbstractWfExSException): @@ -315,12 +322,52 @@ def materializeSingleContainer( self, tag: "ContainerTaggedName", simpleFileNameMethod: "ContainerFileNamingMethod", - containers_dir: "Optional[Union[RelPath, AbsPath]]" = None, + containers_dir: "Optional[AnyPath]" = None, offline: "bool" = False, force: "bool" = False, ) -> "Optional[Container]": """ - It is assured the containers are materialized + It is assured the container is properly materialized + """ + pass + + def deployContainers( + self, + containers_list: "Sequence[Container]", + simpleFileNameMethod: "ContainerFileNamingMethod", + containers_dir: "Optional[AnyPath]" = None, + force: "bool" = False, + ) -> "Sequence[Container]": + """ + It is assured the containers are properly deployed + """ + redeployed_containers: "MutableSequence[Container]" = [] + + if containers_dir is None: + containers_dir = self.stagedContainersDir + for container in containers_list: + if self.AcceptsContainer(container): + was_redeployed = self.deploySingleContainer( + container, + simpleFileNameMethod, + containers_dir=containers_dir, + force=force, + ) + if was_redeployed is not None: + redeployed_containers.append(container) + + return redeployed_containers + + @abc.abstractmethod + def deploySingleContainer( + self, + container: "Container", + simpleFileNameMethod: "ContainerFileNamingMethod", + containers_dir: "Optional[AnyPath]" = None, + force: "bool" = False, + ) -> "bool": + """ + It is assured the container is properly deployed """ pass @@ -370,6 +417,18 @@ def materializeSingleContainer( """ return None + def deploySingleContainer( + self, + container: "Container", + simpleFileNameMethod: "ContainerFileNamingMethod", + containers_dir: "Optional[AnyPath]" = None, + force: "bool" = False, + ) -> "bool": + """ + This is a no-op + """ + return False + class AbstractDockerContainerFactory(ContainerFactory): ACCEPTED_CONTAINER_TYPES = set( @@ -387,3 +446,26 @@ def AcceptsContainerType( return not cls.ACCEPTED_CONTAINER_TYPES.isdisjoint( container_type if isinstance(container_type, set) else (container_type,) ) + + @classmethod + @abc.abstractmethod + def trimmable_manifest_keys(cls) -> "Sequence[str]": + pass + + def _gen_trimmed_manifests_signature( + self, manifests: "Sequence[DockerLikeManifest]" + ) -> "Fingerprint": + trimmed_manifests: "MutableSequence[DockerLikeManifest]" = [] + some_trimmed = False + for manifest in manifests: + # Copy the manifest + trimmed_manifest = cast("MutableDockerLikeManifest", copy.copy(manifest)) + # And trim the keys + for key in self.trimmable_manifest_keys(): + if key in trimmed_manifest: + del trimmed_manifest[key] + some_trimmed = True + + trimmed_manifests.append(trimmed_manifest) + + return ComputeDigestFromObject(trimmed_manifests if some_trimmed else manifests) diff --git a/wfexs_backend/docker_container.py b/wfexs_backend/docker_container.py index cb881b67..3ac746d3 100644 --- a/wfexs_backend/docker_container.py +++ b/wfexs_backend/docker_container.py @@ -31,6 +31,7 @@ if TYPE_CHECKING: from typing import ( + Any, Mapping, Optional, Sequence, @@ -38,6 +39,10 @@ Union, ) + from typing_extensions import ( + Final, + ) + from .common import ( AbsPath, AnyPath, @@ -46,11 +51,16 @@ ContainerOperatingSystem, ContainerTaggedName, ExitVal, + Fingerprint, ProcessorArchitecture, RelPath, URIType, ) + from .container import ( + DockerManifestMetadata, + ) + from .common import ( Container, ContainerType, @@ -62,13 +72,24 @@ ContainerEngineException, ContainerFactoryException, ) -from .utils.contents import link_or_copy -from .utils.digests import ComputeDigestFromFile, ComputeDigestFromObject +from .utils.contents import ( + link_or_copy, + real_unlink_if_exists, +) +from .utils.digests import ComputeDigestFromFile DOCKER_PROTO = "docker://" class DockerContainerFactory(AbstractDockerContainerFactory): + TRIMMABLE_MANIFEST_KEYS: "Final[Sequence[str]]" = [ + "RepoDigests", + ] + + @classmethod + def trimmable_manifest_keys(cls) -> "Sequence[str]": + return cls.TRIMMABLE_MANIFEST_KEYS + def __init__( self, cacheDir: "Optional[AnyPath]" = None, @@ -290,7 +311,7 @@ def materializeSingleContainer( self, tag: "ContainerTaggedName", simpleFileNameMethod: "ContainerFileNamingMethod", - containers_dir: "Optional[Union[RelPath, AbsPath]]" = None, + containers_dir: "Optional[AnyPath]" = None, offline: "bool" = False, force: "bool" = False, ) -> "Optional[Container]": @@ -321,32 +342,181 @@ def materializeSingleContainer( # Last case, it already has a registry declared self.logger.info(f"downloading docker container: {tag_name} => {dockerTag}") - if force: + # These are the paths to the copy of the saved container + containerFilename = simpleFileNameMethod(cast("URIType", tag_name)) + containerFilenameMeta = containerFilename + self.META_JSON_POSTFIX + localContainerPath = cast( + "AbsPath", + os.path.join(self.engineContainersSymlinkDir, containerFilename), + ) + localContainerPathMeta = cast( + "AbsPath", + os.path.join(self.engineContainersSymlinkDir, containerFilenameMeta), + ) + + # Keep a copy outside the cache directory + if containers_dir is None: + containers_dir = self.stagedContainersDir + containerPath = cast("AbsPath", os.path.join(containers_dir, containerFilename)) + containerPathMeta = cast( + "AbsPath", os.path.join(containers_dir, containerFilenameMeta) + ) + + # Now it is time to check whether the local cache of the container + # does exist and it is right + trusted_copy = False + image_id: "Optional[Fingerprint]" = None + imageSignature: "Optional[Fingerprint]" = None + manifestsImageSignature: "Optional[Fingerprint]" = None + manifests = None + manifest = None + if not force and os.path.isfile(localContainerPathMeta): + trusted_copy = True + try: + with open(localContainerPathMeta, mode="r", encoding="utf-8") as mH: + signaturesAndManifest = cast( + "DockerManifestMetadata", json.load(mH) + ) + image_id = signaturesAndManifest["image_id"] + imageSignature = signaturesAndManifest["image_signature"] + manifestsImageSignature = signaturesAndManifest[ + "manifests_signature" + ] + manifests = signaturesAndManifest["manifests"] + + # Check the status of the gathered manifests + trusted_copy = ( + manifestsImageSignature + == self._gen_trimmed_manifests_signature(manifests) + ) + except Exception as e: + self.logger.exception( + f"Problems extracting docker metadata at {localContainerPathMeta}" + ) + trusted_copy = False + + # Let's check metadata coherence + if trusted_copy: + trusted_copy = ( + imageSignature is not None + and manifestsImageSignature is not None + and manifests is not None + ) + + if trusted_copy: + assert manifestsImageSignature is not None + if os.path.islink(localContainerPathMeta): + # Some filesystems complain when filenames contain 'equal', 'slash' or 'plus' symbols + unlinkedContainerPathMeta = os.readlink(localContainerPathMeta) + fsImageSignatureMeta = os.path.basename(unlinkedContainerPathMeta) + if fsImageSignatureMeta.endswith(self.META_JSON_POSTFIX): + fsImageSignatureMeta = fsImageSignatureMeta[ + : -len(self.META_JSON_POSTFIX) + ] + putativeManifestsImageSignature = ( + fsImageSignatureMeta.replace("~", "=") + .replace("-", "/") + .replace("_", "+") + ) + + trusted_copy = ( + putativeManifestsImageSignature == manifestsImageSignature + ) + if trusted_copy: + canonicalContainerPathMeta = os.path.join( + self.containersCacheDir, + fsImageSignatureMeta + self.META_JSON_POSTFIX, + ) + + trusted_copy = os.path.samefile( + os.path.realpath(localContainerPathMeta), + os.path.realpath(canonicalContainerPathMeta), + ) + else: + # Some filesystems complain when filenames contain 'equal', 'slash' or 'plus' symbols + putativeCanonicalContainerPathMeta = os.path.join( + self.containersCacheDir, + manifestsImageSignature.replace("=", "~") + .replace("/", "-") + .replace("+", "_"), + ) + + # This is to detect poisoned caches + trusted_copy = os.path.samefile( + localContainerPathMeta, putativeCanonicalContainerPathMeta + ) + + # Now, let's check the image itself + if trusted_copy and os.path.isfile(localContainerPath): + trusted_copy = imageSignature == ComputeDigestFromFile( + localContainerPath + ) + + if trusted_copy: + assert imageSignature is not None + if os.path.islink(localContainerPath): + # Some filesystems complain when filenames contain 'equal', 'slash' or 'plus' symbols + unlinkedContainerPath = os.readlink(localContainerPath) + fsImageSignature = os.path.basename(unlinkedContainerPath) + putativeImageSignature = ( + fsImageSignature.replace("~", "=") + .replace("-", "/") + .replace("_", "+") + ) + + trusted_copy = putativeImageSignature == manifestsImageSignature + if trusted_copy: + canonicalContainerPath = os.path.join( + self.containersCacheDir, + fsImageSignature, + ) + + trusted_copy = os.path.samefile( + os.path.realpath(localContainerPath), + os.path.realpath(canonicalContainerPath), + ) + + else: + # Some filesystems complain when filenames contain 'equal', 'slash' or 'plus' symbols + putativeCanonicalContainerPath = os.path.join( + self.containersCacheDir, + imageSignature.replace("=", "~") + .replace("/", "-") + .replace("+", "_"), + ) + + # This is to detect poisoned caches + trusted_copy = os.path.samefile( + localContainerPath, putativeCanonicalContainerPath + ) + + # And now, the final judgement! + if force or not trusted_copy: if offline: raise ContainerFactoryException( f"Banned remove docker containers in offline mode from {tag_name}" ) + if os.path.exists(localContainerPathMeta) or os.path.exists( + localContainerPath + ): + self.logger.warning( + f"Unable to trust Docker container {tag_name} => {dockerTag} . Discarding cached contents" + ) + real_unlink_if_exists(localContainerPathMeta) + real_unlink_if_exists(localContainerPath) + # Blindly remove _, _, _ = self._rmi(dockerTag, matEnv) - d_retval = -1 - else: - d_retval, d_out_v, d_err_v = self._inspect(dockerTag, matEnv) - - # Time to pull the image - if d_retval != 0: - if offline: - raise ContainerFactoryException( - f"Banned pull docker containers in offline mode from {tag_name}" - ) + # And now, let's materialize the new world d_retval, d_out_v, d_err_v = self._pull(dockerTag, matEnv) if d_retval == 0: # Second try d_retval, d_out_v, d_err_v = self._inspect(dockerTag, matEnv) - if d_retval != 0: - errstr = """Could not materialize docker image {}. Retval {} + if d_retval != 0: + errstr = """Could not materialize docker image {}. Retval {} ====== STDOUT ====== @@ -356,96 +526,45 @@ def materializeSingleContainer( STDERR ====== {}""".format( - dockerTag, d_retval, d_out_v, d_err_v - ) - raise ContainerEngineException(errstr) - - # Parsing the output from docker inspect - try: - manifests = json.loads(d_out_v) - manifest = manifests[0] - except Exception as e: - raise ContainerFactoryException( - f"FATAL ERROR: Docker finished properly but it did not properly materialize {tag_name}: {e}" - ) - - # Then, compute the signature - tagId = manifest["Id"] - fingerprint = None - if len(manifest["RepoDigests"]) > 0: - fingerprint = manifest["RepoDigests"][0] + dockerTag, d_retval, d_out_v, d_err_v + ) + raise ContainerEngineException(errstr) - # Last but one, let's save a copy of the container locally - containerFilename = simpleFileNameMethod(cast("URIType", tag_name)) - containerFilenameMeta = containerFilename + self.META_JSON_POSTFIX - localContainerPath = cast( - "AbsPath", - os.path.join(self.engineContainersSymlinkDir, containerFilename), - ) - localContainerPathMeta = cast( - "AbsPath", - os.path.join(self.engineContainersSymlinkDir, containerFilenameMeta), - ) + # Parsing the output from docker inspect + try: + manifests = cast("Sequence[Mapping[str, Any]]", json.loads(d_out_v)) + manifest = manifests[0] + image_id = cast("Fingerprint", manifest["Id"]) + except Exception as e: + raise ContainerFactoryException( + f"FATAL ERROR: Docker finished properly but it did not properly materialize {tag_name}: {e}" + ) - self.logger.info( - "saving docker container (for reproducibility matters): {} => {}".format( - tag_name, localContainerPath + self.logger.info( + "saving docker container (for reproducibility matters): {} => {}".format( + tag_name, localContainerPath + ) ) - ) - # First, let's materialize the container image - manifestsImageSignature = ComputeDigestFromObject(manifests) - canonicalContainerPath = os.path.join( - self.containersCacheDir, - manifestsImageSignature.replace("=", "~") - .replace("/", "-") - .replace("+", "_"), - ) - canonicalContainerPathMeta = canonicalContainerPath + self.META_JSON_POSTFIX - # Defining the destinations - if os.path.isfile(canonicalContainerPathMeta): - with open(canonicalContainerPathMeta, mode="r", encoding="utf-8") as tcpm: - metadataLocal = json.load(tcpm) - - manifestsImageSignatureLocal = metadataLocal.get("manifests_signature") - manifestsImageSignatureLocalRead = ComputeDigestFromObject( - metadataLocal.get("manifests", []) + # Let's materialize the container image for preservation + manifestsImageSignature = self._gen_trimmed_manifests_signature(manifests) + canonicalContainerPath = os.path.join( + self.containersCacheDir, + manifestsImageSignature.replace("=", "~") + .replace("/", "-") + .replace("+", "_"), ) - if ( - manifestsImageSignature != manifestsImageSignatureLocal - or manifestsImageSignature != manifestsImageSignatureLocalRead - ): - self.logger.warning( - f"Corrupted canonical container metadata {tag_name}. Re-saving" - ) - saveContainerPathMeta = True - imageSignatureLocal = None - else: - saveContainerPathMeta = False - imageSignatureLocal = metadataLocal.get("image_signature") - else: - saveContainerPathMeta = True - imageSignature = None - imageSignatureLocal = None - - # Only trust when they match - tmpContainerPath: "Optional[str]" = os.path.join( - self.containersCacheDir, str(uuid.uuid4()) - ) - if os.path.isfile(canonicalContainerPath) and (imageSignatureLocal is not None): - imageSignatureLocalRead = ComputeDigestFromFile(canonicalContainerPath) - if imageSignatureLocalRead != imageSignatureLocal: - self.logger.warning( - f"Corrupted canonical container {tag_name}. Re-saving" - ) - else: - imageSignature = imageSignatureLocal - tmpContainerPath = None - if tmpContainerPath is not None: - saveContainerPathMeta = True + # Being sure the paths do not exist + if os.path.exists(canonicalContainerPath): + os.unlink(canonicalContainerPath) + canonicalContainerPathMeta = canonicalContainerPath + self.META_JSON_POSTFIX + if os.path.exists(canonicalContainerPathMeta): + os.unlink(canonicalContainerPathMeta) + + # Now, save the image as such d_retval, d_err_ev = self._save( - dockerTag, cast("AbsPath", tmpContainerPath), matEnv + dockerTag, cast("AbsPath", canonicalContainerPath), matEnv ) self.logger.debug("docker save retval: {}".format(d_retval)) self.logger.debug("docker save stderr: {}".format(d_err_v)) @@ -458,37 +577,33 @@ def materializeSingleContainer( {}""".format( dockerTag, d_retval, d_err_v ) - if os.path.exists(tmpContainerPath): + + # Removing partial dumps + if os.path.exists(canonicalContainerPath): try: - os.unlink(tmpContainerPath) + os.unlink(canonicalContainerPath) except: pass raise ContainerEngineException(errstr) - shutil.move(tmpContainerPath, canonicalContainerPath) - imageSignature = ComputeDigestFromFile(canonicalContainerPath) + imageSignature = cast( + "Fingerprint", ComputeDigestFromFile(canonicalContainerPath) + ) - if saveContainerPathMeta: + # Last, save the metadata itself for further usage with open(canonicalContainerPathMeta, mode="w", encoding="utf-8") as tcpM: - json.dump( - { - "image_signature": imageSignature, - "manifests_signature": manifestsImageSignature, - "manifests": manifests, - }, - tcpM, - ) - - # Now, check the relative symbolic link of image - createSymlink = True - if os.path.lexists(localContainerPath): - if os.path.realpath(localContainerPath) != os.path.realpath( - canonicalContainerPath - ): + manifest_metadata: "DockerManifestMetadata" = { + "image_id": image_id, + "image_signature": imageSignature, + "manifests_signature": manifestsImageSignature, + "manifests": manifests, + } + json.dump(manifest_metadata, tcpM) + + # Now, check the relative symbolic link of image + if os.path.lexists(localContainerPath): os.unlink(localContainerPath) - else: - createSymlink = False - if createSymlink: + os.symlink( os.path.relpath( canonicalContainerPath, self.engineContainersSymlinkDir @@ -496,16 +611,9 @@ def materializeSingleContainer( localContainerPath, ) - # Now, check the relative symbolic link of metadata - createSymlink = True - if os.path.lexists(localContainerPathMeta): - if os.path.realpath(localContainerPathMeta) != os.path.realpath( - canonicalContainerPathMeta - ): + # Now, check the relative symbolic link of metadata + if os.path.lexists(localContainerPathMeta): os.unlink(localContainerPathMeta) - else: - createSymlink = False - if createSymlink: os.symlink( os.path.relpath( canonicalContainerPathMeta, self.engineContainersSymlinkDir @@ -513,13 +621,10 @@ def materializeSingleContainer( localContainerPathMeta, ) - # Last, hardlink or copy the container and its metadata - if containers_dir is None: - containers_dir = self.stagedContainersDir - containerPath = cast("AbsPath", os.path.join(containers_dir, containerFilename)) - containerPathMeta = cast( - "AbsPath", os.path.join(containers_dir, containerFilenameMeta) - ) + assert manifestsImageSignature is not None + assert manifests is not None + if manifest is None: + manifest = manifests[0] # Do not allow overwriting in offline mode if not offline or not os.path.exists(containerPath): @@ -527,6 +632,13 @@ def materializeSingleContainer( if not offline or not os.path.exists(containerPathMeta): link_or_copy(localContainerPathMeta, containerPathMeta) + # Now the image is not loaded here, but later in deploySingleContainer + + # Then, compute the fingerprint + fingerprint = None + if len(manifest["RepoDigests"]) > 0: + fingerprint = manifest["RepoDigests"][0] + # Learning about the intended processor architecture and variant architecture = manifest.get("Architecture") if architecture is not None: @@ -537,7 +649,7 @@ def materializeSingleContainer( return Container( origTaggedName=tag_name, taggedName=cast("URIType", dockerTag), - signature=tagId, + signature=image_id, fingerprint=fingerprint, architecture=architecture, operatingSystem=manifest.get("Os"), @@ -545,3 +657,101 @@ def materializeSingleContainer( localPath=containerPath, registries=tag.registries, ) + + def deploySingleContainer( + self, + container: "Container", + simpleFileNameMethod: "ContainerFileNamingMethod", + containers_dir: "Optional[AnyPath]" = None, + force: "bool" = False, + ) -> "bool": + # Should we load the image? + matEnv = dict(os.environ) + matEnv.update(self.environment) + dockerTag = container.taggedName + tag_name = container.origTaggedName + + # These are the paths to the copy of the saved container + containerFilename = simpleFileNameMethod(cast("URIType", tag_name)) + containerFilenameMeta = containerFilename + self.META_JSON_POSTFIX + + # Keep a copy outside the cache directory + if containers_dir is None: + containers_dir = self.stagedContainersDir + containerPath = cast("AbsPath", os.path.join(containers_dir, containerFilename)) + containerPathMeta = cast( + "AbsPath", os.path.join(containers_dir, containerFilenameMeta) + ) + + imageSignature: "Optional[Fingerprint]" = None + manifestsImageSignature: "Optional[Fingerprint]" = None + manifests = None + manifest = None + if not os.path.isfile(containerPathMeta): + errmsg = f"FATAL ERROR: Docker saved image {containerFilenameMeta} is not in the staged working dir for {tag_name}" + self.logger.error(errmsg) + raise ContainerFactoryException(errmsg) + + try: + with open(containerPathMeta, mode="r", encoding="utf-8") as mH: + signaturesAndManifest = cast("DockerManifestMetadata", json.load(mH)) + imageSignature = signaturesAndManifest["image_signature"] + manifestsImageSignature = signaturesAndManifest["manifests_signature"] + manifests = signaturesAndManifest["manifests"] + except Exception as e: + errmsg = f"Problems extracting docker metadata at {containerPathMeta}" + self.logger.exception(errmsg) + raise ContainerFactoryException(errmsg) + + d_retval, d_out_v, d_err_v = self._inspect(dockerTag, matEnv) + # d_retval, d_out_v, d_err_v = self._images(matEnv) + + if d_retval not in (0, 125): + errstr = """Could not inspect docker image {}. Retval {} +====== +STDOUT +====== +{} + +====== +STDERR +====== +{}""".format( + dockerTag, d_retval, d_out_v, d_err_v + ) + raise ContainerEngineException(errstr) + + # Parsing the output from docker inspect + try: + ins_manifests = json.loads(d_out_v) + except Exception as e: + errmsg = f"FATAL ERROR: Docker inspect finished properly but it did not properly answered for {tag_name}" + self.logger.exception(errmsg) + raise ContainerFactoryException(errmsg) from e + + # Let's load then + do_redeploy = manifestsImageSignature != self._gen_trimmed_manifests_signature( + ins_manifests + ) + if do_redeploy: + self.logger.debug(f"Redeploying {dockerTag}") + # Should we load the image? + d_retval, d_out_v, d_err_v = self._load(containerPath, dockerTag, matEnv) + + if d_retval != 0: + errstr = """Could not load docker image {}. Retval {} +====== +STDOUT +====== +{} + +====== +STDERR +====== +{}""".format( + dockerTag, d_retval, d_out_v, d_err_v + ) + self.logger.error(errstr) + raise ContainerEngineException(errstr) + + return do_redeploy diff --git a/wfexs_backend/engine.py b/wfexs_backend/engine.py index c5e550a6..ed0f71db 100644 --- a/wfexs_backend/engine.py +++ b/wfexs_backend/engine.py @@ -548,6 +548,9 @@ def materialize_containers( containersDir: "Optional[AnyPath]" = None, offline: "bool" = False, ) -> "Tuple[ContainerEngineVersionStr, Sequence[Container], ContainerOperatingSystem, ProcessorArchitecture]": + if containersDir is None: + containersDir = self.stagedContainersDir + return ( self.container_factory.engine_version(), self.container_factory.materializeContainers( @@ -559,6 +562,26 @@ def materialize_containers( *self.container_factory.architecture, ) + def deploy_containers( + self, + containers_list: "Sequence[Container]", + containersDir: "Optional[AnyPath]" = None, + force: "bool" = False, + ) -> "Sequence[Container]": + if containersDir is None: + containersDir = self.stagedContainersDir + + return self.container_factory.deployContainers( + containers_list=containers_list, + simpleFileNameMethod=self.simpleContainerFileName, + containers_dir=containersDir, + force=force, + ) + + @property + def staged_containers_dir(self) -> "AnyPath": + return self.stagedContainersDir + @abc.abstractmethod def launchWorkflow( self, @@ -577,6 +600,17 @@ def ExecuteWorkflow( environment: "Sequence[MaterializedInput]", outputs: "Sequence[ExpectedOutput]", ) -> "StagedExecution": + # Now, deploy the containers to the local registry (needed for Docker) + if matWfEng.containers is not None: + matWfEng.instance.deploy_containers( + matWfEng.containers, matWfEng.instance.staged_containers_dir + ) + if matWfEng.operational_containers is not None: + matWfEng.instance.deploy_containers( + matWfEng.operational_containers, matWfEng.instance.staged_containers_dir + ) + + # And once deployed, let's run the workflow! stagedExec = matWfEng.instance.launchWorkflow( matWfEng, inputs, environment, outputs ) diff --git a/wfexs_backend/podman_container.py b/wfexs_backend/podman_container.py index 5ec18d52..2df40e13 100644 --- a/wfexs_backend/podman_container.py +++ b/wfexs_backend/podman_container.py @@ -39,6 +39,10 @@ Union, ) + from typing_extensions import ( + Final, + ) + from .common import ( AbsPath, AnyPath, @@ -71,12 +75,23 @@ link_or_copy, real_unlink_if_exists, ) -from .utils.digests import ComputeDigestFromFile, ComputeDigestFromObject +from .utils.digests import ComputeDigestFromFile DOCKER_PROTO = "docker://" class PodmanContainerFactory(AbstractDockerContainerFactory): + TRIMMABLE_MANIFEST_KEYS: "Final[Sequence[str]]" = [ + "Digest", + "RepoDigests", + "Size", + "VirtualSize", + ] + + @classmethod + def trimmable_manifest_keys(cls) -> "Sequence[str]": + return cls.TRIMMABLE_MANIFEST_KEYS + def __init__( self, cacheDir: "Optional[AnyPath]" = None, @@ -399,6 +414,7 @@ def materializeSingleContainer( # Now it is time to check whether the local cache of the container # does exist and it is right trusted_copy = False + image_id: "Optional[Fingerprint]" = None imageSignature: "Optional[Fingerprint]" = None manifestsImageSignature: "Optional[Fingerprint]" = None manifests = None @@ -410,6 +426,7 @@ def materializeSingleContainer( signaturesAndManifest = cast( "DockerManifestMetadata", json.load(mH) ) + image_id = signaturesAndManifest["image_id"] imageSignature = signaturesAndManifest["image_signature"] manifestsImageSignature = signaturesAndManifest[ "manifests_signature" @@ -417,8 +434,9 @@ def materializeSingleContainer( manifests = signaturesAndManifest["manifests"] # Check the status of the gathered manifests - trusted_copy = manifestsImageSignature == ComputeDigestFromObject( - manifests + trusted_copy = ( + manifestsImageSignature + == self._gen_trimmed_manifests_signature(manifests) ) except Exception as e: self.logger.exception( @@ -427,11 +445,15 @@ def materializeSingleContainer( trusted_copy = False # Let's check metadata coherence - assert imageSignature is not None - assert manifestsImageSignature is not None - assert manifests is not None + if trusted_copy: + trusted_copy = ( + imageSignature is not None + and manifestsImageSignature is not None + and manifests is not None + ) if trusted_copy: + assert manifestsImageSignature is not None if os.path.islink(localContainerPathMeta): # Some filesystems complain when filenames contain 'equal', 'slash' or 'plus' symbols unlinkedContainerPathMeta = os.readlink(localContainerPathMeta) @@ -480,6 +502,7 @@ def materializeSingleContainer( ) if trusted_copy: + assert imageSignature is not None if os.path.islink(localContainerPath): # Some filesystems complain when filenames contain 'equal', 'slash' or 'plus' symbols unlinkedContainerPath = os.readlink(localContainerPath) @@ -560,6 +583,7 @@ def materializeSingleContainer( try: manifests = cast("Sequence[Mapping[str, Any]]", json.loads(d_out_v)) manifest = manifests[0] + image_id = cast("Fingerprint", manifest["Id"]) except Exception as e: raise ContainerFactoryException( f"FATAL ERROR: Podman finished properly but it did not properly materialize {tag_name}: {e}" @@ -572,7 +596,7 @@ def materializeSingleContainer( ) # Let's materialize the container image for preservation - manifestsImageSignature = ComputeDigestFromObject(manifests) + manifestsImageSignature = self._gen_trimmed_manifests_signature(manifests) canonicalContainerPath = os.path.join( self.containersCacheDir, manifestsImageSignature.replace("=", "~") @@ -618,6 +642,7 @@ def materializeSingleContainer( # Last, save the metadata itself for further usage with open(canonicalContainerPathMeta, mode="w", encoding="utf-8") as tcpM: manifest_metadata: "DockerManifestMetadata" = { + "image_id": image_id, "image_signature": imageSignature, "manifests_signature": manifestsImageSignature, "manifests": manifests, @@ -656,7 +681,78 @@ def materializeSingleContainer( if not offline or not os.path.exists(containerPathMeta): link_or_copy(localContainerPathMeta, containerPathMeta) + # Now the image is not loaded here, but later in deploySingleContainer + + # Then, compute the fingerprint based on remote repo's information + fingerprint = None + if len(manifest["RepoDigests"]) > 0: + fingerprint = manifest["RepoDigests"][0] + + # Learning about the intended processor architecture and variant + architecture = manifest.get("Architecture") + # As of version 4.5.0, podman does not report the architecture variant + if architecture is not None: + variant = manifest.get("Variant") + if variant is not None: + architecture += "/" + variant + # And add to the list of containers + return Container( + origTaggedName=tag_name, + taggedName=cast("URIType", dockerTag), + signature=image_id, + fingerprint=fingerprint, + architecture=architecture, + operatingSystem=manifest.get("Os"), + type=self.containerType, + localPath=containerPath, + registries=tag.registries, + ) + + def deploySingleContainer( + self, + container: "Container", + simpleFileNameMethod: "ContainerFileNamingMethod", + containers_dir: "Optional[AnyPath]" = None, + force: "bool" = False, + ) -> "bool": # Should we load the image? + matEnv = dict(os.environ) + matEnv.update(self.environment) + dockerTag = container.taggedName + tag_name = container.origTaggedName + + # These are the paths to the copy of the saved container + containerFilename = simpleFileNameMethod(cast("URIType", tag_name)) + containerFilenameMeta = containerFilename + self.META_JSON_POSTFIX + + # Keep a copy outside the cache directory + if containers_dir is None: + containers_dir = self.stagedContainersDir + containerPath = cast("AbsPath", os.path.join(containers_dir, containerFilename)) + containerPathMeta = cast( + "AbsPath", os.path.join(containers_dir, containerFilenameMeta) + ) + + imageSignature: "Optional[Fingerprint]" = None + manifestsImageSignature: "Optional[Fingerprint]" = None + manifests = None + manifest = None + if not os.path.isfile(containerPathMeta): + errmsg = f"FATAL ERROR: Podman saved image {containerFilenameMeta} is not in the staged working dir for {tag_name}" + self.logger.error(errmsg) + raise ContainerFactoryException(errmsg) + + try: + with open(containerPathMeta, mode="r", encoding="utf-8") as mH: + signaturesAndManifest = cast("DockerManifestMetadata", json.load(mH)) + imageSignature = signaturesAndManifest["image_signature"] + manifestsImageSignature = signaturesAndManifest["manifests_signature"] + manifests = signaturesAndManifest["manifests"] + except Exception as e: + errmsg = f"Problems extracting podman metadata at {containerPathMeta}" + self.logger.exception(errmsg) + raise ContainerFactoryException(errmsg) + d_retval, d_out_v, d_err_v = self._inspect(dockerTag, matEnv) # d_retval, d_out_v, d_err_v = self._images(matEnv) @@ -671,7 +767,7 @@ def materializeSingleContainer( STDERR ====== {}""".format( - podmanPullTag, d_retval, d_out_v, d_err_v + dockerTag, d_retval, d_out_v, d_err_v ) raise ContainerEngineException(errstr) @@ -679,12 +775,16 @@ def materializeSingleContainer( try: ins_manifests = json.loads(d_out_v) except Exception as e: - raise ContainerFactoryException( - f"FATAL ERROR: Podman inspect finished properly but it did not properly answered for {tag_name}: {e}" - ) + errmsg = f"FATAL ERROR: Podman inspect finished properly but it did not properly answered for {tag_name}" + self.logger.exception(errmsg) + raise ContainerFactoryException(errmsg) from e # Let's load then - if manifestsImageSignature != ComputeDigestFromObject(ins_manifests): + do_redeploy = manifestsImageSignature != self._gen_trimmed_manifests_signature( + ins_manifests + ) + if do_redeploy: + self.logger.debug(f"Redeploying {dockerTag}") # Should we load the image? d_retval, d_out_v, d_err_v = self._load(containerPath, dockerTag, matEnv) @@ -699,31 +799,9 @@ def materializeSingleContainer( STDERR ====== {}""".format( - podmanPullTag, d_retval, d_out_v, d_err_v + dockerTag, d_retval, d_out_v, d_err_v ) + self.logger.error(errstr) raise ContainerEngineException(errstr) - # Then, compute the signature - fingerprint = None - if len(manifest["RepoDigests"]) > 0: - fingerprint = manifest["RepoDigests"][0] - - # Learning about the intended processor architecture and variant - architecture = manifest.get("Architecture") - # As of version 4.5.0, podman does not report the architecture variant - if architecture is not None: - variant = manifest.get("Variant") - if variant is not None: - architecture += "/" + variant - # And add to the list of containers - return Container( - origTaggedName=tag_name, - taggedName=cast("URIType", dockerTag), - signature=manifest["Id"], - fingerprint=fingerprint, - architecture=architecture, - operatingSystem=manifest.get("Os"), - type=self.containerType, - localPath=containerPath, - registries=tag.registries, - ) + return do_redeploy diff --git a/wfexs_backend/singularity_container.py b/wfexs_backend/singularity_container.py index 6233ba3f..d2aaf5f5 100644 --- a/wfexs_backend/singularity_container.py +++ b/wfexs_backend/singularity_container.py @@ -790,3 +790,16 @@ def materializeContainers( ) return containersList + + def deploySingleContainer( + self, + container: "Container", + simpleFileNameMethod: "ContainerFileNamingMethod", + containers_dir: "Optional[AnyPath]" = None, + force: "bool" = False, + ) -> "bool": + """ + This is almost no-op, but it should check + the integrity of the local images + """ + return force