日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學無先后,達者為師

網站首頁 編程語言 正文

go實現grpc四種數據流模式_Golang

作者:Jeff的技術棧 ? 更新時間: 2022-06-12 編程語言

1. 什么是數據流

grpc中的stream,srteam顧名思義就是一種流,可以源源不斷的推送數據,很適合傳輸一些大數據,或者服務端和客戶端長時間數據交互,比如客戶端可以向服務端訂閱一個數據,服務端就可以利用stream,源源不斷地推送數據。

底層還原成socket編程

2. grpc的四種數據流

1.簡單模式

2.服務端數據流模式(Server-side streaming RPC)

3.客戶端數據流模式(Client-side streaming RPC)

4.雙向數據流模式(Bidirectional streaming RPC)

2.1 簡單模式

? 這種模式最為傳統,即客戶端發起一次請求,服務端響應一個數據,這和大家平時熟悉的RPC沒有什么大的區別,上兩篇中介紹此模式。

2.2 服務端數據流模式

? 這種模式是客戶端發起一次請求,服務端返回一段連續的數據流。典型的例子是客戶端向服務端發送一個股票代碼,服務端就把該股票的實時數據源源不斷的返回給客戶端

2.3 客戶端數據流模式

? 與服務端數據流模式相反,這次是客戶端源源不斷的向服務端發送數據流,而在發送結束后,由服務端返回一個響應。典型的例子是物聯網終端向服務器報送數據。

2.4 雙向數據流

? 顧名思義,這是客戶端和服務端都可以向對方發送數據流,這個時候雙方的數據可以同時互相發送,也就是可以實現實時交互。典型的例子是聊天機器人。

3. 上代碼

3.1 代碼目錄

3.2 編寫stream.proto文件

stream是常量,寫在哪一邊,哪一邊就是數據流

syntax = "proto3";
option go_package = "./;proto";
service Greeter {
    // 定義方法,stream是常量,流模式
    rpc ServerStream (StreamRequestData) returns (stream StreamResponseData);      //服務端流模式,拉消息
    rpc ClientStream (stream StreamRequestData) returns (StreamResponseData);      //客戶端流模式,推消息
    rpc AllStream (stream StreamRequestData) returns (stream StreamResponseData);  //雙向流模式,能推能拉
}
message StreamRequestData {
    string data = 1; //編號
}
message StreamResponseData {
    string data = 1; //編號
}

?生成go的protobuf文件命令:

cd到proto目錄下

命令:protoc -I . hello.proto ? --go_out=plugins=grpc:.

3.3 編寫server文件

package main
import (
	"file_test/grpc_go_stream/proto"
	"fmt"
	"net"
	"sync"
	"time"
	"google.golang.org/grpc"
)
const port = 8082
type server struct{}
func (s *server) ServerStream(req *proto.StreamRequestData, res proto.Greeter_ServerStreamServer) error {
	i := 0
	for {
		i++
		//業務代碼
		_ = res.Send(&proto.StreamResponseData{
			Data: fmt.Sprintf("這是發給%s的數據流", req.Data),
		})
		time.Sleep(time.Second * 1)
		if i > 10 {
			break
		}
	}
	return nil
}
func (s *server) ClientStream(cliStr proto.Greeter_ClientStreamServer) error {
	for {
		//業務代碼
		res, err := cliStr.Recv()
		if err != nil {
			fmt.Println("本次客戶端流數據發送完了:",err)
			break
		}
		fmt.Println("客戶端發來消息:",res.Data)
	}
	return nil
}
func (s *server) AllStream(allStr proto.Greeter_AllStreamServer) error {
	wg:=sync.WaitGroup{}
	wg.Add(2)
	//接受客戶端消息的協程
	go func() {
		defer wg.Done()
		for  {
			//業務代碼
			res, err := allStr.Recv()
			if err != nil {
				fmt.Println("本次客戶端流數據發送完了:",err)
				break
			}
			fmt.Println("收到客戶端發來消息:",res.Data)
		}
	}()
	//發送消息給客戶端的協程
	go func() {
		defer wg.Done()
		i := 0
		for {
			i++
			//業務代碼
			_ = allStr.Send(&proto.StreamResponseData{
				Data: fmt.Sprintf("這是發給客戶端的數據流"),
			})
			time.Sleep(time.Second * 1)
			if i > 10 {
				break
			}
		}
	}()
	wg.Wait()
	return nil
}
// 啟動
func start() {
	// 1.實例化server
	g := grpc.NewServer()
	// 2.注冊邏輯到server中
	proto.RegisterGreeterServer(g, &server{})
	// 3.啟動server
	lis, err := net.Listen("tcp", "127.0.0.1:8082")
	if err != nil {
		panic("監聽錯誤:" + err.Error())
	}
	err = g.Serve(lis)
	if err != nil {
		panic("啟動錯誤:" + err.Error())
	}
}
func main() {
	start()
}

3.4 編寫client文件

package main
import (
	"context"
	"file_test/grpc_go_stream/proto"
	"fmt"
	"sync"
	"time"

	"google.golang.org/grpc"
)
var rpc proto.GreeterClient
func serverStreamDemo()  {
	//服務端流模式
	res,err:=rpc.ServerStream(context.Background(),&proto.StreamRequestData{Data: "jeff"})
	if err != nil {
		panic("rpc請求錯誤:"+err.Error())
	}
	for  {
		data,err:=res.Recv() //
		if err != nil {
			fmt.Println("客戶端發送完了:",err)
			return
		}
		fmt.Println("客戶端返回數據流值:",data.Data)
	}
}
func clientStreamDemo()  {
	//客戶端流模式
	cliStr, err := rpc.ClientStream(context.Background())
	if err != nil {
		panic("rpc請求錯誤:" + err.Error())
	}
	i := 0
	for {
		i++
		_ = cliStr.Send(&proto.StreamRequestData{
			Data: "jeff",
		})
		time.Sleep(time.Second * 1)
		if i > 10 {
			break
		}
	}
}
func clientAndServerStreamDemo()  {
	//雙向流模式
	allStr, _ := rpc.AllStream(context.Background())
	wg := sync.WaitGroup{}
	wg.Add(1)
	//接受服務端消息的協程
	go func() {
		defer wg.Done()
		for {
			//業務代碼
			res, err := allStr.Recv()
			if err != nil {
				fmt.Println("本次服務端流數據發送完了:", err)
				break
			}
			fmt.Println("收到服務端發來消息:", res.Data)
		}
	}()
	//發送消息給服務端的協程
	go func() {
		defer wg.Done()
		i := 0
		for {
			i++
			//業務代碼
			_ = allStr.Send(&proto.StreamRequestData{
				Data: fmt.Sprintf("這是發給服務端的數據流"),
			})
			time.Sleep(time.Second * 1)
			if i > 10 {
				break
			}
		}
	}()
	wg.Wait()
}
// 啟動
func start() {
	conn, err := grpc.Dial("127.0.0.1:8082", grpc.WithInsecure())
	if err != nil {
		panic("rpc連接錯誤:" + err.Error())
	}
	defer conn.Close()
	rpc = proto.NewGreeterClient(conn) //初始化
	serverStreamDemo() //服務端流模式
	clientStreamDemo()  //客戶端流模式
	clientAndServerStreamDemo() // 雙向流模式
}
func main() {
	start()
}

原文鏈接:https://www.cnblogs.com/guyouyin123/p/16135335.html

欄目分類
最近更新