Evan Wilson
Posted on September 13, 2024
1. Adding Dependencies
First, add the necessary dependencies to your `pom.xml` file:
<!-- RocketMQ Spring Boot starter, used here with Spring Boot 3. The rocketmq-client it would pull in transitively is excluded so that the explicitly declared client below is the only one on the classpath. -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.3.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Explicit rocketmq-client dependency, pinned to 5.3.0 to stay compatible with the MQ cluster version -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>5.3.0</version>
</dependency>
2. Configuration File bootstrap.yaml
Configure your RocketMQ settings in the `bootstrap.yaml` file:
# RocketMQ settings. Everything lives under the "rocketmq" prefix: the starter's own
# keys (name-server, consumer.*) plus the custom "topics" and "groups" sections.
rocketmq:
  name-server: 192.168.1.1:9876;192.168.1.2:9876;192.168.1.3:9876 # Replace with actual NameServer addresses
  consumer:
    group: consume-group-test
    access-key: access # Configure if ACL is used
    secret-key: secret
    consume-message-batch-max-size: 50 # Max messages per batch
    pull-batch-size: 100 # Max messages pulled from Broker
  topics:
    project: "group-topic-1"
  groups:
    project: "consume-group-1" # Use different groups for different business processes
3. Configuration Class MqConfigProperties
Create the configuration class `MqConfigProperties`:
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import lombok.Data;
import java.io.Serializable;
/**
 * RocketMQ configuration holder.
 *
 * <p>Bound to the {@code rocketmq} configuration prefix, so the {@code topics} and
 * {@code groups} fields are populated from {@code rocketmq.topics.*} and
 * {@code rocketmq.groups.*}. It also carries the injected {@link RocketMQProperties}
 * bean, giving callers one object from which to read the NameServer address and the
 * consumer credentials alongside the custom topic/group names.
 *
 * <p>Getters, setters, {@code equals}/{@code hashCode} and {@code toString} are
 * generated by Lombok's {@code @Data}.
 */
@Data
@Component
@ConfigurationProperties(prefix = "rocketmq")
public class MqConfigProperties implements Serializable {
private static final long serialVersionUID = 1L;
// Injected by Spring (not bound from the configuration file).
// NOTE(review): this class is Serializable but the field is not transient - confirm that
// RocketMQProperties is serializable, or that instances of this class are never serialized.
@Autowired
private RocketMQProperties rocketMQProperties;
// Topic names, one field per business area.
private TopicProperties topics;
// Consumer group names, one field per business area.
private GroupProperties groups;
/**
 * Topic names keyed by business area.
 */
@Data
public static class TopicProperties implements Serializable {
private static final long serialVersionUID = 1L;
// Topic used for the "project" business area.
private String project;
}
/**
 * Consumer group names keyed by business area.
 */
@Data
public static class GroupProperties implements Serializable {
private static final long serialVersionUID = 1L;
// Consumer group used for the "project" business area.
private String project;
}
}
4. Implementing the Consumer Code
Create the consumer class `UserConsumer`:
import com.alibaba.fastjson2.JSONObject;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.springframework.context.ApplicationContext;
import org.springframework.context.SmartLifecycle;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;
import javax.annotation.Resource;
import java.util.List;
/**
* Batch Consumer Implementation
*/
@Component
@Slf4j
public class UserConsumer implements SmartLifecycle {
@Resource
private MqConfigProperties mqConfigProperties;
@Resource
private ApplicationContext applicationContext;
private volatile boolean running;
private DefaultMQPushConsumer consumer;
@Override
public void start() {
if (isRunning()) {
throw new IllegalStateException("Consumer is already running");
}
initConsumer();
setRunning(true);
log.info("UserConsumer started successfully.");
}
@Override
public void stop() {
if (isRunning() && consumer != null) {
consumer.shutdown();
setRunning(false);
log.info("UserConsumer stopped.");
}
}
@Override
public boolean isRunning() {
return running;
}
private void setRunning(boolean running) {
this.running = running;
}
private void initConsumer() {
String topic = mqConfigProperties.getTopics().getProject();
String group = mqConfigProperties.getGroups().getProject();
String nameServer = mqConfigProperties.getRocketMQProperties().getNameServer();
String accessKey = mqConfigProperties.getRocketMQProperties().getConsumer().getAccessKey();
String secretKey = mqConfigProperties.getRocketMQProperties().getConsumer().getSecretKey();
RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(), accessKey, secretKey);
consumer = rpcHook != null
? new DefaultMQPushConsumer(group, rpcHook, new AllocateMessageQueueAveragely())
: new DefaultMQPushConsumer(group);
consumer.setNamesrvAddr(nameServer);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.setConsumeMessageBatchMaxSize(100); // Set the batch size for consumption
consumer.subscribe(topic, "*");
consumer.setMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
log.info("Received {} messages", msgs.size());
for (MessageExt message : msgs) {
String body = new String(message.getBody());
log.info("Processing message: {}", body);
User user = JSONObject.parseObject(body, User.class);
processUser(user);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
log.info("UserConsumer initialized with topic [{}] and group [{}].", topic, group);
}
private void processUser(User user) {
log.info("Processing user with ID: {}", user.getId());
// Handle user-related business logic
}
}
5. Producer Example Code
To produce batch messages, you can use the following `UserProducer` class:
import com.alibaba.fastjson2.JSONObject;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
/**
 * Example producer that publishes a list of users to a topic as one RocketMQ batch.
 */
@Slf4j
public class UserProducer {

    private final DefaultMQProducer producer;

    /**
     * Creates a producer wrapper.
     *
     * @param producer the RocketMQ producer used for sending; this class does not start
     *        or shut it down, so the caller is expected to manage its lifecycle
     * @throws NullPointerException if {@code producer} is {@code null}
     */
    public UserProducer(DefaultMQProducer producer) {
        this.producer = Objects.requireNonNull(producer, "producer");
    }

    /**
     * Serializes each user to JSON and sends all of them in a single batch.
     *
     * <p>Send failures are logged and not rethrown. A {@code null} or empty list is
     * skipped without contacting the broker.
     *
     * <p>NOTE(review): very large lists may exceed the broker's maximum batch size and
     * would need to be split by the caller - confirm against the cluster limits.
     *
     * @param users the users to publish
     * @param topic the destination topic
     */
    public void sendBatchMessages(List<User> users, String topic) {
        if (users == null || users.isEmpty()) {
            log.warn("No users to send to topic [{}]; skipping batch.", topic);
            return;
        }

        List<Message> messages = new ArrayList<>(users.size());
        for (User user : users) {
            // Encode explicitly as UTF-8 instead of relying on the platform default charset.
            byte[] payload = JSONObject.toJSONString(user).getBytes(StandardCharsets.UTF_8);
            messages.add(new Message(topic, payload));
        }

        try {
            producer.send(messages);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe it.
            Thread.currentThread().interrupt();
            log.error("Error sending batch messages", e);
        } catch (Exception e) {
            log.error("Error sending batch messages", e);
        }
    }
}
6. Additional Optimization Suggestions
- Performance Optimization: You can adjust the size of the consumer thread pool. By default, it's set to `consumeThreadMin=20` and `consumeThreadMax=20`. In high-concurrency scenarios, increasing the thread pool size can enhance performance.
- Error Handling: When consumption fails, be cautious with `RECONSUME_LATER` to avoid infinite retry loops. Set a maximum retry count based on your business requirements.
- Tenant Isolation: Use different groups for different business modules to avoid consuming data from the wrong group. This is especially crucial in production environments.
Posted on September 13, 2024
Join Our Newsletter. No Spam, Only the good stuff.
Sign up to receive the latest update from our blog.
Related
September 13, 2024