網站首頁 編程語言 正文
引入依賴:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>${rocketmq-spring-boot-starter.version}</version>
</dependency>
編輯配置文件,NameServer服務地址等,可以添加更多詳細配置以滿足具體業務需求,生產者組一般不關注,消費者組則需要保證組內訂閱關系的一致性,可在創建消費者時配置,通常每個微服務只有一組消費者。
rocketmq:
name-server: 192.168.101.128:9876
producer:
group: boot-producer-group
導入配置類,在SpringBoot3及其后續版本中,不會默認導入RocketMQ的配置類,可通過在啟動類使用@Import導入或創建其配置類。
@Import(RocketMQAutoConfiguration.class)
生產者,通過RocketMQTemplate其API發送消息。
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Test
void testProducer() {
// 同步消息
rocketMQTemplate.syncSend("bootTestTopic", "我是boot的同步消息");
// 異步消息
rocketMQTemplate.asyncSend("bootTestTopic", "我是boot的異步消息", new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("發送成功!");
}
@Override
public void onException(Throwable throwable) {
System.out.println("發送失敗!");
}
});
// 單向消息
rocketMQTemplate.sendOneWay("bootTestTopic", "我是boot的單向消息");
// 延遲消息,必須創建Message對象,延遲等級在發送參數中
Message<String> message = MessageBuilder.withPayload("我是boot的延遲消息").build();
rocketMQTemplate.syncSend("bootTestTopic", message, 3000, 3);
// 順序消息,在發送消息時通過參數直接傳入hashKey,保證消息順序在同一隊列
List<MsgModel> msgModels = Arrays.asList(
new MsgModel("qwer", 1, "下單"),
new MsgModel("qwer", 1, "短信"),
new MsgModel("qwer", 1, "物流"),
new MsgModel("zxcv", 2, "下單"),
new MsgModel("zxcv", 2, "短信"),
new MsgModel("zxcv", 2, "物流")
);
for (MsgModel msgModel : msgModels) {
rocketMQTemplate.syncSendOrderly("bootTestTopic", JSON.toJSONString(msgModel), msgModel.getOrderNumber());
}
// 攜帶Tag的消息
rocketMQTemplate.syncSend("bootTestTopic:tagA", "我是boot帶tagA的消息");
// 攜帶key的消息,key攜帶在消息頭中
Message<String> msg = MessageBuilder
.withPayload("我是boot帶key的消息")
.setHeader(RocketMQHeaders.KEYS, "asfdsdghfjgh") // 攜帶key
.build();
rocketMQTemplate.syncSend("bootTestTopic:tagA", msg);
}
消費者,添加@Component和@RocketMQMessageListener注解,注解中指定消費者的具體配置,實現RocketMQListener接口,接口范型指定方法具體的參數類型,方法中對消息進行消費,方法沒有報錯消費成功,報錯則重試消費。
@Component
@RocketMQMessageListener(
topic = "bootTestTopic",
consumerGroup = "boot-test-consumer-group", // 消費者組,組內訂閱關系一致性
consumeMode = ConsumeMode.CONCURRENTLY, // 消費模式,默認并發模式,消費順序消息使用順序模式
maxReconsumeTimes = 5, // 消費重試次數
selectorType = SelectorType.TAG, // 過濾類型,默認TAG
selectorExpression = "tagA || tagB" // 過濾表達式,默認*全部
)
public class BootSimpleMsgListener implements RocketMQListener<MessageExt> {
/**
* 實現消費方法
* 如果范型指定了固定類型的參數,那么消息體的內容就是該方法的參數
* MessageExt是消息的所有內容
* 方法沒有報錯消費成功,報錯就會重試
* @param message
*/
@Override
public void onMessage(MessageExt message) {
System.out.println(message.getKeys()); // 獲取key
System.out.println(new String(message.getBody()));
}
}
原文鏈接:https://blog.csdn.net/m0_62129859/article/details/135005599
- 上一篇:沒有了
- 下一篇:沒有了
相關推薦
- 2023-08-01 React之組件的分類、使用,事件對象,this指向問題,修改狀態以及受控組件與非受控組件
- 2022-05-21 redis數據一致性的實現示例_Redis
- 2022-11-04 關于docker部署服務時ip無法訪問服務正常的問題_docker
- 2022-06-08 SpringBoot jar包瘦身操作 -Dloader.path使用
- 2023-05-30 docker如何查看容器啟動命令(已運行的容器)_docker
- 2022-08-25 C#中的高效IO庫System.IO.Pipelines_C#教程
- 2022-12-11 Dart多個future隊列完成加入順序關系及原子性論證_Dart
- 2022-04-22 Mac環境下使用CLion調試redis 6.X源碼
- 欄目分類
-
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支