日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學(xué)無(wú)先后,達(dá)者為師

網(wǎng)站首頁(yè) 編程語(yǔ)言 正文

Go語(yǔ)言Grpc?Stream的實(shí)現(xiàn)_Golang

作者:范閑 ? 更新時(shí)間: 2022-08-11 編程語(yǔ)言

Stream Grpc

在我們單次投遞的數(shù)據(jù)量很大的時(shí)候,比如傳輸一個(gè)二進(jìn)制文件的時(shí)候,數(shù)據(jù)包過(guò)大,會(huì)造成瞬時(shí)傳輸壓力。或者接收方接收到數(shù)據(jù)后,需要對(duì)數(shù)據(jù)做一系列的處理工作,

比如:數(shù)據(jù)過(guò)濾 -> 數(shù)據(jù)格式轉(zhuǎn)換 -> 數(shù)據(jù)求和 ,這種場(chǎng)景非常適合使用stream grpc,

Stream Grpc演示

syntax = "proto3";

package book_stream;

option go_package = "/book_stream";

service HelloStreamService {
  rpc BookListStream(BookListStreamRequest) returns (stream BookListStreamResponse){};
  rpc CreateBookStream(stream CreateBookStreamRequest) returns (CreateBookStreamResponse){}
  rpc FindBookByIdStream(stream FindBookByIdStreamRequest) returns (stream FindBookByIdStreamResponse){}
}

message BookListStreamRequest{
}

message BookListStreamResponse{
  BookPoint book = 1;
}

message CreateBookStreamRequest{
  BookPoint book = 1;
}

message CreateBookStreamResponse{
  repeated BookIdPoint idx = 1;
}

message FindBookByIdStreamRequest{
  BookIdPoint idx = 1;
}
message FindBookByIdStreamResponse{
  BookPoint book = 1;
}

message BookIdPoint{
  uint64 idx = 1;
}

message BookPoint{
  uint64 idx = 1;
  string name = 2;
  float price = 3;
  string author = 4;
}

運(yùn)行protoc --go_out=plugins=grpc:. *.proto生成腳手架文件

  • BookListStream服務(wù)端流式RPC
  • CreateBookStream客戶(hù)端流式RPC
  • FindBookByIdStream雙向流式RPC

注意,這里只是用作方便演示使用,演示方法都不是線程安全的

服務(wù)端server

var port = 8888

func main() {
   server := grpc.NewServer()
   book_stream.RegisterHelloStreamServiceServer(server, new(HelloStreamServiceImpl))
   lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
   if err != nil {
      panic(err)
   }
   if err := server.Serve(lis); err != nil {
      panic(err)
   }
}

客戶(hù)端

func main() {
   var port = 8888
   conn, err := grpc.Dial(fmt.Sprintf(":%d", port), grpc.WithInsecure())
   if err != nil {
      panic(err)
   }
   defer conn.Close()
   client := book_stream.NewHelloStreamServiceClient(conn)

   ctx := context.Background()
   if err := createBookStream(ctx, client); err != nil {
      panic(err)
   }
   if err := printBookList(ctx, client); err != nil {
      panic(err)
   }
   if err := getBookListById(ctx, client); err != nil {
      panic(err)
   }
}

BookListStream

服務(wù)器端流式 RPC,顯然是單向流,并代指 Server 為 Stream 而 Client 為普通 RPC 請(qǐng)求

簡(jiǎn)單來(lái)講就是客戶(hù)端發(fā)起一次普通的 RPC 請(qǐng)求,服務(wù)端通過(guò)流式響應(yīng)多次發(fā)送數(shù)據(jù)集,客戶(hù)端 Recv 接收數(shù)據(jù)集。

server端實(shí)現(xiàn)

var bookStore = map[uint64]book_stream.BookPoint{
   1: {
      Idx:    1,
      Author: "程子",
      Price:  9.9,
      Name:   "游戲思維",
   },
   2: {
      Idx:    2,
      Author: "丁銳",
      Price:  9.9,
      Name:   "活出必要的鋒芒",
   },
}


type HelloStreamServiceImpl struct{}

func (HelloStreamServiceImpl) BookListStream(_ *book_stream.BookListStreamRequest, streamServer book_stream.HelloStreamService_BookListStreamServer) error {
   for idx, bookPoint := range bookStore {
      err := streamServer.Send(&book_stream.BookListStreamResponse{Book: &book_stream.BookPoint{
         Idx:    idx,
         Name:   bookPoint.Name,
         Price:  bookPoint.GetPrice(),
         Author: bookPoint.Author,
      }})
      if err != nil {
         return err
      }
   }
   return nil
}

客戶(hù)端實(shí)現(xiàn)

func printBookList(ctx context.Context, client book_stream.HelloStreamServiceClient) error {
   req := &book_stream.BookListStreamRequest{}
   listStream, err := client.BookListStream(ctx, req)
   if err != nil {
      return err
   }
   for true {
      resp, err := listStream.Recv()
      if err != nil {
         if err == io.EOF {
            return nil
         }
         return err
      }
      fmt.Printf("%v\n", *resp.Book)
   }
   return nil
}

CreateBookStream

客戶(hù)端流式 RPC,單向流,客戶(hù)端通過(guò)流式發(fā)起多次?RPC 請(qǐng)求給服務(wù)端,服務(wù)端發(fā)起一次響應(yīng)給客戶(hù)端

server端實(shí)現(xiàn)

func (HelloStreamServiceImpl) CreateBookStream(server book_stream.HelloStreamService_CreateBookStreamServer) error {
   var resList []*book_stream.BookIdPoint
   for {
      resp, err := server.Recv()
      if err == io.EOF {
         return server.SendAndClose(&book_stream.CreateBookStreamResponse{Idx: resList})
      }
      if err != nil {
         return err
      }
      bookStore[resp.Book.Idx] = *resp.Book
      resList = append(resList, &book_stream.BookIdPoint{Idx: resp.Book.Idx})
   }
}

客戶(hù)端實(shí)現(xiàn)

var newBookStore = map[uint64]book_stream.BookPoint{
   3: {
      Idx:    3,
      Author: "程子1",
      Price:  9.9,
      Name:   "游戲思維1",
   },
   4: {
      Idx:    4,
      Author: "丁銳1",
      Price:  9.9,
      Name:   "活出必要的鋒芒1",
   },
}

func createBookStream(ctx context.Context, client book_stream.HelloStreamServiceClient) error {
   stream, err := client.CreateBookStream(ctx)
   if err != nil {
      return err
   }
   for _, bookPoint := range newBookStore {
      if err := stream.Send(&book_stream.CreateBookStreamRequest{
         Book: &bookPoint,
      }); err != nil {
         return err
      }
   }
   recv, err := stream.CloseAndRecv()
   if err != nil {
      return err
   }
   fmt.Println(recv.Idx)
   return nil
}

stream.SendAndClose,它是做什么用的呢?

在這段程序中,我們對(duì)每一個(gè) Recv 都進(jìn)行了處理,當(dāng)發(fā)現(xiàn)?io.EOF?(流關(guān)閉) 后,需要將最終的響應(yīng)結(jié)果發(fā)送給客戶(hù)端,同時(shí)關(guān)閉正在另外一側(cè)等待的 Recv

stream.CloseAndRecv?和?stream.SendAndClose?是配套使用的流方法,

FindBookByIdStream

服務(wù)端實(shí)現(xiàn)

func (HelloStreamServiceImpl) FindBookByIdStream(streamServer book_stream.HelloStreamService_FindBookByIdStreamServer) error {
   for {
      resp, err := streamServer.Recv()
      if err == io.EOF {
         return nil
      }
      if err != nil {
         return err
      }
      if book, ok := bookStore[resp.Idx.Idx]; ok {
         if err := streamServer.Send(&book_stream.FindBookByIdStreamResponse{Book: &book}); err != nil {
            return err
         }
      }
   }
}

客戶(hù)端實(shí)現(xiàn)

func getBookListById(ctx context.Context, client book_stream.HelloStreamServiceClient) error {
   stream, err := client.FindBookByIdStream(ctx)
   if err != nil {
      return err
   }
   var findList = []uint64{1, 2}
   for _, idx := range findList {
      err := stream.Send(&book_stream.FindBookByIdStreamRequest{Idx: &book_stream.BookIdPoint{Idx: idx}})
      if err != nil {
         return err
      }
      recv, err := stream.Recv()
      if err != nil {
         return err
      }
      fmt.Printf("%v\n", recv.Book)
   }
   if err := stream.CloseSend(); err != nil {
      return err
   }
   return nil
}

原文鏈接:https://juejin.cn/post/7110794610653265933

欄目分類(lèi)
最近更新