Enable HPC scheduler to work with INL HPC containerized modules
loganharbour committed Nov 8, 2024
1 parent 97dcc4d commit 626bcc2
Showing 4 changed files with 148 additions and 73 deletions.
35 changes: 29 additions & 6 deletions python/TestHarness/TestHarness.py
@@ -1117,7 +1117,6 @@ def parseCLArgs(self, argv):
hpcgroup = parser.add_argument_group('HPC Options', 'Options controlling HPC execution')
hpcgroup.add_argument('--hpc', dest='hpc', action='store', choices=['pbs', 'slurm'], help='Launch tests using a HPC scheduler')
hpcgroup.add_argument('--hpc-host', nargs='+', action='store', dest='hpc_host', metavar='', help='The host(s) to use for submitting HPC jobs')
hpcgroup.add_argument('--hpc-pre-source', nargs=1, action="store", dest='hpc_pre_source', metavar='', help='Source specified file before launching HPC tests')
hpcgroup.add_argument('--hpc-file-timeout', nargs=1, type=int, action='store', dest='hpc_file_timeout', default=300, help='The time in seconds to wait for HPC output')
hpcgroup.add_argument('--hpc-scatter-procs', nargs=1, type=int, action='store', dest='hpc_scatter_procs', default=None, help='Set to run HPC jobs with scatter placement when the processor count is this or lower')
hpcgroup.add_argument('--hpc-apptainer-bindpath', nargs=1, action='store', type=str, dest='hpc_apptainer_bindpath', help='Sets the apptainer bindpath for HPC jobs')
@@ -1154,11 +1153,9 @@ def parseCLArgs(self, argv):
# Try to guess the --hpc option if --hpc-host is set
if self.options.hpc_host and not self.options.hpc:
hpc_host = self.options.hpc_host[0]
if 'sawtooth' in hpc_host or 'lemhi' in hpc_host:
self.options.hpc = 'pbs'
elif 'bitterroot' in hpc_host:
self.options.hpc = 'slurm'
if self.options.hpc:
hpc_config = TestHarness.queryHPCCluster(hpc_host)
if hpc_config is not None:
self.options.hpc = hpc_config.scheduler
print(f'INFO: Setting --hpc={self.options.hpc} for known host {hpc_host}')

self.options.runtags = [tag for tag in self.options.run.split(',') if tag != '']
@@ -1244,3 +1241,29 @@ def preRun(self):

def getOptions(self):
return self.options

# Helper tuple for storing information about a cluster
HPCCluster = namedtuple('HPCCluster', ['scheduler', 'apptainer_modules'])
# The modules to load when running in a container on INL HPC that
# was not launched through a containerized module
inl_modules = ['use.moose', 'moose-dev-container-openmpi/5.0.5_0']
# Define INL HPC clusters
hpc_configs = {'sawtooth': HPCCluster(scheduler='pbs',
apptainer_modules=inl_modules),
'bitterroot': HPCCluster(scheduler='slurm',
apptainer_modules=inl_modules)}
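Registering an additional cluster would, under this scheme, only require another entry in the map above. A minimal sketch, assuming a purely hypothetical cluster name:

# Hypothetical example: 'newcluster' is not a real INL system; it only
# illustrates that each key is a substring matched against the hostname.
TestHarness.hpc_configs['newcluster'] = TestHarness.HPCCluster(
    scheduler='slurm',
    apptainer_modules=TestHarness.inl_modules)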

@staticmethod
def queryHPCCluster(hostname: str):
"""
Attempt to get the HPC cluster configuration given a host
Args:
hostname: The HPC system hostname
Returns:
HPCCluster: The config, if found, otherwise None
"""
for host, config in TestHarness.hpc_configs.items():
if host in hostname:
return config
return None
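As a minimal sketch of how this lookup feeds the --hpc guess in parseCLArgs (the hostnames below are examples, not actual login node names):

# Substring match against the known INL clusters defined above.
config = TestHarness.queryHPCCluster('sawtooth1')
assert config is not None and config.scheduler == 'pbs'
assert config.apptainer_modules == ['use.moose', 'moose-dev-container-openmpi/5.0.5_0']

# Unknown hosts return None, so --hpc must then be given explicitly.
assert TestHarness.queryHPCCluster('some-other-host') is None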
170 changes: 111 additions & 59 deletions python/TestHarness/schedulers/RunHPC.py
@@ -9,12 +9,11 @@

import urllib.parse
from RunParallel import RunParallel
import threading, os, re, sys, datetime, shlex, socket, threading, time, urllib, contextlib
import threading, os, re, sys, datetime, shlex, socket, threading, time, urllib, contextlib, copy
from enum import Enum
import paramiko
import jinja2
import statistics
import contextlib
from collections import namedtuple

from multiprocessing.pool import ThreadPool
from TestHarness import util

@@ -85,6 +84,8 @@ class RunHPC(RunParallel):
Base scheduler for jobs that are run on HPC.
"""
def __init__(self, harness, params):
import paramiko

super().__init__(harness, params)

self.params = params
@@ -165,14 +166,13 @@ def __init__(self, harness, params):
# held in the background without blocking
self.submit_job_pool = None if self.options.hpc_no_hold else ThreadPool(processes=10)

# Build the base submission environment for a job
self.submit_env, self.app_exec_prefix, self.app_exec_suffix = self.setupRunEnvironment(harness)

if os.environ.get('APPTAINER_CONTAINER'):
if not self.ssh_hosts:
print('ERROR: --hpc-host must be set when using HPC jobs within apptainer')
sys.exit(1)
if not self.options.hpc_pre_source:
default_pre_source = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'hpc_source')
self.options.hpc_pre_source = default_pre_source
print(f'INFO: Setting --hpc-pre-source={default_pre_source}')
else:
if self.options.hpc_apptainer_bindpath:
print('ERROR: --hpc-apptainer-bindpath is unused when not executing with apptainer')
@@ -181,21 +181,13 @@ def __init__(self, harness, params):
print('ERROR: --hpc-apptainer-no-home is unused when not executing with apptainer')
sys.exit(1)

if self.options.hpc_pre_source and not os.path.exists(self.options.hpc_pre_source):
print(f'ERROR: --hpc-pre-source path {self.options.hpc_pre_source} does not exist')
sys.exit(1)
if self.options.hpc and self.options.pedantic_checks:
print('ERROR: --hpc and --pedantic-checks cannot be used simultaneously')
sys.exit(1)
if self.options.hpc and self.options.jobs:
print('ERROR: --hpc and -j|--jobs cannot be used simultaneously')
sys.exit(1)

# Load the pre-source if it exists
self.source_contents = None
if self.options.hpc_pre_source:
self.source_contents = open(self.options.hpc_pre_source, 'r').read()

# Load the submission template
template_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'hpc_template')
self.submission_template = open(template_path, 'r').read()
Expand All @@ -219,6 +211,8 @@ def _getSSHClient(self, reconnect=False):
This is threaded so that we can operate a few connections at once.
"""
import paramiko

process = threading.get_ident()
with self.ssh_clients_lock:
if process not in self.ssh_clients or reconnect:
@@ -345,6 +339,8 @@ def submitJob(self, job, hold, lock=True):
Returns the resulting HPCJob.
"""
import jinja2

# If we're submitting this Job to be held, but the Job status isn't
# currently held, it means that we've hit job in the submit_job_pool
# that was submitted previously but has already been set to be skipped
@@ -385,45 +381,32 @@ def submitJob(self, job, hold, lock=True):
if os.path.exists(file):
os.remove(file)

# Add MOOSE's python path for python scripts
moose_python = os.path.abspath(os.path.join(os.path.abspath(os.path.dirname(__file__)), '../..'))

# Start building the jinja environment for the submission script
submission_env = {'SCHEDULER_NAME': self.getHPCSchedulerName(),
'NAME': self.getHPCJobName(job),
'CWD': tester.getTestDir(),
'OUTPUT': output_file,
'RESULT': result_file,
'SUBMISSION_SCRIPT': submission_script,
'WALLTIME': str(datetime.timedelta(seconds=tester.getMaxTime())),
'PROJECT': self.options.hpc_project,
'TEST_SPEC': tester.getSpecFile(),
'TEST_NAME': tester.getTestNameShort(),
'SUBMITTED_HOSTNAME': socket.gethostname(),
'MOOSE_PYTHONPATH': moose_python,
'NUM_PROCS': int(tester.getProcs(options)),
'NUM_THREADS': int(tester.getThreads(options)),
'ENDING_COMMENT': self.getOutputEndingComment(f'${self.getHPCJobIDVariable()}'),
'JOB_ID_VARIABLE': self.getHPCJobIDVariable(),
'PLACE': tester.getHPCPlace(options)}
submission_env = copy.deepcopy(self.submit_env)
submission_env.update({'NAME': self.getHPCJobName(job),
'CWD': tester.getTestDir(),
'OUTPUT': output_file,
'RESULT': result_file,
'SUBMISSION_SCRIPT': submission_script,
'WALLTIME': str(datetime.timedelta(seconds=tester.getMaxTime())),
'TEST_SPEC': tester.getSpecFile(),
'TEST_NAME': tester.getTestNameShort(),
'NUM_PROCS': int(tester.getProcs(options)),
'NUM_THREADS': int(tester.getThreads(options)),
'PLACE': tester.getHPCPlace(options)})
if hold:
submission_env['HOLD'] = 1
if self.options.hpc_pre_source:
submission_env['SOURCE_FILE'] = options.hpc_pre_source
if self.source_contents:
submission_env['SOURCE_CONTENTS'] = self.source_contents

# Get the unescaped command
command = tester.getCommand(options)

# Parse out the mpi command from the command if we're running in apptainer.
# We do this before any of the other escaping
APPTAINER_CONTAINER = os.environ.get('APPTAINER_CONTAINER')
apptainer_command_prefix = ''
if APPTAINER_CONTAINER:
# Parse out the mpi command from the command if we're wrapping
# things around the mpi command
mpi_prefix = ''
if self.app_exec_prefix:
mpi_command = self.parseMPICommand(command)
if mpi_command:
apptainer_command_prefix = mpi_command
mpi_prefix = mpi_command
command = command.replace(mpi_command, '')

# Replace newlines, clean up spaces, and encode the command. We encode the
@@ -442,27 +425,22 @@ def submitJob(self, job, hold, lock=True):
# we need to manipulate the command like such
# Original command: <mpiexec ...> </path/to/binary ...>
# New command: <mpiexec ...> apptainer exec /path/to/image '</path/to/binary ...>'
APPTAINER_CONTAINER = os.environ.get('APPTAINER_CONTAINER')
if APPTAINER_CONTAINER:
job_command = apptainer_command_prefix
job_command = mpi_prefix

# The root filesystem path that we're in so that we can be sure to bind
# it into the container, if not already set
if self.options.hpc_apptainer_bindpath:
bindpath = self.options.hpc_apptainer_bindpath
else:
bindpath = '/' + os.path.abspath(tester.getTestDir()).split(os.path.sep)[1]
# The apptainer command that will get sandwiched in the middle
apptainer_command = ['apptainer', 'exec', '-B', bindpath]
if self.options.hpc_apptainer_no_home:
apptainer_command.append('--no-home')
apptainer_command.append(APPTAINER_CONTAINER)
apptainer_command = shlex.join(apptainer_command)
submission_env['VARS']['APPTAINER_BINDPATH'] = bindpath + ',${APPTAINER_BINDPATH}'

# Append the apptainer command along with the command to be ran
job_command += f"{apptainer_command} {hpc_run} {command_encoded}"
serial_command = shlex.join(self.app_exec_prefix + self.app_exec_suffix)

# Set that we're using apptainer
submission_env['USING_APPTAINER'] = '1'
# Append the apptainer command along with the command to be ran
job_command += f"{serial_command} {hpc_run} {command_encoded}"
# Not in apptainer, so we can just use the escaped command as is
else:
job_command = f'{hpc_run} {command_encoded}'
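A simplified sketch of the wrapping described above; it skips the URL-encoding and the hpc_run wrapper used by the real script, and the image and binary paths are hypothetical:

import shlex

# Hypothetical inputs for illustration only.
mpi_prefix = 'mpiexec -n 4 '                  # parsed off the front of the command
command = '/path/to/moose-opt -i input.i'     # the remaining test command
app_exec_prefix = ['apptainer', 'exec']
app_exec_suffix = ['/path/to/image.sif']

# Original command: <mpiexec ...> </path/to/binary ...>
# New command:      <mpiexec ...> apptainer exec /path/to/image '</path/to/binary ...>'
serial_command = shlex.join(app_exec_prefix + app_exec_suffix)
job_command = f'{mpi_prefix}{serial_command} {shlex.quote(command)}'
# -> mpiexec -n 4 apptainer exec /path/to/image.sif '/path/to/moose-opt -i input.i'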
@@ -952,8 +930,6 @@ def appendResultFooter(self, stats):

def appendResultFileHeader(self):
entry = {'scheduler': self.options.hpc,
'pre_source_file': self.options.hpc_pre_source,
'pre_source': self.source_contents,
'hosts': self.options.hpc_host if isinstance(self.options.hpc_host, list) else [self.options.hpc_host]}
return {'hpc': entry}

@@ -971,3 +947,79 @@ def callHPCShouldRetry(self, pool_type, result: str):
retry a command given a failure with a certain result.
"""
return False

def setupRunEnvironment(self, harness):
"""
Sets up the run environment for all HPC jobs
"""
hpc_cluster = harness.queryHPCCluster(self.ssh_hosts[0])

# HPC containerized module that we're in, if any
module_name = os.environ.get('CONTAINER_MODULE_NAME')
# Container that we're in, if any
apptainer_container = os.environ.get('APPTAINER_CONTAINER')

# Base submission environment
submit_env = {# Name of the scheduler
'SCHEDULER_NAME': self.getHPCSchedulerName(),
# Project to submit to within the cluster scheduler
'PROJECT': self.options.hpc_project,
# Ending comment for output files
'ENDING_COMMENT': self.getOutputEndingComment(f'${self.getHPCJobIDVariable()}'),
# Env var on the compute node that contains the HPC scheduler JOB id
'JOB_ID_VARIABLE': self.getHPCJobIDVariable(),
# Modules to load
'LOAD_MODULES': [],
# Environment variables to set before loading the modules
'PRE_MODULE_VARS': {},
# Environment variables to set after loading the modules
'VARS': {},
# The host the HPC job was submitted from
'SUBMITTED_HOSTNAME': socket.gethostname(),
# Whether or not we're using apptainer
'USING_APPTAINER': apptainer_container is not None}

# The prefix and suffix to wrap around what we're actually
# running within the HPC job
app_exec_prefix = []
app_exec_suffix = []

# If running on INL HPC, minimize the bindpath; this is a configuration
# option for the moose-dev-container module on INL HPC
if hpc_cluster is not None:
submit_env['PRE_MODULE_VARS']['MOOSE_DEV_CONTAINER_MINIMAL_BINDPATH'] = '1'

# Pass apptainer options
if self.options.hpc_apptainer_no_home:
submit_env['VARS']['APPTAINER_NO_HOME'] = '1'

# Add MOOSE's pythonpath
moose_python = os.path.abspath(os.path.join(os.path.abspath(os.path.dirname(__file__)), '../..'))
submit_env['VARS']['PYTHONPATH'] = moose_python + ':${PYTHONPATH}'

# We're in a loaded container module on INL HPC, use that environment
# and the associated <container>-exec command to run apptainer
if module_name:
assert apptainer_container is not None
module_version = os.environ['CONTAINER_MODULE_VERSION']
module_exec = os.environ['CONTAINER_MODULE_EXEC']
required_modules = os.environ.get('CONTAINER_MODULE_REQUIRED_MODULES', '').split(' ')
modules = required_modules + [f'{module_name}/{module_version}']
submit_env['LOAD_MODULES'] = modules
app_exec_prefix = [module_exec]
# We're in a container without the container module environment, use
# a direct apptainer exec command
elif apptainer_container:
# If on INL HPC, use the wrapped environment
if hpc_cluster is not None:
submit_env['LOAD_MODULES'] = hpc_cluster.apptainer_modules
app_exec_prefix = ['apptainer', 'exec']
app_exec_suffix = [apptainer_container]

if submit_env['LOAD_MODULES']:
print(f'INFO: Using modules "{" ".join(submit_env["LOAD_MODULES"])}" for HPC environment')
if app_exec_prefix:
exec_combined = app_exec_prefix + app_exec_suffix
print(f'INFO: Using "{" ".join(exec_combined)}" as HPC execution wrapper')

return submit_env, app_exec_prefix, app_exec_suffix
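In short, the wrapper returned here falls into one of three cases. The sketch below condenses that branching under the same environment variables read above; it omits the module handling and is not the real implementation:

import os

def sketch_exec_wrapper():
    """Hedged sketch of the branching in setupRunEnvironment."""
    module_name = os.environ.get('CONTAINER_MODULE_NAME')
    container = os.environ.get('APPTAINER_CONTAINER')

    if module_name:
        # Case 1: INL containerized module -> use its <container>-exec wrapper
        return [os.environ['CONTAINER_MODULE_EXEC']], []
    if container:
        # Case 2: plain apptainer container -> wrap with 'apptainer exec <image>'
        return ['apptainer', 'exec'], [container]
    # Case 3: no container -> the test command runs unwrapped
    return [], []

app_exec_prefix, app_exec_suffix = sketch_exec_wrapper()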
2 changes: 0 additions & 2 deletions python/TestHarness/schedulers/hpc_source

This file was deleted.

14 changes: 8 additions & 6 deletions python/TestHarness/schedulers/hpc_template
@@ -31,13 +31,15 @@
# Exit on failure
set -e

{%- if SOURCE_FILE is defined %}
# Loaded from {{ SOURCE_FILE }}
{{ SOURCE_CONTENTS }}
{%- for key, val in PRE_MODULE_VARS.items() %}
export {{ key }}="{{ val }}"
{%- endfor %}
{%- if LOAD_MODULES is defined %}
module load {{ " ".join(LOAD_MODULES) }}
{%- endif %}

# Add MOOSE's python path for python scripts
export PYTHONPATH={{ MOOSE_PYTHONPATH }}:${PYTHONPATH}
{%- for key, val in VARS.items() %}
export {{ key }}="{{ val }}"
{%- endfor %}

# Print a useful header
echo "TestHarness {{ SCHEDULER_NAME }} job on $(hostname) in job ${{ JOB_ID_VARIABLE }}"
