日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學無先后,達者為師

網站首頁 編程語言 正文

Python?flask框架定時任務apscheduler應用介紹_python

作者:HHYZBC ? 更新時間: 2022-11-22 編程語言

flask-apschedulerapscheduler移植到了flask應用中,使得在flask中可以非常方便的使用定時任務了,除此之外,它還有如下幾個特性

  • 根據Flask配置加載調度器配置
  • 根據Flask配置加載任務調度器
  • 允許指定服務器運行任務
  • 提供RESTful API管理任務,也就是遠程管理任務
  • RESTful API提供認證

下載安裝

pip install flask-apscheduler

基本使用

flask-apscheduler的相關配置,我們會將它和其它擴展一起,放在應用的配置里

class Config(object):
    // 配置項
    JOBS = [
        {
            'id': 'job1',
            'func': 'run:add',
            'args': (1, 2),
            'trigger': 'interval',
            'seconds': 3
        }
    ]
    SCHEDULER_API_ENABLED = True
def add(a, b):
    print(a+b)

JOBS列表的每一個元素表示一個定時任務,列子中只有一個interval任務,表示每隔3秒運行一次函數add。func指定調用的函數,args表示傳入函數的參數,trigger表示啟動方式,常用的有兩種,分別是trigger和cron。

上邊我們設置了SCHEDULER_API_ENABLED = True,可以通過訪問http://127.0.0.1:5000/scheduler,其中scheduler是默認的RESTful API前綴

通過查看源碼,可以發現flask-apscheduler提供了以下的接口

def _load_api(self):
    """
    Add the routes for the scheduler API.
    """
    self._add_url_route('get_scheduler_info', '', api.get_scheduler_info, 'GET')
    self._add_url_route('add_job', '/jobs', api.add_job, 'POST')
    self._add_url_route('get_job', '/jobs/<job_id>', api.get_job, 'GET')
    self._add_url_route('get_jobs', '/jobs', api.get_jobs, 'GET')
    self._add_url_route('delete_job', '/jobs/<job_id>', api.delete_job, 'DELETE')
    self._add_url_route('update_job', '/jobs/<job_id>', api.update_job, 'PATCH')
    self._add_url_route('pause_job', '/jobs/<job_id>/pause', api.pause_job, 'POST')
    self._add_url_route('resume_job', '/jobs/<job_id>/resume', api.resume_job, 'POST')
    self._add_url_route('run_job', '/jobs/<job_id>/run', api.run_job, 'POST')

如果需要查看當前運行的所有定時任務,則請求http://127.0.0.1:5000/scheduler/jobs即可。

trigger啟動方式

trigger表示間隔啟動,在trigger方式中,使用seconds配置間隔多久啟動一次,單位是秒。

cron啟動方式

cron表示定時啟動

class Config(object):
    JOBS = [
        {
            'id': 'job1',
            'func': 'scheduler:task',
            'args': (1, 2),
            'trigger': 'cron',
            'day': '*',
            'hour': '13',
            'minute': '16',
            'second': '20'
        }
    ]
    SCHEDULER_API_ENABLED = True
def task(a, b):
    print(str(datetime.datetime.now()) + ' execute task ' + '{}+{}={}'.format(a, b, a + b))

該配置項則表示每天的13點16分20秒啟動一次。*表示全部。

有關常用的cron配置有:

day

  • 表示天

hour

  • 表示小時

minute

  • 表示分鐘

second

  • 表示秒

week

day_of_week

  • 星期幾,如星期天使用sun,星期五使用fri,其他的類似。

使用裝飾器定時啟動任務

from flask import Flask
from flask_apscheduler import APScheduler
import datetime
class Config(object):
    SCHEDULER_API_ENABLED = True
scheduler = APScheduler()
# interval examples
@scheduler.task('interval', id='do_job_1', seconds=30, misfire_grace_time=900)
def job1():
    print(str(datetime.datetime.now()) + ' Job 1 executed')

表示每隔30秒調用一次job1函數。

原文鏈接:https://blog.csdn.net/HHYZBC/article/details/127191429

欄目分類
最近更新