網站首頁 編程語言 正文
RabbitMQ
【第一章】RabbitMQ:從認識MQ到安裝,學習消息模型等
【第二章】RabbitMQ:常見消息模型
3.消息可靠性
消息從發送,到消費者接收,會經歷多個過程:
其中的每一步都可能導致消息丟失,常見的丟失原因包括:
- 發送時丟失:
- 生產者發送的消息未送達exchange
- 消息到達exchange后未到達queue
- MQ宕機,queue將消息丟失
- consumer接收到消息后未消費就宕機
針對這些問題,RabbitMQ分別給出了解決方案:
- 生產者確認機制
- mq持久化
- 消費者確認機制
- 失敗重試機制
下面我們就通過案例來演示每一個步驟。
首先,導入課前資料提供的demo工程:
項目結構如下:
3.1.生產者消息確認
RabbitMQ提供了publisher confirm機制來避免消息發送到MQ過程中丟失。這種機制必須給每個消息指定一個唯一ID。消息發送到MQ以后,會返回一個結果給發送者,表示消息是否處理成功。
返回結果有兩種方式:
- publisher-confirm,發送者確認
- 消息成功投遞到交換機,返回ack
- 消息未投遞到交換機,返回nack
- publisher-return,發送者回執
- 消息投遞到交換機了,但是沒有路由到隊列。返回ACK,及路由失敗原因。
注意:
3.1.1.修改配置
首先,修改publisher服務中的application.yml文件,添加下面的內容:
spring:
rabbitmq:
publisher-confirm-type: correlated
publisher-returns: true
template:
mandatory: true
說明:
-
publish-confirm-type
:開啟publisher-confirm,這里支持兩種類型:-
simple
:同步等待confirm結果,直到超時 -
correlated
:異步回調,定義ConfirmCallback,MQ返回結果時會回調這個ConfirmCallback
-
-
publish-returns
:開啟publish-return功能,同樣是基于callback機制,不過是定義ReturnCallback -
template.mandatory
:定義消息路由失敗時的策略。true,則調用ReturnCallback;false:則直接丟棄消息
3.1.2.定義Return回調
每個RabbitTemplate只能配置一個ReturnCallback,因此需要在項目加載時配置:
修改publisher服務,添加一個:
package cn.itcast.mq.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// 獲取RabbitTemplate
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// 設置ReturnCallback
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
// 投遞失敗,記錄日志
log.info("消息發送失敗,應答碼{},原因{},交換機{},路由鍵{},消息{}",
replyCode, replyText, exchange, routingKey, message.toString());
// 如果有業務需要,可以重發消息
});
}
}
3.1.3.定義ConfirmCallback
ConfirmCallback可以在發送消息時指定,因為每個業務處理confirm成功或失敗的邏輯不一定相同。
在publisher服務的cn.itcast.mq.spring.SpringAmqpTest類中,定義一個單元測試方法:
public void testSendMessage2SimpleQueue() throws InterruptedException {
// 1.消息體
String message = "hello, spring amqp!";
// 2.全局唯一的消息ID,需要封裝到CorrelationData中
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 3.添加callback
correlationData.getFuture().addCallback(
result -> {
if(result.isAck()){
// 3.1.ack,消息成功
log.debug("消息發送成功, ID:{}", correlationData.getId());
}else{
// 3.2.nack,消息失敗
log.error("消息發送失敗, ID:{}, 原因{}",correlationData.getId(), result.getReason());
}
},
ex -> log.error("消息發送異常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage())
);
// 4.發送消息
rabbitTemplate.convertAndSend("task.direct", "task", message, correlationData);
// 休眠一會兒,等待ack回執
Thread.sleep(2000);
}
3.2.消息持久化
生產者確認可以確保消息投遞到RabbitMQ的隊列中,但是消息發送到RabbitMQ以后,如果突然宕機,也可能導致消息丟失。
要想確保消息在RabbitMQ中安全保存,必須開啟消息持久化機制。
- 交換機持久化
- 隊列持久化
- 消息持久化
3.2.1.交換機持久化
RabbitMQ中交換機默認是非持久化的,mq重啟后就丟失。
SpringAMQP中可以通過代碼指定交換機持久化:
@Bean
public DirectExchange simpleExchange(){
// 三個參數:交換機名稱、是否持久化、當沒有queue與其綁定時是否自動刪除
return new DirectExchange("simple.direct", true, false);
}
事實上,默認情況下,由SpringAMQP聲明的交換機都是持久化的。
可以在RabbitMQ控制臺看到持久化的交換機都會帶上D
的標示:
3.2.2.隊列持久化
RabbitMQ中隊列默認是非持久化的,mq重啟后就丟失。
SpringAMQP中可以通過代碼指定交換機持久化:
@Bean
public Queue simpleQueue(){
// 使用QueueBuilder構建隊列,durable就是持久化的
return QueueBuilder.durable("simple.queue").build();
}
事實上,默認情況下,由SpringAMQP聲明的隊列都是持久化的。
可以在RabbitMQ控制臺看到持久化的隊列都會帶上D
的標示:
3.2.3.消息持久化
利用SpringAMQP發送消息時,可以設置消息的屬性(MessageProperties),指定delivery-mode:
- 1:非持久化
- 2:持久化
用java代碼指定:
默認情況下,SpringAMQP發出的任何消息都是持久化的,不用特意指定。
3.3.消費者消息確認
RabbitMQ是閱后即焚機制,RabbitMQ確認消息被消費者消費后會立刻刪除。
而RabbitMQ是通過消費者回執來確認消費者是否成功處理消息的:消費者獲取消息后,應該向RabbitMQ發送ACK回執,表明自己已經處理消息。
設想這樣的場景:
- 1)RabbitMQ投遞消息給消費者
- 2)消費者獲取消息后,返回ACK給RabbitMQ
- 3)RabbitMQ刪除消息
- 4)消費者宕機,消息尚未處理
這樣,消息就丟失了。因此消費者返回ACK的時機非常重要。
而SpringAMQP則允許配置三種確認模式:
?manual:手動ack,需要在業務代碼結束后,調用api發送ack。
?auto:自動ack,由spring監測listener代碼是否出現異常,沒有異常則返回ack;拋出異常則返回nack
?none:關閉ack,MQ假定消費者獲取消息后會成功處理,因此消息投遞后立即被刪除
由此可知:
- none模式下,消息投遞是不可靠的,可能丟失
- auto模式類似事務機制,出現異常時返回nack,消息回滾到mq;沒有異常,返回ack
- manual:自己根據業務情況,判斷什么時候該ack
一般,我們都是使用默認的auto即可。
發送者端:
@Test
public void testSimpleQueue() {
// 隊列名稱
String queueName = "simple.queue";
// 消息
String message = "hello, spring amqp!";
// 發送消息
rabbitTemplate.convertAndSend(queueName, message);
}
3.3.1.演示none模式
修改consumer服務的application.yml文件,添加下面內容:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: none # 關閉ack
修改consumer服務的SpringRabbitListener類中的方法,模擬一個消息處理異常:
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {
log.info("消費者接收到simple.queue的消息:【{}】", msg);
// 模擬異常
System.out.println(1 / 0);
log.debug("消息處理完成!");
}
測試可以發現,當消息處理拋異常時,消息依然被RabbitMQ刪除了。
3.3.2.演示auto模式
再次把確認機制修改為auto:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: auto # 關閉ack
在異常位置打斷點,再次發送消息,程序卡在斷點時,可以發現此時消息狀態為unack(未確定狀態):
拋出異常后,因為Spring會自動返回nack,所以消息恢復至Ready狀態,并且沒有被RabbitMQ刪除:
3.4.消費失敗重試機制
當消費者出現異常后,消息會不斷requeue(重入隊)到隊列,再重新發送給消費者,然后再次異常,再次requeue,無限循環,導致mq的消息處理飆升,帶來不必要的壓力:
怎么辦呢?
3.4.1.本地重試
我們可以利用Spring的retry機制,在消費者出現異常時利用本地重試,而不是無限制的requeue到mq隊列。
修改consumer服務的application.yml文件,添加內容:
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 開啟消費者失敗重試
initial-interval: 1000 # 初識的失敗等待時長為1秒
multiplier: 1 # 失敗的等待時長倍數,下次等待時長 = multiplier * last-interval
max-attempts: 3 # 最大重試次數
stateless: true # true無狀態;false有狀態。如果業務中包含事務,這里改為false
重啟consumer服務,重復之前的測試。可以發現:
- 在重試3次后,SpringAMQP會拋出異常AmqpRejectAndDontRequeueException,說明本地重試觸發了
- 查看RabbitMQ控制臺,發現消息被刪除了,說明最后SpringAMQP返回的是ack,mq刪除消息了
結論:
- 開啟本地重試時,消息處理過程中拋出異常,不會requeue到隊列,而是在消費者本地重試
- 重試達到最大次數后,Spring會返回ack,消息會被丟棄
3.4.2.失敗策略
在之前的測試中,達到最大重試次數后,消息會被丟棄,這是由Spring內部機制決定的。
在開啟重試模式后,重試次數耗盡,如果消息依然失敗,則需要有MessageRecovery接口來處理,它包含三種不同的實現:
-
RejectAndDontRequeueRecoverer:重試耗盡后,直接reject,丟棄消息。默認就是這種方式
-
ImmediateRequeueMessageRecoverer:重試耗盡后,返回nack,消息重新入隊
-
RepublishMessageRecoverer:重試耗盡后,將失敗消息投遞到指定的交換機
比較優雅的一種處理方案是RepublishMessageRecoverer,失敗后將消息投遞到一個指定的,專門存放異常消息的隊列,后續由人工集中處理。
1)在consumer服務中定義處理失敗消息的交換機和隊列
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}
2)定義一個RepublishMessageRecoverer,關聯隊列和交換機
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
完整代碼:
package cn.itcast.mq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
@Configuration
public class ErrorMessageConfig {
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
}
3.5.總結
如何確保RabbitMQ消息的可靠性?
- 開啟生產者確認機制,確保生產者的消息能到達隊列
- 開啟持久化功能,確保消息未消費前在隊列中不會丟失
- 開啟消費者確認機制為auto,由spring確認消息處理成功后完成ack
- 開啟消費者失敗重試機制,并設置MessageRecoverer,多次重試失敗后將消息投遞到異常交換機,交由人工處理
原文鏈接:https://blog.csdn.net/itcast_cn/article/details/127249194
相關推薦
- 2022-07-13 C# System.Web.HttpContext.Current.Server.MapPath 報
- 2022-03-19 Linux系統下安裝Redis數據庫過程_Redis
- 2023-07-22 macos通過homebrew安裝多版本node
- 2023-11-14 使用python獲取指定進程的CPU/內存情況;Python獲取指定進程的CPU和內存使用情況
- 2022-06-25 VS2022?.NET5一鍵發布到遠程騰訊云IIS服務器的詳細步驟_實用技巧
- 2022-07-30 Redis批量生成數據的實現_Redis
- 2022-05-24 python中的元組與列表及元組的更改_python
- 2022-05-17 Spring boot 集成Redis客戶端Lettuce,導致服務線程數不斷增加
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支