首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >百万消息积压 4 小时,我靠这套方案快速止血

百万消息积压 4 小时,我靠这套方案快速止血

作者头像
果酱带你啃java
发布2026-04-14 14:35:14
发布2026-04-14 14:35:14
390
举报

在分布式系统中,消息队列就像“交通枢纽”,承接生产者的消息、调度消费者的消费节奏,是解耦、削峰、异步通信的核心组件。但一旦出现“百万消息积压几小时”的问题,就相当于交通枢纽彻底瘫痪——下游业务无法获取消息、数据一致性被破坏、甚至引发连锁故障,直接影响用户体验和业务连续性。

作为常年和分布式架构打交道的开发者,我曾在生产环境中多次处理过消息积压问题,小到几万条消息的短暂阻塞,大到百万级消息积压4小时的紧急故障,总结出了一套“紧急止血→根源排查→彻底解决→复盘优化”的全流程方案。

一、先搞懂:消息积压的核心本质

很多开发者遇到积压就慌,盲目扩容消费者、重启服务,结果越搞越乱——其实消息积压的本质很简单,用一句话就能说透:消息生产速度 ≥ 消息消费速度,且积压量超过了消息队列的缓冲能力,导致消息在队列中持续堆积

类比一下:消息队列就像小区的快递柜,生产者是快递员,消费者是取快递的业主。正常情况下,快递员送快递的速度(生产速度),和业主取快递的速度(消费速度)基本匹配,快递柜不会满;但如果快递员突然批量送百万个包裹(生产者突增),或者业主都在家不出来取件(消费者消费慢/挂掉),快递柜很快就会被堆满,后续的快递只能排队等待,这就是“消息积压”。

1.1 积压的3个核心前提

  1. 生产速度 > 消费速度:这是最常见的原因,比如大促期间,订单生产者每秒产生1000条消息,而消费者每秒只能处理100条,差值会持续累积,最终导致积压;
  2. 消费端故障:消费者服务挂掉、线程池阻塞、数据库宕机,导致消费完全停止,即使生产速度正常,消息也会持续堆积;
  3. 队列配置不合理:队列分区过少、消息拉取策略不当、死信队列未处理,导致消费端无法充分利用资源,即使消费者正常,也无法高效消费消息。

1.2 积压的底层影响

很多人觉得“积压几小时没事”,其实积压的危害会持续放大,甚至引发级联故障:

  • 消息超时:大部分消息队列(RocketMQ、Kafka)的消息都有超时时间,积压过久会导致消息过期,被丢弃或进入死信队列,引发业务数据丢失;
  • 队列撑满:消息队列的存储容量有限(比如磁盘满),积压过久会导致队列无法接收新消息,生产者报错,上游业务瘫痪;
  • 消费雪崩:积压的消息过多时,若消费者突然恢复,会一次性拉取大量消息,导致消费者线程池满、CPU飙升、服务宕机,积压问题进一步恶化;
  • 数据不一致:比如订单消息积压,下游库存服务无法及时扣减库存,可能导致超卖、漏卖,后续需要大量人力复盘对账。

1.3 积压排查流程图(必用,快速定位原因)

遇到积压问题,先不要急着解决,先排查原因——用下面这个流程图,3分钟就能定位到积压的核心原因,避免盲目操作:

二、紧急止血:30分钟内快速缓解积压(优先保业务)

当百万消息积压几小时,核心诉求是“快速缓解,避免业务进一步恶化”——这一步的核心思路是:暂时切断非核心压力,最大化提升消费能力,快速消化积压消息,相当于“先找临时快递员,把堆积的快递先拉走一部分,缓解快递柜压力”。

紧急止血的操作优先级:暂停非核心生产者 → 临时扩容消费者 → 消息分流 → 跳过无效消息,四步走,30分钟内就能让积压量快速下降,具体操作如下(附实例)。

2.1 第一步:暂停非核心生产者(减少压力源)

积压的核心是“生产>消费”,所以第一步要做的是“减少生产”——暂停非核心业务的生产者,只保留核心业务的消息生产,避免积压量持续增加。

2.1.1 操作场景

比如电商系统中,百万订单消息积压,此时可以暂停“订单评价、物流通知”等非核心业务的生产者,只保留“订单创建、支付回调”等核心业务的生产者,让消费端集中精力处理核心消息。

2.1.2 实例

基于JDK17、SpringBoot3.2.3、Swagger3,实现生产者的动态启停,通过接口控制,无需重启服务,符合生产环境使用规范:

代码语言:javascript
复制
package com.jam.demo.producer.controller;

import com.jam.demo.producer.service.OrderProducerService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.ObjectUtils;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * 生产者控制接口(用于动态启停非核心生产者)
 * @author ken
 */
@RestController
@RequestMapping("/producer/control")
@Slf4j
@RequiredArgsConstructor
@Tag(name = "ProducerControlController", description = "生产者动态控制接口")
public class ProducerControlController {

    private final OrderProducerService orderProducerService;

    /**
     * 启停非核心生产者(如订单评价、物流通知)
     * @param enable true-启动,false-暂停
     * @return 操作结果
     */
    @PostMapping("/nonCore/enable")
    @Operation(summary = "启停非核心生产者", description = "控制非核心业务的消息生产,缓解积压")
    public String enableNonCoreProducer(@RequestParam Boolean enable) {
        if (ObjectUtils.isEmpty(enable)) {
            log.error("启停参数不能为空");
            return "参数错误:enable不能为空";
        }
        orderProducerService.setNonCoreEnable(enable);
        String result = enable ? "非核心生产者已启动" : "非核心生产者已暂停";
        log.info(result);
        return result;
    }

    /**
     * 查看非核心生产者状态
     * @return 状态描述
     */
    @PostMapping("/nonCore/status")
    @Operation(summary = "查看非核心生产者状态", description = "获取当前非核心生产者的启停状态")
    public String getNonCoreProducerStatus() {
        boolean enable = orderProducerService.getNonCoreEnable();
        return enable ? "非核心生产者正在运行" : "非核心生产者已暂停";
    }
}
代码语言:javascript
复制
package com.jam.demo.producer.service;

import com.alibaba.fastjson2.JSON;
import com.jam.demo.common.entity.NonCoreMessage;
import com.jam.demo.producer.config.RocketMQConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;

import javax.annotation.Resource;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * 订单生产者服务(核心+非核心)
 * @author ken
 */
@Service
@Slf4j
public class OrderProducerService {

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    /**
     * 非核心生产者开关(原子类保证线程安全)
     */
    private final AtomicBoolean nonCoreEnable = new AtomicBoolean(true);

    /**
     * 发送核心消息(订单创建、支付回调)
     * @param message 核心消息实体
     */
    public void sendCoreMessage(Object message) {
        if (ObjectUtils.isEmpty(message)) {
            log.error("核心消息不能为空");
            return;
        }
        try {
            SendResult sendResult = rocketMQTemplate.syncSend(
                    RocketMQConfig.CORE_ORDER_TOPIC,
                    JSON.toJSONString(message),
                    3000
            );
            if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
                log.info("核心消息发送成功,消息ID:{}", sendResult.getMsgId());
            } else {
                log.error("核心消息发送失败,状态:{},消息内容:{}", sendResult.getSendStatus(), message);
            }
        } catch (Exception e) {
            log.error("核心消息发送异常,消息内容:{}", message, e);
            // 核心消息发送失败,可触发重试(避免核心数据丢失)
            retrySendCoreMessage(message);
        }
    }

    /**
     * 发送非核心消息(订单评价、物流通知)
     * @param nonCoreMessage 非核心消息实体
     */
    public void sendNonCoreMessage(NonCoreMessage nonCoreMessage) {
        // 若开关关闭,直接返回,不发送非核心消息
        if (!nonCoreEnable.get()) {
            log.warn("非核心生产者已暂停,消息暂不发送:{}", nonCoreMessage);
            return;
        }
        if (ObjectUtils.isEmpty(nonCoreMessage)) {
            log.error("非核心消息不能为空");
            return;
        }
        try {
            SendResult sendResult = rocketMQTemplate.syncSend(
                    RocketMQConfig.NON_CORE_ORDER_TOPIC,
                    JSON.toJSONString(nonCoreMessage),
                    2000
            );
            if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
                log.info("非核心消息发送成功,消息ID:{}", sendResult.getMsgId());
            } else {
                log.error("非核心消息发送失败,状态:{},消息内容:{}", sendResult.getSendStatus(), nonCoreMessage);
            }
        } catch (Exception e) {
            log.error("非核心消息发送异常,消息内容:{}", nonCoreMessage, e);
            // 非核心消息发送失败,无需重试,避免增加积压压力
        }
    }

    /**
     * 核心消息重试发送(最多重试3次)
     * @param message 核心消息实体
     */
    private void retrySendCoreMessage(Object message) {
        int retryCount = 0;
        while (retryCount < 3) {
            try {
                SendResult sendResult = rocketMQTemplate.syncSend(
                        RocketMQConfig.CORE_ORDER_TOPIC,
                        JSON.toJSONString(message),
                        3000
                );
                if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
                    log.info("核心消息重试发送成功,重试次数:{},消息ID:{}", retryCount + 1, sendResult.getMsgId());
                    return;
                }
            } catch (Exception e) {
                log.error("核心消息重试发送异常,重试次数:{},消息内容:{}", retryCount + 1, message, e);
            }
            retryCount++;
            // 重试间隔:100ms,避免频繁重试占用资源
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.error("核心消息重试间隔异常", e);
                break;
            }
        }
        log.error("核心消息重试3次均失败,消息内容:{},请人工处理", message);
    }

    /**
     * 设置非核心生产者开关状态
     * @param enable 开关状态
     */
    public void setNonCoreEnable(Boolean enable) {
        nonCoreEnable.set(enable);
    }

    /**
     * 获取非核心生产者开关状态
     * @return 开关状态
     */
    public boolean getNonCoreEnable() {
        return nonCoreEnable.get();
    }
}
2.1.3 关键说明
  • 用AtomicBoolean保证非核心生产者开关的线程安全,避免多线程操作导致状态错乱;
  • 核心消息发送失败会重试3次(间隔100ms),非核心消息不重试,避免增加积压压力;
  • 通过Swagger3接口动态控制,访问http://localhost:8080/swagger-ui/index.html即可操作,无需重启服务;
  • 依赖配置(pom.xml,仅展示核心依赖,完整依赖见文末):
代码语言:javascript
复制
<!-- SpringBoot父依赖 -->
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.2.3</version>
    <relativePath/>
</parent>

<!-- JDK版本 -->
<properties>
    <java.version>17</java.version>
    <rocketmq-spring-boot-starter.version>2.2.3</rocketmq-spring-boot-starter.version>
    <mybatis-plus-boot-starter.version>3.5.5.1</mybatis-plus-boot-starter.version>
    <fastjson2.version>2.0.39</fastjson2.version>
    <lombok.version>1.18.30</lombok.version>
    <swagger.version>2.2.0</swagger.version>
</properties>

<!-- 核心依赖 -->
<dependencies>
    <!-- SpringBoot Web -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!-- Lombok -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>${lombok.version}</version>
        <scope>provided</scope>
    </dependency>

    <!-- RocketMQ -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>${rocketmq-spring-boot-starter.version}</version>
    </dependency>

    <!-- FastJSON2 -->
    <dependency>
        <groupId>com.alibaba.fastjson2</groupId>
        <artifactId>fastjson2</artifactId>
        <version>${fastjson2.version}</version>
    </dependency>

    <!-- MyBatisPlus -->
    <dependency>
        <groupId>com.baomidou</groupId>
        <artifactId>mybatis-plus-boot-starter</artifactId>
        <version>${mybatis-plus-boot-starter.version}</version>
    </dependency>

    <!-- MySQL驱动 -->
    <dependency>
        <groupId>com.mysql</groupId>
        <artifactId>mysql-connector-j</artifactId>
        <scope>runtime</scope>
    </dependency>

    <!-- Swagger3 -->
    <dependency>
        <groupId>org.springdoc</groupId>
        <artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
        <version>${swagger.version}</version>
    </dependency>
</dependencies>

2.2 第二步:临时扩容消费者(最大化提升消费能力)

暂停非核心生产者后,下一步是“提升消费速度”——临时扩容消费者,让更多的“取件人”一起处理积压的消息,这是紧急止血最有效的手段。

2.2.1 扩容的核心原则
  1. 消费者数量 ≤ 队列分区数:大部分消息队列(Kafka、RocketMQ)的消费模型是“一个分区只能被一个消费者组的一个消费者消费”,如果消费者数量超过分区数,多余的消费者会处于空闲状态,无法提升消费速度;
  2. 扩容时避免重复消费:通过消息队列的偏移量(offset)机制,确保扩容后的消费者能从正确的位置开始消费,避免重复消费(比如RocketMQ的广播模式需谨慎,优先用集群模式);
  3. 临时扩容用容器化部署:生产环境中,用Docker+K8s快速扩容消费者实例,积压缓解后再缩容,避免资源浪费。
2.2.2 实例(动态调整消费者线程数,无需重启服务)

基于SpringBoot+RocketMQ,实现消费者线程数的动态调整,结合Swagger3接口,无需重启服务,快速提升消费能力:

代码语言:javascript
复制
package com.jam.demo.consumer.controller;

import com.jam.demo.consumer.service.OrderConsumerService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.ObjectUtils;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * 消费者控制接口(动态调整消费线程数)
 * @author ken
 */
@RestController
@RequestMapping("/consumer/control")
@Slf4j
@RequiredArgsConstructor
@Tag(name = "ConsumerControlController", description = "消费者动态控制接口")
public class ConsumerControlController {

    private final OrderConsumerService orderConsumerService;

    /**
     * 动态调整核心消息消费者线程数
     * @param threadNum 线程数(1-50,根据服务器性能调整)
     * @return 操作结果
     */
    @PostMapping("/core/threadNum")
    @Operation(summary = "调整核心消费者线程数", description = "动态调整核心消息(订单创建、支付回调)的消费线程数,提升消费速度")
    public String adjustCoreConsumerThreadNum(@RequestParam Integer threadNum) {
        if (ObjectUtils.isEmpty(threadNum) || threadNum < 1 || threadNum > 50) {
            log.error("线程数参数错误,线程数必须在1-50之间,当前参数:{}", threadNum);
            return "参数错误:线程数必须为1-50之间的整数";
        }
        orderConsumerService.setCoreConsumerThreadNum(threadNum);
        log.info("核心消费者线程数已调整为:{}", threadNum);
        return "核心消费者线程数调整成功,当前线程数:" + threadNum;
    }

    /**
     * 查看核心消费者线程数
     * @return 线程数描述
     */
    @PostMapping("/core/threadNum/status")
    @Operation(summary = "查看核心消费者线程数", description = "获取当前核心消息消费者的线程数")
    public String getCoreConsumerThreadNum() {
        int threadNum = orderConsumerService.getCoreConsumerThreadNum();
        return "当前核心消费者线程数:" + threadNum;
    }
}
代码语言:javascript
复制
package com.jam.demo.consumer.service;

import com.alibaba.fastjson2.JSON;
import com.jam.demo.common.entity.CoreMessage;
import com.jam.demo.common.entity.Order;
import com.jam.demo.consumer.mapper.OrderMapper;
import com.jam.demo.consumer.config.RocketMQConfig;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.util.ObjectUtils;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 核心消息消费者服务(订单创建、支付回调)
 * @author ken
 */
@Service
@Slf4j
@RequiredArgsConstructor
public class OrderConsumerService implements RocketMQListener<String> {

    private final OrderMapper orderMapper;
    private final PlatformTransactionManager transactionManager;

    @Resource
    private ThreadPoolTaskExecutor consumerThreadPool;

    /**
     * 核心消费者线程数(原子类保证线程安全)
     */
    private final AtomicInteger coreConsumerThreadNum = new AtomicInteger(10);

    /**
     * 初始化消费者线程池(默认10线程)
     */
    @PostConstruct
    public void initThreadPool() {
        consumerThreadPool.setCorePoolSize(coreConsumerThreadNum.get());
        consumerThreadPool.setMaxPoolSize(coreConsumerThreadNum.get() + 5);
        consumerThreadPool.setQueueCapacity(1000);
        log.info("核心消费者线程池初始化完成,默认线程数:{}", coreConsumerThreadNum.get());
    }

    /**
     * 消费核心消息(RocketMQ监听方法)
     * @param message 消息内容(JSON格式)
     */
    @Override
    @RocketMQMessageListener(
            topic = RocketMQConfig.CORE_ORDER_TOPIC,
            consumerGroup = RocketMQConfig.CORE_ORDER_CONSUMER_GROUP,
            // 广播模式:所有消费者都消费同一消息;集群模式:消息只被一个消费者消费(优先集群模式,避免重复消费)
            messageModel = org.apache.rocketmq.common.message.MessageModel.CLUSTERING,
            // 批量消费:每次拉取10条消息,提升消费速度(根据业务调整)
            consumeMessageBatchMaxSize = 10,
            // 消费超时时间:30秒(避免消费耗时过长导致消息重试)
            consumeTimeout = 30
    )
    public void onMessage(String message) {
        if (ObjectUtils.isEmpty(message)) {
            log.error("消费核心消息失败:消息内容为空");
            return;
        }
        // 提交到线程池异步消费,提升消费速度
        consumerThreadPool.execute(() -> processCoreMessage(message));
    }

    /**
     * 处理核心消息(业务逻辑+编程式事务)
     * @param message 消息内容(JSON格式)
     */
    private void processCoreMessage(String message) {
        // 编程式事务(保证消息消费与数据库操作的一致性)
        DefaultTransactionDefinition transactionDefinition = new DefaultTransactionDefinition();
        TransactionStatus transactionStatus = transactionManager.getTransaction(transactionDefinition);

        try {
            // 解析消息(FastJSON2)
            CoreMessage coreMessage = JSON.parseObject(message, CoreMessage.class);
            if (ObjectUtils.isEmpty(coreMessage) || ObjectUtils.isEmpty(coreMessage.getOrder())) {
                log.error("解析核心消息失败:消息格式错误,消息内容:{}", message);
                // 事务回滚
                transactionManager.rollback(transactionStatus);
                return;
            }

            Order order = coreMessage.getOrder();
            log.info("开始消费核心消息,订单ID:{},消息内容:{}", order.getOrderId(), message);

            // 业务逻辑:更新订单状态(模拟核心业务)
            LambdaUpdateWrapper<Order> updateWrapper = Wrappers.lambdaUpdate(Order.class)
                    .eq(Order::getOrderId, order.getOrderId())
                    .set(Order::getStatus, order.getStatus())
                    .set(Order::getUpdateTime, System.currentTimeMillis());

            int updateCount = orderMapper.update(null, updateWrapper);
            if (updateCount == 0) {
                log.error("消费核心消息失败:未找到对应订单,订单ID:{}", order.getOrderId());
                transactionManager.rollback(transactionStatus);
                return;
            }

            // 事务提交
            transactionManager.commit(transactionStatus);
            log.info("消费核心消息成功,订单ID:{}", order.getOrderId());
        } catch (Exception e) {
            log.error("消费核心消息异常,消息内容:{}", message, e);
            // 事务回滚
            transactionManager.rollback(transactionStatus);
            // 消息消费失败,抛出异常,触发RocketMQ重试(重试次数由队列配置决定)
            throw new RuntimeException("消费核心消息异常,触发重试", e);
        }
    }

    /**
     * 设置核心消费者线程数(动态调整)
     * @param threadNum 线程数
     */
    public void setCoreConsumerThreadNum(Integer threadNum) {
        coreConsumerThreadNum.set(threadNum);
        // 调整线程池参数
        consumerThreadPool.setCorePoolSize(threadNum);
        consumerThreadPool.setMaxPoolSize(threadNum + 5);
    }

    /**
     * 获取核心消费者线程数
     * @return 线程数
     */
    public int getCoreConsumerThreadNum() {
        return coreConsumerThreadNum.get();
    }
}
2.2.3 线程池配置(SpringBoot配置类)
代码语言:javascript
复制
package com.jam.demo.consumer.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

/**
 * 消费者线程池配置类
 * @author ken
 */
@Configuration
public class ThreadPoolConfig {

    /**
     * 核心消息消费者线程池
     * @return 线程池实例
     */
    @Bean
    public ThreadPoolTaskExecutor consumerThreadPool() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 线程名称前缀(便于日志排查)
        executor.setThreadNamePrefix("core-consumer-thread-");
        // 拒绝策略:丢弃任务并抛出异常(核心消息消费失败需及时感知)
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        // 线程空闲时间:60秒
        executor.setKeepAliveSeconds(60);
        // 等待所有任务完成后再关闭线程池
        executor.setWaitForTasksToCompleteOnShutdown(true);
        // 等待时间:30秒(超时强制关闭)
        executor.setAwaitTerminationSeconds(30);
        return executor;
    }
}
2.2.4 关键说明
  1. 用ThreadPoolTaskExecutor创建消费者线程池,动态调整核心线程数,无需重启服务;
  2. 采用编程式事务(PlatformTransactionManager),保证消息消费与数据库操作的一致性,避免数据不一致;
  3. RocketMQ监听配置:集群模式(避免重复消费)、批量消费(每次拉取10条)、消费超时30秒,符合生产环境最佳实践;
  4. 线程池拒绝策略用AbortPolicy(丢弃任务并抛异常),核心消息消费失败需及时感知,避免无声失败;
  5. 扩容建议:如果队列分区数为20,可将消费者线程数调整为20(最大化利用分区),同时用K8s扩容2-3个消费者实例,进一步提升消费速度。

2.3 第三步:消息分流(避免核心消息被阻塞)

如果积压的消息中,有大量非核心消息(比如日志、通知),会阻塞核心消息的消费——此时需要将核心消息和非核心消息分流,让核心消息优先被消费,非核心消息后续慢慢处理。

2.3.1 分流的核心思路
  1. 创建临时队列:新建一个临时消息队列(比如CORE_ORDER_TOPIC_TEMP),用于接收核心消息;
  2. 消息转移:将原队列中的核心消息,批量转移到临时队列,让临时消费者组专门处理;
  3. 原队列清理:将原队列中的非核心消息暂停消费,或批量删除(如果无需处理),减少积压压力。
2.3.2 实例(RocketMQ消息批量转移)

基于RocketMQ的Admin API,实现消息批量转移,将原队列中的核心消息转移到临时队列,可直接运行:

代码语言:javascript
复制
package com.jam.demo.consumer.service;

import com.alibaba.fastjson2.JSON;
import com.jam.demo.common.entity.CoreMessage;
import com.jam.demo.consumer.config.RocketMQConfig;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.admin.MQAdmin;
import org.apache.rocketmq.client.admin.MQAdminExt;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.List;
import java.util.Set;

/**
 * 消息分流服务(核心消息转移到临时队列)
 * @author ken
 */
@Service
@Slf4j
@RequiredArgsConstructor
public class MessageShuntService {

    @Resource
    private MQAdminExt mqAdmin;

    private DefaultMQPullConsumer pullConsumer;
    private DefaultMQProducer pushProducer;

    /**
     * 初始化拉取消费者和推送生产者(用于消息转移)
     */
    @PostConstruct
    public void init() throws MQClientException {
        // 初始化拉取消费者(拉取原队列中的消息)
        pullConsumer = new DefaultMQPullConsumer("message_shunt_pull_consumer_group");
        pullConsumer.setNamesrvAddr(RocketMQConfig.NAMESRV_ADDR);
        pullConsumer.start();
        log.info("消息分流-拉取消费者初始化完成");

        // 初始化推送生产者(将核心消息推送到临时队列)
        pushProducer = new DefaultMQProducer("message_shunt_push_producer_group");
        pushProducer.setNamesrvAddr(RocketMQConfig.NAMESRV_ADDR);
        pushProducer.start();
        log.info("消息分流-推送生产者初始化完成");
    }

    /**
     * 消息分流:将原核心队列中的核心消息,转移到临时队列
     * @param batchSize 每次拉取的消息数量(批量处理,提升效率)
     * @return 转移结果(转移成功数量、失败数量)
     */
    public String shuntCoreMessage(int batchSize) {
        if (batchSize <= 0 || batchSize > 100) {
            log.error("消息分流失败:批量大小错误,必须为1-100之间的整数,当前参数:{}", batchSize);
            return "参数错误:批量大小必须为1-100之间的整数";
        }

        int successCount = 0;
        int failCount = 0;

        try {
            // 1. 获取原核心队列的所有消息队列(分区)
            Set<MessageQueue> messageQueues = pullConsumer.fetchSubscribeMessageQueues(RocketMQConfig.CORE_ORDER_TOPIC);
            if (CollectionUtils.isEmpty(messageQueues)) {
                log.error("消息分流失败:未找到原核心队列的分区");
                return "消息分流失败:未找到原核心队列的分区";
            }

            // 2. 遍历每个分区,拉取消息并转移
            for (MessageQueue messageQueue : messageQueues) {
                log.info("开始处理分区:{},队列名称:{}", messageQueue.getQueueId(), messageQueue.getTopic());

                // 获取分区的最小偏移量(从最开始拉取,避免遗漏)
                long offset = pullConsumer.minOffset(messageQueue);
                log.info("分区{}的最小偏移量:{}", messageQueue.getQueueId(), offset);

                while (true) {
                    // 批量拉取消息(每次拉取batchSize条)
                    PullResult pullResult = pullConsumer.pull(
                            messageQueue,
                            "*", // 订阅所有tag
                            offset,
                            batchSize
                    );

                    // 判断拉取状态
                    if (PullStatus.FOUND.equals(pullResult.getPullStatus())) {
                        List<MessageExt> messageExtList = pullResult.getMsgFoundList();
                        if (!CollectionUtils.isEmpty(messageExtList)) {
                            for (MessageExt messageExt : messageExtList) {
                                try {
                                    // 解析消息,判断是否为核心消息(这里模拟核心消息包含order字段)
                                    String messageBody = new String(messageExt.getBody(), "UTF-8");
                                    CoreMessage coreMessage = JSON.parseObject(messageBody, CoreMessage.class);
                                    if (!ObjectUtils.isEmpty(coreMessage) && !ObjectUtils.isEmpty(coreMessage.getOrder())) {
                                        // 是核心消息,转移到临时队列
                                        Message tempMessage = new Message(
                                                RocketMQConfig.CORE_ORDER_TOPIC_TEMP, // 临时队列topic
                                                messageExt.getTags(),
                                                messageExt.getKeys(),
                                                messageBody.getBytes("UTF-8")
                                        );
                                        // 发送到临时队列
                                        SendResult sendResult = pushProducer.send(tempMessage, 3000);
                                        if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
                                            successCount++;
                                            log.info("消息转移成功,消息ID:{},分区:{}", sendResult.getMsgId(), messageQueue.getQueueId());
                                        } else {
                                            failCount++;
                                            log.error("消息转移失败,消息ID:{},发送状态:{}", messageExt.getMsgId(), sendResult.getSendStatus());
                                        }
                                    } else {
                                        // 非核心消息,跳过(可根据业务需求删除或暂存)
                                        log.warn("非核心消息,跳过转移,消息内容:{}", messageBody);
                                    }
                                } catch (Exception e) {
                                    failCount++;
                                    log.error("消息转移异常,消息ID:{}", messageExt.getMsgId(), e);
                                }
                            }
                        }
                        // 更新偏移量,继续拉取下一批
                        offset = pullResult.getNextBeginOffset();
                    } else if (PullStatus.NO_NEW_MSG.equals(pullResult.getPullStatus())) {
                        // 该分区没有新消息,退出循环
                        log.info("分区{}没有新消息,退出处理", messageQueue.getQueueId());
                        break;
                    } else if (PullStatus.OFFSET_ILLEGAL.equals(pullResult.getPullStatus())) {
                        // 偏移量非法,重置为最小偏移量
                        offset = pullConsumer.minOffset(messageQueue);
                        log.warn("分区{}偏移量非法,重置为最小偏移量:{}", messageQueue.getQueueId(), offset);
                    }
                }
            }

            return String.format("消息分流完成,转移成功:%d条,转移失败:%d条", successCount, failCount);
        } catch (Exception e) {
            log.error("消息分流整体异常", e);
            return String.format("消息分流异常,已转移成功:%d条,失败:%d条,异常信息:%s", successCount, failCount, e.getMessage());
        }
    }

    /**
     * 关闭拉取消费者和推送生产者(用于服务停止时释放资源)
     */
    public void close() {
        if (pullConsumer != null) {
            pullConsumer.shutdown();
            log.info("消息分流-拉取消费者已关闭");
        }
        if (pushProducer != null) {
            pushProducer.shutdown();
            log.info("消息分流-推送生产者已关闭");
        }
    }
}
2.3.3 临时队列消费者配置(专门消费转移后的核心消息)
代码语言:javascript
复制
package com.jam.demo.consumer.service;

import com.alibaba.fastjson2.JSON;
import com.jam.demo.common.entity.CoreMessage;
import com.jam.demo.common.entity.Order;
import com.jam.demo.consumer.mapper.OrderMapper;
import com.jam.demo.consumer.config.RocketMQConfig;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.util.ObjectUtils;

/**
 * 临时队列消费者服务(消费转移后的核心消息)
 * @author ken
 */
@Service
@Slf4j
@RequiredArgsConstructor
public class TempCoreMessageConsumer implements RocketMQListener<String> {

    private final OrderMapper orderMapper;
    private final PlatformTransactionManager transactionManager;

    /**
     * 消费临时队列中的核心消息
     * @param message 消息内容(JSON格式)
     */
    @Override
    @RocketMQMessageListener(
            topic = RocketMQConfig.CORE_ORDER_TOPIC_TEMP,
            consumerGroup = RocketMQConfig.CORE_ORDER_TEMP_CONSUMER_GROUP,
            messageModel = org.apache.rocketmq.common.message.MessageModel.CLUSTERING,
            consumeMessageBatchMaxSize = 15, // 临时队列,批量消费数量增加到15,加快消费
            consumeTimeout = 20
    )
    public void onMessage(String message) {
        if (ObjectUtils.isEmpty(message)) {
            log.error("消费临时队列消息失败:消息内容为空");
            return;
        }

        // 编程式事务
        DefaultTransactionDefinition transactionDefinition = new DefaultTransactionDefinition();
        TransactionStatus transactionStatus = transactionManager.getTransaction(transactionDefinition);

        try {
            CoreMessage coreMessage = JSON.parseObject(message, CoreMessage.class);
            if (ObjectUtils.isEmpty(coreMessage) || ObjectUtils.isEmpty(coreMessage.getOrder())) {
                log.error("解析临时队列消息失败:消息格式错误,消息内容:{}", message);
                transactionManager.rollback(transactionStatus);
                return;
            }

            Order order = coreMessage.getOrder();
            log.info("开始消费临时队列核心消息,订单ID:{}", order.getOrderId());

            // 业务逻辑:更新订单状态(与核心消费者一致)
            LambdaUpdateWrapper<Order> updateWrapper = Wrappers.lambdaUpdate(Order.class)
                    .eq(Order::getOrderId, order.getOrderId())
                    .set(Order::getStatus, order.getStatus())
                    .set(Order::getUpdateTime, System.currentTimeMillis());

            int updateCount = orderMapper.update(null, updateWrapper);
            if (updateCount == 0) {
                log.error("消费临时队列消息失败:未找到对应订单,订单ID:{}", order.getOrderId());
                transactionManager.rollback(transactionStatus);
                return;
            }

            transactionManager.commit(transactionStatus);
            log.info("消费临时队列核心消息成功,订单ID:{}", order.getOrderId());
        } catch (Exception e) {
            log.error("消费临时队列消息异常,消息内容:{}", message, e);
            transactionManager.rollback(transactionStatus);
            throw new RuntimeException("消费临时队列消息异常,触发重试", e);
        }
    }
}
2.3.4 关键说明
  1. 用MQAdminExt、DefaultMQPullConsumer拉取原队列消息,DefaultMQProducer将核心消息推送到临时队列,实现消息分流;
  2. 批量拉取消息(每次1-100条),提升分流效率,避免单条拉取耗时过长;
  3. 临时队列的批量消费数量设置为15(高于原队列的10),加快核心消息的消费速度;
  4. 非核心消息可根据业务需求,选择跳过、删除或暂存到其他队列,避免阻塞核心消息。

2.4 第四步:跳过无效消息(减少无效消费)

如果积压的消息中,有大量无效消息(比如重复消息、过期消息、格式错误消息),这些消息会占用消费资源,导致有效消息消费变慢——此时需要跳过无效消息,让消费者只处理有效消息。

2.4.1 无效消息的判断标准(根据业务定义)
  1. 过期消息:消息的创建时间超过业务允许的有效期(比如订单消息超过24小时,无需处理);
  2. 重复消息:消息ID重复(通过Redis或数据库记录已消费的消息ID,避免重复处理);
  3. 格式错误消息:消息JSON解析失败,或缺少核心字段(比如订单消息缺少订单ID)。

2.4.2 实例(跳过无效消息,结合Redis去重)

基于Redis(最新稳定版7.2.4),实现消息去重和无效消息过滤,可直接编译运行:

代码语言:javascript
复制
package com.jam.demo.consumer.service;

import com.alibaba.fastjson2.JSON;
import com.jam.demo.common.entity.CoreMessage;
import com.jam.demo.common.entity.Order;
import com.jam.demo.consumer.mapper.OrderMapper;
import com.jam.demo.consumer.config.RocketMQConfig;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.util.ObjectUtils;

import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;

/**
 * 核心消息消费者(带无效消息过滤+Redis去重)
 * @author ken
 */
@Service
@Slf4j
@RequiredArgsConstructor
public class FilteredOrderConsumerService implements RocketMQListener<String> {

    private final OrderMapper orderMapper;
    private final PlatformTransactionManager transactionManager;

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    /**
     * Redis key前缀(已消费消息ID)
     */
    private static final String CONSUMED_MESSAGE_KEY_PREFIX = "consumer:core:message:consumed:";

    /**
     * 消息有效期(24小时,单位:毫秒)
     */
    private static final long MESSAGE_VALID_TIME = 24 * 60 * 60 * 1000;

    /**
     * 消费核心消息(带过滤无效消息)
     * @param message 消息内容(JSON格式)
     */
    @Override
    @RocketMQMessageListener(
            topic = RocketMQConfig.CORE_ORDER_TOPIC,
            consumerGroup = RocketMQConfig.FILTERED_CORE_ORDER_CONSUMER_GROUP,
            messageModel = org.apache.rocketmq.common.message.MessageModel.CLUSTERING,
            consumeMessageBatchMaxSize = 12,
            consumeTimeout = 25
    )
    public void onMessage(String message) {
        if (ObjectUtils.isEmpty(message)) {
            log.error("消费核心消息失败:消息内容为空(无效消息)");
            return;
        }

        try {
            // 1. 解析消息基本信息
            CoreMessage coreMessage = JSON.parseObject(message, CoreMessage.class);
            if (ObjectUtils.isEmpty(coreMessage) || ObjectUtils.isEmpty(coreMessage.getOrder()) || ObjectUtils.isEmpty(coreMessage.getMsgId())) {
                log.error("消费核心消息失败:消息格式错误(无效消息),消息内容:{}", message);
                return;
            }

            String msgId = coreMessage.getMsgId();
            Order order = coreMessage.getOrder();
            long messageCreateTime = coreMessage.getCreateTime();

            // 2. 过滤无效消息
            // 2.1 过滤过期消息(创建时间超过24小时)
            if (System.currentTimeMillis() - messageCreateTime > MESSAGE_VALID_TIME) {
                log.warn("跳过过期消息,消息ID:{},订单ID:{},创建时间:{}", msgId, order.getOrderId(), messageCreateTime);
                return;
            }

            // 2.2 过滤重复消息(Redis去重,过期时间24小时)
            String redisKey = CONSUMED_MESSAGE_KEY_PREFIX + msgId;
            Boolean isConsumed = stringRedisTemplate.hasKey(redisKey);
            if (Boolean.TRUE.equals(isConsumed)) {
                log.warn("跳过重复消息,消息ID:{},订单ID:{}", msgId, order.getOrderId());
                return;
            }

            // 3. 有效消息,开始处理(编程式事务)
            DefaultTransactionDefinition transactionDefinition = new DefaultTransactionDefinition();
            TransactionStatus transactionStatus = transactionManager.getTransaction(transactionDefinition);

            try {
                log.info("开始消费有效核心消息,消息ID:{},订单ID:{}", msgId, order.getOrderId());

                // 业务逻辑:更新订单状态
                LambdaUpdateWrapper<Order> updateWrapper = Wrappers.lambdaUpdate(Order.class)
                        .eq(Order::getOrderId, order.getOrderId())
                        .set(Order::getStatus, order.getStatus())
                        .set(Order::getUpdateTime, System.currentTimeMillis());

                int updateCount = orderMapper.update(null, updateWrapper);
                if (updateCount == 0) {
                    log.error("消费有效核心消息失败:未找到对应订单,订单ID:{}", order.getOrderId());
                    transactionManager.rollback(transactionStatus);
                    return;
                }

                // 事务提交
                transactionManager.commit(transactionStatus);
                log.info("消费有效核心消息成功,订单ID:{}", order.getOrderId());

                // 记录已消费消息ID到Redis,设置24小时过期(避免重复消费)
                stringRedisTemplate.opsForValue().set(redisKey, "1", 24, TimeUnit.HOURS);
            } catch (Exception e) {
                log.error("消费有效核心消息异常,消息ID:{},订单ID:{}", msgId, order.getOrderId(), e);
                transactionManager.rollback(transactionStatus);
                throw new RuntimeException("消费有效核心消息异常,触发重试", e);
            }
        } catch (Exception e) {
            log.error("过滤/消费核心消息整体异常,消息内容:{}", message, e);
        }
    }
}
2.4.3 关键说明
  1. Redis去重逻辑:用消息唯一ID(msgId)作为Redis Key,值为固定标识,过期时间24小时(与消息有效期一致),避免永久占用Redis内存;
  2. 无效消息过滤优先级:先判断消息是否为空→再解析格式→再过滤过期→最后过滤重复,层层筛选,只处理有效消息;
  3. 异常隔离:过滤阶段的异常不触发消息重试(比如格式错误、过期),只有有效消息处理失败才触发重试,避免无效消息反复重试占用消费资源;
  4. Redis依赖配置(pom.xml):
代码语言:javascript
复制
<!-- Redis -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- Redis连接池(提升性能) -->
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
</dependency>

2.5 紧急止血效果验证(必做,确认积压缓解)

紧急止血操作完成后,需通过3个维度验证效果,避免“假缓解”:

  1. 队列监控维度:查看消息队列的积压数量(Offset差值)是否持续下降,生产TPS ≤ 消费TPS;
  2. 消费端维度:查看消费者线程池利用率(80%左右为宜)、消费成功率(≥99%)、无大量重试日志;
  3. 业务维度:下游业务(如订单状态更新、库存扣减)恢复正常,无数据不一致问题。

三、根源排查:找到积压的“真凶”(避免重复踩坑)

紧急止血只是“治标”,只有找到积压的根本原因,才能“治本”。结合前文的排查流程图,从消费端、生产端、队列配置三个维度逐一排查,每个维度都有明确的排查方法和验证手段。

3.1 消费端问题排查(占比80%,最常见)

消费端是积压的最主要原因,核心排查方向:

3.1.1 消费服务是否存活
  • 排查方法:通过K8s/Docker查看消费者实例是否运行、端口是否存活、有无OOM/Kill日志;
  • 验证命令(Linux):
代码语言:javascript
复制
# 查看消费者进程
ps -ef | grep core-consumer
# 查看端口占用
netstat -tulpn | grep 8081
# 查看OOM日志
dmesg | grep -i oom | grep core-consumer
  • 解决方案:重启挂掉的实例、扩容实例、调整JVM参数(如-Xmx4g -Xms4g)避免OOM。
3.1.2 消费线程池是否阻塞
  • 排查方法:通过Arthas(阿里开源诊断工具)查看线程池状态(活跃线程数、队列长度、拒绝次数);
  • 验证命令(Arthas):
代码语言:javascript
复制
# 启动Arthas
java -jar arthas-boot.jar
# 选择消费者进程
# 查看线程池信息
thread -n 10 # 查看最繁忙的10个线程
# 查看线程池详细参数
ognl '@com.jam.demo.consumer.config.ThreadPoolConfig@consumerThreadPool.getCorePoolSize()'
ognl '@com.jam.demo.consumer.config.ThreadPoolConfig@consumerThreadPool.getActiveCount()'
  • 解决方案:动态扩容线程池、优化阻塞的业务逻辑(如慢SQL、远程调用超时)。
3.1.3 业务逻辑耗时过长
  • 排查方法:通过日志/链路追踪(SkyWalking)查看消费方法的耗时,定位慢操作(如慢SQL、第三方接口超时);
  • 慢SQL排查(MySQL8.0):
代码语言:javascript
复制
-- 开启慢查询日志
SET GLOBAL slow_query_log = ON;
SET GLOBAL long_query_time = 1; -- 耗时1秒以上的SQL记录
-- 查看慢查询日志
SELECT * FROM mysql.slow_log ORDER BY start_time DESC LIMIT 10;
-- 分析SQL执行计划
EXPLAIN UPDATE `order` SET status = 2, update_time = 1740000000000 WHERE order_id = '123456';
  • 解决方案:优化慢SQL(加索引、分库分表)、设置远程调用超时(如Feign设置1秒超时)、异步处理非核心业务。

3.2 生产端问题排查(占比15%)

生产端问题主要是“生产突增”或“重复生产”:

3.2.1 生产TPS突增
  • 排查方法:查看生产者监控(如Prometheus+Grafana),确认是否有大促、秒杀等峰值流量;
  • 解决方案:限流(如Sentinel)、削峰(如生产者端增加本地队列)、降级非核心生产业务。
3.2.2 重复生产
  • 排查方法:查看生产者日志,确认是否有“发送失败-重试”循环,或消息ID重复;
  • 解决方案:生产者端增加幂等性(如基于订单ID去重)、设置合理的重试次数(如3次)、避免无限重试。

3.3 队列配置问题排查(占比5%)

队列配置问题容易被忽略,但会导致“扩容无效”:

3.3.1 分区数不足
  • 排查方法:查看队列分区数(如RocketMQ):
代码语言:javascript
复制
# RocketMQ查看Topic分区数
sh mqadmin topicStatus -n 127.0.0.1:9876 -t CORE_ORDER_TOPIC
  • 解决方案:扩容分区数(注意:RocketMQ/Kafka分区数扩容后,需重新分配消费者)。
3.3.2 拉取策略不合理
  • 排查方法:查看消费者拉取配置(如批量拉取数、拉取间隔);
  • 解决方案:调整批量拉取数(如从10增加到20)、缩短拉取间隔(如从500ms缩短到200ms)。

四、彻底解决:从架构层面避免积压(治本)

找到根源后,从架构、配置、代码三个层面优化,彻底避免百万消息积压问题,核心思路是“提升消费能力、控制生产速度、增加容错机制”。

4.1 架构层面优化

4.1.1 消费者集群化+弹性伸缩
  • 方案:基于K8s实现消费者的弹性伸缩,根据积压数量自动扩容/缩容;
  • 配置示例(K8s HPA):
代码语言:javascript
复制
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: core-consumer-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: core-consumer
  minReplicas: 2
  maxReplicas: 10
  metrics:
  - type: External
    external:
      metric:
        name: rocketmq_topic_backlog
        selector:
          matchLabels:
            topic: CORE_ORDER_TOPIC
      target:
        type: Value
        value: 10000 # 积压数超过1万,自动扩容
4.1.2 消息分级+优先级消费
  • 方案:将消息分为核心(订单、支付)、普通(物流)、低优先级(日志),不同优先级消息使用不同队列,核心队列配置更多分区和消费者;
  • 架构图:

4.2 配置层面优化

4.2.1 合理设置队列参数

参数

推荐值(RocketMQ)

说明

批量消费数

10-20

提升消费效率,避免单条消费

消费超时时间

20-30秒

避免消费耗时过长导致重试

重试次数

3次

避免无限重试增加积压

死信队列开启

失败消息进入死信,避免阻塞

4.2.2 消费者线程池配置
  • 核心线程数 = CPU核心数 * 2(如8核CPU,核心线程数16);
  • 最大线程数 = 核心线程数 + 5;
  • 队列容量 = 1000(避免队列过长导致阻塞);
  • 拒绝策略 = AbortPolicy(核心消息)/DiscardOldestPolicy(非核心消息)。

4.3 代码层面优化

4.3.1 消费业务异步化

将消费中的非核心业务(如日志、通知)异步处理,减少主流程耗时:

代码语言:javascript
复制
/**
 * 异步处理非核心业务(日志记录)
 * @author ken
 */
private void asyncProcessNonCoreBusiness(Order order) {
    // 异步线程池(单独配置,不占用核心消费线程)
    nonCoreThreadPool.execute(() -> {
        try {
            logService.recordOrderLog(order.getOrderId(), order.getStatus());
            notificationService.sendOrderNotify(order.getUserId(), order.getOrderId());
        } catch (Exception e) {
            log.error("异步处理非核心业务失败,订单ID:{}", order.getOrderId(), e);
        }
    });
}
4.3.2 消费幂等性保障

基于MySQL+Redis实现双重幂等,避免重复消费导致数据不一致:

代码语言:javascript
复制
/**
 * 检查消费幂等性(MySQL+Redis)
 * @param msgId 消息ID
 * @param orderId 订单ID
 * @return true-可消费,false-重复消费
 * @author ken
 */
private boolean checkIdempotent(String msgId, String orderId) {
    // 1. Redis快速判断(第一层)
    String redisKey = CONSUMED_MESSAGE_KEY_PREFIX + msgId;
    if (Boolean.TRUE.equals(stringRedisTemplate.hasKey(redisKey))) {
        return false;
    }
    // 2. MySQL数据库判断(第二层,防Redis宕机)
    int count = orderMapper.countConsumedMsg(msgId, orderId);
    if (count > 0) {
        return stringRedisTemplate.opsForValue().set(redisKey, "1", 24, TimeUnit.HOURS); // 同步到Redis
    }
    return true;
}

对应的MyBatisPlus Mapper:

代码语言:javascript
复制
package com.jam.demo.consumer.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.jam.demo.common.entity.Order;
import org.apache.ibatis.annotations.Param;
import org.springframework.stereotype.Repository;

/**
 * 订单Mapper
 * @author ken
 */
@Repository
public interface OrderMapper extends BaseMapper<Order> {

    /**
     * 检查消息是否已消费(幂等性)
     * @param msgId 消息ID
     * @param orderId 订单ID
     * @return 已消费次数
     */
    int countConsumedMsg(@Param("msgId") String msgId, @Param("orderId") String orderId);
}

对应的Mapper XML(resources/mapper/OrderMapper.xml):

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.jam.demo.consumer.mapper.OrderMapper">
    <select id="countConsumedMsg" resultType="java.lang.Integer">
        SELECT COUNT(1) FROM `order_consume_record`
        WHERE msg_id = #{msgId} AND order_id = #{orderId}
    </select>
</mapper>

对应的MySQL表结构(MySQL8.0):

代码语言:javascript
复制
CREATE TABLE `order_consume_record` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键ID',
  `msg_id` varchar(64) NOT NULL COMMENT '消息唯一ID',
  `order_id` varchar(64) NOT NULL COMMENT '订单ID',
  `consume_time` bigint NOT NULL COMMENT '消费时间(毫秒)',
  `consumer_ip` varchar(32) NOT NULL COMMENT '消费者IP',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_msg_order` (`msg_id`,`order_id`) COMMENT '消息ID+订单ID唯一索引(幂等性)',
  KEY `idx_order_id` (`order_id`) COMMENT '订单ID索引'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='订单消费记录(幂等性)';

五、复盘优化:建立预防机制

解决积压问题后,必须进行复盘,建立“监控-预警-应急”三位一体的预防机制,让积压问题“早发现、早处理、不扩大”。

5.1 完善监控体系

核心监控指标(需配置可视化面板,如Grafana):

维度

核心指标

预警阈值

队列

积压数量、生产TPS、消费TPS

积压>1万触发预警

消费端

消费成功率、消费耗时、线程池利用率

成功率<99%、耗时>2秒

生产端

生产成功率、重试次数

重试次数>100次/分钟

系统

JVM内存、CPU利用率、磁盘使用率

CPU>80%、内存>85%

5.2 配置分级预警

通过钉钉/企业微信配置分级预警,避免信息过载:

  • 一级预警(短信+电话):积压>10万、消费TPS=0、服务宕机;
  • 二级预警(钉钉群):积压>1万、消费成功率<99%、耗时>2秒;
  • 三级预警(日志):积压>5000、生产重试次数增加。

5.3 制定应急手册

将本文的紧急止血流程整理成标准化应急手册,包含:

  1. 应急联系人(开发、运维、DBA);
  2. 操作步骤(暂停非核心生产→扩容消费者→消息分流→跳过无效消息);
  3. 验证方法(监控指标、业务验证);
  4. 回滚方案(扩容后缩容、恢复非核心生产)。

总结

  1. 紧急止血核心:暂停非核心生产减少压力源,扩容消费者+消息分流提升消费能力,跳过无效消息减少资源浪费,30分钟内可快速缓解百万消息积压;
  2. 根源排查重点:80%的积压源于消费端(服务挂掉、线程池阻塞、慢业务),需通过Arthas、慢查询日志、队列监控逐一定位;
  3. 长期优化关键:架构上实现消费者弹性伸缩+消息分级消费,代码上保障幂等性+异步化,运营上建立监控-预警-应急体系,从“治标”到“治本”,彻底避免积压问题。
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2026-02-11,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 果酱带你啃java 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、先搞懂:消息积压的核心本质
    • 1.1 积压的3个核心前提
    • 1.2 积压的底层影响
    • 1.3 积压排查流程图(必用,快速定位原因)
  • 二、紧急止血:30分钟内快速缓解积压(优先保业务)
    • 2.1 第一步:暂停非核心生产者(减少压力源)
      • 2.1.1 操作场景
      • 2.1.2 实例
      • 2.1.3 关键说明
    • 2.2 第二步:临时扩容消费者(最大化提升消费能力)
      • 2.2.1 扩容的核心原则
      • 2.2.2 实例(动态调整消费者线程数,无需重启服务)
      • 2.2.3 线程池配置(SpringBoot配置类)
      • 2.2.4 关键说明
    • 2.3 第三步:消息分流(避免核心消息被阻塞)
      • 2.3.1 分流的核心思路
      • 2.3.2 实例(RocketMQ消息批量转移)
      • 2.3.3 临时队列消费者配置(专门消费转移后的核心消息)
      • 2.3.4 关键说明
    • 2.4 第四步:跳过无效消息(减少无效消费)
      • 2.4.1 无效消息的判断标准(根据业务定义)
    • 2.4.2 实例(跳过无效消息,结合Redis去重)
      • 2.4.3 关键说明
    • 2.5 紧急止血效果验证(必做,确认积压缓解)
  • 三、根源排查:找到积压的“真凶”(避免重复踩坑)
    • 3.1 消费端问题排查(占比80%,最常见)
      • 3.1.1 消费服务是否存活
      • 3.1.2 消费线程池是否阻塞
      • 3.1.3 业务逻辑耗时过长
    • 3.2 生产端问题排查(占比15%)
      • 3.2.1 生产TPS突增
      • 3.2.2 重复生产
    • 3.3 队列配置问题排查(占比5%)
      • 3.3.1 分区数不足
      • 3.3.2 拉取策略不合理
  • 四、彻底解决:从架构层面避免积压(治本)
    • 4.1 架构层面优化
      • 4.1.1 消费者集群化+弹性伸缩
      • 4.1.2 消息分级+优先级消费
    • 4.2 配置层面优化
      • 4.2.1 合理设置队列参数
      • 4.2.2 消费者线程池配置
    • 4.3 代码层面优化
      • 4.3.1 消费业务异步化
      • 4.3.2 消费幂等性保障
  • 五、复盘优化:建立预防机制
    • 5.1 完善监控体系
    • 5.2 配置分级预警
    • 5.3 制定应急手册
    • 总结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档