Asynchronous tasks in Django with Django Q

Learn how to use Django Q for offloading asynchronous tasks in your Django applications.

Requirements

To follow along you'll need:

  • a newer version of Python, ideally 3.6 or 3.7
  • Git

Setting up the project

Before starting off with the tutorial, make sure you have a Django project in place.

The tutorial assumes your project is called django_q_django.

Asynchronous tasks in Django with Django Q: the problem with synchronous code

The main issue for Python and Django is that they're synchronous. It's not a bad thing per se, and there are a lot of ways to circumvent it.

Python, on which Django builds, is single threaded by nature. Single threaded means that the language interpreter can only run your code in sequence.

The practical implication is that any view in a Django application can get stuck if one or more operations take too long to complete.

To demonstrate the concept let's create a new Django application inside our project:

django-admin startapp demo_app

In this app we're going to define a view which returns a simple JSON response:

# demo_app/views.py

from django.http import JsonResponse

def index(request):
    json_payload = {
        "message": "Hello world!"
    }
    return JsonResponse(json_payload)

And let's also create the corresponding url:

# demo_app/urls.py

from django.urls import path
from .views import index

urlpatterns = [
    path("demo-app/", index)
]

Don't forget to wire up the url for the new app:

# django_q_django/urls.py

from django.contrib import admin
from django.urls import path, include
from .settings.base import ADMIN_URL

urlpatterns = [
    path(f'{ADMIN_URL}/', admin.site.urls),
    # the new url
    path("", include("demo_app.urls"))
]

And finally activate the app:

# django_q_django/settings.py

INSTALLED_APPS = [
    # omitted for brevity
    'demo_app.apps.DemoAppConfig'
]

Now to simulate a blocking event in the view we're going to use sleep from the time module, part of the Python standard library:

from django.http import JsonResponse
from time import sleep

def index(request):
    json_payload = {
        "message": "Hello world!"
    }
    sleep(10)
    return JsonResponse(json_payload)

Run the development server, head over to http://127.0.0.1:8000/demo-app/ and you can see the view hanging for 10 seconds before returning to the user.

Now, this is a delay created on purpose, but in a real application the block could happen for a number of reasons:

  • I/O bound operations taking too long (network delay, interactions with file systems and databases)
  • CPU bound operations (data crunching and mathematical operations)

Even if it's a contrived example you can see why it's crucial to offload long running tasks in a web application.
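To make the difference tangible, here's a minimal sketch using only the standard library. It's not how Django Q works (Django Q uses a broker and separate worker processes); it only illustrates the idea of handing work off and returning right away. The names slow_operation, blocking_handler and offloading_handler are made up for this example, and the sleep is shortened to 0.2 seconds.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_operation():
    # Hypothetical stand-in for a slow I/O or CPU bound operation
    time.sleep(0.2)
    return "done"


# A small pool of background threads (an assumption of this sketch)
executor = ThreadPoolExecutor(max_workers=2)


def blocking_handler():
    # The caller has to wait for the whole operation
    slow_operation()
    return {"message": "Hello world!"}


def offloading_handler():
    # Hand the work to the pool and return without waiting for it
    future = executor.submit(slow_operation)
    return {"message": "Hello world!"}, future


def timed(func):
    # Run func and return its result together with the elapsed seconds
    start = time.perf_counter()
    result = func()
    return result, time.perf_counter() - start


_, blocking_elapsed = timed(blocking_handler)
(payload, future), offloading_elapsed = timed(offloading_handler)
print(f"blocking: {blocking_elapsed:.2f}s, offloading: {offloading_elapsed:.2f}s")

# The background work still completes, just not on the caller's time
background_result = future.result()
executor.shutdown()
```

The offloading handler returns almost instantly, while the work finishes in the background. A thread pool lives inside the web process though, so pending work is lost if that process restarts.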

Django Q was born with this goal in mind. In the next sections we'll finally put our hands on it.

How about asynchronous Django?

Django 3.1 shipped with async views. For simpler use cases, you can use async views and asyncio.create_task() for calling an external API or crunching some numbers:

import asyncio
from django.http import JsonResponse
from asgiref.sync import sync_to_async
from time import sleep

@sync_to_async
def crunching_stuff():
    sleep(10)
    print("Woke up after 10 seconds!")

async def index(request):
    json_payload = {
        "message": "Hello world"
    }
    # or also:
    # asyncio.ensure_future(crunching_stuff())
    # loop.create_task(crunching_stuff())
    asyncio.create_task(crunching_stuff())
    return JsonResponse(json_payload)

This is a nice alternative to bringing in task queues for simpler tasks, but I don't feel it's a reliable workaround for more intensive use cases.
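Here's a small standard-library sketch of that fire-and-forget pattern, outside of Django, to show one reason for my caution. The asyncio documentation notes that the event loop only keeps weak references to tasks, so it recommends holding on to a reference yourself. Also, the task lives in the same process as the view: if that process goes away, so does the task. The names background_tasks and main are made up, the sleep is shortened, and asyncio.to_thread requires Python 3.9 or newer.

```python
import asyncio
import time


def crunching_stuff():
    # Blocking work, run in a thread so it doesn't stall the event loop
    time.sleep(0.1)
    return "Woke up!"


# Keep strong references so pending tasks aren't garbage collected
background_tasks = set()


async def index():
    task = asyncio.create_task(asyncio.to_thread(crunching_stuff))
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)
    # Return right away, without awaiting the task
    return {"message": "Hello world"}, task


async def main():
    payload, task = await index()
    finished_early = task.done()  # the work hasn't completed yet
    result = await task           # here we wait only to show the outcome
    return payload, finished_early, result


payload, finished_early, result = asyncio.run(main())
print(payload, finished_early, result)
```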

Preparing the Heroku app and the Redis instance

In this section we'll prepare the Heroku project. I'm using Heroku here because you may want to deploy to production later, also because they offer the Redis add-on for free.

If you're new to Redis, it's an in-memory database that can be used as a cache and as a message broker.

A message broker is more or less like a post office box: it takes messages, holds them in a queue, and folks from around the city can retrieve these messages later.
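To make the post office analogy concrete, here's a tiny sketch with queue.Queue from the standard library. It's only an in-process stand-in: a real broker like Redis runs outside your application, so different processes can drop off and pick up messages. The names mailbox, post and collect are made up for this example.

```python
from queue import Queue

# The "post office box": messages wait here in arrival order
mailbox = Queue()


def post(message):
    # Drop a message off without waiting for anyone to read it
    mailbox.put(message)


def collect():
    # Pick up every waiting message, oldest first
    messages = []
    while not mailbox.empty():
        messages.append(mailbox.get())
    return messages


post("send welcome email")
post("resize avatar")
delivered = collect()
print(delivered)
```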

If you're interested in how Django Q uses brokers check out this page.

Still in the project folder initialize a Git repo:

git init

Then create a new Heroku app. I'm going to add two add-ons:

  • heroku-postgresql which is more robust than the default sqlite for production
  • heroku-redis which will give us the Redis instance

If you haven't got the Heroku CLI and a Heroku account, go create an account, install the CLI and come back later.

Otherwise follow along with me and create the app:

heroku create --addons=heroku-postgresql,heroku-redis

Once done give Heroku a couple of minutes and then run:

heroku config:get REDIS_URL

This command will reveal REDIS_URL, an environment variable with the credentials for the Redis instance.

Take note of it and head over to the next section!

Asynchronous tasks in Django with Django Q: installing and running Django Q

Let's install Django Q and the Redis client library (the client is needed by the Redis broker for Django Q):

pip install django-q redis

Once done activate Django Q in the list of installed apps:

INSTALLED_APPS = [
    # omit
    # add Django Q
    'django_q'
]

Now reveal the Redis Heroku credentials:

heroku config:get REDIS_URL

You should see a string like this:

redis://h:p948710311f252a334c3b21cabe0bd63f943f68f0824cd41932781e7793c785bf@ec2-52-18-11-1.eu-west-1.compute.amazonaws.com:9059

Before the @ you'll find the password:

p948710311f252a334c3b21cabe0bd63f943f68f0824cd41932781e7793c785bf

After the @ there's the host:

ec2-52-18-11-1.eu-west-1.compute.amazonaws.com

And 9059 is the port. Note that the credentials will be different for you, don't use mine!

(Needless to say, by the time you read this article these credentials will be gone.)
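If you'd rather not pick the URL apart by hand, here's a minimal sketch that does it with urlsplit from the standard library. The variable names redis_url and redis_settings are my own; the resulting dictionary uses the host, port, password and db keys that the Redis configuration expects.

```python
from urllib.parse import urlsplit

# The example URL from above (long gone, use your own)
redis_url = (
    "redis://h:p948710311f252a334c3b21cabe0bd63f943f68f0824cd41932781e7793c785bf"
    "@ec2-52-18-11-1.eu-west-1.compute.amazonaws.com:9059"
)

parts = urlsplit(redis_url)

redis_settings = {
    "host": parts.hostname,      # everything after the @, without the port
    "port": parts.port,          # parsed as an integer
    "password": parts.password,  # between the colon and the @
    "db": 0,
}

print(redis_settings["host"], redis_settings["port"])
```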

Now configure Django Q in django_q_django/settings.py. Fill host, port, and password with your credentials:

Q_CLUSTER = {
    'name': 'django_q_django',
    'workers': 8,
    'recycle': 500,
    'timeout': 60,
    'compress': True,
    'save_limit': 250,
    'queue_limit': 500,
    'cpu_affinity': 1,
    'label': 'Django Q',
    'redis': {
        'host': 'ec2-52-18-11-1.eu-west-1.compute.amazonaws.com',
        'port': 9059,
        'password': 'p948710311f252a334c3b21cabe0bd63f943f68f0824cd41932781e7793c785bf',
        'db': 0, }
}

You might wonder why I'm not using REDIS_URL as it is. The reason is that, when I first wrote this, Django Q wanted the credentials in a dictionary.

It was a limitation of Django Q. I opened a pull request which got merged, and now you can use a Redis URL:

Q_CLUSTER = {
    'name': 'django_q_django',
    # omitted for brevity  
    'label': 'Django Q',
    'redis': 'redis://h:asdfqwer1234asdf@ec2-111-1-1-1.compute-1.amazonaws.com:111'
}

(When running the project in production you may want to switch to using environment variables. See the base configuration for learning how to use env).
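As a rough idea of what that could look like, here's a sketch that reads REDIS_URL with os.environ from the standard library. The fallback to a local Redis on the default port is my assumption for development, not something Heroku or Django Q requires, and your project may well use a dedicated settings library instead.

```python
import os

# Heroku sets REDIS_URL for you; the local fallback is an assumption
REDIS_URL = os.environ.get("REDIS_URL", "redis://localhost:6379/0")

Q_CLUSTER = {
    "name": "django_q_django",
    # other options omitted for brevity
    "label": "Django Q",
    "redis": REDIS_URL,
}
```

This keeps the credentials out of version control, which matters as soon as the repository leaves your machine.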

Once you're done run the migrations (Django Q needs to create its tables in the database):

python manage.py migrate

At this point you're ready to run the Django Q cluster with:

python manage.py qcluster

If everything goes well you should see this:

[Screenshot: Django Q cluster]

Well done! In the next section we'll create our first asynchronous task.

What is the Django Q cluster? Check this out.

Asynchronous tasks in Django with Django Q: async_task

Worth doing a quick recap of what we covered so far:

  • we created a Django project
  • we created a Django application
  • we installed Django Q and the Redis client
  • we created a Heroku project and a Redis instance
  • finally we configured Django Q

To test that Django Q could connect to Redis I launched:

python manage.py qcluster

With the project in place let's finally see an example of Django Q in action. Remember your view?

# demo_app/views.py

from django.http import JsonResponse
from time import sleep

def index(request):
    json_payload = {
        "message": "Hello world!"
    }
    sleep(10)
    return JsonResponse(json_payload)

Remove the time import and the sleep call from the view, then create a new file, demo_app/services.py (the name of this file is totally up to you).

In this new module we're going to define a function, sleep_and_print:

# demo_app/services.py

from time import sleep

def sleep_and_print(secs):
    sleep(secs)
    print("Task ran!")

In the view instead we'll borrow async_task from Django Q:

from django.http import JsonResponse
from django_q.tasks import async_task


def index(request):
    json_payload = {
        "message": "hello world!"
    }
    """
    TODO
    """
    return JsonResponse(json_payload)

async_task is the principal function you'll use with Django Q. It takes at least one argument, the function you want to enqueue, given here as a dotted path:

# example

async_task("demo_app.services.sleep_and_print")

Any arguments after the path are passed on to the function itself. sleep_and_print in our example takes one argument, the seconds to wait before printing. That means for async_task:

# example

async_task("demo_app.services.sleep_and_print", 10)

That's enough to enqueue a task. Let's now mix our view with async_task.
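If you're curious how a string like "demo_app.services.sleep_and_print" can end up as a callable, here's a minimal sketch with importlib from the standard library. It's an illustration of the general technique, not Django Q's actual loader, and resolve is a name I made up. I'm using math.sqrt so the example runs outside of a Django project.

```python
from importlib import import_module


def resolve(dotted_path):
    # Split "package.module.function" into the module path and the function name
    module_path, _, func_name = dotted_path.rpartition(".")
    module = import_module(module_path)
    return getattr(module, func_name)


sqrt = resolve("math.sqrt")
print(sqrt(9.0))  # prints 3.0
```

Passing a path instead of the function itself means only a short string has to travel through the broker.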

Asynchronous tasks in Django with Django Q: enqueue your first task

Back to our view, with async_task imported, call it right before the return statement:

from django.http import JsonResponse
from django_q.tasks import async_task


def index(request):
    json_payload = {"message": "hello world!"}
    # enqueue the task
    async_task("demo_app.services.sleep_and_print", 10)
    #
    return JsonResponse(json_payload)

Now run the cluster:

python manage.py qcluster

Run the Django server:

python manage.py runserver

And finally make a call to your view, either from http://127.0.0.1:8000/demo-app/ or from the terminal:

curl http://127.0.0.1:8000/demo-app/

Now you should notice a couple of things. The Django dev server should log:

13:55:42 [Q] INFO Enqueued 1

The Django Q cluster should log something along these lines:

13:55:42 [Q] INFO Process-1:1 processing [juliet-mountain-august-alaska]

And after that you should see:

Task ran!

Here's my terminal:

[Screenshot: Django Q async task]

What happened here is that:

  1. the Django view responded immediately to the request
  2. Django Q saved the task (just a reference) in Redis
  3. Django Q ran the task

With this "architecture" the view does not remain stuck anymore. Brilliant.
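The three steps can be mimicked in a few lines with the standard library. This is a toy model under my own assumptions, not Django Q's implementation: a queue.Queue stands in for Redis, a thread stands in for the cluster, and math.sqrt stands in for the task. The names broker, enqueue, worker and results are made up.

```python
import threading
from importlib import import_module
from queue import Queue

broker = Queue()  # stands in for Redis
results = []      # collects what the worker produced


def enqueue(dotted_path, *args):
    # Store only a reference to the task: its path and its arguments
    broker.put((dotted_path, args))


def worker():
    # Stands in for the cluster: take references off the queue and run them
    while True:
        item = broker.get()
        if item is None:  # sentinel telling the worker to stop
            break
        dotted_path, args = item
        module_path, _, func_name = dotted_path.rpartition(".")
        func = getattr(import_module(module_path), func_name)
        results.append(func(*args))


def index():
    # The "view": enqueue the task and respond without waiting
    enqueue("math.sqrt", 16)
    return {"message": "hello world!"}


cluster = threading.Thread(target=worker)
cluster.start()
response = index()
broker.put(None)  # shut the toy worker down after the queued task
cluster.join()
print(response, results)
```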

Think about the use cases for this pattern. You can:

  • safely interact with the I/O
  • crunch data in the background
  • safely move API calls out of your views

and much more.

Asynchronous tasks in Django with Django Q: what's next?

In addition to async_task, Django Q has the ability to schedule a task. A practical use case is "do X every N days", much like a cron job. Or think about an on-boarding series of emails that most services send to a newly registered user.

Check the documentation to learn more.

Django Q supports other brokers in addition to Redis. Again, the docs are your friend.

Another neat feature of Django Q is the admin integration. Create a superuser for your Django project, log in to the admin, and you'll find all your tasks and schedules there.

If you don't need other brokers than Redis, django-rq might be a lightweight alternative to Django Q.

A practical use case: sending an email when a new model instance is created

To illustrate Django Q schedules we're going to create a new model named Person. This model has nothing to do with the Django user model and it's just for illustration.

Let's say that when a new Person instance is created we want to send out an email every day. First let's create the model in demo_app/models.py:

from django.db import models


class Person(models.Model):
    name = models.CharField(max_length=100)
    surname = models.CharField(max_length=100)
    email = models.EmailField()
    schedule_id = models.IntegerField(default=0)

The model will hold a reference to the corresponding schedule. Don't forget to make the migrations and to migrate:

python manage.py makemigrations
python manage.py migrate

Now with Django Q schedules, which are also available as models, we can wire up the app so any time a Person is created, a new schedule is registered in Django Q (we could use schedule as a function, but in this case the model comes in handy).

There are many ways to model this relationship. I could have used a one to one, but I don't feel that Schedule "is a" Person. Instead we're going to override save and delete on the Person model so that:

  • a new Schedule is created on save
  • the related Schedule is deleted on delete

Let's start with the saving phase (notice that I'm importing the Schedule model from Django Q):

from django.db import models
from django_q.models import Schedule


class Person(models.Model):
    name = models.CharField(max_length=100)
    surname = models.CharField(max_length=100)
    email = models.EmailField()
    schedule_id = models.IntegerField(default=0)

    def save(self, *args, **kwargs):
        # Create the schedule only on the first save, so that
        # later updates don't register duplicate schedules
        if not self.schedule_id:
            schedule = Schedule.objects.create(
                name=str(self),
                func="demo_app.services.send_email",
                args=f"'{self.email}'",
                schedule_type=Schedule.DAILY,
            )
            # Save the model with the schedule id
            self.schedule_id = schedule.pk
        super().save(*args, **kwargs)
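A note on the quotes in args: the Schedule model stores arguments as a string. As far as I know Django Q evaluates that string as a Python literal when the schedule runs, so treat the sketch below as an illustration of that assumption rather than Django Q's exact code. It uses ast.literal_eval from the standard library, and the email address and the parse_args helper are made up.

```python
import ast


def parse_args(raw):
    # Evaluate the stored string as a Python literal and
    # normalise a single value into a one-element tuple
    parsed = ast.literal_eval(raw)
    if not isinstance(parsed, tuple):
        parsed = (parsed,)
    return parsed


email = "jane@example.com"  # hypothetical address

# With the inner quotes the string is a valid Python string literal
with_quotes = parse_args(f"'{email}'")
print(with_quotes)

# Without them it's not a literal at all, so evaluation fails
try:
    parse_args(email)
    without_quotes_ok = True
except (ValueError, SyntaxError):
    without_quotes_ok = False
print(without_quotes_ok)
```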

From now on any new Person will also have a related Schedule. To send out the email you can create a new function named send_email in demo_app/services.py:

from django.core.mail import send_mail


def send_email(to):
    send_mail(
        subject="Links of the day!",
        message="Links of the day ... TODO!",
        from_email="email@example.com",
        recipient_list=[to],
    )

And now here's the model with the deletion phase:

from django.db import models
from django_q.models import Schedule


class Person(models.Model):
    name = models.CharField(max_length=100)
    surname = models.CharField(max_length=100)
    email = models.EmailField()
    schedule_id = models.IntegerField(default=0)

    def save(self, *args, **kwargs):
        # Create the schedule only on the first save, so that
        # later updates don't register duplicate schedules
        if not self.schedule_id:
            schedule = Schedule.objects.create(
                name=str(self),
                func="demo_app.services.send_email",
                args=f"'{self.email}'",
                schedule_type=Schedule.DAILY,
            )
            # Save the model with the schedule id
            self.schedule_id = schedule.pk
        super().save(*args, **kwargs)

    def delete(self, *args, **kwargs):
        # Delete the schedule
        Schedule.objects.get(pk=self.schedule_id).delete()
        # Delete the person
        super().delete(*args, **kwargs)

The Schedule associated with Person is deleted with Schedule.objects.get(pk=self.schedule_id).delete().

Asynchronous tasks in Django with Django Q: why not Celery?

Fun fact: Celery was created by a friend of mine. We were in high school together. Despite that, I don't have much experience with Celery itself, but I've always heard a lot of people complaining about it.

Check this out for a better perspective.

Thanks for reading!

Valentino Gagliardi
