Using RabbitMQ as a Ruby work queue to populate Elasticsearch via Docker Compose

In this post I’ll share some Ruby code that uses RabbitMQ as a work queue to populate Elasticsearch documents.

First I created a RabbitMQ base class to hold the functionality shared by the producer and the workers. On initialization, the base class blocks until both the RabbitMQ and Elasticsearch services are available, retrying once per second. File: rabbitmq_base.rb

require 'bunny'
require 'elasticsearch'
require 'faker'

# Namespace around a single, lazily created Elasticsearch client.
# The node's host comes from ELASTICSEARCH_HOST (default "localhost");
# the HTTP port is fixed at 9200.
module ES
  module_function

  # Host name of the Elasticsearch node, read from the environment.
  def es_host
    ENV.fetch('ELASTICSEARCH_HOST') { 'localhost' }
  end

  # Base URL of the node's HTTP API.
  def es_url
    host = es_host
    "http://#{host}:9200"
  end

  # Memoised client: built on the first call and reused afterwards.
  # Constructed with `log: true`.
  def client
    return @client if @client

    @client = Elasticsearch::Client.new(url: es_url, log: true)
  end
end

# Shared setup for the producer and the workers.
#
# Construction blocks until RabbitMQ accepts a connection (with the work queue
# declared) and Elasticsearch reports at least "yellow" cluster health, so that
# containers started together by docker-compose can boot in any order. Both
# waits retry indefinitely, pausing RETRY_DELAY seconds between attempts.
class RabbitmqBase
  # Seconds to pause between connection attempts.
  RETRY_DELAY = 1

  def initialize
    with_retry('RabbitMQ') do
      create_connection
      create_channel
      create_queue
    end

    with_retry('Elasticsearch') { try_es_connection }
  end

  protected

  # Yields until the block completes without raising a StandardError.
  #
  # Each failure is reported on stderr rather than swallowed, so a persistent
  # problem (wrong host, queue previously declared with other arguments, ...)
  # is visible instead of looking like a silent hang.
  #
  # @param service [String] service name used in the log line
  def with_retry(service)
    yield
  rescue StandardError => e
    warn "Waiting for #{service} (#{e.class}: #{e.message}); retrying in #{RETRY_DELAY}s"
    sleep RETRY_DELAY
    retry
  end

  # Waits for the cluster to reach at least yellow health; any error raised
  # here is retried by #initialize.
  def try_es_connection
    ES.client.cluster.health wait_for_status: 'yellow'
  end

  # Opens a fresh connection. A connection left over from a previous attempt
  # (e.g. started successfully before channel/queue setup failed) is closed
  # first so retries do not leak sockets.
  def create_connection
    close_connection
    @connection = Bunny.new(hostname: rabbitmq_host)
    @connection.start
  end

  # Best-effort close of the current connection, if any. Errors are ignored
  # deliberately: the connection is being discarded either way.
  def close_connection
    @connection.close if @connection&.open?
  rescue StandardError
    nil
  ensure
    @connection = nil
  end

  def create_channel
    @channel = @connection.create_channel
  end

  # Declares the work queue as durable so that the queue, and the persistent
  # messages published to it, survive a broker restart.
  def create_queue
    @queue = @channel.queue(queue_name, durable: true)
  end

  def queue_name
    ENV.fetch('QUEUE_NAME', 'worker_queue')
  end

  def rabbitmq_host
    ENV.fetch('RABBITMQ_HOST', 'localhost')
  end
end

The producer subclass publishes a fixed number of tasks (1,000) to the queue and then exits. File: producer.rb

#!/usr/bin/env ruby

require_relative 'rabbitmq_base'

# Publishes a batch of "create_person" tasks to the work queue and exits.
class Producer < RabbitmqBase
  # Enqueues +count+ persistent task messages, then closes the connection.
  #
  # The explicit close performs an orderly AMQP shutdown before the process
  # exits instead of just dropping the socket; it runs even if publishing fails.
  #
  # @param count [Integer] number of tasks to publish (default 1,000)
  # @return [Integer] +count+
  def start(count = 1_000)
    message = 'create_person'
    count.times { @queue.publish(message, persistent: true) }
  ensure
    @connection.close if @connection&.open?
  end
end

Producer.new.start

The worker subclass subscribes to the queue, checks that each task names an available worker method, and then creates a person document in Elasticsearch. File: worker.rb

#!/usr/bin/env ruby

require_relative 'rabbitmq_base'

# Consumes tasks from the work queue and runs them.
#
# Each message body is the name of a task in WorkerMethods. Deliveries are
# acknowledged manually and only after the task finishes, so a message held by
# a worker that dies is redelivered to another worker instead of being lost.
class Worker < RabbitmqBase
  # Subscribes to the queue and processes messages until the process exits.
  def start
    # Allow at most one unacknowledged message per worker, so RabbitMQ hands
    # the next task to whichever replica is free instead of pushing the backlog
    # to the first consumer that connected. Prefetch only applies with manual acks.
    @channel.prefetch(1)

    @queue.subscribe(block: true, manual_ack: true) do |delivery_info, _properties, body|
      handle_delivery(delivery_info, body)
    end
  end

  private

  # Runs the task named by +body+ and settles the delivery:
  # - unknown task names are rejected without requeueing (they can never succeed);
  # - a task that raises is requeued unless the message had already been
  #   redelivered, so a poison message cannot cycle through the workers forever;
  # - a successful task is acknowledged.
  def handle_delivery(delivery_info, body)
    tag = delivery_info.delivery_tag

    unless WorkerMethods::TASKS.include?(body)
      warn "Worker method not found: #{body.inspect}"
      @channel.reject(tag, false)
      return
    end

    begin
      WorkerMethods.public_send(body)
    rescue StandardError => e
      warn "Task #{body} failed (#{e.class}: #{e.message})"
      @channel.reject(tag, !delivery_info.redelivered?)
      return
    end

    @channel.ack(tag)
  end

  # Task implementations that a queue message can request by name.
  module WorkerMethods
    # Message bodies that may be dispatched to a method of this module. An
    # explicit allowlist is used because the module object's public methods
    # also include inherited ones such as +freeze+ or +instance_eval+.
    TASKS = %w[create_person].freeze

    module_function

    # Indexes one randomly generated person into the "people" index.
    def create_person
      person = {
        first_name: Faker::Name.first_name,
        last_name: Faker::Name.last_name,
        email: Faker::Internet.email
      }
      ES.client.create index: 'people',
                       type: 'person',
                       body: person
      puts "Job processed by worker: #{hostname}"
    end

    # Output of the `hostname` command (memoised); identifies which replica
    # processed a job.
    def hostname
      @hostname ||= `hostname`.strip
    end
  end
end

# When stdout is not a TTY (the default for docker-compose services), Ruby
# buffers it; sync it so each "Job processed" line reaches the logs immediately.
$stdout.sync = true
Worker.new.start

I created a Ruby-based Dockerfile shared by the producer and the workers. File: Dockerfile

FROM ruby:2.5.3-alpine3.8

# Compiler toolchain, presumably for gems with native extensions.
RUN apk add --no-cache build-base

# Skip gem docs, and pin Bundler: releases from 2.4 on require Ruby >= 2.6,
# so an unpinned `gem install bundler` fails on this Ruby 2.5 image.
RUN echo 'gem: --no-document' > ~/.gemrc \
 && gem install bundler -v '< 2.4'

ENV APP_HOME=/app/
WORKDIR $APP_HOME

# Install gems before copying the scripts so this layer stays cached when
# only the Ruby files change.
COPY Gemfile Gemfile.lock ./
RUN bundle install
COPY *.rb ./

I used Docker Compose to run the services together. A deploy/replicas setting spins up 10 worker containers to distribute the load; note that docker-compose only applies the deploy section when started with the --compatibility flag (used below). File: docker-compose.yml

version: '3'
services:

  rabbitmq:
    # Pin a release: `latest` silently moves to new major versions.
    image: rabbitmq:3.7
    ports:
      - "5672:5672"

  app_producer:
    build:
      context: .
      dockerfile: Dockerfile
    command: /app/producer.rb
    environment:
      - ELASTICSEARCH_HOST=elasticsearch
      - RABBITMQ_HOST=rabbitmq
      - QUEUE_NAME=worker_queue
    depends_on:
      - elasticsearch
      - rabbitmq

  app_worker:
    build:
      context: .
      dockerfile: Dockerfile
    command: /app/worker.rb
    # Outside Swarm, docker-compose only applies `deploy` with --compatibility.
    deploy:
      mode: replicated
      replicas: 10
    environment:
      - ELASTICSEARCH_HOST=elasticsearch
      - RABBITMQ_HOST=rabbitmq
      - QUEUE_NAME=worker_queue
    depends_on:
      - elasticsearch
      - rabbitmq

  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:6.5.2
    container_name: elasticsearch
    environment:
      - cluster.name=docker-cluster
      # Single-node dev cluster: skip master discovery and the production
      # bootstrap checks (e.g. vm.max_map_count) that otherwise apply once the
      # node binds to a non-loopback address inside the container.
      - discovery.type=single-node
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - esdata:/usr/share/elasticsearch/data
    ports:
      - "9200:9200"

volumes:
  esdata:
    driver: local

Here are the commands I executed to run the apps and verify the results:

# Build the images, then start every service. --compatibility makes
# docker-compose apply the deploy/replicas setting (10 workers) outside Swarm.
docker-compose build && docker-compose --compatibility up

# Review the containers.
# NOTE: as expected, the producer container exited after queuing all the tasks
docker ps -a
CONTAINER ID  IMAGE                                                COMMAND                 CREATED             STATUS                     PORTS                                                  NAMES
8d0e832f2deb  rabbitmq_app_worker                                  "/app/worker.rb"        About a minute ago  Up About a minute                                                                 rabbitmq_app_worker_10
5420e004c788  rabbitmq_app_worker                                  "/app/worker.rb"        About a minute ago  Up About a minute                                                                 rabbitmq_app_worker_4
3d0d70b04310  rabbitmq_app_worker                                  "/app/worker.rb"        About a minute ago  Up About a minute                                                                 rabbitmq_app_worker_5
e2bc549a2cbb  rabbitmq_app_worker                                  "/app/worker.rb"        About a minute ago  Up About a minute                                                                 rabbitmq_app_worker_7
11445b7c6295  rabbitmq_app_worker                                  "/app/worker.rb"        About a minute ago  Up About a minute                                                                 rabbitmq_app_worker_3
5a27d37015c4  rabbitmq_app_producer                                "/app/producer.rb"      About a minute ago  Exited (0) 49 seconds ago                                                         rabbitmq_app_producer_1
a51bcb127e76  rabbitmq_app_worker                                  "/app/worker.rb"        About a minute ago  Up About a minute                                                                 rabbitmq_app_worker_2
42bfd224e65e  rabbitmq_app_worker                                  "/app/worker.rb"        About a minute ago  Up About a minute                                                                 rabbitmq_app_worker_9
9307e547454b  rabbitmq_app_worker                                  "/app/worker.rb"        About a minute ago  Up About a minute                                                                 rabbitmq_app_worker_1
d49337e9c5a8  rabbitmq_app_worker                                  "/app/worker.rb"        About a minute ago  Up About a minute                                                                 rabbitmq_app_worker_8
ebf2e23a8736  rabbitmq_app_worker                                  "/app/worker.rb"        About a minute ago  Up About a minute                                                                 rabbitmq_app_worker_6
788244fb1620  rabbitmq:latest                                      "docker-entrypoint.s…"  About a minute ago  Up About a minute          4369/tcp, 5671/tcp, 25672/tcp, 0.0.0.0:5672->5672/tcp  rabbitmq_rabbitmq_1
ec59a0e8f744  docker.elastic.co/elasticsearch/elasticsearch:6.5.2  "/usr/local/bin/dock…"  About a minute ago  Up About a minute          0.0.0.0:9200->9200/tcp, 9300/tcp                       elasticsearch

# Query Elasticsearch for one document from the "people" index; hits.total
# should equal the number of tasks the producer queued (1,000).
curl 'http://localhost:9200/people/_search?pretty&size=1'
{
  "took" : 43,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 1000,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "people",
        "_type" : "person",
        "_id" : "ZmiIjmcBxEAUMv2VhCO2",
        "_score" : 1.0,
        "_source" : {
          "first_name" : "Eric",
          "last_name" : "London",
          "email" : "eric@example.com"
        }
      }
    ]
  }
}

Source code on GitHub

Updated: