
最近发的每篇教程都讲清楚了概念,也讲清楚了在咱们项目中是如何实现和落地的。
延迟队列是一种特殊的消息队列,它允许消息在发送后的一定时间延迟后才被消费。在电商系统中,延迟队列常用于处理订单超时自动取消、优惠券到期提醒、定时任务调度等场景。
死信队列(Dead Letter Queue,DLQ)是用于存储无法被正常消费的消息的队列。当消息满足以下任一条件时,会被发送到死信队列:
在RabbitMQ中,实现延迟队列主要有两种方式:
本项目采用的是第二种方式,通过安装和配置 RabbitMQ Delayed Message Exchange 插件来实现延迟队列功能。
在电商系统中,订单创建后通常需要用户在一定时间内完成支付,否则订单应该被自动取消。处理这种场景有几种常见方案:
方案 | 优点 | 缺点 |
|---|---|---|
定时任务轮询 | 实现简单 | 1. 时间精度低2. 对数据库压力大3. 资源浪费 |
Redis过期监听 | 性能好 | 1. 需要额外的Redis集群2. 实现复杂度高3. 存在消息丢失风险 |
延迟队列 | 1. 时间精度高2. 解耦系统3. 高可靠 | 1. 需要引入消息队列2. 额外维护成本 |
RabbitMQ Delayed Message Exchange 插件是一个官方维护的插件,它提供了一个延迟交换机类型 x-delayed-message,允许消息根据指定的延迟时间进行投递。
从项目结构可以看到,插件已经放置在 rabbitmq/plugins 目录下:
rabbitmq/
└── plugins/
└── rabbitmq_delayed_message_exchange-4.1.0.ez
在Docker环境中,通常需要在 docker-compose.yml 中配置启用该插件。
项目中实现延迟队列处理订单超时主要包含以下几个核心组件:
项目在 utility/rabbitmq/rabbitmq.go 中封装了RabbitMQ客户端,提供了连接管理、消息发布、消费等功能。
// 关键方法:PublishWithDelay 发布延迟消息
func (r *RabbitMQ) PublishWithDelay(exchange, routingKey string, message interface{}, delayMs int) error {
body, err := json.Marshal(message)
if err != nil {
return err
}
return r.channel.Publish(
exchange,
routingKey,
false,
false,
amqp.Publishing{
ContentType: "application/json",
Body: body,
Headers: amqp.Table{
"x-delay": delayMs, // 延迟时间,单位毫秒
},
DeliveryMode: amqp.Persistent, // 持久化消息
},
)
}
特别注意:
Headers: amqp.Table{"x-delay": delayMs} 设置延迟时间DeliveryMode: amqp.Persistent 确保消息持久化,防止服务重启导致消息丢失// DeclareExchange 声明交换机
func (r *RabbitMQ) DeclareExchange(name, kind string) error {
args := amqp.Table{}
// 如果是延迟交换机,需要设置特殊参数
if kind == "x-delayed-message" {
args["x-delayed-type"] = "direct" // 指定延迟交换机的底层类型
}
return r.channel.ExchangeDeclare(
name,
kind,
true, // durable
false, // autoDelete
false, // internal
false, // noWait
args, // arguments
)
}
延迟交换机需要指定 kind 为 x-delayed-message,并在 args 中设置 x-delayed-type 参数。
// 订单超时事件定义
type OrderTimeoutEvent struct {
OrderId int `json:"order_id"`
Type string `json:"type"`
TimeStamp string `json:"timestamp"`
}
// 事件类型常量
const (
OrderTimeout = "order_timeout"
)
当用户创建订单时,系统会发布一个延迟消息,设置一定的延迟时间(如30分钟):
// PublishOrderTimeoutEvent 发布订单超时事件
func PublishOrderTimeoutEvent(orderId int, delayMs int) {
ctx := context.Background()
// 初始化RabbitMQ连接
rb, err := NewRabbitMQ(ctx)
if err != nil {
g.Log().Errorf(ctx, "Failed to connect to RabbitMQ: %v", err)
return
}
defer rb.Close()
// 声明延迟交换机
exchange := g.Cfg().MustGet(ctx, "rabbitmq.exchange.orderDelayExchange").String()
err = rb.DeclareExchange(exchange, "x-delayed-message")
if err != nil {
g.Log().Errorf(ctx, "Failed to declare delay exchange: %v", err)
return
}
// 创建事件
event := OrderTimeoutEvent{
OrderId: orderId,
Type: OrderTimeout,
TimeStamp: time.Now().Format(time.RFC3339),
}
// 发布延迟事件
routingKey := g.Cfg().MustGet(ctx, "rabbitmq.routingKey.orderTimeout").String()
err = rb.PublishWithDelay(exchange, routingKey, event, delayMs)
if err != nil {
g.Log().Errorf(ctx, "Failed to publish orderTimeout event: %v", err)
} else {
g.Log().Infof(ctx, "Published orderTimeout event with %d ms delay: %+v", delayMs, event)
}
}
订单超时消费者负责接收和处理超时消息:
// OrderTimeoutConsumer 订单超时未支付消费者
type OrderTimeoutConsumer struct {
*rabbitmq.BaseConsumer
}
// NewOrderTimeoutConsumer 创建订单超时未支付消费者
func NewOrderTimeoutConsumer(ctx context.Context) *OrderTimeoutConsumer {
config := rabbitmq.ConsumerConfig{
Exchange: g.Cfg().MustGet(ctx, "rabbitmq.exchange.orderDelayExchange").String(),
ExchangeType: "x-delayed-message",
Queue: g.Cfg().MustGet(ctx, "rabbitmq.queue.orderTimeoutQueue").String(),
RoutingKey: g.Cfg().MustGet(ctx, "rabbitmq.routingKey.orderTimeout").String(),
ConsumerTag: "order_service_order_timeout",
AutoAck: false,
PrefetchCount: 1,
Durable: true,
}
return &OrderTimeoutConsumer{
BaseConsumer: rabbitmq.NewBaseConsumer("OrderTimeoutConsumer", config),
}
}
// HandleMessage 处理订单超时未支付消息
func (c *OrderTimeoutConsumer) HandleMessage(ctx context.Context, msg amqp.Delivery) error {
var event rabbitmq.OrderTimeoutEvent
err := rabbitmq.UnmarshalEvent(msg.Body, &event)
if err != nil {
g.Log().Errorf(ctx, "解析订单超时未支付结果事件失败: %v", err)
return err
}
g.Log().Infof(ctx, "收到订单超时未支付事件: %+v", event)
if event.Type != rabbitmq.OrderTimeout {
g.Log().Errorf(ctx, "不是订单超时未支付的事件,event.Type:%s", event.Type)
return gerror.WrapCode(gcode.CodeInvalidParameter, fmt.Errorf("不是订单超时未支付的事件,event.Type:%s", event.Type))
}
eventTime, err := time.Parse(time.RFC3339, event.TimeStamp)
if err != nil {
return fmt.Errorf("解析事件时间戳失败: %v", err)
}
// 判断是否过期:事件时间 + 30s < 当前时间
expireTime := g.Cfg().MustGet(ctx, "business.orderTimeout").String()
expireMs, err := strconv.Atoi(expireTime)
if err != nil {
return fmt.Errorf("订单超时时间配置无效: %v", err)
}
expireDuration := time.Duration(expireMs) * time.Millisecond
if time.Now().Before(eventTime.Add(expireDuration)) {
g.Log().Infof(ctx, "订单未到取消时间,跳过处理: order_id=%d, event_time=%s", event.OrderId, event.TimeStamp)
return nil
}
// 调用订单超时未支付处理逻辑
err = order_info.HandleOrderTimeoutResult(ctx, event.OrderId)
if err != nil {
g.Log().Errorf(ctx, "处理订单 %d 的超时未支付失败: %v", event.OrderId, err)
return err
}
g.Log().Infof(ctx, "成功处理订单 %d 的超时未支付事件", event.OrderId)
// 取消库存
eventReq, err := order_info.GetOrderDetail(ctx, event.OrderId)
if err != nil {
g.Log().Errorf(ctx, "获取订单 %v 对应的商品信息失败,err: %v", event.OrderId, err)
return err
}
go rabbitmq.PublishReturnStockEvent(event.OrderId, eventReq)
return nil
}
消费者的主要职责:
// HandleOrderTimeoutResult 处理订单超时结果
func HandleOrderTimeoutResult(ctx context.Context, orderId int) error {
// 更新字段
updateData := g.Map{
"status": consts.OrderStatusCancelled,
"updated_at": gtime.Now(), // 可选:更新时间戳
}
// 更新订单状态
result, err := dao.OrderInfo.Ctx(ctx).Where("id=? AND status=?", orderId, consts.OrderStatusPendingPayment).Update(updateData)
if err != nil {
return gerror.WrapCode(gcode.CodeDbOperationError, err)
}
row, _ := result.RowsAffected()
if row == 0 {
g.Log().Infof(ctx, "订单已取消,无需再取消, orderId=%d", orderId)
return nil
}
g.Log().Infof(ctx, "订单状态更新成功, 订单编号:{%s}, 新状态: %d", orderId, consts.OrderStatusPendingPayment)
return nil
}
这个函数的主要逻辑:
WHERE id=? AND status=? 条件进行乐观锁更新,确保只更新待支付状态的订单┌───────────────┐ ┌────────────────────┐ ┌──────────────────────┐
│ 创建订单 │ ──> │ 发布延迟消息 │ ──> │ 延迟交换机存储 │
└───────────────┘ └────────────────────┘ └──────────┬─────────┘
│ 延迟时间到
▼
┌───────────────────────┐ ┌───────────────────────┐ ┌─────────────────┐
│ 返还商品库存 │ <─── │ 更新订单状态为已取消 │ <─── │ 消费超时消息 │
└───────────────────────┘ └───────────────────────┘ └─────────────────┘
PublishOrderTimeoutEvent 方法,发布一个延迟消息,延迟时间通常设置为订单超时时间(如30分钟)OrderTimeoutConsumer 从队列中获取消息HandleOrderTimeoutResult 更新订单状态为"已取消"PublishReturnStockEvent 发布库存返还事件除了订单超时处理,延迟队列还可以用于以下场景:
通过本实战案例,相信大家已经掌握了如何使用RabbitMQ延迟队列来处理订单超时问题,以及相关的最佳实践和优化方向。