日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學無先后,達者為師

網站首頁 編程語言 正文

python使用pika庫調用rabbitmq參數使用詳情_python

作者:IT之一小佬 ? 更新時間: 2022-10-27 編程語言

前言:

python使用pika庫調用rabbitmq的參數有三種方式,分別如下所述:

1、應答參數

auto_ack=False
ch.basic_ack(delivery_tag=method.delivery_tag)

生產者模式:

示例代碼:

import pika
 
# 1.連接rabbit
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.124.104'))
channel = connection.channel()
 
# 2.創建隊列
channel.queue_declare(queue='hello')
 
# 3.向指定隊列插入數據
channel.basic_publish(exchange='',  # 簡單模式
                      routing_key='hello',  # 指定隊列
                      body='Hello World!')  # 向隊列中添加的數據
 
print(" [x] Sent 'Hello World!'")

運行結果:

消費者模式:

示例代碼:

import pika
 
# 1.連接rabbit
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.124.104'))
channel = connection.channel()
 
# 2.創建隊列
# 注意:這一步不是必須的,但是如果消費者先啟動而不是生成者先啟動時,這時隊列中還沒有hello隊列,這時就會報錯
channel.queue_declare(queue='hello')
 
# 3.確定回調函數
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)
 
# 4.確定監聽隊列參數
channel.basic_consume(queue='hello',
                      auto_ack=False,  # 手動應答方式
                      on_message_callback=callback)
 
print(' [*] Waiting for messages. To exit press CTRL+C')
# 5.正式監聽
channel.start_consuming()

運行結果:

注意:添加應答參數的好處是當消費者處理回調函數的時,萬一程序報錯,此時數據就會消失的。使用應答方式后,消費者程序萬一報錯,修改完程序后重新啟動程序還是可以繼續消費上一次的數據的。使用應答參數后,沒處理完一條數據都會給隊列一個反饋消息的,也就是說消費完一條消息后隊列才會刪除這條消息。這種方式效率會降低一些,根據項目中數據的重要性可以選擇是否需要這個參數。

2、持久化參數

#聲明queue
channel.queue_declare(queue='hello2', durable=True)  # 若聲明過,則換一個名字
 
channel.basic_publish(exchange='',
                      routing_key='hello2',
                      body='Hello World!',
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent
                          )
                      )

生成者方式:

示例代碼:

import pika
 
# 1.連接rabbit
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.124.104'))
channel = connection.channel()
 
# 2.創建持久化隊列
# 注意:非持久化隊列不能變持久化隊列,反之也是這樣的,所有創建隊列中不能創建和非持久化隊列重名的隊列
channel.queue_declare(queue='hello2', durable=True)
 
# 3.向指定隊列插入數據
channel.basic_publish(exchange='',  # 簡單模式
                      routing_key='hello2',  # 指定隊列
                      body='Hello World!',  # 向隊列中添加的數據
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent
                      )
                      )
print(" [x] Sent 'Hello World!'")

運行結果:

消費者方式:

示例代碼:

import pika
 
# 1.連接rabbit
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.124.104'))
channel = connection.channel()
 
# 2.創建持久化隊列
# 注意:非持久化隊列不能變持久化隊列,反之也是這樣的,所有創建隊列中不能創建和非持久化隊列重名的隊列
# 注意:這一步不是必須的,但是如果消費者先啟動而不是生成者先啟動時,這時隊列中還沒有hello2隊列,這時就會報錯
channel.queue_declare(queue='hello2', durable=True)

# 3.確定回調函數
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)
# 4.確定監聽隊列參數
channel.basic_consume(queue='hello2',  # 指定隊列
                      auto_ack=False,  # 手動應答方式
                      on_message_callback=callback)
 
print(' [*] Waiting for messages. To exit press CTRL+C')
# 5.正式監聽
channel.start_consuming()

運行結果:

注意:加入持久化參數的好處,當rabbitmq隊列萬一崩了時,此時隊列中的所有數據都會丟失,rabbitmq隊列中的數據是保存在內存中,當加入持久化參數后,數據將會保存在硬盤中,rabbitmq崩了或者重啟不會丟失數據。

3、分發參數

有兩個消費者同時監聽一個的隊列。其中一個線程sleep2秒,另一個消費者線程sleep1秒,但是處理的消息是一樣多。這種方式叫輪詢分發(round-robin)不管誰忙,都不會多給消息,總是你一個我一個。想要做到公平分發(fair dispatch),必須關閉自動應答ack,改成手動應答。使用basicQos(perfetch=1)限制每次只發送不超過1條消息到同一個消費者,消費者必須手動反饋告知隊列,才會發送下一個。

channel.basic_qos(prefetch_count=1)

生產者模式:

示例代碼:? ?【為了產生多條數據,將此程序執行多次】

import pika
 
# 1.連接rabbit
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.124.104'))
channel = connection.channel()
 
# 2.創建隊列
channel.queue_declare(queue='hello3')
 
# 3.向指定隊列插入數據
channel.basic_publish(exchange='',  # 簡單模式
                      routing_key='hello3',  # 指定隊列
                      body='Hello World666!',  # 向隊列中添加的數據
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent
                      )
                      )
print(" [x] Sent 'Hello World!'")

運行結果:

消費者模式:

示例代碼1:

import pika
 
# 1.連接rabbit
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.124.104'))
channel = connection.channel()
 
# 2.創建隊列
# 注意:這一步不是必須的,但是如果消費者先啟動而不是生成者先啟動時,這時隊列中還沒有hello2隊列,這時就會報錯
channel.queue_declare(queue='hello3')
 
# 3.確定回調函數
def callback(ch, method, properties, body):
    import time
    time.sleep(15)
    print(" [x] Received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)
 
# 公平分發,若不加下面這行代碼,默認是輪詢分發
channel.basic_qos(prefetch_count=1)
 
# 4.確定監聽隊列參數
channel.basic_consume(queue='hello3',  # 指定隊列
                      auto_ack=False,  # 手動應答方式
                      on_message_callback=callback)
 
print(' [*] Waiting for messages. To exit press CTRL+C')
# 5.正式監聽
channel.start_consuming()

運行結果:

示例代碼2:

import pika
 
# 1.連接rabbit
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.124.104'))
channel = connection.channel()
 
# 2.創建隊列
# 注意:這一步不是必須的,但是如果消費者先啟動而不是生成者先啟動時,這時隊列中還沒有hello2隊列,這時就會報錯
channel.queue_declare(queue='hello3')
 
# 3.確定回調函數
def callback(ch, method, properties, body):
    import time
    time.sleep(3)
    print(" [x] Received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)
 
# 公平分發,若不加下面這行代碼,默認是輪詢分發
channel.basic_qos(prefetch_count=1)
 
# 4.確定監聽隊列參數
channel.basic_consume(queue='hello3',  # 指定隊列
                      auto_ack=False,  # 手動應答方式
                      on_message_callback=callback)
 
print(' [*] Waiting for messages. To exit press CTRL+C')
# 5.正式監聽
channel.start_consuming()

注意:當一個py文件執行多次時,會有下面提示:

?運行結果:

原文鏈接:https://blog.csdn.net/weixin_44799217/article/details/126532230

欄目分類
最近更新