網站首頁 編程語言 正文
三種消息過濾方式
RocketMQ的過濾方式主要分為如下2種,其中類模式過濾的方式會在RocketMQ 5.0.0版本中被移除,所以不進行詳細分析;
根據Tag過濾
public class TagFilterProducer {
public static final String RPODUCER_GROUP_NAME = "tagFilterProducerGroup";
public static final String TOPIC_NAME = "tagFilterTopic";
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer(RPODUCER_GROUP_NAME);
producer.setNamesrvAddr("192.168.199.138:9876");
producer.setSendMsgTimeout(6000);
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC"};
for (int i = 0; i < 12; i++) {
Message message = new Message(TOPIC_NAME, tags[i % tags.length], ("hello rocketmq " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(message);
System.out.println(sendResult);
}
producer.shutdown();
}
}
Producer端總共發送了12條數據。TagA,TagB,TagC的數據各4條。
public class TagFilterConsumer {
public static final String CONSUMER_GROUP_NAME = "tagFilterConsumerGroup";
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP_NAME);
consumer.setNamesrvAddr("192.168.199.138:9876");
consumer.subscribe(TagFilterProducer.TOPIC_NAME, "TagA || TagC");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
System.out.printf("%s receive new message %s", Thread.currentThread().getName(), list);
System.out.println();
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started");
}
}
Consumer端訂閱了TagA和TagC的數據,最終只接收到8條數據,TagA和TagC的數據各4條
根據SQL92語法過濾
在發送消息的時候,可以給消息設置很多擴展屬性,SQL92語法過濾就是根據這些擴展屬性來過濾消息的(其實tag也是擴展屬性的一種)
rocketmq默認是不支持sql92語法過濾的,需要在broker.conf中加入如下配置
enablePropertyFilter = true
filterSupportRetry = true
public class SqlFilterProducer {
public static final String RPODUCER_GROUP_NAME = "sqlFilterProducerGroup";
public static final String TOPIC_NAME = "sqlFilterTopic";
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer(RPODUCER_GROUP_NAME);
producer.setNamesrvAddr("192.168.199.138:9876");
producer.setSendMsgTimeout(6000);
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC"};
for (int i = 0; i < 12; i++) {
Message message = new Message(TOPIC_NAME, tags[i % tags.length], ("hello rocketmq " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
message.putUserProperty("a", String.valueOf(i));
SendResult sendResult = producer.send(message);
System.out.println(sendResult);
}
producer.shutdown();
}
}
public class SqlFilterConsumer {
public static final String CONSUMER_GROUP_NAME = "sqlFilterConsumerGroup";
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP_NAME);
consumer.setNamesrvAddr("192.168.199.138:9876");
consumer.subscribe(SqlFilterProducer.TOPIC_NAME,
MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))"
+ "and (a is not null and a between 0 and 3)"));
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
System.out.printf("%s receive new message %s \n", Thread.currentThread().getName(), list);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started");
}
}
生產者發送了12條數據,消費者消費了2條數據(tag為TagA或TagB,屬性a的值在0到3之間)
原理解析
因為通過tag過濾的方式最常用,因此只分析tag過濾的實現過程。
如果讓你實現通過tag過濾你會在Consumer端進行過濾還是在Broker端進行過濾?
可能大多數人都會選擇在Broker端進行過濾,因為在Consumer端進行過濾,會有無用的消息被發送到Consumer端,造成帶寬浪費
其實RocketMQ根據Tag過濾消息的方式需要Consumer端和Broker端一起來完成
- Broker端收到消息后,先根據ConsumerQueue中的tagCods(tag的hashcode)進行過濾,過濾完成后從CommitLog取出對應的消息值,然后返回給Consumer; (節省寬帶浪費))
- Consumer收到消息后再根據tag的具體值進行過濾,然后再開始消費消息(因為有可能2個tag的內容不同,但是hashcode相同,所以還需要根據內容再過濾一次)
可能你會說那目前這種方式可能會有大量不匹配的消息發送到客戶端!
首先tag的hashcode沖突的概率比較低。其次,你完全可以在設置tagName的時候避免和之前tagName的hashcode產生沖突;
一個消費組訂閱一個主題下的不同tag,為什么會會丟消息?
- 消息隊列的負載均衡,一個消費端只會分配到一個或者多個隊列(負載均衡),在同一個時間,一個隊列只能分配到一個消費者;
- 同一個tag會分布在不同的隊列中,但消費者C1分配到的隊列為q0,q1,q0,q1中有taga和tagb的消息,但tagb的消息會被消費者C1過濾,但這部分消息,卻不會被C2消費,造成了消息丟失。
原文鏈接:https://blog.csdn.net/One_hundred_nice/article/details/126064738
- 上一篇:HadoopWindows下客戶端環境配置
- 下一篇:@Autowired實現的原理
相關推薦
- 2022-02-09 Qt5連接并操作PostgreSQL數據庫的實現示例_C 語言
- 2022-07-10 node支持ES6模塊化練習
- 2023-01-05 詳解C++中四種類型的轉換_C 語言
- 2023-04-18 Python實現常見的4種坐標互相轉換_python
- 2022-09-09 python中對開區間和閉區間的理解_python
- 2022-12-29 Python利用tkinter實現一個簡易番茄鐘的示例代碼_python
- 2022-07-08 go語言代碼生成器code?generator使用示例介紹_Golang
- 2022-04-27 .Net?Core中使用MongoDB搭建集群與項目實戰_基礎應用
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支