網站首頁 編程語言 正文
通信方式
進程彼此之間互相隔離,要實現進程間通信(IPC),multiprocessing模塊主要通過隊列方式
隊列:隊列類似于一條管道,元素先進先出
需要注意的一點是:隊列都是在內存中操作,進程退出,隊列清空,另外,隊列也是一個阻塞的形態
Queue介紹:
創建隊列的類(底層就是以管道和鎖定的方式實現):
Queue([maxsize]):創建共享的進程隊列,Queue是多進程安全的隊列,
可以使用Queue實現多進程之間的數據傳遞。maxsize是隊列中允許最大項數,省略則無大小限制。
方法介紹:
def put(self, obj, block=True, timeout=None):插入數據到隊列中 Block值默認為True,代表當隊列已滿時,會阻塞。如果block為False,則隊列滿會報異常Queue.Full timeout表示會阻塞到指定時間,直到有剩余的空間供插入,如果時間超時,則報異常Queue.Full def get(self, block=True, timeout=None):從隊列中取出數據 Block值默認為True,代表當隊列為空時,會阻塞。如果block為False,則隊列空會報異常Queue.Empty timeout表示會等待到指定時間,直到取出數據,如果時間超時,則報異常Queue.Empty def empty(self): 判斷隊列是否為空,如果空返回True def full(self): 判斷隊列是否已滿,如果滿返回True def qsize(self): 返回隊列的大小
應用舉例:
from multiprocessing import Process, Manager q = Manager().Queue(2) q.put(1) q.put(2,block=False,timeout=2) def func(): print(q.get()) p = Process(target=func) print("size",q.qsize()) print("full",q.full()) p.start() p.join() print("empty",q.empty()) print("get", q.get()) print("get", q.get(block=False,timeout=2))
輸出結果?
生產者和消費者模型
在并發編程中使用生產者和消費者模式能夠解決絕大多數并發問題。該模式通過平衡生產線程和消費線程的工作能力來提高程序的整體處理數據的速度。
為什么要使用生產者和消費者模式
在線程世界里,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那么生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大于生產者,那么消費者就必須等待生產者。為了解決這個問題于是引入了生產者和消費者模式。
什么是生產者消費者模式
生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊:
生產者,只需要往隊列里面丟東西(生產者不需要關心消費者)
消費者,只需要從隊列里面拿東西(消費者也不需要關心生產者)
阻塞隊列就相當于一個緩沖區,平衡了生產者和消費者的處理能力。
實現方式一:Queue
from multiprocessing import Process,Manager,active_children import random import queue import time class Producer(Process): def __init__(self,queue): super().__init__() self.queue = queue def run(self): for i in range(6): r = random.randint(0, 99) time.sleep(1) self.queue.put(r) print("add data{}".format(r)) class Consumer(Process): def __init__(self,queue): super().__init__() self.queue = queue def run(self): while True: if not self.queue.empty(): data = self.queue.get() print("minus data{}".format(data)) if __name__ == '__main__': q = Manager().Queue() # 創建隊列 p = Producer(q) c = Consumer(q) p.start() c.start() print(active_children()) # 查看現有的進程 p.join() c.join() print("結束")
實現方式二:利用JoinableQueue
JoinableQueue([maxsize]):一個Queue對象,但隊列允許項目的使用者通知生成者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。
JoinableQueue的實例除了與Queue對象相同的方法之外還具有:
task_done():使用者使用此方法發出信號,表示get()的返回項目已經被處理。如果調用此方法的次數大于從隊列中刪除項目的數量,將引發ValueError異常
join():生產者調用此方法進行阻塞,直到隊列中所有的項目均被處理。阻塞將持續到隊列中的每個項目均調用task_done()方法為止
from multiprocessing import Process,JoinableQueue import os import time import random def print_log(msg, log_type="prod"): if log_type == 'prod': print("\033[32;1m%s\033[0m" %msg) elif log_type == 'con': print("\033[31;1m%s\033[0m" %msg) def producer(q): """ 生產者 :param q: :return: """ for i in range(10): data = random.randint(1,200) time.sleep(2) q.put(data) # 放入隊列 msg = "add data {}".format(data) print_log(msg) q.join() # 生產者調用此方法進行阻塞,直到隊列中所有的項目均被處理。 # 阻塞將持續到隊列中的每個項目均調用q.task_done()方法為止 def consumer(q): """ 消費者 :param q: :return: """ while True: if not q.empty(): time.sleep(5) data = q.get() msg = "minus data{}".format(data) print_log(msg,"con") q.task_done() # q.get()的返回項目已經被處理 if __name__ == '__main__': q = JoinableQueue() prod = Process(target=producer, args=(q,)) con = Process(target=consumer, args=(q,)) con.daemon = True # 設置為守護進程,但是不用擔心,producer內調用q.join保證了consumer已經處理完隊列中的所有元素 # 開啟進程 prod.start() con.start() prod.join() # 等待生產和消費完成,主線程結束 print("結束")
輸出結果
總結
原文鏈接:https://blog.csdn.net/weixin_41951954/article/details/123331551
相關推薦
- 2022-04-09 SpringBoot默認日志框架(slf4j)的使用以及配置文件
- 2022-04-24 python使用技巧-查找文件?_python
- 2022-09-20 Winform使用FTP實現自動更新_C#教程
- 2022-10-10 Python3?re.search()方法的具體使用_python
- 2023-07-04 解決Uncaught (in promise) TypeError: Cannot read pro
- 2021-12-15 幾個小技巧幫你實現Golang永久阻塞_Golang
- 2022-10-05 matplotlib之Pyplot模塊繪制三維散點圖使用顏色表示數值大小_python
- 2022-06-29 利用python實現你說我猜游戲的完整實例_python
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支