日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學無先后,達者為師

網站首頁 編程語言 正文

python定時任務schedule庫用法詳細講解_python

作者:IT之一小佬 ? 更新時間: 2023-02-27 編程語言

前言

schedule是一個第三方輕量級的任務調度模塊,可以按照秒,分,小時,日期或者自定義事件執行時間。

如果想執行多個任務,也可以添加多個task。

首先安裝schedule庫:

pip install schedule

1、按時間間隔執行定時任務

示例代碼1:

import schedule
from datetime import datetime
 
def task():
    now = datetime.now()
    ts = now.strftime("%Y-%m-%d %H:%M:%S")
    print(ts)
 
def task2():
    now = datetime.now()
    ts = now.strftime("%Y-%m-%d %H:%M:%S")
    print(ts + '666!')
 
def func():
    # 清空任務
    schedule.clear()
    # 創建一個按3秒間隔執行任務
    schedule.every(3).seconds.do(task)
    # 創建一個按2秒間隔執行任務
    schedule.every(2).seconds.do(task2)
    while True:
        schedule.run_pending()
 
func()

運行結果:

示例代碼2:

import schedule
import time
 
def job(name):
    print("her name is : ", name)
 
name = "張三"
schedule.every(10).minutes.do(job, name)
schedule.every().hour.do(job, name)
schedule.every().day.at("10:30").do(job, name)
schedule.every(5).to(10).days.do(job, name)
schedule.every().monday.do(job, name)
schedule.every().wednesday.at("13:15").do(job, name)
 
while True:
    schedule.run_pending()
    time.sleep(1)

參數解釋:

  • 每隔十分鐘執行一次任務
  • 每隔一小時執行一次任務
  • 每天的10:30執行一次任務
  • 每隔5到10天執行一次任務?
  • 每周一的這個時候執行一次任務
  • 每周三13:15執行一次任務
  • run_pending:運行所有可以運行的任務

?注意:schedule方法是串行的,也就是說,如果各個任務之間時間不沖突,那是沒問題的;如果時間有沖突的話,會串行的執行命令。

2、裝飾器:通過 @repeat() 裝飾靜態方法

示例代碼:

from datetime import datetime
from schedule import every, repeat, run_pending
 
@repeat(every(3).seconds)
def task():
    now = datetime.now()
    ts = now.strftime("%Y-%m-%d %H:%M:%S")
    print(ts + '-333!')
 
@repeat(every(5).seconds)
def task2():
    now = datetime.now()
    ts = now.strftime("%Y-%m-%d %H:%M:%S")
    print(ts + "-555555!")
 
while True:
    run_pending()

運行結果:

3、傳遞參數

示例代碼:

from datetime import datetime
import schedule
 
def task(s):
    now = datetime.now()
    ts = now.strftime("%Y-%m-%d %H:%M:%S")
    print(ts + s)
 
def task2(s):
    now = datetime.now()
    ts = now.strftime("%Y-%m-%d %H:%M:%S")
    print(ts + s)
 
schedule.every(3).seconds.do(task, s='-333')
schedule.every(5).seconds.do(task, s='-555')
while True:
    schedule.run_pending()

運行結果:

4、使用裝飾器傳遞參數

示例代碼:

from datetime import datetime
from schedule import every, repeat, run_pending
 
@repeat(every(3).seconds, '-333')
def task(s):
    now = datetime.now()
    ts = now.strftime("%Y-%m-%d %H:%M:%S")
    print(ts + s)
 
@repeat(every(5).seconds, '-555')
def task2(s):
    now = datetime.now()
    ts = now.strftime("%Y-%m-%d %H:%M:%S")
    print(ts + s)
 
while True:
    run_pending()

運行結果:

5、取消定時任務

示例代碼:?

import schedule
 
i = 0
 
def some_task():
    global i
    i += 1
    print(i)
    if i == 5:
        schedule.cancel_job(job)
        print('cancel job')
        exit(0)
 
job = schedule.every().second.do(some_task)
while True:
    schedule.run_pending()

運行結果:

6、在指定時間執行一次任務

示例代碼:

import time
import schedule
 
def job_that_executes_once():
    print('Hello')
    return schedule.CancelJob
 
schedule.every().minute.at(':30').do(job_that_executes_once)
while True:
    schedule.run_pending()
    time.sleep(1)

運行結果:

7、根據標簽檢索任務

示例代碼:

# 檢索所有任務:schedule.get_jobs()
import schedule
 
def greet(name):
    print('Hello {}'.format(name))
 
schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')
schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')
friends = schedule.get_jobs('friend')
print(friends)

運行結果:

8、根據標簽取消任務

示例代碼:

# 取消所有任務:schedule.clear()
import schedule
 
def greet(name):
    print('Hello {}'.format(name))
    if name == 'Cancel':
        schedule.clear('second-tasks')
        print('cancel second-tasks')
 
schedule.every().second.do(greet, 'Andrea').tag('second-tasks', 'friend')
schedule.every().second.do(greet, 'John').tag('second-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every(5).seconds.do(greet, 'Cancel').tag('daily-tasks', 'guest')
while True:
    schedule.run_pending()

運行結果:

9、運行任務到某時間

示例代碼:

import schedule
from datetime import datetime, timedelta, time
 
def job():
    print('working...')
 
schedule.every().second.until('23:59').do(job)  # 今天23:59停止
schedule.every().second.until('2030-01-01 18:30').do(job)  # 2030-01-01 18:30停止
schedule.every().second.until(timedelta(hours=8)).do(job)  # 8小時后停止
schedule.every().second.until(time(23, 59, 59)).do(job)  # 今天23:59:59停止
schedule.every().second.until(datetime(2030, 1, 1, 18, 30, 0)).do(job)  # 2030-01-01 18:30停止
while True:
    schedule.run_pending()

運行結果:

10、馬上運行所有任務(主要用于測試)

示例代碼:

import schedule
 
def job():
    print('working...')
 
def job1():
    print('Hello...')
 
schedule.every().monday.at('12:40').do(job)
schedule.every().tuesday.at('16:40').do(job1)
schedule.run_all()
schedule.run_all(delay_seconds=3)  # 任務間延遲3秒

運行結果:

11、并行運行:使用 Python 內置隊列實現

示例代碼:?

import threading
import time
import schedule
 
def job1():
    print("I'm running on thread %s" % threading.current_thread())
 
def job2():
    print("I'm running on thread %s" % threading.current_thread())
 
def job3():
    print("I'm running on thread %s" % threading.current_thread())
 
def run_threaded(job_func):
    job_thread = threading.Thread(target=job_func)
    job_thread.start()
 
schedule.every(10).seconds.do(run_threaded, job1)
schedule.every(10).seconds.do(run_threaded, job2)
schedule.every(10).seconds.do(run_threaded, job3)
while True:
    schedule.run_pending()
    time.sleep(1)

運行結果:

總結

原文鏈接:https://blog.csdn.net/weixin_44799217/article/details/127352957

欄目分類
最近更新