網站首頁 編程語言 正文
如何保證消息不丟失
在使用RabbitMQ的時候,我們需要保證消息不能丟失,消息從生產者生產出來一直到消費者消費成功,這條鏈路是這樣的:
消息的可靠投遞分為了兩大內容:發送端的確認(p->broker和exchange->queue)和消費端的確認(queue->c)。
發送端的確認
Rabbit提供了兩種方式來保證發送端的消息可靠性投遞:confirm 確認模式
和return 退回模式。
confirm 確認模式:消息從 producer 到達 exchange 則會給 producer 發送一個應答,我們需要開啟confirm模式,才能接收到這條應答。開啟方式是將Channel.Confirm(noWait bool)
參數設置為false
,表示同意發送者將當前channel信道設置為confirm模式。
return 退回模式:消息從 exchange–>queue 投遞失敗,會將消息退回給producer。
消費端的確認
消息從Queue發送到消費端之后,消費端會發送一個確認消息:Consumer Ack
,有兩種確認方式:自動確認和手動確認。
在編碼中,關于消息的確認方式,我們需要在消費者端調用Consumer
函數時,設置第三個參數:autoAck
是false還是true(false表示手動,true表示自動)。
自動確認是指,當消息一旦被Consumer接收到,則自動確認收到,并將相應 message 從 RabbitMQ 的消息緩存中移除。
但是在實際業務處理中,很可能消息接收到,業務處理出現異常,那么該消息就會丟失。如果設置了手動確認方式,則需要在業務處理成功后,調用ch.Ack(false)
,手動簽收,如果出現異常,則調用d.Reject(true)
讓其自動重新發送消息。
Go 實現
安裝操作庫
安裝API庫
Go可以使用streadway/amqp
庫來操作rabbit,使用以下命令來安裝:
go get github.com/streadway/amqp
封裝rabbitmq
接下來我們對streadway/amqp
庫的內容進行一個二次封裝,封裝為一個rabbitmq.go
文件:
package rabbitmq import ( "encoding/json" "github.com/streadway/amqp" "log" ) // RabbitMQ RabbitMQ結構 type RabbitMQ struct { channel *amqp.Channel Name string exchange string } // Connect 連接服務器 func Connect(s string) *RabbitMQ { //連接rabbitmq conn, e := amqp.Dial(s) failOnError(e, "連接Rabbitmq服務器失敗!") ch, e := conn.Channel() failOnError(e, "無法打開頻道!") mq := new(RabbitMQ) mq.channel = ch return mq } // New 初始化消息隊列 //第一個參數:rabbitmq服務器的鏈接,第二個參數:隊列名字 func New(s string, name string) *RabbitMQ { //連接rabbitmq conn, e := amqp.Dial(s) failOnError(e, "連接Rabbitmq服務器失敗!") ch, e := conn.Channel() failOnError(e, "無法打開頻道!") q, e := ch.QueueDeclare( name, //隊列名 false, //是否開啟持久化 true, //不使用時刪除 false, //排他 false, //不等待 nil, //參數 ) failOnError(e, "初始化消息隊列失敗!") mq := new(RabbitMQ) mq.channel = ch mq.Name = q.Name return mq } // QueueDeclare 聲明queue func (q *RabbitMQ) QueueDeclare(queue string) { _, e := q.channel.QueueDeclare(queue, false, true, false, false, nil) failOnError(e, "聲明queue失敗!") } // QueueDelete 刪除queue func (q *RabbitMQ) QueueDelete(queue string) { _, e := q.channel.QueueDelete(queue, false, true, false) failOnError(e, "刪除queue失敗!") } // Qos 配置queue參數 func (q *RabbitMQ) Qos() { e := q.channel.Qos(1, 0, false) failOnError(e, "無法設置QoS") } // NewExchange 初始化交換機 //第一個參數:rabbitmq服務器的鏈接,第二個參數:交換機名字,第三個參數:交換機類型 func NewExchange(s string, name string, typename string) { //連接rabbitmq conn, e := amqp.Dial(s) failOnError(e, "連接Rabbitmq服務器失敗!") ch, e := conn.Channel() failOnError(e, "無法打開頻道!") e = ch.ExchangeDeclare( name, // name typename, // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failOnError(e, "初始化交換機失敗!") } // ExchangeDelete 刪除交換機 func (q *RabbitMQ) ExchangeDelete(exchange string) { e := q.channel.ExchangeDelete(exchange, false, true) failOnError(e, "刪除交換機失敗!") } // Bind 綁定消息隊列到exchange func (q *RabbitMQ) Bind(exchange string, key string) { e := q.channel.QueueBind( q.Name, key, exchange, false, nil, ) failOnError(e, "綁定隊列失敗!") q.exchange = exchange } // Send 向消息隊列發送消息 //Send方法可以往某個消息隊列發送消息 func (q *RabbitMQ) Send(queue string, body interface{}) { str, e := json.Marshal(body) failOnError(e, "消息序列化失敗!") e = q.channel.Publish( "", //交換 queue, //路由鍵 false, //必填 false, //立即 amqp.Publishing{ ReplyTo: q.Name, Body: []byte(str), }) msg := "向隊列:" + q.Name + "發送消息失敗!" failOnError(e, msg) } // Publish 向exchange發送消息 //Publish方法可以往某個exchange發送消息 func (q *RabbitMQ) Publish(exchange string, body interface{}, key string) { str, e := json.Marshal(body) failOnError(e, "消息序列化失敗!") e = q.channel.Publish( exchange, key, false, false, amqp.Publishing{ReplyTo: q.Name, Body: []byte(str)}, ) failOnError(e, "向交換機發送消息失敗!") } // Consume 接收某個消息隊列的消息 func (q *RabbitMQ) Consume() <-chan amqp.Delivery { c, e := q.channel.Consume( q.Name, //指定從哪個隊列中接收消息 "", true, false, false, false, nil, ) failOnError(e, "接收消息失敗!") return c } // Close 關閉隊列連接 func (q *RabbitMQ) Close() { q.channel.Close() } //錯誤處理函數 func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } }
發送端的確認
首先初始化消息隊列的時候,我們要開啟confirm模式
,才能接收到這條應答。開啟方式是將Channel.Confirm(noWait bool)
參數設置為false
,表示同意發送者將當前channel信道設置為confirm模式。
func New(s string, name string) *RabbitMQ { conn, e := amqp.Dial(s) failOnError(e, "連接Rabbitmq服務器失敗!") ch, e := conn.Channel() failOnError(e, "無法打開頻道!") q, e := ch.QueueDeclare( name, //隊列名 false, //是否開啟持久化 true, //不使用時刪除 false, //排他 false, //不等待 nil, //參數 ) failOnError(e, "初始化消息隊列失敗!") mq := new(RabbitMQ) mq.channel = ch mq.Name = q.Name // 設置為confirm模式 mq.channel.Confirm(false) return mq }
然后在封裝庫中創建一個函數handleConfirm()
用于接收來自Borker的回復:
func (q *RabbitMQ) ConfirmFromBroker(ch chan amqp.Confirmation) chan amqp.Confirmation { return q.channel.NotifyPublish(ch) }
生產者
生產者端在向Broker發送消息的時候,我們使用一個無緩沖的通道來接收來自Broker的回復,然后創建一個協程監聽這個無緩沖通道。
func main() { producer := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue") // 指定為topic類型 rabbitmq.NewExchange("amqp://guest:guest@35.76.111.125:5672/", "exchange1", "fanout") confirm := producer.ConfirmFromBroker(make(chan amqp.Confirmation)) go handleConfirm(confirm) var i int for { time.Sleep(time.Second) producer.Publish("exchange1", "fanout message: "+strconv.Itoa(i), "") i++ } } func handleConfirm(confirm <-chan amqp.Confirmation) { for { select { case message := <-confirm: fmt.Println("接收到來自Broker的回復:", message) } } }
運行結果:
接收到來自Broker的回復: {1 true}
接收到來自Broker的回復: {2 true}
接收到來自Broker的回復: {3 true}
接收到來自Broker的回復: {4 true}
接收到來自Broker的回復: {5 true}
消費端的確認
首先將Consume
函數的第三個參數autoAck
參數標記為false:
// Consume 接收某個消息隊列的消息 func (q *RabbitMQ) Consume() <-chan amqp.Delivery { c, e := q.channel.Consume( q.Name, "", false, // 不自動確認消息 false, false, false, nil, ) failOnError(e, "接收消息失敗!") return c }
在消費者端我們采用公平派遣模式,即隊列發送消息給消費者的時候,不再采用輪詢機制,而是一個消費者消費完消息之后,會調用Ack(false)
函數向隊列發送一個回復,隊列每次會將消息優先發送給消費完消息的消費者(回復過)。
消費端限流:
實現公平派遣模式我們需要設置消費者端一次只能消費一條消息,之前我們已經進行了封裝,直接在消費者端調用即可:
// Qos 配置queue參數 func (q *RabbitMQ) Qos() { e := q.channel.Qos(1, 0, false) failOnError(e, "無法設置QoS") }
生產者
func main() { producer := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue") // 指定為direct類型 rabbitmq.NewExchange("amqp://guest:guest@35.76.111.125:5672/", "exchange", "direct") i := 0 for { time.Sleep(time.Second) producer.Publish("exchange", "routing message: "+strconv.Itoa(i), "key1") i = i + 1 } }
消費者1
消費者2在消費第三條消息的時候,假設發生了錯誤,我們調用d.Reject(true)
函數讓隊列重新發送消息。
func main() { //第一個參數指定rabbitmq服務器的鏈接,第二個參數指定創建隊列的名字 consumer1 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue1") // 指定一次只消費一條消息,直到消費完才重新接收 consumer1.Qos() // 隊列綁定到exchange consumer1.Bind("exchange", "key1") //接收消息 msgs := consumer1.Consume() go func() { var i int for d := range msgs { time.Sleep(time.Second * 1) log.Printf("Consumer1 received a message: %s", d.Body) // 假設消費第三條消息的時候出現了錯誤,我們就調用d.Reject(true),隊列會重新發送消息給消費者 if i == 2 { d.Reject(true) } else { // 消息消費成功之后就回復 d.Ack(false) } i++ } }() select {} }
消費者2
func main() { //第一個參數指定rabbitmq服務器的鏈接,第二個參數指定創建隊列的名字 consumer2 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue1") // 指定一次只消費一條消息,直到消費完才重新接收 consumer2.Qos() // 隊列綁定到exchange consumer2.Bind("exchange", "key1") //接收消息 msgs := consumer2.Consume() go func() { for d := range msgs { time.Sleep(time.Second * 5) log.Printf("Consumer2 received a message: %s", d.Body) // 消息消費成功之后就回復 d.Ack(false) } }() select {} }
運行結果:
# 消費者1
2022/11/06 19:55:08 Consumer1 received a message: "routing message: 0"
2022/11/06 19:55:10 Consumer1 received a message: "routing message: 2"
2022/11/06 19:55:11 Consumer1 received a message: "routing message: 3"
2022/11/06 19:55:12 Consumer1 received a message: "routing message: 3"
2022/11/06 19:55:13 Consumer1 received a message: "routing message: 4"
2022/11/06 19:55:14 Consumer1 received a message: "routing message: 6"# 消費者2
2022/11/06 19:55:13 Consumer2 received a message: "routing message: 1"
原文鏈接:https://blog.csdn.net/qq_49723651/article/details/127717187
相關推薦
- 2022-05-10 remote: error: GE007: Your push would publish a pr
- 2022-03-23 Android?Camera2開啟自動曝光功能_Android
- 2022-09-25 FFmpeg源碼分析:avformat_open_input()打開媒體流
- 2022-07-18 Linux tar命令;sftp;創建變量;find命令;history命令;引號的使用舉例;
- 2023-05-06 Python寫一個簡單的api接口的實現_python
- 2022-07-22 YOLOV7:AttributeError: module ‘distutils‘ has no a
- 2022-06-09 4種方法python批量修改替換列表中元素_python
- 2023-07-22 spark啟動參數性能優化
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支