Asynchronous task queue with Django, Celery and AWS SQS

asynchronous task queue

When dealing with heavy workload functionalities that can have a big impact on web application performance, you may run an asynchronous task queue (scheduled or not). These asynchronous background tasks can not only drastically improve the scalability of the application by moving those highly consuming operations to the background, but also improve the functionality usability. Once spread on different components, each with its own responsibility, your code can even look cleaner, more isolated and can be easier to maintain.

This article introduces a few topics regarding a prebuilt architecture using Django, Celery, Docker, and AWS SQS. The codebase is available on Github and you can easily follow the README steps to have the application up and running with no effort.

The following section brings a brief overview of the components used to build the architecture.

Components description

Message broker

Message broker usage

First things first. What is a message broker? Our always friendly Wikipedia says that: It mediates communication among applications, minimizing the mutual awareness that applications should have of each other in order to be able to exchange messages, effectively implementing decoupling. (2020)

In other words, it is an intermediate layer in which applications can communicate with – read and/or write messages – and by having that, it allows the possibility of building two decoupled applications that do not rely on each other.

 

Amazon SQS

Amazon SQS queue messaging flow

What is AWS SQS? “Amazon Simple Queue Service (SQS) is a fully managed message queuing service that enables you to decouple and scale microservices, distributed systems, and serverless applications. […] Using SQS, you can send, store, and receive messages between software components at any volume, without losing messages or requiring other services to be available.” (Amazon, 2020)

There are a few alternatives for SQS, like Kafka, Redis or RabbitMQ and all of them can be easily set up on Amazon using ElasticCache or AmazonMQ. Besides that, you do have the option of deploying your message broker on EC2 instances, which could be cheaper depending on the number of messages. But you should also consider the time and effort required to set it up on EC2 and the extra effort in case you decide to use it with Auto Scaling. 

What I see as the main benefit of using SQS is that it is fully managed by AWS and you pay on-demand according to your usage and if your application or feature requirements are according to SQS usage, it is very straightforward to configure and use it.

 

Celery

Celery library logo

What is Celery? “Celery is an asynchronous task queue/job queue based on distributed message passing. It is focused on real-time operation, but supports scheduling as well. […] Tasks can execute asynchronously (in the background) or synchronously (wait until ready).” (Celery, 2020)

Essentially, Celery is used to coordinate and execute distributed Python tasks. It allows the possibility of moving a specific code execution outside of the HTTP request-response cycle. This way, your server can respond as quickly as possible for a specific request, as it spawns an asynchronous job as a worker to execute the specific piece of code, which improves your server response time. Also, with Celery you have the option of running jobs in the background on a regular schedule.

Putting things together

To get everybody on the same page: AWS SQS is the message broker chosen here and Celery is the component responsible for orchestrating – consume: read and write – the message queue. Since Celery is a library, it needs to be set up on top of Django.

Note: The source code used in this blog post is available on GitHub. It assumes the following folders/app structure:

 
.
└── src
    ├── app
    │   ├── __init__.py
    │   ├── settings.py
    │   ├── urls.py
    │   └── wsgi.py
    ├── manage.py
    […]


Docker isn’t necessary for the system to work, but it makes it a lot easier to reproduce the final system architecture locally and it also serves as a guide when configuring the system architecture on a cloud computing service environment.

On the docker-compose file below, there are 5 configured services:

 
version: "3"
services:
  web:
    build: .
    volumes:
      - .:/usr/src/app
    ports:
      - 8000:8000
    environment:
      ENV: development
    depends_on:
      - db
      - sqs

  worker:
    build: .
    volumes:
      - .:/usr/src/app
    command: bash -c "cd src/ && celery -A app worker --loglevel=debug"
    depends_on:
      - web
      - sqs

  beat:
    build: .
    volumes:
      - .:/usr/src/app
    command: bash -c "cd src/ && celery -A app beat --loglevel=debug" 
    depends_on:
      - web 
      - sqs 

  db:
    image: postgres:12
    ports:
      - 5432:5432
    volumes:
      - ./docker/db/pgdata:/var/lib/postgresql/data
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: app

  sqs:
    image: roribio16/alpine-sqs
    ports:
      - 9324:9324
      - 9325:9325
    volumes:
      - ./config/elasticmq.conf:/opt/config/elasticmq.conf

Here is the lowdown on the docker-compose file:

  • web: is the web service container.
  • workeris a celery worker that spawns a supervisor process which does not process any tasks. Instead, it spawns child processes to execute the actual available tasks.
  • beatis a celery scheduler that periodically spawn tasks that are executed by the available workers.
  • db: postgres database container.
  • sqs: is a containerized Java implementation of the Amazon Queue Service that we will use to mimic the AWS SQS behaviour. The port 9324 is available to use the queue service and the 9325 is available to access the web interface.

Basically, the worker container service is responsible for searching and executing the available messages on the queues and the beat container service is responsible for spawning periodical tasks to be executed by the workers. When working with periodical tasks, there are a few possible configurations. You can check a few examples on this page.

The following code section is the config/elasticmq.conf file and it is used for the local execution only. It configures the node address for the sqs container. As Celery by default creates and uses a queue named “celery”, we don’t need to create a new queue unless we need it.


include classpath("application.conf")

node-address {
   protocol = http
   host = "*"
   port = 9324
   context-path = ""
}

rest-sqs {
   enabled = true
   bind-port = 9324
   bind-hostname = "0.0.0.0"
   // Possible values: relaxed, strict
   sqs-limits = strict
}

This is what your app/celery.py file looks like. You can easily find the meaning of each config key on the Celery website. The main one is the BROKER_URL, which is the broker address that Celery will connect to. The BROKER_URL variable value is hardcoded just for illustration purposes. The user and password values are dummy values, as they are not configured on the docker container sqs service. 


import os

from celery import Celery
from celery.schedules import crontab
from django.conf import settings

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "app.settings")

app = Celery("app")

CELERY_CONFIG = {
    "CELERY_TASK_SERIALIZER": "json",
    "CELERY_ACCEPT_CONTENT": ["json"],
    "CELERY_RESULT_SERIALIZER": "json",
    "CELERY_RESULT_BACKEND": None,
    "CELERY_TIMEZONE": "America/Sao_Paulo",
    "CELERY_ENABLE_UTC": True,
    "CELERY_ENABLE_REMOTE_CONTROL": False,
}

BROKER_URL = "sqs://user:password@sqs:9324/",

CELERY_CONFIG.update(
    **{
        "BROKER_URL": BROKER_URL,
        "BROKER_TRANSPORT": "sqs",
        "BROKER_TRANSPORT_OPTIONS": {
            "region": "us-west-2",
            "visibility_timeout": 3600,
            "polling_interval": 60,
        },
    }
)

app.conf.update(**CELERY_CONFIG)
app.autodiscover_tasks(packages={"payment.tasks"}))

The following code is what a dummy task function looks like. The application already knows that this is an asynchronous job just by using the decorator @task imported from Celery. Basically the decorator wraps the function and returns a task class instance with a few methods implemented.


from celery import task

from payment.models import Payment

@task
def capture_payment(pk):
    payment = Payment.objects.get(pk=pk)
    print(f"Capture payment: {payment}")

The following section is an example of its usage.


from payment.models import Payment

payment = Payment.objects.first()
capture_payment.delay(pk=payment.pk)

By calling it with the delay method an AsyncResult instance will be generated and the worker will execute the message task once it is available in the queue.

Summary

In this blog post I’ve introduced to you the main concepts behind the proposed architecture and also I’ve enlightened what I considered important to know about it.  With the codebase available on GitHub, you can effortlessly have a containerized Docker application that implements a job queueing system using Celery on top of Django for local development purposes. So, in order to deploy it, you need to create the respective services on AWS.

Known limitations

Recently we have come across a limitation using AWS SQS: the visibility timeout, which is a period of time in which Amazon SQS prevents other consumers from receiving and processing the messages. In one of our projects, a task would have the possibility to be scheduled for running only 2 weeks from now, but as the maximum visibility timeout for a message is 12 hours, it isn’t possible to accomplish it by using SQS.

References

Wikipedia.com, Message Broker. Available in: <https://en.wikipedia.org/wiki/Message_broker>. Access in: June 24th, 2020.

Docs.aws.amazon.com, What is Amazon Simple Queue Service?. Available in: <https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/welcome.html>. Access in: June 24th, 2020.

Celeryproject, Celery – Distributed Task Queue. Available in: <https://docs.celeryproject.org/en/stable/>. Access in: June 24th, 2020.

About the author.

Rodolfo Lottin
Rodolfo Lottin

A backend developer devoted to create high quality services and to solve problems on an elegant way. Fascinated about philosophy and a big fan of civic technology initiatives.