日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學無先后,達者為師

網(wǎng)站首頁 編程語言 正文

Python進程間的通信一起來了解下_python

作者:程序猿-張益達 ? 更新時間: 2022-05-08 編程語言

通信方式

進程彼此之間互相隔離,要實現(xiàn)進程間通信(IPC),multiprocessing模塊主要通過隊列方式

隊列:隊列類似于一條管道,元素先進先出

需要注意的一點是:隊列都是在內(nèi)存中操作,進程退出,隊列清空,另外,隊列也是一個阻塞的形態(tài)

Queue介紹:

創(chuàng)建隊列的類(底層就是以管道和鎖定的方式實現(xiàn)):

Queue([maxsize]):創(chuàng)建共享的進程隊列,Queue是多進程安全的隊列,

可以使用Queue實現(xiàn)多進程之間的數(shù)據(jù)傳遞。maxsize是隊列中允許最大項數(shù),省略則無大小限制。

方法介紹:

def put(self, obj, block=True, timeout=None):插入數(shù)據(jù)到隊列中
Block值默認為True,代表當隊列已滿時,會阻塞。如果block為False,則隊列滿會報異常Queue.Full
timeout表示會阻塞到指定時間,直到有剩余的空間供插入,如果時間超時,則報異常Queue.Full
def get(self, block=True, timeout=None):從隊列中取出數(shù)據(jù)
Block值默認為True,代表當隊列為空時,會阻塞。如果block為False,則隊列空會報異常Queue.Empty
timeout表示會等待到指定時間,直到取出數(shù)據(jù),如果時間超時,則報異常Queue.Empty
def empty(self): 判斷隊列是否為空,如果空返回True
def full(self): 判斷隊列是否已滿,如果滿返回True
def qsize(self): 返回隊列的大小

應用舉例:

from multiprocessing import Process, Manager
q = Manager().Queue(2)
q.put(1)
q.put(2,block=False,timeout=2)
def func():
    print(q.get())
p = Process(target=func)
print("size",q.qsize())
print("full",q.full())
p.start()
p.join()
print("empty",q.empty())
print("get", q.get())
print("get", q.get(block=False,timeout=2))

輸出結(jié)果?

生產(chǎn)者和消費者模型

在并發(fā)編程中使用生產(chǎn)者和消費者模式能夠解決絕大多數(shù)并發(fā)問題。該模式通過平衡生產(chǎn)線程和消費線程的工作能力來提高程序的整體處理數(shù)據(jù)的速度。

為什么要使用生產(chǎn)者和消費者模式

在線程世界里,生產(chǎn)者就是生產(chǎn)數(shù)據(jù)的線程,消費者就是消費數(shù)據(jù)的線程。在多線程開發(fā)當中,如果生產(chǎn)者處理速度很快,而消費者處理速度很慢,那么生產(chǎn)者就必須等待消費者處理完,才能繼續(xù)生產(chǎn)數(shù)據(jù)。同樣的道理,如果消費者的處理能力大于生產(chǎn)者,那么消費者就必須等待生產(chǎn)者。為了解決這個問題于是引入了生產(chǎn)者和消費者模式。

什么是生產(chǎn)者消費者模式

生產(chǎn)者消費者模式是通過一個容器來解決生產(chǎn)者和消費者的強耦合問題。生產(chǎn)者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊:

生產(chǎn)者,只需要往隊列里面丟東西(生產(chǎn)者不需要關(guān)心消費者)

消費者,只需要從隊列里面拿東西(消費者也不需要關(guān)心生產(chǎn)者)

阻塞隊列就相當于一個緩沖區(qū),平衡了生產(chǎn)者和消費者的處理能力。

實現(xiàn)方式一:Queue

from multiprocessing import Process,Manager,active_children
import random
import queue
import time
 
class Producer(Process):
 
    def __init__(self,queue):
        super().__init__()
        self.queue = queue
 
    def run(self):
        for i in range(6):
            r = random.randint(0, 99)
            time.sleep(1)
            self.queue.put(r)
            print("add data{}".format(r))
 
class Consumer(Process):
 
    def __init__(self,queue):
        super().__init__()
        self.queue = queue
 
    def run(self):
        while True:
          if not self.queue.empty():
                data = self.queue.get()
                print("minus data{}".format(data))
 
 
if __name__ == '__main__':
    q = Manager().Queue() # 創(chuàng)建隊列
    p = Producer(q)
    c = Consumer(q)
    p.start()
    c.start()
    print(active_children())  # 查看現(xiàn)有的進程
    p.join()
    c.join()
    print("結(jié)束")
 
 

實現(xiàn)方式二:利用JoinableQueue

JoinableQueue([maxsize]):一個Queue對象,但隊列允許項目的使用者通知生成者項目已經(jīng)被成功處理。通知進程是使用共享的信號和條件變量來實現(xiàn)的。

JoinableQueue的實例除了與Queue對象相同的方法之外還具有:

task_done():使用者使用此方法發(fā)出信號,表示get()的返回項目已經(jīng)被處理。如果調(diào)用此方法的次數(shù)大于從隊列中刪除項目的數(shù)量,將引發(fā)ValueError異常

join():生產(chǎn)者調(diào)用此方法進行阻塞,直到隊列中所有的項目均被處理。阻塞將持續(xù)到隊列中的每個項目均調(diào)用task_done()方法為止

from multiprocessing import Process,JoinableQueue
import os
import time
import random
def print_log(msg, log_type="prod"):
    if log_type == 'prod':
        print("\033[32;1m%s\033[0m" %msg)
    elif log_type == 'con':
        print("\033[31;1m%s\033[0m" %msg)
def producer(q):
    """
    生產(chǎn)者
    :param q: 
    :return: 
    """
    for i in range(10):
        data = random.randint(1,200)
        time.sleep(2)
        q.put(data)  # 放入隊列
        msg = "add data {}".format(data)
        print_log(msg)
    q.join()  # 生產(chǎn)者調(diào)用此方法進行阻塞,直到隊列中所有的項目均被處理。
    # 阻塞將持續(xù)到隊列中的每個項目均調(diào)用q.task_done()方法為止
def consumer(q):
    """
    消費者
    :param q: 
    :return: 
    """
    while True:
        if not q.empty():
            time.sleep(5)
            data = q.get()
            msg = "minus data{}".format(data)
            print_log(msg,"con")
            q.task_done()  # q.get()的返回項目已經(jīng)被處理
if __name__ == '__main__':
    q = JoinableQueue()
    prod = Process(target=producer, args=(q,))
    con = Process(target=consumer, args=(q,))
    con.daemon = True  # 設(shè)置為守護進程,但是不用擔心,producer內(nèi)調(diào)用q.join保證了consumer已經(jīng)處理完隊列中的所有元素
    # 開啟進程
    prod.start()
    con.start()
    prod.join()  # 等待生產(chǎn)和消費完成,主線程結(jié)束
    print("結(jié)束")

輸出結(jié)果

總結(jié)

原文鏈接:https://blog.csdn.net/weixin_41951954/article/details/123331551

欄目分類
最近更新