網站首頁 編程語言 正文
前言
schedule是一個第三方輕量級的任務調度模塊,可以按照秒,分,小時,日期或者自定義事件執行時間。
如果想執行多個任務,也可以添加多個task。
首先安裝schedule庫:
pip install schedule
1、按時間間隔執行定時任務
示例代碼1:
import schedule
from datetime import datetime
def task():
now = datetime.now()
ts = now.strftime("%Y-%m-%d %H:%M:%S")
print(ts)
def task2():
now = datetime.now()
ts = now.strftime("%Y-%m-%d %H:%M:%S")
print(ts + '666!')
def func():
# 清空任務
schedule.clear()
# 創建一個按3秒間隔執行任務
schedule.every(3).seconds.do(task)
# 創建一個按2秒間隔執行任務
schedule.every(2).seconds.do(task2)
while True:
schedule.run_pending()
func()
運行結果:
示例代碼2:
import schedule
import time
def job(name):
print("her name is : ", name)
name = "張三"
schedule.every(10).minutes.do(job, name)
schedule.every().hour.do(job, name)
schedule.every().day.at("10:30").do(job, name)
schedule.every(5).to(10).days.do(job, name)
schedule.every().monday.do(job, name)
schedule.every().wednesday.at("13:15").do(job, name)
while True:
schedule.run_pending()
time.sleep(1)
參數解釋:
- 每隔十分鐘執行一次任務
- 每隔一小時執行一次任務
- 每天的10:30執行一次任務
- 每隔5到10天執行一次任務?
- 每周一的這個時候執行一次任務
- 每周三13:15執行一次任務
- run_pending:運行所有可以運行的任務
?注意:schedule方法是串行的,也就是說,如果各個任務之間時間不沖突,那是沒問題的;如果時間有沖突的話,會串行的執行命令。
2、裝飾器:通過 @repeat() 裝飾靜態方法
示例代碼:
from datetime import datetime
from schedule import every, repeat, run_pending
@repeat(every(3).seconds)
def task():
now = datetime.now()
ts = now.strftime("%Y-%m-%d %H:%M:%S")
print(ts + '-333!')
@repeat(every(5).seconds)
def task2():
now = datetime.now()
ts = now.strftime("%Y-%m-%d %H:%M:%S")
print(ts + "-555555!")
while True:
run_pending()
運行結果:
3、傳遞參數
示例代碼:
from datetime import datetime
import schedule
def task(s):
now = datetime.now()
ts = now.strftime("%Y-%m-%d %H:%M:%S")
print(ts + s)
def task2(s):
now = datetime.now()
ts = now.strftime("%Y-%m-%d %H:%M:%S")
print(ts + s)
schedule.every(3).seconds.do(task, s='-333')
schedule.every(5).seconds.do(task, s='-555')
while True:
schedule.run_pending()
運行結果:
4、使用裝飾器傳遞參數
示例代碼:
from datetime import datetime
from schedule import every, repeat, run_pending
@repeat(every(3).seconds, '-333')
def task(s):
now = datetime.now()
ts = now.strftime("%Y-%m-%d %H:%M:%S")
print(ts + s)
@repeat(every(5).seconds, '-555')
def task2(s):
now = datetime.now()
ts = now.strftime("%Y-%m-%d %H:%M:%S")
print(ts + s)
while True:
run_pending()
運行結果:
5、取消定時任務
示例代碼:?
import schedule
i = 0
def some_task():
global i
i += 1
print(i)
if i == 5:
schedule.cancel_job(job)
print('cancel job')
exit(0)
job = schedule.every().second.do(some_task)
while True:
schedule.run_pending()
運行結果:
6、在指定時間執行一次任務
示例代碼:
import time
import schedule
def job_that_executes_once():
print('Hello')
return schedule.CancelJob
schedule.every().minute.at(':30').do(job_that_executes_once)
while True:
schedule.run_pending()
time.sleep(1)
運行結果:
7、根據標簽檢索任務
示例代碼:
# 檢索所有任務:schedule.get_jobs()
import schedule
def greet(name):
print('Hello {}'.format(name))
schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')
schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')
friends = schedule.get_jobs('friend')
print(friends)
運行結果:
8、根據標簽取消任務
示例代碼:
# 取消所有任務:schedule.clear()
import schedule
def greet(name):
print('Hello {}'.format(name))
if name == 'Cancel':
schedule.clear('second-tasks')
print('cancel second-tasks')
schedule.every().second.do(greet, 'Andrea').tag('second-tasks', 'friend')
schedule.every().second.do(greet, 'John').tag('second-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every(5).seconds.do(greet, 'Cancel').tag('daily-tasks', 'guest')
while True:
schedule.run_pending()
運行結果:
9、運行任務到某時間
示例代碼:
import schedule
from datetime import datetime, timedelta, time
def job():
print('working...')
schedule.every().second.until('23:59').do(job) # 今天23:59停止
schedule.every().second.until('2030-01-01 18:30').do(job) # 2030-01-01 18:30停止
schedule.every().second.until(timedelta(hours=8)).do(job) # 8小時后停止
schedule.every().second.until(time(23, 59, 59)).do(job) # 今天23:59:59停止
schedule.every().second.until(datetime(2030, 1, 1, 18, 30, 0)).do(job) # 2030-01-01 18:30停止
while True:
schedule.run_pending()
運行結果:
10、馬上運行所有任務(主要用于測試)
示例代碼:
import schedule
def job():
print('working...')
def job1():
print('Hello...')
schedule.every().monday.at('12:40').do(job)
schedule.every().tuesday.at('16:40').do(job1)
schedule.run_all()
schedule.run_all(delay_seconds=3) # 任務間延遲3秒
運行結果:
11、并行運行:使用 Python 內置隊列實現
示例代碼:?
import threading
import time
import schedule
def job1():
print("I'm running on thread %s" % threading.current_thread())
def job2():
print("I'm running on thread %s" % threading.current_thread())
def job3():
print("I'm running on thread %s" % threading.current_thread())
def run_threaded(job_func):
job_thread = threading.Thread(target=job_func)
job_thread.start()
schedule.every(10).seconds.do(run_threaded, job1)
schedule.every(10).seconds.do(run_threaded, job2)
schedule.every(10).seconds.do(run_threaded, job3)
while True:
schedule.run_pending()
time.sleep(1)
運行結果:
總結
原文鏈接:https://blog.csdn.net/weixin_44799217/article/details/127352957
相關推薦
- 2022-09-14 Python深入分析@property裝飾器的應用_python
- 2023-10-14 C/C++實現操作系統進程調度算法,FCFS, RR, SPN, SRT, HRRN
- 2022-05-10 torch.cuda.is_available()返回false最終解決方案
- 2022-12-13 python字典如何獲取最大和最小value對應的key_python
- 2023-02-12 Python利用物理引擎Pymunk編寫一個解壓小游戲_python
- 2022-07-16 Electron項目中的NSIS配置項
- 2023-05-29 postgresql數據庫配置文件postgresql.conf,pg_hba.conf,pg_id
- 2022-04-19 Windows中Python上傳文件到Liunx下的fastdfs
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支