網站首頁 編程語言 正文
1.基本介紹
Celery
是由Python
編寫的簡單,靈活,可靠的用來處理大量信息的分布式系統,它同時提供操作和維護分布式系統所需的工具。Celery
專注于實時任務處理,支持任務調度。
簡單的說,它就是一個分布式隊列的管理工具,用celery提供的接口快速實現并管理一個分布式的任務隊列。
有一點我們需要搞清楚,Celery
本身并不是任務隊列,它是一個分布式隊列的管理工具
,Celery封裝好了操作常見任務隊列的各種操作,比如說從監聽某個任務隊列并從該隊列中拿到數據進行消費。
2.使用場景
它可以讓任務的執行完全脫離主程序,甚至可以被分配到其他主機上運行。我們通常使用它來實現異步任務(async task
)和定時任務(crontab
)。
-
異步任務: 將耗時操作任務提交給
Celery
去異步執行,比如發送短信/郵件、消息推送、音視頻處理等等 - 定時任務: 定時執行某件事情,比如每天數據統計
3.工作流程和組成部分
這里用一張圖片說明下:
Celery的架構由三部分組成,消息中間件(message broker
),任務執行單元(worker
)和任務執行結果存儲(task result store
)組成。
消息中間件:
Celery本身不提供消息服務,但是可以方便的和第三方提供的消息中間件集成。包括RabbitMQ
, Redis
等等,官方推薦用rabbitMQ
,因為它持久穩定。
任務執行單元:
Worker
是Celery
提供的任務執行的單元,worker
并發的運行在分布式的系統節點中。
任務結果存儲:
Task result store
用來存儲Worker執行的任務的結果,Celery
支持以不同方式存儲任務的結果,包括AMQP, redis
等
另外, Celery還支持不同的并發和序列化的手段。
并發:Prefork, Eventlet, gevent, threads/single threaded
序列化:pickle, json, yaml, msgpack. zlib, bzip2 compression, Cryptographic message signing 等等 先安裝模塊
pip install celery pip install redis
4.Celery執行異步任務
4.1 基礎使用
這里項目結構如下:
第一步:先創建celery相關配置配置celery_object.py
import celery # 執行如下命令: celery -A celery_object worker -l info backend = "redis://127.0.0.1:6379/4" # 設置redis的4號數據庫來存放結果 broker = "redis://127.0.0.1:6379/5" # 設置redis的5號數據庫存放消息中間件 celery_app = celery.Celery( "celery_demo", backend=backend, broker=broker, include=[ "celery_task", ], ) celery_app.conf.task_serializer = "json" celery_app.conf.result_serializer = "json" celery_app.conf.accept_content = ["json"] celery_app.conf.timezone = "Asia/Shanghai" # 時區 celery_app.conf.enable_utc = False # 是否使用UTC
參數說明:
- backend 就是異步任務執行完成以后,結果存放的地方。
- broker 就是具體執行任務的工作節點。
- celery.Celery()方法是實例化一個celery對象。
第二步:創建任務相關的文件celery_task.py
import time from celery_object import celery_app @celery_app.task def send_email(name): print("向%s發送郵件..." % name) time.sleep(5) print("向%s發送郵件完成" % name) return f"成功拿到{name}發送的郵件!" @celery_app.task def send_msg(name): print("向%s發送短信..." % name) time.sleep(5) print("向%s發送短信完成" % name) return f"成功拿到{name}發送的短信!"
通過@celery_app.task
這樣的裝飾器,成功的把對應的函數變成對應celery
的異步worker
函數。
緊接著我們在項目當前所在的目錄執行命令:
celery -A celery_object worker -l info
- -A 指的是application應用對象
- worker 就是工作人(固定寫法)
- -l 指的是日志級別,這里是打印
info
級別的日志
之后就可以有下面的輸出顯示就代表celery
啟動成功:
之后我們就可以向celery
生產任務了,創建produce_result
.py文件。
from celery_task import send_email, send_msg if __name__ == "__main__": for i in range(10): result = send_email.delay(f"張三{i}") print(result.id) result2 = send_msg.delay(f"李四{i}") print(result2.id)
運行生產任務的程序,會看到如下的數據,這里打印的就是任務ID。
然后在終端可以看到下面的東西,就代表celery
成功的拿到隊列中任務 并進行消費了。
然后打開我們的redis
可以看到有對應的數據記錄。
與此同時 我們還可以查看celery任務ID的狀態,check_result.py
寫入如下:
from celery.result import AsyncResult from celery_object import celery_app async_result = AsyncResult(id="d1c722fa-4ebf-432e-967e-a462bdefeac4", app=celery_app) print("任務狀態:", async_result.status) if async_result.successful(): result = async_result.get() print(result) # result.forget() # 將結果刪除 elif async_result.failed(): print("執行失敗") elif async_result.status == "PENDING": print("任務等待中被執行") elif async_result.status == "RETRY": print("任務異常后正在重試") elif async_result.status == "STARTED": print("任務已經開始被執行")
運行結果:
任務狀態: SUCCESS
成功拿到李四0發送的短信!
原文鏈接:https://blog.csdn.net/weixin_38819889/article/details/127113250
相關推薦
- 2022-07-01 python神經網絡ShuffleNetV2模型復現詳解_python
- 2022-10-27 Python?標準庫?fileinput與文件迭代器_python
- 2022-04-19 Go語言標準輸入輸出庫的基本使用教程_Golang
- 2022-11-18 Python實現常見數據格式轉換的方法詳解_python
- 2022-05-10 fatal: Not a git repository (or any of the parent
- 2022-12-23 Kotlin標準函數與靜態方法基礎知識詳解_Android
- 2022-06-13 ASP.NET?Core配置文件的獲取和設置_實用技巧
- 2022-06-01 Python學習之虛擬環境原理詳解_python
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支