Skip to content

Commit

Permalink
Archive Cancelled Workflows (#939)
Browse files Browse the repository at this point in the history
* Archive cancelled workflows

---------

Co-authored-by: leahh <leahh@lanl.gov>
  • Loading branch information
Leahh02 and leahh authored Nov 6, 2024
1 parent 0e27361 commit c0117fc
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 14 deletions.
4 changes: 3 additions & 1 deletion beeflow/client/bee_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -684,17 +684,19 @@ def dag(wf_id: str = typer.Argument(..., callback=match_short_id),
# Check if the workflow is archived
wf_status = get_wf_status(wf_id)
if wf_status == 'Archived':
copy_dag_in_archive = False
bee_workdir = wf_utils.get_bee_workdir()
mount_dir = os.path.join(bee_workdir, 'gdb_mount')
graphmls_dir = mount_dir + '/graphmls'
typer.secho("Workflow has been archived. All new DAGs will look the same as the one "
"in the archive directory.",
fg=typer.colors.MAGENTA)
else:
copy_dag_in_archive = True
wf_dir = wf_utils.get_workflow_dir(wf_id)
graphmls_dir = wf_dir + '/graphmls'
os.makedirs(graphmls_dir, exist_ok=True)
wf_utils.export_dag(wf_id, output_dir, graphmls_dir, no_dag_dir)
wf_utils.export_dag(wf_id, output_dir, graphmls_dir, no_dag_dir, copy_dag_in_archive)
typer.secho(f"DAG for workflow {_short_id(wf_id)} has been exported successfully.",
fg=typer.colors.GREEN)

Expand Down
41 changes: 31 additions & 10 deletions beeflow/common/gdb/generate_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
import networkx as nx
import graphviz

from beeflow.wf_manager.resources import wf_utils

def generate_viz(wf_id, output_dir, graphmls_dir, no_dag_dir):

def generate_viz(wf_id, output_dir, graphmls_dir, no_dag_dir, copy_dag_in_archive):
"""Generate a PNG of a workflow graph from a GraphML file."""
short_id = wf_id[:6]
graphml_path = graphmls_dir + "/" + short_id + ".graphml"
Expand All @@ -18,13 +20,7 @@ def generate_viz(wf_id, output_dir, graphmls_dir, no_dag_dir):
os.makedirs(dags_dir, exist_ok=True)

output_path = dags_dir + "/" + short_id + ".png"
if os.path.exists(output_path):
i = 1
backup_path = f'{dags_dir}/{short_id}_v{i}.png'
while os.path.exists(backup_path):
i += 1
backup_path = f'{dags_dir}/{short_id}_v{i}.png'
shutil.copy(output_path, backup_path)
backup_dag(output_path, dags_dir, short_id)

# Load the GraphML file using NetworkX
graph = nx.read_graphml(graphml_path)
Expand All @@ -38,8 +34,27 @@ def generate_viz(wf_id, output_dir, graphmls_dir, no_dag_dir):

# Render the graph and save as PNG
png_data = dot.pipe(format='png')
with open(output_path, "wb") as png_file:
png_file.write(png_data)
save_png(output_path, png_data)

if copy_dag_in_archive:
# Save and backup DAGs in the workflow_dir which will be archived
workflow_dir = wf_utils.get_workflow_dir(wf_id)
archive_dag_dir = workflow_dir + "/dags"
os.makedirs(archive_dag_dir, exist_ok=True)
archive_dag_path = archive_dag_dir + "/" + short_id + ".png"
backup_dag(archive_dag_path, archive_dag_dir, short_id)
save_png(archive_dag_path, png_data)


def backup_dag(path, dags_dir, short_id):
"""Backup DAGs."""
if os.path.exists(path):
i = 1
backup_path = f'{dags_dir}/{short_id}_v{i}.png'
while os.path.exists(backup_path):
i += 1
backup_path = f'{dags_dir}/{short_id}_v{i}.png'
shutil.copy(path, backup_path)


def add_nodes_to_dot(graph, dot):
Expand Down Expand Up @@ -91,3 +106,9 @@ def add_edges_to_dot(graph, dot):
fontsize="10", fontname="times-bold")
else:
dot.edge(target, source, label=edge_label, fontsize="10")


def save_png(output_path, png_data):
"""Save png data."""
with open(output_path, "wb") as png_file:
png_file.write(png_data)
4 changes: 4 additions & 0 deletions beeflow/tests/mocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ def execute_workflow(self):
"""Fake executing a workflow."""
pass # noqa

def export_graphml(self):
"""Fake exporting a graphml of a workflow."""
pass # noqa


class MockGDBDriver:
"""A mock GDB driver.
Expand Down
1 change: 1 addition & 0 deletions beeflow/tests/test_wf_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ def test_cancel_workflow(client, mocker, setup_teardown_workflow, temp_db):
return_value=MockWFI())
mocker.patch('beeflow.wf_manager.resources.wf_utils.get_db_path', temp_db.db_file)
mocker.patch('beeflow.wf_manager.resources.wf_actions.db_path', temp_db.db_file)
mocker.patch('beeflow.wf_manager.resources.wf_actions.archive_workflow', return_value=None)

wf_name = 'wf'
workdir = 'dir'
Expand Down
3 changes: 3 additions & 0 deletions beeflow/wf_manager/resources/wf_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from flask_restful import Resource, reqparse
from beeflow.common import log as bee_logging
from beeflow.wf_manager.resources import wf_utils
from beeflow.wf_manager.resources.wf_update import archive_workflow

from beeflow.common.db import wfm_db
from beeflow.common.db.bdb import connect_db
Expand Down Expand Up @@ -64,6 +65,8 @@ def delete(self, wf_id):
wf_utils.update_wf_status(wf_id, 'Cancelled')
db.workflows.update_workflow_state(wf_id, 'Cancelled')
log.info(f"Workflow {wf_id} cancelled")
# Archive cancelled workflow
archive_workflow(db, wf_id, final_state='Cancelled')
resp = make_response(jsonify(status='Cancelled'), 202)
elif option == "remove":
log.info(f"Removing workflow {wf_id}.")
Expand Down
4 changes: 3 additions & 1 deletion beeflow/wf_manager/resources/wf_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ def archive_workflow(db, wf_id, final_state=None):
# Archive Completed DAG
graphmls_dir = workflow_dir + "/graphmls"
os.makedirs(graphmls_dir, exist_ok=True)
wf_utils.export_dag(wf_id, workflow_dir, graphmls_dir, no_dag_dir=True)
dags_dir = workflow_dir + "/dags"
os.makedirs(dags_dir, exist_ok=True)
wf_utils.export_dag(wf_id, dags_dir, graphmls_dir, no_dag_dir=True, copy_dag_in_archive=False)

wf_state = f'Archived/{final_state}' if final_state is not None else 'Archived'
db.workflows.update_workflow_state(wf_id, wf_state)
Expand Down
4 changes: 2 additions & 2 deletions beeflow/wf_manager/resources/wf_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,12 +294,12 @@ def setup_workflow(wf_id, wf_name, wf_dir, wf_workdir, no_start, workflow=None,
start_workflow(wf_id)


def export_dag(wf_id, output_dir, graphmls_dir, no_dag_dir):
def export_dag(wf_id, output_dir, graphmls_dir, no_dag_dir, copy_dag_in_archive):
"""Export the DAG of the workflow."""
wfi = get_workflow_interface(wf_id)
wfi.export_graphml()
update_graphml(wf_id, graphmls_dir)
generate_viz(wf_id, output_dir, graphmls_dir, no_dag_dir)
generate_viz(wf_id, output_dir, graphmls_dir, no_dag_dir, copy_dag_in_archive)


def start_workflow(wf_id):
Expand Down

0 comments on commit c0117fc

Please sign in to comment.