
在分布式微服务架构中,消息中间件是解耦系统、削峰填谷、异步通信的核心组件。RocketMQ作为阿里开源的金融级分布式消息中间件,凭借高可靠、高吞吐、低延迟的特性,成为国内互联网企业的主流选型。其中事务消息与延时消息是RocketMQ最具差异化的两大核心能力,分别解决了分布式场景下的原子性事务与定时调度两大核心痛点。
在微服务架构中,我们经常会遇到「本地事务执行」与「消息发送」的原子性问题,典型场景如电商下单流程:用户下单时,需要在本地数据库创建订单、扣减库存,同时发送消息通知物流系统初始化发货流程。传统的处理方式存在两个致命问题:
传统的分布式事务方案如2PC、TCC、SAGA等,实现复杂、侵入性强、性能损耗大。而RocketMQ的事务消息,基于两阶段提交+事务回查机制,以极低的侵入性,完美解决了「本地事务与消息发送的原子性」问题,实现了二者的最终一致性。
RocketMQ事务消息的核心设计思想是两阶段提交+反向事务回查补偿,核心概念如下:
底层存储实现上,RocketMQ为半消息专门设计了系统级主题RMQ_SYS_TRANS_HALF_TOPIC,该主题对普通消费者完全不可见,所有半消息都会被持久化到该主题中。当收到COMMIT指令时,Broker会将消息从半消息主题转移到业务指定的真实主题,此时消息才能被消费者消费;当收到ROLLBACK指令时,Broker会直接将半消息标记为删除,不会进行任何投递。
事务回查机制由Broker的定时任务驱动,默认每隔60秒扫描一次半消息主题中超过阈值未收到二次确认的消息,主动向生产者发起回查,默认最大回查次数为15次,超过次数后Broker会默认回滚该消息,避免消息无限期占用存储资源。
RocketMQ事务消息的完整执行流程如下,通过流程图可直观理解全链路逻辑:

全流程的核心逻辑可拆解为8个核心步骤:
RMQ_SYS_TRANS_HALF_TOPIC主题,持久化成功后向生产者返回ACK确认。本次实战基于经典的电商下单场景,实现「创建订单+扣减库存」与「发送物流消息」的原子性,确保二者最终一致。
项目基于JDK17、Spring Boot 3.2.4、RocketMQ 5.2.0构建,Maven核心依赖如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.4</version>
<relativePath/>
</parent>
<groupId>com.jam.demo</groupId>
<artifactId>rocketmq-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>rocketmq-demo</name>
<description>RocketMQ事务消息与延时消息实战</description>
<properties>
<java.version>17</java.version>
<rocketmq.version>2.2.5</rocketmq.version>
<mybatis-plus.version>3.5.6</mybatis-plus.version>
<mysql.version>8.0.36</mysql.version>
<fastjson2.version>2.0.52</fastjson2.version>
<guava.version>33.1.0-jre</guava.version>
<lombok.version>1.18.32</lombok.version>
<springdoc.version>2.5.0</springdoc.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>${rocketmq.version}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>5.2.0</version>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>${mybatis-plus.version}</version>
</dependency>
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<version>${mysql.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>${fastjson2.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springdoc</groupId>
<artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
<version>${springdoc.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
application.yml配置文件如下:
spring:
application:
name:rocketmq-demo
datasource:
driver-class-name:com.mysql.cj.jdbc.Driver
url:jdbc:mysql://127.0.0.1:3306/rocketmq_demo?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true
username:root
password:root
jackson:
default-property-inclusion:non_null
server:
port:8080
rocketmq:
name-server:127.0.0.1:9876
producer:
group:order-producer-group
send-message-timeout:3000
retry-times-when-send-failed:2
mybatis-plus:
mapper-locations:classpath*:/mapper/**/*.xml
configuration:
map-underscore-to-camel-case:true
log-impl:org.apache.ibatis.logging.stdout.StdOutImpl
springdoc:
swagger-ui:
path:/swagger-ui.html
enabled:true
api-docs:
enabled:true
path:/v3/api-docs
基于MySQL 8.0设计3张核心表,分别为订单表、库存表、本地事务状态表,SQL脚本如下:
CREATE DATABASEIFNOTEXISTS rocketmq_demo DEFAULTCHARACTERSET utf8mb4 COLLATE utf8mb4_unicode_ci;
USE rocketmq_demo;
-- 订单表
DROPTABLEIFEXISTS t_order;
CREATETABLE t_order (
idBIGINTNOTNULL AUTO_INCREMENT COMMENT'主键ID',
order_no VARCHAR(64) NOTNULLCOMMENT'订单编号',
user_id BIGINTNOTNULLCOMMENT'用户ID',
product_id BIGINTNOTNULLCOMMENT'商品ID',
quantity INTNOTNULLCOMMENT'购买数量',
total_amount DECIMAL(10,2) NOTNULLCOMMENT'订单总金额',
order_status TINYINTNOTNULLDEFAULT0COMMENT'订单状态:0-待支付,1-已支付,2-已取消,3-已完成',
create_time DATETIME NOTNULLDEFAULTCURRENT_TIMESTAMPCOMMENT'创建时间',
update_time DATETIME NOTNULLDEFAULTCURRENT_TIMESTAMPONUPDATECURRENT_TIMESTAMPCOMMENT'更新时间',
PRIMARY KEY (id),
UNIQUEKEY uk_order_no (order_no),
KEY idx_user_id (user_id),
KEY idx_product_id (product_id)
) ENGINE=InnoDBDEFAULTCHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='订单表';
-- 库存表
DROPTABLEIFEXISTS t_stock;
CREATETABLE t_stock (
idBIGINTNOTNULL AUTO_INCREMENT COMMENT'主键ID',
product_id BIGINTNOTNULLCOMMENT'商品ID',
stock_num INTNOTNULLDEFAULT0COMMENT'库存数量',
create_time DATETIME NOTNULLDEFAULTCURRENT_TIMESTAMPCOMMENT'创建时间',
update_time DATETIME NOTNULLDEFAULTCURRENT_TIMESTAMPONUPDATECURRENT_TIMESTAMPCOMMENT'更新时间',
PRIMARY KEY (id),
UNIQUEKEY uk_product_id (product_id)
) ENGINE=InnoDBDEFAULTCHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='库存表';
-- 本地事务状态表,用于事务回查
DROPTABLEIFEXISTS t_transaction_log;
CREATETABLE t_transaction_log (
idBIGINTNOTNULL AUTO_INCREMENT COMMENT'主键ID',
transaction_id VARCHAR(64) NOTNULLCOMMENT'全局事务ID',
business_type VARCHAR(32) NOTNULLCOMMENT'业务类型',
transaction_status TINYINTNOTNULLCOMMENT'事务状态:0-执行中,1-已提交,2-已回滚',
create_time DATETIME NOTNULLDEFAULTCURRENT_TIMESTAMPCOMMENT'创建时间',
update_time DATETIME NOTNULLDEFAULTCURRENT_TIMESTAMPONUPDATECURRENT_TIMESTAMPCOMMENT'更新时间',
PRIMARY KEY (id),
UNIQUEKEY uk_transaction_id (transaction_id),
KEY idx_create_time (create_time)
) ENGINE=InnoDBDEFAULTCHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='本地事务状态表';
-- 初始化库存数据
INSERTINTO t_stock (product_id, stock_num) VALUES (1, 1000);
package com.jam.demo.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.math.BigDecimal;
import java.time.LocalDateTime;
/**
* 订单实体类
* @author ken
*/
@Data
@TableName("t_order")
@Schema(name = "Order", description = "订单实体")
publicclass Order {
@TableId(type = IdType.AUTO)
@Schema(description = "主键ID")
private Long id;
@Schema(description = "订单编号")
private String orderNo;
@Schema(description = "用户ID")
private Long userId;
@Schema(description = "商品ID")
private Long productId;
@Schema(description = "购买数量")
private Integer quantity;
@Schema(description = "订单总金额")
private BigDecimal totalAmount;
@Schema(description = "订单状态:0-待支付,1-已支付,2-已取消,3-已完成")
private Integer orderStatus;
@Schema(description = "创建时间")
private LocalDateTime createTime;
@Schema(description = "更新时间")
private LocalDateTime updateTime;
}
package com.jam.demo.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.time.LocalDateTime;
/**
* 库存实体类
* @author ken
*/
@Data
@TableName("t_stock")
@Schema(name = "Stock", description = "库存实体")
publicclass Stock {
@TableId(type = IdType.AUTO)
@Schema(description = "主键ID")
private Long id;
@Schema(description = "商品ID")
private Long productId;
@Schema(description = "库存数量")
private Integer stockNum;
@Schema(description = "创建时间")
private LocalDateTime createTime;
@Schema(description = "更新时间")
private LocalDateTime updateTime;
}
package com.jam.demo.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.time.LocalDateTime;
/**
* 事务日志实体类
* @author ken
*/
@Data
@TableName("t_transaction_log")
@Schema(name = "TransactionLog", description = "事务日志实体")
publicclass TransactionLog {
@TableId(type = IdType.AUTO)
@Schema(description = "主键ID")
private Long id;
@Schema(description = "全局事务ID")
private String transactionId;
@Schema(description = "业务类型")
private String businessType;
@Schema(description = "事务状态:0-执行中,1-已提交,2-已回滚")
private Integer transactionStatus;
@Schema(description = "创建时间")
private LocalDateTime createTime;
@Schema(description = "更新时间")
private LocalDateTime updateTime;
}
package com.jam.demo.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.jam.demo.entity.Order;
import org.apache.ibatis.annotations.Mapper;
/**
* 订单Mapper
* @author ken
*/
@Mapper
public interface OrderMapper extends BaseMapper<Order> {
}
package com.jam.demo.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.jam.demo.entity.Stock;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Update;
/**
* 库存Mapper
* @author ken
*/
@Mapper
publicinterface StockMapper extends BaseMapper<Stock> {
/**
* 扣减库存
* @param productId 商品ID
* @param quantity 扣减数量
* @return 影响行数
*/
@Update("UPDATE t_stock SET stock_num = stock_num - #{quantity} WHERE product_id = #{productId} AND stock_num >= #{quantity}")
int deductStock(@Param("productId") Long productId, @Param("quantity") Integer quantity);
}
package com.jam.demo.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.jam.demo.entity.TransactionLog;
import org.apache.ibatis.annotations.Mapper;
/**
* 事务日志Mapper
* @author ken
*/
@Mapper
public interface TransactionLogMapper extends BaseMapper<TransactionLog> {
}
package com.jam.demo.service;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.jam.demo.entity.Order;
import com.jam.demo.mapper.OrderMapper;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
/**
* 订单服务
* @author ken
*/
@Service
@RequiredArgsConstructor
publicclass OrderService {
privatefinal OrderMapper orderMapper;
/**
* 保存订单
* @param order 订单实体
* @return 保存结果
*/
public boolean saveOrder(Order order) {
return orderMapper.insert(order) > 0;
}
/**
* 根据订单号查询订单
* @param orderNo 订单号
* @return 订单实体
*/
public Order getOrderByNo(String orderNo) {
LambdaQueryWrapper<Order> wrapper = new LambdaQueryWrapper<Order>()
.eq(Order::getOrderNo, orderNo);
return orderMapper.selectOne(wrapper);
}
/**
* 取消订单
* @param orderNo 订单号
* @return 取消结果
*/
public boolean cancelOrder(String orderNo) {
Order order = new Order();
order.setOrderStatus(2);
LambdaQueryWrapper<Order> wrapper = new LambdaQueryWrapper<Order>()
.eq(Order::getOrderNo, orderNo)
.eq(Order::getOrderStatus, 0);
return orderMapper.update(order, wrapper) > 0;
}
}
package com.jam.demo.service;
import com.jam.demo.mapper.StockMapper;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
/**
* 库存服务
* @author ken
*/
@Service
@RequiredArgsConstructor
publicclass StockService {
privatefinal StockMapper stockMapper;
/**
* 扣减库存
* @param productId 商品ID
* @param quantity 扣减数量
* @return 扣减结果
*/
public boolean deductStock(Long productId, Integer quantity) {
return stockMapper.deductStock(productId, quantity) > 0;
}
/**
* 恢复库存
* @param productId 商品ID
* @param quantity 恢复数量
* @return 恢复结果
*/
public boolean restoreStock(Long productId, Integer quantity) {
return stockMapper.deductStock(productId, -quantity) > 0;
}
}
package com.jam.demo.service;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.jam.demo.entity.TransactionLog;
import com.jam.demo.mapper.TransactionLogMapper;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
/**
* 事务日志服务
* @author ken
*/
@Service
@RequiredArgsConstructor
publicclass TransactionLogService {
privatefinal TransactionLogMapper transactionLogMapper;
/**
* 保存事务日志
* @param transactionLog 事务日志实体
* @return 保存结果
*/
public boolean saveTransactionLog(TransactionLog transactionLog) {
return transactionLogMapper.insert(transactionLog) > 0;
}
/**
* 更新事务状态
* @param transactionId 事务ID
* @param status 事务状态
* @return 更新结果
*/
public boolean updateTransactionStatus(String transactionId, Integer status) {
TransactionLog transactionLog = new TransactionLog();
transactionLog.setTransactionStatus(status);
LambdaQueryWrapper<TransactionLog> wrapper = new LambdaQueryWrapper<TransactionLog>()
.eq(TransactionLog::getTransactionId, transactionId);
return transactionLogMapper.update(transactionLog, wrapper) > 0;
}
/**
* 根据事务ID查询事务状态
* @param transactionId 事务ID
* @return 事务状态
*/
public Integer getTransactionStatus(String transactionId) {
LambdaQueryWrapper<TransactionLog> wrapper = new LambdaQueryWrapper<TransactionLog>()
.eq(TransactionLog::getTransactionId, transactionId);
TransactionLog transactionLog = transactionLogMapper.selectOne(wrapper);
if (ObjectUtils.isEmpty(transactionLog)) {
returnnull;
}
return transactionLog.getTransactionStatus();
}
}
package com.jam.demo.config;
import com.jam.demo.listener.OrderTransactionListener;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* RocketMQ配置类
* @author ken
*/
@Slf4j
@Configuration
publicclass RocketMQConfig {
@Value("${rocketmq.name-server}")
private String nameServer;
@Value("${rocketmq.producer.group}")
private String producerGroup;
privatefinal OrderTransactionListener orderTransactionListener;
public RocketMQConfig(OrderTransactionListener orderTransactionListener) {
this.orderTransactionListener = orderTransactionListener;
}
/**
* 事务消息生产者
* @return TransactionMQProducer实例
*/
@Bean
public TransactionMQProducer transactionMQProducer() {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2,
5,
100,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2000),
r -> {
Thread thread = new Thread(r);
thread.setName("transaction-check-thread-%d");
return thread;
}
);
TransactionMQProducer producer = new TransactionMQProducer(producerGroup);
producer.setNamesrvAddr(nameServer);
producer.setExecutorService(executor);
producer.setTransactionListener(orderTransactionListener);
try {
producer.start();
log.info("TransactionMQProducer start success");
} catch (Exception e) {
log.error("TransactionMQProducer start failed", e);
thrownew RuntimeException(e);
}
return producer;
}
/**
* RocketMQTemplate
* @param transactionMQProducer 事务消息生产者
* @return RocketMQTemplate实例
*/
@Bean
public RocketMQTemplate rocketMQTemplate(TransactionMQProducer transactionMQProducer) {
RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();
rocketMQTemplate.setProducer(transactionMQProducer);
return rocketMQTemplate;
}
}
package com.jam.demo.listener;
import com.alibaba.fastjson2.JSON;
import com.jam.demo.entity.Order;
import com.jam.demo.entity.TransactionLog;
import com.jam.demo.service.OrderService;
import com.jam.demo.service.StockService;
import com.jam.demo.service.TransactionLogService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import java.util.UUID;
/**
* 订单事务消息监听器
* @author ken
*/
@Slf4j
@RequiredArgsConstructor
publicclass OrderTransactionListener implements TransactionListener {
privatefinal OrderService orderService;
privatefinal StockService stockService;
privatefinal TransactionLogService transactionLogService;
privatefinal TransactionTemplate transactionTemplate;
/**
* 执行本地事务
* @param msg 半消息
* @param arg 业务参数
* @return 本地事务状态
*/
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
String transactionId = msg.getTransactionId();
if (!StringUtils.hasText(transactionId)) {
transactionId = UUID.randomUUID().toString().replace("-", "");
}
log.info("开始执行本地事务,transactionId:{}", transactionId);
try {
Order order = JSON.parseObject(msg.getBody(), Order.class);
String finalTransactionId = transactionId;
Boolean result = transactionTemplate.execute(new TransactionCallback<Boolean>() {
@Override
public Boolean doInTransaction(TransactionStatus status) {
// 1. 保存事务日志,状态为执行中
TransactionLog transactionLog = new TransactionLog();
transactionLog.setTransactionId(finalTransactionId);
transactionLog.setBusinessType("ORDER_CREATE");
transactionLog.setTransactionStatus(0);
boolean saveLog = transactionLogService.saveTransactionLog(transactionLog);
if (!saveLog) {
status.setRollbackOnly();
returnfalse;
}
// 2. 保存订单
boolean saveOrder = orderService.saveOrder(order);
if (!saveOrder) {
status.setRollbackOnly();
returnfalse;
}
// 3. 扣减库存
boolean deductStock = stockService.deductStock(order.getProductId(), order.getQuantity());
if (!deductStock) {
status.setRollbackOnly();
returnfalse;
}
// 4. 更新事务状态为已提交
boolean updateStatus = transactionLogService.updateTransactionStatus(finalTransactionId, 1);
if (!updateStatus) {
status.setRollbackOnly();
returnfalse;
}
returntrue;
}
});
if (ObjectUtils.isEmpty(result) || !result) {
log.error("本地事务执行失败,transactionId:{}", transactionId);
return LocalTransactionState.ROLLBACK_MESSAGE;
}
log.info("本地事务执行成功,transactionId:{}", transactionId);
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
log.error("本地事务执行异常,transactionId:{}", transactionId, e);
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
/**
* 事务回查
* @param msg 消息
* @return 本地事务状态
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
String transactionId = msg.getTransactionId();
log.info("开始事务回查,transactionId:{}", transactionId);
if (!StringUtils.hasText(transactionId)) {
return LocalTransactionState.UNKNOW;
}
try {
Integer status = transactionLogService.getTransactionStatus(transactionId);
if (ObjectUtils.isEmpty(status)) {
log.warn("事务状态不存在,transactionId:{}", transactionId);
return LocalTransactionState.UNKNOW;
}
returnswitch (status) {
case1 -> LocalTransactionState.COMMIT_MESSAGE;
case2 -> LocalTransactionState.ROLLBACK_MESSAGE;
default -> LocalTransactionState.UNKNOW;
};
} catch (Exception e) {
log.error("事务回查异常,transactionId:{}", transactionId, e);
return LocalTransactionState.UNKNOW;
}
}
}
package com.jam.demo.consumer;
import com.alibaba.fastjson2.JSON;
import com.jam.demo.entity.Order;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* 订单物流消息消费者
* @author ken
*/
@Slf4j
@Component
@RocketMQMessageListener(
topic = "order_create_topic",
consumerGroup = "order-logistics-consumer-group"
)
publicclass OrderLogisticsConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
log.info("收到订单创建消息,message:{}", message);
Order order = JSON.parseObject(message, Order.class);
// 初始化物流发货流程
log.info("初始化订单物流流程,orderNo:{}", order.getOrderNo());
}
}
package com.jam.demo.controller;
import com.alibaba.fastjson2.JSON;
import com.jam.demo.entity.Order;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.UUID;
/**
* 订单事务消息接口
* @author ken
*/
@Slf4j
@RestController
@RequestMapping("/order")
@RequiredArgsConstructor
@Tag(name = "订单管理", description = "订单事务消息与定时消息相关接口")
publicclass OrderController {
privatefinal TransactionMQProducer transactionMQProducer;
privatefinal RocketMQTemplate rocketMQTemplate;
privatestaticfinal String ORDER_TOPIC = "order_create_topic";
privatestaticfinal String ORDER_CANCEL_TOPIC = "order_cancel_topic";
// 30分钟延时,单位毫秒
privatestaticfinallong DELAY_TIME = 30 * 60 * 1000L;
/**
* 创建订单(事务消息)
* @param order 订单实体
* @return 订单创建结果
*/
@PostMapping("/create")
@Operation(summary = "创建订单", description = "基于RocketMQ事务消息创建订单,保证本地事务与消息发送的原子性")
public String createOrder(@RequestBody Order order) {
String transactionId = UUID.randomUUID().toString().replace("-", "");
order.setOrderNo(transactionId);
order.setOrderStatus(0);
try {
Message message = new Message(ORDER_TOPIC, JSON.toJSONBytes(order));
message.setTransactionId(transactionId);
SendResult sendResult = transactionMQProducer.sendMessageInTransaction(message, null);
log.info("事务消息发送结果,transactionId:{}, sendResult:{}", transactionId, sendResult);
// 发送订单取消定时消息
Message cancelMessage = new Message(ORDER_CANCEL_TOPIC, JSON.toJSONBytes(order));
// 设置消息投递时间戳
cancelMessage.setDeliverTimeMs(System.currentTimeMillis() + DELAY_TIME);
SendResult cancelSendResult = rocketMQTemplate.getProducer().send(cancelMessage);
log.info("定时消息发送结果,orderNo:{}, sendResult:{}", transactionId, cancelSendResult);
return"订单创建成功,订单号:" + transactionId;
} catch (Exception e) {
log.error("订单创建失败,transactionId:{}", transactionId, e);
return"订单创建失败:" + e.getMessage();
}
}
}
package com.jam.demo;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* 项目启动类
* @author ken
*/
@SpringBootApplication
@MapperScan("com.jam.demo.mapper")
publicclass RocketmqDemoApplication {
public static void main(String[] args) {
SpringApplication.run(RocketmqDemoApplication.class, args);
}
}
RMQ_SYS_TRANS_HALF_TOPIC的写入权限,否则半消息发送会直接失败。在业务开发中,我们经常会遇到需要延迟处理的业务场景,典型场景如:
传统的实现方式是基于数据库轮询,定时扫描符合条件的记录进行处理。这种方式存在严重的性能问题:高频轮询会给数据库带来巨大的IO压力,同时轮询间隔会导致业务处理延迟,无法做到精准触发。而RocketMQ的延时/定时消息,基于Broker端的定时调度机制,完美解决了这一痛点,实现了高性能、高精度的延迟任务处理。
RocketMQ 4.x版本仅支持固定级别的延时消息,预设了18个延时级别,每个级别对应固定的延时时间,用户无法自定义任意延时时间。
延时级别 | 延时时间 | 延时级别 | 延时时间 |
|---|---|---|---|
1 | 1秒 | 10 | 6分钟 |
2 | 5秒 | 11 | 7分钟 |
3 | 10秒 | 12 | 8分钟 |
4 | 30秒 | 13 | 9分钟 |
5 | 1分钟 | 14 | 10分钟 |
6 | 2分钟 | 15 | 20分钟 |
7 | 3分钟 | 16 | 30分钟 |
8 | 4分钟 | 17 | 1小时 |
9 | 5分钟 | 18 | 2小时 |
4.x延时消息的核心实现逻辑如下:
setDelayTimeLevel方法设置延时级别,消息发送到Broker。SCHEDULE_TOPIC_XXXX,每个延时级别对应该主题下的一个独立队列。该实现方案的优点是简单稳定,缺点是灵活性极差,仅支持固定的18个延时级别,无法满足自定义延时时间的业务需求。
RocketMQ 5.0版本正式推出了定时消息(Timed Message)能力,支持毫秒级精度的任意时间延时,最大支持40天的延时时长,彻底解决了4.x版本的灵活性问题。
5.x定时消息的核心实现基于TimerStore定时索引存储与多级时间轮调度机制,核心逻辑如下:
setDeliverTimeMs方法设置消息的投递时间戳(当前时间+延时时长),消息发送到Broker。完整的定时消息执行流程如下:

为了清晰区分4.x延时消息与5.x定时消息的差异,避免使用时混淆,核心对比如下:
特性维度 | 4.x固定级别延时消息 | 5.x任意时间定时消息 |
|---|---|---|
延时时间 | 仅支持18个固定级别,不可自定义 | 支持毫秒级任意时间,最大40天 |
存储实现 | 基于固定主题SCHEDULE_TOPIC_XXXX的队列存储 | 基于CommitLog+TimerStore(RocksDB)的索引存储 |
调度机制 | 单级别单线程轮询调度 | 多级时间轮高效调度 |
时间精度 | 秒级 | 毫秒级 |
扩展性 | 差,新增级别需修改Broker配置并重启 | 好,无需修改配置,直接使用 |
版本支持 | 所有4.x版本 | 5.0及以上版本 |
本次实战基于经典的电商订单超时取消场景,用户下单后发送30分钟的定时消息,30分钟后检查订单支付状态,若仍为待支付,则自动取消订单并释放库存。
使用5.x定时消息前,需要在Broker的配置文件broker.conf中开启定时消息功能,核心配置如下:
# 开启定时消息功能
enableTimer=true
# 定时消息最大延时时长,单位毫秒,默认40天,此处设置为30天
timerMaxDelay=2592000000
# 定时消息调度线程数,默认4
timerWheelNum=4
配置完成后重启Broker,即可使用定时消息能力。
package com.jam.demo.consumer;
import com.alibaba.fastjson2.JSON;
import com.jam.demo.entity.Order;
import com.jam.demo.service.OrderService;
import com.jam.demo.service.StockService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.ObjectUtils;
/**
* 订单取消定时消息消费者
* @author ken
*/
@Slf4j
@Component
@RequiredArgsConstructor
@RocketMQMessageListener(
topic = "order_cancel_topic",
consumerGroup = "order-cancel-consumer-group"
)
publicclass OrderCancelConsumer implements RocketMQListener<String> {
privatefinal OrderService orderService;
privatefinal StockService stockService;
privatefinal TransactionTemplate transactionTemplate;
@Override
public void onMessage(String message) {
log.info("收到订单取消定时消息,message:{}", message);
Order order = JSON.parseObject(message, Order.class);
String orderNo = order.getOrderNo();
try {
// 幂等校验:查询订单状态
Order existOrder = orderService.getOrderByNo(orderNo);
if (ObjectUtils.isEmpty(existOrder)) {
log.warn("订单不存在,orderNo:{}", orderNo);
return;
}
if (existOrder.getOrderStatus() != 0) {
log.info("订单已支付或已取消,无需处理,orderNo:{}", orderNo);
return;
}
// 执行订单取消与库存恢复,使用编程式事务
Boolean result = transactionTemplate.execute(new TransactionCallback<Boolean>() {
@Override
public Boolean doInTransaction(TransactionStatus status) {
// 1. 取消订单
boolean cancelOrder = orderService.cancelOrder(orderNo);
if (!cancelOrder) {
status.setRollbackOnly();
returnfalse;
}
// 2. 恢复库存
boolean restoreStock = stockService.restoreStock(order.getProductId(), order.getQuantity());
if (!restoreStock) {
status.setRollbackOnly();
returnfalse;
}
returntrue;
}
});
if (ObjectUtils.isEmpty(result) || !result) {
log.error("订单取消失败,orderNo:{}", orderNo);
thrownew RuntimeException("订单取消失败,触发重试");
}
log.info("订单超时取消成功,orderNo:{}", orderNo);
} catch (Exception e) {
log.error("订单取消处理异常,orderNo:{}", orderNo, e);
throw e;
}
}
}
timerMaxDelay,否则消息会被Broker拒绝。setDeliverTimeMs方法设置的是消息的投递时间戳,必须是大于当前时间的未来时间,否则消息会被立即投递。setDelayTimeLevel设置级别,5.x定时消息使用setDeliverTimeMs设置时间戳,不可混淆使用。enableTimer=true,否则定时消息会被当成普通消息立即投递。timerMaxDelay配置的最大值,消息会被Broker直接拒绝,发送失败。setDelayTimeLevel与5.x的setDeliverTimeMs不可同时使用,同时设置时只有setDeliverTimeMs生效。RocketMQ的事务消息与延时消息,是其区别于其他消息中间件的核心差异化能力,分别解决了分布式场景下的两大核心痛点: