日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學無先后,達者為師

網站首頁 編程語言 正文

python實現超時退出的三種方式總結_python

作者:Noah1995 ? 更新時間: 2022-12-25 編程語言

基于signal模塊實現

signal包負責在Python程序內部處理信號,典型的操作包括預設信號處理函數,暫停并等待信號,以及定時發出SIGALRM等。

要注意,signal包主要是針對UNIX平臺(比如Linux, MAC OS),而Windows內核中由于對信號機制的支持不充分,所以在Windows上的Python不能發揮信號系統的功能。?

# coding:utf8
import time
import signal
?
?
# 自定義超時異常
class TimeoutError(Exception):
? ? def __init__(self, msg):
? ? ? ? super(TimeoutError, self).__init__()
? ? ? ? self.msg = msg
?
?
def time_out(interval, callback):
? ? def decorator(func):
? ? ? ? def handler(signum, frame):
? ? ? ? ? ? raise TimeoutError("run func timeout")
?
? ? ? ? def wrapper(*args, **kwargs):
? ? ? ? ? ? try:
? ? ? ? ? ? ? ? signal.signal(signal.SIGALRM, handler)
? ? ? ? ? ? ? ? signal.alarm(interval) ? ? ? # interval秒后向進程發送SIGALRM信號
? ? ? ? ? ? ? ? result = func(*args, **kwargs)
? ? ? ? ? ? ? ? signal.alarm(0) ? ? ? ? ? ? ?# 函數在規定時間執行完后關閉alarm鬧鐘
? ? ? ? ? ? ? ? return result
? ? ? ? ? ? except TimeoutError, e:
? ? ? ? ? ? ? ? callback(e)
? ? ? ? return wrapper
? ? return decorator
?
?
def timeout_callback(e):
? ? print(e.msg)
?
?
@time_out(2, timeout_callback)
def task1():
? ? print("task1 start")
? ? time.sleep(3)
? ? print("task1 end")
?
?
@time_out(2, timeout_callback)
def task2():
? ? print("task2 start")
? ? time.sleep(1)
? ? print("task2 end")
?
?
if __name__ == "__main__":
? ? task1()
? ? task2()

輸出:

?task1 start
?run func timeout
?task2 start
?task2 end

基于子線程阻塞實現超時

# coding:utf8
import time
import threading
?
?
def callback_func():
? ? print('超時回調')
?
?
def time_out(interval, callback=None):
? ? def decorator(func):
? ? ? ? def wrapper(*args, **kwargs):
? ? ? ? ? ? t =threading.Thread(target=func, args=args, kwargs=kwargs)
? ? ? ? ? ? t.setDaemon(True) ?# 設置主線程技術子線程立刻結束
? ? ? ? ? ? t.start()
? ? ? ? ? ? t.join(interval) ?# 主線程阻塞等待interval秒
? ? ? ? ? ? if t.is_alive() and callback:
? ? ? ? ? ? ? ? return threading.Timer(0, callback).start() ?# 立即執行回調函數
? ? ? ? ? ? else:
? ? ? ? ? ? ? ? return
? ? ? ? return wrapper
? ? return decorator
?
?
@time_out(2, callback_func)
def task3(hh):
? ? print('**********task3****************')
? ? for i in range(3):
? ? ? ? time.sleep(1)
? ? ? ? print(i)
? ? ? ? print(hh)
?
?
@time_out(2, callback_func)
def task4(hh):
? ? print('**********task4****************')
? ? for i in range(3):
? ? ? ? # time.sleep(1)
? ? ? ? print(i)
? ? ? ? print(hh)
?
?
if __name__ == '__main__':
? ? task3('參數')
? ? task4('參數')

輸出:

**********task3****************
0
參數
1
參數
超時回調
**********task4****************
0
參數
1
參數
2
參數

基于協程實現

def callback_func():
? ? print('callback')
?
?
def time_out(interval, callback=None):
? ? def decorator(func):
? ? ? ? def wrapper(*args, **kwargs):
? ? ? ? ? ? ########## 該部分必選在requests之前導入
? ? ? ? ? ? import gevent
? ? ? ? ? ? from gevent import monkey
? ? ? ? ? ? monkey.patch_all()
? ? ? ? ? ? ##########
? ? ? ? ? ??
? ? ? ? ? ? try:
? ? ? ? ? ? ? ? gevent.with_timeout(interval, func, *args, **kwargs)
? ? ? ? ? ? except gevent.timeout.Timeout as e:
? ? ? ? ? ? ? ? callback() if callback else None
?
? ? ? ? return wrapper
?
? ? return decorator
?
?
@time_out(3, callback_func)
def func(a, b):
? ? import time
? ? time.sleep(2)
? ? print(a,b)
?
?
func(1, 2)

原文鏈接:https://blog.csdn.net/weixin_42368421/article/details/101354628

欄目分類
最近更新