<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
如今很多雲原生系統、分散式系統,例如 Kubernetes,都是用 Go 語言寫的,這是因為 Go 語言天然支援非同步程式設計,而且靜態語言能保證應用系統的穩定性。筆者的開源專案 Crawlab 作為爬蟲管理平臺,也應用到了分散式系統。本篇文章將介紹如何用 Go 語言編寫一個簡單的分散式系統。
在開始寫程式碼之前,我們先思考一下需要實現些什麼。
除了上面的概念以外,我們需要實現一些簡單功能。
整個流程示意圖如下。
節點之間的通訊在分散式系統中非常重要,畢竟每個節點或機器如果孤立執行,就失去了分散式系統的意義。因此,節點通訊在分散式系統中是核心模組。
gRPC 協定
首先,我們來想一下,如何讓節點之間進行相互通訊。最常用的通訊方式就是 API,不過這個通訊方式有個缺點,就是需要將各個節點的 IP 地址及埠顯示暴露給其他節點,這在公網中是不太安全的。因此,我們選擇了 gRPC,一種流行的遠端過程呼叫(Remote Procedure Call,RPC)框架。這裡我們不過多的解釋 RPC 或 gRPC 的原理,簡而言之,就是能讓呼叫者在遠端機器上執行命令的協定方式。
為了使用 gRPC 框架,我們先建立 go.mod
並輸入以下內容,並執行 go mod download
。注意:對於國內的朋友,或許需要新增代理才能正常下載,可以先執行 export GOPROXY=goproxy.cn,direct
後再執行下載命令。
module go-distributed-system go 1.17 require ( github.com/golang/protobuf v1.5.0 google.golang.org/grpc v1.27.0 google.golang.org/protobuf v1.27.1 )
然後,我們建立 Protocol Buffers 檔案 node.proto
(表示節點對應的 gRPC 協定檔案),並輸入以下內容。
syntax = "proto3"; package core; option go_package = ".;core"; message Request { string action = 1; } message Response { string data = 1; } service NodeService { rpc ReportStatus(Request) returns (Response){}; // Simple RPC rpc AssignTask(Request) returns (stream Response){}; // Server-Side RPC }
在這裡我們建立了兩個 RPC 服務,分別是負責上報狀態的 Simple RPC ReportStatus
以及 Server-Side RPC AssignTask
。Simple RPC 和 Server-Side RPC 的區別如下圖所示,主要區別在於 Server-Side RPC 可以從通過流(Stream)向用戶端(Client)主動傳送資料,而 Simple RPC 只能從使用者端向伺服器端(Server)發請求。
建立好 .proto
檔案後,我們需要將這個 gRPC 協定檔案轉化為 .go
程式碼檔案,從而能被 Go 程式參照。在命令列視窗中執行如下命令。注意:編譯工具 protoc
不是自帶的,需要單獨下載,具體可以參考檔案 https://grpc.io/docs/protoc-installation/。
mkdir core protoc --go_out=./core --go-grpc_out=./core node.proto
執行完後,可以在 core
目錄下看到兩個 Go 程式碼檔案, node.pb.go
和 node_grpc.pb.go
,這相當於 Go 程式中對應的 gRPC 庫。
gRPC 伺服器端
現在開始編寫伺服器端邏輯。
咱們先建立一個新檔案 core/node_service_server.go
,輸入以下內容。主要邏輯就是實現了之前建立好的 gRPC 協定中的兩個呼叫方法。其中,暴露了 CmdChannel
這個通道(Channel)來獲取需要傳送到工作節點的命令。
package core import ( "context" ) type NodeServiceGrpcServer struct { UnimplementedNodeServiceServer // channel to receive command CmdChannel chan string } func (n NodeServiceGrpcServer) ReportStatus(ctx context.Context, request *Request) (*Response, error) { return &Response{Data: "ok"}, nil } func (n NodeServiceGrpcServer) AssignTask(request *Request, server NodeService_AssignTaskServer) error { for { select { case cmd := <-n.CmdChannel: // receive command and send to worker node (client) if err := server.Send(&Response{Data: cmd}); err != nil { return err } } } } var server *NodeServiceGrpcServer // GetNodeServiceGrpcServer singleton service func GetNodeServiceGrpcServer() *NodeServiceGrpcServer { if server == nil { server = &NodeServiceGrpcServer{ CmdChannel: make(chan string), } } return server }
gRPC 使用者端
gRPC 使用者端不需要具體實現,我們通常只需要呼叫 gRPC 使用者端的方法,程式會自動發起向伺服器端的請求以及獲取後續的響應。
編寫好了節點通訊的基礎部分,現在我們需要實現主節點了,這是整個中心化分散式系統的核心。
咱們建立一個新的檔案 node.go
,輸入以下內容。
package core import ( "github.com/gin-gonic/gin" "google.golang.org/grpc" "net" "net/http" ) // MasterNode is the node instance type MasterNode struct { api *gin.Engine // api server ln net.Listener // listener svr *grpc.Server // grpc server nodeSvr *NodeServiceGrpcServer // node service } func (n *MasterNode) Init() (err error) { // TODO: implement me panic("implement me") } func (n *MasterNode) Start() { // TODO: implement me panic("implement me") } var node *MasterNode // GetMasterNode returns the node instance func GetMasterNode() *MasterNode { if node == nil { // node node = &MasterNode{} // initialize node if err := node.Init(); err != nil { panic(err) } } return node }
其中,我們建立了兩個佔位方法 Init
和 Start
,我們分別實現。
在初始化方法 Init
中,我們需要做幾件事情:
現在,在 Init
方法中加入如下程式碼。
func (n *MasterNode) Init() (err error) { // grpc server listener with port as 50051 n.ln, err = net.Listen("tcp", ":50051") if err != nil { return err } // grpc server n.svr = grpc.NewServer() // node service n.nodeSvr = GetNodeServiceGrpcServer() // register node service to grpc server RegisterNodeServiceServer(node.svr, n.nodeSvr) // api n.api = gin.Default() n.api.POST("/tasks", func(c *gin.Context) { // parse payload var payload struct { Cmd string `json:"cmd"` } if err := c.ShouldBindJSON(&payload); err != nil { c.AbortWithStatus(http.StatusBadRequest) return } // send command to node service n.nodeSvr.CmdChannel <- payload.Cmd c.AbortWithStatus(http.StatusOK) }) return nil }
可以看到,我們新建了一個 gRPC Server,並將之前的 NodeServiceGrpcServer
註冊了進去。另外,我們還用 gin
框架建立了一個簡單的 API 服務,可以 POST 請求到 /tasks
向 NodeServiceGrpcServer
中的命令通道 CmdChannel
傳送命令。這樣就將各個部件串接起來了!
啟動方法 Start
很簡單,就是啟動 gRPC Server 以及 API Server。
func (n *MasterNode) Start() { // start grpc server go n.svr.Serve(n.ln) // start api server _ = n.api.Run(":9092") // wait for exit n.svr.Stop() }
下一步,我們就要實現實際做任務的工作節點了。
現在,我們建立一個新檔案 core/worker_node.go
,輸入以下內容。
package core import ( "context" "google.golang.org/grpc" "os/exec" ) type WorkerNode struct { conn *grpc.ClientConn // grpc client connection c NodeServiceClient // grpc client } func (n *WorkerNode) Init() (err error) { // connect to master node n.conn, err = grpc.Dial("localhost:50051", grpc.WithInsecure()) if err != nil { return err } // grpc client n.c = NewNodeServiceClient(n.conn) return nil } func (n *WorkerNode) Start() { // log fmt.Println("worker node started") // report status _, _ = n.c.ReportStatus(context.Background(), &Request{}) // assign task stream, _ := n.c.AssignTask(context.Background(), &Request{}) for { // receive command from master node res, err := stream.Recv() if err != nil { return } // log command fmt.Println("received command: ", res.Data) // execute command parts := strings.Split(res.Data, " ") if err := exec.Command(parts[0], parts[1:]...).Run(); err != nil { fmt.Println(err) } } } var workerNode *WorkerNode func GetWorkerNode() *WorkerNode { if workerNode == nil { // node workerNode = &WorkerNode{} // initialize node if err := workerNode.Init(); err != nil { panic(err) } } return workerNode }
其中,我們在初始化方法 Init
中建立了gRPC 使用者端,並連線了主節點的 gRPC 伺服器端。
在啟動方法 Start
中做了幾件事情:
這樣,整個包含主節點、工作節點的分散式系統核心邏輯就寫好了!
最後,我們需要將這些核心邏輯用命令列工具封裝一下,以便啟用。
建立主程式檔案 main.go
,並輸入以下內容。
package main import ( "go-distributed-system/core" "os" ) func main() { nodeType := os.Args[0] switch nodeType { case "master": core.GetMasterNode().Start() case "worker": core.GetWorkerNode().Start() default: panic("invalid node type") } }
這樣,整個簡單的分散式系統就建立好了!
下面我們來執行一下程式碼。
開啟兩個命令列視窗,其中一個輸入 go run main.go master
啟動主節點,另一個輸入 go run main.go worker
啟動工作節點。
如果主節點啟動成功,將會看到如下紀錄檔資訊。
[GIN-debug] [WARNING] Creating an Engine instance with the Logger and Recovery middleware already attached. [GIN-debug] [WARNING] Running in "debug" mode. Switch to "release" mode in production. - using env: export GIN_MODE=release - using code: gin.SetMode(gin.ReleaseMode) [GIN-debug] POST /tasks --> go-distributed-system/core.(*MasterNode).Init.func1 (3 handlers) [GIN-debug] [WARNING] You trusted all proxies, this is NOT safe. We recommend you to set a value. Please check https://pkg.go.dev/github.com/gin-gonic/gin#readme-don-t-trust-all-proxies for details. [GIN-debug] Listening and serving HTTP on :9092
如果工作節點啟動成功,將會看到如下紀錄檔資訊。
worker node started
主節點、工作節點都啟動成功後,我們在另外一個命令列視窗中輸入如下命令來發起 API 請求。
curl -X POST
-H "Content-Type: application/json"
-d '{"cmd": "touch /tmp/hello-distributed-system"}'
http://localhost:9092/tasks
在工作節點視窗應該可以看到紀錄檔 received command: touch /tmp/hello-distributed-system
。
然後檢視檔案是否順利生成,執行 ls -l /tmp/hello-distributed-system
。
-rw-r--r-- 1 marvzhang wheel 0B Oct 26 12:22 /tmp/hello-distributed-system
檔案成功生成,表示已經通過工作節點執行成功了!大功告成!
本篇文章通過 RPC 框架 gRPC 以及 Go 語言自帶的 Channel,將節點串接起來,開發出了一個簡單的分散式系統。所用到的核心庫和技術:
整個程式碼範例倉庫在 GitHub 上: https://github.com/tikazyq/codao-code/tree/main/2022-10/go-distributed-system
到此這篇關於Go語言實戰之實現一個簡單分散式系統的文章就介紹到這了,更多相關Go語言分散式系統內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45