日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學無先后,達者為師

網站首頁 編程語言 正文

RabbitMq工作模式深度剖析與Spring整合MQ

作者:bingtanghulu_6 更新時間: 2022-05-11 編程語言

書接上回:RabbitMq概述與工作模式深度剖析_bingtanghulu_6的博客-CSDN博客

工作模式代碼已經發(fā)到gitee倉庫,需要的自取。代碼路徑:https://gitee.com/GengHongBo/rabbit-mqtest.git

1.?Publish/Subscribe(發(fā)布-訂閱模式)

發(fā)布訂閱模式對象:發(fā)布者,交換機,隊列,消費者1,消費者2。多個隊列綁定交換機,交換機會給每個隊列發(fā)送消息。

使用場景:up主跟粉絲訂閱消息的場景;天氣預報跟百度,微博等天氣專欄的關系。

?1.1 發(fā)布-訂閱模式-實戰(zhàn)創(chuàng)建

為了節(jié)省時間只放了一些關鍵代碼和截圖,以后的都是這樣。需要的可以去開頭的git倉庫自取。

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import util.RabbitMQConstant;
import util.RabbitMQUtil;

import java.io.IOException;
import java.util.Scanner;

/**
 * 發(fā)布訂閱模式:新建一個TCP長連接綁定交換機
 */
public class ExchangeTest {

    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();

        String str = new Scanner(System.in).next();

        //綁定交換機
        //RabbitMQConstant.EXCHANGE_PUBSUB:交換機;queueName:隊列名稱;null:額外的設置屬性;消息內容
        channel.basicPublish(RabbitMQConstant.EXCHANGE_PUBSUB,"",null,str.getBytes());

        channel.close();
        connection.close();


    }
}
import com.rabbitmq.client.*;
import util.RabbitMQConstant;
import util.RabbitMQUtil;

import java.io.IOException;

/**
 * 消費者
 */
public class Consumer1 {

    public static void main(String[] args) throws Exception {

        //設置連接信息
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitMQConstant.QUEUE_PUBSUB,false,false,false,null);

        //隊列綁定交換機
        //RabbitMQConstant.QUEUE_PUBSUB:隊列名稱;RabbitMQConstant.EXCHANGE_PUBSUB:交換機名稱;"":路由key(暫時用不到)
        channel.queueBind(RabbitMQConstant.QUEUE_PUBSUB,RabbitMQConstant.EXCHANGE_PUBSUB,"");

        channel.basicQos(1);//消費完一個消息再去隊列拿其他的消息

        //消費消息
        //"":交換機,helloworld模式不使用;queueName:隊列名稱;null:額外的設置屬性;消息內容
        channel.basicConsume(RabbitMQConstant.QUEUE_PUBSUB,false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到的消息:"+new String(body));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }
}

?

?

?2.routing(路由模式)

路由模式對象:生產者,交換機,路由key,隊列,消費者1,消費2。。。,通過隊列綁定消費者并且指定唯一路由key的形式指定消費固定路由key的消息。

?2.1 routing實戰(zhàn)

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import util.RabbitMQConstant;
import util.RabbitMQUtil;

import java.util.*;

/**
 * 路由模式:新建一個TCP長連接綁定交換機
 */
public class ExchangeTest {

    public static void main(String[] args) throws Exception {


        HashMap map = new HashMap();
        map.put("key1","test1");
        map.put("key2","test2");
        map.put("key3","test3");
        map.put("key4","test4");
        map.put("key5","test5");
        map.put("key6","test6");
        map.put("key7","test7");


        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();

        Iterator> iterator = map.entrySet().iterator();
        while(iterator.hasNext()){
            Map.Entry entry = iterator.next();
            //綁定交換機
            //RabbitMQConstant.EXCHANGE_PUBSUB:交換機;queueName:隊列名稱;null:額外的設置屬性;消息內容
            channel.basicPublish(RabbitMQConstant.EXCHANGE_ROUTING,entry.getKey(),null,entry.getKey().getBytes());
        }

        channel.close();
        connection.close();
    }
}
import com.rabbitmq.client.*;
import util.RabbitMQConstant;
import util.RabbitMQUtil;

import java.io.IOException;

/**
 * 消費者
 */
public class Consumer1 {

    public static void main(String[] args) throws Exception {

        //設置連接信息
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitMQConstant.QUEUE_ROUTING1,false,false,false,null);

        //隊列綁定交換機
        //RabbitMQConstant.QUEUE_PUBSUB:隊列名稱;RabbitMQConstant.EXCHANGE_PUBSUB:交換機名稱;"":路由key(暫時用不到)
        channel.queueBind(RabbitMQConstant.QUEUE_ROUTING1,RabbitMQConstant.EXCHANGE_ROUTING,"key1");
        channel.queueBind(RabbitMQConstant.QUEUE_ROUTING1,RabbitMQConstant.EXCHANGE_ROUTING,"key2");

        channel.basicQos(1);//消費完一個消息再去隊列拿其他的消息

        //消費消息
        //"":交換機,helloworld模式不使用;queueName:隊列名稱;null:額外的設置屬性;消息內容
        channel.basicConsume(RabbitMQConstant.QUEUE_ROUTING1,false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費者1接收到的消息:"+new String(body));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }
}

?

?3.?Topics-通配符模式

通配符模式:與上面的類似。

應用場景:當routingkey過多時,會出現(xiàn)很多類似的代碼,使用通配符模式可以減少代碼量。#代表一個或多個詞,*代表一個詞。

?3.1 topic-通配符模式實戰(zhàn)

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import util.RabbitMQConstant;
import util.RabbitMQUtil;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/**
 * 通配符模式:新建一個TCP長連接綁定交換機
 */
public class ExchangeTest {

    public static void main(String[] args) throws Exception {


        HashMap map = new HashMap();
        map.put("key.key1.key.key","test1");
        map.put("key.key2.key.key","test2");
        map.put("key.key3.key.key","test3");
        map.put("com.test.key","com.test1");
        map.put("com.test.key1","com.test2");
        map.put("com.test.key2","com.test3");
        map.put("com.test.key3","com.test4");


        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();

        Iterator> iterator = map.entrySet().iterator();
        while(iterator.hasNext()){
            Map.Entry entry = iterator.next();
            //綁定交換機
            //RabbitMQConstant.EXCHANGE_PUBSUB:交換機;queueName:隊列名稱;null:額外的設置屬性;消息內容
            channel.basicPublish(RabbitMQConstant.EXCHANGE_TOPICS,entry.getKey(),null,entry.getKey().getBytes());
        }

        channel.close();
        connection.close();
    }
}

import com.rabbitmq.client.*;
import util.RabbitMQConstant;
import util.RabbitMQUtil;

import java.io.IOException;

/**
 * 消費者
 */
public class Consumer1 {

    public static void main(String[] args) throws Exception {

        //設置連接信息
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitMQConstant.QUEUE_ROUTING1,false,false,false,null);

        //隊列綁定交換機
        //RabbitMQConstant.QUEUE_PUBSUB:隊列名稱;RabbitMQConstant.EXCHANGE_PUBSUB:交換機名稱;"":路由key(暫時用不到)
        channel.queueBind(RabbitMQConstant.QUEUE_ROUTING1,RabbitMQConstant.EXCHANGE_TOPICS,"key.#");

        channel.basicQos(1);//消費完一個消息再去隊列拿其他的消息

        //消費消息
        //"":交換機,helloworld模式不使用;queueName:隊列名稱;null:額外的設置屬性;消息內容
        channel.basicConsume(RabbitMQConstant.QUEUE_ROUTING1,false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費者1接收到的消息:"+new String(body));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }
}

4. rabbitMq消息確認機制

當生產者傳送消息到broker以后會出現(xiàn)兩種情況:confirm和return。confirm是確定有隊列的情況下,消費者確認或者不確認的情況。return是消息到達broker以后找不到隊列而發(fā)生退回的情況。

4.1 消息確認機制-實戰(zhàn)

import com.rabbitmq.client.*;
import util.RabbitMQConstant;
import util.RabbitMQUtil;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/**
 * 消息確認機制案例
 */
public class ConfirmTest {

    public static void main(String[] args) throws Exception {


        HashMap map = new HashMap();
        map.put("key.key1.key.key","test1");
        map.put("key.key2.key.key","test2");
        map.put("key.key3.key.key","test3");
        map.put("com.test.key","com.test1");
        map.put("com.test.key1","com.test2");
        map.put("com.test.key2","com.test3");
        map.put("com.test.key3","com.test4");
        map.put("erro.key","com.erro");
        map.put("erro.key1","com.erro1");
        map.put("erro.key3","com.erro2");


        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();

        //開啟監(jiān)聽機制
        channel.confirmSelect();
        //確認監(jiān)聽機制
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long l, boolean b) throws IOException {//確認返回
                System.out.println("接受到消息,參數:"+l+",b:"+b);
            }

            @Override
            public void handleNack(long l, boolean b) throws IOException {//拒絕消息
                System.out.println("接受到消息拒收消息,參數:"+l+",b:"+b);
            }
        });
        //返回監(jiān)聽機制
        channel.addReturnListener(new ReturnCallback() {
            @Override
            public void handle(Return r) {
                System.err.println("===========================");
                System.err.println("Return編碼:" + r.getReplyCode() + "-Return描述:" + r.getReplyText());
                System.err.println("交換機:" + r.getExchange() + "-路由key:" + r.getRoutingKey() );
                System.err.println("Return主題:" + new String(r.getBody()));
                System.err.println("===========================");
            }
        });

        Iterator> iterator = map.entrySet().iterator();
        while(iterator.hasNext()){
            Map.Entry entry = iterator.next();
            //綁定交換機
            //RabbitMQConstant.EXCHANGE_PUBSUB:交換機;queueName:隊列名稱;null:額外的設置屬性;消息內容
            channel.basicPublish(RabbitMQConstant.EXCHANGE_CONFIRM,entry.getKey(),null,entry.getKey().getBytes());
        }

        //在消息確認機制下我們不能關閉長連接
//        channel.close();
//        connection.close();
    }
}
import com.rabbitmq.client.*;
import util.RabbitMQConstant;
import util.RabbitMQUtil;

import java.io.IOException;

/**
 * 消費者
 */
public class Consumer1 {

    public static void main(String[] args) throws Exception {

        //設置連接信息
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitMQConstant.QUEUE_CONFIRM1,false,false,false,null);

        //隊列綁定交換機
        //RabbitMQConstant.QUEUE_PUBSUB:隊列名稱;RabbitMQConstant.EXCHANGE_PUBSUB:交換機名稱;"":路由key(暫時用不到)
        channel.queueBind(RabbitMQConstant.QUEUE_CONFIRM1,RabbitMQConstant.EXCHANGE_CONFIRM,"key.#");

        channel.basicQos(1);//消費完一個消息再去隊列拿其他的消息

        //消費消息
        //"":交換機,helloworld模式不使用;queueName:隊列名稱;null:額外的設置屬性;消息內容
        channel.basicConsume(RabbitMQConstant.QUEUE_CONFIRM1,false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費者1接收到的消息:"+new String(body));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }
}

5.spring整合rabbitMQ

分兩個工程:生產者(spring-rabbit-consumerTest),消費者(spring-rabbit-consumerTest)

生產者:添加pom.xml依賴,配置文件整合,測試案例編寫

消費者:添加pom.xml依賴,配置文件整合,增加監(jiān)聽器,啟動監(jiān)聽

原文鏈接:https://blog.csdn.net/qq_21575929/article/details/123780207

欄目分類
最近更新