Golang in Action: How to implement a simple distributed system

tikazyq

Yeqing (Marvin) Zhang

Posted on October 26, 2022


Introduction

Nowadays, many cloud-native and distributed systems, such as Kubernetes, are written in Go. This is because Go offers built-in support for concurrent programming as well as static typing, which helps keep systems stable. My open-source project Crawlab, a web crawler management platform, uses a distributed architecture. This article explains how to design and implement a simple distributed system.

Ideas

Before we start to code, we need to think about what we need to implement.

  • Master Node: A central control system, similar to a troop commander who issues orders
  • Worker Node: Executors, similar to soldiers who carry out tasks

Apart from the concepts above, we also need to implement a few simple functionalities.

  • Report Status: Worker nodes report current status to the master node.
  • Assign Task: A client makes API requests to the master node, which assigns tasks to worker nodes.
  • Execute Script: Worker nodes execute scripts from tasks.

The overall architectural diagram is as below.

Main Process Diagram

Action

Node Communication

Communication between nodes is essential in distributed systems. If every node ran in isolation, there would be no need for a distributed system at all. The communication module is therefore a core part of the design.

gRPC Protocol

First, let's think about how to make nodes communicate with each other. The most common way is an HTTP API, but its drawback is that every node has to expose its IP address and port to the others, which is insecure, particularly on a public network. In light of that, we chose gRPC, a popular Remote Procedure Call (RPC) framework. With a streaming call, only the master node has to listen on a port; worker nodes connect to it and receive tasks over that connection. We won't go deep into the technical details, but in essence RPC is a mechanism that lets a caller invoke procedures on a remote machine as if they were local functions.

To use gRPC, let's first create a file named go.mod with the content below, then run go mod download.



module go-distributed-system

go 1.17

require (
    github.com/golang/protobuf v1.5.0
    google.golang.org/grpc v1.27.0
    google.golang.org/protobuf v1.27.1
)



Then we create a Protocol Buffers file named node.proto, which defines the gRPC service, with the content below.



syntax = "proto3";

package core;
option go_package = ".;core";

message Request {
  string action = 1;
}

message Response {
  string data = 1;
}

service NodeService {
  rpc ReportStatus(Request) returns (Response){};       // Simple (unary) RPC
  rpc AssignTask(Request) returns (stream Response){};  // Server-side streaming RPC
}



Here we have defined two RPC service methods: one for reporting the current status with a simple RPC, and another for assigning tasks with a server-side streaming RPC. In a simple RPC, the client sends one request and gets one response back. In a server-side streaming RPC, the client still starts the call, but the server (the master node in our case) can then keep sending data to the client (a worker node) through a stream.
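To make the distinction concrete without pulling in gRPC, here is a minimal sketch that uses only the standard library. The functions reportStatus, assignTask and collect are hypothetical stand-ins, not the generated gRPC stubs; a channel plays the role of the stream, so treat it as a mental model only.

```go
package main

import "fmt"

// reportStatus models a simple (unary) RPC: one request in, one response out.
func reportStatus(action string) string {
	return "ok"
}

// assignTask models a server-side streaming RPC: one request in, then the
// server pushes any number of responses until it closes the stream.
func assignTask(action string, cmds []string) <-chan string {
	stream := make(chan string)
	go func() {
		defer close(stream) // closing the channel plays the role of end-of-stream
		for _, cmd := range cmds {
			stream <- cmd
		}
	}()
	return stream
}

// collect drains a stream into a slice, like a client looping over Recv.
func collect(stream <-chan string) []string {
	var out []string
	for msg := range stream {
		out = append(out, msg)
	}
	return out
}

func main() {
	fmt.Println(reportStatus("status"))
	for _, cmd := range collect(assignTask("tasks", []string{"ls", "pwd"})) {
		fmt.Println("received:", cmd)
	}
}
```

In both cases the caller starts the exchange; what changes is how many responses may come back.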

RPC Diagram

After the .proto file is created, we need to compile it into Go code so that it can be used by the Go program. Let's run the command below. (Note: the protoc compiler is not built in and needs to be installed separately; please refer to https://grpc.io/docs/protoc-installation/. The --go_out and --go-grpc_out flags additionally rely on the protoc-gen-go and protoc-gen-go-grpc plugins being available on your PATH.)



mkdir core
protoc --go_out=./core \
    --go-grpc_out=./core \
    node.proto



After it finishes, you will find two Go files under the core directory, node.pb.go and node_grpc.pb.go, which contain the generated gRPC code.

gRPC Server

Now let's start writing server-side code.

First, create a new file core/node_service_server.go with the content below. It implements the two gRPC service methods defined earlier. The channel CmdChannel carries the commands to be executed on worker nodes.



package core

import (
    "context"
)

type NodeServiceGrpcServer struct {
    UnimplementedNodeServiceServer

    // channel to receive command
    CmdChannel chan string
}

func (n NodeServiceGrpcServer) ReportStatus(ctx context.Context, request *Request) (*Response, error) {
    return &Response{Data: "ok"}, nil
}

func (n NodeServiceGrpcServer) AssignTask(request *Request, server NodeService_AssignTaskServer) error {
    for {
        select {
        case <-server.Context().Done():
            // the worker disconnected; stop so this loop no longer consumes commands
            return server.Context().Err()
        case cmd := <-n.CmdChannel:
            // receive command and send to worker node (client)
            if err := server.Send(&Response{Data: cmd}); err != nil {
                return err
            }
        }
    }
}

var server *NodeServiceGrpcServer

// GetNodeServiceGrpcServer singleton service
func GetNodeServiceGrpcServer() *NodeServiceGrpcServer {
    if server == nil {
        server = &NodeServiceGrpcServer{
            CmdChannel: make(chan string),
        }
    }
    return server
}



gRPC Client

We don't need to write the gRPC client ourselves. The generated code already provides it, so we only need to call its methods, and the requests and responses are handled for us.

Master Node

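With node communication in place, we can now write the master node, which is the core of the distributed system.

Let's create a new file node.go in the core directory with the content below. Because it imports gin, which is not listed in the go.mod above, add that dependency first, for example with go get github.com/gin-gonic/gin.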



package core

import (
    "github.com/gin-gonic/gin"
    "google.golang.org/grpc"
    "net"
    "net/http"
)

// MasterNode is the node instance
type MasterNode struct {
    api     *gin.Engine            // api server
    ln      net.Listener           // listener
    svr     *grpc.Server           // grpc server
    nodeSvr *NodeServiceGrpcServer // node service
}

func (n *MasterNode) Init() (err error) {
  // TODO: implement me
  panic("implement me")
}

func (n *MasterNode) Start() {
  // TODO: implement me
  panic("implement me")
}

var node *MasterNode

// GetMasterNode returns the node instance
func GetMasterNode() *MasterNode {
    if node == nil {
        // node
        node = &MasterNode{}

        // initialize node
        if err := node.Init(); err != nil {
            panic(err)
        }
    }

    return node
}


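Both GetNodeServiceGrpcServer and GetMasterNode create their instance lazily with a nil check, which is fine as long as they are first called from a single goroutine. If that cannot be guaranteed, one possible variation is sync.Once. The sketch below is self-contained; registry and getRegistry are hypothetical names used only for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// registry is a hypothetical stand-in for a node or service instance.
type registry struct {
	cmds chan string
}

var (
	instance *registry
	once     sync.Once
)

// getRegistry returns the shared instance, creating it exactly once even
// when called from many goroutines at the same time.
func getRegistry() *registry {
	once.Do(func() {
		instance = &registry{cmds: make(chan string)}
	})
	return instance
}

func main() {
	var wg sync.WaitGroup
	results := make([]*registry, 8)
	for i := range results {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			results[i] = getRegistry()
		}(i)
	}
	wg.Wait()

	same := true
	for _, r := range results {
		if r != results[0] {
			same = false
		}
	}
	fmt.Println("all callers share one instance:", same)
}
```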

There are two placeholder methods, Init and Start, to be implemented.

In the initialization method Init, we will do two things:

  • Register gRPC services
  • Register API services

Now we can add the code below to the Init method.



func (n *MasterNode) Init() (err error) {
    // grpc server listener with port as 50051
    n.ln, err = net.Listen("tcp", ":50051")
    if err != nil {
        return err
    }

    // grpc server
    n.svr = grpc.NewServer()

    // node service
    n.nodeSvr = GetNodeServiceGrpcServer()

    // register node service to grpc server
    RegisterNodeServiceServer(n.svr, n.nodeSvr)

    // api
    n.api = gin.Default()
    n.api.POST("/tasks", func(c *gin.Context) {
        // parse payload
        var payload struct {
            Cmd string `json:"cmd"`
        }
        if err := c.ShouldBindJSON(&payload); err != nil {
            c.AbortWithStatus(http.StatusBadRequest)
            return
        }

        // send command to node service
        n.nodeSvr.CmdChannel <- payload.Cmd

        c.AbortWithStatus(http.StatusOK)
    })

    return nil
}



Here we created a gRPC server and registered the NodeServiceGrpcServer created earlier. We then used the gin framework to create a simple API service, which accepts POST requests to /tasks, sends the command to the channel CmdChannel, and thereby passes it to NodeServiceGrpcServer. All the pieces have been put together!

The Start method is quite simple: it starts the gRPC server and the API server.



func (n *MasterNode) Start() {
    // start grpc server
    go n.svr.Serve(n.ln)

    // start api server
    _ = n.api.Run(":9092")

    // wait for exit
    n.svr.Stop()
}



Next, we will implement the worker node, which actually carries out the tasks.

Worker Node

Now let's create a new file core/worker_node.go and enter the content below.



package core

import (
    "context"
    "fmt"
    "google.golang.org/grpc"
    "os/exec"
    "strings"
)

type WorkerNode struct {
    conn *grpc.ClientConn  // grpc client connection
    c    NodeServiceClient // grpc client
}

func (n *WorkerNode) Init() (err error) {
    // connect to master node
    n.conn, err = grpc.Dial("localhost:50051", grpc.WithInsecure())
    if err != nil {
        return err
    }

    // grpc client
    n.c = NewNodeServiceClient(n.conn)

    return nil
}

func (n *WorkerNode) Start() {
    // log
    fmt.Println("worker node started")

    // report status
    if _, err := n.c.ReportStatus(context.Background(), &Request{}); err != nil {
        fmt.Println("report status failed:", err)
        return
    }

    // assign task: open the stream on which the master node pushes commands
    stream, err := n.c.AssignTask(context.Background(), &Request{})
    if err != nil {
        fmt.Println("assign task failed:", err)
        return
    }
    for {
        // receive command from master node
        res, err := stream.Recv()
        if err != nil {
            return
        }

        // log command
        fmt.Println("received command: ", res.Data)

        // execute command
        parts := strings.Split(res.Data, " ")
        if err := exec.Command(parts[0], parts[1:]...).Run(); err != nil {
            fmt.Println(err)
        }
    }
}

var workerNode *WorkerNode

func GetWorkerNode() *WorkerNode {
    if workerNode == nil {
        // node
        workerNode = &WorkerNode{}

        // initialize node
        if err := workerNode.Init(); err != nil {
            panic(err)
        }
    }

    return workerNode
}



In the code above, we created the gRPC client and connected it to the gRPC server in the Init method.

In the Start method, we do several things:

  1. Report status with a simple RPC method.
  2. Call the server-side streaming RPC method AssignTask to acquire a stream.
  3. Continuously receive data from the server (master node) via the acquired stream and execute commands.
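The last step, turning a received string into a process, can be sketched on its own. The helpers parseCommand and run below are hypothetical and use only the standard library. Unlike a plain split on single spaces, strings.Fields tolerates repeated whitespace, and the explicit length check guards against an empty command. Quoting and shell syntax are not handled.

```go
package main

import (
	"errors"
	"fmt"
	"os/exec"
	"strings"
)

// parseCommand splits a command line on whitespace into a program name and
// its arguments. It does not understand quotes or shell syntax.
func parseCommand(line string) (name string, args []string, err error) {
	parts := strings.Fields(line)
	if len(parts) == 0 {
		return "", nil, errors.New("empty command")
	}
	return parts[0], parts[1:], nil
}

// run executes the command line and returns its combined stdout and stderr.
func run(line string) (string, error) {
	name, args, err := parseCommand(line)
	if err != nil {
		return "", err
	}
	out, err := exec.Command(name, args...).CombinedOutput()
	return string(out), err
}

func main() {
	name, args, err := parseCommand("touch /tmp/hello-distributed-system")
	fmt.Println(name, args, err)

	// "go version" is used only because the Go toolchain is assumed to be
	// on PATH wherever this sketch is run.
	out, err := run("go version")
	fmt.Print(out)
	if err != nil {
		fmt.Println("error:", err)
	}
}
```

Capturing the output with CombinedOutput is one way a worker could later report results back to the master node, although this article's worker only logs errors.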

Now we have completed all the core logic of the distributed system.

Putting them all together

Finally, we need to encapsulate these core functionalities.

Create the main entry file main.go and enter the content below.



package main

import (
    "go-distributed-system/core"
    "os"
)

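func main() {
    // os.Args[0] is the program name; the node type is the first argument
    if len(os.Args) < 2 {
        panic("missing node type: master or worker")
    }
    nodeType := os.Args[1]
    switch nodeType {
    case "master":
        core.GetMasterNode().Start()
    case "worker":
        core.GetWorkerNode().Start()
    default:
        panic("invalid node type")
    }
}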


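If you prefer an error message over a panic, the argument handling can be pulled into a small function that is easy to test. This is only a sketch; nodeType is a hypothetical helper, and the argument slice is assumed to have the same layout as os.Args, with the program name at index 0.

```go
package main

import (
	"errors"
	"fmt"
)

// nodeType extracts the node type from a full argument vector such as
// os.Args, where index 0 is the program name.
func nodeType(args []string) (string, error) {
	if len(args) < 2 {
		return "", errors.New("missing node type: expected master or worker")
	}
	switch t := args[1]; t {
	case "master", "worker":
		return t, nil
	default:
		return "", fmt.Errorf("invalid node type %q", t)
	}
}

func main() {
	// Sample argument vectors instead of the real os.Args, so the sketch
	// behaves the same no matter how it is launched.
	for _, args := range [][]string{
		{"app", "master"},
		{"app"},
		{"app", "boss"},
	} {
		t, err := nodeType(args)
		fmt.Printf("type=%q err=%v\n", t, err)
	}
}
```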

Now the simple distributed system is all done!

Final Results

We can then test the code.

Open two command prompts. Enter go run main.go master in one to start the master node, and go run main.go worker in the other to start the worker node.

If the master node starts successfully, you should see the logs below.



[GIN-debug] [WARNING] Creating an Engine instance with the Logger and Recovery middleware already attached.

[GIN-debug] [WARNING] Running in "debug" mode. Switch to "release" mode in production.
 - using env:   export GIN_MODE=release
 - using code:  gin.SetMode(gin.ReleaseMode)

[GIN-debug] POST   /tasks                    --> go-distributed-system/core.(*MasterNode).Init.func1 (3 handlers)
[GIN-debug] [WARNING] You trusted all proxies, this is NOT safe. We recommend you to set a value.
Please check https://pkg.go.dev/github.com/gin-gonic/gin#readme-don-t-trust-all-proxies for details.
[GIN-debug] Listening and serving HTTP on :9092



For the worker node, you should see logs like this.



worker node started



After both the master node and the worker node have started, open another command prompt and run the command below to make an API call.



curl -X POST \
    -H "Content-Type: application/json" \
    -d '{"cmd": "touch /tmp/hello-distributed-system"}' \
    http://localhost:9092/tasks



In the worker node logs, you should be able to see received command: touch /tmp/hello-distributed-system.

Then let's check if the file has been created by executing ls -l /tmp/hello-distributed-system.



-rw-r--r--  1 marvzhang  wheel     0B Oct 26 12:22 /tmp/hello-distributed-system



The file was successfully created, which means the worker node has executed the task successfully. Hooray!

Conclusion

This article introduced a way to build a simple distributed system in Go, using gRPC and Go's built-in channels.

Core libraries and techniques:

The code of the whole project is on GitHub: https://github.com/tikazyq/codao-code/tree/main/2022-10/go-distributed-system
