網站首頁 編程語言 正文
前言
之前有介紹了用Linux crontab的方式來實現定時任務,這是使用Linux內置模塊來實現的。而在Python中,還可以用第三方包來管理定時任務,比如celery、apscheduler。相對來說apscheduler使用起來更簡單一些,這里來介紹一下apscheduler的使用方法。
首先安裝起來很簡單,運行pip install apscheduler
即可。
初識apscheduler
來個簡單的例子看看apscheduler是如何使用的。
#encoding:utf-8 from apscheduler.schedulers.blocking import BlockingScheduler import datetime def sch_test(): now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') print('時間:{}, 測試apscheduler'.format(now)) task = BlockingScheduler() task.add_job(func=sch_test, trigger='cron', second='*/10') task.start()
上述例子很簡單,我們首先要定義一個apscheduler的對象,然后add_job添加任務,最后start開啟任務就行了。
例子是每隔10秒運行一次sch_test任務,運行結果如下:
時間:2022-10-08 15:16:30, 測試apscheduler
時間:2022-10-08 15:16:40, 測試apscheduler
時間:2022-10-08 15:16:50, 測試apscheduler
時間:2022-10-08 15:17:00, 測試apscheduler
如果我們要在執行任務函數時攜帶參數,只要在add_job函數中添加args就行,比如task.add_job(func=sch_test, args=('a'), trigger='cron', second='*/10')
。
apscheduler有哪些模塊
上面例子中我們初步了解到如何使用apschedulerl了,接下來需要知道apscheduler的設計框架。apscheduler有四個主要模塊,分別是:觸發器triggers、任務存儲器job_stores、執行器executors、調度器schedulers。
1. 觸發器triggers:
觸發器指的是任務指定的觸發方式,例子中我們用的是“cron”方式。我們可以選擇cron、date、interval中的一個。
1.cron表示的是定時任務,類似linux crontab,在指定的時間觸發。
可用參數如下:
參數 | 釋義 |
---|---|
year | 年份(4位數,如2022) |
month | 月份(1-12) |
day | 一個月的第幾天(1-31) |
week | 一年的第幾周(1-53) |
day_of_week | 一星期的第幾天(0-6) |
hour | 小時 |
minute | 分鐘 |
second | 秒 |
start_date | 開始時間 |
end_date | 結束時間 |
timezone | 時區 |
jitter | 觸發的誤差時間 |
除此之外,我們還可用表達式類型去設置cron。比如常用的有:
表達式 | 釋義 |
---|---|
* | 每個值都觸發 |
*/n | 每隔n觸發一次 |
a-b | 在a-b內任何時間都觸發 |
a,b,c | 分別在a,b,c時間觸發 |
使用方法示例,在每天7點20分執行一次:
task.add_job(func=sch_test, args=('定時任務',), trigger='cron',
hour='7', minute='20')
2.date表示具體到某個時間的一次性任務;
使用方法示例:
# 使用run_date指定運行時間 task.add_job(func='sch_test', trigger='date', run_date=datetime.datetime(2022 ,10 , 8, 16, 1, 30)) # 或者用next_run_time task.add_job(func=sch_test,trigger='date', next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=3))
3.interval表示的是循環任務,指定一個間隔時間,每過間隔時間執行一次。
interval可設置如下的參數:
參數 | 釋義 |
---|---|
weeks | 周 |
days | 一個月的第幾天 |
hours | 小時 |
minutes | 分鐘 |
seconds | 秒 |
start_date | 間隔觸發的開始時間 |
end_date | 間隔觸發的結束時間 |
jitter | 觸發的時間誤差 |
使用方法示例,每隔3秒執行一次sch_test任務:
task.add_job(func=sch_test, args=('循環任務',), trigger='interval', seconds=3)
。
來個例子把3種觸發器都使用一遍:
# encoding:utf-8 from apscheduler.schedulers.blocking import BlockingScheduler import datetime def sch_test(job_type): now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') print('時間:{}, {}測試apscheduler'.format(now, job_type)) task = BlockingScheduler() task.add_job(func=sch_test, args=('一次性任務',),trigger='date', next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=3)) task.add_job(func=sch_test, args=('定時任務',), trigger='cron', second='*/5') task.add_job(func=sch_test, args=('循環任務',), trigger='interval', seconds=3) task.start()
打印部分結果:
時間:2022-10-08 15:45:49, 一次性任務測試apscheduler
時間:2022-10-08 15:45:49, 循環任務測試apscheduler
時間:2022-10-08 15:45:50, 定時任務測試apscheduler
時間:2022-10-08 15:45:52, 循環任務測試apscheduler
時間:2022-10-08 15:45:55, 定時任務測試apscheduler
時間:2022-10-08 15:45:55, 循環任務測試apscheduler
時間:2022-10-08 15:45:58, 循環任務測試apscheduler
通過代碼示例和結果展示,我們可清晰的知道不同觸發器的使用區別。
2. 任務存儲器job_stores
顧名思義,任務存儲器是存儲任務的地方,默認都是存儲在內存中。我們也可自定義存儲方式,比如將任務存到mysql中。這里有以下幾種選擇:
存儲器類型 | 釋義 |
---|---|
MemoryJobStore | 任務存儲在內存中 |
SQLAlchemyJobStore | 使用sqlalchemy作為存儲方式,存儲在數據庫 |
MongoDBJobStore | 存儲在mongodb中 |
RedisJobStore | 存儲在redis中 |
通常默認存儲在內存即可,但若程序故障重啟的話,會重新拉取任務運行了,如果你對任務的執行要求高,那么可以選擇其他的存儲器。
使用SQLAlchemyJobStore存儲器示例:
from apscheduler.schedulers.blocking import BlockingScheduler def sch_test(job_type): now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') print('時間:{}, {}測試apscheduler'.format(now, job_type)) sched = BlockingScheduler() # 使用mysql存儲任務 sql_url = 'mysql+pymysql://root:root@localhost:3306/db_name?charset=utf8' sched.add_jobstore('sqlalchemy',url=sql_url) # 添加任務 sched.add_job(func=sch_test, args=('定時任務',), trigger='cron', second='*/5') sched.start()
3. 執行器executors
執行器的功能就是將任務放到線程池或進程池中運行。有以下幾種選擇:
執行器類型 | 釋義 |
---|---|
ThreadPoolExecutor | 線程池執行器 |
ProcessPoolExecutor | 進程池執行器 |
GeventExecutor | Gevent 程序執行器 |
TornadoExecutor | Tornado 程序執行器 |
TwistedExecutor | Twisted 程序執行器 |
AsyncIOExecutor | asyncio 程序執行器 |
默認是ThreadPoolExecutor, 常用的也就是第線程和進程池執行器。如果應用是CPU密集型操作,可用ProcessPoolExecutor來執行。
4. 調度器schedulers
調度器屬于apscheduler的核心,它扮演著統籌整個apscheduler系統的角色,存儲器、執行器、觸發器在它的調度下正常運行。調度器有以下幾個:
調度器 | 使用場景 |
---|---|
BlockingScheduler | 當調度器是你應用中唯一要運行的,start開啟后會阻塞 |
BackgroundScheduler | 適用于調度程序在應用程序的后臺運行,start開啟后不會阻塞 |
AsyncIOScheduler | 當程序使用了asyncio的異步框架時使用。 |
GeventScheduler | 當程序用了Tornado的時候用 |
TwistedScheduler | 當程序用了Twisted的時候用 |
QtScheduler | 當應用是QT應用的時候用 |
不是特定場景下,我們最常用的是BlockingScheduler調度器。
異常監聽
定時任務在運行時,若出現錯誤,需要設置監聽機制,我們通常結合logging模塊記錄錯誤信息。
使用示例:
from apscheduler.schedulers.blocking import BlockingScheduler import datetime from apscheduler.events import EVENT_JOB_EXECUTED , EVENT_JOB_ERROR import logging # logging日志配置打印格式及保存位置 logging.basicConfig(level=logging.INFO, format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s', datefmt='%Y-%m-%d %H:%M:%S', filename='sche.log', filemode='a') def log_listen(event): if event.exception : print ( '任務出錯,報錯信息:{}'.format(event.exception)) else: print ( '任務正常運行...' ) def sch_test(job_type): now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') print('時間:{}, {}測試apscheduler'.format(now, job_type)) print(1/0) sched = BlockingScheduler() # 使用mysql存儲任務 sql_url = 'mysql+pymysql://root:root@localhost:3306/db?charset=utf8' sched.add_jobstore('sqlalchemy',url=sql_url) # 添加任務 sched.add_job(func=sch_test, args=('定時任務',), trigger='cron', second='*/5') # 配置任務執行完成及錯誤時的監聽 sched.add_listener(log_listen, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR) # 配置日志監聽 sched._logger = logging sched.start()
apscheduler的封裝使用
上面介紹了apscheduler框架的主要模塊,我們基本能掌握怎樣使用apscheduler了。下面就來封裝一下apscheduler吧,以后要用直接在這份代碼上修改就行了。
from apscheduler.schedulers.blocking import BlockingScheduler from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor from apscheduler.events import EVENT_JOB_EXECUTED , EVENT_JOB_ERROR import logging import logging.handlers import os import datetime class LoggerUtils(): def init_logger(self, logger_name): # 日志格式 formatter = logging.Formatter('%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s') log_obj = logging.getLogger(logger_name) log_obj.setLevel(logging.INFO) # 設置log存儲位置 path = '/data/logs/' filename = '{}{}.log'.format(path, logger_name) if not os.path.exists(path): os.makedirs(path) # 設置日志按照時間分割 timeHandler = logging.handlers.TimedRotatingFileHandler( filename, when='D', # 按照什么維度切割, S:秒,M:分,H:小時,D:天,W:周 interval=1, # 多少天切割一次 backupCount=10 # 保留幾天 ) timeHandler.setLevel(logging.INFO) timeHandler.setFormatter(formatter) log_obj.addHandler(timeHandler) return log_obj class Scheduler(LoggerUtils): def __init__(self): # 執行器設置 executors = { 'default': ThreadPoolExecutor(10), # 設置一個名為“default”的ThreadPoolExecutor,其worker值為10 'processpool': ProcessPoolExecutor(5) # 設置一個名為“processpool”的ProcessPoolExecutor,其worker值為5 } self.scheduler = BlockingScheduler(timezone="Asia/Shanghai", executors=executors) # 存儲器設置 # 這里使用sqlalchemy存儲器,將任務存儲在mysql sql_url = 'mysql+pymysql://root:root@localhost:3306/db?charset=utf8' self.scheduler.add_jobstore('sqlalchemy',url=sql_url) def log_listen(event): if event.exception: # 日志記錄 self.scheduler._logger.error(event.traceback) # 配置任務執行完成及錯誤時的監聽 self.scheduler.add_listener(log_listen, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR) # 配置日志監聽 self.scheduler._logger = self.init_logger('sche_test') def add_job(self, *args, **kwargs): """添加任務""" self.scheduler.add_job(*args, **kwargs) def start(self): """開啟任務""" self.scheduler.start() # 測試任務 def sch_test(job_type): now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') print('時間:{}, {}測試apscheduler'.format(now, job_type)) print(1/0) # 添加任務,開啟任務 sched = Scheduler() # 添加任務 sched.add_job(func=sch_test, args=('定時任務',), trigger='cron', second='*/5') # 開啟任務 sched.start()
小結
這篇文章介紹了Python實現定時任務的又一利器apscheduler,通過簡單例子及apscheduler框架的主要模塊分解,我們可以根據實際需求配置好模塊信息,再結合logging模塊,我們可以實時監控到定時任務的運行情況。
原文鏈接:https://juejin.cn/post/7152318359625793544
相關推薦
- 2023-01-30 python第三方異步日志庫loguru簡介_python
- 2023-03-18 git?push?origin?HEAD:refs/for/master?的意思分析_其它綜合
- 2024-03-08 學習基于ssm框架前后端分離實現注冊登錄MD5加密的心得體會
- 2022-11-19 springboot整合使用云服務器上的Redis方法_Redis
- 2023-03-16 python中asyncore異步模塊的實現_python
- 2022-08-19 Python利用memory_profiler查看內存占用情況_python
- 2022-01-14 path.join()和path.resolve()區別
- 2022-01-21 Docker報錯:OCI runtime exec failed: exec failed: con
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支