How to use ZeroMQ Pipeline Pattern in Golang

franciscomendes10866

Francisco Mendes

Posted on September 9, 2021

How to use ZeroMQ Pipeline Pattern in Golang

Overview

I bet many of us have thought about decopulating a backend and splitting it into microservices. Let's say you have a monolithic backend and then you decide to add something like file processing and you'd rather have a microservice that has the sole function of processing files.

But let's assume that you want to process several files simultaneously instead of one at a time, in this case I believe it would be ideal to distribute the work among several microservices responsible solely and exclusively for processing files.

To distribute the work among the different applications we will need an intermediary and the most popular solution is the use of a message broker. However, not all of us need a solution as advanced as the use of a message broker, it is in these specific cases (smaller applications) that I like to use ZeroMQ.

If you don't know ZeroMQ, that's okay because it's a technology that isn't widely shared in the community, so if you want to know more about ZeroMQ, I recommend reading this article, which will give you a better introduction than me.

Today's example

The idea of today's example is to create a simple application (server) that will send multiple messages to another application/s (worker) that will be responsible for just logging those same messages.

Let's code

As you may have already understood, we are going to have two backends. One of the backends we will call a server, which will be our message sender. The other backend will be the worker, which will be our small microservice.

First and foremost, let's install our dependencies:

go get github.com/pebbe/zmq4
Enter fullscreen mode Exit fullscreen mode

Now we can start working on our server but first I have to explain the pattern we are going to use today.

The Pipeline pattern, also known as Push/Pull, allows you to distribute tasks among several workers evenly, which are arranged in a certain pipeline.

Now that you have a little idea we can start by importing our client and configuring it:

// @/server/server.go
package main

import (
    zmq "github.com/pebbe/zmq4"
)

func main() {
    zctx, _ := zmq.NewContext()

    s, _ := zctx.NewSocket(zmq.PUSH)
    s.Bind("tcp://*:5555")

    // ...
}
Enter fullscreen mode Exit fullscreen mode

Then let's create a for loop to send a total of one hundred messages to our worker/s. First let's log which message is being sent, then we'll send that same message using the s.Send() function.

But before sending the message we have to convert the integer number to string. Finally, to delay the sending of messages, let's add a timeout of five hundred milliseconds.

// @/server/server.go
package main

import (
    "fmt"
    "strconv"
    "time"

    zmq "github.com/pebbe/zmq4"
)

func main() {
    zctx, _ := zmq.NewContext()

    s, _ := zctx.NewSocket(zmq.PUSH)
    s.Bind("tcp://*:5555")

    for i := 0; i < 100; i++ {
        fmt.Printf("Sending Job %d...\n", i)
        if _, err := s.Send("Job "+strconv.Itoa(i), 0); err != nil {
            panic(err)
        }
        time.Sleep(500 * time.Millisecond)
    }
}
Enter fullscreen mode Exit fullscreen mode

Now we can start working on our worker. First let's import our client and configure it:

// @/worker/worker.go
package main

import (
    zmq "github.com/pebbe/zmq4"
)

func main() {
    zctx, _ := zmq.NewContext()

    s, _ := zctx.NewSocket(zmq.PULL)
    if err := s.Connect("tcp://localhost:5555"); err != nil {
        panic(err)
    }

    // ...
}
Enter fullscreen mode Exit fullscreen mode

In our worker we are going to add an infinite for loop so that it never stops its execution and what we are going to do is receive the messages from our server using the s.Recv() function. Finally, let's log each of the messages that our worker receives.

// @/worker/worker.go
package main

import (
    "fmt"

    zmq "github.com/pebbe/zmq4"
)

func main() {
    zctx, _ := zmq.NewContext()

    s, _ := zctx.NewSocket(zmq.PULL)
    if err := s.Connect("tcp://localhost:5555"); err != nil {
        panic(err)
    }

    for {
        if msg, err := s.Recv(0); err != nil {
            panic(err)
        } else {
            fmt.Printf("Received %s\n", msg)
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

The way we are going to test our project is very simple, we will have three windows in the terminal open and only one will be to start the server and the rest will be used by our workers. Like this:

testing article example

As you can see from the gif, the workers ran at different times but the tasks were evenly distributed between them.

In the same way as when I stopped the execution of one of the workers and then started it again, the tasks were evenly distributed again without any problem.

Conclusion

As always, I hope you found it interesting. If you noticed any errors in this article, please mention them in the comments. 🧑🏻‍💻

Hope you have a great day! ✌️

💖 💪 🙅 🚩
franciscomendes10866
Francisco Mendes

Posted on September 9, 2021

Join Our Newsletter. No Spam, Only the good stuff.

Sign up to receive the latest update from our blog.

Related