日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學無先后,達者為師

網站首頁 編程語言 正文

go?sync?Waitgroup數據結構實現基本操作詳解_Golang

作者:eleven26 ? 更新時間: 2023-02-09 編程語言

WaitGroup 示例

本文基于 Go 1.19。

go 里面的 WaitGroup 是非常常見的一種并發控制方式,它可以讓我們的代碼等待一組 goroutine 的結束。 比如在主協程中等待幾個子協程去做一些耗時的操作,如發起幾個 HTTP 請求,然后等待它們的結果。

下面的代碼展示了一個 goroutine 等待另外 2 個 goroutine 結束的例子:

func TestWaitgroup(t *testing.T) {
   var wg sync.WaitGroup
   // 計數器 +2
   wg.Add(2)
   go func() {
      sendHttpRequest("https://baidu.com")
      // 計數器 -1
      wg.Done()
   }()
   go func() {
      sendHttpRequest("https://baidu.com")
      // 計數器 -1
      wg.Done()
   }()
   // 阻塞。計數器為 0 的時候,Wait 返回
   wg.Wait()
}
// 發起 HTTP GET 請求
func sendHttpRequest(url string) (string, error) {
   method := "GET"
   client := &http.Client{}
   req, err := http.NewRequest(method, url, nil)
   if err != nil {
      return "", err
   }
   res, err := client.Do(req)
   if err != nil {
      return "", err
   }
   defer res.Body.Close()
   body, err := io.ReadAll(res.Body)
   if err != nil {
      return "", err
   }
   return string(body), err
}

在這個例子中,我們做了如下事情:

  • 定義了一個 WaitGroup 對象 wg,調用 wg.Add(2) 將其計數器 +2
  • 啟動兩個新的 goroutine,在這兩個 goroutine 中,使用 sendHttpRequest 函數發起了一個 HTTP 請求。
  • 在 HTTP 請求返回之后,調用 wg.Done 將計數器 -1
  • 在函數的最后,我們調用了 wg.Wait,這個方法會阻塞,直到 WaitGroup 的計數器的值為 0 才會解除阻塞狀態。

WaitGroup 基本原理

WaitGroup 內部通過一個計數器來統計有多少協程被等待。這個計數器的值在我們啟動 goroutine 之前先寫入(使用 Add 方法), 然后在 goroutine 結束的時候,將這個計數器減 1(使用 Done 方法)。除此之外,在啟動這些 goroutine 的協程中, 會調用 Wait 來進行等待,在 Wait 調用的地方會阻塞,直到 WaitGroup 內部的計數器減到 0。 也就實現了等待一組 goroutine 的目的

背景知識

在操作系統中,有多種實現進程/線程間同步的方式,如:test_and_setcompare_and_swap、互斥鎖等。 除此之外,還有一種是信號量,它的功能類似于互斥鎖,但是它能提供更為高級的方法,以便進程能夠同步活動。

信號量

一個信號量(semaphore)S是一個整型變量,它除了初始化外只能通過兩個標準的原子操作:wait()signal() 來訪問。 操作 wait() 最初稱為 P(荷蘭語 proberen,測試);操作 signal() 最初稱為 V(荷蘭語 verhogen,增加),可按如下來定義 wait()

PV 原語。

wait(S) {
    while (S <= 0)
        ; // 忙等待
    S--;
}

可按如下來定義 signal()

signal(S) {
    S++;
}

wait()signal() 操作中,信號量整數值的修改應不可分割地執行。也就是說,當一個進程修改信號量值時,沒有其他進程能夠同時修改同一信號量的值。

簡單來說,信號量實現的功能是:

  • 當信號量>0 時,表示資源可用,則 wait 會對信號量執行減 1 操作。
  • 當信號量<=0 時,表示資源暫時不可用,獲取信號量時,當前的進程/線程會阻塞,直到信號量為正時被喚醒。

WaitGroup 中的信號量

WaitGroup 中,使用了信號量來實現 goroutine 的阻塞以及喚醒:

  • 在調用 Wait 的地方,goroutine 會陷入阻塞,直到信號量大于等于 0 的時候解除阻塞狀態,得以繼續執行。
  • 在調用 Done 的時候,如果 WaitGroup 內的等待協程的計數器減到 0 的時候,信號量會進行遞增,這樣那些阻塞的協程會進行執行下去。

WaitGroup 數據結構

type WaitGroup struct {
   noCopy noCopy
   // 高 32 位為計數器,低 32 位為等待者數量
   state atomic.Uint64
   sema  uint32
}

noCopy

我們發現,WaitGroup 中有一個字段 noCopy,顧名思義,它的目的是防止復制。 這個字段在運行時是沒有什么影響的,但是我們通過 go vet 可以發現我們對 WaitGroup 的復制。 為什么不能復制呢?因為一旦復制,WaitGroup 內的計數器就不再準確了,比如下面這個例子:

func test(wg sync.WaitGroup) {
   wg.Done()
}
func TestWaitGroup(t *testing.T) {
   var wg sync.WaitGroup
   wg.Add(1)
   test(wg)
   wg.Wait()
}

go 里面的函數參數傳遞是值傳遞。調用 test(wg) 的時候將 WaitGroup 復制了一份。

在這個例子中,程序會永遠阻塞下去,因為 test 中調用 wg.Done() 的時候,只是將 WaitGroup 副本的計數器減去了 1, 而 TestWaitGroup 里面的 WaitGroup 的計數器并沒有發生改變,因此 Wait 會永遠阻塞。

我們如果需要將 WaitGroup 作為參數,請傳遞指針:

func test(wg *sync.WaitGroup) {
   wg.Done()
}

傳遞指針之后,我們在 test 中調用 wg.Done() 修改的就是 TestWaitGroup 里面同一個 WaitGroup。 從而,Wait 方法可以正常返回。

state

WaitGroup 里面的 state 是一個 64 位的 atomic.Uint64 類型,它的高 32 位用來保存 counter(也就是上面說的計數器),低 32 位用來保存 waiter(也就是阻塞在 Wait 上的 goroutine 數量。)

sema

WaitGroup 通過 sema 來記錄信號量:

  • runtime_Semrelease 表示將信號量遞增(對應信號量中的 signal 操作)
  • runtime_Semacquire 表示將信號量遞減(對應信號量中的 wait 操作)

簡單來說,在調用 runtime_Semacquire 的時候 goroutine 會阻塞,而調用 runtime_Semrelease 會喚醒阻塞在同一個信號量上的 goroutine。

WaitGroup 的三個基本操作

  • Add: 這會將 WaitGroup 里面的 counter 加上一個整數(也就是傳遞給 Add 的函數參數)。
  • Done: 這會將 WaitGroup 里面的 counter 減去 1。
  • Wait: 這會將 WaitGroup 里面的 waiter 加上 1,并且調用 Wait 的地方會阻塞。(有可能會有多個 goroutine 等待一個 WaitGroup

WaitGroup 的實現

Add 的實現

Add 做了下面兩件事:

  • delta 加到 state 的高 32 位上
  • 如果 counter0 了,并且 waiter 大于 0,表示所有被等待的 goroutine 都完成了,而還有在等待的 goroutine,這會喚醒那些阻塞在 Wait 上的 goroutine。

源碼實現:

func (wg *WaitGroup) Add(delta int) {
   // wg.state 的計數器加上 delta
   //(加到 state 的高 32 上)
   state := wg.state.Add(uint64(delta) << 32) // 高 32 位加上 delta
   v := int32(state >> 32)                    // 高 32 位(counter)
   w := uint32(state)                         // 低 32 位(waiter)
   // 計數器不能為負數(加上 delta 之后不能為負數,最小只能到 0)
   if v < 0 {
      panic("sync: negative WaitGroup counter")
   }
   // 正常使用情況下,是先調用 Add 再調用 Wait 的,這種情況下,w 是 0,v > 0
   if w != 0 && delta > 0 && v == int32(delta) {
      panic("sync: WaitGroup misuse: Add called concurrently with Wait")
   }
   // v > 0,計數器大于 0
   // w == 0,沒有在 Wait 的協程
   // 說明還沒有到喚醒 waiter 的時候
   if v > 0 || w == 0 {
      return
   }
   // Add 負數的時候,v 會減去對應的數值,減到最后 v 是 0。
   // 計數器是 0,并且有等待的協程,現在要喚醒這些協程。
   // 存在等待的協程時,goroutine 已將計數器設置為0。
   // 現在不可能同時出現狀態突變:
   // - Add 不能與 Wait 同時發生,
   // - 如果看到計數器==0,則 Wait 不會增加等待的協程。
   // 仍然要做一個廉價的健康檢查,以檢測 WaitGroup 的誤用。
   if wg.state.Load() != state { // 不能在 Add 的同時調用 Wait
      panic("sync: WaitGroup misuse: Add called concurrently with Wait")
   }
   // 將等待的協程數量設置為 0。
   wg.state.Store(0)
   for ; w != 0; w-- {
      // signal,調用 Wait 的地方會解除阻塞
      runtime_Semrelease(&wg.sema, false, 0) // goyield
   }
}

Done 的實現

WaitGroup 里的 Done 其實只是對 Add 的調用,但是它的效果是,將計數器的值減去 1。 背后的含義是:一個被等待的協程執行完畢了

Wait 的實現

Wait 主要功能是阻塞當前的協程:

  • Wait 會先判斷計數器是否為 0,為 0 說明沒有任何需要等待的協程,那么就可以直接返回了。
  • 如果計數器還不是 0,說明有協程還沒執行完,那么調用 Wait 的地方就需要被阻塞起來,等待所有的協程完成。

源碼實現:

func (wg *WaitGroup) Wait() {
   for {
      // 獲取當前計數器
      state := wg.state.Load()
      // 計數器
      v := int32(state >> 32)
      // waiter 數量
      w := uint32(state)
      // v 為 0,不需要等待,直接返回
      if v == 0 {
         // 計數器是 0,不需要等待
         return
      }
      // 增加 waiter 數量。
      // 調用一次 Wait,waiter 數量會加 1。
      if wg.state.CompareAndSwap(state, state+1) {
         // 這會阻塞,直到 sema (信號量)大于 0
         runtime_Semacquire(&wg.sema) // goparkunlock
         // state 不等 0
         // wait 還沒有返回又繼續使用了 WaitGroup
         if wg.state.Load() != 0 {
            panic("sync: WaitGroup is reused before previous Wait has returned")
         }
         // 解除阻塞狀態了,可以返回了
         return
      }
      // 狀態沒有修改成功(state 沒有成功 +1),開始下一次嘗試。
   }
}

總結

  • WaitGroup 使用了信號量來實現了并發資源控制,sema 字段表示信號量。
  • 使用 runtime_Semacquire 會使得 goroutine 阻塞直到計數器減少至 0,而使用 runtime_Semrelease 會使得信號量遞增,這等于是通知之前阻塞在信號量上的協程,告訴它們可以繼續執行了。
  • WaitGroup 作為參數傳遞的時候,需要傳遞指針作為參數,否則在被調用函數內對 Add 或者 Done 的調用,在 caller 里面調用的 Wait 會觀測不到。
  • WaitGroup 使用一個 64 位的數來保存計數器(高 32 位)和 waiter(低 32 位,正在等待的協程的數量)。
  • WaitGroup 使用 Add 增加計數器,使用 Done 來將計數器減 1,使用 Wait 來等待 goroutine。Wait 會阻塞直到計數器減少到 0

原文鏈接:https://juejin.cn/post/7181812988461252667

欄目分類
最近更新