-
Notifications
You must be signed in to change notification settings - Fork 3
/
pynodelauncher.py
134 lines (115 loc) · 4.36 KB
/
pynodelauncher.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
import argparse
# Running subprocess is purpose of this tools, no security concern here.
import subprocess # nosec
import sys
from mpi4py import MPI
def print_message(*args, **kwargs):
    """Write an informational message to standard output.

    Arguments are forwarded to ``print``; the stream is flushed right away
    so the message shows up promptly even when stdout is a pipe or a file.
    """
    print(*args, flush=True, **kwargs)
def print_error(*args, **kwargs):
    """Write an error message to standard error.

    Arguments are forwarded to ``print``; the output goes to ``sys.stderr``
    and is flushed immediately.
    """
    print(*args, file=sys.stderr, flush=True, **kwargs)
def cli_parser():
    """Build the command-line parser for the launcher.

    Returns:
        An ``argparse.ArgumentParser`` accepting an optional ``--verbose``
        flag and one positional argument, the path of the task file.
    """
    result = argparse.ArgumentParser(
        description="Launch subprocesses using MPI",
    )
    result.add_argument(
        "--verbose",
        action="store_true",
        help="output more messages about the task scheduling",
    )
    result.add_argument(
        "file",
        help="file with list of tasks (commands to execute)",
    )
    return result
# Message rank 0 sends to tell a worker that no more tasks will follow.
# Commands read from the task file are always strings, so ``None`` can never
# collide with a real task (a string sentinel such as "QUIT" could).
_STOP = None


def _load_tasks(filename):
    """Return the commands listed in *filename*, one per non-blank line.

    The trailing newline of each line is removed. Lines that are empty or
    contain only whitespace are skipped; note that file iteration keeps the
    newline, so a plain truthiness test would not skip them.
    """
    with open(filename) as task_file:
        return [line.rstrip("\n") for line in task_file if line.strip()]


def _setup_coordinator(size):
    """Parse the CLI and load the task list (rank 0 only).

    Args:
        size: number of processes in the MPI communicator.

    Returns:
        A tuple ``(exit_status, verbose, lines)``. ``exit_status`` is ``None``
        when scheduling can proceed. Otherwise it is the value to pass to
        ``sys.exit`` (an exit code or an error message), and every rank
        must stop.
    """
    try:
        args = cli_parser().parse_args()
    except SystemExit as error:
        # argparse has already printed usage/help; keep its exit code so the
        # workers can be released before rank 0 exits.
        return (0 if error.code is None else error.code), False, []
    if size == 1:
        return (
            "Number of processes to run needs to be greater than 1",
            args.verbose,
            [],
        )
    try:
        lines = _load_tasks(args.file)
    except (OSError, UnicodeError) as error:
        return f"Cannot read task file {args.file}: {error}", args.verbose, []
    if len(lines) < size - 1:
        return (
            f"The number of provided commands ({len(lines)}) is less"
            f" than the number of MPI worker tasks ({size - 1})",
            args.verbose,
            [],
        )
    return None, args.verbose, lines


def _run_coordinator(comm, lines, verbose):
    """Hand out *lines* to the workers and wait until every task finished.

    Each worker first receives one task. Whenever a worker reports back, it
    gets the next queued task until the queue is empty. Finally every worker
    is sent the stop message. Requires ``len(lines) >= comm.Get_size() - 1``.
    """
    size = comm.Get_size()
    num_lines = len(lines)
    pending_tasks = 0
    sent_tasks = 0
    for worker in range(1, size):
        comm.send(lines[sent_tasks], dest=worker, tag=0)
        pending_tasks += 1
        sent_tasks += 1
        if verbose:
            print_message(
                f"Pending tasks: {pending_tasks}, sent tasks: {sent_tasks}"
                f" (from total: {num_lines})"
            )
    # At this point, there is a task for each worker.
    print_message(
        f"The first {sent_tasks} tasks have been sent,"
        f" {num_lines - sent_tasks} are queued"
    )
    status = MPI.Status()
    while pending_tasks:
        # TODO: Maybe this should be less aggressive in waiting and consume
        # less CPU.
        # The payload is the command's return code; the worker already
        # reported failures, so only the sender matters here.
        comm.recv(source=MPI.ANY_SOURCE, tag=0, status=status)
        free_proc = status.Get_source()  # Which worker can take a new task
        if verbose:
            print_message(f"Worker {free_proc} finished its task")
        if sent_tasks < num_lines:
            comm.send(lines[sent_tasks], dest=free_proc, tag=0)
            sent_tasks += 1
            print_message(f"Sent task {sent_tasks} of {num_lines}")
        else:
            # All tasks have been sent - wait for the remaining results.
            pending_tasks -= 1
            print_message(f"Tasks still running: {pending_tasks}")
    # All tasks are complete: tell the workers no more messages will follow.
    for worker in range(1, size):
        comm.send(_STOP, dest=worker, tag=0)
        print_message(f"Sent quit message to worker {worker}")


def _run_worker(comm, rank, verbose):
    """Execute shell commands received from rank 0 until told to stop.

    After each command its return code is sent back to rank 0, which also
    tells rank 0 that this worker is free for another task.
    """
    while True:
        command = comm.recv(source=0, tag=0)
        if command is _STOP:
            break
        # No security concern here. The point is to execute arbitrary user code.
        completed_process = subprocess.run(
            command, shell=True, check=False  # nosec
        )
        result = completed_process.returncode
        if result:
            print_error(
                f"Command ended with non-zero return code {result}: {command}"
            )
        if verbose:
            print_message(f"Worker {rank} completed command: {command}")
        comm.send(result, dest=0, tag=0)


def main():
    """Run the launcher on this MPI rank.

    Rank 0 parses the command line, reads the task file and schedules the
    commands. Every other rank executes the commands it is sent. If setup
    fails on rank 0, all ranks exit; only rank 0 reports the reason.
    """
    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    if rank == 0:
        exit_status, verbose, lines = _setup_coordinator(comm.Get_size())
    else:
        exit_status, verbose, lines = None, False, []
    # Share the setup outcome with every rank. If rank 0 simply exited on a
    # setup error, the workers would stay blocked in this collective call.
    exit_status, verbose = comm.bcast((exit_status, verbose), root=0)
    if exit_status is not None:
        sys.exit(exit_status if rank == 0 else None)
    if rank == 0:
        _run_coordinator(comm, lines, verbose)
    else:
        _run_worker(comm, rank, verbose)
# Script entry point. Every MPI rank runs main(): rank 0 schedules the tasks,
# the remaining ranks execute them.
if __name__ == "__main__":
    main()