日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學(xué)無先后,達者為師

網(wǎng)站首頁 編程語言 正文

Go語言讀寫鎖RWMutex的源碼分析_Golang

作者:漫漫Coding路 ? 更新時間: 2022-11-21 編程語言

前言

在前面兩篇文章中?初見 Go Mutex?、Go Mutex 源碼詳解,我們學(xué)習(xí)了?Go語言?中的?Mutex,它是一把互斥鎖,每次只允許一個?goroutine?進入臨界區(qū),可以保證臨界區(qū)資源的狀態(tài)正確性。但是有的情況下,并不是所有?goroutine?都會修改臨界區(qū)狀態(tài),可能只是讀取臨界區(qū)的數(shù)據(jù),如果此時還是需要每個?goroutine?拿到鎖依次進入的話,效率就有些低下了。例如房間里面有一幅畫,有人想修改,有人只是想看一下,完全可以放要看的一部分人進去,等他們看完再讓修改的人進去修改,這樣既提高了效率,也保證了臨界區(qū)資源的安全??春托薷?,對應(yīng)的就是讀和寫,本篇文章我們就一起來學(xué)習(xí)下?Go語言?中的讀寫鎖sync.RWMutex。

說明:本文中的示例,均是基于Go1.17 64位機器

RWMutex 總覽

RWMutex?是一個讀/寫互斥鎖,在某一時刻只能由任意數(shù)量的?reader?持有 或者?一個 writer?持有。也就是說,要么放行任意數(shù)量的 reader,多個 reader 可以并行讀;要么放行一個 writer,多個 writer 需要串行寫。

RWMutex 對外暴露的方法有五個:

  • RLock():讀操作獲取鎖,如果鎖已經(jīng)被?writer?占用,會一直阻塞直到?writer?釋放鎖;否則直接獲得鎖;
  • RUnlock():讀操作完畢之后釋放鎖;
  • Lock():寫操作獲取鎖,如果鎖已經(jīng)被?reader?或者?writer?占用,會一直阻塞直到獲取到鎖;否則直接獲得鎖;
  • Unlock():寫操作完畢之后釋放鎖;
  • RLocker():返回讀操作的?Locker?對象,該對象的?Lock()?方法對應(yīng)?RWMutex?的?RLock(),Unlock()?方法對應(yīng)?RWMutex?的?RUnlock()?方法。

一旦涉及到多個 reader 和 writer ,就需要考慮優(yōu)先級問題,是 reader 優(yōu)先還是 writer 優(yōu)先:

  • reader優(yōu)先:只要有 reader 要進行讀操作,writer 就一直等待,直到?jīng)]有 reader 到來。這種方式做到了讀操作的并發(fā),但是如果 reader 持續(xù)到來,會導(dǎo)致 writer 饑餓,一直不能進行寫操作;
  • writer優(yōu)先:只要有 writer 要進行寫操作,reader 就一直等待,直到?jīng)]有 writer 到來。這種方式提高了寫操作的優(yōu)先級,但是如果 writer 持續(xù)到來,會導(dǎo)致 reader 饑餓,一直不能進行讀操作;
  • 沒有優(yōu)先級:按照先來先到的順序,沒有誰比誰更優(yōu)先,這種相對來說會更公平。

我們先來看下 RWMutex 的運行機制,就可以知道它的優(yōu)先級是什么了。

可以想象 RWMutex 有兩個隊伍,一個是包含?所有reader?和你獲得準(zhǔn)入權(quán)writer?的?隊列A,一個是還沒有獲得準(zhǔn)入權(quán) writer 的?隊列B。

  • 隊列 A 最多只允許有?一個writer,如果有其他 writer,需要在 隊列B 等待;
  • 當(dāng)一個 writer 到了 隊列A 后,只允許它?之前的reader?執(zhí)行讀操作,新來的 reader 需要在 隊列A 后面排隊;
  • 當(dāng)前面的 reader 執(zhí)行完讀操作之后,writer 執(zhí)行寫操作;
  • writer 執(zhí)行完寫操作后,讓?后面的reader?執(zhí)行讀操作,再喚醒隊列B?的一個?writer?到 隊列A 后面排隊。

初始時刻?隊列A 中 writer?W1?前面有三個 reader,后面有兩個 reader,隊列B中有兩個 writer

RWMutex運行示例:初始時刻

并發(fā)讀?多個 reader 可以同時獲取到讀鎖,進入臨界區(qū)進行讀操作;writer?W1?在?隊列A?中等待,同時又來了兩個 reader,直接在?隊列A?后面排隊

RWMutex運行示例:并發(fā)讀

寫操作?W1?前面所有的 reader 完成后,W1?獲得鎖,進入臨界區(qū)操作

RWMutex運行示例:寫操作

獲得準(zhǔn)入權(quán)?W1?完成寫操作退出,先讓后面排隊的 reader 進行讀操作,然后從 隊列B 中喚醒?W2?到?隊列A?排隊。W2?從?隊列B?到?隊列A?的過程中,R8?先到了?隊列A,因此?R8?可以執(zhí)行讀操作。R9、R10、R11?在?W2?之后到的,所以在后面排隊;新來的?W4?直接在隊列B 排隊。

RWMutex運行示例:獲得準(zhǔn)入權(quán)

從上面的示例可以看出,RWMutex?可以看作是沒有優(yōu)先級,按照先來先到的順序去執(zhí)行,只不過是?多個reader?可以?并行?去執(zhí)行罷了。

深入源碼

數(shù)據(jù)結(jié)構(gòu)

type?RWMutex?struct?{
?w???????????Mutex??//?控制?writer?在?隊列B?排隊
?writerSem???uint32?//?寫信號量,用于等待前面的?reader?完成讀操作
?readerSem???uint32?//?讀信號量,用于等待前面的?writer?完成寫操作
?readerCount?int32??//?reader?的總數(shù)量,同時也指示是否有?writer?在隊列A?中等待
?readerWait??int32??//?隊列A?中?writer?前面?reader?的數(shù)量
}

//?允許最大的?reader?數(shù)量
const?rwmutexMaxReaders?=?1?<<?30

上述中的幾個變量,比較特殊的是?readerCount?,不僅表示當(dāng)前?所有reader?的數(shù)量,同時表示是否有?writer?在隊列A中等待。當(dāng)?readerCount?變?yōu)?負(fù)數(shù)?時,就代表有 writer 在隊列A 中等待了。

  • 當(dāng)有 writer 進入 隊列A 后,會將?readerCount?變?yōu)樨?fù)數(shù),即?readerCount = readerCount - rwmutexMaxReaders,同時利用?readerWait?變量記錄它前面有多少個 reader;
  • 如果有新來的 reader,發(fā)現(xiàn)?readerCount?是負(fù)數(shù),就會直接去后面排隊;
  • writer 前面的 reader 在釋放鎖時,會將?readerCount?和?readerWait都減一,當(dāng) readerWait==0 時,表示 writer 前面的所有 reader 都執(zhí)行完了,可以讓 writer 執(zhí)行寫操作了;
  • writer 執(zhí)行寫操作完畢后,會將 readerCount 再變回正數(shù),readerCount = readerCount + rwmutexMaxReaders

舉例:假設(shè)當(dāng)前有兩個 reader,readerCount = 2;允許最大的reader 數(shù)量為 10

  • 當(dāng) writer 進入隊列A 時,readerCount = readerCount - rwmutexMaxReaders = -8,readerWait = readerCount = 2
  • 如果再來 3 個reader,readerCount = readerCount + 3 = -5
  • 獲得讀鎖的兩個reader 執(zhí)行完后,readerCount = readerCount - 2 = -7,readerWait = readerWait-2 =0,writer 獲得鎖
  • writer 執(zhí)行完后,readerCount = readerCount + rwmutexMaxReaders = 3,當(dāng)前有 3個 reader

RLock()

reader 執(zhí)行讀操作之前,需要調(diào)用 RLock() 獲取鎖

func?(rw?*RWMutex)?RLock()?{
?
??//?reader?加鎖,將?readerCount?加一,表示多了個?reader
??if?atomic.AddInt32(&rw.readerCount,?1)?<?0?{
??
????//?如果?readerCount<0,說明有?writer?在自己前面等待,排隊等待讀信號量
??runtime_SemacquireMutex(&rw.readerSem,?false,?0)
?}
}

RUnlock()

reader 執(zhí)行完讀操作后,調(diào)用 RUnlock() 釋放鎖

func?(rw?*RWMutex)?RUnlock()?{

??//?reader?釋放鎖,將?readerCount?減一,表示少了個?reader
?if?r?:=?atomic.AddInt32(&rw.readerCount,?-1);?r?<?0?{
??
????//?如果readerCount<0,說明有?writer?在自己后面等待,看是否要讓?writer?運行
??rw.rUnlockSlow(r)
?}
}
func?(rw?*RWMutex)?rUnlockSlow(r?int32)?{

??//?將?readerWait?減一,表示前面的?reader?少了一個
?if?atomic.AddInt32(&rw.readerWait,?-1)?==?0?{
????
??//?如果?readerWait?變?yōu)榱?,那么自己就是最后一個完成的?reader
????//?釋放寫信號量,讓 writer 運行
??runtime_Semrelease(&rw.writerSem,?false,?1)
?}
}

Lock()

writer 執(zhí)行寫操作之前,調(diào)用 Lock() 獲取鎖

func?(rw?*RWMutex)?Lock()?{

???//?利用互斥鎖,如果前面有 writer,那么就需要等待互斥鎖,即在隊列B 中排隊等待;如果沒有,可以直接進入?隊列A 排隊
???rw.w.Lock()

??//?atomic.AddInt32(&rw.readerCount,?-rwmutexMaxReaders)?將?readerCount?變成了負(fù)數(shù)
??//?再加?rwmutexMaxReaders,相當(dāng)于?r?=?readerCount,r?就是?writer?前面的?reader?數(shù)量
???r?:=?atomic.AddInt32(&rw.readerCount,?-rwmutexMaxReaders)?+?rwmutexMaxReaders
???
???//?如果?r!=?0?,表示自己前面有?reader,那么令?readerWait?=?r,要等前面的?reader?運行完
???if?r?!=?0?&&?atomic.AddInt32(&rw.readerWait,?r)?!=?0?{
??????runtime_SemacquireMutex(&rw.writerSem,?false,?0)
???}

}

Lock()?和?RUnlock()?是會并發(fā)進行的:

  • 如果 Lock() 將 readerCount 變?yōu)樨?fù)數(shù)后,假設(shè) r=3,表示加入的那一刻前面有三個 reader,還沒有賦值 readerWait CPU 就被強占了,readerWait = 0;
  • 假設(shè)此時三個 reader 的 RUnlock() 會進入到 rUnlockSlow() 邏輯,每個 reader 都將 readerWait 減一, readerWait 會變成負(fù)數(shù),此時不符合喚醒 writer 的條件;
  • 三個 reader 運行完之后,此時 readerWait = -3, Lock() 運行到 atomic.AddInt32(&rw.readerWait, r) = -3+3 =0,也不會休眠,直接獲取到鎖,因為前面的 reader 都運行完了。

這就是為什么?rUnlockSlow()?要判斷?atomic.AddInt32(&rw.readerWait, -1) == 0?以及?Lock()?要判斷?atomic.AddInt32(&rw.readerWait, r) != 0?的原因。

Unlock()

writer 執(zhí)行寫操作之后,調(diào)用 Lock() 釋放鎖

func?(rw?*RWMutex)?Unlock()?{

??//?將?readerCount?變?yōu)檎龜?shù),表示當(dāng)前沒有?writer?在隊列A?等待了
?r?:=?atomic.AddInt32(&rw.readerCount,?rwmutexMaxReaders)

??//?將自己后面等待的?reader?喚醒,可以進行讀操作了
?for?i?:=?0;?i?<?int(r);?i++?{
??runtime_Semrelease(&rw.readerSem,?false,?0)
?}
?
??//?釋放互斥鎖,如果隊列B有writer,相當(dāng)于喚醒一個來隊列A 排隊
?rw.w.Unlock()

}

writer 對 readerCount 一加一減,不會改變整體狀態(tài),只是用正負(fù)來表示是否有 writer 在等待。當(dāng)然,如果在 writer 將 readerCount變?yōu)樨?fù)數(shù)后,來了很多 reader,將 readerCount 變?yōu)榱苏龜?shù),此時reader 在 writer 沒有釋放鎖的時候就獲取到鎖了,是有問題的。但是 rwmutexMaxReaders 非常大,可以不考慮這個問題。

常見問題

不可復(fù)制

和 Mutex 一樣,RWMutex 也是不可復(fù)制。不能復(fù)制的原因和互斥鎖一樣。一旦讀寫鎖被使用,它的字段就會記錄它當(dāng)前的一些狀態(tài)。這個時候你去復(fù)制這把鎖,就會把它的狀態(tài)也給復(fù)制過來。但是,原來的鎖在釋放的時候,并不會修改你復(fù)制出來的這個讀寫鎖,這就會導(dǎo)致復(fù)制出來的讀寫鎖的狀態(tài)不對,可能永遠無法釋放鎖。

不可重入

不可重入的原因是,獲得鎖之后,還沒釋放鎖,又申請鎖,這樣有可能造成死鎖。比如 reader A 獲取到了讀鎖,writer B 等待 reader A 釋放鎖,reader 還沒釋放鎖又申請了一把鎖,但是這把鎖申請不成功,他需要等待 writer B。這就形成了一個循環(huán)等待的死鎖。

加鎖和釋放鎖一定要成對出現(xiàn),不能忘記釋放鎖,也不能解鎖一個未加鎖的鎖。

實戰(zhàn)一下

Go 中的 map 是不支持 并發(fā)寫的,我們可以利用 讀寫鎖 RWMutex 來實現(xiàn)并發(fā)安全的 map。在讀多寫少的情況下,使用 RWMutex 要比 Mutex 性能高。

package?main

import?(
?"fmt"
?"math/rand"
?"sync"
?"time"
)

type?ConcurrentMap?struct?{
?m?????sync.RWMutex
?items?map[string]interface{}
}

func?(c?*ConcurrentMap)?Add(key?string,?value?interface{})?{
?c.m.Lock()
?defer?c.m.Unlock()
?c.items[key]?=?value
}

func?(c?*ConcurrentMap)?Remove(key?string)?{
?c.m.Lock()
?defer?c.m.Unlock()
?delete(c.items,?key)
}
func?(c?*ConcurrentMap)?Get(key?string)?interface{}?{
?c.m.RLock()
?defer?c.m.RUnlock()
?return?c.items[key]
}

func?NewConcurrentMap()?ConcurrentMap?{
?return?ConcurrentMap{
??items:?make(map[string]interface{}),
?}
}

func?main()?{
?m?:=?NewConcurrentMap()

?var?wait?sync.WaitGroup

?wait.Add(10000)
?for?i?:=?0;?i?<?10000;?i++?{

??key?:=?fmt.Sprintf("%d",?rand.Intn(10))
??value?:=?fmt.Sprintf("%d",?rand.Intn(100))
??if?i%100?==?0?{
???go?func()?{
????defer?wait.Done()
????m.Add(key,?value)
???}()
??}?else?{
???go?func()?{
????defer?wait.Done()
????fmt.Println(m.Get(key))
????time.Sleep(time.Millisecond?*?10)
???}()
??}

?}

?wait.Wait()
}

原文鏈接:https://mp.weixin.qq.com/s/hJaO6lY-3Cs6SRsHYGNhQg

欄目分類
最近更新