網(wǎng)站首頁(yè) 編程語(yǔ)言 正文
背景
由于最近工作需求,需要在已有項(xiàng)目添加一個(gè)新功能,實(shí)現(xiàn)配置熱加載的功能。所謂的配置熱加載,也就是說(shuō)當(dāng)服務(wù)收到配置更新消息之后,我們不用重啟服務(wù)就可以使用最新的配置去執(zhí)行任務(wù)。
如何實(shí)現(xiàn)
下面我分別采用多進(jìn)程、多線程、協(xié)程的方式去實(shí)現(xiàn)配置熱加載。
使用多進(jìn)程實(shí)現(xiàn)配置熱加載
如果我們代碼實(shí)現(xiàn)上使用多進(jìn)程, 主進(jìn)程1來(lái)更新配置并發(fā)送指令,任務(wù)的調(diào)用是進(jìn)程2,如何實(shí)現(xiàn)配置熱加載呢?
使用signal信號(hào)量來(lái)實(shí)現(xiàn)熱加載
當(dāng)主進(jìn)程收到配置更新的消息之后(配置讀取是如何收到配置更新的消息的? 這里我們暫不討論), 主進(jìn)程就向進(jìn)子程1發(fā)送kill信號(hào),子進(jìn)程1收到kill的信號(hào)就退出,之后由信號(hào)處理函數(shù)來(lái)啟動(dòng)一個(gè)新的進(jìn)程,使用最新的配置文件來(lái)繼續(xù)執(zhí)行任務(wù)。
main?函數(shù)
def main(): # 啟動(dòng)一個(gè)進(jìn)程執(zhí)行任務(wù) p1 = Process(target=run, args=("p1",)) p1.start() monitor(p1, run) # 注冊(cè)信號(hào) processes["case100"] = p1 #將進(jìn)程pid保存 num = 0 while True: # 模擬獲取配置更新 print( f"{multiprocessing.active_children()=}, count={len(multiprocessing.active_children())}\n") print(f"{processes=}\n") sleep(2) if num == 4: kill_process(processes["case100"]) # kill 當(dāng)前進(jìn)程 if num == 8: kill_process(processes["case100"]) # kill 當(dāng)前進(jìn)程 if num == 12: kill_process(processes["case100"]) # kill 當(dāng)前進(jìn)程 num += 1
signal_handler?函數(shù)
def signal_handler(process: Process, func, signum, frame): # print(f"{signum=}") global counts if signum == 17: # 17 is SIGCHILD # 這個(gè)循環(huán)是為了忽略SIGTERM發(fā)出的信號(hào),避免搶占了主進(jìn)程發(fā)出的SIGCHILD for signame in [SIGTERM, SIGCHLD, SIGQUIT]: signal.signal(signame, SIG_DFL) print("Launch a new process") p = multiprocessing.Process(target=func, args=(f"p{counts}",)) p.start() monitor(p, run) processes["case100"] = p counts += 1 if signum == 2: if process.is_alive(): print(f"Kill {process} process") process.terminate() signal.signal(SIGCHLD, SIG_IGN) sys.exit("kill parent process")
完整代碼如下
#! /usr/local/bin/python3.8 from multiprocessing import Process from typing import Dict import signal from signal import SIGCHLD, SIGTERM, SIGINT, SIGQUIT, SIG_DFL, SIG_IGN import multiprocessing from multiprocessing import Process from typing import Callable from data import processes import sys from functools import partial import time processes: Dict[str, Process] = {} counts = 2 def run(process: Process): while True: print(f"{process} running...") time.sleep(1) def kill_process(process: Process): print(f"kill {process}") process.terminate() def monitor(process: Process, func: Callable): for signame in [SIGTERM, SIGCHLD, SIGINT, SIGQUIT]: # SIGTERM is kill signal. # No SIGCHILD is not trigger singnal_handler, # No SIGINT is not handler ctrl+c, # No SIGQUIT is RuntimeError: reentrant call inside <_io.BufferedWriter name='<stdout>'> signal.signal(signame, partial(signal_handler, process, func)) def signal_handler(process: Process, func, signum, frame): print(f"{signum=}") global counts if signum == 17: # 17 is SIGTERM for signame in [SIGTERM, SIGCHLD, SIGQUIT]: signal.signal(signame, SIG_DFL) print("Launch a new process") p = multiprocessing.Process(target=func, args=(f"p{counts}",)) p.start() monitor(p, run) processes["case100"] = p counts += 1 if signum == 2: if process.is_alive(): print(f"Kill {process} process") process.terminate() signal.signal(SIGCHLD, SIG_IGN) sys.exit("kill parent process") def main(): p1 = Process(target=run, args=("p1",)) p1.start() monitor(p1, run) processes["case100"] = p1 num = 0 while True: print( f"{multiprocessing.active_children()=}, count={len(multiprocessing.active_children())}\n") print(f"{processes=}\n") time.sleep(2) if num == 4: kill_process(processes["case100"]) if num == 8: kill_process(processes["case100"]) if num == 12: kill_process(processes["case100"]) num += 1 if __name__ == '__main__': main()
執(zhí)行結(jié)果如下
multiprocessing.active_children()=[<Process name='Process-1' pid=2533 parent=2532 started>], count=1 processes={'case100': <Process name='Process-1' pid=2533 parent=2532 started>} p1 running... p1 running... kill <Process name='Process-1' pid=2533 parent=2532 started> multiprocessing.active_children()=[<Process name='Process-1' pid=2533 parent=2532 started>], count=1 processes={'case100': <Process name='Process-1' pid=2533 parent=2532 started>} signum=17 Launch a new process p2 running... p2 running... multiprocessing.active_children()=[<Process name='Process-2' pid=2577 parent=2532 started>], count=1 processes={'case100': <Process name='Process-2' pid=2577 parent=2532 started>} p2 running... p2 running... multiprocessing.active_children()=[<Process name='Process-2' pid=2577 parent=2532 started>], count=1 processes={'case100': <Process name='Process-2' pid=2577 parent=2532 started>} p2 running... p2 running... multiprocessing.active_children()=[<Process name='Process-2' pid=2577 parent=2532 started>], count=1 processes={'case100': <Process name='Process-2' pid=2577 parent=2532 started>} p2 running... p2 running... kill <Process name='Process-2' pid=2577 parent=2532 started> signum=17 Launch a new process multiprocessing.active_children()=[<Process name='Process-2' pid=2577 parent=2532 stopped exitcode=-SIGTERM>], count=1 processes={'case100': <Process name='Process-3' pid=2675 parent=2532 started>} p3 running... p3 running... multiprocessing.active_children()=[<Process name='Process-3' pid=2675 parent=2532 started>], count=1
總結(jié)
好處:使用信號(hào)量可以處理多進(jìn)程之間通信的問(wèn)題。
壞處:代碼不好寫(xiě),寫(xiě)出來(lái)代碼不好理解。信號(hào)量使用必須要很熟悉,不然很容易自己給自己寫(xiě)了一個(gè)bug.(所有初學(xué)者慎用,老司機(jī)除外。)
還有一點(diǎn)不是特別理解的就是process.terminate()
?發(fā)送出信號(hào)是SIGTERM
?number是15,但是第一次signal_handler
收到信號(hào)卻是number=17,如果我要去處理15的信號(hào),就會(huì)導(dǎo)致前一個(gè)進(jìn)程不能kill掉的問(wèn)題。歡迎有對(duì)信號(hào)量比較熟悉的大佬,前來(lái)指點(diǎn)迷津,不甚感謝。
采用multiprocessing.Event?來(lái)實(shí)現(xiàn)配置熱加載
實(shí)現(xiàn)邏輯是主進(jìn)程1 更新配置并發(fā)送指令。進(jìn)程2啟動(dòng)調(diào)度任務(wù)。
這時(shí)候當(dāng)主進(jìn)程1更新好配置之后,發(fā)送指令給進(jìn)程2,這時(shí)候的指令就是用Event一個(gè)異步事件通知。
直接上代碼
scheduler?函數(shù)
def scheduler(): while True: print('wait message...') case_configurations = scheduler_notify_queue.get() print(f"Got case configurations {case_configurations=}...") task_schedule_event.set() # 設(shè)置set之后, is_set 為T(mén)rue print(f"Schedule will start ...") while task_schedule_event.is_set(): # is_set 為T(mén)rue的話(huà),那么任務(wù)就會(huì)一直執(zhí)行 run(case_configurations) print("Clearing all scheduling job ...")
event_scheduler?函數(shù)
def event_scheduler(case_config): scheduler_notify_queue.put(case_config) print(f"Put cases config to the Queue ...") task_schedule_event.clear() # clear之后,is_set 為False print(f"Clear scheduler jobs ...") print(f"Schedule job ...")
完整代碼如下
import multiprocessing import time scheduler_notify_queue = multiprocessing.Queue() task_schedule_event = multiprocessing.Event() def run(case_configurations: str): print(f'{case_configurations} running...') time.sleep(3) def scheduler(): while True: print('wait message...') case_configurations = scheduler_notify_queue.get() print(f"Got case configurations {case_configurations=}...") task_schedule_event.set() print(f"Schedule will start ...") while task_schedule_event.is_set(): run(case_configurations) print("Clearing all scheduling job ...") def event_scheduler(case_config: str): scheduler_notify_queue.put(case_config) print(f"Put cases config to the Queue ...") task_schedule_event.clear() print(f"Clear scheduler jobs ...") print(f"Schedule job ...") def main(): scheduler_notify_queue.put('1') p = multiprocessing.Process(target=scheduler) p.start() count = 1 print(f'{count=}') while True: if count == 5: event_scheduler('100') if count == 10: event_scheduler('200') count += 1 time.sleep(1) if __name__ == '__main__': main()
執(zhí)行結(jié)果如下
wait message... Got case configurations case_configurations='1'... Schedule will start ... 1 running... 1 running... Put cases config to the Queue ... Clear scheduler jobs ... Schedule job ... Clearing all scheduling job ... wait message... Got case configurations case_configurations='100'... Schedule will start ... 100 running... Put cases config to the Queue ... Clear scheduler jobs ... Schedule job ... Clearing all scheduling job ... wait message... Got case configurations case_configurations='200'... Schedule will start ... 200 running... 200 running...
總結(jié)
使用Event事件通知,代碼不易出錯(cuò),代碼編寫(xiě)少,易讀。相比之前信號(hào)量的方法,推薦大家多使用這種方式。
使用多線程或協(xié)程的方式,其實(shí)和上述實(shí)現(xiàn)方式一致。唯一區(qū)別就是調(diào)用了不同庫(kù)中,queue
和?event
.
# threading scheduler_notify_queue = queue.Queue() task_schedule_event = threading.Event() # async scheduler_notify_queue = asyncio.Queue() task_schedule_event = asyncio.Event()
結(jié)語(yǔ)
具體的實(shí)現(xiàn)的方式有很多,也各自有各自的優(yōu)劣勢(shì)。我們需要去深刻理解到需求本身,才去做技術(shù)選型。
原文鏈接:https://www.cnblogs.com/aaron-948/p/16459059.html
相關(guān)推薦
- 2023-11-11 Jetson nano 安裝swapfile 解決Cannot allocate memory 問(wèn)題
- 2023-03-13 Pandas篩選某列過(guò)濾的方法_python
- 2022-09-30 Python使用draw類(lèi)繪制圖形示例講解_python
- 2023-11-11 Flask 表單form.validate_on_submit()什么情況下會(huì)是false——解決辦
- 2022-12-27 刪除Helm使用時(shí)關(guān)于kubernetes文件的警告問(wèn)題_云其它
- 2022-12-10 Python直接賦值與淺拷貝和深拷貝實(shí)例講解使用_python
- 2022-05-20 ElasticSearch 7.X系列之:細(xì)節(jié)問(wèn)題
- 2022-05-31 Python學(xué)習(xí)之日志模塊詳解_python
- 最近更新
-
- window11 系統(tǒng)安裝 yarn
- 超詳細(xì)win安裝深度學(xué)習(xí)環(huán)境2025年最新版(
- Linux 中運(yùn)行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲(chǔ)小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎(chǔ)操作-- 運(yùn)算符,流程控制 Flo
- 1. Int 和Integer 的區(qū)別,Jav
- spring @retryable不生效的一種
- Spring Security之認(rèn)證信息的處理
- Spring Security之認(rèn)證過(guò)濾器
- Spring Security概述快速入門(mén)
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權(quán)
- redisson分布式鎖中waittime的設(shè)
- maven:解決release錯(cuò)誤:Artif
- restTemplate使用總結(jié)
- Spring Security之安全異常處理
- MybatisPlus優(yōu)雅實(shí)現(xiàn)加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務(wù)發(fā)現(xiàn)-Nac
- Spring Security之基于HttpR
- Redis 底層數(shù)據(jù)結(jié)構(gòu)-簡(jiǎn)單動(dòng)態(tài)字符串(SD
- arthas操作spring被代理目標(biāo)對(duì)象命令
- Spring中的單例模式應(yīng)用詳解
- 聊聊消息隊(duì)列,發(fā)送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠(yuǎn)程分支