網站首頁 編程語言 正文
正文
我們目前在工作中遇到一個性能問題,我們有個定時任務需要處理大量的數據,為了提升吞吐量,所以部署了很多臺機器,但這個任務在運行前需要從別的服務那拉取大量的數據,隨著數據量的增大,如果同時多臺機器并發拉取數據,會對下游服務產生非常大的壓力。之前已經增加了單機限流,但無法解決問題,因為這個數據任務運行中只有不到10%的時間拉取數據,如果單機限流限制太狠,雖然集群總的請求量控制住了,但任務吞吐量又降下來。如果限流閾值太高,多機并發的時候,還是有可能壓垮下游。 所以目前唯一可行的解決方案就是分布式限流。
我目前是選擇直接使用Redisson庫中的RRateLimiter實現了分布式限流,關于Redission可能很多人都有所耳聞,它其實是在Redis能力上構建的開發庫,除了支持Redis的基礎操作外,還封裝了布隆過濾器、分布式鎖、限流器……等工具。今天要說的RRateLimiter及時其實現的限流器。接下來本文將詳細介紹下RRateLimiter的具體使用方式、實現原理還有一些注意事項,最后簡單談談我對分布式限流底層原理的理解。
RRateLimiter使用
RRateLimiter的使用方式異常的簡單,參數也不多。只要創建出RedissonClient,就可以從client中獲取到RRateLimiter對象,直接看代碼示例。
RedissonClient redissonClient = Redisson.create(); RRateLimiter rateLimiter = redissonClient.getRateLimiter("xindoo.limiter"); rateLimiter.trySetRate(RateType.OVERALL, 100, 1, RateIntervalUnit.HOURS);
rateLimiter.trySetRate就是設置限流參數,RateType有兩種,OVERALL是全局限流 ,PER_CLIENT是單Client限流(可以認為就是單機限流),這里我們只討論全局模式。而后面三個參數的作用就是設置在多長時間窗口內(rateInterval+IntervalUnit),許可總量不超過多少(rate),上面代碼中我設置的值就是1小時內總許可數不超過100個。然后調用rateLimiter的tryAcquire()或者acquire()方法即可獲取許可。
rateLimiter.acquire(1); // 申請1份許可,直到成功 boolean res = rateLimiter.tryAcquire(1, 5, TimeUnit.SECONDS); // 申請1份許可,如果5s內未申請到就放棄
使用起來還是很簡單的嘛,以上代碼中的兩種方式都是同步調用,但Redisson還同樣提供了異步方法acquireAsync()和tryAcquireAsync(),使用其返回的RFuture就可以異步獲取許可。
RRateLimiter的實現
接下來我們順著tryAcquire()方法來看下它的實現方式,在RedissonRateLimiter類中,我們可以看到最底層的tryAcquireAsync()方法。
private <T> RFuture<T> tryAcquireAsync(RedisCommand<T> command, Long value) { byte[] random = new byte[8]; ThreadLocalRandom.current().nextBytes(random); return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, command, "——————————————————————————————————————" + "這里是一大段lua代碼" + "____________________________________", Arrays.asList(getRawName(), getValueName(), getClientValueName(), getPermitsName(), getClientPermitsName()), value, System.currentTimeMillis(), random); }
映入眼簾的就是一大段lua代碼,其實這段Lua代碼就是限流實現的核心,我把這段lua代碼摘出來,并加了一些注釋,我們來詳細看下。
local rate = redis.call("hget", KEYS[1], "rate") # 100 local interval = redis.call("hget", KEYS[1], "interval") # 3600000 local type = redis.call("hget", KEYS[1], "type") # 0 assert(rate ~= false and interval ~= false and type ~= false, "RateLimiter is not initialized") local valueName = KEYS[2] # {xindoo.limiter}:value 用來存儲剩余許可數量 local permitsName = KEYS[4] # {xindoo.limiter}:permits 記錄了所有許可發出的時間戳 # 如果是單實例模式,name信息后面就需要拼接上clientId來區分出來了 if type == "1" then valueName = KEYS[3] # {xindoo.limiter}:value:b474c7d5-862c-4be2-9656-f4011c269d54 permitsName = KEYS[5] # {xindoo.limiter}:permits:b474c7d5-862c-4be2-9656-f4011c269d54 end # 對參數校驗 assert(tonumber(rate) >= tonumber(ARGV[1]), "Requested permits amount could not exceed defined rate") # 獲取當前還有多少許可 local currentValue = redis.call("get", valueName) local res # 如果有記錄當前還剩余多少許可 if currentValue ~= false then # 回收已過期的許可數量 local expiredValues = redis.call("zrangebyscore", permitsName, 0, tonumber(ARGV[2]) - interval) local released = 0 for i, v in ipairs(expiredValues) do local random, permits = struct.unpack("Bc0I", v) released = released + permits end # 清理已過期的許可記錄 if released > 0 then redis.call("zremrangebyscore", permitsName, 0, tonumber(ARGV[2]) - interval) if tonumber(currentValue) + released > tonumber(rate) then currentValue = tonumber(rate) - redis.call("zcard", permitsName) else currentValue = tonumber(currentValue) + released end redis.call("set", valueName, currentValue) end # ARGV permit timestamp random, random是一個隨機的8字節 # 如果剩余許可不夠,需要在res中返回下個許可需要等待多長時間 if tonumber(currentValue) < tonumber(ARGV[1]) then local firstValue = redis.call("zrange", permitsName, 0, 0, "withscores") res = 3 + interval - (tonumber(ARGV[2]) - tonumber(firstValue[2])) else redis.call("zadd", permitsName, ARGV[2], struct.pack("Bc0I", string.len(ARGV[3]), ARGV[3], ARGV[1])) # 減小可用許可量 redis.call("decrby", valueName, ARGV[1]) res = nil end else # 反之,記錄到還有多少許可,說明是初次使用或者之前已記錄的信息已經過期了,就將配置rate寫進去,并減少許可數 redis.call("set", valueName, rate) redis.call("zadd", permitsName, ARGV[2], struct.pack("Bc0I", string.len(ARGV[3]), ARGV[3], ARGV[1])) redis.call("decrby", valueName, ARGV[1]) res = nil end local ttl = redis.call("pttl", KEYS[1]) # 重置 if ttl > 0 then redis.call("pexpire", valueName, ttl) redis.call("pexpire", permitsName, ttl) end return res
即便是加了注釋,相信你還是很難一下子看懂這段代碼的,接下來我就以其在Redis中的數據存儲形式,然輔以流程圖讓大家徹底了解其實現實現原理。
首先用RRateLimiter有個name,在我代碼中就是xindoo.limiter
,用這個作為KEY你就可以在Redis中找到一個map,里面存儲了limiter的工作模式(type)、可數量(rate)、時間窗口大小(interval),這些都是在limiter創建時寫入到的redis中的,在上面的lua代碼中也使用到了。
其次還倆很重要的key,valueName和permitsName,其中在我的代碼實現中valueName是{xindoo.limiter}:value
,它存儲的是當前可用的許可數量。我代碼中permitsName的具體值是{xindoo.limiter}:permits
,它是一個zset,其中存儲了當前所有的許可授權記錄(含有許可授權時間戳),其中SCORE直接使用了時間戳,而VALUE中包含了8字節的隨機值和許可的數量,如下圖:
{xindoo.limiter}:permits這個zset中存儲了所有的歷史授權記錄,直到了這些信息,相信你也就理解了RRateLimiter的實現原理,我們還是將上面的那大段Lua代碼的流程圖繪制出來,整個執行的流程會更直觀。
看到這大家應該能理解這段Lua代碼的邏輯了,可以看到Redis用了多個字段來存儲限流的信息,也有各種各樣的操作,那Redis是如何保證在分布式下這些限流信息數據的一致性的?
答案是不需要保證,在這個場景下,信息天然就是一致性的。原因是Redis的單進程數據處理模型,在同一個Key下,所有的eval請求都是串行的,所有不需要考慮數據并發操作的問題。在這里,Redisson也使用了HashTag,保證所有的限流信息都存儲在同一個Redis實例上。
RRateLimiter使用時注意事項
了解了RRateLimiter的底層原理,再結合Redis自身的特性,我想到了RRateLimiter使用的幾個局限點(問題點)。
RRateLimiter是非公平限流器
這個是我查閱資料得知,并且在自己代碼實踐的過程中也得到了驗證,具體表現就是如果多個實例(機器)取競爭這些許可,很可能某些實例會獲取到大部分,而另外一些實例可憐巴巴僅獲取到少量的許可,也就是說容易出現旱的旱死 澇的澇死的情況。在使用過程中,你就必須考慮你能否接受這種情況,如果不能接受就得考慮用某些方式盡可能讓其變公平。
Rate不要設置太大
從RRateLimiter的實現原理你也看出了,它采用的是滑動窗口的模式來限流的,而且記錄了所有的許可授權信息,所以如果你設置的Rate值過大,在Redis中存儲的信息(permitsName對應的zset)也就越多,每次執行那段lua腳本的性能也就越差,這對Redis實例也是一種壓力。個人建議如果你是想設置較大的限流閾值,傾向于小Rate+小時間窗口的方式,而且這種設置方式請求也會更均勻一些。
限流的上限取決于Redis單實例的性能
從原理上看,RRateLimiter在Redis上所存儲的信息都必須在一個Redis實例上,所以它的限流QPS的上限就是Redis單實例的上限,比如你Redis實例就是1w QPS,你想用RRateLimiter實現一個2w QPS的限流器,必然實現不了。 那有沒有突破Redis單實例性能上限的方式?單限流器肯定是實現不了的,我們可以拆分多個限流器,比如我搞10個限流器,名詞用不一樣的,然后每臺機器隨機使用一個限流器限流,實際的流量不就被分散到不同的限流器上了嗎,總的限流上線不也就上來了。
分布式限流的本質
分布式限流的本質實際上就是協同,協同的本質就是信息交換,信息交換最重要的的就是信息的準確性和一致性。 更簡單粗暴理解,分布式限流的本質原理其實還是分布式數據一致性的原理,而限流只是數據結果的一種決策。所以只要以任何方式能讓信息同步,且保證信息的正確性就可以實現一個分布式限流器了,這就是我理解的本質思路。
其實從上面的RRateLimiter的實現原理也可以看出來,它不就是存儲了一些信息嗎! 那我不用Redis,而是使用mysql行不行。實際肯定是可以的,只要將的上面Lua代碼中的所有操作都放到一個事務里,且事務的級別改成串行化,依舊能實現RRateLimiter同樣的功能。如果你具備Mysql相關知識的話,肯定也能基于Mysql將RRateLimiter的API封裝出來,但是封裝出來的限流器,其限流的上限就取決于Mysql實例的性能上限。
最近chatGPT比較火,我也問了下它對分布式限流本質原理的理解,下面是它的回答,大家覺得怎么樣?
分布式限流的本質原理是通過在分布式系統中共享限流狀態來限制系統中單位時間內的請求數量,從而避免系統因流量過大而崩潰。
這是通過使用一些共享的存儲組件,如數據庫,緩存,分布式鎖等來實現的。在每次請求時,系統會檢查當前的請求數量是否超過了預先設定的限制,如果超過了限制,請求就會被拒絕;如果未超過限制,請求就會被允許。
通過使用分布式限流技術,系統可以在高并發情況下保持穩定的性能,并避免因流量過大而導致的系統崩潰。
原文鏈接:https://juejin.cn/post/7198826344582676538
相關推薦
- 2022-05-17 Mybatis中報錯:attempted to return null from a method
- 2022-06-30 C++四種case的詳細介紹小結_C 語言
- 2021-11-03 linux下shell常用腳本命令及有關知識_Linux
- 2022-05-21 k8s入門集群組件介紹及概念理解_服務器其它
- 2022-08-02 Android自定義Dialog的方法實例_Android
- 2022-08-20 Python超詳細講解內存管理機制_python
- 2022-11-30 Go語言k8s?kubernetes使用leader?election實現選舉_Golang
- 2022-09-10 Python入門之模塊和包用法詳解_python
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支