日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學無先后,達者為師

網站首頁 編程語言 正文

GoLang?RabbitMQ?TTL與死信隊列以及延遲隊列詳細講解_Golang

作者:Onemorelight95 ? 更新時間: 2023-01-15 編程語言

TTL

TTL 全稱 Time To Live(存活時間/過期時間)。當消息到達存活時間后,還沒有被消費,就會被自動清除。RabbitMQ可以設置兩種過期時間:

  • 對消息設置過期時間。
  • 對整個隊列(Queue)設置過期時間。

如何設置

  • 設置隊列過期時間使用參數:x-message-ttl,單位:ms(毫秒),會對整個隊列消息統一過期。
  • 設置消息過期時間使用參數:expiration,單位:ms(毫秒),當該消息在隊列頭部時(消費時),會單獨判斷這一消息是否過期。

如果兩者都設置了過期時間,以時間短的為準。

在streadway/amqp庫提供的API中設置TTL

設置隊列過期時間:

QueueDeclare函數的最后一個參數是一個amqp.Table類型,它的聲明是這樣的: type Table map[string]interface{},其實是一個可以用于設置隊列屬性的map。

// 設置Queue ttl為5s
args := amqp.Table{"x-message-ttl": 5000}
q, e := ch.QueueDeclare(
		name,  //隊列名
		false, 
		true, 
		false, 
		false,
		args,   //設置Queue ttl為5s
	)

設置消息過期時間:

e = q.channel.Publish(
		"",    
		queue, 
		false, 
		false, 
		amqp.Publishing{
			// 設置當前發送消息的過期時間為3s
			Expiration: "3000",
			ReplyTo:    q.Name,
			Body:       []byte(str),
})

死信隊列

當一個隊列中存在死信時,RabbitMQ會把消息發送給DLX(死信交換機),進而被路由到另一個隊列中,這個隊列就叫做死信隊列。

死信就是指沒有被消費者消費成功的消息,一條消息變成死信有三種情況:

  • 如果給消息隊列設置了最大容量x-max-length,隊列已經滿了,后續再進來的消息會溢出,無法被隊列接收就會變成死信。
  • 消息接收時被拒絕會變成死信,例如調用Reject()函數,并設置requeuefalse。
  • 如果給消息隊列設置了消息的過期時間x-message-ttl,或者發送消息時設置了當前消息的過期時間,當消息在隊列中的存活時間大于過期時間時,就會變成死信。

如何將死信發送給DLX

為隊列設置參數即可,將要發送死信的隊列配置以下兩個參數:

x-dead-letter-exchange: [DLX的名字]
x-dead-letter-routing-key: [DLX的routing key]

下面是死信隊列的工作流程:

延遲隊列

延時隊列就是用來存放需要在指定時間被處理的元素的隊列,通常可以用來處理一些具有過期性操作的業務。

比如十分鐘內未支付則取消訂單,原先這個功能我們可以使用定時器來實現,即每隔一段時間去數據庫對比未支付訂單的當前時間與訂單創建時間。但是定時器的時長難以確定,太長會導致訂單失效時間出現誤差,太短則會增大數據庫壓力。

實現

在RabbitMQ中沒有提供延遲隊列的功能,但是我們可以使用:TTL+死信隊列組合的方式來實現延遲隊列的效果。

下面是實現延遲隊列的流程圖:

Go實現延遲隊列

創建一個死信交換機

再創建一個死信隊列

將死信隊列綁定至死信交換機

創建一個正常隊列,并指定消息過期后被發往的死信交換機

生產者

func main() {
	conn, _ := amqp.Dial("amqp://guest:guest@35.76.111.125:5672/")
	ch, _ := conn.Channel()
	body := "This is a delayed message, created at " + time.Now().Format("2006-01-02 15:04:05")
	fmt.Println(body)
	// 發送消息到queue.normal隊列中
	ch.Publish("", "queue.normal", false, false, amqp.Publishing{
		Body:       []byte(body),
		Expiration: "10000", // 設置TTL為10秒
	})
	defer conn.Close()
	defer ch.Close()
}

消費者

func main() {
	conn, _ := amqp.Dial("amqp://guest:guest@35.76.111.125:5672/")
	ch, _ := conn.Channel()
	//監聽queue.dlx隊列
	msgs, _ := ch.Consume(
		"queue.dlx",
		"",
		true,
		false,
		false,
		false,
		nil,
	)
	for d := range msgs {
		fmt.Printf("receive: %s\n", d.Body) // 收到消息,業務處理
	}
}

流程說明

生產者生產一條消息,然后指定消息的TTL為10s,接著將消息發給普通隊列,消息在普通隊列中過期后被發往死信交換機,死信交換機將這條消息路由給延遲隊列。消費者一直在監聽到延遲隊列中的死信后,開始消費。

原文鏈接:https://blog.csdn.net/qq_49723651/article/details/127720469

欄目分類
最近更新