日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學(xué)無先后,達者為師

網(wǎng)站首頁 編程語言 正文

go并發(fā)編程sync.Cond使用場景及實現(xiàn)原理_Golang

作者:Koffee ? 更新時間: 2022-10-26 編程語言

使用場景

sync.Cond是go標(biāo)準(zhǔn)庫提供的一個條件變量,用于控制一組goroutine在滿足特定條件下被喚醒。

sync.Cond常用于一組goroutine等待,一個goroutine通知(事件發(fā)生)的場景。如果只有一個goroutine等待,一個goroutine通知(事件發(fā)生),使用Mutex或者Channel就可以實現(xiàn)。

可以用一個全局變量標(biāo)志特定條件condition,每個sync.Cond都必須要關(guān)聯(lián)一個互斥鎖(Mutex或者RWMutex),當(dāng)condition發(fā)生變更或者調(diào)用Wait時,都必須加鎖,保證多個goroutine安全地訪問condition。

下面是go標(biāo)準(zhǔn)庫http中關(guān)于pipe的部分實現(xiàn),我們可以看到,pipe使用sync.Cond來控制管道中字節(jié)流的寫入和讀取,在pipe中數(shù)據(jù)可用并且字節(jié)流復(fù)制到pipe的緩沖區(qū)之前,所有的需要讀取該管道數(shù)據(jù)的goroutine都必須等待,直到數(shù)據(jù)準(zhǔn)備完成。

type pipe struct {
   mu       sync.Mutex
   c        sync.Cond     // c.L lazily initialized to &p.mu
   b        pipeBuffer    // nil when done reading
   ...
}
// Read waits until data is available and copies bytes
// from the buffer into p.
func (p *pipe) Read(d []byte) (n int, err error) {
   p.mu.Lock()
   defer p.mu.Unlock()
   if p.c.L == nil {
      p.c.L = &p.mu
   }
   for {
      ...
      if p.b != nil && p.b.Len() > 0 {
         return p.b.Read(d)
      }
      ...
      p.c.Wait() // write未完成前調(diào)用Wait進入等待
   }
}
// Write copies bytes from p into the buffer and wakes a reader.
// It is an error to write more data than the buffer can hold.
func (p *pipe) Write(d []byte) (n int, err error) {
   p.mu.Lock()
   defer p.mu.Unlock()
   if p.c.L == nil {
      p.c.L = &p.mu
   }
   defer p.c.Signal() // 喚醒所有等待的goroutine
   if p.err != nil {
      return 0, errClosedPipeWrite
   }
   if p.breakErr != nil {
      p.unread += len(d)
      return len(d), nil // discard when there is no reader
   }
   return p.b.Write(d)
}

實現(xiàn)原理

type Cond struct {
   noCopy noCopy       // 用來保證結(jié)構(gòu)體無法在編譯期間拷貝
   // L is held while observing or changing the condition
   L Locker             // 用來保證condition變更安全
   notify  notifyList   // 待通知的goutine列表
   checker copyChecker  // 用于禁止運行期間發(fā)生的拷貝
}
type notifyList struct {
   wait   uint32      // 正在等待的goroutine的ticket
   notify uint32      // 已經(jīng)通知到的goroutine的ticket
   lock   uintptr // key field of the mutex
   head   unsafe.Pointer     // 鏈表頭部
   tail   unsafe.Pointer     // 鏈表尾部
}

copyChecker

copyChecker是一個指針類型,在創(chuàng)建時,它的值指向自身地址,用于檢測該對象是否發(fā)生了拷貝。如果發(fā)生了拷貝,則直接panic。

// copyChecker holds back pointer to itself to detect object copying.
type copyChecker uintptr
func (c *copyChecker) check() {
   if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
      !atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
      uintptr(*c) != uintptr(unsafe.Pointer(c)) {
      panic("sync.Cond is copied")
   }
}

Wait

調(diào)用 Wait 會自動釋放鎖 c.L,并掛起調(diào)用者所在的 goroutine,因此當(dāng)前協(xié)程會阻塞在 Wait 方法調(diào)用的地方。如果其他協(xié)程調(diào)用了 Signal 或 Broadcast 喚醒了該協(xié)程,那么 Wait 方法在結(jié)束阻塞時,會重新給 c.L 加鎖,并且繼續(xù)執(zhí)行 Wait 后面的代碼。

對條件的檢查,使用了?for !condition()?而非?if,是因為當(dāng)前協(xié)程被喚醒時,條件不一定符合要求,需要再次 Wait 等待下次被喚醒。為了保險起見,使用?for?能夠確保條件符合要求后,再執(zhí)行后續(xù)的代碼。

func (c *Cond) Wait() {
   c.checker.check()
   t := runtime_notifyListAdd(&c.notify)
   c.L.Unlock()
   runtime_notifyListWait(&c.notify, t)
   c.L.Lock()
}
  • 檢查Cond是否被復(fù)制,如果被復(fù)制,直接panic;
  • 調(diào)用runtime_notifyListAdd調(diào)用者添加到通知列表并解鎖,以便可以接收到通知,然后將返回的ticket傳入到runtime_notifyListWait來等待通知。
  • 當(dāng)前goroutine會阻塞在wait調(diào)用的地方,直到其他goroutine調(diào)用Signal或Broadcast喚醒該協(xié)程。
func notifyListAdd(l *notifyList) uint32 {
    return atomic.Xadd(&l.wait, 1) - 1
}

notifyListWait會將當(dāng)前goroutine追加到鏈表的尾端,同時調(diào)用goparkunlock讓當(dāng)前goroutine陷入休眠,該方法會直接讓出當(dāng)前處理器的使用權(quán)并等待調(diào)度器的喚醒。

func notifyListWait(l *notifyList, t uint32) {
    s := acquireSudog()
    s.g = getg()
    s.ticket = t
    if l.tail == nil {
       l.head = s
    } else {
       l.tail.next = s
    }
    l.tail = s
    goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
    releaseSudog(s)
}

Signal

Signal會喚醒隊列最前面的Goroutine。

func (c *Cond) Signal() {
   c.checker.check()
   runtime_notifyListNotifyOne(&c.notify)
}
func notifyListNotifyOne(l *notifyList) {
   t := l.notify
   atomic.Store(&l.notify, t+1)
   for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
      if s.ticket == t {
         n := s.next
         if p != nil {
            p.next = n
         } else {
            l.head = n
         }
         if n == nil {
            l.tail = p
         }
         s.next = nil
         readyWithTime(s, 4)
         return
      }
   }
}

Broadcast

Broadcast會喚醒隊列中全部的goroutine。

func (c *Cond) Broadcast() {
    c.checker.check()
    runtime_notifyListNotifyAll(&c.notify)
}
func notifyListNotifyAll(l *notifyList) {
   s := l.head
   l.head = nil
   l.tail = nil
   atomic.Store(&l.notify, atomic.Load(&l.wait))
   for s != nil {
      next := s.next
      s.next = nil
      readyWithTime(s, 4)
      s = next
   }
}

原文鏈接:https://juejin.cn/post/7136433654338682887

欄目分類
最近更新