日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學(xué)無(wú)先后,達(dá)者為師

網(wǎng)站首頁(yè) 編程語(yǔ)言 正文

RocketMQ重復(fù)消費(fèi)問(wèn)題

作者:數(shù)據(jù)驅(qū)動(dòng)生活 更新時(shí)間: 2024-01-06 編程語(yǔ)言

RocketMQ無(wú)法避免消息重復(fù),能確保所有消息至少傳遞一次。在大多數(shù)情況下,消息不會(huì)重復(fù)。

問(wèn)題產(chǎn)生原因:

  • 生產(chǎn)者重復(fù)發(fā)送消息。例如一次消息發(fā)送還未完成,但消息已存于Brocker時(shí)Brocker宕機(jī),過(guò)后生產(chǎn)者會(huì)重試發(fā)送消息從而導(dǎo)致消息重復(fù)。

  • 消費(fèi)者擴(kuò)容進(jìn)行重平衡后消息還未消費(fèi)完成可能會(huì)導(dǎo)致消息重復(fù)消費(fèi)。

解決方案:首先生產(chǎn)者給消息攜帶唯一標(biāo)記(自定義key等業(yè)務(wù)控制,消息的msgId一定是全局唯一標(biāo)識(shí)符,但實(shí)際使用中可能會(huì)出現(xiàn)相同消息有兩個(gè)不同的msgId)。然后通過(guò)消費(fèi)者控制消息的冪等性(多次操作產(chǎn)生的影響均和第一次影響相同),可通過(guò)MySQL自定義去重表或Redis實(shí)現(xiàn)。

模擬生產(chǎn)者發(fā)送重復(fù)消息:

@SpringBootTest
public class KeyTest {
    @Test
    void testProducer() throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("repeat-producer-group");
        producer.setNamesrvAddr(MQConstant.NAME_SRV_ADDR);
        producer.start();
        // 自定義key
        String key = UUID.randomUUID().toString();
        System.out.println(key);
        Message message = new Message("repeatTopic", null, key, "扣減庫(kù)存-1".getBytes());
        Message repeatMessage = new Message("repeatTopic", null, key, "扣減庫(kù)存-1".getBytes());
        producer.send(message);
        producer.send(repeatMessage);
        System.out.println("發(fā)送成功!");
        producer.shutdown();
    }
}

通過(guò)MySQL自定義去重表解決,設(shè)置表中key為唯一索引,只有當(dāng)key不存在時(shí)才能插入成功,失敗報(bào)錯(cuò)則消息已消費(fèi)返回消費(fèi)成功狀態(tài)碼,成功則消息未消費(fèi)執(zhí)行相應(yīng)業(yè)務(wù)邏輯。

@Test
void testConsumerByMySQL() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("repeat-consumer-group");
    consumer.setNamesrvAddr(MQConstant.NAME_SRV_ADDR);
    consumer.subscribe("repeatTopic", "*");
    consumer.registerMessageListener((MessageListenerConcurrently) (megs, context) -> {
        MessageExt messageExt = megs.get(0);
        String keys = messageExt.getKeys();
        String desc = new String(messageExt.getBody());
        try {
            // 插入數(shù)據(jù)庫(kù),key設(shè)置了唯一索引,插入成功則處理該消息,報(bào)錯(cuò)則表示已經(jīng)處理過(guò)該消息
            Connection connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "root");
            PreparedStatement preparedStatement = connection.prepareStatement("insert into `dedupe_table` (`key`, `desc`) values (?, ?)");
            preparedStatement.setString(1, keys);
            preparedStatement.setString(2, desc);
            preparedStatement.executeUpdate();
        } catch (SQLException e) {
            if (e instanceof SQLIntegrityConstraintViolationException) {
                // 唯一索引沖突異常,說(shuō)明消息已經(jīng)處理過(guò)
                System.out.println("該消息已處理!");
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
            e.printStackTrace();
        }
        // 消息未處理,進(jìn)行相應(yīng)業(yè)務(wù)操作
        System.out.println(keys + desc);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    });
    consumer.start();
    System.in.read(); // 使用系統(tǒng)輸入阻塞掛起JVM
}

通過(guò)Redis解決,Redis中使用setnx命令插入String類型數(shù)據(jù),只有當(dāng)key不存在時(shí)才能插入成功,失敗則消息已消費(fèi)返回消費(fèi)成功狀態(tài)碼,成功則消息未消費(fèi)執(zhí)行相應(yīng)業(yè)務(wù)邏輯。

@Autowired
private StringRedisTemplate stringRedisTemplate;
@Test
void testConsumerByRedis() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("repeat-consumer-group");
    consumer.setNamesrvAddr(MQConstant.NAME_SRV_ADDR);
    consumer.subscribe("repeatTopic", "*");
    consumer.registerMessageListener((MessageListenerConcurrently) (megs, context) -> {
        MessageExt messageExt = megs.get(0);
        String keys = messageExt.getKeys();
        String desc = new String(messageExt.getBody());
        // Redis中String類型使用setnx命令插入,只有當(dāng)key不存在時(shí)才能插入成功。
        Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent("rocketmq:repeat:" + keys, desc);
        if (!flag) {
            // key在Redis中已存在,說(shuō)明消息已經(jīng)處理過(guò)
            System.out.println("該消息已處理!");
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        // 消息未處理,進(jìn)行相應(yīng)業(yè)務(wù)操作
        System.out.println(keys + desc);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    });
    consumer.start();
    System.in.read(); // 使用系統(tǒng)輸入阻塞掛起JVM
}

原文鏈接:https://blog.csdn.net/m0_62129859/article/details/134889820

  • 上一篇:沒(méi)有了
  • 下一篇:沒(méi)有了
欄目分類
最近更新