首頁 > 軟體

基於python實現rpc遠端過程呼叫

2022-06-12 14:00:44

基於python實現RPC的demo

這是一個遠端過程呼叫(RPC)的實現demo,可以實現不同的python程序之間通訊和互相呼叫函數,簡單易用,易於擴充套件。更多功能也可進一步完善,本文介紹了該實現的主要思路。

前言

計劃手擼一個rpc甚久了,在間歇性push自己下終於完成的差不多了。寫這個demo的原因,1)是為了學習與思考下這部分主體功能和實現思路,2)是調包時可以毫無心理負擔,併產生一種不過如此的優越感。
實現這部分內容主要依據的還是自己的想法,因此可能會有bug或者有更好的實現方式,僅供學習和參考,完整程式碼可參考Gitee連結。
實現的時候用的是python2.7,忘記換了,下次一定更新。

一、主要內容

所謂RPC,是遠端過程呼叫(Remote Procedure Call)的簡寫,網上解釋很多,簡單來說,就是在當前程序呼叫其他程序的函數時,體驗就像是呼叫本地寫的函數一般。
本文實現的是在本地呼叫遠端的類class物件的介面,也就是原生的client不範例化類物件,呼叫的是server端的類物件介面。
為了達到讓呼叫層無須關心底層實現,擁有絲滑般的體驗,就需要以下幾個部分:

  • 使用者端需要把類的介面提取出來,並將呼叫函數事件捕獲儲存起來;伺服器端需要把類的公有函數作為可遠端呼叫的介面。
  • 使用者端把呼叫函數的事件(呼叫的函數,引數)進行序列化並行送給伺服器端;伺服器端將使用者端的呼叫事件反序列化,並執行相應的介面,將返回值傳送給使用者端。
  • 使用者端與伺服器端通過某種方式(一般就是網路socket)進行通訊。

在下面時序圖的灰色部分,對於呼叫方來說是透明的,它的執行結果應該和執行原生的函數時一致的。

二、實現步驟

1. 程序間的通訊

本文采用了基於TCP的sokcet連線來進行程序之間的通訊,更多實現細節可參考之前部落格。
在此需要注意:

本文采用了select模組來監聽網路事件,如果伺服器端未收到任何的網路訊息會一直阻塞在這兒。如果伺服器端除了提供rpc呼叫服務之外還需要執行其他邏輯,那麼應當採用非阻塞,輪詢socket的方式來判斷是否有新的網路事件。

# ServerBase.py
def process(self):
    readable, writable, exceptional = select.select(self.inputs, self.outputs, self.conns.values())
    for conn in readable:
        if conn is self.socket:
            self._handle_conn()
        else:
            self._handle_recv(conn)
    for conn in writable:
        pass
    for conn in exceptional:
        self._handle_leave(conn)

使用者端的網路事件本文通過建立新的執行緒來監聽的。並不會影響使用者端主執行緒的執行,因此可以盡情的阻塞。部分程式碼如下:

# AsynCallback.py
class AsyncTaskManager(object):
    _asy_events = dict()

    def __init__(self, loop, *args):
        super(AsyncTaskManager, self).__init__()
        self._loop_fun = loop

    def __call__(self, *args, **kwargs):
        proc = threading.Thread(target=self._exec_loop, args=args, kwargs=kwargs)
        proc.start()

    def _exec_loop(self, *args, **kwargs):
        while True:
            net_resp = self._loop_fun(*args, **kwargs)
            for resp in net_resp:
                asy_event = self._asy_events.pop(resp.rid)
                asy_event.set()
# Client.py
class Client(TaskHandle, ClientBase):

    @AsyncTaskManager
    def process(self):
        super(Client, self).process()
        _events = []
        while self.has_events:
            event = self.get_next_event()
            data = event[1]
            _events.append(self.unpack_respond(data))
        return _events

序列化方式,本文采用了庫pickle進行序列化與反序列化,使用它的原因是可以將自定義類物件也進行序列化,非常之高階。

2. 非同步回撥實現思路

對於需要返回值的函數呼叫,處理起來比較簡單,只需要將主執行緒阻塞等待,直至超時或者接收到了對應函數的返回值即可。本文采用了threading.Event來阻塞與喚醒呼叫的函數,同時採用了裝飾器來實現這功能。若日後有更好的方法,可以輕易進行替換。相關範例程式碼如下所示:

@AsyncTaskManager.respond
def _handle_response(self, tid):
    """ 處理有返回值的情況
    會阻塞執行緒直至收到返回值
    """
    task = self.pop_task(tid)
    if task.callback:
        task.callback()
    return self.pop_respond(tid)

@staticmethod
def respond(func):
    @wraps(func)
    def make_resp(handle, tid):
        """ 需要注意的是,和裝飾的函數引數含義需一致 """
        event = threading.Event()
        AsyncTaskManager._asy_events[tid] = event
        event.wait(timeout=TIME_OUT)
        return func(handle, tid)    # 這兒才是真正執行_handle_response的地方
    return make_resp

在實際的應用過程中,應有這樣的情況,伺服器端與使用者端都是獨立的應用,通過rpc函數進行通訊和互動,而並不是某方為另外一方提供服務,那麼此時返回值並不必要,只需要將要做的事通知另一方即可。對於此種情況,可以採用非同步回撥的方式來告知呼叫方對應函數執行成功了。

在文中依舊採用執行緒來完成該功能,使用者端呼叫函數之後建立一個新執行緒並阻塞住,等待伺服器端將執行結果發回後再喚醒,如果有回撥函數就執行。範例程式碼如下:

@AsyncTaskManager.callback
def _handle_call_back(self, tid):
    """ 處理有回撥函數的呼叫
    callback會等tid事件呼叫成功之後 才會回撥,且不會有返回值
    """
    task = self.pop_task(tid)
    if task.callback:
        task.callback()
        
@staticmethod
def callback(func):
    @wraps(func)
    def make_thread(event, *args, **kwargs):
        event.wait(timeout=TIME_OUT)
        func(*args, **kwargs)

    def make_async(handle, tid):
        """ 注意點同上 """
        event = threading.Event()
        AsyncTaskManager._asy_events[tid] = event
        _task = threading.Thread(target=lambda: make_thread(event, handle, tid))

    return make_async

總結

到此這篇關於基於python實現rpc遠端過程呼叫的文章就介紹到這了,更多相關python rpc遠端呼叫內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com