日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學無先后,達者為師

網站首頁 編程語言 正文

SpringBoot3集成RocketMQ

作者:數據驅動生活 更新時間: 2024-01-06 編程語言

引入依賴:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>${rocketmq-spring-boot-starter.version}</version>
</dependency>

編輯配置文件,NameServer服務地址等,可以添加更多詳細配置以滿足具體業務需求,生產者組一般不關注,消費者組則需要保證組內訂閱關系的一致性,可在創建消費者時配置,通常每個微服務只有一組消費者。

rocketmq:
  name-server: 192.168.101.128:9876
  producer:
    group: boot-producer-group

導入配置類,在SpringBoot3及其后續版本中,不會默認導入RocketMQ的配置類,可通過在啟動類使用@Import導入或創建其配置類。

@Import(RocketMQAutoConfiguration.class)

生產者,通過RocketMQTemplate其API發送消息。

@Autowired
private RocketMQTemplate rocketMQTemplate;
@Test
void testProducer() {
    // 同步消息
    rocketMQTemplate.syncSend("bootTestTopic", "我是boot的同步消息");
    // 異步消息
    rocketMQTemplate.asyncSend("bootTestTopic", "我是boot的異步消息", new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            System.out.println("發送成功!");
        }
        @Override
        public void onException(Throwable throwable) {
            System.out.println("發送失敗!");
        }
    });
    // 單向消息
    rocketMQTemplate.sendOneWay("bootTestTopic", "我是boot的單向消息");
    // 延遲消息,必須創建Message對象,延遲等級在發送參數中
    Message<String> message = MessageBuilder.withPayload("我是boot的延遲消息").build();
    rocketMQTemplate.syncSend("bootTestTopic", message, 3000, 3);
    // 順序消息,在發送消息時通過參數直接傳入hashKey,保證消息順序在同一隊列
    List<MsgModel> msgModels = Arrays.asList(
            new MsgModel("qwer", 1, "下單"),
            new MsgModel("qwer", 1, "短信"),
            new MsgModel("qwer", 1, "物流"),
            new MsgModel("zxcv", 2, "下單"),
            new MsgModel("zxcv", 2, "短信"),
            new MsgModel("zxcv", 2, "物流")
    );
    for (MsgModel msgModel : msgModels) {
        rocketMQTemplate.syncSendOrderly("bootTestTopic", JSON.toJSONString(msgModel), msgModel.getOrderNumber());
    }
    // 攜帶Tag的消息
    rocketMQTemplate.syncSend("bootTestTopic:tagA", "我是boot帶tagA的消息");
    // 攜帶key的消息,key攜帶在消息頭中
    Message<String> msg = MessageBuilder
            .withPayload("我是boot帶key的消息")
            .setHeader(RocketMQHeaders.KEYS, "asfdsdghfjgh") // 攜帶key
            .build();
    rocketMQTemplate.syncSend("bootTestTopic:tagA", msg);
}

消費者,添加@Component和@RocketMQMessageListener注解,注解中指定消費者的具體配置,實現RocketMQListener接口,接口范型指定方法具體的參數類型,方法中對消息進行消費,方法沒有報錯消費成功,報錯則重試消費。

@Component
@RocketMQMessageListener(
        topic = "bootTestTopic",
        consumerGroup = "boot-test-consumer-group", // 消費者組,組內訂閱關系一致性
        consumeMode = ConsumeMode.CONCURRENTLY, // 消費模式,默認并發模式,消費順序消息使用順序模式
        maxReconsumeTimes = 5, // 消費重試次數
        selectorType = SelectorType.TAG, // 過濾類型,默認TAG
        selectorExpression = "tagA || tagB" // 過濾表達式,默認*全部
)
public class BootSimpleMsgListener implements RocketMQListener<MessageExt> {

    /**
     * 實現消費方法
     * 如果范型指定了固定類型的參數,那么消息體的內容就是該方法的參數
     * MessageExt是消息的所有內容
     * 方法沒有報錯消費成功,報錯就會重試
     * @param message
     */
    @Override
    public void onMessage(MessageExt message) {
        System.out.println(message.getKeys()); // 獲取key
        System.out.println(new String(message.getBody()));
    }
}

原文鏈接:https://blog.csdn.net/m0_62129859/article/details/135005599

  • 上一篇:沒有了
  • 下一篇:沒有了
欄目分類
最近更新