網站首頁 編程語言 正文
書接上回:RabbitMq概述與工作模式深度剖析_bingtanghulu_6的博客-CSDN博客
工作模式代碼已經發(fā)到gitee倉庫,需要的自取。代碼路徑:https://gitee.com/GengHongBo/rabbit-mqtest.git
1.?Publish/Subscribe(發(fā)布-訂閱模式)
發(fā)布訂閱模式對象:發(fā)布者,交換機,隊列,消費者1,消費者2。多個隊列綁定交換機,交換機會給每個隊列發(fā)送消息。
使用場景:up主跟粉絲訂閱消息的場景;天氣預報跟百度,微博等天氣專欄的關系。
?1.1 發(fā)布-訂閱模式-實戰(zhàn)創(chuàng)建
為了節(jié)省時間只放了一些關鍵代碼和截圖,以后的都是這樣。需要的可以去開頭的git倉庫自取。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import util.RabbitMQConstant;
import util.RabbitMQUtil;
import java.io.IOException;
import java.util.Scanner;
/**
* 發(fā)布訂閱模式:新建一個TCP長連接綁定交換機
*/
public class ExchangeTest {
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
String str = new Scanner(System.in).next();
//綁定交換機
//RabbitMQConstant.EXCHANGE_PUBSUB:交換機;queueName:隊列名稱;null:額外的設置屬性;消息內容
channel.basicPublish(RabbitMQConstant.EXCHANGE_PUBSUB,"",null,str.getBytes());
channel.close();
connection.close();
}
}
import com.rabbitmq.client.*;
import util.RabbitMQConstant;
import util.RabbitMQUtil;
import java.io.IOException;
/**
* 消費者
*/
public class Consumer1 {
public static void main(String[] args) throws Exception {
//設置連接信息
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(RabbitMQConstant.QUEUE_PUBSUB,false,false,false,null);
//隊列綁定交換機
//RabbitMQConstant.QUEUE_PUBSUB:隊列名稱;RabbitMQConstant.EXCHANGE_PUBSUB:交換機名稱;"":路由key(暫時用不到)
channel.queueBind(RabbitMQConstant.QUEUE_PUBSUB,RabbitMQConstant.EXCHANGE_PUBSUB,"");
channel.basicQos(1);//消費完一個消息再去隊列拿其他的消息
//消費消息
//"":交換機,helloworld模式不使用;queueName:隊列名稱;null:額外的設置屬性;消息內容
channel.basicConsume(RabbitMQConstant.QUEUE_PUBSUB,false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到的消息:"+new String(body));
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
}
?
?
?2.routing(路由模式)
路由模式對象:生產者,交換機,路由key,隊列,消費者1,消費2。。。,通過隊列綁定消費者并且指定唯一路由key的形式指定消費固定路由key的消息。
?2.1 routing實戰(zhàn)
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import util.RabbitMQConstant;
import util.RabbitMQUtil;
import java.util.*;
/**
* 路由模式:新建一個TCP長連接綁定交換機
*/
public class ExchangeTest {
public static void main(String[] args) throws Exception {
HashMap map = new HashMap();
map.put("key1","test1");
map.put("key2","test2");
map.put("key3","test3");
map.put("key4","test4");
map.put("key5","test5");
map.put("key6","test6");
map.put("key7","test7");
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
Iterator> iterator = map.entrySet().iterator();
while(iterator.hasNext()){
Map.Entry entry = iterator.next();
//綁定交換機
//RabbitMQConstant.EXCHANGE_PUBSUB:交換機;queueName:隊列名稱;null:額外的設置屬性;消息內容
channel.basicPublish(RabbitMQConstant.EXCHANGE_ROUTING,entry.getKey(),null,entry.getKey().getBytes());
}
channel.close();
connection.close();
}
}
import com.rabbitmq.client.*;
import util.RabbitMQConstant;
import util.RabbitMQUtil;
import java.io.IOException;
/**
* 消費者
*/
public class Consumer1 {
public static void main(String[] args) throws Exception {
//設置連接信息
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(RabbitMQConstant.QUEUE_ROUTING1,false,false,false,null);
//隊列綁定交換機
//RabbitMQConstant.QUEUE_PUBSUB:隊列名稱;RabbitMQConstant.EXCHANGE_PUBSUB:交換機名稱;"":路由key(暫時用不到)
channel.queueBind(RabbitMQConstant.QUEUE_ROUTING1,RabbitMQConstant.EXCHANGE_ROUTING,"key1");
channel.queueBind(RabbitMQConstant.QUEUE_ROUTING1,RabbitMQConstant.EXCHANGE_ROUTING,"key2");
channel.basicQos(1);//消費完一個消息再去隊列拿其他的消息
//消費消息
//"":交換機,helloworld模式不使用;queueName:隊列名稱;null:額外的設置屬性;消息內容
channel.basicConsume(RabbitMQConstant.QUEUE_ROUTING1,false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者1接收到的消息:"+new String(body));
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
}
?
?3.?Topics-通配符模式
通配符模式:與上面的類似。
應用場景:當routingkey過多時,會出現(xiàn)很多類似的代碼,使用通配符模式可以減少代碼量。#代表一個或多個詞,*代表一個詞。
?3.1 topic-通配符模式實戰(zhàn)
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import util.RabbitMQConstant;
import util.RabbitMQUtil;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
/**
* 通配符模式:新建一個TCP長連接綁定交換機
*/
public class ExchangeTest {
public static void main(String[] args) throws Exception {
HashMap map = new HashMap();
map.put("key.key1.key.key","test1");
map.put("key.key2.key.key","test2");
map.put("key.key3.key.key","test3");
map.put("com.test.key","com.test1");
map.put("com.test.key1","com.test2");
map.put("com.test.key2","com.test3");
map.put("com.test.key3","com.test4");
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
Iterator> iterator = map.entrySet().iterator();
while(iterator.hasNext()){
Map.Entry entry = iterator.next();
//綁定交換機
//RabbitMQConstant.EXCHANGE_PUBSUB:交換機;queueName:隊列名稱;null:額外的設置屬性;消息內容
channel.basicPublish(RabbitMQConstant.EXCHANGE_TOPICS,entry.getKey(),null,entry.getKey().getBytes());
}
channel.close();
connection.close();
}
}
import com.rabbitmq.client.*;
import util.RabbitMQConstant;
import util.RabbitMQUtil;
import java.io.IOException;
/**
* 消費者
*/
public class Consumer1 {
public static void main(String[] args) throws Exception {
//設置連接信息
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(RabbitMQConstant.QUEUE_ROUTING1,false,false,false,null);
//隊列綁定交換機
//RabbitMQConstant.QUEUE_PUBSUB:隊列名稱;RabbitMQConstant.EXCHANGE_PUBSUB:交換機名稱;"":路由key(暫時用不到)
channel.queueBind(RabbitMQConstant.QUEUE_ROUTING1,RabbitMQConstant.EXCHANGE_TOPICS,"key.#");
channel.basicQos(1);//消費完一個消息再去隊列拿其他的消息
//消費消息
//"":交換機,helloworld模式不使用;queueName:隊列名稱;null:額外的設置屬性;消息內容
channel.basicConsume(RabbitMQConstant.QUEUE_ROUTING1,false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者1接收到的消息:"+new String(body));
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
}
4. rabbitMq消息確認機制
當生產者傳送消息到broker以后會出現(xiàn)兩種情況:confirm和return。confirm是確定有隊列的情況下,消費者確認或者不確認的情況。return是消息到達broker以后找不到隊列而發(fā)生退回的情況。
4.1 消息確認機制-實戰(zhàn)
import com.rabbitmq.client.*;
import util.RabbitMQConstant;
import util.RabbitMQUtil;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
/**
* 消息確認機制案例
*/
public class ConfirmTest {
public static void main(String[] args) throws Exception {
HashMap map = new HashMap();
map.put("key.key1.key.key","test1");
map.put("key.key2.key.key","test2");
map.put("key.key3.key.key","test3");
map.put("com.test.key","com.test1");
map.put("com.test.key1","com.test2");
map.put("com.test.key2","com.test3");
map.put("com.test.key3","com.test4");
map.put("erro.key","com.erro");
map.put("erro.key1","com.erro1");
map.put("erro.key3","com.erro2");
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
//開啟監(jiān)聽機制
channel.confirmSelect();
//確認監(jiān)聽機制
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long l, boolean b) throws IOException {//確認返回
System.out.println("接受到消息,參數:"+l+",b:"+b);
}
@Override
public void handleNack(long l, boolean b) throws IOException {//拒絕消息
System.out.println("接受到消息拒收消息,參數:"+l+",b:"+b);
}
});
//返回監(jiān)聽機制
channel.addReturnListener(new ReturnCallback() {
@Override
public void handle(Return r) {
System.err.println("===========================");
System.err.println("Return編碼:" + r.getReplyCode() + "-Return描述:" + r.getReplyText());
System.err.println("交換機:" + r.getExchange() + "-路由key:" + r.getRoutingKey() );
System.err.println("Return主題:" + new String(r.getBody()));
System.err.println("===========================");
}
});
Iterator> iterator = map.entrySet().iterator();
while(iterator.hasNext()){
Map.Entry entry = iterator.next();
//綁定交換機
//RabbitMQConstant.EXCHANGE_PUBSUB:交換機;queueName:隊列名稱;null:額外的設置屬性;消息內容
channel.basicPublish(RabbitMQConstant.EXCHANGE_CONFIRM,entry.getKey(),null,entry.getKey().getBytes());
}
//在消息確認機制下我們不能關閉長連接
// channel.close();
// connection.close();
}
}
import com.rabbitmq.client.*;
import util.RabbitMQConstant;
import util.RabbitMQUtil;
import java.io.IOException;
/**
* 消費者
*/
public class Consumer1 {
public static void main(String[] args) throws Exception {
//設置連接信息
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(RabbitMQConstant.QUEUE_CONFIRM1,false,false,false,null);
//隊列綁定交換機
//RabbitMQConstant.QUEUE_PUBSUB:隊列名稱;RabbitMQConstant.EXCHANGE_PUBSUB:交換機名稱;"":路由key(暫時用不到)
channel.queueBind(RabbitMQConstant.QUEUE_CONFIRM1,RabbitMQConstant.EXCHANGE_CONFIRM,"key.#");
channel.basicQos(1);//消費完一個消息再去隊列拿其他的消息
//消費消息
//"":交換機,helloworld模式不使用;queueName:隊列名稱;null:額外的設置屬性;消息內容
channel.basicConsume(RabbitMQConstant.QUEUE_CONFIRM1,false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者1接收到的消息:"+new String(body));
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
}
5.spring整合rabbitMQ
分兩個工程:生產者(spring-rabbit-consumerTest),消費者(spring-rabbit-consumerTest)
生產者:添加pom.xml依賴,配置文件整合,測試案例編寫
消費者:添加pom.xml依賴,配置文件整合,增加監(jiān)聽器,啟動監(jiān)聽
原文鏈接:https://blog.csdn.net/qq_21575929/article/details/123780207
相關推薦
- 2022-11-30 Golang迭代如何在Go中循環(huán)數據結構使用詳解_Golang
- 2022-07-02 基于np.arange與np.linspace細微區(qū)別(數據溢出問題)_python
- 2023-04-01 pytorch和numpy默認浮點類型位數詳解_python
- 2022-03-06 SQLServer批量插入數據的三種方式及性能對比_C#教程
- 2022-09-23 Windows?10搭建FTP服務器圖文教程_FTP服務器
- 2022-09-07 Python?CSV?文件解析和生成方法示例_python
- 2023-07-14 echarts圖表進度條類型圖
- 2022-08-14 Android?中TextureView和SurfaceView的屬性方法及示例說明_Android
- 最近更新
-
- window11 系統(tǒng)安裝 yarn
- 超詳細win安裝深度學習環(huán)境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區(qū)別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優(yōu)雅實現(xiàn)加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發(fā)現(xiàn)-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態(tài)字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發(fā)送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支