網(wǎng)站首頁(yè) 編程語(yǔ)言 正文
Docker啟動(dòng)RabbitMQ實(shí)現(xiàn)生產(chǎn)者與消費(fèi)者的詳細(xì)過(guò)程_docker
作者:zoeil ? 更新時(shí)間: 2023-06-04 編程語(yǔ)言一、Docker拉取鏡像并啟動(dòng)RabbitMQ
拉取鏡像
docker pull rabbitmq:3.8.8-management
查看鏡像
docker images rabbitmq
?啟動(dòng)鏡像
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.8.8-management
Linux虛擬機(jī)記得開放5672端口或者關(guān)閉防火墻,在window通過(guò) 主機(jī)ip:15672 訪問(wèn)rabbitmq控制臺(tái)
?用戶名密碼默認(rèn)為guest
二、Hello World
(一)依賴導(dǎo)入
<!--指定 jdk 編譯版本-->
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<!--rabbitmq 依賴客戶端-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
<!--操作文件流的一個(gè)依賴-->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
</dependencies>
(二)消息生產(chǎn)者
工作原理
- Broker:接收和分發(fā)消息的應(yīng)用,RabbitMQ Server 就是 Message Broker
- Connection:publisher/consumer 和 broker 之間的 TCP 連接
- Channel:如果每一次訪問(wèn) RabbitMQ 都建立一個(gè) Connection,在消息量大的時(shí)候建立 TCP Connection 的開銷將是巨大的,效率也較低。Channel 是在 connection 內(nèi)部建立的邏輯連接,如果應(yīng)用程序支持多線程,通常每個(gè) thread 創(chuàng)建單獨(dú)的 channel 進(jìn)行通訊,AMQP method 包含了 channel id 幫助客戶端和 message broker 識(shí)別 channel,所以 channel 之間是完全隔離的。Channel 作為輕量級(jí)的 Connection 極大減少了操作系統(tǒng)建立 TCP connection 的開銷
- Exchange:message 到達(dá) broker 的第一站,根據(jù)分發(fā)規(guī)則,匹配查詢表中的 routing key,分發(fā)消息到 queue 中去。常用的類型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
- Queue:消息最終被送到這里等待 consumer 取走
我們需要先獲取連接(Connection),然后通過(guò)連接獲取信道(Channel),這里我們演示簡(jiǎn)單例子,可以直接跳過(guò)交換機(jī)(Exchange)發(fā)送隊(duì)列(Queue)
public class Producer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
//創(chuàng)建一個(gè)連接工廠
ConnectionFactory factory = new ConnectionFactory();
// 設(shè)置主機(jī)ip
factory.setHost("182.92.234.71");
// 設(shè)置用戶名
factory.setUsername("guest");
// 設(shè)置密碼
factory.setPassword("guest");
//channel 實(shí)現(xiàn)了自動(dòng) close 接口 自動(dòng)關(guān)閉 不需要顯示關(guān)閉
Connection connection = factory.newConnection();
// 獲取信道
Channel channel = connection.createChannel();
/*
* 生成一個(gè)隊(duì)列
* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments)
* 1.隊(duì)列名稱
* 2.隊(duì)列里面的消息是否持久化 默認(rèn)消息存儲(chǔ)在內(nèi)存中
* 3.該隊(duì)列是否只供一個(gè)消費(fèi)者進(jìn)行消費(fèi) 是否進(jìn)行共享 true 可以多個(gè)消費(fèi)者消費(fèi)
* 4.是否自動(dòng)刪除 最后一個(gè)消費(fèi)者端開連接以后 該隊(duì)列是否自動(dòng)刪除 true 自動(dòng)刪除
* 5.其他參數(shù)
**/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "hello rabbitmq";
/*
* 發(fā)送一個(gè)消息
* basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
* 1.發(fā)送到哪個(gè)交換機(jī)
* 2.路由的key是哪個(gè)
* 3.其他的參數(shù)信息
* 4.發(fā)送消息的消息體
*
**/
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("發(fā)送成功");
}
}
(三)消息消費(fèi)者
public class Consumer {
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
//創(chuàng)建一個(gè)連接工廠
ConnectionFactory factory = new ConnectionFactory();
// 設(shè)置主機(jī)ip
factory.setHost("182.92.234.71");
// 設(shè)置用戶名
factory.setUsername("guest");
// 設(shè)置密碼
factory.setPassword("guest");
//channel 實(shí)現(xiàn)了自動(dòng) close 接口 自動(dòng)關(guān)閉 不需要顯示關(guān)閉
Connection connection = factory.newConnection();
// 獲取信道
Channel channel = connection.createChannel();
// 推送的消息如何進(jìn)行消費(fèi)的回調(diào)接口
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println(new String(message.getBody()));
};
// 取消消費(fèi)的一個(gè)回調(diào)接口,如在消費(fèi)的時(shí)候隊(duì)列被刪除了
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("消息消費(fèi)被中斷");
};
/*
* 消費(fèi)者消費(fèi)消息
* basicConsume(String queue, boolean autoAck,
* DeliverCallback deliverCallback, CancelCallback cancelCallback)
* 1.消費(fèi)哪個(gè)隊(duì)列
* 2.消費(fèi)成功之后是否要自動(dòng)應(yīng)答 true 代表自動(dòng)應(yīng)答 false 手動(dòng)應(yīng)答
* 3.消費(fèi)者未成功消費(fèi)的回調(diào)
**/
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}
三、實(shí)現(xiàn)輪訓(xùn)分發(fā)消息
(一)抽取工具類
可以發(fā)現(xiàn),上面獲取連接工廠,然后獲取連接,再獲取信道的步驟是一致的,我們可以抽取成一個(gè)工具類來(lái)調(diào)用,并使用單例模式-餓漢式完成信道的初始化
public class RabbitMqUtils {
private static Channel channel;
static {
ConnectionFactory factory = new ConnectionFactory();
// 設(shè)置ip地址
factory.setHost("192.168.23.100");
// 設(shè)置用戶名
factory.setUsername("guest");
// 設(shè)置密碼
factory.setPassword("guest");
try {
// 創(chuàng)建連接
Connection connection = factory.newConnection();
// 獲取信道
channel = connection.createChannel();
} catch (Exception e) {
System.out.println("創(chuàng)建信道失敗,錯(cuò)誤信息:" + e.getMessage());
}
}
public static Channel getChannel() {
return channel;
}
}
(二)啟動(dòng)兩個(gè)工作線程
相當(dāng)于前面的消費(fèi)者,我們只需要寫一個(gè)類,通過(guò)ideal實(shí)現(xiàn)多線程啟動(dòng)即可模擬兩個(gè)線程
public class Worker01 {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
DeliverCallback deliverCallback = ( consumerTag, message) -> {
System.out.println("接受到消息:" + new String(message.getBody()));
};
CancelCallback cancelCallback = (cunsumerTag) -> {
System.out.println("消費(fèi)者取消消費(fèi)接口回調(diào)邏輯");
};
// 啟動(dòng)兩次,第一次為C1, 第二次為C2
System.out.println("C2消費(fèi)者等待消費(fèi)消息");
channel.basicConsume(QUEUE_NAME, true, deliverCallback,cancelCallback);
}
}
(三)啟動(dòng)發(fā)送線程
public class Test01 {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException {
Channel channel = RabbitMqUtils.getChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 通過(guò)控制臺(tái)輸入充當(dāng)消息,使輪訓(xùn)演示更明顯
Scanner scanner = new Scanner(System.in);
while(scanner.hasNext()) {
String message = scanner.next();
channel.basicPublish("", QUEUE_NAME,null, message.getBytes() );
System.out.println("消息發(fā)送完成:" + message);
}
}
}
結(jié)果?
四、實(shí)現(xiàn)手動(dòng)應(yīng)答
(一)消息應(yīng)答概念
消費(fèi)者完成一個(gè)任務(wù)可能需要一段時(shí)間,如果其中一個(gè)消費(fèi)者處理一個(gè)長(zhǎng)的任務(wù)并僅只完成 了部分突然它掛掉了,會(huì)發(fā)生什么情況。RabbitMQ 一旦向消費(fèi)者傳遞了一條消息,便立即將該消 息標(biāo)記為刪除。在這種情況下,突然有個(gè)消費(fèi)者掛掉了,我們將丟失正在處理的消息。以及后續(xù) 發(fā)送給該消費(fèi)這的消息,因?yàn)樗鼰o(wú)法接收到。 為了保證消息在發(fā)送過(guò)程中不丟失,rabbitmq 引入消息應(yīng)答機(jī)制,消息應(yīng)答就是: 消費(fèi)者在接 收到消息并且處理該消息之后,告訴 rabbitmq 它已經(jīng)處理了,rabbitmq 可以把該消息刪除了。
自動(dòng)應(yīng)答:消費(fèi)者發(fā)送后立即被認(rèn)為已經(jīng)傳送成功。這種模式需要在高吞吐量和數(shù)據(jù)傳輸安全性方面做權(quán)衡,因?yàn)檫@種模式如果消息在接收到之前,消費(fèi)者那邊出現(xiàn)連接或者 channel 關(guān)閉,那么消息就丟失了。
當(dāng)然另一方面這種模式消費(fèi)者那邊可以傳遞過(guò)載的消息, 沒有對(duì)傳遞的消息數(shù)量進(jìn)行限制 , 當(dāng)然這樣有可能使得消費(fèi)者這邊由于接收太多還來(lái)不及處理的消息,導(dǎo)致這些消息的積壓,最終 使得內(nèi)存耗盡,最終這些消費(fèi)者線程被操作系統(tǒng)殺死,所以這種模式僅適用在消費(fèi)者可以高效并 以某種速率能夠處理這些消息的情況下使用 。
手動(dòng)應(yīng)答:消費(fèi)者接受到消息并順利完成業(yè)務(wù)后再調(diào)用方法進(jìn)行確認(rèn),rabbitmq 才可以把該消息刪除
(二)消息應(yīng)答的方法
- Channel.basicAck(用于肯定確認(rèn))
- RabbitMQ 已知道該消息并且成功的處理消息,可以將其丟棄了
- Channel.basicNack(用于否定確認(rèn))
- Channel.basicReject(用于否定確認(rèn))
- 與 Channel.basicNack 相比少一個(gè)參數(shù)Multiple
- multiple 的 true 和 false 代表不同意思
? ? ? ? true 代表批量應(yīng)答 channel 上未應(yīng)答的消息
? ? ? ? 比如說(shuō) channel 上有傳送 tag 的消息 5,6,7,8 當(dāng)前 tag 是 8 那么此時(shí)
? ? ? ? 5-8 的這些還未應(yīng)答的消息都會(huì)被確認(rèn)收到消息應(yīng)答
? ? ? ? false 同上面相比
? ? ? ? 只會(huì)應(yīng)答 tag=8 的消息 5,6,7 這三個(gè)消息依然不會(huì)被確認(rèn)收到消息應(yīng)答
- 不處理該消息了直接拒絕,可以將其丟棄了
(三)消息自動(dòng)重新入隊(duì)?
如果消費(fèi)者由于某些原因失去連接(其通道已關(guān)閉,連接已關(guān)閉或 TCP 連接丟失),導(dǎo)致消息未發(fā)送 ACK 確認(rèn),RabbitMQ 將了解到消息未完全處理,并將對(duì)其重新排隊(duì)。如果此時(shí)其他消費(fèi)者可以處理,它將很快將其重新分發(fā)給另一個(gè)消費(fèi)者。這樣,即使某個(gè)消費(fèi)者偶爾死亡,也可以確 保不會(huì)丟失任何消息。
(四)消息手動(dòng)應(yīng)答代碼?
1、生產(chǎn)者
public class Test01 {
private final static String QUEUE_NAME = "ack";
public static void main(String[] args) throws IOException {
Channel channel = RabbitMqUtils.getChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
Scanner scanner = new Scanner(System.in);
while(scanner.hasNext()) {
String message = scanner.next();
channel.basicPublish("", QUEUE_NAME,null, message.getBytes() );
System.out.println("消息發(fā)送完成:" + message);
}
}
}
2、睡眠工具類模擬業(yè)務(wù)執(zhí)行
public class SleepUtils {
public static void sleep(int second) {
try {
Thread.sleep(1000 * second);
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
3、消費(fèi)者
public class Worker01 {
private final static String QUEUE_NAME = "ack";
public static void main(String[] args) throws Exception {
System.out.println("C1,業(yè)務(wù)時(shí)間短");
Channel channel = RabbitMqUtils.getChannel();
DeliverCallback deliverCallback = ( consumerTag, message) -> {
SleepUtils.sleep(1); // 模擬業(yè)務(wù)執(zhí)行1秒
System.out.println("接受到消息:" + new String(message.getBody()));
/*
* 1、消息標(biāo)識(shí)
* 2、是否啟動(dòng)批量確認(rèn),false:否。
* 啟用批量有可能造成消息丟失,比如未消費(fèi)的消息提前被確然刪除,后面業(yè)務(wù)消費(fèi)該消息
* 時(shí)出現(xiàn)異常會(huì)導(dǎo)致該消息的丟失
*/
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};
CancelCallback cancelCallback = (cunsumerTag) -> {
System.out.println("消費(fèi)者取消消費(fèi)接口回調(diào)邏輯");
};
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback,cancelCallback);
}
}
==============================================================================
public class Worker02 {
private final static String QUEUE_NAME = "ack";
public static void main(String[] args) throws Exception {
System.out.println("C2,業(yè)務(wù)時(shí)間長(zhǎng)");
Channel channel = RabbitMqUtils.getChannel();
DeliverCallback deliverCallback = ( consumerTag, message) -> {
SleepUtils.sleep(15); // 模擬業(yè)務(wù)執(zhí)行15秒
System.out.println("接受到消息:" + new String(message.getBody()));
/*
* 1、消息標(biāo)識(shí)
* 2、是否啟動(dòng)批量確認(rèn),false:否。
* 啟用批量有可能造成消息丟失,比如未消費(fèi)的消息提前被確然刪除,后面業(yè)務(wù)消費(fèi)該消息
* 時(shí)出現(xiàn)異常會(huì)導(dǎo)致該消息的丟失
*/
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};
CancelCallback cancelCallback = (cunsumerTag) -> {
System.out.println("消費(fèi)者取消消費(fèi)接口回調(diào)邏輯");
};
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback,cancelCallback);
}
}
worker01業(yè)務(wù)時(shí)間短,worker02業(yè)務(wù)時(shí)間長(zhǎng),我們提前終止worker02模擬出異常,可以看到消息dd會(huì)被放回隊(duì)列由worker01接收處理。
注意:這里需要先啟動(dòng)生產(chǎn)者聲明隊(duì)列ack,不然啟動(dòng)消費(fèi)者會(huì)報(bào)錯(cuò)
最后一個(gè)案例我們可以看到消息輪訓(xùn)+消息自動(dòng)重新入隊(duì)+手動(dòng)應(yīng)答。
原文鏈接:https://blog.csdn.net/m0_62946761/article/details/129168455
- 上一篇:沒有了
- 下一篇:沒有了
相關(guān)推薦
- 2022-08-16 C/C++函數(shù)的調(diào)用約定的使用_C 語(yǔ)言
- 2022-07-16 構(gòu)建npm配置包
- 2022-10-25 C++中命名空間(namespace)詳解及其作用介紹_C 語(yǔ)言
- 2022-06-02 slf4j Logger使用{}占位符輸出日志
- 2023-04-07 React?Mobx狀態(tài)管理工具的使用_React
- 2024-03-07 SpringAOP對(duì)獲取Bean的影響理解
- 2022-09-27 React?Native?中限制導(dǎo)入某些組件和模塊的方法_React
- 2022-01-30 判斷element Ui 表格(el-table)中復(fù)選框中的選中狀態(tài)
- 欄目分類
-
- 最近更新
-
- window11 系統(tǒng)安裝 yarn
- 超詳細(xì)win安裝深度學(xué)習(xí)環(huán)境2025年最新版(
- Linux 中運(yùn)行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲(chǔ)小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎(chǔ)操作-- 運(yùn)算符,流程控制 Flo
- 1. Int 和Integer 的區(qū)別,Jav
- spring @retryable不生效的一種
- Spring Security之認(rèn)證信息的處理
- Spring Security之認(rèn)證過(guò)濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權(quán)
- redisson分布式鎖中waittime的設(shè)
- maven:解決release錯(cuò)誤:Artif
- restTemplate使用總結(jié)
- Spring Security之安全異常處理
- MybatisPlus優(yōu)雅實(shí)現(xiàn)加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務(wù)發(fā)現(xiàn)-Nac
- Spring Security之基于HttpR
- Redis 底層數(shù)據(jù)結(jié)構(gòu)-簡(jiǎn)單動(dòng)態(tài)字符串(SD
- arthas操作spring被代理目標(biāo)對(duì)象命令
- Spring中的單例模式應(yīng)用詳解
- 聊聊消息隊(duì)列,發(fā)送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠(yuǎn)程分支