A middleware for Dramatiq (for Django) that keeps track of task state only when you need it to.
In this documentation, the term "task" generally refers to the Task model in this package. The Task model is an abstraction of a Dramatiq task, but it is otherwise independent of Dramatiq and the django_dramatiq package. This package therefore operates only on the Task model, not on Dramatiq tasks themselves.
- Install dramatiq-taskstate via pip:
pip install dramatiq-taskstate
- Add taskstate and django.contrib.postgres to your INSTALLED_APPS in your project settings.py file:
INSTALLED_APPS = [
    'django_dramatiq',
    '...',
    'django.contrib.postgres',
    '...',
    'taskstate',
]
- Run migrate:
python manage.py migrate
- Include the middleware in your django_dramatiq configuration:
DRAMATIQ_BROKER = {
    'BROKER': '...',
    'OPTIONS': {
        # ...
    },
    'MIDDLEWARE': [
        # ...
        'taskstate.middleware.StateMiddleware',
    ]
}
- Add a for_state parameter to Dramatiq actors that need task state to be tracked. The middleware ignores any task that doesn't have this argument. Also remember that all values in the for_state dictionary must be JSON serializable.
import dramatiq

@dramatiq.actor
def my_actor(arg, for_state={}):
    pass
- Then, when sending a new task to Dramatiq, the for_state dictionary can contain any of the following keys:
'for_state': {
    'user_pk': user.pk,
    'model_name': 'model',
    'app_name': 'app',
    'description': 'description',
}
- Each time a task's status is updated, a task_changed signal is dispatched, which can be handled like this:
from django.dispatch import receiver

from taskstate.middleware import StateMiddleware
from taskstate.signals import task_changed

@receiver(task_changed, sender=StateMiddleware)
def handle_task_changed(sender, task, **kwargs):
    pass
Keep in mind that this is not a post_save signal -- it only fires for status updates.
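Because every value in the for_state dictionary must be JSON serializable, it can help to build and check the dictionary before sending a task. The following is a minimal sketch and not part of this package: build_for_state and ALLOWED_KEYS are hypothetical names, and the allowed keys are simply the four keys listed in the steps above.

```python
import json

# Keys documented above for the for_state dictionary.
ALLOWED_KEYS = {'user_pk', 'model_name', 'app_name', 'description'}


def build_for_state(**values):
    """Hypothetical helper: build a for_state dict and check it up front."""
    unknown = set(values) - ALLOWED_KEYS
    if unknown:
        raise ValueError(f'Unsupported for_state keys: {sorted(unknown)}')
    # json.dumps raises TypeError for values that are not JSON serializable.
    json.dumps(values)
    return values


for_state = build_for_state(user_pk=1, model_name='model', app_name='app')
```

The resulting dictionary can then be passed along when sending the task, for example my_actor.send('some value', for_state=for_state).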
A common case with background tasks is that the progress or state of a task needs to be displayed to a user. This package includes a WebsocketConsumer that can be used with django-channels to check the status of a task. Check the flowchart in the root of the repo for more information on how this works.
Check the get_task_status.js file in the taskstate/static directory for an example of how to send a request via websockets to get or monitor the status and progress of a task. As shown in the flowchart in the root of the repo, both task_changed signals (which fire only when the task object's status is updated, i.e., enqueued, done, etc.) and post_save signals are handled.
Routing is included for django-channels. Make sure to use the URLRouter for your django-channels configuration. You can send data for the websocket to the following route:
/ws/get-task-status/
Or, create your own route:
from django.urls import re_path

from taskstate.consumers import CheckTaskStatus

websocket_urlpatterns = [
    re_path(r'^ws/custom-route-task-status/$', CheckTaskStatus.as_asgi()),
]
Also remember to add the routes to your django-channels router, for example:
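from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.security.websocket import AllowedHostsOriginValidator
from django.core.asgi import get_asgi_application

# Initialize Django before importing code that may touch ORM models.
django_asgi_app = get_asgi_application()

import taskstate.routing

application = ProtocolTypeRouter({
    'http': django_asgi_app,
    'websocket': AllowedHostsOriginValidator(
        AuthMiddlewareStack(
            URLRouter(
                taskstate.routing.websocket_urlpatterns,
            )
        ),
    ),
})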
A default template is included to render tasks in the UI -- use the following in your templates (check the template to see which context variables to use):
{% include 'taskstate/task_list.html' %}
The above template only renders a list of tasks. To check or monitor the statuses of those tasks, include the default script in your HTML before the closing body tag:
<script src="{% static 'taskstate/get_task_status.js' %}" charset="utf-8"></script>
See the task_progress_example.py file in the examples directory in the root of the repo for an example of how to update the progress of a task. Note that some of the details there are specific to Dramatiq itself.
A task can only be marked as seen when it is complete. The seen status of a set of tasks can be set through another django-channels route:
/ws/set-task-seen/
Sending a list of task IDs to this route marks all completed tasks in that list as seen. This is handled in the default JS script, so check the get_task_status.js file for an example.
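The rule described above (only completed tasks can be marked as seen) can be sketched in plain Python. This is an illustration only, not the package's implementation: the tasks are plain dictionaries, mark_seen is a hypothetical name, and treating "done", "failed" and "skipped" as the completed statuses is an assumption based on the cleanup description in this README.

```python
# Assumption: these statuses count as "completed".
COMPLETED_STATUSES = {'done', 'failed', 'skipped'}


def mark_seen(tasks_by_id, task_ids):
    """Mark completed tasks from task_ids as seen; return the IDs that changed."""
    marked = []
    for task_id in task_ids:
        task = tasks_by_id.get(task_id)
        # Unknown IDs and tasks that are still in progress are left untouched.
        if task is not None and task['status'] in COMPLETED_STATUSES:
            task['seen'] = True
            marked.append(task_id)
    return marked


tasks = {
    1: {'status': 'done', 'seen': False},
    2: {'status': 'running', 'seen': False},
    3: {'status': 'failed', 'seen': False},
}
marked_ids = mark_seen(tasks, [1, 2, 3, 4])
```

In this sketch the running task and the unknown ID 4 are skipped, so a client can safely send every ID it is displaying.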
There is also an APS (Advanced Python Scheduler) periodic task that deletes tasks that have been seen, have a final/completed status such as "skipped", "failed" or "done", and are older than 120 seconds. To add the cleanup_tasks periodic job to APS:
from django.conf import settings

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger

from taskstate.tasks import cleanup_tasks

scheduler = BlockingScheduler(timezone=settings.TIME_ZONE)
scheduler.add_job(
    cleanup_tasks.send,
    # Every 4 minutes (240 seconds); the cron seconds field only spans 0-59.
    trigger=CronTrigger(minute='*/4'),
    max_instances=1,
    replace_existing=True,
)
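The cleanup criteria described above can be sketched as a small predicate. This is only an illustration under stated assumptions, not the code of cleanup_tasks: is_eligible_for_cleanup and its arguments are hypothetical, and the 120-second age is measured here from a hypothetical "updated" timestamp.

```python
from datetime import datetime, timedelta

# Final statuses and maximum age as described in the text above.
FINAL_STATUSES = {'skipped', 'failed', 'done'}
MAX_AGE = timedelta(seconds=120)


def is_eligible_for_cleanup(status, seen, updated, now):
    """Return True if a task matches all three cleanup criteria."""
    return bool(seen) and status in FINAL_STATUSES and now - updated > MAX_AGE


now = datetime(2024, 1, 1, 12, 0, 0)
old = now - timedelta(seconds=300)
recent = now - timedelta(seconds=30)

eligible = is_eligible_for_cleanup('done', True, old, now)
too_recent = is_eligible_for_cleanup('done', True, recent, now)
still_running = is_eligible_for_cleanup('running', True, old, now)
unseen = is_eligible_for_cleanup('failed', False, old, now)
```

Only the first task in this sketch would be deleted; the others fail one of the three conditions.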
- enqueued
- delayed
- running
- failed
- done
- skipped
completed_tasks = Task.objects.completed()
To get all tasks that have not been seen yet (including currently active tasks) together with tasks that were seen recently, use the following:
tasks = Task.objects.for_display()
By default, this includes tasks that have been seen in the last 30 seconds. To only include tasks seen in the last 15 seconds, use the following:
tasks = Task.objects.for_display(seconds_since_seen=15)
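The selection made by for_display() can be pictured with the following plain-Python sketch. It is an illustration under assumptions rather than the manager's actual query: tasks are represented as dictionaries with a hypothetical seen_at timestamp (None for tasks that have not been seen), and the 30-second default mirrors the behaviour described above.

```python
from datetime import datetime, timedelta


def for_display(tasks, now, seconds_since_seen=30):
    """Keep unseen tasks plus tasks seen within the last seconds_since_seen seconds."""
    cutoff = now - timedelta(seconds=seconds_since_seen)
    return [
        task for task in tasks
        if task['seen_at'] is None or task['seen_at'] >= cutoff
    ]


now = datetime(2024, 1, 1, 12, 0, 0)
tasks = [
    {'id': 1, 'seen_at': None},                         # not seen yet
    {'id': 2, 'seen_at': now - timedelta(seconds=20)},  # seen 20 seconds ago
    {'id': 3, 'seen_at': now - timedelta(seconds=60)},  # seen a minute ago
]

default_ids = [task['id'] for task in for_display(tasks, now)]
short_ids = [task['id'] for task in for_display(tasks, now, seconds_since_seen=15)]
```

With the default window the sketch keeps tasks 1 and 2; with a 15-second window only the unseen task 1 remains.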
The clear_tasks management command deletes all Task objects currently in the database, irrespective of status.
python manage.py clear_tasks
- Python 3.6+
- Django 3.2+
- Only supports PostgreSQL because django.contrib.postgres.fields.ArrayField is used. This could be looked at in the future.
This project follows semantic versioning (SemVer).
Check the root of the repo for these files.