
RocketMQ的架构由四大核心组件构成,各司其职:
辅助概念:
Producer发送消息前需获取Topic的路由信息(即该Topic分布在哪些Broker的哪些MessageQueue),流程如下:

Broker采用CommitLog+ConsumeQueue+IndexFile的三层存储结构:
# Download the 5.1.4 binary release.
wget https://archive.apache.org/dist/rocketmq/5.1.4/rocketmq-all-5.1.4-bin-release.zip
# The archive contains a top-level "rocketmq-all-5.1.4-bin-release" directory,
# so extract into /usr/local and rename it; otherwise bin/ and conf/ would end
# up one level below ROCKETMQ_HOME.
unzip rocketmq-all-5.1.4-bin-release.zip -d /usr/local
mv /usr/local/rocketmq-all-5.1.4-bin-release /usr/local/rocketmq
cd /usr/local/rocketmq
echo "export ROCKETMQ_HOME=/usr/local/rocketmq" >> /etc/profile
echo "export PATH=\$PATH:\$ROCKETMQ_HOME/bin" >> /etc/profile
source /etc/profile
# Shrink the NameServer JVM heap (adjust to the host's memory).
# NOTE(review): the sed patterns below only take effect if the scripts still
# contain exactly these default flags -- verify against the shipped scripts.
sed -i 's/-Xms4g -Xmx4g/-Xms1g -Xmx1g/g' bin/runserver.sh
# Start the NameServer in the background.
nohup sh bin/mqnamesrv > namesrv.log 2>&1 &
# Verify startup: the log prints "Name Server boot success" (Ctrl+C to stop tailing).
tail -f namesrv.log
# Shrink the Broker JVM heap.
sed -i 's/-Xms8g -Xmx8g -Xmn4g/-Xms2g -Xmx2g -Xmn1g/g' bin/runbroker.sh
# Start the Broker, pointing it at the NameServer address.
nohup sh bin/mqbroker -n 192.168.1.100:9876 > broker.log 2>&1 &
# Verify startup: the log prints "broker boot success" (Ctrl+C to stop tailing).
tail -f broker.log
SpringBoot项目中引入以下Maven依赖(最新稳定版):
<dependencies>
    <!-- Spring Boot web starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <version>3.2.5</version>
    </dependency>
    <!-- RocketMQ client -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>5.1.4</version>
    </dependency>
    <!-- Lombok (boilerplate reduction) -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.30</version>
        <scope>provided</scope>
    </dependency>
    <!-- springdoc OpenAPI (API documentation UI) -->
    <dependency>
        <groupId>org.springdoc</groupId>
        <artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
        <version>2.2.0</version>
    </dependency>
    <!-- MyBatis-Plus (persistence layer).
         Spring Boot 3.x needs the dedicated boot3 starter; the classic
         mybatis-plus-boot-starter pulls in a mybatis-spring version that
         fails to start on Spring Boot 3.2. -->
    <dependency>
        <groupId>com.baomidou</groupId>
        <artifactId>mybatis-plus-spring-boot3-starter</artifactId>
        <version>3.5.5</version>
    </dependency>
    <!-- MySQL JDBC driver -->
    <dependency>
        <groupId>com.mysql</groupId>
        <artifactId>mysql-connector-j</artifactId>
        <version>8.0.33</version>
        <scope>runtime</scope>
    </dependency>
    <!-- Fastjson2 (JSON processing) -->
    <dependency>
        <groupId>com.alibaba.fastjson2</groupId>
        <artifactId>fastjson2</artifactId>
        <version>2.0.48</version>
    </dependency>
    <!-- Guava (collection utilities) -->
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>33.2.0-jre</version>
    </dependency>
</dependencies>
package com.jam.demo.config;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import lombok.extern.slf4j.Slf4j;
/**
* RocketMQ生产者配置类
* @author ken
*/
@Configuration
@Slf4j
publicclass RocketMQProducerConfig {
@Value("${rocketmq.producer.group}")
private String producerGroup;
@Value("${rocketmq.name-server}")
private String nameServerAddr;
/**
* 初始化默认生产者
* @return DefaultMQProducer
*/
@Bean
public DefaultMQProducer defaultMQProducer() {
DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
producer.setNamesrvAddr(nameServerAddr);
// 设置同步发送重试次数
producer.setRetryTimesWhenSendFailed(3);
try {
producer.start();
log.info("RocketMQ生产者启动成功,nameServerAddr:{},producerGroup:{}", nameServerAddr, producerGroup);
} catch (Exception e) {
log.error("RocketMQ生产者启动失败", e);
thrownew RuntimeException("RocketMQ生产者初始化失败", e);
}
return producer;
}
}
package com.jam.demo.service;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.springframework.stereotype.Service;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StringUtils;
/**
* 普通消息生产者服务
* @author ken
*/
@Service
@Slf4j
@RequiredArgsConstructor
publicclass NormalMessageProducerService {
privatefinal DefaultMQProducer defaultMQProducer;
/**
* 发送普通消息(同步)
* @param topic 主题(必填)
* @param tags 标签(可选)
* @param keys 消息键(可选,用于消息查询)
* @param body 消息体(必填)
* @return SendResult 发送结果
* @throws Exception 发送异常
*/
public SendResult sendNormalMessage(String topic, String tags, String keys, String body) throws Exception {
// 参数校验
if (!StringUtils.hasText(topic)) {
thrownew IllegalArgumentException("topic不能为空");
}
if (!StringUtils.hasText(body)) {
thrownew IllegalArgumentException("body不能为空");
}
// 构建消息(topic+tags+keys+body)
Message message = new Message(topic, tags, keys, body.getBytes("UTF-8"));
// 同步发送消息
SendResult sendResult = defaultMQProducer.send(message);
log.info("发送普通消息成功,topic:{},tags:{},keys:{},msgId:{},queueId:{}",
topic, tags, keys, sendResult.getMsgId(), sendResult.getMessageQueue().getQueueId());
return sendResult;
}
/**
* 发送异步消息
* @param topic 主题
* @param tags 标签
* @param keys 消息键
* @param body 消息体
*/
public void sendAsyncMessage(String topic, String tags, String keys, String body) {
if (!StringUtils.hasText(topic) || !StringUtils.hasText(body)) {
thrownew IllegalArgumentException("topic和body不能为空");
}
Message message = new Message(topic, tags, keys, body.getBytes());
// 异步发送回调
defaultMQProducer.send(message, (sendResult, e) -> {
if (e == null) {
log.info("异步发送成功,msgId:{}", sendResult.getMsgId());
} else {
log.error("异步发送失败", e);
}
});
}
}
package com.jam.demo.service;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import lombok.extern.slf4j.Slf4j;
import javax.annotation.PostConstruct;
import java.util.List;
/**
* 普通消息消费者服务(推模式)
* @author ken
*/
@Service
@Slf4j
publicclass NormalMessageConsumerService {
@Value("${rocketmq.consumer.group}")
private String consumerGroup;
@Value("${rocketmq.name-server}")
private String nameServerAddr;
/**
* 初始化推模式消费者
* @throws MQClientException 初始化异常
*/
@PostConstruct
public void initPushConsumer() throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
consumer.setNamesrvAddr(nameServerAddr);
// 设置从最新位置开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
// 设置最大重试次数
consumer.setMaxReconsumeTimes(5);
// 设置消费线程数
consumer.setConsumeThreadMin(20);
consumer.setConsumeThreadMax(64);
// 订阅主题(*表示所有标签)
consumer.subscribe("demo_topic", "order");
// 注册消息监听器
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
try {
for (MessageExt msg : msgs) {
String body = new String(msg.getBody(), "UTF-8");
log.info("消费普通消息成功,topic:{},tags:{},keys:{},body:{},msgId:{},reconsumeTimes:{}",
msg.getTopic(), msg.getTags(), msg.getKeys(), body, msg.getMsgId(), msg.getReconsumeTimes());
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
log.error("消费普通消息失败", e);
// 重试消费(达到最大次数后进入死信队列)
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
// 启动消费者
consumer.start();
log.info("RocketMQ推模式消费者启动成功,consumerGroup:{}", consumerGroup);
}
}
package com.jam.demo.controller;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.apache.rocketmq.client.producer.SendResult;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
/**
* 消息测试控制器
* @author ken
*/
@RestController
@RequestMapping("/message")
@Tag(name = "消息测试接口", description = "RocketMQ消息发送测试")
@Slf4j
@RequiredArgsConstructor
publicclass MessageTestController {
privatefinal NormalMessageProducerService normalMessageProducerService;
@PostMapping("/sendNormal")
@Operation(summary = "发送同步普通消息", description = "发送同步RocketMQ消息到demo_topic")
public String sendNormalMessage(
@Parameter(description = "主题", required = true, example = "demo_topic") @RequestParam String topic,
@Parameter(description = "标签", example = "order") @RequestParam(required = false) String tags,
@Parameter(description = "消息键", example = "order_1001") @RequestParam(required = false) String keys,
@Parameter(description = "消息体", required = true, example = "{\"orderId\":\"1001\",\"amount\":99}") @RequestParam String body) {
try {
SendResult sendResult = normalMessageProducerService.sendNormalMessage(topic, tags, keys, body);
return"发送成功,msgId:" + sendResult.getMsgId();
} catch (Exception e) {
log.error("发送普通消息失败", e);
return"发送失败:" + e.getMessage();
}
}
@PostMapping("/sendAsync")
@Operation(summary = "发送异步普通消息", description = "发送异步RocketMQ消息到demo_topic")
public String sendAsyncMessage(
@Parameter(description = "主题", required = true) @RequestParam String topic,
@Parameter(description = "标签") @RequestParam(required = false) String tags,
@Parameter(description = "消息键") @RequestParam(required = false) String keys,
@Parameter(description = "消息体", required = true) @RequestParam String body) {
try {
normalMessageProducerService.sendAsyncMessage(topic, tags, keys, body);
return"异步发送请求已提交";
} catch (Exception e) {
log.error("发送异步消息失败", e);
return"发送失败:" + e.getMessage();
}
}
}
# Embedded web server
server:
  port: 8080

# RocketMQ client settings read by the @Value placeholders
rocketmq:
  name-server: 192.168.1.100:9876
  producer:
    group: demo_producer_group
  consumer:
    group: demo_consumer_group

spring:
  datasource:
    url: jdbc:mysql://localhost:3306/rocketmq_demo?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai
    username: root
    password: root
    driver-class-name: com.mysql.cj.jdbc.Driver

mybatis-plus:
  mapper-locations: classpath:mapper/**/*.xml
  type-aliases-package: com.jam.demo.entity
  configuration:
    map-underscore-to-camel-case: true
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl

springdoc:
  swagger-ui:
    path: /swagger-ui.html
    operationsSorter: method
  api-docs:
    path: /v3/api-docs
  packages-to-scan: com.jam.demo.controller
顺序消息要求同一业务流程的消息按顺序生产和消费(如订单创建→支付→发货),需保证消息发送到同一个MessageQueue,且消费时单线程处理该Queue。
package com.jam.demo.service;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.springframework.stereotype.Service;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StringUtils;
import java.util.List;
/**
* 顺序消息生产者服务
* @author ken
*/
@Service
@Slf4j
@RequiredArgsConstructor
publicclass OrderMessageProducerService {
privatefinal DefaultMQProducer defaultMQProducer;
/**
* 发送顺序消息(按业务ID选择MessageQueue)
* @param topic 主题
* @param tags 标签
* @param keys 消息键
* @param body 消息体
* @param businessId 业务ID(如订单ID,用于选择Queue)
* @return SendResult 发送结果
* @throws Exception 发送异常
*/
public SendResult sendOrderMessage(String topic, String tags, String keys, String body, String businessId) throws Exception {
if (!StringUtils.hasText(topic) || !StringUtils.hasText(body) || !StringUtils.hasText(businessId)) {
thrownew IllegalArgumentException("topic、body、businessId不能为空");
}
Message message = new Message(topic, tags, keys, body.getBytes("UTF-8"));
// 按businessId哈希选择MessageQueue(保证同一业务ID的消息进入同一Queue)
SendResult sendResult = defaultMQProducer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
String id = (String) arg;
int hash = id.hashCode() % mqs.size();
return mqs.get(Math.abs(hash));
}
}, businessId);
log.info("发送顺序消息成功,businessId:{},queueId:{},msgId:{}",
businessId, sendResult.getMessageQueue().getQueueId(), sendResult.getMsgId());
return sendResult;
}
}
package com.jam.demo.service;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import lombok.extern.slf4j.Slf4j;
import javax.annotation.PostConstruct;
import java.util.List;
/**
* 顺序消息消费者服务
* @author ken
*/
@Service
@Slf4j
publicclass OrderMessageConsumerService {
@Value("${rocketmq.consumer.group}")
private String consumerGroup;
@Value("${rocketmq.name-server}")
private String nameServerAddr;
/**
* 初始化顺序消费者
* @throws MQClientException 初始化异常
*/
@PostConstruct
public void initOrderConsumer() throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
consumer.setNamesrvAddr(nameServerAddr);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.subscribe("demo_topic", "order");
// 注册顺序消息监听器(单线程处理每个MessageQueue)
consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
context.setAutoCommit(true); // 自动提交偏移量
try {
for (MessageExt msg : msgs) {
String body = new String(msg.getBody(), "UTF-8");
String businessId = msg.getKeys().split("_")[1]; // 从keys解析业务ID
log.info("消费顺序消息成功,businessId:{},body:{},queueId:{}",
businessId, body, msg.getQueueId());
}
return ConsumeOrderlyStatus.SUCCESS;
} catch (Exception e) {
log.error("消费顺序消息失败", e);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; // 暂停当前Queue消费
}
});
consumer.start();
log.info("顺序消息消费者启动成功");
}
}
批量消息可减少网络请求次数,提升发送效率,但需注意单批消息大小不超过4MB。
package com.jam.demo.service;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.springframework.stereotype.Service;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
import java.util.List;
/**
* 批量消息生产者服务
* @author ken
*/
@Service
@Slf4j
@RequiredArgsConstructor
publicclass BatchMessageProducerService {
privatefinal DefaultMQProducer defaultMQProducer;
/**
* 发送批量消息
* @param topic 主题
* @param tags 标签
* @param messageList 消息列表
* @return SendResult 发送结果
* @throws Exception 发送异常
*/
public SendResult sendBatchMessage(String topic, String tags, List<String> messageList) throws Exception {
if (CollectionUtils.isEmpty(messageList)) {
thrownew IllegalArgumentException("消息列表不能为空");
}
List<Message> msgs = new ArrayList<>();
for (String body : messageList) {
Message msg = new Message(topic, tags, "batch_" + System.currentTimeMillis(), body.getBytes("UTF-8"));
msgs.add(msg);
}
// 发送批量消息
SendResult sendResult = defaultMQProducer.send(msgs);
log.info("发送批量消息成功,数量:{},msgId:{}", messageList.size(), sendResult.getMsgId());
return sendResult;
}
}
RocketMQ的高可用依赖NameServer集群和Broker主从集群:

NameServer无状态,只需启动多个节点即可,Producer/Consumer配置多个NameServer地址(用分号分隔):
rocketmq:
  # Multiple NameServer addresses, separated by semicolons
  name-server: 192.168.1.100:9876;192.168.1.101:9876
# Master broker configuration.
# Comments must be on their own lines: a trailing "# ..." after a value
# becomes part of the value in the properties format.
brokerClusterName=DefaultCluster
brokerName=broker-a
# 0 marks the master node
brokerId=0
deleteWhen=04
fileReservedTime=48
# Synchronous master: replicates to the slave in real time
brokerRole=SYNC_MASTER
# Synchronous flush: messages are flushed to disk as they are written
flushDiskType=SYNC_FLUSH
storePathRootDir=/data/rocketmq/store/master
storePathCommitLog=/data/rocketmq/store/master/commitlog
namesrvAddr=192.168.1.100:9876;192.168.1.101:9876
# Slave broker configuration.
# Comments must be on their own lines: a trailing "# ..." after a value
# becomes part of the value in the properties format.
brokerClusterName=DefaultCluster
# Same name as the master node
brokerName=broker-a
# Any non-zero id marks a slave node
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=SYNC_FLUSH
storePathRootDir=/data/rocketmq/store/slave
storePathCommitLog=/data/rocketmq/store/slave/commitlog
namesrvAddr=192.168.1.100:9876;192.168.1.101:9876
# Run from $ROCKETMQ_HOME: the launcher lives in bin/ and the configs in conf/.
# Start the master broker.
nohup sh bin/mqbroker -c conf/broker-a.properties > broker-a.log 2>&1 &
# Start the slave broker.
nohup sh bin/mqbroker -c conf/broker-a-s.properties > broker-a-s.log 2>&1 &
消息可靠性是企业级场景的核心需求,需从生产、存储、消费三个环节保障:
producer.setRetryTimesWhenSendFailed(3); // retry count for synchronous sends
producer.setRetryTimesWhenSendAsyncFailed(3); // retry count for asynchronous sends
// Maximum redelivery attempts on the consumer side.
consumer.setMaxReconsumeTimes(5);
consumer.subscribe("%DLQ%demo_consumer_group", "*"); // dead-letter topic naming rule: %DLQ% + consumer group name
重复消费是消息中间件的常见问题(如网络抖动导致重试),需通过幂等性设计避免业务异常。
-- Consumption record table; the unique index on business_key backs idempotent consumption.
CREATE TABLE `message_consume_record` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键ID',
  `business_key` varchar(64) NOT NULL COMMENT '业务唯一键(如订单ID)',
  `consume_status` varchar(16) NOT NULL COMMENT '消费状态:UNCONSUMED/CONSUMED',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_business_key` (`business_key`) COMMENT '唯一索引保证幂等'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='消息消费记录表';
package com.jam.demo.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.time.LocalDateTime;
/**
* 消息消费记录实体
* @author ken
*/
@Data
@TableName("message_consume_record")
publicclass MessageConsumeRecord {
@TableId(type = IdType.AUTO)
private Long id;
private String businessKey;
private String consumeStatus;
private LocalDateTime createTime;
private LocalDateTime updateTime;
}
package com.jam.demo.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.jam.demo.entity.MessageConsumeRecord;
import org.apache.ibatis.annotations.Mapper;
/**
* 消息消费记录Mapper
* @author ken
*/
@Mapper
publicinterface MessageConsumeRecordMapper extends BaseMapper<MessageConsumeRecord> {
}
package com.jam.demo.service;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.jam.demo.entity.MessageConsumeRecord;
import com.jam.demo.mapper.MessageConsumeRecordMapper;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.springframework.stereotype.Service;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StringUtils;
/**
* 幂等消费服务
* @author ken
*/
@Service
@Slf4j
@RequiredArgsConstructor
publicclass IdempotentConsumeService {
privatefinal MessageConsumeRecordMapper consumeRecordMapper;
/**
* 处理幂等消费
* @param businessKey 业务唯一键
* @param consumeLogic 消费逻辑
* @return 消费状态
*/
public ConsumeConcurrentlyStatus handleIdempotent(String businessKey, Runnable consumeLogic) {
if (!StringUtils.hasText(businessKey)) {
log.error("业务唯一键不能为空");
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
// 检查是否已消费
LambdaQueryWrapper<MessageConsumeRecord> queryWrapper = new LambdaQueryWrapper<MessageConsumeRecord>()
.eq(MessageConsumeRecord::getBusinessKey, businessKey);
MessageConsumeRecord record = consumeRecordMapper.selectOne(queryWrapper);
if (record == null) {
try {
// 执行消费逻辑
consumeLogic.run();
// 插入消费记录
MessageConsumeRecord newRecord = new MessageConsumeRecord();
newRecord.setBusinessKey(businessKey);
newRecord.setConsumeStatus("CONSUMED");
consumeRecordMapper.insert(newRecord);
log.info("幂等消费成功,businessKey:{}", businessKey);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
log.error("消费逻辑执行失败", e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
} else {
log.info("消息已消费,businessKey:{}", businessKey);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
}
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (MessageExt msg : msgs) {
String businessKey = msg.getKeys(); // 业务唯一键放在keys中
return idempotentConsumeService.handleIdempotent(businessKey, () -> {
// 具体消费逻辑(如订单处理)
String body = new String(msg.getBody(), "UTF-8");
log.info("执行订单处理逻辑:{}", body);
});
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
RocketMQ通过半消息机制实现分布式事务,解决跨服务的数据一致性问题(如订单创建与库存扣减)。
package com.jam.demo.service;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import lombok.extern.slf4j.Slf4j;
import javax.annotation.PostConstruct;
/**
* 事务消息生产者服务
* @author ken
*/
@Service
@Slf4j
publicclass TransactionMessageProducerService {
@Value("${rocketmq.producer.group}")
private String producerGroup;
@Value("${rocketmq.name-server}")
private String nameServerAddr;
private TransactionMQProducer transactionProducer;
/**
* 初始化事务生产者
*/
@PostConstruct
public void initTransactionProducer() {
transactionProducer = new TransactionMQProducer(producerGroup);
transactionProducer.setNamesrvAddr(nameServerAddr);
// 设置事务监听器
transactionProducer.setTransactionListener(new TransactionListener() {
/**
* 执行本地事务(如扣减库存)
*/
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
String businessKey = msg.getKeys();
log.info("执行本地事务,businessKey:{}", businessKey);
try {
// 模拟本地事务(如数据库操作)
boolean localTxSuccess = true; // 实际场景需替换为真实业务逻辑
if (localTxSuccess) {
return LocalTransactionState.COMMIT_MESSAGE; // 提交消息
} else {
return LocalTransactionState.ROLLBACK_MESSAGE; // 回滚消息
}
} catch (Exception e) {
log.error("本地事务执行异常", e);
return LocalTransactionState.UNKNOW; // 未知状态,等待回查
}
}
/**
* 事务回查(Broker主动查询本地事务状态)
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
String businessKey = msg.getKeys();
log.info("事务回查,businessKey:{}", businessKey);
// 模拟查询本地事务状态
boolean txSuccess = true;
return txSuccess ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;
}
});
try {
transactionProducer.start();
log.info("事务消息生产者启动成功");
} catch (Exception e) {
log.error("事务生产者启动失败", e);
thrownew RuntimeException("事务生产者初始化失败", e);
}
}
/**
* 发送事务消息
* @param topic 主题
* @param tags 标签
* @param keys 业务键
* @param body 消息体
* @param arg 附加参数
*/
public void sendTransactionMessage(String topic, String tags, String keys, String body, Object arg) {
Message message = new Message(topic, tags, keys, body.getBytes());
try {
transactionProducer.sendMessageInTransaction(message, arg);
log.info("事务消息发送请求提交成功,keys:{}", keys);
} catch (Exception e) {
log.error("发送事务消息失败", e);
thrownew RuntimeException("事务消息发送失败", e);
}
}
}
/**
 * Submits a transactional message through the transaction producer service.
 *
 * @param topic topic name
 * @param tags  message tag
 * @param keys  business key
 * @param body  message body
 * @return a text result confirming submission, or the failure reason
 */
@PostMapping("/sendTransaction")
@Operation(summary = "发送事务消息", description = "发送RocketMQ事务消息")
public String sendTransactionMessage(
        @Parameter(description = "主题", required = true) @RequestParam String topic,
        @Parameter(description = "标签", required = true) @RequestParam String tags,
        @Parameter(description = "业务键", required = true) @RequestParam String keys,
        @Parameter(description = "消息体", required = true) @RequestParam String body) {
    String reply;
    try {
        // No extra argument is passed to the local transaction.
        transactionMessageProducerService.sendTransactionMessage(topic, tags, keys, body, null);
        reply = "事务消息请求提交成功,业务键:" + keys;
    } catch (Exception ex) {
        log.error("发送事务消息失败", ex);
        reply = "发送失败:" + ex.getMessage();
    }
    return reply;
}
使用mqadmin命令查看消息状态:
# 查看Topic消息累计数
# Run from $ROCKETMQ_HOME, where the admin tool lives under bin/.
sh bin/mqadmin topicStatus -n 192.168.1.100:9876 -t demo_topic
# Show the broker's runtime and storage status.
sh bin/mqadmin brokerStatus -n 192.168.1.100:9876 -b 192.168.1.100:10911
# Raise the broker heap back to 8g. The pattern includes -Xmn1g because the
# earlier edit wrote "-Xms2g -Xmx2g -Xmn1g"; matching only the first two flags
# would leave a stray -Xmn1g after the new -Xmn4g.
sed -i 's/-Xms2g -Xmx2g -Xmn1g/-Xms8g -Xmx8g -Xmn4g/g' bin/runbroker.sh
# CommitLog file size set to 1 GB. Kept on its own line: a trailing "# ..."
# after the value would become part of the value.
# NOTE(review): confirm these property names against the MessageStoreConfig of
# the broker version in use.
mapedFileSizeCommitLog=1073741824
flushCommitLogThreadPoolNums=4
flushConsumeQueueThreadPoolNums=2
生产者开启消息体压缩(producer.setCompressMsgBodyOverHowmuch(1024*1024))。
消费者调大消费线程数(consumer.setConsumeThreadMax(128));调大批量消费数量(consumer.setConsumeMessageBatchMaxSize(32))。
RocketMQ作为一款高性能、高可用的消息中间件,已成为企业分布式架构的核心组件。本文从底层逻辑出发,讲解了核心API开发、企业级架构设计、可靠性保障、幂等性处理及性能优化等实战内容,所有示例代码均可直接落地生产。
在实际项目中,需结合业务场景选择合适的消息类型(普通/顺序/事务),通过集群部署保障高可用,通过幂等设计保障数据一致性。同时,需关注消息链路的监控与排查,确保系统稳定运行。