日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學無先后,達者為師

網站首頁 編程語言 正文

docker啟動rabbitmq以及使用方式詳解_docker

作者:Maackia ? 更新時間: 2022-09-29 編程語言

搜索rabbitmq鏡像

docker search rabbitmq:management

下載鏡像

docker pull rabbitmq:management

啟動容器

docker run -d --hostname localhost --name rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq:management

打印容器

docker logs rabbitmq

訪問RabbitMQ Management

http://localhost:15672

賬戶密碼默認:guest

編寫生產者類

package com.xun.rabbitmqdemo.example;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
    private final static String QUEUE_NAME = "hello";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();
        /**
         * 生成一個queue隊列
         * 1、隊列名稱 QUEUE_NAME
         * 2、隊列里面的消息是否持久化(默認消息存儲在內存中)
         * 3、該隊列是否只供一個Consumer消費 是否共享 設置為true可以多個消費者消費
         * 4、是否自動刪除 最后一個消費者斷開連接后 該隊列是否自動刪除
         * 5、其他參數
         */
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        String message = "Hello world!";
        /**
         * 發送一個消息
         * 1、發送到哪個exchange交換機
         * 2、路由的key
         * 3、其他的參數信息
         * 4、消息體
         */
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
        System.out.println(" [x] Sent '"+message+"'");

        channel.close();
        connection.close();
    }
}

運行該方法,可以看到控制臺的打印

name=hello的隊列收到Message

消費者

package com.xun.rabbitmqdemo.example;

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Receiver {
    private final static String QUEUE_NAME = "hello";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setConnectionTimeout(600000);//milliseconds
        factory.setRequestedHeartbeat(60);//seconds
        factory.setHandshakeTimeout(6000);//milliseconds
        factory.setRequestedChannelMax(5);
        factory.setNetworkRecoveryInterval(500);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        System.out.println("Waiting for messages. ");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            }
        };
        channel.basicConsume(QUEUE_NAME,true,consumer);
    }
}

工作隊列

RabbitMqUtils工具類

package com.xun.rabbitmqdemo.utils;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RabbitMqUtils {
    public static Channel getChannel() throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("guest");
        factory.setPassword("guest");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        return channel;
    }
}

啟動2個工作線程

package com.xun.rabbitmqdemo.workQueue;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xun.rabbitmqdemo.utils.RabbitMqUtils;

public class Work01 {
    private static final String QUEUE_NAME = "hello";
    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback = (consumerTag,delivery)->{
            String receivedMessage = new String(delivery.getBody());
            System.out.println("接收消息:"+receivedMessage);
        };
        CancelCallback cancelCallback = (consumerTag)->{
            System.out.println(consumerTag+"消費者取消消費接口回調邏輯");
        };
        System.out.println("C1 消費者啟動等待消費....");
        /**
         * 消費者消費消息
         * 1、消費哪個隊列
         * 2、消費成功后是否自動應答
         * 3、消費的接口回調
         * 4、消費未成功的接口回調
         */
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}
package com.xun.rabbitmqdemo.workQueue;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xun.rabbitmqdemo.utils.RabbitMqUtils;

public class Work02 {
    private static final String QUEUE_NAME = "hello";
    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback = (consumerTag,delivery)->{
            String receivedMessage = new String(delivery.getBody());
            System.out.println("接收消息:"+receivedMessage);
        };
        CancelCallback cancelCallback = (consumerTag)->{
            System.out.println(consumerTag+"消費者取消消費接口回調邏輯");
        };
        System.out.println("C2 消費者啟動等待消費....");
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}

啟動工作線程

啟動發送線程

package com.xun.rabbitmqdemo.workQueue;

import com.rabbitmq.client.Channel;
import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
import java.util.Scanner;

public class Task01 {
    private static final String QUEUE_NAME = "hello";
    public static void main(String[] args) throws Exception{
        try(Channel channel= RabbitMqUtils.getChannel();){
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            //從控制臺接收消息
            Scanner scanner = new Scanner(System.in);
            while(scanner.hasNext()){
                String message = scanner.next();
                channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
                System.out.println("發送消息完成:"+message);
            }
        }
    }
}

啟動發送線程,此時發送線程等待鍵盤輸入

發送4個消息

可以看到2個工作線程按照順序分別接收message。

消息應答機制

rabbitmq將message發送給消費者后,就會將該消息標記為刪除。

但消費者在處理message過程中宕機,會導致消息的丟失。

因此需要設置手動應答。

生產者

import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
import java.util.Scanner;

public class Task02 {
    private static final String TASK_QUEUE_NAME = "ack_queue";
    public static void main(String[] args) throws Exception{
        try(Channel channel = RabbitMqUtils.getChannel()){
            channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);
            Scanner scanner = new Scanner(System.in);
            System.out.println("請輸入信息");
            while(scanner.hasNext()){
                String message = scanner.nextLine();
                channel.basicPublish("",TASK_QUEUE_NAME,null,message.getBytes());
                System.out.println("生產者task02發出消息"+ message);
            }
        }
    }
}

消費者

package com.xun.rabbitmqdemo.workQueue;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
import com.xun.rabbitmqdemo.utils.SleepUtils;

public class Work03 {
    private static final String ACK_QUEUE_NAME = "ack_queue";
    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("Work03 等待接收消息處理時間較短");
        DeliverCallback deliverCallback = (consumerTag,delivery)->{
            String message = new String(delivery.getBody());
            SleepUtils.sleep(1);
            System.out.println("接收到消息:"+message);
            /**
             * 1、消息的標記tag
             * 2、是否批量應答
             */
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        };
        CancelCallback cancelCallback = (consumerTag)->{
            System.out.println(consumerTag+"消費者取消消費接口回調邏輯");
        };
        //采用手動應答
        boolean autoAck = false;
        channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
    }
}
package com.xun.rabbitmqdemo.workQueue;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
import com.xun.rabbitmqdemo.utils.SleepUtils;

public class Work04 {
    private static final String ACK_QUEUE_NAME = "ack_queue";
    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("Work04 等待接收消息處理時間較長");
        DeliverCallback deliverCallback = (consumerTag,delivery)->{
            String message = new String(delivery.getBody());
            SleepUtils.sleep(30);
            System.out.println("接收到消息:"+message);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        };
        CancelCallback cancelCallback = (consumerTag)->{
            System.out.println(consumerTag+"消費者取消消費接口回調邏輯");
        };
        //采用手動應答
        boolean autoAck = false;
        channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
    }
}

工具類SleepUtils

package com.xun.rabbitmqdemo.utils;
public class SleepUtils {
    public static void sleep(int second){
        try{
            Thread.sleep(1000*second);
        }catch (InterruptedException _ignored){
            Thread.currentThread().interrupt();
        }
    }
}

模擬

work04等待30s后發出ack

在work04處理message時手動停止線程,可以看到message:dd被rabbitmq交給了work03

不公平分發

上面的輪詢分發,生產者依次向消費者按順序發送消息,但當消費者A處理速度很快,而消費者B處理速度很慢時,這種分發策略顯然是不合理的。
不公平分發:

int prefetchCount = 1;
channel.basicQos(prefetchCount);

通過此配置,當消費者未處理完當前消息,rabbitmq會優先將該message分發給空閑消費者。

總結?

原文鏈接:https://blog.csdn.net/qq_44402069/article/details/124303944

欄目分類
最近更新