日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學無先后,達者為師

網站首頁 編程語言 正文

RabbitMQ:生產者消息確認、消息持久化、消費者消息確認、消費失敗重試機制

作者:黑馬程序員官方 更新時間: 2022-10-11 編程語言

RabbitMQ

【第一章】RabbitMQ:從認識MQ到安裝,學習消息模型等
【第二章】RabbitMQ:常見消息模型

3.消息可靠性

消息從發送,到消費者接收,會經歷多個過程:

在這里插入圖片描述

其中的每一步都可能導致消息丟失,常見的丟失原因包括:

  • 發送時丟失:
    • 生產者發送的消息未送達exchange
    • 消息到達exchange后未到達queue
  • MQ宕機,queue將消息丟失
  • consumer接收到消息后未消費就宕機

針對這些問題,RabbitMQ分別給出了解決方案:

  • 生產者確認機制
  • mq持久化
  • 消費者確認機制
  • 失敗重試機制

下面我們就通過案例來演示每一個步驟。

首先,導入課前資料提供的demo工程:

在這里插入圖片描述

項目結構如下:

在這里插入圖片描述

3.1.生產者消息確認

RabbitMQ提供了publisher confirm機制來避免消息發送到MQ過程中丟失。這種機制必須給每個消息指定一個唯一ID。消息發送到MQ以后,會返回一個結果給發送者,表示消息是否處理成功。

返回結果有兩種方式:

  • publisher-confirm,發送者確認
    • 消息成功投遞到交換機,返回ack
    • 消息未投遞到交換機,返回nack
  • publisher-return,發送者回執
    • 消息投遞到交換機了,但是沒有路由到隊列。返回ACK,及路由失敗原因。

在這里插入圖片描述

注意:

在這里插入圖片描述

3.1.1.修改配置

首先,修改publisher服務中的application.yml文件,添加下面的內容:

spring:
  rabbitmq:
    publisher-confirm-type: correlated
    publisher-returns: true
    template:
      mandatory: true
   

說明:

  • publish-confirm-type:開啟publisher-confirm,這里支持兩種類型:
    • simple:同步等待confirm結果,直到超時
    • correlated:異步回調,定義ConfirmCallback,MQ返回結果時會回調這個ConfirmCallback
  • publish-returns:開啟publish-return功能,同樣是基于callback機制,不過是定義ReturnCallback
  • template.mandatory:定義消息路由失敗時的策略。true,則調用ReturnCallback;false:則直接丟棄消息

3.1.2.定義Return回調

每個RabbitTemplate只能配置一個ReturnCallback,因此需要在項目加載時配置:

修改publisher服務,添加一個:

package cn.itcast.mq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 獲取RabbitTemplate
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // 設置ReturnCallback
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            // 投遞失敗,記錄日志
            log.info("消息發送失敗,應答碼{},原因{},交換機{},路由鍵{},消息{}",
                     replyCode, replyText, exchange, routingKey, message.toString());
            // 如果有業務需要,可以重發消息
        });
    }
}

3.1.3.定義ConfirmCallback

ConfirmCallback可以在發送消息時指定,因為每個業務處理confirm成功或失敗的邏輯不一定相同。

在publisher服務的cn.itcast.mq.spring.SpringAmqpTest類中,定義一個單元測試方法:

public void testSendMessage2SimpleQueue() throws InterruptedException {
    // 1.消息體
    String message = "hello, spring amqp!";
    // 2.全局唯一的消息ID,需要封裝到CorrelationData中
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    // 3.添加callback
    correlationData.getFuture().addCallback(
        result -> {
            if(result.isAck()){
                // 3.1.ack,消息成功
                log.debug("消息發送成功, ID:{}", correlationData.getId());
            }else{
                // 3.2.nack,消息失敗
                log.error("消息發送失敗, ID:{}, 原因{}",correlationData.getId(), result.getReason());
            }
        },
        ex -> log.error("消息發送異常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage())
    );
    // 4.發送消息
    rabbitTemplate.convertAndSend("task.direct", "task", message, correlationData);

    // 休眠一會兒,等待ack回執
    Thread.sleep(2000);
}

3.2.消息持久化

生產者確認可以確保消息投遞到RabbitMQ的隊列中,但是消息發送到RabbitMQ以后,如果突然宕機,也可能導致消息丟失。

要想確保消息在RabbitMQ中安全保存,必須開啟消息持久化機制。

  • 交換機持久化
  • 隊列持久化
  • 消息持久化

3.2.1.交換機持久化

RabbitMQ中交換機默認是非持久化的,mq重啟后就丟失。

SpringAMQP中可以通過代碼指定交換機持久化:

@Bean
public DirectExchange simpleExchange(){
    // 三個參數:交換機名稱、是否持久化、當沒有queue與其綁定時是否自動刪除
    return new DirectExchange("simple.direct", true, false);
}

事實上,默認情況下,由SpringAMQP聲明的交換機都是持久化的。

可以在RabbitMQ控制臺看到持久化的交換機都會帶上D的標示:

在這里插入圖片描述

3.2.2.隊列持久化

RabbitMQ中隊列默認是非持久化的,mq重啟后就丟失。

SpringAMQP中可以通過代碼指定交換機持久化:

@Bean
public Queue simpleQueue(){
    // 使用QueueBuilder構建隊列,durable就是持久化的
    return QueueBuilder.durable("simple.queue").build();
}

事實上,默認情況下,由SpringAMQP聲明的隊列都是持久化的。

可以在RabbitMQ控制臺看到持久化的隊列都會帶上D的標示:

在這里插入圖片描述

3.2.3.消息持久化

利用SpringAMQP發送消息時,可以設置消息的屬性(MessageProperties),指定delivery-mode:

  • 1:非持久化
  • 2:持久化

用java代碼指定:

在這里插入圖片描述

默認情況下,SpringAMQP發出的任何消息都是持久化的,不用特意指定。

3.3.消費者消息確認

RabbitMQ是閱后即焚機制,RabbitMQ確認消息被消費者消費后會立刻刪除。

而RabbitMQ是通過消費者回執來確認消費者是否成功處理消息的:消費者獲取消息后,應該向RabbitMQ發送ACK回執,表明自己已經處理消息。

設想這樣的場景:

  • 1)RabbitMQ投遞消息給消費者
  • 2)消費者獲取消息后,返回ACK給RabbitMQ
  • 3)RabbitMQ刪除消息
  • 4)消費者宕機,消息尚未處理

這樣,消息就丟失了。因此消費者返回ACK的時機非常重要。

而SpringAMQP則允許配置三種確認模式:

?manual:手動ack,需要在業務代碼結束后,調用api發送ack。

?auto:自動ack,由spring監測listener代碼是否出現異常,沒有異常則返回ack;拋出異常則返回nack

?none:關閉ack,MQ假定消費者獲取消息后會成功處理,因此消息投遞后立即被刪除

由此可知:

  • none模式下,消息投遞是不可靠的,可能丟失
  • auto模式類似事務機制,出現異常時返回nack,消息回滾到mq;沒有異常,返回ack
  • manual:自己根據業務情況,判斷什么時候該ack

一般,我們都是使用默認的auto即可。

發送者端:

@Test
    public void testSimpleQueue() {
        // 隊列名稱
        String queueName = "simple.queue";
        // 消息
        String message = "hello, spring amqp!";
        // 發送消息
        rabbitTemplate.convertAndSend(queueName, message);
    }

3.3.1.演示none模式

修改consumer服務的application.yml文件,添加下面內容:

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: none # 關閉ack

修改consumer服務的SpringRabbitListener類中的方法,模擬一個消息處理異常:

@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {
    log.info("消費者接收到simple.queue的消息:【{}】", msg);
    // 模擬異常
    System.out.println(1 / 0);
    log.debug("消息處理完成!");
}

測試可以發現,當消息處理拋異常時,消息依然被RabbitMQ刪除了。

3.3.2.演示auto模式

再次把確認機制修改為auto:

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto # 關閉ack

在異常位置打斷點,再次發送消息,程序卡在斷點時,可以發現此時消息狀態為unack(未確定狀態):

在這里插入圖片描述

拋出異常后,因為Spring會自動返回nack,所以消息恢復至Ready狀態,并且沒有被RabbitMQ刪除:

在這里插入圖片描述

3.4.消費失敗重試機制

當消費者出現異常后,消息會不斷requeue(重入隊)到隊列,再重新發送給消費者,然后再次異常,再次requeue,無限循環,導致mq的消息處理飆升,帶來不必要的壓力:

在這里插入圖片描述

怎么辦呢?

3.4.1.本地重試

我們可以利用Spring的retry機制,在消費者出現異常時利用本地重試,而不是無限制的requeue到mq隊列。

修改consumer服務的application.yml文件,添加內容:

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 開啟消費者失敗重試
          initial-interval: 1000 # 初識的失敗等待時長為1秒
          multiplier: 1 # 失敗的等待時長倍數,下次等待時長 = multiplier * last-interval
          max-attempts: 3 # 最大重試次數
          stateless: true # true無狀態;false有狀態。如果業務中包含事務,這里改為false

重啟consumer服務,重復之前的測試。可以發現:

  • 在重試3次后,SpringAMQP會拋出異常AmqpRejectAndDontRequeueException,說明本地重試觸發了
  • 查看RabbitMQ控制臺,發現消息被刪除了,說明最后SpringAMQP返回的是ack,mq刪除消息了

結論:

  • 開啟本地重試時,消息處理過程中拋出異常,不會requeue到隊列,而是在消費者本地重試
  • 重試達到最大次數后,Spring會返回ack,消息會被丟棄

3.4.2.失敗策略

在之前的測試中,達到最大重試次數后,消息會被丟棄,這是由Spring內部機制決定的。

在開啟重試模式后,重試次數耗盡,如果消息依然失敗,則需要有MessageRecovery接口來處理,它包含三種不同的實現:

  • RejectAndDontRequeueRecoverer:重試耗盡后,直接reject,丟棄消息。默認就是這種方式

  • ImmediateRequeueMessageRecoverer:重試耗盡后,返回nack,消息重新入隊

  • RepublishMessageRecoverer:重試耗盡后,將失敗消息投遞到指定的交換機

比較優雅的一種處理方案是RepublishMessageRecoverer,失敗后將消息投遞到一個指定的,專門存放異常消息的隊列,后續由人工集中處理。

1)在consumer服務中定義處理失敗消息的交換機和隊列

@Bean
public DirectExchange errorMessageExchange(){
    return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
    return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
    return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}

2)定義一個RepublishMessageRecoverer,關聯隊列和交換機

@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
    return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}

完整代碼:

package cn.itcast.mq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;

@Configuration
public class ErrorMessageConfig {
    @Bean
    public DirectExchange errorMessageExchange(){
        return new DirectExchange("error.direct");
    }
    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue", true);
    }
    @Bean
    public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
        return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
    }

    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
}

3.5.總結

如何確保RabbitMQ消息的可靠性?

  • 開啟生產者確認機制,確保生產者的消息能到達隊列
  • 開啟持久化功能,確保消息未消費前在隊列中不會丟失
  • 開啟消費者確認機制為auto,由spring確認消息處理成功后完成ack
  • 開啟消費者失敗重試機制,并設置MessageRecoverer,多次重試失敗后將消息投遞到異常交換機,交由人工處理

原文鏈接:https://blog.csdn.net/itcast_cn/article/details/127249194

欄目分類
最近更新