網站首頁 編程語言 正文
在《多線程與同步》中介紹了多線程及存在的問題,而通過使用多進程而非線程可有效地繞過全局解釋器鎖。 因此,通過multiprocessing模塊可充分地利用多核CPU的資源。
多進程
多進程是通過multiprocessing包來實現的,multiprocessing.Process對象(和多線程的threading.Thread類似)用來創建一個進程對象:
- 在類UNIX平臺上,需要對每個Process對象調用join()方法 (實際上等同于wait)避免其成為僵尸進程。
- multiprocessing提供了threading包中沒有的IPC(比如Pipe和Queue),效率上更高。應優先考慮Pipe和Queue,避免使用Lock/Event/Semaphore/Condition等同步方式。
- 多進程應盡量避免共享資源。必要時可以通過共享內存和Manager的方法來共享資源。
僵尸進程
在unix或unix-like系統中,當一個子進程退出后,它就會變成一個僵尸進程,如果父進程沒有通過wait系統調用來讀取這個子進程的退出狀態的話,這個子進程就會一直維持僵尸進程狀態(占據部分系統資源,無法釋放)。
要清除僵尸進程,有:
結束父進程(一般是主進程):當父進程退出的時候僵尸進程也會被隨之清除。
讀取子進程退出狀態:如通過multiprocessing.Process產出的進程可以:
- 調用join()來等待子進程的方法來(內部會wait子進程);
- 在父進程中處理SIGCHLD信號:在處理程序中調用wait系統調用或者直接設置為SIG_IGN來清除僵尸進程;
把進程變成孤兒進程,這樣進程就會自動交由init進程來自動處理。
通過設定signal.signal(signal.SIGCHLD, signal.SIG_IGN)
或join進程可避免僵尸進程的產生
def zombieProc(): print("zombie running") time.sleep(5) print("zombie exit") if __name__ == '__main__': signal.signal(signal.SIGCHLD, signal.SIG_IGN) proc = multiprocessing.Process(target=zombieProc) proc.start() # proc.join() time.sleep(30)
Process類
Process([group [, target [, name [, args [, kwargs]]]]])
,實例化得到的對象,表示一個子進程任務:
- group參數未使用,值始終為None;
- target表示調用對象,即子進程要執行的任務;
- args表示調用對象的位置參數元組,args=(1, ‘test’, [‘one’]);
- kwargs表示調用對象的字典參數,kwargs={‘name’:‘mike’,‘age’:18};
- name為子進程的名稱;
Process類的屬性與方法:
- start():啟動進程,并執行其run方法;
- run():進程啟動時運行的方法,繼承Process類時必須要實現方法;
- terminate():強制終止進程,不會進行任何清理操作(若p創建了子進程,則子進程就成了僵尸進程);如進程還持有鎖等,那么也不會被釋放,進而導致死鎖;
- is_alive():返回進程是否在運行狀態;
- join([timeout]):等待進程終止;
- daemon:默認值為False,如果設為True,代表為守護進程(當父進程終止時,隨之終止;并且不能創建自己的新進程),必須在start()之前設置;
- name:進程的名稱;
- pid/ident:進程的pid;
- exitcode:進程在運行時為None、如果為–N,表示被信號N結束;
- authkey:進程的身份驗證碼(默認是由os.urandom()隨機生成的32字符的字符串),在涉及網絡連接的底層進程間通信時提供安全性;
也可通過os.getpid()
獲取進程的PID,os.getppid()
獲取父進程的PID。
函數方式
通過Process類直接運行函數:
def simpleRoutine(name, delay): print(f"routine {name} starting...") time.sleep(delay) print(f"routine {name} finished") if __name__ == '__main__': thrOne = multiprocessing.Process(target=simpleRoutine, args=("First", 1)) thrTwo = multiprocessing.Process(target=simpleRoutine, args=("Two", 2)) thrOne.start() thrTwo.start() thrOne.join() thrTwo.join()
繼承方式
通過繼承Process類,并實現run方法來啟動進程:
class SimpleProcess(multiprocessing.Process): def __init__(self, name, delay): super().__init__() self.name = name self.delay = delay def run(self): print(f"Process {self.name} starting...") time.sleep(self.delay) print(f"Process {self.name} finished") if __name__ == '__main__': thrOne = SimpleProcess("First", 2) thrTwo = SimpleProcess("Second", 1) thrOne.start() thrTwo.start() thrOne.join() thrTwo.join()
同步機制
進程間同步與線程間同步類似(只是所有對象都在multiprocessing模塊中):
- Lock/Rlock: 通過acquire()和release()來獲取與釋放鎖;
- Event: 事件信號,通過set()和clear()來設定與清楚信號;通過wait()來等待信號;
- Condition: 條件變量;通過wait()用來等待條件,通過notify/notify_all()來通知等待此條件的進程(等待與通知前,都需先持有鎖);
- Semaphore: 信號量;維護一個計數器;
- Barrier: 屏障;只有等待進程數量達到要求數量,才會同時開始執行屏障保護后的代碼。
屏障示例:
def waitBarrier(name, barr: multiprocessing.Barrier): print(f"{name} waiting for open") try: barr.wait() print(f"{name} running") time.sleep(2) except multiprocessing.BrokenBarrierError: print(f"{name} exception") print(f"{name} finished") def openFun(): # 屏障滿足條件時,執行一次 print("barrier opened") if __name__ == '__main__': signal = multiprocessing.Barrier(5, openFun) for i in range(10): multiprocessing.Process(target=waitBarrier, args=(i, signal)).start() time.sleep(1)
當第5個進程啟動時,前面5個進程會同時開始執行(openFun函數會執行一次);當第10個進程啟動時,后面5個進程會同時開始執行一次(openFun函數又會執行一次)。
狀態管理Managers
Managers提供了一種創建由多進程(包括跨機器間進程共享)共享的數據的方式:
-
multiprocessing.Manager()
返回一個SyncManager對象;此對象對應著一個管理者子進程(manager process)以及代理(其他子進程使用); - 它確保當某一進程修改了共享對象之后,其他進程中的共享對象也會得到更新;
- 其支持的類型有:list、dict、Namespace、Lock、RLock、Semaphore、BoundedSemaphore、Condition、Event、Queue、Value和Array。
多進程進共享字典與列表(每個進程中都能看到其他進程修改過的內容)
def worker(dictContext: dict, lstContext: list, name): pid = os.getpid() dictContext[name] = pid lstContext.append(pid) print(f"{name} worker: {lstContext}") def managerContext(): mgr = multiprocessing.Manager() multiprocessing.managers dictContext = mgr.dict() lstContext = mgr.list() jobs = [multiprocessing.Process(target=worker, args=(dictContext, lstContext, i)) for i in range(10)] for j in jobs: j.start() for j in jobs: j.join() print('Results:', dictContext)
原文鏈接:https://blog.csdn.net/alwaysrun/article/details/127164976
相關推薦
- 2022-04-29 C#獲取DataTable對象狀態DataRowState_C#教程
- 2022-05-29 C++掃盲篇之指針詳解_C 語言
- 2022-11-06 Android淺析viewBinding和DataBinding_Android
- 2022-07-24 python雙向循環鏈表實例詳解_python
- 2022-12-12 Android?SharedPreferences數據存儲詳解_Android
- 2024-03-08 SpringBoot項目多模塊開發詳解
- 2022-10-11 ABAP 云平臺向控制臺輸出消息
- 2022-01-02 前端生成二維碼及把頁面轉為圖片保存到本地
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支