Messaging with RabbitMQ in Spring Boot Application

In this article, you will learn about RabbitMQ and explore its common use cases. I will also guide you step-by-step on how to implement messaging using RabbitMQ in a Spring Boot application. By the end of this article, you will know how to publish and consume messages in a queue. So let’s get started!

What is a RabbitMQ?

RabbitMQ is an open-source message broker software that provides a messaging system for applications. It is based on the Advanced Message Queuing Protocol (AMQP), which is a standard protocol for message-oriented middleware that allows different applications to communicate with each other over a network.

In simple terms, RabbitMQ acts as a middleman between different applications that need to communicate with each other. It accepts messages from one application and routes them to another application based on specific rules. RabbitMQ is highly scalable, fault-tolerant, and can handle large volumes of messages, making it a popular choice for building reliable, distributed systems.

Why do we need a Messaging Queue?

There are several reasons why we may need a messaging queue in a distributed application or system. Here are some of the most common reasons:

  1. Decoupling: A messaging queue allows for loose coupling between different components of an application. When a component sends a message to a queue, it doesn’t need to know which other components will process the message, or even if any other components are currently available. This means that changes to one component won’t necessarily impact other components.
  2. Asynchronous processing: Messaging queues allow for asynchronous processing of messages. Instead of blocking the sender until the message is processed, the sender can send the message and continue with its work, while the message is processed at a later time.
  3. Load balancing: Messaging queues can help balance the load across multiple instances of the same component. Instead of sending all messages to a single instance, messages can be distributed across multiple instances to balance the load.
  4. Resilience: Messaging queues can improve the resilience of an application. If a component is temporarily unavailable, messages can be stored in the queue until the component is back online. This prevents the loss of messages that would occur if the sender simply discards the message.

In summary, messaging queues can help improve the scalability, flexibility, and reliability of distributed applications, and they can also simplify the design and implementation of these applications.

Key-Components in Messaging via RabbitMQ:

Below is the image that depicts the workflow and key components involved in the messaging via RabbitMQ.

  • Message: A message is a form of data that is shared from the producer to the consumer. The data can hold requests, information, meta-data, etc.
  • Producer: A producer is a user program that is responsible to send or produce messages.
  • Exchange: Exchanges are message routing agents, it is responsible to route the messages to different queues with the help of header attributes, bindings, and routing keys. Binding is the link between a queue and the exchange. Whereas, the Routing Key is the address that the exchange uses to determine which queue the message should be sent to.
  • Queue: A queue is a mailbox or a buffer that stores messages. It has certain limitations like disk space and memory.
  • Consumer: A consumer is a user program that is responsible to receive messages.

Common Use-Cases of using Messaging Queue:

In real life, messaging queues are frequently used in various applications. For example, let’s consider a messaging queue application at a coffee shop.

When you enter the coffee shop, the front desk worker greets you, takes your order, and gives you an order number. The order is then sent to the kitchen, where your drink is prepared. The kitchen crew receives your order and begins preparing it. While the kitchen crew prepares your order, the front-line employee doesn’t sit around waiting. Instead, they continue to work and entertain other customers. Similarly, the kitchen crew receives increased orders for drink preparation, but the orders are fulfilled in order. As a result, neither the front-line workers nor the kitchen employees are dependent on each other; they are aware of their roles and continue to work as independent entities.

There are many use-cases for messaging queues, including:

  • Decoupling: A way to decouple communication between different services or applications.
  • High Response Time: When the response time of a request is too long, such as calculations, searching, or PDF creation.
  • Background Jobs: Sending background messages, emails, or notifications to loads of users.
  • Asynchronous Messaging: Messaging queues are the best way to implement asynchronous programming.”

Prerequisite Steps:

To integrate RabbitMQ with your Spring Boot application, the first step is to download and install RabbitMQ on your local machine. There are two common ways to set up RabbitMQ:

  1. Using a Docker container: You can refer to the following blog post for instructions on how to run RabbitMQ on your computer in a Docker.
  2. Using the installer: You can follow the official RabbitMQ website’s instructions to install RabbitMQ according to your operating system. Read the following blog post, to learn how to install RabbitMQ on macOS,

RabbitMQ is accessible by default over port 15672 once setup is complete and RabbitMQ is running.

RabbitMQ UI

Implement messaging using RabbitMQ in Spring Boot Application

Create a Maven-based Spring Boot Project using any of your favourite IDE. You can also read the following tutorial to learn how to create Spring Boot application using Spring Initializr tool.

pom.xml

Following is a complete pom.xml file that contains the RabbitMQ dependencies.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.6</version>
        <relativePath/>
        <!-- lookup parent from repository -->
    </parent>
    <groupId>org.example</groupId>
    <artifactId>RabbitMQSpringBootExample</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <spring.ampqp.version>2.2.9.RELEASE</spring.ampqp.version>
        <springboot-version>2.5.6</springboot-version>
        <lombok.version>1.18.12</lombok.version>
    </properties>

    <dependencies>
        <!-- Spring Boot -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>${springboot-version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-amqp</artifactId>
            <version>${spring.ampqp.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.springframework.amqp/spring-rabbit -->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>${spring.ampqp.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.15.0</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
        </dependency>

    </dependencies>
</project>

application.properties

This is how the application.properties file looks like. Add RabbitMQ specific properties where:

  • rabbitmq.host: RabbitMQ host
  • rabbitmq.virtualhost: Virtual host to use when connecting to the broker.
  • rabbitmq.port: RabbitMQ port
  • rabbitmq.username: Login user to authenticate to the broker.
  • rabbitmq.password: Log in to authenticate against the broker.
  • rabbitmq.exchange: The name of the exchange to use for send operations.
  • rabbitmq.queue: The name of the message queue where messages are saved.
  • rabbitmq.routingkey: Name of the routing key.
  • rabbitmq.reply.timeout:  Timeout is enforced on consumer delivery acknowledgment. This helps detect buggy consumers that never acknowledge deliveries.
  • rabbitmq.concurrent.consumers: This field is important when we are with multiple producers and consumers reading/writing from the same queue.
  • rabbitmq.max.concurrent.consumers: It shows the number of concurrent consumers, but in our example, we are working with a single consumer only.
#RabbitMQ settings
rabbitmq.host=localhost
rabbitmq.virtualhost=/
rabbitmq.port=15672
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.exchange=rabbitmq.exchange
rabbitmq.queue=rabbitmq.queue
rabbitmq.routingkey=rabbitmq.routingkey
rabbitmq.reply.timeout=60000
rabbitmq.concurrent.consumers=1
rabbitmq.max.concurrent.consumers=1

Define the RabbitMQ Configuration Class

In our example, we are disabling the default behavior of RabbitMQ by disabling RabbitAutoConfiguration class.

We need a custom Configuration class to declare @Bean ourselves and configure them as wanted because we are not usingSPring Boot autoconfiguration.  This allows us to adapt Beans according to our RabbitMQ server specifications. One of the use-cases of using the custom Configuration class is to give us leverage to connect to the remote RabbitMQ server to an IP Address and valid credentials.
package org.appsdeveloper.config;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler;
import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.*;
import org.springframework.util.ErrorHandler;

@EnableRabbit
@Configuration
public class RabbitMQConfig {

    @Value("${rabbitmq.queue}")
    private String queueName;

    @Value("${rabbitmq.exchange}")
    private String exchange;

    @Value("${rabbitmq.routingkey}")
    private String routingkey;

    @Value("${rabbitmq.username}")
    private String username;

    @Value("${rabbitmq.password}")
    private String password;

    @Value("${rabbitmq.host}")
    private String host;

    @Value("${rabbitmq.virtualhost}")
    private String virtualHost;

    @Value("${rabbitmq.reply.timeout}")
    private Integer replyTimeout;

    @Value("${rabbitmq.concurrent.consumers}")
    private Integer concurrentConsumers;

    @Value("${rabbitmq.max.concurrent.consumers}")
    private Integer maxConcurrentConsumers;

    @Bean
    public Queue queue() {
        return new Queue(queueName, false);
    }

    @Bean
    public DirectExchange exchange() {
        return new DirectExchange(exchange);
    }

    @Bean
    public Binding binding(Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(routingkey);
    }

    @Bean
    public MessageConverter jsonMessageConverter() {
        ObjectMapper objectMapper = new ObjectMapper();
        return new Jackson2JsonMessageConverter(objectMapper);
    }

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setVirtualHost(virtualHost);
        connectionFactory.setHost(host);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        return connectionFactory;
    }

    @Bean
    public AmqpTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setDefaultReceiveQueue(queueName);
        rabbitTemplate.setMessageConverter(jsonMessageConverter());
        rabbitTemplate.setReplyAddress(queue().getName());
        rabbitTemplate.setReplyTimeout(replyTimeout);
        rabbitTemplate.setUseDirectReplyToContainer(false);
        return rabbitTemplate;
    }

    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(connectionFactory());
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        final SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setMessageConverter(jsonMessageConverter());
        factory.setConcurrentConsumers(concurrentConsumers);
        factory.setMaxConcurrentConsumers(maxConcurrentConsumers);
        factory.setErrorHandler(errorHandler());
        return factory;
    }

    @Bean
    public ErrorHandler errorHandler() {
        return new ConditionalRejectingErrorHandler(new MyFatalExceptionStrategy());
    }

    public static class MyFatalExceptionStrategy extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy {

        private final Logger logger = LogManager.getLogger(getClass());

        @Override
        public boolean isFatal(Throwable t) {
            if (t instanceof ListenerExecutionFailedException) {
                ListenerExecutionFailedException lefe = (ListenerExecutionFailedException) t;
                logger.error("Failed to process inbound message from queue "
                        + lefe.getFailedMessage().getMessageProperties().getConsumerQueue()
                        + "; failed message: " + lefe.getFailedMessage(), t);
            }
            return super.isFatal(t);
        }

    }
}

Key-points of Configuration Class:

In the above class, we have configured Beans as per our RabbitMQ properties defined in the application.properties class.

  • Use @EnableRabbit  to enable support for Rabbit Listener.
  • @Value  injects properties from the resource file into the Component classes.
  • Define a bean of Queue with a name and mark it as non-durable. Non-durable means that the queue and any messages on it will be removed when RabbitMQ is stopped. On the other hand, restarting the application won’t have any effect on the queue.
  • For the exchange, we have four different types, but in our example, we have registered a bean of the DirectExchange type. It routes messages to a queue by matching a complete routing key.
  • Then we created a bean for Binding to tie an exchange with the queue.
  • To ensure that messages are delivered in JSON format, create a bean for MessageConverter
  • Register ConnectionFactory bean to make a connection to RabbitMQ.
  • Bean of rabbitTemplate serves the purpose of sending messages to the queue.
  • To define RabbitAdmin, declare AmqpAdmin bean or define it in the ApplicationContext. It’s useful if we need queues to be automatically declared and bounded.
  • For the container factory bean, we have used SimpleRabbitListenerContainerFactory. It is required since the infrastructure looks for a bean rabbitListenerContainerFactory as the factory’s source for creating message listener containers by default.
  • The errorHandler is used to return some user-friendly Error Object when an exception is thrown by the listener.

Define the Domain Model Class

MenuOrder.java

Keeping the use-case of the Restaurant Management System, we have created a domain class of MenuOrder.

package org.appsdeveloper.domain;

import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

import java.io.Serializable;
import java.util.List;

@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class MenuOrder implements Serializable {

    private static final long serialVersionUID = -1138446817700416884L;

    @JsonProperty
    private String orderIdentifier;

    @JsonProperty
    private int orderId;

    @JsonProperty
    private List<String> orderList;

    @JsonProperty
    private String customerName;

    @Override
    public String toString() {
        return "MenuOrder{" +
                "orderIdentifier='" + orderIdentifier + '\'' +
                ", orderId=" + orderId +
                ", orderList=" + orderList +
                ", customerName='" + customerName + '\'' +
                '}';
    }
}

Define the Sender/Receiver Classes

RabbitMQSender.java

Following is an implementation of a service class to send messages in the form of a MenuOrder object to the queue using RabbitTemplate.

package org.appsdeveloper.queue;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.appsdeveloper.domain.MenuOrder;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class RabbitMQSender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    @Autowired
    private Queue queue;

    private static Logger logger = LogManager.getLogger(RabbitMQSender.class.toString());


    public void send(MenuOrder menuOrder) {
        rabbitTemplate.convertAndSend(queue.getName(), menuOrder);
        logger.info("Sending Message to the Queue : " + menuOrder.toString());
    }
}
  • Annotate the class with @Service annotation to inject it as a dependency.
  • rabbitTemplate.convertAndSend(queue.getName(), menuOrder); It is the main method that adds a message to the queue after conversion.

RabbitMQReceiver.java

Following is a Listener class to consume messages of a MenuOrder payload type from the queue.

package org.appsdeveloper.queue;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.appsdeveloper.domain.MenuOrder;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Component
@RabbitListener(queues = "rabbitmq.queue", id = "listener")
public class RabbitMQReceiver {

    private static Logger logger = LogManager.getLogger(RabbitMQReceiver.class.toString());

    @RabbitHandler
    public void receiver(MenuOrder menuOrder) {
        logger.info("MenuOrder listener invoked - Consuming Message with MenuOrder Identifier : " + menuOrder.getOrderIdentifier());
    }

}
  • @RabbitListener  is responsible to listen to the RabbitMQ queue for any incoming messages.
  • When classifying reading data based on payload type, @RabbitHandler is crucial. Define different methods, annotate each with @RabbitHandler  to consume multiple data type payloads from the same queue.

Define the Controller Class

Expose a POST endpoint to send messages to the queue.

package org.appsdeveloper.rest;


import org.appsdeveloper.queue.RabbitMQSender;
import org.appsdeveloper.domain.MenuOrder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping(value = "/rabbitmq")
public class RabbitMQDemoController {

    @Autowired
    RabbitMQSender rabbitMQSender;

    @PostMapping(value = "/sender")
    public String producer(@RequestBody MenuOrder menuOrder) {
        rabbitMQSender.send(menuOrder);
        return "Message sent to the RabbitMQ Queue Successfully";
    }
}

Define the Spring Boot Application Main Class

The following code shows the main application class.

package org.appsdeveloper;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@EnableAutoConfiguration(exclude={org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration.class})
public class DemoApplication {

    public static void main(String[] args) {
        System.out.println("Hello");
        SpringApplication.run(DemoApplication.class, args);
    }

}

Testing

It’s time to test the flow from the Postman. Let’s hit the Sender API to send the record to the queue:

Implement messaging using RabbitMQ with Spring Boot Application

API has successfully sent the record to the RabbitMQ Queue as per the configuration. As a result, the message was consumed or received by the Listener because we added a logger to the Receiver class.

Implement messaging using RabbitMQ with Spring Boot Application

In the following RabbitMQ snapshot, we can observe the spike that indicates a message was received at the last minute and processed by the consumer.

In the above image, we can also see details pertaining to the consumer.

Conclusion

In this article, you’ve learned the fundamentals of RabbitMQ as a messaging queue and examined a practical example that demonstrates how to send and consume messages from queues. However, there are more complex use-cases for RabbitMQ. To keep things simple, we focused on a minimal use-case with only one consumer subscribed to a single queue.

Additionally, you’ve learned the core concepts needed to implement messaging using RabbitMQ with a Spring Boot application in practice.

If you found this article helpful, please share your feedback or thoughts in the comments section.

Happy coding!

Leave a Reply

Your email address will not be published. Required fields are marked *