網站首頁 編程語言 正文
基于python實現RPC的demo
這是一個遠程過程調用(RPC)的實現demo,可以實現不同的python進程之間通信和互相調用函數,簡單易用,易于擴展。更多功能也可進一步完善,本文介紹了該實現的主要思路。
前言
計劃手擼一個rpc甚久了,在間歇性push自己下終于完成的差不多了。寫這個demo的原因,1)是為了學習與思考下這部分主體功能和實現思路,2)是調包時可以毫無心理負擔,并產生一種不過如此的優越感。
實現這部分內容主要依據的還是自己的想法,因此可能會有bug或者有更好的實現方式,僅供學習和參考,完整代碼可參考Gitee鏈接。
實現的時候用的是python2.7,忘記換了,下次一定更新。
一、主要內容
所謂RPC,是遠程過程調用(Remote Procedure Call)的簡寫,網上解釋很多,簡單來說,就是在當前進程調用其他進程的函數時,體驗就像是調用本地寫的函數一般。
本文實現的是在本地調用遠端的類class對象的接口,也就是本地的client不實例化類對象,調用的是server端的類對象接口。
為了達到讓調用層無須關心底層實現,擁有絲滑般的體驗,就需要以下幾個部分:
- 客戶端需要把類的接口提取出來,并將調用函數事件捕獲存儲起來;服務端需要把類的公有函數作為可遠程調用的接口。
- 客戶端把調用函數的事件(調用的函數,參數)進行序列化并發送給服務端;服務端將客戶端的調用事件反序列化,并執行相應的接口,將返回值發送給客戶端。
- 客戶端與服務端通過某種方式(一般就是網絡socket)進行通信。
在下面時序圖的灰色部分,對于調用方來說是透明的,它的執行結果應該和執行本地的函數時一致的。
二、實現步驟
1. 進程間的通信
本文采用了基于TCP的sokcet連接來進行進程之間的通信,更多實現細節可參考之前博客。
在此需要注意:
本文采用了select模塊來監聽網絡事件,如果服務端未收到任何的網絡消息會一直阻塞在這兒。如果服務端除了提供rpc調用服務之外還需要執行其他邏輯,那么應當采用非阻塞,輪詢socket的方式來判斷是否有新的網絡事件。
# ServerBase.py
def process(self):
readable, writable, exceptional = select.select(self.inputs, self.outputs, self.conns.values())
for conn in readable:
if conn is self.socket:
self._handle_conn()
else:
self._handle_recv(conn)
for conn in writable:
pass
for conn in exceptional:
self._handle_leave(conn)
客戶端的網絡事件本文通過創建新的線程來監聽的。并不會影響客戶端主線程的執行,因此可以盡情的阻塞。部分代碼如下:
# AsynCallback.py
class AsyncTaskManager(object):
? ? _asy_events = dict()
? ? def __init__(self, loop, *args):
? ? ? ? super(AsyncTaskManager, self).__init__()
? ? ? ? self._loop_fun = loop
? ? def __call__(self, *args, **kwargs):
? ? ? ? proc = threading.Thread(target=self._exec_loop, args=args, kwargs=kwargs)
? ? ? ? proc.start()
? ? def _exec_loop(self, *args, **kwargs):
? ? ? ? while True:
? ? ? ? ? ? net_resp = self._loop_fun(*args, **kwargs)
? ? ? ? ? ? for resp in net_resp:
? ? ? ? ? ? ? ? asy_event = self._asy_events.pop(resp.rid)
? ? ? ? ? ? ? ? asy_event.set()
# Client.py
class Client(TaskHandle, ClientBase):
?? ?@AsyncTaskManager
?? ?def process(self):
?? ? ? ?super(Client, self).process()
?? ? ? ?_events = []
?? ? ? ?while self.has_events:
?? ? ? ? ? ?event = self.get_next_event()
?? ? ? ? ? ?data = event[1]
?? ? ? ? ? ?_events.append(self.unpack_respond(data))
?? ? ? ?return _events
序列化方式,本文采用了庫pickle進行序列化與反序列化,使用它的原因是可以將自定義類對象也進行序列化,非常之高級。
2. 異步回調實現思路
對于需要返回值的函數調用,處理起來比較簡單,只需要將主線程阻塞等待,直至超時或者接收到了對應函數的返回值即可。本文采用了threading.Event來阻塞與喚醒調用的函數,同時采用了裝飾器來實現這功能。若日后有更好的方法,可以輕易進行替換。相關示例代碼如下所示:
@AsyncTaskManager.respond
def _handle_response(self, tid):
? ? """ 處理有返回值的情況
? ? 會阻塞線程直至收到返回值
? ? """
? ? task = self.pop_task(tid)
? ? if task.callback:
? ? ? ? task.callback()
? ? return self.pop_respond(tid)
@staticmethod
def respond(func):
? ? @wraps(func)
? ? def make_resp(handle, tid):
? ? ? ? """ 需要注意的是,和裝飾的函數參數含義需一致 """
? ? ? ? event = threading.Event()
? ? ? ? AsyncTaskManager._asy_events[tid] = event
? ? ? ? event.wait(timeout=TIME_OUT)
? ? ? ? return func(handle, tid)?? ?# 這兒才是真正執行_handle_response的地方
? ? return make_resp
在實際的應用過程中,應有這樣的情況,服務端與客戶端都是獨立的應用,通過rpc函數進行通信和交互,而并不是某方為另外一方提供服務,那么此時返回值并不必要,只需要將要做的事通知另一方即可。對于此種情況,可以采用異步回調的方式來告知調用方對應函數執行成功了。
在文中依舊采用線程來完成該功能,客戶端調用函數之后創建一個新線程并阻塞住,等待服務端將執行結果發回后再喚醒,如果有回調函數就執行。示例代碼如下:
@AsyncTaskManager.callback
def _handle_call_back(self, tid):
? ? """ 處理有回調函數的調用
? ? callback會等tid事件調用成功之后 才會回調,且不會有返回值
? ? """
? ? task = self.pop_task(tid)
? ? if task.callback:
? ? ? ? task.callback()
? ? ? ??
@staticmethod
def callback(func):
? ? @wraps(func)
? ? def make_thread(event, *args, **kwargs):
? ? ? ? event.wait(timeout=TIME_OUT)
? ? ? ? func(*args, **kwargs)
? ? def make_async(handle, tid):
? ? ? ? """ 注意點同上 """
? ? ? ? event = threading.Event()
? ? ? ? AsyncTaskManager._asy_events[tid] = event
? ? ? ? _task = threading.Thread(target=lambda: make_thread(event, handle, tid))
? ? return make_async
總結
原文鏈接:https://blog.csdn.net/wz2671/article/details/108269400
相關推薦
- 2022-04-23 通過自定義指令實現 element-ui的tooltip組件 文本長度超出顯示不超出不顯示
- 2022-12-24 Docker網絡模型以及容器通信詳解續篇_docker
- 2022-06-16 Rancher+Docker+SpringBoot實現微服務部署、擴容、環境監控_docker
- 2022-05-24 ASP.NET?MVC使用異步TPL模式_實用技巧
- 2022-03-21 C語言動態內存管理介紹_C 語言
- 2022-11-03 Python?文檔解析lxml庫的使用詳解_python
- 2022-05-08 PyTorch實現多維度特征輸入邏輯回歸_python
- 2022-09-15 c++中的字節序與符號位的問題_C 語言
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支