日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學無先后,達者為師

網站首頁 編程語言 正文

GO語言中Chan實現原理的示例詳解_Golang

作者:阿兵云原生 ? 更新時間: 2023-06-16 編程語言

GO 中 Chan 實現原理分享

嗨,我是小魔童哪吒,還記得咱們之前分享過GO 通道 和sync包的使用嗎?咱們來回顧一下

  • 分享了通道是什么,通道的種類
  • 無緩沖,有緩沖,單向通道具體對應什么
  • 對于通道的具體實踐
  • 分享了關于通道的異常情況整理
  • 簡單分享了sync包的使用

要是對上述內容還有點興趣的話,歡迎查看文章 GO通道和 sync 包的分享

chan 是什么

是一種特殊的類型,是連接并發goroutine的管道

channel 通道是可以讓一個 goroutine 協程發送特定值到另一個 goroutine 協程的通信機制

通道像一個傳送帶或者隊列,總是遵循先入先出(First In First Out)的規則,保證收發數據的順序,這一點和管道是一樣的

一個協程從通道的一頭放入數據,另一個協程從通道的另一頭讀出數據

每一個通道都是一個具體類型的導管,聲明 channel 的時候需要為其指定元素類型。

本篇文章主要是分享關于通道的實現原理,關于通道的使用,可以查看文章 GO通道和 sync 包的分享 ,這里有詳細的說明

GO 中 Chan 的底層數據結構

了解每一個組件或者每一個數據類型的實現原理,咱們都會去看源碼中的數據結構是如何設計的

同樣,我們一起來看看 GO 的 Chan 的數據結構

GO 的 Chan 的源碼實現是在 : src/runtime/chan.go

type hchan struct {
   qcount   uint           // total data in the queue
   dataqsiz uint           // size of the circular queue
   buf      unsafe.Pointer // points to an array of dataqsiz elements
   elemsize uint16
   closed   uint32
   elemtype *_type // element type
   sendx    uint   // send index
   recvx    uint   // receive index
   recvq    waitq  // list of recv waiters
   sendq    waitq  // list of send waiters

   // lock protects all fields in hchan, as well as several
   // fields in sudogs blocked on this channel.
   //
   // Do not change another G's status while holding this lock
   // (in particular, do not ready a G), as this can deadlock
   // with stack shrinking.
   lock mutex
}

hchan 是實現通道的核心數據結構,對應的成員也是不少,咱們根據源碼注釋一個參數一個參數的來看看

tag 說明
qcount 當前的隊列,剩余元素個數
dataqsiz 環形隊列可以存放的元素個數,也就是環形隊列的長度
buf 指針,指向環形隊列
elemsize 指的的隊列中每個元素的大小
closed 具體標識關閉的狀態
elemtype 見名知意,元素的類型
sendx 發送隊列的下標,向隊列中寫入數據的時候,存放在隊列中的位置
recvx 接受隊列的下標,從隊列的 這個位置開始讀取數據
recvq 協程隊列,等待讀取消息的協程隊列
sendq 協程隊列,等待發送消息的協程隊列
lock 互斥鎖,在 chan 中,不可以并發的讀寫數據

根據上面的參數,我們或多或少就可以知道 GO 中的通道實現原理設計了哪些知識點:

  • 指針
  • 環形隊列
  • 協程
  • 互斥鎖

我們順便再來看看上述成員的協程隊列 waitq 對應的是啥樣的數據結構

type waitq struct {
   first *sudog
   last  *sudog
}

sudog 結構是在 src/runtime/runtime2.go中 ,咱們順便多學一手

// sudog represents a g in a wait list, such as for sending/receiving
// on a channel.
type sudog struct {
   // The following fields are protected by the hchan.lock of the
   // channel this sudog is blocking on. shrinkstack depends on
   // this for sudogs involved in channel ops.

   g *g

   next *sudog
   prev *sudog
   elem unsafe.Pointer // data element (may point to stack)

   // The following fields are never accessed concurrently.
   // For channels, waitlink is only accessed by g.
   // For semaphores, all fields (including the ones above)
   // are only accessed when holding a semaRoot lock.

   acquiretime int64
   releasetime int64
   ticket      uint32

   // isSelect indicates g is participating in a select, so
   // g.selectDone must be CAS'd to win the wake-up race.
   isSelect bool

   // success indicates whether communication over channel c
   // succeeded. It is true if the goroutine was awoken because a
   // value was delivered over channel c, and false if awoken
   // because c was closed.
   success bool

   parent   *sudog // semaRoot binary tree
   waitlink *sudog // g.waiting list or semaRoot
   waittail *sudog // semaRoot
   c        *hchan // channel
}

根據源碼注釋,咱們大致知道sudog 是干啥的

Sudog表示等待列表中的 g,例如在一個通道上發送/接收

Sudog是很必要的,因為g?synchronization對象關系是多對多

一個 g 可能在很多等候隊列上,所以一個 g 可能有很多sudogs

而且許多 g 可能在等待同一個同步對象,所以一個對象可能有許多sudogs

咱們抓住主要矛盾

Sudog的數據結構,主要的東西就是一個 g 和一個 elem ,

g,上面有說到他和 Sudog的對應關系

無論是讀通道還是寫通道,都會需要 elem

讀通道

數據會從hchan的隊列中,拷貝到sudogelem

寫通道

與讀通道類似,是將數據從 sudogelem處拷貝到hchan的隊列中

咱們來畫個圖看看

此處咱們畫一個 hchan的結構,主要畫一下 recvq等待讀取消息的協程隊列,此處的隊列,實際上就是用鏈表來實現的

recvq會對應到 waitq結構,waitq 分為first頭結點 和 last尾節點 結構分別是 sudog

sudog里面 elem存放具體的數據,next 指針指向下一個 sudog,直到指到lastsudog

通過上述的,應該就能明白 GO 中的 chan 基本結構了吧

咱來再來詳細看看 hchan 中其他參數都具體是啥意思

  • dataqsiz 對應的環形隊列是啥樣的
  • sendq和 讀 recvq 等待隊列是啥樣的
  • elemtype元素類型信息又是啥

dataqsiz 對應的環形隊列是啥樣的

環形隊列,故名思議就是 一個首尾連接,成環狀的隊列

GO 中的 chan內部的環形隊列,主要作用是作為緩沖區

這個環形隊列的長度,我們在創建隊列的時候, 也就是創建 hchan 結構的時候,就已經指定好了的

就是 dataqsiz ,環形隊列的長度

咱們畫個圖清醒一下

上圖需要表達的意思是這個樣子的,上述的隊列是循環隊列,默認首尾連接哦

  • dataqsiz 表示 循環隊列的長度是 8 個
  • qcount 表示 當前隊列中有 5 個元素
  • buf 是指針,指向循環隊列頭
  • sendx 是發送隊列的下標,這里為 1 ,則指向隊列的第 2 個區域 ,這個參數可選范圍是 [0 , 8)
  • recvx 是接收隊列的下標,這里為 4 ,則指向的是 隊列的第 5 個區域進行讀取數據

這里順帶提一下,hchan 中讀取數據還是寫入數據,都是需要去拿 lock 互斥鎖的,同一個通道,在同一個時刻只能允許一個協程進行讀寫

寫 sendq和 讀 recvq 等待隊列是啥樣的

hchan 結構中的 2 個協程隊列,一個是用于讀取數據,一個是用于發送數據,他們都是等待隊列,我們來看看這個等待隊列都是咋放數據上去的,分別有啥特性需要注意

當從通道中讀取 或者 發送數據:

  • 若通道的緩沖區為空,或者沒有緩沖區,此時從通道中讀取數據,則協程是會被阻塞
  • 若通道緩沖區為滿,或者沒有緩沖區,此時從通道中寫數據,則協程仍然也會被阻塞

這些被阻塞的協程就會被放到等待隊列中,按照讀 和 寫 的動作來進行分類為寫 sendq和 讀 recvq 隊列

那么這些阻塞的協程,啥時候會被喚醒呢?

看過之前的文章 GO通道和 sync 包的分享,應該就能知道

我們在來回顧一下,這篇文章的表格,通道會存在的異常情況:

channel 狀態 未初始化的通道(nil) 通道非空 通道是空的 通道滿了 通道未滿
接收數據 阻塞 接收數據 阻塞 接收數據 接收數據
發送數據 阻塞 發送數據 發送數據 阻塞 發送數據
關閉 panic 關閉通道成功
待數據讀取完畢后
返回零值
關閉通道成功
直接返回零值
關閉通道成功
待數據讀取完畢后
返回零值
關閉通道成功
待數據讀取完畢后
返回零值

此時,我們就知道,具體什么時候被阻塞的協程會被喚醒了

  • 因為讀阻塞的協程,會被通道中的寫入數據的協程喚醒,反之亦然
  • 因為寫阻塞的協程,也會被通道中讀取數據的協程喚醒

elemtype元素類型信息又是啥

這個元素類型信息就不難理解了,對于我們使用通道,創建通道的時候我們需要填入通道中數據的類型,一個通道,只能寫一種數據類型,指的就是這里的elemtype

另外 hchan 還有一個成員是elemsize,代表上述元素類型的占用空間大小

那么這倆成員有啥作用呢?

elemtypeelemsize就可以計算指定類型的數據占用空間大小了

前者用于在數據傳遞的過程中進行賦值

后者可以用來在環形隊列中定位具體的元素

創建 chan 是咋實現的

我們再來瞅瞅 chan.go 的源碼實現 ,看到源碼中的 makechan 具體實現

func makechan(t *chantype, size int) *hchan {
   elem := t.elem

   // compiler checks this but be safe.
   if elem.size >= 1<<16 {
      throw("makechan: invalid channel element type")
   }
   if hchanSize%maxAlign != 0 || elem.align > maxAlign {
      throw("makechan: bad alignment")
   }

   mem, overflow := math.MulUintptr(elem.size, uintptr(size))
   if overflow || mem > maxAlloc-hchanSize || size < 0 {
      panic(plainError("makechan: size out of range"))
   }

   // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
   // buf points into the same allocation, elemtype is persistent.
   // SudoG's are referenced from their owning thread so they can't be collected.
   // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
   var c *hchan
   switch {
   case mem == 0:
      // Queue or element size is zero.
      c = (*hchan)(mallocgc(hchanSize, nil, true))
      // Race detector uses this location for synchronization.
      c.buf = c.raceaddr()
   case elem.ptrdata == 0:
      // Elements do not contain pointers.
      // Allocate hchan and buf in one call.
      c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
      c.buf = add(unsafe.Pointer(c), hchanSize)
   default:
      // Elements contain pointers.
      c = new(hchan)
      c.buf = mallocgc(mem, elem, true)
   }

   c.elemsize = uint16(elem.size)
   c.elemtype = elem
   c.dataqsiz = uint(size)
   lockInit(&c.lock, lockRankHchan)

   if debugChan {
      print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
   }
   return c
}

如上源碼實際上就是初始化 chan 對應的成員,其中循環隊列 buf 的大小,是由 makechan 函數傳入的 類型信息和緩沖區長度決定的,也就是makechan 的入參

可以通過上述代碼的 3 個位置就可以知道

// 1
func makechan(t *chantype, size int) *hchan
// 2
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
// 3
var c *hchan
   switch {
   case mem == 0:
      // Queue or element size is zero.
      c = (*hchan)(mallocgc(hchanSize, nil, true))
      // Race detector uses this location for synchronization.
      c.buf = c.raceaddr()
   case elem.ptrdata == 0:
      // Elements do not contain pointers.
      // Allocate hchan and buf in one call.
      c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
      c.buf = add(unsafe.Pointer(c), hchanSize)
   default:
      // Elements contain pointers.
      c = new(hchan)
      c.buf = mallocgc(mem, elem, true)
   }

讀寫 chan 的基本流程

第一張圖說明白向 chan 寫入數據的流程

向通道中寫入數據,我們會涉及sendq 、 recvq隊列,和循環隊列的資源問題

根據圖示可以看出向通道中寫入數據分為 3 種情況:

  • 寫入數據的時候,若recvq 隊列為空,且循環隊列有空位,那么就直接將數據寫入到 循環隊列的隊尾 即可
  • recvq 隊列為空,且循環隊列無空位,則將當前的協程放到sendq等待隊列中進行阻塞,等待被喚醒,當被喚醒的時候,需要寫入的數據,已經被讀取出來,且已經完成了寫入操作
  • recvq 隊列為不為空,那么可以說明循環隊列中沒有數據,或者循環隊列是空的,即沒有緩沖區(向無緩沖的通道寫入數據),此時,直接將recvq等待隊列中取出一個G,寫入數據,喚醒G,完成寫入操作

第二張圖說明白向 chan 讀取數據的流程

向通道中讀取數據,我們會涉及sendq 、 recvq隊列,和循環隊列的資源問題

根據圖示可以看出向通道中讀取數據分為 4 種情況:

  • sendq為空,且循環隊列無元素的時候,那就將當前的協程加入recvq等待隊列,把recvq等待隊列對頭的一個協程取出來,喚醒,讀取數據
  • sendq為空,且循環隊列有元素的時候,直接讀取循環隊列中的數據即可
  • sendq有數據,且循環隊列有元素的時候,直接讀取循環隊列中的數據即可,且把sendq隊列取一個G放到循環隊列中,進行補充
  • sendq有數據,且循環隊列無元素的時候,則從sendq取出一個G,并且喚醒他,進行數據讀取操作

上面說了通道的創建,讀寫,那么通道咋關閉?

通道的關閉,我們在應用的時候直接 close 就搞定了,那么對應close的時候,底層的隊列都是做了啥呢?

若關閉了當前的通道,那么系統會把recvq 讀取數據的等待隊列里面的所有協程,全部喚醒,這里面的每一個G 寫入的數據 默認就寫個 nil,因為通道關閉了,從關閉的通道里面讀取數據,讀到的是nil

系統還會把sendq寫數據的等待隊列里面的每一個協程喚醒,但是此時就會有問題了,向已經關閉的協程里面寫入數據,會報panic

我們再來梳理一下,什么情況下對通道操作,會報panic,咱們現在對之前提到的表格再來補充一波

channel 狀態 未初始化的通道(nil) 通道非空 通道是空的 通道滿了 通道未滿 關閉的通道
接收數據 阻塞 接收數據 阻塞 接收數據 接收數據 nil
發送數據 阻塞 發送數據 發送數據 阻塞 發送數據 panic
關閉 panic 關閉通道成功
待數據讀取完畢后
返回零值
關閉通道成功
直接返回零值
關閉通道成功
待數據讀取完畢后
返回零值
關閉通道成功
待數據讀取完畢后
返回零值
panic
  • 關閉一個已經被關閉了的通道,會報panic
  • 關閉一個未初始化的通道,即為nil的通道,也會報panic
  • 向一個已經關閉的通道寫入數據,會報panic

你以為這就完了嗎?

GO 里面Chan 一般會和 select 搭配使用,我們最后來簡單說一下GO 的 通道咋和select使用

GO 里面select 就和 C/C++里面的多路IO復用類似,在C/C++中多路IO復用有如下幾種方式

  • SELECT
  • POLL
  • EPOLL

都可以自己去模擬實現多路IO復用,各有利弊,一般使用的最多的是 EPOLL,且C/C++也有對應的網絡庫

當我們寫GO 的多路IO復用的時候,那就相當爽了,GO 默認支持select 關鍵字

SELECT 簡單使用

我們就來看看都是咋用的,不廢話,咱直接上DEMO

package main

import (
   "log"
   "time"
)

func main() {

   // 簡單設置log參數
   log.SetFlags(log.Lshortfile | log.LstdFlags)

   // 創建 2 個通道,元素數據類型為 int,緩沖區大小為 5
   var ch1 = make(chan int, 5)
   var ch2 = make(chan int, 5)

   // 分別向通道中各自寫入數據,咱默認寫1吧
   // 直接寫一個匿名函數 向通道中添加數據
   go func (){
      var num = 1
      for {
         ch1 <- num
         num += 1
         time.Sleep(1 * time.Second)
      }
   }()

   go func (){
      var num = 1
      for {
         ch2 <- num
         num += 1
         time.Sleep(1 * time.Second)
      }
   }()

   for {
      select {// 讀取數據
      case num := <-ch1:
         log.Printf("read ch1 data is  %d\n", num)

      case num := <-ch2:
         log.Printf("read ch2 data is: %d\n", num)

      default:
         log.Printf("ch1 and ch2 is empty\n")
          // 休息 1s 再讀
         time.Sleep(1 * time.Second)
      }
   }
}

運行效果

2021/06/18 17:43:06 main.go:54: ch1 and ch2 is empty
2021/06/18 17:43:07 main.go:48: read ch1 data is ?1
2021/06/18 17:43:07 main.go:48: read ch1 data is ?2
2021/06/18 17:43:07 main.go:51: read ch2 data is: 1
2021/06/18 17:43:07 main.go:51: read ch2 data is: 2
2021/06/18 17:43:07 main.go:54: ch1 and ch2 is empty
2021/06/18 17:43:08 main.go:48: read ch1 data is ?3
2021/06/18 17:43:08 main.go:51: read ch2 data is: 3
2021/06/18 17:43:08 main.go:54: ch1 and ch2 is empty
2021/06/18 17:43:09 main.go:48: read ch1 data is ?4
2021/06/18 17:43:09 main.go:51: read ch2 data is: 4
2021/06/18 17:43:09 main.go:54: ch1 and ch2 is empty
2021/06/18 17:43:10 main.go:51: read ch2 data is: 5
2021/06/18 17:43:10 main.go:48: read ch1 data is ?5

從運行結果來看,select 監控的 2個 通道,讀取到的數據是隨機的

可是我們看到case這個關鍵字,是不是會想到 switch ... case...,此處的的case 是順序運行的(GO 中沒有switch),select 里面的 case 應該也是順序運行才對呀,為啥結果是隨機的?

大家要是感興趣的話,可以深入研究一下,咱們今天就先到這里了。

總結

  • 分享了 GO 中通道是什么
  • 通道的底層數據結構詳細解析
  • 通道在GO源碼中是如何實現的
  • Chan 讀寫的基本原理
  • 關閉通道會出現哪些異常,panic
  • select 的簡單應用

原文鏈接:https://juejin.cn/post/6975280009082568740

  • 上一篇:沒有了
  • 下一篇:沒有了
欄目分類
最近更新