日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學(xué)無(wú)先后,達(dá)者為師

網(wǎng)站首頁(yè) 編程語(yǔ)言 正文

python實(shí)現(xiàn)超時(shí)退出的三種方式總結(jié)_python

作者:Noah1995 ? 更新時(shí)間: 2022-12-25 編程語(yǔ)言

基于signal模塊實(shí)現(xiàn)

signal包負(fù)責(zé)在Python程序內(nèi)部處理信號(hào),典型的操作包括預(yù)設(shè)信號(hào)處理函數(shù),暫停并等待信號(hào),以及定時(shí)發(fā)出SIGALRM等。

要注意,signal包主要是針對(duì)UNIX平臺(tái)(比如Linux, MAC OS),而Windows內(nèi)核中由于對(duì)信號(hào)機(jī)制的支持不充分,所以在Windows上的Python不能發(fā)揮信號(hào)系統(tǒng)的功能。?

# coding:utf8
import time
import signal
?
?
# 自定義超時(shí)異常
class TimeoutError(Exception):
? ? def __init__(self, msg):
? ? ? ? super(TimeoutError, self).__init__()
? ? ? ? self.msg = msg
?
?
def time_out(interval, callback):
? ? def decorator(func):
? ? ? ? def handler(signum, frame):
? ? ? ? ? ? raise TimeoutError("run func timeout")
?
? ? ? ? def wrapper(*args, **kwargs):
? ? ? ? ? ? try:
? ? ? ? ? ? ? ? signal.signal(signal.SIGALRM, handler)
? ? ? ? ? ? ? ? signal.alarm(interval) ? ? ? # interval秒后向進(jìn)程發(fā)送SIGALRM信號(hào)
? ? ? ? ? ? ? ? result = func(*args, **kwargs)
? ? ? ? ? ? ? ? signal.alarm(0) ? ? ? ? ? ? ?# 函數(shù)在規(guī)定時(shí)間執(zhí)行完后關(guān)閉alarm鬧鐘
? ? ? ? ? ? ? ? return result
? ? ? ? ? ? except TimeoutError, e:
? ? ? ? ? ? ? ? callback(e)
? ? ? ? return wrapper
? ? return decorator
?
?
def timeout_callback(e):
? ? print(e.msg)
?
?
@time_out(2, timeout_callback)
def task1():
? ? print("task1 start")
? ? time.sleep(3)
? ? print("task1 end")
?
?
@time_out(2, timeout_callback)
def task2():
? ? print("task2 start")
? ? time.sleep(1)
? ? print("task2 end")
?
?
if __name__ == "__main__":
? ? task1()
? ? task2()

輸出:

?task1 start
?run func timeout
?task2 start
?task2 end

基于子線程阻塞實(shí)現(xiàn)超時(shí)

# coding:utf8
import time
import threading
?
?
def callback_func():
? ? print('超時(shí)回調(diào)')
?
?
def time_out(interval, callback=None):
? ? def decorator(func):
? ? ? ? def wrapper(*args, **kwargs):
? ? ? ? ? ? t =threading.Thread(target=func, args=args, kwargs=kwargs)
? ? ? ? ? ? t.setDaemon(True) ?# 設(shè)置主線程技術(shù)子線程立刻結(jié)束
? ? ? ? ? ? t.start()
? ? ? ? ? ? t.join(interval) ?# 主線程阻塞等待interval秒
? ? ? ? ? ? if t.is_alive() and callback:
? ? ? ? ? ? ? ? return threading.Timer(0, callback).start() ?# 立即執(zhí)行回調(diào)函數(shù)
? ? ? ? ? ? else:
? ? ? ? ? ? ? ? return
? ? ? ? return wrapper
? ? return decorator
?
?
@time_out(2, callback_func)
def task3(hh):
? ? print('**********task3****************')
? ? for i in range(3):
? ? ? ? time.sleep(1)
? ? ? ? print(i)
? ? ? ? print(hh)
?
?
@time_out(2, callback_func)
def task4(hh):
? ? print('**********task4****************')
? ? for i in range(3):
? ? ? ? # time.sleep(1)
? ? ? ? print(i)
? ? ? ? print(hh)
?
?
if __name__ == '__main__':
? ? task3('參數(shù)')
? ? task4('參數(shù)')

輸出:

**********task3****************
0
參數(shù)
1
參數(shù)
超時(shí)回調(diào)
**********task4****************
0
參數(shù)
1
參數(shù)
2
參數(shù)

基于協(xié)程實(shí)現(xiàn)

def callback_func():
? ? print('callback')
?
?
def time_out(interval, callback=None):
? ? def decorator(func):
? ? ? ? def wrapper(*args, **kwargs):
? ? ? ? ? ? ########## 該部分必選在requests之前導(dǎo)入
? ? ? ? ? ? import gevent
? ? ? ? ? ? from gevent import monkey
? ? ? ? ? ? monkey.patch_all()
? ? ? ? ? ? ##########
? ? ? ? ? ??
? ? ? ? ? ? try:
? ? ? ? ? ? ? ? gevent.with_timeout(interval, func, *args, **kwargs)
? ? ? ? ? ? except gevent.timeout.Timeout as e:
? ? ? ? ? ? ? ? callback() if callback else None
?
? ? ? ? return wrapper
?
? ? return decorator
?
?
@time_out(3, callback_func)
def func(a, b):
? ? import time
? ? time.sleep(2)
? ? print(a,b)
?
?
func(1, 2)

原文鏈接:https://blog.csdn.net/weixin_42368421/article/details/101354628

欄目分類
最近更新