日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學無先后,達者為師

網站首頁 編程語言 正文

RocketMQ消息過濾是如何實現的?

作者:Gimtom 更新時間: 2022-07-30 編程語言

三種消息過濾方式

RocketMQ的過濾方式主要分為如下2種,其中類模式過濾的方式會在RocketMQ 5.0.0版本中被移除,所以不進行詳細分析;
在這里插入圖片描述

根據Tag過濾

public class TagFilterProducer {

    public static final String RPODUCER_GROUP_NAME = "tagFilterProducerGroup";
    public static final String TOPIC_NAME = "tagFilterTopic";

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer(RPODUCER_GROUP_NAME);
        producer.setNamesrvAddr("192.168.199.138:9876");
        producer.setSendMsgTimeout(6000);
        producer.start();

        String[] tags = new String[] {"TagA", "TagB", "TagC"};

        for (int i = 0; i < 12; i++) {
            Message message = new Message(TOPIC_NAME, tags[i % tags.length], ("hello rocketmq " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(message);
            System.out.println(sendResult);
        }

        producer.shutdown();
    }
}

Producer端總共發送了12條數據。TagA,TagB,TagC的數據各4條。

public class TagFilterConsumer {

    public static final String CONSUMER_GROUP_NAME = "tagFilterConsumerGroup";

    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP_NAME);
        consumer.setNamesrvAddr("192.168.199.138:9876");
        consumer.subscribe(TagFilterProducer.TOPIC_NAME, "TagA || TagC");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                System.out.printf("%s receive new message %s", Thread.currentThread().getName(), list);
                System.out.println();
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("Consumer Started");
    }
}

Consumer端訂閱了TagA和TagC的數據,最終只接收到8條數據,TagA和TagC的數據各4條

根據SQL92語法過濾

在發送消息的時候,可以給消息設置很多擴展屬性,SQL92語法過濾就是根據這些擴展屬性來過濾消息的(其實tag也是擴展屬性的一種)

rocketmq默認是不支持sql92語法過濾的,需要在broker.conf中加入如下配置

enablePropertyFilter = true
filterSupportRetry = true
public class SqlFilterProducer {

    public static final String RPODUCER_GROUP_NAME = "sqlFilterProducerGroup";
    public static final String TOPIC_NAME = "sqlFilterTopic";

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer(RPODUCER_GROUP_NAME);
        producer.setNamesrvAddr("192.168.199.138:9876");
        producer.setSendMsgTimeout(6000);
        producer.start();

        String[] tags = new String[] {"TagA", "TagB", "TagC"};

        for (int i = 0; i < 12; i++) {
            Message message = new Message(TOPIC_NAME, tags[i % tags.length], ("hello rocketmq " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            message.putUserProperty("a", String.valueOf(i));
            SendResult sendResult = producer.send(message);
            System.out.println(sendResult);
        }

        producer.shutdown();
    }
}
public class SqlFilterConsumer {

    public static final String CONSUMER_GROUP_NAME = "sqlFilterConsumerGroup";

    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP_NAME);
        consumer.setNamesrvAddr("192.168.199.138:9876");
        consumer.subscribe(SqlFilterProducer.TOPIC_NAME,
                MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))"
                + "and (a is not null and a between 0 and 3)"));

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                System.out.printf("%s receive new message %s \n", Thread.currentThread().getName(), list);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("Consumer Started");
    }
}

生產者發送了12條數據,消費者消費了2條數據(tag為TagA或TagB,屬性a的值在0到3之間)

原理解析

因為通過tag過濾的方式最常用,因此只分析tag過濾的實現過程。

如果讓你實現通過tag過濾你會在Consumer端進行過濾還是在Broker端進行過濾?

可能大多數人都會選擇在Broker端進行過濾,因為在Consumer端進行過濾,會有無用的消息被發送到Consumer端,造成帶寬浪費
在這里插入圖片描述
其實RocketMQ根據Tag過濾消息的方式需要Consumer端和Broker端一起來完成

  1. Broker端收到消息后,先根據ConsumerQueue中的tagCods(tag的hashcode)進行過濾,過濾完成后從CommitLog取出對應的消息值,然后返回給Consumer; (節省寬帶浪費))
  2. Consumer收到消息后再根據tag的具體值進行過濾,然后再開始消費消息(因為有可能2個tag的內容不同,但是hashcode相同,所以還需要根據內容再過濾一次)
    在這里插入圖片描述
    可能你會說那目前這種方式可能會有大量不匹配的消息發送到客戶端!

首先tag的hashcode沖突的概率比較低。其次,你完全可以在設置tagName的時候避免和之前tagName的hashcode產生沖突;

一個消費組訂閱一個主題下的不同tag,為什么會會丟消息?

  1. 消息隊列的負載均衡,一個消費端只會分配到一個或者多個隊列(負載均衡),在同一個時間,一個隊列只能分配到一個消費者;
  2. 同一個tag會分布在不同的隊列中,但消費者C1分配到的隊列為q0,q1,q0,q1中有taga和tagb的消息,但tagb的消息會被消費者C1過濾,但這部分消息,卻不會被C2消費,造成了消息丟失。
    在這里插入圖片描述

原文鏈接:https://blog.csdn.net/One_hundred_nice/article/details/126064738

欄目分類
最近更新