Golang: consuming data in reactive microservices
Sebastian Segura
Posted on August 11, 2021
Introduction
In this article, we show a simple strategy for implementing a data ingestion service with Golang in a reactive microservices architecture.
Why Microservices?
A microservices architecture builds on the Single Responsibility Principle, which states "gather together those things that change for the same reason, and separate those things that change for different reasons", and extends it to loosely coupled services that can be developed, deployed, and maintained independently. Each service is responsible for a discrete task and communicates with other services, in different ways, to solve a larger, complex business problem.
Reactive Microservices
Reactive systems are based on a message-driven architecture. Asynchronous, non-blocking messaging decouples reactive microservices in time and in failure: a service does not wait for a response from the others. The sender publishes a message and moves on, so if the receiving service fails, the failure does not propagate and the sender keeps operating. With a broker such as RabbitMQ, the message can wait in a queue until the receiver is available again.
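To make the idea concrete, here is a minimal in-process sketch, assuming a buffered Go channel stands in for the message broker (in the real service that role is played by RabbitMQ). The Event type and the publish helper are hypothetical names for illustration: publish never waits for a consumer, and if the buffer is full it reports failure instead of blocking, so a slow or failed consumer does not stall the sender.

```go
package main

import "fmt"

// Event is a hypothetical message type used only for this sketch.
type Event struct {
	ID      int
	Payload string
}

// publish hands an event to the "broker" without blocking.
// It returns false when the buffer is full, so the caller can retry,
// drop, or log instead of waiting on the consumer.
func publish(broker chan<- Event, e Event) bool {
	select {
	case broker <- e:
		return true
	default:
		return false
	}
}

func main() {
	// The buffer decouples sender and receiver in time: events can be
	// published even though nobody is consuming yet.
	broker := make(chan Event, 2)

	fmt.Println(publish(broker, Event{ID: 1, Payload: "a"})) // true
	fmt.Println(publish(broker, Event{ID: 2, Payload: "b"})) // true
	fmt.Println(publish(broker, Event{ID: 3, Payload: "c"})) // false: buffer full, sender not blocked

	// A consumer that starts later still receives the buffered events.
	close(broker)
	for e := range broker {
		fmt.Printf("consumed %d:%s\n", e.ID, e.Payload)
	}
}
```

A real broker adds persistence and redelivery on top of this, but the shape is the same: the sender's progress does not depend on the receiver being up.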
Why Golang to build Microservices?
You can build microservices in practically any language; after all, microservices are a concept rather than a specific framework or tool. That said, some languages are better suited to microservices, or have better support for them, than others. Golang is one of them.
Golang is lightweight, fast, and has excellent built-in support for concurrency (goroutines and channels), which pays off when a service has to handle many tasks in parallel on multi-core machines.
A data ingestion example
With all this in mind, let's build a simple data ingestion service in Golang as an example of a reactive microservice. We are going to use RabbitMQ as the message broker: the service consumes every new message from a queue and performs some operation on it.
Let's start with a basic RabbitMQ consumer setup that we will build on. It uses the streadway/amqp client library and logrus for logging.
// consumer.go
package consumer

import (
	log "github.com/sirupsen/logrus"
	"github.com/streadway/amqp"
)

const (
	AMQP_URL   = "<YOUR_RABBITMQ_URL>"
	QUEUE_NAME = "<YOUR_QUEUE_NAME>" // placeholder for the queue to consume from
)

type Consumer interface {
	InitConsumer() error
}

type consumer struct{}

func New() Consumer {
	return &consumer{}
}

func (c *consumer) InitConsumer() error {
	// open a TCP connection to the broker
	conn, err := amqp.Dial(AMQP_URL)
	if err != nil {
		log.Errorf("fail to init RabbitMQ consumer: %v", err)
		return err
	}
	// open an AMQP channel (a session on the connection, not a Go channel)
	ch, err := conn.Channel()
	if err != nil {
		log.Errorf("fail to create a channel: %v", err)
		return err
	}
	// declare the queue (creates it only if it does not exist yet)
	queue, err := ch.QueueDeclare(
		QUEUE_NAME, // queue name
		true,       // durable
		false,      // delete when unused
		false,      // exclusive
		false,      // no-wait
		nil,        // arguments
	)
	if err != nil {
		log.Errorf("fail to create a queue: %v", err)
		return err
	}
	// register as a consumer; deliveries arrive on the returned Go channel
	msgChannel, err := ch.Consume(
		queue.Name, // queue
		"",         // consumer tag (a unique one is generated when empty)
		false,      // auto-ack
		false,      // exclusive
		false,      // no-local
		false,      // no-wait
		nil,        // arguments
	)
	if err != nil {
		log.Errorf("fail to create a message channel: %v", err)
		return err
	}
	// TODO: consume messages from the queue
	_ = msgChannel // keeps the skeleton compiling until the loop is added
	return nil
}
Producer-Consumer pattern
A Golang channel is a typed conduit through which goroutines send and receive values; sends and receives can also synchronize the goroutines involved.
In our previous example, ch.Consume returned a Go channel of deliveries, msgChannel, on which the amqp library hands us the messages it receives from RabbitMQ. We are going to use this channel to implement the consumer side of the producer-consumer pattern.
In the producer-consumer pattern, one or more producers create jobs and one or more consumers carry them out; the two sides exchange jobs through one or more shared channels. Here, the amqp library acts as the producer and our loop over msgChannel is the consumer.
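A self-contained sketch of the pattern, using only the standard library: one producer goroutine sends jobs on a channel and closes it when done, and several consumer goroutines square each job. The names produce, consume, and run are hypothetical and only for illustration; in the ingestion service, the amqp library plays the producer's role.

```go
package main

import (
	"fmt"
	"sync"
)

// produce sends jobs 1..n on the jobs channel and closes it when done,
// which tells the consumers that no more work is coming.
func produce(n int, jobs chan<- int) {
	for i := 1; i <= n; i++ {
		jobs <- i
	}
	close(jobs)
}

// consume squares every job it receives until jobs is closed.
func consume(jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	for j := range jobs {
		results <- j * j
	}
}

// run wires one producer to numConsumers consumers and returns the sum of the results.
func run(n, numConsumers int) int {
	jobs := make(chan int)
	results := make(chan int)
	var wg sync.WaitGroup

	go produce(n, jobs)
	for i := 0; i < numConsumers; i++ {
		wg.Add(1)
		go consume(jobs, results, &wg)
	}
	// Close results once every consumer has finished.
	go func() {
		wg.Wait()
		close(results)
	}()

	sum := 0
	for r := range results {
		sum += r
	}
	return sum
}

func main() {
	fmt.Println(run(4, 2)) // 1 + 4 + 9 + 16 = 30
}
```

Closing the jobs channel is the producer's way of saying "done"; each consumer's range loop then ends on its own, without any extra signalling.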
// consumer.go
package consumer

import (
	"encoding/json"
	"errors"

	log "github.com/sirupsen/logrus"
	"github.com/streadway/amqp"

	// hypothetical import path for the package that defines the Msg type
	m "github.com/seb7887/consumer/internal/models"
)

const (
	AMQP_URL   = "<YOUR_RABBITMQ_URL>"
	QUEUE_NAME = "<YOUR_QUEUE_NAME>" // placeholder for the queue to consume from
)

type Consumer interface {
	InitConsumer() error
}

type consumer struct{}

func New() Consumer {
	return &consumer{}
}

func (c *consumer) InitConsumer() error {
	// open a TCP connection to the broker
	conn, err := amqp.Dial(AMQP_URL)
	if err != nil {
		log.Errorf("fail to init RabbitMQ consumer: %v", err)
		return err
	}
	defer conn.Close()
	// open an AMQP channel (a session on the connection, not a Go channel)
	ch, err := conn.Channel()
	if err != nil {
		log.Errorf("fail to create a channel: %v", err)
		return err
	}
	defer ch.Close()
	// declare the queue (creates it only if it does not exist yet)
	queue, err := ch.QueueDeclare(
		QUEUE_NAME, // queue name
		true,       // durable
		false,      // delete when unused
		false,      // exclusive
		false,      // no-wait
		nil,        // arguments
	)
	if err != nil {
		log.Errorf("fail to create a queue: %v", err)
		return err
	}
	// register as a consumer; deliveries arrive on the returned Go channel
	msgChannel, err := ch.Consume(
		queue.Name, // queue
		"",         // consumer tag (a unique one is generated when empty)
		false,      // auto-ack
		false,      // exclusive
		false,      // no-local
		false,      // no-wait
		nil,        // arguments
	)
	if err != nil {
		log.Errorf("fail to create a message channel: %v", err)
		return err
	}
	// consume messages until the library closes the delivery channel
	// (for example, when the connection to the broker is lost)
	for msg := range msgChannel {
		log.Debugf("received msg: %s", msg.Body)
		// parse message
		var parsedMsg m.Msg
		if err := json.Unmarshal(msg.Body, &parsedMsg); err != nil {
			log.Errorf("fail to parse message: %v", err)
			// reject without requeue so it is discarded (or dead-lettered, if configured)
			if err := msg.Reject(false); err != nil {
				log.Errorf("fail to reject: %v", err)
			}
			continue
		}
		// ack only this delivery (multiple=false)
		if err := msg.Ack(false); err != nil {
			log.Errorf("fail to ack: %v", err)
			return err
		}
		// TODO: handle message
	}
	return errors.New("RabbitMQ delivery channel closed")
}
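Why range over the delivery channel rather than select on it in an endless loop? When the connection or channel to RabbitMQ is closed, the amqp library closes the Go channel returned by Consume, and a receive from a closed channel immediately yields a zero value. A bare receive inside for/select would therefore spin on empty deliveries, while a for ... range loop simply ends. The sketch below simulates this without a broker; fakeDelivery is a hypothetical stand-in for amqp.Delivery (only its Body field), Msg is a hypothetical payload type in place of m.Msg, and consumeAll counts malformed messages where the real consumer would reject them.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Msg is a hypothetical payload type standing in for m.Msg.
type Msg struct {
	ID string `json:"id"`
}

// fakeDelivery is a hypothetical stand-in for amqp.Delivery; only Body is modelled.
type fakeDelivery struct {
	Body []byte
}

// consumeAll reads deliveries until the channel is closed. Messages that
// fail to parse are skipped and counted (the real consumer would reject
// them); the rest are collected (the real consumer would ack them).
func consumeAll(deliveries <-chan fakeDelivery) (parsed []Msg, rejected int) {
	for d := range deliveries { // the loop ends when the channel is closed
		var msg Msg
		if err := json.Unmarshal(d.Body, &msg); err != nil {
			rejected++
			continue
		}
		parsed = append(parsed, msg)
	}
	return parsed, rejected
}

func main() {
	deliveries := make(chan fakeDelivery, 3)
	deliveries <- fakeDelivery{Body: []byte(`{"id":"a"}`)}
	deliveries <- fakeDelivery{Body: []byte(`not json`)}
	deliveries <- fakeDelivery{Body: []byte(`{"id":"b"}`)}
	close(deliveries) // simulates the broker connection going away

	msgs, rejected := consumeAll(deliveries)
	fmt.Println(len(msgs), rejected) // 2 1

	// After close, a plain receive returns the zero value immediately;
	// the second result (ok) is false, which is what a range loop checks.
	d, ok := <-deliveries
	fmt.Println(len(d.Body), ok) // 0 false
}
```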
Worker pool pattern
In Golang, we achieve concurrency with goroutines: functions that execute independently, and concurrently with other goroutines, in the same address space.
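As a minimal illustration of that shared address space, the hypothetical squareAll function below starts one goroutine per element and lets each one write its result into a shared slice; a sync.WaitGroup makes the caller wait until all goroutines have finished.

```go
package main

import (
	"fmt"
	"sync"
)

// squareAll computes the square of each input in its own goroutine.
// Each goroutine writes to a distinct index, so no mutex is needed.
func squareAll(nums []int) []int {
	out := make([]int, len(nums))
	var wg sync.WaitGroup
	for i, n := range nums {
		wg.Add(1)
		go func(i, n int) {
			defer wg.Done()
			out[i] = n * n
		}(i, n)
	}
	wg.Wait() // block until every goroutine has finished
	return out
}

func main() {
	fmt.Println(squareAll([]int{1, 2, 3})) // [1 4 9]
}
```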
So, what is a worker pool?
A worker pool is a concurrency pattern in which a fixed number of workers execute tasks taken from a queue. Each worker is a goroutine, and the queue is a channel. A worker pulls a task from the queue, runs it to completion, and then pulls the next one; when the queue is empty, idle workers block until new tasks arrive.
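The classic shared-queue form of this pattern can be sketched with only the standard library. This is an illustration, not the implementation used in this article: runPool is a hypothetical helper whose workers all read from one channel, so whichever worker is idle takes the next task, and the workers exit once the channel is closed and drained. Multiplying by 10 is a stand-in for real work.

```go
package main

import (
	"fmt"
	"sync"
)

// runPool starts numWorkers goroutines that all pull from one shared
// queue channel. Whichever worker is free takes the next task; workers
// exit once the channel is closed and drained.
func runPool(numWorkers int, tasks []int) []int {
	queue := make(chan int)
	results := make(chan int, len(tasks)) // buffered so workers never block on results
	var wg sync.WaitGroup

	for w := 0; w < numWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for t := range queue {
				results <- t * 10 // stand-in for real work
			}
		}()
	}

	for _, t := range tasks {
		queue <- t
	}
	close(queue) // no more tasks: workers finish what remains and return
	wg.Wait()
	close(results)

	out := make([]int, 0, len(tasks))
	for r := range results {
		out = append(out, r)
	}
	return out
}

func main() {
	res := runPool(3, []int{1, 2, 3, 4})
	sum := 0
	for _, r := range res {
		sum += r
	}
	fmt.Println(len(res), sum) // 4 100
}
```

Because any worker can take any task, results come back in no particular order.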
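We are going to implement our worker pool in a separate package. It differs from the textbook version in one respect: each worker has its own channel, and Submit chooses the worker by hashing a key (uid), so tasks with the same key always run on the same worker, in the order they are submitted.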
// pool.go
package pool

import (
	"context"

	"github.com/segmentio/fasthash/fnv1a"
)

type WorkerPool struct {
	maxWorkers int
	taskQueue  []chan func()
	ctx        context.Context
	cancel     context.CancelFunc
}

func New(maxWorkers int) *WorkerPool {
	// there must be at least one worker
	if maxWorkers < 1 {
		maxWorkers = 1
	}
	ctx, cancel := context.WithCancel(context.Background())
	p := &WorkerPool{
		taskQueue:  make([]chan func(), maxWorkers),
		maxWorkers: maxWorkers,
		ctx:        ctx,
		cancel:     cancel,
	}
	// start one worker goroutine per task queue
	p.dispatch()
	return p
}

// Stop signals every worker to exit. A task that is already running
// finishes, but Stop does not wait for it.
func (p *WorkerPool) Stop() {
	p.cancel()
}

// Submit routes task to a worker chosen by hashing uid, so tasks with the
// same uid always run on the same worker. It blocks until that worker
// takes the task, or returns without running it if the pool was stopped.
func (p *WorkerPool) Submit(uid string, task func()) {
	if task == nil {
		return
	}
	idx := fnv1a.HashString64(uid) % uint64(p.maxWorkers)
	select {
	case p.taskQueue[idx] <- task:
	case <-p.ctx.Done():
	}
}

func (p *WorkerPool) dispatch() {
	for i := 0; i < p.maxWorkers; i++ {
		// unbuffered: a task is handed directly to its worker
		p.taskQueue[i] = make(chan func())
		go startWorker(p.ctx, p.taskQueue[i])
	}
}

// startWorker runs tasks from taskChan one at a time until ctx is cancelled.
// It is already started as a goroutine by dispatch, so it loops directly.
func startWorker(ctx context.Context, taskChan chan func()) {
	for {
		select {
		case task := <-taskChan:
			// execute the task
			task()
		case <-ctx.Done():
			return
		}
	}
}
Finally, we will modify our consumer package to use the worker pool to handle each message received.
// consumer.go
package consumer

import (
	"encoding/json"
	"errors"

	"github.com/google/uuid"
	"github.com/seb7887/consumer/internal/pool"
	log "github.com/sirupsen/logrus"
	"github.com/streadway/amqp"

	// hypothetical import path for the package that defines the Msg type
	m "github.com/seb7887/consumer/internal/models"
)

const (
	AMQP_URL       = "<YOUR_RABBITMQ_URL>"
	QUEUE_NAME     = "<YOUR_QUEUE_NAME>" // placeholder for the queue to consume from
	NUM_OF_WORKERS = 4096
)

type Consumer interface {
	InitConsumer() error
	SubmitWork(*m.Msg)
}

type consumer struct {
	wpool *pool.WorkerPool
}

func New() Consumer {
	return &consumer{
		wpool: pool.New(NUM_OF_WORKERS),
	}
}

func (c *consumer) InitConsumer() error {
	// open a TCP connection to the broker
	conn, err := amqp.Dial(AMQP_URL)
	if err != nil {
		log.Errorf("fail to init RabbitMQ consumer: %v", err)
		return err
	}
	defer conn.Close()
	// open an AMQP channel (a session on the connection, not a Go channel)
	ch, err := conn.Channel()
	if err != nil {
		log.Errorf("fail to create a channel: %v", err)
		return err
	}
	defer ch.Close()
	// declare the queue (creates it only if it does not exist yet)
	queue, err := ch.QueueDeclare(
		QUEUE_NAME, // queue name
		true,       // durable
		false,      // delete when unused
		false,      // exclusive
		false,      // no-wait
		nil,        // arguments
	)
	if err != nil {
		log.Errorf("fail to create a queue: %v", err)
		return err
	}
	// register as a consumer; deliveries arrive on the returned Go channel
	msgChannel, err := ch.Consume(
		queue.Name, // queue
		"",         // consumer tag (a unique one is generated when empty)
		false,      // auto-ack
		false,      // exclusive
		false,      // no-local
		false,      // no-wait
		nil,        // arguments
	)
	if err != nil {
		log.Errorf("fail to create a message channel: %v", err)
		return err
	}
	// consume messages until the library closes the delivery channel
	for msg := range msgChannel {
		log.Debugf("received msg: %s", msg.Body)
		// parse message; parsedMsg is declared per iteration,
		// so each submitted task gets its own copy
		var parsedMsg m.Msg
		if err := json.Unmarshal(msg.Body, &parsedMsg); err != nil {
			log.Errorf("fail to parse message: %v", err)
			// reject without requeue so it is discarded (or dead-lettered, if configured)
			if err := msg.Reject(false); err != nil {
				log.Errorf("fail to reject: %v", err)
			}
			continue
		}
		// ack only this delivery (multiple=false); acking before processing
		// means a crash while the task runs loses the message (at-most-once)
		if err := msg.Ack(false); err != nil {
			log.Errorf("fail to ack: %v", err)
			return err
		}
		// handle message in the worker pool
		c.SubmitWork(&parsedMsg)
	}
	return errors.New("RabbitMQ delivery channel closed")
}

func (c *consumer) SubmitWork(msg *m.Msg) {
	// create the pool only if New did not set it up
	if c.wpool == nil {
		c.wpool = pool.New(NUM_OF_WORKERS)
	}
	// a random key spreads messages evenly across the workers
	key := uuid.New().String()
	c.wpool.Submit(key, func() {
		// Perform some operation with msg...
	})
}
Conclusion
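Golang's basic building blocks, goroutines and channels, make it easy to implement a data ingestion service: the amqp library delivers messages on a channel, and a pool of goroutines processes them concurrently.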