網站首頁 編程語言 正文
WaitGroup 示例
本文基于 Go 1.19。
go 里面的 WaitGroup
是非常常見的一種并發控制方式,它可以讓我們的代碼等待一組 goroutine 的結束。 比如在主協程中等待幾個子協程去做一些耗時的操作,如發起幾個 HTTP 請求,然后等待它們的結果。
下面的代碼展示了一個 goroutine 等待另外 2 個 goroutine 結束的例子:
func TestWaitgroup(t *testing.T) { var wg sync.WaitGroup // 計數器 +2 wg.Add(2) go func() { sendHttpRequest("https://baidu.com") // 計數器 -1 wg.Done() }() go func() { sendHttpRequest("https://baidu.com") // 計數器 -1 wg.Done() }() // 阻塞。計數器為 0 的時候,Wait 返回 wg.Wait() } // 發起 HTTP GET 請求 func sendHttpRequest(url string) (string, error) { method := "GET" client := &http.Client{} req, err := http.NewRequest(method, url, nil) if err != nil { return "", err } res, err := client.Do(req) if err != nil { return "", err } defer res.Body.Close() body, err := io.ReadAll(res.Body) if err != nil { return "", err } return string(body), err }
在這個例子中,我們做了如下事情:
- 定義了一個
WaitGroup
對象wg
,調用wg.Add(2)
將其計數器+2
。 - 啟動兩個新的 goroutine,在這兩個 goroutine 中,使用
sendHttpRequest
函數發起了一個 HTTP 請求。 - 在 HTTP 請求返回之后,調用
wg.Done
將計數器-1
。 - 在函數的最后,我們調用了
wg.Wait
,這個方法會阻塞,直到WaitGroup
的計數器的值為 0 才會解除阻塞狀態。
WaitGroup 基本原理
WaitGroup
內部通過一個計數器來統計有多少協程被等待。這個計數器的值在我們啟動 goroutine 之前先寫入(使用 Add
方法), 然后在 goroutine 結束的時候,將這個計數器減 1(使用 Done
方法)。除此之外,在啟動這些 goroutine 的協程中, 會調用 Wait
來進行等待,在 Wait
調用的地方會阻塞,直到 WaitGroup
內部的計數器減到 0。 也就實現了等待一組 goroutine 的目的
背景知識
在操作系統中,有多種實現進程/線程間同步的方式,如:test_and_set
、compare_and_swap
、互斥鎖等。 除此之外,還有一種是信號量,它的功能類似于互斥鎖,但是它能提供更為高級的方法,以便進程能夠同步活動。
信號量
一個信號量(semaphore)S是一個整型變量,它除了初始化外只能通過兩個標準的原子操作:wait()
和 signal()
來訪問。 操作 wait()
最初稱為 P
(荷蘭語 proberen
,測試);操作 signal()
最初稱為 V
(荷蘭語 verhogen
,增加),可按如下來定義 wait()
:
PV 原語。
wait(S) { while (S <= 0) ; // 忙等待 S--; }
可按如下來定義 signal()
:
signal(S) { S++; }
在 wait()
和 signal()
操作中,信號量整數值的修改應不可分割地執行。也就是說,當一個進程修改信號量值時,沒有其他進程能夠同時修改同一信號量的值。
簡單來說,信號量實現的功能是:
- 當信號量>0 時,表示資源可用,則
wait
會對信號量執行減 1 操作。 - 當信號量<=0 時,表示資源暫時不可用,獲取信號量時,當前的進程/線程會阻塞,直到信號量為正時被喚醒。
WaitGroup 中的信號量
在 WaitGroup
中,使用了信號量來實現 goroutine 的阻塞以及喚醒:
- 在調用
Wait
的地方,goroutine 會陷入阻塞,直到信號量大于等于 0 的時候解除阻塞狀態,得以繼續執行。 - 在調用
Done
的時候,如果WaitGroup
內的等待協程的計數器減到 0 的時候,信號量會進行遞增,這樣那些阻塞的協程會進行執行下去。
WaitGroup 數據結構
type WaitGroup struct { noCopy noCopy // 高 32 位為計數器,低 32 位為等待者數量 state atomic.Uint64 sema uint32 }
noCopy
我們發現,WaitGroup
中有一個字段 noCopy
,顧名思義,它的目的是防止復制。 這個字段在運行時是沒有什么影響的,但是我們通過 go vet
可以發現我們對 WaitGroup
的復制。 為什么不能復制呢?因為一旦復制,WaitGroup
內的計數器就不再準確了,比如下面這個例子:
func test(wg sync.WaitGroup) { wg.Done() } func TestWaitGroup(t *testing.T) { var wg sync.WaitGroup wg.Add(1) test(wg) wg.Wait() }
go 里面的函數參數傳遞是值傳遞。調用 test(wg) 的時候將 WaitGroup
復制了一份。
在這個例子中,程序會永遠阻塞下去,因為 test
中調用 wg.Done()
的時候,只是將 WaitGroup
副本的計數器減去了 1, 而 TestWaitGroup
里面的 WaitGroup
的計數器并沒有發生改變,因此 Wait
會永遠阻塞。
我們如果需要將 WaitGroup
作為參數,請傳遞指針:
func test(wg *sync.WaitGroup) { wg.Done() }
傳遞指針之后,我們在 test
中調用 wg.Done()
修改的就是 TestWaitGroup
里面同一個 WaitGroup
。 從而,Wait
方法可以正常返回。
state
WaitGroup
里面的 state
是一個 64 位的 atomic.Uint64
類型,它的高 32 位用來保存 counter
(也就是上面說的計數器),低 32 位用來保存 waiter
(也就是阻塞在 Wait
上的 goroutine 數量。)
sema
WaitGroup
通過 sema
來記錄信號量:
-
runtime_Semrelease
表示將信號量遞增(對應信號量中的signal
操作) -
runtime_Semacquire
表示將信號量遞減(對應信號量中的wait
操作)
簡單來說,在調用 runtime_Semacquire
的時候 goroutine 會阻塞,而調用 runtime_Semrelease
會喚醒阻塞在同一個信號量上的 goroutine。
WaitGroup 的三個基本操作
-
Add
: 這會將WaitGroup
里面的counter
加上一個整數(也就是傳遞給Add
的函數參數)。 -
Done
: 這會將WaitGroup
里面的counter
減去 1。 -
Wait
: 這會將WaitGroup
里面的waiter
加上 1,并且調用Wait
的地方會阻塞。(有可能會有多個 goroutine 等待一個WaitGroup
)
WaitGroup 的實現
Add 的實現
Add
做了下面兩件事:
- 將
delta
加到state
的高 32 位上 - 如果
counter
為0
了,并且waiter
大于 0,表示所有被等待的 goroutine 都完成了,而還有在等待的 goroutine,這會喚醒那些阻塞在Wait
上的 goroutine。
源碼實現:
func (wg *WaitGroup) Add(delta int) { // wg.state 的計數器加上 delta //(加到 state 的高 32 上) state := wg.state.Add(uint64(delta) << 32) // 高 32 位加上 delta v := int32(state >> 32) // 高 32 位(counter) w := uint32(state) // 低 32 位(waiter) // 計數器不能為負數(加上 delta 之后不能為負數,最小只能到 0) if v < 0 { panic("sync: negative WaitGroup counter") } // 正常使用情況下,是先調用 Add 再調用 Wait 的,這種情況下,w 是 0,v > 0 if w != 0 && delta > 0 && v == int32(delta) { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } // v > 0,計數器大于 0 // w == 0,沒有在 Wait 的協程 // 說明還沒有到喚醒 waiter 的時候 if v > 0 || w == 0 { return } // Add 負數的時候,v 會減去對應的數值,減到最后 v 是 0。 // 計數器是 0,并且有等待的協程,現在要喚醒這些協程。 // 存在等待的協程時,goroutine 已將計數器設置為0。 // 現在不可能同時出現狀態突變: // - Add 不能與 Wait 同時發生, // - 如果看到計數器==0,則 Wait 不會增加等待的協程。 // 仍然要做一個廉價的健康檢查,以檢測 WaitGroup 的誤用。 if wg.state.Load() != state { // 不能在 Add 的同時調用 Wait panic("sync: WaitGroup misuse: Add called concurrently with Wait") } // 將等待的協程數量設置為 0。 wg.state.Store(0) for ; w != 0; w-- { // signal,調用 Wait 的地方會解除阻塞 runtime_Semrelease(&wg.sema, false, 0) // goyield } }
Done 的實現
WaitGroup
里的 Done
其實只是對 Add
的調用,但是它的效果是,將計數器的值減去 1。 背后的含義是:一個被等待的協程執行完畢了。
Wait 的實現
Wait
主要功能是阻塞當前的協程:
-
Wait
會先判斷計數器是否為0
,為0
說明沒有任何需要等待的協程,那么就可以直接返回了。 - 如果計數器還不是
0
,說明有協程還沒執行完,那么調用Wait
的地方就需要被阻塞起來,等待所有的協程完成。
源碼實現:
func (wg *WaitGroup) Wait() { for { // 獲取當前計數器 state := wg.state.Load() // 計數器 v := int32(state >> 32) // waiter 數量 w := uint32(state) // v 為 0,不需要等待,直接返回 if v == 0 { // 計數器是 0,不需要等待 return } // 增加 waiter 數量。 // 調用一次 Wait,waiter 數量會加 1。 if wg.state.CompareAndSwap(state, state+1) { // 這會阻塞,直到 sema (信號量)大于 0 runtime_Semacquire(&wg.sema) // goparkunlock // state 不等 0 // wait 還沒有返回又繼續使用了 WaitGroup if wg.state.Load() != 0 { panic("sync: WaitGroup is reused before previous Wait has returned") } // 解除阻塞狀態了,可以返回了 return } // 狀態沒有修改成功(state 沒有成功 +1),開始下一次嘗試。 } }
總結
-
WaitGroup
使用了信號量來實現了并發資源控制,sema
字段表示信號量。 - 使用
runtime_Semacquire
會使得 goroutine 阻塞直到計數器減少至0
,而使用runtime_Semrelease
會使得信號量遞增,這等于是通知之前阻塞在信號量上的協程,告訴它們可以繼續執行了。 -
WaitGroup
作為參數傳遞的時候,需要傳遞指針作為參數,否則在被調用函數內對Add
或者Done
的調用,在caller
里面調用的Wait
會觀測不到。 -
WaitGroup
使用一個 64 位的數來保存計數器(高 32 位)和waiter
(低 32 位,正在等待的協程的數量)。 -
WaitGroup
使用Add
增加計數器,使用Done
來將計數器減1
,使用Wait
來等待 goroutine。Wait
會阻塞直到計數器減少到0
。
原文鏈接:https://juejin.cn/post/7181812988461252667
相關推薦
- 2021-10-09 C#?將Excel轉為PDF時自定義表格紙張大小的代碼思路_C#教程
- 2022-08-19 Python運行時修改業務SQL代碼_python
- 2022-10-05 Python實現單例模式的五種寫法總結_python
- 2022-12-08 python?datetime?和時間戳互相轉換問題_python
- 2022-08-22 C++貪心算法處理多機調度問題詳解_C 語言
- 2022-07-31 Android?中的類文件和類加載器詳情_Android
- 2022-04-27 Python線程之線程安全的隊列Queue_python
- 2022-04-01 17條提高工作效率的Python技巧分享_python
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支