網(wǎng)站首頁 編程語言 正文
這個(gè)東西很重要,可以經(jīng)常用在項(xiàng)目當(dāng)中,所以我們單獨(dú)拿出來進(jìn)行講解。
在使用它之前我們需要導(dǎo)包:
?go get golang.org/x/sync/singleflight
golang/sync/singleflight.Group
是 Go 語言擴(kuò)展包中提供了另一種同步原語,它能夠在一個(gè)服務(wù)中抑制對下游的多次重復(fù)請求。一個(gè)比較常見的使用場景是:我們在使用 Redis 對數(shù)據(jù)庫中的數(shù)據(jù)進(jìn)行緩存,發(fā)生緩存擊穿時(shí),大量的流量都會(huì)打到數(shù)據(jù)庫上進(jìn)而影響服務(wù)的尾延時(shí)。
但是 golang/sync/singleflight.Group
能有效地解決這個(gè)問題,它能夠限制對同一個(gè)鍵值對的多次重復(fù)請求,減少對下游的瞬時(shí)流量。
使用方法
singleflight
類的使用方法就新建一個(gè)singleflight.Group
,使用其方法Do
或者DoChan
來包裝方法,被包裝的方法在對于同一個(gè)key,只會(huì)有一個(gè)協(xié)程執(zhí)行,其他協(xié)程等待那個(gè)協(xié)程執(zhí)行結(jié)束后,拿到同樣的結(jié)果。
Group
結(jié)構(gòu)體
代表一類工作,同一個(gè)group
中,同樣的key
同時(shí)只能被執(zhí)行一次
Do
方法
func (g *Group) Do(key string, fn func() (interface{<!--{cke_protected}{C}%3C!%2D%2D%20%2D%2D%3E-->}, error)) (v interface{<!--{cke_protected}{C}%3C!%2D%2D%20%2D%2D%3E-->}, err error, shared bool)
key
:同一個(gè)key
,同時(shí)只有一個(gè)協(xié)程執(zhí)行
fn
:被包裝的函數(shù)
v
:返回值,即執(zhí)行結(jié)果。其他等待的協(xié)程都會(huì)拿到
shared
:表示是否由其他協(xié)程得到了這個(gè)結(jié)果v
DoChan
方法
func (g *Group) DoChan(key string, fn func() (interface{<!--{cke_protected}{C}%3C!%2D%2D%20%2D%2D%3E-->}, error)) <-chan Result
和Do
差不多其實(shí),因此我們就只講解Do
的實(shí)際應(yīng)用場景了。
具體應(yīng)用場景
var singleSetCache singleflight.Group func GetAndSetCache(r *http.Request, cacheKey string) (string, error) { log.Printf("request %s start to get and set cache...", r.URL) value, err, _ := singleSetCache.Do(cacheKey, func() (interface{}, error) { log.Printf("request %s is getting cache...", r.URL) time.Sleep(3 * time.Second) log.Printf("request %s get cache success!", r.URL) return cacheKey, nil }) return value.(string), err } func main() { r := gin.Default() r.GET("/sekill/:id", func(context *gin.Context) { ID := context.Param("id") cache, err := GetAndSetCache(context.Request, ID) if err != nil { log.Println(err) } log.Printf("request %s get value: %v", context.Request.URL, cache) }) r.Run() }
來看一下執(zhí)行結(jié)果:
2022/12/29 16:21:18 request /sekill/5 start to get and set cache...
2022/12/29 16:21:18 request /sekill/5 is getting cache...
2022/12/29 16:21:18 request /sekill/9 start to get and set cache...
2022/12/29 16:21:18 request /sekill/9 is getting cache...
2022/12/29 16:21:18 request /sekill/9 start to get and set cache...
2022/12/29 16:21:18 request /sekill/5 start to get and set cache...
2022/12/29 16:21:18 request /sekill/5 start to get and set cache...
2022/12/29 16:21:18 request /sekill/9 start to get and set cache...
2022/12/29 16:21:18 request /sekill/9 start to get and set cache...
2022/12/29 16:21:18 request /sekill/5 start to get and set cache...
2022/12/29 16:21:19 request /sekill/9 start to get and set cache...
2022/12/29 16:21:19 request /sekill/5 start to get and set cache...
2022/12/29 16:21:21 request /sekill/9 get cache success!
2022/12/29 16:21:21 request /sekill/5 get cache success!
2022/12/29 16:21:21 request /sekill/5 get value: 5
2022/12/29 16:21:21 request /sekill/5 get value: 5
[GIN] 2022/12/29 - 16:21:21 | 200 | ? ?3.0106529s | ? ? ? 127.0.0.1 | GET ? ? ?"/sekill/5"
2022/12/29 16:21:21 request /sekill/9 get value: 9
[GIN] 2022/12/29 - 16:21:21 | 200 | ? ?2.8090881s | ? ? ? 127.0.0.1 | GET ? ? ?"/sekill/5"
2022/12/29 16:21:21 request /sekill/9 get value: 9
[GIN] 2022/12/29 - 16:21:21 | 200 | ? ?2.2166003s | ? ? ? 127.0.0.1 | GET ? ? ?"/sekill/9"
2022/12/29 16:21:21 request /sekill/9 get value: 9
[GIN] 2022/12/29 - 16:21:21 | 200 | ? ?2.6064069s | ? ? ? 127.0.0.1 | GET ? ? ?"/sekill/9"
2022/12/29 16:21:21 request /sekill/9 get value: 9
[GIN] 2022/12/29 - 16:21:21 | 200 | ? ?2.4178652s | ? ? ? 127.0.0.1 | GET ? ? ?"/sekill/9"
2022/12/29 16:21:21 request /sekill/9 get value: 9
[GIN] 2022/12/29 - 16:21:21 | 200 | ? ?2.8101267s | ? ? ? 127.0.0.1 | GET ? ? ?"/sekill/9"
2022/12/29 16:21:21 request /sekill/5 get value: 5
[GIN] 2022/12/29 - 16:21:21 | 200 | ? ?3.0116892s | ? ? ? 127.0.0.1 | GET ? ? ?"/sekill/9"
2022/12/29 16:21:21 request /sekill/5 get value: 5
[GIN] 2022/12/29 - 16:21:21 | 200 | ? ?2.6074537s | ? ? ? 127.0.0.1 | GET ? ? ?"/sekill/5"
2022/12/29 16:21:21 request /sekill/5 get value: 5
[GIN] 2022/12/29 - 16:21:21 | 200 | ? ?2.4076473s | ? ? ? 127.0.0.1 | GET ? ? ?"/sekill/5"
[GIN] 2022/12/29 - 16:21:21 | 200 | ? ? 2.218686s | ? ? ? 127.0.0.1 | GET ? ? ?"/sekill/5"
可以看到確實(shí)只有一個(gè)協(xié)程執(zhí)行了被包裝的函數(shù),并且其他協(xié)程都拿到了結(jié)果。
接下來我們來看一下它的原理吧!
原理
首先來看一下Group
結(jié)構(gòu)體:
type Group struct { mu sync.Mutex // 鎖保證并發(fā)安全 m map[string]*call //保存key對應(yīng)的函數(shù)執(zhí)行過程和結(jié)果的變量。 }
然后我們來看一下call
結(jié)構(gòu)體:
type call struct { wg sync.WaitGroup //用WaitGroup實(shí)現(xiàn)只有一個(gè)協(xié)程執(zhí)行函數(shù) val interface{} //函數(shù)執(zhí)行結(jié)果 err error forgotten bool dups int //含義是duplications,即同時(shí)執(zhí)行同一個(gè)key的協(xié)程數(shù)量 chans []chan<- Result }
然后我們來看一下Do
方法:
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) { // 寫Group的m字段時(shí),加鎖保證寫安全 g.mu.Lock() if g.m == nil { g.m = make(map[string]*call) } if c, ok := g.m[key]; ok { // 如果key已經(jīng)存在,說明已經(jīng)由協(xié)程在執(zhí)行,則dups++并等待其執(zhí)行結(jié)果,執(zhí)行結(jié)果保存在對應(yīng)的call的val字段里 c.dups++ g.mu.Unlock() c.wg.Wait() if e, ok := c.err.(*panicError); ok { panic(e) } else if c.err == errGoexit { runtime.Goexit() } return c.val, c.err, true } // 如果key不存在,則新建一個(gè)call,并使用WaitGroup來阻塞其他協(xié)程,同時(shí)在m字段里寫入key和對應(yīng)的call c := new(call) c.wg.Add(1) g.m[key] = c g.mu.Unlock() g.doCall(c, key, fn) // 進(jìn)來的第一個(gè)協(xié)程就來執(zhí)行這個(gè)函數(shù) return c.val, c.err, c.dups > 0 }
然后我們來分析一下doCall
函數(shù):
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) { c.val, c.err = fn() c.wg.Done() g.mu.Lock() delete(g.m, key) for _, ch := range c.chans { ch <- Result{c.val, c.err, c.dups > 0} } g.mu.Unlock() }
- 運(yùn)行傳入的函數(shù)
fn
,該函數(shù)的返回值會(huì)賦值給c.val
和c.err
; - 調(diào)用
sync.WaitGroup.Done
方法通知所有等待結(jié)果的Goroutine
— 當(dāng)前函數(shù)已經(jīng)執(zhí)行完成,可以從call
結(jié)構(gòu)體中取出返回值并返回了; - 獲取持有的互斥鎖并通過管道將信息同步給使用
golang/sync/singleflight.Group.DoChan
方法的Goroutine
;
問題分析
分析了源碼之后,我們得出了一個(gè)結(jié)論,這個(gè)東西是用阻塞來實(shí)現(xiàn)的,這就引發(fā)了一個(gè)問題:如果我們處理的那個(gè)請求剛好遇到問題了,那么后面的所有請求都會(huì)被阻塞,也就是,我們應(yīng)該加上適合的超時(shí)控制,如果在一定時(shí)間內(nèi),沒有獲得結(jié)果,那么就當(dāng)作超時(shí)處理。
于是這個(gè)適合我們應(yīng)該使用DoChan()
。兩者實(shí)現(xiàn)上完全一樣,不同的是, DoChan()
通過 channel
返回結(jié)果。因此可以使用 select
語句實(shí)現(xiàn)超時(shí)控制。
var singleSetCache singleflight.Group func GetAndSetCache(r *http.Request, cacheKey string) (string, error) { log.Printf("request %s start to get and set cache...", r.URL) retChan := singleSetCache.DoChan(cacheKey, func() (interface{}, error) { log.Printf("request %s is getting cache...", r.URL) time.Sleep(3 * time.Second) log.Printf("request %s get cache success!", r.URL) return cacheKey, nil }) var ret singleflight.Result timeout := time.After(2 * time.Second) select { case <-timeout: log.Println("time out!") return "", errors.New("time out") case ret = <-retChan: // 從chan中獲取結(jié)果 return ret.Val.(string), ret.Err } } func main() { r := gin.Default() r.GET("/sekill/:id", func(context *gin.Context) { ID := context.Param("id") cache, err := GetAndSetCache(context.Request, ID) if err != nil { log.Println(err) } log.Printf("request %s get value: %v", context.Request.URL, cache) }) r.Run() }
補(bǔ)充
這里其實(shí)還有一個(gè)Forget
方法,它可以在映射表中刪除某個(gè)鍵,接下來對鍵的調(diào)用就不會(huì)等待前面的函數(shù)返回了。
總結(jié)
當(dāng)然,如果單次的失敗無法容忍,在高并發(fā)的場景下更好的處理方案是:
放棄使用同步請求,犧牲數(shù)據(jù)更新的實(shí)時(shí)性
“緩存” 存儲(chǔ)準(zhǔn)實(shí)時(shí)的數(shù)據(jù) + “異步更新” 數(shù)據(jù)到緩存
原文鏈接:https://blog.csdn.net/qq_61039408/article/details/128484679
相關(guān)推薦
- 2022-05-18 C/C++實(shí)現(xiàn)segy文件的讀取詳解_C 語言
- 2022-05-15 C++11:搞清楚萬能引用和右值引用
- 2022-09-23 MongoDB慢查詢與索引實(shí)例詳解_MongoDB
- 2022-07-11 搭建spring MVC框架,完成和servlet相似的操作
- 2022-08-04 淺析.net?core?拋異常對性能影響_實(shí)用技巧
- 2022-07-20 關(guān)于numpy強(qiáng)制類型轉(zhuǎn)換的問題_python
- 2023-05-30 Jquery使用原生AJAX方法請求數(shù)據(jù)_jquery
- 2022-03-11 UE4 WorldComposition加載Level與位置偏移代碼分析
- 最近更新
-
- window11 系統(tǒng)安裝 yarn
- 超詳細(xì)win安裝深度學(xué)習(xí)環(huán)境2025年最新版(
- Linux 中運(yùn)行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲(chǔ)小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎(chǔ)操作-- 運(yùn)算符,流程控制 Flo
- 1. Int 和Integer 的區(qū)別,Jav
- spring @retryable不生效的一種
- Spring Security之認(rèn)證信息的處理
- Spring Security之認(rèn)證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權(quán)
- redisson分布式鎖中waittime的設(shè)
- maven:解決release錯(cuò)誤:Artif
- restTemplate使用總結(jié)
- Spring Security之安全異常處理
- MybatisPlus優(yōu)雅實(shí)現(xiàn)加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務(wù)發(fā)現(xiàn)-Nac
- Spring Security之基于HttpR
- Redis 底層數(shù)據(jù)結(jié)構(gòu)-簡單動(dòng)態(tài)字符串(SD
- arthas操作spring被代理目標(biāo)對象命令
- Spring中的單例模式應(yīng)用詳解
- 聊聊消息隊(duì)列,發(fā)送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠(yuǎn)程分支