
Understand how Celery works by building a clone. (15 July 2019)

  1. Intro
  2. How they work
  3. Implementation
  4. Usage
  5. Conclusion
Intro
A delayed job processor (also called a background processor, asynchronous task queue, etc.) is a software system that can run code at a later time.
Examples of such software include Celery, Resque, and Sidekiq, among others.
In this blogpost we will try to understand how these systems work by building a clone/replica of such software.

When do we need to use a background task processor?
Take the example of an e-commerce website: when someone makes an order (by clicking on the order submit button), the website needs to persist that order to its database, emit an order metric event, and send a confirmation email to the customer. All these actions can be carried out synchronously as the customer waits for the page to reload. However, if any of them has the potential to take a couple of hundred milliseconds, then you had better figure out ways not to keep the customer waiting. Apparently, a latency of even 1 second can cost Amazon $1.6 billion in sales, and Google found a similar phenomenon in their traffic numbers.
Using a background task processor is one way (among others) to solve this problem.

How they work
Task processors may differ, but they mostly work this way:
  1. You have a function that contains some code that can take a long time to run, maybe because it will get blocked on IO.
  2. You annotate that function with some functionality provided by the 'background job' library.
  3. When you execute your function, instead of it running synchronously, the background job library 'takes' your function and serializes it to an object (e.g. a string).
  4. The background job library then takes that serialized object and stores it someplace (e.g. in a database).
  5. Then, at a later time, workers (provided by the background job library) take the serialized object from the store and convert/deserialize it back to its original form.
  6. They then use that to execute your original function (the whole lifecycle is sketched right after this list).
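
In miniature, that lifecycle looks something like the following toy sketch. It uses a plain Python list as the 'store' and json for serialization; it is only meant to illustrate the flow above, not to be a usable task processor:

# toy_lifecycle.py
import json

store = []  # stand-in for a broker/database

def send_email(order_id, email_address):
    print("sending email for order {0} to {1}".format(order_id, email_address))

# producer side: serialize the function's arguments and store them
store.append(json.dumps({"args": ["24dkq40", "example@example.org"], "kwargs": {}}))

# worker side (normally a separate process, running later):
# fetch, deserialize and execute
task = json.loads(store.pop(0))
send_email(*task["args"], **task["kwargs"])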
Implementation
We are going to implement a background job library named backie (short for background processor) that will work in the manner laid out in the previous section.

(a) Base-Task
We will create a base task/job class that all users of backie will have to subclass in order to create tasks that can be run in the background.

# backie/task.py
import abc
import json
import uuid

from .broker import Broker

class BaseTask(abc.ABC):
    """
    Example Usage:
        class AdderTask(BaseTask):
            task_name = "AdderTask"
            def run(self, a, b):
                result = a + b
                return result

        adder = AdderTask()
        adder.delay(9, 34)
    """
    task_name = None

    def __init__(self):
        if not self.task_name:
            raise ValueError("task_name should be set")
        self.broker = Broker()

    @abc.abstractmethod
    def run(self, *args, **kwargs):
        # put your business logic here
        raise NotImplementedError("Task `run` method must be implemented.")

    def delay(self, *args, **kwargs):
        try:
            task_id = str(uuid.uuid4())
            _task = {"task_id": task_id, "args": args, "kwargs": kwargs}
            serialized_task = json.dumps(_task)
            self.broker.enqueue(queue_name=self.task_name, item=serialized_task)
            print("task: {0} successfully queued".format(task_id))
        except Exception as e:
            # chain the original exception so the real cause is not lost
            raise Exception("Unable to publish task to the broker.") from e
                

So, if you want to create a task that will be processed in the background, you import BaseTask from backie, create a subclass of it and provide an implementation of the run method.
To call your task, you use the delay method with appropriate arguments. When you do that, BaseTask serializes those arguments to json and stores them in the provided broker/store (BaseTask.broker).
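
To make the serialization concrete: if you called adder.delay(9, 34) from the docstring example above, the string handed to the broker would look something like this (the task_id is a random uuid4, so yours will differ):

{"task_id": "<some-random-uuid4>", "args": [9, 34], "kwargs": {}}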

(b) Broker
Our BaseTask refers to a broker that is used to store the tasks. Let's implement that broker.

# backie/broker.py
import redis # pip install redis

class Broker:
    """
    use redis as our broker.
    This implements a basic FIFO queue using redis.
    """
    def __init__(self):
        host = "localhost"
        port = 6379
        password = None
        self.redis_instance = redis.StrictRedis(
            host=host, port=port, password=password, db=0, socket_timeout=8.0
        )

    def enqueue(self, item, queue_name):
        # lpush adds items at the head of the redis list; brpop (in dequeue
        # below) pops from the tail, which gives us FIFO ordering.
        self.redis_instance.lpush(queue_name, item)

    def dequeue(self, queue_name):
        # brpop blocks until an item is available or the timeout expires;
        # it returns a (queue_name, item) tuple, or None on timeout.
        dequeued_item = self.redis_instance.brpop(queue_name, timeout=3)
        if not dequeued_item:
            return None
        return dequeued_item[1]
                

In this case we are using redis as our backing broker/store. But you can use any other store to do that. You could store the tasks in memory, in a database, in the filesystem, you could even store them in a blockchain if you want to stay current with the hype.
What you use as the backing store does not matter; as you can see, the BaseTask is serializing the task arguments to a json string and that is what is stored. Find yourself a place that can store json strings and you are good to go.
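
To illustrate just how swappable the store is, here is a minimal sketch of an alternative broker that keeps tasks in memory using Python's queue module instead of redis. The InMemoryBroker name is my own; it exposes the same enqueue/dequeue interface that backie expects and only works within a single process, but it shows that backie does not care what sits behind that interface:

# in_memory_broker.py
import queue

class InMemoryBroker:
    """
    A single-process stand-in for the redis Broker above.
    Tasks never leave the current python process, so this is only
    useful for tests or for illustrating the idea.
    """
    def __init__(self):
        # one FIFO queue per queue_name
        self.queues = {}

    def enqueue(self, item, queue_name):
        self.queues.setdefault(queue_name, queue.Queue()).put(item)

    def dequeue(self, queue_name):
        try:
            # mimic the redis broker: block for up to 3 seconds, then give up
            return self.queues.setdefault(queue_name, queue.Queue()).get(timeout=3)
        except queue.Empty:
            return None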

This is how Celery is able to support different brokers. When you do task.delay(3, 5), Celery serializes all the task arguments to json and sends them to a broker for storage.

(c) Worker
Now that the task arguments (and any other metadata like task_id) have been stored in the broker, we need to actually run those tasks.
We do that via a worker; let's implement one:

# backie/worker.py
import json

class Worker:
    """
    Example Usage:
        task = AdderTask()
        worker = Worker(task=task)
        worker.start()
    """
    def __init__(self, task) -> None:
        self.task = task

    def start(self,):
        while True:
            try:
                _dequeued_item = self.task.broker.dequeue(queue_name=self.task.task_name)
                if not _dequeued_item:
                    # the queue was empty (the broker timed out); go back to waiting
                    continue
                dequeued_item = json.loads(_dequeued_item)
                task_id = dequeued_item["task_id"]
                task_args = dequeued_item["args"]
                task_kwargs = dequeued_item["kwargs"]

                print("running task: {0}".format(task_id))
                self.task.run(*task_args, **task_kwargs)
                print("successful run of task: {0}".format(task_id))
            except Exception:
                print("Unable to execute task.")
                continue
                

The worker takes a task instance as an argument. Then, in a loop (in the start method), it dequeues a task from the broker, deserializes the json to get the task arguments, and uses those arguments to call the task.

It is the work of Celery workers to dequeue a task from the broker, deserialize its arguments/metadata and execute the original function with those arguments.
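
Because redis's brpop hands each queued item to exactly one caller, nothing stops you from running several such workers against the same queue to process tasks in parallel. Here is a rough sketch of what that could look like, using Python's multiprocessing and the AdderTask from the BaseTask docstring as a stand-in task (any BaseTask subclass would do):

# run_many_workers.py - illustrative sketch, not part of backie itself
import multiprocessing

from backie.task import BaseTask
from backie.worker import Worker

class AdderTask(BaseTask):
    # the example task from the BaseTask docstring
    task_name = "AdderTask"

    def run(self, a, b):
        return a + b

def start_worker():
    # each process gets its own task instance (and hence its own redis connection)
    Worker(task=AdderTask()).start()

if __name__ == "__main__":
    for _ in range(4):
        multiprocessing.Process(target=start_worker).start()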

Usage
We have implemented a background task processor (backie) that is similar to Celery, so how do we use it?
Going back to our previous example of an e-commerce site, let's use backie to implement the various tasks/jobs that need to be run.
We will need one task to persist the order to the database, another to emit an order metric event and lastly one that sends out a confirmation email to the customer.
For the sake of brevity, we will only look at the task that sends out emails after ordering.
 
# ecommerce_tasks.py
from backie.task import BaseTask

# pip install requests
import requests

class EmailTask(BaseTask):
    """
    task to send email to customer after they have ordered.
    """

    task_name = "EmailTask"

    def run(self, order_id, email_address):
        # lets pretend httpbin.org is an email service provider
        url = "https://httpbin.org/{0}/{1}".format(order_id, email_address)
        print(url)
        response = requests.get(url, timeout=5.0)
        print("response:: ", response)


if __name__ == "__main__":
    order_id = "24dkq40"
    email_address = "example@example.org"
    email_task = EmailTask()
    email_task.delay(order_id, email_address)
                

And then we also need to start a worker for that task:
 
# ecommerce_worker.py
from ecommerce_tasks import EmailTask

from backie.worker import Worker

if __name__ == "__main__":
    email_task = EmailTask()

    # run workers
    worker = Worker(task=email_task)
    worker.start()
                

Putting it all together, let's run a redis server (using docker) in one terminal:
 
docker run -p 6379:6379 redis:5.0-alpine
               

And in another terminal, we queue a task:
 
python ecommerce_tasks.py
    task: 4ed63e42-f614-4093-a654-46211d5de8cf successfully queued
                

In a third terminal, run the worker:

python ecommerce_worker.py
    running task: 4ed63e42-f614-4093-a654-46211d5de8cf
    https://httpbin.org/24dkq40/example@example.org
    successful run of task: 4ed63e42-f614-4093-a654-46211d5de8cf
                

Conclusion
That was a brief overview of how some (most?) background task processors work.
The principle behind how they work is very simple: turn a task's arguments into a string, store that string in a database of sorts, retrieve it from the database at a later time, and feed those arguments to the actual task to run.
Of course, on top of that simple idea, these task processors layer other useful features like retries, periodic tasks, chaining of tasks, etc.
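
As a taste of how such a feature could be layered on top of backie, here is a rough sketch of retries: a worker catches a failed run and re-queues the task with a retry count in the task metadata. The RetryingWorker name and the max_retries bookkeeping are my own additions, not part of the code above:

# backie/worker_with_retries.py - illustrative sketch only
import json

class RetryingWorker:
    """
    Like Worker, but re-queues a failed task up to `max_retries` times.
    """
    def __init__(self, task, max_retries=3) -> None:
        self.task = task
        self.max_retries = max_retries

    def start(self,):
        while True:
            _dequeued_item = self.task.broker.dequeue(queue_name=self.task.task_name)
            if not _dequeued_item:
                continue
            dequeued_item = json.loads(_dequeued_item)
            try:
                self.task.run(*dequeued_item["args"], **dequeued_item["kwargs"])
            except Exception:
                # remember how many times this task has failed and re-queue it,
                # until we hit the retry limit.
                retries = dequeued_item.get("retries", 0)
                if retries < self.max_retries:
                    dequeued_item["retries"] = retries + 1
                    self.task.broker.enqueue(
                        queue_name=self.task.task_name, item=json.dumps(dequeued_item)
                    )
                else:
                    print("task: {0} failed permanently".format(dequeued_item["task_id"]))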

All the code in this blogpost can be found at: https://github.com/komuw/komu.engineer/tree/master/blogs/07

PS: Check out wiji, which is an experimental python3 asyncio distributed task processor.
