網(wǎng)站首頁 編程語言 正文
前言
在前面的文章中,我們使用過?WaitGroup
?進(jìn)行任務(wù)編排,Go語言中的?WaitGroup
?和 Java 中的?CyclicBarrier
、CountDownLatch
?非常類似。比如我們有一個主任務(wù)在執(zhí)行,執(zhí)行到某一點(diǎn)時需要并行執(zhí)行三個子任務(wù),并且需要等到三個子任務(wù)都執(zhí)行完后,再繼續(xù)執(zhí)行主任務(wù)。那我們就需要設(shè)置一個檢查點(diǎn),使主任務(wù)一直阻塞在這,等三個子任務(wù)執(zhí)行完后再放行。
說明:本文中的示例,均是基于Go1.17 64位機(jī)器
小試牛刀
我們先來個簡單的例子,看下?WaitGroup
?是怎么使用的。示例中使用?Add(5)
?表示我們有 5個 子任務(wù),然后起了 5個 協(xié)程去完成任務(wù),主協(xié)程使用?Wait()
?方法等待 子協(xié)程執(zhí)行完畢,輸出一共等待的時間。
func main() { var waitGroup sync.WaitGroup start := time.Now() waitGroup.Add(5) for i := 0; i < 5; i++ { go func() { defer waitGroup.Done() time.Sleep(time.Second) fmt.Println("done") }() } waitGroup.Wait() fmt.Println(time.Now().Sub(start).Seconds()) } /* done done done done done 1.000306089 */
總覽
WaitGroup 一共有三個方法:
(wg *WaitGroup) Add(delta int) (wg *WaitGroup) Done() (wg *WaitGroup) Wait()
-
Add
?方法用于設(shè)置 WaitGroup 的計(jì)數(shù)值,可以理解為子任務(wù)的數(shù)量 -
Done
?方法用于將 WaitGroup 的計(jì)數(shù)值減一,可以理解為完成一個子任務(wù) -
Wait
?方法用于阻塞調(diào)用者,直到 WaitGroup 的計(jì)數(shù)值為0,即所有子任務(wù)都完成
正常來說,我們使用的時候,需要先確定子任務(wù)的數(shù)量,然后調(diào)用 Add() 方法傳入相應(yīng)的數(shù)量,在每個子任務(wù)的協(xié)程中,調(diào)用 Done(),需要等待的協(xié)程調(diào)用 Wait() 方法,狀態(tài)流轉(zhuǎn)如下圖:
底層實(shí)現(xiàn)
結(jié)構(gòu)體
type WaitGroup struct { noCopy noCopy // noCopy 字段標(biāo)識,由于 WaitGroup 不能復(fù)制,方便工具檢測 state1 [3]uint32 // 12個字節(jié),8個字節(jié)標(biāo)識 計(jì)數(shù)值和等待數(shù)量,4個字節(jié)用于標(biāo)識信號量 }
state1
?是個復(fù)合字段,會拆分為兩部分:?64位(8個字節(jié))的?statep
?作為一個整體用于原子操作, 其中前面4個字節(jié)表示計(jì)數(shù)值,后面四個字節(jié)表示等待數(shù)量;剩余?32位(4個字節(jié))semap
?用于標(biāo)識信號量。
Go語言中對于64位的變量進(jìn)行原子操作,需要保證該變量是64位對齊的,也就是要保證這 8個字節(jié) 的首地址是 8 的整數(shù)倍。因此當(dāng)?state1
?的首地址是 8 的整數(shù)倍時,取前8個字節(jié)作為?statep
?,后4個字節(jié)作為?semap
;當(dāng)?state1
?的首地址不是 8 的整數(shù)倍時,取后8個字節(jié)作為?statep
?,前4個字節(jié)作為?semap
。
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) { // 首地址是8的倍數(shù)時,前8個字節(jié)為 statep, 后四個字節(jié)為 semap if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 { return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2] } else { // 后8個字節(jié)為 statep, 前四個字節(jié)為 semap return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0] } }
Add
-
Add
?方法用于添加一個計(jì)數(shù)值(負(fù)數(shù)相當(dāng)于減),當(dāng)計(jì)數(shù)值變?yōu)?后,?Wait
?方法阻塞的所有等待者都會被釋放 - 計(jì)數(shù)值變?yōu)樨?fù)數(shù)是非法操作,產(chǎn)生?
panic
- 當(dāng)計(jì)數(shù)值為0時(初始狀態(tài)),
Add
?方法不能和?Wait
?方法并發(fā)調(diào)用,需要保證?Add
?方法在?Wait
?方法之前
調(diào)用,否則會?panic
func (wg *WaitGroup) Add(delta int) { // 拿到計(jì)數(shù)值等待者變量 statep 和 信號量 semap statep, semap := wg.state() // 計(jì)數(shù)值加上 delta: statep 的前四個字節(jié)是計(jì)數(shù)值,因此將 delta 前移 32位 state := atomic.AddUint64(statep, uint64(delta)<<32) // 計(jì)數(shù)值 v := int32(state >> 32) // 等待者數(shù)量 w := uint32(state) // 如果加上 delta 之后,計(jì)數(shù)值變?yōu)樨?fù)數(shù),不合法,panic if v < 0 { panic("sync: negative WaitGroup counter") } // delta > 0 && v == int32(delta) : 表示從 0 開始添加計(jì)數(shù)值 // w!=0 :表示已經(jīng)有了等待者 // 說明在添加計(jì)數(shù)值的時候,同時添加了等待者,非法操作。添加等待者需要在添加計(jì)數(shù)值之后 if w != 0 && delta > 0 && v == int32(delta) { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } // v>0 : 計(jì)數(shù)值不等于0,不需要喚醒等待者,直接返回 // w==0: 沒有等待者,不需要喚醒,直接返回 if v > 0 || w == 0 { return } // 再次檢查數(shù)據(jù)是否一致 if *statep != state { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } // 到這里說明計(jì)數(shù)值為0,且等待者大于0,需要喚醒所有的等待者,并把系統(tǒng)置為初始狀態(tài)(0狀態(tài)) // 將計(jì)數(shù)值和等待者數(shù)量都置為0 *statep = 0 // 喚醒等待者 for ; w != 0; w-- { runtime_Semrelease(semap, false, 0) } }
Done
// 完成一個任務(wù),將計(jì)數(shù)值減一,當(dāng)計(jì)數(shù)值減為0時,需要喚醒所有的等待者 func (wg *WaitGroup) Done() { wg.Add(-1) }
Wait
// 調(diào)用 Wait 方法會被阻塞,直到 計(jì)數(shù)值 變?yōu)? func (wg *WaitGroup) Wait() { // 獲取計(jì)數(shù)、等待數(shù)和信號量 statep, semap := wg.state() for { state := atomic.LoadUint64(statep) // 計(jì)數(shù)值 v := int32(state >> 32) // 等待者數(shù)量 w := uint32(state) // 計(jì)數(shù)值數(shù)量為0,直接返回,無需等待 if v == 0 { return } // 到這里說明計(jì)數(shù)值數(shù)量大于0 // 增加等待者數(shù)量:這里會有競爭,比如多個 Wait 調(diào)用,或者在同時調(diào)用 Add 方法,增加不成功會繼續(xù) for 循環(huán) if atomic.CompareAndSwapUint64(statep, state, state+1) { // 增加成功后,阻塞在信號量這里,等待被喚醒 runtime_Semacquire(semap) // 被喚醒的時候,應(yīng)該是0狀態(tài)。如果重用 WaitGroup,需要等 Wait 返回 if *statep != 0 { panic("sync: WaitGroup is reused before previous Wait has returned") } return } } }
易錯點(diǎn)
上面分析源碼可以看到幾個會產(chǎn)生?panic
?的點(diǎn),這也是我們使用?WaitGroup
?需要注意的地方
1.計(jì)數(shù)值變?yōu)樨?fù)數(shù)
調(diào)用 Add 時參數(shù)值傳負(fù)數(shù)
func main() { var wg sync.WaitGroup wg.Add(1) wg.Add(-1) wg.Add(-1) }
多次調(diào)用 Done 方法
func main() { var wg sync.WaitGroup wg.Add(1) go func() { fmt.Println("test") wg.Done() wg.Done() }() time.Sleep(time.Second) wg.Wait() }
2.Add 和 Wait 并發(fā)調(diào)用
Add
?和?Wait
?并發(fā)調(diào)用,有可能達(dá)不到我們預(yù)期的效果,甚至?panic
。如下示例中,我們想要等待 3 個子任務(wù)都執(zhí)行完后再執(zhí)行主任務(wù),但實(shí)際情況可能是子任務(wù)還沒起來,主任務(wù)就繼續(xù)往下執(zhí)行了。
func doSomething(wg *sync.WaitGroup) { wg.Add(1) fmt.Println("do something") defer wg.Done() } func main() { var wg sync.WaitGroup for i := 0; i < 3; i++ { go doSomething(&wg) } wg.Wait() fmt.Println("main") } //main //do something //do something
正確的使用方式,應(yīng)該是在調(diào)用?Wait
?前先調(diào)用?Add
func doSomething(wg *sync.WaitGroup) { defer wg.Done() fmt.Println("do something") } func main() { var wg sync.WaitGroup wg.Add(3) for i := 0; i < 3; i++ { go doSomething(&wg) } wg.Wait() fmt.Println("main") } //do something //do something //do something //main
3.沒有等 Wait 返回,就重用 WaitGroup
func main() { var wg sync.WaitGroup wg.Add(1) go func() { fmt.Println("do something") wg.Done() wg.Add(1) }() wg.Wait() }
4.復(fù)制使用
我們知道 Go 語言中的參數(shù)傳遞,都是值傳遞,就會產(chǎn)生復(fù)制操作。因此在向函數(shù)傳遞 WaitGroup 時,使用指針進(jìn)行操作。
// 錯誤使用方式,沒有使用指針 func doSomething(wg sync.WaitGroup) { fmt.Println("do something") defer wg.Done() } func main() { var wg sync.WaitGroup wg.Add(3) for i := 0; i < 3; i++ { // 這里沒使用指針,wg狀態(tài)一直不會改變,導(dǎo)致 Wait 一直阻塞 go doSomething(wg) } wg.Wait() fmt.Println("main") }
總結(jié)
我們通過源碼+示例的方式,一起學(xué)習(xí)了?sync.WaitGroup
?實(shí)現(xiàn)邏輯,同時也給出了一些注意點(diǎn),只要做到如下操作,就不會出現(xiàn)問題:
- 保證 Add 在 Wait 前調(diào)用
- Add 中不傳遞負(fù)數(shù)
- 任務(wù)完成后不要忘記調(diào)用 Done 方法,建議使用 defer wg.Done()
- 不要復(fù)制使用 WaitGroup,函數(shù)傳遞時使用指針傳遞
- 盡量不復(fù)用 WaigGroup,減少出問題的風(fēng)險
原文鏈接:https://segmentfault.com/a/1190000041968136
相關(guān)推薦
- 2022-03-10 Android如何獲取APP啟動時間_Android
- 2022-12-21 Python?eval()和exec()函數(shù)使用詳解_python
- 2022-02-09 linux下源碼包安裝的服務(wù)管理_Linux
- 2022-06-29 pytorch部署到j(luò)upyter中的問題及解決方案_python
- 2022-07-18 properties配置文件解耦合連接數(shù)據(jù)庫的原理
- 2022-12-10 深入了解C++11中promise和future的使用_C 語言
- 2023-02-23 GoLang的sync.WaitGroup與sync.Once簡單使用講解_Golang
- 2022-07-03 如何讓Python在HTML中運(yùn)行_python
- 最近更新
-
- window11 系統(tǒng)安裝 yarn
- 超詳細(xì)win安裝深度學(xué)習(xí)環(huán)境2025年最新版(
- Linux 中運(yùn)行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎(chǔ)操作-- 運(yùn)算符,流程控制 Flo
- 1. Int 和Integer 的區(qū)別,Jav
- spring @retryable不生效的一種
- Spring Security之認(rèn)證信息的處理
- Spring Security之認(rèn)證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權(quán)
- redisson分布式鎖中waittime的設(shè)
- maven:解決release錯誤:Artif
- restTemplate使用總結(jié)
- Spring Security之安全異常處理
- MybatisPlus優(yōu)雅實(shí)現(xiàn)加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務(wù)發(fā)現(xiàn)-Nac
- Spring Security之基于HttpR
- Redis 底層數(shù)據(jù)結(jié)構(gòu)-簡單動態(tài)字符串(SD
- arthas操作spring被代理目標(biāo)對象命令
- Spring中的單例模式應(yīng)用詳解
- 聊聊消息隊(duì)列,發(fā)送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠(yuǎn)程分支