Usando mensageria com Amazon SQS + Node

antiduhring

Mateus Vinícius

Posted on May 11, 2023

Usando mensageria com Amazon SQS + Node

O que é um serviço de mensageria

O serviço de mensageria é uma forma de trocar informações entre diferentes partes de uma aplicação ou sistema de forma organizada e eficiente. Ele ajuda a separar as partes da aplicação e facilita a sua manutenção e desenvolvimento.

Imaginando uma aplicação de e-commerce em que o serviço de compra de produtos fica separado do serviço que processa o pagamento e efetivamente realiza a transação entre as contas bancárias, a forma mais segura e efetiva de realizar a comunicação entre eles é via mensageria.

Com mensageria, o serviço de compra de produtos vai enviar uma mensagem para uma fila com as informações de transação, enquanto o serviço de pagamento vai buscar as mensagens nesta fila e realizar as transações. Dessa forma é possível garantir que, mesmo que o serviço de compra de produtos caia, ou esteja passando por uma demanda gigantesca que possa comprometer o processamento de novas requisições, os pagamentos ainda assim serão processados de forma correta. Isso também garante que, mesmo havendo milhares de pagamentos na fila para serem processados, o usuário não precisará aguardar a transação ser realizada, já que uma vez enviado o pagamento para fila, é garantido que ele será consumido pelo serviço de processamento do pagamento, independemente do tempo que demore.

Objetivo neste tutorial

Neste tutorial iremos criar um exemplo básico de um serviço de e-commerce (producer) que se comunica com um serviço de pagamento (consumer), e toda a comunicação será feita via mensageria.

Setup

Como o objetivo é entender o funcionamento de um serviço de mensageria utilizando Amazon SQS, não iremos nos aprofundar tanto na criação dos serviços de e-commerce e pagamento. Em vez disso, vamos nos concentrar na comunicação entre os dois usando como base um projeto hospedado no repositório: https://github.com/anti-duhring/amazon-sqs-tutorial

Abra o terminal e digite:

git clone https://github.com/anti-duhring/amazon-sqs-tutorial.git
Enter fullscreen mode Exit fullscreen mode

Após isso, instale as dependências da aplicação com o comando:

npm install
Enter fullscreen mode Exit fullscreen mode

Executando a aplicação

Iremos executar ambos os serviços: producer e consumer.

Vá até o root da pasta local do repositório que você acabou de clonar e abra dois terminais.

Em um deles, digite o comando:

npm run start:producer
Enter fullscreen mode Exit fullscreen mode

No outro, digite o comando:

npm run start:consumer
Enter fullscreen mode Exit fullscreen mode

Entendendo o funcionamento

A aplicação consiste em dois serviços: producer e consumer.

  • Producer: Uma simulação de um e-commerce, que possui uma rota /request-transfer, cujo objetivo é enviar ao serviço de pagamento (consumer) uma requisição com o número da conta que fará o pagamento, o número da conta que receberá o pagamento e a quantia de dinheiro a ser transferido.
  • Consumer: Um exemplo de um serviço de pagamento que recebe as informações por meio da rota /process-transfer e realiza a transação.

Por ora a comunicação entre os dois serviços está sendo feita de forma direta, ou seja, dentro da própria rota /request-transfer o producer recebe as informações do pagamento e já dispara uma requisição para a rota /process-transfer do consumer, que realiza a transação.

Agora, vamos adaptar o projeto para que o producer não precise mais se comunicar diretamente com o consumer. Em vez disso, o producer será responsável apenas por enviar os pedidos de transação para uma fila, e o consumer será responsável apenas por buscar esses pedidos constantemente e processá-los.

Criando nossa fila principal

Vá até o serviço Amazon SQS no seu console da AWS.

Pesquisando por SQS na barra de pesquisa do console da AWS

Após acessar a página do Amazon SQS, clique no botão "Create queue" (criar fila).

Clicando no botão "Create queue" na página do Amazon SQS da AWS

Dentro da página de criação da fila você verá uma série de opções para configuração. Entre elas, destaco aqui o type (tipo) da fila:

Escolhendo o tipo da fila dentro da página de criação da fila

O tipo da fila, pode ser standard(comum) ou FIFO (First In First Out). Filas comuns não possuem ordem nas mensagens, enquanto as FIFO ordena a fila em forma de pilha, onde a primeira mensagem que foi criada será a primeira a ser encontrada ao buscar mensagens dessa fila.

Para os propósitos deste tutorial, vamos usar as seguintes configurações que já são atribuídas por padrão pelo próprio SQS:

Configurações da fila do Amazon SQS

Clique no link Info ao lado da configuração para entender melhor o que cada propriedade representa.

Após ter criado a fila copie o URL gerado no final.

Criando o service de comunicação com o Amazon SQS

Para facilitar a comunicação da nossa aplicação com o Amazon SQS, vamos criar um service, dentro da pasta src/service, que terá três funções principais: enviar uma mensagem para a fila, recuperar mensagens da fila e deletar mensagens da fila.

Esse service será útil para desacoplar o SQS da nossa aplicação, permitindo que ele seja facilmente adaptado para funcionar com outros serviços de fila também.

Criando a classe SqsService

Vamos começar com uma classe com apenas dois atributos, ambos privados, client e queueUrl.

export class SqsService {
    #client
    #queueUrl

    constructor() {
    }

}
Enter fullscreen mode Exit fullscreen mode

A ideia é que client seja uma instância do SQSClient do pacote @aws-sdk/client-sqs, que serve pra estabelecer uma conexão com a nossa fila. Enquanto que queueUrl será o URL da fila copiado anteriormente, que deverá ser passado via constructor, assim vamos poder usar o service para outras filas, se for preciso.

Adicionando o client do SQS no constructor

Agora precisamos adicionar o SQSClient no constructor, e atribuí-lo como valor da propriedade client.

import { SQSClient } from "@aws-sdk/client-sqs";
import * as dotenv from 'dotenv';

dotenv.config()

export class SqsService {
    #client
    #queueUrl

    constructor(queue) {
        this.#client = new SQSClient({
            region: process.env.AWS_REGION,
            credentials: {
                accessKeyId: process.env.AWS_ACCESS_KEY_ID,
                secretAccessKey: process.env.AWS_ACESS_KEY
            }
        })

        this.#queueUrl = queue
    }

}
Enter fullscreen mode Exit fullscreen mode

Perceba que, para estabelecer uma conexão, o client requere algumas informações, que são parte da interface SQSClientConfig: region, accessKeyId e secretAccessKey. Essas informações são referentes a sua conta da AWS e não devem ser compartilhadas, por isso iremos usar um arquivo .env pra criar variáveis de ambiente.

Basta criar um arquivo .env no root do projeto com as seguintes variáveis:

AWS_REGION=REGIÃO (ex: us-east-1)
AWS_ACCESS_KEY_ID=ID DO USUÁRIO
AWS_ACESS_KEY=CHAVE DE ACESSO
AWS_MAIN_QUEUE_URL=URL DA SUA FILA
Enter fullscreen mode Exit fullscreen mode

Adicionando método para enviar mensagens para fila

Agora precisamos criar um método que irá enviar uma mensagem para a nossa fila usando o client.

O método terá como parâmetro um objeto com duas propriedades: body e attributes. O body será o texto que ficará no corpo da mensagem, enquanto attributes é um valor opcional que fornece mais informações sobre a mensagem.

Aqui está um exemplo do objeto, mas você pode entender melhor suas propriedades na documentação oficial da AWS:

{
    body: 'MENSAGEM QUE IRÁ NO CORPO',
    attributes: {
        'VALOR_OPCIONAL1': {
            DataType: 'String',
            StringValue: 'VALOR DO ATRIBUTO'
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

Nosso método de enviar mensagem será assim:

import { SQSClient } from "@aws-sdk/client-sqs";
import * as dotenv from 'dotenv';

dotenv.config()

export class SqsService {
    #client
    #queueUrl

    constructor(queue) {
        this.#client = new SQSClient({
            region: process.env.AWS_REGION,
            credentials: {
                accessKeyId: process.env.AWS_ACCESS_KEY_ID,
                secretAccessKey: process.env.AWS_ACESS_KEY
            }
        })

        this.#queueUrl = queue
    }

    async sendMessage({ body, attributes }) {
        try {

            const sendMessageCommand = new SendMessageCommand({
                QueueUrl: this.#queueUrl,
                MessageBody: body,
                MessageAttributes: attributes,
                DelaySeconds: 0
            })

            const response = await this.#client.send(sendMessageCommand)


            return response
        } catch(err) {
            throw err
        }
    }

}
Enter fullscreen mode Exit fullscreen mode

Agora podemos enviar mensagens para a fila com o nosso service!

Adicionando método para buscar mensagens na fila

Além de enviar mensagens, precisamos também de um método para buscar mensagens na fila e retornar uma array delas:

async receiveMessages() {
    try {
        const receiveMessageCommand = new ReceiveMessageCommand({
            QueueUrl: this.#queueUrl,
            MaxNumberOfMessages: 10,
            WaitTimeSeconds: 20,
            MessageAttributeNames: ['All']
        })
        const response = await this.#client.send(receiveMessageCommand)
        const messages = response?.Messages?.length? response.Messages : []

        return messages
    } catch(err) { 
        throw err
    }
}
Enter fullscreen mode Exit fullscreen mode

Entendendo as propriedades:

  • QueueUrl: A URL da nossa fila.
  • MaxNumberOfMessages: O número máximo de mensagens a serem retornadas. O Amazon SQS nunca retorna mais mensagens do que esse valor (no entanto, menos mensagens podem ser retornadas).
  • WaitTimeSeconds: A duração (em segundos) pela qual a chamada aguarda a chegada de uma mensagem na fila antes de retornar.
  • MessageAttributeNames: Atributos que serão recebidos.

Para saber mais sobre as propriedades consulte a documentação oficial sobre a interface ReceiveMessageCommandInput da AWS.

Adicionando método de deletar mensagem da fila

Após a mensagem ser recebida e a transação ser concluída pelo consumer, é importante excluir a mensagem para evitar processamento duplicado.

Para isso, vamos criar um método em nosso service que receberá um parâmetro chamado ReceiptHandle, que é um identificador associado ao recebimento de uma mensagem. Um novo identificador de recebimento é gerado cada vez que uma mensagem é recebida. Ao excluir uma mensagem, é necessário fornecer o ReceiptHandle mais recente associado a ela para garantir que a mensagem correta seja excluída.

async deleteMessage(receiptHandle) {
    try {
        const deleteMessageCommand = new DeleteMessageCommand({
            QueueUrl: this.#queueUrl,
            ReceiptHandle: receiptHandle
        })

        await this.#client.send(deleteMessageCommand)
    } catch(err) {
        throw err
    }
}
Enter fullscreen mode Exit fullscreen mode

Entendendo as propriedades:

  • QueueUrl: link da nossa fila.
  • ReceiptHandle: Identificador da nossa mensagem.

Para saber mais sobre as propriedades consulte a documentação oficial sobre a interface DeleteMessageCommandInput da AWS.

Usando o service de comunicação com o Amazon SQS no producer

Agora que criamos o service de comunicação com o Amazon SQS, podemos finalmente modificar a nossa rota /request-transfer do producer. Em vez de disparar uma requisição diretamente para a rota /process-transfer do consumer, o producer enviará uma mensagem com as informações do pagamento para a nossa fila, para que o consumer possa recebê-la e consumi-la no momento adequado.

Nossa rota /request-transfer terá, agora, o seguinte conteúdo:

Primeiro criamos uma nova instância do SqsService e, após isso, chamamos o método sendMessage(), passando como body as informações da nossa transação. Como o body precisa ser uma string, o método JSON.stringify() é chamado para converter nosso objeto com as informações da transação em texto.

app.post('/transfer', async(req, res) => {
    const { 
        fromAccount,
        toAccount,
        amount
    } = req.body;

    try {
        const sqsService = new SqsService(process.env.AWS_MAIN_QUEUE_URL)
        await sqsService.sendMessage({
            body: JSON.stringify({
                fromAccount,
                toAccount,
                amount
            })
        })

        res.status(200).json({ message: 'Sua transação será processada' })
    } catch(err) {
        console.log(err)
        res.status(500).json({ message: 'Erro interno' })
    }
})
Enter fullscreen mode Exit fullscreen mode

Consumindo as mensagens da fila no consumer

Agora que o producer envia as mensagens para a fila e não se conecta mais diretamente ao consumer, precisamos criar um worker que busque constantemente por transações em nossa fila e as processe. Para isso, usaremos uma biblioteca bem conhecida do Node.js para agendamento de tarefas, chamada node-cron. Com ela, poderemos atribuir uma tarefa a ser executada repetidamente pelo consumer. Essa tarefa será buscar novos pedidos de transações na fila e, no nosso caso, para facilitar a visualização do que está acontecendo, vamos definir um intervalo de 1 minuto. Ou seja, o consumer buscará novas transações na fila e as processará a cada 1 minuto.

Criando a task agendada com node-cron

Para criar uma task com node-cron que seja executada a cada 1min, basta substituirmos nossa rota /process-transfer por essa função:

import cron from 'node-cron'

cron.schedule('* * * * *', () => {
    console.log('Running every minute')
})
Enter fullscreen mode Exit fullscreen mode

Agora sempre que o nosso serviço consumer estiver sendo executado, uma task será realizada automaticamente a cada 1min.

Consumindo mensagens do Amazon SQS dentro da task

Agora que nossa task está sendo executada a cada 1min, precisamos fazer com que ela busque mensagens que estão na nossa fila, para isso, iremos criar uma nova instância do SqsService e chamar o método receiveMessages() que criamos anteriormente.

cron.schedule('* * * * *', async() => {
    const sqsService = new SqsService(process.env.AWS_MAIN_QUEUE_URL)
    const messages = await sqsService.receiveMessages()
    console.log(messages)
})
Enter fullscreen mode Exit fullscreen mode

O output será uma array de objetos Message, que são as nossas mensagens:
Exemplo de uma mensagem retornada pela nossa fila do Amazon SQS

Removendo mensagens da fila

Agora que o nosso consumer já recebe as mensagens da fila de forma autônoma, isto é, sem precisar ser "chamado" pelo producer, vamos implementar um protótipo que simula o processamento da transação de cada mensagem. Ao final do processamento, a mensagem será removida da fila.

É importante destacar que toda mensagem precisa ser removida da fila após ser consumida, para evitar que ela seja processada duas vezes.

A função que iremos adicionar faz um loop por cada mensagem recebida e simula seu processamento com um setTimeout dentro de uma Promise, demorando 5 segundos pra cada mensagem:

cron.schedule('* * * * *', async() => {
    const sqsService = new SqsService(process.env.AWS_MAIN_QUEUE_URL)
    const messages = await sqsService.receiveMessages()

    for(let i = 0; i < messages.length; i++) {
        await new Promise(resolve =>
            setTimeout(() => {
                console.log(`Simulando processamento da mensagem ${messages[i].MessageId}...`)
                resolve()
            }, 5000)
        )
    } 
})
Enter fullscreen mode Exit fullscreen mode

Após o processamento precisamos remover a mensagem da fila. Pra isso iremos chamar o método deleteMessage() do nosso SqsService. Ele recebe como parâmetro o ReceiptHandle da mensagem.

cron.schedule('* * * * *', async() => {
    const sqsService = new SqsService(process.env.AWS_MAIN_QUEUE_URL)
    const messages = await sqsService.receiveMessages()

    for(let i = 0; i < messages.length; i++) {
        await new Promise(resolve =>
            setTimeout(() => {
                console.log(`Simulando processamento da mensagem ${messages[i].MessageId}...`)
                resolve()
            }, 5000)
        )

        await sqsService.deleteMessage(messages[i].ReceiptHandle)
        console.log(`Mensagem ${messages[i].MessageId} processada e removida da fila...`)
    } 
})
Enter fullscreen mode Exit fullscreen mode

Pronto! Criamos um serviço de pagamentos que se comunica via mensageria com o Amazon SQS. Perceba que, agora, o serviço que pede que pagamentos sejam feitos e o serviço que de fato processa o pagamento são totalmente independentes um do outro, o que facilita a escalabilidade e manutenção.

💖 💪 🙅 🚩
antiduhring
Mateus Vinícius

Posted on May 11, 2023

Join Our Newsletter. No Spam, Only the good stuff.

Sign up to receive the latest update from our blog.

Related