How to check all the tasks are completed #331

Open
emelpolaris opened this issue Jun 13, 2024 · 1 comment

@emelpolaris
I'm using taskiq_redis to distribute tasks across multiple GPUs.
I want to know whether all the tasks are completed or not.
Please let me know how I can get this info.
Thanks.

@s3rius
Member

s3rius commented Jun 13, 2024

The easiest way would be to start using pipelines.

https://github.com/taskiq-python/taskiq-pipelines

import asyncio
from taskiq_redis import ListQueueBroker, RedisAsyncResultBackend
from taskiq_pipelines import Pipeline, PipelineMiddleware

broker = (
    ListQueueBroker("redis://localhost:6379/0")
    # Here we add the PipelineMiddleware to the broker
    # so that we can use pipelines with it.
    .with_middlewares(PipelineMiddleware())
    # Here's the result backend for the broker.
    # It's required for pipelines to work, because
    # intermediate results have to be stored somewhere.
    .with_result_backend(
        RedisAsyncResultBackend(
            "redis://localhost:6379/1",
            keep_results=False,
        )
    )
)


@broker.task
async def my_task(a: int) -> int:
    await asyncio.sleep(1)
    return a * 2


@broker.task
async def generate_tasks(num: int) -> list[int]:
    return list(range(num))


# Here's the pipeline that runs your set of tasks.
# It generates the required input values and then
# maps each element through my_task in parallel.
#
# We also set check_interval for my_task to 1 second
# to minimize the number of requests to the result backend.
# You can define check_interval granularly for each task
# in the pipeline.
pipe = Pipeline(broker, generate_tasks).map(my_task, check_interval=1)


async def main():
    await broker.startup()
    # Pipeline itself is a task and therefore the interface is the same.
    # You can wait for the result as if it was a regular task.
    task = await pipe.kiq(10)
    res = await task.wait_result(check_interval=1)
    print(res.return_value)
    await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())

Or you can use the gather utility function to collect all the tasks, similar to asyncio.gather:

import asyncio
from taskiq_redis import ListQueueBroker, RedisAsyncResultBackend
from taskiq import gather as taskiq_gather

broker = ListQueueBroker("redis://localhost:6379/0").with_result_backend(
    RedisAsyncResultBackend(
        "redis://localhost:6379/1",
        keep_results=False,
    )
)


@broker.task
async def my_task(a: int) -> int:
    await asyncio.sleep(1)
    return a * 2


async def main():
    await broker.startup()
    tasks = []
    for i in range(10):
        tasks.append(await my_task.kiq(i))
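    # Wait for all tasks to finish, polling the result backend
    # once per `periodicity` seconds.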
    results = await taskiq_gather(*tasks, periodicity=1)
    for result in results:
        print(result.return_value)
    await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())

To minimize the number of requests to the result backend, don't forget to specify delays between checks. Hope this helps.
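
If you only need to know whether everything has finished, without reading the results back right away, you can also poll the result backend directly. Here's a minimal sketch along those lines, assuming taskiq's AsyncTaskiqTask.is_ready(), which reports whether a task's result has landed in the backend:

import asyncio
from taskiq_redis import ListQueueBroker, RedisAsyncResultBackend

broker = ListQueueBroker("redis://localhost:6379/0").with_result_backend(
    # keep_results is left at its default (True) here, so that
    # polling is_ready() doesn't race with result deletion.
    RedisAsyncResultBackend("redis://localhost:6379/1")
)


@broker.task
async def my_task(a: int) -> int:
    await asyncio.sleep(1)
    return a * 2


async def main():
    await broker.startup()
    tasks = [await my_task.kiq(i) for i in range(10)]
    # Poll the result backend with a delay between checks,
    # until every task reports a stored result.
    while not all([await task.is_ready() for task in tasks]):
        await asyncio.sleep(1)
    print("All tasks are completed.")
    await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())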
