網站首頁 編程語言 正文
1.channel 簡介
Go語言有個很出名的話是“以通信的手段來共享內存”,channel就是其最佳的體現,channel提供一種機制,可以同步兩個并發執行的函數,還可以讓兩個函數通過互相傳遞特定類型的值來通信
channel有兩種初始化方式,分別是帶緩存的和不帶緩存的:
make(chan int) // 無緩存 chan make(chan int, 10) // 有緩存 chan
使用方式也很簡單:
c := make(chan int) defer close(c) go func(){ c <- 5 // send }() n := <- c // recv
十分簡潔的做到了不同協程的交互。
2.channel 內部結構
chan的實現在runtime/chan.go,是一個hchan的結構體:
type hchan struct { qcount uint // 隊列中的數據個數 dataqsiz uint // 環形隊列的大小,channel本身是一個環形隊列 buf unsafe.Pointer // 存放實際數據的指針,用unsafe.Pointer存放地址,為了避免gc elemsize uint16 closed uint32 // 標識channel是否關閉 elemtype *_type // 數據 元素類型 sendx uint // send的 index recvx uint // recv 的 index recvq waitq // 阻塞在 recv 的隊列 sendq waitq // 阻塞在 send 的隊列 lock mutex // 鎖 }
可以看出,channel本身是一個環形緩沖區,數據存放到堆上面,channel的同步是通過鎖實現的,并不是想象中的lock-free的方式,channel中有兩個隊列,一個是發送阻塞隊列,一個是接收阻塞隊列。當向一個已滿的channel發送數據會被阻塞,此時發送協程會被添加到sendq中,同理,當向一個空的channel接收數據時,接收協程也會被阻塞,被置入recvq中。
waitq是一個鏈表,里面對g結構做了一下簡單的封裝。
3.創建channel
當我們在代碼里面通過make創建一個channel時,實際調用的是下面這個函數:
CALL runtime.makechan(SB)
makechan的實現如下所示:
func makechan(t *chantype, size int) *hchan { elem := t.elem // 判斷 元素類型的大小 if elem.size >= 1<<16 { throw("makechan: invalid channel element type") } // 判斷對齊限制 if hchanSize%maxAlign != 0 || elem.align > maxAlign { throw("makechan: bad alignment") } // 判斷 size非負 和 是否大于 maxAlloc限制 mem, overflow := math.MulUintptr(elem.size, uintptr(size)) if overflow || mem > maxAlloc-hchanSize || size < 0 { panic(plainError("makechan: size out of range")) } var c *hchan switch { case mem == 0: // 無緩沖區,即 make沒設置大小 c = (*hchan)(mallocgc(hchanSize, nil, true)) c.buf = c.raceaddr() case elem.ptrdata == 0: // 數據類型不包含指針 c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) c.buf = add(unsafe.Pointer(c), hchanSize) default: // 如果包含指針 // Elements contain pointers. c = new(hchan) c.buf = mallocgc(mem, elem, true) } c.elemsize = uint16(elem.size) c.elemtype = elem c.dataqsiz = uint(size) if debugChan { print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n") } return c }
根據上面的代碼,我們可以看到,創建channel分為三種情況:
1.第一種緩沖區大小為0,此時只需要分配hchansize大小的內存就ok
2.第二種緩沖區大小不為0,且channel的類型不包含指針,此時buf為hchanSize+元素大小*元素個數的連續內存
3.第三種緩沖區大小不為0,且channel的類型包含指針,則不能簡單的根據元素的大小去申請內存,需要通過mallocgc去分配內存
4.發送數據
發送數據會調用chan.go中的如下接口:
CALL runtime.chansend1(SB)
chansend1會調用chansend接口,chansend方法簽名如下:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool
c是具體的channel,ep是發送的數據,block為true表示阻塞的發送,一般向channel發送數據都是阻塞的,如果channel數據滿了,會一直阻塞在這里。但是在select中如果有case監聽某個channel的發送,那么此時的block參數為false,后續分析select實現會講到。
select { case <-c: // 這里為非阻塞發送 // do some thing default: // do some thing }
chansend接口會對一些條件做判斷
如果向一個為nil的channel發送數據,如果是阻塞發送會一直阻塞:
if c == nil { if !block { return false } gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2) throw("unreachable") }
首先會加鎖,保證原子性,如果向一個已關閉的channel發送數據就會panic。
lock(&c.lock) if c.closed != 0 { unlock(&c.lock) panic(plainError("send on closed channel")) }
如果此時recvq中有等待協程,就直接調用send函數將數據復制給接收方, 實現如下:
// sg 為接收者協程,ep為發送元素 func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { if raceenabled { if c.dataqsiz == 0 { racesync(c, sg) } else { qp := chanbuf(c, c.recvx) raceacquire(qp) racerelease(qp) raceacquireg(sg.g, qp) racereleaseg(sg.g, qp) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz } } if sg.elem != nil { sendDirect(c.elemtype, sg, ep) sg.elem = nil } gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) if sg.releasetime != 0 { sg.releasetime = cputicks() } goready(gp, skip+1) }
如果此時沒有等待協程,并且數據未滿的情況下,就將數據copy到環形緩沖區中,將位置后移一位。
if c.qcount < c.dataqsiz { // 如果 未滿 // Space is available in the channel buffer. Enqueue the element to send. qp := chanbuf(c, c.sendx) if raceenabled { raceacquire(qp) racerelease(qp) } typedmemmove(c.elemtype, qp, ep) c.sendx++ if c.sendx == c.dataqsiz { c.sendx = 0 } c.qcount++ unlock(&c.lock) return true }
如果此時環形緩沖區數據滿了,如果是阻塞發送,此時會把發送方放到sendq隊列中。
5.接收數據
接收數據會調用下面的接口:
CALL runtime.chanrecv1(SB)
chanrecv1會調用chanrecv接口,chanrecv方法簽名如下:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)
c 指需要操作的channel,接收的數據會寫到ep中,block與send中的情況一樣,表示是阻塞接收還是非阻塞接收,非阻塞接收指在select中case 接收一個channel值:
select { case a := <-c: // 這里為非阻塞接收,沒有數據直接返回 // do some thing default: // do some thing }
首先chanrecv也會做一些參數校驗
如果channel為nil并且是非阻塞模式,直接返回,如果是阻塞模式,永遠等待
if c == nil { if !block { return } gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2) throw("unreachable") }
隨后會加鎖,防止競爭讀寫
lock(&c.lock)
如果向一個已關閉的channel接收數據,此時channel里面還有數據,那么依然可以接收數據,屬于正常接收數據情況。
如果向一個已關閉的channel接收數據,此時channel里面沒有數據,那么此時返回的是(true,false),表示有值返回,但不是我們需要的值:
if c.closed != 0 && c.qcount == 0 { if raceenabled { raceacquire(c.raceaddr()) } unlock(&c.lock) if ep != nil { typedmemclr(c.elemtype, ep) // 將 ep 指向的內存塊置 0 } return true, false }
接收也分為三種情況:
如果此時 sendq中有發送方在阻塞,此時會調用recv函數:
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { if c.dataqsiz == 0 { if raceenabled { racesync(c, sg) } if ep != nil { recvDirect(c.elemtype, sg, ep) } } else { qp := chanbuf(c, c.recvx) if raceenabled { raceacquire(qp) racerelease(qp) raceacquireg(sg.g, qp) racereleaseg(sg.g, qp) } // copy data from queue to receiver if ep != nil { typedmemmove(c.elemtype, ep, qp) } // copy data from sender to queue typedmemmove(c.elemtype, qp, sg.elem) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz } sg.elem = nil gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) if sg.releasetime != 0 { sg.releasetime = cputicks() } goready(gp, skip+1) }
此時有發送方在等待,表示此時channel中數據已滿,這個時候會將channel頭部的數據copy到接收方,然后將發送方隊列頭部的發送者的數據copy到那個位置。這涉及到兩次copy操作。
第二種情況是如果沒有發送方等待,此時會把數據copy到channel中:
if c.qcount > 0 { // Receive directly from queue qp := chanbuf(c, c.recvx) if raceenabled { raceacquire(qp) racerelease(qp) } if ep != nil { typedmemmove(c.elemtype, ep, qp) } typedmemclr(c.elemtype, qp) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- unlock(&c.lock) return true, true }
第三種情況如果channel里面沒有數據,如果是非阻塞接收直接返回false,如果是阻塞接收會將接收方協程放入channel的recvq中。
6.關閉channel
關閉channel時會調用如下接口:
func closechan(c *hchan)
首先會做一些數據校驗:
if c == nil { panic(plainError("close of nil channel")) } lock(&c.lock) if c.closed != 0 { unlock(&c.lock) panic(plainError("close of closed channel")) } if raceenabled { callerpc := getcallerpc() racewritepc(c.raceaddr(), callerpc, funcPC(closechan)) racerelease(c.raceaddr()) } c.closed = 1 //置關閉標記位
如果向一個為nil的channel或者向一個已關閉的channel發起close操作就會panic。
隨后會喚醒所有在recvq或者sendq里面的協程:
var glist gList // release all readers for { sg := c.recvq.dequeue() if sg == nil { break } if sg.elem != nil { typedmemclr(c.elemtype, sg.elem) sg.elem = nil } if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = nil if raceenabled { raceacquireg(gp, c.raceaddr()) } glist.push(gp) } // release all writers (they will panic) for { sg := c.sendq.dequeue() if sg == nil { break } sg.elem = nil if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = nil if raceenabled { raceacquireg(gp, c.raceaddr()) } glist.push(gp) } unlock(&c.lock)
如果存在接收者,將接收數據通過typedmemclr置0。
如果存在發送者,將所有發送者panic。
7.總結
綜上分析,在使用channel有這么幾點要注意
1.確保所有數據發送完后再關閉channel,由發送方來關閉
2.不要重復關閉channel
3.不要向為nil的channel里面發送值
4.不要向為nil的channel里面接收值
5.接收數據時,可以通過返回值判斷是否ok
n , ok := <- c if ok{ // do some thing }
這樣防止channel被關閉后返回了零值,對業務造成影響
原文鏈接:https://blog.csdn.net/qq_53267860/article/details/126821525
相關推薦
- 2022-01-30 composer 安裝包,提示找不到對應的包,很奇怪的問題,備忘
- 2022-08-27 C#8.0中的模式匹配_C#教程
- 2022-10-11 delphi fmx android 屏幕分辨率
- 2022-10-01 Python字符串常用方法以及其應用場景詳解_python
- 2022-07-12 ERROR:ORA-12543: TNS:destination host unreachable
- 2023-12-19 Mybatis使用注解實現復雜動態SQL
- 2022-05-11 垃圾收集器ParNew&CMS與底層三色標記算法詳解
- 2022-07-15 Android?Camera開發實現可復用的相機組件_Android
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支