網站首頁 編程語言 正文
代碼路徑:/lib/bloomfilter
概述
victoriaMetrics的vmstorage
組件會接收上游傳遞過來的指標,在現實場景中,指標或瞬時指標的數量級可能會非??植溃绻幌拗凭彺娴拇笮。锌赡軙捎赾ache miss而導致出現過高的slow insert。
為此,vmstorage提供了兩個參數:maxHourlySeries
和maxDailySeries
,用于限制每小時/每天添加到緩存的唯一序列。
唯一序列指表示唯一的時間序列,如metrics{label1="value1",label2="value2"}屬于一個時間序列,但多條不同值的metrics{label1="value1",label2="value2"}屬于同一條時間序列。victoriaMetrics使用如下方式來獲取時序的唯一標識:
func getLabelsHash(labels []prompbmarshal.Label) uint64 { bb := labelsHashBufPool.Get() b := bb.B[:0] for _, label := range labels { b = append(b, label.Name...) b = append(b, label.Value...) } h := xxhash.Sum64(b) bb.B = b labelsHashBufPool.Put(bb) return h }
限速器的初始化
victoriaMetrics使用了一個類似限速器的概念,限制每小時/每天新增的唯一序列,但與普通的限速器不同的是,它需要在序列級別進行限制,即判斷某個序列是否是新的唯一序列,如果是,則需要進一步判斷一段時間內緩存中新的時序數目是否超過限制,而不是簡單地在請求層面進行限制。
hourlySeriesLimiter = bloomfilter.NewLimiter(*maxHourlySeries, time.Hour) dailySeriesLimiter = bloomfilter.NewLimiter(*maxDailySeries, 24*time.Hour)
下面是新建限速器的函數,傳入一個最大(序列)值,以及一個刷新時間。該函數中會:
- 初始化一個限速器,限速器的最大元素個數為
maxItems
- 則啟用了一個goroutine,當時間達到
refreshInterval
時會重置限速器
func NewLimiter(maxItems int, refreshInterval time.Duration) *Limiter { l := &Limiter{ maxItems: maxItems, stopCh: make(chan struct{}), } l.v.Store(newLimiter(maxItems)) //1 l.wg.Add(1) go func() { defer l.wg.Done() t := time.NewTicker(refreshInterval) defer t.Stop() for { select { case <-t.C: l.v.Store(newLimiter(maxItems))//2 case <-l.stopCh: return } } }() return l }
限速器只有一個核心函數Add
,當vmstorage接收到一個指標之后,會(通過getLabelsHash
計算該指標的唯一標識(h),然后調用下面的Add
函數來判斷該唯一標識是否存在于緩存中。
如果當前存儲的元素個數大于等于允許的最大元素,則通過過濾器判斷緩存中是否已經存在該元素;否則將該元素直接加入過濾器中,后續允許將該元素加入到緩存中。
func (l *Limiter) Add(h uint64) bool { lm := l.v.Load().(*limiter) return lm.Add(h) } func (l *limiter) Add(h uint64) bool { currentItems := atomic.LoadUint64(&l.currentItems) if currentItems >= uint64(l.f.maxItems) { return l.f.Has(h) } if l.f.Add(h) { atomic.AddUint64(&l.currentItems, 1) } return true }
上面的過濾器采用的是布隆過濾器,核心函數為Has
和Add
,分別用于判斷某個元素是否存在于過濾器中,以及將元素添加到布隆過濾器中。
過濾器的初始化函數如下,bitsPerItem
是個常量,值為16。bitsCount
統計了過濾器中的總bit數,每個bit表示某個值的存在性。bits
以64bit為單位的(后續稱之為slot,目的是為了在bitsCount中快速檢索目標bit)。計算bits
時加上63
的原因是為了四舍五入向上取值,比如當maxItems=1時至少需要1個unit64的slot。
func newFilter(maxItems int) *filter { bitsCount := maxItems * bitsPerItem bits := make([]uint64, (bitsCount+63)/64) return &filter{ maxItems: maxItems, bits: bits, } }
為什么bitsPerItem
為16?這篇文章給出了如何計算布隆過濾器的大小。在本代碼中,k為4(hashesCount
),期望的漏失率為0.003(可以從官方的filter_test.go
中看出),則要求總存儲和總元素的比例為15,為了方便檢索slot(64bit,為16的倍數),將之設置為16。
if p > 0.003 { t.Fatalf("too big false hits share for maxItems=%d: %.5f, falseHits: %d", maxItems, p, falseHits) }
下面是過濾器的Add
操作,目的是在過濾器中添加某個元素。Add
函數中沒有使用多個哈希函數來計算元素的哈希值,轉而改變同一個元素的值,然后對相應的值應用相同的哈希函數,元素改變的次數受hashesCount
的限制。
- 獲取過濾器的完整存儲,并轉換為以bit單位
- 將元素
h
轉換為byte數組,便于xxhash.Sum64計算 - 后續將執行hashesCount次哈希,降低漏失率
- 計算元素h的哈希
- 遞增元素
h
,為下一次哈希做準備 - 取余法獲取元素的bit范圍
- 獲取元素所在的slot(即uint64大小的bit范圍)
- 獲取元素所在的slot中的bit位,該位為1表示該元素存在,為0表示該元素不存在
- 獲取元素所在bit位的掩碼
- 加載元素所在的slot的數值
- 如果
w & mask
結果為0,說明該元素不存在, - 將元素所在的slot(
w
)中的元素所在的bit位(mask)置為1,表示添加了該元素 - 由于
Add
函數可以并發訪問,因此bits[i]
有可能被其他操作修改,因此需要通過重新加載(14)并通過循環來在bits[i]
中設置該元素的存在性
func (f *filter) Add(h uint64) bool { bits := f.bits maxBits := uint64(len(bits)) * 64 //1 bp := (*[8]byte)(unsafe.Pointer(&h))//2 b := bp[:] isNew := false for i := 0; i < hashesCount; i++ {//3 hi := xxhash.Sum64(b)//4 h++ //5 idx := hi % maxBits //6 i := idx / 64 //7 j := idx % 64 //8 mask := uint64(1) << j //9 w := atomic.LoadUint64(&bits[i])//10 for (w & mask) == 0 {//11 wNew := w | mask //12 if atomic.CompareAndSwapUint64(&bits[i], w, wNew) {//13 isNew = true//14 break } w = atomic.LoadUint64(&bits[i])//14 } } return isNew }
看懂了Add
函數,Has
就相當簡單了,它只是Add
函數的縮減版,無需設置bits[i]
:
func (f *filter) Has(h uint64) bool { bits := f.bits maxBits := uint64(len(bits)) * 64 bp := (*[8]byte)(unsafe.Pointer(&h)) b := bp[:] for i := 0; i < hashesCount; i++ { hi := xxhash.Sum64(b) h++ idx := hi % maxBits i := idx / 64 j := idx % 64 mask := uint64(1) << j w := atomic.LoadUint64(&bits[i]) if (w & mask) == 0 { return false } } return true }
總結
由于victoriaMetrics的過濾器采用的是布隆過濾器,因此它的限速并不精準,在源碼條件下, 大約有3%的偏差。但同樣地,由于采用了布隆過濾器,降低了所需的內存以及相關計算資源。此外victoriaMetrics的過濾器實現了并發訪問。
在大流量場景中,如果需要對請求進行相對精準的過濾,可以考慮使用布隆過濾器,降低所需要的資源,但前提是過濾的結果能夠忍受一定程度的漏失率。
原文鏈接:https://www.cnblogs.com/charlieroro/p/16101238.html
相關推薦
- 2022-03-21 SQL?查詢連續登錄的用戶情況_MsSql
- 2022-08-02 C#5.0中的異步編程關鍵字async和await_C#教程
- 2022-12-04 詳解Go?依賴管理?go?mod?tidy_Golang
- 2022-11-12 docker修改容器內存大小的實現方式_docker
- 2022-10-21 Python?NumPy教程之數組的基本操作詳解_python
- 2023-12-15 Linux系統中date命令、hwclock命令 語法詳解
- 2022-05-12 android okHttp網絡請求封裝
- 2022-04-11 解決git push出現error: failed to push some refs to 錯誤
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支