網(wǎng)站首頁 編程語言 正文
在之前探討延時(shí)隊(duì)列的文章中我們提到了 redisson delayqueue 使用 redis 的有序集合結(jié)構(gòu)實(shí)現(xiàn)延時(shí)隊(duì)列,遺憾的是 go 語言社區(qū)中并無類似的庫。不過問題不大,沒有輪子我們自己造??。
本文的完整代碼實(shí)現(xiàn)在hdt3213/delayqueue,可以直接 go get 安裝使用。
使用有序集合結(jié)構(gòu)實(shí)現(xiàn)延時(shí)隊(duì)列的方法已經(jīng)廣為人知,無非是將消息作為有序集合的 member, 投遞時(shí)間戳作為 score 使用 zrangebyscore 命令搜索已到投遞時(shí)間的消息然后將其發(fā)給消費(fèi)者。
然而消息隊(duì)列不是將消息發(fā)給消費(fèi)者就萬事大吉,它們還有一個(gè)重要職責(zé)是確保送達(dá)和消費(fèi)。通常的實(shí)現(xiàn)方式是當(dāng)消費(fèi)者收到消息后向消息隊(duì)列返回確認(rèn)(ack),若消費(fèi)者返回否定確認(rèn)(nack)或超時(shí)未返回,消息隊(duì)列則會按照預(yù)定規(guī)則重新發(fā)送,直到到達(dá)最大重試次數(shù)后停止。如何實(shí)現(xiàn) ack 和重試機(jī)制是我們要重點(diǎn)考慮的問題。
我們的消息隊(duì)列允許分布式地部署多個(gè)生產(chǎn)者和消費(fèi)者,消費(fèi)者實(shí)例定時(shí)執(zhí)行 lua 腳本驅(qū)動消息在隊(duì)列中的流轉(zhuǎn)無需部署額外組件。由于 Redis 保證了 lua 腳本執(zhí)行的原子性,整個(gè)流程無需加鎖。
消費(fèi)者采用拉模式獲得消息,保證每條消息至少投遞一次,消息隊(duì)列會重試超時(shí)或者被否定確認(rèn)的消息(nack) 直至到達(dá)最大重試次數(shù)。一條消息最多有一個(gè)消費(fèi)者正在處理,減少了要考慮的并發(fā)問題。
請注意:若消費(fèi)時(shí)間超過了 MaxConsumeDuration 消息隊(duì)列會認(rèn)為消費(fèi)超時(shí)并重新投遞,此時(shí)可能有多個(gè)消費(fèi)者同時(shí)消費(fèi)。
具體使用也非常簡單,只需要注冊處理消息的回調(diào)函數(shù)并調(diào)用 start() 即可:
package main
import (
"github.com/go-redis/redis/v8"
"github.com/hdt3213/delayqueue"
"strconv"
"time"
)
func main() {
redisCli := redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
})
queue := delayqueue.NewQueue("example-queue", redisCli, func(payload string) bool {
// 注冊處理消息的回調(diào)函數(shù)
// 返回 true 表示已成功消費(fèi),返回 false 消息隊(duì)列會重新投遞次消息
return true
})
// 發(fā)送延時(shí)消息
for i := 0; i < 10; i++ {
err := queue.SendDelayMsg(strconv.Itoa(i), time.Hour, delayqueue.WithRetryCount(3))
if err != nil {
panic(err)
}
}
// start consume
done := queue.StartConsume()
<-done
}
由于數(shù)據(jù)存儲在 redis 中所以我們最多能保證在 redis 無故障且消息隊(duì)列相關(guān) key 未被外部篡改的情況下不會丟失消息。
原理詳解
消息隊(duì)列涉及幾個(gè)關(guān)鍵的 redis 數(shù)據(jù)結(jié)構(gòu):
- msgKey: 為了避免兩條內(nèi)容完全相同的消息造成意外的影響,我們將每條消息放到一個(gè)字符串類型的鍵中,并分配一個(gè) UUID 作為它的唯一標(biāo)識。其它數(shù)據(jù)結(jié)構(gòu)中只存儲 UUID 而不存儲完整的消息內(nèi)容。每個(gè) msg 擁有一個(gè)獨(dú)立的 key 而不是將所有消息放到一個(gè)哈希表是為了利用 TTL 機(jī)制避免
- pendingKey: 有序集合類型,member 為消息 ID, score 為投遞時(shí)間的 unix 時(shí)間戳。
- readyKey: 列表類型,需要投遞的消息 ID。
- unAckKey: 有序集合類型,member 為消息 ID, score 為重試時(shí)間的 unix 時(shí)間戳。
- retryKey: 列表類型,已到重試時(shí)間的消息 ID
- garbageKey: 集合類型,用于暫存已達(dá)重試上線的消息 ID
- retryCountKey: 哈希表類型,鍵為消息 ID, 值為剩余的重試次數(shù)
流程如下圖所示:
由于我們允許分布式地部署多個(gè)消費(fèi)者,每個(gè)消費(fèi)者都在定時(shí)執(zhí)行 lua 腳本,所以多個(gè)消費(fèi)者可能處于上述流程中不同狀態(tài),我們無法預(yù)知(或控制)上圖中五個(gè)操作發(fā)生的先后順序,也無法控制有多少實(shí)例正在執(zhí)行同一個(gè)操作。
因此我們需要保證上圖中五個(gè)操作滿足三個(gè)條件:
- 都是原子性的
- 不會重復(fù)處理同一條消息
- 操作前后消息隊(duì)列始終處于正確的狀態(tài)
只要滿足這三個(gè)條件,我們就可以部署多個(gè)實(shí)例且不需要使用分布式鎖等技術(shù)來進(jìn)行狀態(tài)同步。
是不是聽起來有點(diǎn)嚇人??? 其實(shí)簡單的很,讓我們一起來詳細(xì)看看吧~
pending2ReadyScript
pending2ReadyScript 使用 zrangebyscore 掃描已到投遞時(shí)間的消息ID并把它們移動到 ready 中:
-- keys: pendingKey, readyKey
-- argv: currentTime
local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1]) -- 從 pending key 中找出已到投遞時(shí)間的消息
if (#msgs == 0) then return end
local args2 = {'LPush', KEYS[2]} -- 將他們放入 ready key 中
for _,v in ipairs(msgs) do
table.insert(args2, v)
end
redis.call(unpack(args2))
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1]) -- 從 pending key 中刪除已投遞的消息
ready2UnackScript
ready2UnackScript 從 ready 或者 retry 中取出一條消息發(fā)送給消費(fèi)者并放入 unack 中,類似于 RPopLPush:
-- keys: readyKey/retryKey, unackKey
-- argv: retryTime
local msg = redis.call('RPop', KEYS[1])
if (not msg) then return end
redis.call('ZAdd', KEYS[2], ARGV[1], msg)
return msg
unack2RetryScript
unack2RetryScript 從 retry 中找出所有已到重試時(shí)間的消息并把它們移動到 unack 中:
-- keys: unackKey, retryCountKey, retryKey, garbageKey
-- argv: currentTime
local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1]) -- 找到已到重試時(shí)間的消息
if (#msgs == 0) then return end
local retryCounts = redis.call('HMGet', KEYS[2], unpack(msgs)) -- 查詢剩余重試次數(shù)
for i,v in ipairs(retryCounts) do
local k = msgs[i]
if tonumber(v) > 0 then -- 剩余次數(shù)大于 0
redis.call("HIncrBy", KEYS[2], k, -1) -- 減少剩余重試次數(shù)
redis.call("LPush", KEYS[3], k) -- 添加到 retry key 中
else -- 剩余重試次數(shù)為 0
redis.call("HDel", KEYS[2], k) -- 刪除重試次數(shù)記錄
redis.call("SAdd", KEYS[4], k) -- 添加到垃圾桶,等待后續(xù)刪除
end
end
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1]) -- 將已處理的消息從 unack key 中刪除
因?yàn)?redis 要求 lua 腳本必須在執(zhí)行前在 KEYS 參數(shù)中聲明自己要訪問的 key, 而我們將每個(gè) msg 有一個(gè)獨(dú)立的 key,我們在執(zhí)行 unack2RetryScript 之前是不知道哪些 msg key 需要被刪除。所以 lua 腳本只將需要?jiǎng)h除的消息記在 garbage key 中,腳本執(zhí)行完后再通過 del 命令將他們刪除:
func (q *DelayQueue) garbageCollect() error {
ctx := context.Background()
msgIds, err := q.redisCli.SMembers(ctx, q.garbageKey).Result()
if err != nil {
return fmt.Errorf("smembers failed: %v", err)
}
if len(msgIds) == 0 {
return nil
}
// allow concurrent clean
msgKeys := make([]string, 0, len(msgIds))
for _, idStr := range msgIds {
msgKeys = append(msgKeys, q.genMsgKey(idStr))
}
err = q.redisCli.Del(ctx, msgKeys...).Err()
if err != nil && err != redis.Nil {
return fmt.Errorf("del msgs failed: %v", err)
}
err = q.redisCli.SRem(ctx, q.garbageKey, msgIds).Err()
if err != nil && err != redis.Nil {
return fmt.Errorf("remove from garbage key failed: %v", err)
}
return nil
}
之前提到的 lua 腳本都是原子性執(zhí)行的,不會有其它命令插入其中。 gc 函數(shù)由 3 條 redis 命令組成,在執(zhí)行過程中可能會有其它命令插入執(zhí)行過程中,不過考慮到一條消息進(jìn)入垃圾回收流程之后不會復(fù)活所以不需要保證 3 條命令原子性。
ack
ack 只需要將消息徹底刪除即可:
func (q *DelayQueue) ack(idStr string) error {
ctx := context.Background()
err := q.redisCli.ZRem(ctx, q.unAckKey, idStr).Err()
if err != nil {
return fmt.Errorf("remove from unack failed: %v", err)
}
// msg key has ttl, ignore result of delete
_ = q.redisCli.Del(ctx, q.genMsgKey(idStr)).Err()
q.redisCli.HDel(ctx, q.retryCountKey, idStr)
return nil
}
否定確認(rèn)只需要將 unack key 中消息的重試時(shí)間改為現(xiàn)在,隨后執(zhí)行的 unack2RetryScript 會立即將它移動到 retry key
func (q *DelayQueue) nack(idStr string) error {
ctx := context.Background()
// update retry time as now, unack2Retry will move it to retry immediately
err := q.redisCli.ZAdd(ctx, q.unAckKey, &redis.Z{
Member: idStr,
Score: float64(time.Now().Unix()),
}).Err()
if err != nil {
return fmt.Errorf("negative ack failed: %v", err)
}
return nil
}
consume
消息隊(duì)列的核心邏輯是每秒執(zhí)行一次的 consume 函數(shù),它負(fù)責(zé)調(diào)用上述腳本將消息轉(zhuǎn)移到正確的集合中并回調(diào) consumer 來消費(fèi)消息:
func (q *DelayQueue) consume() error {
// 執(zhí)行 pending2ready,將已到時(shí)間的消息轉(zhuǎn)移到 ready
err := q.pending2Ready()
if err != nil {
return err
}
// 循環(huán)調(diào)用 ready2Unack 拉取消息進(jìn)行消費(fèi)
var fetchCount uint
for {
idStr, err := q.ready2Unack()
if err == redis.Nil { // consumed all
break
}
if err != nil {
return err
}
fetchCount++
ack, err := q.callback(idStr)
if err != nil {
return err
}
if ack {
err = q.ack(idStr)
} else {
err = q.nack(idStr)
}
if err != nil {
return err
}
if fetchCount >= q.fetchLimit {
break
}
}
// 將 nack 或超時(shí)的消息放入重試隊(duì)列
err = q.unack2Retry()
if err != nil {
return err
}
// 清理已達(dá)到最大重試次數(shù)的消息
err = q.garbageCollect()
if err != nil {
return err
}
// 消費(fèi)重試隊(duì)列
fetchCount = 0
for {
idStr, err := q.retry2Unack()
if err == redis.Nil { // consumed all
break
}
if err != nil {
return err
}
fetchCount++
ack, err := q.callback(idStr)
if err != nil {
return err
}
if ack {
err = q.ack(idStr)
} else {
err = q.nack(idStr)
}
if err != nil {
return err
}
if fetchCount >= q.fetchLimit {
break
}
}
return nil
}
至此一個(gè)簡單可靠的延時(shí)隊(duì)列就做好了,何不趕緊開始試用呢?????
原文鏈接:https://www.cnblogs.com/Finley/p/16400287.html
相關(guān)推薦
- 2022-10-23 Go?數(shù)據(jù)結(jié)構(gòu)之堆排序示例詳解_Golang
- 2023-04-02 GoLang調(diào)用鏈可視化go-callvis使用介紹_Golang
- 2022-01-29 git 本地,遠(yuǎn)程做了不同的修改,同步方法
- 2022-12-11 C語言計(jì)算1/1+1/2+1/3+…+1/n的問題_C 語言
- 2022-06-08 FreeRTOS實(shí)時(shí)操作系統(tǒng)Cortex-M內(nèi)核使用注意事項(xiàng)_操作系統(tǒng)
- 2022-10-03 使用useImperativeHandle時(shí)父組件第一次沒拿到子組件的問題_React
- 2022-04-12 Taro打包Android?apk過程詳解_Android
- 2022-04-09 如何利用python提取字符串中的數(shù)字_python
- 最近更新
-
- window11 系統(tǒng)安裝 yarn
- 超詳細(xì)win安裝深度學(xué)習(xí)環(huán)境2025年最新版(
- Linux 中運(yùn)行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎(chǔ)操作-- 運(yùn)算符,流程控制 Flo
- 1. Int 和Integer 的區(qū)別,Jav
- spring @retryable不生效的一種
- Spring Security之認(rèn)證信息的處理
- Spring Security之認(rèn)證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權(quán)
- redisson分布式鎖中waittime的設(shè)
- maven:解決release錯(cuò)誤:Artif
- restTemplate使用總結(jié)
- Spring Security之安全異常處理
- MybatisPlus優(yōu)雅實(shí)現(xiàn)加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務(wù)發(fā)現(xiàn)-Nac
- Spring Security之基于HttpR
- Redis 底層數(shù)據(jù)結(jié)構(gòu)-簡單動態(tài)字符串(SD
- arthas操作spring被代理目標(biāo)對象命令
- Spring中的單例模式應(yīng)用詳解
- 聊聊消息隊(duì)列,發(fā)送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠(yuǎn)程分支