首頁 > 軟體

Python程序間的通訊一起來了解下

2022-03-09 10:00:37

通訊方式

程序彼此之間互相隔離,要實現程序間通訊(IPC),multiprocessing模組主要通過佇列方式

佇列:佇列類似於一條管道,元素先進先出

需要注意的一點是:佇列都是在記憶體中操作,程序退出,佇列清空,另外,佇列也是一個阻塞的形態

Queue介紹:

建立佇列的類(底層就是以管道和鎖定的方式實現):

Queue([maxsize]):建立共用的程序佇列,Queue是多程序安全的佇列,

可以使用Queue實現多程序之間的資料傳遞。maxsize是佇列中允許最大項數,省略則無大小限制。

方法介紹:

def put(self, obj, block=True, timeout=None):插入資料到佇列中
Block值預設為True,代表當佇列已滿時,會阻塞。如果block為False,則佇列滿會報異常Queue.Full
timeout表示會阻塞到指定時間,直到有剩餘的空間供插入,如果時間超時,則報異常Queue.Full
def get(self, block=True, timeout=None):從佇列中取出資料
Block值預設為True,代表當佇列為空時,會阻塞。如果block為False,則佇列空會報異常Queue.Empty
timeout表示會等待到指定時間,直到取出資料,如果時間超時,則報異常Queue.Empty
def empty(self): 判斷佇列是否為空,如果空返回True
def full(self): 判斷佇列是否已滿,如果滿返回True
def qsize(self): 返回佇列的大小

應用舉例:

from multiprocessing import Process, Manager
q = Manager().Queue(2)
q.put(1)
q.put(2,block=False,timeout=2)
def func():
    print(q.get())
p = Process(target=func)
print("size",q.qsize())
print("full",q.full())
p.start()
p.join()
print("empty",q.empty())
print("get", q.get())
print("get", q.get(block=False,timeout=2))

輸出結果 

生產者和消費者模型

在並行程式設計中使用生產者和消費者模式能夠解決絕大多數並行問題。該模式通過平衡生產執行緒和消費執行緒的工作能力來提高程式的整體處理資料的速度。

為什麼要使用生產者和消費者模式

線上程世界裡,生產者就是生產資料的執行緒,消費者就是消費資料的執行緒。在多執行緒開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產資料。同樣的道理,如果消費者的處理能力大於生產者,那麼消費者就必須等待生產者。為了解決這個問題於是引入了生產者和消費者模式。

什麼是生產者消費者模式

生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞佇列來進行通訊:

生產者,只需要往佇列裡面丟東西(生產者不需要關心消費者)

消費者,只需要從佇列裡面拿東西(消費者也不需要關心生產者)

阻塞佇列就相當於一個緩衝區,平衡了生產者和消費者的處理能力。

實現方式一:Queue

from multiprocessing import Process,Manager,active_children
import random
import queue
import time
 
class Producer(Process):
 
    def __init__(self,queue):
        super().__init__()
        self.queue = queue
 
    def run(self):
        for i in range(6):
            r = random.randint(0, 99)
            time.sleep(1)
            self.queue.put(r)
            print("add data{}".format(r))
 
class Consumer(Process):
 
    def __init__(self,queue):
        super().__init__()
        self.queue = queue
 
    def run(self):
        while True:
          if not self.queue.empty():
                data = self.queue.get()
                print("minus data{}".format(data))
 
 
if __name__ == '__main__':
    q = Manager().Queue() # 建立佇列
    p = Producer(q)
    c = Consumer(q)
    p.start()
    c.start()
    print(active_children())  # 檢視現有的程序
    p.join()
    c.join()
    print("結束")
 
 

實現方式二:利用JoinableQueue

JoinableQueue([maxsize]):一個Queue物件,但佇列允許專案的使用者通知生成者專案已經被成功處理。通知程序是使用共用的訊號和條件變數來實現的。

JoinableQueue的範例除了與Queue物件相同的方法之外還具有:

task_done():使用者使用此方法發出訊號,表示get()的返回專案已經被處理。如果呼叫此方法的次數大於從佇列中刪除專案的數量,將引發ValueError異常

join():生產者呼叫此方法進行阻塞,直到佇列中所有的專案均被處理。阻塞將持續到佇列中的每個專案均呼叫task_done()方法為止

from multiprocessing import Process,JoinableQueue
import os
import time
import random
def print_log(msg, log_type="prod"):
    if log_type == 'prod':
        print("33[32;1m%s33[0m" %msg)
    elif log_type == 'con':
        print("33[31;1m%s33[0m" %msg)
def producer(q):
    """
    生產者
    :param q: 
    :return: 
    """
    for i in range(10):
        data = random.randint(1,200)
        time.sleep(2)
        q.put(data)  # 放入佇列
        msg = "add data {}".format(data)
        print_log(msg)
    q.join()  # 生產者呼叫此方法進行阻塞,直到佇列中所有的專案均被處理。
    # 阻塞將持續到佇列中的每個專案均呼叫q.task_done()方法為止
def consumer(q):
    """
    消費者
    :param q: 
    :return: 
    """
    while True:
        if not q.empty():
            time.sleep(5)
            data = q.get()
            msg = "minus data{}".format(data)
            print_log(msg,"con")
            q.task_done()  # q.get()的返回專案已經被處理
if __name__ == '__main__':
    q = JoinableQueue()
    prod = Process(target=producer, args=(q,))
    con = Process(target=consumer, args=(q,))
    con.daemon = True  # 設定為守護行程,但是不用擔心,producer內呼叫q.join保證了consumer已經處理完佇列中的所有元素
    # 開啟程序
    prod.start()
    con.start()
    prod.join()  # 等待生產和消費完成,主執行緒結束
    print("結束")

輸出結果

總結

本篇文章就到這裡了,希望能夠給你帶來幫助,也希望您能夠多多關注it145.com的更多內容!   


IT145.com E-mail:sddin#qq.com