網(wǎng)站首頁 編程語言 正文
一、通信方式
進(jìn)程彼此之間互相隔離,要實(shí)現(xiàn)進(jìn)程間通信(IPC),multiprocessing模塊主要通過隊(duì)列方式
隊(duì)列:隊(duì)列類似于一條管道,元素先進(jìn)先出
需要注意的一點(diǎn)是:隊(duì)列都是在內(nèi)存中操作,進(jìn)程退出,隊(duì)列清空,另外,隊(duì)列也是一個(gè)阻塞的形態(tài)
二、Queue介紹
創(chuàng)建隊(duì)列的類(底層就是以管道和鎖定的方式實(shí)現(xiàn)):
Queue([maxsize]):創(chuàng)建共享的進(jìn)程隊(duì)列,Queue
是多進(jìn)程安全的隊(duì)列,
可以使用Queue實(shí)現(xiàn)多進(jìn)程之間的數(shù)據(jù)傳遞。maxsize
是隊(duì)列中允許最大項(xiàng)數(shù),省略則無大小限制。
三、方法介紹
-
def put(self, obj, block=True, timeout=None):
插入數(shù)據(jù)到隊(duì)列中,Block值默認(rèn)為True,代表當(dāng)隊(duì)列已滿時(shí),會(huì)阻塞。如果block為False,則隊(duì)列滿會(huì)報(bào)異常Queue.Full,timeout表示會(huì)阻塞到指定時(shí)間,直到有剩余的空間供插入,如果時(shí)間超時(shí),則報(bào)異常Queue.Full - ?def get(self, block=True, timeout=None):從隊(duì)列中取出數(shù)據(jù),Block值默認(rèn)為True,代表當(dāng)隊(duì)列為空時(shí),會(huì)阻塞。如果block為False,則隊(duì)列空會(huì)報(bào)異常Queue.Empty,timeout表示會(huì)等待到指定時(shí)間,直到取出數(shù)據(jù),如果時(shí)間超時(shí),則報(bào)異常Queue.Empty
- ?def empty(self): 判斷隊(duì)列是否為空,如果空返回True
- def full(self): 判斷隊(duì)列是否已滿,如果滿返回True
- def qsize(self): 返回隊(duì)列的大小
應(yīng)用舉例:
from multiprocessing import Process, Manager q = Manager().Queue(2) q.put(1) q.put(2,block=False,timeout=2) def func(): ? ? print(q.get()) ? p = Process(target=func) print("size",q.qsize()) print("full",q.full()) p.start() p.join() print("empty",q.empty()) print("get", q.get()) print("get", q.get(block=False,timeout=2))
輸出結(jié)果:
三、生產(chǎn)者和消費(fèi)者模型
在并發(fā)編程中使用生產(chǎn)者和消費(fèi)者模式能夠解決絕大多數(shù)并發(fā)問題。該模式通過平衡生產(chǎn)線程和消費(fèi)線程的工作能力來提高程序的整體處理數(shù)據(jù)的速度。
為什么要使用生產(chǎn)者和消費(fèi)者模式?
在線程世界里,生產(chǎn)者就是生產(chǎn)數(shù)據(jù)的線程,消費(fèi)者就是消費(fèi)數(shù)據(jù)的線程。在多線程開發(fā)當(dāng)中,如果生產(chǎn)者處理速度很快,而消費(fèi)者處理速度很慢,那么生產(chǎn)者就必須等待消費(fèi)者處理完,才能繼續(xù)生產(chǎn)數(shù)據(jù)。同樣的道理,如果消費(fèi)者的處理能力大于生產(chǎn)者,那么消費(fèi)者就必須等待生產(chǎn)者。為了解決這個(gè)問題于是引入了生產(chǎn)者和消費(fèi)者模式。
四、什么是生產(chǎn)者消費(fèi)者模式
生產(chǎn)者消費(fèi)者模式是通過一個(gè)容器來解決生產(chǎn)者和消費(fèi)者的強(qiáng)耦合問題。生產(chǎn)者和消費(fèi)者彼此之間不直接通訊,而通過阻塞隊(duì)列來進(jìn)行通訊:
生產(chǎn)者,只需要往隊(duì)列里面丟東西(生產(chǎn)者不需要關(guān)心消費(fèi)者)
消費(fèi)者,只需要從隊(duì)列里面拿東西(消費(fèi)者也不需要關(guān)心生產(chǎn)者)
阻塞隊(duì)列就相當(dāng)于一個(gè)緩沖區(qū),平衡了生產(chǎn)者和消費(fèi)者的處理能力。
實(shí)現(xiàn)方式一:Queue
from multiprocessing import Process,Manager,active_children import random import queue import time ? class Producer(Process): ? ? ? def __init__(self,queue): ? ? ? ? super().__init__() ? ? ? ? self.queue = queue ? ? ? def run(self): ? ? ? ? for i in range(6): ? ? ? ? ? ? r = random.randint(0, 99) ? ? ? ? ? ? time.sleep(1) ? ? ? ? ? ? self.queue.put(r) ? ? ? ? ? ? print("add data{}".format(r)) ? class Consumer(Process): ? ? ? def __init__(self,queue): ? ? ? ? super().__init__() ? ? ? ? self.queue = queue ? ? ? def run(self): ? ? ? ? while True: ? ? ? ? ? if not self.queue.empty(): ? ? ? ? ? ? ? ? data = self.queue.get() ? ? ? ? ? ? ? ? print("minus data{}".format(data)) ? ? if __name__ == '__main__': ? ? q = Manager().Queue() # 創(chuàng)建隊(duì)列 ? ? p = Producer(q) ? ? c = Consumer(q) ? ? p.start() ? ? c.start() ? ? print(active_children()) ?# 查看現(xiàn)有的進(jìn)程 ? ? p.join() ? ? c.join() ? ? print("結(jié)束") ? ? >>>輸出 [, , ] add data83 minus data83 add data72 minus data72 add data8 minus data8 add data63 minus data63 add data75 minus data75 add data52 minus data52
實(shí)現(xiàn)方式二:利用JoinableQueue
JoinableQueue([maxsize]):一個(gè)Queue對(duì)象,但隊(duì)列允許項(xiàng)目的使用者通知生成者項(xiàng)目已經(jīng)被成功處理。通知進(jìn)程是使用共享的信號(hào)和條件變量來實(shí)現(xiàn)的。JoinableQueue
的實(shí)例除了與Queue對(duì)象相同的方法之外還具有:
? ? ?task_done():使用者使用此方法發(fā)出信號(hào),表示get()的返回項(xiàng)目已經(jīng)被處理。如果調(diào)用此方法的次數(shù)大于從隊(duì)列中刪除項(xiàng)目的數(shù)量,將引發(fā)ValueError異常
? ? ?join():生產(chǎn)者調(diào)用此方法進(jìn)行阻塞,直到隊(duì)列中所有的項(xiàng)目均被處理。阻塞將持續(xù)到隊(duì)列中的每個(gè)項(xiàng)目均調(diào)用task_done()方法為止
from multiprocessing import Process,JoinableQueue import os import time import random ? ? def print_log(msg, log_type="prod"): ? ? if log_type == 'prod': ? ? ? ? print("\033[32;1m%s\033[0m" %msg) ? ? elif log_type == 'con': ? ? ? ? print("\033[31;1m%s\033[0m" %msg) ? def producer(q): ? ? """ ? ? 生產(chǎn)者 ? ? :param q:? ? ? :return:? ? ? """ ? ? for i in range(10): ? ? ? ? data = random.randint(1,200) ? ? ? ? time.sleep(2) ? ? ? ? q.put(data) ?# 放入隊(duì)列 ? ? ? ? msg = "add data {}".format(data) ? ? ? ? print_log(msg) ? ? q.join() ?# 生產(chǎn)者調(diào)用此方法進(jìn)行阻塞,直到隊(duì)列中所有的項(xiàng)目均被處理。 ? ? # 阻塞將持續(xù)到隊(duì)列中的每個(gè)項(xiàng)目均調(diào)用q.task_done()方法為止 ? ? ? ? def consumer(q): ? ? """ ? ? 消費(fèi)者 ? ? :param q:? ? ? :return:? ? ? """ ? ? while True: ? ? ? ? if not q.empty(): ? ? ? ? ? ? time.sleep(5) ? ? ? ? ? ? data = q.get() ? ? ? ? ? ? msg = "minus data{}".format(data) ? ? ? ? ? ? print_log(msg,"con") ? ? ? ? ? ? q.task_done() ?# q.get()的返回項(xiàng)目已經(jīng)被處理 ? ? if __name__ == '__main__': ? ? q = JoinableQueue() ? ? prod = Process(target=producer, args=(q,)) ? ? con = Process(target=consumer, args=(q,)) ? ? con.daemon = True ?# 設(shè)置為守護(hù)進(jìn)程,但是不用擔(dān)心,producer內(nèi)調(diào)用q.join保證了consumer已經(jīng)處理完隊(duì)列中的所有元素 ? ? # 開啟進(jìn)程 ? ? prod.start() ? ? con.start() ? ? ? prod.join() ?# 等待生產(chǎn)和消費(fèi)完成,主線程結(jié)束 ? ? print("結(jié)束")
輸出結(jié)果:
原文鏈接:https://blog.csdn.net/weixin_41951954/article/details/123331551
相關(guān)推薦
- 2022-11-29 ASP.NET?Identity的基本用法_實(shí)用技巧
- 2022-09-24 VisualStudio?制作Dynamic?Link?Library動(dòng)態(tài)鏈接庫文件的詳細(xì)過程_C
- 2022-05-27 Docker常見命令介紹_docker
- 2022-04-14 error: failed to push some refs to ‘http://git.tex
- 2022-10-05 react-router-dom入門使用教程(前端路由原理)_React
- 2022-02-22 如何利用nginx做代理緩存淺析_nginx
- 2022-12-01 Go語言中基本數(shù)據(jù)類型的相互轉(zhuǎn)換詳解_Golang
- 2022-09-22 繼承關(guān)系下構(gòu)造方法的訪問特點(diǎn)
- 最近更新
-
- window11 系統(tǒng)安裝 yarn
- 超詳細(xì)win安裝深度學(xué)習(xí)環(huán)境2025年最新版(
- Linux 中運(yùn)行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲(chǔ)小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎(chǔ)操作-- 運(yùn)算符,流程控制 Flo
- 1. Int 和Integer 的區(qū)別,Jav
- spring @retryable不生效的一種
- Spring Security之認(rèn)證信息的處理
- Spring Security之認(rèn)證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權(quán)
- redisson分布式鎖中waittime的設(shè)
- maven:解決release錯(cuò)誤:Artif
- restTemplate使用總結(jié)
- Spring Security之安全異常處理
- MybatisPlus優(yōu)雅實(shí)現(xiàn)加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務(wù)發(fā)現(xiàn)-Nac
- Spring Security之基于HttpR
- Redis 底層數(shù)據(jù)結(jié)構(gòu)-簡(jiǎn)單動(dòng)態(tài)字符串(SD
- arthas操作spring被代理目標(biāo)對(duì)象命令
- Spring中的單例模式應(yīng)用詳解
- 聊聊消息隊(duì)列,發(fā)送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠(yuǎn)程分支