diff --git a/beeflow/client/bee_client.py b/beeflow/client/bee_client.py index 139583da..8bcac437 100644 --- a/beeflow/client/bee_client.py +++ b/beeflow/client/bee_client.py @@ -643,6 +643,7 @@ def dag(wf_id: str = typer.Argument(..., callback=match_short_id), # Check if the workflow is archived wf_status = get_wf_status(wf_id) if wf_status == 'Archived': + copy_dag_in_archive = False bee_workdir = wf_utils.get_bee_workdir() mount_dir = os.path.join(bee_workdir, 'gdb_mount') graphmls_dir = mount_dir + '/graphmls' @@ -650,10 +651,11 @@ def dag(wf_id: str = typer.Argument(..., callback=match_short_id), "in the archive directory.", fg=typer.colors.MAGENTA) else: + copy_dag_in_archive = True wf_dir = wf_utils.get_workflow_dir(wf_id) graphmls_dir = wf_dir + '/graphmls' os.makedirs(graphmls_dir, exist_ok=True) - wf_utils.export_dag(wf_id, output_dir, graphmls_dir, no_dag_dir) + wf_utils.export_dag(wf_id, output_dir, graphmls_dir, no_dag_dir, copy_dag_in_archive) typer.secho(f"DAG for workflow {_short_id(wf_id)} has been exported successfully.", fg=typer.colors.GREEN) diff --git a/beeflow/common/gdb/generate_graph.py b/beeflow/common/gdb/generate_graph.py index d823f5fc..4ef9a814 100644 --- a/beeflow/common/gdb/generate_graph.py +++ b/beeflow/common/gdb/generate_graph.py @@ -5,8 +5,10 @@ import networkx as nx import graphviz +from beeflow.wf_manager.resources import wf_utils -def generate_viz(wf_id, output_dir, graphmls_dir, no_dag_dir): + +def generate_viz(wf_id, output_dir, graphmls_dir, no_dag_dir, copy_dag_in_archive): """Generate a PNG of a workflow graph from a GraphML file.""" short_id = wf_id[:6] graphml_path = graphmls_dir + "/" + short_id + ".graphml" @@ -18,13 +20,7 @@ def generate_viz(wf_id, output_dir, graphmls_dir, no_dag_dir): os.makedirs(dags_dir, exist_ok=True) output_path = dags_dir + "/" + short_id + ".png" - if os.path.exists(output_path): - i = 1 - backup_path = f'{dags_dir}/{short_id}_v{i}.png' - while os.path.exists(backup_path): - i += 1 - backup_path = f'{dags_dir}/{short_id}_v{i}.png' - shutil.copy(output_path, backup_path) + backup_dag(output_path, dags_dir, short_id) # Load the GraphML file using NetworkX graph = nx.read_graphml(graphml_path) @@ -38,8 +34,27 @@ def generate_viz(wf_id, output_dir, graphmls_dir, no_dag_dir): # Render the graph and save as PNG png_data = dot.pipe(format='png') - with open(output_path, "wb") as png_file: - png_file.write(png_data) + save_png(output_path, png_data) + + if copy_dag_in_archive: + # Save and backup DAGs in the workflow_dir which will be archived + workflow_dir = wf_utils.get_workflow_dir(wf_id) + archive_dag_dir = workflow_dir + "/dags" + os.makedirs(archive_dag_dir, exist_ok=True) + archive_dag_path = archive_dag_dir + "/" + short_id + ".png" + backup_dag(archive_dag_path, archive_dag_dir, short_id) + save_png(archive_dag_path, png_data) + + +def backup_dag(path, dags_dir, short_id): + """Backup DAGs.""" + if os.path.exists(path): + i = 1 + backup_path = f'{dags_dir}/{short_id}_v{i}.png' + while os.path.exists(backup_path): + i += 1 + backup_path = f'{dags_dir}/{short_id}_v{i}.png' + shutil.copy(path, backup_path) def add_nodes_to_dot(graph, dot): @@ -91,3 +106,9 @@ def add_edges_to_dot(graph, dot): fontsize="10", fontname="times-bold") else: dot.edge(target, source, label=edge_label, fontsize="10") + + +def save_png(output_path, png_data): + """Save png data.""" + with open(output_path, "wb") as png_file: + png_file.write(png_data) diff --git a/beeflow/tests/mocks.py b/beeflow/tests/mocks.py index 1d7cada8..6643f397 100644 --- a/beeflow/tests/mocks.py +++ b/beeflow/tests/mocks.py @@ -105,6 +105,10 @@ def execute_workflow(self): """Fake executing a workflow.""" pass # noqa + def export_graphml(self): + """Fake exporting a graphml of a workflow.""" + pass # noqa + class MockGDBDriver: """A mock GDB driver. diff --git a/beeflow/tests/test_wf_manager.py b/beeflow/tests/test_wf_manager.py index f4699818..08025e6d 100644 --- a/beeflow/tests/test_wf_manager.py +++ b/beeflow/tests/test_wf_manager.py @@ -190,6 +190,7 @@ def test_cancel_workflow(client, mocker, setup_teardown_workflow, temp_db): return_value=MockWFI()) mocker.patch('beeflow.wf_manager.resources.wf_utils.get_db_path', temp_db.db_file) mocker.patch('beeflow.wf_manager.resources.wf_actions.db_path', temp_db.db_file) + mocker.patch('beeflow.wf_manager.resources.wf_actions.archive_workflow', return_value=None) wf_name = 'wf' workdir = 'dir' diff --git a/beeflow/wf_manager/resources/wf_actions.py b/beeflow/wf_manager/resources/wf_actions.py index 30b0d628..2e922340 100644 --- a/beeflow/wf_manager/resources/wf_actions.py +++ b/beeflow/wf_manager/resources/wf_actions.py @@ -6,6 +6,7 @@ from flask_restful import Resource, reqparse from beeflow.common import log as bee_logging from beeflow.wf_manager.resources import wf_utils +from beeflow.wf_manager.resources.wf_update import archive_workflow from beeflow.common.db import wfm_db from beeflow.common.db.bdb import connect_db @@ -64,6 +65,8 @@ def delete(self, wf_id): wf_utils.update_wf_status(wf_id, 'Cancelled') db.workflows.update_workflow_state(wf_id, 'Cancelled') log.info(f"Workflow {wf_id} cancelled") + # Archive cancelled workflow + archive_workflow(db, wf_id, final_state='Cancelled') resp = make_response(jsonify(status='Cancelled'), 202) elif option == "remove": log.info(f"Removing workflow {wf_id}.") diff --git a/beeflow/wf_manager/resources/wf_update.py b/beeflow/wf_manager/resources/wf_update.py index 47dbea2d..e2a45dd7 100644 --- a/beeflow/wf_manager/resources/wf_update.py +++ b/beeflow/wf_manager/resources/wf_update.py @@ -29,7 +29,9 @@ def archive_workflow(db, wf_id, final_state=None): # Archive Completed DAG graphmls_dir = workflow_dir + "/graphmls" os.makedirs(graphmls_dir, exist_ok=True) - wf_utils.export_dag(wf_id, workflow_dir, graphmls_dir, no_dag_dir=True) + dags_dir = workflow_dir + "/dags" + os.makedirs(dags_dir, exist_ok=True) + wf_utils.export_dag(wf_id, dags_dir, graphmls_dir, no_dag_dir=True, copy_dag_in_archive=False) wf_state = f'Archived/{final_state}' if final_state is not None else 'Archived' db.workflows.update_workflow_state(wf_id, wf_state) diff --git a/beeflow/wf_manager/resources/wf_utils.py b/beeflow/wf_manager/resources/wf_utils.py index 4dde69b8..ced30eb8 100644 --- a/beeflow/wf_manager/resources/wf_utils.py +++ b/beeflow/wf_manager/resources/wf_utils.py @@ -294,12 +294,12 @@ def setup_workflow(wf_id, wf_name, wf_dir, wf_workdir, no_start, workflow=None, start_workflow(wf_id) -def export_dag(wf_id, output_dir, graphmls_dir, no_dag_dir): +def export_dag(wf_id, output_dir, graphmls_dir, no_dag_dir, copy_dag_in_archive): """Export the DAG of the workflow.""" wfi = get_workflow_interface(wf_id) wfi.export_graphml() update_graphml(wf_id, graphmls_dir) - generate_viz(wf_id, output_dir, graphmls_dir, no_dag_dir) + generate_viz(wf_id, output_dir, graphmls_dir, no_dag_dir, copy_dag_in_archive) def start_workflow(wf_id):