首页
学习
活动
专区
圈层
工具
发布

RabbitMQ 中消息什么时候会进入死信交换机?

线上看到死信队列突然涨,别急着骂消费者。

我一般先看两样东西:一个是x-death头,另一个是消费端有没有把requeue写成false。RabbitMQ 里的消息不是“消费失败就进死信”,这句话太粗,真按这个理解,排障时很容易绕半天。

RabbitMQ 官方文档里把消息进入死信交换机的触发点归成几类:消费端拒绝且不重新入队、消息 TTL 过期、队列长度超限、Quorum Queue 超过投递次数限制;另外,整个队列过期时,队列里的消息不会被死信转发。这个细节挺容易漏。

先看一个比较常见的配置。

@Configuration

publicclassPayMqConfig{

  publicstaticfinal String PAY_EXCHANGE = "pay.biz.ex";

  publicstaticfinal String PAY_QUEUE = "pay.notify.q";

  publicstaticfinal String DLX_EXCHANGE = "pay.dlx.ex";

  publicstaticfinal String DLX_QUEUE = "pay.dlx.q";

  @Bean

  DirectExchange payExchange(){

      returnnew DirectExchange(PAY_EXCHANGE, true, false);

  }

  @Bean

  DirectExchange payDlxExchange(){

      returnnew DirectExchange(DLX_EXCHANGE, true, false);

  }

  @Bean

  Queue payNotifyQueue(){

      return QueueBuilder.durable(PAY_QUEUE)

              .deadLetterExchange(DLX_EXCHANGE)

              .deadLetterRoutingKey("pay.dead")

              .ttl(30_000)

              .maxLength(50_000)

              .build();

  }

  @Bean

  Queue payDlxQueue(){

      return QueueBuilder.durable(DLX_QUEUE).build();

  }

  @Bean

  Binding payBinding(){

      return BindingBuilder.bind(payNotifyQueue())

              .to(payExchange())

              .with("pay.notify");

  }

  @Bean

  Binding payDlxBinding(){

      return BindingBuilder.bind(payDlxQueue())

              .to(payDlxExchange())

              .with("pay.dead");

  }

}

这段配置里,pay.notify.q才是真正的业务队列。它身上挂了两个东西:

x-dead-letter-exchange = pay.dlx.ex

x-dead-letter-routing-key = pay.dead

消息一旦被 RabbitMQ 判定为死信,就会从原队列重新投递到pay.dlx.ex,再根据pay.dead路由到pay.dlx.q。

这里要注意,死信交换机不是特殊交换机。它就是普通 exchange,只是被某个队列当成了“兜底出口”。

第一种情况,消费者明确拒绝,并且不让它回原队列。

这个是线上最常见的。比如支付回调消息来了,订单状态查不到,或者幂等校验发现数据对不上。这个时候我不太建议无脑重试,尤其是参数本身就是脏的,重试十次也没用。

@Component

publicclassPayNotifyConsumer{

  @RabbitListener(queues = PayMqConfig.PAY_QUEUE, ackMode = "MANUAL")

  publicvoidonMessage(Message message, Channel channel)throws IOException {

      long tag = message.getMessageProperties().getDeliveryTag();

      String body = new String(message.getBody(), StandardCharsets.UTF_8);

      try {

          PayNotifyEvent event = JsonUtil.read(body, PayNotifyEvent.class);

          if (event.getOrderNo() == null || event.getPayNo() == null) {

              log.warn("pay notify invalid body={}", body);

              channel.basicNack(tag, false, false);

              return;

          }

          payNotifyService.confirm(event);

          channel.basicAck(tag, false);

      } catch (TemporaryPayException e) {

          log.warn("pay notify temporary error, requeue body={}", body, e);

          channel.basicNack(tag, false, true);

      } catch (Exception e) {

          log.error("pay notify consume failed, send to dlx body={}", body, e);

          channel.basicNack(tag, false, false);

      }

  }

}

关键就在最后一个参数。

basicNack(tag, false, true)   -> 回原队列

basicNack(tag, false, false)  -> 进死信交换机

basicReject也是一样,只要requeue=false,并且队列配置了 DLX,这条消息就会被转走。

我见过有人把所有异常都写成requeue=true,结果一条坏消息在队列里反复投递,消费者日志刷得飞快,业务一条没动。这个不叫可靠,这叫自转。

第二种情况,消息过期。

TTL 有两种写法,一种是队列级别,一种是消息级别。

队列级别就是刚才配置里的:

.ttl(30_000)

意思是消息在这个队列里待超过 30 秒,还没被消费,就会过期。队列上配置了 DLX,它就会进死信交换机。

消息级别一般在发送时设置:

publicvoidsendPayTimeoutCheck(String orderNo){

  MessageProperties props = new MessageProperties();

  props.setContentType(MessageProperties.CONTENT_TYPE_JSON);

  props.setExpiration("60000");

  props.setMessageId("pay-check-" + orderNo);

  byte[] body = JsonUtil.writeBytes(Map.of("orderNo", orderNo));

  rabbitTemplate.send(

          PayMqConfig.PAY_EXCHANGE,

          "pay.notify",

          new Message(body, props)

  );

}

这里的expiration("60000")是字符串,单位毫秒。这个地方我以前也嫌弃过,明明是数字,偏要传字符串,写错了还不一定第一眼看出来。

TTL 有个坑:不是你设置了 60 秒,消息就一定在第 60 秒准时进死信队列。它和队列类型、消息位置都有关系。排延迟问题时,别拿秒表卡 RabbitMQ。

第三种情况,队列满了。

比如配置了:

.maxLength(50_000)

当队列里消息数量超过限制,RabbitMQ 会把超出的消息处理掉。如果这个队列配置了死信交换机,被挤出去的消息就会进入 DLX。

这个场景我一般会先看监控。

queue=pay.notify.q

messages_ready=50000

messages_unacked=12

publish_rate=1800/s

deliver_rate=200/s

这种不是死信队列的问题,是消费者扛不住了。你盯着死信队列消费脚本改半天没用,前面业务队列已经堵死了。

第四种情况,Quorum Queue 的投递次数超过限制。

普通经典队列里,RabbitMQ 不会天然帮你数“这条消息失败了几次就扔死信”。很多项目是自己在 header 里塞 retry 次数,或者靠 TTL + DLX 做延迟重试。

但 Quorum Queue 支持 delivery-limit。超过限制后,消息会被死信处理。

类似这样:

@Bean

Queue payQuorumQueue(){

  Map<String, Object> args = new HashMap<>();

  args.put("x-queue-type", "quorum");

  args.put("x-delivery-limit", 5);

  args.put("x-dead-letter-exchange", PayMqConfig.DLX_EXCHANGE);

  args.put("x-dead-letter-routing-key", "pay.dead");

  returnnew Queue("pay.quorum.q", true, false, false, args);

}

这种更适合那些“最多试几次,不行就人工处理”的消息。别让一条毒消息一直卡在消费链路里。

死信到了队列里,我一般不会马上补偿,先把头信息打出来。

@RabbitListener(queues = PayMqConfig.DLX_QUEUE)

publicvoidonDeadMessage(Message message){

  MessageProperties props = message.getMessageProperties();

  log.warn("dead message received, msgId={}, routingKey={}, xDeath={}, body={}",

          props.getMessageId(),

          props.getReceivedRoutingKey(),

          props.getHeaders().get("x-death"),

          new String(message.getBody(), StandardCharsets.UTF_8));

}

x-death里通常能看到原因,比如:

reason=rejected

reason=expired

reason=maxlen

这个比看代码猜强多了。

最后再补一句,下面这些情况不会自动进死信:

消费者 ack 成功了,不会进死信

basicNack requeue=true,不会进死信

消息发到 exchange 但没有路由到队列,不等于队列死信

整个队列过期删除,里面的消息不会逐条进死信

RabbitMQ 的死信机制不是垃圾桶,更像事故现场的暂存区。

配置 DLX 只是第一步,真正要看的还是这条消息为什么死:是业务参数脏了,是消费者异常了,是 TTL 到了,还是队列被打满了。

别上来就写个定时任务把死信队列重新塞回业务队列。那种操作,看着像补偿,实际上很可能只是把事故重新播放一遍。

  • 发表于:
  • 原文链接https://page.om.qq.com/page/O2UdoTr_gjiKKjO33H5h0Ymg0
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。
领券