Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add on_failure to handle cleanup during pipeline failure #1651

Merged
merged 4 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/source/contributing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ Helpful Resources

- Review our `comprehensive guide <https://scancode-toolkit.readthedocs.io/en/latest/contribute/index.html>`_
for more details on how to add quality contributions to our codebase and documentation
- Check this free resource on `how to contribute to an open source project on github <https://egghead.io/courses/how-to-contribute-to-an-open-source-project-on-github>`_
- Check this free resource on `How to contribute to an open source project on github <https://egghead.io/lessons/javascript-identifying-how-to-contribute-to-an-open-source-project-on-github>`_
- Follow `this wiki page <https://aboutcode.readthedocs.io/en/latest/contributing/writing_good_commit_messages.html>`_
on how to write good commit messages
- `Pro Git book <https://git-scm.com/book/en/v2>`_
Expand Down
6 changes: 5 additions & 1 deletion docs/source/tutorial_add_importer_pipeline.rst
Original file line number Diff line number Diff line change
Expand Up @@ -298,10 +298,14 @@ version management from `univers <https://github.com/aboutcode-org/univers>`_.
**advisories_count** should never be directly added in steps.


.. attention::

Implement ``on_failure`` to handle cleanup in case of pipeline failure.
Cleanup of downloaded archives or cloned repos is necessary to avoid potential resource leakage.

.. note::

| Use ``make valid`` to format your code using black and isort automatically.
| Use ``make valid`` to format your new code using black and isort automatically.
| Use ``make check`` to check for formatting errors.

Register the Importer Pipeline
Expand Down
5 changes: 5 additions & 0 deletions docs/source/tutorial_add_improver_pipeline.rst
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,11 @@ methods.
self.log(f"Successfully flagged {ghost_package_count:,d} ghost Packages")


.. attention::

Implement ``on_failure`` to handle cleanup in case of pipeline failure.
Cleanup of downloaded archives or cloned repos is necessary to avoid potential resource leakage.

.. note::

| Use ``make valid`` to format your new code using black and isort automatically.
Expand Down
57 changes: 55 additions & 2 deletions vulnerabilities/pipelines/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@
import logging
from datetime import datetime
from datetime import timezone
from timeit import default_timer as timer
from traceback import format_exc as traceback_format_exc
from typing import Iterable

from aboutcode.pipeline import BasePipeline
from aboutcode.pipeline import LoopProgress
from aboutcode.pipeline import humanize_time

from vulnerabilities.importer import AdvisoryData
from vulnerabilities.improver import MAX_CONFIDENCE
Expand All @@ -29,6 +31,57 @@
class VulnerableCodePipeline(BasePipeline):
pipeline_id = None # Unique Pipeline ID

def on_failure(self):
"""
Tasks to run in the event that pipeline execution fails.

Implement cleanup or other tasks that need to be performed
on pipeline failure, such as:
- Removing cloned repositories.
- Deleting downloaded archives.
"""
pass

def execute(self):
"""Execute each steps in the order defined on this pipeline class."""
self.log(f"Pipeline [{self.pipeline_name}] starting")

steps = self.pipeline_class.get_steps(groups=self.selected_groups)
steps_count = len(steps)
pipeline_start_time = timer()

for current_index, step in enumerate(steps, start=1):
step_name = step.__name__

if self.selected_steps and step_name not in self.selected_steps:
self.log(f"Step [{step_name}] skipped")
continue

self.set_current_step(f"{current_index}/{steps_count} {step_name}")
self.log(f"Step [{step_name}] starting")
step_start_time = timer()

try:
step(self)
except Exception as exception:
self.log("Pipeline failed")
on_failure_start_time = timer()
self.log(f"Running [on_failure] tasks")
self.on_failure()
on_failure_run_time = timer() - on_failure_start_time
self.log(f"Completed [on_failure] tasks in {humanize_time(on_failure_run_time)}")

return 1, self.output_from_exception(exception)

step_run_time = timer() - step_start_time
self.log(f"Step [{step_name}] completed in {humanize_time(step_run_time)}")

self.set_current_step("") # Reset the `current_step` field on completion
pipeline_run_time = timer() - pipeline_start_time
self.log(f"Pipeline completed in {humanize_time(pipeline_run_time)}")

return 0, ""

def log(self, message, level=logging.INFO):
"""Log the given `message` to the current module logger and execution_log."""
now_local = datetime.now(timezone.utc).astimezone()
Expand All @@ -51,8 +104,8 @@ class VulnerableCodeBaseImporterPipeline(VulnerableCodePipeline):
Base importer pipeline for importing advisories.

Uses:
Subclass this Pipeline and implement ``advisories_count`` and ``collect_advisories`` method.
Also override the ``steps`` and ``advisory_confidence`` as needed.
Subclass this Pipeline and implement ``advisories_count`` and ``collect_advisories``
method. Also override the ``steps`` and ``advisory_confidence`` as needed.
"""

pipeline_id = None # Unique Pipeline ID, this should be the name of pipeline module.
Expand Down
3 changes: 3 additions & 0 deletions vulnerabilities/pipelines/gitlab_importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ def clean_downloads(self):
self.log(f"Removing cloned repository")
self.vcs_response.delete()

def on_failure(self):
self.clean_downloads()


def parse_advisory_path(base_path: Path, file_path: Path) -> Tuple[str, str, str]:
"""
Expand Down
3 changes: 3 additions & 0 deletions vulnerabilities/pipelines/npm_importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,3 +163,6 @@ def clean_downloads(self):
if self.vcs_response:
self.log(f"Removing cloned repository")
self.vcs_response.delete()

def on_failure(self):
self.clean_downloads()
3 changes: 3 additions & 0 deletions vulnerabilities/pipelines/pypa_importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,6 @@ def clean_downloads(self):
if self.vcs_response:
self.log(f"Removing cloned repository")
self.vcs_response.delete()

def on_failure(self):
self.clean_downloads()
29 changes: 29 additions & 0 deletions vulnerabilities/tests/pipelines/test_base_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
from vulnerabilities.importer import AffectedPackage
from vulnerabilities.importer import Reference
from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipeline
from vulnerabilities.pipelines import VulnerableCodePipeline
from vulnerabilities.tests.pipelines import TestLogger

advisory_data1 = AdvisoryData(
aliases=["CVE-2020-13371337"],
Expand Down Expand Up @@ -47,6 +49,33 @@ def get_advisory1(created_by="test_pipeline"):
)


class TestVulnerableCodePipeline(TestCase):
def test_on_failure(self):
class TestPipeline(VulnerableCodePipeline):
def __init__(self, test_logger):
super().__init__()
self.log = test_logger.write

@classmethod
def steps(cls):
return (cls.step1,)

def step1(self):
raise Exception("Something went wrong!")

def on_failure(self):
self.log("Doing cleanup.")

logger = TestLogger()
pipeline = TestPipeline(test_logger=logger)

pipeline.execute()
log_result = logger.getvalue()

self.assertIn("Pipeline failed", log_result)
self.assertIn("Running [on_failure] tasks", log_result)


class TestVulnerableCodeBaseImporterPipeline(TestCase):
@patch.object(
VulnerableCodeBaseImporterPipeline,
Expand Down
Loading