網站首頁 編程語言 正文
前言
channel是用于 goroutine 之間的同步、通信的數據結構
channel 的底層是通過 mutex 來控制并發的,但它為程序員提供了更高一層次的抽象,封裝了更多的功能,這樣并發編程變得更加容易和安全,得以讓程序員把注意力留到業務上去,提升開發效率
channel的用途包括但不限于以下幾點:
- 協程間通信,同步
- 定時任務:和timer結合
- 解耦生產方和消費方,實現阻塞隊列
- 控制并發數
本文將介紹channel的底層原理,包括數據結構,channel的創建,發送,接收,關閉的實現邏輯
整體結構
Go channel的數據結構如下所示:
type hchan struct { qcount uint // total data in the queue dataqsiz uint // size of the circular queue buf unsafe.Pointer // points to an array of dataqsiz elements elemsize uint16 closed uint32 elemtype *_type // element type sendx uint // send index recvx uint // receive index recvq waitq // list of recv waiters sendq waitq // list of send waiters lock mutex }
qcount:已經存儲了多少個元素
dataqsie:最多存儲多少個元素,即緩沖區容量
buf:指向緩沖區的位置,實際上是一個數組
elemsize:每個元素占多大空間
closed:channel能夠關閉,這里記錄其關閉狀態
elemtype:保存數據的類型信息,用于go運行時使用
sendx,recvx:
- 記錄下一個要發送到的位置,下一次從哪里還是接收
- 這里用數組模擬隊列,這兩個變量即表示隊列的隊頭,隊尾
- 因此channel的緩沖也被稱為環形緩沖區
recvq,sendq:
當發送個接收不能立即完成時,需要讓協程在channel上等待,所以有兩個等待隊列,分別針對接收和發送
lock:channel支持協程間并發訪問,因此需要一把鎖來保護
創建
創建channel會被編譯器編譯為調用makechan
函數
// 無緩沖通道 ch1 := make(chan int) // 有緩沖通道 ch2 := make(chan int, 10)
會根據創建的是帶緩存,還是無緩沖,決定第二個參數size的值
可以看出,創建出來的是hchan指針,這樣就能在函數間直接傳遞 channel,而不用傳遞 channel 的指針
func makechan(t *chantype, size int) *hchan { elem := t.elem // mem:緩沖區大小 mem, overflow := math.MulUintptr(elem.size, uintptr(size)) if overflow || mem > maxAlloc-hchanSize || size < 0 { panic(plainError( "makechan: size out of range" )) } var c *hchan switch { // 緩沖區大小為空,只申請hchanSize大小的內存 case mem == 0: c = (*hchan)(mallocgc(hchanSize, nil, true)) c.buf = c.raceaddr() // 元素類型不包含指針,一次性分配hchanSize+mem大小的內存 case elem.ptrdata == 0: c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) c.buf = add(unsafe.Pointer(c), hchanSize) // 否則就是帶緩存,且有指針,分配兩次內存 default: // Elements contain pointers. c = new(hchan) c.buf = mallocgc(mem, elem, true) } // 保存元素類型,元素大小,容量 c.elemsize = uint16(elem.size) c.elemtype = elem c.dataqsiz = uint(size) lockInit(&c.lock, lockRankHchan) return c }
發送
執行以下代碼時:
ch <- 3
編譯器會轉化為對chansend的調用
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { // 如果channel是空 if c == nil { // 非阻塞,直接返回 if !block { return false } // 否則阻塞當前協程 gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2) throw( "unreachable" ) } // 非阻塞,沒有關閉,且容量滿了,無法發送,直接返回 if !block && c.closed == 0 && full(c) { return false } // 加鎖 lock(&c.lock) // 如果已經關閉,無法發送,直接panic if c.closed != 0 { unlock(&c.lock) panic(plainError( "send on closed channel" )) } // 從接收隊列彈出一個協程的包裝結構sudog if sg := c.recvq.dequeue(); sg != nil { // 如果能彈出,即有等到接收的協程,說明: // 該channel要么是無緩沖,要么緩沖區為空,不然不可能有協程在等待 // 將要發送的數據拷貝到該協程的接收指針上 send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true } // 緩沖區還有空間 if c.qcount < c.dataqsiz { // qp:計算要發送到的位置的地址 qp := chanbuf(c, c.sendx) // 將數據從ep拷貝到qp typedmemmove(c.elemtype, qp, ep) // 待發送位置移動 c.sendx++ // 由于是數組模擬隊列,sendx到頂了需要歸零 if c.sendx == c.dataqsiz { c.sendx = 0 } // 緩沖區數量++ c.qcount++ unlock(&c.lock) return true } // 往下就是緩沖區無數據,也沒有等到接收協程的情況了 // 如果是非阻塞模式,直接返回 if !block { unlock(&c.lock) return false } // 將當前協程包裝成sudog,阻塞到channel上 gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } mysg.elem = ep mysg.waitlink = nil mysg.g = gp mysg.isSelect = false mysg.c = c gp.waiting = mysg gp.param = nil // 當前協程進入發送等待隊列 c.sendq.enqueue(mysg) atomic.Store8(&gp.parkingOnChan, 1) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) // 被喚醒后從這里開始執行 KeepAlive(ep) if mysg != gp.waiting { throw( "G waiting list is corrupted" ) } gp.waiting = nil gp.activeStackChans = false closed := !mysg.success gp.param = nil if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } mysg.c = nil releaseSudog(mysg) // 被喚醒后發現channel關閉了,panic if closed { if c.closed == 0 { throw( "chansend: spurious wakeup" ) } panic(plainError( "send on closed channel" )) } return true }
整體流程為:
如果當前操作為非阻塞,channel沒有關閉,且容量滿了,無法發送,直接返回
從接收隊列彈出一個協程的包裝結構sudog,如果能彈出,即有等到接收的協程,說明:
- 該channel要么是無緩沖,要么緩沖區為空,不然不可能有協程在等待
- 將要發送的數據拷貝到該協程的接收指針上,返回
- 這里直接從發送者拷貝到接收者的內存,而不是先把數據拷貝到緩沖區,再從緩沖區拷貝到接收者,節約了一次內存拷貝
否則看看緩沖區還有空間,如果有,將數據拷貝到緩沖區上,也返回
接下來就是既沒有接收者等待,緩沖區也為空的情況,就需要將當前協程包裝成sudog,阻塞到channel上
將協程阻塞到channel的等待隊列時,將其包裝成了sudog結構:
type sudog struct { // 協程 g *g // 前一個,后一個指針 next *sudog prev *sudog // 等到發送的數據在哪,等待從哪個位置接收數據 elem unsafe.Pointer acquiretime int64 releasetime int64 ticket uint32 isSelect bool success bool parent *sudog // semaRoot binary tree waitlink *sudog // g.waiting list or semaRoot waittail *sudog // semaRoot // 在哪個channel上等待 c *hchan // channel }
其目的是:
- g本身沒有存儲前一個,后一個指針,需要用sudog結構包裝才能加入隊列
- elem字段存儲等到發送的數據在哪,等待從哪個位置接收數據,用于從數據能從協程到協程的直接拷貝
來看看一些子函數:
1.判斷channel是否是滿的
func full(c *hchan) bool { // 無緩沖 if c.dataqsiz == 0 { // 并且沒有其他協程在等待 return c.recvq.first == nil } // 有緩沖,但容量裝滿了 return c.qcount == c.dataqsiz }
2.send方法:
/** c:要操作的channel sg:彈出的接收者協程 ep:要發送的數據在的位置 */ func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { // 如果接收者指針不為空,直接把數據從ep拷貝到sg.elem if sg.elem != nil { sendDirect(c.elemtype, sg, ep) sg.elem = nil } gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) sg.success = true if sg.releasetime != 0 { sg.releasetime = cputicks() } // 喚醒該接收者協程 goready(gp, skip+1) }
接收
從channel中接收數據有幾種寫法:
- 帶不帶ok
- 接不接收返回值
根據帶不帶ok,決定用下面哪個方法
func chanrecv1(c *hchan, elem unsafe.Pointer) { chanrecv(c, elem, true) } func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) { _, received = chanrecv(c, elem, true) return }
根據接不接收返回值,決定elem是不是nil
最終都會調用chanrecv方法:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { // 如果channel為nil,根據參數中是否阻塞來決定是否阻塞 if c == nil { if !block { return } gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2) throw( "unreachable" ) } // 非阻塞,并且channel為空 if !block && empty(c) { // 如果還沒關閉,直接返回 if atomic.Load(&c.closed) == 0 { return } // 否則已經關閉, // 如果為空,返回該類型的零值 if empty(c) { if ep != nil { typedmemclr(c.elemtype, ep) } return true, false } } lock(&c.lock) // 同樣,如果channel已經關閉,且緩沖區沒有元素,返回該類型零值 if c.closed != 0 && c.qcount == 0 { unlock(&c.lock) if ep != nil { typedmemclr(c.elemtype, ep) } return true, false } // 如果有發送者正在阻塞,說明: // 1.無緩沖 // 2.有緩沖,但緩沖區滿了。因為只有緩沖區滿了,才可能有發送者在等待 if sg := c.sendq.dequeue(); sg != nil { // 將數據從緩沖區拷貝到ep,再將sg的數據拷貝到緩沖區,該函數詳細流程可看下文 recv(c, sg, ep, func() { unlock(&c.lock) }, 3) return true, true } // 如果緩存區有數據, if c.qcount > 0 { // qp為緩沖區中下一次接收的位置 qp := chanbuf(c, c.recvx) // 將數據從qp拷貝到ep if ep != nil { typedmemmove(c.elemtype, ep, qp) } typedmemclr(c.elemtype, qp) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- unlock(&c.lock) return true, true } // 接下來就是既沒有發送者在等待,也緩沖區也沒數據 if !block { unlock(&c.lock) return false, false } // 將當前協程包裝成sudog,阻塞到channel中 gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // 記錄接收地址 mysg.elem = ep mysg.waitlink = nil gp.waiting = mysg mysg.g = gp mysg.isSelect = false mysg.c = c gp.param = nil c.recvq.enqueue(mysg) atomic.Store8(&gp.parkingOnChan, 1) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2) // 從這里喚醒 if mysg != gp.waiting { throw( "G waiting list is corrupted" ) } gp.waiting = nil gp.activeStackChans = false if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } success := mysg.success gp.param = nil mysg.c = nil releaseSudog(mysg) return true, success }
接收流程如為:
如果channel為nil,根據參數中是否阻塞來決定是否阻塞
如果channel已經關閉,且緩沖區沒有元素,返回該類型零值
如果有發送者正在阻塞,說明:
- 要么是無緩沖
- 有緩沖,但緩沖區滿了。因為只有緩沖區滿了,才可能有發送者在等待
- 將數據從緩沖區拷貝到ep,再將發送者的數據拷貝到緩沖區,并喚該發送者
如果緩存區有數據, 則從緩沖區將數據復制到ep,返回
接下來就是既沒有發送者在等待,也緩沖區也沒數據的情況:
將當前協程包裝成sudog,阻塞到channel中
來看其中的子函數recv():
/** c:操作的channel sg:阻塞的發送協程 ep:接收者接收數據的地址 */ func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { // 如果是無緩沖channel,直接將數據從發送者sg拷貝到ep if c.dataqsiz == 0 { if ep != nil { recvDirect(c.elemtype, sg, ep) } // 接下來是有緩沖,且緩沖區滿的情況 } else { // qp為channel緩沖區中,接收者下一次接收的地址 qp := chanbuf(c, c.recvx) // 將數據從qp拷貝到ep if ep != nil { typedmemmove(c.elemtype, ep, qp) } // 將發送者的數據從sg.elem拷貝到qp typedmemmove(c.elemtype, qp, sg.elem) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } // 由于一接收已發送,緩沖區還是滿的,因此 c.sendx = c.recvx c.sendx = c.recvx } sg.elem = nil gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) sg.success = true if sg.releasetime != 0 { sg.releasetime = cputicks() } // 喚醒發送者 goready(gp, skip+1) }
關閉
func closechan(c *hchan) { // 不能關閉空channel if c == nil { panic(plainError( "close of nil channel" )) } lock(&c.lock) // 不能重復關閉 if c.closed != 0 { unlock(&c.lock) panic(plainError( "close of closed channel" )) } // 修改關閉狀態 c.closed = 1 var glist gList // 釋放所有的接收者協程,并為它們賦予零值 for { sg := c.recvq.dequeue() if sg == nil { break } if sg.elem != nil { typedmemclr(c.elemtype, sg.elem) sg.elem = nil } if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = unsafe.Pointer(sg) sg.success = false glist.push(gp) } // 釋放所有的發送者協程 for { sg := c.sendq.dequeue() if sg == nil { break } sg.elem = nil if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = unsafe.Pointer(sg) sg.success = false glist.push(gp) } unlock(&c.lock) // 執行喚醒操作 for !glist.empty() { gp := glist.pop() gp.schedlink = 0 goready(gp, 3) } }
關閉的流程比較簡單,可以看出:
不能關閉空channel,不能重復關閉channel
先上一把大鎖,接著把所有掛在這個 channel 上的 sender 和 receiver 全都連成一個 sudog 鏈表,再解鎖。最后,再將所有的 sudog 全都喚醒:
接收者:會收到該類型的零值
這里返回零值沒有問題,因為之所以這些接收者會阻塞,就是因為緩沖區沒有數據,因此channel關閉后該接收者收到零值也符合邏輯
發送者:會被喚醒,然后panic
因此不能在有多個sender的時候貿然關閉channel
原文鏈接:https://juejin.cn/post/7157663775955353614
相關推薦
- 2023-09-12 利用ImportBeanDefinitionRegistrar手動向Spring容器注入Bean
- 2022-06-14 C#實現FTP上傳文件的方法_C#教程
- 2022-07-12 git如何上傳本地項目
- 2022-11-08 C/C++實現遍歷文件夾最全方法總結_C 語言
- 2023-07-27 使用Echarts圖表時,頁面切換后并且改變頁面窗口大小,再切回原來頁面Echarts圖表顯示有問題
- 2022-12-25 React不使用requestIdleCallback實現調度原理解析_React
- 2022-03-04 Tue Dec 01 00:00:00 GMT+08:00 1998 轉成自定義字符串
- 2021-11-10 Android?Studio設置繪制布局時的視圖_Android
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支