網站首頁 編程語言 正文
一、前言
??????前幾天寫了篇文章,是通過sync.Map
獲取goroutine
的返回結果然后做出處理,但是一直感覺方案一般,不是很好。畢竟channel
才是欽定的太子,所以還是用channel
好一些。
golang控制goroutine數量以及獲取處理結果
二、誤區以及實戰代碼
1、誤區
??????博主自己用channel
一般都是用來控制goroutine
的并發,所以channel
結構比較簡單,就想當然的認為channel
只適合存儲簡單的結構,復雜的函數處理結果通過channel
處理不太方便,是在是謬之千里。
2.實戰代碼
注:以下為脫敏的偽代碼
代碼含義概覽:
(1)通過channel控制goroutine數量
(2)定義channel結構體,結構體里面可以根據需求嵌套其他結構體,實現我們想要的復雜結構
(3)每次循環獲取go函數處理結果,處理每次循環結果
//EsBulkDataRes 可以定義復雜的結構,或者嵌套結構體
type EsBulkDataRes struct {
SucceededNums int `json:"succeeded_nums"`
FailedNums int `json:"failed_nums"`
ErrData string `json:"err_data"`
//TestDoc *TestDoc
}
type TestDoc struct {
ID string `json:"_id,omitempty"`
Source map[string]interface{} `json:"_source,omitempty"` //map里面可以存儲復雜的結構
}
func testChannel() {
//定義要獲取的返回值,成功數量,失敗數量,失敗id集合
succeededNums, failedNums := 0, 0
var errData string
DealRes := make(chan EsBulkDataRes)
defer func() {
close(DealRes)
}()
wg := sync.WaitGroup{}
//控制goroutine數量,保證同時只有10個goroutine
chan1 := make(chan struct{}, 10)
ctx := context.Background()
for {
//業務邏輯
// ....
//goroutine加速,chan1寫滿,則阻塞。等待之前的goroutine釋放才能繼續循環
chan1 <- struct{}{}
wg.Add(1)
go func(ctx context.Context, targetIndexName string, esClient *elastic.Client) {
defer func() {
if err1 := recover(); err1 != nil { //產生了panic異常
log.Errorf(ctx, "%s go panic! err:(+%v)", logPreFix, err1)
}
//執行完畢再釋放channel
<-chan1
return
}()
bulkRequest := esClient.Bulk()
//ES使用bulk方法批量刷新數據
bulkResByAssetInfo := new(EsBulkDataRes)
bulkResByAssetInfo, err = BulkEsDataByAssetInfo(ctx, bulkRequest, targetIndexName)
if err != nil {
log.Errorf(ctx, "%s BulkEsDataByAssetInfo error (%+v) ,test:(+%v)", logPreFix, err, string_util.TransferToString(fileUnitList))
return
}
//執行結果寫入channel
DealRes <- *bulkResByAssetInfo
//執行完畢再釋放channel
<-chan1
}(ctx, targetIndexName, esClient)
//goroutine 執行結束,讀取channel累加結果
//讀取channel,也是為了方便下一個 goroutine 的寫入。沒讀的話,會阻塞
select {
case d, ok := <-DealRes:
if !ok {
continue
}
//累加結果
succeededNums += d.SucceededNums
failedNums += d.FailedNums
errData += d.ErrData
case <-ctx.Done():
return
}
}
wg.Wait()
//打印結果
fmt.Println("成功數量:", succeededNums)
fmt.Println("失敗數量:", failedNums)
fmt.Println("失敗id:", errData)
}
三、channel控制多個goroutine串行
??????這部分是博主之前碰到的面試題,說多個go
函數,每個go
函數都依賴于上一步的處理結果,如何實現串行。此處給出偽代碼,參考下即可。
//定義channel結構
type chanStruct struct {
Res1 int64
}
//每個函數中重新給channel賦值
func test1(chan1 chan chanStruct) {
res1 := new(chanStruct)
fmt.Println("test1")
res1.Res1 = 2
chan1 <- *res1
return
}
func test2(chan2 chan chanStruct) {
res2 := new(chanStruct)
fmt.Println("test2")
res2.Res1 = 3
chan2 <- *res2
return
}
func test3(chan3 chan chanStruct) {
fmt.Printf("test3,chanStruct:(%+v)", chan3)
return
}
//https://segmentfault.com/q/1010000041024462/
//無緩沖通道讀寫都必須在協程里,否則會阻塞。有緩沖通道則可以不需要都準備好,讀或者寫可以寫在當前線程里而不會阻塞。
func main() {
chan0 := make(chan chanStruct, 1)
//這里使用了"golang.org/x/sync/errgroup" 這個包,博主個人實驗,在此處非必需
g, ctx := errgroup.WithContext(context.Background())
chan0 <- chanStruct{
Res1: 1,
}
fmt.Println("write chan success!")
//errgroup控制并發,并獲取goroutine返回的錯誤(此處沒用到)
g.Go(func() error {
for {
select {
//注意這里,由于每次我們都讀出來了channel,因此需要在函數中給channel賦值
//保證能觸發下一個函數
case d, ok := <-chan0:
fmt.Println("d:", d)
if ok {
if d.Res1 == 1 {
go test1(chan0)
} else if d.Res1 == 2 {
go test2(chan0)
} else if d.Res1 == 3 {
go test3(chan0)
fmt.Println("end")
return nil
}
}
case <-ctx.Done():
return ctx.Err()
}
}
})
//errgroup的Wait類似于sync.withGroup的Wait()方法,等待goroutine執行結束
if err := g.Wait(); err != nil {
log.Fatal(err)
}
}
四、后記
??????我們該如何確認自己寫的代碼比較好呢?這個好又要如何定義?只是實現功能還是說要保持優雅? 以上是博主跟一個大佬聊天的時候大佬問的問題。
??????對于我們開發者來說,以實現需求為第一目的是絕對沒問題的,但是代碼質量也需要持續提升。怎么提升呢,當然是看大佬的代碼!哪里大佬的代碼最多呢,當然是github!
??????博主最近看了https://github.com/olivere/esdiff
大佬的代碼,才發現自己以前的狹隘,也驚嘆于大佬的寫法之妙。這還只是個不知名的開源項目,不知道k8s,etcd等知名項目又會是怎樣的波瀾壯闊!加油!
end
原文鏈接:https://blog.csdn.net/LJFPHP/article/details/125963501
相關推薦
- 2022-08-22 C++動態規劃實現查找最長公共子序列_C 語言
- 2023-03-25 Redis實現UV統計的示例代碼_Redis
- 2023-01-13 解決安裝torch后,torch.cuda.is_available()結果為false的問題_py
- 2022-02-17 antv g2設置chart圖例的legend為一條線與一個圓的組合
- 2023-01-02 Pytes正確的配置使用日志功能_python
- 2022-11-05 Flutter實現一個支持漸變背景的Button示例詳解_Android
- 2022-05-17 Servlet快速入門
- 2022-11-21 在react中使用windicss的問題_React
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支