網站首頁 編程語言 正文
前言
工作中常常會遇到這樣的場景,如訂單到期未支付取消,到期自動續費等,我們發現延遲隊列非常適合在這樣的場景中使用。常見的延遲隊列的優秀實現有rabbitMQ的死信隊列,RocketMQ的延遲隊列等,但是了有時候項目沒有特別的大,沒有引入類似的消息中間件,但是了又遇到了特別適合使用延遲隊列的場景,我們一般會利用已有的redis實現一個簡陋的延遲隊列。常見的實現方式有監聽過期key,使用zset利用分值進行一個匹配,但是了這些實現或多或少有些問題,不夠優雅。監聽過期key是一種危險行為,一是如果過redis中key數量較大監聽過期key可能導致服務負載異常,二是redis中key過期后key是惰性刪除的,因此監聽機制需要主動觸發。利用zset分值實現呢,需要自己開發代碼處理定時輪訓以及key刪除的邏輯,具有一定的工作量和復雜度。哪有沒有一種優雅的redis延遲隊列的實現呢?
Redisson是Redis服務器上的分布式可伸縮Java數據結構----駐內存數據網格(In-Memory Data Grid,IMDG)。底層使用netty框架,并提供了與java對象相對應的分布式對象、分布式集合、分布式鎖和同步器、分布式服務等一系列的Redisson的分布式對象。為我們提供了許多開箱即用的功能。今天介紹Redisson實現的優雅的延遲隊列。
使用
依賴配置
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.12.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.homeey</groupId> <artifactId>redis-delay-queue</artifactId> <version>0.0.1-SNAPSHOT</version> <name>redis-delay-queue</name> <description>redis-delay-queue</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- https://mvnrepository.com/artifact/org.redisson/redisson --> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>3.19.3</version> </dependency> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-data-23</artifactId> <version>3.19.3</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <configuration> <excludes> <exclude> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </exclude> </excludes> </configuration> </plugin> </plugins> </build> </project>
備注:處理redisson和springboot兼容性問題
配置文件
springboot整合redisson有三種方式
- 第一種:通用的redis配置+redisson的自動配置[最簡單]
- 第二種:使用單獨的redisson配置文件
- 第三種:使用spring.redis.redisson這個配置key下進行配置
詳細的整合查看 springboot整合redisson配置
spring: redis: database: 0 host: localhost port: 6379 timeout: 10000 lettuce: pool: max-active: 8 max-wait: -1 min-idle: 0 max-idle: 8
demo代碼
package com.homeey.redisdelayqueue.delay; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.redisson.api.RBlockingQueue; import org.redisson.api.RDelayedQueue; import org.redisson.api.RedissonClient; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * 明天的你會因今天到的努力而幸運 * * @author jt4mrg@qq.com * 23:11 2023-02-19 2023 **/ @Slf4j @Component @RequiredArgsConstructor public class RedissonDelayQueue { private final RDelayedQueue<String> delayedQueue; private final RBlockingQueue<String> blockingQueue; @PostConstruct public void init() { ExecutorService executorService = Executors.newFixedThreadPool(1); executorService.submit(() -> { while (true) { try { String task = blockingQueue.take(); log.info("rev delay task:{}", task); } catch (Exception e) { log.error("occur error", e); } } }); } public void offerTask(String task, long seconds) { log.info("add delay task:{},delay time:{}s", task, seconds); delayedQueue.offer(task, seconds, TimeUnit.SECONDS); } @Configuration static class RedissonDelayQueueConfigure { @Bean public RBlockingQueue<String> blockingQueue(RedissonClient redissonClient) { return redissonClient.getBlockingQueue("TOKEN-RENEWAL"); } @Bean public RDelayedQueue<String> delayedQueue(RBlockingQueue<String> blockingQueue, RedissonClient redissonClient) { return redissonClient.getDelayedQueue(blockingQueue); } } }
執行效果
原理分析
從RedissonDelayedQueue
實現中我們看到有四個角色
- redisson_delay_queue_timeout:xxx,sorted set數據類型,存放所有延遲任務,按照延遲任務的到期時間戳(提交任務時的時間戳 + 延遲時間)來排序的,所以列表的最前面的第一個元素就是整個延遲隊列中最早要被執行的任務,這個概念很重要
- redisson_delay_queue:xxx,list數據類型,暫時沒發現什么用,只是在提交任務時會寫入這里面,隊列轉移時又會刪除里面的元素
- xxx:list數據類型,被稱為目標隊列,這個里面存放的任務都是已經到了延遲時間的,可以被消費者獲取的任務,所以上面demo中的RBlockingQueue的take方法是從這個目標隊列中獲取到任務的
- redisson_delay_queue_channel:xxx,是一個channel,用來通知客戶端開啟一個延遲任務
隊列創建
RedissonDelayedQueue
延遲隊列創建時,指定了隊列轉移服務,以及實現延遲隊列的四個重要校色的key。核心代碼是指定隊列轉移任務
QueueTransferTask task = new QueueTransferTask(commandExecutor.getConnectionManager()) { @Override protected RFuture<Long> pushTaskAsync() { return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG, "local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); "http://拿到zset中過期的值列表 + "if #expiredValues > 0 then " //如果有 + "for i, v in ipairs(expiredValues) do " + "local randomId, value = struct.unpack('dLc0', v);"http://解構消息,在提交任務時打包的消息 + "redis.call('rpush', KEYS[1], value);" //放入無前綴的list 隊頭 + "redis.call('lrem', KEYS[3], 1, v);"http://移除帶前綴list 隊尾元素 + "end; " + "redis.call('zrem', KEYS[2], unpack(expiredValues));" //移除zset中本次讀取的過期元素 + "end; " // get startTime from scheduler queue head task + "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); "http://取zset最小分值的元素 + "if v[1] ~= nil then " + "return v[2]; " //返回分值,即過期時間 + "end " + "return nil;", Arrays.asList(getRawName(), timeoutSetName, queueName), System.currentTimeMillis(), 100); } @Override protected RTopic getTopic() { return RedissonTopic.createRaw(LongCodec.INSTANCE, commandExecutor, channelName); } };
生產者
核心代碼RedissonDelayedQueue#offerAsync
return commandExecutor.evalWriteNoRetryAsync(getRawName(), codec, RedisCommands.EVAL_VOID, "local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);" //打包消息體:消息id,消息長度,消息值 + "redis.call('zadd', KEYS[2], ARGV[1], value);"http://zset中加入消息及其超時分值 + "redis.call('rpush', KEYS[3], value);" //向帶前綴的list中添加消息 // if new object added to queue head when publish its startTime // to all scheduler workers + "local v = redis.call('zrange', KEYS[2], 0, 0); "http://取出zset中第一個元素 + "if v[1] == value then " //如果最快過期的元素就是這次發送的消息 + "redis.call('publish', KEYS[4], ARGV[1]); " //channel中發布一下超時時間 + "end;", Arrays.asList(getRawName(), timeoutSetName, queueName, channelName), timeout, randomId, encode(e));
消費者
消費者最簡單,直接從不帶前綴的list中BLPOP讀取就可以
整個流程
總結思考
Lua是redis的好朋友,我們可以看到Redisson實現延遲隊列時,大量使用到lua腳本,因Redis會將整個腳本作為一個整體執行,中間不會被其他請求插入。因此在腳本運行過程中無需擔心會出現競態條件,無需使用事務。我們在平時開發時有多個redis命令操作的有簡單的業務邏輯,不妨嘗試一下lua腳本的方式,可以避免使用分布式鎖來保障一致性。
Redisson的源碼值得一讀,有很多新東西值得學習,如果其用到的netty基于時間輪算法的定時任務調度,可以讓我們基于此實現自己的任務調度框架,也讓我有了去探究這種實現方式和基于ScheduledThreadPoolExecutor的定時調度的差異及各自優劣的欲望。
原文鏈接:https://juejin.cn/post/7203911920637919292
- 上一篇:沒有了
- 下一篇:沒有了
相關推薦
- 2022-03-19 C語言常量介紹_C 語言
- 2022-04-10 解決:git push error: failed to push some refs to
- 2022-02-09 C++學習之IO流(輸入輸出流)詳解_C 語言
- 2022-11-05 python類別數據數字化LabelEncoder?VS?OneHotEncoder區別_pytho
- 2024-03-17 樹莓派無桌面配置WiFi連接
- 2022-04-17 python中random隨機函數詳解_python
- 2022-11-04 詳解Pytorch中的tensor數據結構_python
- 2022-12-19 Oracle?數據庫啟動過程的三階段、停庫四種模式詳解_oracle
- 欄目分類
-
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支