
Simple flows

Thibaut Lamadon edited this page Jun 11, 2022 · 3 revisions

Gather

This is a simple dependency such that A -> C and B -> C, i.e. we can run A and B concurrently, but C requires both to have finished first. This just means that we need to await both tasks before running C. It looks like this:

task_A = Task(cmd = "do something")
task_B = Task(cmd = "do something")
task_C = Task(cmd = "do something")

# we await A and B
await bag(task_A, task_B)

# only then we await C
await task_C

The bag command expresses that A and B can run concurrently (in any order, or at the same time); we still wait for both to finish before we run task C.
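For readers who want to try the pattern without the library, here is a minimal sketch using only the standard library. It assumes that `asyncio.gather` is a reasonable stand-in for `bag`; the helper `run_step` and its delays are hypothetical and only simulate work.

```python
import asyncio


async def run_step(name, delay, log):
    """Hypothetical stand-in for a task: wait, then record completion."""
    await asyncio.sleep(delay)
    log.append(name)
    return name


async def main():
    log = []
    # A and B run concurrently; gather returns once both have finished
    await asyncio.gather(run_step("A", 0.02, log), run_step("B", 0.01, log))
    # C starts only after A and B are done
    await run_step("C", 0.0, log)
    return log


log = asyncio.run(main())
print(log)
```

A and B may complete in either order, but C is always recorded last.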

Split

Let's say we have a task that generates 10 files that can then be processed concurrently by other tasks.

# we run the first task
task_A = Task(cmd = "generates 10 files")
await task_A

# we then run the list of processing tasks concurrently
tasks_step2 = [ Task(cmd = f"process file{i}") for i in range(10)]
await bag(*tasks_step2)
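The same fan-out can be sketched with plain `asyncio`, assuming again that `asyncio.gather` plays the role of `bag`. The coroutines `generate_files` and `process` are hypothetical placeholders that only return strings; no real files are written.

```python
import asyncio


async def generate_files(n):
    """Hypothetical first step: pretend to create n files, return their names."""
    await asyncio.sleep(0)
    return [f"file{i}" for i in range(n)]


async def process(name):
    """Hypothetical second step: pretend to process one file."""
    await asyncio.sleep(0)
    return f"processed {name}"


async def main():
    # step 1 must finish before any processing starts
    files = await generate_files(10)
    # step 2: one coroutine per file, all scheduled concurrently
    return await asyncio.gather(*(process(name) for name in files))


results = asyncio.run(main())
print(len(results))
```

`asyncio.gather` returns results in the order the awaitables were passed in, so `results[i]` corresponds to `file{i}` regardless of which coroutine finished first.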

Split automatically

We can use the outputs argument of Task to collect the generated files automatically.

# we run the first task
task_A = Task(
    cmd = "generates 10 files",
    output = [f"taskA_file{i}" for i in range(10)])
await task_A

# we then run the list of processing tasks concurrently
tasks_step2 = [ Task(cmd = f"process {file}") for file in task_A.get_outputs()]
await bag(*tasks_step2)
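To show why declaring outputs helps, here is a small self-contained sketch. `SketchTask` is a hypothetical stand-in written for this example, not the library's `Task`: it only stores a command and a list of declared outputs, and its `run` method does no real work. The point is that the second step is built from the first task's declared outputs rather than from a hard-coded range.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class SketchTask:
    """Hypothetical stand-in for Task: a command plus its declared outputs."""
    cmd: str
    output: list = field(default_factory=list)

    def get_outputs(self):
        # return a copy so callers cannot mutate the declaration
        return list(self.output)

    async def run(self):
        await asyncio.sleep(0)  # placeholder for actually running the command
        return self.cmd


async def main():
    first = SketchTask(
        cmd="generates 3 files",
        output=[f"taskA_file{i}" for i in range(3)],
    )
    await first.run()
    # downstream tasks are derived from the declared outputs
    step2 = [SketchTask(cmd=f"process {name}") for name in first.get_outputs()]
    return await asyncio.gather(*(task.run() for task in step2))


commands = asyncio.run(main())
print(commands)
```

If the first task later produces a different set of files, only its output declaration changes; the list comprehension that builds the second step stays the same.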