日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學無先后,達者為師

網站首頁 編程語言 正文

go通過channel獲取goroutine的處理結果

作者:鐵柱同學 更新時間: 2022-07-26 編程語言

一、前言

??????前幾天寫了篇文章,是通過sync.Map獲取goroutine的返回結果然后做出處理,但是一直感覺方案一般,不是很好。畢竟channel才是欽定的太子,所以還是用channel好一些。

golang控制goroutine數量以及獲取處理結果

二、誤區以及實戰代碼

1、誤區

??????博主自己用channel一般都是用來控制goroutine的并發,所以channel結構比較簡單,就想當然的認為channel只適合存儲簡單的結構,復雜的函數處理結果通過channel處理不太方便,是在是謬之千里

2.實戰代碼

注:以下為脫敏的偽代碼

代碼含義概覽:
(1)通過channel控制goroutine數量
(2)定義channel結構體,結構體里面可以根據需求嵌套其他結構體,實現我們想要的復雜結構
(3)每次循環獲取go函數處理結果,處理每次循環結果
//EsBulkDataRes 可以定義復雜的結構,或者嵌套結構體
type EsBulkDataRes struct {
	SucceededNums int    `json:"succeeded_nums"`
	FailedNums    int    `json:"failed_nums"`
	ErrData       string `json:"err_data"`
	//TestDoc *TestDoc
}

type TestDoc struct {
	ID     string                 `json:"_id,omitempty"`
	Source map[string]interface{} `json:"_source,omitempty"` //map里面可以存儲復雜的結構
}

func testChannel() {
	//定義要獲取的返回值,成功數量,失敗數量,失敗id集合
	succeededNums, failedNums := 0, 0
	var errData string
	DealRes := make(chan EsBulkDataRes)
	defer func() {
		close(DealRes)
	}()
	wg := sync.WaitGroup{}
	//控制goroutine數量,保證同時只有10個goroutine
	chan1 := make(chan struct{}, 10)
	ctx := context.Background()
	for {
		//業務邏輯
		// ....
		//goroutine加速,chan1寫滿,則阻塞。等待之前的goroutine釋放才能繼續循環
		chan1 <- struct{}{}
		wg.Add(1)
		go func(ctx context.Context, targetIndexName string, esClient *elastic.Client) {
			defer func() {
				if err1 := recover(); err1 != nil { //產生了panic異常
					log.Errorf(ctx, "%s go panic! err:(+%v)", logPreFix, err1)
				}
				//執行完畢再釋放channel
				<-chan1
				return
			}()
			bulkRequest := esClient.Bulk()
			//ES使用bulk方法批量刷新數據
			bulkResByAssetInfo := new(EsBulkDataRes)
			bulkResByAssetInfo, err = BulkEsDataByAssetInfo(ctx, bulkRequest, targetIndexName)
			if err != nil {
				log.Errorf(ctx, "%s BulkEsDataByAssetInfo error (%+v) ,test:(+%v)", logPreFix, err, string_util.TransferToString(fileUnitList))
				return
			}
			//執行結果寫入channel
			DealRes <- *bulkResByAssetInfo
			//執行完畢再釋放channel
			<-chan1
		}(ctx, targetIndexName, esClient)
		//goroutine 執行結束,讀取channel累加結果
		//讀取channel,也是為了方便下一個 goroutine 的寫入。沒讀的話,會阻塞
		select {
		case d, ok := <-DealRes:
			if !ok {
				continue
			}
			//累加結果
			succeededNums += d.SucceededNums
			failedNums += d.FailedNums
			errData += d.ErrData
		case <-ctx.Done():
			return
		}

	}
	wg.Wait()
	//打印結果
	fmt.Println("成功數量:", succeededNums)
	fmt.Println("失敗數量:", failedNums)
	fmt.Println("失敗id:", errData)

}

三、channel控制多個goroutine串行

??????這部分是博主之前碰到的面試題,說多個go函數,每個go函數都依賴于上一步的處理結果,如何實現串行。此處給出偽代碼,參考下即可。

//定義channel結構
type chanStruct struct {
	Res1 int64
}

//每個函數中重新給channel賦值
func test1(chan1 chan chanStruct) {
	res1 := new(chanStruct)
	fmt.Println("test1")
	res1.Res1 = 2
	chan1 <- *res1
	return
}

func test2(chan2 chan chanStruct) {
	res2 := new(chanStruct)
	fmt.Println("test2")
	res2.Res1 = 3
	chan2 <- *res2
	return

}

func test3(chan3 chan chanStruct) {
	fmt.Printf("test3,chanStruct:(%+v)", chan3)
	return
}

//https://segmentfault.com/q/1010000041024462/
//無緩沖通道讀寫都必須在協程里,否則會阻塞。有緩沖通道則可以不需要都準備好,讀或者寫可以寫在當前線程里而不會阻塞。
func main() {
	chan0 := make(chan chanStruct, 1)
	//這里使用了"golang.org/x/sync/errgroup" 這個包,博主個人實驗,在此處非必需
	g, ctx := errgroup.WithContext(context.Background())
	chan0 <- chanStruct{
		Res1: 1,
	}
	fmt.Println("write chan success!")
	//errgroup控制并發,并獲取goroutine返回的錯誤(此處沒用到)
	g.Go(func() error {
		for {
			select {
			//注意這里,由于每次我們都讀出來了channel,因此需要在函數中給channel賦值
			//保證能觸發下一個函數
			case d, ok := <-chan0:
				fmt.Println("d:", d)
				if ok {
					if d.Res1 == 1 {
						go test1(chan0)
					} else if d.Res1 == 2 {
						go test2(chan0)
					} else if d.Res1 == 3 {
						go test3(chan0)
						fmt.Println("end")
						return nil
					}
				}
			case <-ctx.Done():
				return ctx.Err()
			}
		}

	})
	//errgroup的Wait類似于sync.withGroup的Wait()方法,等待goroutine執行結束
	if err := g.Wait(); err != nil {
		log.Fatal(err)
	}

}

四、后記

??????我們該如何確認自己寫的代碼比較好呢?這個好又要如何定義?只是實現功能還是說要保持優雅? 以上是博主跟一個大佬聊天的時候大佬問的問題。

??????對于我們開發者來說,以實現需求為第一目的是絕對沒問題的,但是代碼質量也需要持續提升。怎么提升呢,當然是看大佬的代碼!哪里大佬的代碼最多呢,當然是github!

??????博主最近看了https://github.com/olivere/esdiff 大佬的代碼,才發現自己以前的狹隘,也驚嘆于大佬的寫法之妙。這還只是個不知名的開源項目,不知道k8s,etcd等知名項目又會是怎樣的波瀾壯闊!加油!

end

原文鏈接:https://blog.csdn.net/LJFPHP/article/details/125963501

欄目分類
最近更新