日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學無先后,達者為師

網站首頁 編程語言 正文

RocketMQ死信消息解決方案

作者:數據驅動生活 更新時間: 2024-01-06 編程語言

當消息消費重試達到閾值后,將不再投遞到消費者,而是存于名為"%DLQ%消費者組名"主題中為死信消息。

死信消息解決方案:

A、通過業務控制避免死信消息,當重試次數大于自定義時,將消息記錄到特定位置或通知人工處理并完成消費,該方案使用較多。

@Test
void testConsumer() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-consumer-group");
    consumer.setNamesrvAddr(MQConstant.NAME_SRV_ADDR);
    consumer.subscribe("retryTopic", "*");
    consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
        MessageExt messageExt = msgs.get(0);
        System.out.println(new Date() + "----" + new String(messageExt.getBody()) + "----" + messageExt.getReconsumeTimes());
        try {
            // 業務處理
            int num = 10/0;
        } catch (Exception e) {
            // 控制重試,當重試次數大于自定義時,將消息記錄到特定位置或通知人工處理并成功消費
            int reconsumeTimes = messageExt.getReconsumeTimes();
            if (reconsumeTimes < 3) {
                // 重試
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            // 不再重試
            System.out.println("將消息記錄到特定位置或通知人工處理!");
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    });
    consumer.start();
    System.in.read(); // 使用系統輸入阻塞掛起JVM
}

B、消費死信主題消息,將消息記錄到特定位置或通知人工處理并完成消費,但對于過多的死信主題則需要很多的消費者,該方案使用較少。

@Test
void testConsumerDead() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("dead-consumer-group");
    consumer.setNamesrvAddr(MQConstant.NAME_SRV_ADDR);
    // 訂閱死信消息
    consumer.subscribe("%DLQ%retry-consumer-group", "*");
    consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
        // 業務處理
        MessageExt messageExt = msgs.get(0);
        System.out.println(new Date() + "----" + new String(messageExt.getBody()) + "----" + messageExt.getReconsumeTimes());
        System.out.println("死信消息記錄到特定位置或通知人工處理!");
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    });
    consumer.start();
    System.in.read(); // 使用系統輸入阻塞掛起JVM
}

原文鏈接:https://blog.csdn.net/m0_62129859/article/details/134917433

  • 上一篇:沒有了
  • 下一篇:沒有了
欄目分類
最近更新