
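In a distributed system, a message queue works like a transport hub. It takes in messages from producers and paces how consumers process them, which makes it the core component for decoupling, peak shaving, and asynchronous communication. But once a million messages have been piling up for hours, the hub is effectively paralyzed: downstream services cannot get their messages, data consistency breaks, and cascading failures may follow, directly hurting user experience and business continuity.
As a developer who has worked with distributed architectures for years, I have dealt with message backlogs in production many times. They ranged from brief stalls of a few tens of thousands of messages to an emergency in which a million messages piled up for 4 hours. From those incidents I distilled an end-to-end playbook: stop the bleeding → find the root cause → fix it for good → review and optimize.
Many developers panic when a backlog appears and blindly add consumers or restart services, which only makes things worse. The essence of a backlog fits in one sentence: messages are produced at least as fast as they are consumed, so the backlog never shrinks. Once the accumulated volume exceeds what the queue can buffer, messages keep piling up.
An analogy: the message queue is a neighborhood's parcel lockers, producers are couriers, and consumers are residents picking up parcels. Normally the rate at which couriers deliver (production rate) roughly matches the rate at which residents pick up (consumption rate), so the lockers never fill up. Suppose couriers suddenly deliver a million parcels at once (a producer spike), or residents stay home and stop picking up (consumers slow or down). The lockers then fill quickly and every new parcel has to wait in line. That is a message backlog.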
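A little arithmetic makes the inequality above concrete. The sketch below is illustrative only: the class name `BacklogMath` and the sample rates are hypothetical, and it assumes steady production and consumption rates.

For example, producing 1,500 msg/s against consuming 1,200 msg/s adds 300 msg/s, which is 1,080,000 messages in one hour. A backlog only drains once consumption strictly exceeds production.

```java
/**
 * Back-of-the-envelope backlog arithmetic (illustrative sketch).
 * Rates are in messages per second and are assumed to be steady.
 */
final class BacklogMath {
    private BacklogMath() {}

    /** Net growth per second: a positive value means the backlog keeps growing. */
    static long netGrowthPerSecond(long produceTps, long consumeTps) {
        return produceTps - consumeTps;
    }

    /** Backlog after the given number of seconds, never below zero. */
    static long backlogAfter(long initialBacklog, long produceTps, long consumeTps, long seconds) {
        long value = initialBacklog + netGrowthPerSecond(produceTps, consumeTps) * seconds;
        return Math.max(0, value);
    }

    /**
     * Seconds needed to drain the backlog, or -1 when consumption does not
     * exceed production (the backlog never drains).
     */
    static long drainSeconds(long backlog, long produceTps, long consumeTps) {
        long surplus = consumeTps - produceTps;
        if (surplus <= 0) {
            return -1;
        }
        // Ceiling division: a partially used final second still counts.
        return (backlog + surplus - 1) / surplus;
    }
}
```

With a surplus of 500 msg/s, for instance, a backlog of 1,000,000 messages takes 2,000 seconds (about 33 minutes) to drain.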
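Many people think a few hours of backlog is harmless, but the damage compounds over time and can even trigger cascading failures:
When a backlog appears, do not rush to fix it; find the cause first. The flowchart below pinpoints the core cause in about 3 minutes and prevents blind operations: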

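When a million messages have been stuck for hours, the priority is to relieve the pressure quickly and keep the business from getting worse. The idea is to temporarily cut off non-essential load and maximize consumption capacity so the backlog drains fast. It is like calling in temporary couriers to haul away part of the pile and take pressure off the parcel lockers.
Priority order for stopping the bleeding: pause non-core producers → temporarily scale out consumers → split the message flow → skip invalid messages. In my experience these four steps bring the backlog down noticeably within about 30 minutes. Details and examples follow.
Since a backlog means production outpaces consumption, the first step is to reduce production. Pause producers for non-core business and keep only core message production, so the backlog stops growing.
For example, if an e-commerce system has a million order messages backed up, pause producers for non-core flows such as order reviews and shipping notifications. Keep only core flows such as order creation and payment callbacks, so consumers can concentrate on core messages.
The following code targets JDK 17, Spring Boot 3.2.3 and Swagger 3 (springdoc-openapi). It starts and stops producers dynamically through an HTTP endpoint, with no service restart required: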
package com.jam.demo.producer.controller;
import com.jam.demo.producer.service.OrderProducerService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.ObjectUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
 * Producer control endpoints (start/pause non-core producers at runtime)
 * @author ken
 */
@RestController
@RequestMapping("/producer/control")
@Slf4j
@RequiredArgsConstructor
@Tag(name = "ProducerControlController", description = "Dynamic producer control endpoints")
public class ProducerControlController {
    private final OrderProducerService orderProducerService;
    /**
     * Start or pause non-core producers (e.g. order reviews, shipping notifications)
     * @param enable true to start, false to pause
     * @return operation result
     */
    @PostMapping("/nonCore/enable")
    @Operation(summary = "Start/pause non-core producers", description = "Controls message production for non-core business to relieve the backlog")
    public String enableNonCoreProducer(@RequestParam Boolean enable) {
        if (ObjectUtils.isEmpty(enable)) {
            log.error("The enable parameter must not be empty");
            return "Invalid parameter: enable must not be empty";
        }
        orderProducerService.setNonCoreEnable(enable);
        String result = enable ? "Non-core producers started" : "Non-core producers paused";
        log.info(result);
        return result;
    }
    /**
     * Get the status of non-core producers (read-only, so GET rather than POST)
     * @return status description
     */
    @GetMapping("/nonCore/status")
    @Operation(summary = "Get non-core producer status", description = "Returns whether non-core producers are currently running")
    public String getNonCoreProducerStatus() {
        boolean enable = orderProducerService.getNonCoreEnable();
        return enable ? "Non-core producers are running" : "Non-core producers are paused";
    }
}
package com.jam.demo.producer.service;
import com.alibaba.fastjson2.JSON;
import com.jam.demo.common.entity.NonCoreMessage;
import com.jam.demo.producer.config.RocketMQConfig;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
import java.util.concurrent.atomic.AtomicBoolean;
/**
 * Order producer service (core + non-core messages)
 * @author ken
 */
@Service
@Slf4j
public class OrderProducerService {
    // Spring Boot 3 is based on Jakarta EE, so @Resource comes from jakarta.annotation (javax.annotation is not on the classpath)
    @Resource
    private RocketMQTemplate rocketMQTemplate;
    /**
     * Non-core producer switch (AtomicBoolean for thread-safe reads and writes)
     */
    private final AtomicBoolean nonCoreEnable = new AtomicBoolean(true);
    /**
     * Send a core message (order creation, payment callback)
     * @param message core message entity
     */
    public void sendCoreMessage(Object message) {
        if (ObjectUtils.isEmpty(message)) {
            log.error("Core message must not be empty");
            return;
        }
        try {
            SendResult sendResult = rocketMQTemplate.syncSend(
                    RocketMQConfig.CORE_ORDER_TOPIC,
                    JSON.toJSONString(message),
                    3000
            );
            if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
                log.info("Core message sent, message ID: {}", sendResult.getMsgId());
            } else {
                log.error("Core message send failed, status: {}, message: {}", sendResult.getSendStatus(), message);
            }
        } catch (Exception e) {
            log.error("Exception while sending core message, message: {}", message, e);
            // Core message failed: retry so that core data is not lost
            retrySendCoreMessage(message);
        }
    }
    /**
     * Send a non-core message (order review, shipping notification)
     * @param nonCoreMessage non-core message entity
     */
    public void sendNonCoreMessage(NonCoreMessage nonCoreMessage) {
        // Switch is off: return immediately without sending
        if (!nonCoreEnable.get()) {
            log.warn("Non-core producers are paused, message not sent: {}", nonCoreMessage);
            return;
        }
        if (ObjectUtils.isEmpty(nonCoreMessage)) {
            log.error("Non-core message must not be empty");
            return;
        }
        try {
            SendResult sendResult = rocketMQTemplate.syncSend(
                    RocketMQConfig.NON_CORE_ORDER_TOPIC,
                    JSON.toJSONString(nonCoreMessage),
                    2000
            );
            if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
                log.info("Non-core message sent, message ID: {}", sendResult.getMsgId());
            } else {
                log.error("Non-core message send failed, status: {}, message: {}", sendResult.getSendStatus(), nonCoreMessage);
            }
        } catch (Exception e) {
            log.error("Exception while sending non-core message, message: {}", nonCoreMessage, e);
            // Non-core message failed: no retry, to avoid adding to the backlog
        }
    }
    /**
     * Retry sending a core message (at most 3 attempts)
     * @param message core message entity
     */
    private void retrySendCoreMessage(Object message) {
        int retryCount = 0;
        while (retryCount < 3) {
            try {
                SendResult sendResult = rocketMQTemplate.syncSend(
                        RocketMQConfig.CORE_ORDER_TOPIC,
                        JSON.toJSONString(message),
                        3000
                );
                if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
                    log.info("Core message resent, attempt: {}, message ID: {}", retryCount + 1, sendResult.getMsgId());
                    return;
                }
            } catch (Exception e) {
                log.error("Exception while resending core message, attempt: {}, message: {}", retryCount + 1, message, e);
            }
            retryCount++;
            // 100 ms between attempts, so retries do not hog resources
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.error("Interrupted while waiting to retry core message", e);
                break;
            }
        }
        log.error("Core message failed after 3 retries, message: {}, manual handling required", message);
    }
    /**
     * Set the non-core producer switch
     * @param enable switch state
     */
    public void setNonCoreEnable(Boolean enable) {
        nonCoreEnable.set(enable);
    }
    /**
     * Get the non-core producer switch
     * @return switch state
     */
    public boolean getNonCoreEnable() {
        return nonCoreEnable.get();
    }
}
<!-- Spring Boot parent -->
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.2.3</version>
    <relativePath/>
</parent>
<!-- Java version and dependency versions -->
<properties>
    <java.version>17</java.version>
    <rocketmq-spring-boot-starter.version>2.2.3</rocketmq-spring-boot-starter.version>
    <!-- Spring Boot 3.2 needs the Boot 3 flavour of MyBatis-Plus (mybatis-plus-spring-boot3-starter) -->
    <mybatis-plus.version>3.5.5</mybatis-plus.version>
    <fastjson2.version>2.0.39</fastjson2.version>
    <lombok.version>1.18.30</lombok.version>
    <!-- springdoc 2.3.x is the line built against Spring Boot 3.2 -->
    <swagger.version>2.3.0</swagger.version>
</properties>
<!-- Core dependencies -->
<dependencies>
    <!-- Spring Boot Web -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Lombok -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>${lombok.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- RocketMQ -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>${rocketmq-spring-boot-starter.version}</version>
    </dependency>
    <!-- FastJSON2 -->
    <dependency>
        <groupId>com.alibaba.fastjson2</groupId>
        <artifactId>fastjson2</artifactId>
        <version>${fastjson2.version}</version>
    </dependency>
    <!-- MyBatis-Plus (Spring Boot 3 starter) -->
    <dependency>
        <groupId>com.baomidou</groupId>
        <artifactId>mybatis-plus-spring-boot3-starter</artifactId>
        <version>${mybatis-plus.version}</version>
    </dependency>
    <!-- MySQL driver -->
    <dependency>
        <groupId>com.mysql</groupId>
        <artifactId>mysql-connector-j</artifactId>
        <scope>runtime</scope>
    </dependency>
    <!-- Swagger 3 (springdoc-openapi) -->
    <dependency>
        <groupId>org.springdoc</groupId>
        <artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
        <version>${swagger.version}</version>
    </dependency>
</dependencies>
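With non-core producers paused, the next step is to raise consumption speed. Temporarily scale out consumers so more "pickers" work through the backlog. This is the most effective emergency measure, provided there are enough queues: in RocketMQ clustering mode each queue is consumed by only one instance of a consumer group, so instances beyond the queue count sit idle.
With Spring Boot + RocketMQ and a Swagger 3 endpoint, the consumer thread count can be adjusted at runtime without restarting the service: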
package com.jam.demo.consumer.controller;
import com.jam.demo.consumer.service.OrderConsumerService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.ObjectUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
 * Consumer control endpoints (adjust the consume thread count at runtime)
 * @author ken
 */
@RestController
@RequestMapping("/consumer/control")
@Slf4j
@RequiredArgsConstructor
@Tag(name = "ConsumerControlController", description = "Dynamic consumer control endpoints")
public class ConsumerControlController {
    private final OrderConsumerService orderConsumerService;
    /**
     * Adjust the thread count of the core message consumer
     * @param threadNum thread count (1-50, tune to the server's capacity)
     * @return operation result
     */
    @PostMapping("/core/threadNum")
    @Operation(summary = "Adjust core consumer thread count", description = "Adjusts the consume thread count for core messages (order creation, payment callbacks) to speed up consumption")
    public String adjustCoreConsumerThreadNum(@RequestParam Integer threadNum) {
        if (ObjectUtils.isEmpty(threadNum) || threadNum < 1 || threadNum > 50) {
            log.error("Invalid thread count, must be between 1 and 50, got: {}", threadNum);
            return "Invalid parameter: thread count must be an integer between 1 and 50";
        }
        orderConsumerService.setCoreConsumerThreadNum(threadNum);
        log.info("Core consumer thread count set to: {}", threadNum);
        return "Core consumer thread count updated, current value: " + threadNum;
    }
    /**
     * Get the core consumer thread count (read-only, so GET rather than POST)
     * @return thread count description
     */
    @GetMapping("/core/threadNum/status")
    @Operation(summary = "Get core consumer thread count", description = "Returns the current thread count of the core message consumer")
    public String getCoreConsumerThreadNum() {
        int threadNum = orderConsumerService.getCoreConsumerThreadNum();
        return "Current core consumer thread count: " + threadNum;
    }
}
package com.jam.demo.consumer.service;
import com.alibaba.fastjson2.JSON;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.jam.demo.common.entity.CoreMessage;
import com.jam.demo.common.entity.Order;
import com.jam.demo.consumer.config.RocketMQConfig;
import com.jam.demo.consumer.mapper.OrderMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
import org.springframework.stereotype.Service;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.util.ObjectUtils;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Core message consumer (order creation, payment callbacks).
 * <p>
 * Messages are processed synchronously inside onMessage. Handing them to a separate thread pool
 * would let RocketMQ acknowledge a message before processing finishes, so a failure would neither
 * trigger a retry nor survive a crash. Concurrency therefore comes from the push consumer's own
 * consume threads, which are resized at runtime in setCoreConsumerThreadNum.
 * @author ken
 */
@Service
@Slf4j
@RequiredArgsConstructor
// @RocketMQMessageListener targets types, so it goes on the class, not on onMessage
@RocketMQMessageListener(
        topic = RocketMQConfig.CORE_ORDER_TOPIC,
        consumerGroup = RocketMQConfig.CORE_ORDER_CONSUMER_GROUP,
        // CLUSTERING: each message goes to only one instance of the group (preferred, avoids duplicate consumption);
        // BROADCASTING would deliver every message to every instance
        messageModel = MessageModel.CLUSTERING,
        // Upper bound of the consume thread pool; runtime sizes must stay below it
        consumeThreadMax = 64,
        // Unit is MINUTES (30 min here): a message still unprocessed after this long is sent back for redelivery
        consumeTimeout = 30L
)
public class OrderConsumerService implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {
    private final OrderMapper orderMapper;
    private final PlatformTransactionManager transactionManager;
    /**
     * Current core consume thread count (default 10; atomic for thread safety)
     */
    private final AtomicInteger coreConsumerThreadNum = new AtomicInteger(10);
    /**
     * Underlying push consumer, captured before it starts
     */
    private volatile DefaultMQPushConsumer pushConsumer;
    /**
     * Called by rocketmq-spring before the consumer starts: apply the initial thread count and batch size
     */
    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {
        this.pushConsumer = consumer;
        consumer.setConsumeThreadMin(coreConsumerThreadNum.get());
        // Hand up to 10 messages to the listener per call (tune per business; pulling is governed separately by pullBatchSize).
        // If one message in a batch fails, the whole batch is redelivered, so processing must be idempotent.
        consumer.setConsumeMessageBatchMaxSize(10);
        log.info("Core consumer configured, initial thread count: {}", coreConsumerThreadNum.get());
    }
    /**
     * Consume a core message (RocketMQ listener callback)
     * @param message message body (JSON)
     */
    @Override
    public void onMessage(String message) {
        if (ObjectUtils.isEmpty(message)) {
            log.error("Failed to consume core message: empty body");
            return;
        }
        // Process synchronously: returning normally acks the message, throwing makes RocketMQ redeliver it
        processCoreMessage(message);
    }
    /**
     * Process a core message (business logic + programmatic transaction)
     * @param message message body (JSON)
     */
    private void processCoreMessage(String message) {
        // Programmatic transaction keeps the database update consistent with the consume result
        TransactionStatus transactionStatus = transactionManager.getTransaction(new DefaultTransactionDefinition());
        try {
            // Parse the message (FastJSON2)
            CoreMessage coreMessage = JSON.parseObject(message, CoreMessage.class);
            if (ObjectUtils.isEmpty(coreMessage) || ObjectUtils.isEmpty(coreMessage.getOrder())) {
                log.error("Failed to parse core message: malformed, body: {}", message);
                transactionManager.rollback(transactionStatus);
                return;
            }
            Order order = coreMessage.getOrder();
            log.info("Consuming core message, order ID: {}, body: {}", order.getOrderId(), message);
            // Business logic: update the order status (simulated core business)
            LambdaUpdateWrapper<Order> updateWrapper = Wrappers.lambdaUpdate(Order.class)
                    .eq(Order::getOrderId, order.getOrderId())
                    .set(Order::getStatus, order.getStatus())
                    .set(Order::getUpdateTime, System.currentTimeMillis());
            int updateCount = orderMapper.update(null, updateWrapper);
            if (updateCount == 0) {
                log.error("Failed to consume core message: order not found, order ID: {}", order.getOrderId());
                transactionManager.rollback(transactionStatus);
                return;
            }
            transactionManager.commit(transactionStatus);
            log.info("Core message consumed, order ID: {}", order.getOrderId());
        } catch (Exception e) {
            log.error("Exception while consuming core message, body: {}", message, e);
            // Roll back only if commit/rollback has not already completed the transaction
            if (!transactionStatus.isCompleted()) {
                transactionManager.rollback(transactionStatus);
            }
            // Rethrow so RocketMQ retries (the retry limit is the consumer's maxReconsumeTimes)
            throw new RuntimeException("Exception while consuming core message, triggering retry", e);
        }
    }
    /**
     * Adjust the core consume thread count at runtime
     * @param threadNum thread count
     */
    public void setCoreConsumerThreadNum(Integer threadNum) {
        DefaultMQPushConsumer consumer = this.pushConsumer;
        if (consumer == null) {
            throw new IllegalStateException("Push consumer has not been started yet");
        }
        // Client-internal API: resizes the core size of the concurrent consume pool. The client ignores values
        // >= consumeThreadMax (64 here); verify this call against your rocketmq-client version.
        consumer.getDefaultMQPushConsumerImpl().updateCorePoolSize(threadNum);
        coreConsumerThreadNum.set(threadNum);
    }
    /**
     * Get the core consume thread count
     * @return thread count
     */
    public int getCoreConsumerThreadNum() {
        return coreConsumerThreadNum.get();
    }
}
package com.jam.demo.consumer.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
/**
 * Consumer thread pool configuration
 * @author ken
 */
@Configuration
public class ThreadPoolConfig {
    /**
     * Thread pool for core message processing
     * @return thread pool instance
     */
    @Bean
    public ThreadPoolTaskExecutor consumerThreadPool() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // Sizes must be set before the bean initializes; the queue capacity cannot be changed afterwards
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(15);
        executor.setQueueCapacity(1000);
        // Thread name prefix (makes logs easier to trace)
        executor.setThreadNamePrefix("core-consumer-thread-");
        // Rejection policy: reject the task and throw, so overload on core work is noticed immediately
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        // Idle thread keep-alive: 60 seconds
        executor.setKeepAliveSeconds(60);
        // Wait for running tasks to finish before shutting down
        executor.setWaitForTasksToCompleteOnShutdown(true);
        // Wait at most 30 seconds, then force shutdown
        executor.setAwaitTerminationSeconds(30);
        return executor;
    }
}
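Before scaling out, it helps to estimate how many concurrent consumers the backlog actually needs. The sketch below applies Little's law (concurrency ≈ throughput × average processing time). The class name, the 30-minute drain target and the 20 ms per-message latency are hypothetical inputs, not measurements from this incident.

Suppose producers still send 1,000 msg/s and 1,000,000 messages must drain within 30 minutes. The group then needs about 1,556 msg/s, which is 32 concurrent consumers at 20 ms per message.

```java
/**
 * Rough consumer sizing (illustrative sketch based on Little's law).
 */
final class ConsumerCapacity {
    private ConsumerCapacity() {}

    /** Throughput needed to keep up with producers and drain the backlog within targetSeconds. */
    static double requiredTps(long produceTps, long backlog, long targetSeconds) {
        if (targetSeconds <= 0) {
            throw new IllegalArgumentException("targetSeconds must be positive");
        }
        return produceTps + (double) backlog / targetSeconds;
    }

    /** Little's law: concurrent workers = throughput (msg/s) x latency (s), rounded up. */
    static int requiredThreads(double tps, double avgLatencyMs) {
        return (int) Math.ceil(tps * avgLatencyMs / 1000.0);
    }
}
```

Treat the result as a rough target. In RocketMQ clustering mode, instances beyond the topic's queue count receive no queues. Extra threads also only help if the database behind the consumer can absorb the extra load.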
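If the backlog contains many non-core messages (logs, notifications, etc.), they hold up core messages. Split the flow so that core messages are consumed first and non-core messages are handled later.
The following code uses RocketMQ's pull consumer (`DefaultMQPullConsumer`) plus a plain producer. It copies core messages from the original topic to a temporary topic in batches: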
package com.jam.demo.consumer.service;
import com.alibaba.fastjson2.JSON;
import com.jam.demo.common.entity.CoreMessage;
import com.jam.demo.consumer.config.RocketMQConfig;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Set;
/**
 * Message shunting service: copies core messages from the original topic to a temporary topic.
 * <p>
 * The original consumer group still consumes CORE_ORDER_TOPIC: pause it or make consumers idempotent,
 * otherwise copied messages are processed twice. DefaultMQPullConsumer is deprecated in recent clients
 * (DefaultLitePullConsumer is the suggested replacement) but still works for a one-off job like this.
 * @author ken
 */
@Service
@Slf4j
public class MessageShuntService {
    private DefaultMQPullConsumer pullConsumer;
    private DefaultMQProducer pushProducer;
    /**
     * Start the pull consumer (reads the original topic) and the producer (writes the temporary topic)
     */
    @PostConstruct
    public void init() throws MQClientException {
        pullConsumer = new DefaultMQPullConsumer("message_shunt_pull_consumer_group");
        pullConsumer.setNamesrvAddr(RocketMQConfig.NAMESRV_ADDR);
        pullConsumer.start();
        log.info("Message shunt: pull consumer started");
        pushProducer = new DefaultMQProducer("message_shunt_push_producer_group");
        pushProducer.setNamesrvAddr(RocketMQConfig.NAMESRV_ADDR);
        pushProducer.start();
        log.info("Message shunt: producer started");
    }
    /**
     * Copy core messages from the original core topic to the temporary topic
     * @param batchSize messages per pull (1-100)
     * @return result summary (success and failure counts)
     */
    public String shuntCoreMessage(int batchSize) {
        if (batchSize <= 0 || batchSize > 100) {
            log.error("Message shunt failed: batch size must be 1-100, got: {}", batchSize);
            return "Invalid parameter: batch size must be an integer between 1 and 100";
        }
        int successCount = 0;
        int failCount = 0;
        try {
            // 1. Get all queues (partitions) of the original core topic
            Set<MessageQueue> messageQueues = pullConsumer.fetchSubscribeMessageQueues(RocketMQConfig.CORE_ORDER_TOPIC);
            if (CollectionUtils.isEmpty(messageQueues)) {
                log.error("Message shunt failed: no queues found for the core topic");
                return "Message shunt failed: no queues found for the core topic";
            }
            // 2. Walk each queue, pull messages and copy the core ones
            for (MessageQueue messageQueue : messageQueues) {
                // Start at the minimum offset so nothing is missed; snapshot the end so the loop terminates while producers keep writing
                long offset = pullConsumer.minOffset(messageQueue);
                long maxOffset = pullConsumer.maxOffset(messageQueue);
                log.info("Queue {} of topic {}, offset range [{}, {})", messageQueue.getQueueId(), messageQueue.getTopic(), offset, maxOffset);
                while (offset < maxOffset) {
                    PullResult pullResult = pullConsumer.pull(messageQueue, "*", offset, batchSize);
                    PullStatus pullStatus = pullResult.getPullStatus();
                    if (PullStatus.NO_NEW_MSG.equals(pullStatus)) {
                        log.info("Queue {} has no new messages, done", messageQueue.getQueueId());
                        break;
                    }
                    if (PullStatus.FOUND.equals(pullStatus)) {
                        List<MessageExt> messageExtList = pullResult.getMsgFoundList();
                        for (MessageExt messageExt : messageExtList) {
                            try {
                                // Core messages are recognized by a non-empty order field (simulated rule)
                                String messageBody = new String(messageExt.getBody(), StandardCharsets.UTF_8);
                                CoreMessage coreMessage = JSON.parseObject(messageBody, CoreMessage.class);
                                if (ObjectUtils.isEmpty(coreMessage) || ObjectUtils.isEmpty(coreMessage.getOrder())) {
                                    // Non-core message: skip (delete or park it as the business requires)
                                    log.warn("Non-core message skipped, message ID: {}", messageExt.getMsgId());
                                    continue;
                                }
                                Message tempMessage = new Message(RocketMQConfig.CORE_ORDER_TOPIC_TEMP,
                                        messageExt.getTags(), messageExt.getKeys(), messageExt.getBody());
                                SendResult sendResult = pushProducer.send(tempMessage, 3000);
                                if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
                                    successCount++;
                                } else {
                                    failCount++;
                                    log.error("Copy failed, message ID: {}, send status: {}", messageExt.getMsgId(), sendResult.getSendStatus());
                                }
                            } catch (Exception e) {
                                failCount++;
                                log.error("Exception while copying message, message ID: {}", messageExt.getMsgId(), e);
                            }
                        }
                    }
                    // FOUND, NO_MATCHED_MSG and OFFSET_ILLEGAL all continue from the offset suggested by the broker;
                    // not advancing (e.g. on NO_MATCHED_MSG) would loop forever
                    long nextOffset = pullResult.getNextBeginOffset();
                    if (nextOffset <= offset) {
                        log.warn("Queue {} offset did not advance (status {}), stopping at {}", messageQueue.getQueueId(), pullStatus, offset);
                        break;
                    }
                    offset = nextOffset;
                }
            }
            return String.format("Message shunt finished, copied: %d, failed: %d", successCount, failCount);
        } catch (Exception e) {
            log.error("Message shunt aborted", e);
            return String.format("Message shunt aborted, copied: %d, failed: %d, error: %s", successCount, failCount, e.getMessage());
        }
    }
    /**
     * Shut down the pull consumer and producer when the service stops
     */
    @PreDestroy
    public void close() {
        if (pullConsumer != null) {
            pullConsumer.shutdown();
            log.info("Message shunt: pull consumer shut down");
        }
        if (pushProducer != null) {
            pushProducer.shutdown();
            log.info("Message shunt: producer shut down");
        }
    }
}
package com.jam.demo.consumer.service;
import com.alibaba.fastjson2.JSON;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.jam.demo.common.entity.CoreMessage;
import com.jam.demo.common.entity.Order;
import com.jam.demo.consumer.config.RocketMQConfig;
import com.jam.demo.consumer.mapper.OrderMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
import org.springframework.stereotype.Service;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.util.ObjectUtils;
/**
 * Consumer for the temporary topic (processes the core messages copied there)
 * @author ken
 */
@Service
@Slf4j
@RequiredArgsConstructor
@RocketMQMessageListener(
        topic = RocketMQConfig.CORE_ORDER_TOPIC_TEMP,
        consumerGroup = RocketMQConfig.CORE_ORDER_TEMP_CONSUMER_GROUP,
        messageModel = MessageModel.CLUSTERING,
        // Unit is minutes
        consumeTimeout = 20L
)
public class TempCoreMessageConsumer implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {
    private final OrderMapper orderMapper;
    private final PlatformTransactionManager transactionManager;
    /**
     * Batch size is not an annotation attribute, so it is set on the push consumer before start.
     * Temporary topic: raise it to 15 to drain faster.
     */
    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {
        consumer.setConsumeMessageBatchMaxSize(15);
    }
    /**
     * Consume a core message from the temporary topic
     * @param message message body (JSON)
     */
    @Override
    public void onMessage(String message) {
        if (ObjectUtils.isEmpty(message)) {
            log.error("Failed to consume temporary-topic message: empty body");
            return;
        }
        // Programmatic transaction
        TransactionStatus transactionStatus = transactionManager.getTransaction(new DefaultTransactionDefinition());
        try {
            CoreMessage coreMessage = JSON.parseObject(message, CoreMessage.class);
            if (ObjectUtils.isEmpty(coreMessage) || ObjectUtils.isEmpty(coreMessage.getOrder())) {
                log.error("Failed to parse temporary-topic message: malformed, body: {}", message);
                transactionManager.rollback(transactionStatus);
                return;
            }
            Order order = coreMessage.getOrder();
            log.info("Consuming temporary-topic core message, order ID: {}", order.getOrderId());
            // Business logic: update the order status (same as the core consumer)
            LambdaUpdateWrapper<Order> updateWrapper = Wrappers.lambdaUpdate(Order.class)
                    .eq(Order::getOrderId, order.getOrderId())
                    .set(Order::getStatus, order.getStatus())
                    .set(Order::getUpdateTime, System.currentTimeMillis());
            int updateCount = orderMapper.update(null, updateWrapper);
            if (updateCount == 0) {
                log.error("Failed to consume temporary-topic message: order not found, order ID: {}", order.getOrderId());
                transactionManager.rollback(transactionStatus);
                return;
            }
            transactionManager.commit(transactionStatus);
            log.info("Temporary-topic core message consumed, order ID: {}", order.getOrderId());
        } catch (Exception e) {
            log.error("Exception while consuming temporary-topic message, body: {}", message, e);
            if (!transactionStatus.isCompleted()) {
                transactionManager.rollback(transactionStatus);
            }
            throw new RuntimeException("Exception while consuming temporary-topic message, triggering retry", e);
        }
    }
}
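The heart of the shunting job is its offset loop:

1. Pull a batch.
2. Forward the matching messages.
3. Always move to `nextBeginOffset`, even when nothing in the batch matched.

The in-memory simulation below illustrates that loop without a broker. Its assumptions: the partition is a plain `List<String>`, offsets are list indexes, and the `core-` prefix stands in for the real `CoreMessage` check. `ShuntSimulation` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Broker-free simulation of the pull-and-forward loop (illustrative only). */
final class ShuntSimulation {
    /** Result of one pull: messages found plus the offset to start the next pull from. */
    record Pull(List<String> found, long nextBeginOffset) {}

    /** Stand-in for pulling from one partition; offsets are list indexes. */
    static Pull pull(List<String> partition, long offset, int batchSize) {
        int from = (int) Math.min(offset, partition.size());
        int to = Math.min(from + batchSize, partition.size());
        return new Pull(new ArrayList<>(partition.subList(from, to)), to);
    }

    /** Copies matching messages, advancing the offset after every batch. */
    static List<String> shunt(List<String> partition, int batchSize, Predicate<String> isCore) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<String> forwarded = new ArrayList<>();
        long offset = 0;
        while (true) {
            Pull result = pull(partition, offset, batchSize);
            if (result.found().isEmpty()) {
                break; // equivalent of NO_NEW_MSG: caught up with the partition
            }
            for (String msg : result.found()) {
                if (isCore.test(msg)) {
                    forwarded.add(msg);
                }
            }
            // Advance even if nothing matched; otherwise the same batch is pulled forever.
            offset = result.nextBeginOffset();
        }
        return forwarded;
    }
}
```

Because the offset always advances, a batch with no core messages costs one round trip instead of hanging the job.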
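If the backlog contains many invalid messages (duplicates, expired messages, malformed payloads), they tie up consumption resources and slow down valid messages. Skip them so that consumers only process valid messages.
Redis (7.2.4 in this setup) handles deduplication, and the consumer filters out invalid messages: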
package com.jam.demo.consumer.service;
import com.alibaba.fastjson2.JSON;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.jam.demo.common.entity.CoreMessage;
import com.jam.demo.common.entity.Order;
import com.jam.demo.consumer.config.RocketMQConfig;
import com.jam.demo.consumer.mapper.OrderMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.util.ObjectUtils;
import java.util.concurrent.TimeUnit;
/**
 * Core message consumer with invalid-message filtering and Redis deduplication.
 * Intended as a replacement for the plain core consumer: running both on CORE_ORDER_TOPIC
 * with different consumer groups would process every message twice.
 * @author ken
 */
@Service
@Slf4j
@RequiredArgsConstructor
@RocketMQMessageListener(
        topic = RocketMQConfig.CORE_ORDER_TOPIC,
        consumerGroup = RocketMQConfig.FILTERED_CORE_ORDER_CONSUMER_GROUP,
        messageModel = MessageModel.CLUSTERING,
        // Unit is minutes
        consumeTimeout = 25L
)
public class FilteredOrderConsumerService implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {
    private final OrderMapper orderMapper;
    private final PlatformTransactionManager transactionManager;
    private final StringRedisTemplate stringRedisTemplate;
    /**
     * Redis key prefix for consumed message IDs
     */
    private static final String CONSUMED_MESSAGE_KEY_PREFIX = "consumer:core:message:consumed:";
    /**
     * Message validity window (24 hours, in milliseconds)
     */
    private static final long MESSAGE_VALID_TIME = 24L * 60 * 60 * 1000;
    /**
     * Batch size is not an annotation attribute, so it is set on the push consumer before start
     */
    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {
        consumer.setConsumeMessageBatchMaxSize(12);
    }
    /**
     * Consume a core message, skipping invalid ones
     * @param message message body (JSON)
     */
    @Override
    public void onMessage(String message) {
        if (ObjectUtils.isEmpty(message)) {
            log.error("Skipping core message: empty body (invalid message)");
            return;
        }
        // 1. Parse the message; malformed payloads are invalid and retrying will not fix them
        CoreMessage coreMessage;
        try {
            coreMessage = JSON.parseObject(message, CoreMessage.class);
        } catch (Exception e) {
            log.error("Skipping core message: unparseable payload (invalid message), body: {}", message, e);
            return;
        }
        if (ObjectUtils.isEmpty(coreMessage) || ObjectUtils.isEmpty(coreMessage.getOrder()) || ObjectUtils.isEmpty(coreMessage.getMsgId())) {
            log.error("Skipping core message: malformed (invalid message), body: {}", message);
            return;
        }
        String msgId = coreMessage.getMsgId();
        Order order = coreMessage.getOrder();
        long messageCreateTime = coreMessage.getCreateTime();
        // 2. Filter invalid messages
        // 2.1 Expired messages (created more than 24 hours ago)
        if (System.currentTimeMillis() - messageCreateTime > MESSAGE_VALID_TIME) {
            log.warn("Skipping expired message, message ID: {}, order ID: {}, created at: {}", msgId, order.getOrderId(), messageCreateTime);
            return;
        }
        // 2.2 Duplicates (Redis mark with a 24-hour TTL). hasKey + set is not atomic, so two concurrent
        // deliveries can both pass; pair it with a database-level idempotency check if that matters.
        String redisKey = CONSUMED_MESSAGE_KEY_PREFIX + msgId;
        if (Boolean.TRUE.equals(stringRedisTemplate.hasKey(redisKey))) {
            log.warn("Skipping duplicate message, message ID: {}, order ID: {}", msgId, order.getOrderId());
            return;
        }
        // 3. Valid message: process it in a programmatic transaction
        TransactionStatus transactionStatus = transactionManager.getTransaction(new DefaultTransactionDefinition());
        try {
            log.info("Consuming valid core message, message ID: {}, order ID: {}", msgId, order.getOrderId());
            // Business logic: update the order status
            LambdaUpdateWrapper<Order> updateWrapper = Wrappers.lambdaUpdate(Order.class)
                    .eq(Order::getOrderId, order.getOrderId())
                    .set(Order::getStatus, order.getStatus())
                    .set(Order::getUpdateTime, System.currentTimeMillis());
            int updateCount = orderMapper.update(null, updateWrapper);
            if (updateCount == 0) {
                log.error("Failed to consume valid core message: order not found, order ID: {}", order.getOrderId());
                transactionManager.rollback(transactionStatus);
                return;
            }
            transactionManager.commit(transactionStatus);
        } catch (Exception e) {
            log.error("Exception while consuming valid core message, message ID: {}, order ID: {}", msgId, order.getOrderId(), e);
            if (!transactionStatus.isCompleted()) {
                transactionManager.rollback(transactionStatus);
            }
            // Let the exception propagate: swallowing it in an outer catch would mean RocketMQ never retries
            throw new RuntimeException("Exception while consuming valid core message, triggering retry", e);
        }
        // Mark as consumed only after the commit succeeded, with a 24-hour TTL
        stringRedisTemplate.opsForValue().set(redisKey, "1", 24, TimeUnit.HOURS);
        log.info("Valid core message consumed, order ID: {}", order.getOrderId());
    }
}
<!-- Redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- Connection pooling for Redis (needed when Lettuce pool settings are configured) -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
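The filtering rules above (an expiry window plus a mark-as-consumed flag with a TTL) can be checked without Redis. The sketch below is an in-memory stand-in that assumes a `HashMap` in place of Redis and explicit timestamps in place of the system clock. `MessageFilter` and its methods are hypothetical names. Unlike Redis, this map is neither shared across instances nor thread-safe.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory stand-in for the expiry + dedup filter (illustration only). */
final class MessageFilter {
    enum Verdict { PROCESS, EXPIRED, DUPLICATE }

    private final long validMillis;
    private final long dedupTtlMillis;
    // msgId -> time (ms) at which the "consumed" mark expires, like a Redis key TTL
    private final Map<String, Long> consumedUntil = new HashMap<>();

    MessageFilter(long validMillis, long dedupTtlMillis) {
        this.validMillis = validMillis;
        this.dedupTtlMillis = dedupTtlMillis;
    }

    /** Decide whether a message should be processed at time nowMillis. */
    Verdict check(String msgId, long createTimeMillis, long nowMillis) {
        if (nowMillis - createTimeMillis > validMillis) {
            return Verdict.EXPIRED;
        }
        Long until = consumedUntil.get(msgId);
        if (until != null && until > nowMillis) {
            return Verdict.DUPLICATE;
        }
        return Verdict.PROCESS;
    }

    /** Call only after the business transaction commits, mirroring the Redis SET with a TTL. */
    void markConsumed(String msgId, long nowMillis) {
        consumedUntil.put(msgId, nowMillis + dedupTtlMillis);
    }
}
```

As in the consumer, the mark is written only after a successful commit. A failed attempt therefore leaves the message eligible for redelivery.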
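Once the emergency measures are in place, check the results along 3 dimensions so that a temporary dip is not mistaken for recovery:
Stopping the bleeding only treats the symptoms; a permanent fix requires finding the root cause. Following the troubleshooting flowchart from earlier, check the consumer side, the producer side and the queue configuration in turn. Each has concrete checks and a way to verify the result.
The consumer side is the most common cause. Key checks: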
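# Check whether the consumer process is alive
ps -ef | grep core-consumer
# Check that the service port is listening (8081 is this example's port)
netstat -tulpn | grep 8081
# Check for OOM kills (the kernel log names the process, usually "java", not the jar)
dmesg -T | grep -i -E 'out of memory|killed process'
# Start Arthas
java -jar arthas-boot.jar
# Select the consumer process when prompted
# Show the 10 busiest threads
thread -n 10
# Inspect the ThreadPoolTaskExecutor bean (consumerThreadPool is a bean method, not a static field, so use vmtool rather than @Class@field)
vmtool --action getInstances --className org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor --express 'instances[0].getCorePoolSize()'
vmtool --action getInstances --className org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor --express 'instances[0].getActiveCount()'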
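-- Enable the slow query log
SET GLOBAL slow_query_log = ON;
SET GLOBAL long_query_time = 1; -- log SQL taking more than 1 second (applies to new connections)
-- mysql.slow_log is only populated when the log output goes to a table
SET GLOBAL log_output = 'TABLE';
-- View the latest slow queries
SELECT * FROM mysql.slow_log ORDER BY start_time DESC LIMIT 10;
-- Analyze the execution plan of the consumer's UPDATE
EXPLAIN UPDATE `order` SET status = 2, update_time = 1740000000000 WHERE order_id = '123456';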
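Producer-side problems are mainly sudden production spikes or duplicate production:
Queue configuration problems are easy to overlook, yet they can make scaling out ineffective:
# RocketMQ: list the topic's queues with their min/max offsets (the number of rows is the queue count)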
sh mqadmin topicStatus -n 127.0.0.1:9876 -t CORE_ORDER_TOPIC
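Scaling can be ineffective because, in clustering mode, RocketMQ assigns each queue of a topic to at most one consumer instance of a group. The sketch below is a simplified re-implementation of the averaging idea behind the default `AllocateMessageQueueAveragely` strategy, for illustration only; check the client source for the exact behavior of your version. `QueueAllocation` is a hypothetical name. With 8 queues and 10 consumers, consumers 8 and 9 receive nothing and sit idle.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified averaging allocation of queues to consumers (illustrative sketch). */
final class QueueAllocation {
    private QueueAllocation() {}

    /** Queue indexes assigned to consumer `index` (0-based) out of `consumers` for `queues` queues. */
    static List<Integer> allocate(int queues, int consumers, int index) {
        if (queues <= 0 || consumers <= 0 || index < 0 || index >= consumers) {
            throw new IllegalArgumentException("invalid queue/consumer counts or index");
        }
        List<Integer> result = new ArrayList<>();
        int mod = queues % consumers;
        // The first `mod` consumers get one extra queue; with more consumers than queues, each gets at most one.
        int averageSize = queues <= consumers ? 1
                : (mod > 0 && index < mod ? queues / consumers + 1 : queues / consumers);
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        int range = Math.min(averageSize, queues - startIndex);
        for (int i = 0; i < range; i++) {
            result.add((startIndex + i) % queues);
        }
        return result;
    }
}
```

The practical consequence: the useful number of consumer instances is capped by the queue count, so a topic may need more queues before more instances help.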
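Once the root cause is found, optimize at three levels (architecture, configuration and code) to prevent million-message backlogs for good. The core ideas: raise consumption capacity, control the production rate and add fault tolerance.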
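apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: core-consumer-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: core-consumer
  minReplicas: 2
  maxReplicas: 10
  metrics:
    - type: External
      external:
        metric:
          # Assumed metric name: it must be served by an external metrics adapter
          # (e.g. Prometheus Adapter fed by a RocketMQ exporter)
          name: rocketmq_topic_backlog
          selector:
            matchLabels:
              topic: CORE_ORDER_TOPIC
        target:
          type: Value
          value: 10000 # target backlog: replicas scale out proportionally once the backlog exceeds 10,000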
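On the production side, one common way to cap the rate is a token bucket in front of the send call. This is a general technique, not necessarily what the author deployed. In the sketch below, `TokenBucket` is a hypothetical class, and time is passed in as nanoseconds to keep it deterministic. A producer may send only when a token is available, and tokens refill at a fixed rate up to a burst capacity.

```java
/** Minimal token bucket for producer-side rate limiting (illustrative; not thread-safe). */
final class TokenBucket {
    private final double capacity;
    private final double refillPerSecond;
    private double tokens;
    private long lastNanos;

    TokenBucket(double capacity, double refillPerSecond, long nowNanos) {
        if (capacity <= 0 || refillPerSecond <= 0) {
            throw new IllegalArgumentException("capacity and refill rate must be positive");
        }
        this.capacity = capacity;
        this.refillPerSecond = refillPerSecond;
        this.tokens = capacity; // start full, allowing an initial burst
        this.lastNanos = nowNanos;
    }

    /** Returns true if one message may be sent at time nowNanos. */
    boolean tryAcquire(long nowNanos) {
        double elapsedSeconds = (nowNanos - lastNanos) / 1_000_000_000.0;
        tokens = Math.min(capacity, tokens + elapsedSeconds * refillPerSecond);
        lastNanos = nowNanos;
        if (tokens >= 1.0) {
            tokens -= 1.0;
            return true;
        }
        return false;
    }
}
```

A sender that gets `false` can delay, buffer or drop non-core messages, while core messages keep their own quota.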

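| Parameter | Recommended value (RocketMQ) | Notes |
|---|---|---|
| Batch consume size | 10-20 | Raises throughput compared with handing over one message at a time |
| Consume timeout | Keep processing within 20-30 s | Prevents slow consumption from triggering redelivery; RocketMQ's `consumeTimeout` itself is configured in minutes (default 15) |
| Max retries | 3 | Prevents endless retries from adding to the backlog |
| Dead-letter queue | Enabled | Messages that exhaust their retries go to the DLQ instead of blocking consumption |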
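The retry and dead-letter rows boil down to one decision per failed delivery. The sketch below models it, assuming the redelivery count is compared against a configured maximum. `RetryPolicy` is a hypothetical name, and the broker, not application code, performs the actual move. Two facts to verify against your version:

* In RocketMQ the dead-letter topic for a consumer group is named `%DLQ%` followed by the group name.
* rocketmq-spring exposes the retry limit as the `maxReconsumeTimes` attribute of `@RocketMQMessageListener`.

```java
/** Simplified model of retry-then-dead-letter handling (illustrative sketch). */
final class RetryPolicy {
    enum Action { RETRY, DEAD_LETTER }

    private RetryPolicy() {}

    /** reconsumeTimes counts redeliveries so far (0 on the first delivery). */
    static Action onFailure(int reconsumeTimes, int maxReconsumeTimes) {
        return reconsumeTimes >= maxReconsumeTimes ? Action.DEAD_LETTER : Action.RETRY;
    }

    /** Total deliveries before dead-lettering: the first attempt plus the retries. */
    static int totalDeliveries(int maxReconsumeTimes) {
        return maxReconsumeTimes + 1;
    }

    /** Dead-letter topic naming convention used by RocketMQ. */
    static String deadLetterTopic(String consumerGroup) {
        return "%DLQ%" + consumerGroup;
    }
}
```

With the recommended limit of 3, a message that keeps failing is delivered 4 times in total and then parked in the DLQ, where it no longer competes with fresh messages.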
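Move non-core work done inside the consumer (logging, notifications) to asynchronous processing, so the main path finishes faster:
/**
 * Process non-core work (order log, notification) asynchronously.
 * Fragment: nonCoreThreadPool, logService, notificationService and log are assumed to be
 * fields of the enclosing consumer class; they are not shown here.
 * @param order the order that was just processed
 */
private void asyncProcessNonCoreBusiness(Order order) {
    // Dedicated pool, configured separately so it never takes core consume threads
    nonCoreThreadPool.execute(() -> {
        try {
            logService.recordOrderLog(order.getOrderId(), order.getStatus());
            notificationService.sendOrderNotify(order.getUserId(), order.getOrderId());
        } catch (Exception e) {
            // Non-core failures are only logged; they must not fail the core message
            log.error("Async non-core processing failed, order ID: {}", order.getOrderId(), e);
        }
    });
}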
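Two layers of idempotency checks, MySQL plus Redis, keep duplicate consumption from corrupting data:
/**
 * Idempotency check (Redis first, MySQL as fallback).
 * The consume record should be inserted in the same transaction as the business update, so the
 * unique key uk_msg_order also rejects concurrent duplicates.
 * @param msgId message ID
 * @param orderId order ID
 * @return true if the message may be consumed, false if it was already consumed
 * @author ken
 */
private boolean checkIdempotent(String msgId, String orderId) {
    String redisKey = CONSUMED_MESSAGE_KEY_PREFIX + msgId;
    // 1. Fast check in Redis (first layer); on a Redis outage, fall through to MySQL instead of failing
    try {
        if (Boolean.TRUE.equals(stringRedisTemplate.hasKey(redisKey))) {
            return false;
        }
    } catch (Exception e) {
        log.warn("Redis unavailable, falling back to MySQL idempotency check, message ID: {}", msgId, e);
    }
    // 2. Check the consume-record table in MySQL (second layer)
    int count = orderMapper.countConsumedMsg(msgId, orderId);
    if (count > 0) {
        // Back-fill Redis so the next duplicate is caught by the fast path (set(...) returns void)
        try {
            stringRedisTemplate.opsForValue().set(redisKey, "1", 24, TimeUnit.HOURS);
        } catch (Exception e) {
            log.warn("Failed to back-fill Redis idempotency key, message ID: {}", msgId, e);
        }
        return false;
    }
    return true;
}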
The corresponding MyBatis-Plus mapper:
package com.jam.demo.consumer.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.jam.demo.common.entity.Order;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
/**
 * Order mapper (@Mapper lets the MyBatis auto-configuration register it; @Repository alone does not create the mapper proxy)
 * @author ken
 */
@Mapper
public interface OrderMapper extends BaseMapper<Order> {
    /**
     * Check whether a message has already been consumed (idempotency)
     * @param msgId message ID
     * @param orderId order ID
     * @return number of matching consume records
     */
    int countConsumedMsg(@Param("msgId") String msgId, @Param("orderId") String orderId);
}
The mapper XML (resources/mapper/OrderMapper.xml):
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.jam.demo.consumer.mapper.OrderMapper">
<select id="countConsumedMsg" resultType="java.lang.Integer">
SELECT COUNT(1) FROM `order_consume_record`
WHERE msg_id = #{msgId} AND order_id = #{orderId}
</select>
</mapper>
The table definition (MySQL 8.0):
CREATE TABLE `order_consume_record` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'Primary key',
  `msg_id` varchar(64) NOT NULL COMMENT 'Unique message ID',
  `order_id` varchar(64) NOT NULL COMMENT 'Order ID',
  `consume_time` bigint NOT NULL COMMENT 'Consume time (ms)',
  `consumer_ip` varchar(32) NOT NULL COMMENT 'Consumer IP',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_msg_order` (`msg_id`,`order_id`) COMMENT 'Unique on message ID + order ID (idempotency)',
  KEY `idx_order_id` (`order_id`) COMMENT 'Order ID index'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Order consume records (idempotency)';
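Once the backlog is resolved, run a post-mortem and set up a three-part prevention mechanism of monitoring, alerting and emergency response. The goal is for backlogs to be detected early, handled early and never allowed to grow.
Core metrics to monitor (put them on a dashboard, e.g. Grafana):
| Dimension | Core metrics | Alert threshold |
|---|---|---|
| Queue | Backlog size, production TPS, consumption TPS | Alert when backlog > 10,000 |
| Consumer | Consume success rate, consume latency, thread-pool utilization | Success rate < 99%, latency > 2 s |
| Producer | Send success rate, retry count | Retries > 100 per minute |
| System | JVM memory, CPU usage, disk usage | CPU > 80%, memory > 85% |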
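The thresholds in the table can be encoded as a simple rule check, for example in a scheduled job that feeds the alerting channel. The sketch below hard-codes the table's values. `BacklogAlerts`, the `Snapshot` fields and the units are assumptions: success rate is a fraction, and CPU and memory are percentages. Metrics with no threshold in the table (TPS, thread-pool utilization, disk usage) are left out.

```java
import java.util.ArrayList;
import java.util.List;

/** Rule check for the alert thresholds in the table above (illustrative sketch). */
final class BacklogAlerts {
    /** One sample of the monitored metrics. */
    record Snapshot(long backlog, double consumeSuccessRate, double consumeLatencySeconds,
                    long producerRetriesPerMinute, double cpuPercent, double memoryPercent) {}

    private BacklogAlerts() {}

    /** Returns one message per breached threshold; an empty list means healthy. */
    static List<String> evaluate(Snapshot s) {
        List<String> alerts = new ArrayList<>();
        if (s.backlog() > 10_000) alerts.add("queue: backlog > 10000");
        if (s.consumeSuccessRate() < 0.99) alerts.add("consumer: success rate < 99%");
        if (s.consumeLatencySeconds() > 2.0) alerts.add("consumer: latency > 2s");
        if (s.producerRetriesPerMinute() > 100) alerts.add("producer: retries > 100/min");
        if (s.cpuPercent() > 80.0) alerts.add("system: CPU > 80%");
        if (s.memoryPercent() > 85.0) alerts.add("system: memory > 85%");
        return alerts;
    }
}
```

Keeping the rules in one place makes it easy to map each breached threshold to an alert level in the tiered alerting described next.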
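Configure tiered alerts through DingTalk or WeCom to avoid alert overload:
Turn the emergency procedure in this article into a standardized runbook that includes: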