網(wǎng)站首頁 編程語言 正文
1.基本介紹
Celery
是由Python
編寫的簡單,靈活,可靠的用來處理大量信息的分布式系統(tǒng),它同時提供操作和維護(hù)分布式系統(tǒng)所需的工具。Celery
專注于實時任務(wù)處理,支持任務(wù)調(diào)度。
簡單的說,它就是一個分布式隊列的管理工具,用celery提供的接口快速實現(xiàn)并管理一個分布式的任務(wù)隊列。
有一點我們需要搞清楚,Celery
本身并不是任務(wù)隊列,它是一個分布式隊列的管理工具
,Celery封裝好了操作常見任務(wù)隊列的各種操作,比如說從監(jiān)聽某個任務(wù)隊列并從該隊列中拿到數(shù)據(jù)進(jìn)行消費。
2.使用場景
它可以讓任務(wù)的執(zhí)行完全脫離主程序,甚至可以被分配到其他主機上運行。我們通常使用它來實現(xiàn)異步任務(wù)(async task
)和定時任務(wù)(crontab
)。
-
異步任務(wù): 將耗時操作任務(wù)提交給
Celery
去異步執(zhí)行,比如發(fā)送短信/郵件、消息推送、音視頻處理等等 - 定時任務(wù): 定時執(zhí)行某件事情,比如每天數(shù)據(jù)統(tǒng)計
3.工作流程和組成部分
這里用一張圖片說明下:
Celery的架構(gòu)由三部分組成,消息中間件(message broker
),任務(wù)執(zhí)行單元(worker
)和任務(wù)執(zhí)行結(jié)果存儲(task result store
)組成。
消息中間件:
Celery本身不提供消息服務(wù),但是可以方便的和第三方提供的消息中間件集成。包括RabbitMQ
, Redis
等等,官方推薦用rabbitMQ
,因為它持久穩(wěn)定。
任務(wù)執(zhí)行單元:
Worker
是Celery
提供的任務(wù)執(zhí)行的單元,worker
并發(fā)的運行在分布式的系統(tǒng)節(jié)點中。
任務(wù)結(jié)果存儲:
Task result store
用來存儲Worker執(zhí)行的任務(wù)的結(jié)果,Celery
支持以不同方式存儲任務(wù)的結(jié)果,包括AMQP, redis
等
另外, Celery還支持不同的并發(fā)和序列化的手段。
并發(fā):Prefork, Eventlet, gevent, threads/single threaded
序列化:pickle, json, yaml, msgpack. zlib, bzip2 compression, Cryptographic message signing 等等 先安裝模塊
pip install celery pip install redis
4.Celery執(zhí)行異步任務(wù)
4.1 基礎(chǔ)使用
這里項目結(jié)構(gòu)如下:
第一步:先創(chuàng)建celery相關(guān)配置配置celery_object.py
import celery # 執(zhí)行如下命令: celery -A celery_object worker -l info backend = "redis://127.0.0.1:6379/4" # 設(shè)置redis的4號數(shù)據(jù)庫來存放結(jié)果 broker = "redis://127.0.0.1:6379/5" # 設(shè)置redis的5號數(shù)據(jù)庫存放消息中間件 celery_app = celery.Celery( "celery_demo", backend=backend, broker=broker, include=[ "celery_task", ], ) celery_app.conf.task_serializer = "json" celery_app.conf.result_serializer = "json" celery_app.conf.accept_content = ["json"] celery_app.conf.timezone = "Asia/Shanghai" # 時區(qū) celery_app.conf.enable_utc = False # 是否使用UTC
參數(shù)說明:
- backend 就是異步任務(wù)執(zhí)行完成以后,結(jié)果存放的地方。
- broker 就是具體執(zhí)行任務(wù)的工作節(jié)點。
- celery.Celery()方法是實例化一個celery對象。
第二步:創(chuàng)建任務(wù)相關(guān)的文件celery_task.py
import time from celery_object import celery_app @celery_app.task def send_email(name): print("向%s發(fā)送郵件..." % name) time.sleep(5) print("向%s發(fā)送郵件完成" % name) return f"成功拿到{name}發(fā)送的郵件!" @celery_app.task def send_msg(name): print("向%s發(fā)送短信..." % name) time.sleep(5) print("向%s發(fā)送短信完成" % name) return f"成功拿到{name}發(fā)送的短信!"
通過@celery_app.task
這樣的裝飾器,成功的把對應(yīng)的函數(shù)變成對應(yīng)celery
的異步worker
函數(shù)。
緊接著我們在項目當(dāng)前所在的目錄執(zhí)行命令:
celery -A celery_object worker -l info
- -A 指的是application應(yīng)用對象
- worker 就是工作人(固定寫法)
- -l 指的是日志級別,這里是打印
info
級別的日志
之后就可以有下面的輸出顯示就代表celery
啟動成功:
之后我們就可以向celery
生產(chǎn)任務(wù)了,創(chuàng)建produce_result
.py文件。
from celery_task import send_email, send_msg if __name__ == "__main__": for i in range(10): result = send_email.delay(f"張三{i}") print(result.id) result2 = send_msg.delay(f"李四{i}") print(result2.id)
運行生產(chǎn)任務(wù)的程序,會看到如下的數(shù)據(jù),這里打印的就是任務(wù)ID。
然后在終端可以看到下面的東西,就代表celery
成功的拿到隊列中任務(wù) 并進(jìn)行消費了。
然后打開我們的redis
可以看到有對應(yīng)的數(shù)據(jù)記錄。
與此同時 我們還可以查看celery任務(wù)ID的狀態(tài),check_result.py
寫入如下:
from celery.result import AsyncResult from celery_object import celery_app async_result = AsyncResult(id="d1c722fa-4ebf-432e-967e-a462bdefeac4", app=celery_app) print("任務(wù)狀態(tài):", async_result.status) if async_result.successful(): result = async_result.get() print(result) # result.forget() # 將結(jié)果刪除 elif async_result.failed(): print("執(zhí)行失敗") elif async_result.status == "PENDING": print("任務(wù)等待中被執(zhí)行") elif async_result.status == "RETRY": print("任務(wù)異常后正在重試") elif async_result.status == "STARTED": print("任務(wù)已經(jīng)開始被執(zhí)行")
運行結(jié)果:
任務(wù)狀態(tài): SUCCESS
成功拿到李四0發(fā)送的短信!
原文鏈接:https://blog.csdn.net/weixin_38819889/article/details/127113250
相關(guān)推薦
- 2022-08-06 C#使用Tesseract進(jìn)行Ocr識別的方法實現(xiàn)_C#教程
- 2023-12-15 Linux系統(tǒng)——退出vi編輯模式
- 2021-12-22 Docker部署Microsoft?Sql?Server詳細(xì)步驟_docker
- 2022-05-01 jquery實現(xiàn)移動端按鈕組左右滑動_jquery
- 2022-05-11 tomcat啟動時提示端口被占用解決辦法
- 2023-02-09 C++開發(fā)protobuf動態(tài)解析工具_(dá)C 語言
- 2022-06-13 C++IO流之fstream,?stringstream使用小結(jié)_C 語言
- 2022-05-25 swagger 3.0.0 版本和springboot整合啟動失敗
- 最近更新
-
- window11 系統(tǒng)安裝 yarn
- 超詳細(xì)win安裝深度學(xué)習(xí)環(huán)境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎(chǔ)操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區(qū)別,Jav
- spring @retryable不生效的一種
- Spring Security之認(rèn)證信息的處理
- Spring Security之認(rèn)證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權(quán)
- redisson分布式鎖中waittime的設(shè)
- maven:解決release錯誤:Artif
- restTemplate使用總結(jié)
- Spring Security之安全異常處理
- MybatisPlus優(yōu)雅實現(xiàn)加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務(wù)發(fā)現(xiàn)-Nac
- Spring Security之基于HttpR
- Redis 底層數(shù)據(jù)結(jié)構(gòu)-簡單動態(tài)字符串(SD
- arthas操作spring被代理目標(biāo)對象命令
- Spring中的單例模式應(yīng)用詳解
- 聊聊消息隊列,發(fā)送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠(yuǎn)程分支