網站首頁 編程語言 正文
背景
延遲隊列是一種特殊的隊列,元素入隊時需要指定到期時間(或延遲時間),從隊頭出隊的元素必須是已經到期的,而且最先到期的元素最先出隊,也就是隊列里面的元素是按照到期時間排序的,添加元素和從隊頭出隊的時間復雜度是O(log(n))。
由于以上性質,延遲隊列一般可以用于以下場景(定時任務、延遲任務):
- 緩存:用戶淘汰過期元素
- 通知:在指定時間通知用戶,比如會議開始前30分鐘
- 訂單:30分鐘未支付取消訂單
- 超時:服務器自動斷開太長時間沒有心跳的連接
其實在Golang中是自帶定時器的,也就是time.After()
、time.AfterFunc()
等函數,它們的性能也是非常好的,隨著Golang版本升級還會優化。但是對于某些場景來說確實不夠方便,比如緩存場景我們需要能夠支持隨機刪除定時器,隨機重置過期時間,更加靈活的刪除一小批過期元素。而且像Kafka的時間輪算法(TimeWheel)里面也用到了延遲隊列,因此還是有必要了解下如何實現延遲隊列。
原理
堆
延遲隊列每次出隊的是最小到期時間的元素,而堆就是用來獲取最值的數據結構。使用堆我們可以實現O(log(n))時間復雜度添加元素和移除最小到期時間元素。
隨機刪除
有時候延遲隊列還需要具有隨機刪除元素的能力,可以通過以下方式實現:
- 元素添加刪除標記字段:堆中每個元素都添加一個刪除標記字段,并把這個元素的地址返回給用戶,用戶就可以標記元素的這個字段為true,這樣元素到達堆頂時如果判斷到這個字段為true就會被清除,而延遲隊列里的元素邏輯上是一定會到達堆頂的(因為時間會流逝)。這是一種懶刪除的方式。
- 元素添加堆中下標字段(或用map記錄下標):堆中每個元素都添加一個堆中下標字段,并把這個元素的地址返回給用戶,這樣我們就可以通過這個元素里面記錄的下標快速定位元素在堆中的位置,從而刪除元素。詳細可以看文章如何實現一個支持O(log(n))隨機刪除元素的堆。
重置元素到期時間
如果需要重置延遲隊列里面元素的到期時間,則必須知道元素在堆中的下標,因為重置到期時間之后必須對堆進行調整,因此只能是元素添加堆中下標字段
。
Golang實現
這里我們實現一個最簡單的延遲隊列,也就是不支持隨機刪除元素和重置元素的到期時間,因為有些場景只需要添加元素和獲取到期元素這兩個功能,比如Kafka中的時間輪,而且這種簡單實現性能會高一點。
代碼地址
數據結構
主要的結構可以看到就是一個heap,Entry是每個元素在堆中的表示,Value是具體的元素值,Expired是為了堆中元素根據到期時間排序。
mutex是一個互斥鎖,主要是保證操作并發安全。
wakeup是一個緩沖區長度為1的通道,通過它實現添加元素的時候喚醒等待隊列不為空或者有更小到期時間元素加入的協程。(重點)
type Entry[T any] struct { Value T Expired time.Time // 到期時間 } // 延遲隊列 type DelayQueue[T any] struct { h *heap.Heap[*Entry[T]] mutex sync.Mutex // 保證并發安全 wakeup chan struct{} // 喚醒通道 } // 創建延遲隊列 func New[T any]() *DelayQueue[T] { return &DelayQueue[T]{ h: heap.New(nil, func(e1, e2 *Entry[T]) bool { return e1.Expired.Before(e2.Expired) }), wakeup: make(chan struct{}, 1), } }
實現原理
阻塞獲取元素的時候如果隊列已經沒有元素,或者沒有元素到期,那么協程就需要掛起等待。而被喚醒的條件是元素到期、隊列不為空或者有更小到期時間元素加入。
其中元素到期協程在阻塞獲取元素時發現堆頂元素還沒到期,因此這個條件可以自己構造并等待。但是條件隊列不為空和有更小到期時間元素加入則需要另外一個協程在添加元素時才能滿足,因此必須通過一個中間結構來進行協程間通信,一般Golang里面會使用Channel來實現。
添加元素
一開始加了一個互斥鎖,避免并發沖突,然后把元素加到堆里。
因為我們Take()操作,既阻塞獲取元素操作,在不滿足條件時會去等待wakeup通道,但是等待通道前必須釋放鎖,否則Push()無法寫入新元素去滿足條件隊列不為空和有更小到期時間元素加入。而從釋放鎖后到開始讀取wakeup通道這段時間是沒有鎖保護的,如果Push()在這期間插入新元素,為了保證通道不阻塞同時又能通知到Take()協程,我們的通道的長度需要是1,同時使用select+default保證在通道里面已經有元素的時候不阻塞Push()協程。
// 添加延遲元素到隊列 func (q *DelayQueue[T]) Push(value T, delay time.Duration) { q.mutex.Lock() defer q.mutex.Unlock() entry := &Entry[T]{ Value: value, Expired: time.Now().Add(delay), } q.h.Push(entry) // 喚醒等待的協程 // 這里表示新添加的元素到期時間是最早的,或者原來隊列為空 // 因此必須喚醒等待的協程,因為可以拿到更早到期的元素 if q.h.Peek() == entry { select { case q.wakeup <- struct{}{}: default: } } }
阻塞獲取元素
這里先判斷堆是否有元素,如果有獲取堆頂元素,然后判斷是否已經到期,如果到期則直接出堆并返回。否則等待直到超時或者元素到期或者有新的元素到達。
這里在解鎖之前會清空wakeup通道,這樣可以保證下面讀取的wakeup通道里的元素肯定是新加入的。
// 等待直到有元素到期 // 或者ctx被關閉 func (q *DelayQueue[T]) Take(ctx context.Context) (T, bool) { for { var expired *time.Timer q.mutex.Lock() // 有元素 if !q.h.Empty() { // 獲取元素 entry := q.h.Peek() if time.Now().After(entry.Expired) { q.h.Pop() q.mutex.Unlock() return entry.Value, true } // 到期時間,使用time.NewTimer()才能夠調用Stop(),從而釋放定時器 expired = time.NewTimer(time.Until(entry.Expired)) } // 避免被之前的元素假喚醒 select { case <-q.wakeup: default: } q.mutex.Unlock() // 不為空,需要同時等待元素到期 // 并且除非expired到期,否則都需要關閉expired避免泄露 if expired != nil { select { case <-q.wakeup: // 新的更快到期元素 expired.Stop() case <-expired.C: // 首元素到期 case <-ctx.Done(): // 被關閉 expired.Stop() var t T return t, false } } else { select { case <-q.wakeup: // 新的更快到期元素 case <-ctx.Done(): // 被關閉 var t T return t, false } } } }
Channel方式阻塞讀取
Golang里面可以使用Channel進行流式消費,因此簡單包裝一個Channel形式的阻塞讀取接口,給通道一點緩沖區大小可以帶來更好的性能。
// 返回一個通道,輸出到期元素 // size是通道緩存大小 func (q *DelayQueue[T]) Channel(ctx context.Context, size int) <-chan T { out := make(chan T, size) go func() { for { entry, ok := q.Take(ctx) if !ok { return } out <- entry } }() return out }
使用方式
for entry := range q.Channel(context.Background(), 10) { // do something }
性能測試
這里進行一個簡單的性能測試,也就是先添加元素,然后等待到期后全部拿出來。
func BenchmarkPushAndTake(b *testing.B) { q := New[int]() b.ResetTimer() // 添加元素 for i := 0; i < b.N; i++ { q.Push(i, time.Duration(i)) } // 等待全部元素到期 b.StopTimer() time.Sleep(time.Duration(b.N)) b.StartTimer() // 獲取元素 for i := 0; i < b.N; i++ { _, ok := q.Take(context.Background()) if !ok { b.Errorf("want %v, but %v", true, ok) } } }
測試結果:
Benchmark-8 ? ? ?2331534 ? ? ? ? ? ? ? 476.8 ns/op ? ? ? ? ? ?76 B/op ? ? ? ? ?1 allocs/op
總結
堆實現的延遲隊列是一種實現起來比較簡單的定時器(當然阻塞讀取Take()是比較復雜的),由于時間復雜度是O(log(n)),因此可以滿足定時任務數量不是特別多的場景。堆實現的延遲隊列也是可以隨機刪除元素的,可以根據具體任務選擇是否實現。如果對定時器性能要求比較敏感的話可以選擇使用時間輪實現定時器,它可以在O(1)的時間復雜度添加和刪除一個定時器,不過實現起來比較復雜(挖個坑,下篇文章實現)。
原文鏈接:https://juejin.cn/post/7146641201301569566
相關推薦
- 2022-10-14 初識RPC中間件zeroC ICE工具之iceca
- 2022-02-06 TP6記錄報錯的SQL語句
- 2023-01-10 Flutter?CustomPaint繪制widget使用示例_IOS
- 2023-01-10 redis中Could?not?get?a?resource?from?the?pool異常及解決方
- 2023-03-23 Python中win32com模塊的使用_python
- 2022-05-01 jquery實現移動端按鈕組左右滑動_jquery
- 2022-07-10 Callable接口的使用詳解
- 2022-11-15 Python一直報錯SyntaxError:invalid?syntax的解決辦法_python
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支