Enable HPC scheduler to work with INL HPC containerized modules #29035

Open · wants to merge 4 commits into base: next
34 changes: 29 additions & 5 deletions python/TestHarness/TestHarness.py
@@ -1154,11 +1154,9 @@ def parseCLArgs(self, argv):
# Try to guess the --hpc option if --hpc-host is set
if self.options.hpc_host and not self.options.hpc:
hpc_host = self.options.hpc_host[0]
if 'sawtooth' in hpc_host or 'lemhi' in hpc_host:
self.options.hpc = 'pbs'
elif 'bitterroot' in hpc_host:
self.options.hpc = 'slurm'
if self.options.hpc:
hpc_config = TestHarness.queryHPCCluster(hpc_host)
if hpc_config is not None:
self.options.hpc = hpc_config.scheduler
print(f'INFO: Setting --hpc={self.options.hpc} for known host {hpc_host}')

self.options.runtags = [tag for tag in self.options.run.split(',') if tag != '']
@@ -1244,3 +1242,29 @@ def preRun(self):

def getOptions(self):
return self.options

# Helper tuple for storing information about a cluster
HPCCluster = namedtuple('HPCCluster', ['scheduler', 'apptainer_modules'])
# The modules to load when running on INL HPC in a container
# that was not started from a container module
inl_modules = ['use.moose', 'moose-dev-container-openmpi/5.0.5_0']
# Define INL HPC clusters
hpc_configs = {'sawtooth': HPCCluster(scheduler='pbs',
apptainer_modules=inl_modules),
'bitterroot': HPCCluster(scheduler='slurm',
apptainer_modules=inl_modules)}

@staticmethod
def queryHPCCluster(hostname: str):
"""
Attempt to get the HPC cluster configuration given a host

Args:
hostname: The HPC system hostname
Returns:
HPCCluster: The config, if found, otherwise None
"""
for host, config in TestHarness.hpc_configs.items():
if host in hostname:
return config
return None
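
For reference, the new lookup is easy to exercise on its own. Below is a standalone sketch of the same match (the hostnames are hypothetical); the substring test is what lets full login-node names such as sawtooth1 resolve to a cluster entry:

from collections import namedtuple

# Illustrative re-implementation of TestHarness.queryHPCCluster
HPCCluster = namedtuple('HPCCluster', ['scheduler', 'apptainer_modules'])
hpc_configs = {'sawtooth': HPCCluster(scheduler='pbs', apptainer_modules=[]),
               'bitterroot': HPCCluster(scheduler='slurm', apptainer_modules=[])}

def query_hpc_cluster(hostname):
    # Substring match, so a full node name still resolves to its cluster
    for host, config in hpc_configs.items():
        if host in hostname:
            return config
    return None

assert query_hpc_cluster('sawtooth1').scheduler == 'pbs'
assert query_hpc_cluster('bitterroot2').scheduler == 'slurm'
assert query_hpc_cluster('somewhere-else') is None
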
178 changes: 123 additions & 55 deletions python/TestHarness/schedulers/RunHPC.py
@@ -9,12 +9,11 @@

import urllib.parse
from RunParallel import RunParallel
import threading, os, re, sys, datetime, shlex, socket, threading, time, urllib, contextlib
import threading, os, re, sys, datetime, shlex, socket, time, urllib, contextlib, copy
from enum import Enum
import paramiko
import jinja2
import statistics
import contextlib
from collections import namedtuple

from multiprocessing.pool import ThreadPool
from TestHarness import util

@@ -85,6 +84,8 @@ class RunHPC(RunParallel):
Base scheduler for jobs that are run on HPC.
"""
def __init__(self, harness, params):
import paramiko

super().__init__(harness, params)

self.params = params
@@ -165,14 +166,13 @@ def __init__(self, harness, params):
# held in the background without blocking
self.submit_job_pool = None if self.options.hpc_no_hold else ThreadPool(processes=10)

# Build the base submission environment for a job
self.submit_env, self.app_exec_prefix, self.app_exec_suffix = self.setupRunEnvironment(harness)

if os.environ.get('APPTAINER_CONTAINER'):
if not self.ssh_hosts:
print('ERROR: --hpc-host must be set when using HPC jobs within apptainer')
sys.exit(1)
if not self.options.hpc_pre_source:
default_pre_source = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'hpc_source')
self.options.hpc_pre_source = default_pre_source
print(f'INFO: Setting --hpc-pre-source={default_pre_source}')
else:
if self.options.hpc_apptainer_bindpath:
print('ERROR: --hpc-apptainer-bindpath is unused when not executing with apptainer')
@@ -192,13 +192,15 @@
sys.exit(1)

# Load the pre-source if it exists
self.source_contents = None
self.pre_source_contents = None
if self.options.hpc_pre_source:
self.source_contents = open(self.options.hpc_pre_source, 'r').read()
with open(self.options.hpc_pre_source, 'r') as f:
self.pre_source_contents = f.read()

# Load the submission template
template_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'hpc_template')
self.submission_template = open(template_path, 'r').read()
with open(template_path, 'r') as f:
self.submission_template = f.read()

class CallHPCException(Exception):
"""
@@ -219,6 +221,8 @@ def _getSSHClient(self, reconnect=False):

This is threaded so that we can operate a few connections at once.
"""
import paramiko

process = threading.get_ident()
with self.ssh_clients_lock:
if process not in self.ssh_clients or reconnect:
@@ -345,6 +349,8 @@ def submitJob(self, job, hold, lock=True):

Returns the resulting HPCJob.
"""
import jinja2

# If we're submitting this Job to be held, but the Job status isn't
# currently held, it means that we've hit a job in the submit_job_pool
# that was submitted previously but has already been set to be skipped
@@ -385,45 +391,32 @@
if os.path.exists(file):
os.remove(file)

# Add MOOSE's python path for python scripts
moose_python = os.path.abspath(os.path.join(os.path.abspath(os.path.dirname(__file__)), '../..'))

# Start building the jinja environment for the submission script
submission_env = {'SCHEDULER_NAME': self.getHPCSchedulerName(),
'NAME': self.getHPCJobName(job),
'CWD': tester.getTestDir(),
'OUTPUT': output_file,
'RESULT': result_file,
'SUBMISSION_SCRIPT': submission_script,
'WALLTIME': str(datetime.timedelta(seconds=tester.getMaxTime())),
'PROJECT': self.options.hpc_project,
'TEST_SPEC': tester.getSpecFile(),
'TEST_NAME': tester.getTestNameShort(),
'SUBMITTED_HOSTNAME': socket.gethostname(),
'MOOSE_PYTHONPATH': moose_python,
'NUM_PROCS': int(tester.getProcs(options)),
'NUM_THREADS': int(tester.getThreads(options)),
'ENDING_COMMENT': self.getOutputEndingComment(f'${self.getHPCJobIDVariable()}'),
'JOB_ID_VARIABLE': self.getHPCJobIDVariable(),
'PLACE': tester.getHPCPlace(options)}
submission_env = copy.deepcopy(self.submit_env)
submission_env.update({'NAME': self.getHPCJobName(job),
'CWD': tester.getTestDir(),
'OUTPUT': output_file,
'RESULT': result_file,
'SUBMISSION_SCRIPT': submission_script,
'WALLTIME': str(datetime.timedelta(seconds=tester.getMaxTime())),
'TEST_SPEC': tester.getSpecFile(),
'TEST_NAME': tester.getTestNameShort(),
'NUM_PROCS': int(tester.getProcs(options)),
'NUM_THREADS': int(tester.getThreads(options)),
'PLACE': tester.getHPCPlace(options)})
if hold:
submission_env['HOLD'] = 1
if self.options.hpc_pre_source:
submission_env['SOURCE_FILE'] = options.hpc_pre_source
if self.source_contents:
submission_env['SOURCE_CONTENTS'] = self.source_contents

# Get the unescaped command
command = tester.getCommand(options)

# Parse out the mpi command from the command if we're running in apptainer.
# We do this before any of the other escaping
APPTAINER_CONTAINER = os.environ.get('APPTAINER_CONTAINER')
apptainer_command_prefix = ''
if APPTAINER_CONTAINER:
# Parse out the mpi command from the command if we're wrapping
# things around the mpi command
mpi_prefix = ''
if self.app_exec_prefix:
mpi_command = self.parseMPICommand(command)
if mpi_command:
apptainer_command_prefix = mpi_command
mpi_prefix = mpi_command
command = command.replace(mpi_command, '')

# Replace newlines, clean up spaces, and encode the command. We encode the
@@ -442,27 +435,22 @@
# we need to manipulate the command like so
# Original command: <mpiexec ...> </path/to/binary ...>
# New command: <mpiexec ...> apptainer exec /path/to/image '</path/to/binary ...>'
APPTAINER_CONTAINER = os.environ.get('APPTAINER_CONTAINER')
if APPTAINER_CONTAINER:
job_command = apptainer_command_prefix
job_command = mpi_prefix

# The root filesystem path that we're in so that we can be sure to bind
# it into the container, if not already set
if self.options.hpc_apptainer_bindpath:
bindpath = self.options.hpc_apptainer_bindpath
else:
bindpath = '/' + os.path.abspath(tester.getTestDir()).split(os.path.sep)[1]
# The apptainer command that will get sandwiched in the middle
apptainer_command = ['apptainer', 'exec', '-B', bindpath]
if self.options.hpc_apptainer_no_home:
apptainer_command.append('--no-home')
apptainer_command.append(APPTAINER_CONTAINER)
apptainer_command = shlex.join(apptainer_command)
submission_env['VARS']['APPTAINER_BINDPATH'] = bindpath + ',${APPTAINER_BINDPATH}'

# Append the apptainer command along with the command to be ran
job_command += f"{apptainer_command} {hpc_run} {command_encoded}"
serial_command = shlex.join(self.app_exec_prefix + self.app_exec_suffix)

# Set that we're using apptainer
submission_env['USING_APPTAINER'] = '1'
# Append the apptainer command along with the command to be run
job_command += f"{serial_command} {hpc_run} {command_encoded}"
# Not in apptainer, so we can just use the escaped command as is
else:
job_command = f'{hpc_run} {command_encoded}'
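
In short, any mpiexec prefix is hoisted outside the container so that MPI launches apptainer rather than the other way around. A simplified standalone sketch of that rearrangement follows; it is illustrative only: the real code extracts the MPI prefix with parseMPICommand() and base64-encodes the inner command, while here a naive split and shlex.quote stand in, and hpc_run is a placeholder name for the harness's run helper:

import shlex

def wrap_for_apptainer(command, exec_prefix, exec_suffix, hpc_run='hpc_run'):
    # Hoist a leading 'mpiexec -n N' out of the container (naive stand-in
    # for parseMPICommand()) so MPI launches the container, not vice versa
    mpi_prefix = ''
    if command.startswith('mpiexec'):
        launcher, flag, nprocs, rest = command.split(None, 3)
        mpi_prefix = f'{launcher} {flag} {nprocs} '
        command = rest
    # Sandwich the exec wrapper between the MPI launcher and the quoted
    # application command (the real code encodes the command instead)
    serial = shlex.join(exec_prefix + exec_suffix)
    return f'{mpi_prefix}{serial} {hpc_run} {shlex.quote(command)}'

print(wrap_for_apptainer('mpiexec -n 4 /path/to/app-opt -i test.i',
                         ['apptainer', 'exec'], ['/path/to/image.sif']))
# mpiexec -n 4 apptainer exec /path/to/image.sif hpc_run '/path/to/app-opt -i test.i'
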
@@ -801,7 +789,8 @@ def killHPCJobs(self, functor):

def killRemaining(self, keyboard=False):
"""Kills all currently running HPC jobs"""
functor = lambda hpc_job: hpc_job.state not in [hpc_job.State.killed, hpc_job.State.done]
finished_states = [HPCJob.State.killed, HPCJob.State.done]
functor = lambda hpc_job: hpc_job is not None and hpc_job.state not in finished_states
killed_jobs = self.killHPCJobs(functor)
if keyboard and killed_jobs:
print(f'\nAttempted to kill remaining {killed_jobs} HPC jobs...')
Expand Down Expand Up @@ -952,8 +941,6 @@ def appendResultFooter(self, stats):

def appendResultFileHeader(self):
entry = {'scheduler': self.options.hpc,
'pre_source_file': self.options.hpc_pre_source,
'pre_source': self.source_contents,
'hosts': self.options.hpc_host if isinstance(self.options.hpc_host, list) else [self.options.hpc_host]}
return {'hpc': entry}

@@ -971,3 +958,84 @@ def callHPCShouldRetry(self, pool_type, result: str):
retry a command given a failure with a certain result.
"""
return False

def setupRunEnvironment(self, harness):
"""
Sets up the run environment for all HPC jobs
"""
# Known INL HPC cluster config for the submission host, if any
hpc_cluster = harness.queryHPCCluster(self.ssh_hosts[0]) if self.ssh_hosts else None

# HPC containerized module that we're in, if any
module_name = os.environ.get('CONTAINER_MODULE_NAME')
# Container that we're in, if any
apptainer_container = os.environ.get('APPTAINER_CONTAINER')

# Base submission environment
submit_env = {# Name of the scheduler
'SCHEDULER_NAME': self.getHPCSchedulerName(),
# Project to submit to within the cluster scheduler
'PROJECT': self.options.hpc_project,
# Ending comment for output files
'ENDING_COMMENT': self.getOutputEndingComment(f'${self.getHPCJobIDVariable()}'),
# Env var on the compute node that contains the HPC scheduler job ID
'JOB_ID_VARIABLE': self.getHPCJobIDVariable(),
# Modules to load
'LOAD_MODULES': [],
# Environment variables to set before loading the modules
'PRE_MODULE_VARS': {},
# Environment variables to set after loading the modules
'VARS': {},
# The host the HPC job was submitted from
'SUBMITTED_HOSTNAME': socket.gethostname(),
# Whether or not we're using apptainer
'USING_APPTAINER': apptainer_container is not None}

# The prefix and suffix to wrap around what we're actually
# running within the HPC job
app_exec_prefix = []
app_exec_suffix = []

# --hpc-pre-source contents
if self.options.hpc_pre_source:
submit_env['PRE_SOURCE_FILE'] = self.options.hpc_pre_source
submit_env['PRE_SOURCE_CONTENTS'] = self.pre_source_contents

# If running on INL HPC, minimize the bindpath; this is a configuration
# option for the moose-dev-container module on INL HPC
if hpc_cluster is not None:
submit_env['PRE_MODULE_VARS']['MOOSE_DEV_CONTAINER_MINIMAL_BINDPATH'] = '1'

# Pass apptainer options
if self.options.hpc_apptainer_no_home:
submit_env['VARS']['APPTAINER_NO_HOME'] = '1'

# Add MOOSE's pythonpath
moose_python = os.path.abspath(os.path.join(os.path.abspath(os.path.dirname(__file__)), '../..'))
submit_env['VARS']['PYTHONPATH'] = moose_python + ':${PYTHONPATH}'

# We're in a loaded container module on INL HPC, use that environment
# and the associated <container>-exec command to run apptainer
if module_name:
assert apptainer_container is not None
module_version = os.environ['CONTAINER_MODULE_VERSION']
module_exec = os.environ['CONTAINER_MODULE_EXEC']
# split() without an argument so an unset/empty var yields an empty list
required_modules = os.environ.get('CONTAINER_MODULE_REQUIRED_MODULES', '').split()
modules = required_modules + [f'{module_name}/{module_version}']
submit_env['LOAD_MODULES'] = modules
app_exec_prefix = [module_exec]
# We're in a container without the container module environment, use
# a direct apptainer exec command
elif apptainer_container:
# If on INL HPC, use the wrapped environment
if hpc_cluster is not None:
submit_env['LOAD_MODULES'] = hpc_cluster.apptainer_modules
app_exec_prefix = ['apptainer', 'exec']
app_exec_suffix = [apptainer_container]

if submit_env['LOAD_MODULES']:
print(f'INFO: Using modules "{" ".join(submit_env["LOAD_MODULES"])}" for HPC environment')
if app_exec_prefix:
exec_combined = app_exec_prefix + app_exec_suffix
print(f'INFO: Using "{" ".join(exec_combined)}" as HPC execution wrapper')

return submit_env, app_exec_prefix, app_exec_suffix
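
To summarize the three cases this method handles, here is a condensed standalone sketch of the selection logic with hypothetical environment values (the real method also fills in the jinja submission environment and prints the INFO lines):

def resolve_exec_wrapper(environ, inl_apptainer_modules=None):
    """Condensed sketch of the container/module selection above.

    environ: a mapping like os.environ
    inl_apptainer_modules: modules to load on a known INL cluster, else None
    """
    module_name = environ.get('CONTAINER_MODULE_NAME')
    container = environ.get('APPTAINER_CONTAINER')
    modules, prefix, suffix = [], [], []
    if module_name:
        # Case 1: inside an INL containerized module; use its exec wrapper
        required = environ.get('CONTAINER_MODULE_REQUIRED_MODULES', '').split()
        modules = required + [f"{module_name}/{environ['CONTAINER_MODULE_VERSION']}"]
        prefix = [environ['CONTAINER_MODULE_EXEC']]
    elif container:
        # Case 2: a bare container; exec the image directly, loading the
        # cluster's apptainer modules if the host is a known INL cluster
        if inl_apptainer_modules:
            modules = list(inl_apptainer_modules)
        prefix, suffix = ['apptainer', 'exec'], [container]
    # Case 3: no container; the job command runs unwrapped
    return modules, prefix, suffix

# Hypothetical container-module environment:
env = {'CONTAINER_MODULE_NAME': 'moose-dev-container-openmpi',
       'CONTAINER_MODULE_VERSION': '5.0.5_0',
       'CONTAINER_MODULE_EXEC': 'moose-dev-container-openmpi-exec',
       'APPTAINER_CONTAINER': '/path/to/image.sif'}
modules, prefix, suffix = resolve_exec_wrapper(env)
assert modules == ['moose-dev-container-openmpi/5.0.5_0']
assert prefix == ['moose-dev-container-openmpi-exec'] and suffix == []
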
2 changes: 0 additions & 2 deletions python/TestHarness/schedulers/hpc_source

This file was deleted.

34 changes: 28 additions & 6 deletions python/TestHarness/schedulers/hpc_template
@@ -31,13 +31,19 @@
# Exit on failure
set -e

{%- if SOURCE_FILE is defined %}
# Loaded from {{ SOURCE_FILE }}
{{ SOURCE_CONTENTS }}
{%- for key, val in PRE_MODULE_VARS.items() %}
export {{ key }}="{{ val }}"
{%- endfor %}
{%- if LOAD_MODULES is defined %}
module load {{ " ".join(LOAD_MODULES) }}
{%- endif %}
{%- for key, val in VARS.items() %}
export {{ key }}="{{ val }}"
{%- endfor %}
{%- if PRE_SOURCE_FILE is defined %}
# Loaded from {{ PRE_SOURCE_FILE }}
{{ PRE_SOURCE_CONTENTS }}
{%- endif %}

# Add MOOSE's python path for python scripts
export PYTHONPATH={{ MOOSE_PYTHONPATH }}:${PYTHONPATH}

# Print a useful header
echo "TestHarness {{ SCHEDULER_NAME }} job on $(hostname) in job ${{ JOB_ID_VARIABLE }}"
@@ -101,6 +107,22 @@ if ((return_code > 128)); then
echo "Apptainer exited with code $return_code, using $new_return_code instead"
return_code=$new_return_code
fi
# If we're using --sharens, make sure the instance is dead
if [ "$APPTAINER_SHARENS" == "1" ] && [ -n "$APPTAINER_CONFIGDIR" ]; then
apptainer instance list
set +e
instance_list=$(apptainer instance list | tail -n +2)
if [ -n "$instance_list" ]; then
instance_name=$(echo "$instance_list" | awk '{print $1}')
instance_pid=$(echo "$instance_list" | awk '{print $2}')
echo "Killing running apptainer instance \"${instance_name}\" in ${instance_pid}"
apptainer instance stop ${instance_name}
if ps -p $instance_pid > /dev/null; then
kill -9 $instance_pid
fi
fi
set -e
fi
{%- endif %}
# Load the execution time; we use a tail here because the process will
# include a comment about a non-zero status first if the exit code is nonzero
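
Putting the template pieces together, the following minimal jinja2 sketch renders the new preamble with hypothetical values whose keys match those built in RunHPC.setupRunEnvironment; it shows the intended ordering of pre-module exports, module loads, and post-module exports:

import jinja2

# Trimmed-down copy of the hpc_template preamble, for illustration
preamble = jinja2.Template('''\
{%- for key, val in PRE_MODULE_VARS.items() %}
export {{ key }}="{{ val }}"
{%- endfor %}
{%- if LOAD_MODULES %}
module load {{ " ".join(LOAD_MODULES) }}
{%- endif %}
{%- for key, val in VARS.items() %}
export {{ key }}="{{ val }}"
{%- endfor %}
''')

rendered = preamble.render(
    PRE_MODULE_VARS={'MOOSE_DEV_CONTAINER_MINIMAL_BINDPATH': '1'},
    LOAD_MODULES=['use.moose', 'moose-dev-container-openmpi/5.0.5_0'],
    VARS={'PYTHONPATH': '/path/to/moose/python:${PYTHONPATH}'})
print(rendered.strip())
# export MOOSE_DEV_CONTAINER_MINIMAL_BINDPATH="1"
# module load use.moose moose-dev-container-openmpi/5.0.5_0
# export PYTHONPATH="/path/to/moose/python:${PYTHONPATH}"
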