Skip to content

Commit

Permalink
Issue883/add check for different front end (#933)
Browse files Browse the repository at this point in the history
* Create client database and populate with the host that beeflow is running on and prevent beeflow commands (start, status, reset, and stop) to work on a different front end
  • Loading branch information
kchilleri authored Nov 5, 2024
1 parent eda2372 commit 0e27361
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 9 deletions.
46 changes: 44 additions & 2 deletions beeflow/client/bee_client.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/env python3
"""bee-client.
This script provides an client interface to the user to manage workflows.
This script provides a client interface to the user to manage workflows.
Capablities include submitting, starting, listing, pausing and cancelling workflows.
"""
import os
Expand Down Expand Up @@ -29,6 +29,8 @@
from beeflow.common.wf_data import generate_workflow_id
from beeflow.client import core
from beeflow.wf_manager.resources import wf_utils
from beeflow.common.db import client_db
from beeflow.common.db import bdb

# Length of a shortened workflow ID
short_id_len = 6 #noqa: Not a constant
Expand All @@ -53,6 +55,41 @@ def __init__(self, *args):
self.args = args


def warn(*pargs):
"""Print a red warning message."""
typer.secho(' '.join(pargs), fg=typer.colors.RED, file=sys.stderr)


def db_path():
"""Return the client database path."""
bee_workdir = config_driver.BeeConfig.get('DEFAULT', 'bee_workdir')
return os.path.join(bee_workdir, 'client.db')


def setup_hostname(start_hn):
"""Set up front end name when beeflow core start is returned."""
db = bdb.connect_db(client_db, db_path())
db.info.set_hostname(start_hn)


def get_hostname():
"""Check if beeflow is running somewhere else."""
db = bdb.connect_db(client_db, db_path())
curr_hn = db.info.get_hostname()
return curr_hn


def check_hostname(curr_hn):
"""Check current front end name matches the one beeflow was started on."""
db = bdb.connect_db(client_db, db_path())
start_hn = db.info.get_hostname()
if start_hn and curr_hn != start_hn: # noqa: don't use set instead
warn(f'beeflow was started on "{start_hn}" and you are trying to '
f'run a command on "{curr_hn}".')
if start_hn == "":
warn('beeflow has not been started!')


def error_exit(msg, include_caller=True):
"""Print a message and exit or raise an error with that message."""
if include_caller:
Expand Down Expand Up @@ -115,7 +152,11 @@ def get_wf_list():
conn = _wfm_conn()
resp = conn.get(_url(), timeout=60)
except requests.exceptions.ConnectionError:
error_exit('Could not reach WF Manager.')
if get_hostname() == "":
warn('beeflow has not been started!')
sys.exit(1)
else:
error_exit('Could not reach WF Manager.')

if resp.status_code != requests.codes.okay: # pylint: disable=no-member
error_exit('WF Manager did not return workflow list')
Expand Down Expand Up @@ -680,4 +721,5 @@ def main():

# Ignore W0511: This allows us to have TODOs in the code
# Ignore R1732: Significant code restructuring required to fix
# Ignore R1714: Not using a set instead
# pylama:ignore=W0511,R1732
21 changes: 14 additions & 7 deletions beeflow/client/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,12 @@ def start(foreground: bool = typer.Option(False, '--foreground', '-F',
help='run in the foreground'), backend: bool = typer.Option(False, '--backend',
'-B', help='allow to run on a backend node')):
"""Start all BEE components."""
start_hn = socket.gethostname() # hostname when beeflow starts
if bee_client.get_hostname() == "":
bee_client.setup_hostname(start_hn) # add to client db
elif bee_client.get_hostname() != start_hn:
warn(f'Error: beeflow is already running on "{bee_client.get_hostname()}."')
sys.exit(1)
if backend: # allow beeflow to run on backend node
check_dependencies(backend=True)
else:
Expand Down Expand Up @@ -439,6 +445,7 @@ def start(foreground: bool = typer.Option(False, '--foreground', '-F',

version = importlib.metadata.version("hpc-beeflow")
print(f'Starting beeflow {version}...')
print(f'Running beeflow on {start_hn}')
if not foreground:
print('Run `beeflow core status` for more information.')
# Create the log path if it doesn't exist yet
Expand All @@ -457,11 +464,10 @@ def start(foreground: bool = typer.Option(False, '--foreground', '-F',
@app.command()
def status():
"""Check the status of beeflow and the components."""
status_hn = socket.gethostname() # hostname when beeflow core status returned
resp = cli_connection.send(paths.beeflow_socket(), {'type': 'status'})
if resp is None:
beeflow_log = paths.log_fname('beeflow')
warn('Cannot connect to the beeflow daemon, is it running? Check the '
f'log at "{beeflow_log}".')
bee_client.check_hostname(status_hn)
sys.exit(1)
print('beeflow components:')
for comp, stat in resp['components'].items():
Expand All @@ -480,6 +486,7 @@ def info():
@app.command()
def stop(query='yes'):
"""Stop the current running beeflow daemon."""
stop_hn = socket.gethostname() # hostname when beeflow core stop returned
# Check workflow states; warn if there are active states, pause running workflows
workflow_list = bee_client.get_wf_list()
concern_states = {'Running', 'Initializing', 'Waiting'}
Expand All @@ -502,15 +509,13 @@ def stop(query='yes'):
bee_client.pause(wf_id)
resp = cli_connection.send(paths.beeflow_socket(), {'type': 'quit'})
if resp is None:
beeflow_log = paths.log_fname('beeflow')
warn('Error: beeflow is not running on this system. It could be '
'running on a different front end.\n'
f' Check the beeflow log: "{beeflow_log}".')
bee_client.check_hostname(stop_hn)
sys.exit(1)
# As long as it returned something, we should be good
beeflow_log = paths.log_fname('beeflow')
if query == "yes":
print(f'Beeflow has stopped. Check the log at "{beeflow_log}".')
bee_client.setup_hostname("")


def archive_dir(dir_to_archive):
Expand Down Expand Up @@ -547,6 +552,8 @@ def reset(archive: bool = typer.Option(False, '--archive', '-a',
help='Archive bee_workdir before removal')):
"""Stop all components and delete the bee_workdir directory."""
# Check workflow states; warn if there are active states.
reset_hn = socket.gethostname()
bee_client.check_hostname(reset_hn)
workflow_list = bee_client.get_wf_list()
active_states = {'Running', 'Paused', 'Initializing', 'Waiting'}
caution = ""
Expand Down
57 changes: 57 additions & 0 deletions beeflow/common/db/client_db.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
"""Client database code."""

from collections import namedtuple

from beeflow.common.db import bdb


class ClientInfo:
"""Client Info object."""

def __init__(self, db_file):
"""Initialize info and db file."""
self.Info = namedtuple("Info", "id hostname") # noqa Snake Case
self.db_file = db_file

def set_hostname(self, new_hostname):
"""Set hostname for current front end."""
stmt = "UPDATE info set hostname=?"
bdb.run(self.db_file, stmt, [new_hostname])

def get_hostname(self):
"""Return hostname for current front end."""
stmt = "SELECT hostname FROM info"
result = bdb.getone(self.db_file, stmt)[0]
hostname = result
return hostname


class ClientDB:
"""Client database."""

def __init__(self, db_file):
"""Construct a new client database connection."""
self.db_file = db_file
self._init_tables()

def _init_tables(self):
"""Initialize the client table if it doesn't exist."""
info_stmt = """CREATE TABLE IF NOT EXISTS info (
id INTEGER PRIMARY KEY ASC,
hostname TEXT);"""
if not bdb.table_exists(self.db_file, 'info'):
bdb.create_table(self.db_file, info_stmt)
# insert a new workflow into the database
stmt = """INSERT INTO info (hostname) VALUES(?);"""
tmp = ""
bdb.run(self.db_file, stmt, [tmp])

@property
def info(self):
"""Get info from the database."""
return ClientInfo(self.db_file)


def open_db(db_file):
"""Open and return a new database."""
return ClientDB(db_file)
37 changes: 37 additions & 0 deletions beeflow/tests/test_db_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
"""Tests of the client database."""
import tempfile
import os

import pytest

from beeflow.common.db import client_db


@pytest.fixture
def temp_db():
"""Create a fixture for making a temporary database."""
fname = tempfile.mktemp()
db = client_db.open_db(fname)
yield db
os.remove(fname)


def test_empty(temp_db):
"""Test the empty database."""
db = temp_db

host_name = db.info.get_hostname()
assert host_name == ""


def test_info(temp_db):
"""Test setting the info."""
db = temp_db

db.info.set_hostname('front_end_name')
host_name = db.info.get_hostname()

assert host_name == 'front_end_name'
# Ignore W0621: PyLama complains about redefining 'temp_db' from the outer
# scope. This is how pytest fixtures work.
# pylama:ignore=W0621

0 comments on commit 0e27361

Please sign in to comment.