網站首頁 編程語言 正文
GO 中 Chan 實現原理分享
嗨,我是小魔童哪吒,還記得咱們之前分享過GO 通道 和sync包的使用嗎?咱們來回顧一下
- 分享了通道是什么,通道的種類
- 無緩沖,有緩沖,單向通道具體對應什么
- 對于通道的具體實踐
- 分享了關于通道的異常情況整理
- 簡單分享了sync包的使用
要是對上述內容還有點興趣的話,歡迎查看文章 GO通道和 sync 包的分享
chan 是什么
是一種特殊的類型,是連接并發goroutine
的管道
channel 通道是可以讓一個 goroutine 協程發送特定值到另一個 goroutine 協程的通信機制。
通道像一個傳送帶或者隊列,總是遵循先入先出(First In First Out)的規則,保證收發數據的順序,這一點和管道是一樣的
一個協程從通道的一頭放入數據,另一個協程從通道的另一頭讀出數據
每一個通道都是一個具體類型的導管,聲明 channel 的時候需要為其指定元素類型。
本篇文章主要是分享關于通道的實現原理,關于通道的使用,可以查看文章 GO通道和 sync 包的分享 ,這里有詳細的說明
GO 中 Chan 的底層數據結構
了解每一個組件或者每一個數據類型的實現原理,咱們都會去看源碼中的數據結構是如何設計的
同樣,我們一起來看看 GO 的 Chan 的數據結構
GO 的 Chan 的源碼實現是在 : src/runtime/chan.go
type hchan struct { qcount uint // total data in the queue dataqsiz uint // size of the circular queue buf unsafe.Pointer // points to an array of dataqsiz elements elemsize uint16 closed uint32 elemtype *_type // element type sendx uint // send index recvx uint // receive index recvq waitq // list of recv waiters sendq waitq // list of send waiters // lock protects all fields in hchan, as well as several // fields in sudogs blocked on this channel. // // Do not change another G's status while holding this lock // (in particular, do not ready a G), as this can deadlock // with stack shrinking. lock mutex }
hchan
是實現通道的核心數據結構,對應的成員也是不少,咱們根據源碼注釋一個參數一個參數的來看看
tag | 說明 |
---|---|
qcount | 當前的隊列,剩余元素個數 |
dataqsiz | 環形隊列可以存放的元素個數,也就是環形隊列的長度 |
buf | 指針,指向環形隊列 |
elemsize | 指的的隊列中每個元素的大小 |
closed | 具體標識關閉的狀態 |
elemtype | 見名知意,元素的類型 |
sendx | 發送隊列的下標,向隊列中寫入數據的時候,存放在隊列中的位置 |
recvx | 接受隊列的下標,從隊列的 這個位置開始讀取數據 |
recvq | 協程隊列,等待讀取消息的協程隊列 |
sendq | 協程隊列,等待發送消息的協程隊列 |
lock | 互斥鎖,在 chan 中,不可以并發的讀寫數據 |
根據上面的參數,我們或多或少就可以知道 GO 中的通道實現原理設計了哪些知識點:
- 指針
- 環形隊列
- 協程
- 互斥鎖
我們順便再來看看上述成員的協程隊列 waitq
對應的是啥樣的數據結構
type waitq struct { first *sudog last *sudog }
sudog
結構是在 src/runtime/runtime2.go
中 ,咱們順便多學一手
// sudog represents a g in a wait list, such as for sending/receiving // on a channel. type sudog struct { // The following fields are protected by the hchan.lock of the // channel this sudog is blocking on. shrinkstack depends on // this for sudogs involved in channel ops. g *g next *sudog prev *sudog elem unsafe.Pointer // data element (may point to stack) // The following fields are never accessed concurrently. // For channels, waitlink is only accessed by g. // For semaphores, all fields (including the ones above) // are only accessed when holding a semaRoot lock. acquiretime int64 releasetime int64 ticket uint32 // isSelect indicates g is participating in a select, so // g.selectDone must be CAS'd to win the wake-up race. isSelect bool // success indicates whether communication over channel c // succeeded. It is true if the goroutine was awoken because a // value was delivered over channel c, and false if awoken // because c was closed. success bool parent *sudog // semaRoot binary tree waitlink *sudog // g.waiting list or semaRoot waittail *sudog // semaRoot c *hchan // channel }
根據源碼注釋,咱們大致知道sudog
是干啥的
Sudog
表示等待列表中的 g,例如在一個通道上發送/接收
Sudog
是很必要的,因為g?synchronization對象關系是多對多
一個 g 可能在很多等候隊列上,所以一個 g 可能有很多sudogs
而且許多 g 可能在等待同一個同步對象,所以一個對象可能有許多sudogs
咱們抓住主要矛盾
Sudog
的數據結構,主要的東西就是一個 g
和一個 elem
,
g,上面有說到他和 Sudog
的對應關系
無論是讀通道還是寫通道,都會需要 elem
讀通道
數據會從hchan
的隊列中,拷貝到sudog
的elem
中
寫通道
與讀通道類似,是將數據從 sudog
的elem
處拷貝到hchan
的隊列中
咱們來畫個圖看看
此處咱們畫一個 hchan
的結構,主要畫一下 recvq
等待讀取消息的協程隊列,此處的隊列,實際上就是用鏈表來實現的
recvq
會對應到 waitq
結構,waitq
分為first
頭結點 和 last
尾節點 結構分別是 sudog
sudog
里面 elem存放具體的數據,next 指針指向下一個 sudog
,直到指到last
的 sudog
通過上述的,應該就能明白 GO 中的 chan
基本結構了吧
咱來再來詳細看看 hchan
中其他參數都具體是啥意思
-
dataqsiz
對應的環形隊列是啥樣的 - 寫
sendq
和 讀recvq
等待隊列是啥樣的 -
elemtype
元素類型信息又是啥
dataqsiz 對應的環形隊列是啥樣的
環形隊列,故名思議就是 一個首尾連接,成環狀的隊列
GO 中的 chan
內部的環形隊列,主要作用是作為緩沖區
這個環形隊列的長度,我們在創建隊列的時候, 也就是創建 hchan
結構的時候,就已經指定好了的
就是 dataqsiz
,環形隊列的長度
咱們畫個圖清醒一下
上圖需要表達的意思是這個樣子的,上述的隊列是循環隊列,默認首尾連接哦:
- dataqsiz 表示 循環隊列的長度是 8 個
- qcount 表示 當前隊列中有 5 個元素
- buf 是指針,指向循環隊列頭
- sendx 是發送隊列的下標,這里為 1 ,則指向隊列的第 2 個區域 ,這個參數可選范圍是 [0 , 8)
- recvx 是接收隊列的下標,這里為 4 ,則指向的是 隊列的第 5 個區域進行讀取數據
這里順帶提一下,hchan
中讀取數據還是寫入數據,都是需要去拿 lock
互斥鎖的,同一個通道,在同一個時刻只能允許一個協程進行讀寫
寫 sendq和 讀 recvq 等待隊列是啥樣的
hchan
結構中的 2 個協程隊列,一個是用于讀取數據,一個是用于發送數據,他們都是等待隊列,我們來看看這個等待隊列都是咋放數據上去的,分別有啥特性需要注意
當從通道中讀取 或者 發送數據:
- 若通道的緩沖區為空,或者沒有緩沖區,此時從通道中讀取數據,則協程是會被阻塞的
- 若通道緩沖區為滿,或者沒有緩沖區,此時從通道中寫數據,則協程仍然也會被阻塞
這些被阻塞的協程就會被放到等待隊列中,按照讀 和 寫 的動作來進行分類為寫 sendq
和 讀 recvq
隊列
那么這些阻塞的協程,啥時候會被喚醒呢?
看過之前的文章 GO通道和 sync 包的分享,應該就能知道
我們在來回顧一下,這篇文章的表格,通道會存在的異常情況:
channel 狀態 | 未初始化的通道(nil) | 通道非空 | 通道是空的 | 通道滿了 | 通道未滿 |
---|---|---|---|---|---|
接收數據 | 阻塞 | 接收數據 | 阻塞 | 接收數據 | 接收數據 |
發送數據 | 阻塞 | 發送數據 | 發送數據 | 阻塞 | 發送數據 |
關閉 | panic | 關閉通道成功 待數據讀取完畢后 返回零值 |
關閉通道成功 直接返回零值 |
關閉通道成功 待數據讀取完畢后 返回零值 |
關閉通道成功 待數據讀取完畢后 返回零值 |
此時,我們就知道,具體什么時候被阻塞的協程會被喚醒了
- 因為讀阻塞的協程,會被通道中的寫入數據的協程喚醒,反之亦然
- 因為寫阻塞的協程,也會被通道中讀取數據的協程喚醒
elemtype元素類型信息又是啥
這個元素類型信息就不難理解了,對于我們使用通道,創建通道的時候我們需要填入通道中數據的類型,一個通道,只能寫一種數據類型,指的就是這里的elemtype
另外 hchan
還有一個成員是elemsize
,代表上述元素類型的占用空間大小
那么這倆成員有啥作用呢?
elemtype
和elemsize
就可以計算指定類型的數據占用空間大小了
前者用于在數據傳遞的過程中進行賦值
后者可以用來在環形隊列中定位具體的元素
創建 chan 是咋實現的
我們再來瞅瞅 chan.go
的源碼實現 ,看到源碼中的 makechan
具體實現
func makechan(t *chantype, size int) *hchan { elem := t.elem // compiler checks this but be safe. if elem.size >= 1<<16 { throw("makechan: invalid channel element type") } if hchanSize%maxAlign != 0 || elem.align > maxAlign { throw("makechan: bad alignment") } mem, overflow := math.MulUintptr(elem.size, uintptr(size)) if overflow || mem > maxAlloc-hchanSize || size < 0 { panic(plainError("makechan: size out of range")) } // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers. // buf points into the same allocation, elemtype is persistent. // SudoG's are referenced from their owning thread so they can't be collected. // TODO(dvyukov,rlh): Rethink when collector can move allocated objects. var c *hchan switch { case mem == 0: // Queue or element size is zero. c = (*hchan)(mallocgc(hchanSize, nil, true)) // Race detector uses this location for synchronization. c.buf = c.raceaddr() case elem.ptrdata == 0: // Elements do not contain pointers. // Allocate hchan and buf in one call. c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) c.buf = add(unsafe.Pointer(c), hchanSize) default: // Elements contain pointers. c = new(hchan) c.buf = mallocgc(mem, elem, true) } c.elemsize = uint16(elem.size) c.elemtype = elem c.dataqsiz = uint(size) lockInit(&c.lock, lockRankHchan) if debugChan { print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n") } return c }
如上源碼實際上就是初始化 chan
對應的成員,其中循環隊列 buf 的大小,是由 makechan
函數傳入的 類型信息和緩沖區長度決定的,也就是makechan
的入參
可以通過上述代碼的 3 個位置就可以知道
// 1 func makechan(t *chantype, size int) *hchan // 2 mem, overflow := math.MulUintptr(elem.size, uintptr(size)) // 3 var c *hchan switch { case mem == 0: // Queue or element size is zero. c = (*hchan)(mallocgc(hchanSize, nil, true)) // Race detector uses this location for synchronization. c.buf = c.raceaddr() case elem.ptrdata == 0: // Elements do not contain pointers. // Allocate hchan and buf in one call. c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) c.buf = add(unsafe.Pointer(c), hchanSize) default: // Elements contain pointers. c = new(hchan) c.buf = mallocgc(mem, elem, true) }
讀寫 chan 的基本流程
第一張圖說明白向 chan 寫入數據的流程
向通道中寫入數據,我們會涉及sendq
、 recvq
隊列,和循環隊列的資源問題
根據圖示可以看出向通道中寫入數據分為 3 種情況:
- 寫入數據的時候,若
recvq
隊列為空,且循環隊列有空位,那么就直接將數據寫入到 循環隊列的隊尾 即可 - 若
recvq
隊列為空,且循環隊列無空位,則將當前的協程放到sendq
等待隊列中進行阻塞,等待被喚醒,當被喚醒的時候,需要寫入的數據,已經被讀取出來,且已經完成了寫入操作 - 若
recvq
隊列為不為空,那么可以說明循環隊列中沒有數據,或者循環隊列是空的,即沒有緩沖區(向無緩沖的通道寫入數據),此時,直接將recvq
等待隊列中取出一個G,寫入數據,喚醒G,完成寫入操作
第二張圖說明白向 chan 讀取數據的流程
向通道中讀取數據,我們會涉及sendq
、 recvq
隊列,和循環隊列的資源問題
根據圖示可以看出向通道中讀取數據分為 4 種情況:
- 若
sendq
為空,且循環隊列無元素的時候,那就將當前的協程加入recvq
等待隊列,把recvq
等待隊列對頭的一個協程取出來,喚醒,讀取數據 - 若
sendq
為空,且循環隊列有元素的時候,直接讀取循環隊列中的數據即可 - 若
sendq
有數據,且循環隊列有元素的時候,直接讀取循環隊列中的數據即可,且把sendq
隊列取一個G放到循環隊列中,進行補充 - 若
sendq
有數據,且循環隊列無元素的時候,則從sendq
取出一個G,并且喚醒他,進行數據讀取操作
上面說了通道的創建,讀寫,那么通道咋關閉?
通道的關閉,我們在應用的時候直接 close
就搞定了,那么對應close
的時候,底層的隊列都是做了啥呢?
若關閉了當前的通道,那么系統會把recvq
讀取數據的等待隊列里面的所有協程,全部喚醒,這里面的每一個G 寫入的數據 默認就寫個 nil,因為通道關閉了,從關閉的通道里面讀取數據,讀到的是nil
系統還會把sendq
寫數據的等待隊列里面的每一個協程喚醒,但是此時就會有問題了,向已經關閉的協程里面寫入數據,會報panic
我們再來梳理一下,什么情況下對通道操作,會報panic
,咱們現在對之前提到的表格再來補充一波
channel 狀態 | 未初始化的通道(nil) | 通道非空 | 通道是空的 | 通道滿了 | 通道未滿 | 關閉的通道 |
---|---|---|---|---|---|---|
接收數據 | 阻塞 | 接收數據 | 阻塞 | 接收數據 | 接收數據 | nil |
發送數據 | 阻塞 | 發送數據 | 發送數據 | 阻塞 | 發送數據 | panic |
關閉 | panic | 關閉通道成功 待數據讀取完畢后 返回零值 |
關閉通道成功 直接返回零值 |
關閉通道成功 待數據讀取完畢后 返回零值 |
關閉通道成功 待數據讀取完畢后 返回零值 |
panic |
- 關閉一個已經被關閉了的通道,會報
panic
- 關閉一個未初始化的通道,即為
nil
的通道,也會報panic
- 向一個已經關閉的通道寫入數據,會報
panic
你以為這就完了嗎?
GO 里面Chan
一般會和 select
搭配使用,我們最后來簡單說一下GO 的 通道咋和select使用吧
GO 里面select
就和 C/C++
里面的多路IO復用類似,在C/C++
中多路IO復用有如下幾種方式
- SELECT
- POLL
- EPOLL
都可以自己去模擬實現多路IO復用,各有利弊,一般使用的最多的是 EPOLL,且C/C++也有對應的網絡庫
當我們寫GO 的多路IO復用的時候,那就相當爽了,GO 默認支持select
關鍵字
SELECT 簡單使用
我們就來看看都是咋用的,不廢話,咱直接上DEMO
package main import ( "log" "time" ) func main() { // 簡單設置log參數 log.SetFlags(log.Lshortfile | log.LstdFlags) // 創建 2 個通道,元素數據類型為 int,緩沖區大小為 5 var ch1 = make(chan int, 5) var ch2 = make(chan int, 5) // 分別向通道中各自寫入數據,咱默認寫1吧 // 直接寫一個匿名函數 向通道中添加數據 go func (){ var num = 1 for { ch1 <- num num += 1 time.Sleep(1 * time.Second) } }() go func (){ var num = 1 for { ch2 <- num num += 1 time.Sleep(1 * time.Second) } }() for { select {// 讀取數據 case num := <-ch1: log.Printf("read ch1 data is %d\n", num) case num := <-ch2: log.Printf("read ch2 data is: %d\n", num) default: log.Printf("ch1 and ch2 is empty\n") // 休息 1s 再讀 time.Sleep(1 * time.Second) } } }
運行效果
2021/06/18 17:43:06 main.go:54: ch1 and ch2 is empty
2021/06/18 17:43:07 main.go:48: read ch1 data is ?1
2021/06/18 17:43:07 main.go:48: read ch1 data is ?2
2021/06/18 17:43:07 main.go:51: read ch2 data is: 1
2021/06/18 17:43:07 main.go:51: read ch2 data is: 2
2021/06/18 17:43:07 main.go:54: ch1 and ch2 is empty
2021/06/18 17:43:08 main.go:48: read ch1 data is ?3
2021/06/18 17:43:08 main.go:51: read ch2 data is: 3
2021/06/18 17:43:08 main.go:54: ch1 and ch2 is empty
2021/06/18 17:43:09 main.go:48: read ch1 data is ?4
2021/06/18 17:43:09 main.go:51: read ch2 data is: 4
2021/06/18 17:43:09 main.go:54: ch1 and ch2 is empty
2021/06/18 17:43:10 main.go:51: read ch2 data is: 5
2021/06/18 17:43:10 main.go:48: read ch1 data is ?5
從運行結果來看,select
監控的 2個 通道,讀取到的數據是隨機的
可是我們看到case
這個關鍵字,是不是會想到 switch ... case...
,此處的的case
是順序運行的(GO 中沒有switch),select
里面的 case
應該也是順序運行才對呀,為啥結果是隨機的?
大家要是感興趣的話,可以深入研究一下,咱們今天就先到這里了。
總結
- 分享了 GO 中通道是什么
- 通道的底層數據結構詳細解析
- 通道在GO源碼中是如何實現的
- Chan 讀寫的基本原理
- 關閉通道會出現哪些異常,panic
- select 的簡單應用
原文鏈接:https://juejin.cn/post/6975280009082568740
- 上一篇:沒有了
- 下一篇:沒有了
相關推薦
- 2022-04-24 一起來學習C語言的字符串轉換函數_C 語言
- 2022-12-12 flutter?InheritedWidget使用方法總結_Android
- 2022-07-03 go實現分布式鎖
- 2022-07-25 pandas實現數據讀取&清洗&分析的項目實踐_python
- 2023-10-17 react跨域請求數據(proxy)
- 2022-03-31 C語言類的基本語法詳解_C 語言
- 2022-04-17 spring cloud config和bus組件實現自動刷新
- 2022-10-27 Python入門之字符串操作詳解_python
- 欄目分類
-
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支