網(wǎng)站首頁 編程語言 正文
使用場景
sync.Cond是go標(biāo)準(zhǔn)庫提供的一個條件變量,用于控制一組goroutine在滿足特定條件下被喚醒。
sync.Cond常用于一組goroutine等待,一個goroutine通知(事件發(fā)生)的場景。如果只有一個goroutine等待,一個goroutine通知(事件發(fā)生),使用Mutex或者Channel就可以實現(xiàn)。
可以用一個全局變量標(biāo)志特定條件condition,每個sync.Cond都必須要關(guān)聯(lián)一個互斥鎖(Mutex或者RWMutex),當(dāng)condition發(fā)生變更或者調(diào)用Wait時,都必須加鎖,保證多個goroutine安全地訪問condition。
下面是go標(biāo)準(zhǔn)庫http中關(guān)于pipe的部分實現(xiàn),我們可以看到,pipe使用sync.Cond來控制管道中字節(jié)流的寫入和讀取,在pipe中數(shù)據(jù)可用并且字節(jié)流復(fù)制到pipe的緩沖區(qū)之前,所有的需要讀取該管道數(shù)據(jù)的goroutine都必須等待,直到數(shù)據(jù)準(zhǔn)備完成。
type pipe struct { mu sync.Mutex c sync.Cond // c.L lazily initialized to &p.mu b pipeBuffer // nil when done reading ... }
// Read waits until data is available and copies bytes // from the buffer into p. func (p *pipe) Read(d []byte) (n int, err error) { p.mu.Lock() defer p.mu.Unlock() if p.c.L == nil { p.c.L = &p.mu } for { ... if p.b != nil && p.b.Len() > 0 { return p.b.Read(d) } ... p.c.Wait() // write未完成前調(diào)用Wait進入等待 } }
// Write copies bytes from p into the buffer and wakes a reader. // It is an error to write more data than the buffer can hold. func (p *pipe) Write(d []byte) (n int, err error) { p.mu.Lock() defer p.mu.Unlock() if p.c.L == nil { p.c.L = &p.mu } defer p.c.Signal() // 喚醒所有等待的goroutine if p.err != nil { return 0, errClosedPipeWrite } if p.breakErr != nil { p.unread += len(d) return len(d), nil // discard when there is no reader } return p.b.Write(d) }
實現(xiàn)原理
type Cond struct { noCopy noCopy // 用來保證結(jié)構(gòu)體無法在編譯期間拷貝 // L is held while observing or changing the condition L Locker // 用來保證condition變更安全 notify notifyList // 待通知的goutine列表 checker copyChecker // 用于禁止運行期間發(fā)生的拷貝 }
type notifyList struct { wait uint32 // 正在等待的goroutine的ticket notify uint32 // 已經(jīng)通知到的goroutine的ticket lock uintptr // key field of the mutex head unsafe.Pointer // 鏈表頭部 tail unsafe.Pointer // 鏈表尾部 }
copyChecker
copyChecker是一個指針類型,在創(chuàng)建時,它的值指向自身地址,用于檢測該對象是否發(fā)生了拷貝。如果發(fā)生了拷貝,則直接panic。
// copyChecker holds back pointer to itself to detect object copying. type copyChecker uintptr func (c *copyChecker) check() { if uintptr(*c) != uintptr(unsafe.Pointer(c)) && !atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) && uintptr(*c) != uintptr(unsafe.Pointer(c)) { panic("sync.Cond is copied") } }
Wait
調(diào)用 Wait 會自動釋放鎖 c.L,并掛起調(diào)用者所在的 goroutine,因此當(dāng)前協(xié)程會阻塞在 Wait 方法調(diào)用的地方。如果其他協(xié)程調(diào)用了 Signal 或 Broadcast 喚醒了該協(xié)程,那么 Wait 方法在結(jié)束阻塞時,會重新給 c.L 加鎖,并且繼續(xù)執(zhí)行 Wait 后面的代碼。
對條件的檢查,使用了?for !condition()
?而非?if
,是因為當(dāng)前協(xié)程被喚醒時,條件不一定符合要求,需要再次 Wait 等待下次被喚醒。為了保險起見,使用?for
?能夠確保條件符合要求后,再執(zhí)行后續(xù)的代碼。
func (c *Cond) Wait() { c.checker.check() t := runtime_notifyListAdd(&c.notify) c.L.Unlock() runtime_notifyListWait(&c.notify, t) c.L.Lock() }
- 檢查Cond是否被復(fù)制,如果被復(fù)制,直接panic;
- 調(diào)用runtime_notifyListAdd調(diào)用者添加到通知列表并解鎖,以便可以接收到通知,然后將返回的ticket傳入到runtime_notifyListWait來等待通知。
- 當(dāng)前goroutine會阻塞在wait調(diào)用的地方,直到其他goroutine調(diào)用Signal或Broadcast喚醒該協(xié)程。
func notifyListAdd(l *notifyList) uint32 { return atomic.Xadd(&l.wait, 1) - 1 }
notifyListWait會將當(dāng)前goroutine追加到鏈表的尾端,同時調(diào)用goparkunlock讓當(dāng)前goroutine陷入休眠,該方法會直接讓出當(dāng)前處理器的使用權(quán)并等待調(diào)度器的喚醒。
func notifyListWait(l *notifyList, t uint32) { s := acquireSudog() s.g = getg() s.ticket = t if l.tail == nil { l.head = s } else { l.tail.next = s } l.tail = s goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3) releaseSudog(s) }
Signal
Signal會喚醒隊列最前面的Goroutine。
func (c *Cond) Signal() { c.checker.check() runtime_notifyListNotifyOne(&c.notify) }
func notifyListNotifyOne(l *notifyList) { t := l.notify atomic.Store(&l.notify, t+1) for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next { if s.ticket == t { n := s.next if p != nil { p.next = n } else { l.head = n } if n == nil { l.tail = p } s.next = nil readyWithTime(s, 4) return } } }
Broadcast
Broadcast會喚醒隊列中全部的goroutine。
func (c *Cond) Broadcast() { c.checker.check() runtime_notifyListNotifyAll(&c.notify) }
func notifyListNotifyAll(l *notifyList) { s := l.head l.head = nil l.tail = nil atomic.Store(&l.notify, atomic.Load(&l.wait)) for s != nil { next := s.next s.next = nil readyWithTime(s, 4) s = next } }
原文鏈接:https://juejin.cn/post/7136433654338682887
相關(guān)推薦
- 2022-04-07 淺談C++11中=delete的巧妙用法_C 語言
- 2022-07-06 Android中ViewFlipper和AdapterViewFlipper使用的方法實例_Andr
- 2022-11-18 React網(wǎng)絡(luò)請求發(fā)起方法詳細介紹_React
- 2022-04-28 Python中print()函數(shù)的用法詳情_python
- 2022-01-31 關(guān)于el-form中的rules未生效問題的解決方法
- 2023-07-26 TypeScript中的泛型(泛型函數(shù)、接口、類、泛型約束)
- 2023-07-16 uniapp 小程序訂閱消息報錯( wx.requestSubscribeMessage is no
- 2022-08-02 Android開發(fā)自定義雙向SeekBar拖動條控件_Android
- 最近更新
-
- window11 系統(tǒng)安裝 yarn
- 超詳細win安裝深度學(xué)習(xí)環(huán)境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎(chǔ)操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區(qū)別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權(quán)
- redisson分布式鎖中waittime的設(shè)
- maven:解決release錯誤:Artif
- restTemplate使用總結(jié)
- Spring Security之安全異常處理
- MybatisPlus優(yōu)雅實現(xiàn)加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務(wù)發(fā)現(xiàn)-Nac
- Spring Security之基于HttpR
- Redis 底層數(shù)據(jù)結(jié)構(gòu)-簡單動態(tài)字符串(SD
- arthas操作spring被代理目標(biāo)對象命令
- Spring中的單例模式應(yīng)用詳解
- 聊聊消息隊列,發(fā)送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支