網站首頁 編程語言 正文
一、前言
??????最近遇到批量刷新ES數據的需求,為了加快處理速度,那必須首選goroutine
了,但是眾所周知,goroutine
的返回值和錯誤處理一直都讓人難以捉摸,go
出去簡單,怎么監測go
出去的結果是個問題。
1、goroutine的錯誤處理
??????sync.ErrGroup
在sync.WaitGroup
功能的基礎上,增加了錯誤傳遞,以及在發生不可恢復的錯誤時取消整個goroutine
集合,或者等待超時。
具體的大家可以百度學習下。errGroup
2、goroutine的處理結果
??????目前使用goroutine
一般采用的是 channel
、 sync.WaitGroup
、context
,來實現各個協程之間的流程控制和消息傳遞,首選是channel
來獲取處理結果,channel
參考:go通過channel獲取goroutine的處理結果。
除了channel
,那么是否可以用并發安全的sync.Map
來存儲結果,在所有的goroutine
執行完畢后,再統一獲取處理結果呢?答案是可以的,sync.Map
就可以完美實現。
二、控制goroutine數量以及獲取處理結果
1、實戰代碼
以下是使用goroutine
批量刷新ES數據,并獲取處理結果的代碼:
//定義要獲取的返回值,成功數量,失敗數量,失敗id集合
succeededNums, failedNums:= 0, 0,
errData:=""
var syncMap sync.Map
wg := sync.WaitGroup{}
//控制goroutine數量,保證同時只有10個goroutine
chan1 := make(chan struct{}, 10)
for {
//業務邏輯
// ....
//goroutine加速,chan1寫滿,則阻塞。等待之前的goroutine釋放才能繼續循環
chan1 <- struct{}{}
wg.Add(1)
go func(ctx context.Context, targetIndexName string, esClient *elastic.Client) {
defer func() {
if err1 := recover(); err1 != nil { //產生了panic異常
log.Errorf(ctx, "%s go panic! err:(+%v)", logPreFix, err1)
}
wg.Done() //每個goroutine執行完畢則釋放
return
}()
bulkRequest := esClient.Bulk()
//ES使用bulk方法批量刷新數據
bulkResByAssetInfo := new(EsBulkDataRes)
bulkResByAssetInfo, err = BulkEsDataByAssetInfo(ctx, bulkRequest, targetIndexName)
if err != nil {
log.Errorf(ctx, "%s BulkEsDataByAssetInfo error (%+v) ,test:(+%v)", logPreFix, err, string_util.TransferToString(fileUnitList))
return
}
//累加執行結果到sync.map,保證并發安全
tempMap := make(map[string]interface{})
tempMap["successNums"] = bulkResByAssetInfo.SucceededNums
tempMap["failedNums"] = bulkResByAssetInfo.FailedNums
tempMap["errData"] = bulkResByAssetInfo.ErrData
//每次取循環的的最大id,作為syncMap的key
syncMap.Store(test[nums-1].ID, tempMap)
//執行完畢再釋放channel
<-chan1
}(ctx, targetIndexName, esClient)
}
wg.Wait()
//刷新結束,寫入通知,通知內容包括,遍歷sync.map,獲取返回值
syncMap.Range(func(key, value interface{}) bool {
val := value.(map[string]interface{})
succeededNums += val["successNums"].(int)
failedNums += val["failedNums"].(int)
errData += val["errData"].(string)
return true
})
//打印結果
fmt.Println("成功數量:",succeededNums)
fmt.Println("失敗數量:",failedNums)
fmt.Println("失敗id:",errData)
2、syncMap的使用
(1)寫入處理結果到map
(2) 寫入map到sync.Map中,注意key不要重復
(3)使用Range來循環sync.Map,獲取處理結果,并累加
3、控制goroutine的數量
這塊主要是通過設置channel
的長度來實現的。
(1)設定channel長度,循環開始每生成一個goroutine則寫入一次channel
(2) channel寫滿則阻塞
(3)goroutine執行完畢,釋放channel
(4) for循環中繼續寫入channel,保證同時執行的goroutine只有10個
三、sync.Map的缺點
1、需要對value
做斷言處理,這個是interface{}
的特性決定的
2、大家都知道sync.Map
適合讀多寫少的場景,博主這里因為是跑腳本,所以使用sync.Map
也無傷大雅,大家要追求性能的話,可以看一下currentMap
的實現,通過hash
分桶,減小鎖的粒度來提升性能。
current-Map
end
原文鏈接:https://blog.csdn.net/LJFPHP/article/details/125825337
相關推薦
- 2022-10-30 C#實現單例模式的6種方法小結_C#教程
- 2022-07-14 python?中的requirements.txt?文件的使用詳情_python
- 2024-01-06 RocketMQ重復消費問題
- 2022-08-10 Go?modules?replace解決Go依賴引用問題_Golang
- 2022-06-15 ASP.NET?Core?MVC中的控制器(Controller)介紹_基礎應用
- 2022-09-29 React事件監聽和State狀態修改方式_React
- 2022-06-21 C語言程序的編譯與預處理基礎定義講解_C 語言
- 2022-02-11 Android?studio?利用共享存儲進行用戶的注冊和登錄驗證功能_Android
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支