網站首頁 編程語言 正文
1. 什么是數據流
grpc中的stream,srteam顧名思義就是一種流,可以源源不斷的推送數據,很適合傳輸一些大數據,或者服務端和客戶端長時間數據交互,比如客戶端可以向服務端訂閱一個數據,服務端就可以利用stream,源源不斷地推送數據。
底層還原成socket編程
2. grpc的四種數據流
1.簡單模式
2.服務端數據流模式(Server-side streaming RPC)
3.客戶端數據流模式(Client-side streaming RPC)
4.雙向數據流模式(Bidirectional streaming RPC)
2.1 簡單模式
? 這種模式最為傳統,即客戶端發起一次請求,服務端響應一個數據,這和大家平時熟悉的RPC沒有什么大的區別,上兩篇中介紹此模式。
2.2 服務端數據流模式
? 這種模式是客戶端發起一次請求,服務端返回一段連續的數據流。典型的例子是客戶端向服務端發送一個股票代碼,服務端就把該股票的實時數據源源不斷的返回給客戶端
2.3 客戶端數據流模式
? 與服務端數據流模式相反,這次是客戶端源源不斷的向服務端發送數據流,而在發送結束后,由服務端返回一個響應。典型的例子是物聯網終端向服務器報送數據。
2.4 雙向數據流
? 顧名思義,這是客戶端和服務端都可以向對方發送數據流,這個時候雙方的數據可以同時互相發送,也就是可以實現實時交互。典型的例子是聊天機器人。
3. 上代碼
3.1 代碼目錄
3.2 編寫stream.proto文件
stream是常量,寫在哪一邊,哪一邊就是數據流
syntax = "proto3"; option go_package = "./;proto"; service Greeter { // 定義方法,stream是常量,流模式 rpc ServerStream (StreamRequestData) returns (stream StreamResponseData); //服務端流模式,拉消息 rpc ClientStream (stream StreamRequestData) returns (StreamResponseData); //客戶端流模式,推消息 rpc AllStream (stream StreamRequestData) returns (stream StreamResponseData); //雙向流模式,能推能拉 } message StreamRequestData { string data = 1; //編號 } message StreamResponseData { string data = 1; //編號 }
?生成go的protobuf文件命令:
cd到proto目錄下
命令:protoc -I . hello.proto ? --go_out=plugins=grpc:.
3.3 編寫server文件
package main import ( "file_test/grpc_go_stream/proto" "fmt" "net" "sync" "time" "google.golang.org/grpc" ) const port = 8082 type server struct{} func (s *server) ServerStream(req *proto.StreamRequestData, res proto.Greeter_ServerStreamServer) error { i := 0 for { i++ //業務代碼 _ = res.Send(&proto.StreamResponseData{ Data: fmt.Sprintf("這是發給%s的數據流", req.Data), }) time.Sleep(time.Second * 1) if i > 10 { break } } return nil } func (s *server) ClientStream(cliStr proto.Greeter_ClientStreamServer) error { for { //業務代碼 res, err := cliStr.Recv() if err != nil { fmt.Println("本次客戶端流數據發送完了:",err) break } fmt.Println("客戶端發來消息:",res.Data) } return nil } func (s *server) AllStream(allStr proto.Greeter_AllStreamServer) error { wg:=sync.WaitGroup{} wg.Add(2) //接受客戶端消息的協程 go func() { defer wg.Done() for { //業務代碼 res, err := allStr.Recv() if err != nil { fmt.Println("本次客戶端流數據發送完了:",err) break } fmt.Println("收到客戶端發來消息:",res.Data) } }() //發送消息給客戶端的協程 go func() { defer wg.Done() i := 0 for { i++ //業務代碼 _ = allStr.Send(&proto.StreamResponseData{ Data: fmt.Sprintf("這是發給客戶端的數據流"), }) time.Sleep(time.Second * 1) if i > 10 { break } } }() wg.Wait() return nil } // 啟動 func start() { // 1.實例化server g := grpc.NewServer() // 2.注冊邏輯到server中 proto.RegisterGreeterServer(g, &server{}) // 3.啟動server lis, err := net.Listen("tcp", "127.0.0.1:8082") if err != nil { panic("監聽錯誤:" + err.Error()) } err = g.Serve(lis) if err != nil { panic("啟動錯誤:" + err.Error()) } } func main() { start() }
3.4 編寫client文件
package main import ( "context" "file_test/grpc_go_stream/proto" "fmt" "sync" "time" "google.golang.org/grpc" ) var rpc proto.GreeterClient func serverStreamDemo() { //服務端流模式 res,err:=rpc.ServerStream(context.Background(),&proto.StreamRequestData{Data: "jeff"}) if err != nil { panic("rpc請求錯誤:"+err.Error()) } for { data,err:=res.Recv() // if err != nil { fmt.Println("客戶端發送完了:",err) return } fmt.Println("客戶端返回數據流值:",data.Data) } } func clientStreamDemo() { //客戶端流模式 cliStr, err := rpc.ClientStream(context.Background()) if err != nil { panic("rpc請求錯誤:" + err.Error()) } i := 0 for { i++ _ = cliStr.Send(&proto.StreamRequestData{ Data: "jeff", }) time.Sleep(time.Second * 1) if i > 10 { break } } } func clientAndServerStreamDemo() { //雙向流模式 allStr, _ := rpc.AllStream(context.Background()) wg := sync.WaitGroup{} wg.Add(1) //接受服務端消息的協程 go func() { defer wg.Done() for { //業務代碼 res, err := allStr.Recv() if err != nil { fmt.Println("本次服務端流數據發送完了:", err) break } fmt.Println("收到服務端發來消息:", res.Data) } }() //發送消息給服務端的協程 go func() { defer wg.Done() i := 0 for { i++ //業務代碼 _ = allStr.Send(&proto.StreamRequestData{ Data: fmt.Sprintf("這是發給服務端的數據流"), }) time.Sleep(time.Second * 1) if i > 10 { break } } }() wg.Wait() } // 啟動 func start() { conn, err := grpc.Dial("127.0.0.1:8082", grpc.WithInsecure()) if err != nil { panic("rpc連接錯誤:" + err.Error()) } defer conn.Close() rpc = proto.NewGreeterClient(conn) //初始化 serverStreamDemo() //服務端流模式 clientStreamDemo() //客戶端流模式 clientAndServerStreamDemo() // 雙向流模式 } func main() { start() }
原文鏈接:https://www.cnblogs.com/guyouyin123/p/16135335.html
相關推薦
- 2022-12-29 C#使用Lambda表達式簡化代碼的示例詳解_C#教程
- 2023-07-14 element組件中的時間選擇器,禁用選擇時間,picker-options屬性詳解,時間選擇器范圍
- 2023-04-11 Golang使用協程實現批量獲取數據_Golang
- 2022-07-28 Golang配置管理庫?Viper的教程詳解_Golang
- 2022-08-04 深入理解pytorch庫的dockerfile_python
- 2022-06-19 Visual?Studio創建WPF項目_實用技巧
- 2022-01-13 macOS 升級后 nvm 安裝的 node 和 npm 出錯
- 2022-06-07 ?分享一個Python?遇到數據庫超好用的模塊_python
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支