網(wǎng)站首頁(yè) 編程語(yǔ)言 正文
在《多進(jìn)程并發(fā)與同步》中介紹了進(jìn)程創(chuàng)建與信息共享,除此之外python還提供了更方便的進(jìn)程間通訊方式。
進(jìn)程間通訊
multiprocessing中提供了Pipe(一對(duì)一)和Queue(多對(duì)多)用于進(jìn)程間通訊。
隊(duì)列Queue
隊(duì)列是一個(gè)可用于進(jìn)程間共享的Queue(內(nèi)部使用pipe與鎖),其接口與普通隊(duì)列類似:
put(obj[, block[, timeout]])
:插入數(shù)據(jù)到隊(duì)列(默認(rèn)阻塞,且沒有超時(shí)時(shí)間);
- 若設(shè)定了超時(shí)且隊(duì)列已滿,會(huì)拋出queue.Full異常;
- 隊(duì)列已關(guān)閉時(shí),拋出ValueError異常
get([block[, timeout]])
:讀取并刪除一個(gè)元素;
- 若設(shè)定了超時(shí)且隊(duì)列為空,會(huì)拋出queue.Empty異常;
- 隊(duì)列已關(guān)閉時(shí),拋出ValueError異常;若已阻塞后,再關(guān)閉則會(huì)一直阻塞;
qsize()
:返回一個(gè)近似隊(duì)列長(zhǎng)度(因多進(jìn)程原因,長(zhǎng)度會(huì)有誤差);
empty()/full()
:隊(duì)列空或慢(因多進(jìn)程原因,會(huì)有誤差);
close()
:關(guān)閉隊(duì)列;
當(dāng)主進(jìn)程(創(chuàng)建Queue的)關(guān)閉隊(duì)列時(shí),子進(jìn)程中的隊(duì)列并沒有關(guān)閉,所以getElement進(jìn)程會(huì)一直阻塞等待(為保證能正常退出,需要設(shè)為后臺(tái)進(jìn)程):
def putElement(name, qu: multiprocessing.Queue): try: for i in range(10): qu.put(f"{name}-{i + 1}") time.sleep(.1) except ValueError: print("queue closed") print(f"{name}: put complete") def getElement(name, qu: multiprocessing.Queue): try: while True: r = qu.get() print(f"{name} recv: {r}") except ValueError: print("queue closed") print(f"{name}: get complete") if __name__ == '__main__': qu = multiprocessing.Queue(100) puts = [multiprocessing.Process(target=putElement, args=(f"send{i}", qu)) for i in range(10)] gets = [multiprocessing.Process(target=getElement, args=(f"recv{i}", qu), daemon=True) for i in range(2)] list(map(lambda f: f.start(), puts)) list(map(lambda f: f.start(), gets)) for f in puts: f.join() print("To close") qu.close() # 只是main中的close了,其他進(jìn)程中的并沒有
管道Pipe
multiprocessing.Pipe([duplex])
返回一個(gè)連接對(duì)象對(duì)(conn1, conn2)
。若duplex為True(默認(rèn)),創(chuàng)建的是雙向管道;否則conn1只能用于接收消息,conn2只能用于發(fā)送消息:
- send():發(fā)送消息;
- recv():接收消息;
進(jìn)程間的Pipe基于fork機(jī)制建立:
- 主進(jìn)程創(chuàng)建Pipe:Pipe的兩個(gè)Connections連接的的都是主進(jìn)程;
- 創(chuàng)建子進(jìn)程后,Pipe也被拷貝了一份:此時(shí)有了4個(gè)Connections;
- 主進(jìn)程關(guān)閉一個(gè)Out Connection,子進(jìn)程關(guān)閉一個(gè)In Connection:就建立好了一個(gè)輸入在主進(jìn)程,輸出在子進(jìn)程的管道。
def pipeProc(pipe): outPipe, inPipe = pipe inPipe.close() # 必須關(guān)閉,否則結(jié)束時(shí)不會(huì)收到EOFError異常 try: while True: r = outPipe.recv() print("Recv:", r) except EOFError: print("RECV end") if __name__ == '__main__': outPipe, inPipe = multiprocessing.Pipe() sub = multiprocessing.Process(target=pipeProc, args=((outPipe, inPipe),)) sub.start() outPipe.close() # 必須在進(jìn)程成功運(yùn)行后,才可關(guān)閉 with inPipe: for x in range(10): inPipe.send(x) time.sleep(.1) print("send complete") sub.join()
進(jìn)程池Pool
雖然使用多進(jìn)程能提高效率,但進(jìn)程的創(chuàng)建與銷毀會(huì)消耗較長(zhǎng)時(shí)間;同時(shí),過多進(jìn)程會(huì)引起頻繁的調(diào)度,也增加了開銷。
進(jìn)程池中有固定數(shù)量的進(jìn)程:
- 請(qǐng)求到來(lái)時(shí),從池中取出一個(gè)進(jìn)程來(lái)處理任務(wù);理完畢后,進(jìn)程并不立即關(guān)閉,而是再放回進(jìn)程池中;
- 當(dāng)池中進(jìn)程數(shù)量不夠,請(qǐng)求就要等待,直到拿到空閑進(jìn)程后才能繼續(xù)執(zhí)行;
- 池中進(jìn)程的數(shù)量是固定的,隱藏同一時(shí)間最多有固定數(shù)量的進(jìn)程在運(yùn)行。
multiprocessing.Pool([processes[, initializer[, initargs]]])
- processes:要?jiǎng)?chuàng)建進(jìn)程數(shù)量(默認(rèn)
os.cpu_count()
個(gè)),在需要時(shí)才會(huì)創(chuàng)建; - initializer(*initargs):每個(gè)工作進(jìn)程啟動(dòng)時(shí)執(zhí)行的方法(一般processes為幾就執(zhí)行幾次);
Pool類中主要方法:
-
apply(func[, args[, kwds]])
:以阻塞方式,從池中獲取進(jìn)程并執(zhí)行func(*args,**kwargs)
; -
apply_async(func[, args[, kwds[, callback[, error_callback]]]])
:異步方式(從池中獲取一個(gè)進(jìn)程)執(zhí)行func(*args,**kwargs)
,返回AsyncResult; -
map(func, iterable[, chunksize])/map_async
:map的并行版本(可同時(shí)處理多個(gè)任務(wù)),異步時(shí)返回MapResult; -
starmap(func, iterable[, chunksize])/starmap_async
:與map的區(qū)別是允許傳入多個(gè)參數(shù); -
imap(func, iterable[, chunksize])
:map的惰性版本(返回結(jié)果是可迭代對(duì)象),內(nèi)存消耗會(huì)低些,返回迭代器IMapIterator; -
imap_unordered(func, iterable[, chunksize])
:imap返回的結(jié)果順序與map順序是相同的,而此方法返回的順序是亂序的(不依次等待每個(gè)任務(wù)完成,先完成的先返回),返回迭代器IMapIterator; -
close()
:關(guān)閉,禁止繼續(xù)提交任務(wù)(已提交任務(wù)會(huì)繼續(xù)執(zhí)行完成); -
terminate()
:立即終止所有任務(wù); -
join()
:等待工作進(jìn)程完成(必須已close或terminate了);
def poolWorker(): print(f"worker in process {os.getpid()}") time.sleep(1) def poolWorkerOne(name): print(f"worker one {name} in process {os.getpid()}") time.sleep(random.random()) return name def poolWorkerTwo(first, second): res = first + second print(f"worker two {res} in process {os.getpid()}") time.sleep(1./(first+1)) return res def poolInit(): print("pool init") if __name__ == '__main__': workers = multiprocessing.Pool(5, poolInit) # poolInit會(huì)被調(diào)用5次(線程啟動(dòng)時(shí)) with workers: for i in range(5): workers.apply_async(poolWorker) arg = [(i, i) for i in range(10)] workers.map_async(poolWorkerOne, arg) results = workers.starmap_async(poolWorkerTwo, arg) # 每個(gè)元素(元組)會(huì)被拆分為獨(dú)立的參數(shù) print("Starmap:", results.get()) results = workers.imap_unordered(poolWorkerOne, arg) for r in results: # r是亂序的(若使用imap,則與輸入arg的順序相同) print("Unordered:", r) # 必須保證workers已close了 workers.join()
原文鏈接:https://blog.csdn.net/alwaysrun/article/details/127185356
相關(guān)推薦
- 2022-05-13 C++ 使用Poco庫(kù)實(shí)現(xiàn)XML的讀取和寫入
- 2022-06-24 淺談React?中的淺比較是如何工作的_React
- 2022-12-05 Python最長(zhǎng)回文子串問題_python
- 2022-02-26 Hutool cn.hutool.core.bean.BeanException: Set valu
- 2022-07-14 實(shí)現(xiàn)一個(gè)random?shuffle算法示例_C 語(yǔ)言
- 2021-12-10 解決線上Oracle連接耗時(shí)過長(zhǎng)的問題現(xiàn)象_oracle
- 2023-06-04 Kotlin比較與解釋Lazy與Lateinit的用法_Android
- 2023-01-14 C#實(shí)現(xiàn)啟動(dòng)項(xiàng)管理的示例代碼_C#教程
- 最近更新
-
- window11 系統(tǒng)安裝 yarn
- 超詳細(xì)win安裝深度學(xué)習(xí)環(huán)境2025年最新版(
- Linux 中運(yùn)行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲(chǔ)小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎(chǔ)操作-- 運(yùn)算符,流程控制 Flo
- 1. Int 和Integer 的區(qū)別,Jav
- spring @retryable不生效的一種
- Spring Security之認(rèn)證信息的處理
- Spring Security之認(rèn)證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權(quán)
- redisson分布式鎖中waittime的設(shè)
- maven:解決release錯(cuò)誤:Artif
- restTemplate使用總結(jié)
- Spring Security之安全異常處理
- MybatisPlus優(yōu)雅實(shí)現(xiàn)加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務(wù)發(fā)現(xiàn)-Nac
- Spring Security之基于HttpR
- Redis 底層數(shù)據(jù)結(jié)構(gòu)-簡(jiǎn)單動(dòng)態(tài)字符串(SD
- arthas操作spring被代理目標(biāo)對(duì)象命令
- Spring中的單例模式應(yīng)用詳解
- 聊聊消息隊(duì)列,發(fā)送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠(yuǎn)程分支