消息队列消费积压到打爆磁盘:我用Consumer Lag监控+阈值告警在5分钟内止血
凌晨1点刚躺下没多久手机就开始嗡嗡震。告警信息写的是磁盘使用率超过85%。爬起来登录机器一看好家伙RabbitMQ的/var/lib/rabbitmq目录已经吃了70GB。消息队列的消费 Lag 不知道怎么就炸了堆积的消息还在往磁盘里写。说实话之前我一直觉得消息队列自带缓冲不会出问题。结果这次啪啪打脸。这篇文章说说我是怎么用 Consumer Lag 监控 自动阈值告警5分钟内把消费积压从临界状态拉回来的以及我是怎么搭建这套监控体系的。问题是怎么发生的先说背景线上跑着一个订单处理服务用的 RabbitMQ 做异步下单。消费者是一个 Spring Boot 应用消费端用了手动 ack。出问题时我在做什么呢——给消费端加了个新功能上线前测试了3轮都正常就直接发上去了。结果新功能里有个慢查询把消费线程堵住了。消费速度 生产速度积压就开始涨。一开始我根本没注意到。告警是磁盘用率触发的不是队列积压触发的——这是第一个坑。第一步先止血把积压消掉登录机器第一件事看队列状态# 查看队列信息rabbitmqctl list_queues name messages messages_ready messages_unacknowledged# 查看消费者的 channelrabbitmqctl list_channels connection_name command_name model_name fc当时看到的情况messages_ready: 47万messages_unacknowledged: 3000消费速率consumer_ms掉到了正常值的1/20积压确实很严重。先把消费者停掉不要直接kill用rabbitmqctl stop_channel然后开3个消费者实例并行消费先把水位降下来。# 批量重置消费 channelrabbitmqctlevalrabbit_channel:close(MyChannelPid, normal).这步做完队列里47万条消息开始以正常速度消费了。磁盘写入速度从爆满降到正常。但这只是临时止血。如果不解决根本问题下次还会炸。第二步Consumer Lag 监控怎么搭消费积压的本质是消费速度跟不上生产速度。所以监控的核心指标是Consumer Lag messages_ready messages_unacknowledged。我用的是 Prometheus RabbitMQ Exporter# docker-compose 片段rabbitmq-exporter:image:kbudde/rabbitmq-exporter:latestenvironment:RABBIT_URL:http://rabbitmq:15672RABBIT_USER:guestRABBIT_PASSWORD:guestports:-9419:9419然后在 Prometheus 告警规则里加这个groups:-name:rabbitmqrules:-alert:RabbitMQConsumerLagHighexpr:rabbitmq_queue_messages{queue_nameorder_queue}10000for:2mlabels:severity:warningannotations:summary:队列积压超过1万条description:队列 {{ $labels.queue_name }} 积压 {{ $value }} 条请检查消费者状态-alert:RabbitMQConsumerLagCriticalexpr:rabbitmq_queue_messages{queue_nameorder_queue}50000for:1mlabels:severity:criticalannotations:summary:队列积压超过5万条接近磁盘上限第三步阈值是怎么算出来的不是随便写1万和5万的。阈值要结合两个维度消费者消费一条消息的平均时间磁盘剩余空间能扛多少积压算一下假设消费一条消息平均耗时 50ms一个消费者线程每秒能处理 20 条。积压1万条需要 500 秒8分钟才能消费完。如果这段时间内生产速度 消费速度Lag 还会继续涨。我的经验阈值消费 Lag 1万条发 warning 告警可能只是临时抖动消费 Lag 5万条发 critical 告警必须人工介入消费 Lag 持续增长超过5分钟即使没到阈值也要告警第四步自动恢复脚本光告警不够还得有自动止血脚本。我的思路是当 Lag 持续告警超过 N 分钟自动扩容消费者。#!/usr/bin/env python3importpikaimportrequestsimporttimefromdatetimeimportdatetime RABBITMQ_APIhttp://rabbitmq:15672/api/queuesQUEUE_NAMEorder_queueLAG_THRESHOLD50000CHECK_INTERVAL60# 秒defget_queue_messages():resprequests.get(f{RABBITMQ_API}/%2f/{QUEUE_NAME},auth(guest,guest))dataresp.json()returndata[messages]defmain():last_alert_timeNonewhileTrue:msgsget_queue_messages()print(f[{datetime.now()}] 队列积压:{msgs}条)ifmsgsLAG_THRESHOLD:iflast_alert_timeisNone:last_alert_timetime.time()eliftime.time()-last_alert_time300:# 持续5分钟print( 积压持续5分钟触发自动扩容)# 这里调用 K8s 或 Docker Swarm 扩容消费者# call_k8s_scale(order-consumer, replicas5)last_alert_timetime.time()# 重置避免重复触发else:last_alert_timeNonetime.sleep(CHECK_INTERVAL)if__name____main__:main()踩坑心得坑1消息积压不一定是消费者的问题有时候积压是因为生产者发得太快比如突发流量有时候是 RabbitMQ 本身配置问题内存限制、磁盘告警阈值。定位问题时要先确认是哪个环节慢了。坑2手动 ack 模式下消费慢会导致 unacked 堆积这次我用的手动 ack结果慢查询导致消息一直处于 unacked 状态。消费者处理完之后没有及时 ack消息就卡在消费中状态。监控时要把messages_unacknowledged也加上。坑3不要只看 Lag 数值要看 Lag 变化趋势Lag5000 可能是正常的消费者刚启动也可能是异常的消费突然中断。结合 Lag 增速一起看rate(rabbitmq_queue_messages{queue_nameorder_queue}[5m]) 100如果5分钟内 Lag 增速超过100条/秒说明生产速度远大于消费速度要立刻告警。写在最后这次故障让我重新审视了消息队列的可观测性。之前只知道监控队列长度其实 Consumer Lag 和 Lag 增速才是关键指标。现在我给每个核心队列都配了4个监控Lag 绝对值阈值告警Lag 增速变化率告警unacked 数量消费阻塞告警磁盘使用率兜底告警上线一周已经捕获了两次潜在的 Lag 上涨都是消费者线程池配置有问题。提前处理的感觉比凌晨1点被炸醒好太多了。如果你也在用 RabbitMQ建议先查一下你的监控里有没有 Consumer Lag 这个指标。没有的话这篇文章值得你花10分钟加上。