Skip to content

Commit

Permalink
threadpoolexecutor for distributing MPI (#230)
Browse files Browse the repository at this point in the history
* multi config analysis

* add tests

* tests once again

* Update continous_integration.yml

* Update multi_config_analysis.py

* Rename testing function

* change to relative import

* add logic for checking is args is None

* working multi config

* working multi config

* only 2 cores

* Update continous_integration.yml

* Update continous_integration.yml

* Update continous_integration.yml

* Update continous_integration.yml

* Update continous_integration.yml

* remove file deletion

---------

Co-authored-by: Peter T. H. Pang <tsunhopang@gmail.com>
  • Loading branch information
sahiljhawar and tsunhopang authored Oct 21, 2024
1 parent e036771 commit 3b0b3b0
Show file tree
Hide file tree
Showing 9 changed files with 318 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/continous_integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,10 @@ jobs:
run: |
sudo apt-get update
sudo apt-get install -y texlive texlive-latex-extra texlive-fonts-recommended dvipng cm-super
- name: Install Python dependencies
run: |
conda install mpi4py
python -m pip install --upgrade git+https://github.com/bitranox/wrapt_timeout_decorator.git
python -m pip install pytest pytest-cov flake8 pytest-aiohttp sqlparse freezegun PyJWT joblib coveralls
python -m pip install -r ml_requirements.txt -r grb_requirements.txt -r tf_requirements.txt -r sklearn_requirements.txt
Expand Down
75 changes: 75 additions & 0 deletions example_files/multi_config_analysis/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
Me2017_1:
model: Me2017
outdir: outdir_1
label: injection
prior: ../../priors/Me2017.prior
tmin: 0.1
tmax: 20
dt: 0.5
error-budget: 1
nlive: 40
Ebv-max: 0
injection: ./injection.json
injection-num: 0
injection-outfile: outdir_1/lc.csv
generation-seed: 42
filters: ztfg,ztfr,ztfi
remove-nondetections: True
#process-per-config: 4

Me2017_2:
model: Me2017
outdir: outdir_2
label: injection
prior: ../../priors/Me2017.prior
tmin: 0.1
tmax: 20
dt: 0.5
error-budget: 1
nlive: 40
Ebv-max: 0
injection: ./injection.json
injection-num: 0
injection-outfile: outdir_2/lc.csv
generation-seed: 42
filters: ztfg,ztfr,ztfi
remove-nondetections: True
#process-per-config: 5

Me2017_3:
model: Me2017
outdir: outdir_3
label: injection
prior: ../../priors/Me2017.prior
tmin: 0.1
tmax: 20
dt: 0.5
error-budget: 1
nlive: 40
Ebv-max: 0
injection: ./injection.json
injection-num: 0
injection-outfile: outdir_3/lc.csv
generation-seed: 42
filters: ztfg,ztfr,ztfi
remove-nondetections: True
#process-per-config: 6

Me2017_4:
model: Me2017
outdir: outdir_4
label: injection
prior: ../../priors/Me2017.prior
tmin: 0.1
tmax: 20
dt: 0.5
error-budget: 1
nlive: 40
Ebv-max: 0
injection: ./injection.json
injection-num: 0
injection-outfile: outdir_4/lc.csv
generation-seed: 42
filters: ztfg,ztfr,ztfi
remove-nondetections: True
#process-per-config: 7
31 changes: 31 additions & 0 deletions example_files/multi_config_analysis/injection.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
{
"injections": {
"__dataframe__": true,
"content": {
"simulation_id": [
0
],
"luminosity_distance": [
152.52372678074587
],
"beta": [
3.1674793717549354
],
"log10_kappa_r": [
1.9033736926068743
],
"timeshift": [
-1.640734750742769
],
"log10_vej": [
-1.0471069387482033
],
"log10_mej": [
-2.9158814967880233
],
"geocent_time": [
0.09662692256248354
]
}
}
}
107 changes: 107 additions & 0 deletions nmma/em/multi_config_analysis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import yaml
import subprocess
from .analysis import get_parser
from pathlib import Path
import argparse
from concurrent.futures import ThreadPoolExecutor
import sys
import os


def get_parser_here():
parser = argparse.ArgumentParser(description="Multi config analysis script for NMMA.")

parser.add_argument(
"--config",
type=str,
help="Name of the configuration file containing parameter values.",
)

parser.add_argument(
"--parallel",
action="store_true",
default=False,
help="To run multiple configs in parallel",
)

parser.add_argument(
"-p",
"--process",
type=int,
help=(
"No of processess each configuration should have, if --parallel is set then process will be divided equally among"
" all configs, else each config will run sequentially with given no of process. Strictly required if"
" --process-per-config is not given"
),
)
parser.add_argument(
"--process-per-config",
type=int,
help=(
"If multiple configurations are given, how many MPI process should be assigned to each configuration. In the yaml"
" file, indicate the number of process for each configuration with the key 'process-per-config'. If not given, all"
" configurations will be run depending on the state and value of --parallel and --process. This takes precedence"
" over --process"
),
)

return parser


def run_cmd_in_subprocess(cmd):
subprocess.run(cmd)


def main(args=None):
parser = get_parser_here()
args, _ = parser.parse_known_args(namespace=args)

_parser = get_parser()
main_args = _parser.parse_args([])

yaml_dict = yaml.safe_load(os.path.expandvars(Path(args.config).read_text()))

total_configs = len(list(yaml_dict.keys()))
futures = []

with ThreadPoolExecutor() as executor:
for analysis_set in yaml_dict.keys():
params = yaml_dict[analysis_set]

if "process-per-config" in params or args.process is None:
processes = params["process-per-config"]
elif args.parallel and args.process is not None:
processes = args.process // total_configs
else:
processes = args.process

cmd = ["mpiexec", "-np", str(processes), "lightcurve-analysis"]

for key, value in params.items():
key = key.replace("-", "_")

if key == "process_per_config":
continue

if key not in main_args:
print(f"{key} not a known argument... please remove")
exit()
key = key.replace("_", "-")

if isinstance(value, bool) and value:
cmd.append(f"--{key}")
else:
cmd.append(f"--{key}")
cmd.append(str(value))

if not args.parallel:
print(f"{'#'*100}")
print(f"Running analysis set: {analysis_set} with {processes} processes")
run_cmd_in_subprocess(cmd)
print(f"{'#'*100}")
else:
future = executor.submit(run_cmd_in_subprocess, cmd)
futures.append(future)
if args.parallel:
for future in futures:
future.result()
6 changes: 6 additions & 0 deletions nmma/tests/data/multi_config_analysis/Me2017.prior
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
luminosity_distance = Uniform(minimum=0.0, maximum=200., name='luminosity_distance',latex_label='$D_L$')
beta = Uniform(name='beta', minimum=1., maximum=5., latex_label='$\\beta$')
log10_kappa_r = Uniform(name='log10_kappa_r', minimum=-1., maximum=2., latex_label='$\\log_{10}\\kappa_r$')
timeshift = Uniform(minimum=-2.0, maximum=1.0, name='trigger_time',latex_label='$t_0$')
log10_vej = Uniform(name='log10_vej', minimum=-2., maximum=-0.5, latex_label='$\\log_{10}v^{\\rm{ej}}$')
log10_mej = Uniform(name='log10_mej', minimum=-3., maximum=-0.5, latex_label='$\\log_{10}M^{\\rm{ej}}$')
35 changes: 35 additions & 0 deletions nmma/tests/data/multi_config_analysis/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
Me2017_1:
model: Me2017
outdir: ${WORKING_DIR}/outdir_1
label: injection
prior: ${WORKING_DIR}/data/multi_config_analysis/Me2017.prior
tmin: 0.1
tmax: 5
dt: 0.5
error-budget: 1
nlive: 40
Ebv-max: 0
injection: ${WORKING_DIR}/data/multi_config_analysis/injection.json
injection-num: 0
injection-outfile: ${WORKING_DIR}/outdir_1/lc.csv
generation-seed: 42
filters: ztfg,ztfr,ztfi
remove-nondetections: True

Me2017_2:
model: Me2017
outdir: ${WORKING_DIR}/outdir_2
label: injection
prior: ${WORKING_DIR}/data/multi_config_analysis/Me2017.prior
tmin: 0.1
tmax: 5
dt: 0.5
error-budget: 1
nlive: 40
Ebv-max: 0
injection: ${WORKING_DIR}/data/multi_config_analysis/injection.json
injection-num: 0
injection-outfile: ${WORKING_DIR}/outdir_2/lc.csv
generation-seed: 42
filters: ztfg,ztfr,ztfi
remove-nondetections: True
31 changes: 31 additions & 0 deletions nmma/tests/data/multi_config_analysis/injection.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
{
"injections": {
"__dataframe__": true,
"content": {
"simulation_id": [
0
],
"luminosity_distance": [
152.52372678074587
],
"beta": [
3.1674793717549354
],
"log10_kappa_r": [
1.9033736926068743
],
"timeshift": [
-1.640734750742769
],
"log10_vej": [
-1.0471069387482033
],
"log10_mej": [
-2.9158814967880233
],
"geocent_time": [
0.09662692256248354
]
}
}
}
30 changes: 30 additions & 0 deletions nmma/tests/multi_config_analysis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from argparse import Namespace
import os
import pytest
import shutil

from ..em import multi_config_analysis


WORKING_DIR = os.path.dirname(__file__)



@pytest.fixture(scope="module")
def args():
WORKING_DIR = os.path.dirname(__file__)
os.environ["WORKING_DIR"] = WORKING_DIR
config = os.path.join(WORKING_DIR, "data/multi_config_analysis/config.yaml")

args = Namespace(config=config, process=2, parallel=True)

return args


def test_analysis_multi(args):
multi_config_analysis.main(args)


def test_analysis_parallel(args):
args.parallel = False
multi_config_analysis.main(args)
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ gwem-Hubble-estimate = "nmma.em.gwem_Hubble_estimate:main"
maximum-mass-constraint = "nmma.joint.maximum_mass_constraint:main"
convert-skyportal-lcs = "tools.convert_skyportal_lcs:main"
resample-grid = "tools.resample_grid:main"
multi-config-analysis = "nmma.em.multi_config_analysis:main"

[project.urls]
Homepage = "https://github.com/nuclear-multimessenger-astronomy/nmma"
Expand Down

0 comments on commit 3b0b3b0

Please sign in to comment.