<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
RPC(Remote Procedure Call Protocol)遠端過程呼叫協定。 一個通俗的描述是:使用者端在不知道呼叫細節的情況下,呼叫存在於遠端計算機上的某個物件,就像呼叫本地應用程式中的物件一樣。 比較正式的描述是:一種通過網路從遠端計算機程式上請求服務,而不需要了解底層網路技術的協定 從使用的方面來說,伺服器端和使用者端通過TCP/UDP/HTTP等通訊協定通訊,在通訊的時候使用者端指定好伺服器端的方法、引數等資訊通過序列化傳送到伺服器端,伺服器端可以通過已有的元資訊找到需要呼叫的方法,然後完成一次呼叫後序列化返回給使用者端(rpc更多的是指服務與服務之間的通訊,可以使用效率更高的協定和序列化格式去進行,並且可以進行有效的負載均衡和熔斷超時等,因此跟前後端之間的web的互動概念上是有點不一樣的) 用一張簡單的圖來表示
本文只實現一個rpc框架基本的功能,不對效能做保證,因此儘量使用go原生自帶的net/json庫等進行操作,對使用方面不做stub(偷懶,只使用簡單的json格式指定需要呼叫的方法),用最簡單的方式實現一個簡易rpc框架,也不保證超時呼叫和服務發現等整合的邏輯,服務發現可以參考下文 本文程式碼地址(https://github.com/wuhuZhao/rpc_demo)
本段先實現兩端之間的通訊,只確保兩個端之間能互相通訊即可 server.go
package server import ( "fmt" "log" "net" ) // Server: transport底層實現,通過Server去接受使用者端的位元組流 type Server struct { ls net.Listener port int } // NewServer: 根據埠建立一個server func NewServer(port int) *Server { s := &Server{port: port} s.init() return s } // init: 初始化伺服器端連線 func (s *Server) init() { l, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", s.port)) if err != nil { panic(err) } s.ls = l } // Start: 啟動伺服器端的埠監聽,採取一個conn一個g的模型,沒有使用reactor等高效能模型 func (s *Server) Start() { go func() { log.Printf("server [%s] start....", s.ls.Addr().String()) for { conn, err := s.ls.Accept() if err != nil { panic(err) } go func() { buf := make([]byte, 1024) for { idx, err := conn.Read(buf) if err != nil { panic(err) } if len(buf) == 0 { continue } // todo 等序列化的資訊 log.Printf("[conn: %v] get data: %vn", conn.RemoteAddr(), string(buf[:idx])) } }() } }() } // Close: 關閉服務監聽 func (s *Server) Close() error { return s.ls.Close() } // Close: 關閉服務監聽 func (s *Server) Close() error { return s.ls.Close() }
client.go
package client import ( "fmt" "log" "net" "unsafe" ) type Client struct { port int conn net.Conn } func NewClient(port int) *Client { c := &Client{port: port} c.init() return c } // init: initialize tcp client func (c *Client) init() { conn, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", c.port)) if err != nil { panic(err) } c.conn = conn } func (c *Client) Send(statement string) error { _, err := c.conn.Write(*(*[]byte)(unsafe.Pointer(&statement))) if err != nil { panic(err) } return nil } // Close: use to close connection func (c *Client) Close() error { return c.conn.Close() }
使用main.go做測試 main.go
package main import ( "rpc_demo/internal/client" "rpc_demo/internal/server" "time" ) func main() { s := server.NewServer(9999) s.Start() time.Sleep(5 * time.Second) c := client.NewClient(9999) c.Send("this is a testn") time.Sleep(5 * time.Second) }
執行一次main.go
, go run main.go
2023/03/05 14:39:11 server [127.0.0.1:9999] start....
2023/03/05 14:39:16 [conn: 127.0.0.1:59126] get data: this is a test
可以證明第一部分的任務已經完成,可以實現兩端之間的通訊了
實現了雙端的通訊以後,我們在internal.go
裡實現兩個方法,一個是註冊,一個是呼叫,因為go有執行時的反射,所以我們使用反射去註冊每一個需要呼叫到的方法,然後提供全域性唯一的函數名,讓client端可以實現指定方法的呼叫
internal.go
package internal import ( "errors" "fmt" "reflect" "runtime" "strings" ) // 全域性唯一 var GlobalMethod = &Method{methods: map[string]reflect.Value{}} type Method struct { methods map[string]reflect.Value } func (m *Method) register(impl interface{}) error { pl := reflect.ValueOf(impl) if pl.Kind() != reflect.Func { return errors.New("impl should be function") } // 獲取函數名 methodName := runtime.FuncForPC(pl.Pointer()).Name() if len(strings.Split(methodName, ".")) < 1 { return errors.New("invalid function name") } lastFuncName := strings.Split(methodName, ".")[1] m.methods[lastFuncName] = pl fmt.Printf("methods: %vn", m.methods) return nil } func (m *Method) call(methodName string, callParams ...interface{}) ([]interface{}, error) { fn, ok := m.methods[methodName] if !ok { return nil, errors.New("impl method not found! Please Register first") } in := make([]reflect.Value, len(callParams)) for i := 0; i < len(callParams); i++ { in[i] = reflect.ValueOf(callParams[i]) } res := fn.Call(in) out := make([]interface{}, len(res)) for i := 0; i < len(res); i++ { out[i] = res[i].Interface() } return out, nil } func Call(methodName string, callParams ...interface{}) ([]interface{}, error) { return GlobalMethod.call(methodName, callParams...) } func Register(impl interface{}) error { return GlobalMethod.register(impl) }
在單測裡測試一下這個註冊和呼叫的功能internal_test.go
package internal import ( "testing" ) func Sum(a, b int) int { return a + b } func TestRegister(t *testing.T) { err := Register(Sum) if err != nil { t.Fatalf("err: %vn", err) } t.Logf("test successn") } func TestCall(t *testing.T) { TestRegister(t) result, err := Call("Sum", 1, 2) if err != nil { t.Fatalf("err: %vn", err) } if len(result) != 1 { t.Fatalf("len(result) is not equal to 1n") } t.Logf("Sum(1,2) = %dn", result[0].(int)) if err := recover(); err != nil { t.Fatalf("%vn", err) } }
執行呼叫
/usr/local/go/bin/go test -timeout 30s -run ^TestCall$ rpc_demo/internal -v
Running tool: /usr/local/go/bin/go test -timeout 30s -run ^TestCall$ rpc_demo/internal -v
=== RUN TestCall
methods: map[Sum:<func(int, int) int Value>]
/root/go/src/juejin_demo/rpc_demo/internal/internal_test.go:15: test success
/root/go/src/juejin_demo/rpc_demo/internal/internal_test.go:27: Sum(1,2) = 3
--- PASS: TestCall (0.00s)
PASS
ok rpc_demo/internal 0.002s
可以看到這個註冊和呼叫的過程已經實現並且達到指定方法呼叫的作用
設計struct完整表達一次完整的rpc呼叫,並且封裝json庫中的Decoder和Encoder,完成序列化和反序列化
internal.go
type RpcRequest struct { MethodName string Params []interface{} } type RpcResponses struct { Returns []interface{} Err error }
transport.go
考慮可以對接更多的格式,所以抽象了一層進行使用(demo肯定沒有更多格式了)
package transport // Transport: 序列化格式的抽象層,從connection中讀取資料序列化並且反序列化到connection中 type Transport interface { Decode(v interface{}) error Encode(v interface{}) error Close() }
json_transport.go
package transport import ( "encoding/json" "net" ) var _ Transport = (*JSONTransport)(nil) type JSONTransport struct { encoder *json.Encoder decoder *json.Decoder } // NewJSONTransport: 負責讀取和寫入conn func NewJSONTransport(conn net.Conn) *JSONTransport { return &JSONTransport{json.NewEncoder(conn), json.NewDecoder(conn)} } // Decode: use json package to decode func (t *JSONTransport) Decode(v interface{}) error { if err := t.decoder.Decode(v); err != nil { return err } return nil } // Encode: use json package to encode func (t *JSONTransport) Encode(v interface{}) error { if err := t.encoder.Encode(v); err != nil { return err } return nil } // Close: not implement func (dec *JSONTransport) Close() { }
然後我們將伺服器端和使用者端的邏輯進行修改,改成通過上面兩個結構體進行通訊,然後返回一次呼叫 server.go
//... for { conn, err := s.ls.Accept() if err != nil { panic(err) } tsp := transport.NewJSONTransport(conn) go func() { for { request := &internal.RpcRequest{} err := tsp.Decode(request) if err != nil { panic(err) } log.Printf("[server] get request: %vn", request) result, err := internal.Call(request.MethodName, request.Params...) log.Printf("[server] invoke method: %vn", result) if err != nil { response := &internal.RpcResponses{Returns: nil, Err: err} tsp.Encode(response) continue } response := &internal.RpcResponses{Returns: result, Err: err} if err := tsp.Encode(response); err != nil { log.Printf("[server] encode response err: %vn", err) continue } } }() } //...
client.go
// ... // Call: remote invoke func (c *Client) Call(methodName string, params ...interface{}) (res *internal.RpcResponses) { request := internal.RpcRequest{MethodName: methodName, Params: params} log.Printf("[client] create request to invoke server: %vn", request) err := c.tsp.Encode(request) if err != nil { panic(err) } res = &internal.RpcResponses{} if err := c.tsp.Decode(res); err != nil { panic(err) } log.Printf("[client] get response from server: %vn", res) return res } // ...
main.go
package main import ( "log" "rpc_demo/internal" "rpc_demo/internal/client" "rpc_demo/internal/server" "strings" "time" ) // Rpc方法的一個簡易實現 func Join(a ...string) string { res := &strings.Builder{} for i := 0; i < len(a); i++ { res.WriteString(a[i]) } return res.String() } func main() { internal.Register(Join) s := server.NewServer(9999) s.Start() time.Sleep(5 * time.Second) c := client.NewClient(9999) res := c.Call("Join", "aaaaa", "bbbbb", "ccccccccc", "end") if res.Err != nil { log.Printf("[main] get an error from server: %vn", res.Err) return } log.Printf("[main] get a response from server: %vn", res.Returns[0].(string)) time.Sleep(5 * time.Second) }
接下來我們執行一下main
[root@hecs-74066 rpc_demo]# go run main.go
2023/03/05 14:39:11 server [127.0.0.1:9999] start....
2023/03/05 14:39:16 [conn: 127.0.0.1:59126] get data: this is a test
[root@hecs-74066 rpc_demo]# go run main.go
2023/03/05 21:53:41 server [127.0.0.1:9999] start....
2023/03/05 21:53:46 [client] create request to invoke server: {Join [aaaaa bbbbb ccccccccc end]}
2023/03/05 21:53:46 [server] get request: &{Join [aaaaa bbbbb ccccccccc end]}
2023/03/05 21:53:46 [server] invoke method: [aaaaabbbbbcccccccccend]
2023/03/05 21:53:46 [client] get response from server: &{[aaaaabbbbbcccccccccend] <nil>}
2023/03/05 21:53:46 [main] get a response from server: aaaaabbbbbcccccccccend
這樣我們就實現了一個簡單的rpc框架了,符合最簡單的架構圖,從client->序列化請求->transport -> 反序列化 ->server然後從server->序列化請求->transport->反序列化請求->client。當然從可用性的角度來說是差遠了,沒有實現stub程式碼,也沒有idl的實現,導致所有的註冊方法都是寫死,可用性不高,而且沒有整合服務發現(可以參考我的另一篇文章去整合)和熔斷等功能,也沒用中介軟體(也是我的另一篇文章)和超時等豐富的功能在裡面,並且最近看了不少rpc框架的原始碼,感覺這個demo的設計也差遠了。不過因為時間問題和程式碼的複雜性問題(單純懶),起碼算是實現了一個簡單的rpc框架。
推薦一些比較好的框架實現
到此這篇關於Golang實現簡易的rpc呼叫的文章就介紹到這了,更多相關Golang rpc呼叫內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45