日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學(xué)無先后,達者為師

網(wǎng)站首頁 編程語言 正文

python中進程間通信及設(shè)置狀態(tài)量控制另一個進程_python

作者:PT、小小馬 ? 更新時間: 2022-07-15 編程語言

一、python中進程間通信

業(yè)務(wù)場景:在當(dāng)前遇到的業(yè)務(wù)場景中,我們需要啟一個間隔任務(wù),這個間隔任務(wù)跑一個算法,然后把算法的結(jié)果進行一些處理,并入庫。任務(wù)目前間隔是一小時,算法運行時間要50多分鐘,留給結(jié)果處理的時間并不多,所以有可能會出現(xiàn)超時。目前來說,優(yōu)化方向在算法上會更為合理,因為結(jié)果處理本來就不用很多時間。但是在這個業(yè)務(wù)場景下,想要把結(jié)果處理的時間進行無限壓縮,壓縮到0,其實也是可以實現(xiàn)的,說是壓縮為0,實際上就是在算法執(zhí)行完成后,再啟一個進程去處理,這樣就不會由于需要進行數(shù)據(jù)處理而影響到算法的運行,將算法和結(jié)果處理分為兩個獨立的進程去處理。在最開始的程序中,是把算法運行和結(jié)果處理作為一個周期,而現(xiàn)在是把算法運行和結(jié)果處理分為兩個周期去處理。

技術(shù)實現(xiàn)方案:

啟動二個進程,其中一個運行算法,在算法運行結(jié)束后,發(fā)送一個狀態(tài)值到另外一個進程,另外一個進程在收到狀態(tài)量后啟動數(shù)據(jù)處理即可。兩個進程間互不影響即可。其實也相當(dāng)于算法進程控制數(shù)據(jù)處理進程

測試場景構(gòu)造代碼:

from multiprocessing import Process,Pipe
import time
import sys
import os
def send_message(conn):
? ? for i in range(1000):
? ? ? ? print('send_message:%d'%i)
? ? ? ? print(os.getpid())
? ? ? ? conn.send(i)
? ? ? ? time.sleep(3)
def send_message1(conn):
? ? # for i in range(1000):
? ? print(conn.recv())
? ? while True:
? ? ? ? if conn.recv() % 5 == 0:
? ? ? ? ? ? print(' today is nice day')
? ? ? ? time.sleep(1)
if __name__ == '__main__':
? ? ? ? #創(chuàng)建一個進程通信管道
? ? left,right = Pipe()
? ? t1 = Process(target=send_message,args=(left,))
? ? t2 = Process(target=send_message1,args=(right,))
? ? t1.start()
? ? t2.start()

在這個案例場景下有一些需要注意的點:

  • 一、time.sleep()的問題,睡眠指定時間,總是會出錯,具體的出錯原因到現(xiàn)在也沒有找到,這是原來出現(xiàn)的問題,在這里沒有做長時間的測試,所以不一定會出現(xiàn),但是還是要注意
  • 二、代碼實現(xiàn)中與上述的描述差異有一些,如未啟用調(diào)度任務(wù),只是啟了一個間隔運行的任務(wù)。
  • 三、數(shù)據(jù)處理進程一直處理空跑狀態(tài),會造成資源的浪費(更合理的應(yīng)該是形成阻塞狀態(tài),但是對于阻塞狀態(tài)的構(gòu)造缺乏認知,所以先犧牲資源
  • 四、在上述描述的需求中,在算法運行及數(shù)據(jù)處理的上一節(jié)點還有一個調(diào)度任務(wù)在控制,這里未做出體現(xiàn),其實應(yīng)該把定時任務(wù)和數(shù)據(jù)處理作為兩個周期獨立出來才更符合上述描述中的需求。

二、設(shè)置狀態(tài)量控制另一個進程

?業(yè)務(wù)場景:在當(dāng)前遇到的業(yè)務(wù)場景中,我們需要啟一個間隔任務(wù),這個間隔任務(wù)跑一個算法,然后把算法的結(jié)果進行一些處理,并入庫。任務(wù)目前間隔是一小時,算法運行時間要50多分鐘,留給結(jié)果處理的時間并不多,所以有可能會出現(xiàn)超時。目前來說,優(yōu)化方向在算法上會更為合理,因為結(jié)果處理本來就不用很多時間。但是在這個業(yè)務(wù)場景下,想要把結(jié)果處理的時間進行無限壓縮,壓縮到0,其實也是可以實現(xiàn)的,說是壓縮為0,實際上就是在算法執(zhí)行完成后,再啟一個進程去處理,這樣就不會由于需要進行數(shù)據(jù)處理而影響到算法的運行,將算法和結(jié)果處理分為兩個獨立的進程去處理。在最開始的程序中,是把算法運行和結(jié)果處理作為一個周期,而現(xiàn)在是把算法運行和結(jié)果處理分為兩個周期去處理。

上面的解決方案中只涉及到了啟用兩個進程去運行兩個任務(wù),并未涉及到啟用定時任務(wù)框架,所以可能會顯得和上述的業(yè)務(wù)場景不一致,所以在這里重新解決一下。上面也是沒有問題的,只是把定時任務(wù)框架也作為一個任務(wù)去處理即可。然后在定時任務(wù)運行完程后,向另外一個進程傳入一個參數(shù),作為啟動另一個進程的狀態(tài)量即可。當(dāng)然,在這里,兩個進程還是完全占滿的,即處理阻塞狀態(tài)。對于資源的利用還是沒有完全達到最好。后續(xù)再考慮使用進程池的方式,看是否可以讓其中的一個進程運行完后直接釋放資源。

技術(shù)解決方案如下:

from multiprocessing import Process,Pipe
import time
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.schedulers.asyncio import AsyncIOScheduler
# schedule = BackgroundScheduler()
schedule = BlockingScheduler(timezone="Asia/Shanghai")
# schedule = AsyncIOScheduler(timezone="Asia/Shanghai")
def algorithm(conn):
? ? print('start_run')
? ? conn.send('please run')
? ? # time.sleep(5)
def worth_result(conn):
? ? while True:
? ? ? ? if conn.recv() == 'please run':
? ? ? ? ? ? print(conn.recv() + ' very nice!')
def time_job(conns):
? ? schedule.add_job(func=algorithm,trigger='interval',seconds=5,args=(conns,))
? ? schedule.start()
if __name__ == '__main__':
? ? left,right = Pipe()
? ? t1 = Process(target=time_job,args=(left,))
? ? t2 = Process(target=worth_result,args=(right,))
? ? t1.start()
? ? t2.start()

在這里還有一些點需要說明,定時任務(wù)選擇那一種類型其實都沒有關(guān)系,阻塞和非阻塞其實沒有關(guān)系,因為我們在這里是直接啟了兩個進程,每個進程間是相互獨立的,并非是在定時任務(wù)下啟用的兩個進程,所以不會影響的。

關(guān)于這個解決方案還有的問題:

  • 一、上述所說,兩個進程是占滿的,所以對于資源來說,兩個進程的利用率一直很高
  • 二、擴展性不足,如果在這個程序中還有其他需要處理的過程,就需要再添加進程,或者把他添加到當(dāng)前的進程之下,代碼重構(gòu)會比較麻煩一些
  • 三、整個任務(wù)的控制不足,需要加以完善。比如對于運行狀態(tài)一些控制及查看,一般程序如果運行時間較長的話,我們應(yīng)該添加這樣的接口,否則啟動后如果沒有出結(jié)果,我們是不知道其運行狀態(tài),有一點被動
  • 四、關(guān)于三,使用logging庫,應(yīng)該是可以直接去輸出其日志,但是日志庫作為第三方庫,相當(dāng)于是對整個運行狀態(tài)進行監(jiān)控,會不會再占用一個進程,這個需要去測試
  • 五、完備性及容災(zāi)處理,如果程序由于資源等其他問題掛掉后,會有一些數(shù)據(jù)冗余下來,也就是一些算法未進行處理,這個時候需要考慮怎么樣去補數(shù)據(jù)?原始文件如果沒有保留下來呢?而且如果這些數(shù)據(jù)是極重要的數(shù)據(jù)該怎么處理?如果程序掛掉后,應(yīng)該如何快速的去處理呢?直接重啟嗎?
  • 六、如果數(shù)據(jù)處理的進程所用的時間比算法還多,那該怎么辦?目前的業(yè)務(wù)來看,是遠低于的,但是如果是遠高于呢?可否將處理工作進行分配,利用多臺機器來處理,然后再把結(jié)果合并起來?

分布式處理的思想越來越濃。

原文鏈接:https://blog.csdn.net/qq_44862918/article/details/124842372

欄目分類
最近更新