網(wǎng)站首頁 編程語言 正文
flask 通常使用 flask_apscheduler 框架設計定時任務,flask_apscheduler 功能很全面,能按設定的時間規(guī)則執(zhí)行任務,可以持久化到各類數(shù)據(jù)庫(mysql,redis,mongodb),實現(xiàn)對定時任務增、刪、改、查等操作。
安裝
pip3 install flask_apscheduler
1、調(diào)用方法
方法一:使用 Config 類配置時間規(guī)則
from flask import Flask
from flask_apscheduler import APScheduler
class Config(object):
# 列表類型,如有需要可以定義多個job
JOBS = [
{
'id': 'job_1', # 一個標識
'func': '__main__:job1', # 指定運行的函數(shù)
'args': (1, 2), # 傳入函數(shù)的參數(shù)
'trigger': 'interval', # 指定 定時任務的類型
'seconds': 5 # 運行的間隔時間
}
]
SCHEDULER_API_ENABLED = True
def job1(a, b): # 運行的定時任務的函數(shù)
print(str(a) + ' ' + str(b))
if __name__ == '__main__':
app = Flask(__name__) # 實例化flask
app.config.from_object(Config()) # 為實例化的 flask 引入配置
scheduler = APScheduler() # 實例化 APScheduler
scheduler.init_app(app) # 把任務列表放入 flask
scheduler.start() # 啟動任務列表
app.debug = True
app.run(host='0.0.0.0',port=8000) # 啟動 flask
方法二:使用裝飾器
from flask import Flask
from flask_apscheduler import APScheduler
# 實例化 APScheduler
scheduler = APScheduler()
@scheduler.task('interval', id='job_1', args=(1,2),seconds=5)
def job1(a, b): # 運行的定時任務的函數(shù)
print(str(a) + ' ' + str(b))
if __name__ == '__main__':
app = Flask(__name__) # 實例化flask
scheduler.start() # 啟動任務列表
app.debug=True
app.run(host='0.0.0.0',port= 8000) # 啟動 flask
方法三:通過調(diào)用 flask_apscheduler 的 api (推薦)
from flask import Flask
from apscheduler.schedulers.background import BackgroundScheduler
# 調(diào)度器在后臺線程中運行,不會阻塞當前線程
scheduler = BackgroundScheduler()
def job1(a, b): # 運行的定時任務的函數(shù)
print(str(a) + ' ' + str(b))
scheduler.add_job(func=job1, args=("1","2"),id="job_1", trigger="interval", seconds=5, replace_existing=False)
'''
func:定時任務執(zhí)行的函數(shù)名稱。
args:任務執(zhí)行函數(shù)的位置參數(shù),若無參數(shù)可不填
id:任務id,唯一標識,修改,刪除均以任務id作為標識
trigger:觸發(fā)器類型,參數(shù)可選:date、interval、cron
replace_existing:將任務持久化至數(shù)據(jù)庫中時,此參數(shù)必須添加,值為True。并且id值必須有。不然當程序重新啟動時,任務會被重復添加。
'''
if __name__ == '__main__':
app = Flask(__name__) # 實例化flask
scheduler.start() # 啟動任務列表
app.debug=True
app.run(host='0.0.0.0',port= 8000) # 啟動 flask
實例對象 scheduler 擁有增、刪、改、查等基本用法:
新增任務:add_job()
編輯任務:modify_job()
刪除任務:remove_job(id)(刪除所有任務:remove_all_jobs())
查詢?nèi)蝿眨篻et_job(id)(查詢所有任務:get_jobs())
暫停任務:pause_job(id)
恢復任務:resume_job(id)
運行任務:run_job(id)(立即運行,無視任務設置的時間規(guī)則)
原文鏈接:https://blog.csdn.net/weixin_39589455/article/details/133712526
- 上一篇:沒有了
- 下一篇:沒有了
相關(guān)推薦
- 2022-10-03 C++內(nèi)存泄漏的檢測與實現(xiàn)詳細流程_C 語言
- 2022-05-20 ElasticSearch 7.X系列之:細節(jié)問題
- 2022-08-19 Python包中__init__.py文件的作用與用法實例詳解_python
- 2022-05-01 教你利用python如何讀取txt中的數(shù)據(jù)_python
- 2022-04-14 Python之OptionParser模塊使用詳解_python
- 2021-12-15 C/C++?Qt?數(shù)據(jù)庫與Chart歷史數(shù)據(jù)展示_C 語言
- 2022-07-07 在Kubernetes集群中搭建Istio微服務網(wǎng)格的過程詳解_云其它
- 2023-01-27 Python異常與錯誤處理詳細講解_python
- 欄目分類
-
- 最近更新
-
- window11 系統(tǒng)安裝 yarn
- 超詳細win安裝深度學習環(huán)境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區(qū)別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權(quán)
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結(jié)
- Spring Security之安全異常處理
- MybatisPlus優(yōu)雅實現(xiàn)加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發(fā)現(xiàn)-Nac
- Spring Security之基于HttpR
- Redis 底層數(shù)據(jù)結(jié)構(gòu)-簡單動態(tài)字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發(fā)送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支