Strategies for Working with Message Queues

Dec 30, 2019 at 2:35PM
Caleb Doxsey

Message queues like Apache Kafka are a common component of distributed systems. This blog post will look at several different strategies for improving performance when working with message queues.

Model Overview

Kafka consists of topics which have one or more partitions.

Each partition is an ordered, immutable sequence of records that is continually appended to—a structured commit log. The records in the partitions are each assigned a sequential id number called the offset that uniquely identifies each record within the partition.

With this structured commit log, each consumer follows the same basic steps:

  1. A consumer is assigned a particular topic-partition (either manually or automatically via a consumer group)
  2. The previous offset is read so that the consumer will begin where it last left off
  3. Messages are consumed from Kafka
  4. Messages are processed in some way
  5. The processed message offset is committed back to Kafka

Other types of message queues (like AMQP) have a similar flow - messages are consumed, processed and acknowledged. Generally we rely on idempotent message processing - that is, the ability to process the same message twice with no ill effect - and err on the side of only committing if we're certain we've done what we need to. This gives us durability and guarantees that every message will be processed, even if our consumer process crashes.
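
To make that concrete, here is a minimal sketch of an idempotent handler (in Go, which the rest of this post uses), built on database/sql and assuming a Postgres-style table with a unique id column; the table and column names are hypothetical:

// handle is idempotent: it upserts by the message key, so processing
// the same message a second time leaves the table unchanged.
// (table and column names are hypothetical)
func handle(db *sql.DB, msg *kafka.Message) error {
    _, err := db.Exec(
        `INSERT INTO events (id, payload) VALUES ($1, $2)
         ON CONFLICT (id) DO NOTHING`,
        string(msg.Key), msg.Value,
    )
    return err
}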

Code

The code examples in this post will be written in Go but the strategies work in any programming language. In Go we can use the officially supported confluent-kafka-go library.

package main

import (
    "fmt"
    "log"

    "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

func consumer(server string) error {
    c, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers":  server,
        "group.id":           "example",
        "auto.offset.reset":  "earliest",
        "enable.auto.commit": false,
    })
    if err != nil {
        return fmt.Errorf("error creating kafka consumer: %w", err)
    }
    defer c.Close()

    if err := c.SubscribeTopics([]string{"example-topic"}, nil); err != nil {
        return fmt.Errorf("error subscribing to topic: %w", err)
    }
    for {
        // 1. Get
        msg, err := c.ReadMessage(-1)
        if err != nil {
            log.Println("consumer error:", err)
            continue
        }

        // 2. Process
        log.Println("process", msg)

        // 3. Commit
        if _, err := c.CommitMessage(msg); err != nil {
            log.Println("commit error:", err)
        }
    }
}

This example consumes a message, processes it, and commits the offset.

Our throughput will be limited by how quickly a single consumer can fetch and process messages and then commit their offsets; the total time is roughly N_messages * (T_get + T_process + T_commit). We can improve this.

Pipelining

In 1907, Henry Ford announced his goal for the Ford Motor Company: to create "a motor car for the great multitude." At that time, automobiles were expensive, custom-made machines. [...]

To improve the flow of the work, it needed to be arranged so that as one task was finished, another began, with minimum time spent in set-up. Ford was inspired by the meat-packing houses of Chicago and a grain mill conveyor belt he had seen. If he brought the work to the workers, they spent less time moving about. Then he divided the labor by breaking the assembly of the Model T into 84 distinct steps. Each worker was trained to do just one of these steps. Ford called in Frederick Taylor, the creator of "scientific management," to do time and motion studies to determine the exact speed at which the work should proceed and the exact motions workers should use to accomplish their tasks.

("Ford installs first moving assembly line, 1913")

The first performance improvement we can make is to utilize pipelining. A pipeline consists of steps (or stages) where the output of one step is connected to the input of the next step. If a single pipeline processes multiple messages we can often run the pipeline steps in parallel, that is, step 2 could be handling message #1, while step 1 is handling message #2.

Pipelines can be found in many places in a computer, from the instruction pipelines inside a CPU to Unix shell pipes.

Pipelines are an easy way for us to add parallelism to our code. They might also have the added benefit of making our code easier to understand, by breaking up a complex workflow into easier-to-understand stages.

In Go we create pipelines using channels:

func consumerPipeline(consumer *kafka.Consumer) error {
    getToProcess := make(chan *kafka.Message, 1)
    processToCommit := make(chan *kafka.Message, 1)

    var eg errgroup.Group
    eg.Go(func() error {
        // 1. Get
        for {
            msg, err := consumer.ReadMessage(-1)
            if err != nil {
                log.Println("consumer error:", err)
                continue
            }

            getToProcess <- msg
        }
        return nil
    })
    eg.Go(func() error {
        // 2. Process
        for msg := range getToProcess {
            log.Println("process", msg)
            processToCommit <- msg
        }
        return nil
    })
    eg.Go(func() error {
        // 3. Commit
        for msg := range processToCommit {
            _, err := consumer.CommitMessage(msg)
            if err != nil {
                return err
            }
        }
        return nil
    })
    return eg.Wait()
}

We break up the flow into 3 stages and put a channel between each one. We then run each stage in its own goroutine, using the errgroup package for easier management. (tip: if you need cancellation you can use a Context and the errgroup.WithContext function from the same package)
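
For example, here is a hedged sketch of what a cancellable version of the get stage might look like using errgroup.WithContext; the timeout-based polling and error handling are my assumptions, not part of the original example:

func consumerPipelineWithCancel(ctx context.Context, consumer *kafka.Consumer) error {
    eg, ctx := errgroup.WithContext(ctx)
    getToProcess := make(chan *kafka.Message, 1)

    eg.Go(func() error {
        // 1. Get, with cancellation
        for {
            // check for cancellation between reads
            select {
            case <-ctx.Done():
                return ctx.Err()
            default:
            }

            // poll with a timeout instead of blocking forever so the
            // cancellation check above runs periodically
            msg, err := consumer.ReadMessage(100 * time.Millisecond)
            if err != nil {
                if kerr, ok := err.(kafka.Error); ok && kerr.Code() == kafka.ErrTimedOut {
                    continue // no message yet
                }
                log.Println("consumer error:", err)
                continue
            }

            select {
            case getToProcess <- msg:
            case <-ctx.Done():
                return ctx.Err()
            }
        }
    })

    // ... the process and commit stages follow the same pattern,
    // selecting on ctx.Done() alongside their channel operations.

    return eg.Wait()
}

If any stage returns an error, the shared context is cancelled and the remaining stages unwind, so eg.Wait() returns promptly.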

Using this approach with our message queue, we can now begin to get the next message even while still processing the current message:

In general our time will now be N_messages * max(T_get, T_process, T_commit).

Increasing parallelism will also increase our CPU usage, though it will probably not be a 3x increase, since the time spent processing will likely be more than the other steps, and the time spent reading messages and committing offsets involves asynchronous network IO which will save us some CPU cycles. Regardless, we often have CPU cores sitting idle on servers, so overall this is a good thing.

Batching

'Tis the season for cookies. One interesting facet of baking cookies is that baking a single cookie takes roughly the same amount of time as baking dozens of cookies. Even though you double the amount of flour and sugar, it doesn't take much longer to mix it or crack a few more eggs. And the cooking time doesn't change much either. If your goal is to create 100 cookies, 4 batches of 25 cookies will always beat 25 batches of 4 cookies.

Let's use this intuition about batching in cooking and apply it to programming. Batching refers to combining multiple messages together in a single operation. Like pipelining, batching can be found all over the place on a computer - in buffered file I/O, in network packets, and in the multi-record nodes of a database b-tree.

For our uses batching is effective in multiple places:

  1. We can batch the number of messages we retrieve from Kafka. librdkafka, the underlying library we are using in Go, already supports this capability and has it enabled by default.

  2. Although less dramatic, sending multiple messages over a channel in Go is more efficient than sending a single message. Each send on a channel involves a mutex lock/unlock, and the cost of that overhead is roughly the same regardless of what is sent over the channel, so one send of a slice of messages is cheaper than many individual sends (see the sketch of slice-based sends after this list).

  3. We can batch the offset commits. Since Kafka stores a single committed offset per topic-partition for a consumer group, committing a later offset implicitly commits all the offsets before it. Using our code above, one way to implement the batching of commits would look like this:

    eg.Go(func() error {
      // 3. Commit
      ticker := time.NewTicker(time.Second)
      defer ticker.Stop()
    
      type Key struct {
        topic     string
        partition int32
      }
      offsets := map[Key]kafka.Offset{}
    
      for {
        select {
        // commit every second
        case <-ticker.C:
          if len(offsets) > 0 {
            tps := make([]kafka.TopicPartition, 0, len(offsets))
            for k, o := range offsets {
              // copy the topic so we don't take the address of the loop variable,
              // and commit the offset *after* the last processed message,
              // which is what Kafka expects the committed offset to be
              topic := k.topic
              tps = append(tps, kafka.TopicPartition{Topic: &topic, Partition: k.partition, Offset: o + 1})
              delete(offsets, k)
            }
            _, err := consumer.CommitOffsets(tps)
            if err != nil {
              return err
            }
          }
        case msg := <-processToCommit:
          k := Key{*msg.TopicPartition.Topic, msg.TopicPartition.Partition}
          o := msg.TopicPartition.Offset
          // keep the highest offset seen for each topic-partition
          // (the ok check ensures an offset of 0 is also recorded)
          if cur, ok := offsets[k]; !ok || o > cur {
            offsets[k] = o
          }
        }
      }
      return nil
    })
    

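Returning to point 2 above, here is a hedged sketch (not part of the original examples) of a get stage that forwards slices of messages instead of individual ones, so each channel send, with its mutex lock/unlock, covers many messages. The batch size and flush timeout are arbitrary placeholders:

// getStageBatched accumulates messages into a slice and forwards the whole
// slice over the channel, amortizing the per-send overhead.
func getStageBatched(consumer *kafka.Consumer, out chan<- []*kafka.Message) {
    const maxBatch = 64
    batch := make([]*kafka.Message, 0, maxBatch)
    for {
        // a short read timeout lets us flush a partial batch rather than
        // waiting indefinitely for the batch to fill
        msg, err := consumer.ReadMessage(50 * time.Millisecond)
        if err == nil {
            batch = append(batch, msg)
        }
        // flush when the batch is full, or on a timeout/error with a partial batch
        if len(batch) >= maxBatch || (err != nil && len(batch) > 0) {
            out <- batch
            batch = make([]*kafka.Message, 0, maxBatch)
        }
    }
}

The downstream stages then range over each slice, and the commit stage only needs to commit the offset of the last message in each batch.
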
In general batching will improve performance by consolidating gets and commits. For a batch size of 8, we might see: N_messages/8 * (T_get+ε + T_process*8 + T_commit+ε).

We will also usually see an improvement in the actual time to process the batch of messages as well (i.e. some factor less than 8). This is because batching both eliminates overhead by consolidating operations and improves cache locality. CPUs are far better at processing contiguous records than records randomly distributed across space and time. Batching smushes all our messages together, particularly if we avoid passing pointers over channels and stick to structs/slices. This also has the added benefit of improving data compression rates.

Unfortunately batching is no panacea and it comes with some downsides:

  1. There's no universal optimum batch size. Modern systems are so complex that it's nearly impossible to reason about their performance characteristics. We can make a guess based on cache sizes or network protocol overhead, but at the end of the day the best approach is to benchmark and discover what the best batch size is (a benchmark skeleton follows this list). This will change over time, so it's something we need to re-evaluate periodically.
  2. Batching can introduce latency into a system, since a stage in a processing pipeline may have to wait some time before seeing enough data to batch properly. Introducing a timeout helps, but also reduces the benefits of batching.
  3. Over-batching can cause downstream issues. For example, Kafka has a maximum message size and exceeding that size will result in messages being rejected by the message broker. Large messages can also require a large amount of memory to handle properly.
  4. Batching hurts random-access patterns. In the b-tree example above, we could store all records in a single top-level node, which would improve performance if we intend to always read all records, but doing so would change the performance characteristics from O(log n) to O(n) to find a single record. In the case of a message queue we may end up consuming messages we don't intend to use which is inefficient. (though, as always, it all depends on how we intend to use the data)
  5. Batching can limit horizontal scalability and hamper recovery time. The larger the batch size, the larger the box we're going to need to process it and the longer it's going to take to process. Imagine we bundle a million messages together and it takes 30 minutes to process this batch of messages. If our server crashes in the middle of this, we have to restart from the beginning. A smaller batch size would be much easier to process. (and the truth is, larger batch sizes see diminishing returns, so the difference between 1,000 and 1,000,000 may be negligible)
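
To follow up on point 1, a benchmark skeleton like the following is usually the quickest way to compare batch sizes with Go's sub-benchmarks. The workload and helpers are hypothetical stand-ins, not the real processing code:

// makeBatch and processBatch are hypothetical stand-ins; replace them
// with your actual message construction and processing.
func makeBatch(n int) [][]byte {
    batch := make([][]byte, n)
    for i := range batch {
        batch[i] = make([]byte, 256)
    }
    return batch
}

func processBatch(batch [][]byte) {
    for _, b := range batch {
        _ = sha256.Sum256(b) // stand-in for real work
    }
}

func BenchmarkBatchSizes(b *testing.B) {
    for _, size := range []int{1, 8, 64, 512} {
        batch := makeBatch(size)
        b.Run(fmt.Sprintf("batch-%d", size), func(b *testing.B) {
            // process roughly b.N messages total so ns/op is comparable
            // across batch sizes on a per-message basis
            for n := 0; n < b.N; n += size {
                processBatch(batch)
            }
        })
    }
}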

Process Stage Parallelism

What about introducing parallelism in the processing stage?

An obvious approach to this would be to simply start multiple processing stages:

// look, a 10x improvement
for i := 0; i < 10; i++ {
  eg.Go(func() error {
    // 2. Process
    for msg := range getToProcess {
      log.Println("process", msg)
      processToCommit <- msg
    }
    return nil
  })
}

Unfortunately, the problem with this approach is that we no longer have any guarantee the messages will be processed in order. And since acknowledgement in Kafka is done with a single commit offset, it's really important that we don't commit an offset until we're sure all the messages before it have been handled.

One way to solve this is with an aggregation or fan-in step, where we keep track of which messages we've seen so far, and only forward them on to the commit stage in the right order. But before we go too far down this path, this problem reminds me of a solution from a lower-level protocol.

The Sliding Window Protocol

The TCP protocol has a surprisingly similar issue to deal with. Messages in TCP are given sequence numbers and have to be acknowledged when received. Rather than wait for each acknowledgement, TCP will send several messages, up to the size of the receive window. Once that window is full it will wait for acknowledgements before sending more data. This keeps messages in order, allows them to be sent in parallel, and prevents too many from being in flight at once.

Let's try to implement a similar approach to our problem. First we will replace our processing code with a call to a function:

eg.Go(func() error {
  // 2. Process
  return slidingWindow(runtime.NumCPU(), getToProcess, processToCommit, func(msg *kafka.Message) error {
    log.Println("process", msg)
    return nil
  })
})

The goal of this slidingWindow function is to start multiple processors, receiving incoming messages from getToProcess, processing them, and then sending them to processToCommit. We want to make sure the messages are sent to processToCommit in the order they were received, but also process them in parallel. Our sliding window function looks like this:

func slidingWindow(size int, in <-chan *kafka.Message, out chan<- *kafka.Message, process func(*kafka.Message) error) error {
  var eg errgroup.Group

  // start our processors, each of which will have two channels
  // one for incoming and one for outgoing messages
  toProcess := make([]chan *kafka.Message, size)
  fromProcess := make([]chan *kafka.Message, size)
  for i := 0; i < size; i++ {
    request := make(chan *kafka.Message, 1)
    result := make(chan *kafka.Message, 1)
    eg.Go(func() error {
      for msg := range request {
        err := process(msg)
        if err != nil {
          return err
        }
        result <- msg
      }
      return nil
    })
    toProcess[i] = request
    fromProcess[i] = result
  }

  // start our feeding code
  eg.Go(func() error {
    // pointers to offsets within our channel slices
    var pending, completed int
    for {
      // we're going to leverage the fact that nil chans never proceed
      var nextIn, nextOut <-chan *kafka.Message

      switch pending - completed {
      case 0: // empty window, wait for incoming message
        nextIn = in
      case size: // full window, wait for completed message
        nextOut = fromProcess[completed%size]
      default: // otherwise we have room in the buffer and some outstanding requests, so either can proceed
        nextIn = in
        nextOut = fromProcess[completed%size]
      }

      select {
      case msg := <-nextIn: // we have an incoming message, process it
        toProcess[pending%size] <- msg
        pending++
      case msg := <-nextOut: // we have a completed message, send it to out
        out <- msg
        completed++
      }
    }
  })
  return eg.Wait()
}

This code might seem too clever by half, and I don't doubt it has a bug or two since I wrote it in an afternoon, but it's built on top of tried-and-true ideas from other problem domains.
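
One way to gain some confidence in it is an ordering check. The sketch below (my addition, not from the original post) feeds the window messages with increasing offsets, processes them with random delays, and verifies they come out in order:

func TestSlidingWindowOrdering(t *testing.T) {
    in := make(chan *kafka.Message)
    out := make(chan *kafka.Message)

    // run the window with 4 workers; it never returns on its own,
    // so we simply leave it running for the duration of the test
    go slidingWindow(4, in, out, func(msg *kafka.Message) error {
        // simulate variable processing time so completion order differs
        time.Sleep(time.Duration(rand.Intn(5)) * time.Millisecond)
        return nil
    })

    const n = 100
    go func() {
        for i := 0; i < n; i++ {
            in <- &kafka.Message{TopicPartition: kafka.TopicPartition{Offset: kafka.Offset(i)}}
        }
    }()

    for i := 0; i < n; i++ {
        msg := <-out
        if msg.TopicPartition.Offset != kafka.Offset(i) {
            t.Fatalf("message %d out of order: got offset %v", i, msg.TopicPartition.Offset)
        }
    }
}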

Partitioning

Many communities in Colorado are rapidly growing. For example Erie had a population of ~6,000 in 2000. It's now well over 25,000. Dealing with that sort of rapid growth is challenging across many dimensions - infrastructure, schools, housing, etc. One rather pronounced illustration of the problem can be found at election time. (though to be fair, you won't see this in Colorado, since they've switched entirely to mail-in ballots)

You'll notice a long line of folks hoping to vote before work, standing outside some middle school in the cold dark of winter. Staffed by volunteers, these sorts of polling locations aren't always able to keep up with the changes in demographics. But one strategy they do utilize is to add more voting machines and to form multiple lines instead of a single long one. To do this they often break up their constituents by district, or by last name - A-L over there, M-Z over here.

This solution is an example of partitioning - breaking up a large data set into smaller pieces which can then be processed independently. Applied to message queues, that means we basically make multiple message queues rather than a single one. With 3 queues and 3 consumers we've immediately tripled our throughput. Increase that to N and we have a mechanism for practically unlimited scalability.

Kafka is designed to be partitioned from the start. All topics in Kafka have a certain number of partitions, each of which has a separate offset and can be consumed independently.

To use partitioning in Kafka we simply create a topic with more than one partition and then run multiple consumer processes that all use the same consumer group name. Kafka will distribute the partitions amongst the consumers automatically.
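
Topics are usually created with Kafka's command-line tools or by an operator, but the library also exposes an admin API. Here is a hedged sketch of creating a three-partition topic with confluent-kafka-go's AdminClient; the broker address, topic name, and counts are placeholders:

func createPartitionedTopic(server string) error {
    admin, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": server})
    if err != nil {
        return err
    }
    defer admin.Close()

    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()

    // create "example-topic" with 3 partitions so it can be consumed
    // by up to 3 members of the same consumer group
    results, err := admin.CreateTopics(ctx, []kafka.TopicSpecification{{
        Topic:             "example-topic",
        NumPartitions:     3,
        ReplicationFactor: 1,
    }})
    if err != nil {
        return err
    }
    for _, r := range results {
        log.Println("create topic:", r)
    }
    return nil
}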

It is then up to us how we want to send messages to the partitions. There are many strategies:

The most straightforward would be to use round-robin or random partitioning. Each partition will receive an equal subset of the data. For example, if there are 3 partitions and 90 messages, each partition would receive 30 of the messages. Crucially, there's nothing about a message that tells you which partition it will end up in.

For some applications that unpredictability is not ideal. For example, data might be tied to a particular customer and it's important that the messages for that customer be processed in the right order. For a case like this we can derive an integer from the message (if the customer id is already an integer we're done, if not we can use hashing) and use modular arithmetic to pick which partition to send the message to.

var msg struct {
  CustomerID int
  Data       []byte
}
partition := int32(msg.CustomerID % numberOfPartitions)

By using this approach all of a customer's messages will end up in the same partition, thus guaranteeing they will be processed in order.
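
If the customer identifier were a string rather than an integer, a hash gives us the same property. A minimal sketch using the standard hash/fnv package and the confluent-kafka-go producer; the field names, topic, and producer setup are assumptions:

// partitionFor maps a string key onto one of numberOfPartitions partitions;
// the same key always maps to the same partition.
func partitionFor(customerID string, numberOfPartitions int32) int32 {
    h := fnv.New32a()
    h.Write([]byte(customerID))
    return int32(h.Sum32() % uint32(numberOfPartitions))
}

// then, when producing (producer, topic, and msg assumed to exist):
err := producer.Produce(&kafka.Message{
    TopicPartition: kafka.TopicPartition{
        Topic:     &topic,
        Partition: partitionFor(msg.CustomerID, numberOfPartitions),
    },
    Value: msg.Data,
}, nil)

In practice librdkafka can also do this for you: if you set the message Key and leave the Partition as kafka.PartitionAny, its default partitioner hashes the key to pick a partition.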

Partitioning based on some part of the data also allows us to partition downstream data storage layers. For example, we may have a separate SQL database for each partition, and we know which SQL database to query based on the partition the customer ID corresponds to. This allows us to scale out downstream systems because they each store a subset of the data.
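
As a minimal sketch of that idea (the database handles and their setup are hypothetical), routing can be as simple as indexing into a slice of connections with the same modulus used for partitioning:

// one *sql.DB per partition, opened at startup (hypothetical)
var databasesByPartition []*sql.DB

// dbFor returns the database that holds a given customer's data,
// using the same customerID % numberOfPartitions rule as the producer.
func dbFor(customerID int) *sql.DB {
    return databasesByPartition[customerID%len(databasesByPartition)]
}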

This is particularly effective when dealing with caching, since an LRU cache will dynamically adjust based on the traffic it receives. If there's a separate cache for each partition, each cache stores less data and therefore the hit rate is improved and/or the necessary system resources are reduced.

However, partitioning has significant problems.

Conclusion

Partitioning is an area of software development ripe for disruption. We often reach for it far too early in the design process, when just optimizing code or using a bigger machine would be more than adequate. But at the same time, once you've exhausted the low-hanging optimization fruit and are starting to use stupidly big instances for your problems, adding partitioning at such a late stage can be extremely difficult. So difficult that scrapping what's there and starting over may be easier.

A couple decades ago Joel Spolsky told us that rewriting code from scratch is the "single worst strategic mistake that any software company can make".

I suppose that's good advice, but it might leave you scratching your head about rapidly growing startups. Why do they so often build something, and then wholesale replace it a year or two later?

That's a big question, but here lies one answer: designs appropriate for one level of scale often don't work when applied to a different level of scale. Admitting that reality means you have to change the design of a system to get it to work when conditions change - and large-scale design changes are often impossible to pull off with an existing code base. So it makes sense why there's so much churn.

Nevertheless the status quo appears unstable to me. Rebuilding everything every few years is massively inefficient.

One promising solution to this problem is Google's Slicer:

Slicer is a highly available, low-latency, scalable and adaptive sharding service that remains decoupled from customer binaries and offers optional assignment consistency. These features and the consequent architecture were driven by the needs of real applications at Google. Slicer makes it easy to exploit sharding affinity and has proven to offer a diversity of benefits, such as object caching, write aggregation, and socket aggregation, to dozens of deployed applications.

A similarly designed, open-source solution to automatic repartitioning would go a long way to making it easier to build low-scale systems which could adapt to meet changing load requirements. Maybe some day we'll even see a managed service that does this in Google Cloud or AWS.