Processamento assíncrono utilizando Go e RabbitMQ. Parte 2

odilonjk

Odilon Jonathan Kröger

Posted on June 28, 2020

Processamento assíncrono utilizando Go e RabbitMQ. Parte 2

Foto de capa: Mika Baumeister no site Unsplash.


Chegou a hora de ver código!!!

O código fonte se encontra em um monorepo.

Dentro do repositório você pode encontrar o Dockerfile e também o Makefile para conseguir buildar e rodar o projeto. As informações necessárias de como rodar se encontram no README.

Apenas para deixar claro, a intensão deste projeto é apenas exemplificar uma forma de processamento assíncrono. O código não foi feito pensando na melhor forma de reutilização de código, nem considera as configurações pensando em ambiente produtivo.
A ideia é passar o conceito (publicação anterior) e um exemplo de como começar a brincar com processamento assíncrono em sua casa! :)


Exemplo de microsserviço Estoque

Nesta publicação vou abordar a explicação sobre o Estoque.
Para facilitar o entendimento, sugiro deixar aberto em uma aba ao lado ou em seu editor o código fonte que está aqui.

O exemplo está dividido de forma bastante simples. Um arquivo main.go na raiz e outro pacote rabbit.go dentro do diretório pkg.

Pacote rabbit.go 🐰

Este pacote é bastante simples. Nele fica a implementação que faz comunicação com a API do RabbitMQ.
A implementação é baseada nos exemplos contidos na documentação oficial do RabbitMQ.

func NewDefaultChannel() *amqp.Channel {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    ch, err := conn.Channel()
    failOnError(err, "Failed to open channel")
    return ch
}
Enter fullscreen mode Exit fullscreen mode

Esta função inicia uma conexão com o RabbitMQ, utilizando o usuário guest, no host/port localhost:5272.
É feita a validação se retornou algum erro, em seguida é aberto um canal de comunicação com o RabbitMQ, chamando conn.Channel().
Após nova verificação para ver se houve falha, é retornado o canal de comunicação com o RabbitMQ.

As próximas funções são bastante simples. Elas são utilizadas no arquivo main.go para declarar exchange e queue.
Elas recebem por parâmetro informações como nome, tipo e argumentos de configuração.

func DeclareExchange(ch *amqp.Channel, n, t string, args amqp.Table) {
    err := ch.ExchangeDeclare(
        n,
        t,
        true,
        false,
        false,
        false,
        args,
    )
    failOnError(err, "Failed to declare exchange "+n)
}

func DeclareQueue(ch *amqp.Channel, n string, args amqp.Table) {
    _, err := ch.QueueDeclare(
        n,
        true, false,
        false,
        false,
        args,
    )
    failOnError(err, "Failed to declare queue "+n)
}
Enter fullscreen mode Exit fullscreen mode

A próxima função é utilizada quando queremos criar a ligação entre uma exchange e uma queue.

func BindQueue(ch *amqp.Channel, e, q, k string) {
    err := ch.QueueBind(
        q,
        k,
        e,
        false,
        nil)
    failOnError(err, "Failed to bind "+e+" with "+q)
}
Enter fullscreen mode Exit fullscreen mode

Nesta função recebemos o nome da exchange, o nome da queue a uma chave de roteamento k.
Essa chave de roteamento é que faz a exhange saber direcionar uma mensagem para a queue correta.
O RabbitMQ permite configurar de diversas formas essa ligação entre exhanges, queues e routingKeys.
Mais detalhes você pode encontrar na documentação de roteamento do RabbitMQ.

func Publish(ch *amqp.Channel, m []byte, e, k string) {
    err := ch.Publish(
        e,
        k,
        false,
        false,
        amqp.Publishing{
            ContentType: "application/json",
            Body:        m,
        })
    failOnError(err, "Failed to publish on exchange "+e)
}
Enter fullscreen mode Exit fullscreen mode

Finalmente temos a função que envia uma mensagem para o RabbitMQ.
O parâmetro m é uma mensagem no formato JSON, e é o nome da exchange que receberá a mensagem e k é a chave de roteamento.

Agora vamos ver o arquivo main.go

A primeira coisa que vamos olhar é a declaração de constantes.

const (
    productionOrderExchange = "productionorder"
    deadLetterExchange      = "productionorder.dlx"
    createQueue             = "create.productionorder"
    deadLetterQueue         = "create.productionorder.dlx"
    routingKey              = "create"
)
Enter fullscreen mode Exit fullscreen mode

Aqui temos declarados os nomes de todas exchanges, queues e routingKey utilizados no microsserviço.
Para deixar mais fácil de encotrar as coisas no Management do RabbitMQ, deixei dlx como sufixo para a exchange e queue de dead-letter.

type message struct {
    RefCode   uuid.UUID
    ColorCode uuid.UUID
    Quantity  int
}
Enter fullscreen mode Exit fullscreen mode

Esta é a estrutura da mensagem enviada para o RabbitMQ. Temos um RefCode que é o código do pedido.
O atributo ColorCode é utilizado para identificação da cor que deve ser produzida e quantity é simplesmente a quantidade de esmaltes que deve ser produzida.

Entendendo a função main()

Antes de mais nada estou criando um canal de comunicação com o RabbitMQ, fazendo uso do pacote declarado em rabbit.go.

    ch := rabbit.NewDefaultChannel()
    defer ch.Close()
Enter fullscreen mode Exit fullscreen mode

Em seguida eu trato da declaração das exchanges.

    exchangeArgs := make(amqp.Table)
    exchangeArgs["x-delayed-type"] = "direct"

    rabbit.DeclareExchange(ch, productionOrderExchange, "x-delayed-message", exchangeArgs)
    rabbit.DeclareExchange(ch, deadLetterExchange, "direct", nil)
Enter fullscreen mode Exit fullscreen mode

Eu crio um mapa de argumentos em make(amqp.Table).
Para conseguir utilizar o plugin Delayed Message, é preciso adicionar o argumento x-delayed-type com o valor direct.
Seria basicamente como criar uma exchange do tipo direct, mas que saiba trabalhar com delay nas mensagens.

Depois eu declaro as duas exchanges necessárias para esse projeto. Uma contendo os argumentos para trabalhar com delay, outra para tratar as dead-letters.
Na deadLetterExchange eu não adiciono a configuração de delay, porque não quero esse comportamento sendo executado para a DLQ.

    queueArgs := make(amqp.Table)
    queueArgs["x-max-length"] = 3
    queueArgs["x-dead-letter-exchange"] = deadLetterExchange

    rabbit.DeclareQueue(ch, createQueue, queueArgs)
    rabbit.DeclareQueue(ch, deadLetterQueue, nil)
Enter fullscreen mode Exit fullscreen mode

Agora eu crio os argumentos de configuração para fila. O argumento x-max-length configura qual o tamanho da fila. Nesse exemplo, caso a fila já tenha 3 mensagens aguardando processamento, novas mensagens recebidas são automaticamente direcionadas para a DLQ.
O argumento x-dead-letter-exchange é que informa qual é a exchange responsável pela DLQ.

Em seguida, eu faço a declaração das duas queues: a normal, com os argumentos de configuração, e a deadLetterQueue, que é a DLQ do projeto.

    rabbit.BindQueue(ch, productionOrderExchange, createQueue, routingKey)
    rabbit.BindQueue(ch, deadLetterExchange, deadLetterQueue, routingKey)
Enter fullscreen mode Exit fullscreen mode

Este é o código responsável por criar a ligação entre as exchanges, queues e routingKey.

E finalmente, vamos para a publicação da mensagem.

    m := createMsgAsJSON()
    rabbit.Publish(ch, m, productionOrderExchange, routingKey)
Enter fullscreen mode Exit fullscreen mode

Eu crio uma mensagem com a função createMsgAsJSON(), que cria dados aleatórios e retorna a estrutura message no formato JSON.
Essa mensagem então é enviada para o RabbitMQ. A função rabbit.Publish recebe a mensagem, a exchange que deve tratar a mensagem e também a routingKey.
Aqui neste exemplo estamos apenas tratando da criação de pedidos, mas poderíamos ter outras chaves de roteamento, como delete e update. Assim como o caso das exchanges e queues, o nome das chaves de roteamento é você que escolhe conforme sua necessidade. :)


Conclusão

A solução ficou maior do que utilizar uma simples requisição HTTP, mas acaba não sendo um bicho de sete cabeças.
Poder trabalhar com o Estoque, sem sofrer impacto negativo no caso da OrdemDeProducao estar fora do ar, é um grande ganho!

Ao rodar o Estoque, é possível ver na prática que podemos disparar N mensagens sem nos preocuparmos com a OrdemDeProducao.

Na próxima publicação da série vou comentar sobre a implementação da OrdemDeProducao. Até lá! o/

💖 💪 🙅 🚩
odilonjk
Odilon Jonathan Kröger

Posted on June 28, 2020

Join Our Newsletter. No Spam, Only the good stuff.

Sign up to receive the latest update from our blog.

Related