日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學無先后,達者為師

網站首頁 編程語言 正文

Golang?WaitGroup實現原理解析_Golang

作者:raoxiaoya ? 更新時間: 2023-04-02 編程語言

原理解析

type WaitGroup struct {
   noCopy noCopy
   // 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
   // 64-bit atomic operations require 64-bit alignment, but 32-bit
   // compilers only guarantee that 64-bit fields are 32-bit aligned.
   // For this reason on 32 bit architectures we need to check in state()
   // if state1 is aligned or not, and dynamically "swap" the field order if
   // needed.
   state1 uint64
   state2 uint32
}

其中 noCopy 是 golang 源碼中檢測禁止拷貝的技術。如果程序中有 WaitGroup 的賦值行為,使用 go vet 檢查程序時,就會發現有報錯。但需要注意的是,noCopy 不會影響程序正常的編譯和運行。

state1字段

  • 高32位為counter,代表目前尚未完成的協程個數。
  • 低32位為waiter,代表目前已調用 Wait 的 goroutine 的個數,因為wait可以被多個協程調用。

state2為信號量。

WaitGroup 的整個調用過程可以簡單地描述成下面這樣:

  • 當調用 WaitGroup.Add(n) 時,counter 將會自增: counter + n
  • 當調用 WaitGroup.Wait() 時,會將 waiter++。同時調用 runtime_Semacquire(semap), 增加信號量,并掛起當前 goroutine。
  • 當調用 WaitGroup.Done() 時,將會 counter--。如果自減后的 counter 等于 0,說明 WaitGroup 的等待過程已經結束,則需要調用 runtime_Semrelease 釋放信號量,喚醒正在 WaitGroup.Wait 的 goroutine。

關于內存對其

func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
	if unsafe.Alignof(wg.state1) == 8 || uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
		// state1 is 64-bit aligned: nothing to do.
		return &wg.state1, &wg.state2
	} else {
		// state1 is 32-bit aligned but not 64-bit aligned: this means that
		// (&state1)+4 is 64-bit aligned.
		state := (*[3]uint32)(unsafe.Pointer(&wg.state1))
		return (*uint64)(unsafe.Pointer(&state[1])), &state[0]
	}
}

如果變量是 64 位對齊 (8 byte), 則該變量的起始地址是 8 的倍數。如果變量是 32 位對齊 (4 byte),則該變量的起始地址是 4 的倍數。

state1 是 32 位的時候,那么state1被當成是一個數組[3]uint32,數組的第一位是semap,第二三位存儲著counter, waiter正好是64位。

為什么會有這種奇怪的設定呢?這里涉及兩個前提:

前提 1:在 WaitGroup 的真實邏輯中, counter 和 waiter 被合在了一起,當成一個 64 位的整數對外使用。當需要變化 counter 和 waiter 的值的時候,也是通過 atomic 來原子操作這個 64 位整數。

前提 2:在 32 位系統下,如果使用 atomic 對 64 位變量進行原子操作,調用者需要自行保證變量的 64 位對齊,否則將會出現異常。golang 的官方文檔 sync/atomic/#pkg-note-BUG 原文是這么說的:

On ARM, x86-32, and 32-bit MIPS, it is the caller’s responsibility to arrange for 64-bit alignment of 64-bit words accessed atomically. The first word in a variable or in an allocated struct, array, or slice can be relied upon to be 64-bit aligned.

因此,在前提 1 的情況下,WaitGroup 需要對 64 位進行原子操作。根據前提 2,WaitGroup 需要自行保證 count+waiter 的 64 位對齊。

這個方法非常的巧妙,只不過是改變 semap 的位置順序,就既可以保證 counter+waiter 一定會 64 位對齊,也可以保證內存的高效利用。

注: 有些文章會講到,WaitGroup 兩種不同的內存布局方式是 32 位系統和 64 位系統的區別,這其實不太嚴謹。準確的說法是 32 位對齊和 64 位對齊的區別。因為在 32 位系統下,state1 變量也有可能恰好符合 64 位對齊。

sync.mutex的源碼中就沒有出現內存對其的操作,雖然它也有大量的atomic操作,那是因為state int32

sync.mutex中也是將四個狀態存在一個變量地址,其實這么做的目的就是為了實現原子操作,因為沒有辦法同時修改多個變量還要保證原子性。

WaitGroup 直接把 counterwaiter 看成了一個統一的 64 位變量。其中 counter 是這個變量的高 32 位,waiter 是這個變量的低 32 位。 在需要改變 counter 時, 通過將累加值左移 32 位的方式。

這里的原子操作并沒有使用Mutex或者RWMutex這樣的鎖,主要是因為鎖會帶來不小的性能損耗,存在上下文切換,而對于單個內存地址的原子操作最好的方式是atomic,因為這是由底層硬件提供的支持(CPU指令),粒度更小,性能更高。

源碼部分

func (wg *WaitGroup) Add(delta int) {
    // wg.state()返回的是地址
	statep, semap := wg.state()
    // 原子操作,修改statep高32位的值,即counter的值
	state := atomic.AddUint64(statep, uint64(delta)<<32)
    // 右移32位,使高32位變成了低32,得到counter的值
	v := int32(state >> 32)
    // 直接取低32位,得到waiter的值
	w := uint32(state)
	// 不規范的操作
	if v < 0 {
		panic("sync: negative WaitGroup counter")
	}
    // 不規范的操作
	if w != 0 && delta > 0 && v == int32(delta) {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}
    // 這是正常的情況
	if v > 0 || w == 0 {
		return
	}
    // 剩下的就是 counter == 0 且 waiter != 0 的情況
    // 在這個情況下,*statep 的值就是 waiter 的值,否則就有問題
    // 在這個情況下,所有的任務都已經完成,可以將 *statep 整個置0
    // 同時向所有的Waiter釋放信號量
	// This goroutine has set counter to 0 when waiters > 0.
	// Now there can't be concurrent mutations of state:
	// - Adds must not happen concurrently with Wait,
	// - Wait does not increment waiters if it sees counter == 0.
	// Still do a cheap sanity check to detect WaitGroup misuse.
	if *statep != state {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}
	// Reset waiters count to 0.
	*statep = 0
	for ; w != 0; w-- {
		runtime_Semrelease(semap, false, 0)
	}
}
func (wg *WaitGroup) Done() {
	wg.Add(-1)
}
func (wg *WaitGroup) Wait() {
    // wg.state()返回的是地址
	statep, semap := wg.state()
    // for循環是配合CAS操作
	for {
		state := atomic.LoadUint64(statep)
		v := int32(state >> 32) // counter
		w := uint32(state) // waiter
        // 如果counter為0,說明所有的任務在調用Wait的時候就已經完成了,直接退出
        // 這就要求,必須在同步的情況下調用Add(),否則Wait可能先退出了
		if v == 0 {
			return
		}
		// waiter++,原子操作
		if atomic.CompareAndSwapUint64(statep, state, state+1) {
            // 如果自增成功,則獲取信號量,此處信號量起到了同步的作用
			runtime_Semacquire(semap)
			return
		}
	}
}

總結一下,WaitGroup 的原理就五個點:內存對齊,原子操作,counter,waiter,信號量。

  • 內存對齊的作用是為了原子操作。
  • counter的增減使用原子操作,counter的作用是一旦為0就釋放全部信號量。
  • waiter的自增使用原子操作,waiter的作用是表明要釋放多少信號量。

原文鏈接:https://blog.csdn.net/raoxiaoya/article/details/125632687

欄目分類
最近更新