日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學無先后,達者為師

網站首頁 編程語言 正文

Golang?Mutex互斥鎖深入理解_Golang

作者:酒紅 ? 更新時間: 2022-09-27 編程語言

引言

Golang的并發編程令人著迷,使用輕量的協程、基于CSP的channel、簡單的go func()就可以開始并發編程,在并發編程中,往往離不開鎖的概念。

本文介紹了常用的同步原語 sync.Mutex,同時從源碼剖析它的結構與實現原理,最后簡單介紹了mutex在日常使用中可能遇到的問題,希望大家讀有所獲。

Mutex結構

Mutex運行時數據結構位于sync/mutex.go

type Mutex struct {
   state int32
   sema  uint32
}

其中state表示當前互斥鎖的狀態,sema表示 控制鎖狀態的信號量.

互斥鎖的狀態定義在常量中:

const (
   mutexLocked = 1 << iota // 1 ,處于鎖定狀態; 2^0
   mutexWoken // 2 ;從正常模式被從喚醒;  2^1
   mutexStarving // 4 ;處于饑餓狀態;    2^2
   mutexWaiterShift = iota // 3 ;獲得互斥鎖上等待的Goroutine個數需要左移的位數: 1 << mutexWaiterShift
   starvationThresholdNs = 1e6 // 鎖進入饑餓狀態的等待時間
)

0即其他狀態。

sema是一個組合,低三位分別表示鎖的三種狀態,高29位表示正在等待互斥鎖釋放的gorountine個數,和Java表示線程池狀態那部分有點類似

一個mutex對象僅占用8個字節,讓人不禁感嘆其設計的巧妙

饑餓模式和正常模式

正常模式

在正常模式下,等待的協程會按照先進先出的順序得到鎖 在正常模式下,剛被喚醒的goroutine與新創建的goroutine競爭時,大概率無法獲得鎖。

饑餓模式

為了避免正常模式下,goroutine被“餓死”的情況,go在1.19版本引入了饑餓模式,保證了Mutex的公平性

在饑餓模式中,互斥鎖會直接交給等待隊列最前面的goroutine。新的goroutine 在該狀態下不能獲取鎖、也不會進入自旋狀態,它們只會在隊列的末尾等待。

狀態的切換

在正常模式下,一旦Goroutine超過1ms沒有獲取到鎖,它就會將當前互斥鎖切換饑餓模式

如果一個goroutine 獲得了互斥鎖并且它在隊列的末尾或者它等待的時間少于 1ms,那么當前的互斥鎖就會切換回正常模式。

加鎖和解鎖

加鎖

func (m *Mutex) Lock() {
   // Fast path: grab unlocked mutex.
   if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
      return
   }
   // 原注釋: Slow path (outlined so that the fast path can be inlined)
   // 將
   m.lockSlow()
}

可以看到,當前互斥鎖的狀態為0時,嘗試將當前鎖狀態設置為更新鎖定狀態,且這些操作是原子的。

若當前狀態不為0,則進入lockSlow方法
先定義了幾個參數

var waitStartTime int64
starving := false // 
awoke := false
iter := 0
old := m.state

隨后進入一個很大的for循環,讓我們來逐步分析

自旋

for {
     // 1 && 2 
   if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
      //  3. 
      if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
         atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
         awoke = true
      }
      runtime_doSpin()
      iter++
      old = m.state
      continue
   }

old&(mutexLocked|mutexStarving) == mutexLocked

當且僅當當前鎖狀態為mutexLocked時,表達式為true

runtime_canSpin(iter) 是否滿足自旋條件

  • 運行在擁有多個CPU的機器上;
  • 當前Goroutine為了獲取該鎖進入自旋的次數小于四次;
  • 當前機器上至少存在一個正在運行的處理器 P,并且處理的運行隊列為空;

如果當前狀態下自旋是合理的,將awoke置為true,同時設置鎖狀態為mutexWoken,進入自旋邏輯

runtime_doSpin()會執行30次PAUSE指令,并且僅占用CPU資源 代碼位于:runtime\asm_amd64.s +567

//go:linkname sync_runtime_doSpin sync.runtime_doSpin
//go:nosplit
func sync_runtime_doSpin() {
   procyield(active_spin_cnt)
}
TEXT runtime·procyield(SB),NOSPLIT,$0-0
    MOVL    cycles+0(FP), AX
again:
    PAUSE 
    SUBL    $1, AX
    JNZ again
    RET

計算鎖的新狀態

停止了自旋后,

new := old
// 1. 
if old&mutexStarving == 0 {
   new |= mutexLocked
}
// 2.
if old&(mutexLocked|mutexStarving) != 0 {
   new += 1 << mutexWaiterShift
}
// 3 && 4. 
if starving && old&mutexLocked != 0 {
   new |= mutexStarving
}
// 5. 
if awoke {
   if new&mutexWoken == 0 {
      throw("sync: inconsistent mutex state")
   }
   new &^= mutexWoken
}
  • old&mutexStarving == 0 表明原來不是饑餓模式。如果是饑餓模式的話,其他goroutine不會執行接下來的代碼,直接進入等待隊列隊尾
  • 如果原來是 mutexLocked 或者 mutexStarving模式,waiterCounts數加一
  • 如果被標記為饑餓狀態,且鎖狀態為mutexLocked的話,設置鎖的新狀態為饑餓狀態。
  • 被標記為饑餓狀態的前提是 被喚醒過且搶鎖失敗
  • 計算新狀態

更新鎖狀態

// 1.
if atomic.CompareAndSwapInt32(&m.state, old, new) {
      if old&(mutexLocked|mutexStarving) == 0 {
         break // locked the mutex with CAS
      }
      // 2. 
      queueLifo := waitStartTime != 0
      if waitStartTime == 0 {
         waitStartTime = runtime_nanotime()
      }
      // 3.
      runtime_SemacquireMutex(&m.sema, queueLifo, 1)
      // 4.
      starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
      old = m.state
      // 5.
      if old&mutexStarving != 0 {
         /
         if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
            throw("sync: inconsistent mutex state")
         }
         delta := int32(mutexLocked - 1<<mutexWaiterShift)
         if !starving || old>>mutexWaiterShift == 1 {
            delta -= mutexStarving
         }
         atomic.AddInt32(&m.state, delta)
         break
      }
      awoke = true
      iter = 0
   } else {
      old = m.state
   }
}
  • 嘗試將鎖狀態設置為new 。這里設置成功不代表上鎖成功,有可能new不為mutexLocked 或者是waiterCount數量的改變
  • waitStartTime不為0 說明當前goroutine已經等待過了,將當前goroutine放到等待隊列的隊頭
  • 走到這里,會調用runtime_SemacquireMutex 方法使當前協程阻塞,runtime_SemacquireMutex方法中會不斷嘗試獲得鎖,并會陷入休眠 等待信號量釋放。
  • 當前協程可以獲得信號量,從runtime_SemacquireMutex方法中返回。此時協程會去更新starving標志位:如果當前starving標志位為true或者等待時間超過starvationThresholdNs ,將starving置為true

之后會按照饑餓模式與正常模式,走不同的邏輯

  • - 在正常模式下,這段代碼會設置喚醒和饑餓標記、重置迭代次數并重新執行獲取鎖的循環; ?
  • - 在饑餓模式下,當前 Goroutine 會獲得互斥鎖,如果等待隊列中只存在當前 Goroutine,互斥鎖還會從饑餓模式中退出;

解鎖

func (m *Mutex) Unlock() {
   // 1.
   new := atomic.AddInt32(&m.state, -mutexLocked)
   if new != 0 {
      // 2. 
      m.unlockSlow(new)
   }
}
  • 將鎖狀態的值增加 -mutexLocked 。如果新狀態不等于0,進入unlockSlow方法
func (m *Mutex) unlockSlow(new int32) {
    // 1. 
   if (new+mutexLocked)&mutexLocked == 0 {
      throw("sync: unlock of unlocked mutex")
   }
   if new&mutexStarving == 0 {
      old := new
      for {
      // 2.
         if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
            return
         }
         // 2.1.
         new = (old - 1<<mutexWaiterShift) | mutexWoken
         if atomic.CompareAndSwapInt32(&m.state, old, new) {
         // 2.2.
            runtime_Semrelease(&m.sema, false, 1)
            return
         }
         old = m.state
      }
   } else {
   // 3.
      runtime_Semrelease(&m.sema, true, 1)
   }
}

1.new+mutexLocked代表將鎖置為1,如果兩個狀態& 不為0,則說明重復解鎖.如果重復解鎖則拋出panic

2. 如果等待者數量等于0,或者鎖的狀態已經變為mutexWoken、mutexStarving、mutexStarving,則直接返回

  • 將waiterCount數量-1,嘗試選擇一個goroutine喚醒
  • 嘗試更新鎖狀態,如果更新鎖狀態成功,則喚醒隊尾的一個gorountine

3. 如果不滿足 2的判斷條件,則進入饑餓模式,同時交出鎖的使用權

可能遇到的問題

鎖拷貝

mu1 := &sync.Mutex{}
mu1.Lock()
mu2 := mu1
mu2.Unlock()

此時mu2能夠正常解鎖,那么我們再試試解鎖mu1

mu1 := &sync.Mutex{}
mu1.Lock()
mu2 := mu1
mu2.Unlock()
mu1.Unlock()

可以看到發生了error

panic導致沒有unlock

當lock()之后,可能由于代碼問題導致程序發生了panic,那么mutex無法被及時unlock(),由于其他協程還在等待鎖,此時可能觸發死鎖

func TestWithLock() {
   nums := 100
   wg := &sync.WaitGroup{}
   safeSlice := SafeSlice{
      s:    []int{},
      lock: new(sync.RWMutex),
   }
   i := 0
   for idx := 0; idx < nums; idx++ { // 并行nums個協程做append
      wg.Add(1)
      go func() {
         defer func() {
            if r := recover(); r != nil {
               log.Println("recover")
            }
            wg.Done()
         }()
         safeSlice.lock.Lock()
         safeSlice.s = append(safeSlice.s, i)
         if i == 98{
            panic("123")
         }
         i++
         safeSlice.lock.Unlock()
      }()
   }
   wg.Wait()
   log.Println(len(safeSlice.s))
}

修改:

func TestWithLock() {
   nums := 100
   wg := &sync.WaitGroup{}
   safeSlice := SafeSlice{
      s:    []int{},
      lock: new(sync.RWMutex),
   }
   i := 0
   for idx := 0; idx < nums; idx++ { // 并行nums個協程做append
      wg.Add(1)
      go func() {
         defer func() {
            if r := recover(); r != nil {
            }
            safeSlice.lock.Unlock()
            wg.Done()
         }()
         safeSlice.lock.Lock()
         safeSlice.s = append(safeSlice.s, i)
         if i == 98{
            panic("123")
         }
         i++
      }()
   }
   wg.Wait()
   log.Println(len(safeSlice.s))
}

原文鏈接:https://juejin.cn/post/7095334586019741704

欄目分類
最近更新