網站首頁 編程語言 正文
引子
如今很多云原生系統、分布式系統,例如 Kubernetes,都是用 Go 語言寫的,這是因為 Go 語言天然支持異步編程,而且靜態語言能保證應用系統的穩定性。筆者的開源項目 Crawlab 作為爬蟲管理平臺,也應用到了分布式系統。本篇文章將介紹如何用 Go 語言編寫一個簡單的分布式系統。
思路
在開始寫代碼之前,我們先思考一下需要實現些什么。
- 主節點(Master Node):中控系統,相當于軍隊中的指揮官,派發任務命令
- 工作節點(Worker Node):執行者,相當于軍隊中的士兵,執行任務
除了上面的概念以外,我們需要實現一些簡單功能。
- 上報運行狀態(Report Status):工作節點向主節點上報當前狀態
- 分派任務(Assign Task):通過 API 向主節點發起請求,主節點再向工作節點分派任務
- 運行腳本(Execute Script):工作節點執行任務中的腳本
整個流程示意圖如下。
實戰
節點通信
節點之間的通信在分布式系統中非常重要,畢竟每個節點或機器如果孤立運行,就失去了分布式系統的意義。因此,節點通信在分布式系統中是核心模塊。
gRPC 協議
首先,我們來想一下,如何讓節點之間進行相互通信。最常用的通信方式就是 API,不過這個通信方式有個缺點,就是需要將各個節點的 IP 地址及端口顯示暴露給其他節點,這在公網中是不太安全的。因此,我們選擇了 gRPC,一種流行的遠程過程調用(Remote Procedure Call,RPC)框架。這里我們不過多的解釋 RPC 或 gRPC 的原理,簡而言之,就是能讓調用者在遠程機器上執行命令的協議方式。
為了使用 gRPC 框架,我們先創建 go.mod
并輸入以下內容,并執行 go mod download
。注意:對于國內的朋友,或許需要添加代理才能正常下載,可以先執行 export GOPROXY=goproxy.cn,direct
后再執行下載命令。
module go-distributed-system ? go 1.17 ? require ( github.com/golang/protobuf v1.5.0 google.golang.org/grpc v1.27.0 google.golang.org/protobuf v1.27.1 )
然后,我們創建 Protocol Buffers 文件 node.proto
(表示節點對應的 gRPC 協議文件),并輸入以下內容。
syntax = "proto3"; ? package core; option go_package = ".;core"; ? message Request { string action = 1; } ? message Response { string data = 1; } ? service NodeService { rpc ReportStatus(Request) returns (Response){}; // Simple RPC rpc AssignTask(Request) returns (stream Response){}; // Server-Side RPC }
在這里我們創建了兩個 RPC 服務,分別是負責上報狀態的 Simple RPC ReportStatus
以及 Server-Side RPC AssignTask
。Simple RPC 和 Server-Side RPC 的區別如下圖所示,主要區別在于 Server-Side RPC 可以從通過流(Stream)向客戶端(Client)主動發送數據,而 Simple RPC 只能從客戶端向服務端(Server)發請求。
創建好 .proto
文件后,我們需要將這個 gRPC 協議文件轉化為 .go
代碼文件,從而能被 Go 程序引用。在命令行窗口中執行如下命令。注意:編譯工具 protoc
不是自帶的,需要單獨下載,具體可以參考文檔 https://grpc.io/docs/protoc-installation/。
mkdir core protoc --go_out=./core \ --go-grpc_out=./core \ node.proto
執行完后,可以在 core
目錄下看到兩個 Go 代碼文件, node.pb.go
和 node_grpc.pb.go
,這相當于 Go 程序中對應的 gRPC 庫。
gRPC 服務端
現在開始編寫服務端邏輯。
咱們先創建一個新文件 core/node_service_server.go
,輸入以下內容。主要邏輯就是實現了之前創建好的 gRPC 協議中的兩個調用方法。其中,暴露了 CmdChannel
這個通道(Channel)來獲取需要發送到工作節點的命令。
package core ? import ( "context" ) ? type NodeServiceGrpcServer struct { UnimplementedNodeServiceServer ? // channel to receive command CmdChannel chan string } ? func (n NodeServiceGrpcServer) ReportStatus(ctx context.Context, request *Request) (*Response, error) { return &Response{Data: "ok"}, nil } ? func (n NodeServiceGrpcServer) AssignTask(request *Request, server NodeService_AssignTaskServer) error { for { select { case cmd := <-n.CmdChannel: // receive command and send to worker node (client) if err := server.Send(&Response{Data: cmd}); err != nil { return err } } } } ? var server *NodeServiceGrpcServer ? // GetNodeServiceGrpcServer singleton service func GetNodeServiceGrpcServer() *NodeServiceGrpcServer { if server == nil { server = &NodeServiceGrpcServer{ CmdChannel: make(chan string), } } return server }
gRPC 客戶端
gRPC 客戶端不需要具體實現,我們通常只需要調用 gRPC 客戶端的方法,程序會自動發起向服務端的請求以及獲取后續的響應。
主節點
編寫好了節點通信的基礎部分,現在我們需要實現主節點了,這是整個中心化分布式系統的核心。
咱們創建一個新的文件 node.go
,輸入以下內容。
package core ? import ( "github.com/gin-gonic/gin" "google.golang.org/grpc" "net" "net/http" ) ? // MasterNode is the node instance type MasterNode struct { api *gin.Engine // api server ln net.Listener // listener svr *grpc.Server // grpc server nodeSvr *NodeServiceGrpcServer // node service } ? func (n *MasterNode) Init() (err error) { // TODO: implement me panic("implement me") } ? func (n *MasterNode) Start() { // TODO: implement me panic("implement me") } ? var node *MasterNode ? // GetMasterNode returns the node instance func GetMasterNode() *MasterNode { if node == nil { // node node = &MasterNode{} ? // initialize node if err := node.Init(); err != nil { panic(err) } } ? return node }
其中,我們創建了兩個占位方法 Init
和 Start
,我們分別實現。
在初始化方法 Init
中,我們需要做幾件事情:
- 注冊 gRPC 服務
- 注冊 API 服務
現在,在 Init
方法中加入如下代碼。
func (n *MasterNode) Init() (err error) { // grpc server listener with port as 50051 n.ln, err = net.Listen("tcp", ":50051") if err != nil { return err } ? // grpc server n.svr = grpc.NewServer() ? // node service n.nodeSvr = GetNodeServiceGrpcServer() ? // register node service to grpc server RegisterNodeServiceServer(node.svr, n.nodeSvr) ? // api n.api = gin.Default() n.api.POST("/tasks", func(c *gin.Context) { // parse payload var payload struct { Cmd string `json:"cmd"` } if err := c.ShouldBindJSON(&payload); err != nil { c.AbortWithStatus(http.StatusBadRequest) return } ? // send command to node service n.nodeSvr.CmdChannel <- payload.Cmd ? c.AbortWithStatus(http.StatusOK) }) ? return nil }
可以看到,我們新建了一個 gRPC Server,并將之前的 NodeServiceGrpcServer
注冊了進去。另外,我們還用 gin
框架創建了一個簡單的 API 服務,可以 POST 請求到 /tasks
向 NodeServiceGrpcServer
中的命令通道 CmdChannel
傳送命令。這樣就將各個部件串接起來了!
啟動方法 Start
很簡單,就是啟動 gRPC Server 以及 API Server。
func (n *MasterNode) Start() { // start grpc server go n.svr.Serve(n.ln) ? // start api server _ = n.api.Run(":9092") ? // wait for exit n.svr.Stop() }
下一步,我們就要實現實際做任務的工作節點了。
工作節點
現在,我們創建一個新文件 core/worker_node.go
,輸入以下內容。
package core ? import ( "context" "google.golang.org/grpc" "os/exec" ) ? type WorkerNode struct { conn *grpc.ClientConn // grpc client connection c NodeServiceClient // grpc client } ? func (n *WorkerNode) Init() (err error) { // connect to master node n.conn, err = grpc.Dial("localhost:50051", grpc.WithInsecure()) if err != nil { return err } ? // grpc client n.c = NewNodeServiceClient(n.conn) ? return nil } ? func (n *WorkerNode) Start() { // log fmt.Println("worker node started") ? // report status _, _ = n.c.ReportStatus(context.Background(), &Request{}) ? // assign task stream, _ := n.c.AssignTask(context.Background(), &Request{}) for { // receive command from master node res, err := stream.Recv() if err != nil { return } ? // log command fmt.Println("received command: ", res.Data) ? // execute command parts := strings.Split(res.Data, " ") if err := exec.Command(parts[0], parts[1:]...).Run(); err != nil { fmt.Println(err) } } } ? var workerNode *WorkerNode ? func GetWorkerNode() *WorkerNode { if workerNode == nil { // node workerNode = &WorkerNode{} ? // initialize node if err := workerNode.Init(); err != nil { panic(err) } } ? return workerNode }
其中,我們在初始化方法 Init
中創建了gRPC 客戶端,并連接了主節點的 gRPC 服務端。
在啟動方法 Start
中做了幾件事情:
- 調用上報狀態(Report Status)的 Simple RPC 方法
- 調用分配任務(Assign Task)的 Server-Side RPC 方法,獲取到了流(Stream)
- 通過循環不斷接受流傳輸過來的來自服務端(也就是主節點)的信息,并執行命令
這樣,整個包含主節點、工作節點的分布式系統核心邏輯就寫好了!
將它們放在一起
最后,我們需要將這些核心邏輯用命令行工具封裝一下,以便啟用。
創建主程序文件 main.go
,并輸入以下內容。
package main ? import ( "go-distributed-system/core" "os" ) ? func main() { nodeType := os.Args[0] switch nodeType { case "master": core.GetMasterNode().Start() case "worker": core.GetWorkerNode().Start() default: panic("invalid node type") } }
這樣,整個簡單的分布式系統就創建好了!
代碼效果
下面我們來運行一下代碼。
打開兩個命令行窗口,其中一個輸入 go run main.go master
啟動主節點,另一個輸入 go run main.go worker
啟動工作節點。
如果主節點啟動成功,將會看到如下日志信息。
[GIN-debug] [WARNING] Creating an Engine instance with the Logger and Recovery middleware already attached. ? [GIN-debug] [WARNING] Running in "debug" mode. Switch to "release" mode in production. - using env: export GIN_MODE=release - using code: gin.SetMode(gin.ReleaseMode) ? [GIN-debug] POST /tasks --> go-distributed-system/core.(*MasterNode).Init.func1 (3 handlers) [GIN-debug] [WARNING] You trusted all proxies, this is NOT safe. We recommend you to set a value. Please check https://pkg.go.dev/github.com/gin-gonic/gin#readme-don-t-trust-all-proxies for details. [GIN-debug] Listening and serving HTTP on :9092
如果工作節點啟動成功,將會看到如下日志信息。
worker node started
主節點、工作節點都啟動成功后,我們在另外一個命令行窗口中輸入如下命令來發起 API 請求。
curl -X POST \
? -H "Content-Type: application/json" \
? -d '{"cmd": "touch /tmp/hello-distributed-system"}' \
? http://localhost:9092/tasks
在工作節點窗口應該可以看到日志 received command: touch /tmp/hello-distributed-system
。
然后查看文件是否順利生成,執行 ls -l /tmp/hello-distributed-system
。
-rw-r--r-- ?1 marvzhang ?wheel ? ? 0B Oct 26 12:22 /tmp/hello-distributed-system
文件成功生成,表示已經通過工作節點執行成功了!大功告成!
總結
本篇文章通過 RPC 框架 gRPC 以及 Go 語言自帶的 Channel,將節點串接起來,開發出了一個簡單的分布式系統。所用到的核心庫和技術:
- gRPC
- Protocol Buffers
- channel
- gin
- os/exec
整個代碼示例倉庫在 GitHub 上: https://github.com/tikazyq/codao-code/tree/main/2022-10/go-distributed-system
原文鏈接:https://juejin.cn/post/7158685917006266398
相關推薦
- 2022-09-19 Python使用read_csv讀數據遇到分隔符問題的2種解決方式_python
- 2022-06-27 Python查找多個字典公共鍵key的方法_python
- 2021-12-08 DBeaver下載安裝詳細教程_數據庫其它
- 2022-12-03 C++時間函數整理詳解_C 語言
- 2022-07-16 git查看和修改用戶名和郵箱
- 2022-03-28 Python中三種條件語句示例介紹_python
- 2023-05-23 深入了解React中的合成事件_React
- 2022-02-20 react中引入百度地圖時,去掉百度地圖的logo和信息
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支