首頁 > 軟體

Python多程序之程序同步及通訊詳解

2021-11-25 19:01:35

上篇文章介紹了什麼是程序、程序與程式的關係、程序的建立與使用、建立程序池等,接下來就來介紹一下程序同步及程序通訊。

程序同步

當多個程序使用同一份資料資源的時候,因為程序的執行沒有順序,執行起來也無法控制,如果不加以干預,往往會引發資料安全或順序混亂的問題,所以要在多個程序讀寫共用資料資源的時候加以適當的策略,來保證資料的一致性問題。

Lock(鎖)

一個Lock物件有兩個方法:acquire()和release()來控制共用資料的讀寫許可權, 看下面這張圖片,使用多程序的時候會經常出現這種情況,這是因為多個程序都在搶佔輸出資源,共用同一列印終端,從而造成了輸出資訊的錯亂。

那麼就可以使用Lock機制:

import multiprocessing
import random
import time
def work(lock, i):
    lock.acquire()
    print("work'{}'執行中......".format(i), multiprocessing.current_process().name, multiprocessing.current_process().pid)
    time.sleep(random.randint(0, 2))
    print("work'{}'執行完畢......".format(i))
    lock.release()
if __name__ == '__main__':
    lock = multiprocessing.Lock()
    for i in range(5):
        p = multiprocessing.Process(target=work, args=(lock, i))
        p.start()

由於引入了Lock機制,同一時間只能有一個程序搶佔到輸出資源,其他程序等待該程序結束,鎖釋放到,才可以搶佔,這樣會解決多程序間資源競爭導致資料錯亂的問題,但是由並行執行變成了序列執行,會犧牲執行效率。

程序通訊

上篇文章說過,程序之間互相隔離,資料是獨立的,預設情況下互不影響,那要如何實現程序間通訊呢?Python提供了多種程序通訊的方式,下面就來說一下。

Queue(佇列)

multiprocessing模組提供的Queue多程序安全的訊息佇列,可以實現多程序之間的資料傳遞。

說明

  • 初始化Queue()物件時(例如:q=Queue()),若括號中沒有指定最⼤可接收的訊息數量,或數量為負值,那麼就代表可接受的訊息數量沒有上限(直到記憶體的盡頭)。
  • Queue.qsize():返回當前佇列包含的訊息數量。
  • Queue.empty():如果佇列為空,返回True,反之False。
  • Queue.full():如果佇列滿了,返回True,反之False。
  • Queue.get(block, timeout):獲取佇列中的⼀條訊息,然後將其從列隊中移除,block預設值為True。如果block使⽤預設值,且沒有設定timeout(單位秒),訊息列隊如果為空,此時程式將被阻塞(停在讀取狀態),直到從訊息列隊讀到訊息為⽌,如果設定了timeout,則會等待timeout秒,若還沒讀取到任何訊息,則丟擲Queue.Empty異常;如果block值為False,訊息列隊如果為空,則會⽴刻丟擲Queue.Empty異常。
  • Queue.get_nowait():相當Queue.get(False)。
  • Queue.put(item, block, timeout):將item訊息寫⼊佇列,block預設值為True,如果block使⽤預設值,且沒有設定timeout(單位秒),訊息列隊如果已經沒有空間可寫⼊,此時程式將被阻塞(停在寫⼊狀態),直到訊息列隊騰出空間為⽌,如果設定了timeout,則會等待timeout秒,若還沒空間,則丟擲Queue.Full異常;如果block值為False,訊息列隊如果沒有空間可寫⼊,則會⽴刻丟擲Queue.Full異常。
  • Queue.put_nowait(item):相當於Queue.put(item, False)。
from multiprocessing import Process, Queue
import time
def write_task(queue):
    """
    向佇列中寫入資料
    :param queue: 佇列
    :return:
    """
    for i in range(5):
        if queue.full():
            print("佇列已滿!")
        message = "訊息{}".format(str(i))
        queue.put(message)
        print("訊息{}寫入佇列".format(str(i)))
def read_task(queue):
    """
    從佇列讀取資料
    :param queue: 佇列
    :return:
    """
    while True:
        print("從佇列讀取:{}".format(queue.get(True)))
if __name__ == '__main__':
    print("主程序執行......")
    # 主程序建立Queue,最大訊息數量為3
    queue = Queue(3)
    pw = Process(target=write_task, args=(queue, ))
    pr = Process(target=read_task, args=(queue, ))
    pw.start()
    pr.start()

執行結果為:

從結果我們可以看出,佇列最大可以放入3條訊息,後面再來訊息,要等read_task從佇列裡取出後才行。

Pipe(管道)

Pipe常用於兩個程序,兩個程序分別位於管道的兩端,Pipe(duplex)方法返回(conn1,conn2)代表一個管道的兩端,duplex引數預設為True,即全雙工模式,若為False,conn1只負責接收資訊,conn2負責傳送。

send()和recv()方法分別是傳送和接受訊息的方法。

import multiprocessing
import time
import random
def proc_send(pipe):
    """
    傳送訊息
    :param pipe:管道一端
    :return:
    """
    for i in range(10):
        print("process send:{}".format(str(i)))
        pipe.send(i)
        time.sleep(random.random())
def proc_recv(pipe):
    """
    接收訊息
    :param pipe:管道一端
    :return:
    """
    while True:
        print("Process recv:{}".format(pipe.recv()))
        time.sleep(random.random())
if __name__ == '__main__':
    # 主程序建立pipe
    pipe = multiprocessing.Pipe()
    p1 = multiprocessing.Process(target=proc_send,args=(pipe[0], ))
    p2 = multiprocessing.Process(target=proc_recv,args=(pipe[1], ))
    p1.start()
    p2.start()
    p1.join()
    p2.terminate()

執行結果為:

Semaphore(號誌)

Semaphore用來控制對共用資源的存取數量,和程序池的最大連線數類似。

import multiprocessing
import random
import time
def work(s, i):
    s.acquire()
    print("work'{}'執行中......".format(i), multiprocessing.current_process().name, multiprocessing.current_process().pid)
    time.sleep(i*2)
    print("work'{}'執行完畢......".format(i))
    s.release()
if __name__ == '__main__':
    s = multiprocessing.Semaphore(2)
    for i in range(1, 7):
        p = multiprocessing.Process(target=work, args=(s, i))
        p.start()

上面的程式碼中使用Semaphore限制了最多有2個程序同時執行,那麼來一個程序獲得一把鎖,計數加1,當計數等於2時,後面再來的程序均需要等待,等前面的程序釋放掉,才可以獲得鎖。

號誌與程序池的概念上類似,但是要區分開來,號誌涉及到加鎖的概念。

Event(事件)

Event用來實現程序間同步通訊的。執行的機制是:全域性定義了一個flag,如果flag值為False,當程式執行event.wait()方法時就會阻塞,如果flag值為True時,程式執行event.wait()方法時不會阻塞繼續執行。

Event常⽤函數:

  • event.wait():在程序中插入一個標記(flag),預設為False,可以設定timeout。
  • event.set():使flag為Ture。
  • event.clear():使flag為False。
  • event.is_set():判斷flag是否為True。
import multiprocessing
import time
def wait_for_event(e):
    print("wait_for_event執行")
    e.wait()
    print("wait_for_event: e.is_set():{}".format(e.is_set()))
def wait_for_event_timeout(e, t):
    print("wait_for_event_timeout執行")
    # 只會阻塞2s
    e.wait(t)
    print("wait_for_event_timeout:e.is_set:{}".format(e.is_set()))
if __name__ == "__main__":
    e = multiprocessing.Event()
    p1 = multiprocessing.Process(target=wait_for_event, args=(e,))
    p1.start()
    p2 = multiprocessing.Process(target=wait_for_event_timeout, args=(e, 2))
    p2.start()
    time.sleep(4)
    # 4s之後使用e.set()將flag設為Ture
    e.set()
    print("主程序:flag設定為True")

執行結果如下:

總結

本篇文章就到這裡了,希望能夠給你帶來幫助,也希望您能夠多多關注it145.com的更多內容!


IT145.com E-mail:sddin#qq.com