網(wǎng)站首頁 編程語言 正文
一、前言
??????前幾天寫了篇文章,是通過sync.Map
獲取goroutine
的返回結(jié)果然后做出處理,但是一直感覺方案一般,不是很好。畢竟channel
才是欽定的太子,所以還是用channel
好一些。
golang控制goroutine數(shù)量以及獲取處理結(jié)果
二、誤區(qū)以及實(shí)戰(zhàn)代碼
1、誤區(qū)
??????博主自己用channel
一般都是用來控制goroutine
的并發(fā),所以channel
結(jié)構(gòu)比較簡(jiǎn)單,就想當(dāng)然的認(rèn)為channel
只適合存儲(chǔ)簡(jiǎn)單的結(jié)構(gòu),復(fù)雜的函數(shù)處理結(jié)果通過channel
處理不太方便,是在是謬之千里。
2.實(shí)戰(zhàn)代碼
注:以下為脫敏的偽代碼
代碼含義概覽:
(1)通過channel控制goroutine數(shù)量
(2)定義channel結(jié)構(gòu)體,結(jié)構(gòu)體里面可以根據(jù)需求嵌套其他結(jié)構(gòu)體,實(shí)現(xiàn)我們想要的復(fù)雜結(jié)構(gòu)
(3)每次循環(huán)獲取go函數(shù)處理結(jié)果,處理每次循環(huán)結(jié)果
//EsBulkDataRes 可以定義復(fù)雜的結(jié)構(gòu),或者嵌套結(jié)構(gòu)體
type EsBulkDataRes struct {
SucceededNums int `json:"succeeded_nums"`
FailedNums int `json:"failed_nums"`
ErrData string `json:"err_data"`
//TestDoc *TestDoc
}
type TestDoc struct {
ID string `json:"_id,omitempty"`
Source map[string]interface{} `json:"_source,omitempty"` //map里面可以存儲(chǔ)復(fù)雜的結(jié)構(gòu)
}
func testChannel() {
//定義要獲取的返回值,成功數(shù)量,失敗數(shù)量,失敗id集合
succeededNums, failedNums := 0, 0
var errData string
DealRes := make(chan EsBulkDataRes)
defer func() {
close(DealRes)
}()
wg := sync.WaitGroup{}
//控制goroutine數(shù)量,保證同時(shí)只有10個(gè)goroutine
chan1 := make(chan struct{}, 10)
ctx := context.Background()
for {
//業(yè)務(wù)邏輯
// ....
//goroutine加速,chan1寫滿,則阻塞。等待之前的goroutine釋放才能繼續(xù)循環(huán)
chan1 <- struct{}{}
wg.Add(1)
go func(ctx context.Context, targetIndexName string, esClient *elastic.Client) {
defer func() {
if err1 := recover(); err1 != nil { //產(chǎn)生了panic異常
log.Errorf(ctx, "%s go panic! err:(+%v)", logPreFix, err1)
}
//執(zhí)行完畢再釋放channel
<-chan1
return
}()
bulkRequest := esClient.Bulk()
//ES使用bulk方法批量刷新數(shù)據(jù)
bulkResByAssetInfo := new(EsBulkDataRes)
bulkResByAssetInfo, err = BulkEsDataByAssetInfo(ctx, bulkRequest, targetIndexName)
if err != nil {
log.Errorf(ctx, "%s BulkEsDataByAssetInfo error (%+v) ,test:(+%v)", logPreFix, err, string_util.TransferToString(fileUnitList))
return
}
//執(zhí)行結(jié)果寫入channel
DealRes <- *bulkResByAssetInfo
//執(zhí)行完畢再釋放channel
<-chan1
}(ctx, targetIndexName, esClient)
//goroutine 執(zhí)行結(jié)束,讀取channel累加結(jié)果
//讀取channel,也是為了方便下一個(gè) goroutine 的寫入。沒讀的話,會(huì)阻塞
select {
case d, ok := <-DealRes:
if !ok {
continue
}
//累加結(jié)果
succeededNums += d.SucceededNums
failedNums += d.FailedNums
errData += d.ErrData
case <-ctx.Done():
return
}
}
wg.Wait()
//打印結(jié)果
fmt.Println("成功數(shù)量:", succeededNums)
fmt.Println("失敗數(shù)量:", failedNums)
fmt.Println("失敗id:", errData)
}
三、channel控制多個(gè)goroutine串行
??????這部分是博主之前碰到的面試題,說多個(gè)go
函數(shù),每個(gè)go
函數(shù)都依賴于上一步的處理結(jié)果,如何實(shí)現(xiàn)串行。此處給出偽代碼,參考下即可。
//定義channel結(jié)構(gòu)
type chanStruct struct {
Res1 int64
}
//每個(gè)函數(shù)中重新給channel賦值
func test1(chan1 chan chanStruct) {
res1 := new(chanStruct)
fmt.Println("test1")
res1.Res1 = 2
chan1 <- *res1
return
}
func test2(chan2 chan chanStruct) {
res2 := new(chanStruct)
fmt.Println("test2")
res2.Res1 = 3
chan2 <- *res2
return
}
func test3(chan3 chan chanStruct) {
fmt.Printf("test3,chanStruct:(%+v)", chan3)
return
}
//https://segmentfault.com/q/1010000041024462/
//無緩沖通道讀寫都必須在協(xié)程里,否則會(huì)阻塞。有緩沖通道則可以不需要都準(zhǔn)備好,讀或者寫可以寫在當(dāng)前線程里而不會(huì)阻塞。
func main() {
chan0 := make(chan chanStruct, 1)
//這里使用了"golang.org/x/sync/errgroup" 這個(gè)包,博主個(gè)人實(shí)驗(yàn),在此處非必需
g, ctx := errgroup.WithContext(context.Background())
chan0 <- chanStruct{
Res1: 1,
}
fmt.Println("write chan success!")
//errgroup控制并發(fā),并獲取goroutine返回的錯(cuò)誤(此處沒用到)
g.Go(func() error {
for {
select {
//注意這里,由于每次我們都讀出來了channel,因此需要在函數(shù)中給channel賦值
//保證能觸發(fā)下一個(gè)函數(shù)
case d, ok := <-chan0:
fmt.Println("d:", d)
if ok {
if d.Res1 == 1 {
go test1(chan0)
} else if d.Res1 == 2 {
go test2(chan0)
} else if d.Res1 == 3 {
go test3(chan0)
fmt.Println("end")
return nil
}
}
case <-ctx.Done():
return ctx.Err()
}
}
})
//errgroup的Wait類似于sync.withGroup的Wait()方法,等待goroutine執(zhí)行結(jié)束
if err := g.Wait(); err != nil {
log.Fatal(err)
}
}
四、后記
??????我們?cè)撊绾未_認(rèn)自己寫的代碼比較好呢?這個(gè)好又要如何定義?只是實(shí)現(xiàn)功能還是說要保持優(yōu)雅? 以上是博主跟一個(gè)大佬聊天的時(shí)候大佬問的問題。
??????對(duì)于我們開發(fā)者來說,以實(shí)現(xiàn)需求為第一目的是絕對(duì)沒問題的,但是代碼質(zhì)量也需要持續(xù)提升。怎么提升呢,當(dāng)然是看大佬的代碼!哪里大佬的代碼最多呢,當(dāng)然是github!
??????博主最近看了https://github.com/olivere/esdiff
大佬的代碼,才發(fā)現(xiàn)自己以前的狹隘,也驚嘆于大佬的寫法之妙。這還只是個(gè)不知名的開源項(xiàng)目,不知道k8s,etcd等知名項(xiàng)目又會(huì)是怎樣的波瀾壯闊!加油!
end
原文鏈接:https://blog.csdn.net/LJFPHP/article/details/125963501
相關(guān)推薦
- 2022-03-22 C語言圍圈報(bào)數(shù)題目代碼實(shí)現(xiàn)_C 語言
- 2022-09-21 Flutter實(shí)現(xiàn)底部和頂部導(dǎo)航欄_Android
- 2022-09-02 React前端框架實(shí)現(xiàn)原理的理解_React
- 2022-08-15 oracle數(shù)據(jù)庫表實(shí)現(xiàn)自增主鍵的方法實(shí)例_oracle
- 2022-07-20 C語言詳細(xì)講解while語句的用法_C 語言
- 2022-10-05 Iptables防火墻iprange模塊擴(kuò)展匹配規(guī)則詳解_安全相關(guān)
- 2022-09-01 React新文檔切記不要濫用effect_React
- 2022-06-09 詳解Python中*args和**kwargs的使用_python
- 最近更新
-
- window11 系統(tǒng)安裝 yarn
- 超詳細(xì)win安裝深度學(xué)習(xí)環(huán)境2025年最新版(
- Linux 中運(yùn)行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲(chǔ)小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎(chǔ)操作-- 運(yùn)算符,流程控制 Flo
- 1. Int 和Integer 的區(qū)別,Jav
- spring @retryable不生效的一種
- Spring Security之認(rèn)證信息的處理
- Spring Security之認(rèn)證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權(quán)
- redisson分布式鎖中waittime的設(shè)
- maven:解決release錯(cuò)誤:Artif
- restTemplate使用總結(jié)
- Spring Security之安全異常處理
- MybatisPlus優(yōu)雅實(shí)現(xiàn)加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務(wù)發(fā)現(xiàn)-Nac
- Spring Security之基于HttpR
- Redis 底層數(shù)據(jù)結(jié)構(gòu)-簡(jiǎn)單動(dòng)態(tài)字符串(SD
- arthas操作spring被代理目標(biāo)對(duì)象命令
- Spring中的單例模式應(yīng)用詳解
- 聊聊消息隊(duì)列,發(fā)送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠(yuǎn)程分支