日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學無先后,達者為師

網站首頁 編程語言 正文

python中celery的基本使用詳情_python

作者:埃菲爾沒有塔尖 ? 更新時間: 2022-11-19 編程語言

1.基本介紹

Celery 是由Python 編寫的簡單,靈活,可靠的用來處理大量信息的分布式系統,它同時提供操作和維護分布式系統所需的工具。Celery 專注于實時任務處理,支持任務調度。

簡單的說,它就是一個分布式隊列的管理工具,用celery提供的接口快速實現并管理一個分布式的任務隊列。

有一點我們需要搞清楚,Celery 本身并不是任務隊列,它是一個分布式隊列的管理工具,Celery封裝好了操作常見任務隊列的各種操作,比如說從監聽某個任務隊列并從該隊列中拿到數據進行消費。

2.使用場景

它可以讓任務的執行完全脫離主程序,甚至可以被分配到其他主機上運行。我們通常使用它來實現異步任務(async task)和定時任務(crontab)。

  • 異步任務: 將耗時操作任務提交給Celery去異步執行,比如發送短信/郵件、消息推送、音視頻處理等等
  • 定時任務: 定時執行某件事情,比如每天數據統計

3.工作流程和組成部分

這里用一張圖片說明下:

Celery的架構由三部分組成,消息中間件(message broker),任務執行單元(worker)和任務執行結果存儲(task result store)組成。

消息中間件:

Celery本身不提供消息服務,但是可以方便的和第三方提供的消息中間件集成。包括RabbitMQ, Redis等等,官方推薦用rabbitMQ,因為它持久穩定。

任務執行單元:

WorkerCelery提供的任務執行的單元,worker并發的運行在分布式的系統節點中。

任務結果存儲:

Task result store用來存儲Worker執行的任務的結果,Celery支持以不同方式存儲任務的結果,包括AMQP, redis

另外, Celery還支持不同的并發和序列化的手段。

并發:Prefork, Eventlet, gevent, threads/single threaded

序列化:pickle, json, yaml, msgpack. zlib, bzip2 compression, Cryptographic message signing 等等 先安裝模塊

pip install celery
pip install redis

4.Celery執行異步任務

4.1 基礎使用

這里項目結構如下:

第一步:先創建celery相關配置配置celery_object.py

import celery

# 執行如下命令: celery -A celery_object worker -l info

backend = "redis://127.0.0.1:6379/4"  # 設置redis的4號數據庫來存放結果
broker = "redis://127.0.0.1:6379/5"  # 設置redis的5號數據庫存放消息中間件
celery_app = celery.Celery(
    "celery_demo",
    backend=backend,
    broker=broker,
    include=[
        "celery_task",
    ],
)

celery_app.conf.task_serializer = "json"
celery_app.conf.result_serializer = "json"
celery_app.conf.accept_content = ["json"]

celery_app.conf.timezone = "Asia/Shanghai"  # 時區
celery_app.conf.enable_utc = False  # 是否使用UTC

參數說明:

  • backend 就是異步任務執行完成以后,結果存放的地方。
  • broker 就是具體執行任務的工作節點。
  • celery.Celery()方法是實例化一個celery對象。

第二步:創建任務相關的文件celery_task.py

import time

from celery_object import celery_app

@celery_app.task
def send_email(name):
    print("向%s發送郵件..." % name)
    time.sleep(5)
    print("向%s發送郵件完成" % name)
    return f"成功拿到{name}發送的郵件!"

@celery_app.task
def send_msg(name):
    print("向%s發送短信..." % name)
    time.sleep(5)
    print("向%s發送短信完成" % name)
    return f"成功拿到{name}發送的短信!"

通過@celery_app.task這樣的裝飾器,成功的把對應的函數變成對應celery的異步worker函數。

緊接著我們在項目當前所在的目錄執行命令:

celery -A celery_object worker -l info
  • -A 指的是application應用對象
  • worker 就是工作人(固定寫法)
  • -l 指的是日志級別,這里是打印info級別的日志

之后就可以有下面的輸出顯示就代表celery動成功:

之后我們就可以向celery生產任務了,創建produce_result.py文件。

from celery_task import send_email, send_msg

if __name__ == "__main__":
    for i in range(10):
        result = send_email.delay(f"張三{i}")
        print(result.id)
        result2 = send_msg.delay(f"李四{i}")
        print(result2.id)

運行生產任務的程序,會看到如下的數據,這里打印的就是任務ID。

然后在終端可以看到下面的東西,就代表celery成功的拿到隊列中任務 并進行消費了。

然后打開我們的redis可以看到有對應的數據記錄。

與此同時 我們還可以查看celery任務ID的狀態,check_result.py寫入如下:

from celery.result import AsyncResult
from celery_object import celery_app

async_result = AsyncResult(id="d1c722fa-4ebf-432e-967e-a462bdefeac4", app=celery_app)
print("任務狀態:", async_result.status)
if async_result.successful():
    result = async_result.get()
    print(result)
    # result.forget() # 將結果刪除
elif async_result.failed():
    print("執行失敗")
elif async_result.status == "PENDING":
    print("任務等待中被執行")
elif async_result.status == "RETRY":
    print("任務異常后正在重試")
elif async_result.status == "STARTED":
    print("任務已經開始被執行")

運行結果:

任務狀態: SUCCESS
成功拿到李四0發送的短信!

原文鏈接:https://blog.csdn.net/weixin_38819889/article/details/127113250

欄目分類
最近更新