網站首頁 編程語言 正文
go開發緩存場景一般使用map或者緩存框架,為了線程安全會使用sync.Map或線程安全的緩存框架。
緩存場景中如果數據量大于百萬級別,需要特別考慮數據類型對于gc的影響(注意string類型底層是指針+Len+Cap,因此也算是指針類型),如果緩存key和value都是非指針類型的話就無需多慮了。但實際應用場景中,key和value是(包含)指針類型數據是很常見的,因此使用緩存框架需要特別注意其對gc影響,從是否對GC影響角度來看緩存框架大致分為2類:
- 零GC開銷:比如freecache或bigcache這種,底層基于ringbuf,減小指針個數;
- 有GC開銷:直接基于Map來實現的緩存框架。
對于map而言,gc時會掃描所有key/value鍵值對,如果其都是基本類型,那么gc便不會再掃描。下面以freecache為例分析下其實現原理,代碼示例如下:
func main() { ? ?cacheSize := 100 * 1024 * 1024 ? ?cache := freecache.NewCache(cacheSize) ? ?for i := 0; i < N; i++ { ? ? ? str := strconv.Itoa(i) ? ? ? _ = cache.Set([]byte(str), []byte(str), 1) ? ?} ? ?now := time.Now() ? ?runtime.GC() ? ?fmt.Printf("freecache, GC took: %s\n", time.Since(now)) ? ?_, _ = cache.Get([]byte("aa")) ? ?now = time.Now() ? ?for i := 0; i < N; i++ { ? ? ? str := strconv.Itoa(i) ? ? ? _, _ = cache.Get([]byte(str)) ? ?} ? ?fmt.Printf("freecache, Get took: %s\n\n", time.Since(now)) }
1 初始化
freecache.NewCache會初始化本地緩存,size表示存儲空間大小,freecache會初始化256個segment,每個segment是獨立的存儲單元,freecache加鎖維度也是基于segment的,每個segment有一個ringbuf,初始大小為size/256。freecache號稱零GC的來源就是其指針是固定的,只有512個,每個segment有2個,分別是rb和slotData(注意切片為指針類型)。
type segment struct { ? ?rb ? ? ? ? ? ?RingBuf // ring buffer that stores data ? ?segId ? ? ? ? int ? ?_ ? ? ? ? ? ? uint32 ?// 占位 ? ?missCount ? ? int64 ? ?hitCount ? ? ?int64 ? ?entryCount ? ?int64 ? ?totalCount ? ?int64 ? ? ?// number of entries in ring buffer, including deleted entries. ? ?totalTime ? ? int64 ? ? ?// used to calculate least recent used entry. ? ?timer ? ? ? ? Timer ? ? ?// Timer giving current time ? ?totalEvacuate int64 ? ? ?// used for debug ? ?totalExpired ?int64 ? ? ?// used for debug ? ?overwrites ? ?int64 ? ? ?// used for debug ? ?touched ? ? ? int64 ? ? ?// used for debug ? ?vacuumLen ? ? int64 ? ? ?// up to vacuumLen, new data can be written without overwriting old data. ? ?slotLens ? ? ?[256]int32 // The actual length for every slot. ? ?slotCap ? ? ? int32 ? ? ?// max number of entry pointers a slot can hold. ? ?slotsData ? ? []entryPtr // 索引指針 } func NewCacheCustomTimer(size int, timer Timer) (cache *Cache) { ? ? cache = new(Cache) ? ? for i := 0; i < segmentCount; i++ { ? ? ? ? cache.segments[i] = newSegment(size/segmentCount, i, timer) ? ? } } func newSegment(bufSize int, segId int, timer Timer) (seg segment) { ? ? seg.rb = NewRingBuf(bufSize, 0) ? ? seg.segId = segId ? ? seg.timer = timer ? ? seg.vacuumLen = int64(bufSize) ? ? seg.slotCap = 1 ? ? seg.slotsData = make([]entryPtr, 256*seg.slotCap) // 每個slotData初始化256個單位大小 }
2 讀寫流程
freecache的key和value都是[]byte數組,使用時需要自行序列化和反序列化,如果緩存復雜對象不可忽略其序列化和反序列化帶來的影響,首先看下Set流程:
_ = cache.Set([]byte(str), []byte(str), 1)
Set流程首先對key進行hash,hashVal類型uint64,其低8位segID對應segment數組,低8-15位表示slotId對應slotsData下標,高16位表示slotsData下標對應的[]entryPtr某個數據,這里需要查找操作。注意[]entryPtr數組大小為slotCap(初始為1),當擴容時會slotCap倍增。
每個segment對應一個lock(sync.Mutex),因此其能夠支持較大并發量,而不像sync.Map只有一個鎖。
func (cache *Cache) Set(key, value []byte, expireSeconds int) (err error) { ? ?hashVal := hashFunc(key) ? ?segID := hashVal & segmentAndOpVal // 低8位 ? ?cache.locks[segID].Lock() // 加鎖 ? ?err = cache.segments[segID].set(key, value, hashVal, expireSeconds) ? ?cache.locks[segID].Unlock() } func (seg *segment) set(key, value []byte, hashVal uint64, expireSeconds int) (err error) { ? ?slotId := uint8(hashVal >> 8) ? ?hash16 := uint16(hashVal >> 16) ? ?slot := seg.getSlot(slotId) ? ?idx, match := seg.lookup(slot, hash16, key) ? ?var hdrBuf [ENTRY_HDR_SIZE]byte ? ?hdr := (*entryHdr)(unsafe.Pointer(&hdrBuf[0])) ? ?if match { // 有數據更新操作 ? ? ? matchedPtr := &slot[idx] ? ? ? seg.rb.ReadAt(hdrBuf[:], matchedPtr.offset) ? ? ? hdr.slotId = slotId ? ? ? hdr.hash16 = hash16 ? ? ? hdr.keyLen = uint16(len(key)) ? ? ? originAccessTime := hdr.accessTime ? ? ? hdr.accessTime = now ? ? ? hdr.expireAt = expireAt ? ? ? hdr.valLen = uint32(len(value)) ? ? ? if hdr.valCap >= hdr.valLen { ? ? ? ? ?// 已存在數據value空間能存下此次value大小 ? ? ? ? ?atomic.AddInt64(&seg.totalTime, int64(hdr.accessTime)-int64(originAccessTime)) ? ? ? ? ?seg.rb.WriteAt(hdrBuf[:], matchedPtr.offset) ? ? ? ? ?seg.rb.WriteAt(value, matchedPtr.offset+ENTRY_HDR_SIZE+int64(hdr.keyLen)) ? ? ? ? ?atomic.AddInt64(&seg.overwrites, 1) ? ? ? ? ?return ? ? ? } ? ? ? // 刪除對應entryPtr,涉及到slotsData內存copy,ringbug中只是標記刪除 ? ? ? seg.delEntryPtr(slotId, slot, idx) ? ? ? match = false ? ? ? // increase capacity and limit entry len. ? ? ? for hdr.valCap < hdr.valLen { ? ? ? ? ?hdr.valCap *= 2 ? ? ? } ? ? ? if hdr.valCap > uint32(maxKeyValLen-len(key)) { ? ? ? ? ?hdr.valCap = uint32(maxKeyValLen - len(key)) ? ? ? } ? ?} else { // 無數據 ? ? ? hdr.slotId = slotId ? ? ? hdr.hash16 = hash16 ? ? ? hdr.keyLen = uint16(len(key)) ? ? ? hdr.accessTime = now ? ? ? hdr.expireAt = expireAt ? ? ? hdr.valLen = uint32(len(value)) ? ? ? hdr.valCap = uint32(len(value)) ? ? ? if hdr.valCap == 0 { // avoid infinite loop when increasing capacity. ? ? ? ? ?hdr.valCap = 1 ? ? ? } ? ?} ? ? ? ?// 數據實際長度為 ENTRY_HDR_SIZE=24 + key和value的長度 ? ? ? ?entryLen := ENTRY_HDR_SIZE + int64(len(key)) + int64(hdr.valCap) ? ?slotModified := seg.evacuate(entryLen, slotId, now) ? ?if slotModified { ? ? ? // the slot has been modified during evacuation, we need to looked up for the 'idx' again. ? ? ? // otherwise there would be index out of bound error. ? ? ? slot = seg.getSlot(slotId) ? ? ? idx, match = seg.lookup(slot, hash16, key) ? ? ? // assert(match == false) ? ?} ? ?newOff := seg.rb.End() ? ?seg.insertEntryPtr(slotId, hash16, newOff, idx, hdr.keyLen) ? ?seg.rb.Write(hdrBuf[:]) ? ?seg.rb.Write(key) ? ?seg.rb.Write(value) ? ?seg.rb.Skip(int64(hdr.valCap - hdr.valLen)) ? ?atomic.AddInt64(&seg.totalTime, int64(now)) ? ?atomic.AddInt64(&seg.totalCount, 1) ? ?seg.vacuumLen -= entryLen ? ?return }
seg.evacuate會評估ringbuf是否有足夠空間存儲key/value,如果空間不夠,其會從空閑空間尾部后一位(也就是待淘汰數據的開始位置)開始掃描(oldOff := seg.rb.End() + seg.vacuumLen - seg.rb.Size()),如果對應數據已被邏輯deleted或者已過期,那么該塊內存可以直接回收,如果不滿足回收條件,則將entry從環頭調換到環尾,再更新entry的索引,如果這樣循環5次還是不行,那么需要將當前oldHdrBuf回收以滿足內存需要。
執行完seg.evacuate所需空間肯定是能滿足的,然后就是寫入索引和數據了,insertEntryPtr就是寫入索引操作,當[]entryPtr中元素個數大于seg.slotCap(初始1)時,需要擴容操作,對應方法見seg.expand,這里不再贅述。
寫入ringbuf就是執行rb.Write即可。
func (seg *segment) evacuate(entryLen int64, slotId uint8, now uint32) (slotModified bool) { ? ?var oldHdrBuf [ENTRY_HDR_SIZE]byte ? ?consecutiveEvacuate := 0 ? ?for seg.vacuumLen < entryLen { ? ? ? oldOff := seg.rb.End() + seg.vacuumLen - seg.rb.Size() ? ? ? seg.rb.ReadAt(oldHdrBuf[:], oldOff) ? ? ? oldHdr := (*entryHdr)(unsafe.Pointer(&oldHdrBuf[0])) ? ? ? oldEntryLen := ENTRY_HDR_SIZE + int64(oldHdr.keyLen) + int64(oldHdr.valCap) ? ? ? if oldHdr.deleted { // 已刪除 ? ? ? ? ?consecutiveEvacuate = 0 ? ? ? ? ?atomic.AddInt64(&seg.totalTime, -int64(oldHdr.accessTime)) ? ? ? ? ?atomic.AddInt64(&seg.totalCount, -1) ? ? ? ? ?seg.vacuumLen += oldEntryLen ? ? ? ? ?continue ? ? ? } ? ? ? expired := oldHdr.expireAt != 0 && oldHdr.expireAt < now ? ? ? leastRecentUsed := int64(oldHdr.accessTime)*atomic.LoadInt64(&seg.totalCount) <= atomic.LoadInt64(&seg.totalTime) ? ? ? if expired || leastRecentUsed || consecutiveEvacuate > 5 { ? ? ? // 可以回收 ? ? ? ? ?seg.delEntryPtrByOffset(oldHdr.slotId, oldHdr.hash16, oldOff) ? ? ? ? ?if oldHdr.slotId == slotId { ? ? ? ? ? ? slotModified = true ? ? ? ? ?} ? ? ? ? ?consecutiveEvacuate = 0 ? ? ? ? ?atomic.AddInt64(&seg.totalTime, -int64(oldHdr.accessTime)) ? ? ? ? ?atomic.AddInt64(&seg.totalCount, -1) ? ? ? ? ?seg.vacuumLen += oldEntryLen ? ? ? ? ?if expired { ? ? ? ? ? ? atomic.AddInt64(&seg.totalExpired, 1) ? ? ? ? ?} else { ? ? ? ? ? ? atomic.AddInt64(&seg.totalEvacuate, 1) ? ? ? ? ?} ? ? ? } else { ? ? ? ? ?// evacuate an old entry that has been accessed recently for better cache hit rate. ? ? ? ? ?newOff := seg.rb.Evacuate(oldOff, int(oldEntryLen)) ? ? ? ? ?seg.updateEntryPtr(oldHdr.slotId, oldHdr.hash16, oldOff, newOff) ? ? ? ? ?consecutiveEvacuate++ ? ? ? ? ?atomic.AddInt64(&seg.totalEvacuate, 1) ? ? ? } ? ?} }
freecache的Get流程相對來說簡單點,通過hash找到對應segment,通過slotId找到對應索引slot,然后通過二分+遍歷尋找數據,如果找不到直接返回ErrNotFound,否則更新一些time指標。Get流程還會更新緩存命中率相關指標。
func (cache *Cache) Get(key []byte) (value []byte, err error) { ? ?hashVal := hashFunc(key) ? ?segID := hashVal & segmentAndOpVal ? ?cache.locks[segID].Lock() ? ?value, _, err = cache.segments[segID].get(key, nil, hashVal, false) ? ?cache.locks[segID].Unlock() ? ?return } func (seg *segment) get(key, buf []byte, hashVal uint64, peek bool) (value []byte, expireAt uint32, err error) { ? ?hdr, ptr, err := seg.locate(key, hashVal, peek) // hash+定位查找 ? ?if err != nil { ? ? ? return ? ?} ? ?expireAt = hdr.expireAt ? ?if cap(buf) >= int(hdr.valLen) { ? ? ? value = buf[:hdr.valLen] ? ?} else { ? ? ? value = make([]byte, hdr.valLen) ? ?} ? ?seg.rb.ReadAt(value, ptr.offset+ENTRY_HDR_SIZE+int64(hdr.keyLen)) }
定位到數據之后,讀取ringbuf即可,注意一般來說讀取到的value是新創建的內存空間,因此涉及到[]byte數據的復制操作。
3 總結
從常見的幾個緩存框架壓測性能來看,Set性能差異較大但還不是數量級別的差距,Get性能差異不大,因此對于絕大多數場景來說不用太關注Set/Get性能,重點應該看功能是否滿足業務需求和gc影響,性能壓測比較見:https://golang2.eddycjy.com/posts/ch5/04-performance/
緩存有一個特殊的場景,那就是將數據全部緩存在內存,涉及到更新時都是全量更新(替換),該場景下使用freecache,如果size未設置好可能導致部分數據被淘汰,是不符合預期的,這個一定要注意。為了使用freecache避免該問題,需要將size設置"足夠大",但也要注意其內存空間占用。
原文鏈接:https://juejin.cn/post/7064359768537694244
相關推薦
- 2022-06-04 Python學習之魔法函數(filter,map,reduce)詳解_python
- 2022-06-25 python實現人機對戰的井字棋游戲_python
- 2022-04-09 如何利用Python獲取鼠標的實時位置_python
- 2023-01-18 python中的參數類型匹配提醒_python
- 2022-04-18 設置彈性布局的三列兩側對齊,最后一行樣式的處理
- 2022-07-10 如何替換重構依賴里面的Service
- 2022-06-30 C語言中結構體的內存對齊規則講解_C 語言
- 2023-04-04 Swift中的HTTP請求體Request?Bodies使用示例詳解_IOS
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支