網(wǎng)站首頁 編程語言 正文
一、python中進程間通信
業(yè)務(wù)場景:在當(dāng)前遇到的業(yè)務(wù)場景中,我們需要啟一個間隔任務(wù),這個間隔任務(wù)跑一個算法,然后把算法的結(jié)果進行一些處理,并入庫。任務(wù)目前間隔是一小時,算法運行時間要50多分鐘,留給結(jié)果處理的時間并不多,所以有可能會出現(xiàn)超時。目前來說,優(yōu)化方向在算法上會更為合理,因為結(jié)果處理本來就不用很多時間。但是在這個業(yè)務(wù)場景下,想要把結(jié)果處理的時間進行無限壓縮,壓縮到0,其實也是可以實現(xiàn)的,說是壓縮為0,實際上就是在算法執(zhí)行完成后,再啟一個進程去處理,這樣就不會由于需要進行數(shù)據(jù)處理而影響到算法的運行,將算法和結(jié)果處理分為兩個獨立的進程去處理。在最開始的程序中,是把算法運行和結(jié)果處理作為一個周期,而現(xiàn)在是把算法運行和結(jié)果處理分為兩個周期去處理。
技術(shù)實現(xiàn)方案:
啟動二個進程,其中一個運行算法,在算法運行結(jié)束后,發(fā)送一個狀態(tài)值到另外一個進程,另外一個進程在收到狀態(tài)量后啟動數(shù)據(jù)處理即可。兩個進程間互不影響即可。其實也相當(dāng)于算法進程控制數(shù)據(jù)處理進程
測試場景構(gòu)造代碼:
from multiprocessing import Process,Pipe
import time
import sys
import os
def send_message(conn):
? ? for i in range(1000):
? ? ? ? print('send_message:%d'%i)
? ? ? ? print(os.getpid())
? ? ? ? conn.send(i)
? ? ? ? time.sleep(3)
def send_message1(conn):
? ? # for i in range(1000):
? ? print(conn.recv())
? ? while True:
? ? ? ? if conn.recv() % 5 == 0:
? ? ? ? ? ? print(' today is nice day')
? ? ? ? time.sleep(1)
if __name__ == '__main__':
? ? ? ? #創(chuàng)建一個進程通信管道
? ? left,right = Pipe()
? ? t1 = Process(target=send_message,args=(left,))
? ? t2 = Process(target=send_message1,args=(right,))
? ? t1.start()
? ? t2.start()
在這個案例場景下有一些需要注意的點:
- 一、time.sleep()的問題,睡眠指定時間,總是會出錯,具體的出錯原因到現(xiàn)在也沒有找到,這是原來出現(xiàn)的問題,在這里沒有做長時間的測試,所以不一定會出現(xiàn),但是還是要注意
- 二、代碼實現(xiàn)中與上述的描述差異有一些,如未啟用調(diào)度任務(wù),只是啟了一個間隔運行的任務(wù)。
- 三、數(shù)據(jù)處理進程一直處理空跑狀態(tài),會造成資源的浪費(更合理的應(yīng)該是形成阻塞狀態(tài),但是對于阻塞狀態(tài)的構(gòu)造缺乏認知,所以先犧牲資源
- 四、在上述描述的需求中,在算法運行及數(shù)據(jù)處理的上一節(jié)點還有一個調(diào)度任務(wù)在控制,這里未做出體現(xiàn),其實應(yīng)該把定時任務(wù)和數(shù)據(jù)處理作為兩個周期獨立出來才更符合上述描述中的需求。
二、設(shè)置狀態(tài)量控制另一個進程
?業(yè)務(wù)場景:在當(dāng)前遇到的業(yè)務(wù)場景中,我們需要啟一個間隔任務(wù),這個間隔任務(wù)跑一個算法,然后把算法的結(jié)果進行一些處理,并入庫。任務(wù)目前間隔是一小時,算法運行時間要50多分鐘,留給結(jié)果處理的時間并不多,所以有可能會出現(xiàn)超時。目前來說,優(yōu)化方向在算法上會更為合理,因為結(jié)果處理本來就不用很多時間。但是在這個業(yè)務(wù)場景下,想要把結(jié)果處理的時間進行無限壓縮,壓縮到0,其實也是可以實現(xiàn)的,說是壓縮為0,實際上就是在算法執(zhí)行完成后,再啟一個進程去處理,這樣就不會由于需要進行數(shù)據(jù)處理而影響到算法的運行,將算法和結(jié)果處理分為兩個獨立的進程去處理。在最開始的程序中,是把算法運行和結(jié)果處理作為一個周期,而現(xiàn)在是把算法運行和結(jié)果處理分為兩個周期去處理。
上面的解決方案中只涉及到了啟用兩個進程去運行兩個任務(wù),并未涉及到啟用定時任務(wù)框架,所以可能會顯得和上述的業(yè)務(wù)場景不一致,所以在這里重新解決一下。上面也是沒有問題的,只是把定時任務(wù)框架也作為一個任務(wù)去處理即可。然后在定時任務(wù)運行完程后,向另外一個進程傳入一個參數(shù),作為啟動另一個進程的狀態(tài)量即可。當(dāng)然,在這里,兩個進程還是完全占滿的,即處理阻塞狀態(tài)。對于資源的利用還是沒有完全達到最好。后續(xù)再考慮使用進程池的方式,看是否可以讓其中的一個進程運行完后直接釋放資源。
技術(shù)解決方案如下:
from multiprocessing import Process,Pipe
import time
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.schedulers.asyncio import AsyncIOScheduler
# schedule = BackgroundScheduler()
schedule = BlockingScheduler(timezone="Asia/Shanghai")
# schedule = AsyncIOScheduler(timezone="Asia/Shanghai")
def algorithm(conn):
? ? print('start_run')
? ? conn.send('please run')
? ? # time.sleep(5)
def worth_result(conn):
? ? while True:
? ? ? ? if conn.recv() == 'please run':
? ? ? ? ? ? print(conn.recv() + ' very nice!')
def time_job(conns):
? ? schedule.add_job(func=algorithm,trigger='interval',seconds=5,args=(conns,))
? ? schedule.start()
if __name__ == '__main__':
? ? left,right = Pipe()
? ? t1 = Process(target=time_job,args=(left,))
? ? t2 = Process(target=worth_result,args=(right,))
? ? t1.start()
? ? t2.start()
在這里還有一些點需要說明,定時任務(wù)選擇那一種類型其實都沒有關(guān)系,阻塞和非阻塞其實沒有關(guān)系,因為我們在這里是直接啟了兩個進程,每個進程間是相互獨立的,并非是在定時任務(wù)下啟用的兩個進程,所以不會影響的。
關(guān)于這個解決方案還有的問題:
- 一、上述所說,兩個進程是占滿的,所以對于資源來說,兩個進程的利用率一直很高
- 二、擴展性不足,如果在這個程序中還有其他需要處理的過程,就需要再添加進程,或者把他添加到當(dāng)前的進程之下,代碼重構(gòu)會比較麻煩一些
- 三、整個任務(wù)的控制不足,需要加以完善。比如對于運行狀態(tài)一些控制及查看,一般程序如果運行時間較長的話,我們應(yīng)該添加這樣的接口,否則啟動后如果沒有出結(jié)果,我們是不知道其運行狀態(tài),有一點被動
- 四、關(guān)于三,使用logging庫,應(yīng)該是可以直接去輸出其日志,但是日志庫作為第三方庫,相當(dāng)于是對整個運行狀態(tài)進行監(jiān)控,會不會再占用一個進程,這個需要去測試
- 五、完備性及容災(zāi)處理,如果程序由于資源等其他問題掛掉后,會有一些數(shù)據(jù)冗余下來,也就是一些算法未進行處理,這個時候需要考慮怎么樣去補數(shù)據(jù)?原始文件如果沒有保留下來呢?而且如果這些數(shù)據(jù)是極重要的數(shù)據(jù)該怎么處理?如果程序掛掉后,應(yīng)該如何快速的去處理呢?直接重啟嗎?
- 六、如果數(shù)據(jù)處理的進程所用的時間比算法還多,那該怎么辦?目前的業(yè)務(wù)來看,是遠低于的,但是如果是遠高于呢?可否將處理工作進行分配,利用多臺機器來處理,然后再把結(jié)果合并起來?
分布式處理的思想越來越濃。
原文鏈接:https://blog.csdn.net/qq_44862918/article/details/124842372
相關(guān)推薦
- 2022-08-17 Python連接數(shù)據(jù)庫并批量插入包含日期記錄的操作_python
- 2023-01-10 Go語言rune與字符串轉(zhuǎn)換的密切關(guān)系解析_Golang
- 2022-10-22 python?中的?super詳解_python
- 2022-03-20 Android實現(xiàn)桌面快捷方式實例代碼_Android
- 2022-08-21 使用?DataAnt?監(jiān)控?Apache?APISIX的原理解析_Linux
- 2022-08-22 詳解Go語言中for循環(huán),break和continue的使用_Golang
- 2022-12-25 Qt開發(fā)之QString類的使用教程詳解_C 語言
- 2022-03-12 .NET?MemoryCache如何清除全部緩存_C#教程
- 最近更新
-
- window11 系統(tǒng)安裝 yarn
- 超詳細win安裝深度學(xué)習(xí)環(huán)境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎(chǔ)操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區(qū)別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權(quán)
- redisson分布式鎖中waittime的設(shè)
- maven:解決release錯誤:Artif
- restTemplate使用總結(jié)
- Spring Security之安全異常處理
- MybatisPlus優(yōu)雅實現(xiàn)加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務(wù)發(fā)現(xiàn)-Nac
- Spring Security之基于HttpR
- Redis 底層數(shù)據(jù)結(jié)構(gòu)-簡單動態(tài)字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應(yīng)用詳解
- 聊聊消息隊列,發(fā)送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支