日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學無先后,達者為師

網站首頁 編程語言 正文

Python多進程并發與同步機制超詳細講解_python

作者:alwaysrun ? 更新時間: 2023-01-28 編程語言

在《多線程與同步》中介紹了多線程及存在的問題,而通過使用多進程而非線程可有效地繞過全局解釋器鎖。 因此,通過multiprocessing模塊可充分地利用多核CPU的資源。

多進程

多進程是通過multiprocessing包來實現的,multiprocessing.Process對象(和多線程的threading.Thread類似)用來創建一個進程對象:

  • 在類UNIX平臺上,需要對每個Process對象調用join()方法 (實際上等同于wait)避免其成為僵尸進程。
  • multiprocessing提供了threading包中沒有的IPC(比如Pipe和Queue),效率上更高。應優先考慮Pipe和Queue,避免使用Lock/Event/Semaphore/Condition等同步方式。
  • 多進程應盡量避免共享資源。必要時可以通過共享內存和Manager的方法來共享資源。

僵尸進程

在unix或unix-like系統中,當一個子進程退出后,它就會變成一個僵尸進程,如果父進程沒有通過wait系統調用來讀取這個子進程的退出狀態的話,這個子進程就會一直維持僵尸進程狀態(占據部分系統資源,無法釋放)。

要清除僵尸進程,有:

結束父進程(一般是主進程):當父進程退出的時候僵尸進程也會被隨之清除。

讀取子進程退出狀態:如通過multiprocessing.Process產出的進程可以:

  • 調用join()來等待子進程的方法來(內部會wait子進程);
  • 在父進程中處理SIGCHLD信號:在處理程序中調用wait系統調用或者直接設置為SIG_IGN來清除僵尸進程;

把進程變成孤兒進程,這樣進程就會自動交由init進程來自動處理。

通過設定signal.signal(signal.SIGCHLD, signal.SIG_IGN)或join進程可避免僵尸進程的產生

def zombieProc():
    print("zombie running")
    time.sleep(5)
    print("zombie exit")
if __name__ == '__main__':
    signal.signal(signal.SIGCHLD, signal.SIG_IGN)
    proc = multiprocessing.Process(target=zombieProc)
    proc.start()
    # proc.join()
    time.sleep(30)

Process類

Process([group [, target [, name [, args [, kwargs]]]]]),實例化得到的對象,表示一個子進程任務:

  • group參數未使用,值始終為None;
  • target表示調用對象,即子進程要執行的任務;
  • args表示調用對象的位置參數元組,args=(1, ‘test’, [‘one’]);
  • kwargs表示調用對象的字典參數,kwargs={‘name’:‘mike’,‘age’:18};
  • name為子進程的名稱;

Process類的屬性與方法:

  • start():啟動進程,并執行其run方法;
  • run():進程啟動時運行的方法,繼承Process類時必須要實現方法;
  • terminate():強制終止進程,不會進行任何清理操作(若p創建了子進程,則子進程就成了僵尸進程);如進程還持有鎖等,那么也不會被釋放,進而導致死鎖;
  • is_alive():返回進程是否在運行狀態;
  • join([timeout]):等待進程終止;
  • daemon:默認值為False,如果設為True,代表為守護進程(當父進程終止時,隨之終止;并且不能創建自己的新進程),必須在start()之前設置;
  • name:進程的名稱;
  • pid/ident:進程的pid;
  • exitcode:進程在運行時為None、如果為–N,表示被信號N結束;
  • authkey:進程的身份驗證碼(默認是由os.urandom()隨機生成的32字符的字符串),在涉及網絡連接的底層進程間通信時提供安全性;

也可通過os.getpid()獲取進程的PID,os.getppid()獲取父進程的PID。

函數方式

通過Process類直接運行函數:

def simpleRoutine(name, delay):
    print(f"routine {name} starting...")
    time.sleep(delay)
    print(f"routine {name} finished")
if __name__ == '__main__':
    thrOne = multiprocessing.Process(target=simpleRoutine, args=("First", 1))
    thrTwo = multiprocessing.Process(target=simpleRoutine, args=("Two", 2))
    thrOne.start()
    thrTwo.start()
    thrOne.join()
    thrTwo.join()

繼承方式

通過繼承Process類,并實現run方法來啟動進程:

class SimpleProcess(multiprocessing.Process):
    def __init__(self, name, delay):
        super().__init__()
        self.name = name
        self.delay = delay
    def run(self):
        print(f"Process {self.name} starting...")
        time.sleep(self.delay)
        print(f"Process {self.name} finished")
if __name__ == '__main__':
    thrOne = SimpleProcess("First", 2)
    thrTwo = SimpleProcess("Second", 1)
    thrOne.start()
    thrTwo.start()
    thrOne.join()
    thrTwo.join()

同步機制

進程間同步與線程間同步類似(只是所有對象都在multiprocessing模塊中):

  • Lock/Rlock: 通過acquire()和release()來獲取與釋放鎖;
  • Event: 事件信號,通過set()和clear()來設定與清楚信號;通過wait()來等待信號;
  • Condition: 條件變量;通過wait()用來等待條件,通過notify/notify_all()來通知等待此條件的進程(等待與通知前,都需先持有鎖);
  • Semaphore: 信號量;維護一個計數器;
  • Barrier: 屏障;只有等待進程數量達到要求數量,才會同時開始執行屏障保護后的代碼。

屏障示例:

def waitBarrier(name, barr: multiprocessing.Barrier):
    print(f"{name} waiting for open")
    try:
        barr.wait()
        print(f"{name} running")
        time.sleep(2)
    except multiprocessing.BrokenBarrierError:
        print(f"{name} exception")
    print(f"{name} finished")
def openFun():  # 屏障滿足條件時,執行一次
    print("barrier opened")
if __name__ == '__main__':
    signal = multiprocessing.Barrier(5, openFun)
    for i in range(10):
        multiprocessing.Process(target=waitBarrier, args=(i, signal)).start()  
        time.sleep(1)  

當第5個進程啟動時,前面5個進程會同時開始執行(openFun函數會執行一次);當第10個進程啟動時,后面5個進程會同時開始執行一次(openFun函數又會執行一次)。

狀態管理Managers

Managers提供了一種創建由多進程(包括跨機器間進程共享)共享的數據的方式:

  • multiprocessing.Manager()返回一個SyncManager對象;此對象對應著一個管理者子進程(manager process)以及代理(其他子進程使用);
  • 它確保當某一進程修改了共享對象之后,其他進程中的共享對象也會得到更新;
  • 其支持的類型有:list、dict、Namespace、Lock、RLock、Semaphore、BoundedSemaphore、Condition、Event、Queue、Value和Array。

多進程進共享字典與列表(每個進程中都能看到其他進程修改過的內容)

def worker(dictContext: dict, lstContext: list, name):
    pid = os.getpid()
    dictContext[name] = pid
    lstContext.append(pid)
    print(f"{name} worker: {lstContext}")
def managerContext():
    mgr = multiprocessing.Manager()
    multiprocessing.managers
    dictContext = mgr.dict()
    lstContext = mgr.list()
    jobs = [multiprocessing.Process(target=worker, args=(dictContext, lstContext, i)) for i in range(10)]
    for j in jobs:
        j.start()
    for j in jobs:
        j.join()
    print('Results:', dictContext)

原文鏈接:https://blog.csdn.net/alwaysrun/article/details/127164976

欄目分類
最近更新