Unlock Scalable Messaging: 4-Step Guide to Fault-Tolerant Concurrency with RabbitMQ & Spring Boot
Ava Parker
Posted on October 10, 2024
Building on my previous discussion on harnessing the power of asynchronous messaging with RabbitMQ, this article delves into the practical implementation of RabbitMQ in a real-world setting using Java and Spring Boot. For a thorough understanding of the fundamental concepts of async messaging and RabbitMQ, I recommend checking out my previous article at computerstechnicians.com. This article will focus on the following key aspects:
- Seamless integration of RabbitMQ into a Spring Boot application
- Creating efficient producers and consumers to send and receive messages in various formats, including String, JSON, and Java Objects
- Implementing robust fault tolerance mechanisms
- Providing scalable concurrency support
Getting Started with RabbitMQ in Your Spring Boot Project
The first step in unlocking the full potential of RabbitMQ is to set up a Spring Boot project and add the necessary RabbitMQ dependencies. If you already have a Spring Boot application, you can simply add the dependencies without creating a new project. However, creating a separate project for RabbitMQ offers the advantage of keeping your message queue-related code separate from your main application, making it easily shareable or pluggable into other applications if needed.
You can create a Spring Boot project using the Spring initializer and import it into your IDE, or create one directly from the Spring Tool Suite IDE (if you’re using it). To integrate RabbitMQ, add the spring-boot-starter-amqp dependency to your pom.xml file.
<!-- Spring Boot starter that brings RabbitMQ (AMQP) support into the project. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
Configure and Initialize Application Properties
The property names below are self-explanatory: for each application they define the exchange name, the queue name, and the routing key used to bind the two, followed by the RabbitMQ connection and listener settings.
# Message Queue specific configurations for app1
app1.exchange.identifier=app1-exchange
app1.queue.identifier=app1-queue
app1.routing.identifier=app1-routing-key
# Message Queue specific configurations for app2
app2.exchange.identifier=app2-exchange
app2.queue.identifier=app2-queue
app2.routing.identifier=app2-routing-key
#AMQP RabbitMQ configuration
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# Additional RabbitMQ properties
# Minimum and maximum number of concurrent consumers per listener container
spring.rabbitmq.listener.simple.concurrency=4
spring.rabbitmq.listener.simple.max-concurrency=8
# Listener retry is disabled by default; the retry.* settings are ignored unless it is enabled
spring.rabbitmq.listener.simple.retry.enabled=true
# Delay in milliseconds before the first retry attempt
spring.rabbitmq.listener.simple.retry.initial-interval=5000
Develop a Properties File Reader Class
Now that we have created the properties file, let's design a class to read these properties and make them accessible within the application.
package com.dpk.config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
/**
 * Reads the exchange, queue, and routing-key names for app1 and app2 from
 * {@code application.properties} and exposes them through getters.
 */
@Configuration
@PropertySource("classpath:application.properties")
public class ApplicationConfigReader {

    @Value("${app1.exchange.identifier}")
    private String app1Exchange;

    @Value("${app1.queue.identifier}")
    private String app1Queue;

    @Value("${app1.routing.identifier}")
    private String app1RoutingKey;

    @Value("${app2.exchange.identifier}")
    private String app2Exchange;

    @Value("${app2.queue.identifier}")
    private String app2Queue;

    @Value("${app2.routing.identifier}")
    private String app2RoutingKey;

    /** @return the exchange name configured under {@code app1.exchange.identifier} */
    public String getApp1Exchange() {
        return app1Exchange;
    }

    public void setApp1Exchange(String app1Exchange) {
        this.app1Exchange = app1Exchange;
    }

    /** @return the queue name configured under {@code app1.queue.identifier} */
    public String getApp1Queue() {
        return app1Queue;
    }

    public void setApp1Queue(String app1Queue) {
        this.app1Queue = app1Queue;
    }

    /** @return the routing key configured under {@code app1.routing.identifier} */
    public String getApp1RoutingKey() {
        return app1RoutingKey;
    }

    public void setApp1RoutingKey(String app1RoutingKey) {
        this.app1RoutingKey = app1RoutingKey;
    }

    /** @return the exchange name configured under {@code app2.exchange.identifier} */
    public String getApp2Exchange() {
        return app2Exchange;
    }

    public void setApp2Exchange(String app2Exchange) {
        this.app2Exchange = app2Exchange;
    }

    /** @return the queue name configured under {@code app2.queue.identifier} */
    public String getApp2Queue() {
        return app2Queue;
    }

    public void setApp2Queue(String app2Queue) {
        this.app2Queue = app2Queue;
    }

    /** @return the routing key configured under {@code app2.routing.identifier} */
    public String getApp2RoutingKey() {
        return app2RoutingKey;
    }

    public void setApp2RoutingKey(String app2RoutingKey) {
        this.app2RoutingKey = app2RoutingKey;
    }
}
Lay the Groundwork for Queue, Exchange, Routing Key, and Binding Configurations
package com.dpk;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import com.dpk.config.ApplicationConfigReader;
@EnableRabbit@SpringBootApplicationpublic class MsgqApplication extends SpringBootServletInitializer implements RabbitListenerConfigurer {
@Autowiredprivate ApplicationConfigReader applicationConfig;
public ApplicationConfigReader getApplicationConfig() {
return applicationConfig;
}
public void setApplicationConfig(ApplicationConfigReader applicationConfig) {
this.applicationConfig = applicationConfig;
}
public static void main(String[] args) {
SpringApplication.run(MsgqApplication.class, args);
}
protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
return application.sources(MsgqApplication.class);
}
/* This bean is to read the properties file configs */
@Beanpublic ApplicationConfigReader applicationConfig() {
return new ApplicationConfigReader();
}
/* Creating a bean for the Message queue Exchange */
@Beanpublic TopicExchange getApp1Exchange() {
return new TopicExchange(getApplicationConfig().getApp1Exchange());
}
/* Creating a bean for the Message queue */
@Beanpublic Queue getApp1Queue() {
return new Queue(getApplicationConfig().getApp1Queue());
}
/* Binding between Exchange and Queue using routing key */
@Beanpublic Binding declareBindingApp1() {
return BindingBuilder.bind(getApp1Queue()).to(getApp1Exchange()).with(getApplicationConfig().getApp1RoutingKey());
}
/* Creating a bean for the Message queue Exchange */
@Beanpublic TopicExchange getApp2Exchange() {
return new TopicExchange(getApplicationConfig().getApp2Exchange());
}
/* Creating a bean for the Message queue */
@Beanpublic Queue getApp2Queue() {
return new Queue(getApplicationConfig().getApp2Queue());
}
/* Binding between Exchange and Queue using routing key */
@Beanpublic Binding declareBindingApp2() {
return BindingBuilder.bind(getApp2Queue()).to(getApp2Exchange()).with(getApplicationConfig().getApp2RoutingKey());
}
/* Bean for rabbitTemplate */
@Beanpublic RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(producerJackson2MessageConverter());
return rabbitTemplate;
}
@Beanpublic Jackson2JsonMessageConverter producerJackson2MessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Beanpublic MappingJackson2MessageConverter consumerJackson2MessageConverter() {
return new MappingJackson2MessageConverter();
}
@Beanpublic DefaultMessageHandlerMethodFactory messageHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
factory.setMessageConverter(consumerJackson2MessageConverter());
return factory;
}
@Overridepublic void configureRabbitListeners(final RabbitListenerEndpointRegistrar registrar) {
registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());
}
}
Create a Message Sender
The MessageSender class is simple: it calls the convertAndSend() method of the RabbitTemplate with the exchange name, the routing key, and the data payload, and the exchange then routes the message to the bound queue.
package com.dpk;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
/
- A message broker responsible for relaying messages to a queue via an exchange.
*/
@Componentpublic class MessageSender {
private static final Logger log = LoggerFactory.getLogger(MessageSender.class);
/
-
- @param rabbitTemplate
- @param exchange
- @param routingKey
- @param data
*/
public void sendMessage(RabbitTemplate rabbitTemplate, String exchange, String routingKey, Object data) {
log.info("Routing message to the queue using routingKey {}. Message= {}", routingKey, data);
rabbitTemplate.convertAndSend(exchange, routingKey, data);
log.info("The message has been successfully relayed to the queue.");
}
}
Architecting Message Handlers
Constructing a message handler can be a multifaceted task, as it entails addressing diverse scenarios, including:
- Automatically converting messages into Java objects
- Managing REST call failures to inaccessible APIs or errors occurring during request processing
- Enabling multiple handlers to concurrently retrieve and process messages from queues
- Determining when and how to re-queue messages in the event of failure
Java Object Deserialization
Spring provides the @RabbitListener annotation, which simplifies message reception from queues and offers automatic Java object deserialization. The MessageListener class shown later in this section demonstrates this feature.
Error Handling and Message Re-Queuing in Handlers
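A handler can fail because the API it calls is unreachable or because an error occurs while the request is being processed. In such situations, depending on your business requirements, you may choose not to re-queue the message at all, or to re-queue it only up to a maximum number of retries.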
To prevent re-queuing the message, you can throw the AmqpRejectAndDontRequeueException. For maximum retry handling, you can add an additional parameter to the message that holds the retry count, increment it each time the message is received, and stop re-queuing once the count reaches the limit.
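An alternative approach is to configure retries in the application.properties file by specifying the maximum number of attempts (listener retry must also be switched on with spring.rabbitmq.listener.simple.retry.enabled=true for this setting to take effect):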
spring.rabbitmq.listener.simple.retry.max-attempts=3
Concurrency Capabilities
Concurrency can be achieved in two ways:
- Creating a thread pool with a specified maximum number of threads and using an executor (for example, a ThreadPoolExecutor) to call the methods/APIs for request processing.
- Leveraging the built-in concurrency feature, which requires setting two properties in the application.properties file.
Note: You can adjust the values of these properties according to your application’s scalability requirements.
spring.rabbitmq.listener.simple.concurrency=4
spring.rabbitmq.listener.simple.max-concurrency=8
package com.dpk;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Service;
import org.springframework.web.client.HttpClientErrorException;
import com.dpk.config.ApplicationConfigReader;
import com.dpk.dto.UserDetails;
import com.dpk.util.ApplicationConstant;
/**
* Message Listener for RabbitMQ
*/
@Servicepublic class MessageListener {
private static final Logger log = LoggerFactory.getLogger(MessageListener.class);
@Autowired ApplicationConfigReader applicationConfigReader;
/**
* Message listener for app1
* @param UserDetails a user defined object used for deserialization of message
*/
@RabbitListener(queues = "${app1.queue.name}")
public void receiveMessageForApp1(final UserDetails data) {
log.info("Received message: {} from app1 queue.", data);
try {
log.info("Making REST call to the API");
//TODO: Code to make REST call
log.info("<< Exiting receiveMessageForApp1() after API call.");
} catch(HttpClientErrorException ex) {
if(ex.getStatusCode() == HttpStatus.NOT_FOUND) {
log.info("Delay...");
try {
Thread.sleep(ApplicationConstant.MESSAGE_RETRY_DELAY);
} catch (InterruptedException e) { }
log.info("Throwing exception so that message will be requed in the queue.");
// Note: Typically Application specific exception should be thrown below
throw new RuntimeException();
} else {
throw new AmqpRejectAndDontRequeueException(ex);
}
} catch(Exception e) {
log.error("Internal server error occurred in API call. Bypassing message requeue {}", e);
throw new AmqpRejectAndDontRequeueException(e);
}
}
/**
* Message listener for app2
*
*/
@RabbitListener(queues = "${app2.queue.name}")
public void receiveMessageForApp2(String reqObj) {
log.info("Received message: {} from app2 queue.", reqObj);
try {
log.info("Making REST call to the API");
//TODO: Code to make REST call
log.info("<< Exiting receiveMessageCrawlCI() after API call.");
} catch(HttpClientErrorException ex) {
if(ex.getStatusCode() == HttpStatus.NOT_FOUND) {
log.info("Delay...");
try {
Thread.sleep(ApplicationConstant.MESSAGE_RETRY_DELAY);
} catch (InterruptedException e) { }
log.info("Throwing exception so that message will be requed in the queue.");
// Note: Typically Application specific exception can be thrown below
throw new RuntimeException();
} else {
throw new AmqpRejectAndDontRequeueException(ex);
}
} catch(Exception e) {
log.error("Internal server error occurred in python server. Bypassing message requeue {}", e);
throw new AmqpRejectAndDontRequeueException(e);
}
}
}
Set Up the Service Interface
Finally, create the REST controller that exposes the service endpoint invoked by the user. This class uses the MessageSender to forward the incoming request to the queue.
package com.dpk;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import com.dpk.config.ApplicationConfigReader;
import com.dpk.dto.UserDetails;
import com.dpk.util.ApplicationConstant;
/**
 * REST endpoint under {@code /userservice} that forwards incoming user details
 * to the app1 exchange through {@link MessageSender}.
 */
@RestController
@RequestMapping(path = "/userservice")
public class UserService {

    private static final Logger log = LoggerFactory.getLogger(UserService.class);

    private final RabbitTemplate rabbitTemplate;
    private ApplicationConfigReader applicationConfig;
    private MessageSender messageSender;

    @Autowired
    public UserService(final RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    public ApplicationConfigReader getApplicationConfig() {
        return applicationConfig;
    }

    @Autowired
    public void setApplicationConfig(ApplicationConfigReader applicationConfig) {
        this.applicationConfig = applicationConfig;
    }

    public MessageSender getMessageSender() {
        return messageSender;
    }

    @Autowired
    public void setMessageSender(MessageSender messageSender) {
        this.messageSender = messageSender;
    }

    /**
     * Publishes the posted user to the app1 exchange using the app1 routing key.
     *
     * @param user request body to place on the queue
     * @return 200 with {@code ApplicationConstant.IN_QUEUE} when the message was sent, or
     *         500 with {@code ApplicationConstant.MESSAGE_QUEUE_SEND_ERROR} when sending failed
     */
    @RequestMapping(path = "/add", method = RequestMethod.POST, produces = MediaType.APPLICATION_JSON_VALUE)
    public ResponseEntity<?> sendMessage(@RequestBody UserDetails user) {
        String exchange = getApplicationConfig().getApp1Exchange();
        String routingKey = getApplicationConfig().getApp1RoutingKey();

        /* Sending to Message Queue */
        try {
            messageSender.sendMessage(rabbitTemplate, exchange, routingKey, user);
            return new ResponseEntity<String>(ApplicationConstant.IN_QUEUE, HttpStatus.OK);
        } catch (Exception ex) {
            // Pass the exception as the last argument (no placeholder) so the stack trace is logged.
            log.error("Exception occurred while sending message to the queue.", ex);
            return new ResponseEntity<>(ApplicationConstant.MESSAGE_QUEUE_SEND_ERROR,
                    HttpStatus.INTERNAL_SERVER_ERROR);
        }
    }
}
Conclusion
The complete code is accessible on GitHub, where you can log in and review it at your convenience. Should you have any queries or feedback, please feel free to express your thoughts in the comments section below. I appreciate the chance to interact with you. Thank you!
Posted on October 10, 2024
Join Our Newsletter. No Spam, Only the good stuff.
Sign up to receive the latest update from our blog.