網站首頁 編程語言 正文
List
眾所周知redis數據結構中的list的lpush與rpop可以用于常規消息隊列,從集合的最左端寫入,最右端彈出消費。并且支持多個生產者與多個消費者并發拿數據,數據只能由一個消費者拿到。
但這個方案并不能保證消費者消費消息后是否成功處理的問題(服務掛掉或處理異常等),機制屬于點對點模式不能做廣播模式(發布/訂閱模式)
Pub/sub
于是redis提供了相應的發布訂閱功能,為了解除點對點的強綁定模式引入了Channel管道
。
當生產者向管道中發布消息,訂閱了該管道的消費者能夠同時接收到該消息,而且為了簡化訂閱多個管道需要顯式關注多個名稱提供了pattern能力。
通過名稱匹配如果接收消息的頻道wmyskxz.chat,consumer3也會收到消息。
但這個方案也有很大的詬病就是不會持久化,如果服務掛掉重啟數據就全丟棄了,也沒有提供ack機制,不保證數據可靠性,不管有沒有消費成功發后既忘。
Stream
stream的話結構很像kafka的設計思想,提供了consumer?group和offset機制,結構上感覺跟kafka的topic差不多,只是沒有對應partation副本機制,而是一個追加消息的鏈表結構。客戶端調用XADD時候自動創建stream。每個消息都會持久化并存在唯一的id標識
Consumer?Group
消費者組的概念跟kafka的消費者概念如出一轍,消費者既可以用XREAD
命令進行獨立消費,也可以多個消費者同時加入一個消費者組。一條消息只能由一個消費者組中的一個消費者消費。這樣可以在分布式系統中保證消息的唯一性。
其實這個特性我后來仔細琢磨了一下當時自認為無懈可擊的流式圖表為了保證分布式系統消息唯一做了redis分布式鎖。有點雞肋,明明消費者組已經保證了數據的唯一性。只能說加鎖可以壓縮資源成本
last_delivered_id
用于標識消費者組消費在stream上消費位置的游標,每個消費者組都有一個stream內唯一的名稱,消費者組不會自動創建,需要用XGROUP?CREATE
顯式創建。
pending_ids
每個消費者內部都有一個狀態變量。用來表示已經
被客戶端消費但沒有ack的消費。目的是為了保證客戶端至少消費了消息一次(atleastonce
)。如果消費者收到了消息處理完了但是沒有回復ack,就會導致列表不斷增長,如果有很多消費組的話,那么這個列表占用的內存就會放大
curd
- xadd?追加消息
- xdel?刪除消息,這里的刪除僅僅是設置了標志位,不影響消息總長度
- xrange?獲取消息列表,會自動過濾已經刪除的消息
- xlen?消息長度
- del?刪除Stream
pending_ids如何避免消息丟失
在客戶端消費者讀取Stream消息時,Redis服務器將消息回復給客戶端的過程中,客戶端突然斷開了連接,消息就丟失了。
但是pending_ids里已經保存了發出去的消息ID。待客戶端重新連上之后,可以再次收到pending_ids中的消息ID列表。
不過此時xreadgroup的起始消息必須是任意有效的消息ID,一般將參數設為0-0,表示讀取所有的pending_ids消息以及自last_delivered_id之后的新消息。
嵌入SpringBoot
redis?stream雖然還是有一些弊端,但是相比較而言用kafka之類的消息組件太重,redis用作消息隊列已經很合適了。
這里簡單提一下思路,本質上是提供一個管理消息的一個小功能,定義一個注解用于創建stream管道
創建一個注解類,標注該注解的類必須繼承StreamListener<String,?ObjectRecord<String,?Object>>類且重寫onMessage方法。方法上也加這個注解
創建一個config類實現BeanPostProcessor
接口,重寫bean聲明周期postProcessAfterInitialization
和postProcessBeforeInitialization
方法。該方法會在spring啟動流程里的refresh方法加載bean的聲明周期中掃描到所有加了注解的bean。
通過線程池挨個創建stream的group組與stream的consumer監聽連接,config類記得繼承DisposableBean類在destroy方法里把連接關掉免得oom。
注冊redis stream api提供的consumer容器
這里一定注意pollTimeout參數,看名字就知道默認拉取數據時間間隔,這個參數如果寫的值很小或者寫0,你就看你cpu高不高就完了。
@Bean("listenerContainer") @DependsOn(value = "redisConnectionFactory") public StreamMessageListenerContainer<String, ObjectRecord<String, Object>> init() { StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, Object>> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder() .batchSize(10) .serializer(new StringRedisSerializer()) .executor(new ForkJoinPool()) .pollTimeout(Duration.ofSeconds(3)) .targetType(Object.class) .build(); return StreamMessageListenerContainer.create(redisConnectionFactory, options); }
創建消費者
private Subscription createSubscription(RedisConnectionFactory factory, StreamListener streamListener, String streamKey, String group, String consumerName) { StreamOperations<String, String, Object> streamOperations = this.stringRedisTemplate.opsForStream(); if (stringRedisTemplate.hasKey(streamKey)) { StreamInfo.XInfoGroups groups = streamOperations.groups(streamKey); AtomicReference<Boolean> groupHasKey = new AtomicReference<>(false); groups.forEach(groupInfo -> { if (Objects.equals(group, groupInfo.getRaw().get("name"))) { groupHasKey.set(true); } }); if (groups.isEmpty() || !groupHasKey.get()) { creatGroup(streamKey, group); } else { groups.stream().forEach(g -> { log.info("XInfoGroups:{}", g); StreamInfo.XInfoConsumers consumers = streamOperations.consumers(streamKey, g.groupName()); log.info("XInfoConsumers:{}", consumers); }); } } else { creatGroup(streamKey, group); } StreamOffset<String> streamOffset = StreamOffset.create(streamKey, ReadOffset.lastConsumed()); Consumer consumer = Consumer.from(group, consumerName); Subscription subscription = listenerContainer.receive(consumer, streamOffset, streamListener); listenerContainer.start(); this.containerList.add(listenerContainer); return subscription; }
原文鏈接:https://juejin.cn/post/7146141052582232071
相關推薦
- 2022-08-10 python中ThreadPoolExecutor線程池和ProcessPoolExecutor進程
- 2024-03-06 SpringBoot 項目 批量刪除的操作
- 2022-07-03 TypeScript 變量聲明 —— 類型斷言
- 2022-10-16 Python計算標準差之numpy.std和torch.std的區別_python
- 2021-11-05 VBA工程加密PJ方式(兩種)_VBA
- 2023-05-05 Golang實現簡易的命令行功能_Golang
- 2022-06-16 Go基礎教程系列之回調函數和閉包詳解_Golang
- 2022-02-12 Flutter項目中有些依賴不支持64位的library的解決方式
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支