上周凌晨3点,程序员小王突然被电话吵醒——用户投诉订单重复创建,查了半天发现是RabbitMQ消息丢失后重试导致的。老板在群里@他:“明天给我一个消息100%不丢的方案!”
如果你也遇到过这种半夜被叫醒的崩溃时刻,那今天这篇压箱底的RabbitMQ防丢指南,你一定要看完!
【别让消息死在发送路上】
很多时候消息丢失,根本没到MQ就“半路夭折”了。比如网络波动、MQ宕机,生产者以为发出去了,其实消息早就丢了。这时候就得给生产者加双重保险:
这相当于给消息发了个“快递回执”——MQ收到消息后,会给生产者一个确认信号,没收到回执就说明消息丢了,生产者可以重试。
Spring Boot配置示例:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
publisher-confirm-type: correlated # 异步确认模式,性能最优
Java代码实现异步回调:
@RabbitListener
public class ConfirmCallbackListener implements RabbitTemplate.ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (!ack) {
// 消息未确认,记录日志+重试
log.error("消息发送失败,原因:{}", cause);
retrySend(correlationData);
}
}
}
划重点:一定要用异步确认模式,同步模式会阻塞生产者,严重影响性能。
有时候消息成功到了MQ,但交换机找不到对应的队列,默认会直接丢弃。这时候要开启Return Callback,把未路由的消息捞回来。
关键配置:
spring:
rabbitmq:
publisher-returns: true
template:
mandatory: true # 强制要求未路由消息返回
回调处理示例:
@RabbitListener
public class ReturnCallbackListener implements RabbitTemplate.ReturnsCallback {
@Override
public void returnedMessage(ReturnedMessage returned) {
// 未路由消息处理:存储到数据库+人工介入
log.error("消息未路由,交换机:{},路由键:{},内容:{}",
returned.getExchange(), returned.getRoutingKey(), returned.getMessage());
}
}
就算加了确认机制,偶尔还是会有网络抖动导致发送失败。这时候要配置重试策略,让系统自动重试,不用人工干预。
配置示例:
spring:
rabbitmq:
template:
retry:
enabled: true
max-attempts: 3 # 最多重试3次
initial-interval: 1000ms # 第一次重试间隔1秒
multiplier: 2 # 每次重试间隔翻倍
注意:重试次数别设太多,不然会导致MQ消息堆积,最好配合死信队列使用,重试失败的消息进入死信队列,后续人工处理。
【别让MQ宕机带走你的消息】
生产者把消息送到MQ了,但如果MQ突然宕机,消息会不会丢?默认情况下会——因为RabbitMQ消息默认存在内存里,宕机后直接清空。这时候就得给MQ加“持久化Buff”。
要保证消息重启不丢,必须同时开启三个持久化:
创建交换机时指定durable: true:
@Bean
public DirectExchange orderExchange() {
// durable=true:交换机持久化,重启后不丢失
return new DirectExchange("order_exchange", true, false);
}
创建队列时同样指定durable: true:
@Bean
public Queue orderQueue() {
return QueueBuilder.durable("order_queue").build();
}
发送消息时设置MessageDeliveryMode.PERSISTENT:
rabbitTemplate.convertAndSend("order_exchange", "order_key",
orderDTO, message -> {
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
});
划重点:三个持久化缺一不可!只开消息持久化,队列宕机丢失了,消息也找不到。
就算开启了持久化,如果MQ节点挂了,还是会影响消息读取。这时候要配置镜像队列,把队列复制到多个节点,一个节点挂了,其他节点还能继续提供服务。
配置步骤:
mirror_queue_policy^order_.*(匹配所有以order开头的队列){"ha-mode":"all","ha-sync-mode":"automatic"}解释:ha-mode: all表示队列镜像到所有节点,ha-sync-mode: automatic表示自动同步队列内容。
如果MQ存了海量消息,全部放在内存里容易导致内存溢出,进而引发宕机丢消息。LazyQueue可以把消息直接存在磁盘里,内存只存少量活跃消息,避免OOM。
配置示例:
@Bean
public Queue lazyOrderQueue() {
return QueueBuilder.durable("lazy_order_queue")
.lazy() // 开启Lazy模式
.build();
}
注意:LazyQueue适合存储大量冷消息,热消息还是用普通队列,因为磁盘读取速度比内存慢。
【别让消息被重复消费或悄悄丢失】
消息到了消费者手里,也不是万事大吉——如果消费者还没处理完就宕机,自动ACK会让MQ以为消息已经处理完成,直接删除,导致消息丢失。
一定要关闭自动ACK,改用手动ACK,确保消费者真正处理完消息后再告诉MQ:“我搞定了,你删了吧”。
配置示例:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual # 手动ACK
concurrency: 3 # 消费者数量
max-concurrency: 10 # 最大消费者数量
手动ACK代码示例:
@RabbitListener(queues = "order_queue")
public void handleOrderMessage(Message message, Channel channel) throws IOException {
try {
// 处理业务逻辑:创建订单、扣减库存
OrderDTO orderDTO = JSON.parseObject(message.getBody(), OrderDTO.class);
orderService.createOrder(orderDTO);
// 手动确认消息:第二个参数false表示只确认当前消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
log.error("处理订单消息失败:", e);
// 消息处理失败,拒绝并重新入队:第三个参数true表示重新入队
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
避坑指南:
basicNack而非basicReject,因为basicNack可以批量拒绝消息就算做了所有防丢措施,还是可能出现重复消费的情况——比如消费者ACK后,MQ没收到,会重新发送消息。这时候必须保证业务接口的幂等性。
常用幂等性实现方案:
在订单表中,用order_no作为唯一键,重复插入时会触发主键冲突,直接忽略:
INSERT INTO orders(order_no, user_id, amount)
VALUES ('20240520123456', 1001, 99.9)
ON DUPLICATE KEY UPDATE status = status;
用Redis的SETNX命令,给每个消息生成唯一ID,处理前先判断是否已经处理过:
String messageId = message.getMessageProperties().getMessageId();
if (redisTemplate.opsForValue().setIfAbsent("order:process:" + messageId, "1", 1, TimeUnit.HOURS)) {
// 未处理过,执行业务逻辑
orderService.createOrder(orderDTO);
} else {
// 已经处理过,直接ACK
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
如果不想依赖Redis,也可以用本地Guava缓存,配合定时任务清理过期缓存:
private LoadingCache<String, Boolean> processCache = CacheBuilder.newBuilder()
.expireAfterWrite(1, TimeUnit.HOURS)
.build(new CacheLoader<String, Boolean>() {
@Override
public Boolean load(String key) throws Exception {
return false;
}
});
// 处理消息前判断
if (!processCache.get(messageId)) {
orderService.createOrder(orderDTO);
processCache.put(messageId, true);
}
【别让小细节毁了全局】
就算做了上面所有配置,还是可能有漏网之鱼,这些细节一定要注意:
RabbitMQ开启持久化后,会先把消息写入操作系统的PageCache,再异步刷到磁盘。如果这时候MQ宕机,PageCache里的消息会丢失。
解决方法:
fsync配置,强制消息写入磁盘后再确认,但会影响性能一定要给RabbitMQ加监控,比如用Prometheus+Grafana监控队列长度、消息确认率、消费者数量等指标,设置告警阈值,比如队列长度超过1000就发告警。
常用监控指标:
rabbitmq_queue_messages_ready:待处理消息数rabbitmq_queue_messages_unacknowledged:未ACK消息数rabbitmq_publisher_confirm_unconfirmed:未确认的生产者消息数