日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學(xué)無先后,達(dá)者為師

網(wǎng)站首頁 編程語言 正文

go通過channel獲取goroutine的處理結(jié)果

作者:鐵柱同學(xué) 更新時(shí)間: 2022-07-26 編程語言

一、前言

??????前幾天寫了篇文章,是通過sync.Map獲取goroutine的返回結(jié)果然后做出處理,但是一直感覺方案一般,不是很好。畢竟channel才是欽定的太子,所以還是用channel好一些。

golang控制goroutine數(shù)量以及獲取處理結(jié)果

二、誤區(qū)以及實(shí)戰(zhàn)代碼

1、誤區(qū)

??????博主自己用channel一般都是用來控制goroutine的并發(fā),所以channel結(jié)構(gòu)比較簡(jiǎn)單,就想當(dāng)然的認(rèn)為channel只適合存儲(chǔ)簡(jiǎn)單的結(jié)構(gòu),復(fù)雜的函數(shù)處理結(jié)果通過channel處理不太方便,是在是謬之千里

2.實(shí)戰(zhàn)代碼

注:以下為脫敏的偽代碼

代碼含義概覽:
(1)通過channel控制goroutine數(shù)量
(2)定義channel結(jié)構(gòu)體,結(jié)構(gòu)體里面可以根據(jù)需求嵌套其他結(jié)構(gòu)體,實(shí)現(xiàn)我們想要的復(fù)雜結(jié)構(gòu)
(3)每次循環(huán)獲取go函數(shù)處理結(jié)果,處理每次循環(huán)結(jié)果
//EsBulkDataRes 可以定義復(fù)雜的結(jié)構(gòu),或者嵌套結(jié)構(gòu)體
type EsBulkDataRes struct {
	SucceededNums int    `json:"succeeded_nums"`
	FailedNums    int    `json:"failed_nums"`
	ErrData       string `json:"err_data"`
	//TestDoc *TestDoc
}

type TestDoc struct {
	ID     string                 `json:"_id,omitempty"`
	Source map[string]interface{} `json:"_source,omitempty"` //map里面可以存儲(chǔ)復(fù)雜的結(jié)構(gòu)
}

func testChannel() {
	//定義要獲取的返回值,成功數(shù)量,失敗數(shù)量,失敗id集合
	succeededNums, failedNums := 0, 0
	var errData string
	DealRes := make(chan EsBulkDataRes)
	defer func() {
		close(DealRes)
	}()
	wg := sync.WaitGroup{}
	//控制goroutine數(shù)量,保證同時(shí)只有10個(gè)goroutine
	chan1 := make(chan struct{}, 10)
	ctx := context.Background()
	for {
		//業(yè)務(wù)邏輯
		// ....
		//goroutine加速,chan1寫滿,則阻塞。等待之前的goroutine釋放才能繼續(xù)循環(huán)
		chan1 <- struct{}{}
		wg.Add(1)
		go func(ctx context.Context, targetIndexName string, esClient *elastic.Client) {
			defer func() {
				if err1 := recover(); err1 != nil { //產(chǎn)生了panic異常
					log.Errorf(ctx, "%s go panic! err:(+%v)", logPreFix, err1)
				}
				//執(zhí)行完畢再釋放channel
				<-chan1
				return
			}()
			bulkRequest := esClient.Bulk()
			//ES使用bulk方法批量刷新數(shù)據(jù)
			bulkResByAssetInfo := new(EsBulkDataRes)
			bulkResByAssetInfo, err = BulkEsDataByAssetInfo(ctx, bulkRequest, targetIndexName)
			if err != nil {
				log.Errorf(ctx, "%s BulkEsDataByAssetInfo error (%+v) ,test:(+%v)", logPreFix, err, string_util.TransferToString(fileUnitList))
				return
			}
			//執(zhí)行結(jié)果寫入channel
			DealRes <- *bulkResByAssetInfo
			//執(zhí)行完畢再釋放channel
			<-chan1
		}(ctx, targetIndexName, esClient)
		//goroutine 執(zhí)行結(jié)束,讀取channel累加結(jié)果
		//讀取channel,也是為了方便下一個(gè) goroutine 的寫入。沒讀的話,會(huì)阻塞
		select {
		case d, ok := <-DealRes:
			if !ok {
				continue
			}
			//累加結(jié)果
			succeededNums += d.SucceededNums
			failedNums += d.FailedNums
			errData += d.ErrData
		case <-ctx.Done():
			return
		}

	}
	wg.Wait()
	//打印結(jié)果
	fmt.Println("成功數(shù)量:", succeededNums)
	fmt.Println("失敗數(shù)量:", failedNums)
	fmt.Println("失敗id:", errData)

}

三、channel控制多個(gè)goroutine串行

??????這部分是博主之前碰到的面試題,說多個(gè)go函數(shù),每個(gè)go函數(shù)都依賴于上一步的處理結(jié)果,如何實(shí)現(xiàn)串行。此處給出偽代碼,參考下即可。

//定義channel結(jié)構(gòu)
type chanStruct struct {
	Res1 int64
}

//每個(gè)函數(shù)中重新給channel賦值
func test1(chan1 chan chanStruct) {
	res1 := new(chanStruct)
	fmt.Println("test1")
	res1.Res1 = 2
	chan1 <- *res1
	return
}

func test2(chan2 chan chanStruct) {
	res2 := new(chanStruct)
	fmt.Println("test2")
	res2.Res1 = 3
	chan2 <- *res2
	return

}

func test3(chan3 chan chanStruct) {
	fmt.Printf("test3,chanStruct:(%+v)", chan3)
	return
}

//https://segmentfault.com/q/1010000041024462/
//無緩沖通道讀寫都必須在協(xié)程里,否則會(huì)阻塞。有緩沖通道則可以不需要都準(zhǔn)備好,讀或者寫可以寫在當(dāng)前線程里而不會(huì)阻塞。
func main() {
	chan0 := make(chan chanStruct, 1)
	//這里使用了"golang.org/x/sync/errgroup" 這個(gè)包,博主個(gè)人實(shí)驗(yàn),在此處非必需
	g, ctx := errgroup.WithContext(context.Background())
	chan0 <- chanStruct{
		Res1: 1,
	}
	fmt.Println("write chan success!")
	//errgroup控制并發(fā),并獲取goroutine返回的錯(cuò)誤(此處沒用到)
	g.Go(func() error {
		for {
			select {
			//注意這里,由于每次我們都讀出來了channel,因此需要在函數(shù)中給channel賦值
			//保證能觸發(fā)下一個(gè)函數(shù)
			case d, ok := <-chan0:
				fmt.Println("d:", d)
				if ok {
					if d.Res1 == 1 {
						go test1(chan0)
					} else if d.Res1 == 2 {
						go test2(chan0)
					} else if d.Res1 == 3 {
						go test3(chan0)
						fmt.Println("end")
						return nil
					}
				}
			case <-ctx.Done():
				return ctx.Err()
			}
		}

	})
	//errgroup的Wait類似于sync.withGroup的Wait()方法,等待goroutine執(zhí)行結(jié)束
	if err := g.Wait(); err != nil {
		log.Fatal(err)
	}

}

四、后記

??????我們?cè)撊绾未_認(rèn)自己寫的代碼比較好呢?這個(gè)好又要如何定義?只是實(shí)現(xiàn)功能還是說要保持優(yōu)雅? 以上是博主跟一個(gè)大佬聊天的時(shí)候大佬問的問題。

??????對(duì)于我們開發(fā)者來說,以實(shí)現(xiàn)需求為第一目的是絕對(duì)沒問題的,但是代碼質(zhì)量也需要持續(xù)提升。怎么提升呢,當(dāng)然是看大佬的代碼!哪里大佬的代碼最多呢,當(dāng)然是github!

??????博主最近看了https://github.com/olivere/esdiff 大佬的代碼,才發(fā)現(xiàn)自己以前的狹隘,也驚嘆于大佬的寫法之妙。這還只是個(gè)不知名的開源項(xiàng)目,不知道k8s,etcd等知名項(xiàng)目又會(huì)是怎樣的波瀾壯闊!加油!

end

原文鏈接:https://blog.csdn.net/LJFPHP/article/details/125963501

欄目分類
最近更新