日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學(xué)無先后,達(dá)者為師

網(wǎng)站首頁 編程語言 正文

GoLang?RabbitMQ?TTL與死信隊(duì)列以及延遲隊(duì)列詳細(xì)講解_Golang

作者:Onemorelight95 ? 更新時(shí)間: 2023-01-15 編程語言

TTL

TTL 全稱 Time To Live(存活時(shí)間/過期時(shí)間)。當(dāng)消息到達(dá)存活時(shí)間后,還沒有被消費(fèi),就會(huì)被自動(dòng)清除。RabbitMQ可以設(shè)置兩種過期時(shí)間:

  • 對(duì)消息設(shè)置過期時(shí)間。
  • 對(duì)整個(gè)隊(duì)列(Queue)設(shè)置過期時(shí)間。

如何設(shè)置

  • 設(shè)置隊(duì)列過期時(shí)間使用參數(shù):x-message-ttl,單位:ms(毫秒),會(huì)對(duì)整個(gè)隊(duì)列消息統(tǒng)一過期。
  • 設(shè)置消息過期時(shí)間使用參數(shù):expiration,單位:ms(毫秒),當(dāng)該消息在隊(duì)列頭部時(shí)(消費(fèi)時(shí)),會(huì)單獨(dú)判斷這一消息是否過期。

如果兩者都設(shè)置了過期時(shí)間,以時(shí)間短的為準(zhǔn)。

在streadway/amqp庫提供的API中設(shè)置TTL

設(shè)置隊(duì)列過期時(shí)間:

QueueDeclare函數(shù)的最后一個(gè)參數(shù)是一個(gè)amqp.Table類型,它的聲明是這樣的: type Table map[string]interface{},其實(shí)是一個(gè)可以用于設(shè)置隊(duì)列屬性的map。

// 設(shè)置Queue ttl為5s
args := amqp.Table{"x-message-ttl": 5000}
q, e := ch.QueueDeclare(
		name,  //隊(duì)列名
		false, 
		true, 
		false, 
		false,
		args,   //設(shè)置Queue ttl為5s
	)

設(shè)置消息過期時(shí)間:

e = q.channel.Publish(
		"",    
		queue, 
		false, 
		false, 
		amqp.Publishing{
			// 設(shè)置當(dāng)前發(fā)送消息的過期時(shí)間為3s
			Expiration: "3000",
			ReplyTo:    q.Name,
			Body:       []byte(str),
})

死信隊(duì)列

當(dāng)一個(gè)隊(duì)列中存在死信時(shí),RabbitMQ會(huì)把消息發(fā)送給DLX(死信交換機(jī)),進(jìn)而被路由到另一個(gè)隊(duì)列中,這個(gè)隊(duì)列就叫做死信隊(duì)列。

死信就是指沒有被消費(fèi)者消費(fèi)成功的消息,一條消息變成死信有三種情況:

  • 如果給消息隊(duì)列設(shè)置了最大容量x-max-length,隊(duì)列已經(jīng)滿了,后續(xù)再進(jìn)來的消息會(huì)溢出,無法被隊(duì)列接收就會(huì)變成死信。
  • 消息接收時(shí)被拒絕會(huì)變成死信,例如調(diào)用Reject()函數(shù),并設(shè)置requeuefalse
  • 如果給消息隊(duì)列設(shè)置了消息的過期時(shí)間x-message-ttl,或者發(fā)送消息時(shí)設(shè)置了當(dāng)前消息的過期時(shí)間,當(dāng)消息在隊(duì)列中的存活時(shí)間大于過期時(shí)間時(shí),就會(huì)變成死信。

如何將死信發(fā)送給DLX

為隊(duì)列設(shè)置參數(shù)即可,將要發(fā)送死信的隊(duì)列配置以下兩個(gè)參數(shù):

x-dead-letter-exchange: [DLX的名字]
x-dead-letter-routing-key: [DLX的routing key]

下面是死信隊(duì)列的工作流程:

延遲隊(duì)列

延時(shí)隊(duì)列就是用來存放需要在指定時(shí)間被處理的元素的隊(duì)列,通常可以用來處理一些具有過期性操作的業(yè)務(wù)。

比如十分鐘內(nèi)未支付則取消訂單,原先這個(gè)功能我們可以使用定時(shí)器來實(shí)現(xiàn),即每隔一段時(shí)間去數(shù)據(jù)庫對(duì)比未支付訂單的當(dāng)前時(shí)間與訂單創(chuàng)建時(shí)間。但是定時(shí)器的時(shí)長難以確定,太長會(huì)導(dǎo)致訂單失效時(shí)間出現(xiàn)誤差,太短則會(huì)增大數(shù)據(jù)庫壓力。

實(shí)現(xiàn)

在RabbitMQ中沒有提供延遲隊(duì)列的功能,但是我們可以使用:TTL+死信隊(duì)列組合的方式來實(shí)現(xiàn)延遲隊(duì)列的效果。

下面是實(shí)現(xiàn)延遲隊(duì)列的流程圖:

Go實(shí)現(xiàn)延遲隊(duì)列

創(chuàng)建一個(gè)死信交換機(jī)

再創(chuàng)建一個(gè)死信隊(duì)列

將死信隊(duì)列綁定至死信交換機(jī)

創(chuàng)建一個(gè)正常隊(duì)列,并指定消息過期后被發(fā)往的死信交換機(jī)

生產(chǎn)者

func main() {
	conn, _ := amqp.Dial("amqp://guest:guest@35.76.111.125:5672/")
	ch, _ := conn.Channel()
	body := "This is a delayed message, created at " + time.Now().Format("2006-01-02 15:04:05")
	fmt.Println(body)
	// 發(fā)送消息到queue.normal隊(duì)列中
	ch.Publish("", "queue.normal", false, false, amqp.Publishing{
		Body:       []byte(body),
		Expiration: "10000", // 設(shè)置TTL為10秒
	})
	defer conn.Close()
	defer ch.Close()
}

消費(fèi)者

func main() {
	conn, _ := amqp.Dial("amqp://guest:guest@35.76.111.125:5672/")
	ch, _ := conn.Channel()
	//監(jiān)聽queue.dlx隊(duì)列
	msgs, _ := ch.Consume(
		"queue.dlx",
		"",
		true,
		false,
		false,
		false,
		nil,
	)
	for d := range msgs {
		fmt.Printf("receive: %s\n", d.Body) // 收到消息,業(yè)務(wù)處理
	}
}

流程說明

生產(chǎn)者生產(chǎn)一條消息,然后指定消息的TTL為10s,接著將消息發(fā)給普通隊(duì)列,消息在普通隊(duì)列中過期后被發(fā)往死信交換機(jī),死信交換機(jī)將這條消息路由給延遲隊(duì)列。消費(fèi)者一直在監(jiān)聽到延遲隊(duì)列中的死信后,開始消費(fèi)。

原文鏈接:https://blog.csdn.net/qq_49723651/article/details/127720469

欄目分類
最近更新