網站首頁 編程語言 正文
一、簡介:
RabbitMq 是實現了高級消息隊列協議(AMQP)的開源消息代理中間件。消息隊列是一種應用程序對應用程序的通行方式,應用程序通過寫消息,將消息傳遞于隊列,由另一應用程序讀取 完成通信。而作為中間件的 RabbitMq 無疑是目前最流行的消息隊列之一。
RabbitMq 應用場景廣泛:
- 系統的高可用:日常生活當中各種商城秒殺,高流量,高并發的場景。當服務器接收到如此大量請求處理業務時,有宕機的風險。某些業務可能極其復雜,但這部分不是高時效性,不需要立即反饋給用戶,我們可以將這部分處理請求拋給隊列,讓程序后置去處理,減輕服務器在高并發場景下的壓力。
- 分布式系統,集成系統,子系統之間的對接,以及架構設計中常常需要考慮消息隊列的應用。
二、RabbitMq 生產和消費
生產者(producter):隊列消息的產生者,負責生產消息,并將消息傳入隊列
import pika
import json
credentials = pika.PlainCredentials('shampoo', '123456') # mq用戶名和密碼
# 虛擬隊列需要指定參數 virtual_host,如果是默認的可以不填。
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials))
channel=connection.channel()
# 聲明消息隊列,消息將在這個隊列傳遞,如不存在,則創建
result = channel.queue_declare(queue = 'python-test')
for i in range(10):
message=json.dumps({'OrderId':"1000%s"%i})
# 向隊列插入數值 routing_key是隊列名
channel.basic_publish(exchange = '',routing_key = 'python-test',body = message)
print(message)
connection.close()
消費者(consumer):隊列消息的接收者,負責 接收并處理 消息隊列中的消息
import pika
credentials = pika.PlainCredentials('shampoo', '123456')
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials))
channel = connection.channel()
# 申明消息隊列,消息在這個隊列傳遞,如果不存在,則創建隊列
channel.queue_declare(queue = 'python-test', durable = False)
# 定義一個回調函數來處理消息隊列中的消息,這里是打印出來
def callback(ch, method, properties, body):
ch.basic_ack(delivery_tag = method.delivery_tag)
print(body.decode())
# 告訴rabbitmq,用callback來接收消息
channel.basic_consume('python-test',callback)
# 開始接收信息,并進入阻塞狀態,隊列里有信息才會調用callback進行處理
channel.start_consuming()
三、RabbitMq 持久化
MQ默認建立的是臨時 queue 和 exchange,如果不聲明持久化,一旦 rabbitmq 掛掉,queue、exchange 將會全部丟失。所以我們一般在創建 queue 或者 exchange 的時候會聲明 持久化。
1.queue 聲明持久化
# 聲明消息隊列,消息將在這個隊列傳遞,如不存在,則創建。durable = True 代表消息隊列持久化存儲,False 非持久化存儲
result = channel.queue_declare(queue = 'python-test',durable = True)
2.exchange 聲明持久化
# 聲明exchange,由exchange指定消息在哪個隊列傳遞,如不存在,則創建.durable = True 代表exchange持久化存儲,False 非持久化存儲
channel.exchange_declare(exchange = 'python-test', durable = True)
注意:如果已存在一個非持久化的 queue 或 exchange ,執行上述代碼會報錯,因為當前狀態不能更改 queue 或 exchange 存儲屬性,需要刪除重建。如果 queue 和 exchange 中一個聲明了持久化,另一個沒有聲明持久化,則不允許綁定。
3.消息持久化
雖然 exchange 和 queue 都申明了持久化,但如果消息只存在內存里,rabbitmq 重啟后,內存里的東西還是會丟失。所以必須聲明消息也是持久化,從內存轉存到硬盤。
# 向隊列插入數值 routing_key是隊列名。delivery_mode = 2 聲明消息在隊列中持久化,delivery_mod = 1 消息非持久化
channel.basic_publish(exchange = '',routing_key = 'python-test',body = message,
properties=pika.BasicProperties(delivery_mode = 2))
4.acknowledgement 消息不丟失
消費者(consumer)調用callback函數時,會存在處理消息失敗的風險,如果處理失敗,則消息丟失。但是也可以選擇消費者處理失敗時,將消息回退給 rabbitmq ,重新再被消費者消費,這個時候需要設置確認標識。
channel.basic_consume(callback,queue = 'python-test',
# no_ack 設置成 False,在調用callback函數時,未收到確認標識,消息會重回隊列。True,無論調用callback成功與否,消息都被消費掉
no_ack = False)
四、RabbitMq 發布與訂閱
rabbitmq 的發布與訂閱要借助交換機(Exchange)的原理實現:
Exchange 一共有三種工作模式:fanout, direct, topicd
模式一:fanout
這種模式下,傳遞到 exchange 的消息將會轉發到所有與其綁定的 queue 上。
- 不需要指定 routing_key ,即使指定了也是無效。
- 需要提前將 exchange 和 queue 綁定,一個 exchange 可以綁定多個 queue,一個queue可以綁定多個exchange。
- 需要先啟動?訂閱者,此模式下的隊列是 consumer 隨機生成的,發布者?僅僅發布消息到 exchange ,由 exchange 轉發消息至 queue。
發布者:
import pika
import json
credentials = pika.PlainCredentials('shampoo', '123456') # mq用戶名和密碼
# 虛擬隊列需要指定參數 virtual_host,如果是默認的可以不填。
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials))
channel=connection.channel()
# 聲明exchange,由exchange指定消息在哪個隊列傳遞,如不存在,則創建。durable = True 代表exchange持久化存儲,False 非持久化存儲
channel.exchange_declare(exchange = 'python-test',durable = True, exchange_type='fanout')
for i in range(10):
message=json.dumps({'OrderId':"1000%s"%i})
# 向隊列插入數值 routing_key是隊列名。delivery_mode = 2 聲明消息在隊列中持久化,delivery_mod = 1 消息非持久化。routing_key 不需要配置
channel.basic_publish(exchange = 'python-test',routing_key = '',body = message,
properties=pika.BasicProperties(delivery_mode = 2))
print(message)
connection.close()
訂閱者:
import pika
credentials = pika.PlainCredentials('shampoo', '123456')
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials))
channel = connection.channel()
# 創建臨時隊列,隊列名傳空字符,consumer關閉后,隊列自動刪除
result = channel.queue_declare('',exclusive=True)
# 聲明exchange,由exchange指定消息在哪個隊列傳遞,如不存在,則創建。durable = True 代表exchange持久化存儲,False 非持久化存儲
channel.exchange_declare(exchange = 'python-test',durable = True, exchange_type='fanout')
# 綁定exchange和隊列 exchange 使我們能夠確切地指定消息應該到哪個隊列去
channel.queue_bind(exchange = 'python-test',queue = result.method.queue)
# 定義一個回調函數來處理消息隊列中的消息,這里是打印出來
def callback(ch, method, properties, body):
ch.basic_ack(delivery_tag = method.delivery_tag)
print(body.decode())
channel.basic_consume(result.method.queue,callback,# 設置成 False,在調用callback函數時,未收到確認標識,消息會重回隊列。True,無論調用callback成功與否,消息都被消費掉
auto_ack = False)
channel.start_consuming()
模式二:direct
這種工作模式的原理是 消息發送至 exchange,exchange 根據?路由鍵(routing_key)轉發到相對應的 queue 上。
- 可以使用默認 exchange =' ' ,也可以自定義 exchange
- 這種模式下不需要將 exchange 和 任何進行綁定,當然綁定也是可以的。可以將 exchange 和 queue ,routing_key 和 queue 進行綁定
- 傳遞或接受消息時 需要?指定 routing_key
- 需要先啟動?訂閱者,此模式下的隊列是 consumer 隨機生成的,發布者?僅僅發布消息到 exchange ,由 exchange 轉發消息至 queue。
發布者:
import pika
import json
credentials = pika.PlainCredentials('shampoo', '123456') # mq用戶名和密碼
# 虛擬隊列需要指定參數 virtual_host,如果是默認的可以不填。
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials))
channel=connection.channel()
# 聲明exchange,由exchange指定消息在哪個隊列傳遞,如不存在,則創建。durable = True 代表exchange持久化存儲,False 非持久化存儲
channel.exchange_declare(exchange = 'python-test',durable = True, exchange_type='direct')
for i in range(10):
message=json.dumps({'OrderId':"1000%s"%i})
# 指定 routing_key。delivery_mode = 2 聲明消息在隊列中持久化,delivery_mod = 1 消息非持久化
channel.basic_publish(exchange = 'python-test',routing_key = 'OrderId',body = message,
properties=pika.BasicProperties(delivery_mode = 2))
print(message)
connection.close()
消費者:
import pika
credentials = pika.PlainCredentials('shampoo', '123456')
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials))
channel = connection.channel()
# 創建臨時隊列,隊列名傳空字符,consumer關閉后,隊列自動刪除
result = channel.queue_declare('',exclusive=True)
# 聲明exchange,由exchange指定消息在哪個隊列傳遞,如不存在,則創建。durable = True 代表exchange持久化存儲,False 非持久化存儲
channel.exchange_declare(exchange = 'python-test',durable = True, exchange_type='direct')
# 綁定exchange和隊列 exchange 使我們能夠確切地指定消息應該到哪個隊列去
channel.queue_bind(exchange = 'python-test',queue = result.method.queue,routing_key='OrderId')
# 定義一個回調函數來處理消息隊列中的消息,這里是打印出來
def callback(ch, method, properties, body):
ch.basic_ack(delivery_tag = method.delivery_tag)
print(body.decode())
#channel.basic_qos(prefetch_count=1)
# 告訴rabbitmq,用callback來接受消息
channel.basic_consume(result.method.queue,callback,
# 設置成 False,在調用callback函數時,未收到確認標識,消息會重回隊列。True,無論調用callback成功與否,消息都被消費掉
auto_ack = False)
channel.start_consuming()
模式三:topicd
這種模式和第二種模式差不多,exchange 也是通過 路由鍵 routing_key 來轉發消息到指定的 queue 。 不同點是?routing_key 使用正則表達式支持模糊匹配,但匹配規則又與常規的正則表達式不同,比如“#”是匹配全部,“*”是匹配一個詞。
舉例:routing_key =“#orderid#”,意思是將消息轉發至所有 routing_key 包含 “orderid” 字符的隊列中。代碼和模式二 類似,就不貼出來了。
原文鏈接:https://www.cnblogs.com/guyouyin123/p/13953897.html
相關推薦
- 2022-07-23 Go語言學習筆記之文件讀寫操作詳解_Golang
- 2022-10-26 jQuery?表單事件與遍歷詳情_jquery
- 2022-07-29 cypress測試本地web應用_web2.0
- 2022-06-14 GO語言協程創建使用并通過channel解決資源競爭_Golang
- 2022-09-15 Android?Jetpack庫剖析之Lifecycle組件篇_Android
- 2023-02-23 關于golang?字符串?int?uint?int64?uint64?互轉問題_Golang
- 2022-07-02 在React中使用axios發送請求
- 2022-05-21 通過StatefulSet部署有狀態服務應用實現方式_服務器其它
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支