日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學無先后,達者為師

網站首頁 編程語言 正文

golang控制goroutine數量以及獲取處理結果

作者:鐵柱同學 更新時間: 2022-07-26 編程語言

一、前言

??????最近遇到批量刷新ES數據的需求,為了加快處理速度,那必須首選goroutine了,但是眾所周知,goroutine的返回值和錯誤處理一直都讓人難以捉摸,go出去簡單,怎么監測go出去的結果是個問題。

1、goroutine的錯誤處理

??????sync.ErrGroupsync.WaitGroup功能的基礎上,增加了錯誤傳遞,以及在發生不可恢復的錯誤時取消整個goroutine集合,或者等待超時。

具體的大家可以百度學習下。errGroup

2、goroutine的處理結果

??????目前使用goroutine一般采用的是 channelsync.WaitGroupcontext,來實現各個協程之間的流程控制和消息傳遞,首選是channel來獲取處理結果,channel參考:go通過channel獲取goroutine的處理結果。

除了channel,那么是否可以用并發安全的sync.Map來存儲結果,在所有的goroutine執行完畢后,再統一獲取處理結果呢?答案是可以的,sync.Map就可以完美實現。

二、控制goroutine數量以及獲取處理結果

1、實戰代碼

以下是使用goroutine批量刷新ES數據,并獲取處理結果的代碼:

//定義要獲取的返回值,成功數量,失敗數量,失敗id集合
succeededNums, failedNums:= 0, 0, 
errData:=""
var syncMap sync.Map
wg := sync.WaitGroup{}
//控制goroutine數量,保證同時只有10個goroutine
chan1 := make(chan struct{}, 10) 
for {
   //業務邏輯
   // ....
   //goroutine加速,chan1寫滿,則阻塞。等待之前的goroutine釋放才能繼續循環
   chan1 <- struct{}{}
   wg.Add(1)
   go func(ctx context.Context, targetIndexName string, esClient *elastic.Client) {
      defer func() {
         if err1 := recover(); err1 != nil { //產生了panic異常
            log.Errorf(ctx, "%s go panic! err:(+%v)", logPreFix, err1)
         }
         wg.Done() //每個goroutine執行完畢則釋放
         return
      }()
      bulkRequest := esClient.Bulk()
      //ES使用bulk方法批量刷新數據
      bulkResByAssetInfo := new(EsBulkDataRes)
      bulkResByAssetInfo, err = BulkEsDataByAssetInfo(ctx, bulkRequest, targetIndexName)
      if err != nil {
         log.Errorf(ctx, "%s BulkEsDataByAssetInfo error (%+v) ,test:(+%v)", logPreFix, err, string_util.TransferToString(fileUnitList))
         return
      }
      //累加執行結果到sync.map,保證并發安全
      tempMap := make(map[string]interface{})
      tempMap["successNums"] = bulkResByAssetInfo.SucceededNums
      tempMap["failedNums"] = bulkResByAssetInfo.FailedNums
      tempMap["errData"] = bulkResByAssetInfo.ErrData
      //每次取循環的的最大id,作為syncMap的key
      syncMap.Store(test[nums-1].ID, tempMap)
      //執行完畢再釋放channel
      <-chan1
   }(ctx, targetIndexName, esClient)
  
}
wg.Wait()
//刷新結束,寫入通知,通知內容包括,遍歷sync.map,獲取返回值
syncMap.Range(func(key, value interface{}) bool {
   val := value.(map[string]interface{})
   succeededNums += val["successNums"].(int)
   failedNums += val["failedNums"].(int)
   errData += val["errData"].(string)
   return true
})

//打印結果
fmt.Println("成功數量:",succeededNums)
fmt.Println("失敗數量:",failedNums)
fmt.Println("失敗id:",errData)

2、syncMap的使用

1)寫入處理結果到map2) 寫入map到sync.Map中,注意key不要重復
(3)使用Range來循環sync.Map,獲取處理結果,并累加

3、控制goroutine的數量

這塊主要是通過設置channel的長度來實現的。

1)設定channel長度,循環開始每生成一個goroutine則寫入一次channel
(2) channel寫滿則阻塞
(3)goroutine執行完畢,釋放channel
(4for循環中繼續寫入channel,保證同時執行的goroutine只有10

三、sync.Map的缺點

1、需要對value做斷言處理,這個是interface{}的特性決定的

2、大家都知道sync.Map適合讀多寫少的場景,博主這里因為是跑腳本,所以使用sync.Map也無傷大雅,大家要追求性能的話,可以看一下currentMap的實現,通過hash分桶,減小鎖的粒度來提升性能。
current-Map

end

原文鏈接:https://blog.csdn.net/LJFPHP/article/details/125825337

欄目分類
最近更新