首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >SpringBoot 整合 RabbitMQ 的完美实践

SpringBoot 整合 RabbitMQ 的完美实践

作者头像
果酱带你啃java
发布2026-04-14 12:44:48
发布2026-04-14 12:44:48
370
举报

为什么 RabbitMQ 是消息中间件的优选?

在分布式系统架构中,消息中间件扮演着 "交通枢纽" 的角色,负责协调各个服务之间的通信。目前主流的消息中间件有 RabbitMQ、Kafka 和 RocketMQ,它们各具特色:

  • Kafka:高吞吐量,适合大数据日志处理,但消息可靠性和灵活性较弱
  • RocketMQ:阿里开源,兼顾吞吐量和可靠性,适合复杂业务场景
  • RabbitMQ:基于 AMQP 协议,灵活性高,插件丰富,社区活跃,学习曲线友好

根据 RabbitMQ 官方数据,它在全球财富 500 强公司中被广泛采用,能轻松处理每秒数万条消息,且提供了近乎完美的消息可靠性保证。其独特的交换机模型和灵活的路由规则,使其成为业务复杂多变场景的理想选择。

本文将带你从零开始,全面掌握 SpringBoot 与 RabbitMQ 的整合方案,从基础配置到高级特性,从代码实现到性能调优,让你既能理解底层原理,又能解决实际开发中的各种问题。

一、RabbitMQ 核心概念与架构

1.1 核心概念解析

RabbitMQ 基于 AMQP(Advanced Message Queuing Protocol)协议实现,核心概念包括:

  • Producer消息生产者,负责发送消息到 RabbitMQ 服务器
  • Consumer消息消费者,负责从 RabbitMQ 服务器接收并处理消息
  • BrokerRabbitMQ 服务器实例,负责消息的存储和转发
  • Exchange交换机,接收生产者发送的消息,并根据路由规则将消息路由到队列
  • Queue消息队列,存储消息直到被消费者消费
  • Binding绑定,定义交换机和队列之间的关联关系,包含路由规则
  • Routing Key路由键,生产者发送消息时指定,用于交换机路由消息
  • Virtual Host虚拟主机,提供资源隔离,不同虚拟主机之间的资源相互独立

1.2 交换机类型

RabbitMQ 提供了四种主要的交换机类型,适用于不同的路由场景:

  1. Direct Exchange直接交换机,根据路由键精确匹配进行路由
  2. Topic Exchange主题交换机,支持通配符匹配路由键(*匹配一个单词,#匹配多个单词)
  3. Fanout Exchange扇形交换机,忽略路由键,将消息广播到所有绑定的队列
  4. Headers Exchange头交换机,根据消息头信息而不是路由键进行路由

1.3 架构原理

RabbitMQ 的整体架构如图所示:

消息流转流程:

  1. 生产者将消息发送到交换机,并指定路由键
  2. 交换机根据自身类型和绑定规则,将消息路由到一个或多个队列
  3. 消费者从队列中获取消息并处理
  4. 消息被消费后,默认从队列中删除

根据 RabbitMQ 官方文档(https://www.rabbitmq.com/documentation.html),这种架构设计使得 RabbitMQ 具有极高的灵活性,可以通过不同的交换机和绑定组合,实现复杂的消息路由策略。

二、环境搭建

2.1 安装 RabbitMQ

我们采用最新稳定版 RabbitMQ 3.13.0 进行安装,步骤如下:

  1. 安装 Erlang(RabbitMQ 依赖 Erlang 环境):
代码语言:javascript
复制
# 对于Ubuntu/Debian
sudo apt-get update
sudo apt-get install erlang

# 对于CentOS/RHEL
sudo yum install erlang
  1. 安装 RabbitMQ:
代码语言:javascript
复制
# 对于Ubuntu/Debian
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.13.0/rabbitmq-server_3.13.0-1_all.deb
sudo dpkg -i rabbitmq-server_3.13.0-1_all.deb

# 对于CentOS/RHEL
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.13.0/rabbitmq-server-3.13.0-1.el8.noarch.rpm
sudo rpm -ivh rabbitmq-server-3.13.0-1.el8.noarch.rpm
  1. 启动 RabbitMQ 服务:
代码语言:javascript
复制
sudo systemctl start rabbitmq-server
sudo systemctl enable rabbitmq-server
  1. 启用管理插件:
代码语言:javascript
复制
sudo rabbitmq-plugins enable rabbitmq_management
  1. 创建管理员用户:
代码语言:javascript
复制
sudo rabbitmqctl add_user admin password
sudo rabbitmqctl set_user_tags admin administrator
sudo rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
  1. 访问管理界面: 打开浏览器访问http://localhost:15672,使用创建的 admin 用户登录。

2.2 安装 Docker 方式(推荐)

使用 Docker 安装 RabbitMQ 更加简单快捷:

代码语言:javascript
复制
# 拉取镜像
docker pull rabbitmq:3.13.0-management

# 启动容器
docker run -d --name rabbitmq \
  -p 5672:5672 \
  -p 15672:15672 \
  -e RABBITMQ_DEFAULT_USER=admin \
  -e RABBITMQ_DEFAULT_PASS=password \
  rabbitmq:3.13.0-management
代码语言:javascript
复制

三、SpringBoot 集成 RabbitMQ 基础

3.1 创建项目并添加依赖

我们使用 SpringBoot 3.2.0(最新稳定版)来创建项目,首先在 pom.xml 中添加必要的依赖:

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.2.0</version>
        <relativePath/>
    </parent>

    <groupId>com.jam</groupId>
    <artifactId>springboot-rabbitmq-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>springboot-rabbitmq-demo</name>
    <description>SpringBoot集成RabbitMQ示例项目</description>

    <properties>
        <java.version>17</java.version>
        <lombok.version>1.18.30</lombok.version>
        <commons-lang3.version>3.14.0</commons-lang3.version>
        <mybatis-plus.version>3.5.5</mybatis-plus.version>
        <mysql-connector.version>8.2.0</mysql-connector.version>
        <springdoc.version>2.1.0</springdoc.version>
    </properties>

    <dependencies>
        <!-- SpringBoot核心依赖 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- RabbitMQ依赖 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <!-- Lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- 工具类 -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>${commons-lang3.version}</version>
        </dependency>

        <!-- MyBatis-Plus -->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>${mybatis-plus.version}</version>
        </dependency>

        <!-- MySQL驱动 -->
        <dependency>
            <groupId>com.mysql</groupId>
            <artifactId>mysql-connector-j</artifactId>
            <version>${mysql-connector.version}</version>
            <scope>runtime</scope>
        </dependency>

        <!-- Swagger3 -->
        <dependency>
            <groupId>org.springdoc</groupId>
            <artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
            <version>${springdoc.version}</version>
        </dependency>

        <!-- 测试依赖 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
代码语言:javascript
复制

3.2 配置 RabbitMQ

在 application.yml 中添加 RabbitMQ 的配置:

代码语言:javascript
复制
spring:
  application:
    name: springboot-rabbitmq-demo
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/rabbitmq_demo?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai
    username: root
    password: root
  rabbitmq:
    host: localhost
    port: 5672
    username: admin
    password: password
    virtual-host: /
    # 连接超时时间,单位毫秒
    connection-timeout: 10000
    # 生产者配置
    publisher-confirm-type: correlated # 开启发布确认机制
    publisher-returns: true # 开启发布返回机制
    # 消费者配置
    listener:
      simple:
        # 并发消费者数量
        concurrency: 5
        # 最大并发消费者数量
        max-concurrency: 10
        # 每次从队列中拉取的消息数量
        prefetch: 1
        # 消息确认模式:manual-手动确认,auto-自动确认
        acknowledge-mode: manual
        # 消费失败时是否重试
        retry:
          enabled: true
          # 初始重试间隔时间
          initial-interval: 1000
          # 重试最大间隔时间
          max-interval: 10000
          # 重试乘数
          multiplier: 2
          # 最大重试次数
          max-attempts: 3

mybatis-plus:
  mapper-locations: classpath:mapper/*.xml
  type-aliases-package: com.jam.entity
  configuration:
    map-underscore-to-camel-case: true
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl

springdoc:
  api-docs:
    path: /api-docs
  swagger-ui:
    path: /swagger-ui.html
    operationsSorter: method

server:
  port: 8081
代码语言:javascript
复制

3.3 创建 RabbitMQ 配置类

创建配置类,定义交换机、队列和绑定关系:

代码语言:javascript
复制
package com.jam.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ配置类
 * 定义交换机、队列和绑定关系
 *
 * @author 果酱
 */
@Configuration
public class RabbitMQConfig {
    /**
     * 直接交换机名称
     */
    public static final String DIRECT_EXCHANGE = "direct_exchange";

    /**
     * 主题交换机名称
     */
    public static final String TOPIC_EXCHANGE = "topic_exchange";

    /**
     * 扇形交换机名称
     */
    public static final String FANOUT_EXCHANGE = "fanout_exchange";

    /**
     * 头交换机名称
     */
    public static final String HEADERS_EXCHANGE = "headers_exchange";

    /**
     * 直接队列1名称
     */
    public static final String DIRECT_QUEUE_1 = "direct_queue_1";

    /**
     * 直接队列2名称
     */
    public static final String DIRECT_QUEUE_2 = "direct_queue_2";

    /**
     * 主题队列1名称
     */
    public static final String TOPIC_QUEUE_1 = "topic_queue_1";

    /**
     * 主题队列2名称
     */
    public static final String TOPIC_QUEUE_2 = "topic_queue_2";

    /**
     * 扇形队列1名称
     */
    public static final String FANOUT_QUEUE_1 = "fanout_queue_1";

    /**
     * 扇形队列2名称
     */
    public static final String FANOUT_QUEUE_2 = "fanout_queue_2";

    /**
     * 头队列名称
     */
    public static final String HEADERS_QUEUE = "headers_queue";

    /**
     * 死信交换机名称
     */
    public static final String DEAD_LETTER_EXCHANGE = "dead_letter_exchange";

    /**
     * 死信队列名称
     */
    public static final String DEAD_LETTER_QUEUE = "dead_letter_queue";

    /**
     * 延迟队列名称
     */
    public static final String DELAY_QUEUE = "delay_queue";

    // ==================== 交换机 ====================

    /**
     * 创建直接交换机
     *
     * @return 直接交换机
     */
    @Bean
    public DirectExchange directExchange() {
        // durable: 是否持久化
        // autoDelete: 是否自动删除(当没有绑定关系时)
        // arguments: 交换机的其他属性
        return new DirectExchange(DIRECT_EXCHANGE, true, false);
    }

    /**
     * 创建主题交换机
     *
     * @return 主题交换机
     */
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(TOPIC_EXCHANGE, true, false);
    }

    /**
     * 创建扇形交换机
     *
     * @return 扇形交换机
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(FANOUT_EXCHANGE, true, false);
    }

    /**
     * 创建头交换机
     *
     * @return 头交换机
     */
    @Bean
    public HeadersExchange headersExchange() {
        return new HeadersExchange(HEADERS_EXCHANGE, true, false);
    }

    /**
     * 创建死信交换机
     *
     * @return 死信交换机
     */
    @Bean
    public DirectExchange deadLetterExchange() {
        return new DirectExchange(DEAD_LETTER_EXCHANGE, true, false);
    }

    // ==================== 队列 ====================

    /**
     * 创建直接队列1
     *
     * @return 直接队列1
     */
    @Bean
    public Queue directQueue1() {
        // durable: 是否持久化
        // exclusive: 是否排他(仅当前连接可见,连接关闭后删除)
        // autoDelete: 是否自动删除(当没有消费者时)
        // arguments: 队列的其他属性
        return QueueBuilder.durable(DIRECT_QUEUE_1)
                .build();
    }

    /**
     * 创建直接队列2
     *
     * @return 直接队列2
     */
    @Bean
    public Queue directQueue2() {
        return QueueBuilder.durable(DIRECT_QUEUE_2)
                .build();
    }

    /**
     * 创建主题队列1
     *
     * @return 主题队列1
     */
    @Bean
    public Queue topicQueue1() {
        return QueueBuilder.durable(TOPIC_QUEUE_1)
                .build();
    }

    /**
     * 创建主题队列2
     *
     * @return 主题队列2
     */
    @Bean
    public Queue topicQueue2() {
        return QueueBuilder.durable(TOPIC_QUEUE_2)
                .build();
    }

    /**
     * 创建扇形队列1
     *
     * @return 扇形队列1
     */
    @Bean
    public Queue fanoutQueue1() {
        return QueueBuilder.durable(FANOUT_QUEUE_1)
                .build();
    }

    /**
     * 创建扇形队列2
     *
     * @return 扇形队列2
     */
    @Bean
    public Queue fanoutQueue2() {
        return QueueBuilder.durable(FANOUT_QUEUE_2)
                .build();
    }

    /**
     * 创建头队列
     *
     * @return 头队列
     */
    @Bean
    public Queue headersQueue() {
        return QueueBuilder.durable(HEADERS_QUEUE)
                .build();
    }

    /**
     * 创建死信队列
     *
     * @return 死信队列
     */
    @Bean
    public Queue deadLetterQueue() {
        return QueueBuilder.durable(DEAD_LETTER_QUEUE)
                .build();
    }

    /**
     * 创建延迟队列
     * 设置死信交换机和死信路由键
     *
     * @return 延迟队列
     */
    @Bean
    public Queue delayQueue() {
        return QueueBuilder.durable(DELAY_QUEUE)
                // 设置死信交换机
                .withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE)
                // 设置死信路由键
                .withArgument("x-dead-letter-routing-key", "dead.letter.key")
                .build();
    }

    // ==================== 绑定 ====================

    /**
     * 绑定直接队列1到直接交换机
     *
     * @return 绑定关系
     */
    @Bean
    public Binding directBinding1() {
        // 将directQueue1绑定到directExchange,路由键为"direct.key1"
        return BindingBuilder.bind(directQueue1())
                .to(directExchange())
                .with("direct.key1");
    }

    /**
     * 绑定直接队列2到直接交换机
     *
     * @return 绑定关系
     */
    @Bean
    public Binding directBinding2() {
        // 将directQueue2绑定到directExchange,路由键为"direct.key2"
        return BindingBuilder.bind(directQueue2())
                .to(directExchange())
                .with("direct.key2");
    }

    /**
     * 绑定主题队列1到主题交换机
     *
     * @return 绑定关系
     */
    @Bean
    public Binding topicBinding1() {
        // 将topicQueue1绑定到topicExchange,路由键模式为"topic.key.*"
        return BindingBuilder.bind(topicQueue1())
                .to(topicExchange())
                .with("topic.key.*");
    }

    /**
     * 绑定主题队列2到主题交换机
     *
     * @return 绑定关系
     */
    @Bean
    public Binding topicBinding2() {
        // 将topicQueue2绑定到topicExchange,路由键模式为"topic.#"
        return BindingBuilder.bind(topicQueue2())
                .to(topicExchange())
                .with("topic.#");
    }

    /**
     * 绑定扇形队列1到扇形交换机
     *
     * @return 绑定关系
     */
    @Bean
    public Binding fanoutBinding1() {
        // 扇形交换机忽略路由键,只需绑定即可
        return BindingBuilder.bind(fanoutQueue1())
                .to(fanoutExchange());
    }

    /**
     * 绑定扇形队列2到扇形交换机
     *
     * @return 绑定关系
     */
    @Bean
    public Binding fanoutBinding2() {
        return BindingBuilder.bind(fanoutQueue2())
                .to(fanoutExchange());
    }

    /**
     * 绑定头队列到头交换机
     *
     * @return 绑定关系
     */
    @Bean
    public Binding headersBinding() {
        // 头交换机根据消息头信息进行路由
        // 这里设置需要匹配的头信息:type=message和priority=high
        // whereAll()表示所有头信息都需要匹配
        // whereAny()表示任何一个头信息匹配即可
        return BindingBuilder.bind(headersQueue())
                .to(headersExchange())
                .where("type").matches("message")
                .and("priority").matches("high");
    }

    /**
     * 绑定死信队列到死信交换机
     *
     * @return 绑定关系
     */
    @Bean
    public Binding deadLetterBinding() {
        return BindingBuilder.bind(deadLetterQueue())
                .to(deadLetterExchange())
                .with("dead.letter.key");
    }
}
代码语言:javascript
复制

3.4 创建消息实体类

创建一个通用的消息实体类,用于封装发送的消息内容:

代码语言:javascript
复制
package com.jam.entity;

import lombok.Data;
import java.io.Serializable;
import java.time.LocalDateTime;

/**
 * 消息实体类
 * 用于封装发送到RabbitMQ的消息内容
 *
 * @author 果酱
 */
@Data
public class MessageEntity implements Serializable {
    /**
     * 消息ID
     */
    private String messageId;

    /**
     * 消息内容
     */
    private String content;

    /**
     * 业务类型
     */
    private String businessType;

    /**
     * 创建时间
     */
    private LocalDateTime createTime;

    /**
     * 扩展字段,用于存储额外信息
     */
    private String extra;
}
代码语言:javascript
复制

3.5 创建消息生产者

使用 Spring AMQP 提供的 RabbitTemplate 来发送消息,创建一个消息生产者服务:

代码语言:javascript
复制
package com.jam.service;

import com.jam.config.RabbitMQConfig;
import com.jam.entity.MessageEntity;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * 消息生产者服务
 * 负责向RabbitMQ发送各种类型的消息
 *
 * @author 果酱
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class MessageProducerService {
    /**
     * RabbitMQ模板类,提供发送消息的各种方法
     */
    private final RabbitTemplate rabbitTemplate;

    /**
     * 初始化RabbitTemplate的回调函数
     */
    public void initRabbitTemplate() {
        // 设置确认回调:确认消息是否到达交换机
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            String messageId = correlationData != null ? correlationData.getId() : "unknown";
            if (ack) {
                log.info("消息已到达交换机,消息ID:{}", messageId);
            } else {
                log.error("消息未到达交换机,消息ID:{},原因:{}", messageId, cause);
                // 消息发送失败,可以在这里进行重试或记录到数据库
            }
        });

        // 设置返回回调:当消息到达交换机但无法路由到队列时触发
        rabbitTemplate.setReturnsCallback(returnedMessage -> {
            log.error("消息无法路由到队列,消息ID:{},交换机:{},路由键:{},回复码:{},回复文本:{}",
                    returnedMessage.getMessage().getMessageProperties().getMessageId(),
                    returnedMessage.getExchange(),
                    returnedMessage.getRoutingKey(),
                    returnedMessage.getReplyCode(),
                    returnedMessage.getReplyText());
        });
    }

    /**
     * 发送直接交换机消息
     *
     * @param routingKey 路由键
     * @param content 消息内容
     * @param businessType 业务类型
     * @param extra 额外信息
     */
    public void sendDirectMessage(String routingKey, String content, 
                                 String businessType, String extra) {
        // 参数校验
        StringUtils.hasText(routingKey, "路由键不能为空");
        StringUtils.hasText(content, "消息内容不能为空");

        // 创建消息实体
        MessageEntity messageEntity = createMessageEntity(content, businessType, extra);

        // 创建关联数据,用于确认回调
        CorrelationData correlationData = new CorrelationData(messageEntity.getMessageId());

        log.info("发送直接消息,交换机:{},路由键:{},消息ID:{}",
                RabbitMQConfig.DIRECT_EXCHANGE, routingKey, messageEntity.getMessageId());

        // 发送消息
        rabbitTemplate.convertAndSend(
                RabbitMQConfig.DIRECT_EXCHANGE,
                routingKey,
                messageEntity,
                correlationData);
    }

    /**
     * 发送主题交换机消息
     *
     * @param routingKey 路由键
     * @param content 消息内容
     * @param businessType 业务类型
     * @param extra 额外信息
     */
    public void sendTopicMessage(String routingKey, String content, 
                                String businessType, String extra) {
        // 参数校验
        StringUtils.hasText(routingKey, "路由键不能为空");
        StringUtils.hasText(content, "消息内容不能为空");

        // 创建消息实体
        MessageEntity messageEntity = createMessageEntity(content, businessType, extra);

        // 创建关联数据,用于确认回调
        CorrelationData correlationData = new CorrelationData(messageEntity.getMessageId());

        log.info("发送主题消息,交换机:{},路由键:{},消息ID:{}",
                RabbitMQConfig.TOPIC_EXCHANGE, routingKey, messageEntity.getMessageId());

        // 发送消息
        rabbitTemplate.convertAndSend(
                RabbitMQConfig.TOPIC_EXCHANGE,
                routingKey,
                messageEntity,
                correlationData);
    }

    /**
     * 发送扇形交换机消息
     *
     * @param content 消息内容
     * @param businessType 业务类型
     * @param extra 额外信息
     */
    public void sendFanoutMessage(String content, String businessType, String extra) {
        // 参数校验
        StringUtils.hasText(content, "消息内容不能为空");

        // 创建消息实体
        MessageEntity messageEntity = createMessageEntity(content, businessType, extra);

        // 创建关联数据,用于确认回调
        CorrelationData correlationData = new CorrelationData(messageEntity.getMessageId());

        log.info("发送扇形消息,交换机:{},消息ID:{}",
                RabbitMQConfig.FANOUT_EXCHANGE, messageEntity.getMessageId());

        // 扇形交换机忽略路由键,这里可以传null
        rabbitTemplate.convertAndSend(
                RabbitMQConfig.FANOUT_EXCHANGE,
                null,
                messageEntity,
                correlationData);
    }

    /**
     * 发送头交换机消息
     *
     * @param content 消息内容
     * @param businessType 业务类型
     * @param extra 额外信息
     */
    public void sendHeadersMessage(String content, String businessType, String extra) {
        // 参数校验
        StringUtils.hasText(content, "消息内容不能为空");

        // 创建消息实体
        MessageEntity messageEntity = createMessageEntity(content, businessType, extra);

        // 创建关联数据,用于确认回调
        CorrelationData correlationData = new CorrelationData(messageEntity.getMessageId());

        // 设置消息头信息,用于头交换机路由
        MessagePostProcessor messagePostProcessor = message -> {
            message.getMessageProperties().setHeader("type", "message");
            message.getMessageProperties().setHeader("priority", "high");
            return message;
        };

        log.info("发送头消息,交换机:{},消息ID:{}",
                RabbitMQConfig.HEADERS_EXCHANGE, messageEntity.getMessageId());

        // 发送消息
        rabbitTemplate.convertAndSend(
                RabbitMQConfig.HEADERS_EXCHANGE,
                null, // 头交换机忽略路由键
                messageEntity,
                messagePostProcessor,
                correlationData);
    }

    /**
     * 发送延迟消息
     *
     * @param content 消息内容
     * @param businessType 业务类型
     * @param extra 额外信息
     * @param delayMillis 延迟时间,单位毫秒
     */
    public void sendDelayMessage(String content, String businessType, String extra, long delayMillis) {
        // 参数校验
        StringUtils.hasText(content, "消息内容不能为空");
        if (delayMillis <= 0) {
            throw new IllegalArgumentException("延迟时间必须大于0");
        }

        // 创建消息实体
        MessageEntity messageEntity = createMessageEntity(content, businessType, extra);

        // 创建关联数据,用于确认回调
        CorrelationData correlationData = new CorrelationData(messageEntity.getMessageId());

        // 设置消息延迟属性
        MessagePostProcessor messagePostProcessor = message -> {
            // 设置消息过期时间,即延迟时间
            message.getMessageProperties().setExpiration(String.valueOf(delayMillis));
            return message;
        };

        log.info("发送延迟消息,延迟队列:{},延迟时间:{}ms,消息ID:{}",
                RabbitMQConfig.DELAY_QUEUE, delayMillis, messageEntity.getMessageId());

        // 发送消息到延迟队列
        rabbitTemplate.convertAndSend(
                "", // 不指定交换机,直接发送到队列
                RabbitMQConfig.DELAY_QUEUE,
                messageEntity,
                messagePostProcessor,
                correlationData);
    }

    /**
     * 创建消息实体
     *
     * @param content 消息内容
     * @param businessType 业务类型
     * @param extra 额外信息
     * @return 消息实体
     */
    private MessageEntity createMessageEntity(String content, String businessType, String extra) {
        MessageEntity messageEntity = new MessageEntity();
        messageEntity.setMessageId(UUID.randomUUID().toString());
        messageEntity.setContent(content);
        messageEntity.setBusinessType(businessType);
        messageEntity.setCreateTime(LocalDateTime.now());
        messageEntity.setExtra(extra);
        return messageEntity;
    }
}
代码语言:javascript
复制

3.6 创建消息消费者

使用 @RabbitListener 注解来创建消息消费者:

代码语言:javascript
复制
package com.jam.service;

import com.jam.config.RabbitMQConfig;
import com.jam.entity.MessageEntity;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;
import com.rabbitmq.client.Channel;
import java.io.IOException;

/**
 * 消息消费者服务
 * 负责从RabbitMQ接收并处理消息
 *
 * @author 果酱
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class MessageConsumerService {
    /**
     * 消息轨迹服务
     */
    private final MessageTraceService messageTraceService;

    /**
     * 消费直接队列1的消息
     *
     * @param message 消息内容
     * @param channel 消息通道
     * @param deliveryTag 消息投递标签
     */
    @RabbitListener(queues = RabbitMQConfig.DIRECT_QUEUE_1)
    public void consumeDirectQueue1(MessageEntity message, Channel channel,
                                   @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
        log.info("接收到直接队列1的消息,消息ID:{},内容:{},业务类型:{}",
                message.getMessageId(), message.getContent(), message.getBusinessType());

        try {
            // 处理消息的业务逻辑
            processMessage(message);

            // 记录消费成功轨迹
            messageTraceService.recordConsumeSuccess(message);

            // 手动确认消息
            channel.basicAck(deliveryTag, false);
            log.info("直接队列1消息处理成功并确认,消息ID:{}", message.getMessageId());
        } catch (Exception e) {
            // 记录消费失败轨迹
            messageTraceService.recordConsumeFailure(message, e.getMessage());

            log.error("直接队列1消息处理失败,消息ID:{}", message.getMessageId(), e);
            // 拒绝消息并将其丢弃(不重新入队)
            // 如果需要重新入队,可以使用channel.basicNack(deliveryTag, false, true)
            channel.basicNack(deliveryTag, false, false);
        }
    }

    /**
     * 消费直接队列2的消息
     *
     * @param message 消息内容
     * @param channel 消息通道
     * @param deliveryTag 消息投递标签
     */
    @RabbitListener(queues = RabbitMQConfig.DIRECT_QUEUE_2)
    public void consumeDirectQueue2(MessageEntity message, Channel channel,
                                   @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
        log.info("接收到直接队列2的消息,消息ID:{},内容:{},业务类型:{}",
                message.getMessageId(), message.getContent(), message.getBusinessType());

        try {
            // 处理消息的业务逻辑
            processMessage(message);

            // 记录消费成功轨迹
            messageTraceService.recordConsumeSuccess(message);

            // 手动确认消息
            channel.basicAck(deliveryTag, false);
            log.info("直接队列2消息处理成功并确认,消息ID:{}", message.getMessageId());
        } catch (Exception e) {
            // 记录消费失败轨迹
            messageTraceService.recordConsumeFailure(message, e.getMessage());

            log.error("直接队列2消息处理失败,消息ID:{}", message.getMessageId(), e);
            channel.basicNack(deliveryTag, false, false);
        }
    }

    /**
     * 消费主题队列1的消息
     *
     * @param message 消息内容
     * @param channel 消息通道
     * @param deliveryTag 消息投递标签
     */
    @RabbitListener(queues = RabbitMQConfig.TOPIC_QUEUE_1)
    public void consumeTopicQueue1(MessageEntity message, Channel channel,
                                  @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
        log.info("接收到主题队列1的消息,消息ID:{},内容:{},业务类型:{}",
                message.getMessageId(), message.getContent(), message.getBusinessType());

        try {
            // 处理消息的业务逻辑
            processMessage(message);

            // 记录消费成功轨迹
            messageTraceService.recordConsumeSuccess(message);

            // 手动确认消息
            channel.basicAck(deliveryTag, false);
            log.info("主题队列1消息处理成功并确认,消息ID:{}", message.getMessageId());
        } catch (Exception e) {
            // 记录消费失败轨迹
            messageTraceService.recordConsumeFailure(message, e.getMessage());

            log.error("主题队列1消息处理失败,消息ID:{}", message.getMessageId(), e);
            channel.basicNack(deliveryTag, false, false);
        }
    }

    /**
     * 消费主题队列2的消息
     *
     * @param message 消息内容
     * @param channel 消息通道
     * @param deliveryTag 消息投递标签
     */
    @RabbitListener(queues = RabbitMQConfig.TOPIC_QUEUE_2)
    public void consumeTopicQueue2(MessageEntity message, Channel channel,
                                  @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
        log.info("接收到主题队列2的消息,消息ID:{},内容:{},业务类型:{}",
                message.getMessageId(), message.getContent(), message.getBusinessType());

        try {
            // 处理消息的业务逻辑
            processMessage(message);

            // 记录消费成功轨迹
            messageTraceService.recordConsumeSuccess(message);

            // 手动确认消息
            channel.basicAck(deliveryTag, false);
            log.info("主题队列2消息处理成功并确认,消息ID:{}", message.getMessageId());
        } catch (Exception e) {
            // 记录消费失败轨迹
            messageTraceService.recordConsumeFailure(message, e.getMessage());

            log.error("主题队列2消息处理失败,消息ID:{}", message.getMessageId(), e);
            channel.basicNack(deliveryTag, false, false);
        }
    }

    /**
     * 消费扇形队列1的消息
     *
     * @param message 消息内容
     * @param channel 消息通道
     * @param deliveryTag 消息投递标签
     */
    @RabbitListener(queues = RabbitMQConfig.FANOUT_QUEUE_1)
    public void consumeFanoutQueue1(MessageEntity message, Channel channel,
                                   @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
        log.info("接收到扇形队列1的消息,消息ID:{},内容:{},业务类型:{}",
                message.getMessageId(), message.getContent(), message.getBusinessType());

        try {
            // 处理消息的业务逻辑
            processMessage(message);

            // 记录消费成功轨迹
            messageTraceService.recordConsumeSuccess(message);

            // 手动确认消息
            channel.basicAck(deliveryTag, false);
            log.info("扇形队列1消息处理成功并确认,消息ID:{}", message.getMessageId());
        } catch (Exception e) {
            // 记录消费失败轨迹
            messageTraceService.recordConsumeFailure(message, e.getMessage());

            log.error("扇形队列1消息处理失败,消息ID:{}", message.getMessageId(), e);
            channel.basicNack(deliveryTag, false, false);
        }
    }

    /**
     * 消费扇形队列2的消息
     *
     * @param message 消息内容
     * @param channel 消息通道
     * @param deliveryTag 消息投递标签
     */
    @RabbitListener(queues = RabbitMQConfig.FANOUT_QUEUE_2)
    public void consumeFanoutQueue2(MessageEntity message, Channel channel,
                                   @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
        log.info("接收到扇形队列2的消息,消息ID:{},内容:{},业务类型:{}",
                message.getMessageId(), message.getContent(), message.getBusinessType());

        try {
            // 处理消息的业务逻辑
            processMessage(message);

            // 记录消费成功轨迹
            messageTraceService.recordConsumeSuccess(message);

            // 手动确认消息
            channel.basicAck(deliveryTag, false);
            log.info("扇形队列2消息处理成功并确认,消息ID:{}", message.getMessageId());
        } catch (Exception e) {
            // 记录消费失败轨迹
            messageTraceService.recordConsumeFailure(message, e.getMessage());

            log.error("扇形队列2消息处理失败,消息ID:{}", message.getMessageId(), e);
            channel.basicNack(deliveryTag, false, false);
        }
    }

    /**
     * 消费头队列的消息
     *
     * @param message 消息内容
     * @param channel 消息通道
     * @param deliveryTag 消息投递标签
     */
    @RabbitListener(queues = RabbitMQConfig.HEADERS_QUEUE)
    public void consumeHeadersQueue(MessageEntity message, Channel channel,
                                   @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
        log.info("接收到头队列的消息,消息ID:{},内容:{},业务类型:{}",
                message.getMessageId(), message.getContent(), message.getBusinessType());

        try {
            // 处理消息的业务逻辑
            processMessage(message);

            // 记录消费成功轨迹
            messageTraceService.recordConsumeSuccess(message);

            // 手动确认消息
            channel.basicAck(deliveryTag, false);
            log.info("头队列消息处理成功并确认,消息ID:{}", message.getMessageId());
        } catch (Exception e) {
            // 记录消费失败轨迹
            messageTraceService.recordConsumeFailure(message, e.getMessage());

            log.error("头队列消息处理失败,消息ID:{}", message.getMessageId(), e);
            channel.basicNack(deliveryTag, false, false);
        }
    }

    /**
     * 消费死信队列的消息
     *
     * @param message 消息内容
     * @param channel 消息通道
     * @param deliveryTag 消息投递标签
     */
    @RabbitListener(queues = RabbitMQConfig.DEAD_LETTER_QUEUE)
    public void consumeDeadLetterQueue(MessageEntity message, Channel channel,
                                      @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
        log.error("接收到死信队列的消息,消息ID:{},内容:{},业务类型:{}",
                message.getMessageId(), message.getContent(), message.getBusinessType());

        try {
            // 处理死信消息的业务逻辑,通常需要人工干预
            processDeadLetterMessage(message);

            // 记录消费成功轨迹
            messageTraceService.recordConsumeSuccess(message);

            // 手动确认消息
            channel.basicAck(deliveryTag, false);
            log.info("死信队列消息处理成功并确认,消息ID:{}", message.getMessageId());
        } catch (Exception e) {
            // 记录消费失败轨迹
            messageTraceService.recordConsumeFailure(message, e.getMessage());

            log.error("死信队列消息处理失败,消息ID:{}", message.getMessageId(), e);
            channel.basicNack(deliveryTag, false, false);
        }
    }

    /**
     * 处理消息的业务逻辑
     *
     * @param message 要处理的消息
     */
    private void processMessage(MessageEntity message) {
        // 根据业务类型处理不同的消息
        String businessType = message.getBusinessType();
        if ("ORDER_CREATE".equals(businessType)) {
            // 处理订单创建消息
            processOrderCreateMessage(message);
        } else if ("USER_REGISTER".equals(businessType)) {
            // 处理用户注册消息
            processUserRegisterMessage(message);
        } else {
            // 处理未知类型消息
            log.warn("收到未知类型的消息,消息ID:{},业务类型:{}",
                    message.getMessageId(), businessType);
        }
    }

    /**
     * 处理死信消息
     *
     * @param message 死信消息
     */
    private void processDeadLetterMessage(MessageEntity message) {
        log.info("处理死信消息,消息ID:{},内容:{}",
                message.getMessageId(), message.getContent());
        // 实际业务处理逻辑,如记录到数据库等待人工处理
    }

    /**
     * 处理订单创建消息
     *
     * @param message 订单创建消息
     */
    private void processOrderCreateMessage(MessageEntity message) {
        log.info("处理订单创建消息,消息ID:{},订单信息:{}",
                message.getMessageId(), message.getContent());
        // 实际业务处理逻辑...
    }

    /**
     * 处理用户注册消息
     *
     * @param message 用户注册消息
     */
    private void processUserRegisterMessage(MessageEntity message) {
        log.info("处理用户注册消息,消息ID:{},用户信息:{}",
                message.getMessageId(), message.getContent());
        // 实际业务处理逻辑...
    }
}
代码语言:javascript
复制

3.7 创建消息轨迹服务

为了跟踪消息的整个生命周期,创建消息轨迹服务:

代码语言:javascript
复制
package com.jam.service;

import com.jam.entity.MessageEntity;
import com.jam.entity.MessageTrace;
import com.jam.mapper.MessageTraceMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.util.Objects;

/**
 * 消息轨迹服务
 * 记录消息的发送和消费轨迹
 *
 * @author 果酱
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class MessageTraceService {
    private final MessageTraceMapper messageTraceMapper;

    /**
     * 记录消息发送前的轨迹
     *
     * @param message 消息实体
     * @param exchange 交换机
     * @param routingKey 路由键
     * @return 消息轨迹ID
     */
    @Transactional(rollbackFor = Exception.class)
    public Long recordBeforeSend(MessageEntity message, String exchange, String routingKey) {
        Objects.requireNonNull(message, "消息实体不能为空");
        StringUtils.hasText(message.getMessageId(), "消息ID不能为空");
        StringUtils.hasText(exchange, "交换机不能为空");

        MessageTrace trace = new MessageTrace();
        trace.setMessageId(message.getMessageId());
        trace.setExchange(exchange);
        trace.setRoutingKey(routingKey);
        trace.setBusinessType(message.getBusinessType());
        trace.setContent(message.getContent());
        trace.setSendStatus(0); // 待发送
        trace.setCreateTime(LocalDateTime.now());
        trace.setUpdateTime(LocalDateTime.now());

        messageTraceMapper.insert(trace);
        log.info("记录消息发送前轨迹,消息ID:{},轨迹ID:{}", message.getMessageId(), trace.getId());
        return trace.getId();
    }

    /**
     * 记录消息发送成功的轨迹
     *
     * @param messageId 消息ID
     */
    @Transactional(rollbackFor = Exception.class)
    public void recordSendSuccess(String messageId) {
        StringUtils.hasText(messageId, "消息ID不能为空");

        MessageTrace trace = messageTraceMapper.selectByMessageId(messageId);
        if (trace == null) {
            log.warn("未找到消息轨迹,消息ID:{}", messageId);
            return;
        }

        trace.setSendTime(LocalDateTime.now());
        trace.setSendStatus(1); // 发送成功
        trace.setUpdateTime(LocalDateTime.now());

        messageTraceMapper.updateById(trace);
        log.info("记录消息发送成功轨迹,消息ID:{}", messageId);
    }

    /**
     * 记录消息发送失败的轨迹
     *
     * @param messageId 消息ID
     * @param errorMsg 错误信息
     */
    @Transactional(rollbackFor = Exception.class)
    public void recordSendFailure(String messageId, String errorMsg) {
        StringUtils.hasText(messageId, "消息ID不能为空");
        StringUtils.hasText(errorMsg, "错误信息不能为空");

        MessageTrace trace = messageTraceMapper.selectByMessageId(messageId);
        if (trace == null) {
            log.warn("未找到消息轨迹,消息ID:{}", messageId);
            return;
        }

        trace.setSendTime(LocalDateTime.now());
        trace.setSendStatus(2); // 发送失败
        trace.setSendErrorMsg(errorMsg);
        trace.setUpdateTime(LocalDateTime.now());

        messageTraceMapper.updateById(trace);
        log.info("记录消息发送失败轨迹,消息ID:{}", messageId);
    }

    /**
     * 记录消息消费成功的轨迹
     *
     * @param message 消息实体
     */
    @Transactional(rollbackFor = Exception.class)
    public void recordConsumeSuccess(MessageEntity message) {
        Objects.requireNonNull(message, "消息实体不能为空");
        StringUtils.hasText(message.getMessageId(), "消息ID不能为空");

        MessageTrace trace = messageTraceMapper.selectByMessageId(message.getMessageId());
        if (trace == null) {
            log.warn("未找到消息轨迹,消息ID:{}", message.getMessageId());
            return;
        }

        trace.setConsumeTime(LocalDateTime.now());
        trace.setConsumeStatus(1); // 消费成功
        trace.setUpdateTime(LocalDateTime.now());

        messageTraceMapper.updateById(trace);
        log.info("记录消息消费成功轨迹,消息ID:{}", message.getMessageId());
    }

    /**
     * 记录消息消费失败的轨迹
     *
     * @param message 消息实体
     * @param errorMsg 错误信息
     */
    @Transactional(rollbackFor = Exception.class)
    public void recordConsumeFailure(MessageEntity message, String errorMsg) {
        Objects.requireNonNull(message, "消息实体不能为空");
        StringUtils.hasText(message.getMessageId(), "消息ID不能为空");
        StringUtils.hasText(errorMsg, "错误信息不能为空");

        MessageTrace trace = messageTraceMapper.selectByMessageId(message.getMessageId());
        if (trace == null) {
            log.warn("未找到消息轨迹,消息ID:{}", message.getMessageId());
            return;
        }

        trace.setConsumeTime(LocalDateTime.now());
        trace.setConsumeStatus(2); // 消费失败
        trace.setConsumeErrorMsg(errorMsg);
        trace.setUpdateTime(LocalDateTime.now());

        messageTraceMapper.updateById(trace);
        log.info("记录消息消费失败轨迹,消息ID:{}", message.getMessageId());
    }
}
代码语言:javascript
复制

3.8 创建控制器

创建一个控制器,用于测试消息发送功能:

代码语言:javascript
复制
package com.jam.controller;

import com.jam.service.MessageProducerService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * 消息测试控制器
 * 提供API接口用于测试消息发送功能
 *
 * @author 果酱
 */
@Slf4j
@RestController
@RequestMapping("/api/message")
@RequiredArgsConstructor
@Tag(name = "消息测试接口", description = "用于测试RabbitMQ消息发送的API接口")
public class MessageController {
    /**
     * 消息生产者服务
     */
    private final MessageProducerService messageProducerService;

    /**
     * 发送直接交换机消息
     *
     * @param routingKey 路由键
     * @param content 消息内容
     * @param businessType 业务类型
     * @param extra 额外信息
     * @return 响应信息
     */
    @PostMapping("/direct")
    @Operation(summary = "发送直接消息", description = "发送到直接交换机的消息")
    public ResponseEntity<String> sendDirectMessage(
            @Parameter(description = "路由键", required = true)
            @RequestParam String routingKey,

            @Parameter(description = "消息内容", required = true)
            @RequestParam String content,

            @Parameter(description = "业务类型")
            @RequestParam(required = false) String businessType,

            @Parameter(description = "额外信息")
            @RequestParam(required = false) String extra) {

        log.info("接收到发送直接消息请求,路由键:{}", routingKey);
        messageProducerService.sendDirectMessage(routingKey, content, businessType, extra);
        return ResponseEntity.ok("直接消息发送成功");
    }

    /**
     * 发送主题交换机消息
     *
     * @param routingKey 路由键
     * @param content 消息内容
     * @param businessType 业务类型
     * @param extra 额外信息
     * @return 响应信息
     */
    @PostMapping("/topic")
    @Operation(summary = "发送主题消息", description = "发送到主题交换机的消息")
    public ResponseEntity<String> sendTopicMessage(
            @Parameter(description = "路由键", required = true)
            @RequestParam String routingKey,

            @Parameter(description = "消息内容", required = true)
            @RequestParam String content,

            @Parameter(description = "业务类型")
            @RequestParam(required = false) String businessType,

            @Parameter(description = "额外信息")
            @RequestParam(required = false) String extra) {

        log.info("接收到发送主题消息请求,路由键:{}", routingKey);
        messageProducerService.sendTopicMessage(routingKey, content, businessType, extra);
        return ResponseEntity.ok("主题消息发送成功");
    }

    /**
     * 发送扇形交换机消息
     *
     * @param content 消息内容
     * @param businessType 业务类型
     * @param extra 额外信息
     * @return 响应信息
     */
    @PostMapping("/fanout")
    @Operation(summary = "发送扇形消息", description = "发送到扇形交换机的消息")
    public ResponseEntity<String> sendFanoutMessage(
            @Parameter(description = "消息内容", required = true)
            @RequestParam String content,

            @Parameter(description = "业务类型")
            @RequestParam(required = false) String businessType,

            @Parameter(description = "额外信息")
            @RequestParam(required = false) String extra) {

        log.info("接收到发送扇形消息请求");
        messageProducerService.sendFanoutMessage(content, businessType, extra);
        return ResponseEntity.ok("扇形消息发送成功");
    }

    /**
     * 发送头交换机消息
     *
     * @param content 消息内容
     * @param businessType 业务类型
     * @param extra 额外信息
     * @return 响应信息
     */
    @PostMapping("/headers")
    @Operation(summary = "发送头消息", description = "发送到头交换机的消息")
    public ResponseEntity<String> sendHeadersMessage(
            @Parameter(description = "消息内容", required = true)
            @RequestParam String content,

            @Parameter(description = "业务类型")
            @RequestParam(required = false) String businessType,

            @Parameter(description = "额外信息")
            @RequestParam(required = false) String extra) {

        log.info("接收到发送头消息请求");
        messageProducerService.sendHeadersMessage(content, businessType, extra);
        return ResponseEntity.ok("头消息发送成功");
    }

    /**
     * 发送延迟消息
     *
     * @param content 消息内容
     * @param businessType 业务类型
     * @param extra 额外信息
     * @param delayMillis 延迟时间,单位毫秒
     * @return 响应信息
     */
    @PostMapping("/delay")
    @Operation(summary = "发送延迟消息", description = "发送延迟消息,指定时间后才会被消费")
    public ResponseEntity<String> sendDelayMessage(
            @Parameter(description = "消息内容", required = true)
            @RequestParam String content,

            @Parameter(description = "业务类型")
            @RequestParam(required = false) String businessType,

            @Parameter(description = "额外信息")
            @RequestParam(required = false) String extra,

            @Parameter(description = "延迟时间(毫秒)", required = true)
            @RequestParam long delayMillis) {

        log.info("接收到发送延迟消息请求,延迟时间:{}ms", delayMillis);
        messageProducerService.sendDelayMessage(content, businessType, extra, delayMillis);
        return ResponseEntity.ok("延迟消息发送成功");
    }
}
代码语言:javascript
复制

3.9 创建启动类

代码语言:javascript
复制
package com.jam;

import com.jam.service.MessageProducerService;
import io.swagger.v3.oas.annotations.OpenAPIDefinition;
import io.swagger.v3.oas.annotations.info.Info;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

/**
 * SpringBoot应用启动类
 *
 * @author 果酱
 */
@SpringBootApplication
@MapperScan("com.jam.mapper")
@OpenAPIDefinition(
        info = @Info(
                title = "SpringBoot集成RabbitMQ示例项目",
                version = "1.0",
                description = "SpringBoot集成RabbitMQ的示例项目,包含各种消息发送和消费的示例"
        )
)
public class SpringbootRabbitmqDemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(SpringbootRabbitmqDemoApplication.class, args);
    }

    /**
     * 初始化RabbitTemplate的回调函数
     */
    @Bean
    public CommandLineRunner initRabbitTemplate(MessageProducerService producerService) {
        return args -> {
            producerService.initRabbitTemplate();
        };
    }
}
代码语言:javascript
复制

3.10 创建消息轨迹相关实体和数据库表

消息轨迹实体类:

代码语言:javascript
复制
package com.jam.entity;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.time.LocalDateTime;

/**
 * 消息轨迹实体类
 * 记录消息的发送和消费情况
 *
 * @author 果酱
 */
@Data
@TableName("t_message_trace")
public class MessageTrace {
    /**
     * 主键ID
     */
    @TableId(type = IdType.AUTO)
    private Long id;

    /**
     * 消息ID
     */
    private String messageId;

    /**
     * 交换机
     */
    private String exchange;

    /**
     * 路由键
     */
    private String routingKey;

    /**
     * 业务类型
     */
    private String businessType;

    /**
     * 消息内容
     */
    private String content;

    /**
     * 发送时间
     */
    private LocalDateTime sendTime;

    /**
     * 发送状态:0-待发送,1-发送成功,2-发送失败
     */
    private Integer sendStatus;

    /**
     * 发送错误信息
     */
    private String sendErrorMsg;

    /**
     * 消费时间
     */
    private LocalDateTime consumeTime;

    /**
     * 消费状态:0-待消费,1-消费成功,2-消费失败
     */
    private Integer consumeStatus;

    /**
     * 消费错误信息
     */
    private String consumeErrorMsg;

    /**
     * 创建时间
     */
    private LocalDateTime createTime;

    /**
     * 更新时间
     */
    private LocalDateTime updateTime;
}

消息轨迹 Mapper 接口:

代码语言:javascript
复制
package com.jam.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.jam.entity.MessageTrace;
import org.apache.ibatis.annotations.Param;

/**
 * 消息轨迹Mapper
 *
 * @author 果酱
 */
public interface MessageTraceMapper extends BaseMapper<MessageTrace> {
    /**
     * 根据消息ID查询消息轨迹
     *
     * @param messageId 消息ID
     * @return 消息轨迹信息
     */
    MessageTrace selectByMessageId(@Param("messageId") String messageId);
}

消息轨迹 Mapper XML 文件(resources/mapper/MessageTraceMapper.xml):

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.jam.mapper.MessageTraceMapper">
    <select id="selectByMessageId" parameterType="java.lang.String" resultType="com.jam.entity.MessageTrace">
        SELECT * FROM t_message_trace WHERE message_id = #{messageId}
    </select>
</mapper>

创建消息轨迹表的 SQL:

代码语言:javascript
复制
CREATE TABLE `t_message_trace` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键ID',
  `message_id` varchar(64) NOT NULL COMMENT '消息ID',
  `exchange` varchar(128) NOT NULL COMMENT '交换机',
  `routing_key` varchar(128) DEFAULT NULL COMMENT '路由键',
  `business_type` varchar(64) DEFAULT NULL COMMENT '业务类型',
  `content` text COMMENT '消息内容',
  `send_time` datetime DEFAULT NULL COMMENT '发送时间',
  `send_status` tinyint DEFAULT NULL COMMENT '发送状态:0-待发送,1-发送成功,2-发送失败',
  `send_error_msg` text COMMENT '发送错误信息',
  `consume_time` datetime DEFAULT NULL COMMENT '消费时间',
  `consume_status` tinyint DEFAULT NULL COMMENT '消费状态:0-待消费,1-消费成功,2-消费失败',
  `consume_error_msg` text COMMENT '消费错误信息',
  `create_time` datetime NOT NULL COMMENT '创建时间',
  `update_time` datetime NOT NULL COMMENT '更新时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_message_id` (`message_id`),
  KEY `idx_exchange` (`exchange`),
  KEY `idx_business_type` (`business_type`),
  KEY `idx_send_status` (`send_status`),
  KEY `idx_consume_status` (`consume_status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='消息轨迹表';
代码语言:javascript
复制

3.11 测试消息发送与消费

启动应用程序后,可以通过以下方式测试消息发送与消费:

使用 Swagger UI 测试:访问http://localhost:8081/swagger-ui.html,通过界面调用消息发送接口

  1. 使用 curl 命令测试:
代码语言:javascript
复制
# 发送直接消息
curl -X POST "http://localhost:8081/api/message/direct?routingKey=direct.key1&content=Hello Direct&businessType=TEST"

# 发送主题消息
curl -X POST "http://localhost:8081/api/message/topic?routingKey=topic.key.test&content=Hello Topic&businessType=TEST"

# 发送扇形消息
curl -X POST "http://localhost:8081/api/message/fanout?content=Hello Fanout&businessType=TEST"

# 发送头消息
curl -X POST "http://localhost:8081/api/message/headers?content=Hello Headers&businessType=TEST"

# 发送延迟消息(延迟5秒)
curl -X POST "http://localhost:8081/api/message/delay?content=Hello Delay&businessType=TEST&delayMillis=5000"

发送消息后,可以在控制台看到生产者和消费者的日志输出,证明消息已经成功发送和消费。

四、RabbitMQ 高级特性

4.1 消息确认机制

RabbitMQ 提供了完善的消息确认机制,确保消息的可靠传递。消息确认包括两个方向:

  1. 生产者确认确保消息成功发送到交换机
  2. 消费者确认确保消息被成功消费

我们在前面的代码中已经实现了这两种确认机制:

  • 生产者确认:通过setConfirmCallbacksetReturnsCallback实现
  • 消费者确认:通过channel.basicAckchannel.basicNack手动确认

消息确认流程:

根据 Spring AMQP 官方文档(https://docs.spring.io/spring-amqp/reference/),推荐在生产环境中开启手动确认模式,以确保消息不会被意外丢失。

4.2 死信队列

死信队列(Dead Letter Queue)用于存储无法被正常消费的消息,这些消息通常被称为死信(Dead Letter)。消息成为死信的原因包括:

  1. 消息被拒绝(basicReject 或 basicNack)并且 requeue 参数为 false
  2. 消息过期
  3. 队列达到最大长度,新消息无法入队

死信队列的工作流程:

在前面的配置中,我们已经实现了死信队列的功能:

  1. 创建了死信交换机(DEAD_LETTER_EXCHANGE)和死信队列(DEAD_LETTER_QUEUE)
  2. 为延迟队列(DELAY_QUEUE)设置了死信交换机和死信路由键
  3. 创建了死信消费者,专门处理死信消息

4.3 延迟队列

延迟队列用于存储需要在指定时间后才被消费的消息。RabbitMQ 本身不直接支持延迟队列,但可以通过以下两种方式实现:

  1. 使用消息过期时间(TTL)+ 死信队列
  2. 使用 rabbitmq-delayed-message-exchange 插件

我们在前面的代码中实现了第一种方式,下面介绍第二种方式的实现:

  1. 安装延迟消息交换机插件:
代码语言:javascript
复制
# 下载插件
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.13.0/rabbitmq_delayed_message_exchange-3.13.0.ez

# 复制到插件目录
sudo cp rabbitmq_delayed_message_exchange-3.13.0.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.13.0/plugins/

# 启用插件
sudo rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  1. 创建延迟交换机配置:
代码语言:javascript
复制
/**
 * 延迟交换机名称
 */
public static final String DELAYED_EXCHANGE = "delayed_exchange";

/**
 * 延迟队列名称
 */
public static final String DELAYED_QUEUE = "delayed_queue";

/**
 * 创建延迟交换机
 *
 * @return 延迟交换机
 */
@Bean
public CustomExchange delayedExchange() {
    Map<String, Object> args = new HashMap<>();
    args.put("x-delayed-type", "direct");
    // 类型为x-delayed-message
    return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", true, false, args);
}

/**
 * 创建延迟队列
 *
 * @return 延迟队列
 */
@Bean
public Queue delayedQueue() {
    return QueueBuilder.durable(DELAYED_QUEUE)
            .build();
}

/**
 * 绑定延迟队列到延迟交换机
 *
 * @return 绑定关系
 */
@Bean
public Binding delayedBinding() {
    return BindingBuilder.bind(delayedQueue())
            .to(delayedExchange())
            .with("delayed.key")
            .noargs();
}
  1. 发送延迟消息:
代码语言:javascript
复制
/**
 * 发送延迟消息(使用延迟交换机插件)
 *
 * @param content 消息内容
 * @param businessType 业务类型
 * @param extra 额外信息
 * @param delayMillis 延迟时间,单位毫秒
 */
public void sendDelayedMessageWithPlugin(String content, String businessType, String extra, long delayMillis) {
    // 参数校验
    StringUtils.hasText(content, "消息内容不能为空");
    if (delayMillis <= 0) {
        throw new IllegalArgumentException("延迟时间必须大于0");
    }

    // 创建消息实体
    MessageEntity messageEntity = createMessageEntity(content, businessType, extra);

    // 创建关联数据,用于确认回调
    CorrelationData correlationData = new CorrelationData(messageEntity.getMessageId());

    // 设置消息延迟属性
    MessagePostProcessor messagePostProcessor = message -> {
        // 设置延迟时间
        message.getMessageProperties().setHeader("x-delay", delayMillis);
        return message;
    };

    log.info("发送延迟消息(插件),交换机:{},延迟时间:{}ms,消息ID:{}",
            RabbitMQConfig.DELAYED_EXCHANGE, delayMillis, messageEntity.getMessageId());

    // 发送消息到延迟交换机
    rabbitTemplate.convertAndSend(
            RabbitMQConfig.DELAYED_EXCHANGE,
            "delayed.key",
            messageEntity,
            messagePostProcessor,
            correlationData);
}
  1. 创建延迟消息消费者:
代码语言:javascript
复制
/**
 * 消费延迟队列(插件)的消息
 *
 * @param message 消息内容
 * @param channel 消息通道
 * @param deliveryTag 消息投递标签
 */
@RabbitListener(queues = RabbitMQConfig.DELAYED_QUEUE)
public void consumeDelayedQueue(MessageEntity message, Channel channel,
                               @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
    log.info("接收到延迟队列(插件)的消息,消息ID:{},内容:{},业务类型:{}",
            message.getMessageId(), message.getContent(), message.getBusinessType());

    try {
        // 处理消息的业务逻辑
        processMessage(message);

        // 记录消费成功轨迹
        messageTraceService.recordConsumeSuccess(message);

        // 手动确认消息
        channel.basicAck(deliveryTag, false);
        log.info("延迟队列(插件)消息处理成功并确认,消息ID:{}", message.getMessageId());
    } catch (Exception e) {
        // 记录消费失败轨迹
        messageTraceService.recordConsumeFailure(message, e.getMessage());

        log.error("延迟队列(插件)消息处理失败,消息ID:{}", message.getMessageId(), e);
        channel.basicNack(deliveryTag, false, false);
    }
}

两种延迟队列实现方式的对比:

实现方式

优点

缺点

TTL + 死信队列

无需安装插件,原生支持

时间精度不高,队列级别的 TTL 设置会影响所有消息

延迟交换机插件

时间精度高,支持消息级别的延迟设置

需要安装插件,增加了维护成本

根据 RabbitMQ 官方文档,推荐在生产环境中使用延迟交换机插件的方式,因为它提供了更精确的延迟控制。

4.4 消息幂等性

在分布式系统中,消息重复消费是不可避免的问题,因此需要保证消息消费的幂等性。常用的实现方式有:

1.基于数据库唯一索引

代码语言:javascript
复制
/**
 * 处理消息(幂等性保证)
 *
 * @param message 消息实体
 */
@Transactional(rollbackFor = Exception.class)
public void processMessageWithIdempotency(MessageEntity message) {
    String messageId = message.getMessageId();
    String businessType = message.getBusinessType();

    // 检查消息是否已经处理过
    MessageTrace trace = messageTraceMapper.selectByMessageId(messageId);
    if (trace != null && trace.getConsumeStatus() == 1) {
        log.info("消息已经处理过,消息ID:{}", messageId);
        return;
    }

    // 根据业务类型处理不同的消息
    if ("ORDER_CREATE".equals(businessType)) {
        // 处理订单创建消息,使用订单号作为唯一键
        String orderNo = message.getExtra();
        // 检查订单是否已经处理
        Order order = orderMapper.selectByOrderNo(orderNo);
        if (order != null) {
            log.info("订单已经处理过,订单号:{}", orderNo);
            return;
        }
        // 处理订单业务逻辑
        // ...
    } else if ("USER_REGISTER".equals(businessType)) {
        // 处理用户注册消息,使用用户ID作为唯一键
        // ...
    }
}
  1. 基于 Redis 的分布式锁
代码语言:javascript
复制
/**
 * 使用Redis分布式锁保证幂等性
 *
 * @param message 消息实体
 */
public void processMessageWithRedisLock(MessageEntity message) {
    String messageId = message.getMessageId();
    String lockKey = "message:process:" + messageId;

    // 获取分布式锁,设置5分钟过期时间
    Boolean locked = redisTemplate.opsForValue().setIfAbsent(lockKey, "1", 5, TimeUnit.MINUTES);

    if (Boolean.TRUE.equals(locked)) {
        try {
            // 检查消息是否已经处理过
            MessageTrace trace = messageTraceMapper.selectByMessageId(messageId);
            if (trace != null && trace.getConsumeStatus() == 1) {
                log.info("消息已经处理过,消息ID:{}", messageId);
                return;
            }

            // 处理消息业务逻辑
            processMessage(message);

        } finally {
            // 释放锁
            redisTemplate.delete(lockKey);
        }
    } else {
        log.info("消息正在处理中,消息ID:{}", messageId);
    }
}
  1. 基于状态机

对于有明确状态流转的业务,可以通过状态机来保证幂等性,例如订单状态从 "待支付" 到 "已支付" 的转换。

五、RabbitMQ 性能调优

为了让 RabbitMQ 在生产环境中发挥最佳性能,我们需要进行合理的调优。以下是一些关键的调优方向:

5.1 服务器调优

内存配置: 根据服务器内存大小合理配置 RabbitMQ 的内存限制

代码语言:javascript
复制
# 设置内存限制为4GB
rabbitmqctl set_vm_memory_high_watermark 4GB
# 或者设置为物理内存的50%
rabbitmqctl set_vm_memory_high_watermark 0.5

磁盘配置

代码语言:javascript
复制

  • 使用 SSD 硬盘提高 IO 性能
  • 确保有足够的磁盘空间,避免磁盘满导致服务异常
  • 配置磁盘同步策略
代码语言:javascript
复制
# 设置磁盘同步策略为所有队列
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'

3.网络配置

  • 调整 TCP 缓冲区大小

代码语言:javascript
复制
# 在rabbitmq.conf中设置
tcp_listeners.tcp.default.backlog = 1024
tcp_listeners.tcp.default.buffer_size = 131072

5.2 生产者调优

批量发送: 对于大量小消息,采用批量发送可以显著提高吞吐量

代码语言:javascript
复制
/**
 * 批量发送消息
 */
public void sendBatchMessages(String exchange, String routingKey, List<MessageEntity> messages) {
    StringUtils.hasText(exchange, "交换机不能为空");
    StringUtils.hasText(routingKey, "路由键不能为空");
    if (CollectionUtils.isEmpty(messages)) {
        throw new IllegalArgumentException("消息列表不能为空");
    }

    log.info("批量发送消息,交换机:{},路由键:{},消息数量:{}", 
            exchange, routingKey, messages.size());

    // 创建批量消息
    List<Message> amqpMessages = new ArrayList<>(messages.size());
    List<CorrelationData> correlationDataList = new ArrayList<>(messages.size());

    for (MessageEntity message : messages) {
        // 确保消息ID唯一
        if (StringUtils.isBlank(message.getMessageId())) {
            message.setMessageId(UUID.randomUUID().toString());
        }
        if (message.getCreateTime() == null) {
            message.setCreateTime(LocalDateTime.now());
        }

        // 记录消息发送前的轨迹
        messageTraceService.recordBeforeSend(message, exchange, routingKey);

        // 创建AMQP消息
        Message amqpMessage = MessageBuilder
                .withBody(new ObjectMapper().writeValueAsBytes(message))
                .setContentType(MessageProperties.CONTENT_TYPE_JSON)
                .setMessageId(message.getMessageId())
                .build();

        amqpMessages.add(amqpMessage);
        correlationDataList.add(new CorrelationData(message.getMessageId()));
    }

    // 批量发送消息
    rabbitTemplate.invoke(operations -> {
        for (int i = 0; i < amqpMessages.size(); i++) {
            operations.convertAndSend(
                    exchange,
                    routingKey,
                    amqpMessages.get(i),
                    correlationDataList.get(i));
        }
        return null;
    });

    log.info("批量消息发送完成,数量:{}", messages.size());
}

消息压缩: 对于大消息,启用压缩可以减少网络传输和存储开销

代码语言:javascript
复制
/**
 * 发送压缩消息
 */
public void sendCompressedMessage(String exchange, String routingKey, MessageEntity message) {
    StringUtils.hasText(exchange, "交换机不能为空");
    StringUtils.hasText(routingKey, "路由键不能为空");
    Objects.requireNonNull(message, "消息实体不能为空");

    try {
        // 序列化消息
        byte[] messageBytes = new ObjectMapper().writeValueAsBytes(message);

        // 压缩消息
        ByteArrayOutputStream byteOut = new ByteArrayOutputStream();
        try (GZIPOutputStream gzipOut = new GZIPOutputStream(byteOut)) {
            gzipOut.write(messageBytes);
        }
        byte[] compressedBytes = byteOut.toByteArray();

        log.info("消息压缩前大小:{}字节,压缩后大小:{}字节,压缩率:{}%",
                messageBytes.length,
                compressedBytes.length,
                (int) ((1 - (double) compressedBytes.length / messageBytes.length) * 100));

        // 记录消息发送前的轨迹
        messageTraceService.recordBeforeSend(message, exchange, routingKey);

        // 创建压缩消息
        Message amqpMessage = MessageBuilder
                .withBody(compressedBytes)
                .setContentType(MessageProperties.CONTENT_TYPE_JSON)
                .setMessageId(message.getMessageId())
                .setHeader("x-compressed", true)
                .build();

        // 发送消息
        rabbitTemplate.convertAndSend(
                exchange,
                routingKey,
                amqpMessage,
                new CorrelationData(message.getMessageId()));

        log.info("压缩消息发送完成,消息ID:{}", message.getMessageId());
    } catch (Exception e) {
        log.error("发送压缩消息失败,消息ID:{}", message.getMessageId(), e);
        messageTraceService.recordSendFailure(message.getMessageId(), e.getMessage());
        throw new RuntimeException("发送压缩消息失败", e);
    }
}

异步发送: 使用异步发送可以提高生产者的吞吐量,避免阻塞主线程

5.3 消费者调优

消费线程池配置: 根据业务处理能力配置合理的消费线程池大小

代码语言:javascript
复制
spring:
  rabbitmq:
    listener:
      simple:
        # 并发消费者数量
        concurrency: 5
        # 最大并发消费者数量
        max-concurrency: 20
        # 每次从队列中拉取的消息数量
        prefetch: 10

批量消费: 开启批量消费可以提高消费效率

代码语言:javascript
复制
/**
 * 批量消费消息
 */
@RabbitListener(queues = RabbitMQConfig.BATCH_QUEUE)
public void batchConsume(@Payload List<MessageEntity> messages, 
                        Channel channel,
                        @Header(AmqpHeaders.DELIVERY_TAG) List<Long> deliveryTags) throws IOException {
    log.info("接收到批量消息,数量:{}", messages.size());

    List<Long> successTags = new ArrayList<>();
    List<Long> failTags = new ArrayList<>();

    try {
        for (int i = 0; i < messages.size(); i++) {
            MessageEntity message = messages.get(i);
            long tag = deliveryTags.get(i);

            try {
                // 处理单条消息
                processMessage(message);
                // 记录消费成功轨迹
                messageTraceService.recordConsumeSuccess(message);
                successTags.add(tag);
            } catch (Exception e) {
                // 记录消费失败轨迹
                messageTraceService.recordConsumeFailure(message, e.getMessage());
                failTags.add(tag);
                log.error("批量消费中消息处理失败,消息ID:{}", message.getMessageId(), e);
            }
        }

        // 确认成功的消息
        if (!successTags.isEmpty()) {
            // 批量确认消息
            channel.basicAck(successTags.get(successTags.size() - 1), true);
        }

        // 拒绝失败的消息
        for (long tag : failTags) {
            channel.basicNack(tag, false, false);
        }

        log.info("批量消息处理完成,成功:{}条,失败:{}条", successTags.size(), failTags.size());
    } catch (Exception e) {
        log.error("批量消息处理异常", e);
        // 拒绝所有消息
        for (long tag : deliveryTags) {
            channel.basicNack(tag, false, false);
        }
    }
}

消息预取: 合理设置 prefetch 参数,控制消费者一次从队列中获取的消息数量,避免消费者过载

异步处理: 消费者接收到消息后,将消息放入线程池异步处理,快速确认消息,提高消费效率

5.4 队列和交换机调优

队列持久化: 确保队列持久化,避免服务重启后队列丢失

代码语言:javascript
复制
// 创建持久化队列
QueueBuilder.durable(QUEUE_NAME).build();

消息持久化: 对于重要消息,设置消息持久化,避免服务重启后消息丢失

代码语言:javascript
复制
Message message = MessageBuilder
        .withBody(body)
        .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
        .build();

合理设置队列长度: 为队列设置最大长度,避免消息无限堆积导致磁盘占满

代码语言:javascript
复制
// 创建有限长度的队列
QueueBuilder.durable(QUEUE_NAME)
        .maxLength(10000) // 最大消息数量
        .maxLengthBytes(104857600) // 最大字节数(100MB)
        .overflow(Overflow.REJECT_PUBLISHER) // 超出长度时拒绝生产者
        .build();

合理设置消息过期时间: 对于时效性强的消息,设置消息过期时间,避免无效消息堆积

代码语言:javascript
复制
// 创建有消息过期时间的队列(队列级别)
QueueBuilder.durable(QUEUE_NAME)
        .ttl(60000) // 消息过期时间,单位毫秒
        .build();

// 发送有过期时间的消息(消息级别)
MessagePostProcessor messagePostProcessor = message -> {
    message.getMessageProperties().setExpiration("60000"); // 60秒过期
    return message;
};

六、常见问题与解决方案

6.1 消息丢失问题

消息丢失可能发生在三个阶段:生产阶段、存储阶段和消费阶段。

生产阶段丢失

代码语言:javascript
复制

  • 解决方案:开启生产者确认机制,处理确认失败的情况
代码语言:javascript
复制
// 在配置中开启生产者确认
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true

// 实现确认回调
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
    if (!ack) {
        log.error("消息发送失败,原因:{}", cause);
        // 重试发送或记录到数据库
    }
});

存储阶段丢失

代码语言:javascript
复制

  • 解决方案:队列和消息都设置为持久化
代码语言:javascript
复制
// 持久化队列
QueueBuilder.durable(QUEUE_NAME).build();

// 持久化消息
Message message = MessageBuilder
        .withBody(body)
        .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
        .build();

消费阶段丢失

代码语言:javascript
复制

  • 解决方案:使用手动确认模式,确保消息处理完成后再确认
代码语言:javascript
复制
// 在配置中开启手动确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual

// 消息处理完成后手动确认
channel.basicAck(deliveryTag, false);

6.2 消息积压问题

消息积压通常是因为消费速度跟不上生产速度,解决方案如下:

优化消费逻辑

  • 减少单次消息处理时间
  • 异步处理非关键流程

增加消费者数量

  • 水平扩展消费者实例
  • 确保消费者数量不超过队列的分区数

临时扩容

  • 对于突发流量,可以临时启动更多的消费者实例

消息迁移

代码语言:javascript
复制

  • 将积压的消息迁移到新的队列,由专门的消费者处理
代码语言:javascript
复制
/**
 * 迁移积压消息
 */
@Scheduled(fixedRate = 60000)
public void migrateBacklogMessages() {
    String sourceQueue = "source_queue";
    String targetQueue = "backlog_queue";

    // 检查队列消息数量
    QueueInfo sourceQueueInfo = rabbitAdmin.getQueueInfo(sourceQueue);
    if (sourceQueueInfo == null || sourceQueueInfo.getMessageCount() <= 1000) {
        return; // 消息数量不多,不需要迁移
    }

    log.info("开始迁移积压消息,源队列:{},消息数量:{}", 
            sourceQueue, sourceQueueInfo.getMessageCount());

    // 迁移消息
    int batchSize = 100;
    while (true) {
        List<Message> messages = rabbitTemplate.receive(sourceQueue, batchSize);
        if (CollectionUtils.isEmpty(messages)) {
            break;
        }

        // 发送消息到目标队列
        for (Message message : messages) {
            rabbitTemplate.send(targetQueue, message);
        }

        log.info("已迁移消息:{}条", messages.size());

        // 避免过度消耗资源
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            break;
        }
    }

    log.info("积压消息迁移完成");
}

监控告警

代码语言:javascript
复制

  • 配置消息积压监控和告警,及时发现问题
代码语言:javascript
复制
/**
 * 消息积压监控
 */
@Scheduled(fixedRate = 60000) // 每分钟检查一次
public void monitorMessageBacklog() {
    // 监控的队列列表
    List<String> monitorQueues = Arrays.asList(
        RabbitMQConfig.DIRECT_QUEUE_1,
        RabbitMQConfig.DIRECT_QUEUE_2,
        RabbitMQConfig.TOPIC_QUEUE_1,
        RabbitMQConfig.TOPIC_QUEUE_2
    );

    for (String queue : monitorQueues) {
        QueueInfo queueInfo = rabbitAdmin.getQueueInfo(queue);
        if (queueInfo != null) {
            long messageCount = queueInfo.getMessageCount();
            log.info("队列:{},消息数量:{}", queue, messageCount);

            // 如果消息数量超过阈值,发送告警
            if (messageCount > 10000) {
                log.warn("队列消息积压严重,队列:{},消息数量:{}", queue, messageCount);
                // 发送告警通知(邮件、短信等)
                alertService.sendAlert("消息积压告警", "队列:" + queue + ",消息数量:" + messageCount);
            }
        }
    }
}

6.3 消息顺序性问题

RabbitMQ 中,单个队列的消息是有序的,但多个消费者同时消费同一个队列时,可能会导致消息乱序。解决方案如下:

单消费者

  • 只使用一个消费者消费队列,保证消息顺序性,但会降低吞吐量

分区队列

代码语言:javascript
复制

  • 将一个队列拆分为多个分区队列,每个分区队列由一个消费者处理
  • 确保相同业务 ID 的消息发送到同一个分区队列
代码语言:javascript
复制
/**
 * 发送有序消息到分区队列
 */
public void sendOrderedMessage(String baseQueueName, String businessId, MessageEntity message) {
    StringUtils.hasText(baseQueueName, "基础队列名称不能为空");
    StringUtils.hasText(businessId, "业务ID不能为空");
    Objects.requireNonNull(message, "消息实体不能为空");

    // 队列分区数量
    int partitionCount = 5;

    // 根据业务ID计算分区索引
    int partitionIndex = Math.abs(businessId.hashCode() % partitionCount);

    // 分区队列名称
    String partitionQueue = baseQueueName + "_" + partitionIndex;

    // 路由键(与分区队列名称相同)
    String routingKey = partitionQueue;

    log.info("发送有序消息,业务ID:{},分区队列:{},消息ID:{}",
            businessId, partitionQueue, message.getMessageId());

    // 发送消息
    rabbitTemplate.convertAndSend(
            RabbitMQConfig.ORDERED_EXCHANGE,
            routingKey,
            message,
            new CorrelationData(message.getMessageId()));
}
  1. 使用顺序消息交换机
    • 实现自定义交换机,确保消息按顺序路由到队列

七、总结

本文详细介绍了 SpringBoot 集成 RabbitMQ 的全过程,从基础概念到高级特性,从代码实现到性能调优,涵盖了实际开发中可能遇到的各种场景。

RabbitMQ 作为一款优秀的消息中间件,以其灵活性和可靠性在企业级应用中得到广泛应用。合理使用 RabbitMQ 可以帮助我们构建松耦合、高可用的分布式系统。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2025-09-05,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 果酱带你啃java 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 为什么 RabbitMQ 是消息中间件的优选?
  • 一、RabbitMQ 核心概念与架构
    • 1.1 核心概念解析
    • 1.2 交换机类型
    • 1.3 架构原理
  • 二、环境搭建
    • 2.1 安装 RabbitMQ
    • 2.2 安装 Docker 方式(推荐)
  • 三、SpringBoot 集成 RabbitMQ 基础
    • 3.1 创建项目并添加依赖
    • 3.2 配置 RabbitMQ
    • 3.3 创建 RabbitMQ 配置类
    • 3.4 创建消息实体类
    • 3.5 创建消息生产者
    • 3.6 创建消息消费者
    • 3.7 创建消息轨迹服务
    • 3.8 创建控制器
    • 3.9 创建启动类
    • 3.10 创建消息轨迹相关实体和数据库表
    • 3.11 测试消息发送与消费
  • 四、RabbitMQ 高级特性
    • 4.1 消息确认机制
    • 4.2 死信队列
    • 4.3 延迟队列
    • 4.4 消息幂等性
  • 五、RabbitMQ 性能调优
    • 5.1 服务器调优
    • 5.2 生产者调优
    • 5.3 消费者调优
    • 5.4 队列和交换机调优
  • 六、常见问题与解决方案
    • 6.1 消息丢失问题
    • 6.2 消息积压问题
    • 6.3 消息顺序性问题
  • 七、总结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档