網站首頁 編程語言 正文
概述
事故經過
由于大量商戶反應收不到推送,第一反應是不是推送系統掛了,導致沒有進行推送。于是讓運維檢查推送系統個節點的情況,發現都正常。于是打開RabbitMQ的管控臺看了一下,人都蒙了。已經有幾萬條消息處于ready狀態,還有幾百條unacked的消息。
以為推送服務和MQ連接斷開了,導致無法推送消息,于是讓運維重啟推送服務,將所有的推送服務重啟完,發現unacked的消息全部變成ready,但是沒過多久又有幾百條unacked的消息了,這個就很明顯了能消費,沒有進行ack呀。
當時以為是網絡問題,導致MQ無法接收到ack,讓運維檢查了一下,發現網絡沒問題。現在看真的是傻,網絡有問題連接都連不上。由于確定的是無法ack造成的,立馬將ack模式由原來的manual改成auto緊急發布。將所有的節點升級好以后,發現推送正常了。
你以為這就結束了其實沒有,沒過多久發現有一臺MQ服務出現異常,由于生產采用了鏡像隊列,立即將這臺有問題的MQ從集群中移除了。直接進行重置,然后加回集群。這事情算是告一段落了。此時已經接近24:00了。
時間來到第二天上午10:00,運維那邊又出現報警了,說推送系統有臺機器,磁盤快被寫滿了,并且占用率很高。
事故重現-隊列阻塞
MQ配置
spring:
# 消息隊列
rabbitmq:
host: 10.0.0.53
username: guest
password: guest
virtual-host: local
port: 5672
# 消息發送確認
publisher-confirm-type: correlated
# 開啟發送失敗退回
publisher-returns: true
listener:
simple:
# 消費端最小并發數
concurrency: 1
# 消費端最大并發數
max-concurrency: 5
# 一次請求中預處理的消息數量
prefetch: 2
# 手動應答
acknowledge-mode: manual
問題代碼
@RabbitListener(queues = ORDER_QUEUE)
public void receiveOrder(@Payload String encryptOrderDto, @Headers Map<String,Object> headers, Channel channel) throws Exception {
// 解密和解析
String decryptOrderDto = EncryptUtil.decryptByAes(encryptOrderDto);
OrderDto orderDto = JSON.parseObject(decryptOrderDto, OrderDto.class);
try {
// 模擬推送
pushMsg(orderDto);
}catch (Exception e){
log.error("推送失敗-錯誤信息:{},消息內容:{}", e.getLocalizedMessage(), JSON.toJSONString(orderDto));
} finally {
// 消息簽收
channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG),false);
}
}
看起來好像沒啥問題。由于和交易系統約定好,訂單數據需要先轉換json串,然后再使用AES進行加密,所以這邊需要,先進行解密然后在進行解析。才能得到訂單數據。
為了防止消息丟失,交易系統做了失敗重發機制,防止消息丟失,不巧的是重發的時候沒有對訂單數據進行加密。這就導致推送系統,在解密的時候出異常,從而無法進行ack。
模擬推送
# 發送3條正常的消息
curl http://localhost:8080/sendMsg/3
# 發送1條錯誤的消息
curl http://localhost:8080/sendErrorMsg/1
# 再發送3條正常的消息
curl http://localhost:8080/sendMsg/3
觀察日志發現,雖然有報錯,但是還能正常進行推送。但是RabbitMQ已經出現了一條unacked的消息。
繼續發送1條錯誤的消息
curl http://localhost:8080/sendErrorMsg/1
再發送3條正常的消息
curl http://localhost:8080/sendMsg/3
這個時候你會發現控制臺報錯,當然錯誤信息是解密失敗,但是正常的消息卻沒有被消費,這個時候其實隊列已經阻塞了。
從RabbitMQ管控臺也可以看到,剛剛發送的3條消息處于ready狀態。這個時候如果一直有消息進入,都會堆積在隊列里面無法被消費。
再發送3條正常的消息。
curl http://localhost:8080/sendMsg/3
原因分析
上面說了是由于沒有進行ack導致隊列阻塞。那么問題來了,這是為什么呢?其實這是RabbitMQ的一種保護機制。防止當消息激增的時候,海量的消息進入consumer而引發consumer宕機。
RabbitMQ提供了一種QOS(服務質量保證)功能,即在非自動確認的消息前提下,限制信道上的消費者所能保持的最大未確認的數量。可以通過設置Prefetch數量來實現。
舉例說明:可以理解為在consumer前面加一個緩沖器,容器能容納最大的消息量就是prefetch count。如果容器沒有滿RabbitMQ就會將投遞到容器內,如果滿了就不投遞了。當consumer對消息進行ack以后就會將此消息移除,從而放入新的消息。
listener:
simple:
# 消費端最小并發數
concurrency: 1
# 消費端最大并發數
max-concurrency: 5
# 一次處理的消息數量
prefetch: 2
# 手動應答
acknowledge-mode: manual
prefetch參數就是Prefetch Count。
通過上面的配置發現prefetch我只配置了2,并且concurrency配置的只有1,所以當我發送了2條錯誤消息以后,由于解密失敗這兩條消息一直沒有被ack。將緩沖區占滿了,這個時候RabbitMQ認為這個consumer已經沒有消費能力了就不繼續給他推送消息了,所以就造成了隊列阻塞。
判斷隊列是否有阻塞風險
當ack模式為manual,并且線上出現了unacked消息,這個時候不用慌。由于QOS是限制信道channel上的消費者所能保持的最大未確認的數量。所以允許出現unacked的數量可以通過channelCount * prefetchCount * 節點數量 得出。
channelCount就是由concurrency,max-concurrency決定的。
- min = concurrency * prefetch * 節點數量。
- max = max-concurrency * prefetch * 節點數量。
由此可以得出結論。 - unacked_msg_count < min 隊列不會阻塞。但需要及時處理unacked的消息。
- unacked_msg_count >= min 可能會出現阻塞。
- unacked_msg_count >= max 隊列一定阻塞。
解決方案
其實處理的方法很簡單,將解密和解析的方法放入try catch中就解決了這樣不管解密正常與否,消息都會被簽收。如果出錯將會輸出錯誤日志,讓開發人員進行處理了。
對于這個就需要有日志監控系統,來及時告警了。
@RabbitListener(queues = ORDER_QUEUE)
public void receiveOrder(@Payload String encryptOrderDto, @Headers Map<String,Object> headers, Channel channel) throws Exception {
try {
// 解密和解析
String decryptOrderDto = EncryptUtil.decryptByAes(encryptOrderDto);
OrderDto orderDto = JSON.parseObject(decryptOrderDto, OrderDto.class);
// 模擬推送
pushMsg(orderDto);
}catch (Exception e){
log.error("推送失敗-錯誤信息:{},消息內容:{}", e.getLocalizedMessage(), encryptOrderDto);
}finally {
// 消息簽收
channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG),false);
}
}
注意點
unacked的消息在consumer 切斷連接后(比如重啟),會自動回到對頭。
事故重現-磁盤占用飆升
一開始不知道代碼有問題,就是以為單純的沒有進行ack所以將ack模式改成auto自動(acknowledge-mode默認是auto),緊急升級了,這樣不管正常與否,消息都會被簽收,所以在當時確實是解決了問題。
其實現在回想起來是非常危險的操作,將ack模式改成auto自動,這樣會使QOS不生效。會出現大量消息涌入consumer從而造成consumer宕機,可以是因為當時在晚上,交易比較少,并且推送系統有多個節點,才沒出問題。
問題代碼
@RabbitListener(queues = ORDER_QUEUE)
public void receiveOrder(@Payload String encryptOrderDto, @Headers Map<String,Object> headers, Channel channel) throws Exception {
// 解密和解析
String decryptOrderDto = EncryptUtil.decryptByAes(encryptOrderDto);
OrderDto orderDto = JSON.parseObject(decryptOrderDto, OrderDto.class);
try {
// 模擬推送
pushMsg(orderDto);
}catch (Exception e){
log.error("推送失敗-錯誤信息:{},消息內容:{}", e.getLocalizedMessage(), encryptOrderDto);
}finally {
// 消息簽收
channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG),false);
}
}
配置文件
listener:
simple:
# 消費端最小并發數
concurrency: 1
# 消費端最大并發數
max-concurrency: 5
# 一次處理的消息數量
prefetch: 2
# 手動應答
acknowledge-mode: auto
由于當時不知道交易系統的重發機制,重發時沒有對訂單數據加密的bug,所以還是會發出少量有誤的消息。
發送一條錯誤消息
curl http://localhost:8080/sendErrorMsg/1
原因
RabbitMQ消息監聽程序異常時,consumer會向rabbitmq server發送Basic.Reject,表示消息拒絕接受,由于Spring默認requeue-rejected配置為true,消息會重新入隊,然后rabbitmq server重新投遞。就相當于死循環了,所以控制臺在瘋狂刷錯誤日志造成磁盤利用率飆升的原因。
解決方法
將default-requeue-rejected: false 即可。
參考
生產RabbitMQ隊列阻塞該如何處理?
原文鏈接:https://blog.csdn.net/tianzhonghaoqing/article/details/125756571
相關推薦
- 2022-09-17 Python?seaborn數據可視化繪圖(直方圖,密度圖,散點圖)_python
- 2022-06-04 Dashboard管理Kubernetes集群與API訪問配置_云和虛擬化
- 2022-08-19 Python截取字符串的簡單方法實例_python
- 2022-09-24 opencv實現圖像校正_python
- 2022-12-29 Kotlin使用滾動控件RecyclerView實例教程_Android
- 2023-02-14 Python利用Prim算法生成迷宮_python
- 2023-06-13 Python的加密模塊之hashlib?與?base64詳解及常用加密方法_python
- 2022-04-02 python3?QT5?端口轉發工具兩種場景分析_python
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支