網站首頁 編程語言 正文
Redis5.0帶來了Stream類型。從字面上看是流類型,但其實從功能上看,應該是Redis對消息隊列(MQ,Message Queue)的完善實現。
基于redis實現消息隊列的方式有很多:
- PUB/SUB,訂閱/發布模式
- 基于List的 LPUSH+BRPOP 的實現
redis 實現消息對列4中方法
發布訂閱
發布訂閱優點: 典型的一對的,所有消費者都能同時消費到消息。主動通知訂閱者而不是訂閱者輪詢去讀。
發布訂閱缺點: 不支持多個消費者公平消費消息,消息沒有持久化,不管訂閱者是否收到消息,消息都會丟失。
使用場景:微服務間的消息同步,如 分布式webSocker,數據同步等。
list 隊列
生產者通過lpush生成消息,消費者通過blpop阻塞讀取消息。
**list隊列優點:**支持多個消費者公平消費消息,對消息進行存儲,可以通過lrange查詢隊列內的消息。
**list隊列缺點:**blpop仍然會阻塞當前連接,導致連接不可用。一旦blpop成功消息就丟棄了,期間如果服務器宕機消息會丟失,不支持一對多消費者。
zset 隊列
生產者通過zadd 創建消息時指定分數,可以確定消息的順序,消費者通過zrange獲取消息后進行消費,消費完后通zrem刪除消息。
zset優點: 保證了消息的順序,消費者消費失敗后重新入隊不會打亂消費順序。
zset缺點: 不支持一對多消費,多個消費者消費時可能出現讀取同一條消息的情況,得通過加鎖或其他方式解決消費的冪等性。
zset使用場景:由于數據是有序的,常常被用于延遲隊列,如 redisson的DelayQueue
Stream 隊列
Redis5.0帶來了Stream類型。從字面上看是流類型,但其實從功能上看,應該是Redis對消息隊列(MQ,Message Queue)的完善實現。
參考kafka的思想,通過多個消費者組和消費者支持一對多消費,公平消費,消費者內維護了pending列表防止消息丟失。
提供消息ack機制。
基本命令
xadd 生產消息
往 stream 內創建消息 語法為:
XADD key ID field string [field string …]
# * 表示自動生成id redis會根據時間戳+序列號自動生成id,不建議我們自己指定id xadd stream1 * name zs age 23
讀取消息
讀取stream內的消息,這個并不是消費,只是提供了查看數據的功能,語法為:
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ID …]
#表示從 stream1 內取出一條消息,從第0條消息讀取(0表示最小的id) xread count 1 streams stream1 0 #表示從 stream1 內 id=1649143363972-0 開始讀取一條消息,讀取的是指定id的下一條消息 xread count 1 streams msg 1649143363972-0 #表示一直阻塞讀取最新的消息($表示獲取下一個生成的消息) xread count 1 block 0 streams stream1 $ xrange stream - + 10
XRANGE key startID endID count
#表示從stream1內取10條消息 起始位置為 -(最小ID) 結束位置為+(最大ID) xrange stream1 - + 10
xgroup 消費者組
redis stream 借鑒了kafka的設計,采用了消費者和消費者組的概念。允許多個消費者組消費stream的消息,每個消費者組都能收到完整的消息,例如:stream內有10條消息,消費者組A和消費者組B同時消費時,都能獲取到這10條消息。
每個消費者組內可以有多個消費者消費,消息會平均分攤給各個消費者,例如:stream有10條消息,消費者A,B,C同時在同一個組內消費,A接收到 1,4,7,10,B接收到 2,5,8,C接收到 3,6,9
創建消費者組:
#消費消息首先得創建消費者組 # 表示為隊列 stream1 創建一個消費者組 group1 從消息id=0(第一條消息)開始讀取消息 xgroup create stream1 group1 0 #查詢stream1內的所有消費者組信息 xinfo groups stream1
xreadgroup 消費消息
通過xreadgroup可以在消費者組內創建消費者消費消息
XREADGROUP group groupName consumerName [COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ID …]
#創建消費者讀取消息 #在group1消費者組內通過consumer1消費stream1內的消息,消費1條未分配的消息 (> 表示未分配過消費者的消息) xreadgrup group group1 consumer1 count 1 streams stream1 >
Pending 等待列表
通過 xreadgroup 讀取消息時消息會分配給對應的消費者,每個消費者內都維護了一個Pending列表用于保存接收到的消息,當消息ack后會從pending列表內移除,也就是說pending列表內維護的是所有未ack的消息id
每個Pending的消息有4個屬性:
- 消息ID
- 所屬消費者
- IDLE,已讀取時長
- delivery counter,消息被讀取次數
XPENDING key group [start end count] [consumer]
#查看pending列表 # 查看group1組內的consumer1的pending列表 - 表示最小的消息id + 表示最大的消息ID xpending stream1 group1 - + 10 consumer1 # 查看group1組內的所有消費者pending類表 xpending stream1 group1 - + 10
消息確認
當消費者消費了消息,需要通過 xack
命令確認消息,xack后的消息會從pending列表移除
XACK key gruopName ID
xack stream1 group1 xxx
消息轉移
當消費者接收到消息卻不能正確消費時(報錯或其他原因),可以使用 XCLAIM
將消息轉移給其他消費者消費,需要設置組、轉移的目標消費者和消息ID,同時需要提供IDLE(已被讀取時長),只有超過這個時長,才能被轉移。
通過xclaim轉移的消息只是將消息移入另一個消費者的pending列表,消費者并不能通過xreadgroup讀取到消息,只能通過xpending讀取到。
# 表示將ID為 1553585533795-1 的消息轉移到消費者B消費,前提是消費 XCLAIM stream1 group1 consumer1 3600000 1553585533795-1
信息監控
redis提供了xinfo來查看stream的信息
#查看sream信息 xinfo stream steam1 #查詢消費者組信息 xinfo groups group1 #查詢消費者信息 xinfo consumers consumer1
SpringBoot 整合
1 引入依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>
2 編寫消費者
@Slf4j @Component public class EmailConsumer implements StreamListener<String, MapRecord<String,String,String>> { public final String streamName = "emailStream"; public final String groupName = "emailGroup"; public final String consumerName = "emailConsumer"; @Autowired private StringRedisTemplate stringRedisTemplate; @Override public void onMessage(MapRecord<String, String, String> message) { //log.info("stream名稱-->{}",message.getStream()); //log.info("消息ID-->{}",message.getId()); log.info("消息內容-->{}",message.getValue()); Map<String, String> msgMap = message.getValue(); if( msgMap.get("sID")!=null && Integer.valueOf(msgMap.get("sID")) % 3 ==0 ){ //消費異常導致未能ack時,消息會進入pending列表,我們可以啟動定時任務來讀取pending列表處理失敗的任務 log.info("消費異常-->"+message); return; } StreamOperations<String, String, String> streamOperations = stringRedisTemplate.opsForStream(); //消息應答 streamOperations.acknowledge( streamName,groupName,message.getId() ); } //我們可以啟動定時任務不斷監聽pending列表,處理死信消息 }
3 配置redis
序列化配置
@EnableCaching @Configuration public class RedisConfig { /** * 設置redis序列化規則 */ @Bean public Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer(){ Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class); ObjectMapper om = new ObjectMapper(); om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); jackson2JsonRedisSerializer.setObjectMapper(om); return jackson2JsonRedisSerializer; } /** * RedisTemplate配置 */ @Bean public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory, Jackson2JsonRedisSerializer jackson2JsonRedisSerializer) { // 配置redisTemplate RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>(); redisTemplate.setConnectionFactory(redisConnectionFactory); RedisSerializer<?> stringSerializer = new StringRedisSerializer(); // key序列化 redisTemplate.setKeySerializer(stringSerializer); // value序列化 redisTemplate.setValueSerializer(jackson2JsonRedisSerializer); // Hash key序列化 redisTemplate.setHashKeySerializer(stringSerializer); // Hash value序列化 redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer); redisTemplate.afterPropertiesSet(); return redisTemplate; } }
消費者組和消費者配置
@Slf4j @Configuration public class RedisStreamConfig { @Autowired private EmailConsumer emailConsumer; @Autowired private RedisTemplate<String,Object> redisTemplate; @Bean public StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String,String,String>> emailListenerContainerOptions(){ StringRedisSerializer stringRedisSerializer = new StringRedisSerializer(); return StreamMessageListenerContainer.StreamMessageListenerContainerOptions .builder() //block讀取超時時間 .pollTimeout(Duration.ofSeconds(3)) //count 數量(一次只獲取一條消息) .batchSize(1) //序列化規則 .serializer( stringRedisSerializer ) .build(); } /** * 開啟監聽器接收消息 */ @Bean public StreamMessageListenerContainer<String,MapRecord<String,String,String>> emailListenerContainer(RedisConnectionFactory factory, StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String,String,String>> streamMessageListenerContainerOptions){ StreamMessageListenerContainer<String,MapRecord<String,String,String>> listenerContainer = StreamMessageListenerContainer.create(factory, streamMessageListenerContainerOptions); //如果 流不存在 創建 stream 流 if( !redisTemplate.hasKey(emailConsumer.streamName)){ redisTemplate.opsForStream().add(emailConsumer.streamName, Collections.singletonMap("", "")); log.info("初始化stream {} success",emailConsumer.streamName); } //創建消費者組 try { redisTemplate.opsForStream().createGroup(emailConsumer.streamName,emailConsumer.groupName); } catch (Exception e) { log.info("消費者組 {} 已存在",emailConsumer.groupName); } //注冊消費者 消費者名稱,從哪條消息開始消費,消費者類 // > 表示沒消費過的消息 // $ 表示最新的消息 listenerContainer.receive( Consumer.from(emailConsumer.groupName, emailConsumer.consumerName), StreamOffset.create(emailConsumer.streamName, ReadOffset.lastConsumed()), emailConsumer ); listenerContainer.start(); return listenerContainer; } }
4.生產者生產消息
@GetMapping("/redis/ps") public String redisPublish(String content,Integer count){ StreamOperations streamOperations = redisTemplate.opsForStream(); for (int i = 0; i < count; i++) { AtomicInteger num = new AtomicInteger(i); Map msgMap = new HashMap(); msgMap.put("count", i); msgMap.put("sID", num); //新增消息 streamOperations.add("emailStream",msgMap); } return "success"; }
參考文檔:
redis Stream 消息隊列
SpringBoot整合redis stream 實現消息隊列
原文鏈接:https://blog.csdn.net/dndndnnffj/article/details/124020407
相關推薦
- 2022-03-30 py3nvml實現GPU相關信息讀取的案例分析_python
- 2022-04-03 django8.5?項目部署Nginx的操作步驟_nginx
- 2022-08-01 如何利用python將Xmind用例轉為Excel用例_python
- 2023-03-16 numpy如何獲取array中數組元素的索引位置_python
- 2022-06-13 安裝Docker配置阿里云鏡像加速(圖文教程)_docker
- 2022-05-17 Error running ‘myToncat‘: Address localhost:8080 i
- 2022-09-16 Go?WEB框架使用攔截器驗證用戶登錄狀態實現_Golang
- 2022-08-03 在C++中把字符串轉換為整數的兩種簡單方法_C 語言
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支