Golang: consuming data in reactive microservices

seb7887

Sebastian Segura

Posted on August 11, 2021

Golang: consuming data in reactive microservices

Introduction

In this article we are going to discuss how to use Golang to show a simple strategy to implement a data ingestion service in a reactive microservices architecture.

Why Microservices?

A microservices architecture is based on the single responsible principle, which states "gather together those things that change for the same reason, and separate those things that change for different reasons", and extends it to the loosely coupled services which can be developed, deployed, and maintained independently. Each of these services is responsible for discrete task and can communicate with other services in different ways to solve a larger complex business problem.

Reactive Microservices

Reactive systems are based on a message driven architecture. Asynchronous non-blocking messaging allows us to decouple Reactive microservices in time and failure. It means that microservices are not dependent on the response from each other. If a request to a microservice fails, the failure won't propagate, so the client service can continue to operate waiting for the response.

Why Golang to build Microservices?

You can build microservices practically in any language, after all, microservices are a concept rather than a specific framework or tool. That being said, some languages are better suited and, or have better support for microservices than others. One of them is Golang.

Golang is very light-weight, very fast, and has a fantastic support for concurrency, which is a powerful capability when running across several machines and cores.

A data ingestion example

With all this said, we can think about a simple example of a Golang data ingestion service as a reactive microservice example. We are going to use RabbitMQ as a message broker. Our service is going to consume every new message from the queue and perform operations on it.

We can start with a base RabbitMQ setup on which we will work.

// consumer.go
package consumer

import (
  log "github.com/sirupsen/logrus"
  "github.com/streadway/amqp"
)

const (
  AMQP_URL = "<YOUR_RABBITMQ_URL>"
)

type Consumer interface {
  InitConsumer() error
}

type consumer struct {}

func New() Consumer {
  return &consumer{}
}

func (c *consumer) InitConsumer() error {
  // conn
    conn, err := amqp.Dial(AMQP_URL)
    if err != nil {
        log.Fatalf("ERROR: fail init RabbitMQ consumer %s", err.Error())
        return err
    }

    // create channel
    ch, err := conn.Channel()
    if err != nil {
        log.Fatalf("ERROR: fail to create a channel %s", err.Error())
        return err
    }

    // create queue
    queue, err := ch.QueueDeclare(
        consumerCh, // channel name
        true,       // durable
        false,      // delete when unused
        false,      // exclusive
        false,      // no-wait
        nil,        // arguments
    )
    if err != nil {
        log.Fatalf("ERROR: fail to create a queue %s", err.Error())
        return err
    }

    // channel
    msgChannel, err := ch.Consume(
        queue.Name, // queue
        "",         // consumer
        false,      // auto-ack
        false,      // exclusive
        false,      // no-local
        false,      // no-wait
        nil,        // arguments
    )
    if err != nil {
        log.Fatalf("ERROR: fail to create a message channel %s", err.Error())
        return err
    }

  // TODO: consume messages from the queue
}
Enter fullscreen mode Exit fullscreen mode

Producer-Consumer pattern

A Golang channel is a typed value that allows goroutines to synchronize and exchange information.

In our previous example, we declared a message channel to get the data from RabbitMQ. We are going to use this channel to implement the consumer part of the producer-consumer pattern.

The producer-consumer pattern is a pattern in which one or more consumers carry out a job created by a producer. The producer and consumers exchange jobs by sharing one or more channels.

// consumer.go
package consumer

import (
  log "github.com/sirupsen/logrus"
  "github.com/streadway/amqp"
)

const (
  AMQP_URL = "<YOUR_RABBITMQ_URL>"
)

type Consumer interface {
  InitConsumer() error
}

type consumer struct {}

func New() Consumer {
  return &consumer{}
}

func (c *consumer) InitConsumer() error {
  // conn
    conn, err := amqp.Dial(AMQP_URL)
    if err != nil {
        log.Fatalf("ERROR: fail init RabbitMQ consumer %s", err.Error())
        return err
    }

    // create channel
    ch, err := conn.Channel()
    if err != nil {
        log.Fatalf("ERROR: fail to create a channel %s", err.Error())
        return err
    }

    // create queue
    queue, err := ch.QueueDeclare(
        consumerCh, // channel name
        true,       // durable
        false,      // delete when unused
        false,      // exclusive
        false,      // no-wait
        nil,        // arguments
    )
    if err != nil {
        log.Fatalf("ERROR: fail to create a queue %s", err.Error())
        return err
    }

    // channel
    msgChannel, err := ch.Consume(
        queue.Name, // queue
        "",         // consumer
        false,      // auto-ack
        false,      // exclusive
        false,      // no-local
        false,      // no-wait
        nil,        // arguments
    )
    if err != nil {
        log.Fatalf("ERROR: fail to create a message channel %s", err.Error())
        return err
    }

  // consume messages from the queue
  for {
    select {
    case msg := <-msgChannel:
      log.Debugf("received msg: %s", msg.Body)

        // parse message
            var parsedMsg m.Msg
            err = json.Unmarshal(msg.Body, &parsedMsg)
            if err != nil {
                log.Errorf("fail to parse message %s", err.Error())
            }

            // ack for message
            err = msg.Ack(true)
            if err != nil {
                log.Errorf("fail to ack: %s", err.Error())
                return err
            }

            // TODO: handle message
    }
  }
}
Enter fullscreen mode Exit fullscreen mode

Worker pool pattern

In Golang we can achieve concurrency by using goroutines. Goroutines are independently executing functions in the same address space.

So, what is a worker pool?

Worker pool is a pattern to achieve concurrency using a fixed number of workers to execute multiple tasks on a queue. We use goroutines to spawn a worker and implement the queue using a channel. The defined number of workers will pull a task from the queue, and finish up it. When the task has been done, the worker will keep pulling the new one until the queue is empty.

We are going to implement our worker pool in a separate package.

// pool.go
package pool

import (
  "context"
  "github.com/segmentio/fasthash/fnv1a"
)

type WorkerPool struct {
    maxWorkers  int
    taskQueue   []chan func()
    stoppedChan chan struct{}
    ctx         context.Context
    cancel      context.CancelFunc
}

func New(maxWorkers int) *WorkerPool {
    // there must be at least one worker
    if maxWorkers < 1 {
        maxWorkers = 1
    }

    ctx, cancel := context.WithCancel(context.Background())

    // taskQueue is unbuffered since items are always removed immediately
    pool := &WorkerPool{
        taskQueue:   make([]chan func(), maxWorkers),
        maxWorkers:  maxWorkers,
        stoppedChan: make(chan struct{}),
        ctx:         ctx,
        cancel:      cancel,
    }

    // start task dispatcher
    pool.dispatch()

    return pool
}

func (p *WorkerPool) Stop() {
    p.cancel()
}

func (p *WorkerPool) Submit(uid string, task func()) {
    idx := fnv1a.HashString64(uid) % uint64(p.maxWorkers)
    if task != nil {
        p.taskQueue[idx] <- task
    }
}

func (p *WorkerPool) dispatch() {
    for i := 0; i < p.maxWorkers; i++ {
        p.taskQueue[i] = make(chan func())
        go startWorker(p.taskQueue[i], p.ctx)
    }
}

func startWorker(taskChan chan func(), ctx context.Context) {
    go func() {
        var task func()
        for {
            select {
            case task = <-taskChan:
                // execute the task
                task()
            case <-ctx.Done():
                return
            }
        }
    }()
}
Enter fullscreen mode Exit fullscreen mode

Finally, we will modify our consumer package to use the worker pool to handle each message received.

// consumer.go
package consumer

import (
  "github.com/seb7887/consumer/internal/pool"
  log "github.com/sirupsen/logrus"
  "github.com/streadway/amqp"
)

const (
  AMQP_URL = "<YOUR_RABBITMQ_URL>"
  NUM_OF_WORKERS = 4096
)

type Consumer interface {
  InitConsumer() error
  SubmitWork(*m.Msg)
}

type consumer struct {
  wpool  *pool.WorkerPool
}

func New() Consumer {
  return &consumer{
    wpool: pool.New(NUM_OF_WORKERS),
  }
}

func (c *consumer) InitConsumer() error {
  // conn
    conn, err := amqp.Dial(AMQP_URL)
    if err != nil {
        log.Fatalf("ERROR: fail init RabbitMQ consumer %s", err.Error())
        return err
    }

    // create channel
    ch, err := conn.Channel()
    if err != nil {
        log.Fatalf("ERROR: fail to create a channel %s", err.Error())
        return err
    }

    // create queue
    queue, err := ch.QueueDeclare(
        consumerCh, // channel name
        true,       // durable
        false,      // delete when unused
        false,      // exclusive
        false,      // no-wait
        nil,        // arguments
    )
    if err != nil {
        log.Fatalf("ERROR: fail to create a queue %s", err.Error())
        return err
    }

    // channel
    msgChannel, err := ch.Consume(
        queue.Name, // queue
        "",         // consumer
        false,      // auto-ack
        false,      // exclusive
        false,      // no-local
        false,      // no-wait
        nil,        // arguments
    )
    if err != nil {
        log.Fatalf("ERROR: fail to create a message channel %s", err.Error())
        return err
    }

  // consume messages from the queue
  for {
    select {
    case msg := <-msgChannel:
      log.Debugf("received msg: %s", msg.Body)

        // parse message
            var parsedMsg m.Msg
            err = json.Unmarshal(msg.Body, &parsedMsg)
            if err != nil {
                log.Errorf("fail to parse message %s", err.Error())
            }

            // ack for message
            err = msg.Ack(true)
            if err != nil {
                log.Errorf("fail to ack: %s", err.Error())
                return err
            }

            // handle message
      c.SubmitWork(&parsedMessage)
    }
  }
}

func (c *consumer) SubmitWork(msg *m.Msg) {
    if c.wpool != nil {
        c.wpool = pool.New(config.GetConfig().NumOfWorkers)
    }

    uuidWithHypen := uuid.New()
    uuid := strings.Replace(uuidWithHypen.String(), "-", "", -1)

    c.wpool.Submit(uuid, func() {
        // Perform some operation...
    })
}
Enter fullscreen mode Exit fullscreen mode

Conclusion

It's easy to implement a data ingestion service using Golang and its basic units, like channels and goroutines.

💖 💪 🙅 🚩
seb7887
Sebastian Segura

Posted on August 11, 2021

Join Our Newsletter. No Spam, Only the good stuff.

Sign up to receive the latest update from our blog.

Related