日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學無先后,達者為師

網站首頁 編程語言 正文

GO?CountMinSketch計數器(布隆過濾器思想的近似計數器)_Golang

作者:jiaxwu?????? ? 更新時間: 2022-11-05 編程語言

簡介

CountMinSketch是一種計數器,用來統計一個元素的計數,它能夠以一個非常小的空間統計大量元素的計數,同時保證高的性能及準確性。

與布隆過濾器類似,由于它是基于概率的,因此它所統計的計數是有一定概率存在誤差的,也就是可能會比真實的計數大。比如一個元素實際的計數是10,但是計算器的計算結果可能比10大。因此適合能夠容忍計數存在一定誤差的場景,比如社交網絡中推文的訪問次數。

它一秒能夠進行上百萬次操作(主要取決于哈希函數的速度),并且如果我們每天有一個長度為100億的數據流需要進行計數,計數值允許的誤差范圍是100,允許的錯誤率是0.1%,計數器大小是32位,只需要7.2GB內存,這完全可以單機進行計數。

原理

數據結構

CountMinSketch計數器的數據結構是一個二維數組,每一個元素都是一個計數器,計數器可以使用一個數值類型進行表示,比如無符號int

增加計數

每個元素會通過不同的哈希函數映射到每一行的某個位置,并增加對應位置上的計數:

估算計數

估算計數也是如上圖流程,根據哈希映射到每一行的對應位置,然后讀取所有行的計數,返回其中最小的一個。

返回最小的一個是因為其他其他元素也可能會映射到自身所映射位置上面,導致計數比真實計數大,因此最小的一個計數最可能是真實計數:

比如上圖元素123映射到了元素abc第一行的相同位置,因此這個位置的計數累加了元素abc和元素123的計數和。但是只要我們取三行里面最小的一個計數,那么就能容忍這種情況。

當然,如果一個元素的每一行的對應位置都被其他元素所映射,那么這個估算的計數就會比真實計數大。

哈希函數

CountMinSketch計數器里面的哈希函數需要是彼此獨立且均勻分布(類似于哈希表的哈希函數),而且需要盡可能的快,比如murmur3就是一個很好的選擇。

CountMinSketch計數器的性能嚴重依賴于哈希函數的性能,而一般哈希函數的性能則依賴于輸入串(一般為字節數組)的長度,因此為了提高CountMinSketch計數器的性能建議減少輸入串的長度。

下面是一個簡單的性能測試,單位是字節,可以看到時間的消耗隨著元素的增大基本是線性增長的:

pkg: github.com/jiaxwu/gommon/counter/cm
cpu: Intel(R) Core(TM) i5-10210U CPU @ 1.60GHz
BenchmarkAddAndEstimate/1-8              2289142               505.9 ns/op         1.98 MB/s           0 B/op          0 allocs/op
BenchmarkAddAndEstimate/2-8              2357380               513.7 ns/op         3.89 MB/s           0 B/op          0 allocs/op
BenchmarkAddAndEstimate/4-8              2342382               496.9 ns/op         8.05 MB/s           0 B/op          0 allocs/op
BenchmarkAddAndEstimate/8-8              2039792               499.7 ns/op        16.01 MB/s           0 B/op          0 allocs/op
BenchmarkAddAndEstimate/16-8             2350281               526.8 ns/op        30.37 MB/s           0 B/op          0 allocs/op
BenchmarkAddAndEstimate/32-8             2558060               444.3 ns/op        72.03 MB/s           0 B/op          0 allocs/op
BenchmarkAddAndEstimate/64-8             2540272               459.5 ns/op       139.29 MB/s           0 B/op          0 allocs/op
BenchmarkAddAndEstimate/128-8            1919720               538.6 ns/op       237.67 MB/s           0 B/op          0 allocs/op
BenchmarkAddAndEstimate/256-8            1601738               720.6 ns/op       355.28 MB/s           0 B/op          0 allocs/op
BenchmarkAddAndEstimate/512-8             950584              1599 ns/op         320.18 MB/s           0 B/op          0 allocs/op
BenchmarkAddAndEstimate/1024-8            363592              3169 ns/op         323.17 MB/s           0 B/op          0 allocs/op
BenchmarkAddAndEstimate/2048-8            187500              5888 ns/op         347.81 MB/s           0 B/op          0 allocs/op
BenchmarkAddAndEstimate/4096-8            130425              8825 ns/op         464.15 MB/s           0 B/op          0 allocs/op
BenchmarkAddAndEstimate/8192-8             67198             17460 ns/op         469.18 MB/s           0 B/op          0 allocs/op

數組大小、哈希函數數量、錯誤范圍、錯誤率

數組大小、哈希函數數量、錯誤范圍和錯誤率之間是互相影響的,如果我們想減少錯誤率和錯誤范圍,則需要更大的數組和更多的哈希函數。但是我們很難直觀的計算出這些參數,還好有兩個公式可以幫助我們計算出準確的數值:

在我們可以確定我們的數據流大小和能夠容忍的錯誤范圍錯誤率的情況下,我們可以根據下面公式計算數組大小哈希函數數量

n = 數據流大小 
m = 數組大小
k = 哈希函數數量 
eRange = 錯誤范圍(ErrorRange)
eRate = 錯誤率(ErrorRate)
ceil() = 向上取整操作
E = 2.718281828459045(自然常數)

m = ceil(E/(eRange/n))
k = ceil(ln(1/eRate))

應用

TopK(海量數據計數器)

對于海量數據流中頻率最高的K個數,如果使用常規的map<key, uint>,由于內存大小限制,一般情況下單機無法完成計算,需要把數據路由到多臺機器上進行計數。

而如果我們使用CountMinSketch則能夠在單機情況下處理大量的數據,比如開頭所提到對于一個長度為100億的數據流進行計數,只需要7.2GB內存。這個計數結果可能存在一定誤差,不過我們可以在這個基礎上再進行過濾。

TinyLFU

TinyLFU是一個緩存淘汰策略,它里面有LFU策略的思想,LFU是一個基于訪問頻率的淘汰策略,因此需要統計每個元素被訪問的次數。如果對每個元素使用一個獨立的計數器,那么這個成本會很大,而且對于一個緩存淘汰策略來說,我們并不需要這個計數器非常大且非常準確。

因此TinyLFU使用一個計數器長度為4位的CountMinSketch計數器統計每個元素的頻率,減少計數所消耗的內存空間,同時還引入了計數衰減機制避免某些之前熱門但是當前已經很少被訪問的元素很難被淘汰。

實現

這里給出一個Golang的泛型實現,這個實現支持uint8uint16uint32uint64等基本類型計數器,實際上還可以實現比如長度為2bit4bit6bit的計數器,但是代碼會稍微復雜一點(特別是非2的次方的計數器)。

package cm
import (
	"math"

	"github.com/jiaxwu/gommon/hash"
	mmath "github.com/jiaxwu/gommon/math"
	"github.com/jiaxwu/gommon/mem"
	"golang.org/x/exp/constraints"
)

// Count-Min Sketch 計數器,原理類似于布隆過濾器,根據哈希映射到多個位置,然后在對應位置進行計數
// 讀取時拿對應位置最小的
// 適合需要一個比較小的計數,而且不需要這個計數一定準確的情況
// 可以減少空間消耗
// https://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.591.8351&rep=rep1&type=pdf
type Counter[T constraints.Unsigned] struct {
	counters    [][]T
	countersLen uint64       // 計數器長度
	hashs       []*hash.Hash // 哈希函數列表
	maxCount    T            // 最大計數值
}

// 創建一個計數器
// size:數據流大小
// errorRange:計數值誤差范圍(會超過真實計數值)
// errorRate:錯誤率
func New[T constraints.Unsigned](size uint64, errorRange T, errorRate float64) *Counter[T] {
	// 計數器長度
	countersLen := uint64(math.Ceil(math.E / (float64(errorRange) / float64(size))))
	// 哈希個數
	hashsCnt := int(math.Ceil(math.Log(1.0 / errorRate)))
	hashs := make([]*hash.Hash, hashsCnt)
	counters := make([][]T, hashsCnt)
	for i := 0; i < hashsCnt; i++ {
		hashs[i] = hash.New()
		counters[i] = make([]T, countersLen)
	}
	return &Counter[T]{
		counters:    counters,
		countersLen: countersLen,
		hashs:       hashs,
		maxCount:    T(0) - 1,
	}
}

// 增加元素的計數
func (c *Counter[T]) Add(b []byte, val T) {
	for i, h := range c.hashs {
		index := h.Sum64(b) % c.countersLen
		if c.counters[i][index]+val <= c.counters[i][index] {
			c.counters[i][index] = c.maxCount
		} else {
			c.counters[i][index] += val
		}
	}
}

// 增加元素的計數
// 等同于Add(b, 1)
func (c *Counter[T]) Inc(b []byte) {
	c.Add(b, 1)
}

// 增加元素的計數
// 字符串類型
func (c *Counter[T]) AddString(s string, val T) {
	c.Add([]byte(s), val)
}

// 增加元素的計數
// 等同于Add(b, 1)
// 字符串類型
func (c *Counter[T]) IncString(s string) {
	c.Add([]byte(s), 1)
}

// 估算元素的計數
func (c *Counter[T]) Estimate(b []byte) T {
	minCount := c.maxCount
	for i, h := range c.hashs {
		index := h.Sum64(b) % c.countersLen
		count := c.counters[i][index]
		if count == 0 {
			return 0
		}
		minCount = mmath.Min(minCount, count)
	}
	return minCount
}

// 估算元素的計數
// 字符串類型
func (c *Counter[T]) EstimateString(s string) T {
	return c.Estimate([]byte(s))
}

// 計數衰減
// 如果factor為0則直接清空
func (c *Counter[T]) Attenuation(factor T) {
	for _, counter := range c.counters {
		if factor == 0 {
			mem.Memset(counter, 0)
		} else {
			for j := uint64(0); j < c.countersLen; j++ {
				counter[j] /= factor
			}
		}
	}
}

數據結構

這里的數據結構核心是一個k*m的二維數組counters,k是哈希函數數量,m是數組每一行的長度;countersLen其實就是m;hashs是哈希函數列表;maxCount是當前類型的最大值,比如uint8就是255,下面的計算需要用到它。

type Counter[T constraints.Unsigned] struct {
	counters    [][]T
	countersLen uint64       // 計數器長度
	hashs       []*hash.Hash // 哈希函數列表
	maxCount    T            // 最大計數值
}

初始化

我們首先使用上面提到的兩個公式計算數組每一行長度和哈希函數的數量,然后初始化哈希函數列表和二維數組。

// 創建一個計數器
// size:數據流大小
// errorRange:計數值誤差范圍(會超過真實計數值)
// errorRate:錯誤率
func New[T constraints.Unsigned](size uint64, errorRange T, errorRate float64) *Counter[T] {
	// 計數器長度
	countersLen := uint64(math.Ceil(math.E / (float64(errorRange) / float64(size))))
	// 哈希個數
	hashsCnt := int(math.Ceil(math.Log(1.0 / errorRate)))
	hashs := make([]*hash.Hash, hashsCnt)
	counters := make([][]T, hashsCnt)
	for i := 0; i < hashsCnt; i++ {
		hashs[i] = hash.New()
		counters[i] = make([]T, countersLen)
	}
	return &Counter[T]{
		counters:    counters,
		countersLen: countersLen,
		hashs:       hashs,
		maxCount:    T(0) - 1,
	}
}

增加計數

對于一個元素,我們需要把它根據每個哈希函數計算出它在每一行數組的位置,然后增加對應位置計數器的計數值。

這里需要注意的是,計數值可能會溢出,因此我們首先判斷是否溢出,如果溢出則設置為最大值。

// 增加元素的計數
func (c *Counter[T]) Add(b []byte, val T) {
	for i, h := range c.hashs {
		index := h.Sum64(b) % c.countersLen
		if c.counters[i][index]+val <= c.counters[i][index] {
			c.counters[i][index] = c.maxCount
		} else {
			c.counters[i][index] += val
		}
	}
}

估算計數

同增加計數原理,把元素根據哈希函數映射到每一行數組的對應位置,然后選擇所有行中最小的那個計數值。

// 估算元素的計數
func (c *Counter[T]) Estimate(b []byte) T {
	minCount := c.maxCount
	for i, h := range c.hashs {
		index := h.Sum64(b) % c.countersLen
		count := c.counters[i][index]
		if count == 0 {
			return 0
		}
		minCount = mmath.Min(minCount, count)
	}
	return minCount
}

原文鏈接:https://juejin.cn/post/7142896490061496334

欄目分類
最近更新