網站首頁 編程語言 正文
flask-apscheduler
將apscheduler
移植到了flask
應用中,使得在flask
中可以非常方便的使用定時任務了,除此之外,它還有如下幾個特性
- 根據
Flask
配置加載調度器配置 - 根據
Flask
配置加載任務調度器 - 允許指定服務器運行任務
- 提供
RESTful API
管理任務,也就是遠程管理任務 - 為
RESTful API
提供認證
下載安裝
pip install flask-apscheduler
基本使用
flask-apscheduler
的相關配置,我們會將它和其它擴展一起,放在應用的配置里
class Config(object): // 配置項 JOBS = [ { 'id': 'job1', 'func': 'run:add', 'args': (1, 2), 'trigger': 'interval', 'seconds': 3 } ] SCHEDULER_API_ENABLED = True def add(a, b): print(a+b)
JOBS列表的每一個元素表示一個定時任務,列子中只有一個interval任務,表示每隔3秒運行一次函數add。func指定調用的函數,args表示傳入函數的參數,trigger表示啟動方式,常用的有兩種,分別是trigger和cron。
上邊我們設置了SCHEDULER_API_ENABLED = True
,可以通過訪問http://127.0.0.1:5000/scheduler,其中scheduler
是默認的RESTful API
前綴
通過查看源碼,可以發現flask-apscheduler提供了以下的接口
def _load_api(self): """ Add the routes for the scheduler API. """ self._add_url_route('get_scheduler_info', '', api.get_scheduler_info, 'GET') self._add_url_route('add_job', '/jobs', api.add_job, 'POST') self._add_url_route('get_job', '/jobs/<job_id>', api.get_job, 'GET') self._add_url_route('get_jobs', '/jobs', api.get_jobs, 'GET') self._add_url_route('delete_job', '/jobs/<job_id>', api.delete_job, 'DELETE') self._add_url_route('update_job', '/jobs/<job_id>', api.update_job, 'PATCH') self._add_url_route('pause_job', '/jobs/<job_id>/pause', api.pause_job, 'POST') self._add_url_route('resume_job', '/jobs/<job_id>/resume', api.resume_job, 'POST') self._add_url_route('run_job', '/jobs/<job_id>/run', api.run_job, 'POST')
如果需要查看當前運行的所有定時任務,則請求http://127.0.0.1:5000/scheduler/jobs即可。
trigger啟動方式
trigger表示間隔啟動,在trigger方式中,使用seconds配置間隔多久啟動一次,單位是秒。
cron啟動方式
cron表示定時啟動
class Config(object): JOBS = [ { 'id': 'job1', 'func': 'scheduler:task', 'args': (1, 2), 'trigger': 'cron', 'day': '*', 'hour': '13', 'minute': '16', 'second': '20' } ] SCHEDULER_API_ENABLED = True def task(a, b): print(str(datetime.datetime.now()) + ' execute task ' + '{}+{}={}'.format(a, b, a + b))
該配置項則表示每天的13點16分20秒啟動一次。*表示全部。
有關常用的cron配置有:
day
- 表示天
hour
- 表示小時
minute
- 表示分鐘
second
- 表示秒
week
- 周
day_of_week
- 星期幾,如星期天使用sun,星期五使用fri,其他的類似。
使用裝飾器定時啟動任務
from flask import Flask from flask_apscheduler import APScheduler import datetime class Config(object): SCHEDULER_API_ENABLED = True scheduler = APScheduler() # interval examples @scheduler.task('interval', id='do_job_1', seconds=30, misfire_grace_time=900) def job1(): print(str(datetime.datetime.now()) + ' Job 1 executed')
表示每隔30秒調用一次job1函數。
原文鏈接:https://blog.csdn.net/HHYZBC/article/details/127191429
相關推薦
- 2022-07-13 Python內建類型int源碼學習_python
- 2023-01-19 Python中方法的缺省參數問題解讀_python
- 2022-06-19 python繪制餅圖和直方圖的方法_python
- 2022-09-13 Android?Studio實現簡單補間動畫_Android
- 2023-08-16 使用el-row和el-col實現快速布局
- 2022-05-21 ASP.NET?MVC中兩個配置文件的作用詳解_基礎應用
- 2022-07-24 .Net行為型設計模式之中介者模式(Mediator)_基礎應用
- 2022-04-01 hive not in效率優化
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支