網站首頁 編程語言 正文
引言
Golang的并發編程令人著迷,使用輕量的協程、基于CSP的channel、簡單的go func()就可以開始并發編程,在并發編程中,往往離不開鎖的概念。
本文介紹了常用的同步原語 sync.Mutex
,同時從源碼剖析它的結構與實現原理,最后簡單介紹了mutex在日常使用中可能遇到的問題,希望大家讀有所獲。
Mutex結構
Mutex運行時數據結構位于sync/mutex.go
包
type Mutex struct { state int32 sema uint32 }
其中state
表示當前互斥鎖的狀態,sema
表示 控制鎖狀態的信號量.
互斥鎖的狀態定義在常量中:
const ( mutexLocked = 1 << iota // 1 ,處于鎖定狀態; 2^0 mutexWoken // 2 ;從正常模式被從喚醒; 2^1 mutexStarving // 4 ;處于饑餓狀態; 2^2 mutexWaiterShift = iota // 3 ;獲得互斥鎖上等待的Goroutine個數需要左移的位數: 1 << mutexWaiterShift starvationThresholdNs = 1e6 // 鎖進入饑餓狀態的等待時間 )
0即其他狀態。
sema
是一個組合,低三位分別表示鎖的三種狀態,高29位表示正在等待互斥鎖釋放的gorountine個數,和Java表示線程池狀態那部分有點類似
一個mutex對象僅占用8個字節,讓人不禁感嘆其設計的巧妙
饑餓模式和正常模式
正常模式
在正常模式下,等待的協程會按照先進先出的順序得到鎖 在正常模式下,剛被喚醒的goroutine與新創建的goroutine競爭時,大概率無法獲得鎖。
饑餓模式
為了避免正常模式下,goroutine被“餓死”的情況,go在1.19版本引入了饑餓模式,保證了Mutex的公平性
在饑餓模式中,互斥鎖會直接交給等待隊列最前面的goroutine。新的goroutine 在該狀態下不能獲取鎖、也不會進入自旋狀態,它們只會在隊列的末尾等待。
狀態的切換
在正常模式下,一旦Goroutine超過1ms沒有獲取到鎖,它就會將當前互斥鎖切換饑餓模式
如果一個goroutine 獲得了互斥鎖并且它在隊列的末尾或者它等待的時間少于 1ms,那么當前的互斥鎖就會切換回正常模式。
加鎖和解鎖
加鎖
func (m *Mutex) Lock() { // Fast path: grab unlocked mutex. if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { return } // 原注釋: Slow path (outlined so that the fast path can be inlined) // 將 m.lockSlow() }
可以看到,當前互斥鎖的狀態為0時,嘗試將當前鎖狀態設置為更新鎖定狀態,且這些操作是原子的。
若當前狀態不為0,則進入lockSlow
方法
先定義了幾個參數
var waitStartTime int64 starving := false // awoke := false iter := 0 old := m.state
隨后進入一個很大的for循環,讓我們來逐步分析
自旋
for { // 1 && 2 if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { // 3. if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true } runtime_doSpin() iter++ old = m.state continue }
old&(mutexLocked|mutexStarving) == mutexLocked
當且僅當當前鎖狀態為mutexLocked時,表達式為true
runtime_canSpin(iter)
是否滿足自旋條件
- 運行在擁有多個CPU的機器上;
- 當前Goroutine為了獲取該鎖進入自旋的次數小于四次;
- 當前機器上至少存在一個正在運行的處理器 P,并且處理的運行隊列為空;
如果當前狀態下自旋是合理的,將awoke
置為true,同時設置鎖狀態為mutexWoken
,進入自旋邏輯
runtime_doSpin()
會執行30次PAUSE
指令,并且僅占用CPU資源 代碼位于:runtime\asm_amd64.s +567
//go:linkname sync_runtime_doSpin sync.runtime_doSpin //go:nosplit func sync_runtime_doSpin() { procyield(active_spin_cnt) }
TEXT runtime·procyield(SB),NOSPLIT,$0-0 MOVL cycles+0(FP), AX again: PAUSE SUBL $1, AX JNZ again RET
計算鎖的新狀態
停止了自旋后,
new := old // 1. if old&mutexStarving == 0 { new |= mutexLocked } // 2. if old&(mutexLocked|mutexStarving) != 0 { new += 1 << mutexWaiterShift } // 3 && 4. if starving && old&mutexLocked != 0 { new |= mutexStarving } // 5. if awoke { if new&mutexWoken == 0 { throw("sync: inconsistent mutex state") } new &^= mutexWoken }
-
old&mutexStarving == 0
表明原來不是饑餓模式。如果是饑餓模式的話,其他goroutine不會執行接下來的代碼,直接進入等待隊列隊尾 - 如果原來是
mutexLocked
或者mutexStarving
模式,waiterCounts數加一 - 如果被標記為饑餓狀態,且鎖狀態為
mutexLocked
的話,設置鎖的新狀態為饑餓狀態。 - 被標記為饑餓狀態的前提是
被喚醒過且搶鎖失敗
- 計算新狀態
更新鎖狀態
// 1. if atomic.CompareAndSwapInt32(&m.state, old, new) { if old&(mutexLocked|mutexStarving) == 0 { break // locked the mutex with CAS } // 2. queueLifo := waitStartTime != 0 if waitStartTime == 0 { waitStartTime = runtime_nanotime() } // 3. runtime_SemacquireMutex(&m.sema, queueLifo, 1) // 4. starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs old = m.state // 5. if old&mutexStarving != 0 { / if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { throw("sync: inconsistent mutex state") } delta := int32(mutexLocked - 1<<mutexWaiterShift) if !starving || old>>mutexWaiterShift == 1 { delta -= mutexStarving } atomic.AddInt32(&m.state, delta) break } awoke = true iter = 0 } else { old = m.state } }
- 嘗試將鎖狀態設置為new 。這里設置成功不代表上鎖成功,有可能new不為
mutexLocked
或者是waiterCount數量的改變 -
waitStartTime
不為0 說明當前goroutine已經等待過了,將當前goroutine放到等待隊列的隊頭 - 走到這里,會調用
runtime_SemacquireMutex
方法使當前協程阻塞,runtime_SemacquireMutex
方法中會不斷嘗試獲得鎖,并會陷入休眠 等待信號量釋放。 - 當前協程可以獲得信號量,從
runtime_SemacquireMutex
方法中返回。此時協程會去更新starving
標志位:如果當前starving
標志位為true或者等待時間超過starvationThresholdNs
,將starving
置為true
之后會按照饑餓模式與正常模式,走不同的邏輯
- - 在正常模式下,這段代碼會設置喚醒和饑餓標記、重置迭代次數并重新執行獲取鎖的循環; ?
- - 在饑餓模式下,當前 Goroutine 會獲得互斥鎖,如果等待隊列中只存在當前 Goroutine,互斥鎖還會從饑餓模式中退出;
解鎖
func (m *Mutex) Unlock() { // 1. new := atomic.AddInt32(&m.state, -mutexLocked) if new != 0 { // 2. m.unlockSlow(new) } }
- 將鎖狀態的值增加 -mutexLocked 。如果新狀態不等于0,進入
unlockSlow
方法
func (m *Mutex) unlockSlow(new int32) { // 1. if (new+mutexLocked)&mutexLocked == 0 { throw("sync: unlock of unlocked mutex") } if new&mutexStarving == 0 { old := new for { // 2. if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { return } // 2.1. new = (old - 1<<mutexWaiterShift) | mutexWoken if atomic.CompareAndSwapInt32(&m.state, old, new) { // 2.2. runtime_Semrelease(&m.sema, false, 1) return } old = m.state } } else { // 3. runtime_Semrelease(&m.sema, true, 1) } }
1.new+mutexLocked
代表將鎖置為1,如果兩個狀態& 不為0,則說明重復解鎖.如果重復解鎖則拋出panic
2. 如果等待者數量等于0,或者鎖的狀態已經變為mutexWoken、mutexStarving、mutexStarving,則直接返回
- 將waiterCount數量-1,嘗試選擇一個goroutine喚醒
- 嘗試更新鎖狀態,如果更新鎖狀態成功,則喚醒隊尾的一個gorountine
3. 如果不滿足 2
的判斷條件,則進入饑餓模式,同時交出鎖的使用權
可能遇到的問題
鎖拷貝
mu1 := &sync.Mutex{} mu1.Lock() mu2 := mu1 mu2.Unlock()
此時mu2
能夠正常解鎖,那么我們再試試解鎖mu1
呢
mu1 := &sync.Mutex{} mu1.Lock() mu2 := mu1 mu2.Unlock() mu1.Unlock()
可以看到發生了error
panic導致沒有unlock
當lock()之后,可能由于代碼問題導致程序發生了panic,那么mutex無法被及時unlock(),由于其他協程還在等待鎖,此時可能觸發死鎖
func TestWithLock() { nums := 100 wg := &sync.WaitGroup{} safeSlice := SafeSlice{ s: []int{}, lock: new(sync.RWMutex), } i := 0 for idx := 0; idx < nums; idx++ { // 并行nums個協程做append wg.Add(1) go func() { defer func() { if r := recover(); r != nil { log.Println("recover") } wg.Done() }() safeSlice.lock.Lock() safeSlice.s = append(safeSlice.s, i) if i == 98{ panic("123") } i++ safeSlice.lock.Unlock() }() } wg.Wait() log.Println(len(safeSlice.s)) }
修改:
func TestWithLock() { nums := 100 wg := &sync.WaitGroup{} safeSlice := SafeSlice{ s: []int{}, lock: new(sync.RWMutex), } i := 0 for idx := 0; idx < nums; idx++ { // 并行nums個協程做append wg.Add(1) go func() { defer func() { if r := recover(); r != nil { } safeSlice.lock.Unlock() wg.Done() }() safeSlice.lock.Lock() safeSlice.s = append(safeSlice.s, i) if i == 98{ panic("123") } i++ }() } wg.Wait() log.Println(len(safeSlice.s)) }
原文鏈接:https://juejin.cn/post/7095334586019741704
相關推薦
- 2022-10-18 react電商商品列表的實現流程詳解_React
- 2022-07-10 初中級前端程序員必用且夠用的git命令同時推送到github/gitee及三種常用場景
- 2022-10-06 uwsgi啟動django項目的實現步驟_python
- 2022-06-28 C++哈希表之線性探測法實現詳解_C 語言
- 2022-10-16 python列表倒序的幾種方法(切片、reverse()、reversed())_python
- 2022-07-04 C#使用StreamReader和StreamWriter類讀寫操作文件_C#教程
- 2022-09-19 Tomcat日志自動分割的三種方法_Tomcat
- 2022-05-13 python魔法方法之__setattr__()_python
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支