網(wǎng)站首頁 編程語言 正文
List
眾所周知redis數(shù)據(jù)結(jié)構(gòu)中的list的lpush與rpop可以用于常規(guī)消息隊列,從集合的最左端寫入,最右端彈出消費(fèi)。并且支持多個生產(chǎn)者與多個消費(fèi)者并發(fā)拿數(shù)據(jù),數(shù)據(jù)只能由一個消費(fèi)者拿到。
但這個方案并不能保證消費(fèi)者消費(fèi)消息后是否成功處理的問題(服務(wù)掛掉或處理異常等),機(jī)制屬于點(diǎn)對點(diǎn)模式不能做廣播模式(發(fā)布/訂閱模式)
Pub/sub
于是redis提供了相應(yīng)的發(fā)布訂閱功能,為了解除點(diǎn)對點(diǎn)的強(qiáng)綁定模式引入了Channel管道
。
當(dāng)生產(chǎn)者向管道中發(fā)布消息,訂閱了該管道的消費(fèi)者能夠同時接收到該消息,而且為了簡化訂閱多個管道需要顯式關(guān)注多個名稱提供了pattern能力。
通過名稱匹配如果接收消息的頻道wmyskxz.chat,consumer3也會收到消息。
但這個方案也有很大的詬病就是不會持久化,如果服務(wù)掛掉重啟數(shù)據(jù)就全丟棄了,也沒有提供ack機(jī)制,不保證數(shù)據(jù)可靠性,不管有沒有消費(fèi)成功發(fā)后既忘。
Stream
stream的話結(jié)構(gòu)很像kafka的設(shè)計思想,提供了consumer?group和offset機(jī)制,結(jié)構(gòu)上感覺跟kafka的topic差不多,只是沒有對應(yīng)partation副本機(jī)制,而是一個追加消息的鏈表結(jié)構(gòu)。客戶端調(diào)用XADD時候自動創(chuàng)建stream。每個消息都會持久化并存在唯一的id標(biāo)識
Consumer?Group
消費(fèi)者組的概念跟kafka的消費(fèi)者概念如出一轍,消費(fèi)者既可以用XREAD
命令進(jìn)行獨(dú)立消費(fèi),也可以多個消費(fèi)者同時加入一個消費(fèi)者組。一條消息只能由一個消費(fèi)者組中的一個消費(fèi)者消費(fèi)。這樣可以在分布式系統(tǒng)中保證消息的唯一性。
其實(shí)這個特性我后來仔細(xì)琢磨了一下當(dāng)時自認(rèn)為無懈可擊的流式圖表為了保證分布式系統(tǒng)消息唯一做了redis分布式鎖。有點(diǎn)雞肋,明明消費(fèi)者組已經(jīng)保證了數(shù)據(jù)的唯一性。只能說加鎖可以壓縮資源成本
last_delivered_id
用于標(biāo)識消費(fèi)者組消費(fèi)在stream上消費(fèi)位置的游標(biāo),每個消費(fèi)者組都有一個stream內(nèi)唯一的名稱,消費(fèi)者組不會自動創(chuàng)建,需要用XGROUP?CREATE
顯式創(chuàng)建。
pending_ids
每個消費(fèi)者內(nèi)部都有一個狀態(tài)變量。用來表示已經(jīng)
被客戶端消費(fèi)但沒有ack的消費(fèi)。目的是為了保證客戶端至少消費(fèi)了消息一次(atleastonce
)。如果消費(fèi)者收到了消息處理完了但是沒有回復(fù)ack,就會導(dǎo)致列表不斷增長,如果有很多消費(fèi)組的話,那么這個列表占用的內(nèi)存就會放大
curd
- xadd?追加消息
- xdel?刪除消息,這里的刪除僅僅是設(shè)置了標(biāo)志位,不影響消息總長度
- xrange?獲取消息列表,會自動過濾已經(jīng)刪除的消息
- xlen?消息長度
- del?刪除Stream
pending_ids如何避免消息丟失
在客戶端消費(fèi)者讀取Stream消息時,Redis服務(wù)器將消息回復(fù)給客戶端的過程中,客戶端突然斷開了連接,消息就丟失了。
但是pending_ids里已經(jīng)保存了發(fā)出去的消息ID。待客戶端重新連上之后,可以再次收到pending_ids中的消息ID列表。
不過此時xreadgroup的起始消息必須是任意有效的消息ID,一般將參數(shù)設(shè)為0-0,表示讀取所有的pending_ids消息以及自last_delivered_id之后的新消息。
嵌入SpringBoot
redis?stream雖然還是有一些弊端,但是相比較而言用kafka之類的消息組件太重,redis用作消息隊列已經(jīng)很合適了。
這里簡單提一下思路,本質(zhì)上是提供一個管理消息的一個小功能,定義一個注解用于創(chuàng)建stream管道
創(chuàng)建一個注解類,標(biāo)注該注解的類必須繼承StreamListener<String,?ObjectRecord<String,?Object>>類且重寫onMessage方法。方法上也加這個注解
創(chuàng)建一個config類實(shí)現(xiàn)BeanPostProcessor
接口,重寫bean聲明周期postProcessAfterInitialization
和postProcessBeforeInitialization
方法。該方法會在spring啟動流程里的refresh方法加載bean的聲明周期中掃描到所有加了注解的bean。
通過線程池挨個創(chuàng)建stream的group組與stream的consumer監(jiān)聽連接,config類記得繼承DisposableBean類在destroy方法里把連接關(guān)掉免得oom。
注冊redis stream api提供的consumer容器
這里一定注意pollTimeout參數(shù),看名字就知道默認(rèn)拉取數(shù)據(jù)時間間隔,這個參數(shù)如果寫的值很小或者寫0,你就看你cpu高不高就完了。
@Bean("listenerContainer") @DependsOn(value = "redisConnectionFactory") public StreamMessageListenerContainer<String, ObjectRecord<String, Object>> init() { StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, Object>> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder() .batchSize(10) .serializer(new StringRedisSerializer()) .executor(new ForkJoinPool()) .pollTimeout(Duration.ofSeconds(3)) .targetType(Object.class) .build(); return StreamMessageListenerContainer.create(redisConnectionFactory, options); }
創(chuàng)建消費(fèi)者
private Subscription createSubscription(RedisConnectionFactory factory, StreamListener streamListener, String streamKey, String group, String consumerName) { StreamOperations<String, String, Object> streamOperations = this.stringRedisTemplate.opsForStream(); if (stringRedisTemplate.hasKey(streamKey)) { StreamInfo.XInfoGroups groups = streamOperations.groups(streamKey); AtomicReference<Boolean> groupHasKey = new AtomicReference<>(false); groups.forEach(groupInfo -> { if (Objects.equals(group, groupInfo.getRaw().get("name"))) { groupHasKey.set(true); } }); if (groups.isEmpty() || !groupHasKey.get()) { creatGroup(streamKey, group); } else { groups.stream().forEach(g -> { log.info("XInfoGroups:{}", g); StreamInfo.XInfoConsumers consumers = streamOperations.consumers(streamKey, g.groupName()); log.info("XInfoConsumers:{}", consumers); }); } } else { creatGroup(streamKey, group); } StreamOffset<String> streamOffset = StreamOffset.create(streamKey, ReadOffset.lastConsumed()); Consumer consumer = Consumer.from(group, consumerName); Subscription subscription = listenerContainer.receive(consumer, streamOffset, streamListener); listenerContainer.start(); this.containerList.add(listenerContainer); return subscription; }
原文鏈接:https://juejin.cn/post/7146141052582232071
相關(guān)推薦
- 2022-09-21 Shell自動化配置SSH免密登錄和取消SSH免密配置腳本_linux shell
- 2023-09-18 springboot異常處理的一點(diǎn)總結(jié)
- 2023-03-01 GoLang?Time時間操作函數(shù)講解_Golang
- 2022-10-27 Python?Pandas中布爾索引的用法詳解_python
- 2022-07-10 CSS解決未知高度垂直居中
- 2023-03-17 一文掌握git?push命令_相關(guān)技巧
- 2022-07-01 python讀取nc數(shù)據(jù)并繪圖的方法實(shí)例_python
- 2022-04-01 使用Git clone代碼失敗的解決方法
- 最近更新
-
- window11 系統(tǒng)安裝 yarn
- 超詳細(xì)win安裝深度學(xué)習(xí)環(huán)境2025年最新版(
- Linux 中運(yùn)行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎(chǔ)操作-- 運(yùn)算符,流程控制 Flo
- 1. Int 和Integer 的區(qū)別,Jav
- spring @retryable不生效的一種
- Spring Security之認(rèn)證信息的處理
- Spring Security之認(rèn)證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權(quán)
- redisson分布式鎖中waittime的設(shè)
- maven:解決release錯誤:Artif
- restTemplate使用總結(jié)
- Spring Security之安全異常處理
- MybatisPlus優(yōu)雅實(shí)現(xiàn)加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務(wù)發(fā)現(xiàn)-Nac
- Spring Security之基于HttpR
- Redis 底層數(shù)據(jù)結(jié)構(gòu)-簡單動態(tài)字符串(SD
- arthas操作spring被代理目標(biāo)對象命令
- Spring中的單例模式應(yīng)用詳解
- 聊聊消息隊列,發(fā)送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠(yuǎn)程分支