首頁 > 軟體

Golang實現簡易的rpc呼叫

2023-03-07 06:01:36

RPC(Remote Procedure Call Protocol)遠端過程呼叫協定。 一個通俗的描述是:使用者端在不知道呼叫細節的情況下,呼叫存在於遠端計算機上的某個物件,就像呼叫本地應用程式中的物件一樣。 比較正式的描述是:一種通過網路從遠端計算機程式上請求服務,而不需要了解底層網路技術的協定 從使用的方面來說,伺服器端和使用者端通過TCP/UDP/HTTP等通訊協定通訊,在通訊的時候使用者端指定好伺服器端的方法、引數等資訊通過序列化傳送到伺服器端,伺服器端可以通過已有的元資訊找到需要呼叫的方法,然後完成一次呼叫後序列化返回給使用者端(rpc更多的是指服務與服務之間的通訊,可以使用效率更高的協定和序列化格式去進行,並且可以進行有效的負載均衡和熔斷超時等,因此跟前後端之間的web的互動概念上是有點不一樣的) 用一張簡單的圖來表示

開始

本文只實現一個rpc框架基本的功能,不對效能做保證,因此儘量使用go原生自帶的net/json庫等進行操作,對使用方面不做stub(偷懶,只使用簡單的json格式指定需要呼叫的方法),用最簡單的方式實現一個簡易rpc框架,也不保證超時呼叫和服務發現等整合的邏輯,服務發現可以參考下文 本文程式碼地址(https://github.com/wuhuZhao/rpc_demo)

實現兩點之間的通訊(transport)

本段先實現兩端之間的通訊,只確保兩個端之間能互相通訊即可 server.go

package server

import (
	"fmt"
	"log"
	"net"
)

// Server: transport底層實現,通過Server去接受使用者端的位元組流
type Server struct {
	ls   net.Listener
	port int
}

// NewServer: 根據埠建立一個server
func NewServer(port int) *Server {
	s := &Server{port: port}
	s.init()
	return s
}

// init: 初始化伺服器端連線
func (s *Server) init() {
	l, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", s.port))
	if err != nil {
		panic(err)
	}
	s.ls = l
}

// Start: 啟動伺服器端的埠監聽,採取一個conn一個g的模型,沒有使用reactor等高效能模型
func (s *Server) Start() {
	go func() {
		log.Printf("server [%s] start....", s.ls.Addr().String())
		for {
			conn, err := s.ls.Accept()
			if err != nil {
				panic(err)
			}
			go func() {
				buf := make([]byte, 1024)
				for {
					idx, err := conn.Read(buf)
					if err != nil {
						panic(err)
					}
					if len(buf) == 0 {
						continue
					}
					// todo 等序列化的資訊
					log.Printf("[conn: %v] get data: %vn", conn.RemoteAddr(), string(buf[:idx]))

				}
			}()
		}
	}()

}

// Close: 關閉服務監聽
func (s *Server) Close() error {
	return s.ls.Close()
}


// Close: 關閉服務監聽
func (s *Server) Close() error {
	return s.ls.Close()
}

client.go

package client

import (
	"fmt"
	"log"
	"net"
	"unsafe"
)

type Client struct {
	port int
	conn net.Conn
}

func NewClient(port int) *Client {
	c := &Client{port: port}
	c.init()
	return c
}

// init: initialize tcp client
func (c *Client) init() {
	conn, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", c.port))
	if err != nil {
		panic(err)
	}
	c.conn = conn

}

func (c *Client) Send(statement string) error {
	_, err := c.conn.Write(*(*[]byte)(unsafe.Pointer(&statement)))
	if err != nil {
		panic(err)
	}
	return nil
}

// Close: use to close connection
func (c *Client) Close() error {
	return c.conn.Close()
}

使用main.go做測試 main.go

package main

import (
	"rpc_demo/internal/client"
	"rpc_demo/internal/server"
	"time"
)

func main() {
	s := server.NewServer(9999)
	s.Start()
	time.Sleep(5 * time.Second)
	c := client.NewClient(9999)
	c.Send("this is a testn")
	time.Sleep(5 * time.Second)
}

執行一次main.go, go run main.go

2023/03/05 14:39:11 server [127.0.0.1:9999] start....
2023/03/05 14:39:16 [conn: 127.0.0.1:59126] get data: this is a test

可以證明第一部分的任務已經完成,可以實現兩端之間的通訊了

實現反射呼叫已註冊的方法

實現了雙端的通訊以後,我們在internal.go裡實現兩個方法,一個是註冊,一個是呼叫,因為go有執行時的反射,所以我們使用反射去註冊每一個需要呼叫到的方法,然後提供全域性唯一的函數名,讓client端可以實現指定方法的呼叫

internal.go

package internal

import (
	"errors"
	"fmt"
	"reflect"
	"runtime"
	"strings"
)

// 全域性唯一
var GlobalMethod = &Method{methods: map[string]reflect.Value{}}

type Method struct {
	methods map[string]reflect.Value
}

func (m *Method) register(impl interface{}) error {
	pl := reflect.ValueOf(impl)
	if pl.Kind() != reflect.Func {
		return errors.New("impl should be function")
	}
	// 獲取函數名
	methodName := runtime.FuncForPC(pl.Pointer()).Name()
	if len(strings.Split(methodName, ".")) < 1 {
		return errors.New("invalid function name")
	}
	lastFuncName := strings.Split(methodName, ".")[1]
	m.methods[lastFuncName] = pl
	fmt.Printf("methods: %vn", m.methods)
	return nil
}

func (m *Method) call(methodName string, callParams ...interface{}) ([]interface{}, error) {
	fn, ok := m.methods[methodName]
	if !ok {
		return nil, errors.New("impl method not found! Please Register first")
	}
	in := make([]reflect.Value, len(callParams))
	for i := 0; i < len(callParams); i++ {
		in[i] = reflect.ValueOf(callParams[i])
	}
	res := fn.Call(in)
	out := make([]interface{}, len(res))
	for i := 0; i < len(res); i++ {
		out[i] = res[i].Interface()
	}
	return out, nil
}

func Call(methodName string, callParams ...interface{}) ([]interface{}, error) {
	return GlobalMethod.call(methodName, callParams...)
}

func Register(impl interface{}) error {
	return GlobalMethod.register(impl)
}

在單測裡測試一下這個註冊和呼叫的功能internal_test.go

package internal

import (
	"testing"
)

func Sum(a, b int) int {
	return a + b
}
func TestRegister(t *testing.T) {
	err := Register(Sum)
	if err != nil {
		t.Fatalf("err: %vn", err)
	}
	t.Logf("test successn")
}

func TestCall(t *testing.T) {
	TestRegister(t)
	result, err := Call("Sum", 1, 2)
	if err != nil {
		t.Fatalf("err: %vn", err)
	}
	if len(result) != 1 {
		t.Fatalf("len(result) is not equal to 1n")
	}
	t.Logf("Sum(1,2) = %dn", result[0].(int))
	if err := recover(); err != nil {
		t.Fatalf("%vn", err)
	}
}

執行呼叫

/usr/local/go/bin/go test -timeout 30s -run ^TestCall$ rpc_demo/internal -v

Running tool: /usr/local/go/bin/go test -timeout 30s -run ^TestCall$ rpc_demo/internal -v

=== RUN   TestCall
methods: map[Sum:<func(int, int) int Value>]
    /root/go/src/juejin_demo/rpc_demo/internal/internal_test.go:15: test success
    /root/go/src/juejin_demo/rpc_demo/internal/internal_test.go:27: Sum(1,2) = 3
--- PASS: TestCall (0.00s)
PASS
ok      rpc_demo/internal    0.002s

可以看到這個註冊和呼叫的過程已經實現並且達到指定方法呼叫的作用

設計struct完整表達一次完整的rpc呼叫,並且封裝json庫中的Decoder和Encoder,完成序列化和反序列化

internal.go

type RpcRequest struct {
	MethodName string
	Params     []interface{}
}

type RpcResponses struct {
	Returns []interface{}
	Err     error
}

transport.go考慮可以對接更多的格式,所以抽象了一層進行使用(demo肯定沒有更多格式了)

package transport

// Transport: 序列化格式的抽象層,從connection中讀取資料序列化並且反序列化到connection中
type Transport interface {
	Decode(v interface{}) error
	Encode(v interface{}) error
	Close()
}

json_transport.go

package transport

import (
	"encoding/json"
	"net"
)

var _ Transport = (*JSONTransport)(nil)

type JSONTransport struct {
	encoder *json.Encoder
	decoder *json.Decoder
}

// NewJSONTransport: 負責讀取和寫入conn
func NewJSONTransport(conn net.Conn) *JSONTransport {
	return &JSONTransport{json.NewEncoder(conn), json.NewDecoder(conn)}
}

// Decode: use json package to decode
func (t *JSONTransport) Decode(v interface{}) error {
	if err := t.decoder.Decode(v); err != nil {
		return err
	}
	return nil
}

// Encode: use json package to encode
func (t *JSONTransport) Encode(v interface{}) error {
	if err := t.encoder.Encode(v); err != nil {
		return err
	}
	return nil
}

// Close: not implement
func (dec *JSONTransport) Close() {

}

然後我們將伺服器端和使用者端的邏輯進行修改,改成通過上面兩個結構體進行通訊,然後返回一次呼叫 server.go

//...
		for {
			conn, err := s.ls.Accept()
			if err != nil {
				panic(err)
			}
			tsp := transport.NewJSONTransport(conn)
			go func() {
				for {
					request := &internal.RpcRequest{}
					err := tsp.Decode(request)
					if err != nil {
						panic(err)
					}
					log.Printf("[server] get request: %vn", request)
					result, err := internal.Call(request.MethodName, request.Params...)
					log.Printf("[server] invoke method: %vn", result)
					if err != nil {
						response := &internal.RpcResponses{Returns: nil, Err: err}
						tsp.Encode(response)
						continue
					}
					response := &internal.RpcResponses{Returns: result, Err: err}
					if err := tsp.Encode(response); err != nil {
						log.Printf("[server] encode response err: %vn", err)
						continue
					}
				}
			}()
		}
        //...

client.go

// ...
// Call: remote invoke
func (c *Client) Call(methodName string, params ...interface{}) (res *internal.RpcResponses) {
	request := internal.RpcRequest{MethodName: methodName, Params: params}
	log.Printf("[client] create request to invoke server: %vn", request)
	err := c.tsp.Encode(request)
	if err != nil {
		panic(err)
	}
	res = &internal.RpcResponses{}
	if err := c.tsp.Decode(res); err != nil {
		panic(err)
	}
	log.Printf("[client] get response from server: %vn", res)
	return res
}
// ...

main.go

package main

import (
	"log"
	"rpc_demo/internal"
	"rpc_demo/internal/client"
	"rpc_demo/internal/server"
	"strings"
	"time"
)

// Rpc方法的一個簡易實現
func Join(a ...string) string {
	res := &strings.Builder{}
	for i := 0; i < len(a); i++ {
		res.WriteString(a[i])
	}
	return res.String()
}

func main() {
	internal.Register(Join)
	s := server.NewServer(9999)
	s.Start()
	time.Sleep(5 * time.Second)
	c := client.NewClient(9999)
	res := c.Call("Join", "aaaaa", "bbbbb", "ccccccccc", "end")
	if res.Err != nil {
		log.Printf("[main] get an error from server: %vn", res.Err)
		return
	}
	log.Printf("[main] get a response from server: %vn", res.Returns[0].(string))
	time.Sleep(5 * time.Second)
}

接下來我們執行一下main

[root@hecs-74066 rpc_demo]# go run main.go 
2023/03/05 14:39:11 server [127.0.0.1:9999] start....
2023/03/05 14:39:16 [conn: 127.0.0.1:59126] get data: this is a test

[root@hecs-74066 rpc_demo]# go run main.go 
2023/03/05 21:53:41 server [127.0.0.1:9999] start....
2023/03/05 21:53:46 [client] create request to invoke server: {Join [aaaaa bbbbb ccccccccc end]}
2023/03/05 21:53:46 [server] get request: &{Join [aaaaa bbbbb ccccccccc end]}
2023/03/05 21:53:46 [server] invoke method: [aaaaabbbbbcccccccccend]
2023/03/05 21:53:46 [client] get response from server: &{[aaaaabbbbbcccccccccend] <nil>}
2023/03/05 21:53:46 [main] get a response from server: aaaaabbbbbcccccccccend

總結(自我pua)

這樣我們就實現了一個簡單的rpc框架了,符合最簡單的架構圖,從client->序列化請求->transport -> 反序列化 ->server然後從server->序列化請求->transport->反序列化請求->client。當然從可用性的角度來說是差遠了,沒有實現stub程式碼,也沒有idl的實現,導致所有的註冊方法都是寫死,可用性不高,而且沒有整合服務發現(可以參考我的另一篇文章去整合)和熔斷等功能,也沒用中介軟體(也是我的另一篇文章)和超時等豐富的功能在裡面,並且最近看了不少rpc框架的原始碼,感覺這個demo的設計也差遠了。不過因為時間問題和程式碼的複雜性問題(單純懶),起碼算是實現了一個簡單的rpc框架。

推薦一些比較好的框架實現

到此這篇關於Golang實現簡易的rpc呼叫的文章就介紹到這了,更多相關Golang rpc呼叫內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com