日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學無先后,達者為師

網站首頁 編程語言 正文

Go語言實戰之實現一個簡單分布式系統_Golang

作者:MarvinZhang ? 更新時間: 2022-12-01 編程語言

引子

如今很多云原生系統、分布式系統,例如 Kubernetes,都是用 Go 語言寫的,這是因為 Go 語言天然支持異步編程,而且靜態語言能保證應用系統的穩定性。筆者的開源項目 Crawlab 作為爬蟲管理平臺,也應用到了分布式系統。本篇文章將介紹如何用 Go 語言編寫一個簡單的分布式系統。

思路

在開始寫代碼之前,我們先思考一下需要實現些什么。

  • 主節點(Master Node):中控系統,相當于軍隊中的指揮官,派發任務命令
  • 工作節點(Worker Node):執行者,相當于軍隊中的士兵,執行任務

除了上面的概念以外,我們需要實現一些簡單功能。

  • 上報運行狀態(Report Status):工作節點向主節點上報當前狀態
  • 分派任務(Assign Task):通過 API 向主節點發起請求,主節點再向工作節點分派任務
  • 運行腳本(Execute Script):工作節點執行任務中的腳本

整個流程示意圖如下。

實戰

節點通信

節點之間的通信在分布式系統中非常重要,畢竟每個節點或機器如果孤立運行,就失去了分布式系統的意義。因此,節點通信在分布式系統中是核心模塊。

gRPC 協議

首先,我們來想一下,如何讓節點之間進行相互通信。最常用的通信方式就是 API,不過這個通信方式有個缺點,就是需要將各個節點的 IP 地址及端口顯示暴露給其他節點,這在公網中是不太安全的。因此,我們選擇了 gRPC,一種流行的遠程過程調用(Remote Procedure Call,RPC)框架。這里我們不過多的解釋 RPC 或 gRPC 的原理,簡而言之,就是能讓調用者在遠程機器上執行命令的協議方式。

為了使用 gRPC 框架,我們先創建 go.mod 并輸入以下內容,并執行 go mod download。注意:對于國內的朋友,或許需要添加代理才能正常下載,可以先執行 export GOPROXY=goproxy.cn,direct 后再執行下載命令。

module go-distributed-system
?
go 1.17
?
require (
  github.com/golang/protobuf v1.5.0
  google.golang.org/grpc v1.27.0
  google.golang.org/protobuf v1.27.1
)

然后,我們創建 Protocol Buffers 文件 node.proto(表示節點對應的 gRPC 協議文件),并輸入以下內容。

syntax = "proto3";
?
package core;
option go_package = ".;core";
?
message Request {
  string action = 1;
}
?
message Response {
  string data = 1;
}
?
service NodeService {
  rpc ReportStatus(Request) returns (Response){};       // Simple RPC
  rpc AssignTask(Request) returns (stream Response){};  // Server-Side RPC
}

在這里我們創建了兩個 RPC 服務,分別是負責上報狀態的 Simple RPC ReportStatus 以及 Server-Side RPC AssignTask。Simple RPC 和 Server-Side RPC 的區別如下圖所示,主要區別在于 Server-Side RPC 可以從通過流(Stream)向客戶端(Client)主動發送數據,而 Simple RPC 只能從客戶端向服務端(Server)發請求。

創建好 .proto 文件后,我們需要將這個 gRPC 協議文件轉化為 .go 代碼文件,從而能被 Go 程序引用。在命令行窗口中執行如下命令。注意:編譯工具 protoc 不是自帶的,需要單獨下載,具體可以參考文檔 https://grpc.io/docs/protoc-installation/。

mkdir core
protoc --go_out=./core \
    --go-grpc_out=./core \
    node.proto

執行完后,可以在 core 目錄下看到兩個 Go 代碼文件, node.pb.gonode_grpc.pb.go,這相當于 Go 程序中對應的 gRPC 庫。

gRPC 服務端

現在開始編寫服務端邏輯。

咱們先創建一個新文件 core/node_service_server.go,輸入以下內容。主要邏輯就是實現了之前創建好的 gRPC 協議中的兩個調用方法。其中,暴露了 CmdChannel 這個通道(Channel)來獲取需要發送到工作節點的命令。

package core
?
import (
  "context"
)
?
type NodeServiceGrpcServer struct {
  UnimplementedNodeServiceServer
?
  // channel to receive command
  CmdChannel chan string
}
?
func (n NodeServiceGrpcServer) ReportStatus(ctx context.Context, request *Request) (*Response, error) {
  return &Response{Data: "ok"}, nil
}
?
func (n NodeServiceGrpcServer) AssignTask(request *Request, server NodeService_AssignTaskServer) error {
  for {
    select {
    case cmd := <-n.CmdChannel:
      // receive command and send to worker node (client)
      if err := server.Send(&Response{Data: cmd}); err != nil {
        return err
      }
    }
  }
}
?
var server *NodeServiceGrpcServer
?
// GetNodeServiceGrpcServer singleton service
func GetNodeServiceGrpcServer() *NodeServiceGrpcServer {
  if server == nil {
    server = &NodeServiceGrpcServer{
      CmdChannel: make(chan string),
    }
  }
  return server
}

gRPC 客戶端

gRPC 客戶端不需要具體實現,我們通常只需要調用 gRPC 客戶端的方法,程序會自動發起向服務端的請求以及獲取后續的響應。

主節點

編寫好了節點通信的基礎部分,現在我們需要實現主節點了,這是整個中心化分布式系統的核心。

咱們創建一個新的文件 node.go,輸入以下內容。

package core
?
import (
  "github.com/gin-gonic/gin"
  "google.golang.org/grpc"
  "net"
  "net/http"
)
?
// MasterNode is the node instance
type MasterNode struct {
  api     *gin.Engine            // api server
  ln      net.Listener           // listener
  svr     *grpc.Server           // grpc server
  nodeSvr *NodeServiceGrpcServer // node service
}
?
func (n *MasterNode) Init() (err error) {
  // TODO: implement me
  panic("implement me")
}
?
func (n *MasterNode) Start() {
  // TODO: implement me
  panic("implement me")
}
?
var node *MasterNode
?
// GetMasterNode returns the node instance
func GetMasterNode() *MasterNode {
  if node == nil {
    // node
    node = &MasterNode{}
?
    // initialize node
    if err := node.Init(); err != nil {
      panic(err)
    }
  }
?
  return node
}

其中,我們創建了兩個占位方法 InitStart,我們分別實現。

在初始化方法 Init 中,我們需要做幾件事情:

  • 注冊 gRPC 服務
  • 注冊 API 服務

現在,在 Init 方法中加入如下代碼。

func (n *MasterNode) Init() (err error) {
  // grpc server listener with port as 50051
  n.ln, err = net.Listen("tcp", ":50051")
  if err != nil {
    return err
  }
?
  // grpc server
  n.svr = grpc.NewServer()
?
  // node service
  n.nodeSvr = GetNodeServiceGrpcServer()
?
  // register node service to grpc server
  RegisterNodeServiceServer(node.svr, n.nodeSvr)
?
  // api
  n.api = gin.Default()
  n.api.POST("/tasks", func(c *gin.Context) {
    // parse payload
    var payload struct {
      Cmd string `json:"cmd"`
    }
    if err := c.ShouldBindJSON(&payload); err != nil {
      c.AbortWithStatus(http.StatusBadRequest)
      return
    }
?
    // send command to node service
    n.nodeSvr.CmdChannel <- payload.Cmd
?
    c.AbortWithStatus(http.StatusOK)
  })
?
  return nil
}

可以看到,我們新建了一個 gRPC Server,并將之前的 NodeServiceGrpcServer 注冊了進去。另外,我們還用 gin 框架創建了一個簡單的 API 服務,可以 POST 請求到 /tasksNodeServiceGrpcServer 中的命令通道 CmdChannel 傳送命令。這樣就將各個部件串接起來了!

啟動方法 Start 很簡單,就是啟動 gRPC Server 以及 API Server。

func (n *MasterNode) Start() {
  // start grpc server
  go n.svr.Serve(n.ln)
?
  // start api server
  _ = n.api.Run(":9092")
?
  // wait for exit
  n.svr.Stop()
}

下一步,我們就要實現實際做任務的工作節點了。

工作節點

現在,我們創建一個新文件 core/worker_node.go,輸入以下內容。

package core
?
import (
  "context"
  "google.golang.org/grpc"
  "os/exec"
)
?
type WorkerNode struct {
  conn *grpc.ClientConn  // grpc client connection
  c    NodeServiceClient // grpc client
}
?
func (n *WorkerNode) Init() (err error) {
  // connect to master node
  n.conn, err = grpc.Dial("localhost:50051", grpc.WithInsecure())
  if err != nil {
    return err
  }
?
  // grpc client
  n.c = NewNodeServiceClient(n.conn)
?
  return nil
}
?
func (n *WorkerNode) Start() {
  // log
  fmt.Println("worker node started")
?
  // report status
  _, _ = n.c.ReportStatus(context.Background(), &Request{})
?
  // assign task
  stream, _ := n.c.AssignTask(context.Background(), &Request{})
  for {
    // receive command from master node
    res, err := stream.Recv()
    if err != nil {
      return
    }
?
    // log command
    fmt.Println("received command: ", res.Data)
?
    // execute command
    parts := strings.Split(res.Data, " ")
    if err := exec.Command(parts[0], parts[1:]...).Run(); err != nil {
      fmt.Println(err)
    }
  }
}
?
var workerNode *WorkerNode
?
func GetWorkerNode() *WorkerNode {
  if workerNode == nil {
    // node
    workerNode = &WorkerNode{}
?
    // initialize node
    if err := workerNode.Init(); err != nil {
      panic(err)
    }
  }
?
  return workerNode
}

其中,我們在初始化方法 Init 中創建了gRPC 客戶端,并連接了主節點的 gRPC 服務端。

在啟動方法 Start 中做了幾件事情:

  • 調用上報狀態(Report Status)的 Simple RPC 方法
  • 調用分配任務(Assign Task)的 Server-Side RPC 方法,獲取到了流(Stream)
  • 通過循環不斷接受流傳輸過來的來自服務端(也就是主節點)的信息,并執行命令

這樣,整個包含主節點、工作節點的分布式系統核心邏輯就寫好了!

將它們放在一起

最后,我們需要將這些核心邏輯用命令行工具封裝一下,以便啟用。

創建主程序文件 main.go,并輸入以下內容。

package main
?
import (
  "go-distributed-system/core"
  "os"
)
?
func main() {
  nodeType := os.Args[0]
  switch nodeType {
  case "master":
    core.GetMasterNode().Start()
  case "worker":
    core.GetWorkerNode().Start()
  default:
    panic("invalid node type")
  }
}

這樣,整個簡單的分布式系統就創建好了!

代碼效果

下面我們來運行一下代碼。

打開兩個命令行窗口,其中一個輸入 go run main.go master 啟動主節點,另一個輸入 go run main.go worker 啟動工作節點。

如果主節點啟動成功,將會看到如下日志信息。

[GIN-debug] [WARNING] Creating an Engine instance with the Logger and Recovery middleware already attached.
?
[GIN-debug] [WARNING] Running in "debug" mode. Switch to "release" mode in production.
 - using env:   export GIN_MODE=release
 - using code:  gin.SetMode(gin.ReleaseMode)
?
[GIN-debug] POST   /tasks                    --> go-distributed-system/core.(*MasterNode).Init.func1 (3 handlers)
[GIN-debug] [WARNING] You trusted all proxies, this is NOT safe. We recommend you to set a value.
Please check https://pkg.go.dev/github.com/gin-gonic/gin#readme-don-t-trust-all-proxies for details.
[GIN-debug] Listening and serving HTTP on :9092

如果工作節點啟動成功,將會看到如下日志信息。

worker node started

主節點、工作節點都啟動成功后,我們在另外一個命令行窗口中輸入如下命令來發起 API 請求。

curl -X POST \
? -H "Content-Type: application/json" \
? -d '{"cmd": "touch /tmp/hello-distributed-system"}' \
? http://localhost:9092/tasks

在工作節點窗口應該可以看到日志 received command: touch /tmp/hello-distributed-system

然后查看文件是否順利生成,執行 ls -l /tmp/hello-distributed-system

-rw-r--r-- ?1 marvzhang ?wheel ? ? 0B Oct 26 12:22 /tmp/hello-distributed-system

文件成功生成,表示已經通過工作節點執行成功了!大功告成!

總結

本篇文章通過 RPC 框架 gRPC 以及 Go 語言自帶的 Channel,將節點串接起來,開發出了一個簡單的分布式系統。所用到的核心庫和技術:

  • gRPC
  • Protocol Buffers
  • channel
  • gin
  • os/exec

整個代碼示例倉庫在 GitHub 上: https://github.com/tikazyq/codao-code/tree/main/2022-10/go-distributed-system

原文鏈接:https://juejin.cn/post/7158685917006266398

欄目分類
最近更新