首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >小白必看!99%的人都踩过的RabbitMQ丢消息坑

小白必看!99%的人都踩过的RabbitMQ丢消息坑

作者头像
java金融
发布2026-04-13 17:45:41
发布2026-04-13 17:45:41
960
举报
文章被收录于专栏:java金融java金融

上周凌晨3点,程序员小王突然被电话吵醒——用户投诉订单重复创建,查了半天发现是RabbitMQ消息丢失后重试导致的。老板在群里@他:“明天给我一个消息100%不丢的方案!”

如果你也遇到过这种半夜被叫醒的崩溃时刻,那今天这篇压箱底的RabbitMQ防丢指南,你一定要看完!


01 🚀 生产者端:从源头杜绝消息“失踪”

【别让消息死在发送路上】

很多时候消息丢失,根本没到MQ就“半路夭折”了。比如网络波动、MQ宕机,生产者以为发出去了,其实消息早就丢了。这时候就得给生产者加双重保险:

1.1 开启Publisher Confirm确认机制

这相当于给消息发了个“快递回执”——MQ收到消息后,会给生产者一个确认信号,没收到回执就说明消息丢了,生产者可以重试。

Spring Boot配置示例:

代码语言:javascript
复制
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    publisher-confirm-type: correlated # 异步确认模式,性能最优

Java代码实现异步回调:

代码语言:javascript
复制
@RabbitListener
public class ConfirmCallbackListener implements RabbitTemplate.ConfirmCallback {
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (!ack) {
            // 消息未确认,记录日志+重试
            log.error("消息发送失败,原因:{}", cause);
            retrySend(correlationData);
        }
    }
}

划重点:一定要用异步确认模式,同步模式会阻塞生产者,严重影响性能。

1.2 开启Return Callback处理未路由消息

有时候消息成功到了MQ,但交换机找不到对应的队列,默认会直接丢弃。这时候要开启Return Callback,把未路由的消息捞回来。

关键配置:

代码语言:javascript
复制
spring:
  rabbitmq:
    publisher-returns: true
    template:
      mandatory: true # 强制要求未路由消息返回

回调处理示例:

代码语言:javascript
复制
@RabbitListener
public class ReturnCallbackListener implements RabbitTemplate.ReturnsCallback {
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        // 未路由消息处理:存储到数据库+人工介入
        log.error("消息未路由,交换机:{},路由键:{},内容:{}",
                  returned.getExchange(), returned.getRoutingKey(), returned.getMessage());
    }
}

1.3 重试策略兜底

就算加了确认机制,偶尔还是会有网络抖动导致发送失败。这时候要配置重试策略,让系统自动重试,不用人工干预。

配置示例:

代码语言:javascript
复制
spring:
  rabbitmq:
    template:
      retry:
        enabled: true
        max-attempts: 3 # 最多重试3次
        initial-interval: 1000ms # 第一次重试间隔1秒
        multiplier: 2 # 每次重试间隔翻倍

注意:重试次数别设太多,不然会导致MQ消息堆积,最好配合死信队列使用,重试失败的消息进入死信队列,后续人工处理。


02 🛡️ Broker端:让消息在中间站稳如泰山

【别让MQ宕机带走你的消息】

生产者把消息送到MQ了,但如果MQ突然宕机,消息会不会丢?默认情况下会——因为RabbitMQ消息默认存在内存里,宕机后直接清空。这时候就得给MQ加“持久化Buff”。

2.1 三重持久化:交换机+队列+消息

要保证消息重启不丢,必须同时开启三个持久化:

(1)交换机持久化

创建交换机时指定durable: true

代码语言:javascript
复制
@Bean
public DirectExchange orderExchange() {
    // durable=true:交换机持久化,重启后不丢失
    return new DirectExchange("order_exchange", true, false);
}
(2)队列持久化

创建队列时同样指定durable: true

代码语言:javascript
复制
@Bean
public Queue orderQueue() {
    return QueueBuilder.durable("order_queue").build();
}
(3)消息持久化

发送消息时设置MessageDeliveryMode.PERSISTENT

代码语言:javascript
复制
rabbitTemplate.convertAndSend("order_exchange", "order_key", 
                             orderDTO, message -> {
    message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
    return message;
});

划重点:三个持久化缺一不可!只开消息持久化,队列宕机丢失了,消息也找不到。

2.2 镜像队列:高可用备份

就算开启了持久化,如果MQ节点挂了,还是会影响消息读取。这时候要配置镜像队列,把队列复制到多个节点,一个节点挂了,其他节点还能继续提供服务。

配置步骤:

  1. 进入RabbitMQ管理后台,点击【Admin】→【Policies】
  2. 添加新Policy,设置:
    • Name: mirror_queue_policy
    • Pattern: ^order_.*(匹配所有以order开头的队列)
    • Definition: {"ha-mode":"all","ha-sync-mode":"automatic"}
    • Priority: 1

解释ha-mode: all表示队列镜像到所有节点,ha-sync-mode: automatic表示自动同步队列内容。

2.3 LazyQueue:避免内存溢出丢消息

如果MQ存了海量消息,全部放在内存里容易导致内存溢出,进而引发宕机丢消息。LazyQueue可以把消息直接存在磁盘里,内存只存少量活跃消息,避免OOM。

配置示例:

代码语言:javascript
复制
@Bean
public Queue lazyOrderQueue() {
    return QueueBuilder.durable("lazy_order_queue")
                       .lazy() // 开启Lazy模式
                       .build();
}

注意:LazyQueue适合存储大量冷消息,热消息还是用普通队列,因为磁盘读取速度比内存慢。


03 📦 消费者端:确保消息“签收”不遗漏

【别让消息被重复消费或悄悄丢失】

消息到了消费者手里,也不是万事大吉——如果消费者还没处理完就宕机,自动ACK会让MQ以为消息已经处理完成,直接删除,导致消息丢失。

3.1 手动ACK:掌控消息生死权

一定要关闭自动ACK,改用手动ACK,确保消费者真正处理完消息后再告诉MQ:“我搞定了,你删了吧”。

配置示例:

代码语言:javascript
复制
spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual # 手动ACK
        concurrency: 3 # 消费者数量
        max-concurrency: 10 # 最大消费者数量

手动ACK代码示例:

代码语言:javascript
复制
@RabbitListener(queues = "order_queue")
public void handleOrderMessage(Message message, Channel channel) throws IOException {
    try {
        // 处理业务逻辑:创建订单、扣减库存
        OrderDTO orderDTO = JSON.parseObject(message.getBody(), OrderDTO.class);
        orderService.createOrder(orderDTO);
        
        // 手动确认消息:第二个参数false表示只确认当前消息
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    } catch (Exception e) {
        log.error("处理订单消息失败:", e);
        // 消息处理失败,拒绝并重新入队:第三个参数true表示重新入队
        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
    }
}

避坑指南

  • 别忘记ACK!如果消费者一直不ACK,消息会一直留在队列里,变成“死消息”
  • 处理失败时用basicNack而非basicReject,因为basicNack可以批量拒绝消息

3.2 幂等性:避免重复消费导致的业务异常

就算做了所有防丢措施,还是可能出现重复消费的情况——比如消费者ACK后,MQ没收到,会重新发送消息。这时候必须保证业务接口的幂等性。

常用幂等性实现方案:

(1)数据库唯一键约束

在订单表中,用order_no作为唯一键,重复插入时会触发主键冲突,直接忽略:

代码语言:javascript
复制
INSERT INTO orders(order_no, user_id, amount) 
VALUES ('20240520123456', 1001, 99.9) 
ON DUPLICATE KEY UPDATE status = status;
(2)Redis原子操作

用Redis的SETNX命令,给每个消息生成唯一ID,处理前先判断是否已经处理过:

代码语言:javascript
复制
String messageId = message.getMessageProperties().getMessageId();
if (redisTemplate.opsForValue().setIfAbsent("order:process:" + messageId, "1", 1, TimeUnit.HOURS)) {
    // 未处理过,执行业务逻辑
    orderService.createOrder(orderDTO);
} else {
    // 已经处理过,直接ACK
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
(3)本地缓存+定时清理

如果不想依赖Redis,也可以用本地Guava缓存,配合定时任务清理过期缓存:

代码语言:javascript
复制
private LoadingCache<String, Boolean> processCache = CacheBuilder.newBuilder()
        .expireAfterWrite(1, TimeUnit.HOURS)
        .build(new CacheLoader<String, Boolean>() {
            @Override
            public Boolean load(String key) throws Exception {
                return false;
            }
        });

// 处理消息前判断
if (!processCache.get(messageId)) {
    orderService.createOrder(orderDTO);
    processCache.put(messageId, true);
}

04 🧐 终极兜底:那些容易被忽略的细节

【别让小细节毁了全局】

就算做了上面所有配置,还是可能有漏网之鱼,这些细节一定要注意:

4.1 避免消息还没写入磁盘就宕机

RabbitMQ开启持久化后,会先把消息写入操作系统的PageCache,再异步刷到磁盘。如果这时候MQ宕机,PageCache里的消息会丢失。

解决方法:

  • 开启RabbitMQ的fsync配置,强制消息写入磁盘后再确认,但会影响性能
  • 配合Publisher Confirm机制,确保MQ确认消息已经持久化到磁盘

4.2 监控告警:早发现早处理

一定要给RabbitMQ加监控,比如用Prometheus+Grafana监控队列长度、消息确认率、消费者数量等指标,设置告警阈值,比如队列长度超过1000就发告警。

常用监控指标:

  • rabbitmq_queue_messages_ready:待处理消息数
  • rabbitmq_queue_messages_unacknowledged:未ACK消息数
  • rabbitmq_publisher_confirm_unconfirmed:未确认的生产者消息数

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2026-04-08,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 java金融 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 01 🚀 生产者端:从源头杜绝消息“失踪”
    • 1.1 开启Publisher Confirm确认机制
    • 1.2 开启Return Callback处理未路由消息
    • 1.3 重试策略兜底
  • 02 🛡️ Broker端:让消息在中间站稳如泰山
    • 2.1 三重持久化:交换机+队列+消息
      • (1)交换机持久化
      • (2)队列持久化
      • (3)消息持久化
    • 2.2 镜像队列:高可用备份
    • 2.3 LazyQueue:避免内存溢出丢消息
  • 03 📦 消费者端:确保消息“签收”不遗漏
    • 3.1 手动ACK:掌控消息生死权
    • 3.2 幂等性:避免重复消费导致的业务异常
      • (1)数据库唯一键约束
      • (2)Redis原子操作
      • (3)本地缓存+定时清理
  • 04 🧐 终极兜底:那些容易被忽略的细节
    • 4.1 避免消息还没写入磁盘就宕机
    • 4.2 监控告警:早发现早处理
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档