Golang in Action: How to implement a simple distributed system
Yeqing (Marvin) Zhang
Posted on October 26, 2022
Introduction
Nowadays, many cloud-native and distributed systems such as Kubernetes are written in Go. This is because Go natively supports not only asynchronous programming but also static typing to ensure system stability. My open-source project Crawlab, a web crawler management platform, has applied distributed architecture. This article will introduce about how to design and implement a simple distributed system.
Ideas
Before we start to code, we need to think about what we need to implement.
- Master Node: A central control system, similar to a troop commander to issue orders
- Worker Node: Executors, similar to soldiers to execute tasks
Apart from the concepts above, we would need to implement some simple functionalities.
- Report Status: Worker nodes report current status to the master node.
- Assign Task: Client makes API requests to the master node which assign tasks to worker nodes.
- Execute Script: Worker nodes execute scripts from tasks.
The overall architectural diagram is as below.
Action
Node Communication
The communication between nodes are very important in distributed systems. If each node runs individually, it will be not necessary to use a distributed system. Therefore, the communication module is an essential part in distributed systems.
gRPC Protocol
First, let's think about how to make nodes to communicate with each other. The most common way is API, which yet has a drawback that it requires nodes to expose their IP addresses and ports to others, which is very insecure particularly in the public network. In light of that, we chose gRPC, a popular Remote Procedure Call (RPC) framework. We won't go deep into its technical details, but it is actually a mechanism to allow remote machines to execute commands by RPC callers.
To use gRPC, let's first create a file named go.mod
and enter below content, then execute go mod download
.
module go-distributed-system
go 1.17
require (
github.com/golang/protobuf v1.5.0
google.golang.org/grpc v1.27.0
google.golang.org/protobuf v1.27.1
)
Then we create a Protocol Buffers file node.proto
, a gRPC protocol file, and enter below content.
syntax = "proto3";
package core;
option go_package = ".;core";
message Request {
string action = 1;
}
message Response {
string data = 1;
}
service NodeService {
rpc ReportStatus(Request) returns (Response){}; // Simple RPC
rpc AssignTask(Request) returns (stream Response){}; // Server-Side RPC
}
Here we have created two RPC service methods: one for reporting current status with a Simple RPC, another for assigning tasks with a Server-Side RPC. The difference between Simple RPC and Server-Side RPC is that Server-Side RPC can allow the server (master node in our case) to send data to the client (worker node) through a stream, but Simple RPC can only allow clients to call servers.
After .proto
file is created, we need to compile it into .go
code file so that it can be used by the Go program. Let's execute the command as below. (Note: the compiler protocol
is not built-in and needs to be downloaded, please refer to https://grpc.io/docs/protoc-installation/)
mkdir core
protoc --go_out=./core \
--go-grpc_out=./core \
node.proto
After it is executed, you can see two Go code files under the directory core
, node.pb.go
and node_grpc.pb.go
respectively, which serve as the gRPC library.
gRPC Server
Now let's start writing server-side code.
Firstly create a new file core/node_service_server.go
, and enter the content below. It implemented the two gRPC service methods created before. The channel CmdChannel
in particular would transfer commands to be executed on worker nodes.
package core
import (
"context"
)
type NodeServiceGrpcServer struct {
UnimplementedNodeServiceServer
// channel to receive command
CmdChannel chan string
}
func (n NodeServiceGrpcServer) ReportStatus(ctx context.Context, request *Request) (*Response, error) {
return &Response{Data: "ok"}, nil
}
func (n NodeServiceGrpcServer) AssignTask(request *Request, server NodeService_AssignTaskServer) error {
for {
select {
case cmd := <-n.CmdChannel:
// receive command and send to worker node (client)
if err := server.Send(&Response{Data: cmd}); err != nil {
return err
}
}
}
}
var server *NodeServiceGrpcServer
// GetNodeServiceGrpcServer singleton service
func GetNodeServiceGrpcServer() *NodeServiceGrpcServer {
if server == nil {
server = &NodeServiceGrpcServer{
CmdChannel: make(chan string),
}
}
return server
}
gRPC Client
We don't have to care too much about the implementation of gRPC client. Instead, we only need to call the methods in the gRPC client, and the rest of the service requests and response will be handled automatically by the program.
Master Node
After we implemented the node communication part, we can now write the master node, which is the core of the distributed system.
Let's create a new file node.go
and enter the content below.
package core
import (
"github.com/gin-gonic/gin"
"google.golang.org/grpc"
"net"
"net/http"
)
// MasterNode is the node instance
type MasterNode struct {
api *gin.Engine // api server
ln net.Listener // listener
svr *grpc.Server // grpc server
nodeSvr *NodeServiceGrpcServer // node service
}
func (n *MasterNode) Init() (err error) {
// TODO: implement me
panic("implement me")
}
func (n *MasterNode) Start() {
// TODO: implement me
panic("implement me")
}
var node *MasterNode
// GetMasterNode returns the node instance
func GetMasterNode() *MasterNode {
if node == nil {
// node
node = &MasterNode{}
// initialize node
if err := node.Init(); err != nil {
panic(err)
}
}
return node
}
There are two placeholder methods Init
and Start
to be implemented.
In the initialization method Init
, we will two things:
- Regster gRPC services
- Register API services
Now, we can add below code in Init
method.
func (n *MasterNode) Init() (err error) {
// grpc server listener with port as 50051
n.ln, err = net.Listen("tcp", ":50051")
if err != nil {
return err
}
// grpc server
n.svr = grpc.NewServer()
// node service
n.nodeSvr = GetNodeServiceGrpcServer()
// register node service to grpc server
RegisterNodeServiceServer(node.svr, n.nodeSvr)
// api
n.api = gin.Default()
n.api.POST("/tasks", func(c *gin.Context) {
// parse payload
var payload struct {
Cmd string `json:"cmd"`
}
if err := c.ShouldBindJSON(&payload); err != nil {
c.AbortWithStatus(http.StatusBadRequest)
return
}
// send command to node service
n.nodeSvr.CmdChannel <- payload.Cmd
c.AbortWithStatus(http.StatusOK)
})
return nil
}
Here we created a gRPC server, and registered NodeServiceGrpcServer
created before. We then use API framework gin
to create a simple API service, which can allow POST request to /tasks
to send commands to the channel CmdChannel
and pass to NodeServiceGrpcServer
. All pieces have been put together!
The starting method Start
is quite simple, which is simplely to start the gRPC server and API server.
func (n *MasterNode) Start() {
// start grpc server
go n.svr.Serve(n.ln)
// start api server
_ = n.api.Run(":9092")
// wait for exit
n.svr.Stop()
}
下一步,我们就要实现实际做任务的工作节点了。
Worker Node
现在,我们创建一个新文件 core/worker_node.go
,输入以下内容。
package core
import (
"context"
"google.golang.org/grpc"
"os/exec"
)
type WorkerNode struct {
conn *grpc.ClientConn // grpc client connection
c NodeServiceClient // grpc client
}
func (n *WorkerNode) Init() (err error) {
// connect to master node
n.conn, err = grpc.Dial("localhost:50051", grpc.WithInsecure())
if err != nil {
return err
}
// grpc client
n.c = NewNodeServiceClient(n.conn)
return nil
}
func (n *WorkerNode) Start() {
// log
fmt.Println("worker node started")
// report status
_, _ = n.c.ReportStatus(context.Background(), &Request{})
// assign task
stream, _ := n.c.AssignTask(context.Background(), &Request{})
for {
// receive command from master node
res, err := stream.Recv()
if err != nil {
return
}
// log command
fmt.Println("received command: ", res.Data)
// execute command
parts := strings.Split(res.Data, " ")
if err := exec.Command(parts[0], parts[1:]...).Run(); err != nil {
fmt.Println(err)
}
}
}
var workerNode *WorkerNode
func GetWorkerNode() *WorkerNode {
if workerNode == nil {
// node
workerNode = &WorkerNode{}
// initialize node
if err := workerNode.Init(); err != nil {
panic(err)
}
}
return workerNode
}
In the above code, we created the gRPC client and connected it to the gRPC server in Init
method.
In Start
method, we have done several things:
- Report status with a Simple RPC method.
- Assign tasks to acquire a stream with a Server-Side RPC method.
- Continuously receive data from the server (master node) via the acquired stream and execute commands.
Now we have completed all core logics in the distributed systems.
Putting them all together
Finally, we need to encapsulate these core functionalities.
Create the main entry file main.go
and enter the content below.
package main
import (
"go-distributed-system/core"
"os"
)
func main() {
nodeType := os.Args[0]
switch nodeType {
case "master":
core.GetMasterNode().Start()
case "worker":
core.GetWorkerNode().Start()
default:
panic("invalid node type")
}
}
Now the simple distributed system is all done!
Final Results
We can then test the code.
Open two command prompts. Enter go run main.go master
in one prompt to start the master node, and enter go run main.go worker
to start the worker node in another.
If the master code starts successfully, you should be able to see the logs below.
[GIN-debug] [WARNING] Creating an Engine instance with the Logger and Recovery middleware already attached.
[GIN-debug] [WARNING] Running in "debug" mode. Switch to "release" mode in production.
- using env: export GIN_MODE=release
- using code: gin.SetMode(gin.ReleaseMode)
[GIN-debug] POST /tasks --> go-distributed-system/core.(*MasterNode).Init.func1 (3 handlers)
[GIN-debug] [WARNING] You trusted all proxies, this is NOT safe. We recommend you to set a value.
Please check https://pkg.go.dev/github.com/gin-gonic/gin#readme-don-t-trust-all-proxies for details.
[GIN-debug] Listening and serving HTTP on :9092
For worker node, you can see logs like this.
worker node started
After the master node and worker node have all started, we can open another command prompt to execute the command below to make an API call.
curl -X POST \
-H "Content-Type: application/json" \
-d '{"cmd": "touch /tmp/hello-distributed-system"}' \
http://localhost:9092/tasks
In the worker node logs, you should be able to see received command: touch /tmp/hello-distributed-system
.
Then let's check if the file has been created by executing ls -l /tmp/hello-distributed-system
.
-rw-r--r-- 1 marvzhang wheel 0B Oct 26 12:22 /tmp/hello-distributed-system
The file was successfully created, which means the worker node has executed the task successfully. Hooray!
Conclusion
This article introduced a way to develop a simple distributed system written in Golang, with gRPC and built-in Go channel.
Core libraries and techniques:
- gRPC
- Protocol Buffers
- channel
- gin
- os/exec
The code of the whole project is on GitHub: https://github.com/tikazyq/codao-code/tree/main/2022-10/go-distributed-system
Posted on October 26, 2022
Join Our Newsletter. No Spam, Only the good stuff.
Sign up to receive the latest update from our blog.