首頁 > 軟體

Go語言Grpc Stream的實現

2022-06-20 14:02:49

Stream Grpc

在我們單次投遞的資料量很大的時候,比如傳輸一個二進位制檔案的時候,封包過大,會造成瞬時傳輸壓力。或者接收方接收到資料後,需要對資料做一系列的處理工作,

比如:資料過濾 -> 資料格式轉換 -> 資料求和 ,這種場景非常適合使用stream grpc,

Stream Grpc演示

syntax = "proto3";

package book_stream;

option go_package = "/book_stream";

service HelloStreamService {
  rpc BookListStream(BookListStreamRequest) returns (stream BookListStreamResponse){};
  rpc CreateBookStream(stream CreateBookStreamRequest) returns (CreateBookStreamResponse){}
  rpc FindBookByIdStream(stream FindBookByIdStreamRequest) returns (stream FindBookByIdStreamResponse){}
}

message BookListStreamRequest{
}

message BookListStreamResponse{
  BookPoint book = 1;
}

message CreateBookStreamRequest{
  BookPoint book = 1;
}

message CreateBookStreamResponse{
  repeated BookIdPoint idx = 1;
}

message FindBookByIdStreamRequest{
  BookIdPoint idx = 1;
}
message FindBookByIdStreamResponse{
  BookPoint book = 1;
}

message BookIdPoint{
  uint64 idx = 1;
}

message BookPoint{
  uint64 idx = 1;
  string name = 2;
  float price = 3;
  string author = 4;
}

執行protoc --go_out=plugins=grpc:. *.proto生成腳手架檔案

  • BookListStream伺服器端流式RPC
  • CreateBookStream使用者端流式RPC
  • FindBookByIdStream雙向流式RPC

注意,這裡只是用作方便演示使用,演示方法都不是執行緒安全的

伺服器端server

var port = 8888

func main() {
   server := grpc.NewServer()
   book_stream.RegisterHelloStreamServiceServer(server, new(HelloStreamServiceImpl))
   lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
   if err != nil {
      panic(err)
   }
   if err := server.Serve(lis); err != nil {
      panic(err)
   }
}

使用者端

func main() {
   var port = 8888
   conn, err := grpc.Dial(fmt.Sprintf(":%d", port), grpc.WithInsecure())
   if err != nil {
      panic(err)
   }
   defer conn.Close()
   client := book_stream.NewHelloStreamServiceClient(conn)

   ctx := context.Background()
   if err := createBookStream(ctx, client); err != nil {
      panic(err)
   }
   if err := printBookList(ctx, client); err != nil {
      panic(err)
   }
   if err := getBookListById(ctx, client); err != nil {
      panic(err)
   }
}

BookListStream

伺服器端流式 RPC,顯然是單向流,並代指 Server 為 Stream 而 Client 為普通 RPC 請求

簡單來講就是使用者端發起一次普通的 RPC 請求,伺服器端通過流式響應多次傳送資料集,使用者端 Recv 接收資料集。

server端實現

var bookStore = map[uint64]book_stream.BookPoint{
   1: {
      Idx:    1,
      Author: "程子",
      Price:  9.9,
      Name:   "遊戲思維",
   },
   2: {
      Idx:    2,
      Author: "丁銳",
      Price:  9.9,
      Name:   "活出必要的鋒芒",
   },
}


type HelloStreamServiceImpl struct{}

func (HelloStreamServiceImpl) BookListStream(_ *book_stream.BookListStreamRequest, streamServer book_stream.HelloStreamService_BookListStreamServer) error {
   for idx, bookPoint := range bookStore {
      err := streamServer.Send(&book_stream.BookListStreamResponse{Book: &book_stream.BookPoint{
         Idx:    idx,
         Name:   bookPoint.Name,
         Price:  bookPoint.GetPrice(),
         Author: bookPoint.Author,
      }})
      if err != nil {
         return err
      }
   }
   return nil
}

使用者端實現

func printBookList(ctx context.Context, client book_stream.HelloStreamServiceClient) error {
   req := &book_stream.BookListStreamRequest{}
   listStream, err := client.BookListStream(ctx, req)
   if err != nil {
      return err
   }
   for true {
      resp, err := listStream.Recv()
      if err != nil {
         if err == io.EOF {
            return nil
         }
         return err
      }
      fmt.Printf("%vn", *resp.Book)
   }
   return nil
}

CreateBookStream

使用者端流式 RPC,單向流,使用者端通過流式發起多次 RPC 請求給伺服器端,伺服器端發起一次響應給使用者端

server端實現

func (HelloStreamServiceImpl) CreateBookStream(server book_stream.HelloStreamService_CreateBookStreamServer) error {
   var resList []*book_stream.BookIdPoint
   for {
      resp, err := server.Recv()
      if err == io.EOF {
         return server.SendAndClose(&book_stream.CreateBookStreamResponse{Idx: resList})
      }
      if err != nil {
         return err
      }
      bookStore[resp.Book.Idx] = *resp.Book
      resList = append(resList, &book_stream.BookIdPoint{Idx: resp.Book.Idx})
   }
}

使用者端實現

var newBookStore = map[uint64]book_stream.BookPoint{
   3: {
      Idx:    3,
      Author: "程子1",
      Price:  9.9,
      Name:   "遊戲思維1",
   },
   4: {
      Idx:    4,
      Author: "丁銳1",
      Price:  9.9,
      Name:   "活出必要的鋒芒1",
   },
}

func createBookStream(ctx context.Context, client book_stream.HelloStreamServiceClient) error {
   stream, err := client.CreateBookStream(ctx)
   if err != nil {
      return err
   }
   for _, bookPoint := range newBookStore {
      if err := stream.Send(&book_stream.CreateBookStreamRequest{
         Book: &bookPoint,
      }); err != nil {
         return err
      }
   }
   recv, err := stream.CloseAndRecv()
   if err != nil {
      return err
   }
   fmt.Println(recv.Idx)
   return nil
}

stream.SendAndClose,它是做什麼用的呢?

在這段程式中,我們對每一個 Recv 都進行了處理,當發現 io.EOF (流關閉) 後,需要將最終的響應結果傳送給使用者端,同時關閉正在另外一側等待的 Recv

stream.CloseAndRecv 和 stream.SendAndClose 是配套使用的流方法,

FindBookByIdStream

伺服器端實現

func (HelloStreamServiceImpl) FindBookByIdStream(streamServer book_stream.HelloStreamService_FindBookByIdStreamServer) error {
   for {
      resp, err := streamServer.Recv()
      if err == io.EOF {
         return nil
      }
      if err != nil {
         return err
      }
      if book, ok := bookStore[resp.Idx.Idx]; ok {
         if err := streamServer.Send(&book_stream.FindBookByIdStreamResponse{Book: &book}); err != nil {
            return err
         }
      }
   }
}

使用者端實現

func getBookListById(ctx context.Context, client book_stream.HelloStreamServiceClient) error {
   stream, err := client.FindBookByIdStream(ctx)
   if err != nil {
      return err
   }
   var findList = []uint64{1, 2}
   for _, idx := range findList {
      err := stream.Send(&book_stream.FindBookByIdStreamRequest{Idx: &book_stream.BookIdPoint{Idx: idx}})
      if err != nil {
         return err
      }
      recv, err := stream.Recv()
      if err != nil {
         return err
      }
      fmt.Printf("%vn", recv.Book)
   }
   if err := stream.CloseSend(); err != nil {
      return err
   }
   return nil
}

到此這篇關於Go語言Grpc Stream的實現的文章就介紹到這了,更多相關Go語言Grpc Stream 內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com