日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學無先后,達者為師

網站首頁 編程語言 正文

Go底層channel實現原理及示例詳解_Golang

作者:阿甘與阿Q ? 更新時間: 2022-10-02 編程語言

概念:

Go中的channel 是一個隊列,遵循先進先出的原則,負責協程之間的通信(Go 語言提倡不要通過共享內存來通信,而要通過通信來實現內存共享,CSP(Communicating Sequential Process)并發模型,就是通過 goroutine 和 channel 來實現的)

使用場景:

停止信號監聽

定時任務

生產方和消費方解耦

控制并發數

底層數據結構:

通過var聲明或者make函數創建的channel變量是一個存儲在函數棧幀上的指針,占用8個字節,指向堆上的hchan結構體

源碼包中src/runtime/chan.go定義了hchan的數據結構:

hchan結構體:

type hchan struct {
 closed   uint32   // channel是否關閉的標志
 elemtype *_type   // channel中的元素類型
 // channel分為無緩沖和有緩沖兩種。
 // 對于有緩沖的channel存儲數據,使用了 ring buffer(環形緩沖區) 來緩存寫入的數據,本質是循環數組
 // 為啥是循環數組?普通數組不行嗎,普通數組容量固定更適合指定的空間,彈出元素時,普通數組需要全部都前移
 // 當下標超過數組容量后會回到第一個位置,所以需要有兩個字段記錄當前讀和寫的下標位置
 buf      unsafe.Pointer // 指向底層循環數組的指針(環形緩沖區)
 qcount   uint           // 循環數組中的元素數量
 dataqsiz uint           // 循環數組的長度
 elemsize uint16                 // 元素的大小
 sendx    uint           // 下一次寫下標的位置
 recvx    uint           // 下一次讀下標的位置
 // 嘗試讀取channel或向channel寫入數據而被阻塞的goroutine
 recvq    waitq  // 讀等待隊列
 sendq    waitq  // 寫等待隊列
 lock mutex //互斥鎖,保證讀寫channel時不存在并發競爭問題
}

等待隊列:

雙向鏈表,包含一個頭結點和一個尾結點

每個節點是一個sudog結構體變量,記錄哪個協程在等待,等待的是哪個channel,等待發送/接收的數據在哪里

type waitq struct {
   first *sudog
   last  *sudog
}
type sudog struct {
    g *g
    next *sudog
    prev *sudog
    elem unsafe.Pointer 
    c        *hchan 
    ...
}

操作:

創建

使用?make(chan T, cap)?來創建 channel,make 語法會在編譯時,轉換為?makechan64?和?makechan

func makechan64(t *chantype, size int64) *hchan {
    if int64(int(size)) != size {
        panic(plainError("makechan: size out of range"))
    }
    return makechan(t, int(size))
}

創建channel 有兩種,一種是帶緩沖的channel,一種是不帶緩沖的channel

// 帶緩沖
ch := make(chan int, 3)
// 不帶緩沖
ch := make(chan int)

創建時會做一些檢查:

  • 元素大小不能超過 64K
  • 元素的對齊大小不能超過 maxAlign 也就是 8 字節
  • 計算出來的內存是否超過限制

創建時的策略:

  • 如果是無緩沖的 channel,會直接給 hchan 分配內存
  • 如果是有緩沖的 channel,并且元素不包含指針,那么會為 hchan 和底層數組分配一段連續的地址
  • 如果是有緩沖的 channel,并且元素包含指針,那么會為 hchan 和底層數組分別分配地址

發送

發送操作,編譯時轉換為runtime.chansend函數

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool 

阻塞式:

調用chansend函數,并且block=true

ch <- 10

非阻塞式:

調用chansend函數,并且block=false

select {
    case ch <- 10:
    ...
  default
}

向 channel 中發送數據時大概分為兩大塊:檢查和數據發送,數據發送流程如下:

如果 channel 的讀等待隊列存在接收者goroutine

  • 將數據直接發送給第一個等待的 goroutine,?喚醒接收的 goroutine

如果 channel 的讀等待隊列不存在接收者goroutine

  • 如果循環數組buf未滿,那么將會把數據發送到循環數組buf的隊尾
  • 如果循環數組buf已滿,這個時候就會走阻塞發送的流程,將當前 goroutine 加入寫等待隊列,并掛起等待喚醒

接收

發送操作,編譯時轉換為runtime.chanrecv函數

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) 

阻塞式:

調用chanrecv函數,并且block=true

<ch
v := <ch
v, ok := <ch
// 當channel關閉時,for循環會自動退出,無需主動監測channel是否關閉,可以防止讀取已經關閉的channel,造成讀到數據為通道所存儲的數據類型的零值
for i := range ch {
    fmt.Println(i)
}

非阻塞式:

調用chanrecv函數,并且block=false

select {
    case <-ch:
    ...
  default
}

向 channel 中接收數據時大概分為兩大塊,檢查和數據發送,而數據接收流程如下:

如果 channel 的寫等待隊列存在發送者goroutine

  • 如果是無緩沖 channel,直接從第一個發送者goroutine那里把數據拷貝給接收變量,喚醒發送的 goroutine
  • 如果是有緩沖 channel(已滿),將循環數組buf的隊首元素拷貝給接收變量,將第一個發送者goroutine的數據拷貝到 buf循環數組隊尾,喚醒發送的 goroutine

如果 channel 的寫等待隊列不存在發送者goroutine

  • 如果循環數組buf非空,將循環數組buf的隊首元素拷貝給接收變量
  • 如果循環數組buf為空,這個時候就會走阻塞接收的流程,將當前 goroutine 加入讀等待隊列,并掛起等待喚醒

關閉

關閉操作,調用close函數,編譯時轉換為runtime.closechan函數

close(ch)
func closechan(c *hchan) 

案例分析:

package main
import (
    "fmt"
    "time"
    "unsafe"
)
func main() {
  // ch是長度為4的帶緩沖的channel
  // 初始hchan結構體重的buf為空,sendx和recvx均為0
    ch := make(chan string, 4)
    fmt.Println(ch, unsafe.Sizeof(ch))
    go sendTask(ch)
    go receiveTask(ch)
    time.Sleep(1 * time.Second)
}
// G1是發送者
// 當G1向ch里發送數據時,首先會對buf加鎖,然后將task存儲的數據copy到buf中,然后sendx++,然后釋放對buf的鎖
func sendTask(ch chan string) {
    taskList := []string{"this", "is", "a", "demo"}
    for _, task := range taskList {
        ch <- task //發送任務到channel
    }
}
// G2是接收者
// 當G2消費ch的時候,會首先對buf加鎖,然后將buf中的數據copy到task變量對應的內存里,然后recvx++,并釋放鎖
func receiveTask(ch chan string) {
    for {
        task := <-ch                  //接收任務
        fmt.Println("received", task) //處理任務
    }
}

總結hchan結構體的主要組成部分有四個:

  • 用來保存goroutine之間傳遞數據的循環數組:buf
  • 用來記錄此循環數組當前發送或接收數據的下標值:sendx和recvx
  • 用于保存向該chan發送和從該chan接收數據被阻塞的goroutine隊列: sendq 和 recvq
  • 保證channel寫入和讀取數據時線程安全的鎖:lock

原文鏈接:https://juejin.cn/post/7129085275266875422

欄目分類
最近更新