Processamento assíncrono utilizando Go e RabbitMQ. Parte 2
Odilon Jonathan Kröger
Posted on June 28, 2020
Foto de capa: Mika Baumeister no site Unsplash.
Chegou a hora de ver código!!!
O código fonte se encontra em um monorepo.
Dentro do repositório você pode encontrar o Dockerfile e também o Makefile para conseguir buildar e rodar o projeto. As informações necessárias de como rodar se encontram no README.
Apenas para deixar claro, a intensão deste projeto é apenas exemplificar uma forma de processamento assíncrono. O código não foi feito pensando na melhor forma de reutilização de código, nem considera as configurações pensando em ambiente produtivo.
A ideia é passar o conceito (publicação anterior) e um exemplo de como começar a brincar com processamento assíncrono em sua casa! :)
Exemplo de microsserviço Estoque
Nesta publicação vou abordar a explicação sobre o Estoque.
Para facilitar o entendimento, sugiro deixar aberto em uma aba ao lado ou em seu editor o código fonte que está aqui.
O exemplo está dividido de forma bastante simples. Um arquivo main.go
na raiz e outro pacote rabbit.go
dentro do diretório pkg
.
Pacote rabbit.go 🐰
Este pacote é bastante simples. Nele fica a implementação que faz comunicação com a API do RabbitMQ.
A implementação é baseada nos exemplos contidos na documentação oficial do RabbitMQ.
func NewDefaultChannel() *amqp.Channel {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
ch, err := conn.Channel()
failOnError(err, "Failed to open channel")
return ch
}
Esta função inicia uma conexão com o RabbitMQ, utilizando o usuário guest
, no host/port localhost:5272
.
É feita a validação se retornou algum erro, em seguida é aberto um canal de comunicação com o RabbitMQ, chamando conn.Channel()
.
Após nova verificação para ver se houve falha, é retornado o canal de comunicação com o RabbitMQ.
As próximas funções são bastante simples. Elas são utilizadas no arquivo main.go
para declarar exchange
e queue
.
Elas recebem por parâmetro informações como nome, tipo e argumentos de configuração.
func DeclareExchange(ch *amqp.Channel, n, t string, args amqp.Table) {
err := ch.ExchangeDeclare(
n,
t,
true,
false,
false,
false,
args,
)
failOnError(err, "Failed to declare exchange "+n)
}
func DeclareQueue(ch *amqp.Channel, n string, args amqp.Table) {
_, err := ch.QueueDeclare(
n,
true, false,
false,
false,
args,
)
failOnError(err, "Failed to declare queue "+n)
}
A próxima função é utilizada quando queremos criar a ligação entre uma exchange
e uma queue
.
func BindQueue(ch *amqp.Channel, e, q, k string) {
err := ch.QueueBind(
q,
k,
e,
false,
nil)
failOnError(err, "Failed to bind "+e+" with "+q)
}
Nesta função recebemos o nome da exchange
, o nome da queue
a uma chave de roteamento k
.
Essa chave de roteamento é que faz a exhange
saber direcionar uma mensagem para a queue
correta.
O RabbitMQ permite configurar de diversas formas essa ligação entre exhanges
, queues
e routingKeys
.
Mais detalhes você pode encontrar na documentação de roteamento do RabbitMQ.
func Publish(ch *amqp.Channel, m []byte, e, k string) {
err := ch.Publish(
e,
k,
false,
false,
amqp.Publishing{
ContentType: "application/json",
Body: m,
})
failOnError(err, "Failed to publish on exchange "+e)
}
Finalmente temos a função que envia uma mensagem para o RabbitMQ.
O parâmetro m
é uma mensagem no formato JSON, e
é o nome da exchange
que receberá a mensagem e k
é a chave de roteamento.
Agora vamos ver o arquivo main.go
A primeira coisa que vamos olhar é a declaração de constantes.
const (
productionOrderExchange = "productionorder"
deadLetterExchange = "productionorder.dlx"
createQueue = "create.productionorder"
deadLetterQueue = "create.productionorder.dlx"
routingKey = "create"
)
Aqui temos declarados os nomes de todas exchanges
, queues
e routingKey
utilizados no microsserviço.
Para deixar mais fácil de encotrar as coisas no Management do RabbitMQ, deixei dlx
como sufixo para a exchange
e queue
de dead-letter.
type message struct {
RefCode uuid.UUID
ColorCode uuid.UUID
Quantity int
}
Esta é a estrutura da mensagem enviada para o RabbitMQ. Temos um RefCode
que é o código do pedido.
O atributo ColorCode
é utilizado para identificação da cor que deve ser produzida e quantity
é simplesmente a quantidade de esmaltes que deve ser produzida.
Entendendo a função main()
Antes de mais nada estou criando um canal de comunicação com o RabbitMQ, fazendo uso do pacote declarado em rabbit.go
.
ch := rabbit.NewDefaultChannel()
defer ch.Close()
Em seguida eu trato da declaração das exchanges
.
exchangeArgs := make(amqp.Table)
exchangeArgs["x-delayed-type"] = "direct"
rabbit.DeclareExchange(ch, productionOrderExchange, "x-delayed-message", exchangeArgs)
rabbit.DeclareExchange(ch, deadLetterExchange, "direct", nil)
Eu crio um mapa de argumentos em make(amqp.Table)
.
Para conseguir utilizar o plugin Delayed Message, é preciso adicionar o argumento x-delayed-type
com o valor direct
.
Seria basicamente como criar uma exchange
do tipo direct
, mas que saiba trabalhar com delay nas mensagens.
Depois eu declaro as duas exchanges
necessárias para esse projeto. Uma contendo os argumentos para trabalhar com delay, outra para tratar as dead-letters.
Na deadLetterExchange
eu não adiciono a configuração de delay, porque não quero esse comportamento sendo executado para a DLQ.
queueArgs := make(amqp.Table)
queueArgs["x-max-length"] = 3
queueArgs["x-dead-letter-exchange"] = deadLetterExchange
rabbit.DeclareQueue(ch, createQueue, queueArgs)
rabbit.DeclareQueue(ch, deadLetterQueue, nil)
Agora eu crio os argumentos de configuração para fila. O argumento x-max-length
configura qual o tamanho da fila. Nesse exemplo, caso a fila já tenha 3 mensagens aguardando processamento, novas mensagens recebidas são automaticamente direcionadas para a DLQ.
O argumento x-dead-letter-exchange
é que informa qual é a exchange
responsável pela DLQ.
Em seguida, eu faço a declaração das duas queues
: a normal, com os argumentos de configuração, e a deadLetterQueue
, que é a DLQ do projeto.
rabbit.BindQueue(ch, productionOrderExchange, createQueue, routingKey)
rabbit.BindQueue(ch, deadLetterExchange, deadLetterQueue, routingKey)
Este é o código responsável por criar a ligação entre as exchanges
, queues
e routingKey
.
E finalmente, vamos para a publicação da mensagem.
m := createMsgAsJSON()
rabbit.Publish(ch, m, productionOrderExchange, routingKey)
Eu crio uma mensagem com a função createMsgAsJSON()
, que cria dados aleatórios e retorna a estrutura message
no formato JSON.
Essa mensagem então é enviada para o RabbitMQ. A função rabbit.Publish
recebe a mensagem, a exchange
que deve tratar a mensagem e também a routingKey
.
Aqui neste exemplo estamos apenas tratando da criação de pedidos, mas poderíamos ter outras chaves de roteamento, como delete
e update
. Assim como o caso das exchanges
e queues
, o nome das chaves de roteamento é você que escolhe conforme sua necessidade. :)
Conclusão
A solução ficou maior do que utilizar uma simples requisição HTTP, mas acaba não sendo um bicho de sete cabeças.
Poder trabalhar com o Estoque
, sem sofrer impacto negativo no caso da OrdemDeProducao
estar fora do ar, é um grande ganho!
Ao rodar o Estoque
, é possível ver na prática que podemos disparar N mensagens sem nos preocuparmos com a OrdemDeProducao
.
Na próxima publicação da série vou comentar sobre a implementação da OrdemDeProducao
. Até lá! o/
Posted on June 28, 2020
Join Our Newsletter. No Spam, Only the good stuff.
Sign up to receive the latest update from our blog.