首頁 > 軟體

Python語法學習之程序間的通訊方式

2022-04-11 13:02:01

什麼是程序的通訊

這裡舉一個例子接介紹通訊的機制:通訊 一詞大家並不陌生,比如一個人要給他的女友打電話。當建立了通話之後,在這個通話的過程中就是建立了一條隱形的 佇列 (記住這個詞)。此時這個人就會通過對話的方式不停的將資訊告訴女友,而這個人的女友也是在傾聽著。(嗯…我個人覺得大部分情況下可能是反著來的)。

這裡可以將他們兩個人比作是兩個程序,"這個人"的程序需要將資訊傳送給"女友"的程序,就需要一個佇列的幫助。而女友需要不停的接收佇列的資訊,可以做一些其他的事情,所以兩個程序之間的通訊主要依賴於佇列。

這個佇列可以支援傳送訊息與接收訊息,“這個人"負責傳送訊息,反之"女友” 負責的是接收訊息。

既然佇列才是重點,那麼來看一下佇列要如何建立。

佇列的建立 - multiprocessing

依然使用 multiprocessing 模組,呼叫該模組的 Queue 函數來實現佇列的建立。

函數名介紹引數返回值
Queue佇列的建立mac_count佇列物件

Queue 函數功能介紹:呼叫 Queue 可以建立佇列;它有一個引數 mac_count 代表佇列最大可以建立多少資訊,如果不傳預設是無限長度。範例化一個佇列物件之後,需要操作這個佇列的物件進行放入與取出資料。

程序之間通訊的方法

函數名介紹引數返回值
put將訊息放入佇列message
get獲取佇列訊息str

put 函數功能介紹:將資料傳入。它有一個引數 message ,是一個字串型別。

get 函數功能介紹:用來接收佇列中的資料。(其實這裡就是一個常用的json場景,有很多的資料傳輸都是 字串 的,佇列的插入與獲取就是使用的字串,所以 json 就非常適用這個場景。)

接下來就來練習一下 佇列的使用 。

程序間的通訊 - 佇列演示案例

程式碼範例如下:

# coding:utf-8


import json
import multiprocessing


class Work(object):     # 定義一個 Work 類
    def __init__(self, queue):      # 建構函式傳入一個 '佇列物件' --> queue
            self.queue = queue

    def send(self, message):        # 定義一個 send(傳送) 函數,傳入 message
                                    # [這裡有個隱藏的bug,就是隻判斷了傳入的是否字串型別;如果傳入的是函數、類、集合等依然會報錯]
        if not isinstance(message, str):    # 判斷傳入的 message 是否為字串,若不是,則進行 json 序列化
            message = json.dumps(message)
        self.queue.put(message)     # 利用 queue 的佇列範例化物件將 message 傳送出去

    def receive(self):      # 定義一個 receive(接收) 函數,不需傳入引數,但是因為接收是一個源源不斷的過程,所以需要使用 while 迴圈
        while 1:
            result = self.queue.get()   # 獲取 '佇列物件' --> queue 傳入的message
                                        # 由於我們接收的 message 可能不是一個字串,所以要程序異常的捕獲
            try:                        # 如果傳入的 message 符合 JSON 格式將賦值給 res ;若不符合,則直接使用 result 賦值 res
                res = json.loads(result)
            except:
                res = result
            print('接收到的資訊為:{}'.format(res))


if __name__ == '__main__':
    queue = multiprocessing.Queue()
    work = Work(queue)
    send = multiprocessing.Process(target=work.send, args=({'message': '這是一條測試的訊息'},))
    receive = multiprocessing.Process(target=work.receive)

    send.start()
    receive.start()

使用佇列建立程序間通訊遇到的異常

但是這裡會出現一個 報錯,如下圖:

報錯截圖範例如下:

這裡的報錯提示是 檔案沒有被發現的意思 。其實這裡是我們使用 佇列做 put() 和 get()的時候 有一把無形的鎖加了上去,就是上圖中圈中的 .SemLock 。我們不需要去關心造成這個錯誤的具體原因,要解決這個問題其實也很簡單。

FileNotFoundError: [Errno 2] No such file or directory 異常的解決

我們只需要給 send 或者 receive 其中一個子程序新增 join 阻塞程序即可,理論上如此。但是我們的 receive子程序是一個 while迴圈,它會一直執行,所以只需要給 send 子程序加上一個 join 即可。

解決示意圖如下:

PS:雖然解決了報錯問題,但是程式沒有正常退出。

實際上由於我們的 receive 程序是個 while迴圈,並不知道要處理到什麼時候,沒有辦法立刻終止。所以我們需要在 receive 程序 使用 terminate() 函數終結接收端。

執行結果如下:

批次給 send 函數加入資料

新建一個函數,寫入 for迴圈 模擬批次新增要傳送的訊息

然後再給這個模擬批次傳送資料的函數新增一個執行緒。

範例程式碼如下:

# coding:utf-8


import json
import time
import multiprocessing


class Work(object):     # 定義一個 Work 類
    def __init__(self, queue):      # 建構函式傳入一個 '佇列物件' --> queue
            self.queue = queue

    def send(self, message):        # 定義一個 send(傳送) 函數,傳入 message
                                    # [這裡有個隱藏的bug,就是隻判斷了傳入的是否字串型別;如果傳入的是函數、類、集合等依然會報錯]
        if not isinstance(message, str):    # 判斷傳入的 message 是否為字串,若不是,則進行 json 序列化
            message = json.dumps(message)
        self.queue.put(message)     # 利用 queue 的佇列範例化物件將 message 傳送出去


    def send_all(self):             # 定義一個 send_all(傳送)函數,然後通過for迴圈模擬批次傳送的 message
        for i in range(20):
            self.queue.put('第 {} 次迴圈,傳送的訊息為:{}'.format(i, i))
            time.sleep(1)



    def receive(self):      # 定義一個 receive(接收) 函數,不需傳入引數,但是因為接收是一個源源不斷的過程,所以需要使用 while 迴圈
        while 1:
            result = self.queue.get()   # 獲取 '佇列物件' --> queue 傳入的message
                                        # 由於我們接收的 message 可能不是一個字串,所以要程序異常的捕獲
            try:                        # 如果傳入的 message 符合 JSON 格式將賦值給 res ;若不符合,則直接使用 result 賦值 res
                res = json.loads(result)
            except:
                res = result
            print('接收到的資訊為:{}'.format(res))


if __name__ == '__main__':
    queue = multiprocessing.Queue()
    work = Work(queue)
    send = multiprocessing.Process(target=work.send, args=({'message': '這是一條測試的訊息'},))
    receive = multiprocessing.Process(target=work.receive)
    send_all = multiprocessing.Process(target=work.send_all,)


    send_all.start()    # 這裡因為 send 只執行了1次,然後就結束了。而 send_all 卻要回圈20次,它的執行時間是最長的,資訊也是傳送的最多的
    send.start()
    receive.start()

    # send.join()       # 使用 send 的阻塞會造成 send_all 迴圈還未結束 ,receive.terminate() 函數接收端就會終結。
    send_all.join()     # 所以我們只需要阻塞最長使用率的程序就可以了
    receive.terminate()

執行結果如下:

從上圖中我們可以看到 send 與 send_all 兩個程序都可以通過 queue這個範例化的 Queue 物件傳送訊息,同樣的 receive接收函數也會將兩個程序傳入的 message 列印輸出出來。

小節

該章節我們通過佇列的方式實現了程序間通訊的方法,並且瞭解了佇列的使用方法。一個佇列中,有一端(這裡我們演示的是 send端)通過 put方法實現新增相關的資訊,另一端使用 get 方法獲取相關的資訊;兩個程序相互配合達到一個程序通訊的效果。

其實程序之間的通訊不僅僅只有佇列這一種方式,感興趣的話還可以通過 管道、號誌、共用記憶體的方式來實現。可以自行拓展一下。

程序間通訊的其他方式 - 補充

python提供了多種程序通訊的方式,包括訊號,管道,訊息佇列,號誌,共用記憶體,socket等

主要Queue和Pipe這兩種方式,Queue用於多個程序間實現通訊,Pipe是兩個程序的通訊。

1.管道:分為匿名管道和命名管道

匿名管道:在核心中申請一塊固定大小的緩衝區,程式擁有寫入和讀取的權利,一般使用fock函數實現父子程序的通訊

命名管道:在記憶體中申請一塊固定大小的緩衝區,程式擁有寫入和讀取的權利,沒有血緣關係的程序也可以程序間通訊

特點:面向位元組流;生命週期隨核心;自帶同步互斥機制;半雙工,單向通訊,兩個管道實現雙向通訊

2.訊息佇列:在核心中建立一個佇列,佇列中每個元素是一個資料包,不同的程序可以通過控制程式碼去存取這個佇列。訊息佇列提供了一個從一個程序向另外一個程序傳送一塊資料的方法。每個資料塊都被認為是有一個型別,接收者程序接收的資料塊可以有不同的型別。訊息佇列也有管道一樣的不足,就是每個訊息的最大長度是有上限的,每個訊息佇列的總的位元組數是有上限的,系統上訊息佇列的總數也有一個上限

特點:訊息佇列可以被認為是一個全域性的一個連結串列,連結串列節點中存放著資料包的型別和內容,有訊息佇列的識別符號進行標記;訊息佇列允許一個或多個程序寫入或讀取訊息;訊息佇列的生命週期隨核心;訊息佇列可實現雙向通訊

3.號誌:在核心中建立一個號誌集合(本質上是陣列),陣列的元素(號誌)都是1,使用P操作進行-1,使用V操作+1

P(sv):如果sv的值大於零,就給它減1;如果它的值為零,就掛起該程式的執行

V(sv):如果有其他程序因等待sv而被掛起,就讓它恢復執行,如果沒有程序因等待sv而掛起,就給它加1

PV操作用於同一個程序,實現互斥;PV操作用於不同程序,實現同步

功能:對臨界資源進行保護

4.共用記憶體:將同一塊實體記憶體一塊對映到不同的程序的虛擬地址空間中,實現不同程序間對同一資源的共用。共用記憶體可以說是最有用的程序間通訊方式,也是最快的IPC形式

特點:不同從使用者態到核心態的頻繁切換和拷貝資料,直接從記憶體中讀取就可以;共用記憶體是臨界資源,所以需要操作時必須要保證原子性。使用號誌或者互斥鎖都可以.

以上就是Python語法學習之程序間的通訊方式的詳細內容,更多關於Python程序通訊方式的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com