<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
上一篇介紹了簡單模式RPC,當資料量大或者需要不斷傳輸資料時候,我們應該使用流式RPC,它允許我們邊處理邊傳輸資料。本篇先介紹伺服器端流式RPC。
伺服器端流式RPC:使用者端傳送請求到伺服器,拿到一個流去讀取返回的訊息序列。 使用者端讀取返回的流,直到裡面沒有任何訊息。
1.使用者端要獲取某原油股的實時走勢,使用者端傳送一個請求
2.伺服器端實時返回該股票的走勢
新建server_stream.proto檔案
1.定義傳送資訊
// 定義傳送請求資訊 message SimpleRequest{ // 定義傳送的引數,採用駝峰命名方式,小寫加下劃線,如:student_name // 請求引數 string data = 1; }
2.定義接收資訊
// 定義流式響應資訊 message StreamResponse{ // 流式響應資料 string stream_value = 1; }
3.定義服務方法ListValue
伺服器端流式rpc,只要在響應資料前新增stream即可
// 定義我們的服務(可定義多個服務,每個服務可定義多個介面) service StreamServer{ // 伺服器端流式rpc,在響應資料前新增stream rpc ListValue(SimpleRequest)returns(stream StreamResponse){}; }
4.編譯proto檔案
進入server_stream.proto所在目錄,執行指令:
protoc --go_out=plugins=grpc:./ ./server_stream.proto
1.定義我們的服務,並實現ListValue方法
// SimpleService 定義我們的服務 type StreamService struct{} // ListValue 實現ListValue方法 func (s *StreamService) ListValue(req *pb.SimpleRequest, srv pb.StreamServer_ListValueServer) error { for n := 0; n < 5; n++ { // 向流中傳送訊息, 預設每次send送訊息最大長度為`math.MaxInt32`bytes err := srv.Send(&pb.StreamResponse{ StreamValue: req.Data + strconv.Itoa(n), }) if err != nil { return err } } return nil }
初學者可能覺得比較迷惑,ListValue的引數和返回值是怎樣確定的。其實這些都是編譯proto時生成的.pb.go檔案中有定義,我們只需要實現就可以了。
2.啟動gRPC伺服器
const ( // Address 監聽地址 Address string = ":8000" // Network 網路通訊協定 Network string = "tcp" ) func main() { // 監聽本地埠 listener, err := net.Listen(Network, Address) if err != nil { log.Fatalf("net.Listen err: %v", err) } log.Println(Address + " net.Listing...") // 新建gRPC伺服器範例 // 預設單次接收最大訊息長度為`1024*1024*4`bytes(4M),單次傳送訊息最大長度為`math.MaxInt32`bytes // grpcServer := grpc.NewServer(grpc.MaxRecvMsgSize(1024*1024*4), grpc.MaxSendMsgSize(math.MaxInt32)) grpcServer := grpc.NewServer() // 在gRPC伺服器註冊我們的服務 pb.RegisterStreamServerServer(grpcServer, &StreamService{}) //用伺服器 Serve() 方法以及我們的埠資訊區實現阻塞等待,直到程序被殺死或者 Stop() 被呼叫 err = grpcServer.Serve(listener) if err != nil { log.Fatalf("grpcServer.Serve err: %v", err) } }
執行伺服器端
go run server.go :8000 net.Listing...
1.建立呼叫伺服器端ListValue方法
// listValue 呼叫伺服器端的ListValue方法 func listValue() { // 建立傳送結構體 req := pb.SimpleRequest{ Data: "stream server grpc ", } // 呼叫我們的服務(ListValue方法) stream, err := grpcClient.ListValue(context.Background(), &req) if err != nil { log.Fatalf("Call ListStr err: %v", err) } for { //Recv() 方法接收伺服器端訊息,預設每次Recv()最大訊息長度為`1024*1024*4`bytes(4M) res, err := stream.Recv() // 判斷訊息流是否已經結束 if err == io.EOF { break } if err != nil { log.Fatalf("ListStr get stream err: %v", err) } // 列印返回值 log.Println(res.StreamValue) } }
2.啟動gRPC使用者端
// Address 連線地址 const Address string = ":8000" var grpcClient pb.StreamServerClient func main() { // 連線伺服器 conn, err := grpc.Dial(Address, grpc.WithInsecure()) if err != nil { log.Fatalf("net.Connect err: %v", err) } defer conn.Close() // 建立gRPC連線 grpcClient = pb.NewStreamServerClient(conn) route() listValue() }
執行使用者端
go run client.go
stream server grpc 0
stream server grpc 1
stream server grpc 2
stream server grpc 3
stream server grpc 4
使用者端不斷從伺服器端獲取資料
假如伺服器端不停傳送資料,類似獲取股票走勢實時資料,使用者端能自己停止獲取資料嗎?
答案:可以的
1.我們把伺服器端的ListValue方法稍微修改
// ListValue 實現ListValue方法 func (s *StreamService) ListValue(req *pb.SimpleRequest, srv pb.StreamServer_ListValueServer) error { for n := 0; n < 15; n++ { // 向流中傳送訊息, 預設每次send送訊息最大長度為`math.MaxInt32`bytes err := srv.Send(&pb.StreamResponse{ StreamValue: req.Data + strconv.Itoa(n), }) if err != nil { return err } log.Println(n) time.Sleep(1 * time.Second) } return nil }
2.再把使用者端呼叫ListValue方法的實現稍作修改,就可以得到結果了
// listValue 呼叫伺服器端的ListValue方法 func listValue() { // 建立傳送結構體 req := pb.SimpleRequest{ Data: "stream server grpc ", } // 呼叫我們的服務(Route方法) // 同時傳入了一個 context.Context ,在有需要時可以讓我們改變RPC的行為,比如超時/取消一個正在執行的RPC stream, err := grpcClient.ListValue(context.Background(), &req) if err != nil { log.Fatalf("Call ListStr err: %v", err) } for { //Recv() 方法接收伺服器端訊息,預設每次Recv()最大訊息長度為`1024*1024*4`bytes(4M) res, err := stream.Recv() // 判斷訊息流是否已經結束 if err == io.EOF { break } if err != nil { log.Fatalf("ListStr get stream err: %v", err) } // 列印返回值 log.Println(res.StreamValue) break } //可以使用CloseSend()關閉stream,這樣伺服器端就不會繼續產生流訊息 //呼叫CloseSend()後,若繼續呼叫Recv(),會重新啟用stream,接著之前結果獲取訊息 stream.CloseSend() }
只需要呼叫CloseSend()方法,就可以關閉伺服器端的stream,讓它停止傳送資料。值得注意的是,呼叫CloseSend()後,若繼續呼叫Recv(),會重新啟用stream,接著當前的結果繼續獲取訊息。
這能完美解決使用者端暫停->繼續獲取資料的操作。
本篇介紹了伺服器端流式RPC的簡單使用,使用者端發起一個請求,伺服器端不停返回資料,直到伺服器端停止傳送資料或使用者端主動停止接收資料為止。下篇將介紹使用者端流式RPC。
教學原始碼地址:https://github.com/Bingjian-Zhu/go-grpc-example
參考:gRPC官方檔案中文版
更多關於Go gRPC伺服器端流式RPC的資料請關注it145.com其它相關文章!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45