日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學無先后,達者為師

網站首頁 編程語言 正文

Go語言學習教程之goroutine和通道的示例詳解_Golang

作者:任沫 ? 更新時間: 2022-11-17 編程語言

goroutine

goroutine是由Go運行時管理的輕量級線程

go f(x, y, z)在一個新的goroutine中開始執行f(x, y,z)

goroutines運行在相同的地址空間中,所以對共享的內存訪問必須同步。sync包提供了基本的同步原語(synchronization primitives),比如互斥鎖(mutual exclusion locks)。

goroutines運行在相同的地址空間中,沒有內存隔離,不同的goroutines可以訪問同一個內存地址。這樣對共享的內存的訪問就可能出現問題,比如有一個全局變量A,goroutine 1開始修改A的數據,但是還沒修改完,goroutine 2就開始讀取A的數據了,這樣讀到的數據可能是不準確的,如果goroutine 2中也要修改A的數據,那A的數據就處于一種更不確定的狀態了。所以需要使用互斥鎖,當goroutine 1開始修改A的數據之前,先加個鎖,表示這塊內存已經被鎖上了,等修改完A的數據再將鎖解開。在goroutine 1修改數據A但還沒修改完的期間,goroutine 2需要修改/讀取A的內容,發現已經加鎖,就會進入休眠狀態,直到變量A的鎖被解開才會執行goroutine 2中的修改/讀取。

package main

import (
    "fmt"
    "time"
)

func main() {
    go say("a")
    say("b")
}

func say(s string) {
    for i := 0; i < 5; i++ {
        time.Sleep(2000 * time.Millisecond)
        fmt.Println(s, time.Now().Format("15:04:05.000000"))
    }
}

執行go run goroutine.go的時候,會在主goroutine中執行main函數,當執行到go say("a")的時候,會在一個新的goroutine中執行say("a")(稱這個子goroutine為goroutine 1),然后主goroutine中繼續執行say("b"),主goroutine和goroutine 1中的函數執行是并發的。

因為是并發執行,打印出的字符串a和字符串b的順序是無法確定的。

(仔細觀察的話會發現打印的前2條數據的時間戳,b的時間戳在a的后面,但是先打印出了b,這說明這次執行中,兩者的fmt.Println函數的執行(直到輸出到終端)時間不同,先拿到了字符串a,但是打印字符串a的fmt.Println執行比打印字符串b的函數執行稍稍慢了一點,所以b先出現在了輸出界面上。可能背后還有更復雜的原因,這里不作深究。)

通道

通道(channels)是一個類型化的管道(conduit),可以通過<-(通道運算符)來使用通道,對值進行發送和接收。

可選的<-操作符指定了通道的方向,如果給出了一個方向,通道就是定向的,否則就是雙向的。

chan T // 可以被用來發送和接收類型為T的值
chan <- float64 // 只能被用來發送float64類型的值
<-chan int // 只能被用來接收int類型的值

如果有<-操作符的話,數據按照箭頭的方向流動。

通道在使用前必須被創建:

make(chan int, 100)

通過內置的make函數創建一個新的、初始化的通道,接收的參數是通道類型和一個可選的容量。容量設置緩存區的大小。如果容量是0或者省略了,通道就是非緩存的,只在發送方和接收方都準備好的時候才能通信成功。否則通道就是緩存的,發送方的緩存區沒有滿,或者接收方的緩存區不為空,就能不阻塞地進行通信。

“發送方的緩存區沒有滿,或者接收方的緩存區不為空,就能不阻塞地進行通信。“這句話直白一點說,就是如果緩存區滿了,就不能再往通道中發送數據了(chan <- 數據 ),如果緩存區是空的,就不能從通道中接收數據了(<-chan)。

1.無緩存通道例子:

package main

import (
    "fmt"
    "sync"
    "time"
)

var wg sync.WaitGroup

func main() {
    example1()
    wg.Wait() // 等待所有goroutines執行完成
}

func example1() {
    chan1 := make(chan int)

    wg.Add(1)
    go a(chan1) // 向通道中發送數字1、2

    wg.Add(1)
    go b(chan1) // 等待1秒之后,從通道中拿數據,拿到的是數字2

    fmt.Println("接收數據A", <-chan1) // 這里拿到的是數字1
}

func a(chan1 chan int) {
    defer wg.Done()
    chan1 <- 1
    chan1 <- 2
}

func b(chan1 chan int) {
    defer wg.Done()
    time.Sleep(time.Second)
    fmt.Println("接收數據B", <-chan1)
}

如果把以下這兩句注釋掉,運行代碼就會報錯:fatal error: all goroutines are asleep - deadlock!

    wg.Add(1)
    go b(chan1) // 等待1秒之后,從通道中拿數據

把這句注釋掉,代碼變成了往無緩存通道中發送了2個元素,但是只接收了1個元素。由于向通道中發送的元素2沒被接收,通道會阻塞,sync包又在等待數字2的發送(chan1 <- 2)完成,就造成了死鎖。

最終在無緩存通道中的元素個數為0,無緩存通道就不會阻塞。

2.有緩存通道例子:

...
var wg sync.WaitGroup

func main() {
    example2()
    wg.Wait() // 等待所有goroutines執行完成
}

func example2() {
    chan1 := make(chan int, 2)

    wg.Add(1)
    go a(chan1) // 向通道中發送數字1、2、3

    fmt.Println("接收數據", <-chan1)
}

func a(chan1 chan int) {
    defer wg.Done()
    chan1 <- 1
    chan1 <- 2
    chan1 <- 3
}

func b(chan1 chan int) {
    defer wg.Done()
    time.Sleep(time.Second)
    fmt.Println("接收數據", <-chan1)
}

以上代碼向容量為2的緩存通道中發送了3個元素,但是只接收了1個,此時通道中還有2個元素,不會阻塞。

如果在a函數的最后一行再加上一句chan1 <- 4,再執行代碼,就會報錯fatal error: all goroutines are asleep - deadlock!。因為發送了4個元素,只接收了1個元素,還剩3個元素沒被接收,3 > 2,緩存已經滿了,由于代碼中沒有別的地方來接收元素,通道阻塞,但是sync包又在等待chan1 <- 4的完成,所以會造成死鎖。

最終在有緩存通道中的元素個數小于等于容量,有緩存通道就不會阻塞。

3.使用通道在goroutines間進行通信的例子:

func main() {
    example3()
}

func example3() {
    s := []int{7, 2, 8, -9, 4, 0}

    c := make(chan int)
    go sum(s[:len(s)/2], c)
    go sum(s[len(s)/2:], c)
    x, y := <-c, <-c

    fmt.Println(x, y, x+y)
}

func sum(s []int, c chan int) {
    sum := 0
    for _, v := range s {
        sum += v
    }
    c <- sum
}

這段代碼將數組的內容分為兩部分,在兩個goroutines中分別進行計算,最后再進行求和。

這里兩個子goroutines是與主goroutine并發執行的,但主goroutine中的x, y := <-c, <-c依然拿到了兩個子goroutines中往通道發送的數據(c <- sum)。這是因為通道的發送和接收會阻塞,直到另一邊準備好。

x拿到的是先計算完的和,y拿到的是后計算完的和,xy的值是不確定的,可能是-5 17 或者 17 -5,就看哪個子goroutine中的計算先完成。

Range 和 Close

發送方可以close一個通道來表明沒有更多的值會被發送。接收方可以通過賦值第二個參數給接收表達式,測試一個通道是否已經被關閉。

執行如下語句:

v, ok := <-ch

如果沒有更多的值要接收,并且通道已經關閉了,ok的值就為false

for i := range c循環,從通道中重復地接收值,直到通道關閉。

注意:

  • ?只有發送方可以關閉一個通道,接收方不可以。在一個已經關閉的通道上進行發送會導致一個錯誤(panic)。
  • ?通道不像文件,不需要總是關閉它們。關閉只有必須告訴接收方不會再來更多值時,才是必須的,比如終止一個range循環。
func main() {
    c := make(chan int, 10)
    go fibonacci(cap(c), c)
    for i := range c {
        fmt.Println(i)
    }
}

func fibonacci(n int, c chan int) {
    x, y := 0, 1
    for i := 0; i < n; i++ {
        c <- x
        x, y = y, x+y
    }
    // 必須在遍歷結束之后關閉通道
    // 否則 for i := range c 會一直等待通道關閉
    close(c)
}

以上代碼求斐波那契數列,依次將求得的值發送到通道。

如果把close(c) 語句注釋掉,運行代碼,就會報錯:fatal error: all goroutines are asleep - deadlock!。因為for i := range c一直在等通道關閉,但是整個執行過程中并沒有關閉通道,造成了死鎖。

Select

select語句讓一個goroutine等待多個通信操作。

一個select?會阻塞,直到它的cases中的一個可以運行,然后它就會執行該case。如果多個通信都準備好了,就會隨機選擇一個。

func main() {
    c := make(chan int)
    quit := make(chan int)
    go func() {
        for i := 0; i < 10; i++ {
            fmt.Println(<-c)
        }
        quit <- 0
    }()
    fibonacci(c, quit)
}

func fibonacci(c, quit chan int) {
    x, y := 0, 1
    for {
        select {
        case c <- x:
            x, y = y, x+y
        case <-quit:
            fmt.Println("quit")
            return
        }
    }
}

上述代碼還是實現一個斐波那契數列的計算。

在子goroutine(稱之為goroutine 1)中循環10次,依次從通道c中接收數據,循環結束之后,將數字0發送到通道quit。

在主goroutine中,調用fibonacci函數:

  • ?c <- x是向通道中發送數據,只要有地方從通道中接收數據,向通道中發送數據就能繼續運行。每次在goroutine 1的循環中<-c,主goroutine中的select語句中的case c <- x中的語句就會執行。
  • <-quit是從通道中接收數據,只要有地方向通道中發送數據,從通道中接收數據就能繼續運行。當goroutine 1中循環結束之后quit <- 0case <-quit中的語句就會執行。

一個select中的default case,在沒有其他case準備好的時候就會運行。

package main

import (
    "fmt"
    "time"
)

func main() {
    tick := time.Tick(100 * time.Millisecond)
    boom := time.After(500 * time.Millisecond)

    for {
        select {
        case <-tick:
            fmt.Println("tick.")
        case <-boom:
            fmt.Println("BOOM!")
            return
        default:
            fmt.Println("    .")
            time.Sleep(50 * time.Millisecond)
        }
    }
}

每隔100毫秒,通道tick就會收到一次數據,case <-tick中的語句會執行,打印一次tick.;500毫秒之后,通道boom會收到數據,case <-boom中的語句會執行,打印BOOM!,并且使用return結束程序的執行。在這期間,由于for語句是一直在循環的,當通道tick和通道boom中都沒收到數據時,就會執行default中的語句:打印一個點并且等待50毫秒。

粗略看了下time.Ticktime.After代碼,兩者返回的值都是類型為<-chan Time的通道,使用輪詢,在滿足時間條件之后,向通道中發送當前時間。如果想看通道中傳遞的時間數據的話,可以使用以下代碼:

package main

import (
    "fmt"
    "time"
)

func main() {
    tick := time.Tick(100 * time.Millisecond)
    boom := time.After(500 * time.Millisecond)
    var x, y time.Time
    for {
        select {
        case x, _ = <-tick:
            fmt.Println(x, "tick.")
        case y, _ = <-boom:
            fmt.Println(y, "BOOM!")
            return
        default:
            fmt.Println("    .")
            time.Sleep(50 * time.Millisecond)
        }
    }
}

sync.Mutex

如果我們想要避免沖突,確保一次只有一個goroutine可以訪問一個變量(這個概念稱為互斥),則可以使用互斥鎖(mutex)。

Go的標準庫提供了互斥的使用,需要用到sync.Mutex和它的兩個方法LockUnlock

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    c := SafeCounter{v: make(map[string]int)}
    for i := 0; i < 1000; i++ {
        go c.Inc("somekey")
    }
    time.Sleep(time.Second)
    fmt.Println(c.Value("somekey"))
}


type SafeCounter struct {
    mu sync.Mutex
    v  map[string]int
}

// 使用給定的key遞增計數器
func (c *SafeCounter) Inc(key string) {
    c.mu.Lock()
    // 鎖住之后,一次只能有一個goroutine可以訪問映射c.v
    c.v[key]++
    c.mu.Unlock()
}

// 返回 給定key的 計數器的當前值
func (c *SafeCounter) Value(key string) int {
    c.mu.Lock()
    // 鎖住之后,一次只能有一個goroutine可以訪問映射c.v
    defer c.mu.Unlock()
    return c.v[key]
}

官方留的兩道練習題

官方留了兩道練習題,沒有給出完整的代碼。可以作為了解了以上知識之后的練手。

等價的二叉樹

有很多不同的二叉樹,存儲著相同的值的序列。例如,下圖兩棵二叉樹存儲的序列是1, 1, 2, 3, 5, 8, 13。

1.實現Walk函數。

2.測試Walk函數。

函數tree.New(k)構造了一個隨機結構(但總是排序的)的二叉樹來存儲值k2k3k,...,10k

創建一個新的通道ch并開始遍歷:

go Walk(tree.New(1), ch)

然后打印樹中包含的10個值,應該是數字1,2,3,...,10。

3.實現Same函數,使用Walk來決定t1t2是否存儲相同的值。

4.測試Same函數:

  • Same(tree.New(1), tree.New(1)) 應該返回true
  • Same(tree.New(1), tree.New(2)) 應該返回false

代碼實現

主要部分代碼如下:

package main

import (
    "equbintrees/tree"
    "fmt"
)

func main() {
    tree1 := tree.New(1)
    tree2 := tree.New(2)
    fmt.Println(Same(tree1, tree2))
}

// 函數遍歷樹 t,將樹中的所有值依次發送到通道中
func Walk(t *tree.Tree, ch chan int) {
    if t == nil {
        return
    }
    if t.Left != nil {
        Walk(t.Left, ch)
    }
    ch <- t.Value
    if t.Right != nil {
        Walk(t.Right, ch)
    }
}

// 判斷兩棵樹是否包含相同的值
func Same(t1, t2 *tree.Tree) bool {
    ch1 := make(chan int)
    ch2 := make(chan int)

    go Walk(t1, ch1)
    go Walk(t2, ch2)

    var count int

    for {
        if <-ch1 == <-ch2 {
            count++
            // 這里的count等于10,是因為題目要求里面隨機生成的樹的節點個數就是10個
            // 一般的樹可以給樹添加一個Len屬性表示節點個數,用Len屬性來判斷
            if count == 10 {
                return true
            }
        } else {
            return false
        }
    }
}

網絡爬蟲

使用Go的并發功能來并發網絡爬蟲。

修改Crawl函數來并發獲取URLs,并且相同的URL不會獲取2次。

提示:你可以使用映射緩存已經獲取到的URL,但是只使用映射對于并發使用來說是不安全的。

代碼實現

這部分我嘗試實現了下,主要思路是在遞歸的過程中,將遍歷到鏈接中包含的urls發送到通道ch中,用for urls := range ch遍歷通道中的元素,以此來等待所有發送到通道中的urls都被接收,在遞歸過程中判斷深度是否達到4,達到4之后調用close(ch)關閉通道。

但是有問題,因為不能僅憑 深度是否達到 來判斷 是否關閉通道。給出的例子實際只有4層鏈接,如果設置深度需要到達到5,當遞歸到盡頭的時候就應該關閉通道了,但是因為沒有達到深度5,沒有關閉通道,for urls := range ch還會繼續等通道接收數據,但已經不會再往通道中發送數據了,造成死鎖。總之,手動調用close(ch)來正確關閉通道有點難,因為很難找到遞歸和并發請求時不會再往通道中發送數據的那個時機。

我從這個鏈接找到了大佬的代碼實現:https://rmoff.net/2020/07/03/learning-golang-some-rough-notes-s01e10-concurrency-web-crawler/

主要思路就是使用sync.WaitGroup,用Add方法添加WaitGroup計數,用wg.Wait()等待所有的goroutines執行結束。

主要部分代碼如下:

func main() {
    wg := &sync.WaitGroup{}
    wg.Add(1)
    go Crawl("https://golang.org/", 5, fetcher, wg)
    wg.Wait()
}

type URLs struct {
    c   map[string]bool // 用于存放表示一個鏈接是否被抓取過的映射
    mux sync.Mutex      // 使用互斥鎖在并發的執行中進行安全的讀寫
}

var u URLs = URLs{c: make(map[string]bool)}

// 檢查鏈接是否已經被抓取過
func (u URLs) IsCrawled(url string) bool {
    fmt.Printf("\n?? Checking if %v has been crawled…", url)
    u.mux.Lock()
    defer u.mux.Unlock()
    if _, ok := u.c[url]; ok == false {
        fmt.Printf("…it hasn't\t")
        return false
    }
    fmt.Printf("…it has\t")
    return true
}

// 將鏈接標記為抓取過
func (u URLs) Crawled(url string) {
    u.mux.Lock()
    u.c[url] = true
    u.mux.Unlock()
}

// 遞歸地請求抓去url的數據,直到一個最大深度
func Crawl(url string, depth int, fetcher Fetcher, wg *sync.WaitGroup) {
    defer wg.Done()

    if depth <= 0 {
        return
    }

    if u.IsCrawled(url) == true {
        return
    }

    fmt.Printf("\n?? Crawling %v", url)
    body, urls, err := fetcher.Fetch(url)
    u.Crawled(url)

    if err != nil {
        fmt.Println(err)
        return
    }
    fmt.Printf("\n\t->? found: %s %q\n", url, body)

    for _, z := range urls {
        wg.Add(1)

        go Crawl(z, depth-1, fetcher, wg)
    }

}

源碼地址

https://github.com/renmo/myBlog/tree/master/2022-05-31-goroutine

原文鏈接:https://juejin.cn/post/7103907668955250702

欄目分類
最近更新