網站首頁 編程語言 正文
一、背景
實際工作中會有一些耗時的異步任務需要使用定時調度,比如發送郵件,拉取數據,執行定時腳本
通過celery 實現調度主要思想是 通過引入中間人redis,啟動 worker 進行任務執行?,celery-beat進行定時任務數據存儲
二、Celery動態添加定時任務的官方文檔
celery文檔:https://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#beat-custom-schedulers
celery 自定義調度類說明:?
自定義調度器類可以在命令行中指定(--scheduler參數)
django-celery-beat文檔 : https://pypi.org/project/django-celery-beat/
關于django-celery-beat 插件的說明:?
此擴展使您能夠將定期任務計劃存儲在數據庫中,可以從 Django 管理界面管理周期性任務,您可以在其中創建、編輯和刪除周期性任務以及它們應該運行的頻率
三、celery簡單實用
3.1 基礎環境配置
1. 安裝最新版本的Django
pip3 install django #當前我安裝的版本是 3.0.6
2. 創建項目
django-admin startproject typeidea django-admin startapp blog
3.安裝 celery
pip3 install django-celery pip3 install -U Celery pip3 install "celery[librabbitmq,redis,auth,msgpack]" pip3 install django-celery-beat # 用于動態添加定時任務 pip3 install django-celery-results pip3 install redis
3.2 測試使用Celery應用
1. 創建blog目錄、新建task.py
首先在Django項目中創建一個blog文件夾,并且在blog文件夾下創建tasks.py模塊, 如下:
?tasks.py代碼如下:?
#!/usr/bin/env python # -*- coding: UTF-8 -*- """ #File: tasks.py #Time: 2022/3/30 2:26 下午 #Author: julius """ from celery import Celery # 使用redis做為broker app = Celery('blog.tasks2',broker='redis://127.0.0.1:6379/0') # 創建任務函數 @app.task def my_task(): print('任務正在執行...')
Celery第一個參數是給其設定一個名字, 第二參數我們設定一個中間人broker, 在這里我們使用Redis作為中間人。my_task函數是我們編寫的一個任務函數, 通過加上裝飾器app.task, 將其注冊到broker的隊列中。
2. 啟動redis、創建worker
現在我們在創建一個worker, 等待處理隊列中的任務。
進入項目的根目錄,執行命令: celery -A celery_tasks.tasks worker -l info
?3. 調用任務
下面來測試一下功能,創建一個任務,加入任務隊列中,提供worker執行。
進入python終端, 執行如下代碼:
$ python manage.py shell >>> from blog.tasks import my_task >>> my_task.delay() <AsyncResult: 83484dfe-f729-417b-8e51-6c7ae32a1377>
調用一個任務函數,將會返回一個AsyncResult對象,這個對象可以用來檢查任務的狀態或者獲得任務的返回值。
4. 查看結果
在worker的終端查看任務執行情況,可以看到已經收到83484dfe-f729-417b-8e51-6c7ae32a1377 任務,并打印了任務執行信息
5. 存儲并查看任務執行狀態
把任務執行結果賦值給ret,然后調用result() 會產生?DisabledBackend 報錯,可見沒有配置后端存儲的時候并不能保存任務執行的狀態信息,下一節我們會講到如何配置backend保存任務執行結果
$ python manage.py shell >>> from blog.tasks import my_task >>> ret=my_task.delay() >>> ret.result()
四、配置backend存儲任務執行結果?
如果我們想跟蹤任務的狀態,Celery需要將結果保存到某個地方。有幾種保存的方案可選:SQLAlchemy、Django ORM、Memcached、 Redis、RPC (RabbitMQ/AMQP)。
1. 添加backend參數
在本例中我們使用Redis作為存儲結果的方案,通過Celery的backend參數來設定任務結果存儲地址。我們將tasks模塊修改如下:
from celery import Celery # 使用redis作為broker以及backend app = Celery('celery_tasks.tasks', broker='redis://127.0.0.1:6379/8', backend='redis://127.0.0.1:6379/9') # 創建任務函數 @app.task def my_task(a, b): print("任務函數正在執行....") return a + b
給Celery增加了backend參數,指定redis作為結果存儲,并將任務函數修改為兩個參數,并且有返回值。
2. 調用任務/查看任務執行結果
下面再來執行調用一下這個任務看看。
$ python manage.py shell >>> from blog.tasks import my_task >>> res=my_task.delay(10,40) >>> res.result 50 >>> res.failed() False
再來看看worker的執行情況,如下:
可以看到celery任務已經執行成功了。
但是這只是一個開始,下一步要看看如何添加定時的任務。
四、優化Celery目錄結構
上面直接將Celery的應用創建、配置、tasks任務全部寫在了一個文件,這樣在后面項目越來越大,也是不方便的。下面來拆分一下,并且添加一些常用的參數。
基本結構如下
$ vim typeidea/celery.py (Celery應用文件)
#!/usr/bin/env python # -*- coding: UTF-8 -*- """ #File: celery.py #Time: 2022/3/30 12:25 下午 #Author: julius """ import os from celery import Celery from blog import celeryconfig project_name='typeidea' # set the default django setting module for the 'celery' program os.environ.setdefault('DJANGO_SETTINGS_MODULE','typeidea.settings') app = Celery(project_name) app.config_from_object('django.conf:settings') app.autodiscover_tasks()
vim blog/celeryconfig.py (配置Celery的參數文件)
#!/usr/bin/env python # -*- coding: UTF-8 -*- """ #File: celeryconfig.py #Time: 2022/3/30 2:54 下午 #Author: julius """ # 設置結果存儲 from typeidea import settings import os os.environ.setdefault("DJANGO_SETTINGS_MODULE", "typeidea.settings") CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0' # 設置代理人broker BROKER_URL = 'redis://127.0.0.1:6379/1' # celery 的啟動工作數量設置 CELERY_WORKER_CONCURRENCY = 20 # 任務預取功能,就是每個工作的進程/線程在獲取任務的時候,會盡量多拿 n 個,以保證獲取的通訊成本可以壓縮。 CELERYD_PREFETCH_MULTIPLIER = 20 # 非常重要,有些情況下可以防止死鎖 CELERYD_FORCE_EXECV = True # celery 的 worker 執行多少個任務后進行重啟操作 CELERY_WORKER_MAX_TASKS_PER_CHILD = 100 # 禁用所有速度限制,如果網絡資源有限,不建議開足馬力。 CELERY_DISABLE_RATE_LIMITS = True CELERY_ENABLE_UTC = False CELERY_TIMEZONE = settings.TIME_ZONE DJANGO_CELERY_BEAT_TZ_AWARE = False CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
vim blog/tasks.py (tasks 任務文件)
import time from blog.celery import app # 創建任務函數 @app.task def my_task(a, b, c): print('任務正在執行...') print('任務1函數休眠10s') time.sleep(10) return a + b + c
五、開始使用django-celery-beat調度器
使用 django-celery-beat 動態添加定時任務? celery 4.x 版本在 django 框架中是使用 django-celery-beat 進行動態添加定時任務的。前面雖然已經安裝了這個庫,但是還要再說明一下。
官網的配置說明
https://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#beat-custom-schedulers
1. 安裝 django-celery-beat
pip3 install django-celery-beat
2.在項目的 settings 文件配置?django-celery-beat?
INSTALLED_APPS = [ 'blog', 'django_celery_beat', ... ] # Django設置時區 LANGUAGE_CODE = 'zh-hans' # 使用中國語言 TIME_ZONE = 'Asia/Shanghai' # 設置Django使用中國上海時間 # 如果USE_TZ設置為True時,Django會使用系統默認設置的時區,此時的TIME_ZONE不管有沒有設置都不起作用 # 如果USE_TZ 設置為False,TIME_ZONE = 'Asia/Shanghai', 則使用上海的UTC時間。 USE_TZ = False
3. 創建 django-celery-beat 相關表
執行Django數據庫遷移: python?manage.py migrate
4. 配置Celery使用 django-celery-beat
配置 celery.py
import os from celery import Celery from blog import celeryconfig # 為celery 設置環境變量 os.environ.setdefault("DJANGO_SETTINGS_MODULE","typeidea.settings") # 創建celery app app = Celery('blog') # 從單獨的配置模塊中加載配置 app.config_from_object(celeryconfig) # 設置app自動加載任務 app.autodiscover_tasks([ 'blog', ])
配置 celeryconfig.py
# 設置結果存儲 from typeidea import settings import os os.environ.setdefault("DJANGO_SETTINGS_MODULE", "typeidea.settings") CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0' # 設置代理人broker BROKER_URL = 'redis://127.0.0.1:6379/1' # celery 的啟動工作數量設置 CELERY_WORKER_CONCURRENCY = 20 # 任務預取功能,就是每個工作的進程/線程在獲取任務的時候,會盡量多拿 n 個,以保證獲取的通訊成本可以壓縮。 CELERYD_PREFETCH_MULTIPLIER = 20 # 非常重要,有些情況下可以防止死鎖 CELERYD_FORCE_EXECV = True # celery 的 worker 執行多少個任務后進行重啟操作 CELERY_WORKER_MAX_TASKS_PER_CHILD = 100 # 禁用所有速度限制,如果網絡資源有限,不建議開足馬力。 CELERY_DISABLE_RATE_LIMITS = True CELERY_ENABLE_UTC = False CELERY_TIMEZONE = settings.TIME_ZONE DJANGO_CELERY_BEAT_TZ_AWARE = False CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
編寫任務 tasks.py
import time from celery import Celery from blog.celery import app # 使用redis做為broker # app = Celery('blog.tasks2',broker='redis://127.0.0.1:6379/0',backend='redis://127.0.0.1:6379/1') # 創建任務函數 @app.task def my_task(a, b, c): print('任務正在執行...') print('任務1函數休眠10s') time.sleep(10) return a + b + c @app.task def my_task2(): print("任務2函數正在執行....") print('任務2函數休眠10s') time.sleep(10)
5. 啟動定時任務work
啟動定時任務首先需要有一個work執行異步任務,然后再啟動一個定時器觸發任務。
啟動任務 work
$ celery -A blog worker -l info
啟動定時器觸發 beat
celery -A blog beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
六、具體操作演練
6.1 創建基于間隔時間的周期性任務
1. 初始化周期間隔對象interval
?對象
>>> from django_celery_beat.models import PeriodicTask, IntervalSchedule >>> schedule, created = IntervalSchedule.objects.get_or_create( ... every=10, ... period=IntervalSchedule.SECONDS, ... ) >>> IntervalSchedule.objects.all() <QuerySet [<IntervalSchedule: every 10 seconds>]>
2.創建一個無參數的周期性間隔任務
>>>PeriodicTask.objects.create(interval=schedule,name='my_task2',task='blog.tasks.my_task2',) <PeriodicTask: my_task2: every 10 seconds>
beat 調度服務日志顯示如下:
?worker 服務日志顯示如下:
3.創建一個帶參數的周期性間隔任務
>>> PeriodicTask.objects.create(interval=schedule,name='my_task',task='blog.tasks.my_task',args=json.dumps([10,20,30])) <PeriodicTask: my_task: every 10 seconds>
beat 調度服務日志結果:
?worker 服務日志結果:
4.如何高并發執行任務
需要并行執行任務的時候,就需要設置多個worker
來執行任務。?
6.2 創建一個不帶參數的周期性間隔任務
1.初始化?crontab
?的調度對象
>>> import pytz >>> schedule, _ = CrontabSchedule.objects.get_or_create( ... minute='*', ... hour='*', ... day_of_week='*', ... day_of_month='*', ... timezone=pytz.timezone('Asia/Shanghai') ... )
2. 創建不帶參數的定時任務
PeriodicTask.objects.create(crontab=schedule,name='my_task2_crontab',task='blog.tasks.my_task2',)
beat 調度服務執行結果?
?worker 執行服務結果
6.3 周期性任務的查詢、刪除操作
1. 周期性任務的查詢
>>> PeriodicTask.objects.all() <ExtendedQuerySet [<PeriodicTask: celery.backend_cleanup: 0 4 * * * (m/h/dM/MY/d) Asia/Shanghai>, <PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai>]> >>> PeriodicTask.objects.get(name='my_task2_crontab') <PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai> >>> for task in PeriodicTask.objects.all(): ... print(task.id) ... 1 13 >>> PeriodicTask.objects.get(id=13) <PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai> >>> PeriodicTask.objects.get(name='my_task2_crontab') <PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai>
?控制臺實際操作記錄
2.周期性任務的暫停/啟動
2.1 設置my_taks2_crontab 暫停任務
>>> my_task2_crontab = PeriodicTask.objects.get(id=13) >>> my_task2_crontab.enabled True >>> my_task2_crontab.enabled=False >>> my_task2_crontab.save()
查看worker輸出:
?可以看到worker從19:31以后已經沒有輸出了,說明已經成功吧my_task2_crontab 任務暫停
2.2 設置my_task2_crontab 開啟任務
把任務的 enabled 為 True 即可:
>>> my_task2_crontab.enabled False >>> my_task2_crontab.enabled=True >>> my_task2_crontab.save()
查看worker輸出:
?可以看到worker從19:36開始有輸出,說明已把my_task2_crontab 任務重新啟動
3. 周期性任務的刪除
獲取到指定的任務后調用delete(),再次查詢指定任務會發現已經不存在了
PeriodicTask.objects.get(name='my_task2_crontab').delete() >>> PeriodicTask.objects.get(name='my_task2_crontab') Traceback (most recent call last): File "<console>", line 1, in <module> File "/Users/julius/PycharmProjects/typeidea/.venv/lib/python3.9/site-packages/django/db/models/manager.py", line 85, in manager_method return getattr(self.get_queryset(), name)(*args, **kwargs) File "/Users/julius/PycharmProjects/typeidea/.venv/lib/python3.9/site-packages/django/db/models/query.py", line 435, in get raise self.model.DoesNotExist( django_celery_beat.models.PeriodicTask.DoesNotExist: PeriodicTask matching query does not exist.
總結
原文鏈接:https://blog.csdn.net/qq_36441027/article/details/123851915
相關推薦
- 2022-05-14 c++和python實現順序查找實例_C 語言
- 2022-08-28 IntelliJ IDEA 下debugger熱加載(Hot Swap)有時候失效解決
- 2023-05-22 PyTorch小功能之TensorDataset解讀_python
- 2022-05-06 Redis定時任務原理的實現_Redis
- 2023-05-09 React組件三大核心屬性State?props?Refs介紹_React
- 2024-01-14 三種線程安全的List
- 2022-08-18 詳解python?一維、二維列表的初始化問題_python
- 2022-06-01 React經典面試題之倒計時組件詳解_React
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支