網站首頁 編程語言 正文
前言
在使用協程并發處理某些任務時, 其并發數量往往因為各種因素的限制不能無限的增大. 例如網絡請求、數據庫查詢等等。從運行效率角度考慮,在相關服務可以負載的前提下(限制最大并發數),盡可能高的并發。本文就這個問題探尋一下解決方案和實現。共兩種思路,一是使用帶緩沖的通道實現,二是使用鎖實現。
一、使用帶緩沖的通道限制并發數
1.1方案詳情
先上代碼如下, 邏輯很簡單.
package golimit type GoLimit struct { ch chan int } func NewGoLimit(max int) *GoLimit { return &GoLimit{ch: make(chan int, max)} func (g *GoLimit) Add() { g.ch <- 1 func (g *GoLimit) Done() { <-g.ch
按允許最大并發數創建一個帶緩沖的通道, 創建協程之前調用Add()往通道里寫一個數據, 協程完成是調用Done()方法讀取一個數據. 若無法往通道里寫數據時, 表示通道已經寫滿, 也就是目前的協程并發數為允許的最大數量. Add()方法將被阻塞, 也就無法創建新的協程. 直到有協程運行完成, 調用Done()方法讀取了通道了一個數據.
以下是使用示例
package main import ( "golimit" "log" "time" ) func main() { log.Println("開始測試...") g := golimit.NewGoLimit(2) //max_num(最大允許并發數)設置為2 for i := 0; i < 10; i++ { //嘗試增加一個協程, 若已達到最大并發數,將阻塞 g.Add() go func(g *golimit.GoLimit, i int) { defer g.Done() //一個并發協程已經完成 time.Sleep(time.Second * 2) log.Println(i, "done") }(g, i) } log.Println("循環結束") time.Sleep(time.Second * 3)//等待執行完成 log.Println("測試結束") }
1.2評估總結
優點:此方案的實現邏輯簡單明了,易理解、易維護。若能滿足需求,在一般的場景下,此方案為首選。
隱憂:使用通道的緩沖區的大小來表示最大可并發數,在允許并發數較大,如幾千幾萬甚至更大的情況下,通道的性能和內存的負載是否會有問題,我不太清楚,若哪位朋友知道請告知一下。
不足:運行中難以調整最大可并發數。而在某些場景下是有這種需求的,如A服務依賴的B服務有擴容或縮減,但A服務不能停止,需要調整請求B服務接口的最大可并發數。二、使用鎖實現協程并發數量限制2.1方案詳情
同樣先上代碼(注:此代碼我已經在github上開源https://github.com/zh-five/golimit)
// 協程并發數限制庫 package golimit import ( "sync" ) type GoLimit struct { max uint //并發最大數量 count uint //當前已有并發數 isAddLock bool //是否已鎖定增加 zeroChan chan interface{} //為0時廣播 addLock sync.Mutex //(增加并發數的)鎖 dataLock sync.Mutex //(修改數據的)鎖 } func NewGoLimit(max uint) *GoLimit { return &GoLimit{max: max, count: 0, isAddLock: false, zeroChan: nil} } //并發計數加1.若 計數>=max_num, 則阻塞,直到 計數<max_num func (g *GoLimit) Add() { g.addLock.Lock() g.dataLock.Lock() g.count += 1 if g.count < g.max { //未超并發時解鎖,后續可以繼續增加 g.addLock.Unlock() } else { //已到最大并發數, 不解鎖并標記. 等數量減少后解鎖 g.isAddLock = true } g.dataLock.Unlock() } //并發計數減1 //若計數<max_num, 可以使原阻塞的Add()快速解除阻塞 func (g *GoLimit) Done() { g.dataLock.Lock() g.count -= 1 //解鎖 if g.isAddLock == true && g.count < g.max { g.isAddLock = false g.addLock.Unlock() } //0廣播 if g.count == 0 && g.zeroChan != nil { close(g.zeroChan) g.zeroChan = nil } g.dataLock.Unlock() } //更新最大并發計數為, 若是調大, 可以使原阻塞的Add()快速解除阻塞 func (g *GoLimit) SetMax(n uint) { g.dataLock.Lock() g.max = n //解鎖 if g.isAddLock == true && g.count < g.max { g.isAddLock = false g.addLock.Unlock() } //加鎖 if g.isAddLock == false && g.count >= g.max { g.isAddLock = true g.addLock.Lock() } g.dataLock.Unlock() } //若當前并發計數為0, 則快速返回; 否則阻塞等待,直到并發計數為0 func (g *GoLimit) WaitZero() { g.dataLock.Lock() //無需等待 if g.count == 0 { g.dataLock.Unlock() return } //無廣播通道, 創建一個 if g.zeroChan == nil { g.zeroChan = make(chan interface{}) } //復制通道后解鎖, 避免從nil讀數據 c := g.zeroChan g.dataLock.Unlock() <-c } //獲取并發計數 func (g *GoLimit) Count() uint { return g.count } //獲取最大并發計數 func (g *GoLimit) Max() uint { return g.max }
總共使用了兩把鎖,一把是數據鎖(dataLock),用來鎖定數據,保證數據修改安全,加鎖解鎖是在修改數據前后進行的;另一把是增加能否增加協程的鎖(addLock),增加協程時必須先加鎖,加鎖成功后修改并發數,若并發數小于最大可并發數,則解鎖,否則不解鎖,促使后續增加協程的加鎖操作阻塞,從而限制協程的并發數。使用示例如下:
package main import ( "github.com/zh-five/golimit" "log" "time" ) func main() { log.Println("開始測試...") g := golimit.NewGoLimit(2) //max_num(最大允許并發數)設置為2 for i := 0; i < 10; i++ { //并發計數加1.若 計數>=max_num, 則阻塞,直到 計數<max_num g.Add() //運行過程中可以隨時修改最大可并發數據 //g.SetMax(3) go func(g *golimit.GoLimit, i int) { defer g.Done() //并發計數減1 time.Sleep(time.Second * 2) log.Println(i, "done") }(g, i) } log.Println("循環結束") g.WaitZero() //阻塞, 直到所有并發都完成 log.Println("測試結束") }
方案2的GoLimit除了增加了SetMax()方法用于修改最大可并發數。出于好玩和偷懶增加了一個WaitZero()方法(其實外部使用sync.WaitGroup也可以快速實現此功能),用于阻塞等待所有并發協程都執行完成。大約可以用于如下場景:有一大批url需要有限制的并發采集數據,主程序里只需要簡單的調用一下WaitZero()方法,就可以阻塞等等所有采集的協程完成。
2.2評估總結
- 優點: 從實現邏輯上說,可以確定性能和消耗不會隨著最大可并發數增加而線性增加。另外還有很多可擴展的想象。
- 缺點:實現邏輯比較復雜
其它
其實我很想對比測試一下兩種方案的性能,特別是最大可并發比較大時。但我一直沒有找到一種好的測試方法,若哪個朋友有方法或思路,歡迎交流。
原文鏈接:https://www.cnblogs.com/lidabo/p/15839257.html
相關推薦
- 2022-05-22 基于jQuery排序及應用實現Tab欄特效_jquery
- 2022-08-25 利用Python實現一個簡易的截圖工具_python
- 2023-07-26 vscode中配置代碼片段
- 2022-04-17 Failed to bind properties under spring.servlet.mul
- 2022-10-05 虛擬機VMware?Tools安裝步驟_VMware
- 2022-03-17 C語言判斷數是否為素數與素數輸出_C 語言
- 2022-09-25 navicat連接遠程服務器報錯代碼:10038
- 2022-12-08 Matlab實現獲取文件夾下所有指定后綴的文件_C 語言
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支