找回密码
 立即注册
首页 业界区 安全 MQ消息积压 / Rocketmq 积压 最全的处理方案。 (秒懂+ ...

MQ消息积压 / Rocketmq 积压 最全的处理方案。 (秒懂+图解+史上最全)

羊夏菡 8 小时前
本文 的 原文 地址

原始的内容,请参考 本文 的 原文 地址

本文 的 原文 地址
本文作者:

  • 第一作者  老架构师 肖恩(肖恩  是尼恩团队 高级架构师,负责写此文的第一稿,初稿 )
  • 第二作者   老架构师 尼恩   (45岁老架构师, 负责  提升此文的 技术高度,让大家有一种  俯视 技术、俯瞰技术、 技术自由  的感觉
尼恩说在前面:

在40岁老架构师 尼恩的读者交流群(50+)中,最近有小伙伴拿到了一线互联网企业如得物、阿里、滴滴、极兔、有赞、shein 希音、shopee、百度、网易的面试资格,遇到很多很重要的面试题:
MQ消息积压、如何监控、如何排查?
你知道如何解决 MQ消息积压 嘛?
你们生产环境  这么高的吞吐量,没有出现过 MQ积压吗?  不可能吧?  怎么解决 的?
前几天 小伙伴面试 美团,遇到了这个问题。但是由于 没有回答好,导致面试挂了。
小伙伴面试完了之后,来求助尼恩。那么,遇到 这个问题,该如何才能回答得很漂亮,才能 让面试官刮目相看、口水直流。
所以,尼恩给大家做一下系统化、体系化的梳理,使得大家内力猛增,可以充分展示一下大家雄厚的 “技术肌肉”,让面试官爱到 “不能自已、口水直流”,然后实现”offer直提”。
尼恩提示:回答问题的时,首先 从一个 现场故事讲起。
1.png

现场事故:凌晨2点 报警群突然炸了

凌晨2点, 凌晨2:15,监控系统触发MQ积压告警,核心业务消息队列积压量在15分钟内从50万激增至1000万条,导致订单支付、物流推送等核心业务链路出现严重延迟。
报警群突然炸了——各种催命电话, 把我从被窝里 催起来。
我顶着黑眼圈打开MQ 监控一看,发现上游系统异常推送800万条营销消息 。

  • 直接诱因:夜间批量促销活动未做流量评估,上游系统异常推送800万条营销消息
  • 架构缺陷:消费者采用单线程串行消费模式,监控阈值设置过高(500万才触发告警)
然后,加班熬夜一通宵 的时间,彻底把问题解决。并且,把我的绝招整理成了下面的方案,提供给其他同学学习使用。
方案分两个部分:

  • 紧急的  止血救命包  (临时方案)
  • 长期的  架构治根     (  长期方案)

紧急止血 + 架构治根 ,一共是5个步骤

2.png

第1步: 定位原因
第2步: 紧急止血包-极速扩容
临时扩容Consumer实例和开启批量消费。
第3步: 弃卒保帅-非核心业务降级
暂停非关键业务的消费者和降低非关键业务消费者线程数。
第4步: 并行爆破-上重武器
消息转储  和   消费者更大规模扩容。
第5步:架构治根。 长治久安-上牛逼的架构方案
高吞吐架构升级和高并发压的应急预案。

第1步:定位原因


  • 生产侧问题(较少见,10%)
  • Broker侧问题(较少见,10%)
  • 消费侧问题(最可能,80%)
3.png

第2步:紧急止血包(临时消费者扩容)


  • 临时扩容Consumer实例
  • 开启批量消费。
4.png

第3步. 弃卒保帅:消费者降级


  • 暂停非关键业务的消费者
  • 降低非关键业务消费者线程数。
5.png

第4步. 并行爆破:上重武器

​    消息转储  和   消费者更大规模扩容。
6.png

第5步. 架构治根:架构升级,  长治久安


  • 高吞吐架构升级
  • 高并发压的应急预案。
7.png

接下来,尼恩给大家说详细方案:
第1步:定位消息积压原因

在消息处理流程中,若客户端的消费速度跟不上服务端的发送速度,未处理的消息会不断累积,这部分消息就是堆积消息。
消息堆积会直接导致消费延迟,想要高效排查和解决这类问题,首先定位原因。

  • 第一步:定位消息积压原因
定位消息积压原因

遇到消息积压时,很多人第一反应是“扩容消费者”,但在操作前必须先明确:到底是什么拖慢了消费速度?
RocketMQ的消费链路就像一条流水线,任何环节“堵车”都会引发积压,我们得先给这条流水线做个全面“检查”。
MQ消息积压的核心本质生产速率>消费速率,导致消息在Broker队列中堆积。
全链路分析如下:
8.png

生产侧问题(较少见,10%)


  • 业务高峰(如秒杀、大促)、补偿机制重发、生产端线程池失控等导致的瞬时流量冲击
Broker侧问题(较少见,10%)


  • 磁盘IO瓶颈(PageCache刷盘慢);
  • 主从同步延迟;
  • 网络分区或资源限制。
消费侧问题(最可能,80%)

(1)性能瓶颈

  • Consumer陷入死循环,导致卡死;
  • 业务逻辑复杂(如慢SQL、外部API调用、高耗时计算);
  • 单条消息处理时间过长(超过100ms需警惕)。
(2)资源不足

  • 消费者实例数量不足(未随流量动态扩容);
  • 消费者宕机或线程阻塞(如GC停顿、死锁)。
(3)配置缺陷

  • 顺序消费中单条消息卡住会阻塞整个队列(顺序消息会持续重试,普通消息仅重试16次);
  • 广播模式下重复处理导致效率低下。
(4)重试风暴

  • 消息因依赖服务异常等原因频繁失败重试。
9.png

消息积压本质是“生产速度>消费速度+Broker转发能力”。
由于Broker通常是高可用集群,生产侧若无人工故障也较稳定,因此排查时应优先考虑消费侧问题。
大致的排查步骤如下:
排查Consumer是否处于“假死”状态

打开RocketMQ Dashboard(运维必备工具),查看Consumer分组的“在线客户端”列表。若某台服务器的Consumer长时间未上报心跳(LastHeartbeatTime超过2分钟),大概率是“消费者假死”。
这种情况多因消费者线程被Full GC卡住或代码中存在死循环。例如曾遇到某台服务器因循环中频繁打印日志导致CPU占用100%,Consumer线程直接卡死,积压量持续增加。
注意:需为Consumer配置JVM监控,重点关注GC频率和耗时。比如假死机器的Young GC耗时超500ms,老年代频繁Full GC,就会直接影响Consumer正常工作。
10.png

检查队列负载是否均衡

RocketMQ的Consumer采用“队列均分”策略,每个Consumer分配多个Message Queue(MQ)。若某台Consumer分配100个MQ,另一台仅分配10个,会导致“忙闲不均”。
通过Dashboard可查看每个Consumer实例的“已分配队列数”。比如三台新扩容服务器因网络配置问题未连接NameServer,导致老服务器承担80%队列,消费能力被压垮。
实操建议:若队列分配不均,可先重启Consumer实例触发重新负载均衡;若问题持续,检查Consumer分组配置,确保consumeFromWhere和messageModel设置正确(默认CLUSTERING模式会自动均衡)。
消费端负载均衡策略详细情况,参考最后一个小节
检查消费线程是否“效率低下”

RocketMQ Consumer默认消费线程数为20(由consumeThreadMin和consumeThreadMax控制)。若业务逻辑复杂(如涉及数据库查询、接口调用),20个线程可能不足,导致大量任务排队。
比如日志中发现线程池任务堆积量超1000,而实际工作线程仅10个——因为初始化时误将consumeThreadMin和Max均设为10,无法应对流量激增。
重点:线程数并非越多越好,需结合CPU核心数调整。IO密集型任务可设为CPU核心数的5-10倍(如50);CPU密集型任务超过32通常无意义,反而会因上下文切换降低效率。
节点线程数计算模型
单节点并发度需合理设置,过大易增加线程切换开销。理想环境下最优线程数计算模型:

  • 单机vCPU核数为C;
  • 忽略线程切换耗时,I/O操作不消耗CPU;
  • 线程有足够消息处理,内存充足;
  • 逻辑中CPU计算耗时为T1,外部I/O操作为T2。
则单个线程的TPS为1/(T1+T2),若CPU使用率达100%,单机最大线程数为C*(T1+T2)/T1。
第2步:紧急止血包(临时消费者扩容)

明确原因后进入“急救阶段”,需先让消费速度追上生产速度,再逐步消化历史积压。
第一招:临时扩容Consumer

这是最直接的方法,相当于增加高速公路车道。RocketMQ的Consumer无状态,理论上可无限扩容,但需注意两点:
扩容数量不超过MQ总数

每个MQ同一时间仅能被一个Consumer消费。
例如集群有100个MQ,最多可扩容至100个Consumer实例(每个实例分配1个MQ);
若集群有200个MQ且当前仅10个Consumer,理论上可先扩容至50个实例,充分利用队列资源。
第二招:开启批量消费,提高单次处理量

RocketMQ支持批量消费,默认每次拉取1条消息(参数consumeMessageBatchMaxSize默认值为1)。
若业务允许,可改为一次拉取10-32条,减少网络交互,提升吞吐量。
比如将该参数改为16,配合扩容后消费速度从500条/秒提升至8000条/秒——相当于从每次搬1箱货变为搬16箱,效率显著提升。但需注意:
保持幂等性

批量处理可能出现重复消费(如处理到第10条时消费者挂了,重启后16条消息会重新消费),因此业务代码必须支持幂等(如用唯一ID去重)。比如因未做幂等导致数据库出现重复订单,后面还得脚本去重。
避免参数过大

超过32后吞吐量提升不明显,反而增加内存压力。曾尝试设为100,导致Consumer内存使用率超80%,险些触发OOM,最终确定16-32为最佳范围。
11.png

第3步. 弃卒保帅:消费者降级+ 暂停Producer或限流

消费者降级


  • 暂停非关键业务的消费者
  • 降低非关键业务消费者线程数。
暂停Producer或限流,控制消息源头

若积压量极大(比如千万级以上)且消费速度短期内无法追上,可暂时让Producer停止发消息或降低发送频率。
注意:暂停Producer前必须与业务方沟通
例如电商大促期间,暂停支付回调消息会影响商家收款,比如与前端协商在用户支付成功页增加“稍后刷新”提示,同时将Producer从2000 TPS限流至500 TPS,为消费者争取缓冲时间。
注意
暂停后需监控Consumer的“堆积量”是否下降(理想状态为每分钟下降10-20万条)。
若未变化,可能是消费者重试逻辑导致消息反复投递(如消息处理失败后进入重试队列,积压量“假死”),此时需检查maxReconsumeTimes参数(默认16次,超过后进入死信队列)。
第4步. 并行爆破:上重武器

消息转储  和   消费者更大规模扩容。
12.png

解决第一步 临时扩容场景下的 MQ 分区总数不足的解决方案

若前期MQ数量不足(如仅4个MQ且已分配4个Consumer),第一步的临时扩容Consumer 意义不大,可按以下步骤处理:
1、 临时转储队列 :  创建原队列数10倍(或N倍)的新Topic ,  也就是  临时转储队列;
2、  消息转储 : 开发临时转发程序,将积压消息均匀分发至新Topic的队列中;
3、 消费者更大规模扩容:  对应扩容Consumer(10倍),每个Consumer消费一个临时队列,同时扩容依赖的业务服务(如缓存、数据库);
4、消费完成后恢复原有架构,避免资源浪费。
13.png

对应扩容Consumer(10倍),每个Consumer消费一个临时队列,同时扩容依赖的业务服务(如缓存、数据库);
14.png

实操步骤
1、 临时创建新Consumer分组(如加后缀-tmp),避免与原有消费者竞争资源;
2、 启动时指定--consumerThreadMin 50 --consumerThreadMax 50(临时调高线程数);
3、 观察Dashboard的“消费速度”,理想状态下每台新服务器分配4-5个MQ,消费速度可提升3-5倍。
第5步.  架构治根:架构升级,彻底 根治


  • 高吞吐架构升级
  • 积压 的应急预案。
15.png

5.1:  高吞吐架构升级:从 “被动应对” 到 “主动防御”

高吞吐架构的核心目标是:让生产速率≤消费速率 + Broker 承载能力,从根源上减少积压风险。需从生产端、消费端、Broker 端三个维度系统性优化,结合业务场景(如秒杀、大促)设计针对性方案。
(1)生产端:控制 “消息源头” 的速率与质量

生产端是消息的 “起点”,需通过限流、瘦身、异步化等手段,避免瞬时流量冲击 MQ。
生产端 动态限流:给生产端装 “刹车”

核心措施:基于 MQ Broker 的 “消息堆积量” 动态调整生产速率。
实现方式:
在 Producer 端集成 “积压量监测接口”(如调用 RocketMQ Dashboard 的/topic/stats接口),当某 Topic 积压量超过 50 万条时,自动触发限流(通过令牌桶算法将 TPS 从 2000 降至 500)。
业务适配:
秒杀场景下,结合前端限流(如按钮置灰、排队提示)和后端限流(Redis 计数器 + Lua 脚本),确保生产速率不超过消费端最大处理能力的 80%(预留 20% 缓冲)。
生产端 消息 “瘦身”:减少无效数据传输

核心问题:大消息(>1MB)会导致 Broker 存储效率下降、消费端处理耗时增加(如解析大 JSON 耗时 100ms+)。
优化措施:

  • 消息体只保留 “核心字段”(如订单 ID、用户 ID、金额),非核心字段(如用户地址、商品详情)通过 “消息 + 数据库” 组合获取(消费端拿到消息后,再查 DB 补充信息);
  • 大字段压缩:使用 Protobuf 替代 JSON(压缩率提升 50%+),或对超过 500KB 的消息进行 GZIP 压缩;
  • 禁止 “日志型消息”:如非必要,不将接口调用日志、调试信息写入 MQ(改用 ELK 日志系统)。
(2)消费端:提升 “处理效率” 与 “容错能力”

消费端是消息处理的 “主力”.
消费端 需通过并行化、轻量化、隔离化设计,将单条消息处理耗时压缩至 50ms 以内(非复杂业务)。
队列拆分:

按 “业务类型” 或 “用户 ID 哈希” 拆分 Topic 队列,避免单队列阻塞。
示例:原 “订单消息” Topic(100 个队列)拆分为 “支付订单”(50 个队列)、“取消订单”(30 个队列)、“退款订单”(20 个队列),分别对应独立消费者组,避免某类消息(如退款)处理慢阻塞全量;
线程池优化:

核心参数优化:
IO 密集型业务(如调用外部 API、查 DB)→ 线程数 = CPU 核心数 ×5(如 8 核 CPU→40 线程);
CPU 密集型业务(如数据计算)→ 线程数 = CPU 核心数 ×2;
线程池隔离:

使用 Hystrix 或 Resilience4j 为不同消息类型分配独立线程池(如支付消息用payThreadPool,物流消息用logisticsThreadPool),避免某类消息线程池满导致全局阻塞。
业务逻辑轻量化:砍掉 “慢操作”

慢 SQL 优化:消费端涉及的 DB 操作必须加索引,禁止select *、复杂 JOIN(耗时>50ms 的 SQL 需拆分或异步化);
外部依赖缓存:调用第三方接口(如支付回调、物流查询)时,增加本地缓存(Caffeine,过期时间 5 分钟)+ 分布式缓存(Redis),减少远程调用耗时(从 200ms→10ms);
异步处理非核心步骤:如订单消息消费时,“扣减库存”(核心)同步处理,“发送短信通知”(非核心)丢入本地线程池异步执行(失败后不影响主流程)。
批量消费 + 幂等设计:提升吞吐量 + 防重复

批量消费参数固化:consumeMessageBatchMaxSize固定为 16-32(经实测,此范围吞吐量提升最明显,且内存可控);
幂等实现:

  • 消息层面:为每条消息生成唯一 ID(如 UUID),消费端首次处理时写入 “消息处理表”(ID + 状态),重复消息直接跳过;
  • 业务层面:订单支付消息通过 “订单 ID + 支付状态” 去重(如已支付的订单,再次收到支付消息时直接返回成功)。
(3)Broker 端:强化 “承载能力” 与 “稳定性”

Broker 是消息存储与转发的核心,需通过硬件升级、集群扩容、参数优化提升极限承载能力(目标:单 Broker 支持 10 万 TPS+,集群支持百万 TPS+)。
硬件与存储优化


  • 磁盘:使用 SSD(随机读写速度是机械硬盘的 10 倍 +),且单 Broker 磁盘容量≥1TB(避免频繁清理旧消息);
  • 内存:为 Broker 配置足够大的 PageCache(如 16 核 32G 机器,分配 16G 作为 PageCache),减少磁盘 IO 压力(消息先写入 PageCache,再异步刷盘);
  • 网络:Broker 节点间使用万兆网卡,避免跨机房部署(主从节点同机房,延迟控制在 1ms 内)。
集群扩容与负载均衡


  • 集群规模:按 “生产 TPS×2” 配置 Broker 节点(如预估生产端 50 万 TPS,集群部署 10 个 Broker 节点,单节点承载 5 万 TPS);
  • 队列均衡:每个 Topic 的队列数 = Broker 节点数 ×8(如 10 个 Broker→80 个队列),确保队列均匀分布在各 Broker(避免某台 Broker 负载过高);
  • 主从架构:每个 Broker 配置 1 个从节点,主节点故障时自动切换(RocketMQ 支持主从自动切换,切换时间<30 秒)。
刷盘与清理策略优化


  • 刷盘策略:高吞吐场景用ASYNC_FLUSH(异步刷盘,写入 PageCache 即返回成功,由后台线程定时刷盘),牺牲部分一致性换性能;
  • 消息清理:设置合理的fileReservedTime(消息保留时间),非核心消息保留 24 小时,核心消息保留 7 天(避免旧消息占用磁盘空间)。
5.2 : 应急预案:从 “无序应对” 到  积压的 “标准化处理流程”

由于平台 篇幅限制, 此处省略1000字+
剩下的内容,请参加原文地址原始的内容,请参考 本文 的 原文 地址
本文 的 原文 地址

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
您需要登录后才可以回帖 登录 | 立即注册