網(wǎng)站首頁 編程語言 正文
GoLang?RabbitMQ?TTL與死信隊(duì)列以及延遲隊(duì)列詳細(xì)講解_Golang
作者:Onemorelight95 ? 更新時(shí)間: 2023-01-15 編程語言TTL
TTL 全稱 Time To Live(存活時(shí)間/過期時(shí)間)。當(dāng)消息到達(dá)存活時(shí)間后,還沒有被消費(fèi),就會(huì)被自動(dòng)清除。RabbitMQ可以設(shè)置兩種過期時(shí)間:
- 對(duì)消息設(shè)置過期時(shí)間。
- 對(duì)整個(gè)隊(duì)列(Queue)設(shè)置過期時(shí)間。
如何設(shè)置
- 設(shè)置隊(duì)列過期時(shí)間使用參數(shù):
x-message-ttl
,單位:ms(毫秒),會(huì)對(duì)整個(gè)隊(duì)列消息統(tǒng)一過期。 - 設(shè)置消息過期時(shí)間使用參數(shù):
expiration
,單位:ms(毫秒),當(dāng)該消息在隊(duì)列頭部時(shí)(消費(fèi)時(shí)),會(huì)單獨(dú)判斷這一消息是否過期。
如果兩者都設(shè)置了過期時(shí)間,以時(shí)間短的為準(zhǔn)。
在streadway/amqp庫提供的API中設(shè)置TTL
設(shè)置隊(duì)列過期時(shí)間:
QueueDeclare
函數(shù)的最后一個(gè)參數(shù)是一個(gè)amqp.Table
類型,它的聲明是這樣的: type Table map[string]interface{}
,其實(shí)是一個(gè)可以用于設(shè)置隊(duì)列屬性的map。
// 設(shè)置Queue ttl為5s args := amqp.Table{"x-message-ttl": 5000} q, e := ch.QueueDeclare( name, //隊(duì)列名 false, true, false, false, args, //設(shè)置Queue ttl為5s )
設(shè)置消息過期時(shí)間:
e = q.channel.Publish( "", queue, false, false, amqp.Publishing{ // 設(shè)置當(dāng)前發(fā)送消息的過期時(shí)間為3s Expiration: "3000", ReplyTo: q.Name, Body: []byte(str), })
死信隊(duì)列
當(dāng)一個(gè)隊(duì)列中存在死信
時(shí),RabbitMQ會(huì)把消息發(fā)送給DLX
(死信交換機(jī)),進(jìn)而被路由到另一個(gè)隊(duì)列中,這個(gè)隊(duì)列就叫做死信隊(duì)列。
死信就是指沒有被消費(fèi)者消費(fèi)成功的消息,一條消息變成死信有三種情況:
- 如果給消息隊(duì)列設(shè)置了最大容量
x-max-length
,隊(duì)列已經(jīng)滿了,后續(xù)再進(jìn)來的消息會(huì)溢出,無法被隊(duì)列接收就會(huì)變成死信。 - 消息接收時(shí)被拒絕會(huì)變成死信,例如調(diào)用
Reject()
函數(shù),并設(shè)置requeue
為false
。 - 如果給消息隊(duì)列設(shè)置了消息的過期時(shí)間
x-message-ttl
,或者發(fā)送消息時(shí)設(shè)置了當(dāng)前消息的過期時(shí)間,當(dāng)消息在隊(duì)列中的存活時(shí)間大于過期時(shí)間時(shí),就會(huì)變成死信。
如何將死信發(fā)送給DLX
為隊(duì)列設(shè)置參數(shù)即可,將要發(fā)送死信的隊(duì)列配置以下兩個(gè)參數(shù):
x-dead-letter-exchange: [DLX的名字]
x-dead-letter-routing-key: [DLX的routing key]
下面是死信隊(duì)列的工作流程:
延遲隊(duì)列
延時(shí)隊(duì)列就是用來存放需要在指定時(shí)間被處理的元素的隊(duì)列,通常可以用來處理一些具有過期性操作的業(yè)務(wù)。
比如十分鐘內(nèi)未支付則取消訂單,原先這個(gè)功能我們可以使用定時(shí)器來實(shí)現(xiàn),即每隔一段時(shí)間去數(shù)據(jù)庫對(duì)比未支付訂單的當(dāng)前時(shí)間與訂單創(chuàng)建時(shí)間。但是定時(shí)器的時(shí)長難以確定,太長會(huì)導(dǎo)致訂單失效時(shí)間出現(xiàn)誤差,太短則會(huì)增大數(shù)據(jù)庫壓力。
實(shí)現(xiàn)
在RabbitMQ中沒有提供延遲隊(duì)列的功能,但是我們可以使用:TTL+死信隊(duì)列組合的方式來實(shí)現(xiàn)延遲隊(duì)列的效果。
下面是實(shí)現(xiàn)延遲隊(duì)列的流程圖:
Go實(shí)現(xiàn)延遲隊(duì)列
創(chuàng)建一個(gè)死信交換機(jī)
再創(chuàng)建一個(gè)死信隊(duì)列
將死信隊(duì)列綁定至死信交換機(jī)
創(chuàng)建一個(gè)正常隊(duì)列,并指定消息過期后被發(fā)往的死信交換機(jī)
生產(chǎn)者
func main() { conn, _ := amqp.Dial("amqp://guest:guest@35.76.111.125:5672/") ch, _ := conn.Channel() body := "This is a delayed message, created at " + time.Now().Format("2006-01-02 15:04:05") fmt.Println(body) // 發(fā)送消息到queue.normal隊(duì)列中 ch.Publish("", "queue.normal", false, false, amqp.Publishing{ Body: []byte(body), Expiration: "10000", // 設(shè)置TTL為10秒 }) defer conn.Close() defer ch.Close() }
消費(fèi)者
func main() { conn, _ := amqp.Dial("amqp://guest:guest@35.76.111.125:5672/") ch, _ := conn.Channel() //監(jiān)聽queue.dlx隊(duì)列 msgs, _ := ch.Consume( "queue.dlx", "", true, false, false, false, nil, ) for d := range msgs { fmt.Printf("receive: %s\n", d.Body) // 收到消息,業(yè)務(wù)處理 } }
流程說明
生產(chǎn)者生產(chǎn)一條消息,然后指定消息的TTL為10s,接著將消息發(fā)給普通隊(duì)列,消息在普通隊(duì)列中過期后被發(fā)往死信交換機(jī),死信交換機(jī)將這條消息路由給延遲隊(duì)列。消費(fèi)者一直在監(jiān)聽到延遲隊(duì)列中的死信后,開始消費(fèi)。
原文鏈接:https://blog.csdn.net/qq_49723651/article/details/127720469
相關(guān)推薦
- 2022-03-22 .NET?6開發(fā)TodoList實(shí)現(xiàn)請求日志組件HttpLogging_實(shí)用技巧
- 2022-05-28 C語言結(jié)構(gòu)體詳細(xì)圖解分析_C 語言
- 2022-05-03 使用EF的Code?First模式操作數(shù)據(jù)庫_實(shí)用技巧
- 2023-03-01 shell?創(chuàng)建子進(jìn)程及并行延時(shí)執(zhí)行命令方法_linux shell
- 2022-01-05 實(shí)體類[notmapped]特殊 “The specified type member ‘‘ is
- 2022-10-15 C++中?Sort函數(shù)詳細(xì)解析_C 語言
- 2022-05-06 shouldband綁定數(shù)據(jù)的方法詳解
- 2022-06-22 C語言樹與二叉樹基礎(chǔ)全刨析_C 語言
- 最近更新
-
- window11 系統(tǒng)安裝 yarn
- 超詳細(xì)win安裝深度學(xué)習(xí)環(huán)境2025年最新版(
- Linux 中運(yùn)行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲(chǔ)小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎(chǔ)操作-- 運(yùn)算符,流程控制 Flo
- 1. Int 和Integer 的區(qū)別,Jav
- spring @retryable不生效的一種
- Spring Security之認(rèn)證信息的處理
- Spring Security之認(rèn)證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權(quán)
- redisson分布式鎖中waittime的設(shè)
- maven:解決release錯(cuò)誤:Artif
- restTemplate使用總結(jié)
- Spring Security之安全異常處理
- MybatisPlus優(yōu)雅實(shí)現(xiàn)加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務(wù)發(fā)現(xiàn)-Nac
- Spring Security之基于HttpR
- Redis 底層數(shù)據(jù)結(jié)構(gòu)-簡單動(dòng)態(tài)字符串(SD
- arthas操作spring被代理目標(biāo)對(duì)象命令
- Spring中的單例模式應(yīng)用詳解
- 聊聊消息隊(duì)列,發(fā)送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠(yuǎn)程分支