首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >庖丁解牛:RocketMQ Broker/Consumer/Producer源码深度剖析与实战

庖丁解牛:RocketMQ Broker/Consumer/Producer源码深度剖析与实战

作者头像
果酱带你啃java
发布2026-04-14 13:55:36
发布2026-04-14 13:55:36
380
举报

RocketMQ作为阿里开源的分布式消息中间件,凭借高吞吐、低延迟、高可用的特性成为微服务架构的核心组件。本文将从源码层面拆解Broker、Consumer、Producer的核心机制,结合实战案例揭示底层原理,让你不仅知其然更知其所以然。

一、RocketMQ核心架构总览

先通过架构图建立整体认知,后续源码分析将围绕这些核心组件展开:

核心组件职责

  • NameServer:轻量级注册中心,管理Broker路由信息,支持动态扩缩容
  • Broker:核心消息存储节点,处理消息存储、投递、过滤等核心逻辑
  • Producer:消息生产者,支持同步/异步/单向发送模式
  • Consumer:消息消费者,支持推/拉模式,集群/广播消费

二、Producer源码深度剖析

2.1 Producer启动流程

核心代码解析:

代码语言:javascript
复制
package com.jam.demo.producer;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.MixAll;
import org.springframework.util.StringUtils;

/**
 * Producer启动核心逻辑
 * @author ken
 */
@Slf4j
publicclass ProducerStartupAnalysis {
    
    public DefaultMQProducer createProducer(String groupName) throws MQClientException {
        // 1. 参数校验
        if (!StringUtils.hasText(groupName)) {
            thrownew MQClientException("Producer group name is null", null);
        }
        
        DefaultMQProducer producer = new DefaultMQProducer(groupName);
        
        // 2. 创建MQClientInstance(核心客户端实例)
        producer.start();
        
        // 3. 定时任务:从NameServer更新路由信息(默认30s一次)
        // DefaultMQProducerImpl#startScheduledTask
        
        return producer;
    }
}

2.2 消息发送核心流程

Producer发送消息的核心入口是DefaultMQProducerImpl#sendDefaultImpl,完整流程:

2.2.1 消息发送核心代码
代码语言:javascript
复制
package com.jam.demo.producer;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.util.ObjectUtils;

import java.util.concurrent.TimeUnit;

/**
 * 消息发送核心实现
 * @author ken
 */
@Slf4j
publicclass MessageSendAnalysis {
    
    privatefinal DefaultMQProducer producer;
    
    public MessageSendAnalysis(DefaultMQProducer producer) {
        this.producer = producer;
    }
    
    /**
     * 同步发送消息核心逻辑
     * @param message 消息体
     * @return 发送结果
     * @throws MQClientException 客户端异常
     * @throws RemotingException 远程通信异常
     * @throws MQBrokerException Broker异常
     * @throws InterruptedException 中断异常
     */
    public SendResult sendMessage(Message message) 
            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        // 1. 消息校验
        validateMessage(message);
        
        // 2. 发送消息(核心方法)
        SendResult sendResult = producer.send(message);
        
        // 3. 处理发送结果
        handleSendResult(sendResult, message);
        
        return sendResult;
    }
    
    /**
     * 消息参数校验
     */
    private void validateMessage(Message message) throws MQClientException {
        if (ObjectUtils.isEmpty(message)) {
            thrownew MQClientException("Message is null", null);
        }
        if (!org.springframework.util.StringUtils.hasText(message.getTopic())) {
            thrownew MQClientException("Message topic is null", null);
        }
        if (ObjectUtils.isEmpty(message.getBody())) {
            thrownew MQClientException("Message body is null", null);
        }
    }
    
    /**
     * 处理发送结果
     */
    private void handleSendResult(SendResult sendResult, Message message) {
        if (sendResult != null) {
            switch (sendResult.getSendStatus()) {
                case SEND_OK:
                    log.info("消息发送成功,topic={}, msgId={}", message.getTopic(), sendResult.getMsgId());
                    break;
                default:
                    log.warn("消息发送异常,status={}, msgId={}", sendResult.getSendStatus(), sendResult.getMsgId());
            }
        }
    }
    
    // 测试代码
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("producer_demo_group");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        
        MessageSendAnalysis analysis = new MessageSendAnalysis(producer);
        Message message = new Message("demo_topic", "demo_tag", "Hello RocketMQ".getBytes());
        
        SendResult result = analysis.sendMessage(message);
        log.info("发送结果:{}", result);
        
        TimeUnit.SECONDS.sleep(1);
        producer.shutdown();
    }
}
2.2.2 消息队列选择策略

RocketMQ提供多种队列选择策略,默认实现是AllocateMessageQueueAveragely

代码语言:javascript
复制
package com.jam.demo.producer;

import org.apache.rocketmq.client.latency.LatencyFaultTolerance;
import org.apache.rocketmq.client.latency.LatencyFaultToleranceImpl;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

/**
 * 队列选择策略实现
 * @author ken
 */
publicclass QueueSelectorAnalysis {
    
    privatefinal LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();
    
    /**
     * 默认队列选择策略:轮询+故障延迟规避
     */
    public MessageQueue selectQueue(List<MessageQueue> mqs, Message msg, Object arg) {
        if (mqs == null || mqs.isEmpty()) {
            returnnull;
        }
        
        // 如果指定了队列参数,直接使用
        if (arg != null) {
            int index = Integer.parseInt(arg.toString());
            return mqs.get(index % mqs.size());
        }
        
        // 故障延迟规避 + 轮询选择
        for (int i = 0; i < mqs.size(); i++) {
            int index = ThreadLocalRandom.current().nextInt(mqs.size());
            MessageQueue mq = mqs.get(index);
            if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                return mq;
            }
        }
        
        // 如果所有队列都不可用,返回第一个队列
        return mqs.get(0);
    }
}

2.3 消息发送重试机制

Producer内置完善的重试机制,核心参数:

  • retryTimesWhenSendFailed:同步发送重试次数(默认2次)
  • retryTimesWhenSendAsyncFailed:异步发送重试次数(默认2次)
  • retryAnotherBrokerWhenNotStoreOK:是否重试其他Broker(默认false)

核心实现:

代码语言:javascript
复制
package com.jam.demo.producer;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

import java.util.List;

/**
 * 消息发送重试机制
 * @author ken
 */
@Slf4j
publicclass RetryMechanismAnalysis {
    
    privatefinal DefaultMQProducer producer;
    privateint retryTimes = 2; // 默认重试2次
    
    public RetryMechanismAnalysis(DefaultMQProducer producer) {
        this.producer = producer;
        this.retryTimes = producer.getRetryTimesWhenSendFailed();
    }
    
    /**
     * 带重试的消息发送
     */
    public void sendWithRetry(Message message, List<MessageQueue> mqs) {
        for (int i = 0; i <= retryTimes; i++) {
            MessageQueue mq = selectQueue(mqs);
            try {
                // 发送消息
                producer.send(message, mq);
                log.info("消息发送成功,重试次数={}", i);
                return;
            } catch (Exception e) {
                log.error("消息发送失败,重试次数={}", i, e);
                
                // 如果是最后一次重试,抛出异常
                if (i == retryTimes) {
                    thrownew RuntimeException("消息发送最终失败", e);
                }
                
                // 延迟一段时间后重试
                try {
                    Thread.sleep(100 * (i + 1));
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
    
    private MessageQueue selectQueue(List<MessageQueue> mqs) {
        return mqs.get((int) (System.currentTimeMillis() % mqs.size()));
    }
}

三、Broker源码深度剖析

3.1 Broker核心存储机制

Broker采用混合存储架构,核心存储文件:

  • CommitLog:消息主体存储文件,所有Topic的消息顺序写入
  • ConsumeQueue:消费队列,存储消息在CommitLog的偏移量索引
  • IndexFile:消息索引文件,支持按Key查询消息
3.1.1 CommitLog写入核心代码
代码语言:javascript
复制
package com.jam.demo.broker;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.store.CommitLog;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.config.MessageStoreConfig;

/**
 * CommitLog写入机制
 * @author ken
 */
@Slf4j
publicclass CommitLogAnalysis {
    
    privatefinal CommitLog commitLog;
    
    public CommitLogAnalysis(DefaultMessageStore messageStore) {
        this.commitLog = messageStore.getCommitLog();
    }
    
    /**
     * 消息存储核心流程
     */
    public boolean putMessage(MessageExtBrokerInner msg) {
        // 1. 消息校验
        if (msg == null || msg.getBody() == null) {
            log.error("消息为空,无法存储");
            returnfalse;
        }
        
        // 2. 追加到CommitLog
        // CommitLog#putMessage核心逻辑:
        // - 获取当前MappedFile
        // - 计算消息长度
        // - 检查是否有足够空间
        // - 写入消息
        // - 更新偏移量
        
        // 3. 刷盘处理
        handleFlush(msg);
        
        // 4. 主从复制
        handleReplica(msg);
        
        returntrue;
    }
    
    /**
     * 刷盘策略处理
     */
    private void handleFlush(MessageExtBrokerInner msg) {
        MessageStoreConfig config = commitLog.getDefaultMessageStore().getMessageStoreConfig();
        
        if (config.isFlushDiskTypeSync()) {
            // 同步刷盘:等待刷盘完成
            commitLog.flush(0);
        } else {
            // 异步刷盘:由后台线程处理
            // FlushRealTimeService线程默认每隔500ms刷盘一次
        }
    }
    
    /**
     * 主从复制处理
     */
    private void handleReplica(MessageExtBrokerInner msg) {
        // 根据同步/异步复制策略处理
        // HAService负责主从复制
    }
}
3.1.2 ConsumeQueue构建机制

ConsumeQueue是消息的逻辑队列,每个Topic下的每个Queue对应一个ConsumeQueue文件:

代码语言:javascript
复制
package com.jam.demo.broker;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.store.ConsumeQueue;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.SelectMappedBufferResult;

/**
 * ConsumeQueue构建与查询
 * @author ken
 */
@Slf4j
publicclass ConsumeQueueAnalysis {
    
    privatefinal DefaultMessageStore messageStore;
    
    public ConsumeQueueAnalysis(DefaultMessageStore messageStore) {
        this.messageStore = messageStore;
    }
    
    /**
     * 获取消息队列的索引信息
     */
    public SelectMappedBufferResult getIndexInfo(String topic, int queueId, long offset) {
        // 1. 获取ConsumeQueue实例
        ConsumeQueue consumeQueue = messageStore.findConsumeQueue(topic, queueId);
        if (consumeQueue == null) {
            log.error("ConsumeQueue不存在,topic={}, queueId={}", topic, queueId);
            returnnull;
        }
        
        // 2. 查询指定偏移量的索引信息
        // 每个ConsumeQueue条目固定20字节:8字节CommitLog偏移量 + 4字节消息长度 + 8字节Tag哈希值
        return consumeQueue.getIndexBuffer(offset);
    }
    
    /**
     * 构建ConsumeQueue索引(由ReputMessageService线程处理)
     */
    public void buildIndex(String topic, int queueId, long commitLogOffset, int msgSize, long tagsCode) {
        ConsumeQueue consumeQueue = messageStore.findConsumeQueue(topic, queueId);
        if (consumeQueue != null) {
            // 写入索引条目
            consumeQueue.putPositionInfo(commitLogOffset, msgSize, tagsCode);
        }
    }
}

3.2 Broker请求处理机制

Broker通过Netty处理客户端请求,核心请求处理器:

  • SendMessageProcessor:处理消息发送请求
  • PullMessageProcessor:处理消息拉取请求
  • QueryMessageProcessor:处理消息查询请求
代码语言:javascript
复制
package com.jam.demo.broker;

import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.DefaultMessageStore;

/**
 * Broker请求处理器核心逻辑
 * @author ken
 */
@Slf4j
publicclass BrokerRequestProcessor {
    
    privatefinal DefaultMessageStore messageStore;
    
    public BrokerRequestProcessor(DefaultMessageStore messageStore) {
        this.messageStore = messageStore;
    }
    
    /**
     * 处理客户端请求
     */
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
        switch (request.getCode()) {
            case RequestCode.SEND_MESSAGE:
                return handleSendMessage(ctx, request);
            case RequestCode.PULL_MESSAGE:
                return handlePullMessage(ctx, request);
            default:
                return RemotingCommand.createResponseCommand(null);
        }
    }
    
    /**
     * 处理消息发送请求
     */
    private RemotingCommand handleSendMessage(ChannelHandlerContext ctx, RemotingCommand request) {
        SendMessageRequestHeader header = 
            (SendMessageRequestHeader) request.decodeCommandCustomHeader(SendMessageRequestHeader.class);
        
        log.info("接收消息发送请求,topic={}, queueId={}", header.getTopic(), header.getQueueId());
        
        // 1. 构建内部消息对象
        // 2. 存储消息到CommitLog
        // 3. 返回响应结果
        
        RemotingCommand response = RemotingCommand.createResponseCommand(null);
        response.setCode(0);
        response.setRemark(null);
        return response;
    }
    
    /**
     * 处理消息拉取请求
     */
    private RemotingCommand handlePullMessage(ChannelHandlerContext ctx, RemotingCommand request) {
        // 处理拉取请求逻辑
        return RemotingCommand.createResponseCommand(null);
    }
}

3.3 Broker刷盘与主从复制

3.3.1 刷盘策略实现
代码语言:javascript
复制
package com.jam.demo.broker;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.store.CommitLog;
import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.config.MessageStoreConfig;

/**
 * 刷盘策略实现
 * @author ken
 */
@Slf4j
publicclass FlushStrategyAnalysis {
    
    privatefinal CommitLog commitLog;
    privatefinal MessageStoreConfig storeConfig;
    
    public FlushStrategyAnalysis(CommitLog commitLog) {
        this.commitLog = commitLog;
        this.storeConfig = commitLog.getDefaultMessageStore().getMessageStoreConfig();
    }
    
    /**
     * 同步刷盘实现
     */
    public void syncFlush(long offset) {
        // 等待刷盘完成
        commitLog.flush(offset);
        log.info("同步刷盘完成,偏移量={}", offset);
    }
    
    /**
     * 异步刷盘实现(后台线程)
     */
    publicclass AsyncFlushService extends Thread {
        
        privatevolatileboolean isRunning = true;
        
        @Override
        public void run() {
            log.info("异步刷盘服务启动");
            
            while (isRunning) {
                try {
                    // 默认500ms刷盘一次
                    Thread.sleep(storeConfig.getFlushIntervalCommitLog());
                    
                    // 刷盘
                    commitLog.flush(0);
                } catch (InterruptedException e) {
                    log.warn("异步刷盘线程中断", e);
                }
            }
            
            log.info("异步刷盘服务停止");
        }
        
        public void shutdown() {
            isRunning = false;
            this.interrupt();
        }
    }
}

四、Consumer源码深度剖析

4.1 Consumer启动流程

4.2 消息消费模式

RocketMQ支持两种消费模式:

4.2.1 推模式(默认)

推模式本质是长轮询,核心实现DefaultMQPushConsumerImpl#pullMessage

代码语言:javascript
复制
package com.jam.demo.consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.util.CollectionUtils;

import java.util.List;

/**
 * 推模式消费实现
 * @author ken
 */
@Slf4j
publicclass PushConsumerAnalysis {
    
    public void startPushConsumer() throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_demo_group");
        consumer.setNamesrvAddr("localhost:9876");
        
        // 订阅Topic
        consumer.subscribe("demo_topic", "*");
        
        // 注册消息监听器
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            if (CollectionUtils.isEmpty(msgs)) {
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
            
            // 处理消息
            for (MessageExt msg : msgs) {
                log.info("消费消息:topic={}, msgId={}, body={}", 
                    msg.getTopic(), msg.getMsgId(), new String(msg.getBody()));
            }
            
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        
        // 启动消费者
        consumer.start();
        log.info("推模式消费者启动成功");
    }
    
    public static void main(String[] args) throws Exception {
        PushConsumerAnalysis analysis = new PushConsumerAnalysis();
        analysis.startPushConsumer();
        
        // 保持进程运行
        Thread.currentThread().join();
    }
}
4.2.2 拉模式

拉模式由消费者主动控制拉取频率:

代码语言:javascript
复制
package com.jam.demo.consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.util.CollectionUtils;

import java.util.List;

/**
 * 拉模式消费实现
 * @author ken
 */
@Slf4j
publicclass PullConsumerAnalysis {
    
    public void startPullConsumer() throws Exception {
        DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("pull_consumer_group");
        consumer.setNamesrvAddr("localhost:9876");
        
        // 订阅Topic
        consumer.subscribe("demo_topic", "*");
        
        // 启动消费者
        consumer.start();
        log.info("拉模式消费者启动成功");
        
        // 循环拉取消息
        while (true) {
            List<MessageExt> msgs = consumer.poll();
            
            if (!CollectionUtils.isEmpty(msgs)) {
                for (MessageExt msg : msgs) {
                    log.info("拉取消息:topic={}, msgId={}, body={}", 
                        msg.getTopic(), msg.getMsgId(), new String(msg.getBody()));
                }
                
                // 提交偏移量
                consumer.commitSync();
            }
            
            // 控制拉取频率
            Thread.sleep(1000);
        }
    }
    
    public static void main(String[] args) throws Exception {
        PullConsumerAnalysis analysis = new PullConsumerAnalysis();
        analysis.startPullConsumer();
    }
}

4.3 消费偏移量管理

RocketMQ通过OffsetStore管理消费偏移量:

代码语言:javascript
复制
package com.jam.demo.consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.OffsetStore;
import org.apache.rocketmq.client.consumer.store.LocalFileOffsetStore;
import org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore;
import org.apache.rocketmq.common.message.MessageQueue;

import java.util.Map;

/**
 * 消费偏移量管理
 * @author ken
 */
@Slf4j
publicclass OffsetManagementAnalysis {
    
    privatefinal DefaultMQPushConsumer consumer;
    private OffsetStore offsetStore;
    
    public OffsetManagementAnalysis(DefaultMQPushConsumer consumer) {
        this.consumer = consumer;
        initOffsetStore();
    }
    
    /**
     * 初始化偏移量存储
     */
    private void initOffsetStore() {
        // 集群模式:偏移量存储在Broker
        // 广播模式:偏移量存储在本地
        if (consumer.getMessageModel().isBroadcast()) {
            offsetStore = new LocalFileOffsetStore(consumer.getDefaultMQPushConsumerImpl().getMQClientFactory(), 
                                                  consumer.getConsumerGroup());
        } else {
            offsetStore = new RemoteBrokerOffsetStore(consumer.getDefaultMQPushConsumerImpl().getMQClientFactory(), 
                                                    consumer.getConsumerGroup());
        }
    }
    
    /**
     * 更新偏移量
     */
    public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
        offsetStore.updateOffset(mq, offset, increaseOnly);
        log.info("更新偏移量:topic={}, queueId={}, offset={}", mq.getTopic(), mq.getQueueId(), offset);
    }
    
    /**
     * 持久化偏移量
     */
    public void persistOffset(MessageQueue mq) {
        offsetStore.persist(mq);
        log.info("持久化偏移量:topic={}, queueId={}", mq.getTopic(), mq.getQueueId());
    }
    
    /**
     * 批量持久化偏移量
     */
    public void persistAllOffsets() {
        offsetStore.persistAll();
        log.info("批量持久化所有偏移量");
    }
    
    /**
     * 查询偏移量
     */
    public long queryOffset(MessageQueue mq) {
        return offsetStore.readOffset(mq, org.apache.rocketmq.client.consumer.store.ReadOffsetType.MEMORY_FIRST_THEN_STORE);
    }
}

4.4 消息重试机制

消费失败时,RocketMQ会将消息发送到重试队列(%RETRY%+ConsumerGroup):

代码语言:javascript
复制
package com.jam.demo.consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.util.CollectionUtils;

import java.util.List;

/**
 * 消费重试机制
 * @author ken
 */
@Slf4j
publicclass ConsumeRetryAnalysis {
    
    public void startConsumerWithRetry() throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_retry_group");
        consumer.setNamesrvAddr("localhost:9876");
        
        // 设置最大重试次数(默认16次)
        consumer.setMaxReconsumeTimes(3);
        
        // 订阅主Topic和重试Topic
        consumer.subscribe("demo_topic", "*");
        
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            if (CollectionUtils.isEmpty(msgs)) {
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
            
            for (MessageExt msg : msgs) {
                try {
                    // 模拟消费失败
                    if (msg.getReconsumeTimes() < 2) {
                        log.info("模拟消费失败,重试次数={}", msg.getReconsumeTimes());
                        thrownew RuntimeException("消费失败,需要重试");
                    }
                    
                    // 正常消费
                    log.info("消费成功:msgId={}, reconsumeTimes={}", msg.getMsgId(), msg.getReconsumeTimes());
                } catch (Exception e) {
                    log.error("消费异常", e);
                    
                    // 判断是否达到最大重试次数
                    if (msg.getReconsumeTimes() >= consumer.getMaxReconsumeTimes()) {
                        log.error("达到最大重试次数,消息进入死信队列:msgId={}", msg.getMsgId());
                        // 可以手动发送到死信队列
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                    
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
            
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        
        consumer.start();
        log.info("带重试机制的消费者启动成功");
    }
    
    public static void main(String[] args) throws Exception {
        ConsumeRetryAnalysis analysis = new ConsumeRetryAnalysis();
        analysis.startConsumerWithRetry();
        
        Thread.currentThread().join();
    }
}

五、实战案例:RocketMQ核心功能应用

5.1 分布式事务消息

RocketMQ通过二阶段提交实现分布式事务消息:

代码语言:javascript
复制
package com.jam.demo.transaction;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.util.StringUtils;

import java.util.concurrent.*;

/**
 * 分布式事务消息实现
 * @author ken
 */
@Slf4j
publicclass TransactionMessageDemo {
    
    public static void main(String[] args) throws Exception {
        // 创建事务生产者
        TransactionMQProducer producer = new TransactionMQProducer("transaction_producer_group");
        producer.setNamesrvAddr("localhost:9876");
        
        // 线程池用于执行本地事务
        ExecutorService executorService = new ThreadPoolExecutor(
            2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000),
            r -> {
                Thread thread = new Thread(r);
                thread.setName("transaction-thread");
                return thread;
            });
        
        producer.setExecutorService(executorService);
        
        // 注册事务监听器
        producer.setTransactionListener(new TransactionListener() {
            
            /**
             * 执行本地事务
             */
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                String transactionId = msg.getKeys();
                log.info("执行本地事务,transactionId={}", transactionId);
                
                try {
                    // 执行业务逻辑:扣减库存、创建订单等
                    boolean success = executeBusinessLogic(transactionId);
                    
                    if (success) {
                        log.info("本地事务执行成功,提交消息");
                        return LocalTransactionState.COMMIT_MESSAGE;
                    } else {
                        log.info("本地事务执行失败,回滚消息");
                        return LocalTransactionState.ROLLBACK_MESSAGE;
                    }
                } catch (Exception e) {
                    log.error("本地事务执行异常", e);
                    return LocalTransactionState.UNKNOW;
                }
            }
            
            /**
             * 事务回查
             */
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                String transactionId = msg.getKeys();
                log.info("事务回查,transactionId={}", transactionId);
                
                // 查询本地事务状态
                boolean isCommit = checkBusinessStatus(transactionId);
                
                if (isCommit) {
                    return LocalTransactionState.COMMIT_MESSAGE;
                } else {
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                }
            }
        });
        
        producer.start();
        
        // 发送事务消息
        String transactionId = "tx_" + System.currentTimeMillis();
        Message message = new Message("transaction_topic", "tx_tag", 
            transactionId, "订单创建请求".getBytes());
        
        producer.sendMessageInTransaction(message, null);
        
        log.info("事务消息发送成功,transactionId={}", transactionId);
        
        TimeUnit.SECONDS.sleep(10);
        producer.shutdown();
        executorService.shutdown();
    }
    
    /**
     * 执行业务逻辑
     */
    private static boolean executeBusinessLogic(String transactionId) {
        // 模拟业务逻辑执行
        return !transactionId.endsWith("0"); // 最后一位为0时模拟失败
    }
    
    /**
     * 检查业务状态
     */
    private static boolean checkBusinessStatus(String transactionId) {
        // 模拟查询数据库检查事务状态
        return StringUtils.hasText(transactionId);
    }
}

5.2 消息过滤机制

RocketMQ支持Tag过滤和SQL92过滤:

代码语言:javascript
复制
package com.jam.demo.filter;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.common.message.MessageExt;

/**
 * 消息过滤实战
 * @author ken
 */
@Slf4j
publicclass MessageFilterDemo {
    
    public static void main(String[] args) throws Exception {
        // 1. Tag过滤
        DefaultMQPushConsumer tagConsumer = new DefaultMQPushConsumer("tag_filter_consumer");
        tagConsumer.setNamesrvAddr("localhost:9876");
        // 只消费tag1或tag2的消息
        tagConsumer.subscribe("filter_topic", "tag1 || tag2");
        tagConsumer.registerMessageListener((msgs, context) -> {
            for (MessageExt msg : msgs) {
                log.info("Tag过滤消费:tag={}, body={}", msg.getTags(), new String(msg.getBody()));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        tagConsumer.start();
        
        // 2. SQL92过滤
        DefaultMQPushConsumer sqlConsumer = new DefaultMQPushConsumer("sql_filter_consumer");
        sqlConsumer.setNamesrvAddr("localhost:9876");
        // 消费属性a>5且b='abc'的消息
        sqlConsumer.subscribe("filter_topic", MessageSelector.bySql("a > 5 AND b = 'abc'"));
        sqlConsumer.registerMessageListener((msgs, context) -> {
            for (MessageExt msg : msgs) {
                log.info("SQL过滤消费:a={}, b={}, body={}", 
                    msg.getUserProperty("a"), msg.getUserProperty("b"), new String(msg.getBody()));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        sqlConsumer.start();
        
        log.info("过滤消费者启动成功");
    }
}

六、性能优化与最佳实践

6.1 Producer性能优化

代码语言:javascript
复制
package com.jam.demo.optimize;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

/**
 * Producer性能优化配置
 * @author ken
 */
publicclass ProducerOptimize {
    
    public DefaultMQProducer createOptimizedProducer() {
        DefaultMQProducer producer = new DefaultMQProducer("optimize_producer_group");
        
        // 批量发送(提高吞吐量)
        producer.setBatchSize(1000);
        producer.setCompressMsgBodyOverHowmuch(1024 * 4); // 4KB以上压缩
        
        // 异步发送(提高响应速度)
        producer.setAsyncSenderThreadPoolNums(8);
        
        // 超时配置
        producer.setSendMsgTimeout(3000);
        
        // 重试配置
        producer.setRetryTimesWhenSendFailed(2);
        producer.setRetryAnotherBrokerWhenNotStoreOK(true);
        
        // 消息最大长度(默认4MB)
        producer.setMaxMessageSize(1024 * 1024 * 4);
        
        return producer;
    }
    
    /**
     * 批量发送示例
     */
    public void batchSend(DefaultMQProducer producer) throws Exception {
        Message[] messages = new Message[10];
        for (int i = 0; i < 10; i++) {
            messages[i] = new Message("batch_topic", "batch_tag", ("Batch message " + i).getBytes());
        }
        
        SendResult result = producer.send(messages);
        System.out.println("批量发送结果:" + result);
    }
}

6.2 Broker性能优化

Broker优化核心配置:

代码语言:javascript
复制
# 存储路径配置
storePathRootDir=/data/rocketmq/store
storePathCommitLog=/data/rocketmq/store/commitlog
storePathConsumeQueue=/data/rocketmq/store/consumequeue

# 刷盘策略(生产环境建议异步刷盘)
flushDiskType=ASYNC_FLUSH

# 文件大小配置
mapedFileSizeCommitLog=1073741824  # 1GB
mapedFileSizeConsumeQueue=300000    # 300KB

# 清理过期文件
deleteWhen=04
fileReservedTime=72

# 线程池配置
sendMessageThreadPoolNums=16
pullMessageThreadPoolNums=16

# 网络配置
listenPort=10911
serverWorkerThreads=8
serverCallbackExecutorThreads=8

七、总结

本文从源码层面深度剖析了RocketMQ的Producer、Broker、Consumer三大核心组件,通过流程图、核心代码解析和实战案例,系统讲解了:

  1. Producer:消息发送流程、队列选择策略、重试机制
  2. Broker:消息存储机制、刷盘策略、主从复制
  3. Consumer:消费模式、偏移量管理、重试机制
  4. 实战应用:分布式事务、消息过滤、性能优化

RocketMQ的设计体现了高性能、高可用、可扩展的分布式系统设计思想,深入理解其源码有助于我们更好地使用和优化RocketMQ,解决实际生产环境中的问题。

附录:完整pom.xml配置

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    
    <groupId>com.jam.demo</groupId>
    <artifactId>rocketmq-source-analysis</artifactId>
    <version>1.0.0</version>
    
    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <rocketmq.version>5.1.4</rocketmq.version>
        <lombok.version>1.18.30</lombok.version>
        <spring.version>6.1.2</spring.version>
    </properties>
    
    <dependencies>
        <!-- RocketMQ客户端 -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>${rocketmq.version}</version>
        </dependency>
        
        <!-- Lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
            <scope>provided</scope>
        </dependency>
        
        <!-- Spring Context -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${spring.version}</version>
        </dependency>
        
        <!-- Spring Utils -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>${spring.version}</version>
        </dependency>
        
        <!-- Guava -->
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>32.1.3-jre</version>
        </dependency>
        
        <!-- Fastjson2 -->
        <dependency>
            <groupId>com.alibaba.fastjson2</groupId>
            <artifactId>fastjson2</artifactId>
            <version>2.0.32</version>
        </dependency>
    </dependencies>
</project>
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2025-12-05,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 果酱带你啃java 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • RocketMQ作为阿里开源的分布式消息中间件,凭借高吞吐、低延迟、高可用的特性成为微服务架构的核心组件。本文将从源码层面拆解Broker、Consumer、Producer的核心机制,结合实战案例揭示底层原理,让你不仅知其然更知其所以然。
    • 一、RocketMQ核心架构总览
      • 核心组件职责
    • 二、Producer源码深度剖析
      • 2.1 Producer启动流程
      • 2.2 消息发送核心流程
      • 2.3 消息发送重试机制
    • 三、Broker源码深度剖析
      • 3.1 Broker核心存储机制
      • 3.2 Broker请求处理机制
      • 3.3 Broker刷盘与主从复制
    • 四、Consumer源码深度剖析
      • 4.1 Consumer启动流程
      • 4.2 消息消费模式
      • 4.3 消费偏移量管理
      • 4.4 消息重试机制
    • 五、实战案例:RocketMQ核心功能应用
      • 5.1 分布式事务消息
      • 5.2 消息过滤机制
    • 六、性能优化与最佳实践
      • 6.1 Producer性能优化
      • 6.2 Broker性能优化
    • 七、总结
    • 附录:完整pom.xml配置
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档