網站首頁 編程語言 正文
Stream Grpc
在我們單次投遞的數據量很大的時候,比如傳輸一個二進制文件的時候,數據包過大,會造成瞬時傳輸壓力。或者接收方接收到數據后,需要對數據做一系列的處理工作,
比如:數據過濾 -> 數據格式轉換 -> 數據求和 ,這種場景非常適合使用stream grpc,
Stream Grpc演示
syntax = "proto3"; package book_stream; option go_package = "/book_stream"; service HelloStreamService { rpc BookListStream(BookListStreamRequest) returns (stream BookListStreamResponse){}; rpc CreateBookStream(stream CreateBookStreamRequest) returns (CreateBookStreamResponse){} rpc FindBookByIdStream(stream FindBookByIdStreamRequest) returns (stream FindBookByIdStreamResponse){} } message BookListStreamRequest{ } message BookListStreamResponse{ BookPoint book = 1; } message CreateBookStreamRequest{ BookPoint book = 1; } message CreateBookStreamResponse{ repeated BookIdPoint idx = 1; } message FindBookByIdStreamRequest{ BookIdPoint idx = 1; } message FindBookByIdStreamResponse{ BookPoint book = 1; } message BookIdPoint{ uint64 idx = 1; } message BookPoint{ uint64 idx = 1; string name = 2; float price = 3; string author = 4; }
運行protoc --go_out=plugins=grpc:. *.proto
生成腳手架文件
-
BookListStream
服務端流式RPC -
CreateBookStream
客戶端流式RPC -
FindBookByIdStream
雙向流式RPC
注意,這里只是用作方便演示使用,演示方法都不是線程安全的
服務端server
var port = 8888 func main() { server := grpc.NewServer() book_stream.RegisterHelloStreamServiceServer(server, new(HelloStreamServiceImpl)) lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) if err != nil { panic(err) } if err := server.Serve(lis); err != nil { panic(err) } }
客戶端
func main() { var port = 8888 conn, err := grpc.Dial(fmt.Sprintf(":%d", port), grpc.WithInsecure()) if err != nil { panic(err) } defer conn.Close() client := book_stream.NewHelloStreamServiceClient(conn) ctx := context.Background() if err := createBookStream(ctx, client); err != nil { panic(err) } if err := printBookList(ctx, client); err != nil { panic(err) } if err := getBookListById(ctx, client); err != nil { panic(err) } }
BookListStream
服務器端流式 RPC,顯然是單向流,并代指 Server 為 Stream 而 Client 為普通 RPC 請求
簡單來講就是客戶端發起一次普通的 RPC 請求,服務端通過流式響應多次發送數據集,客戶端 Recv 接收數據集。
server端實現
var bookStore = map[uint64]book_stream.BookPoint{ 1: { Idx: 1, Author: "程子", Price: 9.9, Name: "游戲思維", }, 2: { Idx: 2, Author: "丁銳", Price: 9.9, Name: "活出必要的鋒芒", }, } type HelloStreamServiceImpl struct{} func (HelloStreamServiceImpl) BookListStream(_ *book_stream.BookListStreamRequest, streamServer book_stream.HelloStreamService_BookListStreamServer) error { for idx, bookPoint := range bookStore { err := streamServer.Send(&book_stream.BookListStreamResponse{Book: &book_stream.BookPoint{ Idx: idx, Name: bookPoint.Name, Price: bookPoint.GetPrice(), Author: bookPoint.Author, }}) if err != nil { return err } } return nil }
客戶端實現
func printBookList(ctx context.Context, client book_stream.HelloStreamServiceClient) error { req := &book_stream.BookListStreamRequest{} listStream, err := client.BookListStream(ctx, req) if err != nil { return err } for true { resp, err := listStream.Recv() if err != nil { if err == io.EOF { return nil } return err } fmt.Printf("%v\n", *resp.Book) } return nil }
CreateBookStream
客戶端流式 RPC,單向流,客戶端通過流式發起多次?RPC 請求給服務端,服務端發起一次響應給客戶端
server端實現
func (HelloStreamServiceImpl) CreateBookStream(server book_stream.HelloStreamService_CreateBookStreamServer) error { var resList []*book_stream.BookIdPoint for { resp, err := server.Recv() if err == io.EOF { return server.SendAndClose(&book_stream.CreateBookStreamResponse{Idx: resList}) } if err != nil { return err } bookStore[resp.Book.Idx] = *resp.Book resList = append(resList, &book_stream.BookIdPoint{Idx: resp.Book.Idx}) } }
客戶端實現
var newBookStore = map[uint64]book_stream.BookPoint{ 3: { Idx: 3, Author: "程子1", Price: 9.9, Name: "游戲思維1", }, 4: { Idx: 4, Author: "丁銳1", Price: 9.9, Name: "活出必要的鋒芒1", }, } func createBookStream(ctx context.Context, client book_stream.HelloStreamServiceClient) error { stream, err := client.CreateBookStream(ctx) if err != nil { return err } for _, bookPoint := range newBookStore { if err := stream.Send(&book_stream.CreateBookStreamRequest{ Book: &bookPoint, }); err != nil { return err } } recv, err := stream.CloseAndRecv() if err != nil { return err } fmt.Println(recv.Idx) return nil }
stream.SendAndClose
,它是做什么用的呢?
在這段程序中,我們對每一個 Recv 都進行了處理,當發現?io.EOF
?(流關閉) 后,需要將最終的響應結果發送給客戶端,同時關閉正在另外一側等待的 Recv
stream.CloseAndRecv
?和?stream.SendAndClose
?是配套使用的流方法,
FindBookByIdStream
服務端實現
func (HelloStreamServiceImpl) FindBookByIdStream(streamServer book_stream.HelloStreamService_FindBookByIdStreamServer) error { for { resp, err := streamServer.Recv() if err == io.EOF { return nil } if err != nil { return err } if book, ok := bookStore[resp.Idx.Idx]; ok { if err := streamServer.Send(&book_stream.FindBookByIdStreamResponse{Book: &book}); err != nil { return err } } } }
客戶端實現
func getBookListById(ctx context.Context, client book_stream.HelloStreamServiceClient) error { stream, err := client.FindBookByIdStream(ctx) if err != nil { return err } var findList = []uint64{1, 2} for _, idx := range findList { err := stream.Send(&book_stream.FindBookByIdStreamRequest{Idx: &book_stream.BookIdPoint{Idx: idx}}) if err != nil { return err } recv, err := stream.Recv() if err != nil { return err } fmt.Printf("%v\n", recv.Book) } if err := stream.CloseSend(); err != nil { return err } return nil }
原文鏈接:https://juejin.cn/post/7110794610653265933
相關推薦
- 2022-06-23 詳解windows?server?2012的DHCP保留地址導出導入、DHCP故障轉移配置、DNS條
- 2022-09-21 go語言中的defer關鍵字_Golang
- 2022-07-01 python神經網絡Batch?Normalization底層原理詳解_python
- 2022-04-26 jQuery實現鎖定頁面元素(表格列)_jquery
- 2022-10-17 android中px、sp與dp之間進行轉換詳解_Android
- 2022-12-02 docker進階教程之dockerfile優化鏡像大小_docker
- 2023-05-17 一文速學Python+Pyecharts繪制樹形圖_python
- 2022-08-18 python數據可視化pygal模擬擲骰子實現示例_python
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支