網(wǎng)站首頁(yè) 編程語(yǔ)言 正文
RocketMQ無(wú)法避免消息重復(fù),能確保所有消息至少傳遞一次。在大多數(shù)情況下,消息不會(huì)重復(fù)。
問(wèn)題產(chǎn)生原因:
-
生產(chǎn)者重復(fù)發(fā)送消息。例如一次消息發(fā)送還未完成,但消息已存于Brocker時(shí)Brocker宕機(jī),過(guò)后生產(chǎn)者會(huì)重試發(fā)送消息從而導(dǎo)致消息重復(fù)。
-
消費(fèi)者擴(kuò)容進(jìn)行重平衡后消息還未消費(fèi)完成可能會(huì)導(dǎo)致消息重復(fù)消費(fèi)。
解決方案:首先生產(chǎn)者給消息攜帶唯一標(biāo)記(自定義key等業(yè)務(wù)控制,消息的msgId一定是全局唯一標(biāo)識(shí)符,但實(shí)際使用中可能會(huì)出現(xiàn)相同消息有兩個(gè)不同的msgId)。然后通過(guò)消費(fèi)者控制消息的冪等性(多次操作產(chǎn)生的影響均和第一次影響相同),可通過(guò)MySQL自定義去重表或Redis實(shí)現(xiàn)。
模擬生產(chǎn)者發(fā)送重復(fù)消息:
@SpringBootTest
public class KeyTest {
@Test
void testProducer() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("repeat-producer-group");
producer.setNamesrvAddr(MQConstant.NAME_SRV_ADDR);
producer.start();
// 自定義key
String key = UUID.randomUUID().toString();
System.out.println(key);
Message message = new Message("repeatTopic", null, key, "扣減庫(kù)存-1".getBytes());
Message repeatMessage = new Message("repeatTopic", null, key, "扣減庫(kù)存-1".getBytes());
producer.send(message);
producer.send(repeatMessage);
System.out.println("發(fā)送成功!");
producer.shutdown();
}
}
通過(guò)MySQL自定義去重表解決,設(shè)置表中key為唯一索引,只有當(dāng)key不存在時(shí)才能插入成功,失敗報(bào)錯(cuò)則消息已消費(fèi)返回消費(fèi)成功狀態(tài)碼,成功則消息未消費(fèi)執(zhí)行相應(yīng)業(yè)務(wù)邏輯。
@Test
void testConsumerByMySQL() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("repeat-consumer-group");
consumer.setNamesrvAddr(MQConstant.NAME_SRV_ADDR);
consumer.subscribe("repeatTopic", "*");
consumer.registerMessageListener((MessageListenerConcurrently) (megs, context) -> {
MessageExt messageExt = megs.get(0);
String keys = messageExt.getKeys();
String desc = new String(messageExt.getBody());
try {
// 插入數(shù)據(jù)庫(kù),key設(shè)置了唯一索引,插入成功則處理該消息,報(bào)錯(cuò)則表示已經(jīng)處理過(guò)該消息
Connection connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "root");
PreparedStatement preparedStatement = connection.prepareStatement("insert into `dedupe_table` (`key`, `desc`) values (?, ?)");
preparedStatement.setString(1, keys);
preparedStatement.setString(2, desc);
preparedStatement.executeUpdate();
} catch (SQLException e) {
if (e instanceof SQLIntegrityConstraintViolationException) {
// 唯一索引沖突異常,說(shuō)明消息已經(jīng)處理過(guò)
System.out.println("該消息已處理!");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
e.printStackTrace();
}
// 消息未處理,進(jìn)行相應(yīng)業(yè)務(wù)操作
System.out.println(keys + desc);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
System.in.read(); // 使用系統(tǒng)輸入阻塞掛起JVM
}
通過(guò)Redis解決,Redis中使用setnx命令插入String類型數(shù)據(jù),只有當(dāng)key不存在時(shí)才能插入成功,失敗則消息已消費(fèi)返回消費(fèi)成功狀態(tài)碼,成功則消息未消費(fèi)執(zhí)行相應(yīng)業(yè)務(wù)邏輯。
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Test
void testConsumerByRedis() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("repeat-consumer-group");
consumer.setNamesrvAddr(MQConstant.NAME_SRV_ADDR);
consumer.subscribe("repeatTopic", "*");
consumer.registerMessageListener((MessageListenerConcurrently) (megs, context) -> {
MessageExt messageExt = megs.get(0);
String keys = messageExt.getKeys();
String desc = new String(messageExt.getBody());
// Redis中String類型使用setnx命令插入,只有當(dāng)key不存在時(shí)才能插入成功。
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent("rocketmq:repeat:" + keys, desc);
if (!flag) {
// key在Redis中已存在,說(shuō)明消息已經(jīng)處理過(guò)
System.out.println("該消息已處理!");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
// 消息未處理,進(jìn)行相應(yīng)業(yè)務(wù)操作
System.out.println(keys + desc);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
System.in.read(); // 使用系統(tǒng)輸入阻塞掛起JVM
}
原文鏈接:https://blog.csdn.net/m0_62129859/article/details/134889820
- 上一篇:沒(méi)有了
- 下一篇:沒(méi)有了
相關(guān)推薦
- 2022-06-02 基于Redis6.2.6版本部署Redis?Cluster集群的問(wèn)題_Redis
- 2024-03-01 解決 “TypeError: Cannot read properties of undefined
- 2022-09-13 Go語(yǔ)言中map使用和并發(fā)安全詳解_Golang
- 2022-11-06 Swift使用SnapKit模仿Kingfisher第三方擴(kuò)展優(yōu)化_Swift
- 2023-04-02 pytorch?transform數(shù)據(jù)處理轉(zhuǎn)c++問(wèn)題_python
- 2022-06-21 C語(yǔ)言字符串函數(shù)與內(nèi)存函數(shù)精講_C 語(yǔ)言
- 2022-06-04 C#復(fù)雜XML反序列化為實(shí)體對(duì)象兩種方式小結(jié)_C#教程
- 2021-11-09 C++11?thread多線程編程創(chuàng)建方式_C 語(yǔ)言
- 欄目分類
-
- 最近更新
-
- window11 系統(tǒng)安裝 yarn
- 超詳細(xì)win安裝深度學(xué)習(xí)環(huán)境2025年最新版(
- Linux 中運(yùn)行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲(chǔ)小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎(chǔ)操作-- 運(yùn)算符,流程控制 Flo
- 1. Int 和Integer 的區(qū)別,Jav
- spring @retryable不生效的一種
- Spring Security之認(rèn)證信息的處理
- Spring Security之認(rèn)證過(guò)濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權(quán)
- redisson分布式鎖中waittime的設(shè)
- maven:解決release錯(cuò)誤:Artif
- restTemplate使用總結(jié)
- Spring Security之安全異常處理
- MybatisPlus優(yōu)雅實(shí)現(xiàn)加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務(wù)發(fā)現(xiàn)-Nac
- Spring Security之基于HttpR
- Redis 底層數(shù)據(jù)結(jié)構(gòu)-簡(jiǎn)單動(dòng)態(tài)字符串(SD
- arthas操作spring被代理目標(biāo)對(duì)象命令
- Spring中的單例模式應(yīng)用詳解
- 聊聊消息隊(duì)列,發(fā)送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠(yuǎn)程分支