日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學無先后,達者為師

網(wǎng)站首頁 編程語言 正文

Python?gRPC流式通信協(xié)議詳細講解_python

作者:雕弓 ? 更新時間: 2022-12-26 編程語言

ProtoBuf 協(xié)議

gRPC使用 protocol buffer 協(xié)議做為接口描述語言(IDL) ,來定義接口及傳遞的消息數(shù)據(jù)類型。

gRPC的message 與 service 均須在protobuf 文件中定義好,才能進行編譯。 該文件須按protobuf 語法編寫, 也比較簡單,當前只須學習gRPC用到的部分。

如下例:

service HelloService {
  rpc SayHello (HelloRequest) returns (HelloResponse);
}
message HelloRequest {
  string greeting = 1;
}
message HelloResponse {
  string reply = 1;
}

說明:

  • syntax = “proto3” protobuf的版本號
  • package tutorial 主要是java用,python是用文件名做為module 名,可以不需要定義
  • Service : 就是定義1個 gRPC service, 在service代碼塊內(nèi),定義rpc 方法,指定request, response類型。 gRPC支持4種rpc方法
service interface_name {
rpc api_name( request ) returns ( response );
}

Message: 相當于接口函數(shù)的參數(shù)類型與返回值類型 ,需要分開定義

詳細 protobuf 使用教程,請參考菜鳥教程tutorialspoint 的教程 https://www.tutorialspoint.com/protobuf/index.htm

gRPC 4種通信模式介紹

1. 單向RPC

gRPC的術(shù)語為unary RPC,這種方式與函數(shù)調(diào)用類似, client 發(fā)送1條請求,server回1條響應。

rpc SayHello(HelloRequest) returns (HelloResponse);

2. 服務器流式處理 RPC

客戶端向服務器發(fā)送1條請求,服務器以回應多條響應消息,客戶機從返回的流中讀取數(shù)據(jù),直至沒有更多消息。 這時要在響應類型前加1個 stream關(guān)鍵字修飾。

rpc LotsOfReplies(HelloRequest) returns (stream HelloResponse);

3. 客戶端流式處理 RPC

由客戶端寫入一系列消息并將其發(fā)送到服務。 客戶端完成消息寫入后,它將等待服務器讀取消息并返回其響應. 這種模式,要在request的類型前加 stream 修飾。

rpc LotsOfGreetings(stream HelloRequest) returns (HelloResponse);

4. 雙向流式處理 RPC

其中雙方使用讀寫流發(fā)送一系列消息。這兩個流獨立運行,因此客戶端和服務器可以按照它們喜歡的任何順序進行讀取和寫入:例如,服務器可以等待接收所有客戶端消息,然后再寫入響應,或者它可以交替讀取消息然后寫入消息,或者讀取和寫入的某種其他組合。將保留每個流中消息的順序。此模式下, request 與 response類型均需要用stream修飾。

rpc BidiHello(stream HelloRequest) returns (stream HelloResponse);

說明

  • 流式處理中,服務器A向客戶機B發(fā)送數(shù)據(jù),必須是在protobuf 中 rpc方法中Response message包含的數(shù)據(jù)類型。而不是任意發(fā)送數(shù)據(jù),程序會報錯。不限制的是響應中可以回1條或多條消息。
  • 如果需要A向B發(fā)送請求,則需要定義新的protobuf 文件,并編寫新的server, client python代碼,這時,兩臺機器的角色是倒過來,B為server, A為client。仍然符合gRPC的原則。
  • 如果一定需要在1條連接中,雙方互發(fā)請求,socket 模塊的低階API接口函數(shù)編程可以滿足要求,但必須注意,管理socket雙向通信必須小心翼翼,否則會造成混亂,

流程處理實現(xiàn)過程

1. 用protobuf 定義接口

下面以實例說明: users.proto ,

syntax = "proto3";
package users;
message User {
  string username = 1;
  uint32 user_id = 2;
}
message CreateUserRequest {
  string username = 1;
  string password = 2;
  string email = 3;
}
message CreateUserResult {
  User user = 1;
}
message GetUsersRequest {
  repeated User user = 1;
}
service Users {
  rpc CreateUser (users.CreateUserRequest) returns (users.CreateUserResult);
  rpc GetUsers (users.GetUsersRequest) returns (stream users.GetUsersResult);
}
message GetUsersResult {
  User user = 1;
}

2. 根據(jù).protobuf文件生成客戶方與服務方代碼

首先要安裝 grpcio-tools package:

pip install grpcio-tools

進入proto文件所在目錄,執(zhí)行如下命令

python -m grpc_tools.protoc

\ --proto_path=.

\ --python_out=.

\ --grpc_python_out=.

\ proto文件名

參數(shù)說明

  • proto_path=proto文件路徑
  • python_out=編譯生成的文件的路徑
  • grpc_python_out=編譯生成的接口文件路徑
  • ./route_guide.proto 是要編譯的協(xié)議文件

本例 :

python -m grpc_tools.protoc --proto_path=. --python_out=. --grpc_python_out=. users.proto

生成的文件有兩個: Users_pb2.py 與 Users_pb2_grpc.py,

3. 服務器端代碼

from concurrent import futures
import time
import grpc
import users_pb2_grpc as users_service
import users_pb2 as users_messages
_ONE_DAY_IN_SECONDS = 60 * 60 * 24
class UsersService(users_service.UsersServicer):
    def CreateUser(self, request, context):
        metadata = dict(context.invocation_metadata())
        print(metadata)
        user = users_messages.User(username=request.username, user_id=1)
        return users_messages.CreateUserResult(user=user)
    def GetUsers(self, request, context):
        for user in request.user:
            user = users_messages.User(
                username=user.username, user_id=user.user_id
            )
            yield users_messages.GetUsersResult(user=user)
def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    users_service.add_UsersServicer_to_server(UsersService(), server)
    server.add_insecure_port('0.0.0.0:50051')
    server.start()
    try:
        while True:
            time.sleep(_ONE_DAY_IN_SECONDS)
    except KeyboardInterrupt:
        server.stop(0)
if __name__ == '__main__':
    serve()

4. 客戶端側(cè)代碼

import sys
import grpc
import users_pb2_grpc as users_service
import users_pb2 as users_messages
def run():
    channel = grpc.insecure_channel('localhost:50051')
    try:
        grpc.channel_ready_future(channel).result(timeout=10)
    except grpc.FutureTimeoutError:
        sys.exit('Error connecting to server')
    else:
        stub = users_service.UsersStub(channel)
        metadata = [('ip', '127.0.0.1')]
        response = stub.CreateUser(
            users_messages.CreateUserRequest(username='tom'),
            metadata=metadata,
        )
        if response:
            print("User created:", response.user.username)
        request = users_messages.GetUsersRequest(
            user=[users_messages.User(username="alexa", user_id=1),
                  users_messages.User(username="christie", user_id=1)]
        )
        response = stub.GetUsers(request)
        for resp in response:
            print(resp)
if __name__ == '__main__':
    run()

5. 測試代碼

打開兩個終端窗口,分別運行g(shù)rpc_server.py, grpc_client.py

可以看到client.py 窗口顯示

(enva) D:\workplace\python\enva\test1>py grpc_client.py
User created: tom
user {
  username: "alexa"
  user_id: 1
}
user {
  username: "christie"
  user_id: 1
}

服務器窗口同時顯示

(enva) D:\workplace\python\enva\test1>py grpc_server.py
{'user-agent': 'grpc-python/1.50.0 grpc-c/28.0.0 (windows; chttp2)', 'ip': '127.0.0.1'}

學習小記

流式處理編程,其實比較簡單,只是流式處理一方要構(gòu)建多條mesage,接口方法會自動逐條發(fā)送,接收側(cè)也只須遍歷讀取即可。流式處理用來發(fā)送大文件,如圖片,視頻之類,比REST有明顯優(yōu)勢,而且有規(guī)范接口,也便于團隊合作。

原文鏈接:https://blog.csdn.net/captain5339/article/details/127857840

欄目分類
最近更新