網站首頁 編程語言 正文
問題描述
項目使用redisson延時隊列功能,實現直播的開播提醒,突然有一天業務爆出問題,未觸發開播提醒。
初步排查
首先通過查詢生產日志,發送端日志存在,沒有消費日志,猜測消費端沒有消費到延時消息,,在dba的協助下查詢redis隊列,消息也確實存在,但已經過了過期時間,由此證明redisson消費者出現問題。通過服務日志發現在最后一次設置自定義推送任務是在一次服務發布之前,服務發布后,之前設置的自定義推送消息均沒有被客戶端消費,由此猜想是由發布服務導致消費端失效。
排查過程
發送端代碼
public <T> void produce(String delayQueue, T t, long delay, TimeUnit timeUnit) {
try {
log.info("delay msg,delayQueue:{},key:{},delay:{}", delayQueue, t, delay);
if (delay < 0) {
delay = 0;
}
RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(delayQueue);
RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);
delayedQueue.offer(t, delay, timeUnit);
}catch (Exception e){
log.error("添加延時任務隊列失敗",e);
}
}
消費端代碼
public class DelayTaskHandler implements Runnable {
@Override
public void run() {
RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(delayQueue);
while (true) {
try {
T value = blockingFairQueue.take();
log.info("delay queue {},延時任務開始執行,value - {} , timeStamp - {} , threadName - {}", delayQueue, value, System.currentTimeMillis(), Thread.currentThread().getName());
consumer.accept(value);
} catch (Exception e) {
log.error("延時任務執行失敗,", e);
}
}
}
}
因為redisson 延時隊列是基于redis實現的,所以從redis執行命令開始入手排查
1.打開redis監控,啟動服務,發現redis首先執行了blpop命令,阻塞等待{cl-live-admin:notice_delay_queue} 隊列消息
2.提交一個延時任務后,觀察redis命令
此時發現redis首先執行了一個SUBSCRIBE命令,訂閱了一個隊列,然后執行了一段lua腳本,主要包括以下命令:
- zrangebyscore:獲取zset中score在0至當前時間戳范圍內的前一百條數據 如果獲取到數據則循環執行rpush,lrem,zrem命令
- zrange:取zset中第一條數據
- zadd:向zset中添加一條數據,score為時間戳
- rpush:向list右邊push一條數據
- publish:如果添加的消息在頂部,則發布一條訂閱消息
3.消費一條消息
同樣消費的時候也是提交了一條lua腳本,主要執行了以下命令 可以看到和發送端命令相似
- zrangebyscore:獲取zset中score在0至當前時間戳范圍內的前一百條數據
- rpush:向list右邊push一條數據
- lrem:刪除一條數據
- zrem:刪除zeset中的數據
- zrange:獲取第一條數據
- BLPOP:阻塞等待隊列消息
通過以上redis命令的執行可以發現一個命令SUBCRIBE用于訂閱redis的一個隊列,而這個命令只在發送消息的時候執行了,在消費的時候沒有執行。從而驗證了當服務重啟后如果沒有新的消息發送,那么客戶端就不會發送SUBCRIBE命令,訂閱延時隊列,這就導致在服務重啟前發送的消息到時間后無法消費。
解決方案
在消費端啟動的時候添加一行代碼用于訂閱延時隊列
//訂閱redis隊列
redissonClient.getDelayedQueue(blockingFairQueue);
那么為什么沒有訂閱就消費不到消息了呢?帶著疑問繼續深入理解redisson的實現
redisson 延時隊列原理
首先回到消費端代碼
在我們沒有發送訂閱命令的時候,客戶端只是在阻塞等待一個指定隊列的消息,那么這個隊列的消息是誰放進去的呢? 帶著疑問我們再看發送端代碼
直接進入 delayedQueue.offer()方法內部
可以看到發送端是提交了一個lua腳本主要執行了zadd,rpush,publish命令,這里我們需要注意publish命令,在redis中pub/sub是對應的,當有publish的時候,那么subcribe端會收到該訂閱消息。
那么是誰收到了訂閱的消息,收到消息后又做了什么呢,回到redissonClient.getDelayedQueue(blockingFairQueue)代碼中
繼續進入 new RedissonDelayedQueue()
可以看到這里創建了一個QueueTransferTask,實現了pushTaskAsync()方法,具體內容是一個lua腳本,首先執行zrangebyscore 獲取過期的前一百條數據,循環調用rpush,lrem,zrem,注意這里rpush的隊列為我們指定的延時隊列,也就是consumer端take的隊列。至此明白了消費端的消息是方法pushTaskAsync()執行后放入的。那么什么時候執行這個方法呢。
進入 queueTransferService.schedule(queueName, task)方法
這里會執行start方法,繼續跟進
這里可以看到添加了兩個listener,onSubcribe,onMessage,當訂閱到消息時執行onSubcribe中的pushTash,當redis有新的消息通知,就會觸發scheduleTask(...)方法,startTime為上述中publish通知的元素過期時間
繼續進入pushTask方法
這里可以看到一個熟悉的方法pushTaskAsync(),也就是前邊的一段lua腳本,用于將過期的消息放入阻塞隊列,并返回排在第一個的消息執行scheduleTask()
繼續進入scheduleTask()方法
如果時間差小于10毫秒則執行pushTask方法,如果大于10毫秒則啟動一個延時任務,到時間后執行pushTask方法。pushTask與scheduleTask互相調用循環往復
流程總結
至此源碼分析完畢,整個流程總結如下:
發送端只是往zset,list,添加數據,并且發布一條訂閱消息
消費端收到訂閱消息后會查詢zset中的過期消息,并放入阻塞隊列供消費端take消息,并且獲取zset第一個消息,啟動一個延時任務,到期后繼續從zset中獲取過期消息如此循環。
此時就回答了上邊的問題 那么為什么沒有訂閱就消費不到消息了呢?
如果沒有訂閱的話消費端就收不到訂閱消息,也就不會去獲取過期時間放入阻塞隊列進行循環。
原文鏈接:https://juejin.cn/post/7147507349442265101
相關推薦
- 2023-01-17 Python使用鄰接矩陣實現圖及Dijkstra算法問題_python
- 2022-11-23 Python?threading模塊中lock與Rlock的使用詳細講解_python
- 2022-07-13 Docker的資源控制管理
- 2022-07-10 移動文件夾ubuntu
- 2022-04-09 Redis分布式鎖防止緩存擊穿的實現_Redis
- 2022-05-10 bean基于xml文件
- 2022-01-17 Selenium-Alert彈出框常用處理辦法
- 2022-06-23 教你編寫bat腳本Windows批處理_DOS/BAT
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支