網(wǎng)站首頁 編程語言 正文
Goroutine
Goroutine 是 Golang 提供的一種輕量級線程,我們通常稱之為「協(xié)程」,相比較線程,創(chuàng)建一個協(xié)程的成本是很低的。所以你會經(jīng)常看到 Golang 開發(fā)的應(yīng)用出現(xiàn)上千個協(xié)程并發(fā)的場景。
Goroutine 的優(yōu)勢:
- 與線程相比,Goroutines 成本很低。
它們的堆棧大小只有幾 kb,堆棧可以根據(jù)應(yīng)用程序的需要增長和縮小,context switch 也很快,而在線程的情況下,堆棧大小必須指定并固定。
- Goroutine 被多路復(fù)用到更少數(shù)量的 OS 線程。
一個包含數(shù)千個 Goroutine 的程序中可能只有一個線程。如果該線程中的任何 Goroutine 阻塞等待用戶輸入,則創(chuàng)建另一個 OS 線程并將剩余的 Goroutine 移動到新的 OS 線程。所有這些都由運(yùn)行時處理,作為開發(fā)者無需耗費(fèi)心力關(guān)心,這也使得我們有很干凈的 API 來支持并發(fā)。
- Goroutines 使用 channel 進(jìn)行通信。
channel 的設(shè)計(jì)有效防止了在使用 Goroutine 訪問共享內(nèi)存時發(fā)生競爭條件(race conditions) 。channel 可以被認(rèn)為是 Goroutine 進(jìn)行通信的管道。
下文中我們會以「協(xié)程」來代指 Goroutine。
協(xié)程池
在高并發(fā)場景下,我們可能會啟動大量的協(xié)程來處理業(yè)務(wù)邏輯。協(xié)程池是一種利用池化技術(shù),復(fù)用對象,減少內(nèi)存分配的頻率以及協(xié)程創(chuàng)建開銷,從而提高協(xié)程執(zhí)行效率的技術(shù)。
最近抽空了解了字節(jié)官方開源的 gopkg 庫提供的 gopool
協(xié)程池實(shí)現(xiàn),感覺還是很高質(zhì)量的,代碼也非常簡潔清晰,而且 Kitex
底層也在使用 gopool
來管理協(xié)程,這里我們梳理一下設(shè)計(jì)和實(shí)現(xiàn)。
gopool
Repository:https://github.com/bytedance/gopkg/tree/develop/util/gopool
gopool
?is a high-performance goroutine pool which aims to reuse goroutines and limit the number of goroutines. It is an alternative to the?go
?keyword.
了解官方 README 就會發(fā)現(xiàn)gopool
的用法其實(shí)非常簡單,將曾經(jīng)我們經(jīng)常使用的?go func(){...}
?替換為?gopool.Go(func(){...})
即可。
此時 gopool
將會使用默認(rèn)的配置來管理你啟動的協(xié)程,你也可以選擇針對業(yè)務(wù)場景配置池子大小,以及擴(kuò)容上限。
old:
go func() { // do your job }()
new:
import ( "github.com/bytedance/gopkg/util/gopool" ) gopool.Go(func(){ /// do your job })
核心實(shí)現(xiàn)
下面我們來看看gopool
是怎樣實(shí)現(xiàn)協(xié)程池管理的。
Pool
Pool
是一個定義了協(xié)程池能力的接口。
type Pool interface { // 池子的名稱 Name() string // 設(shè)置池子內(nèi)Goroutine的容量 SetCap(cap int32) // 執(zhí)行 f 函數(shù) Go(f func()) // 帶 ctx,執(zhí)行 f 函數(shù) CtxGo(ctx context.Context, f func()) // 設(shè)置發(fā)生panic時調(diào)用的函數(shù) SetPanicHandler(f func(context.Context, interface{})) }
gopool
提供了這個接口的默認(rèn)實(shí)現(xiàn)(即下面即將介紹的pool
),當(dāng)我們直接調(diào)用 gopool.CtxGo 時依賴的就是這個。
這樣的設(shè)計(jì)模式在 Kitex
中也經(jīng)常出現(xiàn),所有的依賴均設(shè)計(jì)為接口,便于隨后擴(kuò)展,底層提供一個默認(rèn)的實(shí)現(xiàn)暴露出去,這樣對調(diào)用方也很友好。
type pool struct { // 池子名稱 name string // 池子的容量, 即最大并發(fā)工作的 goroutine 的數(shù)量 cap int32 // 池子配置 config *Config // task 鏈表 taskHead *task taskTail *task taskLock sync.Mutex taskCount int32 // 記錄當(dāng)前正在運(yùn)行的 worker 的數(shù)量 workerCount int32 // 當(dāng) worker 出現(xiàn)panic時被調(diào)用 panicHandler func(context.Context, interface{}) } // NewPool 創(chuàng)建一個新的協(xié)程池,初始化名稱,容量,配置 func NewPool(name string, cap int32, config *Config) Pool { p := &pool{ name: name, cap: cap, config: config, } return p }
調(diào)用 NewPool
獲取了以 Pool
的形式返回的 pool
結(jié)構(gòu)體。
Task
type task struct { ctx context.Context f func() next *task }
task
是一個鏈表結(jié)構(gòu),可以把它理解為一個待執(zhí)行的任務(wù),它包含了當(dāng)前節(jié)點(diǎn)需要執(zhí)行的函數(shù)f
, 以及指向下一個task
的指針。
綜合前一節(jié) pool
的定義,我們可以看到,一個協(xié)程池 pool
對應(yīng)了一組task
。
pool
維護(hù)了指向鏈表的頭尾的兩個指針:taskHead
和 taskTail
,以及鏈表的長度taskCount
和對應(yīng)的鎖 taskLock
。
Worker
type worker struct { pool *pool }
一個 worker
就是邏輯上的一個執(zhí)行器,它唯一對應(yīng)到一個協(xié)程池 pool
。當(dāng)一個worker
被喚起,將會開啟一個goroutine
,不斷地從 pool
中的 task
鏈表獲取任務(wù)并執(zhí)行。
func (w *worker) run() { go func() { for { // 聲明即將執(zhí)行的 task var t *task // 操作 pool 中的 task 鏈表,加鎖 w.pool.taskLock.Lock() if w.pool.taskHead != nil { // 拿到 taskHead 準(zhǔn)備執(zhí)行 t = w.pool.taskHead // 更新鏈表的 head 以及數(shù)量 w.pool.taskHead = w.pool.taskHead.next atomic.AddInt32(&w.pool.taskCount, -1) } // 如果前一步拿到的 taskHead 為空,說明無任務(wù)需要執(zhí)行,清理后返回 if t == nil { w.close() w.pool.taskLock.Unlock() w.Recycle() return } w.pool.taskLock.Unlock() // 執(zhí)行任務(wù),針對 panic 會recover,并調(diào)用配置的 handler func() { defer func() { if r := recover(); r != nil { msg := fmt.Sprintf("GOPOOL: panic in pool: %s: %v: %s", w.pool.name, r, debug.Stack()) logger.CtxErrorf(t.ctx, msg) if w.pool.panicHandler != nil { w.pool.panicHandler(t.ctx, r) } } }() t.f() }() t.Recycle() } }() }
整體來看
看到這里,其實(shí)就能把整個流程串起來了。我們來看看對外的接口 CtxGo(context.Context, f func())
到底做了什么?
func Go(f func()) { CtxGo(context.Background(), f) } func CtxGo(ctx context.Context, f func()) { defaultPool.CtxGo(ctx, f) } func (p *pool) CtxGo(ctx context.Context, f func()) { // 創(chuàng)建一個 task 對象,將 ctx 和待執(zhí)行的函數(shù)賦值 t := taskPool.Get().(*task) t.ctx = ctx t.f = f // 將 task 插入 pool 的鏈表的尾部,更新鏈表數(shù)量 p.taskLock.Lock() if p.taskHead == nil { p.taskHead = t p.taskTail = t } else { p.taskTail.next = t p.taskTail = t } p.taskLock.Unlock() atomic.AddInt32(&p.taskCount, 1) // 以下兩個條件滿足時,創(chuàng)建新的 worker 并喚起執(zhí)行: // 1. task的數(shù)量超過了配置的限制 // 2. 當(dāng)前運(yùn)行的worker數(shù)量小于上限(或無worker運(yùn)行) if (atomic.LoadInt32(&p.taskCount) >= p.config.ScaleThreshold && p.WorkerCount() < atomic.LoadInt32(&p.cap)) || p.WorkerCount() == 0 { // worker數(shù)量+1 p.incWorkerCount() // 創(chuàng)建一個新的worker,并把當(dāng)前 pool 賦值 w := workerPool.Get().(*worker) w.pool = p // 喚起worker執(zhí)行 w.run() } }
相信看了代碼注釋,大家就能理解發(fā)生了什么。
gopool
會自行維護(hù)一個 defaultPool
,這是一個默認(rèn)的 pool
結(jié)構(gòu)體,在引入包的時候就進(jìn)行初始化。當(dāng)我們直接調(diào)用 gopool.CtxGo()
時,本質(zhì)上是調(diào)用了 defaultPool
的同名方法
func init() { defaultPool = NewPool("gopool.DefaultPool", 10000, NewConfig()) } const ( defaultScalaThreshold = 1 ) // Config is used to config pool. type Config struct { // 控制擴(kuò)容的門檻,一旦待執(zhí)行的 task 超過此值,且 worker 數(shù)量未達(dá)到上限,就開始啟動新的 worker ScaleThreshold int32 } // NewConfig creates a default Config. func NewConfig() *Config { c := &Config{ ScaleThreshold: defaultScalaThreshold, } return c }
defaultPool
的名稱為 gopool.DefaultPool
,池子容量一萬,擴(kuò)容下限為 1。
當(dāng)我們調(diào)用 CtxGo
時,gopool
就會更新維護(hù)的任務(wù)鏈表,并且判斷是否需要擴(kuò)容 worker
:
- 若此時已經(jīng)有很多
worker
啟動(底層一個worker
對應(yīng)一個goroutine
),不需要擴(kuò)容,就直接返回。 - 若判斷需要擴(kuò)容,就創(chuàng)建一個新的
worker
,并調(diào)用worker.run()
方法啟動,各個worker
會異步地檢查pool
里面的任務(wù)鏈表是否還有待執(zhí)行的任務(wù),如果有就執(zhí)行。
三個角色的定位
-
task
是一個待執(zhí)行的任務(wù)節(jié)點(diǎn),同時還包含了指向下一個任務(wù)的指針,鏈表結(jié)構(gòu); -
worker
是一個實(shí)際執(zhí)行任務(wù)的執(zhí)行器,它會異步啟動一個goroutine
執(zhí)行協(xié)程池里面未執(zhí)行的task
; -
pool
是一個邏輯上的協(xié)程池,對應(yīng)了一個task
鏈表,同時負(fù)責(zé)維護(hù)task
狀態(tài)的更新,以及在需要的時候創(chuàng)建新的worker
。
使用 sync.Pool 進(jìn)行性能優(yōu)化
其實(shí)到這個地方,gopool
已經(jīng)是一個代碼簡潔清晰的協(xié)程池庫了,但是性能上顯然有改進(jìn)空間,所以gopool
的作者應(yīng)用了多次 sync.Pool
來池化對象的創(chuàng)建,復(fù)用woker和task對象。
這里建議大家直接看源碼,其實(shí)在上面的代碼中已經(jīng)有所涉及。
- task 池化
var taskPool sync.Pool func init() { taskPool.New = newTask } func newTask() interface{} { return &task{} } func (t *task) Recycle() { t.zero() taskPool.Put(t) }
- worker 池化
var workerPool sync.Pool func init() { workerPool.New = newWorker } func newWorker() interface{} { return &worker{} } func (w *worker) Recycle() { w.zero() workerPool.Put(w) }
原文鏈接:https://juejin.cn/post/7086443265309818894
相關(guān)推薦
- 2022-04-19 Python的閉包和裝飾器你真的了解嗎_python
- 2022-11-19 Gogs遷移
- 2022-03-20 6ull加載linux驅(qū)動模塊失敗解決方法_Linux
- 2022-07-03 python爬蟲lxml庫解析xpath網(wǎng)頁過程示例_python
- 2022-04-05 如何定義多個context:property-placeholder配置
- 2022-12-06 C++類成員函數(shù)后面加const問題_C 語言
- 2023-01-03 Nginx?Gunicorn?flask項(xiàng)目部署思路分析詳解_nginx
- 2022-09-24 React報錯之Object?is?possibly?null的問題及解決方法_React
- 最近更新
-
- window11 系統(tǒng)安裝 yarn
- 超詳細(xì)win安裝深度學(xué)習(xí)環(huán)境2025年最新版(
- Linux 中運(yùn)行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎(chǔ)操作-- 運(yùn)算符,流程控制 Flo
- 1. Int 和Integer 的區(qū)別,Jav
- spring @retryable不生效的一種
- Spring Security之認(rèn)證信息的處理
- Spring Security之認(rèn)證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權(quán)
- redisson分布式鎖中waittime的設(shè)
- maven:解決release錯誤:Artif
- restTemplate使用總結(jié)
- Spring Security之安全異常處理
- MybatisPlus優(yōu)雅實(shí)現(xiàn)加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務(wù)發(fā)現(xiàn)-Nac
- Spring Security之基于HttpR
- Redis 底層數(shù)據(jù)結(jié)構(gòu)-簡單動態(tài)字符串(SD
- arthas操作spring被代理目標(biāo)對象命令
- Spring中的單例模式應(yīng)用詳解
- 聊聊消息隊(duì)列,發(fā)送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠(yuǎn)程分支