From 3dda40139ecf40a825ed1d0367625ee096100cf0 Mon Sep 17 00:00:00 2001
From: Logan Harbour
Date: Fri, 8 Nov 2024 08:59:32 -0700
Subject: [PATCH] Enable HPC scheduler to work with INL HPC containerized
 modules

refs #29034
---
 python/TestHarness/TestHarness.py          |  35 ++++-
 python/TestHarness/schedulers/RunHPC.py    | 170 ++++++++++++++-------
 python/TestHarness/schedulers/hpc_source   |   2 -
 python/TestHarness/schedulers/hpc_template |  14 +-
 4 files changed, 148 insertions(+), 73 deletions(-)
 delete mode 100644 python/TestHarness/schedulers/hpc_source

diff --git a/python/TestHarness/TestHarness.py b/python/TestHarness/TestHarness.py
index 0ae971eccda9..6a04504f5e8c 100644
--- a/python/TestHarness/TestHarness.py
+++ b/python/TestHarness/TestHarness.py
@@ -1117,7 +1117,6 @@ def parseCLArgs(self, argv):
         hpcgroup = parser.add_argument_group('HPC Options', 'Options controlling HPC execution')
         hpcgroup.add_argument('--hpc', dest='hpc', action='store', choices=['pbs', 'slurm'], help='Launch tests using a HPC scheduler')
         hpcgroup.add_argument('--hpc-host', nargs='+', action='store', dest='hpc_host', metavar='', help='The host(s) to use for submitting HPC jobs')
-        hpcgroup.add_argument('--hpc-pre-source', nargs=1, action="store", dest='hpc_pre_source', metavar='', help='Source specified file before launching HPC tests')
         hpcgroup.add_argument('--hpc-file-timeout', nargs=1, type=int, action='store', dest='hpc_file_timeout', default=300, help='The time in seconds to wait for HPC output')
         hpcgroup.add_argument('--hpc-scatter-procs', nargs=1, type=int, action='store', dest='hpc_scatter_procs', default=None, help='Set to run HPC jobs with scatter placement when the processor count is this or lower')
         hpcgroup.add_argument('--hpc-apptainer-bindpath', nargs=1, action='store', type=str, dest='hpc_apptainer_bindpath', help='Sets the apptainer bindpath for HPC jobs')
@@ -1154,11 +1153,9 @@ def parseCLArgs(self, argv):
         # Try to guess the --hpc option if --hpc-host is set
         if self.options.hpc_host and not self.options.hpc:
             hpc_host = self.options.hpc_host[0]
-            if 'sawtooth' in hpc_host or 'lemhi' in hpc_host:
-                self.options.hpc = 'pbs'
-            elif 'bitterroot' in hpc_host:
-                self.options.hpc = 'slurm'
-            if self.options.hpc:
+            hpc_config = TestHarness.queryHPCCluster(hpc_host)
+            if hpc_config is not None:
+                self.options.hpc = hpc_config.scheduler
                 print(f'INFO: Setting --hpc={self.options.hpc} for known host {hpc_host}')

         self.options.runtags = [tag for tag in self.options.run.split(',') if tag != '']
@@ -1244,3 +1241,29 @@ def preRun(self):

     def getOptions(self):
         return self.options
+
+    # Helper tuple for storing information about a cluster
+    HPCCluster = namedtuple('HPCCluster', ['scheduler', 'apptainer_modules'])
+    # The modules that we want to load when running in a non-moduled
+    # container on INL HPC
+    inl_modules = ['use.moose', 'moose-dev-container-openmpi/5.0.5_0']
+    # Define INL HPC clusters
+    hpc_configs = {'sawtooth': HPCCluster(scheduler='pbs',
+                                          apptainer_modules=inl_modules),
+                   'bitterroot': HPCCluster(scheduler='slurm',
+                                            apptainer_modules=inl_modules)}
+
+    @staticmethod
+    def queryHPCCluster(hostname: str):
+        """
+        Attempt to get the HPC cluster configuration given a host
+
+        Args:
+            hostname: The HPC system hostname
+        Returns:
+            HPCCluster: The config, if found, otherwise None
+        """
+        for host, config in TestHarness.hpc_configs.items():
+            if host in hostname:
+                return config
+        return None
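With the hunks above, the host-to-scheduler guess goes through the new TestHarness.hpc_configs table instead of hard-coded hostname checks; note that lemhi, which previously implied pbs, is not carried over into the table. A minimal self-contained sketch of the lookup behavior (the hostnames used below are made up for illustration):

from collections import namedtuple

HPCCluster = namedtuple('HPCCluster', ['scheduler', 'apptainer_modules'])

# Mirrors TestHarness.hpc_configs from the hunk above
inl_modules = ['use.moose', 'moose-dev-container-openmpi/5.0.5_0']
hpc_configs = {'sawtooth': HPCCluster('pbs', inl_modules),
               'bitterroot': HPCCluster('slurm', inl_modules)}

def query_hpc_cluster(hostname):
    # Substring match, as in TestHarness.queryHPCCluster()
    for host, config in hpc_configs.items():
        if host in hostname:
            return config
    return None

# Made-up hostnames, illustration only
assert query_hpc_cluster('sawtooth1.example.org').scheduler == 'pbs'
assert query_hpc_cluster('bitterroot2').scheduler == 'slurm'
assert query_hpc_cluster('some-other-host') is None
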
diff --git a/python/TestHarness/schedulers/RunHPC.py b/python/TestHarness/schedulers/RunHPC.py
index b8530b36e932..b42f92fa9a49 100644
--- a/python/TestHarness/schedulers/RunHPC.py
+++ b/python/TestHarness/schedulers/RunHPC.py
@@ -9,12 +9,11 @@
 import urllib.parse

 from RunParallel import RunParallel
-import threading, os, re, sys, datetime, shlex, socket, threading, time, urllib, contextlib
+import threading, os, re, sys, datetime, shlex, socket, threading, time, urllib, contextlib, copy
 from enum import Enum
-import paramiko
-import jinja2
 import statistics
-import contextlib
+from collections import namedtuple
+
 from multiprocessing.pool import ThreadPool
 from TestHarness import util
@@ -85,6 +84,8 @@ class RunHPC(RunParallel):
     Base scheduler for jobs that are ran on HPC.
     """
    def __init__(self, harness, params):
+        import paramiko
+
         super().__init__(harness, params)

         self.params = params
@@ -165,14 +166,13 @@ def __init__(self, harness, params):
         # held in the background without blocking
         self.submit_job_pool = None if self.options.hpc_no_hold else ThreadPool(processes=10)

+        # Build the base submission environment for a job
+        self.submit_env, self.app_exec_prefix, self.app_exec_suffix = self.setupRunEnvironment(harness)
+
         if os.environ.get('APPTAINER_CONTAINER'):
             if not self.ssh_hosts:
                 print('ERROR: --hpc-host must be set when using HPC jobs within apptainer')
                 sys.exit(1)
-            if not self.options.hpc_pre_source:
-                default_pre_source = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'hpc_source')
-                self.options.hpc_pre_source = default_pre_source
-                print(f'INFO: Setting --hpc-pre-source={default_pre_source}')
         else:
             if self.options.hpc_apptainer_bindpath:
                 print('ERROR: --hpc-apptainer-bindpath is unused when not executing with apptainer')
@@ -181,9 +181,6 @@ def __init__(self, harness, params):
                 print('ERROR: --hpc-apptainer-no-home is unused when not executing with apptainer')
                 sys.exit(1)

-        if self.options.hpc_pre_source and not os.path.exists(self.options.hpc_pre_source):
-            print(f'ERROR: --hpc-pre-source path {self.options.hpc_pre_source} does not exist')
-            sys.exit(1)
         if self.options.hpc and self.options.pedantic_checks:
             print('ERROR: --hpc and --pedantic-checks cannot be used simultaneously')
             sys.exit(1)
@@ -191,11 +188,6 @@ def __init__(self, harness, params):
             print('ERROR: --hpc and -j|--jobs cannot be used simultaneously')
             sys.exit(1)

-        # Load the pre-source if it exists
-        self.source_contents = None
-        if self.options.hpc_pre_source:
-            self.source_contents = open(self.options.hpc_pre_source, 'r').read()
-
         # Load the submission template
         template_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'hpc_template')
         self.submission_template = open(template_path, 'r').read()
@@ -219,6 +211,8 @@ def _getSSHClient(self, reconnect=False):

         This is threaded so that we can operate a few connections at once.
         """
+        import paramiko
+
         process = threading.get_ident()
         with self.ssh_clients_lock:
             if process not in self.ssh_clients or reconnect:
@@ -345,6 +339,8 @@ def submitJob(self, job, hold, lock=True):

         Returns the resulting HPCJob.
""" + import jinja2 + # If we're submitting this Job to be held, but the Job status isn't # currently held, it means that we've hit job in the submit_job_pool # that was submitted previously but has already been set to be skipped @@ -385,45 +381,32 @@ def submitJob(self, job, hold, lock=True): if os.path.exists(file): os.remove(file) - # Add MOOSE's python path for python scripts - moose_python = os.path.abspath(os.path.join(os.path.abspath(os.path.dirname(__file__)), '../..')) - # Start building the jinja environment for the submission script - submission_env = {'SCHEDULER_NAME': self.getHPCSchedulerName(), - 'NAME': self.getHPCJobName(job), - 'CWD': tester.getTestDir(), - 'OUTPUT': output_file, - 'RESULT': result_file, - 'SUBMISSION_SCRIPT': submission_script, - 'WALLTIME': str(datetime.timedelta(seconds=tester.getMaxTime())), - 'PROJECT': self.options.hpc_project, - 'TEST_SPEC': tester.getSpecFile(), - 'TEST_NAME': tester.getTestNameShort(), - 'SUBMITTED_HOSTNAME': socket.gethostname(), - 'MOOSE_PYTHONPATH': moose_python, - 'NUM_PROCS': int(tester.getProcs(options)), - 'NUM_THREADS': int(tester.getThreads(options)), - 'ENDING_COMMENT': self.getOutputEndingComment(f'${self.getHPCJobIDVariable()}'), - 'JOB_ID_VARIABLE': self.getHPCJobIDVariable(), - 'PLACE': tester.getHPCPlace(options)} + submission_env = copy.deepcopy(self.submit_env) + submission_env.update({'NAME': self.getHPCJobName(job), + 'CWD': tester.getTestDir(), + 'OUTPUT': output_file, + 'RESULT': result_file, + 'SUBMISSION_SCRIPT': submission_script, + 'WALLTIME': str(datetime.timedelta(seconds=tester.getMaxTime())), + 'TEST_SPEC': tester.getSpecFile(), + 'TEST_NAME': tester.getTestNameShort(), + 'NUM_PROCS': int(tester.getProcs(options)), + 'NUM_THREADS': int(tester.getThreads(options)), + 'PLACE': tester.getHPCPlace(options)}) if hold: submission_env['HOLD'] = 1 - if self.options.hpc_pre_source: - submission_env['SOURCE_FILE'] = options.hpc_pre_source - if self.source_contents: - submission_env['SOURCE_CONTENTS'] = self.source_contents # Get the unescaped command command = tester.getCommand(options) - # Parse out the mpi command from the command if we're running in apptainer. - # We do this before any of the other escaping - APPTAINER_CONTAINER = os.environ.get('APPTAINER_CONTAINER') - apptainer_command_prefix = '' - if APPTAINER_CONTAINER: + # Parse out the mpi command from the command if we're wrapping + # things around the mpi command + mpi_prefix = '' + if self.app_exec_prefix: mpi_command = self.parseMPICommand(command) if mpi_command: - apptainer_command_prefix = mpi_command + mpi_prefix = mpi_command command = command.replace(mpi_command, '') # Replace newlines, clean up spaces, and encode the command. 
@@ -442,8 +425,9 @@ def submitJob(self, job, hold, lock=True):
         # we need to manipulate the command like such
         # Original command:
         # New command: apptainer exec /path/to/image ''
+        APPTAINER_CONTAINER = os.environ.get('APPTAINER_CONTAINER')
         if APPTAINER_CONTAINER:
-            job_command = apptainer_command_prefix
+            job_command = mpi_prefix

             # The root filesystem path that we're in so that we can be sure to bind
             # it into the container, if not already set
@@ -451,18 +435,12 @@ def submitJob(self, job, hold, lock=True):
             if self.options.hpc_apptainer_bindpath:
                 bindpath = self.options.hpc_apptainer_bindpath
             else:
                 bindpath = '/' + os.path.abspath(tester.getTestDir()).split(os.path.sep)[1]
-            # The apptainer command that will get sandwiched in the middle
-            apptainer_command = ['apptainer', 'exec', '-B', bindpath]
-            if self.options.hpc_apptainer_no_home:
-                apptainer_command.append('--no-home')
-            apptainer_command.append(APPTAINER_CONTAINER)
-            apptainer_command = shlex.join(apptainer_command)
+            submission_env['VARS']['APPTAINER_BINDPATH'] = bindpath + ',${APPTAINER_BINDPATH}'

-            # Append the apptainer command along with the command to be ran
-            job_command += f"{apptainer_command} {hpc_run} {command_encoded}"
+            serial_command = shlex.join(self.app_exec_prefix + self.app_exec_suffix)

-            # Set that we're using apptainer
-            submission_env['USING_APPTAINER'] = '1'
+            # Append the apptainer command along with the command to be run
+            job_command += f"{serial_command} {hpc_run} {command_encoded}"
         # Not in apptainer, so we can just use the escaped command as is
         else:
             job_command = f'{hpc_run} {command_encoded}'
@@ -952,8 +930,6 @@ def appendResultFooter(self, stats):

     def appendResultFileHeader(self):
         entry = {'scheduler': self.options.hpc,
-                 'pre_source_file': self.options.hpc_pre_source,
-                 'pre_source': self.source_contents,
                  'hosts': self.options.hpc_host if isinstance(self.options.hpc_host, list) else [self.options.hpc_host]}

         return {'hpc': entry}
@@ -971,3 +947,79 @@ def callHPCShouldRetry(self, pool_type, result: str):
         retry a command given a failure with a certain result.
""" return False + + def setupRunEnvironment(self, harness): + """ + Sets up the run environment for all HPC jobs + """ + hpc_cluster = harness.queryHPCCluster(self.ssh_hosts[0]) + + # HPC containerized module that we're in, if any + module_name = os.environ.get('CONTAINER_MODULE_NAME') + # Container that we're in, if any + apptainer_container = os.environ.get('APPTAINER_CONTAINER') + + # Base submission environment + submit_env = {# Name of the scheduler + 'SCHEDULER_NAME': self.getHPCSchedulerName(), + # Project to submit to within the cluster scheduler + 'PROJECT': self.options.hpc_project, + # Ending comment for output files + 'ENDING_COMMENT': self.getOutputEndingComment(f'${self.getHPCJobIDVariable()}'), + # Env var on the compute node that contains the HPC scheduler JOB id + 'JOB_ID_VARIABLE': self.getHPCJobIDVariable(), + # Modules to load + 'LOAD_MODULES': [], + # Environment variables to set before loading the modules + 'PRE_MODULE_VARS': {}, + # Environment variables to set after loading the modules + 'VARS': {}, + # The host the HPC job was submitted from + 'SUBMITTED_HOSTNAME': socket.gethostname(), + # Whether or not we're using apptainer + 'USING_APPTAINER': apptainer_container is not None} + + # The prefix and suffix to wrap around what we're actually + # running within the HPC job + app_exec_prefix = [] + app_exec_suffix = [] + + # If running on INL HPC, minimize the bindpath; this is a configuration + # option for the moose-dev-container module on INL HPC + if hpc_cluster is not None: + submit_env['PRE_MODULE_VARS']['MOOSE_DEV_CONTAINER_MINIMAL_BINDPATH'] = '1' + + # Pass apptainer options + if self.options.hpc_apptainer_no_home: + submit_env['VARS']['APPTAINER_NO_HOME'] = '1' + + # Add MOOSE's pythonpath + moose_python = os.path.abspath(os.path.join(os.path.abspath(os.path.dirname(__file__)), '../..')) + submit_env['VARS']['PYTHONPATH'] = moose_python + ':${PYTHONPATH}' + + # We're in a loaded container module on INL HPC, use that environment + # and the associated -exec command to run apptainer + if module_name: + assert apptainer_container is not None + module_version = os.environ['CONTAINER_MODULE_VERSION'] + module_exec = os.environ['CONTAINER_MODULE_EXEC'] + required_modules = os.environ.get('CONTAINER_MODULE_REQUIRED_MODULES', '').split(' ') + modules = required_modules + [f'{module_name}/{module_version}'] + submit_env['LOAD_MODULES'] = modules + app_exec_prefix = [module_exec] + # We're in a container without the container module environment, use + # a direct apptainer exec command + elif apptainer_container: + # If on INL HPC, use the wrapped environment + if hpc_cluster is not None: + submit_env['LOAD_MODULES'] = hpc_cluster.apptainer_modules + app_exec_prefix = ['apptainer', 'exec'] + app_exec_suffix = [apptainer_container] + + if submit_env['LOAD_MODULES']: + print(f'INFO: Using modules "{" ".join(submit_env["LOAD_MODULES"])}" for HPC environment') + if app_exec_prefix: + exec_combined = app_exec_prefix + app_exec_suffix + print(f'INFO: Using "{" ".join(exec_combined)}" as HPC execution wrapper') + + return submit_env, app_exec_prefix, app_exec_suffix diff --git a/python/TestHarness/schedulers/hpc_source b/python/TestHarness/schedulers/hpc_source deleted file mode 100644 index abb65f042821..000000000000 --- a/python/TestHarness/schedulers/hpc_source +++ /dev/null @@ -1,2 +0,0 @@ -export MOOSE_DEV_CONTAINER_MINIMAL_BINDPATH=1 -module load use.moose moose-dev-container-openmpi diff --git a/python/TestHarness/schedulers/hpc_template 
diff --git a/python/TestHarness/schedulers/hpc_template b/python/TestHarness/schedulers/hpc_template
index 730ff8e66184..e382aefa880c 100644
--- a/python/TestHarness/schedulers/hpc_template
+++ b/python/TestHarness/schedulers/hpc_template
@@ -31,13 +31,15 @@
 # Exit on failure
 set -e

-{%- if SOURCE_FILE is defined %}
-# Loaded from {{ SOURCE_FILE }}
-{{ SOURCE_CONTENTS }}
+{%- for key, val in PRE_MODULE_VARS.items() %}
+export {{ key }}="{{ val }}"
+{%- endfor %}
+{%- if LOAD_MODULES is defined %}
+module load {{ " ".join(LOAD_MODULES) }}
 {%- endif %}
-
-# Add MOOSE's python path for python scripts
-export PYTHONPATH={{ MOOSE_PYTHONPATH }}:${PYTHONPATH}
+{%- for key, val in VARS.items() %}
+export {{ key }}="{{ val }}"
+{%- endfor %}

 # Print a useful header
 echo "TestHarness {{ SCHEDULER_NAME }} job on $(hostname) in job ${{ JOB_ID_VARIABLE }}"
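For context on the template hunk above, a hedged sketch of what the new jinja2 loops expand to. The PRE_MODULE_VARS, LOAD_MODULES, and VARS values below are illustrative stand-ins for what RunHPC.setupRunEnvironment() actually provides:

import jinja2

# The fragment added to hpc_template by this patch
fragment = """\
{%- for key, val in PRE_MODULE_VARS.items() %}
export {{ key }}="{{ val }}"
{%- endfor %}
{%- if LOAD_MODULES is defined %}
module load {{ " ".join(LOAD_MODULES) }}
{%- endif %}
{%- for key, val in VARS.items() %}
export {{ key }}="{{ val }}"
{%- endfor %}
"""

# Illustrative values only; the real ones are built in RunHPC.setupRunEnvironment()
print(jinja2.Template(fragment).render(
    PRE_MODULE_VARS={'MOOSE_DEV_CONTAINER_MINIMAL_BINDPATH': '1'},
    LOAD_MODULES=['use.moose', 'moose-dev-container-openmpi/5.0.5_0'],
    VARS={'PYTHONPATH': '/path/to/moose/python:${PYTHONPATH}'}))
# Renders (roughly) to:
#   export MOOSE_DEV_CONTAINER_MINIMAL_BINDPATH="1"
#   module load use.moose moose-dev-container-openmpi/5.0.5_0
#   export PYTHONPATH="/path/to/moose/python:${PYTHONPATH}"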