日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學無先后,達者為師

網站首頁 編程語言 正文

.NETCore基于RabbitMQ實現延時隊列的兩方法_實用技巧

作者:我贏了算我輸 ? 更新時間: 2022-11-15 編程語言

前言

此文章用來記錄自己學習延時隊列過程的文章,并用.NET這兩種方式實現了簡單的Demo。

延時隊列的應用場景 應用下單后,30分鐘沒有支付的話,則自動取消訂單活動開始前30分鐘,提醒參賽者參加活動?;顒咏Y束后,30分鐘后提醒未進行評價的參賽人員進行評價…

上述的場景都可以使用延時隊列進行對應的處理。

上面的場景雖說可以通過定時器也可以處理,但有點浪費資源, 而上述的場景時間是不定的,例如有兩個活動需要提醒參賽者參加,一個是7點開始 ,另一個是8點開始,那么觸發處理的一個是6點半,一個是7點半。

實現延時隊列的兩種方式

使用Rabbitmq實現延時隊列可以讓消息持久化,也支持分布式

? 缺點
第一種 第一種方式的缺陷以及解決方案
第二種 這個插件的當前設計并不真正適合具有大量延遲消息(例如成百上千或數百萬)的場景。詳情信息

利用rabbitmq死信隊列x-dead-letter-exchange和x-dead-letter-routing-key

實現需要創建兩對交換機和隊列,其中需要對其中一對的隊列進行設置x-dead-letter-exchange和x-dead-letter-routing-key屬性,屬性指定轉發到另一對的交換機,

隨后實現流程圖如下:

.NETCore實現方式

項目:.NET Core 控制臺項目

install-package RabbitMQ.Client

生產者代碼:

            ConnectionFactory connectionFactory = new ConnectionFactory
            {
                UserName = "guest",
                Password = "guest",
                HostName = "127.0.0.1"
            };

            //創建連接
            var connection = connectionFactory.CreateConnection();

            //創建通道
            var channl = connection.CreateModel();

           //指定隊列的x-dead-letter-exchange和x-dead-letter-routing-key
            Dictionary<string, object> queueArgs = new Dictionary<string, object>()
            {
                { "x-dead-letter-exchange","exchange.business.test" },
                {"x-dead-letter-routing-key","businessRoutingkey" }
            };

            //延時的交換機和隊列綁定
            channl.ExchangeDeclare("exchange.business.dlx", "direct", true, false, null);
            channl.QueueDeclare("queue.business.dlx", true, false, false, queueArgs);
            channl.QueueBind("queue.business.dlx", "exchange.business.dlx", "");

            //業務的交換機和隊列綁定
            channl.ExchangeDeclare("exchange.business.test", "direct", true, false, null);
            channl.QueueDeclare("queue.business.test", true, false, false, null);
            channl.QueueBind("queue.business.test", "exchange.business.test", "businessRoutingkey", null);

            Console.WriteLine("生產者開始發送消息");
            while (true)
            {

                string message = Console.ReadLine();
                var body = Encoding.UTF8.GetBytes(message);
                var properties = channl.CreateBasicProperties();
                properties.Persistent = true;
                properties.Expiration = "5000";
                //發送一條延時5秒的消息
                channl.BasicPublish("exchange.business.dlx", "", properties, body);

            }

消費者

            ConnectionFactory connectionFactory = new ConnectionFactory
            {
                UserName = "guest",
                Password = "guest",
                HostName = "127.0.0.1"
            };

            //創建連接
            var connection = connectionFactory.CreateConnection();

            var channel = connection.CreateModel();

            EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

            //給消費時添加一個委托
            consumer.Received += (obj, ea) =>
            {
                var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                //打印消費的消息
                Console.WriteLine(message);
                channel.BasicAck(ea.DeliveryTag, false);
            };

            //消費queue.business.test隊列的消息
            channel.BasicConsume("queue.business.test", false, consumer);

            Console.ReadKey();
            channel.Dispose();
            connection.Close();

實現效果:

rabbitmq通過安裝插件的形式實現(推薦)

使用rabbitmq_delayed_message_exchange 插件提供的x-delayed-message類型的交換機

下載插件的地址:https://www.rabbitmq.com/community-plugins.html
選中rabbitmq_delayed_message_exchange插件

該插件使用只需要聲明交換機的時候,指定x-delayed-message類型,然后添加x-delayed-type參數即可

.NET Core 實現

生產者

            ConnectionFactory connectionFactory = new ConnectionFactory()
            {
                UserName = "guest",
                Password = "guest",
                HostName = "127.0.0.1"
            };

            var connection = connectionFactory.CreateConnection();

            var channel = connection.CreateModel();

            Dictionary<string, object> exchangeArgs = new Dictionary<string, object>()
            {
                {"x-delayed-type","direct" }
            };

            //指定x-delayed-message 類型的交換機,并且添加x-delayed-type屬性
            channel.ExchangeDeclare("plug.delay.exchange", "x-delayed-message", true, false, exchangeArgs);

            channel.QueueDeclare("plug.delay.queue", true, false, false, null);

            channel.QueueBind("plug.delay.queue", "plug.delay.exchange", "plugdelay");

            var properties = channel.CreateBasicProperties();
            Console.WriteLine("生產者開始發送消息");
            Dictionary<string, object> headers = new Dictionary<string, object>()
            {
                {"x-delay","5000" }
            };
            properties.Persistent = true;
            properties.Headers = headers;
            while (true)
            {

                string message = Console.ReadLine();
                var body = Encoding.UTF8.GetBytes(message);
                channel.BasicPublish("plug.delay.exchange", "plugdelay", properties, body);

            }

消費者:

            ConnectionFactory connectionFactory = new ConnectionFactory
            {
                UserName = "guest",
                Password = "guest",
                HostName = "127.0.0.1"
            };

            //創建連接
            var connection = connectionFactory.CreateConnection();

            var channel = connection.CreateModel();

            EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

            consumer.Received += (obj, ea) =>
            {
                var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                Console.WriteLine(message);
                channel.BasicAck(ea.DeliveryTag, false);
            };

            channel.BasicConsume("plug.delay.queue", false, consumer);

            Console.ReadKey();
            channel.Dispose();
            connection.Close();

實現效果:

第一種方式的缺陷以及解決方案

如果存在A、B消息進入了隊列中,A在前,B在后,如果B消息的過期時間比A的過期時間要早,消費的時候,并不會先消費B,再消費A,而是B會等A先消費,即使A要晚過期

舉例

生產者代碼修改成如下:

            ConnectionFactory connectionFactory = new ConnectionFactory
            {
                UserName = "guest",
                Password = "guest",
                HostName = "127.0.0.1"
            };

            //創建連接
            var connection = connectionFactory.CreateConnection();

            //創建通道
            var channl = connection.CreateModel();

            Dictionary<string, object> queueArgs = new Dictionary<string, object>()
            {
                { "x-dead-letter-exchange","exchange.business.test" },
                {"x-dead-letter-routing-key","businessRoutingkey" }
            };

            //延時的交換機和隊列綁定
            channl.ExchangeDeclare("exchange.business.dlx", "direct", true, false, null);
            channl.QueueDeclare("queue.business.dlx", true, false, false, queueArgs);
            channl.QueueBind("queue.business.dlx", "exchange.business.dlx", "");

            //業務的交換機和隊列綁定
            channl.ExchangeDeclare("exchange.business.test", "direct", true, false, null);
            channl.QueueDeclare("queue.business.test", true, false, false, null);
            channl.QueueBind("queue.business.test", "exchange.business.test", "businessRoutingkey", null);

            string message1 = "Hello Word!1";
            string message2 = "Hello Word!2";
            var body1 = Encoding.UTF8.GetBytes(message1);
            var body2 = Encoding.UTF8.GetBytes(message2);
            var properties = channl.CreateBasicProperties();
            properties.Persistent = true;
            //先發送過期時間5秒的消息
            properties.Expiration = "5000";
            channl.BasicPublish("exchange.business.dlx", "", properties, body2);

            //再發送過期時間3秒的消息
            properties.Expiration = "3000";
            channl.BasicPublish("exchange.business.dlx", "", properties, body1);

結果:

這里先發了延時20秒的A消息,然后又發了延時10秒的B消息,但是最終結果并不是先消費了B消息,而是等A消息過期后,立刻再去消費B。

這個會影響什么業務呢?好比兩個C、D活動,C活動開始時間是7點,D活動開始時間是5點,那么D活動提醒需要等到C活動提醒后,才會立刻提醒,這明顯不符合我們的業務需求。

解決方案 每個活動都是單獨的創建自己的交換機和隊列使用第二種實現方式,即使用插件的形式。

第一種不太現實,因為如果活動多的話,則會創建很多的隊列,而且只會使用一次。

業務上還是推薦使用插件的實現方式。

第二種方式的效果

github地址:

https://github.com/MDZZ3/RabbitmqDelay

原文鏈接:https://blog.csdn.net/MDZZ666/article/details/120811287

欄目分類
最近更新