網站首頁 編程語言 正文
項目中需要對 api 的接口進行限流,但是麻煩的是,api 可能有多個節點,傳統的本地限流無法處理這個問題。限流的算法有很多,比如計數器法,漏斗法,令牌桶法,等等。各有利弊,相關博文網上很多,這里不再贅述。
項目的要求主要有以下幾點:
- 支持本地/分布式限流,接口統一
- 支持多種限流算法的切換
- 方便配置,配置方式不確定
go 語言不是很支持 OOP,我在實現的時候是按 Java 的思路走的,所以看起來有點不倫不類,希望能拋磚引玉。
1. 接口定義
package ratelimit import "time" // 限流器接口 type Limiter interface { ? ? Acquire() error ? ? TryAcquire() bool } // 限流定義接口 type Limit interface { ? ? Name() string ? ? Key() string ? ? Period() time.Duration ? ? Count() int32 ? ? LimitType() LimitType } // 支持 burst type BurstLimit interface { ? ? Limit ? ? BurstCount() int32 } // 分布式定義的 burst type DistLimit interface { ? ? Limit ? ? ClusterNum() int32 } type LimitType int32 const ( ? ? CUSTOM LimitType = iota ? ? IP )
Limiter 接口參考了 Google 的 guava 包里的 Limiter 實現。Acquire 接口是阻塞接口,其實還需要加上 context 來保證調用鏈安全,因為實際項目中并沒有用到 Acquire 接口,所以沒有實現完善;同理,超時時間的支持也可以通過添加新接口繼承自 Limiter 接口來實現。TryAcquire 會立即返回。
Limit 抽象了一個限流定義,Key() 方法返回這個 Limit 的唯一標識,Name() 僅作輔助,Period() 表示周期,單位是秒,Count() 表示周期內的最大次數,LimitType()表示根據什么來做區分,如 IP,默認是 CUSTOM.
BurstLimit 提供突發的能力,一般是配合令牌桶算法。DistLimit 新增 ClusterNum() 方法,因為 mentor 要求分布式遇到錯誤的時候,需要退化為單機版本,退化的策略即是:2 節點總共 100QPS,如果出現分區,每個節點需要調整為各 50QPS
2. LocalCounterLimiter
package ratelimit import ( ? ? "errors" ? ? "fmt" ? ? "math" ? ? "sync" ? ? "sync/atomic" ? ? "time" ) // todo timer 需要 stop type localCounterLimiter struct { ? ? limit Limit ? ? limitCount int32 // 內部使用,對 limit.count 做了 <0 時的轉換 ? ? ticker *time.Ticker ? ? quit chan bool ? ? lock sync.Mutex ? ? newTerm *sync.Cond ? ? count int32 } func (lim *localCounterLimiter) init() { ? ? lim.newTerm = sync.NewCond(&lim.lock) ? ? lim.limitCount = lim.limit.Count() ? ? if lim.limitCount < 0 { ? ? ? ? lim.limitCount = math.MaxInt32 // count 永遠不會大于 limitCount,后面的寫法保證溢出也沒問題 ? ? } else if lim.limitCount == 0 ?{ ? ? ? ? // 禁止訪問, 會無限阻塞 ? ? } else { ? ? ? ? lim.ticker = time.NewTicker(lim.limit.Period()) ? ? ? ? lim.quit = make(chan bool, 1) ? ? ? ? go func() { ? ? ? ? ? ? for { ? ? ? ? ? ? ? ? select { ? ? ? ? ? ? ? ? case <- lim.ticker.C: ? ? ? ? ? ? ? ? ? ? fmt.Println("ticker .") ? ? ? ? ? ? ? ? ? ? atomic.StoreInt32(&lim.count, 0) ? ? ? ? ? ? ? ? ? ? lim.newTerm.Broadcast() ? ? ? ? ? ? ? ? ? ? //lim.newTerm.L.Unlock() ? ? ? ? ? ? ? ? case <- lim.quit: ? ? ? ? ? ? ? ? ? ? fmt.Println("work well .") ? ? ? ? ? ? ? ? ? ? lim.ticker.Stop() ? ? ? ? ? ? ? ? ? ? return ? ? ? ? ? ? ? ? } ? ? ? ? ? ? } ? ? ? ? }() ? ? } } // todo 需要機制來防止無限阻塞, 不超時也應該有個極限時間 func (lim *localCounterLimiter) Acquire() error { ? ? if lim.limitCount == 0 { ? ? ? ? return errors.New("rate limit is 0, infinity wait") ? ? } ? ? lim.newTerm.L.Lock() ? ? for lim.count >= lim.limitCount { ? ? ? ? // block instead of spinning ? ? ? ? lim.newTerm.Wait() ? ? ? ? //fmt.Println(count, lim.limitCount) ? ? } ? ? lim.count++ ? ? lim.newTerm.L.Unlock() ? ? return nil } func (lim *localCounterLimiter) TryAcquire() bool { ? ? count := atomic.AddInt32(&lim.count, 1) ? ? if count > lim.limitCount { ? ? ? ? return false ? ? } else { ? ? ? ? return true ? ? } }
代碼很簡單,就不多說了
3. LocalTokenBucketLimiter
golang 的官方庫里提供了一個 ratelimiter,就是采用令牌桶的算法。所以這里并沒有重復造輪子,直接代理了 ratelimiter。
package ratelimit import ( ? ? "context" ? ? "golang.org/x/time/rate" ? ? "math" ) type localTokenBucketLimiter struct { ? ? limit Limit ? ? limiter *rate.Limiter // 直接復用令牌桶的 } func (lim *localTokenBucketLimiter) init() { ? ? burstCount := lim.limit.Count() ? ? if burstLimit, ok := lim.limit.(BurstLimit); ok { ? ? ? ? burstCount = burstLimit.BurstCount() ? ? } ? ? count := lim.limit.Count() ? ? if count < 0 { ? ? ? ? count = math.MaxInt32 ? ? } ? ? f := float64(count) / lim.limit.Period().Seconds() ? ? if f < 0 { ? ? ? ? f = float64(rate.Inf) // 無限 ? ? } else if f == 0 { ? ? ? ? panic("為 0 的時候,底層實現有問題") ? ? } ? ? lim.limiter = rate.NewLimiter(rate.Limit(f), int(burstCount)) } func (lim *localTokenBucketLimiter) Acquire() error { ? ? err := lim.limiter.Wait(context.TODO()) ? ? return err } func (lim *localTokenBucketLimiter) TryAcquire() bool { ? ? return lim.limiter.Allow() }
4. RedisCounterLimiter
package ratelimit import ( ? ? "math" ? ? "sync" ? ? "xg-go/log" ? ? "xg-go/xg/common" ) type redisCounterLimiter struct { ? ? limit ? ? ?DistLimit ? ? limitCount int32 // 內部使用,對 limit.count 做了 <0 時的轉換 ? ? redisClient *common.RedisClient ? ? once sync.Once // 退化為本地計數器的時候使用 ? ? localLim Limiter ? ? //script string } func (lim *redisCounterLimiter) init() { ? ? lim.limitCount = lim.limit.Count() ? ? if lim.limitCount < 0 { ? ? ? ? lim.limitCount = math.MaxInt32 ? ? } ? ? //lim.script = buildScript() } //func buildScript() string { // ?sb := strings.Builder{} // // ?sb.WriteString("local c") // ?sb.WriteString("\nc = redis.call('get',KEYS[1])") // ?// 調用不超過最大值,則直接返回 // ?sb.WriteString("\nif c and tonumber(c) > tonumber(ARGV[1]) then") // ?sb.WriteString("\nreturn c;") // ?sb.WriteString("\nend") // ?// 執行計算器自加 // ?sb.WriteString("\nc = redis.call('incr',KEYS[1])") // ?sb.WriteString("\nif tonumber(c) == 1 then") // ?sb.WriteString("\nredis.call('expire',KEYS[1],ARGV[2])") // ?sb.WriteString("\nend") // ?sb.WriteString("\nif tonumber(c) == 1 then") // ?sb.WriteString("\nreturn c;") // // ?return sb.String() //} func (lim *redisCounterLimiter) Acquire() error { ? ? panic("implement me") } func (lim *redisCounterLimiter) TryAcquire() (success bool) { ? ? defer func() { ? ? ? ? // 一般是 redis 連接斷了,會觸發空指針 ? ? ? ? if err := recover(); err != nil { ? ? ? ? ? ? //log.Errorw("TryAcquire err", common.ERR, err) ? ? ? ? ? ? //success = lim.degradeTryAcquire() ? ? ? ? ? ? //return ? ? ? ? ? ? success = true ? ? ? ? } ? ? ? ? // 沒有錯誤,判斷是否開啟了 local 如果開啟了,把它停掉 ? ? ? ? //if lim.localLim != nil { ? ? ? ? // ?// stop 線程安全 ? ? ? ? // ?lim.localLim.Stop() ? ? ? ? //} ? ? }() ? ? count, err := lim.redisClient.IncrBy(lim.limit.Key(), 1) ? ? //panic("模擬 redis 出錯") ? ? if err != nil { ? ? ? ? log.Errorw("TryAcquire err", common.ERR, err) ? ? ? ? panic(err) ? ? } ? ? // *2 是為了保留久一點,便于觀察 ? ? err = lim.redisClient.Expire(lim.limit.Key(), int(2 * lim.limit.Period().Seconds())) ? ? if err != nil { ? ? ? ? log.Errorw("TryAcquire error", common.ERR, err) ? ? ? ? panic(err) ? ? } ? ? // 業務正確的情況下 確認超限 ? ? if int32(count) > lim.limitCount { ? ? ? ? return false ? ? } ? ? return true ? ? //keys := []string{lim.limit.Key()} ? ? // ? ? //log.Errorw("TryAcquire ", keys, lim.limit.Count(), lim.limit.Period().Seconds()) ? ? //count, err := lim.redisClient.Eval(lim.script, keys, lim.limit.Count(), lim.limit.Period().Seconds()) ? ? //if err != nil { ? ? // ?log.Errorw("TryAcquire error", common.ERR, err) ? ? // ?return false ? ? //} ? ? // ? ? // ? ? //typeName := reflect.TypeOf(count).Name() ? ? //log.Errorw(typeName) ? ? // ? ? //if count != nil && count.(int32) <= lim.limitCount { ? ? // ? ? // ?return true ? ? //} ? ? //return false } func (lim *redisCounterLimiter) Stop() { ? ? // 判斷是否開啟了 local 如果開啟了,把它停掉 ? ? if lim.localLim != nil { ? ? ? ? // stop 線程安全 ? ? ? ? lim.localLim.Stop() ? ? } } func (lim *redisCounterLimiter) degradeTryAcquire() bool { ? ? lim.once.Do(func() { ? ? ? ? count := lim.limit.Count() / lim.limit.ClusterNum() ? ? ? ? limit := LocalLimit { ? ? ? ? ? ? name: lim.limit.Name(), ? ? ? ? ? ? key: lim.limit.Key(), ? ? ? ? ? ? count: count, ? ? ? ? ? ? period: lim.limit.Period(), ? ? ? ? ? ? limitType: lim.limit.LimitType(), ? ? ? ? } ? ? ? ? lim.localLim = NewLimiter(&limit) ? ? }) ? ? return lim.localLim.TryAcquire() }
代碼里回退的部分注釋了,因為線上為了穩定,實習生的代碼畢竟,所以先不跑。
本來原有的思路是直接用 lua 腳本在 redis 上保證原子操作,但是底層封裝的庫對于直接調 eval 跑的時候,會拋錯,而且 source 是 go-redis 里面,趕 ddl 沒有時間去 debug,所以只能用 incrBy + expire 分開來。
5. RedisTokenBucketLimiter
令牌桶的狀態變量得放在一個 線程安全/一致 的地方,redis 是不二人選。但是令牌桶的算法核心是個延遲計算得到令牌數量,這個是一個很長的臨界區,所以要么用分布式鎖,要么直接利用 redis 的單線程以原子方式跑。一般業界是后者,即 lua 腳本維護令牌桶的狀態變量、計算令牌。代碼類似這種
local tokens_key = KEYS[1] local timestamp_key = KEYS[2] --redis.log(redis.LOG_WARNING, "tokens_key " .. tokens_key) local rate = tonumber(ARGV[1]) local capacity = tonumber(ARGV[2]) local now = tonumber(ARGV[3]) local requested = tonumber(ARGV[4]) local intval = tonumber(ARGV[5]) local fill_time = capacity/rate local ttl = math.floor(fill_time*2) * intval local last_tokens = tonumber(redis.call("get", tokens_key)) if last_tokens == nil then ? last_tokens = capacity end local last_refreshed = tonumber(redis.call("get", timestamp_key)) if last_refreshed == nil then ? last_refreshed = 0 end local delta = math.max(0, now-last_refreshed) local filled_tokens = math.min(capacity, last_tokens+(delta*rate)) local allowed = filled_tokens >= requested local new_tokens = filled_tokens if allowed then ? new_tokens = filled_tokens - requested end redis.call("setex", tokens_key, ttl, new_tokens) redis.call("setex", timestamp_key, ttl, now) return { allowed, new_tokens }
原文鏈接:https://www.jianshu.com/p/709090b86018
相關推薦
- 2022-10-06 python中關于對super()函數疑問解惑_python
- 2022-11-29 C#使用泛型隊列Queue實現生產消費模式_C#教程
- 2022-10-09 淺談Redis處理接口冪等性的兩種方案_Redis
- 2022-03-26 C語言宏定義#define的使用_C 語言
- 2022-11-05 解決使用pip安裝報錯:Microsoft?Visual?C++?14.0?is?required.
- 2022-09-16 windows?dos命令解除端口占用的問題_DOS/BAT
- 2022-08-30 MQTT - 消息隊列遙測傳輸協議
- 2022-05-03 在Django中動態地過濾查詢集的實現_python
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支