網站首頁 編程語言 正文
最近在用fastapi框架開發web后端,由于近幾年python異步編程大火,fastapi憑借高性能也火了起來。本篇介紹了在異步環境下實現redis消息隊列的方法,代碼可以直接拷貝到fastapi中使用。
安裝相關庫
pip install aioredis
消息隊列實現及使用
我們使用redis的stream類型作為消息隊列的載體
首先我們創建一個目錄作為項目目錄:works/
創建配置文件
在項目根目錄下新建文件works/.env
在文件中寫入
export APP_ENV=development
export REDIS_URL="192.168.70.130/"
export REDIS_USER=
export REDIS_PASSWORD=
export REDIS_HOST="192.168.70.130"
export REDIS_PORT=6379
代碼實現
在項目目錄下創建py文件works/main.py
import os
from dotenv import load_dotenv
import aioredis
import asyncio
load_dotenv()
class Redis():
? ? def __init__(self):
? ? ? ? """initialize ?connection """
? ? ? ? self.REDIS_URL = os.environ['REDIS_URL']
? ? ? ? self.REDIS_PASSWORD = os.environ['REDIS_PASSWORD']
? ? ? ? self.REDIS_USER = os.environ['REDIS_USER']
? ? ? ? self.connection_url = f"redis://{self.REDIS_USER}:{self.REDIS_PASSWORD}@{self.REDIS_URL}"
? ? ? ? self.REDIS_HOST = os.environ['REDIS_HOST']
? ? ? ? self.REDIS_PORT = os.environ['REDIS_PORT']
? ? ? ??
? ? async def create_connection(self):
? ? ? ? self.connection = aioredis.from_url(
? ? ? ? ? ? self.connection_url, db=0)
? ? ? ? return self.connection
class Producer:
? ? def __init__(self, redis_client):
? ? ? ? self.redis_client = redis_client
? ? async def add_to_stream(self, ?data: dict, stream_channel):
? ? ? ? """將一條數據添加到隊列
? ? ? ? Args:
? ? ? ? ? ? data (dict): _description_
? ? ? ? ? ? stream_channel (_type_): _description_
? ? ? ? Returns:
? ? ? ? ? ? _type_: _description_
? ? ? ? """
? ? ? ? try:
? ? ? ? ? ? msg_id = await self.redis_client.xadd(name=stream_channel, id="*", fields=data)
? ? ? ? ? ? print(f"Message id {msg_id} added to {stream_channel} stream")
? ? ? ? ? ? return msg_id
? ? ? ? except Exception as e:
? ? ? ? ? ? raise Exception(f"Error sending msg to stream => {e}")
class StreamConsumer:
? ? def __init__(self, redis_client):
? ? ? ? self.redis_client = redis_client
? ? async def consume_stream(self, count: int, block: int, ?stream_channel):
? ? ? ? """讀取隊列中的消息,但是并不刪除
? ? ? ? Args:
? ? ? ? ? ? count (int): _description_
? ? ? ? ? ? block (int): _description_
? ? ? ? ? ? stream_channel (_type_): _description_
? ? ? ? Returns:
? ? ? ? ? ? _type_: _description_
? ? ? ? """
? ? ? ? response = await self.redis_client.xread(
? ? ? ? ? ? streams={stream_channel: ?'0-0'}, count=count, block=block)
? ? ? ? return response
? ? async def delete_message(self, stream_channel, message_id):
? ? ? ? """成功消費數據后,調用此函數刪除隊列數據
? ? ? ? Args:
? ? ? ? ? ? stream_channel (_type_): _description_
? ? ? ? ? ? message_id (_type_): _description_
? ? ? ? """
? ? ? ? await self.redis_client.xdel(stream_channel, message_id)
async def main():
? ? redis_conn = await Redis().create_connection()
? ? produce = Producer(redis_conn)
? ? consumer = StreamConsumer(redis_conn)
? ? # 添加一個消息到隊列中
? ? data = {'xiaoming4':123}
? ? await produce.add_to_stream(data=data,stream_channel='message_channel')
? ??
? ? # 從隊列中拿出最新的1條數據
? ? data = await consumer.consume_stream(1,block=0,stream_channel='message_channel')
? ? print(data)
? ??
? ? # 輪詢等待隊列中的新消息
? ? response = await consumer.consume_stream(stream_channel="message_channel", count=1, block=0)
? ? if response:
? ? ? ? for stream, messagees in response:
? ? ? ? ? ? print('stream:',stream)
? ? ? ? ? ? for message in messagees:
? ? ? ? ? ? ? ? print('message: ',message)
? ? ? ? ? ? ? ? message_id = message[0]
? ? ? ? ? ? ? ? print('message_id: ',message_id)
? ? ? ? ? ? ? ? message_content = message[1]
? ? ? ? ? ? ? ? print('message_content: ',message_content)
? ? ? ? ? ? ? ? print('注意里面的鍵、值都變成了byte類型,需要進行解碼:')
? ? ? ? ? ? ? ? message_content:dict
? ? ? ? ? ? ? ? print('message_content_decode: ',{k.decode('utf-8'):v.decode('utf-8') for k,v in message_content.items()})
? ? # 消費成功后刪除隊列中的消息
? ? await consumer.delete_message(
? ? ? ? stream_channel='message_channel',message_id=message_id
? ? ) ? ?
if __name__ == '__main__':
? ? asyncio.run(main())
非常簡單好用,啟動一下看看吧
原文鏈接:https://blog.csdn.net/brandon_l/article/details/128477043
相關推薦
- 2023-01-26 C#實現批量Word轉換Html的示例代碼_C#教程
- 2022-08-20 oracle設置密碼復雜度及設置超時退出的功能_oracle
- 2022-07-19 CentOS8 服務器連接超時自動斷開問題解決
- 2022-08-23 python多線程對多核cpu的利用解析_python
- 2022-08-11 python?scatter繪制散點圖_python
- 2024-04-07 springboot后端接收前端傳數組參數方法
- 2021-12-07 Kotlin基本數據類型詳解_Android
- 2024-01-12 JPA實現不等于查詢
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支