日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學無先后,達者為師

網站首頁 編程語言 正文

基于python實現rpc遠程過程調用_python

作者:_流雲 ? 更新時間: 2022-08-04 編程語言

基于python實現RPC的demo

這是一個遠程過程調用(RPC)的實現demo,可以實現不同的python進程之間通信和互相調用函數,簡單易用,易于擴展。更多功能也可進一步完善,本文介紹了該實現的主要思路。

前言

計劃手擼一個rpc甚久了,在間歇性push自己下終于完成的差不多了。寫這個demo的原因,1)是為了學習與思考下這部分主體功能和實現思路,2)是調包時可以毫無心理負擔,并產生一種不過如此的優越感。
實現這部分內容主要依據的還是自己的想法,因此可能會有bug或者有更好的實現方式,僅供學習和參考,完整代碼可參考Gitee鏈接。
實現的時候用的是python2.7,忘記換了,下次一定更新。

一、主要內容

所謂RPC,是遠程過程調用(Remote Procedure Call)的簡寫,網上解釋很多,簡單來說,就是在當前進程調用其他進程的函數時,體驗就像是調用本地寫的函數一般。
本文實現的是在本地調用遠端的類class對象的接口,也就是本地的client不實例化類對象,調用的是server端的類對象接口。
為了達到讓調用層無須關心底層實現,擁有絲滑般的體驗,就需要以下幾個部分:

  • 客戶端需要把類的接口提取出來,并將調用函數事件捕獲存儲起來;服務端需要把類的公有函數作為可遠程調用的接口。
  • 客戶端把調用函數的事件(調用的函數,參數)進行序列化并發送給服務端;服務端將客戶端的調用事件反序列化,并執行相應的接口,將返回值發送給客戶端。
  • 客戶端與服務端通過某種方式(一般就是網絡socket)進行通信。

在下面時序圖的灰色部分,對于調用方來說是透明的,它的執行結果應該和執行本地的函數時一致的。

二、實現步驟

1. 進程間的通信

本文采用了基于TCP的sokcet連接來進行進程之間的通信,更多實現細節可參考之前博客。
在此需要注意:

本文采用了select模塊來監聽網絡事件,如果服務端未收到任何的網絡消息會一直阻塞在這兒。如果服務端除了提供rpc調用服務之外還需要執行其他邏輯,那么應當采用非阻塞,輪詢socket的方式來判斷是否有新的網絡事件。

# ServerBase.py
def process(self):
    readable, writable, exceptional = select.select(self.inputs, self.outputs, self.conns.values())
    for conn in readable:
        if conn is self.socket:
            self._handle_conn()
        else:
            self._handle_recv(conn)
    for conn in writable:
        pass
    for conn in exceptional:
        self._handle_leave(conn)

客戶端的網絡事件本文通過創建新的線程來監聽的。并不會影響客戶端主線程的執行,因此可以盡情的阻塞。部分代碼如下:

# AsynCallback.py
class AsyncTaskManager(object):
? ? _asy_events = dict()

? ? def __init__(self, loop, *args):
? ? ? ? super(AsyncTaskManager, self).__init__()
? ? ? ? self._loop_fun = loop

? ? def __call__(self, *args, **kwargs):
? ? ? ? proc = threading.Thread(target=self._exec_loop, args=args, kwargs=kwargs)
? ? ? ? proc.start()

? ? def _exec_loop(self, *args, **kwargs):
? ? ? ? while True:
? ? ? ? ? ? net_resp = self._loop_fun(*args, **kwargs)
? ? ? ? ? ? for resp in net_resp:
? ? ? ? ? ? ? ? asy_event = self._asy_events.pop(resp.rid)
? ? ? ? ? ? ? ? asy_event.set()
# Client.py
class Client(TaskHandle, ClientBase):

?? ?@AsyncTaskManager
?? ?def process(self):
?? ? ? ?super(Client, self).process()
?? ? ? ?_events = []
?? ? ? ?while self.has_events:
?? ? ? ? ? ?event = self.get_next_event()
?? ? ? ? ? ?data = event[1]
?? ? ? ? ? ?_events.append(self.unpack_respond(data))
?? ? ? ?return _events

序列化方式,本文采用了庫pickle進行序列化與反序列化,使用它的原因是可以將自定義類對象也進行序列化,非常之高級。

2. 異步回調實現思路

對于需要返回值的函數調用,處理起來比較簡單,只需要將主線程阻塞等待,直至超時或者接收到了對應函數的返回值即可。本文采用了threading.Event來阻塞與喚醒調用的函數,同時采用了裝飾器來實現這功能。若日后有更好的方法,可以輕易進行替換。相關示例代碼如下所示:

@AsyncTaskManager.respond
def _handle_response(self, tid):
? ? """ 處理有返回值的情況
? ? 會阻塞線程直至收到返回值
? ? """
? ? task = self.pop_task(tid)
? ? if task.callback:
? ? ? ? task.callback()
? ? return self.pop_respond(tid)

@staticmethod
def respond(func):
? ? @wraps(func)
? ? def make_resp(handle, tid):
? ? ? ? """ 需要注意的是,和裝飾的函數參數含義需一致 """
? ? ? ? event = threading.Event()
? ? ? ? AsyncTaskManager._asy_events[tid] = event
? ? ? ? event.wait(timeout=TIME_OUT)
? ? ? ? return func(handle, tid)?? ?# 這兒才是真正執行_handle_response的地方
? ? return make_resp

在實際的應用過程中,應有這樣的情況,服務端與客戶端都是獨立的應用,通過rpc函數進行通信和交互,而并不是某方為另外一方提供服務,那么此時返回值并不必要,只需要將要做的事通知另一方即可。對于此種情況,可以采用異步回調的方式來告知調用方對應函數執行成功了。

在文中依舊采用線程來完成該功能,客戶端調用函數之后創建一個新線程并阻塞住,等待服務端將執行結果發回后再喚醒,如果有回調函數就執行。示例代碼如下:

@AsyncTaskManager.callback
def _handle_call_back(self, tid):
? ? """ 處理有回調函數的調用
? ? callback會等tid事件調用成功之后 才會回調,且不會有返回值
? ? """
? ? task = self.pop_task(tid)
? ? if task.callback:
? ? ? ? task.callback()
? ? ? ??
@staticmethod
def callback(func):
? ? @wraps(func)
? ? def make_thread(event, *args, **kwargs):
? ? ? ? event.wait(timeout=TIME_OUT)
? ? ? ? func(*args, **kwargs)

? ? def make_async(handle, tid):
? ? ? ? """ 注意點同上 """
? ? ? ? event = threading.Event()
? ? ? ? AsyncTaskManager._asy_events[tid] = event
? ? ? ? _task = threading.Thread(target=lambda: make_thread(event, handle, tid))

? ? return make_async

總結

原文鏈接:https://blog.csdn.net/wz2671/article/details/108269400

欄目分類
最近更新