日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學無先后,達者為師

網站首頁 編程語言 正文

深入理解go緩存庫freecache的使用_Golang

作者:luoxn28 ? 更新時間: 2022-04-20 編程語言

go開發緩存場景一般使用map或者緩存框架,為了線程安全會使用sync.Map或線程安全的緩存框架。

緩存場景中如果數據量大于百萬級別,需要特別考慮數據類型對于gc的影響(注意string類型底層是指針+Len+Cap,因此也算是指針類型),如果緩存key和value都是非指針類型的話就無需多慮了。但實際應用場景中,key和value是(包含)指針類型數據是很常見的,因此使用緩存框架需要特別注意其對gc影響,從是否對GC影響角度來看緩存框架大致分為2類:

  • 零GC開銷:比如freecache或bigcache這種,底層基于ringbuf,減小指針個數;
  • 有GC開銷:直接基于Map來實現的緩存框架。

對于map而言,gc時會掃描所有key/value鍵值對,如果其都是基本類型,那么gc便不會再掃描。下面以freecache為例分析下其實現原理,代碼示例如下:

func main() {
? ?cacheSize := 100 * 1024 * 1024
? ?cache := freecache.NewCache(cacheSize)

? ?for i := 0; i < N; i++ {
? ? ? str := strconv.Itoa(i)
? ? ? _ = cache.Set([]byte(str), []byte(str), 1)
? ?}

? ?now := time.Now()
? ?runtime.GC()
? ?fmt.Printf("freecache, GC took: %s\n", time.Since(now))
? ?_, _ = cache.Get([]byte("aa"))

? ?now = time.Now()
? ?for i := 0; i < N; i++ {
? ? ? str := strconv.Itoa(i)
? ? ? _, _ = cache.Get([]byte(str))
? ?}
? ?fmt.Printf("freecache, Get took: %s\n\n", time.Since(now))
}

1 初始化

freecache.NewCache會初始化本地緩存,size表示存儲空間大小,freecache會初始化256個segment,每個segment是獨立的存儲單元,freecache加鎖維度也是基于segment的,每個segment有一個ringbuf,初始大小為size/256。freecache號稱零GC的來源就是其指針是固定的,只有512個,每個segment有2個,分別是rb和slotData(注意切片為指針類型)。

type segment struct {
? ?rb ? ? ? ? ? ?RingBuf // ring buffer that stores data
? ?segId ? ? ? ? int
? ?_ ? ? ? ? ? ? uint32 ?// 占位
? ?missCount ? ? int64
? ?hitCount ? ? ?int64
? ?entryCount ? ?int64
? ?totalCount ? ?int64 ? ? ?// number of entries in ring buffer, including deleted entries.
? ?totalTime ? ? int64 ? ? ?// used to calculate least recent used entry.
? ?timer ? ? ? ? Timer ? ? ?// Timer giving current time
? ?totalEvacuate int64 ? ? ?// used for debug
? ?totalExpired ?int64 ? ? ?// used for debug
? ?overwrites ? ?int64 ? ? ?// used for debug
? ?touched ? ? ? int64 ? ? ?// used for debug
? ?vacuumLen ? ? int64 ? ? ?// up to vacuumLen, new data can be written without overwriting old data.
? ?slotLens ? ? ?[256]int32 // The actual length for every slot.
? ?slotCap ? ? ? int32 ? ? ?// max number of entry pointers a slot can hold.
? ?slotsData ? ? []entryPtr // 索引指針
}

func NewCacheCustomTimer(size int, timer Timer) (cache *Cache) {
? ? cache = new(Cache)
? ? for i := 0; i < segmentCount; i++ {
? ? ? ? cache.segments[i] = newSegment(size/segmentCount, i, timer)
? ? }
}
func newSegment(bufSize int, segId int, timer Timer) (seg segment) {
? ? seg.rb = NewRingBuf(bufSize, 0)
? ? seg.segId = segId
? ? seg.timer = timer
? ? seg.vacuumLen = int64(bufSize)
? ? seg.slotCap = 1
? ? seg.slotsData = make([]entryPtr, 256*seg.slotCap) // 每個slotData初始化256個單位大小
}

2 讀寫流程

freecache的key和value都是[]byte數組,使用時需要自行序列化和反序列化,如果緩存復雜對象不可忽略其序列化和反序列化帶來的影響,首先看下Set流程:

_ = cache.Set([]byte(str), []byte(str), 1)

Set流程首先對key進行hash,hashVal類型uint64,其低8位segID對應segment數組,低8-15位表示slotId對應slotsData下標,高16位表示slotsData下標對應的[]entryPtr某個數據,這里需要查找操作。注意[]entryPtr數組大小為slotCap(初始為1),當擴容時會slotCap倍增。

每個segment對應一個lock(sync.Mutex),因此其能夠支持較大并發量,而不像sync.Map只有一個鎖。

func (cache *Cache) Set(key, value []byte, expireSeconds int) (err error) {
? ?hashVal := hashFunc(key)
? ?segID := hashVal & segmentAndOpVal // 低8位
? ?cache.locks[segID].Lock() // 加鎖
? ?err = cache.segments[segID].set(key, value, hashVal, expireSeconds)
? ?cache.locks[segID].Unlock()
}

func (seg *segment) set(key, value []byte, hashVal uint64, expireSeconds int) (err error) {
? ?slotId := uint8(hashVal >> 8)
? ?hash16 := uint16(hashVal >> 16)
? ?slot := seg.getSlot(slotId)
? ?idx, match := seg.lookup(slot, hash16, key)

? ?var hdrBuf [ENTRY_HDR_SIZE]byte
? ?hdr := (*entryHdr)(unsafe.Pointer(&hdrBuf[0]))
? ?if match { // 有數據更新操作
? ? ? matchedPtr := &slot[idx]
? ? ? seg.rb.ReadAt(hdrBuf[:], matchedPtr.offset)
? ? ? hdr.slotId = slotId
? ? ? hdr.hash16 = hash16
? ? ? hdr.keyLen = uint16(len(key))
? ? ? originAccessTime := hdr.accessTime
? ? ? hdr.accessTime = now
? ? ? hdr.expireAt = expireAt
? ? ? hdr.valLen = uint32(len(value))
? ? ? if hdr.valCap >= hdr.valLen {
? ? ? ? ?// 已存在數據value空間能存下此次value大小
? ? ? ? ?atomic.AddInt64(&seg.totalTime, int64(hdr.accessTime)-int64(originAccessTime))
? ? ? ? ?seg.rb.WriteAt(hdrBuf[:], matchedPtr.offset)
? ? ? ? ?seg.rb.WriteAt(value, matchedPtr.offset+ENTRY_HDR_SIZE+int64(hdr.keyLen))
? ? ? ? ?atomic.AddInt64(&seg.overwrites, 1)
? ? ? ? ?return
? ? ? }
? ? ? // 刪除對應entryPtr,涉及到slotsData內存copy,ringbug中只是標記刪除
? ? ? seg.delEntryPtr(slotId, slot, idx)
? ? ? match = false
? ? ? // increase capacity and limit entry len.
? ? ? for hdr.valCap < hdr.valLen {
? ? ? ? ?hdr.valCap *= 2
? ? ? }
? ? ? if hdr.valCap > uint32(maxKeyValLen-len(key)) {
? ? ? ? ?hdr.valCap = uint32(maxKeyValLen - len(key))
? ? ? }
? ?} else { // 無數據
? ? ? hdr.slotId = slotId
? ? ? hdr.hash16 = hash16
? ? ? hdr.keyLen = uint16(len(key))
? ? ? hdr.accessTime = now
? ? ? hdr.expireAt = expireAt
? ? ? hdr.valLen = uint32(len(value))
? ? ? hdr.valCap = uint32(len(value))
? ? ? if hdr.valCap == 0 { // avoid infinite loop when increasing capacity.
? ? ? ? ?hdr.valCap = 1
? ? ? }
? ?}
? ?
? ?// 數據實際長度為 ENTRY_HDR_SIZE=24 + key和value的長度 ? ?
? ?entryLen := ENTRY_HDR_SIZE + int64(len(key)) + int64(hdr.valCap)
? ?slotModified := seg.evacuate(entryLen, slotId, now)
? ?if slotModified {
? ? ? // the slot has been modified during evacuation, we need to looked up for the 'idx' again.
? ? ? // otherwise there would be index out of bound error.
? ? ? slot = seg.getSlot(slotId)
? ? ? idx, match = seg.lookup(slot, hash16, key)
? ? ? // assert(match == false)
? ?}
? ?newOff := seg.rb.End()
? ?seg.insertEntryPtr(slotId, hash16, newOff, idx, hdr.keyLen)
? ?seg.rb.Write(hdrBuf[:])
? ?seg.rb.Write(key)
? ?seg.rb.Write(value)
? ?seg.rb.Skip(int64(hdr.valCap - hdr.valLen))
? ?atomic.AddInt64(&seg.totalTime, int64(now))
? ?atomic.AddInt64(&seg.totalCount, 1)
? ?seg.vacuumLen -= entryLen
? ?return
}

seg.evacuate會評估ringbuf是否有足夠空間存儲key/value,如果空間不夠,其會從空閑空間尾部后一位(也就是待淘汰數據的開始位置)開始掃描(oldOff := seg.rb.End() + seg.vacuumLen - seg.rb.Size()),如果對應數據已被邏輯deleted或者已過期,那么該塊內存可以直接回收,如果不滿足回收條件,則將entry從環頭調換到環尾,再更新entry的索引,如果這樣循環5次還是不行,那么需要將當前oldHdrBuf回收以滿足內存需要。

執行完seg.evacuate所需空間肯定是能滿足的,然后就是寫入索引和數據了,insertEntryPtr就是寫入索引操作,當[]entryPtr中元素個數大于seg.slotCap(初始1)時,需要擴容操作,對應方法見seg.expand,這里不再贅述。

寫入ringbuf就是執行rb.Write即可。

func (seg *segment) evacuate(entryLen int64, slotId uint8, now uint32) (slotModified bool) {
? ?var oldHdrBuf [ENTRY_HDR_SIZE]byte
? ?consecutiveEvacuate := 0
? ?for seg.vacuumLen < entryLen {
? ? ? oldOff := seg.rb.End() + seg.vacuumLen - seg.rb.Size()
? ? ? seg.rb.ReadAt(oldHdrBuf[:], oldOff)
? ? ? oldHdr := (*entryHdr)(unsafe.Pointer(&oldHdrBuf[0]))
? ? ? oldEntryLen := ENTRY_HDR_SIZE + int64(oldHdr.keyLen) + int64(oldHdr.valCap)
? ? ? if oldHdr.deleted { // 已刪除
? ? ? ? ?consecutiveEvacuate = 0
? ? ? ? ?atomic.AddInt64(&seg.totalTime, -int64(oldHdr.accessTime))
? ? ? ? ?atomic.AddInt64(&seg.totalCount, -1)
? ? ? ? ?seg.vacuumLen += oldEntryLen
? ? ? ? ?continue
? ? ? }
? ? ? expired := oldHdr.expireAt != 0 && oldHdr.expireAt < now
? ? ? leastRecentUsed := int64(oldHdr.accessTime)*atomic.LoadInt64(&seg.totalCount) <= atomic.LoadInt64(&seg.totalTime)
? ? ? if expired || leastRecentUsed || consecutiveEvacuate > 5 {
? ? ? // 可以回收
? ? ? ? ?seg.delEntryPtrByOffset(oldHdr.slotId, oldHdr.hash16, oldOff)
? ? ? ? ?if oldHdr.slotId == slotId {
? ? ? ? ? ? slotModified = true
? ? ? ? ?}
? ? ? ? ?consecutiveEvacuate = 0
? ? ? ? ?atomic.AddInt64(&seg.totalTime, -int64(oldHdr.accessTime))
? ? ? ? ?atomic.AddInt64(&seg.totalCount, -1)
? ? ? ? ?seg.vacuumLen += oldEntryLen
? ? ? ? ?if expired {
? ? ? ? ? ? atomic.AddInt64(&seg.totalExpired, 1)
? ? ? ? ?} else {
? ? ? ? ? ? atomic.AddInt64(&seg.totalEvacuate, 1)
? ? ? ? ?}
? ? ? } else {
? ? ? ? ?// evacuate an old entry that has been accessed recently for better cache hit rate.
? ? ? ? ?newOff := seg.rb.Evacuate(oldOff, int(oldEntryLen))
? ? ? ? ?seg.updateEntryPtr(oldHdr.slotId, oldHdr.hash16, oldOff, newOff)
? ? ? ? ?consecutiveEvacuate++
? ? ? ? ?atomic.AddInt64(&seg.totalEvacuate, 1)
? ? ? }
? ?}
}

freecache的Get流程相對來說簡單點,通過hash找到對應segment,通過slotId找到對應索引slot,然后通過二分+遍歷尋找數據,如果找不到直接返回ErrNotFound,否則更新一些time指標。Get流程還會更新緩存命中率相關指標。

func (cache *Cache) Get(key []byte) (value []byte, err error) {
? ?hashVal := hashFunc(key)
? ?segID := hashVal & segmentAndOpVal
? ?cache.locks[segID].Lock()
? ?value, _, err = cache.segments[segID].get(key, nil, hashVal, false)
? ?cache.locks[segID].Unlock()
? ?return
}
func (seg *segment) get(key, buf []byte, hashVal uint64, peek bool) (value []byte, expireAt uint32, err error) {
? ?hdr, ptr, err := seg.locate(key, hashVal, peek) // hash+定位查找
? ?if err != nil {
? ? ? return
? ?}
? ?expireAt = hdr.expireAt
? ?if cap(buf) >= int(hdr.valLen) {
? ? ? value = buf[:hdr.valLen]
? ?} else {
? ? ? value = make([]byte, hdr.valLen)
? ?}

? ?seg.rb.ReadAt(value, ptr.offset+ENTRY_HDR_SIZE+int64(hdr.keyLen))
}

定位到數據之后,讀取ringbuf即可,注意一般來說讀取到的value是新創建的內存空間,因此涉及到[]byte數據的復制操作。

3 總結

從常見的幾個緩存框架壓測性能來看,Set性能差異較大但還不是數量級別的差距,Get性能差異不大,因此對于絕大多數場景來說不用太關注Set/Get性能,重點應該看功能是否滿足業務需求和gc影響,性能壓測比較見:https://golang2.eddycjy.com/posts/ch5/04-performance/

緩存有一個特殊的場景,那就是將數據全部緩存在內存,涉及到更新時都是全量更新(替換),該場景下使用freecache,如果size未設置好可能導致部分數據被淘汰,是不符合預期的,這個一定要注意。為了使用freecache避免該問題,需要將size設置"足夠大",但也要注意其內存空間占用。

原文鏈接:https://juejin.cn/post/7064359768537694244

欄目分類
最近更新