網站首頁 編程語言 正文
前言:
python使用pika庫調用rabbitmq的參數有三種方式,分別如下所述:
1、應答參數
auto_ack=False
ch.basic_ack(delivery_tag=method.delivery_tag)
生產者模式:
示例代碼:
import pika
# 1.連接rabbit
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.124.104'))
channel = connection.channel()
# 2.創建隊列
channel.queue_declare(queue='hello')
# 3.向指定隊列插入數據
channel.basic_publish(exchange='', # 簡單模式
routing_key='hello', # 指定隊列
body='Hello World!') # 向隊列中添加的數據
print(" [x] Sent 'Hello World!'")
運行結果:
消費者模式:
示例代碼:
import pika
# 1.連接rabbit
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.124.104'))
channel = connection.channel()
# 2.創建隊列
# 注意:這一步不是必須的,但是如果消費者先啟動而不是生成者先啟動時,這時隊列中還沒有hello隊列,這時就會報錯
channel.queue_declare(queue='hello')
# 3.確定回調函數
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
ch.basic_ack(delivery_tag=method.delivery_tag)
# 4.確定監聽隊列參數
channel.basic_consume(queue='hello',
auto_ack=False, # 手動應答方式
on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
# 5.正式監聽
channel.start_consuming()
運行結果:
注意:添加應答參數的好處是當消費者處理回調函數的時,萬一程序報錯,此時數據就會消失的。使用應答方式后,消費者程序萬一報錯,修改完程序后重新啟動程序還是可以繼續消費上一次的數據的。使用應答參數后,沒處理完一條數據都會給隊列一個反饋消息的,也就是說消費完一條消息后隊列才會刪除這條消息。這種方式效率會降低一些,根據項目中數據的重要性可以選擇是否需要這個參數。
2、持久化參數
#聲明queue
channel.queue_declare(queue='hello2', durable=True) # 若聲明過,則換一個名字
channel.basic_publish(exchange='',
routing_key='hello2',
body='Hello World!',
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
)
)
生成者方式:
示例代碼:
import pika
# 1.連接rabbit
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.124.104'))
channel = connection.channel()
# 2.創建持久化隊列
# 注意:非持久化隊列不能變持久化隊列,反之也是這樣的,所有創建隊列中不能創建和非持久化隊列重名的隊列
channel.queue_declare(queue='hello2', durable=True)
# 3.向指定隊列插入數據
channel.basic_publish(exchange='', # 簡單模式
routing_key='hello2', # 指定隊列
body='Hello World!', # 向隊列中添加的數據
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
)
)
print(" [x] Sent 'Hello World!'")
運行結果:
消費者方式:
示例代碼:
import pika
# 1.連接rabbit
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.124.104'))
channel = connection.channel()
# 2.創建持久化隊列
# 注意:非持久化隊列不能變持久化隊列,反之也是這樣的,所有創建隊列中不能創建和非持久化隊列重名的隊列
# 注意:這一步不是必須的,但是如果消費者先啟動而不是生成者先啟動時,這時隊列中還沒有hello2隊列,這時就會報錯
channel.queue_declare(queue='hello2', durable=True)
# 3.確定回調函數
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
ch.basic_ack(delivery_tag=method.delivery_tag)
# 4.確定監聽隊列參數
channel.basic_consume(queue='hello2', # 指定隊列
auto_ack=False, # 手動應答方式
on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
# 5.正式監聽
channel.start_consuming()
運行結果:
注意:加入持久化參數的好處,當rabbitmq隊列萬一崩了時,此時隊列中的所有數據都會丟失,rabbitmq隊列中的數據是保存在內存中,當加入持久化參數后,數據將會保存在硬盤中,rabbitmq崩了或者重啟不會丟失數據。
3、分發參數
有兩個消費者同時監聽一個的隊列。其中一個線程sleep2秒,另一個消費者線程sleep1秒,但是處理的消息是一樣多。這種方式叫輪詢分發(round-robin)不管誰忙,都不會多給消息,總是你一個我一個。想要做到公平分發(fair dispatch),必須關閉自動應答ack,改成手動應答。使用basicQos(perfetch=1)限制每次只發送不超過1條消息到同一個消費者,消費者必須手動反饋告知隊列,才會發送下一個。
channel.basic_qos(prefetch_count=1)
生產者模式:
示例代碼:? ?【為了產生多條數據,將此程序執行多次】
import pika
# 1.連接rabbit
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.124.104'))
channel = connection.channel()
# 2.創建隊列
channel.queue_declare(queue='hello3')
# 3.向指定隊列插入數據
channel.basic_publish(exchange='', # 簡單模式
routing_key='hello3', # 指定隊列
body='Hello World666!', # 向隊列中添加的數據
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
)
)
print(" [x] Sent 'Hello World!'")
運行結果:
消費者模式:
示例代碼1:
import pika
# 1.連接rabbit
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.124.104'))
channel = connection.channel()
# 2.創建隊列
# 注意:這一步不是必須的,但是如果消費者先啟動而不是生成者先啟動時,這時隊列中還沒有hello2隊列,這時就會報錯
channel.queue_declare(queue='hello3')
# 3.確定回調函數
def callback(ch, method, properties, body):
import time
time.sleep(15)
print(" [x] Received %r" % body)
ch.basic_ack(delivery_tag=method.delivery_tag)
# 公平分發,若不加下面這行代碼,默認是輪詢分發
channel.basic_qos(prefetch_count=1)
# 4.確定監聽隊列參數
channel.basic_consume(queue='hello3', # 指定隊列
auto_ack=False, # 手動應答方式
on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
# 5.正式監聽
channel.start_consuming()
運行結果:
示例代碼2:
import pika
# 1.連接rabbit
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.124.104'))
channel = connection.channel()
# 2.創建隊列
# 注意:這一步不是必須的,但是如果消費者先啟動而不是生成者先啟動時,這時隊列中還沒有hello2隊列,這時就會報錯
channel.queue_declare(queue='hello3')
# 3.確定回調函數
def callback(ch, method, properties, body):
import time
time.sleep(3)
print(" [x] Received %r" % body)
ch.basic_ack(delivery_tag=method.delivery_tag)
# 公平分發,若不加下面這行代碼,默認是輪詢分發
channel.basic_qos(prefetch_count=1)
# 4.確定監聽隊列參數
channel.basic_consume(queue='hello3', # 指定隊列
auto_ack=False, # 手動應答方式
on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
# 5.正式監聽
channel.start_consuming()
注意:當一個py文件執行多次時,會有下面提示:
?運行結果:
原文鏈接:https://blog.csdn.net/weixin_44799217/article/details/126532230
相關推薦
- 2022-10-29 qt輸出自定義的pdf文件源碼詳解
- 2022-06-06 python?包之?threading?多線程_python
- 2022-08-19 Exception evaluating SpringEL expression異常處理
- 2022-05-24 SQL?Server表空間碎片化回收的實現_MsSql
- 2022-07-08 python中的annotate函數使用_python
- 2024-07-15 在物理及和虛擬主機上配置ftp,實現上傳和下載的功能(five day)
- 2022-04-22 mac安裝oh-my-zsh出現command not found: npm問題解決
- 2022-07-17 Android?studio實現簡易的計算器功能_Android
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支