From fd064847818e7f3b07742c4987bcfdcef0ef53e9 Mon Sep 17 00:00:00 2001 From: leahh Date: Thu, 3 Oct 2024 08:44:43 -0600 Subject: [PATCH 1/6] add call to archive_workflow for cancelled workflows --- beeflow/wf_manager/resources/wf_actions.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/beeflow/wf_manager/resources/wf_actions.py b/beeflow/wf_manager/resources/wf_actions.py index 30b0d6285..3ede430be 100644 --- a/beeflow/wf_manager/resources/wf_actions.py +++ b/beeflow/wf_manager/resources/wf_actions.py @@ -6,6 +6,7 @@ from flask_restful import Resource, reqparse from beeflow.common import log as bee_logging from beeflow.wf_manager.resources import wf_utils +from beeflow.wf_manager.resources.wf_update import archive_workflow from beeflow.common.db import wfm_db from beeflow.common.db.bdb import connect_db @@ -64,7 +65,9 @@ def delete(self, wf_id): wf_utils.update_wf_status(wf_id, 'Cancelled') db.workflows.update_workflow_state(wf_id, 'Cancelled') log.info(f"Workflow {wf_id} cancelled") - resp = make_response(jsonify(status='Cancelled'), 202) + # Archive cancelled workflow + archive_workflow(db, wf_id, final_state='Cancelled') + resp = make_response(jsonify(status='Cancelled and Archived'), 202) elif option == "remove": log.info(f"Removing workflow {wf_id}.") db.workflows.delete_workflow(wf_id) From 2285fff34f88204ad02d344c8b8f0fff9135b248 Mon Sep 17 00:00:00 2001 From: leahh Date: Thu, 3 Oct 2024 09:06:23 -0600 Subject: [PATCH 2/6] update status message in test --- beeflow/tests/test_wf_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beeflow/tests/test_wf_manager.py b/beeflow/tests/test_wf_manager.py index f46998183..dfb1621c4 100644 --- a/beeflow/tests/test_wf_manager.py +++ b/beeflow/tests/test_wf_manager.py @@ -200,7 +200,7 @@ def test_cancel_workflow(client, mocker, setup_teardown_workflow, temp_db): request = {'wf_id': WF_ID, 'option': 'cancel'} resp = client().delete(f'/bee_wfm/v1/jobs/{WF_ID}', json=request) - assert resp.json['status'] == 'Cancelled' + assert resp.json['status'] == 'Cancelled and Archived' assert resp.status_code == 202 From 99bcd3fdeab747338003042fc4ce1fee71df6a1f Mon Sep 17 00:00:00 2001 From: leahh Date: Tue, 22 Oct 2024 07:54:37 -0600 Subject: [PATCH 3/6] revert resp to original --- beeflow/tests/test_wf_manager.py | 2 +- beeflow/wf_manager/resources/wf_actions.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/beeflow/tests/test_wf_manager.py b/beeflow/tests/test_wf_manager.py index dfb1621c4..f46998183 100644 --- a/beeflow/tests/test_wf_manager.py +++ b/beeflow/tests/test_wf_manager.py @@ -200,7 +200,7 @@ def test_cancel_workflow(client, mocker, setup_teardown_workflow, temp_db): request = {'wf_id': WF_ID, 'option': 'cancel'} resp = client().delete(f'/bee_wfm/v1/jobs/{WF_ID}', json=request) - assert resp.json['status'] == 'Cancelled and Archived' + assert resp.json['status'] == 'Cancelled' assert resp.status_code == 202 diff --git a/beeflow/wf_manager/resources/wf_actions.py b/beeflow/wf_manager/resources/wf_actions.py index 3ede430be..2e9223405 100644 --- a/beeflow/wf_manager/resources/wf_actions.py +++ b/beeflow/wf_manager/resources/wf_actions.py @@ -67,7 +67,7 @@ def delete(self, wf_id): log.info(f"Workflow {wf_id} cancelled") # Archive cancelled workflow archive_workflow(db, wf_id, final_state='Cancelled') - resp = make_response(jsonify(status='Cancelled and Archived'), 202) + resp = make_response(jsonify(status='Cancelled'), 202) elif option == "remove": log.info(f"Removing workflow {wf_id}.") db.workflows.delete_workflow(wf_id) From 90dce1e563b6a2a9272ae7070e5bbb811e2478a7 Mon Sep 17 00:00:00 2001 From: leahh Date: Tue, 22 Oct 2024 09:40:24 -0600 Subject: [PATCH 4/6] save DAGs to archive --- beeflow/client/bee_client.py | 4 ++- beeflow/common/gdb/generate_graph.py | 41 +++++++++++++++++------ beeflow/wf_manager/resources/wf_update.py | 4 ++- beeflow/wf_manager/resources/wf_utils.py | 4 +-- 4 files changed, 39 insertions(+), 14 deletions(-) diff --git a/beeflow/client/bee_client.py b/beeflow/client/bee_client.py index 139583da4..8bcac437f 100644 --- a/beeflow/client/bee_client.py +++ b/beeflow/client/bee_client.py @@ -643,6 +643,7 @@ def dag(wf_id: str = typer.Argument(..., callback=match_short_id), # Check if the workflow is archived wf_status = get_wf_status(wf_id) if wf_status == 'Archived': + copy_dag_in_archive = False bee_workdir = wf_utils.get_bee_workdir() mount_dir = os.path.join(bee_workdir, 'gdb_mount') graphmls_dir = mount_dir + '/graphmls' @@ -650,10 +651,11 @@ def dag(wf_id: str = typer.Argument(..., callback=match_short_id), "in the archive directory.", fg=typer.colors.MAGENTA) else: + copy_dag_in_archive = True wf_dir = wf_utils.get_workflow_dir(wf_id) graphmls_dir = wf_dir + '/graphmls' os.makedirs(graphmls_dir, exist_ok=True) - wf_utils.export_dag(wf_id, output_dir, graphmls_dir, no_dag_dir) + wf_utils.export_dag(wf_id, output_dir, graphmls_dir, no_dag_dir, copy_dag_in_archive) typer.secho(f"DAG for workflow {_short_id(wf_id)} has been exported successfully.", fg=typer.colors.GREEN) diff --git a/beeflow/common/gdb/generate_graph.py b/beeflow/common/gdb/generate_graph.py index d823f5fcf..4ef9a8147 100644 --- a/beeflow/common/gdb/generate_graph.py +++ b/beeflow/common/gdb/generate_graph.py @@ -5,8 +5,10 @@ import networkx as nx import graphviz +from beeflow.wf_manager.resources import wf_utils -def generate_viz(wf_id, output_dir, graphmls_dir, no_dag_dir): + +def generate_viz(wf_id, output_dir, graphmls_dir, no_dag_dir, copy_dag_in_archive): """Generate a PNG of a workflow graph from a GraphML file.""" short_id = wf_id[:6] graphml_path = graphmls_dir + "/" + short_id + ".graphml" @@ -18,13 +20,7 @@ def generate_viz(wf_id, output_dir, graphmls_dir, no_dag_dir): os.makedirs(dags_dir, exist_ok=True) output_path = dags_dir + "/" + short_id + ".png" - if os.path.exists(output_path): - i = 1 - backup_path = f'{dags_dir}/{short_id}_v{i}.png' - while os.path.exists(backup_path): - i += 1 - backup_path = f'{dags_dir}/{short_id}_v{i}.png' - shutil.copy(output_path, backup_path) + backup_dag(output_path, dags_dir, short_id) # Load the GraphML file using NetworkX graph = nx.read_graphml(graphml_path) @@ -38,8 +34,27 @@ def generate_viz(wf_id, output_dir, graphmls_dir, no_dag_dir): # Render the graph and save as PNG png_data = dot.pipe(format='png') - with open(output_path, "wb") as png_file: - png_file.write(png_data) + save_png(output_path, png_data) + + if copy_dag_in_archive: + # Save and backup DAGs in the workflow_dir which will be archived + workflow_dir = wf_utils.get_workflow_dir(wf_id) + archive_dag_dir = workflow_dir + "/dags" + os.makedirs(archive_dag_dir, exist_ok=True) + archive_dag_path = archive_dag_dir + "/" + short_id + ".png" + backup_dag(archive_dag_path, archive_dag_dir, short_id) + save_png(archive_dag_path, png_data) + + +def backup_dag(path, dags_dir, short_id): + """Backup DAGs.""" + if os.path.exists(path): + i = 1 + backup_path = f'{dags_dir}/{short_id}_v{i}.png' + while os.path.exists(backup_path): + i += 1 + backup_path = f'{dags_dir}/{short_id}_v{i}.png' + shutil.copy(path, backup_path) def add_nodes_to_dot(graph, dot): @@ -91,3 +106,9 @@ def add_edges_to_dot(graph, dot): fontsize="10", fontname="times-bold") else: dot.edge(target, source, label=edge_label, fontsize="10") + + +def save_png(output_path, png_data): + """Save png data.""" + with open(output_path, "wb") as png_file: + png_file.write(png_data) diff --git a/beeflow/wf_manager/resources/wf_update.py b/beeflow/wf_manager/resources/wf_update.py index 47dbea2d0..e2a45dd79 100644 --- a/beeflow/wf_manager/resources/wf_update.py +++ b/beeflow/wf_manager/resources/wf_update.py @@ -29,7 +29,9 @@ def archive_workflow(db, wf_id, final_state=None): # Archive Completed DAG graphmls_dir = workflow_dir + "/graphmls" os.makedirs(graphmls_dir, exist_ok=True) - wf_utils.export_dag(wf_id, workflow_dir, graphmls_dir, no_dag_dir=True) + dags_dir = workflow_dir + "/dags" + os.makedirs(dags_dir, exist_ok=True) + wf_utils.export_dag(wf_id, dags_dir, graphmls_dir, no_dag_dir=True, copy_dag_in_archive=False) wf_state = f'Archived/{final_state}' if final_state is not None else 'Archived' db.workflows.update_workflow_state(wf_id, wf_state) diff --git a/beeflow/wf_manager/resources/wf_utils.py b/beeflow/wf_manager/resources/wf_utils.py index 4dde69b86..ced30eb87 100644 --- a/beeflow/wf_manager/resources/wf_utils.py +++ b/beeflow/wf_manager/resources/wf_utils.py @@ -294,12 +294,12 @@ def setup_workflow(wf_id, wf_name, wf_dir, wf_workdir, no_start, workflow=None, start_workflow(wf_id) -def export_dag(wf_id, output_dir, graphmls_dir, no_dag_dir): +def export_dag(wf_id, output_dir, graphmls_dir, no_dag_dir, copy_dag_in_archive): """Export the DAG of the workflow.""" wfi = get_workflow_interface(wf_id) wfi.export_graphml() update_graphml(wf_id, graphmls_dir) - generate_viz(wf_id, output_dir, graphmls_dir, no_dag_dir) + generate_viz(wf_id, output_dir, graphmls_dir, no_dag_dir, copy_dag_in_archive) def start_workflow(wf_id): From 04c253ff1ce8ed6feaf5ce6787636d469cbb3b5d Mon Sep 17 00:00:00 2001 From: leahh Date: Tue, 22 Oct 2024 12:48:30 -0600 Subject: [PATCH 5/6] update MockWFI and ignore archive_workflow to fix unit tests --- beeflow/tests/mocks.py | 4 ++++ beeflow/tests/test_wf_manager.py | 1 + 2 files changed, 5 insertions(+) diff --git a/beeflow/tests/mocks.py b/beeflow/tests/mocks.py index 1d7cada86..2ade26d58 100644 --- a/beeflow/tests/mocks.py +++ b/beeflow/tests/mocks.py @@ -105,6 +105,10 @@ def execute_workflow(self): """Fake executing a workflow.""" pass # noqa + def export_graphml(self): + """Fake exporting a graphml of a workflow.""" + pass + class MockGDBDriver: """A mock GDB driver. diff --git a/beeflow/tests/test_wf_manager.py b/beeflow/tests/test_wf_manager.py index f46998183..08025e6d4 100644 --- a/beeflow/tests/test_wf_manager.py +++ b/beeflow/tests/test_wf_manager.py @@ -190,6 +190,7 @@ def test_cancel_workflow(client, mocker, setup_teardown_workflow, temp_db): return_value=MockWFI()) mocker.patch('beeflow.wf_manager.resources.wf_utils.get_db_path', temp_db.db_file) mocker.patch('beeflow.wf_manager.resources.wf_actions.db_path', temp_db.db_file) + mocker.patch('beeflow.wf_manager.resources.wf_actions.archive_workflow', return_value=None) wf_name = 'wf' workdir = 'dir' From c60103532d94e947af4a6a408651edb247213b88 Mon Sep 17 00:00:00 2001 From: leahh Date: Tue, 22 Oct 2024 14:15:51 -0600 Subject: [PATCH 6/6] fix linting error --- beeflow/tests/mocks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beeflow/tests/mocks.py b/beeflow/tests/mocks.py index 2ade26d58..6643f3975 100644 --- a/beeflow/tests/mocks.py +++ b/beeflow/tests/mocks.py @@ -107,7 +107,7 @@ def execute_workflow(self): def export_graphml(self): """Fake exporting a graphml of a workflow.""" - pass + pass # noqa class MockGDBDriver: