
在分布式系统架构中,消息中间件扮演着 "交通枢纽" 的角色,负责协调各个服务之间的通信。目前主流的消息中间件有 RabbitMQ、Kafka 和 RocketMQ,它们各具特色:
根据 RabbitMQ 官方数据,它在全球财富 500 强公司中被广泛采用,能轻松处理每秒数万条消息,且提供了近乎完美的消息可靠性保证。其独特的交换机模型和灵活的路由规则,使其成为业务复杂多变场景的理想选择。
本文将带你从零开始,全面掌握 SpringBoot 与 RabbitMQ 的整合方案,从基础配置到高级特性,从代码实现到性能调优,让你既能理解底层原理,又能解决实际开发中的各种问题。
RabbitMQ 基于 AMQP(Advanced Message Queuing Protocol)协议实现,核心概念包括:
RabbitMQ 提供了四种主要的交换机类型,适用于不同的路由场景:
*匹配一个单词,#匹配多个单词)RabbitMQ 的整体架构如图所示:

消息流转流程:
根据 RabbitMQ 官方文档(https://www.rabbitmq.com/documentation.html),这种架构设计使得 RabbitMQ 具有极高的灵活性,可以通过不同的交换机和绑定组合,实现复杂的消息路由策略。
我们采用最新稳定版 RabbitMQ 3.13.0 进行安装,步骤如下:
# 对于Ubuntu/Debian
sudo apt-get update
sudo apt-get install erlang
# 对于CentOS/RHEL
sudo yum install erlang# 对于Ubuntu/Debian
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.13.0/rabbitmq-server_3.13.0-1_all.deb
sudo dpkg -i rabbitmq-server_3.13.0-1_all.deb
# 对于CentOS/RHEL
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.13.0/rabbitmq-server-3.13.0-1.el8.noarch.rpm
sudo rpm -ivh rabbitmq-server-3.13.0-1.el8.noarch.rpmsudo systemctl start rabbitmq-server
sudo systemctl enable rabbitmq-serversudo rabbitmq-plugins enable rabbitmq_managementsudo rabbitmqctl add_user admin password
sudo rabbitmqctl set_user_tags admin administrator
sudo rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"使用 Docker 安装 RabbitMQ 更加简单快捷:
# 拉取镜像
docker pull rabbitmq:3.13.0-management
# 启动容器
docker run -d --name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=password \
rabbitmq:3.13.0-management
我们使用 SpringBoot 3.2.0(最新稳定版)来创建项目,首先在 pom.xml 中添加必要的依赖:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.0</version>
<relativePath/>
</parent>
<groupId>com.jam</groupId>
<artifactId>springboot-rabbitmq-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>springboot-rabbitmq-demo</name>
<description>SpringBoot集成RabbitMQ示例项目</description>
<properties>
<java.version>17</java.version>
<lombok.version>1.18.30</lombok.version>
<commons-lang3.version>3.14.0</commons-lang3.version>
<mybatis-plus.version>3.5.5</mybatis-plus.version>
<mysql-connector.version>8.2.0</mysql-connector.version>
<springdoc.version>2.1.0</springdoc.version>
</properties>
<dependencies>
<!-- SpringBoot核心依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- RabbitMQ依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<scope>provided</scope>
</dependency>
<!-- 工具类 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${commons-lang3.version}</version>
</dependency>
<!-- MyBatis-Plus -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>${mybatis-plus.version}</version>
</dependency>
<!-- MySQL驱动 -->
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<version>${mysql-connector.version}</version>
<scope>runtime</scope>
</dependency>
<!-- Swagger3 -->
<dependency>
<groupId>org.springdoc</groupId>
<artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
<version>${springdoc.version}</version>
</dependency>
<!-- 测试依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
在 application.yml 中添加 RabbitMQ 的配置:
spring:
application:
name: springboot-rabbitmq-demo
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/rabbitmq_demo?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai
username: root
password: root
rabbitmq:
host: localhost
port: 5672
username: admin
password: password
virtual-host: /
# 连接超时时间,单位毫秒
connection-timeout: 10000
# 生产者配置
publisher-confirm-type: correlated # 开启发布确认机制
publisher-returns: true # 开启发布返回机制
# 消费者配置
listener:
simple:
# 并发消费者数量
concurrency: 5
# 最大并发消费者数量
max-concurrency: 10
# 每次从队列中拉取的消息数量
prefetch: 1
# 消息确认模式:manual-手动确认,auto-自动确认
acknowledge-mode: manual
# 消费失败时是否重试
retry:
enabled: true
# 初始重试间隔时间
initial-interval: 1000
# 重试最大间隔时间
max-interval: 10000
# 重试乘数
multiplier: 2
# 最大重试次数
max-attempts: 3
mybatis-plus:
mapper-locations: classpath:mapper/*.xml
type-aliases-package: com.jam.entity
configuration:
map-underscore-to-camel-case: true
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
springdoc:
api-docs:
path: /api-docs
swagger-ui:
path: /swagger-ui.html
operationsSorter: method
server:
port: 8081
创建配置类,定义交换机、队列和绑定关系:
package com.jam.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* RabbitMQ配置类
* 定义交换机、队列和绑定关系
*
* @author 果酱
*/
@Configuration
public class RabbitMQConfig {
/**
* 直接交换机名称
*/
public static final String DIRECT_EXCHANGE = "direct_exchange";
/**
* 主题交换机名称
*/
public static final String TOPIC_EXCHANGE = "topic_exchange";
/**
* 扇形交换机名称
*/
public static final String FANOUT_EXCHANGE = "fanout_exchange";
/**
* 头交换机名称
*/
public static final String HEADERS_EXCHANGE = "headers_exchange";
/**
* 直接队列1名称
*/
public static final String DIRECT_QUEUE_1 = "direct_queue_1";
/**
* 直接队列2名称
*/
public static final String DIRECT_QUEUE_2 = "direct_queue_2";
/**
* 主题队列1名称
*/
public static final String TOPIC_QUEUE_1 = "topic_queue_1";
/**
* 主题队列2名称
*/
public static final String TOPIC_QUEUE_2 = "topic_queue_2";
/**
* 扇形队列1名称
*/
public static final String FANOUT_QUEUE_1 = "fanout_queue_1";
/**
* 扇形队列2名称
*/
public static final String FANOUT_QUEUE_2 = "fanout_queue_2";
/**
* 头队列名称
*/
public static final String HEADERS_QUEUE = "headers_queue";
/**
* 死信交换机名称
*/
public static final String DEAD_LETTER_EXCHANGE = "dead_letter_exchange";
/**
* 死信队列名称
*/
public static final String DEAD_LETTER_QUEUE = "dead_letter_queue";
/**
* 延迟队列名称
*/
public static final String DELAY_QUEUE = "delay_queue";
// ==================== 交换机 ====================
/**
* 创建直接交换机
*
* @return 直接交换机
*/
@Bean
public DirectExchange directExchange() {
// durable: 是否持久化
// autoDelete: 是否自动删除(当没有绑定关系时)
// arguments: 交换机的其他属性
return new DirectExchange(DIRECT_EXCHANGE, true, false);
}
/**
* 创建主题交换机
*
* @return 主题交换机
*/
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(TOPIC_EXCHANGE, true, false);
}
/**
* 创建扇形交换机
*
* @return 扇形交换机
*/
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange(FANOUT_EXCHANGE, true, false);
}
/**
* 创建头交换机
*
* @return 头交换机
*/
@Bean
public HeadersExchange headersExchange() {
return new HeadersExchange(HEADERS_EXCHANGE, true, false);
}
/**
* 创建死信交换机
*
* @return 死信交换机
*/
@Bean
public DirectExchange deadLetterExchange() {
return new DirectExchange(DEAD_LETTER_EXCHANGE, true, false);
}
// ==================== 队列 ====================
/**
* 创建直接队列1
*
* @return 直接队列1
*/
@Bean
public Queue directQueue1() {
// durable: 是否持久化
// exclusive: 是否排他(仅当前连接可见,连接关闭后删除)
// autoDelete: 是否自动删除(当没有消费者时)
// arguments: 队列的其他属性
return QueueBuilder.durable(DIRECT_QUEUE_1)
.build();
}
/**
* 创建直接队列2
*
* @return 直接队列2
*/
@Bean
public Queue directQueue2() {
return QueueBuilder.durable(DIRECT_QUEUE_2)
.build();
}
/**
* 创建主题队列1
*
* @return 主题队列1
*/
@Bean
public Queue topicQueue1() {
return QueueBuilder.durable(TOPIC_QUEUE_1)
.build();
}
/**
* 创建主题队列2
*
* @return 主题队列2
*/
@Bean
public Queue topicQueue2() {
return QueueBuilder.durable(TOPIC_QUEUE_2)
.build();
}
/**
* 创建扇形队列1
*
* @return 扇形队列1
*/
@Bean
public Queue fanoutQueue1() {
return QueueBuilder.durable(FANOUT_QUEUE_1)
.build();
}
/**
* 创建扇形队列2
*
* @return 扇形队列2
*/
@Bean
public Queue fanoutQueue2() {
return QueueBuilder.durable(FANOUT_QUEUE_2)
.build();
}
/**
* 创建头队列
*
* @return 头队列
*/
@Bean
public Queue headersQueue() {
return QueueBuilder.durable(HEADERS_QUEUE)
.build();
}
/**
* 创建死信队列
*
* @return 死信队列
*/
@Bean
public Queue deadLetterQueue() {
return QueueBuilder.durable(DEAD_LETTER_QUEUE)
.build();
}
/**
* 创建延迟队列
* 设置死信交换机和死信路由键
*
* @return 延迟队列
*/
@Bean
public Queue delayQueue() {
return QueueBuilder.durable(DELAY_QUEUE)
// 设置死信交换机
.withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE)
// 设置死信路由键
.withArgument("x-dead-letter-routing-key", "dead.letter.key")
.build();
}
// ==================== 绑定 ====================
/**
* 绑定直接队列1到直接交换机
*
* @return 绑定关系
*/
@Bean
public Binding directBinding1() {
// 将directQueue1绑定到directExchange,路由键为"direct.key1"
return BindingBuilder.bind(directQueue1())
.to(directExchange())
.with("direct.key1");
}
/**
* 绑定直接队列2到直接交换机
*
* @return 绑定关系
*/
@Bean
public Binding directBinding2() {
// 将directQueue2绑定到directExchange,路由键为"direct.key2"
return BindingBuilder.bind(directQueue2())
.to(directExchange())
.with("direct.key2");
}
/**
* 绑定主题队列1到主题交换机
*
* @return 绑定关系
*/
@Bean
public Binding topicBinding1() {
// 将topicQueue1绑定到topicExchange,路由键模式为"topic.key.*"
return BindingBuilder.bind(topicQueue1())
.to(topicExchange())
.with("topic.key.*");
}
/**
* 绑定主题队列2到主题交换机
*
* @return 绑定关系
*/
@Bean
public Binding topicBinding2() {
// 将topicQueue2绑定到topicExchange,路由键模式为"topic.#"
return BindingBuilder.bind(topicQueue2())
.to(topicExchange())
.with("topic.#");
}
/**
* 绑定扇形队列1到扇形交换机
*
* @return 绑定关系
*/
@Bean
public Binding fanoutBinding1() {
// 扇形交换机忽略路由键,只需绑定即可
return BindingBuilder.bind(fanoutQueue1())
.to(fanoutExchange());
}
/**
* 绑定扇形队列2到扇形交换机
*
* @return 绑定关系
*/
@Bean
public Binding fanoutBinding2() {
return BindingBuilder.bind(fanoutQueue2())
.to(fanoutExchange());
}
/**
* 绑定头队列到头交换机
*
* @return 绑定关系
*/
@Bean
public Binding headersBinding() {
// 头交换机根据消息头信息进行路由
// 这里设置需要匹配的头信息:type=message和priority=high
// whereAll()表示所有头信息都需要匹配
// whereAny()表示任何一个头信息匹配即可
return BindingBuilder.bind(headersQueue())
.to(headersExchange())
.where("type").matches("message")
.and("priority").matches("high");
}
/**
* 绑定死信队列到死信交换机
*
* @return 绑定关系
*/
@Bean
public Binding deadLetterBinding() {
return BindingBuilder.bind(deadLetterQueue())
.to(deadLetterExchange())
.with("dead.letter.key");
}
}
创建一个通用的消息实体类,用于封装发送的消息内容:
package com.jam.entity;
import lombok.Data;
import java.io.Serializable;
import java.time.LocalDateTime;
/**
* 消息实体类
* 用于封装发送到RabbitMQ的消息内容
*
* @author 果酱
*/
@Data
public class MessageEntity implements Serializable {
/**
* 消息ID
*/
private String messageId;
/**
* 消息内容
*/
private String content;
/**
* 业务类型
*/
private String businessType;
/**
* 创建时间
*/
private LocalDateTime createTime;
/**
* 扩展字段,用于存储额外信息
*/
private String extra;
}
使用 Spring AMQP 提供的 RabbitTemplate 来发送消息,创建一个消息生产者服务:
package com.jam.service;
import com.jam.config.RabbitMQConfig;
import com.jam.entity.MessageEntity;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/**
* 消息生产者服务
* 负责向RabbitMQ发送各种类型的消息
*
* @author 果酱
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class MessageProducerService {
/**
* RabbitMQ模板类,提供发送消息的各种方法
*/
private final RabbitTemplate rabbitTemplate;
/**
* 初始化RabbitTemplate的回调函数
*/
public void initRabbitTemplate() {
// 设置确认回调:确认消息是否到达交换机
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
String messageId = correlationData != null ? correlationData.getId() : "unknown";
if (ack) {
log.info("消息已到达交换机,消息ID:{}", messageId);
} else {
log.error("消息未到达交换机,消息ID:{},原因:{}", messageId, cause);
// 消息发送失败,可以在这里进行重试或记录到数据库
}
});
// 设置返回回调:当消息到达交换机但无法路由到队列时触发
rabbitTemplate.setReturnsCallback(returnedMessage -> {
log.error("消息无法路由到队列,消息ID:{},交换机:{},路由键:{},回复码:{},回复文本:{}",
returnedMessage.getMessage().getMessageProperties().getMessageId(),
returnedMessage.getExchange(),
returnedMessage.getRoutingKey(),
returnedMessage.getReplyCode(),
returnedMessage.getReplyText());
});
}
/**
* 发送直接交换机消息
*
* @param routingKey 路由键
* @param content 消息内容
* @param businessType 业务类型
* @param extra 额外信息
*/
public void sendDirectMessage(String routingKey, String content,
String businessType, String extra) {
// 参数校验
StringUtils.hasText(routingKey, "路由键不能为空");
StringUtils.hasText(content, "消息内容不能为空");
// 创建消息实体
MessageEntity messageEntity = createMessageEntity(content, businessType, extra);
// 创建关联数据,用于确认回调
CorrelationData correlationData = new CorrelationData(messageEntity.getMessageId());
log.info("发送直接消息,交换机:{},路由键:{},消息ID:{}",
RabbitMQConfig.DIRECT_EXCHANGE, routingKey, messageEntity.getMessageId());
// 发送消息
rabbitTemplate.convertAndSend(
RabbitMQConfig.DIRECT_EXCHANGE,
routingKey,
messageEntity,
correlationData);
}
/**
* 发送主题交换机消息
*
* @param routingKey 路由键
* @param content 消息内容
* @param businessType 业务类型
* @param extra 额外信息
*/
public void sendTopicMessage(String routingKey, String content,
String businessType, String extra) {
// 参数校验
StringUtils.hasText(routingKey, "路由键不能为空");
StringUtils.hasText(content, "消息内容不能为空");
// 创建消息实体
MessageEntity messageEntity = createMessageEntity(content, businessType, extra);
// 创建关联数据,用于确认回调
CorrelationData correlationData = new CorrelationData(messageEntity.getMessageId());
log.info("发送主题消息,交换机:{},路由键:{},消息ID:{}",
RabbitMQConfig.TOPIC_EXCHANGE, routingKey, messageEntity.getMessageId());
// 发送消息
rabbitTemplate.convertAndSend(
RabbitMQConfig.TOPIC_EXCHANGE,
routingKey,
messageEntity,
correlationData);
}
/**
* 发送扇形交换机消息
*
* @param content 消息内容
* @param businessType 业务类型
* @param extra 额外信息
*/
public void sendFanoutMessage(String content, String businessType, String extra) {
// 参数校验
StringUtils.hasText(content, "消息内容不能为空");
// 创建消息实体
MessageEntity messageEntity = createMessageEntity(content, businessType, extra);
// 创建关联数据,用于确认回调
CorrelationData correlationData = new CorrelationData(messageEntity.getMessageId());
log.info("发送扇形消息,交换机:{},消息ID:{}",
RabbitMQConfig.FANOUT_EXCHANGE, messageEntity.getMessageId());
// 扇形交换机忽略路由键,这里可以传null
rabbitTemplate.convertAndSend(
RabbitMQConfig.FANOUT_EXCHANGE,
null,
messageEntity,
correlationData);
}
/**
* 发送头交换机消息
*
* @param content 消息内容
* @param businessType 业务类型
* @param extra 额外信息
*/
public void sendHeadersMessage(String content, String businessType, String extra) {
// 参数校验
StringUtils.hasText(content, "消息内容不能为空");
// 创建消息实体
MessageEntity messageEntity = createMessageEntity(content, businessType, extra);
// 创建关联数据,用于确认回调
CorrelationData correlationData = new CorrelationData(messageEntity.getMessageId());
// 设置消息头信息,用于头交换机路由
MessagePostProcessor messagePostProcessor = message -> {
message.getMessageProperties().setHeader("type", "message");
message.getMessageProperties().setHeader("priority", "high");
return message;
};
log.info("发送头消息,交换机:{},消息ID:{}",
RabbitMQConfig.HEADERS_EXCHANGE, messageEntity.getMessageId());
// 发送消息
rabbitTemplate.convertAndSend(
RabbitMQConfig.HEADERS_EXCHANGE,
null, // 头交换机忽略路由键
messageEntity,
messagePostProcessor,
correlationData);
}
/**
* 发送延迟消息
*
* @param content 消息内容
* @param businessType 业务类型
* @param extra 额外信息
* @param delayMillis 延迟时间,单位毫秒
*/
public void sendDelayMessage(String content, String businessType, String extra, long delayMillis) {
// 参数校验
StringUtils.hasText(content, "消息内容不能为空");
if (delayMillis <= 0) {
throw new IllegalArgumentException("延迟时间必须大于0");
}
// 创建消息实体
MessageEntity messageEntity = createMessageEntity(content, businessType, extra);
// 创建关联数据,用于确认回调
CorrelationData correlationData = new CorrelationData(messageEntity.getMessageId());
// 设置消息延迟属性
MessagePostProcessor messagePostProcessor = message -> {
// 设置消息过期时间,即延迟时间
message.getMessageProperties().setExpiration(String.valueOf(delayMillis));
return message;
};
log.info("发送延迟消息,延迟队列:{},延迟时间:{}ms,消息ID:{}",
RabbitMQConfig.DELAY_QUEUE, delayMillis, messageEntity.getMessageId());
// 发送消息到延迟队列
rabbitTemplate.convertAndSend(
"", // 不指定交换机,直接发送到队列
RabbitMQConfig.DELAY_QUEUE,
messageEntity,
messagePostProcessor,
correlationData);
}
/**
* 创建消息实体
*
* @param content 消息内容
* @param businessType 业务类型
* @param extra 额外信息
* @return 消息实体
*/
private MessageEntity createMessageEntity(String content, String businessType, String extra) {
MessageEntity messageEntity = new MessageEntity();
messageEntity.setMessageId(UUID.randomUUID().toString());
messageEntity.setContent(content);
messageEntity.setBusinessType(businessType);
messageEntity.setCreateTime(LocalDateTime.now());
messageEntity.setExtra(extra);
return messageEntity;
}
}
使用 @RabbitListener 注解来创建消息消费者:
package com.jam.service;
import com.jam.config.RabbitMQConfig;
import com.jam.entity.MessageEntity;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;
import com.rabbitmq.client.Channel;
import java.io.IOException;
/**
* 消息消费者服务
* 负责从RabbitMQ接收并处理消息
*
* @author 果酱
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class MessageConsumerService {
/**
* 消息轨迹服务
*/
private final MessageTraceService messageTraceService;
/**
* 消费直接队列1的消息
*
* @param message 消息内容
* @param channel 消息通道
* @param deliveryTag 消息投递标签
*/
@RabbitListener(queues = RabbitMQConfig.DIRECT_QUEUE_1)
public void consumeDirectQueue1(MessageEntity message, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
log.info("接收到直接队列1的消息,消息ID:{},内容:{},业务类型:{}",
message.getMessageId(), message.getContent(), message.getBusinessType());
try {
// 处理消息的业务逻辑
processMessage(message);
// 记录消费成功轨迹
messageTraceService.recordConsumeSuccess(message);
// 手动确认消息
channel.basicAck(deliveryTag, false);
log.info("直接队列1消息处理成功并确认,消息ID:{}", message.getMessageId());
} catch (Exception e) {
// 记录消费失败轨迹
messageTraceService.recordConsumeFailure(message, e.getMessage());
log.error("直接队列1消息处理失败,消息ID:{}", message.getMessageId(), e);
// 拒绝消息并将其丢弃(不重新入队)
// 如果需要重新入队,可以使用channel.basicNack(deliveryTag, false, true)
channel.basicNack(deliveryTag, false, false);
}
}
/**
* 消费直接队列2的消息
*
* @param message 消息内容
* @param channel 消息通道
* @param deliveryTag 消息投递标签
*/
@RabbitListener(queues = RabbitMQConfig.DIRECT_QUEUE_2)
public void consumeDirectQueue2(MessageEntity message, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
log.info("接收到直接队列2的消息,消息ID:{},内容:{},业务类型:{}",
message.getMessageId(), message.getContent(), message.getBusinessType());
try {
// 处理消息的业务逻辑
processMessage(message);
// 记录消费成功轨迹
messageTraceService.recordConsumeSuccess(message);
// 手动确认消息
channel.basicAck(deliveryTag, false);
log.info("直接队列2消息处理成功并确认,消息ID:{}", message.getMessageId());
} catch (Exception e) {
// 记录消费失败轨迹
messageTraceService.recordConsumeFailure(message, e.getMessage());
log.error("直接队列2消息处理失败,消息ID:{}", message.getMessageId(), e);
channel.basicNack(deliveryTag, false, false);
}
}
/**
* 消费主题队列1的消息
*
* @param message 消息内容
* @param channel 消息通道
* @param deliveryTag 消息投递标签
*/
@RabbitListener(queues = RabbitMQConfig.TOPIC_QUEUE_1)
public void consumeTopicQueue1(MessageEntity message, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
log.info("接收到主题队列1的消息,消息ID:{},内容:{},业务类型:{}",
message.getMessageId(), message.getContent(), message.getBusinessType());
try {
// 处理消息的业务逻辑
processMessage(message);
// 记录消费成功轨迹
messageTraceService.recordConsumeSuccess(message);
// 手动确认消息
channel.basicAck(deliveryTag, false);
log.info("主题队列1消息处理成功并确认,消息ID:{}", message.getMessageId());
} catch (Exception e) {
// 记录消费失败轨迹
messageTraceService.recordConsumeFailure(message, e.getMessage());
log.error("主题队列1消息处理失败,消息ID:{}", message.getMessageId(), e);
channel.basicNack(deliveryTag, false, false);
}
}
/**
* 消费主题队列2的消息
*
* @param message 消息内容
* @param channel 消息通道
* @param deliveryTag 消息投递标签
*/
@RabbitListener(queues = RabbitMQConfig.TOPIC_QUEUE_2)
public void consumeTopicQueue2(MessageEntity message, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
log.info("接收到主题队列2的消息,消息ID:{},内容:{},业务类型:{}",
message.getMessageId(), message.getContent(), message.getBusinessType());
try {
// 处理消息的业务逻辑
processMessage(message);
// 记录消费成功轨迹
messageTraceService.recordConsumeSuccess(message);
// 手动确认消息
channel.basicAck(deliveryTag, false);
log.info("主题队列2消息处理成功并确认,消息ID:{}", message.getMessageId());
} catch (Exception e) {
// 记录消费失败轨迹
messageTraceService.recordConsumeFailure(message, e.getMessage());
log.error("主题队列2消息处理失败,消息ID:{}", message.getMessageId(), e);
channel.basicNack(deliveryTag, false, false);
}
}
/**
* 消费扇形队列1的消息
*
* @param message 消息内容
* @param channel 消息通道
* @param deliveryTag 消息投递标签
*/
@RabbitListener(queues = RabbitMQConfig.FANOUT_QUEUE_1)
public void consumeFanoutQueue1(MessageEntity message, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
log.info("接收到扇形队列1的消息,消息ID:{},内容:{},业务类型:{}",
message.getMessageId(), message.getContent(), message.getBusinessType());
try {
// 处理消息的业务逻辑
processMessage(message);
// 记录消费成功轨迹
messageTraceService.recordConsumeSuccess(message);
// 手动确认消息
channel.basicAck(deliveryTag, false);
log.info("扇形队列1消息处理成功并确认,消息ID:{}", message.getMessageId());
} catch (Exception e) {
// 记录消费失败轨迹
messageTraceService.recordConsumeFailure(message, e.getMessage());
log.error("扇形队列1消息处理失败,消息ID:{}", message.getMessageId(), e);
channel.basicNack(deliveryTag, false, false);
}
}
/**
* 消费扇形队列2的消息
*
* @param message 消息内容
* @param channel 消息通道
* @param deliveryTag 消息投递标签
*/
@RabbitListener(queues = RabbitMQConfig.FANOUT_QUEUE_2)
public void consumeFanoutQueue2(MessageEntity message, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
log.info("接收到扇形队列2的消息,消息ID:{},内容:{},业务类型:{}",
message.getMessageId(), message.getContent(), message.getBusinessType());
try {
// 处理消息的业务逻辑
processMessage(message);
// 记录消费成功轨迹
messageTraceService.recordConsumeSuccess(message);
// 手动确认消息
channel.basicAck(deliveryTag, false);
log.info("扇形队列2消息处理成功并确认,消息ID:{}", message.getMessageId());
} catch (Exception e) {
// 记录消费失败轨迹
messageTraceService.recordConsumeFailure(message, e.getMessage());
log.error("扇形队列2消息处理失败,消息ID:{}", message.getMessageId(), e);
channel.basicNack(deliveryTag, false, false);
}
}
/**
* 消费头队列的消息
*
* @param message 消息内容
* @param channel 消息通道
* @param deliveryTag 消息投递标签
*/
@RabbitListener(queues = RabbitMQConfig.HEADERS_QUEUE)
public void consumeHeadersQueue(MessageEntity message, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
log.info("接收到头队列的消息,消息ID:{},内容:{},业务类型:{}",
message.getMessageId(), message.getContent(), message.getBusinessType());
try {
// 处理消息的业务逻辑
processMessage(message);
// 记录消费成功轨迹
messageTraceService.recordConsumeSuccess(message);
// 手动确认消息
channel.basicAck(deliveryTag, false);
log.info("头队列消息处理成功并确认,消息ID:{}", message.getMessageId());
} catch (Exception e) {
// 记录消费失败轨迹
messageTraceService.recordConsumeFailure(message, e.getMessage());
log.error("头队列消息处理失败,消息ID:{}", message.getMessageId(), e);
channel.basicNack(deliveryTag, false, false);
}
}
/**
* 消费死信队列的消息
*
* @param message 消息内容
* @param channel 消息通道
* @param deliveryTag 消息投递标签
*/
@RabbitListener(queues = RabbitMQConfig.DEAD_LETTER_QUEUE)
public void consumeDeadLetterQueue(MessageEntity message, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
log.error("接收到死信队列的消息,消息ID:{},内容:{},业务类型:{}",
message.getMessageId(), message.getContent(), message.getBusinessType());
try {
// 处理死信消息的业务逻辑,通常需要人工干预
processDeadLetterMessage(message);
// 记录消费成功轨迹
messageTraceService.recordConsumeSuccess(message);
// 手动确认消息
channel.basicAck(deliveryTag, false);
log.info("死信队列消息处理成功并确认,消息ID:{}", message.getMessageId());
} catch (Exception e) {
// 记录消费失败轨迹
messageTraceService.recordConsumeFailure(message, e.getMessage());
log.error("死信队列消息处理失败,消息ID:{}", message.getMessageId(), e);
channel.basicNack(deliveryTag, false, false);
}
}
/**
* 处理消息的业务逻辑
*
* @param message 要处理的消息
*/
private void processMessage(MessageEntity message) {
// 根据业务类型处理不同的消息
String businessType = message.getBusinessType();
if ("ORDER_CREATE".equals(businessType)) {
// 处理订单创建消息
processOrderCreateMessage(message);
} else if ("USER_REGISTER".equals(businessType)) {
// 处理用户注册消息
processUserRegisterMessage(message);
} else {
// 处理未知类型消息
log.warn("收到未知类型的消息,消息ID:{},业务类型:{}",
message.getMessageId(), businessType);
}
}
/**
* 处理死信消息
*
* @param message 死信消息
*/
private void processDeadLetterMessage(MessageEntity message) {
log.info("处理死信消息,消息ID:{},内容:{}",
message.getMessageId(), message.getContent());
// 实际业务处理逻辑,如记录到数据库等待人工处理
}
/**
* 处理订单创建消息
*
* @param message 订单创建消息
*/
private void processOrderCreateMessage(MessageEntity message) {
log.info("处理订单创建消息,消息ID:{},订单信息:{}",
message.getMessageId(), message.getContent());
// 实际业务处理逻辑...
}
/**
* 处理用户注册消息
*
* @param message 用户注册消息
*/
private void processUserRegisterMessage(MessageEntity message) {
log.info("处理用户注册消息,消息ID:{},用户信息:{}",
message.getMessageId(), message.getContent());
// 实际业务处理逻辑...
}
}
为了跟踪消息的整个生命周期,创建消息轨迹服务:
package com.jam.service;
import com.jam.entity.MessageEntity;
import com.jam.entity.MessageTrace;
import com.jam.mapper.MessageTraceMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.util.Objects;
/**
* 消息轨迹服务
* 记录消息的发送和消费轨迹
*
* @author 果酱
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class MessageTraceService {
private final MessageTraceMapper messageTraceMapper;
/**
* 记录消息发送前的轨迹
*
* @param message 消息实体
* @param exchange 交换机
* @param routingKey 路由键
* @return 消息轨迹ID
*/
@Transactional(rollbackFor = Exception.class)
public Long recordBeforeSend(MessageEntity message, String exchange, String routingKey) {
Objects.requireNonNull(message, "消息实体不能为空");
StringUtils.hasText(message.getMessageId(), "消息ID不能为空");
StringUtils.hasText(exchange, "交换机不能为空");
MessageTrace trace = new MessageTrace();
trace.setMessageId(message.getMessageId());
trace.setExchange(exchange);
trace.setRoutingKey(routingKey);
trace.setBusinessType(message.getBusinessType());
trace.setContent(message.getContent());
trace.setSendStatus(0); // 待发送
trace.setCreateTime(LocalDateTime.now());
trace.setUpdateTime(LocalDateTime.now());
messageTraceMapper.insert(trace);
log.info("记录消息发送前轨迹,消息ID:{},轨迹ID:{}", message.getMessageId(), trace.getId());
return trace.getId();
}
/**
* 记录消息发送成功的轨迹
*
* @param messageId 消息ID
*/
@Transactional(rollbackFor = Exception.class)
public void recordSendSuccess(String messageId) {
StringUtils.hasText(messageId, "消息ID不能为空");
MessageTrace trace = messageTraceMapper.selectByMessageId(messageId);
if (trace == null) {
log.warn("未找到消息轨迹,消息ID:{}", messageId);
return;
}
trace.setSendTime(LocalDateTime.now());
trace.setSendStatus(1); // 发送成功
trace.setUpdateTime(LocalDateTime.now());
messageTraceMapper.updateById(trace);
log.info("记录消息发送成功轨迹,消息ID:{}", messageId);
}
/**
* 记录消息发送失败的轨迹
*
* @param messageId 消息ID
* @param errorMsg 错误信息
*/
@Transactional(rollbackFor = Exception.class)
public void recordSendFailure(String messageId, String errorMsg) {
StringUtils.hasText(messageId, "消息ID不能为空");
StringUtils.hasText(errorMsg, "错误信息不能为空");
MessageTrace trace = messageTraceMapper.selectByMessageId(messageId);
if (trace == null) {
log.warn("未找到消息轨迹,消息ID:{}", messageId);
return;
}
trace.setSendTime(LocalDateTime.now());
trace.setSendStatus(2); // 发送失败
trace.setSendErrorMsg(errorMsg);
trace.setUpdateTime(LocalDateTime.now());
messageTraceMapper.updateById(trace);
log.info("记录消息发送失败轨迹,消息ID:{}", messageId);
}
/**
* 记录消息消费成功的轨迹
*
* @param message 消息实体
*/
@Transactional(rollbackFor = Exception.class)
public void recordConsumeSuccess(MessageEntity message) {
Objects.requireNonNull(message, "消息实体不能为空");
StringUtils.hasText(message.getMessageId(), "消息ID不能为空");
MessageTrace trace = messageTraceMapper.selectByMessageId(message.getMessageId());
if (trace == null) {
log.warn("未找到消息轨迹,消息ID:{}", message.getMessageId());
return;
}
trace.setConsumeTime(LocalDateTime.now());
trace.setConsumeStatus(1); // 消费成功
trace.setUpdateTime(LocalDateTime.now());
messageTraceMapper.updateById(trace);
log.info("记录消息消费成功轨迹,消息ID:{}", message.getMessageId());
}
/**
* 记录消息消费失败的轨迹
*
* @param message 消息实体
* @param errorMsg 错误信息
*/
@Transactional(rollbackFor = Exception.class)
public void recordConsumeFailure(MessageEntity message, String errorMsg) {
Objects.requireNonNull(message, "消息实体不能为空");
StringUtils.hasText(message.getMessageId(), "消息ID不能为空");
StringUtils.hasText(errorMsg, "错误信息不能为空");
MessageTrace trace = messageTraceMapper.selectByMessageId(message.getMessageId());
if (trace == null) {
log.warn("未找到消息轨迹,消息ID:{}", message.getMessageId());
return;
}
trace.setConsumeTime(LocalDateTime.now());
trace.setConsumeStatus(2); // 消费失败
trace.setConsumeErrorMsg(errorMsg);
trace.setUpdateTime(LocalDateTime.now());
messageTraceMapper.updateById(trace);
log.info("记录消息消费失败轨迹,消息ID:{}", message.getMessageId());
}
}
创建一个控制器,用于测试消息发送功能:
package com.jam.controller;
import com.jam.service.MessageProducerService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
* 消息测试控制器
* 提供API接口用于测试消息发送功能
*
* @author 果酱
*/
@Slf4j
@RestController
@RequestMapping("/api/message")
@RequiredArgsConstructor
@Tag(name = "消息测试接口", description = "用于测试RabbitMQ消息发送的API接口")
public class MessageController {
/**
* 消息生产者服务
*/
private final MessageProducerService messageProducerService;
/**
* 发送直接交换机消息
*
* @param routingKey 路由键
* @param content 消息内容
* @param businessType 业务类型
* @param extra 额外信息
* @return 响应信息
*/
@PostMapping("/direct")
@Operation(summary = "发送直接消息", description = "发送到直接交换机的消息")
public ResponseEntity<String> sendDirectMessage(
@Parameter(description = "路由键", required = true)
@RequestParam String routingKey,
@Parameter(description = "消息内容", required = true)
@RequestParam String content,
@Parameter(description = "业务类型")
@RequestParam(required = false) String businessType,
@Parameter(description = "额外信息")
@RequestParam(required = false) String extra) {
log.info("接收到发送直接消息请求,路由键:{}", routingKey);
messageProducerService.sendDirectMessage(routingKey, content, businessType, extra);
return ResponseEntity.ok("直接消息发送成功");
}
/**
* 发送主题交换机消息
*
* @param routingKey 路由键
* @param content 消息内容
* @param businessType 业务类型
* @param extra 额外信息
* @return 响应信息
*/
@PostMapping("/topic")
@Operation(summary = "发送主题消息", description = "发送到主题交换机的消息")
public ResponseEntity<String> sendTopicMessage(
@Parameter(description = "路由键", required = true)
@RequestParam String routingKey,
@Parameter(description = "消息内容", required = true)
@RequestParam String content,
@Parameter(description = "业务类型")
@RequestParam(required = false) String businessType,
@Parameter(description = "额外信息")
@RequestParam(required = false) String extra) {
log.info("接收到发送主题消息请求,路由键:{}", routingKey);
messageProducerService.sendTopicMessage(routingKey, content, businessType, extra);
return ResponseEntity.ok("主题消息发送成功");
}
/**
* 发送扇形交换机消息
*
* @param content 消息内容
* @param businessType 业务类型
* @param extra 额外信息
* @return 响应信息
*/
@PostMapping("/fanout")
@Operation(summary = "发送扇形消息", description = "发送到扇形交换机的消息")
public ResponseEntity<String> sendFanoutMessage(
@Parameter(description = "消息内容", required = true)
@RequestParam String content,
@Parameter(description = "业务类型")
@RequestParam(required = false) String businessType,
@Parameter(description = "额外信息")
@RequestParam(required = false) String extra) {
log.info("接收到发送扇形消息请求");
messageProducerService.sendFanoutMessage(content, businessType, extra);
return ResponseEntity.ok("扇形消息发送成功");
}
/**
* 发送头交换机消息
*
* @param content 消息内容
* @param businessType 业务类型
* @param extra 额外信息
* @return 响应信息
*/
@PostMapping("/headers")
@Operation(summary = "发送头消息", description = "发送到头交换机的消息")
public ResponseEntity<String> sendHeadersMessage(
@Parameter(description = "消息内容", required = true)
@RequestParam String content,
@Parameter(description = "业务类型")
@RequestParam(required = false) String businessType,
@Parameter(description = "额外信息")
@RequestParam(required = false) String extra) {
log.info("接收到发送头消息请求");
messageProducerService.sendHeadersMessage(content, businessType, extra);
return ResponseEntity.ok("头消息发送成功");
}
/**
* 发送延迟消息
*
* @param content 消息内容
* @param businessType 业务类型
* @param extra 额外信息
* @param delayMillis 延迟时间,单位毫秒
* @return 响应信息
*/
@PostMapping("/delay")
@Operation(summary = "发送延迟消息", description = "发送延迟消息,指定时间后才会被消费")
public ResponseEntity<String> sendDelayMessage(
@Parameter(description = "消息内容", required = true)
@RequestParam String content,
@Parameter(description = "业务类型")
@RequestParam(required = false) String businessType,
@Parameter(description = "额外信息")
@RequestParam(required = false) String extra,
@Parameter(description = "延迟时间(毫秒)", required = true)
@RequestParam long delayMillis) {
log.info("接收到发送延迟消息请求,延迟时间:{}ms", delayMillis);
messageProducerService.sendDelayMessage(content, businessType, extra, delayMillis);
return ResponseEntity.ok("延迟消息发送成功");
}
}
package com.jam;
import com.jam.service.MessageProducerService;
import io.swagger.v3.oas.annotations.OpenAPIDefinition;
import io.swagger.v3.oas.annotations.info.Info;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
/**
* SpringBoot应用启动类
*
* @author 果酱
*/
@SpringBootApplication
@MapperScan("com.jam.mapper")
@OpenAPIDefinition(
info = @Info(
title = "SpringBoot集成RabbitMQ示例项目",
version = "1.0",
description = "SpringBoot集成RabbitMQ的示例项目,包含各种消息发送和消费的示例"
)
)
public class SpringbootRabbitmqDemoApplication {
public static void main(String[] args) {
SpringApplication.run(SpringbootRabbitmqDemoApplication.class, args);
}
/**
* 初始化RabbitTemplate的回调函数
*/
@Bean
public CommandLineRunner initRabbitTemplate(MessageProducerService producerService) {
return args -> {
producerService.initRabbitTemplate();
};
}
}
消息轨迹实体类:
package com.jam.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.time.LocalDateTime;
/**
* 消息轨迹实体类
* 记录消息的发送和消费情况
*
* @author 果酱
*/
@Data
@TableName("t_message_trace")
public class MessageTrace {
/**
* 主键ID
*/
@TableId(type = IdType.AUTO)
private Long id;
/**
* 消息ID
*/
private String messageId;
/**
* 交换机
*/
private String exchange;
/**
* 路由键
*/
private String routingKey;
/**
* 业务类型
*/
private String businessType;
/**
* 消息内容
*/
private String content;
/**
* 发送时间
*/
private LocalDateTime sendTime;
/**
* 发送状态:0-待发送,1-发送成功,2-发送失败
*/
private Integer sendStatus;
/**
* 发送错误信息
*/
private String sendErrorMsg;
/**
* 消费时间
*/
private LocalDateTime consumeTime;
/**
* 消费状态:0-待消费,1-消费成功,2-消费失败
*/
private Integer consumeStatus;
/**
* 消费错误信息
*/
private String consumeErrorMsg;
/**
* 创建时间
*/
private LocalDateTime createTime;
/**
* 更新时间
*/
private LocalDateTime updateTime;
}消息轨迹 Mapper 接口:
package com.jam.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.jam.entity.MessageTrace;
import org.apache.ibatis.annotations.Param;
/**
* 消息轨迹Mapper
*
* @author 果酱
*/
public interface MessageTraceMapper extends BaseMapper<MessageTrace> {
/**
* 根据消息ID查询消息轨迹
*
* @param messageId 消息ID
* @return 消息轨迹信息
*/
MessageTrace selectByMessageId(@Param("messageId") String messageId);
}消息轨迹 Mapper XML 文件(resources/mapper/MessageTraceMapper.xml):
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.jam.mapper.MessageTraceMapper">
<select id="selectByMessageId" parameterType="java.lang.String" resultType="com.jam.entity.MessageTrace">
SELECT * FROM t_message_trace WHERE message_id = #{messageId}
</select>
</mapper>创建消息轨迹表的 SQL:
CREATE TABLE `t_message_trace` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键ID',
`message_id` varchar(64) NOT NULL COMMENT '消息ID',
`exchange` varchar(128) NOT NULL COMMENT '交换机',
`routing_key` varchar(128) DEFAULT NULL COMMENT '路由键',
`business_type` varchar(64) DEFAULT NULL COMMENT '业务类型',
`content` text COMMENT '消息内容',
`send_time` datetime DEFAULT NULL COMMENT '发送时间',
`send_status` tinyint DEFAULT NULL COMMENT '发送状态:0-待发送,1-发送成功,2-发送失败',
`send_error_msg` text COMMENT '发送错误信息',
`consume_time` datetime DEFAULT NULL COMMENT '消费时间',
`consume_status` tinyint DEFAULT NULL COMMENT '消费状态:0-待消费,1-消费成功,2-消费失败',
`consume_error_msg` text COMMENT '消费错误信息',
`create_time` datetime NOT NULL COMMENT '创建时间',
`update_time` datetime NOT NULL COMMENT '更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_message_id` (`message_id`),
KEY `idx_exchange` (`exchange`),
KEY `idx_business_type` (`business_type`),
KEY `idx_send_status` (`send_status`),
KEY `idx_consume_status` (`consume_status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='消息轨迹表';
启动应用程序后,可以通过以下方式测试消息发送与消费:
使用 Swagger UI 测试:访问http://localhost:8081/swagger-ui.html,通过界面调用消息发送接口
# 发送直接消息
curl -X POST "http://localhost:8081/api/message/direct?routingKey=direct.key1&content=Hello Direct&businessType=TEST"
# 发送主题消息
curl -X POST "http://localhost:8081/api/message/topic?routingKey=topic.key.test&content=Hello Topic&businessType=TEST"
# 发送扇形消息
curl -X POST "http://localhost:8081/api/message/fanout?content=Hello Fanout&businessType=TEST"
# 发送头消息
curl -X POST "http://localhost:8081/api/message/headers?content=Hello Headers&businessType=TEST"
# 发送延迟消息(延迟5秒)
curl -X POST "http://localhost:8081/api/message/delay?content=Hello Delay&businessType=TEST&delayMillis=5000"发送消息后,可以在控制台看到生产者和消费者的日志输出,证明消息已经成功发送和消费。
RabbitMQ 提供了完善的消息确认机制,确保消息的可靠传递。消息确认包括两个方向:
我们在前面的代码中已经实现了这两种确认机制:
setConfirmCallback和setReturnsCallback实现channel.basicAck和channel.basicNack手动确认消息确认流程:

根据 Spring AMQP 官方文档(https://docs.spring.io/spring-amqp/reference/),推荐在生产环境中开启手动确认模式,以确保消息不会被意外丢失。
死信队列(Dead Letter Queue)用于存储无法被正常消费的消息,这些消息通常被称为死信(Dead Letter)。消息成为死信的原因包括:
死信队列的工作流程:

在前面的配置中,我们已经实现了死信队列的功能:
延迟队列用于存储需要在指定时间后才被消费的消息。RabbitMQ 本身不直接支持延迟队列,但可以通过以下两种方式实现:
我们在前面的代码中实现了第一种方式,下面介绍第二种方式的实现:
# 下载插件
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.13.0/rabbitmq_delayed_message_exchange-3.13.0.ez
# 复制到插件目录
sudo cp rabbitmq_delayed_message_exchange-3.13.0.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.13.0/plugins/
# 启用插件
sudo rabbitmq-plugins enable rabbitmq_delayed_message_exchange/**
* 延迟交换机名称
*/
public static final String DELAYED_EXCHANGE = "delayed_exchange";
/**
* 延迟队列名称
*/
public static final String DELAYED_QUEUE = "delayed_queue";
/**
* 创建延迟交换机
*
* @return 延迟交换机
*/
@Bean
public CustomExchange delayedExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
// 类型为x-delayed-message
return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", true, false, args);
}
/**
* 创建延迟队列
*
* @return 延迟队列
*/
@Bean
public Queue delayedQueue() {
return QueueBuilder.durable(DELAYED_QUEUE)
.build();
}
/**
* 绑定延迟队列到延迟交换机
*
* @return 绑定关系
*/
@Bean
public Binding delayedBinding() {
return BindingBuilder.bind(delayedQueue())
.to(delayedExchange())
.with("delayed.key")
.noargs();
}/**
* 发送延迟消息(使用延迟交换机插件)
*
* @param content 消息内容
* @param businessType 业务类型
* @param extra 额外信息
* @param delayMillis 延迟时间,单位毫秒
*/
public void sendDelayedMessageWithPlugin(String content, String businessType, String extra, long delayMillis) {
// 参数校验
StringUtils.hasText(content, "消息内容不能为空");
if (delayMillis <= 0) {
throw new IllegalArgumentException("延迟时间必须大于0");
}
// 创建消息实体
MessageEntity messageEntity = createMessageEntity(content, businessType, extra);
// 创建关联数据,用于确认回调
CorrelationData correlationData = new CorrelationData(messageEntity.getMessageId());
// 设置消息延迟属性
MessagePostProcessor messagePostProcessor = message -> {
// 设置延迟时间
message.getMessageProperties().setHeader("x-delay", delayMillis);
return message;
};
log.info("发送延迟消息(插件),交换机:{},延迟时间:{}ms,消息ID:{}",
RabbitMQConfig.DELAYED_EXCHANGE, delayMillis, messageEntity.getMessageId());
// 发送消息到延迟交换机
rabbitTemplate.convertAndSend(
RabbitMQConfig.DELAYED_EXCHANGE,
"delayed.key",
messageEntity,
messagePostProcessor,
correlationData);
}/**
* 消费延迟队列(插件)的消息
*
* @param message 消息内容
* @param channel 消息通道
* @param deliveryTag 消息投递标签
*/
@RabbitListener(queues = RabbitMQConfig.DELAYED_QUEUE)
public void consumeDelayedQueue(MessageEntity message, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
log.info("接收到延迟队列(插件)的消息,消息ID:{},内容:{},业务类型:{}",
message.getMessageId(), message.getContent(), message.getBusinessType());
try {
// 处理消息的业务逻辑
processMessage(message);
// 记录消费成功轨迹
messageTraceService.recordConsumeSuccess(message);
// 手动确认消息
channel.basicAck(deliveryTag, false);
log.info("延迟队列(插件)消息处理成功并确认,消息ID:{}", message.getMessageId());
} catch (Exception e) {
// 记录消费失败轨迹
messageTraceService.recordConsumeFailure(message, e.getMessage());
log.error("延迟队列(插件)消息处理失败,消息ID:{}", message.getMessageId(), e);
channel.basicNack(deliveryTag, false, false);
}
}两种延迟队列实现方式的对比:
实现方式 | 优点 | 缺点 |
|---|---|---|
TTL + 死信队列 | 无需安装插件,原生支持 | 时间精度不高,队列级别的 TTL 设置会影响所有消息 |
延迟交换机插件 | 时间精度高,支持消息级别的延迟设置 | 需要安装插件,增加了维护成本 |
根据 RabbitMQ 官方文档,推荐在生产环境中使用延迟交换机插件的方式,因为它提供了更精确的延迟控制。
在分布式系统中,消息重复消费是不可避免的问题,因此需要保证消息消费的幂等性。常用的实现方式有:
1.基于数据库唯一索引
/**
* 处理消息(幂等性保证)
*
* @param message 消息实体
*/
@Transactional(rollbackFor = Exception.class)
public void processMessageWithIdempotency(MessageEntity message) {
String messageId = message.getMessageId();
String businessType = message.getBusinessType();
// 检查消息是否已经处理过
MessageTrace trace = messageTraceMapper.selectByMessageId(messageId);
if (trace != null && trace.getConsumeStatus() == 1) {
log.info("消息已经处理过,消息ID:{}", messageId);
return;
}
// 根据业务类型处理不同的消息
if ("ORDER_CREATE".equals(businessType)) {
// 处理订单创建消息,使用订单号作为唯一键
String orderNo = message.getExtra();
// 检查订单是否已经处理
Order order = orderMapper.selectByOrderNo(orderNo);
if (order != null) {
log.info("订单已经处理过,订单号:{}", orderNo);
return;
}
// 处理订单业务逻辑
// ...
} else if ("USER_REGISTER".equals(businessType)) {
// 处理用户注册消息,使用用户ID作为唯一键
// ...
}
}/**
* 使用Redis分布式锁保证幂等性
*
* @param message 消息实体
*/
public void processMessageWithRedisLock(MessageEntity message) {
String messageId = message.getMessageId();
String lockKey = "message:process:" + messageId;
// 获取分布式锁,设置5分钟过期时间
Boolean locked = redisTemplate.opsForValue().setIfAbsent(lockKey, "1", 5, TimeUnit.MINUTES);
if (Boolean.TRUE.equals(locked)) {
try {
// 检查消息是否已经处理过
MessageTrace trace = messageTraceMapper.selectByMessageId(messageId);
if (trace != null && trace.getConsumeStatus() == 1) {
log.info("消息已经处理过,消息ID:{}", messageId);
return;
}
// 处理消息业务逻辑
processMessage(message);
} finally {
// 释放锁
redisTemplate.delete(lockKey);
}
} else {
log.info("消息正在处理中,消息ID:{}", messageId);
}
}对于有明确状态流转的业务,可以通过状态机来保证幂等性,例如订单状态从 "待支付" 到 "已支付" 的转换。
为了让 RabbitMQ 在生产环境中发挥最佳性能,我们需要进行合理的调优。以下是一些关键的调优方向:
内存配置: 根据服务器内存大小合理配置 RabbitMQ 的内存限制
# 设置内存限制为4GB
rabbitmqctl set_vm_memory_high_watermark 4GB
# 或者设置为物理内存的50%
rabbitmqctl set_vm_memory_high_watermark 0.5磁盘配置:
# 设置磁盘同步策略为所有队列
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'3.网络配置:
# 在rabbitmq.conf中设置
tcp_listeners.tcp.default.backlog = 1024
tcp_listeners.tcp.default.buffer_size = 131072批量发送: 对于大量小消息,采用批量发送可以显著提高吞吐量
/**
* 批量发送消息
*/
public void sendBatchMessages(String exchange, String routingKey, List<MessageEntity> messages) {
StringUtils.hasText(exchange, "交换机不能为空");
StringUtils.hasText(routingKey, "路由键不能为空");
if (CollectionUtils.isEmpty(messages)) {
throw new IllegalArgumentException("消息列表不能为空");
}
log.info("批量发送消息,交换机:{},路由键:{},消息数量:{}",
exchange, routingKey, messages.size());
// 创建批量消息
List<Message> amqpMessages = new ArrayList<>(messages.size());
List<CorrelationData> correlationDataList = new ArrayList<>(messages.size());
for (MessageEntity message : messages) {
// 确保消息ID唯一
if (StringUtils.isBlank(message.getMessageId())) {
message.setMessageId(UUID.randomUUID().toString());
}
if (message.getCreateTime() == null) {
message.setCreateTime(LocalDateTime.now());
}
// 记录消息发送前的轨迹
messageTraceService.recordBeforeSend(message, exchange, routingKey);
// 创建AMQP消息
Message amqpMessage = MessageBuilder
.withBody(new ObjectMapper().writeValueAsBytes(message))
.setContentType(MessageProperties.CONTENT_TYPE_JSON)
.setMessageId(message.getMessageId())
.build();
amqpMessages.add(amqpMessage);
correlationDataList.add(new CorrelationData(message.getMessageId()));
}
// 批量发送消息
rabbitTemplate.invoke(operations -> {
for (int i = 0; i < amqpMessages.size(); i++) {
operations.convertAndSend(
exchange,
routingKey,
amqpMessages.get(i),
correlationDataList.get(i));
}
return null;
});
log.info("批量消息发送完成,数量:{}", messages.size());
}消息压缩: 对于大消息,启用压缩可以减少网络传输和存储开销
/**
* 发送压缩消息
*/
public void sendCompressedMessage(String exchange, String routingKey, MessageEntity message) {
StringUtils.hasText(exchange, "交换机不能为空");
StringUtils.hasText(routingKey, "路由键不能为空");
Objects.requireNonNull(message, "消息实体不能为空");
try {
// 序列化消息
byte[] messageBytes = new ObjectMapper().writeValueAsBytes(message);
// 压缩消息
ByteArrayOutputStream byteOut = new ByteArrayOutputStream();
try (GZIPOutputStream gzipOut = new GZIPOutputStream(byteOut)) {
gzipOut.write(messageBytes);
}
byte[] compressedBytes = byteOut.toByteArray();
log.info("消息压缩前大小:{}字节,压缩后大小:{}字节,压缩率:{}%",
messageBytes.length,
compressedBytes.length,
(int) ((1 - (double) compressedBytes.length / messageBytes.length) * 100));
// 记录消息发送前的轨迹
messageTraceService.recordBeforeSend(message, exchange, routingKey);
// 创建压缩消息
Message amqpMessage = MessageBuilder
.withBody(compressedBytes)
.setContentType(MessageProperties.CONTENT_TYPE_JSON)
.setMessageId(message.getMessageId())
.setHeader("x-compressed", true)
.build();
// 发送消息
rabbitTemplate.convertAndSend(
exchange,
routingKey,
amqpMessage,
new CorrelationData(message.getMessageId()));
log.info("压缩消息发送完成,消息ID:{}", message.getMessageId());
} catch (Exception e) {
log.error("发送压缩消息失败,消息ID:{}", message.getMessageId(), e);
messageTraceService.recordSendFailure(message.getMessageId(), e.getMessage());
throw new RuntimeException("发送压缩消息失败", e);
}
}异步发送: 使用异步发送可以提高生产者的吞吐量,避免阻塞主线程
消费线程池配置: 根据业务处理能力配置合理的消费线程池大小
spring:
rabbitmq:
listener:
simple:
# 并发消费者数量
concurrency: 5
# 最大并发消费者数量
max-concurrency: 20
# 每次从队列中拉取的消息数量
prefetch: 10批量消费: 开启批量消费可以提高消费效率
/**
* 批量消费消息
*/
@RabbitListener(queues = RabbitMQConfig.BATCH_QUEUE)
public void batchConsume(@Payload List<MessageEntity> messages,
Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) List<Long> deliveryTags) throws IOException {
log.info("接收到批量消息,数量:{}", messages.size());
List<Long> successTags = new ArrayList<>();
List<Long> failTags = new ArrayList<>();
try {
for (int i = 0; i < messages.size(); i++) {
MessageEntity message = messages.get(i);
long tag = deliveryTags.get(i);
try {
// 处理单条消息
processMessage(message);
// 记录消费成功轨迹
messageTraceService.recordConsumeSuccess(message);
successTags.add(tag);
} catch (Exception e) {
// 记录消费失败轨迹
messageTraceService.recordConsumeFailure(message, e.getMessage());
failTags.add(tag);
log.error("批量消费中消息处理失败,消息ID:{}", message.getMessageId(), e);
}
}
// 确认成功的消息
if (!successTags.isEmpty()) {
// 批量确认消息
channel.basicAck(successTags.get(successTags.size() - 1), true);
}
// 拒绝失败的消息
for (long tag : failTags) {
channel.basicNack(tag, false, false);
}
log.info("批量消息处理完成,成功:{}条,失败:{}条", successTags.size(), failTags.size());
} catch (Exception e) {
log.error("批量消息处理异常", e);
// 拒绝所有消息
for (long tag : deliveryTags) {
channel.basicNack(tag, false, false);
}
}
}消息预取: 合理设置 prefetch 参数,控制消费者一次从队列中获取的消息数量,避免消费者过载
异步处理: 消费者接收到消息后,将消息放入线程池异步处理,快速确认消息,提高消费效率
队列持久化: 确保队列持久化,避免服务重启后队列丢失
// 创建持久化队列
QueueBuilder.durable(QUEUE_NAME).build();消息持久化: 对于重要消息,设置消息持久化,避免服务重启后消息丢失
Message message = MessageBuilder
.withBody(body)
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.build();合理设置队列长度: 为队列设置最大长度,避免消息无限堆积导致磁盘占满
// 创建有限长度的队列
QueueBuilder.durable(QUEUE_NAME)
.maxLength(10000) // 最大消息数量
.maxLengthBytes(104857600) // 最大字节数(100MB)
.overflow(Overflow.REJECT_PUBLISHER) // 超出长度时拒绝生产者
.build();合理设置消息过期时间: 对于时效性强的消息,设置消息过期时间,避免无效消息堆积
// 创建有消息过期时间的队列(队列级别)
QueueBuilder.durable(QUEUE_NAME)
.ttl(60000) // 消息过期时间,单位毫秒
.build();
// 发送有过期时间的消息(消息级别)
MessagePostProcessor messagePostProcessor = message -> {
message.getMessageProperties().setExpiration("60000"); // 60秒过期
return message;
};消息丢失可能发生在三个阶段:生产阶段、存储阶段和消费阶段。
生产阶段丢失:
// 在配置中开启生产者确认
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
// 实现确认回调
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (!ack) {
log.error("消息发送失败,原因:{}", cause);
// 重试发送或记录到数据库
}
});存储阶段丢失:
// 持久化队列
QueueBuilder.durable(QUEUE_NAME).build();
// 持久化消息
Message message = MessageBuilder
.withBody(body)
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.build();消费阶段丢失:
// 在配置中开启手动确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual
// 消息处理完成后手动确认
channel.basicAck(deliveryTag, false);消息积压通常是因为消费速度跟不上生产速度,解决方案如下:
优化消费逻辑:
增加消费者数量:
临时扩容:
消息迁移:
/**
* 迁移积压消息
*/
@Scheduled(fixedRate = 60000)
public void migrateBacklogMessages() {
String sourceQueue = "source_queue";
String targetQueue = "backlog_queue";
// 检查队列消息数量
QueueInfo sourceQueueInfo = rabbitAdmin.getQueueInfo(sourceQueue);
if (sourceQueueInfo == null || sourceQueueInfo.getMessageCount() <= 1000) {
return; // 消息数量不多,不需要迁移
}
log.info("开始迁移积压消息,源队列:{},消息数量:{}",
sourceQueue, sourceQueueInfo.getMessageCount());
// 迁移消息
int batchSize = 100;
while (true) {
List<Message> messages = rabbitTemplate.receive(sourceQueue, batchSize);
if (CollectionUtils.isEmpty(messages)) {
break;
}
// 发送消息到目标队列
for (Message message : messages) {
rabbitTemplate.send(targetQueue, message);
}
log.info("已迁移消息:{}条", messages.size());
// 避免过度消耗资源
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
log.info("积压消息迁移完成");
}监控告警:
/**
* 消息积压监控
*/
@Scheduled(fixedRate = 60000) // 每分钟检查一次
public void monitorMessageBacklog() {
// 监控的队列列表
List<String> monitorQueues = Arrays.asList(
RabbitMQConfig.DIRECT_QUEUE_1,
RabbitMQConfig.DIRECT_QUEUE_2,
RabbitMQConfig.TOPIC_QUEUE_1,
RabbitMQConfig.TOPIC_QUEUE_2
);
for (String queue : monitorQueues) {
QueueInfo queueInfo = rabbitAdmin.getQueueInfo(queue);
if (queueInfo != null) {
long messageCount = queueInfo.getMessageCount();
log.info("队列:{},消息数量:{}", queue, messageCount);
// 如果消息数量超过阈值,发送告警
if (messageCount > 10000) {
log.warn("队列消息积压严重,队列:{},消息数量:{}", queue, messageCount);
// 发送告警通知(邮件、短信等)
alertService.sendAlert("消息积压告警", "队列:" + queue + ",消息数量:" + messageCount);
}
}
}
}RabbitMQ 中,单个队列的消息是有序的,但多个消费者同时消费同一个队列时,可能会导致消息乱序。解决方案如下:
单消费者:
分区队列:
/**
* 发送有序消息到分区队列
*/
public void sendOrderedMessage(String baseQueueName, String businessId, MessageEntity message) {
StringUtils.hasText(baseQueueName, "基础队列名称不能为空");
StringUtils.hasText(businessId, "业务ID不能为空");
Objects.requireNonNull(message, "消息实体不能为空");
// 队列分区数量
int partitionCount = 5;
// 根据业务ID计算分区索引
int partitionIndex = Math.abs(businessId.hashCode() % partitionCount);
// 分区队列名称
String partitionQueue = baseQueueName + "_" + partitionIndex;
// 路由键(与分区队列名称相同)
String routingKey = partitionQueue;
log.info("发送有序消息,业务ID:{},分区队列:{},消息ID:{}",
businessId, partitionQueue, message.getMessageId());
// 发送消息
rabbitTemplate.convertAndSend(
RabbitMQConfig.ORDERED_EXCHANGE,
routingKey,
message,
new CorrelationData(message.getMessageId()));
}本文详细介绍了 SpringBoot 集成 RabbitMQ 的全过程,从基础概念到高级特性,从代码实现到性能调优,涵盖了实际开发中可能遇到的各种场景。
RabbitMQ 作为一款优秀的消息中间件,以其灵活性和可靠性在企业级应用中得到广泛应用。合理使用 RabbitMQ 可以帮助我们构建松耦合、高可用的分布式系统。