網(wǎng)站首頁 編程語言 正文
前言
channel是golang中標(biāo)志性的概念之一,很好很強(qiáng)大!
channel(通道),顧名思義,是一種通道,一種用于并發(fā)環(huán)境中數(shù)據(jù)傳遞的通道。通常結(jié)合golang中另一重要概念goroutine(go協(xié)程)使用,使得在golang中的并發(fā)編程變得清晰簡(jiǎn)潔同時(shí)又高效強(qiáng)大。
今天嘗試著讀讀golang對(duì)channel的實(shí)現(xiàn)源碼,本文主要是自己個(gè)人對(duì)于Channel源碼的學(xué)習(xí)筆記,需要的朋友可以參考以下內(nèi)容,希望對(duì)大家有幫助。
channel基礎(chǔ)結(jié)構(gòu)
type hchan struct { qcount uint // total data in the queue dataqsiz uint // size of the circular queue buf unsafe.Pointer // points to an array of dataqsiz elements elemsize uint16 closed uint32 elemtype *_type // element type sendx uint // send index recvx: uint // receive index recvq waitq // list of recv waiters sendq waitq // list of send waiters // lock protects all fields in hchan, as well as several // fields in sudogs blocked on this channel. // // Do not change another G's status while holding this lock // (in particular, do not ready a G), as this can deadlock // with stack shrinking. lock mutex }
hchan結(jié)構(gòu)就是channel的底層數(shù)據(jù)結(jié)構(gòu),看源碼定義,可以說是非常清晰了。
- qcount:channel緩存隊(duì)列中已有的元素?cái)?shù)量
- dataqsiz:channel的緩存隊(duì)列大小(定義channel時(shí)指定的緩存大小,這里channel用的是一個(gè)環(huán)形隊(duì)列)
- buf:指向channel緩存隊(duì)列的指針
- elemsize:通過channel傳遞的元素大小
- closed:channel是否關(guān)閉的標(biāo)志
- elemtype:通過channel傳遞的元素類型
- sendx:channel中發(fā)送元素在隊(duì)列中的索引
- recvx:channel中接受元素在隊(duì)列中的索引
- recvq:等待從channel中接收元素的協(xié)程列表
- sendq:等待向channel中發(fā)送元素的協(xié)程列表
- lock:channel上的鎖
其中關(guān)于recvq和sendq的兩個(gè)列表所用的結(jié)構(gòu)waitq簡(jiǎn)單看下。
type waitq struct { first *sudog last *sudog } type sudog struct { g *g selectdone *uint32 // CAS to 1 to win select race (may point to stack) next *sudog prev *sudog elem unsafe.Pointer // data element (may point to stack) ... c *hchan // channel }
可以看出waiq是一個(gè)雙向鏈表結(jié)構(gòu),鏈上的節(jié)點(diǎn)是sudog。從sudog的結(jié)構(gòu)定義可以粗略看出,sudog是對(duì)g(即協(xié)程)的一個(gè)封裝。用于記錄一個(gè)等待在某個(gè)channel上的協(xié)程g、等待的元素elem等信息。
channel初始化
func makechan(t *chantype, size int64) *hchan { elem := t.elem // compiler checks this but be safe. if elem.size >= 1<<16 { throw("makechan: invalid channel element type") } if hchanSize%maxAlign != 0 || elem.align > maxAlign { throw("makechan: bad alignment") } if size < 0 || int64(uintptr(size)) != size || (elem.size > 0 && uintptr(size) > (_MaxMem-hchanSize)/elem.size) { panic(plainError("makechan: size out of range")) } var c *hchan if elem.kind&kindNoPointers != 0 || size == 0 { // Allocate memory in one call. // Hchan does not contain pointers interesting for GC in this case: // buf points into the same allocation, elemtype is persistent. // SudoG's are referenced from their owning thread so they can't be collected. // TODO(dvyukov,rlh): Rethink when collector can move allocated objects. c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true)) if size > 0 && elem.size != 0 { c.buf = add(unsafe.Pointer(c), hchanSize) } else { // race detector uses this location for synchronization // Also prevents us from pointing beyond the allocation (see issue 9401). c.buf = unsafe.Pointer(c) } } else { c = new(hchan) c.buf = newarray(elem, int(size)) } c.elemsize = uint16(elem.size) c.elemtype = elem c.dataqsiz = uint(size) if debugChan { print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n") } return c }
第一部分的3個(gè)if是對(duì)初始化參數(shù)的合法性檢查。
if elem.size >= 1<<16:
檢查channel元素大小,小于2字節(jié)
if hchanSize%maxAlign != 0 || elem.align > maxAlign
沒看懂(對(duì)齊?)
if size < 0 || int64(uintptr(size)) != size || (elem.size > 0 && uintptr(size) > (_MaxMem-hchanSize)/elem.size)
第一個(gè)判斷緩存大小需要大于等于0
int64(uintptr(size)) != size這一句實(shí)際是用于判斷size是否為負(fù)數(shù)。由于uintptr實(shí)際是一個(gè)無符號(hào)整形,負(fù)數(shù)經(jīng)過轉(zhuǎn)換后會(huì)變成一個(gè)與原數(shù)完全不同的很大的正整數(shù),而正數(shù)經(jīng)過轉(zhuǎn)換后并沒有變化。
最后一句判斷channel的緩存大小要小于heap中能分配的大小。_MaxMem是可分配的堆大小。
第二部分是具體的內(nèi)存分配。
元素類型為kindNoPointers的時(shí)候,既非指針類型,則直接分配(hchanSize+uintptr(size)*elem.size)大小的連續(xù)空間。c.buf指向hchan后面的elem隊(duì)列首地址。
如果channel緩存大小為0,則c.buf實(shí)際上是沒有給他分配空間的
如果類型為非kindNoPointers,則channel的空間和buf的空間是分別分配的。
channel發(fā)送
// entry point for c <- x from compiled code //go:nosplit func chansend1(c *hchan, elem unsafe.Pointer) { chansend(c, elem, true, getcallerpc(unsafe.Pointer(&c))) }
channel發(fā)送,即協(xié)程向channel中發(fā)送數(shù)據(jù),與此操作對(duì)應(yīng)的go代碼如c <- x。
channel發(fā)送的實(shí)現(xiàn)源碼中,通過chansend1(),調(diào)用chansend(),其中block參數(shù)為true。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { if c == nil { if !block { return false } gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2) throw("unreachable") } ... }
chansend()首先對(duì)c進(jìn)行判斷, if c == nil:即channel沒有被初始化,這個(gè)時(shí)候會(huì)直接調(diào)用gopark使得當(dāng)前協(xié)程進(jìn)入等待狀態(tài)。而且用于喚醒的參數(shù)unlockf傳的nil,即沒有人來喚醒它,這樣系統(tǒng)進(jìn)入死鎖。所以channel必須被初始化之后才能使用,否則死鎖。
接下來是正式的發(fā)送處理,且后續(xù)操作會(huì)加鎖。
lock(&c.lock)
close判斷
if c.closed != 0 { unlock(&c.lock) panic(plainError("send on closed channel")) }
如果channel已經(jīng)是closed狀態(tài),解鎖然后直接panic。也就是說我們不可以向已經(jīng)關(guān)閉的通道內(nèi)在發(fā)送數(shù)據(jù)。
將數(shù)據(jù)發(fā)給接收協(xié)程
if sg := c.recvq.dequeue(); sg != nil { // Found a waiting receiver. We pass the value we want to send // directly to the receiver, bypassing the channel buffer (if any). send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true }
嘗試從接收等待協(xié)程隊(duì)列中取出一個(gè)協(xié)程,如果有則直接數(shù)據(jù)發(fā)給它。也就是說發(fā)送到channel的數(shù)據(jù)會(huì)優(yōu)先檢查接收等待隊(duì)列,如果有協(xié)程等待取數(shù),就直接給它。發(fā)完解鎖,操作完成。
這里send()方法會(huì)將數(shù)據(jù)寫到從隊(duì)列里取出來的sg中,通過goready()喚醒sg.g(即等待的協(xié)程),進(jìn)行后續(xù)處理。
數(shù)據(jù)放到緩存
if c.qcount < c.dataqsiz { // Space is available in the channel buffer. Enqueue the element to send. qp := chanbuf(c, c.sendx) if raceenabled { raceacquire(qp) racerelease(qp) } typedmemmove(c.elemtype, qp, ep) c.sendx++ if c.sendx == c.dataqsiz { c.sendx = 0 } c.qcount++ unlock(&c.lock) return true }
如果沒有接收協(xié)程在等待,則去檢查channel的緩存隊(duì)列是否還有空位。如果有空位,則將數(shù)據(jù)放到緩存隊(duì)列中。
通過c.sendx游標(biāo)找到隊(duì)列中的空余位置,然后將數(shù)據(jù)存進(jìn)去。移動(dòng)游標(biāo),更新數(shù)據(jù),然后解鎖,操作完成。
if c.sendx == c.dataqsiz { c.sendx = 0 }
通過這一段游標(biāo)的處理可以看出,緩存隊(duì)列是一個(gè)環(huán)形。
阻塞發(fā)送協(xié)程
gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // No stack splits between assigning elem and enqueuing mysg // on gp.waiting where copystack can find it. mysg.elem = ep mysg.waitlink = nil mysg.g = gp mysg.selectdone = nil mysg.c = c gp.waiting = mysg gp.param = nil c.sendq.enqueue(mysg) goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)
如果緩存也慢了,這時(shí)候就只能阻塞住發(fā)送協(xié)程了, 等有合適的機(jī)會(huì)了,再將數(shù)據(jù)發(fā)送出去。
getg()獲取當(dāng)前協(xié)程對(duì)象g的指針,acquireSudog()生成一個(gè)sudog,然后將當(dāng)前協(xié)程及相關(guān)數(shù)據(jù)封裝好鏈接到sendq列表中。然年通過goparkunlock()將其轉(zhuǎn)為等待狀態(tài),并解鎖。操作完成。
channel接收
// entry points for <- c from compiled code //go:nosplit func chanrecv1(c *hchan, elem unsafe.Pointer) { chanrecv(c, elem, true) }
channel接收,即協(xié)程從channel中接收數(shù)據(jù),與此操作對(duì)應(yīng)的go代碼如<- c。
channel接收的實(shí)現(xiàn)源碼中,通過chanrecv1(),調(diào)用chanrecv(),其中block參數(shù)為true。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { ... if c == nil { if !block { return } gopark(nil, nil, "chan receive (nil chan)", traceEvGoStop, 2) throw("unreachable") } ... }
同發(fā)送一樣,接收也會(huì)首先檢查c是否為nil,如果為nil,會(huì)調(diào)用gopark()休眠當(dāng)前協(xié)程,從而最終造成死鎖。
接收操作同樣先進(jìn)行加鎖,然后開始正式操作。
close處理
if c.closed != 0 && c.qcount == 0 { if raceenabled { raceacquire(unsafe.Pointer(c)) } unlock(&c.lock) if ep != nil { typedmemclr(c.elemtype, ep) } return true, false }
接收和發(fā)送略有不同,當(dāng)channel關(guān)閉并且channel的緩存隊(duì)列里沒有數(shù)據(jù)了,那么接收動(dòng)作會(huì)直接結(jié)束,但不會(huì)報(bào)錯(cuò)。
也就是說,允許從已關(guān)閉的channel中接收數(shù)據(jù)。
從發(fā)送等待協(xié)程中接收
if sg := c.sendq.dequeue(); sg != nil { // Found a waiting sender. If buffer is size 0, receive value // directly from sender. Otherwise, receive from head of queue // and add sender's value to the tail of the queue (both map to // the same buffer slot because the queue is full). recv(c, sg, ep, func() { unlock(&c.lock) }, 3) return true, true }
嘗試從發(fā)送等待協(xié)程列表中取出一個(gè)等待協(xié)程,如果存在,則調(diào)用recv()方法接收數(shù)據(jù)。
這里的recv()方法比send()方法稍微復(fù)雜一點(diǎn),我們簡(jiǎn)單分析下。
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { if c.dataqsiz == 0 { ... if ep != nil { // copy data from sender recvDirect(c.elemtype, sg, ep) } } else { qp := chanbuf(c, c.recvx) ... // copy data from queue to receiver if ep != nil { typedmemmove(c.elemtype, ep, qp) } // copy data from sender to queue typedmemmove(c.elemtype, qp, sg.elem) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz } sg.elem = nil gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) if sg.releasetime != 0 { sg.releasetime = cputicks() } goready(gp, skip+1) }
recv()的接收動(dòng)作分為兩種情況:
- c.dataqsiz == 0:即當(dāng)channel為無緩存channel時(shí),直接將發(fā)送協(xié)程中的數(shù)據(jù),拷貝給接收者。
- c.dataqsiz != 0:如果channel有緩存,則:根據(jù)緩存的接收游標(biāo),從緩存隊(duì)列中取出一個(gè),拷貝給接受者
小結(jié)
channel必須初始化后才能使用;
channel關(guān)閉后,不允許在發(fā)送數(shù)據(jù),但是還可以繼續(xù)從中接收未處理完的數(shù)據(jù)。所以盡量從發(fā)送端關(guān)閉channel;
無緩存的channel需要注意在一個(gè)協(xié)程中的操作不會(huì)造成死鎖;
原文鏈接:https://juejin.cn/post/7172917453679231012
相關(guān)推薦
- 2022-10-21 Python?NumPy教程之?dāng)?shù)組的基本操作詳解_python
- 2022-06-10 python?PIL?Image?圖像處理基本操作實(shí)例_python
- 2022-07-26 正則表達(dá)式規(guī)則
- 2022-04-03 Python?webargs?模塊的簡(jiǎn)單使用_python
- 2022-03-22 C++實(shí)現(xiàn)轉(zhuǎn)置矩陣的循環(huán)(矩陣轉(zhuǎn)置函數(shù))
- 2022-06-23 python入門語句基礎(chǔ)之if語句、while語句_python
- 2022-06-18 android實(shí)現(xiàn)在圖標(biāo)上顯示數(shù)字_Android
- 2023-02-10 Pytorch模型微調(diào)fine-tune詳解_python
- 最近更新
-
- window11 系統(tǒng)安裝 yarn
- 超詳細(xì)win安裝深度學(xué)習(xí)環(huán)境2025年最新版(
- Linux 中運(yùn)行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲(chǔ)小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎(chǔ)操作-- 運(yùn)算符,流程控制 Flo
- 1. Int 和Integer 的區(qū)別,Jav
- spring @retryable不生效的一種
- Spring Security之認(rèn)證信息的處理
- Spring Security之認(rèn)證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權(quán)
- redisson分布式鎖中waittime的設(shè)
- maven:解決release錯(cuò)誤:Artif
- restTemplate使用總結(jié)
- Spring Security之安全異常處理
- MybatisPlus優(yōu)雅實(shí)現(xiàn)加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務(wù)發(fā)現(xiàn)-Nac
- Spring Security之基于HttpR
- Redis 底層數(shù)據(jù)結(jié)構(gòu)-簡(jiǎn)單動(dòng)態(tài)字符串(SD
- arthas操作spring被代理目標(biāo)對(duì)象命令
- Spring中的單例模式應(yīng)用詳解
- 聊聊消息隊(duì)列,發(fā)送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠(yuǎn)程分支