Rework parallel doc example using CalcJob's #288

319 changes: 233 additions & 86 deletions docs/gallery/howto/autogen/parallel.py
"""
=====================
Run tasks in parallel
=====================
"""
# %%
# Introduction
# ============

# In this tutorial, you will learn how to run tasks and WorkGraphs in parallel.
# When defining the dependencies of a WorkGraph by linking tasks, the WorkGraph
# engine automatically takes care of parallelizing the independent tasks. One
# caveat is that we cannot use calcfunctions for this purpose, as they all run
# in the same runner environment and therefore block each other. For that
# reason we need to use `CalcJob`s, which can be run in different runner
# environments and therefore in parallel.
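# %%
# The blocking-versus-parallel behavior can be illustrated outside of AiiDA
# with a plain-Python analogy (a sketch, not aiida-workgraph code): two
# blocking calls in one thread run sequentially, while two worker threads
# finish in roughly half the time.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_work():
    time.sleep(0.2)  # stands in for a long-running task

# Sequential: the second call waits for the first, like calcfunctions
# sharing a single runner environment.
start = time.perf_counter()
blocking_work()
blocking_work()
sequential = time.perf_counter() - start

# Parallel: two workers, analogous to CalcJobs running in separate runners.
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(blocking_work) for _ in range(2)]
    for future in futures:
        future.result()
parallel = time.perf_counter() - start

print(sequential > parallel)  # True
```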

# Load the AiiDA profile.
from aiida import load_profile

load_profile()


# %%
# Parallel addition workflow
# ==========================
# Suppose we want to calculate ``x + y + u + v`` in parallel. Instead of
# computing it sequentially as ``(((x + y) + u) + v)``, we compute it as
# ``((x + y) + (u + v))``, so that ``x + y`` and ``u + v`` can run in parallel.
# aiida-core already provides an ``ArithmeticAddCalculation`` CalcJob for
# performing additions, which we will use for this example.

from aiida_workgraph import WorkGraph, task
from aiida.calculations.arithmetic.add import ArithmeticAddCalculation
from aiida.orm import InstalledCode, load_computer, load_code, load_node
from aiida.common.exceptions import NotExistent

# The ArithmeticAddCalculation needs to know where bash is stored
try:
    code = load_code("add@localhost")  # the computer label can also be omitted here
except NotExistent:
    code = InstalledCode(
        computer=load_computer("localhost"),
        filepath_executable="/bin/bash",
        label="add",
        default_calc_job_plugin="core.arithmetic.add",
    ).store()

wg = WorkGraph("parallel")
x, y, u, v = (1, 2, 3, 4)
add_xy = wg.add_task(ArithmeticAddCalculation, name="add_xy", x=x, y=y, code=code)
add_xy.set({"metadata.options.sleep": 3}) # the CalcJob will sleep 3 seconds
add_uv = wg.add_task(ArithmeticAddCalculation, name="add_uv", x=u, y=v, code=code)
add_uv.set({"metadata.options.sleep": 3}) # the CalcJob will sleep 3 seconds
add_xyuv = wg.add_task(
    ArithmeticAddCalculation,
    name="add_xyuv",
    x=add_xy.outputs["sum"],
    y=add_uv.outputs["sum"],
    code=code,
)
# %%
# We can verify that the tasks add_xy and add_uv are independent of each other
# and will therefore automatically be run in parallel.

wg.to_html()

# %%
# Run the workgraph

wg.submit(wait=True)

# %%
# We look at the ctime (the time of creation when submitted/run) and the mtime
# (the time the task was last modified, which is when its state changed to finished).
print("add_xy created at:", add_xy.ctime.time(), "finished at:", add_xy.mtime.time())
print("add_uv created at:", add_uv.ctime.time(), "finished at:", add_uv.mtime.time())

# %%
# We can see that both CalcJobs were created at almost the same time.
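# %%
# The ctime and mtime attributes are timezone-aware datetimes, so their
# difference is a ``timedelta``. A small standalone sketch of this arithmetic,
# using hypothetical timestamps in place of a real node's attributes:

```python
from datetime import datetime, timedelta, timezone

# Hypothetical timestamps standing in for a node's ctime and mtime
ctime = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
mtime = ctime + timedelta(seconds=3)

# The runtime estimate is simply the difference of the two timestamps
elapsed = (mtime - ctime).total_seconds()
print(elapsed)  # 3.0
```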

# %%
# Comparison with a calcfunction
# ------------------------------
#

@task.calcfunction()
def add(x, y, sleep):
    import time

    time.sleep(sleep.value)
    return x + y


wg = WorkGraph("parallel")
x, y, u, v = (1, 2, 3, 4)
add_xy = wg.add_task(add, x=x, y=y, sleep=3)
add_uv = wg.add_task(add, x=u, y=v, sleep=3)
add_xyuv = wg.add_task(
    add, x=add_xy.outputs["result"], y=add_uv.outputs["result"], sleep=0
)

wg.to_html()

# %%

wg.submit(wait=True)

# %%
# Printing timings

print("add_xy created at", add_xy.ctime.time(), "finished at", add_xy.mtime.time())
print("add_uv created at", add_uv.ctime.time(), "finished at", add_uv.mtime.time())

# %%
# We can see that the calcfunctions ran sequentially, with a 3-second delay
# between them.


# %%
# Parallelizing WorkGraphs
# ========================
# We will parallelize a WorkGraph in two ways: first by submitting multiple
# WorkGraphs separately, then by using the graph builder to submit the whole
# workflow at once.


# This is our initial WorkGraph that we want to parallelize
@task.graph_builder(
    inputs=[{"name": "integer"}], outputs=[{"name": "sum", "from": "sum_task.result"}]
)
def add10(integer):
    wg = WorkGraph()
    code = load_code("add@localhost")  # the code needs to be loaded in the graph builder
    add = wg.add_task(
        ArithmeticAddCalculation, name="sum_task", x=10, y=integer, code=code
    )
    add.set({"metadata.options.sleep": 3})
    return wg

# %%
wgs = []
tasks = []
for i in range(2):
    wg = WorkGraph(f"parallel_wg{i}")
    tasks.append(wg.add_task(add10, name="add10", integer=i))
    wgs.append(wg)

# We use wait=False (the default) so that we can continue submitting
wgs[0].submit()  # do not wait, so that we can submit the next WorkGraph
wgs[1].submit(wait=True)
# wait for all the WorkGraphs to finish
wgs[0].wait()

# %%
# We print the difference between the mtime (the time the WorkGraph was last
# changed) and the ctime (the time of creation). Since the WorkGraph's status
# changes when it finishes, this gives us a good estimate of the running time.
print(
"add10 task of WG0 created:",
load_node(tasks[0].pk).ctime.time(),
"finished:",
load_node(tasks[0].pk).mtime.time(),
)
print(
"add10 task of WG1 created:",
load_node(tasks[1].pk).ctime.time(),
"finished:",
load_node(tasks[1].pk).mtime.time(),
)


# %%
# Using graph builder
# -------------------


# This graph builder runs add10 in a loop
@task.graph_builder()
def parallel_add(nb_iterations):
    wg = WorkGraph()
    for i in range(nb_iterations):
        wg.add_task(add10, name=f"add10_{i}", integer=i)
    return wg


# Submit a workflow that adds 10 to two different numbers in parallel
wg = WorkGraph("parallel_graph_builder")
parallel_add_task = wg.add_task(parallel_add, name="parallel_add", nb_iterations=2)
wg.to_html()

# %%
wg.submit(wait=True)

# %%
parallel_add_wg = WorkGraph.load(parallel_add_task.pk)
add10_0_task = parallel_add_wg.tasks["add10_0"]
add10_1_task = parallel_add_wg.tasks["add10_1"]
print(
"add10_0 task created:",
add10_0_task.ctime.time(),
"finished:",
add10_0_task.mtime.time(),
)
print(
"add10_1 task created:",
add10_1_task.ctime.time(),
"finished:",
add10_1_task.mtime.time(),
)

# %%
# We can see that the total time is less than 6 seconds, which means that the
# two additions were performed in parallel.

# %%
# Increasing number of daemon workers
# -----------------------------------
# One can experience slowdowns when running many jobs (e.g., 100 jobs) that can
# be run in parallel. The optimal number of workers depends highly on the jobs
# that are run.
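# %%
# As a back-of-the-envelope model (an idealized sketch with a hypothetical
# helper, ignoring scheduling overhead): with equally long, independent jobs,
# the wall time scales with the number of batches, ``ceil(n_jobs / n_workers)``.

```python
import math

def makespan(n_jobs, n_workers, seconds_per_job):
    """Idealized wall time for equally long, independent jobs (hypothetical helper)."""
    return math.ceil(n_jobs / n_workers) * seconds_per_job

print(makespan(10, 1, 3))  # 30
print(makespan(10, 2, 3))  # 15
```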

from aiida.engine.daemon.client import get_daemon_client

# %%
# We run the 10 iterations with one daemon worker

client = get_daemon_client()
print(f"Number of current daemon workers {client.get_numprocesses()['numprocesses']}")
wg = WorkGraph("wg_daemon_worker_1")
parallel_add_task = wg.add_task(parallel_add, name="parallel_add", nb_iterations=10)
wg.to_html()

# %%

wg.submit(wait=True)

# %%
# We look at the total time to see the overhead costs.

print(
"Time for running parallelized graph builder",
parallel_add_task.mtime - parallel_add_task.ctime,
)

# %%
# We rerun it now with 2 daemon workers
client.increase_workers(1)
print(f"Number of current daemon workers {client.get_numprocesses()['numprocesses']}")
wg = WorkGraph("wg_daemon_worker_2")
parallel_add_task_2 = wg.add_task(parallel_add, name="parallel_add", nb_iterations=10)
wg.to_html()

# %%
wg.submit(wait=True)

# %%
print(
"Time for running parallelized graph builder with 2 daemons",
parallel_add_task_2.mtime - parallel_add_task_2.ctime,
)

# %%
# The time has not changed much for this small number of iterations. One can
# increase the number of iterations to see a more significant difference.

# %%
# Reset back to one worker
client.decrease_workers(1)

# %%
# Maximum number of active WorkGraphs
# -----------------------------------
# Be aware that for the moment, AiiDA can only run 200 processes (WorkGraph,
# CalcJob, etc.) at the same time. To increase that limit, one can set this
# configuration variable to a higher value.

#
# .. code-block:: bash
#
#     verdi config set daemon.worker_process_slots 1000
#     verdi daemon restart
#
# For more information about improving performance, please refer to the
# `"Tuning performance" section in the official AiiDA documentation <https://aiida.readthedocs.io/projects/aiida-core/en/latest/howto/installation.html#tuning-performance>`_

# %%
# Further reading
# ---------------
# Now that you have learned how to run tasks in parallel, you might want to
# know how to aggregate the results of all these parallel tasks (e.g., taking
# the mean of all computed values). For this, you can further read
# `how to aggregate outputs <aggregate.html>`_