網站首頁 編程語言 正文
Overview
go 里面的 rwlock 是 write preferred 的,可以避免寫鎖饑餓。
讀鎖和寫鎖按照先來后到的規則持有鎖,一旦有協程持有了寫鎖,后面的協程只能在寫鎖被釋放后才能得到讀鎖。
同樣,一旦有 >= 1 個協程寫到了讀鎖,只有等這些讀鎖全部釋放后,后面的協程才能拿到寫鎖。
下面了解一下 Go 的 RWMutex 是如何實現的吧,下面的代碼取自 go1.17.2/src/sync/rwmutex.go,并刪減了 race 相關的代碼。
PS: rwmutex 的代碼挺短的,其實讀源碼也沒那么可怕...
RWMutex 的結構
RWMutex 總體上是通過: 普通鎖和條件變量來實現的
type RWMutex struct { w Mutex // held if there are pending writers writerSem uint32 // semaphore for writers to wait for completing readers readerSem uint32 // semaphore for readers to wait for completing writers readerCount int32 // number of pending readers readerWait int32 // number of departing readers }
Lock
func (rw *RWMutex) Lock() { // First, resolve competition with other writers. rw.w.Lock() // Announce to readers there is a pending writer. r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders // Wait for active readers. if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 { runtime_SemacquireMutex(&rw.writerSem, false, 0) } }
Unlock
const rwmutexMaxReaders = 1 << 30 func (rw *RWMutex) Unlock() { // Announce to readers there is no active writer. r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders) // Unblock blocked readers, if any. for i := 0; i < int(r); i++ { runtime_Semrelease(&rw.readerSem, false, 0) } // Allow other writers to proceed. rw.w.Unlock() }
RLock
func (rw *RWMutex) RLock() { if atomic.AddInt32(&rw.readerCount, 1) < 0 { // A writer is pending, wait for it. runtime_SemacquireMutex(&rw.readerSem, false, 0) } }
RUnlock
func (rw *RWMutex) RUnlock() { if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 { // Outlined slow-path to allow the fast-path to be inlined rw.rUnlockSlow(r) } } func (rw *RWMutex) rUnlockSlow(r int32) { // A writer is pending. if atomic.AddInt32(&rw.readerWait, -1) == 0 { // The last reader unblocks the writer. runtime_Semrelease(&rw.writerSem, false, 1) } }
Q1: 多個協程并發拿讀鎖,如何保證這些讀鎖協程都不會被阻塞?
func (rw *RWMutex) RLock() { if atomic.AddInt32(&rw.readerCount, 1) < 0 { // A writer is pending, wait for it. runtime_SemacquireMutex(&rw.readerSem, false, 0) } }
拿讀鎖時,僅僅會增加 readerCount,因此讀鎖之間是可以正常并發的
Q2: 多個協程并發拿寫鎖,如何保證只會有一個協程拿到寫鎖?
func (rw *RWMutex) Lock() { // First, resolve competition with other writers. rw.w.Lock() // Announce to readers there is a pending writer. r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders // Wait for active readers. if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 { runtime_SemacquireMutex(&rw.writerSem, false, 0) } }
拿寫鎖時,會獲取 w.Lock,自然能保證同一時間只會有一把寫鎖
Q3: 在讀鎖被拿到的情況下,新協程拿寫鎖,如果保證寫鎖現成會被阻塞?
func (rw *RWMutex) Lock() { // First, resolve competition with other writers. rw.w.Lock() // Announce to readers there is a pending writer. r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders // Wait for active readers. if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 { runtime_SemacquireMutex(&rw.writerSem, false, 0) } }
假設此時有 5 個協程拿到讀鎖,則 readerCount = 5,假設 rwmutexMaxReaders = 100。
此時有一個新的協程 w1 想要拿寫鎖。
在執行
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
后, rw.readerCount = -95,r = 5。
在執行
atomic.AddInt32(&rw.readerWait, r)
后,rw.readerWait = 5。
readerWait
?記錄了在獲取寫鎖的這一瞬間有多少個協程持有讀鎖。這一瞬間之后,就算有新的協程嘗試獲取讀鎖,也只會增加 readerCount ,而不會動到 readerWait。
之后執行 runtime_SemacquireMutex() 睡在了 writerSem 這個信號量上面。
Q4: 在讀鎖被拿到的情況下,新協程拿寫鎖被阻塞,當舊有的讀鎖協程全部釋放,如何喚醒等待的寫鎖協程
func (rw *RWMutex) RUnlock() { if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 { // Outlined slow-path to allow the fast-path to be inlined rw.rUnlockSlow(r) } } func (rw *RWMutex) rUnlockSlow(r int32) { // A writer is pending. if atomic.AddInt32(&rw.readerWait, -1) == 0 { // The last reader unblocks the writer. runtime_Semrelease(&rw.writerSem, false, 1) } }
繼續上一步的場景,每當執行 RUnlock 時,readerCount 都會減去1。當 readerCount 為負數時,意味著有協程正在持有或者正在等待持有寫鎖。
之前的五個讀協程中的四個,每次 RUnlock() 之后,readerCount = -95 - 4 = -99,readerWait = 5 - 4 = 1。
當最后一個讀協程調用 RUnlock() 之后,readerCount 變成了 -100,readerWait 變成 0,此時會喚醒在 writerSem 上沉睡的協程 w1。
Q5: 在寫鎖被拿到的情況下,新協程拿讀鎖,如何讓新協程被阻塞?
func (rw *RWMutex) RLock() { if atomic.AddInt32(&rw.readerCount, 1) < 0 { // A writer is pending, wait for it. runtime_SemacquireMutex(&rw.readerSem, false, 0) } }
繼續上面的場景,readerCount = -100 + 1 = -99 < 0。
新的讀協程 r1 被沉睡在 readerSem 下面。
假設此時再來一個讀協程 r2,則 readerCount = -98,依舊沉睡。
Q6: 在寫鎖被拿到的情況下,新協程拿讀鎖,寫鎖協程釋放,如何喚醒等待的讀鎖協程?
繼續上面的場景,此時協程 w1 釋放寫鎖
func (rw *RWMutex) Unlock() { // Announce to readers there is no active writer. r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders) // Unblock blocked readers, if any. for i := 0; i < int(r); i++ { runtime_Semrelease(&rw.readerSem, false, 0) } // Allow other writers to proceed. rw.w.Unlock() }
在執行
atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
后,r = readerCount = -98 + 100 = 2,代表此時有兩個讀協程 r1 和 r2 在等待
ps: 如果此時有一些新的協程想要拿讀鎖,他會因為 readerCount = 2 + 1 = 3 > 0 而順利執行下去,不會被阻塞
之后 for 循環執行兩次,將協程 r1 和 協程 r2 都喚醒了。
Q7: 在寫鎖被拿到的情況下,有兩個協程分別去搶讀鎖和寫鎖,當寫鎖被釋放時,這兩個協程誰會勝利?
func (rw *RWMutex) Unlock() { // Announce to readers there is no active writer. r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders) // Unblock blocked readers, if any. for i := 0; i < int(r); i++ { runtime_Semrelease(&rw.readerSem, false, 0) } // Allow other writers to proceed. rw.w.Unlock() }
由于是先喚醒讀鎖,再調用 w.Unlock() ,因此肯定是讀協程先勝利!
認為寫的比較巧妙的兩個點
-
readerCount 與 rwmutexMaxReaders 的糾纏
通過?
readerCount + rwmutexMaxReaders
?以及?readerCount - rwmutexMaxReaders
?這兩個操作可以得知當前是否有協程等待/持有寫鎖以及當前等待/持有讀鎖的協程數量 -
readerCount 與 readerWait 的糾纏
在 Lock() 時直接將 readerCount 的值賦給 readerWait,在 readerWait = 0 而非 readerCount = 0 是喚醒寫協程,可以避免在 Lock() 后來達到的讀協程先于寫協程被執行。
原文鏈接:https://www.cnblogs.com/XiaoXiaoShuai-/p/15998061.html
相關推薦
- 2022-09-13 Golang優雅保持main函數不退出的辦法_Golang
- 2022-07-26 Spring底層核心原理解析
- 2022-11-10 Flutter?WillPopScope攔截返回事件原理示例詳解_Android
- 2022-03-29 python教程之生成器和匿名函數_python
- 2022-07-07 python中的format是什么意思,format怎么用_python
- 2023-02-01 C++泛型編程綜合講解_C 語言
- 2022-06-16 C++新特性詳細分析基于范圍的for循環_C 語言
- 2022-10-06 Python中os模塊的12種用法總結_python
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支