網站首頁 編程語言 正文
前言
此文章用來記錄自己學習延時隊列過程的文章,并用.NET這兩種方式實現了簡單的Demo。
延時隊列的應用場景 應用下單后,30分鐘沒有支付的話,則自動取消訂單活動開始前30分鐘,提醒參賽者參加活動?;顒咏Y束后,30分鐘后提醒未進行評價的參賽人員進行評價…
上述的場景都可以使用延時隊列進行對應的處理。
上面的場景雖說可以通過定時器也可以處理,但有點浪費資源, 而上述的場景時間是不定的,例如有兩個活動需要提醒參賽者參加,一個是7點開始 ,另一個是8點開始,那么觸發處理的一個是6點半,一個是7點半。
實現延時隊列的兩種方式
使用Rabbitmq實現延時隊列可以讓消息持久化,也支持分布式
? | 缺點 |
---|---|
第一種 | 第一種方式的缺陷以及解決方案 |
第二種 | 這個插件的當前設計并不真正適合具有大量延遲消息(例如成百上千或數百萬)的場景。詳情信息 |
利用rabbitmq死信隊列x-dead-letter-exchange和x-dead-letter-routing-key
實現需要創建兩對交換機和隊列,其中需要對其中一對的隊列進行設置x-dead-letter-exchange和x-dead-letter-routing-key屬性,屬性指定轉發到另一對的交換機,
隨后實現流程圖如下:
.NETCore實現方式
項目:.NET Core 控制臺項目
install-package RabbitMQ.Client
生產者代碼:
ConnectionFactory connectionFactory = new ConnectionFactory { UserName = "guest", Password = "guest", HostName = "127.0.0.1" }; //創建連接 var connection = connectionFactory.CreateConnection(); //創建通道 var channl = connection.CreateModel(); //指定隊列的x-dead-letter-exchange和x-dead-letter-routing-key Dictionary<string, object> queueArgs = new Dictionary<string, object>() { { "x-dead-letter-exchange","exchange.business.test" }, {"x-dead-letter-routing-key","businessRoutingkey" } }; //延時的交換機和隊列綁定 channl.ExchangeDeclare("exchange.business.dlx", "direct", true, false, null); channl.QueueDeclare("queue.business.dlx", true, false, false, queueArgs); channl.QueueBind("queue.business.dlx", "exchange.business.dlx", ""); //業務的交換機和隊列綁定 channl.ExchangeDeclare("exchange.business.test", "direct", true, false, null); channl.QueueDeclare("queue.business.test", true, false, false, null); channl.QueueBind("queue.business.test", "exchange.business.test", "businessRoutingkey", null); Console.WriteLine("生產者開始發送消息"); while (true) { string message = Console.ReadLine(); var body = Encoding.UTF8.GetBytes(message); var properties = channl.CreateBasicProperties(); properties.Persistent = true; properties.Expiration = "5000"; //發送一條延時5秒的消息 channl.BasicPublish("exchange.business.dlx", "", properties, body); }
消費者
ConnectionFactory connectionFactory = new ConnectionFactory { UserName = "guest", Password = "guest", HostName = "127.0.0.1" }; //創建連接 var connection = connectionFactory.CreateConnection(); var channel = connection.CreateModel(); EventingBasicConsumer consumer = new EventingBasicConsumer(channel); //給消費時添加一個委托 consumer.Received += (obj, ea) => { var message = Encoding.UTF8.GetString(ea.Body.ToArray()); //打印消費的消息 Console.WriteLine(message); channel.BasicAck(ea.DeliveryTag, false); }; //消費queue.business.test隊列的消息 channel.BasicConsume("queue.business.test", false, consumer); Console.ReadKey(); channel.Dispose(); connection.Close();
實現效果:
rabbitmq通過安裝插件的形式實現(推薦)
使用rabbitmq_delayed_message_exchange
插件提供的x-delayed-message
類型的交換機
下載插件的地址:https://www.rabbitmq.com/community-plugins.html
選中rabbitmq_delayed_message_exchange插件
該插件使用只需要聲明交換機的時候,指定x-delayed-message
類型,然后添加x-delayed-type
參數即可
.NET Core 實現
生產者
ConnectionFactory connectionFactory = new ConnectionFactory() { UserName = "guest", Password = "guest", HostName = "127.0.0.1" }; var connection = connectionFactory.CreateConnection(); var channel = connection.CreateModel(); Dictionary<string, object> exchangeArgs = new Dictionary<string, object>() { {"x-delayed-type","direct" } }; //指定x-delayed-message 類型的交換機,并且添加x-delayed-type屬性 channel.ExchangeDeclare("plug.delay.exchange", "x-delayed-message", true, false, exchangeArgs); channel.QueueDeclare("plug.delay.queue", true, false, false, null); channel.QueueBind("plug.delay.queue", "plug.delay.exchange", "plugdelay"); var properties = channel.CreateBasicProperties(); Console.WriteLine("生產者開始發送消息"); Dictionary<string, object> headers = new Dictionary<string, object>() { {"x-delay","5000" } }; properties.Persistent = true; properties.Headers = headers; while (true) { string message = Console.ReadLine(); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish("plug.delay.exchange", "plugdelay", properties, body); }
消費者:
ConnectionFactory connectionFactory = new ConnectionFactory { UserName = "guest", Password = "guest", HostName = "127.0.0.1" }; //創建連接 var connection = connectionFactory.CreateConnection(); var channel = connection.CreateModel(); EventingBasicConsumer consumer = new EventingBasicConsumer(channel); consumer.Received += (obj, ea) => { var message = Encoding.UTF8.GetString(ea.Body.ToArray()); Console.WriteLine(message); channel.BasicAck(ea.DeliveryTag, false); }; channel.BasicConsume("plug.delay.queue", false, consumer); Console.ReadKey(); channel.Dispose(); connection.Close();
實現效果:
第一種方式的缺陷以及解決方案
如果存在A、B消息進入了隊列中,A在前,B在后,如果B消息的過期時間比A的過期時間要早,消費的時候,并不會先消費B,再消費A,而是B會等A先消費,即使A要晚過期
舉例
生產者代碼修改成如下:
ConnectionFactory connectionFactory = new ConnectionFactory { UserName = "guest", Password = "guest", HostName = "127.0.0.1" }; //創建連接 var connection = connectionFactory.CreateConnection(); //創建通道 var channl = connection.CreateModel(); Dictionary<string, object> queueArgs = new Dictionary<string, object>() { { "x-dead-letter-exchange","exchange.business.test" }, {"x-dead-letter-routing-key","businessRoutingkey" } }; //延時的交換機和隊列綁定 channl.ExchangeDeclare("exchange.business.dlx", "direct", true, false, null); channl.QueueDeclare("queue.business.dlx", true, false, false, queueArgs); channl.QueueBind("queue.business.dlx", "exchange.business.dlx", ""); //業務的交換機和隊列綁定 channl.ExchangeDeclare("exchange.business.test", "direct", true, false, null); channl.QueueDeclare("queue.business.test", true, false, false, null); channl.QueueBind("queue.business.test", "exchange.business.test", "businessRoutingkey", null); string message1 = "Hello Word!1"; string message2 = "Hello Word!2"; var body1 = Encoding.UTF8.GetBytes(message1); var body2 = Encoding.UTF8.GetBytes(message2); var properties = channl.CreateBasicProperties(); properties.Persistent = true; //先發送過期時間5秒的消息 properties.Expiration = "5000"; channl.BasicPublish("exchange.business.dlx", "", properties, body2); //再發送過期時間3秒的消息 properties.Expiration = "3000"; channl.BasicPublish("exchange.business.dlx", "", properties, body1);
結果:
這里先發了延時20秒的A消息,然后又發了延時10秒的B消息,但是最終結果并不是先消費了B消息,而是等A消息過期后,立刻再去消費B。
這個會影響什么業務呢?好比兩個C、D活動,C活動開始時間是7點,D活動開始時間是5點,那么D活動提醒需要等到C活動提醒后,才會立刻提醒,這明顯不符合我們的業務需求。
解決方案 每個活動都是單獨的創建自己的交換機和隊列使用第二種實現方式,即使用插件的形式。
第一種不太現實,因為如果活動多的話,則會創建很多的隊列,而且只會使用一次。
業務上還是推薦使用插件的實現方式。
第二種方式的效果
github地址:
https://github.com/MDZZ3/RabbitmqDelay
原文鏈接:https://blog.csdn.net/MDZZ666/article/details/120811287
相關推薦
- 2022-09-15 C#?List的賦值問題的解決_C#教程
- 2023-06-04 pandas.DataFrame中提取特定類型dtype的列_python
- 2022-01-30 取消radio的選中狀態
- 2022-06-29 Oracle中的常用函數詳解_oracle
- 2022-09-07 Android組件化、插件化詳細講解_Android
- 2022-07-23 Python實現環形鏈表_python
- 2022-08-15 ArrayList和LinkedList和Vector的區別
- 2024-07-14 關于Module中在junit測試方法和非測試方法中獲取相對路徑不一致的問題
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支