網站首頁 編程語言 正文
以下源碼都摘自 golang 1.16.15 版本。
1. channel 底層結構
Golang 中的 channel 對應的底層結構為 hchan 結構體(channel的源碼位置在Golang包的 runtime/chan.go):
type hchan struct { qcount uint // buf當前元素的數量 dataqsiz uint // buf的容量 buf unsafe.Pointer // channel緩沖區,一個循環數組 elemsize uint16 // 元素大小 closed uint32 // channel關閉標記 elemtype *_type // element type sendx uint // 當下一次發送數據到channel時,數據存放到buf中的哪個index recvx uint // 當下一次從channel接收數據時,從buf的哪個index獲取數據 recvq waitq // 等待接收數據的goroutine列表,雙向鏈表 sendq waitq // 等待發送數據的goroutine列表,雙向鏈表 lock mutex // 互斥鎖,發送和接收操作前需要獲取的鎖,所以channel的發送和接收操作是互斥的 }
如果 dataqsiz == 0 時,則為無緩沖 channel,如果 dataqsiz > 0 時,則為有緩沖 channel。
其中 recvq 和 sendq 是一個雙向鏈表結構,鏈表中的元素為 sudog 結構體,其中該結構體中保存了g,所以本質上recvq 和 sendq 是保存了等待接收/發送數據的goroutine列表。
channel 中的 recvq 和 sendq 的使用場景如下所示:
在從 channel 接收數據時 (data := <- ch),如果 sendq 中沒有等待發送數據的 goroutine,且 buf 中沒有數據時,則需要把當前 goroutine 保存到 recvq 列表中,并掛起。
在向 channel 發送數據時 (ch <- data),如果 recvq 中沒有等待接收數據的 goroutine,且 buf 滿了的情況下,則需要把當前 goroutine 保存到 sendq 列表中,并掛起。
type waitq struct { first *sudog last *sudog }
// sudog表示等待隊列中的一個g,例如在一個channel中的發送/接收。 // sudog是必要的,因為g和同步對象的關系是多對多的,一個g可以在多個等待隊列中,因此一個g會有很多個sudog, // 很多g可能在等待著同一個同步對象,因此一個對象可能有多個sudog。 // sudog是從一個特殊的池中分配的,使用acquireSudog和releaseSudog分配和釋放它們。 type sudog struct { // 以下字段受此sudog阻塞的channel的hchan.lock保護 g *g next *sudog prev *sudog elem unsafe.Pointer // data element (may point to stack) // 以下字段永遠不會被同時訪問 // 對于channel,waitlink只能被g訪問 // 對于信號量,所有字段(包括上述字段)只有在持有semaRoot鎖時才能訪問。 acquiretime int64 releasetime int64 ticket uint32 // isSelect表示g正在參與選擇,因此g.selectDone必須經過CAS處理,才能被喚醒 isSelect bool // success表示通過channel c的通信是否成功。 // 如果goroutine因為通過channel c傳遞了一個值而被喚醒,則為true // 如果因為c被關閉而喚醒,則為false success bool parent *sudog // semaRoot binary tree waitlink *sudog // g.waiting list or semaRoot waittail *sudog // semaRoot c *hchan // channel }
channel 結構圖:
2. channel 的創建
// 無緩沖channel ch := make(chan int) // 緩沖大小為5的channel ch2 := make(chan int, 5)
創建 channel 的源碼為runtime/chan.go文件中的 makechan 函數:
func makechan(t *chantype, size int) *hchan { elem := t.elem // compiler checks this but be safe. if elem.size >= 1<<16 { throw("makechan: invalid channel element type") } if hchanSize%maxAlign != 0 || elem.align > maxAlign { throw("makechan: bad alignment") } mem, overflow := math.MulUintptr(elem.size, uintptr(size)) if overflow || mem > maxAlloc-hchanSize || size < 0 { panic(plainError("makechan: size out of range")) } // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers. // buf points into the same allocation, elemtype is persistent. // SudoG's are referenced from their owning thread so they can't be collected. // TODO(dvyukov,rlh): Rethink when collector can move allocated objects. var c *hchan switch { case mem == 0: // 隊列或元素大小為0,即無緩沖channel c = (*hchan)(mallocgc(hchanSize, nil, true)) // Race detector uses this location for synchronization. c.buf = c.raceaddr() case elem.ptrdata == 0: // 元素不包含指針類型,只進行一次 hchan 和 buf 的內存分配 // 當存儲在buf中的元素不包含指針時,GC就不會掃描hchan中的元素 c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) c.buf = add(unsafe.Pointer(c), hchanSize) default: // 元素中包含指針類型,進行2次內存分配操作 // 用new分配內存返回的是指針 c = new(hchan) c.buf = mallocgc(mem, elem, true) } // 初始化channel數據 c.elemsize = uint16(elem.size) c.elemtype = elem c.dataqsiz = uint(size) lockInit(&c.lock, lockRankHchan) if debugChan { print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n") } // 返回 hchan 的指針類型 return c }
注意這里返回的是 hchan 的指針,因此我們在函數間可以直接傳遞 channel,而不用傳遞channel的指針了。
另外,因為channel 的內存分配都用到了 mallocgc 函數,而 mallocgc 是負責堆內存分配的關鍵函數,因此可見 channel 是分配在堆內存上的。
3. channel 的發送流程
channel 的發送:
ch <- data
channel 發送的源碼對應 runtime/chan.go 的 chansend 函數:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { // 如果當前channel是nil if c == nil { // 如果不阻塞,則直接返回false if !block { return false } // 掛起當前goroutine gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2) throw("unreachable") } if debugChan { print("chansend: chan=", c, "\n") } if raceenabled { racereadpc(c.raceaddr(), callerpc, funcPC(chansend)) } // 這里訪問了hchan結構中的closed, full函數內部訪問了dataqsiz,recvq,qcount字段,這里沒有加鎖,是為什么呢? // 先說說這里判斷的含義:如果不阻塞,且channel沒有被關閉,且buf已滿,則快速返回false,表示數據發送失敗。 // 因為沒有加鎖,假如在判斷c.closed == 0之后結果為true,在判斷full之前,這時channel被其他goroutine關閉了, // 然后full函數返回了true,那么它會直接return false,這樣子會有什么影響呢? // 其實并沒有什么影響,在這種情況下返回false也是合理的,因為都是表示在不阻塞的情況下發送數據失敗。 // 所以這里訪問hchan里面的數據就沒有加鎖了 if !block && c.closed == 0 && full(c) { return false } var t0 int64 if blockprofilerate > 0 { t0 = cputicks() } // 鎖住channel,可見channel是并發安全的 lock(&c.lock) // 如果channel已關閉,則panic if c.closed != 0 { unlock(&c.lock) panic(plainError("send on closed channel")) } // 如果recvq等待接收隊列中有值,則直接把值傳給等待接收的goroutine,這樣可以減少一次內存拷貝 if sg := c.recvq.dequeue(); sg != nil { send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true } // 如果recvq等待接收隊列中沒有值,且為有緩沖channel,則把數據copy到buf中 if c.qcount < c.dataqsiz { // Space is available in the channel buffer. Enqueue the element to send. qp := chanbuf(c, c.sendx) if raceenabled { racenotify(c, c.sendx, nil) } typedmemmove(c.elemtype, qp, ep) c.sendx++ // 因為buf是環形數組,所以如果sendx超出了最大index,就要歸0 if c.sendx == c.dataqsiz { c.sendx = 0 } c.qcount++ unlock(&c.lock) return true } // 如果recvq等待接收隊列中沒有值,且為無緩沖channel,且不阻塞,則直接返回false if !block { unlock(&c.lock) return false } // 接下來做阻塞當前goroutine的一些準備工作,構造一個sudog // 獲取當前goroutine的指針 gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // No stack splits between assigning elem and enqueuing mysg // on gp.waiting where copystack can find it. mysg.elem = ep mysg.waitlink = nil mysg.g = gp mysg.isSelect = false mysg.c = c gp.waiting = mysg gp.param = nil // 把構建好的 sudog 加到 sendq 發送等待隊列中 c.sendq.enqueue(mysg) // Signal to anyone trying to shrink our stack that we're about // to park on a channel. The window between when this G's status // changes and when we set gp.activeStackChans is not safe for // stack shrinking. atomic.Store8(&gp.parkingOnChan, 1) // 掛起當前goroutine gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) // 如果當前 goroutine 被喚醒后,會在這里繼續執行 // Ensure the value being sent is kept alive until the // receiver copies it out. The sudog has a pointer to the // stack object, but sudogs aren't considered as roots of the // stack tracer. KeepAlive(ep) // someone woke us up. if mysg != gp.waiting { throw("G waiting list is corrupted") } gp.waiting = nil gp.activeStackChans = false closed := !mysg.success gp.param = nil if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } mysg.c = nil releaseSudog(mysg) if closed { if c.closed == 0 { throw("chansend: spurious wakeup") } // 如果喚醒后,發現 channel 被關閉,則關閉 panic(plainError("send on closed channel")) } return true }
full 函數,用于判斷當前channel是否還有坑位接收待發送的數據:
// 判斷channel中是否還有位置存放數據 func full(c *hchan) bool { // 如果是非緩沖channel if c.dataqsiz == 0 { // 如果 recvq 中沒有等待接收數據的 goroutine,則返回 true,表示已滿,否則返回 false return c.recvq.first == nil } // 如果是有緩沖 channel,則判斷buf是否已滿 return c.qcount == c.dataqsiz }
send 函數,在recvq中有等待接收數據的goroutine時會被調用:
// 在一個空的 channel c 中完成發送操作 // 把數據 ep 從發送者復制到接收者 sg 中 // 最后接收的 goroutine 會被喚醒 // channel c 一定是空的且被鎖住的 // sg 一定是已經從 c 的 recvq 中出隊了 // eq 一定是不等于 nil 的,且指向堆或者是調用者的棧 func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { if raceenabled { if c.dataqsiz == 0 { racesync(c, sg) } else { // Pretend we go through the buffer, even though // we copy directly. Note that we need to increment // the head/tail locations only when raceenabled. racenotify(c, c.recvx, nil) racenotify(c, c.recvx, sg) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz } } // sg.elem 指向接收者存放接收數據的存放的位置 if sg.elem != nil { // 直接內存拷貝,從發送者拷貝到接收者內存 sendDirect(c.elemtype, sg, ep) sg.elem = nil } gp := sg.g // 解鎖 unlockf() gp.param = unsafe.Pointer(sg) sg.success = true if sg.releasetime != 0 { sg.releasetime = cputicks() } // 喚醒接收數據的goroutine goready(gp, skip+1) }
總結 channel 的發送流程:
判斷 channel 是否是 nil,如果是,則會永久阻塞導致死鎖報錯
如果 channel 中 recvq 存在接收者 goroutine,則直接把需要發送的數據拷貝到接收 goroutine,這里其實是有sodog 的結構,里面保存了接受者goroutine的指針。
如果 recvq 中不存在接收者:
a. 如果 buf 沒有滿,則直接把數據拷貝到 buf 的 sendx 位置
b. 如果 channel 為無緩沖 channel 或 buf 已滿,則把當前 goroutine 保存到 sendq 等待隊列中,阻塞當前 goroutine
4. channel 的接收流程
channel 的接收:
data := <- ch
data2, ok := <- ch
channel 的接收分別有2個函數,其中一種是帶”ok“返回值的,另外一種是不帶"ok"返回值的。
- 帶”ok"返回值的函數,該返回的布爾值為 true 時,并不表示當前通道還沒有關閉,而是僅僅表示當前獲取到的值是通道的正常生產出來的數據,而不是零值;當該布爾值為 false 時,表示當前的通道已經被關閉,并且獲取到的值是零值。
- 不帶"ok"返回值的函數,當 channel 被關閉時,就不能判斷當前獲取到的值是 channel 正常生產的值,還是零值了。
// 無返回值 func chanrecv1(c *hchan, elem unsafe.Pointer) { chanrecv(c, elem, true) } // 返回 bool 類型,如果返回false,表示 channel 已經被關閉,否則返回false。 func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) { _, received = chanrecv(c, elem, true) return }
不管是否返回 received,channel 的接收都調用了 chanrecv 函數:
// 從 channel c 中接收數據,并把數據復制到 ep 中。 // 在忽略接收數據的情況下,eq 可能是 nil,例如:<- ch // 如果不阻塞,且 channel 中沒有元素的情況下,直接快速返回(false, false) // 如果 c 已經被關閉,*ep 為零值,怎返回(true, false) // 如果 *ep 中有元素,則返回(true, true) // 一個不等于 nil 的 eq 一定指向堆或者調用者的棧 func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { // raceenabled: don't need to check ep, as it is always on the stack // or is new memory allocated by reflect. if debugChan { print("chanrecv: chan=", c, "\n") } if c == nil { if !block { return } // 如果 c 為 nil,掛起當前 goroutine gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2) throw("unreachable") } // Fast path: check for failed non-blocking operation without acquiring the lock. // 在非阻塞模式下,快速檢測接收失敗的情況 if !block && empty(c) { // 發現 channel 沒有準備好要接收數據后,我們觀察通道是否已經關閉。 // 重新排序這些檢查可能會導致在關閉時不正確的行為。 // 例如,如果通道是open,且not empty,然后被關閉,接著排空->empty, // 重新排序的讀取可能會錯誤地表示成”open和empty“。 // 為了防止重排序,我們對這2個檢查都使用原子加載,并依靠清空和關閉發生在同一個鎖下的不同臨界區。 // 當關閉帶有阻塞發送的非緩沖channel,此假設失敗,但這無論如何都是錯誤的條件。 if atomic.Load(&c.closed) == 0 { // 因為 channel 不能重新打開,所以在后面這里觀察到 channel 沒有被關閉,意味著它在第一次判斷 empty 的時候也沒有關閉。 // 這樣就表現得像在第一次判斷 empty 時,通道也沒有關閉:if empty(c) && atomic.Load(&c.closed) == 0 {...} return } // 當執行到這里的時候,說明 channel 已經被關閉了。 // 這時重新檢查通道是否還有其他待接收的數據,這些數據可能在第一次 empty 檢查和通道關閉檢查之間到達。 // 在這種情況下發送時,也需要按照連貫的順序。 if empty(c) { // The channel is irreversibly closed and empty. if raceenabled { raceacquire(c.raceaddr()) } if ep != nil { typedmemclr(c.elemtype, ep) } return true, false } } var t0 int64 if blockprofilerate > 0 { t0 = cputicks() } // 獲取鎖 lock(&c.lock) // 如果 channel c 已經被關閉,且 buf 中無元素,將獲取到零值 if c.closed != 0 && c.qcount == 0 { if raceenabled { raceacquire(c.raceaddr()) } unlock(&c.lock) if ep != nil { typedmemclr(c.elemtype, ep) } return true, false } // 如果 sendq 中有元素 if sg := c.sendq.dequeue(); sg != nil { // Found a waiting sender. If buffer is size 0, receive value // directly from sender. Otherwise, receive from head of queue // and add sender's value to the tail of the queue (both map to // the same buffer slot because the queue is full). // 找到一個正在等待的發送者。 // 1.如果是無緩沖 channel,則直接把從發送者那里接收數據。 // 2.如果是有緩沖 channel,這時 sendq 中有元素,說明 buf 滿了,發送者需要等待消費者消費 buf 數據后才能繼續發送數據。 // 這時當前的 goroutine 會從 buf 的 recvx 位置接收數據,并且把剛剛獲取到的發送者 sg 的發送數據拷貝到 buf 的 sendx 位置中。 recv(c, sg, ep, func() { unlock(&c.lock) }, 3) return true, true } // sendq 中沒有等待的發送者,且 buf 中有數據,則直接從 buf 中接收數據 if c.qcount > 0 { // Receive directly from queue qp := chanbuf(c, c.recvx) if raceenabled { racenotify(c, c.recvx, nil) } if ep != nil { typedmemmove(c.elemtype, ep, qp) } typedmemclr(c.elemtype, qp) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- // 解鎖 unlock(&c.lock) return true, true } // 如果代碼運行到這里,說明 channel 中沒有數據可以接收了,接下來就要準備阻塞當前 goroutine 了 // 如果不阻塞,則快速返回 if !block { // 解鎖 unlock(&c.lock) return false, false } // no sender available: block on this channel. // 構造sudog // 獲取當前 goroutine 指針 gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // No stack splits between assigning elem and enqueuing mysg // on gp.waiting where copystack can find it. mysg.elem = ep mysg.waitlink = nil gp.waiting = mysg mysg.g = gp mysg.isSelect = false mysg.c = c gp.param = nil // 把構造好的 sudog 入隊 recvq c.recvq.enqueue(mysg) // Signal to anyone trying to shrink our stack that we're about // to park on a channel. The window between when this G's status // changes and when we set gp.activeStackChans is not safe for // stack shrinking. atomic.Store8(&gp.parkingOnChan, 1) // 掛起當前 goroutine gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2) // 如果 goroutine 被喚醒,會從這里開始繼續執行 if mysg != gp.waiting { throw("G waiting list is corrupted") } gp.waiting = nil gp.activeStackChans = false if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } success := mysg.success gp.param = nil mysg.c = nil releaseSudog(mysg) return true, success }
empty 函數用于判斷從 channel c 中讀取數據是否會阻塞:
func empty(c *hchan) bool { // c.dataqsiz 是不會被改變的. if c.dataqsiz == 0 { return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil } return atomic.Loaduint(&c.qcount) == 0 }
recv 函數在 channel c 的 buf 是滿的,且 sendq 中有等待發送的 goroutine 時會被調用:
// 這里分為 2 個部分: // 1.發送者 sg 待發送的值會被放入通道 buf 中,發送者被喚醒繼續執行 // 2.接收方(當前 goroutine)接收的值寫入 ep // 對于同步 channel(無緩沖),2 個值都是一樣的 // 對于異步 channel(有緩沖),接收方從 channel buf 獲取數據,發送方的數據放入 channel buf // channel c 一定是滿的,且已被鎖定,recv 用 unlockf 解鎖 channel c。 // sg 一定已經從 sendq 出隊 // 不等于 nil 的 ep 一定指向堆或調用者的棧 func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { if c.dataqsiz == 0 { if raceenabled { racesync(c, sg) } if ep != nil { // 非緩沖 channel,直接從發送方接收數據 recvDirect(c.elemtype, sg, ep) } } else { // 緩沖 channel,buf 已滿 // 先從 buf 隊列頭部接收數據,然后把獲取出來的發送方數據入隊 qp := chanbuf(c, c.recvx) if raceenabled { racenotify(c, c.recvx, nil) racenotify(c, c.recvx, sg) } // 從 buf 中復制數據到接收方 if ep != nil { typedmemmove(c.elemtype, ep, qp) } // 把發送方 sg 的數據復制到 buf 中 typedmemmove(c.elemtype, qp, sg.elem) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz } sg.elem = nil gp := sg.g // 解鎖 unlockf() gp.param = unsafe.Pointer(sg) sg.success = true if sg.releasetime != 0 { sg.releasetime = cputicks() } // 喚醒發送方 goroutine goready(gp, skip+1) }
總結 channel 的接收流程:
判斷 channel 是否是 nil,如果是,則會永久阻塞導致死鎖報錯如果 channel 中 sendq 有等待發送數據的 goroutine:
a. 如果是無緩存 channel,則直接把要發送的數據拷貝到接收者的 goroutine 中,并喚醒發送方 goroutine;
b. 如果是有緩存的 channel(說明此時recvd滿了),則把 buf 中的 recvx 位置的數據拷貝到當前接收的goroutine,然后把 sendq 中第一個等待發送goroutine的數據拷貝到buf 中的 sendx 位置,并喚醒發送的goroutine如果 channel 中 sendq 沒有等待發送數據的 goroutine:
a. 如果 buf 有數據,則把 buf 中的 recvx 位置的數據拷貝到當前的接收goroutine
b. 如果 buf 沒有數據,則把當前 goroutine 加入 recvd 等待隊列中,并掛起
5. channel 使用注意事項
最后啰嗦一下 channel 使用的注意事項,這也是在我們平常開發中容易忽略的:
- 一個 channel 不能多次 close,否則會導致 panic。
- 關閉一個 nil 的 channel,會導致 panic。
- 向一個已經 close 的 channel 發送數據,會導致 panic。
- 不要從一個 receiver 測關閉 channel,也不要在有多個 sender 時關閉 channel。在go語言中,對于一個 channel,如果最終沒有任何 goroutine 引用它,不管 channel 有沒有被關閉,最終都會被 gc 回收。
- 如果監聽的channel 已經關閉,還可以獲取到 channel buf 中剩余的值,當接收完 buf 中的數據后,才會獲取到零值。
原文鏈接:https://blog.csdn.net/weixin_38418951/article/details/127330082
相關推薦
- 2022-04-17 aspx頁面報“XPathResult未定義”的解決方法
- 2023-08-13 uniapp寫一個隨時間變化的預約日期列表
- 2023-04-13 react 打包優化,配置生產環境不輸出console.log
- 2022-05-23 Go語言映射內部實現及基礎功能實戰_Golang
- 2022-05-07 MongoDB連接和創建數據庫的方法講解_MongoDB
- 2022-07-02 在React中使用axios發送請求
- 2022-11-22 Linux命令學習之原來最簡單的ls命令這么復雜_linux shell
- 2022-02-10 MongoDB數據庫安裝部署及警告優化_MongoDB
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支