
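To monitor and operate RocketMQ well, you first need to understand its core architecture: what each component is responsible for and how data flows between them.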

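At its core, monitoring means tracking "data flow (produce → store → consume)" and "component health (hardware + software state)" so that abnormal nodes and bottlenecks are caught early.
The Broker is the top monitoring priority, because all message storage and forwarding depend on it. Focus on the following dimensions: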
The value is accumulated from the QPS of requests handled by SendMessageProcessor. Code for retrieving it in practice:
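package com.jam.demo.rocketmq.admin;

import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MixAll;
// Package names below follow RocketMQ 4.x; 5.x moved these classes to org.apache.rocketmq.remoting.protocol
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.KVTable;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Utility controller for fetching Broker monitoring metrics
 * @author ken
 */
@RestController
@RequestMapping("/rocketmq/broker")
@Tag(name = "Broker monitoring API", description = "Fetches the core monitoring metrics of a Broker")
@Slf4j
public class BrokerMonitorController {
    private static final DefaultMQAdminExt admin = new DefaultMQAdminExt();

    static {
        admin.setNamesrvAddr("127.0.0.1:9876");
        try {
            admin.start();
        } catch (MQClientException e) {
            log.error("Failed to start DefaultMQAdminExt", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Fetches the send/receive TPS of a Broker
     * @param brokerName Broker name
     * @return send/receive TPS information
     * @throws Exception remoting, Broker or client errors raised by the admin API
     */
    @GetMapping("/stats/{brokerName}")
    @Operation(summary = "Get Broker send/receive TPS", description = "Queries produce/consume TPS by Broker name")
    public String getBrokerStats(
            @Parameter(description = "Broker name", required = true) @PathVariable String brokerName
    ) throws Exception {
        if (!StringUtils.hasText(brokerName)) {
            throw new IllegalArgumentException("Broker name must not be empty");
        }
        // Resolve the master address of the named Broker from the cluster view
        ClusterInfo clusterInfo = admin.examineBrokerClusterInfo();
        BrokerData brokerData = clusterInfo.getBrokerAddrTable().get(brokerName);
        String masterAddr = brokerData == null ? null : brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
        if (!StringUtils.hasText(masterAddr)) {
            log.warn("No master address found for Broker[{}]", brokerName);
            return "Broker[" + brokerName + "] has no monitoring data";
        }
        // putTps and getTransferedTps are entries of the Broker runtime-stats table
        KVTable stats = admin.fetchBrokerRuntimeStats(masterAddr);
        String putTps = stats.getTable().get("putTps");
        String getTps = stats.getTable().get("getTransferedTps");
        return String.format("Broker[%s] putTps=%s, getTransferedTps=%s", brokerName, putTps, getTps);
    }
}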
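The TPS figures a Broker reports in its runtime stats arrive as text, so a collector usually has to turn them into numbers before storing or alerting on them. The sketch below assumes the value is a space-separated list of averages over increasingly long windows (for example 10 s, 1 min, 10 min); that layout, the class name BrokerTpsParser and the sample string are assumptions made for illustration, so check them against the output of your own Broker version.

```java
import java.util.Arrays;

final class BrokerTpsParser {

    /** Parses a space-separated TPS string such as "120.5 98.2 75.0" into doubles. */
    static double[] parse(String raw) {
        if (raw == null || raw.trim().isEmpty()) {
            return new double[0];
        }
        return Arrays.stream(raw.trim().split("\\s+"))
                .mapToDouble(Double::parseDouble)
                .toArray();
    }

    public static void main(String[] args) {
        // Hypothetical sample value, not captured from a real Broker
        double[] tps = parse("120.5 98.2 75.0");
        System.out.println("shortest window: " + tps[0] + ", longest window: " + tps[tps.length - 1]);
    }
}
```

Keeping the parsing separate from the admin call makes it easy to unit-test without a running cluster.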
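The Broker's storage layer directly affects message reliability and read/write performance. Focus on:
How to obtain the metrics: expose them through a Prometheus Exporter. Example Prometheus configuration: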
global:
  scrape_interval: 15s
scrape_configs:
  - job_name: 'rocketmq-exporter'
    static_configs:
      - targets: ['127.0.0.1:5557']  # RocketMQ Exporter port
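ulimit -n).
A Topic is the logical classification of messages; the core thing to monitor is the balance between production and consumption:
Easily confused points:
Code for fetching the backlog: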
package com.jam.demo.rocketmq.admin;

import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Utility controller for fetching Topic monitoring metrics
 * @author ken
 */
@RestController
@RequestMapping("/rocketmq/topic")
@Tag(name = "Topic monitoring API", description = "Fetches produce/consume and backlog metrics of a Topic")
@Slf4j
public class TopicMonitorController {
    private static final DefaultMQAdminExt admin = new DefaultMQAdminExt();

    static {
        admin.setNamesrvAddr("127.0.0.1:9876");
        try {
            admin.start();
        } catch (MQClientException e) {
            log.error("Failed to start DefaultMQAdminExt", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Fetches the consumption backlog of a Topic
     * @param topic Topic name
     * @param consumerGroup consumer group name
     * @return backlog information
     * @throws Exception remoting, Broker or client errors raised by the admin API
     */
    @GetMapping("/accumulation/{topic}/{consumerGroup}")
    @Operation(summary = "Get Topic backlog", description = "Queries the message backlog by Topic and consumer group")
    public String getTopicAccumulation(
            @Parameter(description = "Topic name", required = true) @PathVariable String topic,
            @Parameter(description = "Consumer group name", required = true) @PathVariable String consumerGroup
    ) throws Exception {
        if (!StringUtils.hasText(topic)) {
            throw new IllegalArgumentException("Topic name must not be empty");
        }
        if (!StringUtils.hasText(consumerGroup)) {
            throw new IllegalArgumentException("Consumer group name must not be empty");
        }
        // Scope the stats to the requested Topic; the single-argument overload covers every Topic of the group
        ConsumeStats consumeStats = admin.examineConsumeStats(consumerGroup, topic);
        if (ObjectUtils.isEmpty(consumeStats)) {
            log.warn("Consume stats of consumer group[{}] are empty", consumerGroup);
            return "Consumer group[" + consumerGroup + "] has no consumption data";
        }
        // computeTotalDiff() sums (brokerOffset - consumerOffset) over all queues
        long totalAccumulation = consumeStats.computeTotalDiff();
        return String.format("Topic[%s] consumer group[%s] backlog: %d", topic, consumerGroup, totalAccumulation);
    }
}
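The backlog of a consumer group on a Topic is the sum, over every queue, of the gap between the latest offset on the Broker and the offset the group has committed. The following minimal sketch reproduces that arithmetic without any RocketMQ dependency; the class BacklogCalculator, the QueueOffsets holder and the sample offsets are hypothetical names and values chosen for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class BacklogCalculator {

    /** Offsets of one queue: latest offset on the Broker and the offset committed by the group. */
    static final class QueueOffsets {
        final long brokerOffset;
        final long consumerOffset;

        QueueOffsets(long brokerOffset, long consumerOffset) {
            this.brokerOffset = brokerOffset;
            this.consumerOffset = consumerOffset;
        }
    }

    /** Sums the per-queue gap; a negative gap (stale reading) is counted as zero. */
    static long totalBacklog(Map<String, QueueOffsets> queues) {
        long total = 0;
        for (QueueOffsets q : queues.values()) {
            total += Math.max(0, q.brokerOffset - q.consumerOffset);
        }
        return total;
    }

    public static void main(String[] args) {
        Map<String, QueueOffsets> queues = new LinkedHashMap<>();
        queues.put("broker-a:0", new QueueOffsets(1000, 900)); // 100 messages behind
        queues.put("broker-a:1", new QueueOffsets(500, 500));  // fully caught up
        queues.put("broker-a:2", new QueueOffsets(200, 150));  // 50 messages behind
        System.out.println(totalBacklog(queues)); // prints 150
    }
}
```

Looking at the per-queue gaps rather than only the total also shows whether the backlog is concentrated on a single queue, which usually points at one slow consumer instance rather than a general capacity problem.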
The Consumer is the final stage of message delivery; focus on monitoring its consumption capacity and exception handling:
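RocketMQ Dashboard is a Spring Boot application maintained as the separate apache/rocketmq-dashboard project; it runs as its own process and connects to the NameServer. A single-node Broker for it to observe can be configured in broker.conf:
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
namesrvAddr=127.0.0.1:9876
# Actual IP of the Broker
brokerIP1=192.168.1.100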
nohup sh bin/mqnamesrv &
nohup sh bin/mqbroker -c conf/broker.conf &
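Once the Dashboard is up, open http://192.168.1.100:8080 (default port 8080) to view cluster status, Topic backlog, consumer progress and other core metrics.
Large clusters need stronger visualization and alerting, for which the Prometheus + Grafana combination is recommended: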
git clone https://github.com/apache/rocketmq-exporter.git
cd rocketmq-exporter
mvn clean package -DskipTests
nohup java -jar target/rocketmq-exporter-0.2.0.jar \
--rocketmq.config.namesrvAddr=127.0.0.1:9876 \
--server.port=5557 &
sum(rocketmq_broker_producer_tps) by (broker)
sum(rocketmq_broker_consumer_tps) by (broker)
sum(rocketmq_topic_accumulation) by (topic)
For production, a 2-master/2-slave cluster is recommended so that the failure of a single Broker does not affect service:
brokerClusterName=ProductionCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=72
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
namesrvAddr=192.168.1.101:9876;192.168.1.102:9876
storePathRootDir=/data/rocketmq/store
storePathCommitLog=/data/rocketmq/store/commitlog
# Disable automatic Topic creation in production
autoCreateTopicEnable=false
maxMessageSize=65536
brokerClusterName=ProductionCluster
brokerName=broker-a
brokerId=1
deleteWhen=04
fileReservedTime=72
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
namesrvAddr=192.168.1.101:9876;192.168.1.102:9876
storePathRootDir=/data/rocketmq/store
# Address of Master1
masterAddr=192.168.1.103:10911
# Start the NameServer
nohup sh bin/mqnamesrv &
# Start Master1
nohup sh bin/mqbroker -c conf/broker-a.conf &
# Start Slave1
nohup sh bin/mqbroker -c conf/broker-a-slave.conf &
# Start Master2 and Slave2 in the same way
broker-c.conf (Master3) and broker-c-slave.conf (Slave3), following 4.1.2/3;
sh bin/mqadmin updateTopic -n 192.168.1.101:9876 -t OrderTopic -c ProductionCluster -b 192.168.1.107:10911
Handling steps:
sh bin/mqadmin brokerStatus -n 192.168.1.101:9876 -b 192.168.1.103:10911
brokerRole=SYNC_MASTER); restart it with brokerId=1 as a Slave so that it attaches to the new Master.
Temporary measure: reset the consumer progress (emergency use only):
package com.jam.demo.rocketmq.admin;

import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.util.Map;

/**
 * Utility controller for managing consumer progress
 * @author ken
 */
@RestController
@RequestMapping("/rocketmq/consumer")
@Tag(name = "Consumer progress API", description = "Resets consumer progress and handles message backlog")
@Slf4j
public class ConsumerProgressController {
    private static final DefaultMQAdminExt admin = new DefaultMQAdminExt();

    static {
        admin.setNamesrvAddr("127.0.0.1:9876");
        try {
            admin.start();
        } catch (MQClientException e) {
            log.error("Failed to start DefaultMQAdminExt", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Resets the consumer progress to a given point in time
     * @param consumerGroup consumer group name
     * @param topic Topic name
     * @param timestamp timestamp in milliseconds
     * @return result of the operation
     * @throws Exception remoting, Broker or client errors raised by the admin API
     */
    @PostMapping("/resetOffsetByTime")
    @Operation(summary = "Reset consumer progress", description = "Resets a consumer group's progress on a Topic to a given point in time")
    public String resetOffsetByTime(
            @Parameter(description = "Consumer group name", required = true) @RequestParam String consumerGroup,
            @Parameter(description = "Topic name", required = true) @RequestParam String topic,
            @Parameter(description = "Target timestamp (milliseconds)", required = true) @RequestParam long timestamp
    ) throws Exception {
        if (!StringUtils.hasText(consumerGroup)) {
            throw new IllegalArgumentException("Consumer group name must not be empty");
        }
        if (!StringUtils.hasText(topic)) {
            throw new IllegalArgumentException("Topic name must not be empty");
        }
        // Argument order is (topic, group, timestamp, isForce)
        Map<MessageQueue, Long> resetResult = admin.resetOffsetByTimestamp(topic, consumerGroup, timestamp, true);
        if (CollectionUtils.isEmpty(resetResult)) {
            log.warn("No queue offsets were reset for Topic[{}]", topic);
            return "Topic[" + topic + "] has no message queues";
        }
        return String.format("Consumer group[%s] Topic[%s] progress reset to timestamp[%d]", consumerGroup, topic, timestamp);
    }
}
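The reset endpoint expects its target as epoch milliseconds, which is easy to get wrong by mixing up seconds and milliseconds. A small helper, sketched here with the hypothetical name ResetTimestamp, derives the value from a "how far back" duration; passing the current instant in explicitly keeps the calculation testable.

```java
import java.time.Duration;
import java.time.Instant;

final class ResetTimestamp {

    /** Returns the epoch-millisecond timestamp lying lookBack before now. */
    static long millisBefore(Instant now, Duration lookBack) {
        if (lookBack.isNegative()) {
            throw new IllegalArgumentException("lookBack must not be negative");
        }
        return now.minus(lookBack).toEpochMilli();
    }

    public static void main(String[] args) {
        // Example: rewind consumption by one hour from the current time
        long target = millisBefore(Instant.now(), Duration.ofHours(1));
        System.out.println("timestamp=" + target);
    }
}
```

Resetting to an earlier point re-delivers messages that were already consumed, so downstream handlers need to be idempotent before this is used.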
View dead-letter messages:
sh bin/mqadmin queryMsgByKey -n 192.168.1.101:9876 -t %DLQ%OrderConsumerGroup -k orderId123
Resend a dead-letter message:
sh bin/mqadmin sendMessage -n 192.168.1.101:9876 -t OrderTopic -p "{\"orderId\":\"123\",\"status\":\"success\"}"
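Dead-letter and retry Topics are named per consumer group by prepending a fixed prefix to the group name: %DLQ% for the dead-letter queue and %RETRY% for the retry queue. The sketch below builds and recognizes those names so that tooling does not hard-code them; the class SystemTopicNames and its method names are hypothetical.

```java
final class SystemTopicNames {
    static final String DLQ_PREFIX = "%DLQ%";
    static final String RETRY_PREFIX = "%RETRY%";

    /** Dead-letter Topic that holds messages the group failed to consume after all retries. */
    static String dlqTopic(String consumerGroup) {
        return DLQ_PREFIX + consumerGroup;
    }

    /** Retry Topic that holds messages waiting to be re-delivered to the group. */
    static String retryTopic(String consumerGroup) {
        return RETRY_PREFIX + consumerGroup;
    }

    /** True when the Topic name denotes a dead-letter queue. */
    static boolean isDlqTopic(String topic) {
        return topic != null && topic.startsWith(DLQ_PREFIX);
    }

    public static void main(String[] args) {
        System.out.println(dlqTopic("OrderConsumerGroup"));   // prints %DLQ%OrderConsumerGroup
        System.out.println(retryTopic("OrderConsumerGroup")); // prints %RETRY%OrderConsumerGroup
    }
}
```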
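| Metric | Threshold | Alert level |
|---|---|---|
| Broker disk usage | >85% | Critical |
| Topic backlog | >10000 | Warning |
| Consume TPS < produce TPS | sustained for 5 minutes | Warning |
| Broker down | not recovered within 1 minute | Critical |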
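The thresholds in the table can be expressed as a small rule evaluator, which is handy for unit-testing alert logic before encoding it in Prometheus rules. This is a sketch under stated assumptions: the class AlertRules, the rule names and the four input parameters are hypothetical, and how each input is measured (for example, how the "sustained" minutes are counted) is left to the surrounding collector.

```java
import java.util.ArrayList;
import java.util.List;

final class AlertRules {
    enum Level { WARNING, CRITICAL }

    /** One fired alert: a rule name plus its severity. */
    static final class Alert {
        final String name;
        final Level level;

        Alert(String name, Level level) {
            this.name = name;
            this.level = level;
        }

        @Override
        public String toString() {
            return level + ":" + name;
        }
    }

    /** Applies the four table rules to one snapshot of (hypothetical) inputs. */
    static List<Alert> evaluate(double diskUsagePercent, long topicBacklog,
                                int minutesConsumeBelowProduce, int minutesBrokerDown) {
        List<Alert> alerts = new ArrayList<>();
        if (diskUsagePercent > 85.0) {
            alerts.add(new Alert("BrokerDiskUsage", Level.CRITICAL));
        }
        if (topicBacklog > 10_000) {
            alerts.add(new Alert("TopicBacklog", Level.WARNING));
        }
        if (minutesConsumeBelowProduce >= 5) {
            alerts.add(new Alert("ConsumeSlowerThanProduce", Level.WARNING));
        }
        if (minutesBrokerDown >= 1) {
            alerts.add(new Alert("BrokerDown", Level.CRITICAL));
        }
        return alerts;
    }

    public static void main(String[] args) {
        // Only the disk rule fires for this snapshot
        System.out.println(evaluate(90.0, 500, 0, 0));
        // Backlog and slow-consumption rules fire together
        System.out.println(evaluate(40.0, 20_000, 6, 0));
    }
}
```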
global:
  resolve_timeout: 5m
route:
  group_by: ['alertname']
  group_wait: 10s
  group_interval: 10s
  repeat_interval: 1h
  receiver: 'dingding'
receivers:
  - name: 'dingding'
    webhook_configs:
      - url: 'http://dingding-webhook:8080/send'  # DingTalk robot webhook
        send_resolved: true
#!/bin/bash
NAMESRV_ADDR="192.168.1.101:9876"
BROKER_LIST=("192.168.1.103:10911" "192.168.1.105:10911")
for broker in "${BROKER_LIST[@]}"; do
  # Assumption: a healthy Broker answers brokerStatus with its runtime stats, which include a bootTimestamp entry
  if ! sh bin/mqadmin brokerStatus -n "$NAMESRV_ADDR" -b "$broker" 2>/dev/null | grep -q "bootTimestamp"; then
    echo "Broker $broker is not responding"
    # Push the alert to Alertmanager (v2 API) as a JSON array
    curl -X POST http://alertmanager:9093/api/v2/alerts \
      -H "Content-Type: application/json" \
      -d '[{
        "labels": {"alertname": "BrokerDown", "broker": "'"$broker"'"},
        "annotations": {"description": "Broker '"$broker"' did not answer brokerStatus"}
      }]'
  fi
done
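A cheaper first-line check than calling mqadmin is to test whether the Broker's listen port accepts TCP connections at all. The sketch below does that with the JDK only; the class BrokerPortProbe, the timeout value and the address list are assumptions for illustration, and an open port only shows that the process is listening, not that it is serving requests correctly.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class BrokerPortProbe {

    /** Returns true when a TCP connection to "host:port" succeeds within the timeout. */
    static boolean isReachable(String hostPort, int timeoutMillis) {
        int idx = hostPort.lastIndexOf(':');
        if (idx <= 0 || idx == hostPort.length() - 1) {
            throw new IllegalArgumentException("Expected host:port but got " + hostPort);
        }
        String host = hostPort.substring(0, idx);
        int port = Integer.parseInt(hostPort.substring(idx + 1));
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Refused, timed out or unresolvable: treat all of them as unreachable
            return false;
        }
    }

    public static void main(String[] args) {
        // Hypothetical Broker addresses; replace with the masters of your cluster
        String[] brokers = {"192.168.1.103:10911", "192.168.1.105:10911"};
        for (String broker : brokers) {
            System.out.println(broker + " reachable=" + isReachable(broker, 2000));
        }
    }
}
```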
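sendMessageThreadPoolNums: size of the send thread pool; CPU cores × 2 is suggested;
mapedFileSizeCommitLog: CommitLog file size; raising it to 2GB reduces the number of files;
transientStorePoolEnable: enables the transient store pool to improve flush performance (requires sufficient memory).
-server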
-Xms8g
-Xmx8g
-Xmn4g
-XX:MetaspaceSize=256m
-XX:MaxMetaspaceSize=256m
-XX:+UseG1GC
-XX:G1HeapRegionSize=16m
-XX:G1ReservePercent=20
-XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/data/rocketmq/logs/heapdump.hprof
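RocketMQ monitoring and operations come down to "understanding the internals + standardized procedures": the monitoring system must cover the whole data-flow path, and operational work must follow high-availability design principles. A closed loop of "metrics + alerting + automation" lets you spot potential problems early and handle failures quickly. This article has moved from principles to practice, covering monitoring metrics, tooling, operational procedures and best practices, and will hopefully help you keep RocketMQ stable in production. Remember: the stability of the message middleware is the foundation of a highly available business system.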