Asynchronous Tasks with Falcon and Celery

Last updated November 18th, 2022

Asynchronous tasks are used to move intensive, time-consuming processes, which are prone to failure, to the background so a response can be returned immediately to the client.

This tutorial looks at how to integrate Celery, an asynchronous task queue, into the Python-based Falcon web framework. We'll also use Docker and Docker Compose to tie everything together. Finally, we'll look at how to test the Celery tasks with unit and integration tests.

Learning Objectives

By the end of this tutorial, you should be able to:

  1. Integrate Celery into a Falcon web app.
  2. Containerize Falcon, Celery, and Redis with Docker.
  3. Execute tasks in the background with a separate worker process.
  4. Save Celery logs to a file.
  5. Set up Flower to monitor and administer Celery jobs and workers.
  6. Test a Celery task with both unit and integration tests.

Background Tasks

Again, to improve user experience, long-running processes should be run outside the normal HTTP request/response flow, in a background process.

Examples:

  1. Sending confirmation emails
  2. Web scraping and crawling
  3. Analyzing data
  4. Image processing
  5. Producing daily reports
  6. Running machine learning models

As you're building out an app, try to distinguish tasks that should run during the request/response lifecycle, like CRUD operations, from those that should run in the background.

Falcon Framework

Falcon is a Python micro web framework that's well suited to building back-end RESTful APIs. It feels much like Flask, with an emphasis on speed, both in development and at runtime.

From the Falcon documentation:

"Falcon is a minimalist WSGI library for building speedy web APIs and app backends. We like to think of Falcon as the Dieter Rams of web frameworks.

When it comes to building HTTP APIs, other frameworks weigh you down with tons of dependencies and unnecessary abstractions. Falcon cuts to the chase with a clean design that embraces HTTP and the REST architectural style."

Be sure to review the official docs for more information.

Project Setup

Clone down the base project:

$ git clone https://github.com/testdrivenio/falcon-celery --branch base --single-branch
$ cd falcon-celery

Take a quick glance at the code as well as the project structure, and then spin up the app using Docker:

$ docker-compose up -d --build

This should only take a moment to build and run the images. Once done, the app should be live on http://localhost:8000/ping.

Ensure the tests pass:

$ docker-compose run web python test.py

.
----------------------------------------------------------------------
Ran 1 test in 0.001s

OK

Celery

Now comes the fun part -- adding Celery! Start by adding both Celery and Redis to the requirements.txt file:

celery==5.2.7
falcon==3.1.0
gunicorn==20.1.0
redis==4.3.4

Create a Task

Add a new file to the "project/app" directory called tasks.py:

# project/app/tasks.py


import os
from time import sleep

import celery


CELERY_BROKER = os.environ.get('CELERY_BROKER')
CELERY_BACKEND = os.environ.get('CELERY_BACKEND')

app = celery.Celery('tasks', broker=CELERY_BROKER, backend=CELERY_BACKEND)


@app.task
def fib(n):
    sleep(2)  # simulate slow computation
    if n < 0:
        return []
    elif n == 0:
        return [0]
    elif n == 1:
        return [0, 1]
    else:
        results = fib(n - 1)
        results.append(results[-1] + results[-2])
        return results

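Here, we created a new instance of Celery and defined a new Celery task called fib that builds the Fibonacci sequence up to the nth term (counting from zero). The sleep(2) call runs on every recursive step, so larger inputs take proportionally longer.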

Celery uses a message broker to facilitate communication between the Celery worker and the web application. Messages are added to the broker and then processed by the worker(s). Once a task is done, its result is stored in the backend.
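To make the broker/worker/backend roles concrete, here is a minimal in-memory sketch using only the standard library. It is not how Celery is implemented: the queue stands in for the Redis broker, the dict for the result backend, and the names delay, get_status, and worker are hypothetical stand-ins chosen for illustration.

```python
import queue
import threading
import uuid

broker = queue.Queue()  # stands in for the Redis broker (assumption)
backend = {}            # stands in for the Redis result backend (assumption)


def fib(n):
    """Same sequence as the fib task, built iteratively and without the sleep."""
    if n < 0:
        return []
    results = [0, 1][: n + 1]
    while len(results) < n + 1:
        results.append(results[-1] + results[-2])
    return results


def delay(n):
    """Enqueue a message and return a task ID right away."""
    task_id = str(uuid.uuid4())
    broker.put((task_id, n))
    return task_id


def get_status(task_id):
    """Unknown or unfinished tasks are reported as PENDING."""
    return backend.get(task_id, {"status": "PENDING", "result": None})


def worker():
    """Pull messages off the broker and store results in the backend."""
    while True:
        message = broker.get()
        if message is None:  # sentinel: shut the worker down
            broker.task_done()
            break
        task_id, n = message
        backend[task_id] = {"status": "SUCCESS", "result": fib(n)}
        broker.task_done()


thread = threading.Thread(target=worker, daemon=True)
thread.start()
demo_id = delay(4)   # returns immediately, before the work is done
broker.join()        # wait for the worker (only needed for this demo)
print(get_status(demo_id))
broker.put(None)     # stop the worker
thread.join()
```

The web process only ever calls delay and get_status; the computation happens in the worker, which is why the HTTP response can go out before the task finishes.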

Redis will be used as both the broker and backend. Add both Redis and a Celery worker to the docker-compose.yml file:

version: '3.8'

services:

  web:
    build: ./project
    image: web
    container_name: web
    ports:
      - 8000:8000
    volumes:
      - ./project:/usr/src/app
    command: gunicorn -b 0.0.0.0:8000 app:app
    environment:
      - CELERY_BROKER=redis://redis:6379/0
      - CELERY_BACKEND=redis://redis:6379/0
    depends_on:
      - redis

  celery:
    image: web
    volumes:
      - ./project:/usr/src/app
    command: celery -A app.tasks worker --loglevel=info
    environment:
      - CELERY_BROKER=redis://redis:6379/0
      - CELERY_BACKEND=redis://redis:6379/0
    depends_on:
      - web
      - redis

  redis:
    image: redis:7-alpine

Add a new route handler to __init__.py to kick off the fib task:

class CreateTask(object):

    def on_post(self, req, resp):
        # parse the JSON request body
        raw_json = req.stream.read()
        payload = json.loads(raw_json)
        # queue the task; delay() returns right away with an AsyncResult
        task = fib.delay(int(payload['number']))
        resp.status = falcon.HTTP_200
        result = {
            'status': 'success',
            'data': {
                'task_id': task.id
            }
        }
        resp.text = json.dumps(result)

Register the route:

app.add_route('/create', CreateTask())

Import the task:

from app.tasks import fib

Build the image and spin up the containers:

$ docker-compose up -d --build

Test:

$ curl -X POST http://localhost:8000/create \
    -d '{"number":"4"}' \
    -H "Content-Type: application/json"

You should see something like:

{
  "status": "success",
  "data": {
    "task_id": "d935fa51-44ad-488f-b63d-6b0e178700a8"
  }
}
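The handler above assumes the body is valid JSON with a usable number field; anything else surfaces as a server error. One possible way to tighten this up is a small parsing helper. The sketch below is standard library only, and parse_number is a hypothetical name, not part of the base project.

```python
import json


def parse_number(raw_json):
    """Return the 'number' field of a JSON request body as an int.

    Raises ValueError with a readable message if the body is not valid JSON,
    is not an object, or has a missing or unusable 'number' field.
    """
    try:
        payload = json.loads(raw_json)
    except (json.JSONDecodeError, UnicodeDecodeError) as exc:
        raise ValueError("body must be valid JSON") from exc
    if not isinstance(payload, dict) or "number" not in payload:
        raise ValueError("body must be an object with a 'number' field")
    try:
        return int(payload["number"])
    except (TypeError, ValueError) as exc:
        raise ValueError("'number' must be convertible to an integer") from exc


print(parse_number('{"number":"4"}'))
```

In the route handler, you could catch the ValueError and raise falcon.HTTPBadRequest(description=str(exc)) so the client gets a 400 response instead of a 500.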

Check Task Status

Next, add a new route handler to check the status of the task:

class CheckStatus(object):

    def on_get(self, req, resp, task_id):
        task_result = AsyncResult(task_id)
        result = {'status': task_result.status, 'result': task_result.result}
        resp.status = falcon.HTTP_200
        resp.text = json.dumps(result)

Register the route:

app.add_route('/status/{task_id}', CheckStatus())

Import AsyncResult:

from celery.result import AsyncResult

Update the containers:

$ docker-compose up -d --build

Trigger a new task:

$ curl -X POST http://localhost:8000/create \
    -d '{"number":"3"}' \
    -H "Content-Type: application/json"

{
  "status": "success",
  "data": {
    "task_id": "65a1c427-ee08-4fb1-9842-d0f90d081c54"
  }
}

Then, use the returned task_id to check the status:

$ curl http://localhost:8000/status/65a1c427-ee08-4fb1-9842-d0f90d081c54

{
  "status": "SUCCESS", "result": [0, 1, 1, 2]
}
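One caveat with the status handler: when a task fails, Celery exposes the raised exception instance as the result, and json.dumps cannot serialize an exception. A minimal sketch of a safer serializer follows; build_status_payload is a hypothetical helper name, and the string format used for exceptions is just one reasonable choice.

```python
import json


def build_status_payload(status, result):
    """Build the JSON body for the status response.

    Exceptions (the result of a failed task) and other values that json
    cannot handle are converted to strings instead of raising TypeError.
    """
    if isinstance(result, BaseException):
        result = f"{type(result).__name__}: {result}"
    return json.dumps({"status": status, "result": result}, default=str)


print(build_status_payload("SUCCESS", [0, 1, 1, 2]))
print(build_status_payload("FAILURE", ValueError("boom")))
```

Inside CheckStatus, this would be used as resp.text = build_status_payload(task_result.status, task_result.result).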

Logs

Update the celery service so that Celery logs are dumped to a log file:

celery:
  image: web
  volumes:
    - ./project:/usr/src/app
    - ./project/logs:/usr/src/app/logs # add this line
  command: celery -A app.tasks worker --loglevel=info --logfile=logs/celery.log # update this line
  environment:
    - CELERY_BROKER=redis://redis:6379/0
    - CELERY_BACKEND=redis://redis:6379/0
  depends_on:
    - web
    - redis

Update:

$ docker-compose up -d --build

You should see the log file in logs/celery.log locally since we set up a volume:

[2022-11-15 17:44:31,471: INFO/MainProcess] Connected to redis://redis:6379/0
[2022-11-15 17:44:31,476: INFO/MainProcess] mingle: searching for neighbors
[2022-11-15 17:44:32,488: INFO/MainProcess] mingle: all alone
[2022-11-15 17:44:32,503: INFO/MainProcess] celery@80a00f0c917e ready.
[2022-11-15 17:44:32,569: INFO/MainProcess] Received task: app.tasks.fib[0b161c4d-5e1c-424a-ae9f-5c3e84de5043]
[2022-11-15 17:44:32,593: INFO/ForkPoolWorker-1] Task app.tasks.fib[0b161c4d-5e1c-424a-ae9f-5c3e84de5043] succeeded in 6.018030700040981s: [0, 1, 1, 2]
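If you want to pull task runtimes out of the log file, a regular expression over the "succeeded" lines is usually enough. The sketch below assumes the log format shown above; the pattern and the parse_succeeded name are illustrative and may need adjusting for other log formats.

```python
import re

# Matches the "succeeded" lines in the log format shown above.
SUCCEEDED = re.compile(
    r"Task (?P<name>[\w.]+)\[(?P<task_id>[0-9a-f-]+)\] "
    r"succeeded in (?P<seconds>\d+(?:\.\d+)?)s"
)


def parse_succeeded(line):
    """Return (task name, task ID, runtime in seconds), or None if no match."""
    match = SUCCEEDED.search(line)
    if match is None:
        return None
    return match["name"], match["task_id"], float(match["seconds"])


sample = (
    "[2022-11-15 17:44:32,593: INFO/ForkPoolWorker-1] Task app.tasks.fib"
    "[0b161c4d-5e1c-424a-ae9f-5c3e84de5043] succeeded in "
    "6.018030700040981s: [0, 1, 1, 2]"
)
print(parse_succeeded(sample))
```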

Flower

Flower is a real-time, web-based monitoring tool for Celery. With it, you can monitor currently running tasks, grow or shrink the worker pool, and view graphs and statistics, among other things.

Add it to requirements.txt:

celery==5.2.7
falcon==3.1.0
flower==1.2.0
gunicorn==20.1.0
redis==4.3.4

And then add the service to docker-compose.yml:

monitor:
  image: web
  ports:
    - 5555:5555
  command: celery -A app.tasks --broker=redis://redis:6379/0 flower --port=5555
  environment:
    - CELERY_BROKER=redis://redis:6379/0
    - CELERY_BACKEND=redis://redis:6379/0
  depends_on:
    - web
    - redis

Test it out:

$ docker-compose up -d --build

Navigate to http://localhost:5555 to view the dashboard. You should see one worker ready to go:

[Image: Flower dashboard]

Trigger a few more tasks:

[Image: Flower dashboard]

Flow

Before writing any tests, let's take a step back and look at the overall workflow.

In essence, an HTTP POST request hits /create. Within the route handler, a message is added to the broker, and the Celery worker process grabs it from the queue and processes the task. Meanwhile, the web application continues to execute and function properly, sending a response back to the client with a task ID. The client can then hit the /status/<TASK_ID> endpoint with an HTTP GET request to check the status of the task.

[Image: Falcon and Celery workflow diagram]
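The client side of this flow, polling /status/<TASK_ID> until the task finishes, can be sketched in a few lines of standard-library Python. The base URL assumes the Docker setup from this tutorial; wait_for_task, fetch_status, and the interval and timeout defaults are hypothetical choices, and only the SUCCESS and FAILURE states are treated as final here.

```python
import json
import time
from urllib.request import urlopen

BASE_URL = "http://localhost:8000"  # assumes the Docker setup from this tutorial


def fetch_status(task_id):
    """GET /status/<task_id> and decode the JSON body."""
    with urlopen(f"{BASE_URL}/status/{task_id}") as response:
        return json.load(response)


def wait_for_task(task_id, fetch=fetch_status, interval=0.5, timeout=30.0):
    """Poll until the task reaches SUCCESS or FAILURE, or the timeout expires."""
    deadline = time.monotonic() + timeout
    while True:
        payload = fetch(task_id)
        if payload["status"] in ("SUCCESS", "FAILURE"):
            return payload
        if time.monotonic() >= deadline:
            raise TimeoutError(f"task {task_id} is still {payload['status']}")
        time.sleep(interval)
```

With the containers running, wait_for_task("<TASK_ID>") blocks until the worker has stored a result. Passing a different fetch function makes the loop easy to exercise without a live server.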

Tests

Let's start with a unit test:

class TestCeleryTasks(unittest.TestCase):

    def test_fib_task(self):
        self.assertEqual(tasks.fib.run(-1), [])
        self.assertEqual(tasks.fib.run(1), [0, 1])
        self.assertEqual(tasks.fib.run(3), [0, 1, 1, 2])
        self.assertEqual(tasks.fib.run(5), [0, 1, 1, 2, 3, 5])

Add the above test case to project/test.py, and then update the imports:

import unittest

from falcon import testing

from app import app, tasks

Run:

$ docker-compose run web python test.py

It should take about 20 seconds to run:

..
----------------------------------------------------------------------
Ran 2 tests in 20.038s

OK

It's worth noting that in the above asserts, we used the .run method (rather than .delay) to run the task directly without a Celery worker.
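The 20-second runtime follows directly from the sleep(2) at the top of fib, which runs once per recursive call. A quick back-of-the-envelope check, where fib_calls is a hypothetical helper that only counts calls:

```python
SLEEP_SECONDS = 2  # the sleep(2) at the top of fib


def fib_calls(n):
    """Number of times fib's body runs (and therefore sleeps) for input n."""
    # n < 0, n == 0, and n == 1 return without recursing; larger values
    # recurse once per step down to 1.
    return 1 if n <= 1 else 1 + fib_calls(n - 1)


inputs = [-1, 1, 3, 5]  # the arguments used in test_fib_task
total = sum(fib_calls(n) for n in inputs) * SLEEP_SECONDS
print(total)  # 20
```

The same arithmetic gives three calls for fib(3), which matches the roughly six-second runtime reported in the Celery log earlier.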

Want to mock out the Celery task?

class TestCeleryTasks(unittest.TestCase):

    # def test_fib_task(self):
    #     self.assertEqual(tasks.fib.run(-1), [])
    #     self.assertEqual(tasks.fib.run(1), [0, 1])
    #     self.assertEqual(tasks.fib.run(3), [0, 1, 1, 2])
    #     self.assertEqual(tasks.fib.run(5), [0, 1, 1, 2, 3, 5])

    @patch('app.tasks.fib')
    def test_mock_fib_task(self, mock_fib):
        mock_fib.run.return_value = []
        self.assertEqual(tasks.fib.run(-1), [])
        mock_fib.run.return_value = [0, 1]
        self.assertEqual(tasks.fib.run(1), [0, 1])
        mock_fib.run.return_value = [0, 1, 1, 2]
        self.assertEqual(tasks.fib.run(3), [0, 1, 1, 2])
        mock_fib.run.return_value = [0, 1, 1, 2, 3, 5]
        self.assertEqual(tasks.fib.run(5), [0, 1, 1, 2, 3, 5])

Add the import:

from unittest.mock import patch

Run the tests again:

$ docker-compose run web python test.py

..
----------------------------------------------------------------------
Ran 2 tests in 0.002s

OK

Much better!
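Mocking the whole task makes the test fast, but the assertions then only check the mock's own return values. An alternative worth considering is to patch just the sleep call so the real Fibonacci logic still runs. The sketch below is self-contained: fib is a stand-in for app.tasks.fib without the Celery decorator, and it calls time.sleep so that patching "time.sleep" takes effect. In the project, where tasks.py uses from time import sleep, the patch target would be "app.tasks.sleep" instead.

```python
import time
import unittest
from unittest.mock import patch


def fib(n):
    """Stand-in for app.tasks.fib, without the Celery decorator."""
    time.sleep(2)  # simulate slow computation
    if n < 0:
        return []
    elif n == 0:
        return [0]
    elif n == 1:
        return [0, 1]
    results = fib(n - 1)
    results.append(results[-1] + results[-2])
    return results


class TestFibWithoutSleeping(unittest.TestCase):

    @patch("time.sleep")  # in the project: patch("app.tasks.sleep")
    def test_fib(self, mock_sleep):
        self.assertEqual(fib(-1), [])
        self.assertEqual(fib(1), [0, 1])
        self.assertEqual(fib(3), [0, 1, 1, 2])
        self.assertEqual(fib(5), [0, 1, 1, 2, 3, 5])
        # one sleep per call to fib: 1 + 1 + 3 + 5
        self.assertEqual(mock_sleep.call_count, 10)
```

This keeps the test as fast as the fully mocked version while still verifying the sequence the task actually produces.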

You can also run a full integration test from outside the container by running the following script:

#!/bin/bash

# trigger a job and capture its task ID
task_id=$(curl -s -X POST http://localhost:8000/create \
    -d '{"number":"2"}' \
    -H "Content-Type: application/json" \
| jq -r '.data.task_id')

# poll the status endpoint until the task succeeds
# (note: this loops forever if the task fails)
check=""
while [ "$check" != "SUCCESS" ]
do
  response=$(curl -s "http://localhost:8000/status/${task_id}")
  check=$(echo "$response" | jq -r '.status')
  echo "$response"
  sleep 1
done

Keep in mind that this is hitting the same broker and backend used in development. You may want to instantiate a new Celery app for testing:

app = celery.Celery('tests', broker=CELERY_BROKER, backend=CELERY_BACKEND)

Next Steps

Looking for some challenges?

  1. Spin up DigitalOcean and deploy this application across a number of droplets using Docker Swarm or Kubernetes.
  2. Add a basic client side with React, Angular, Vue, or just vanilla JavaScript. Allow an end user to kick off a new task. Set up a polling mechanism to check the status of a task as well.

Grab the code from the repo.

