網站首頁 編程語言 正文
搜索rabbitmq鏡像
docker search rabbitmq:management
下載鏡像
docker pull rabbitmq:management
啟動容器
docker run -d --hostname localhost --name rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq:management
打印容器
docker logs rabbitmq
訪問RabbitMQ Management
http://localhost:15672
賬戶密碼默認:guest
編寫生產者類
package com.xun.rabbitmqdemo.example;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
/**
* 生成一個queue隊列
* 1、隊列名稱 QUEUE_NAME
* 2、隊列里面的消息是否持久化(默認消息存儲在內存中)
* 3、該隊列是否只供一個Consumer消費 是否共享 設置為true可以多個消費者消費
* 4、是否自動刪除 最后一個消費者斷開連接后 該隊列是否自動刪除
* 5、其他參數
*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
String message = "Hello world!";
/**
* 發送一個消息
* 1、發送到哪個exchange交換機
* 2、路由的key
* 3、其他的參數信息
* 4、消息體
*/
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println(" [x] Sent '"+message+"'");
channel.close();
connection.close();
}
}
運行該方法,可以看到控制臺的打印
name=hello的隊列收到Message
消費者
package com.xun.rabbitmqdemo.example;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Receiver {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setConnectionTimeout(600000);//milliseconds
factory.setRequestedHeartbeat(60);//seconds
factory.setHandshakeTimeout(6000);//milliseconds
factory.setRequestedChannelMax(5);
factory.setNetworkRecoveryInterval(500);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
System.out.println("Waiting for messages. ");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(QUEUE_NAME,true,consumer);
}
}
工作隊列
RabbitMqUtils工具類
package com.xun.rabbitmqdemo.utils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class RabbitMqUtils {
public static Channel getChannel() throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
return channel;
}
}
啟動2個工作線程
package com.xun.rabbitmqdemo.workQueue;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
public class Work01 {
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChannel();
DeliverCallback deliverCallback = (consumerTag,delivery)->{
String receivedMessage = new String(delivery.getBody());
System.out.println("接收消息:"+receivedMessage);
};
CancelCallback cancelCallback = (consumerTag)->{
System.out.println(consumerTag+"消費者取消消費接口回調邏輯");
};
System.out.println("C1 消費者啟動等待消費....");
/**
* 消費者消費消息
* 1、消費哪個隊列
* 2、消費成功后是否自動應答
* 3、消費的接口回調
* 4、消費未成功的接口回調
*/
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
package com.xun.rabbitmqdemo.workQueue;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
public class Work02 {
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChannel();
DeliverCallback deliverCallback = (consumerTag,delivery)->{
String receivedMessage = new String(delivery.getBody());
System.out.println("接收消息:"+receivedMessage);
};
CancelCallback cancelCallback = (consumerTag)->{
System.out.println(consumerTag+"消費者取消消費接口回調邏輯");
};
System.out.println("C2 消費者啟動等待消費....");
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
啟動工作線程
啟動發送線程
package com.xun.rabbitmqdemo.workQueue;
import com.rabbitmq.client.Channel;
import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
import java.util.Scanner;
public class Task01 {
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception{
try(Channel channel= RabbitMqUtils.getChannel();){
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//從控制臺接收消息
Scanner scanner = new Scanner(System.in);
while(scanner.hasNext()){
String message = scanner.next();
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("發送消息完成:"+message);
}
}
}
}
啟動發送線程,此時發送線程等待鍵盤輸入
發送4個消息
可以看到2個工作線程按照順序分別接收message。
消息應答機制
rabbitmq將message發送給消費者后,就會將該消息標記為刪除。
但消費者在處理message過程中宕機,會導致消息的丟失。
因此需要設置手動應答。
生產者
import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
import java.util.Scanner;
public class Task02 {
private static final String TASK_QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws Exception{
try(Channel channel = RabbitMqUtils.getChannel()){
channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);
Scanner scanner = new Scanner(System.in);
System.out.println("請輸入信息");
while(scanner.hasNext()){
String message = scanner.nextLine();
channel.basicPublish("",TASK_QUEUE_NAME,null,message.getBytes());
System.out.println("生產者task02發出消息"+ message);
}
}
}
}
消費者
package com.xun.rabbitmqdemo.workQueue;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
import com.xun.rabbitmqdemo.utils.SleepUtils;
public class Work03 {
private static final String ACK_QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChannel();
System.out.println("Work03 等待接收消息處理時間較短");
DeliverCallback deliverCallback = (consumerTag,delivery)->{
String message = new String(delivery.getBody());
SleepUtils.sleep(1);
System.out.println("接收到消息:"+message);
/**
* 1、消息的標記tag
* 2、是否批量應答
*/
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
};
CancelCallback cancelCallback = (consumerTag)->{
System.out.println(consumerTag+"消費者取消消費接口回調邏輯");
};
//采用手動應答
boolean autoAck = false;
channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
}
}
package com.xun.rabbitmqdemo.workQueue;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
import com.xun.rabbitmqdemo.utils.SleepUtils;
public class Work04 {
private static final String ACK_QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChannel();
System.out.println("Work04 等待接收消息處理時間較長");
DeliverCallback deliverCallback = (consumerTag,delivery)->{
String message = new String(delivery.getBody());
SleepUtils.sleep(30);
System.out.println("接收到消息:"+message);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
};
CancelCallback cancelCallback = (consumerTag)->{
System.out.println(consumerTag+"消費者取消消費接口回調邏輯");
};
//采用手動應答
boolean autoAck = false;
channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
}
}
工具類SleepUtils
package com.xun.rabbitmqdemo.utils;
public class SleepUtils {
public static void sleep(int second){
try{
Thread.sleep(1000*second);
}catch (InterruptedException _ignored){
Thread.currentThread().interrupt();
}
}
}
模擬
work04等待30s后發出ack
在work04處理message時手動停止線程,可以看到message:dd被rabbitmq交給了work03
不公平分發
上面的輪詢分發,生產者依次向消費者按順序發送消息,但當消費者A處理速度很快,而消費者B處理速度很慢時,這種分發策略顯然是不合理的。
不公平分發:
int prefetchCount = 1;
channel.basicQos(prefetchCount);
通過此配置,當消費者未處理完當前消息,rabbitmq會優先將該message分發給空閑消費者。
總結?
原文鏈接:https://blog.csdn.net/qq_44402069/article/details/124303944
相關推薦
- 2023-08-15 (el-Form)操作(不使用 ts):Element-plus 中 Form 表單組件校驗規則等的
- 2023-05-22 python使用ctypes調用第三方庫時出現undefined?symbol錯誤詳解_python
- 2022-10-07 Pandas數據分析固定時間點和時間差_python
- 2022-08-15 gitlab代碼合并(merge request )取消 [默認刪除分支(Delete source
- 2022-07-03 C#并行編程之信號量_C#教程
- 2022-11-06 Golang安裝和使用protocol-buffer流程介紹_Golang
- 2023-06-03 C#利用后綴表達式解析計算字符串公式_C#教程
- 2022-03-16 .Net?6簡介并和之前版本寫法做對比_基礎應用
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支