Usando postgres como message broker
Mateus Vinícius
Posted on August 13, 2023
Em muitos cenários de desenvolvimento, a necessidade de um message broker é inegável. No entanto, adotar soluções como Kafka ou RabbitMQ nem sempre é a opção mais viável ou desejável. Neste tutorial, vamos explorar uma abordagem alternativa e interessante: transformar uma tabela comum do Postgres em um message broker robusto, garantindo a integridade e consistência das mensagens processadas.
O que é um message broker? Trata-se de um sistema que gerencia e armazena uma fila de mensagens, sendo crucial para a comunicação assíncrona entre diferentes partes de um sistema. No nosso caso, o objetivo é criar um sistema onde diferentes sistemas e threads possam consumir mensagens da fila, assegurando que cada mensagem seja processada apenas uma vez, evitando corrupções ou duplicações indesejadas.
Preparando o Terreno
A ideia central é simples: um producer envia ordens de pagamento para uma tabela que atua como fila, com status inicial "PENDING". Em seguida, três clusters de uma aplicação Node executam cron jobs a cada 1 minuto, desempenhando o papel de consumers. Cada job verifica as ordens de pagamento pendentes na tabela, processa o pagamento e atualiza o status para "COMPLETED". O desafio é garantir que uma ordem seja processada apenas uma vez, mesmo com múltiplos consumidores operando simultaneamente.
O objetivo deste artigo não é ensinar como criar uma aplicação com Node do zero, mas apenas mostrar como utilizar o Postgres como message broker numa API feita como Node, por isso iremos pular alguns passos de setup básico.
Também, para fins didáticos, usaremos NestJS como framework para a API, pm2 para o gerenciamento dos clusters da aplicação e prisma para modelagem e como camada de comunicação com nosso banco de dados.
Você pode obter o projeto-base no repositório anti-duhring/postgres-queue-broker.
Primeiro de tudo é necessário iniciar um novo projeto Nest:
nest new postgres-queue-broker
Após isso, iremos instalar os pacotes pm2
, @nestjs/schedule
para gerenciamento dos cron jobs e o @prisma/client
:
npm i pm2 @nestjs/schedule @prisma/client
Esse será o schema da nossa tabela Order, que conterá as ordens de pagamentos:
// This is your Prisma schema file,
// learn more about it in the docs: https://pris.ly/d/prisma-schema
generator client {
provider = "prisma-client-js"
}
datasource db {
provider = "postgresql"
url = env("DATABASE_URL")
}
model Order {
id String @id @default(uuid())
message String
status String
createdAt DateTime @default(now()) @map("created_at")
updatedAt DateTime @updatedAt @map("updated_at")
}
Após isso, iremos criar um módulo para ser o nosso Producer, ele será responsável por criar uma nova ordem de pagamentos e enviar para a nossa tabela Order.
nest g resource order
order.service.ts:
import { Injectable } from '@nestjs/common';
import { PrismaService } from '../prisma/prisma.service'
import { orderStatus } from '../../common/orderStatus.enum'
@Injectable()
export class OrderService {
constructor(private readonly prisma: PrismaService) {}
async create(message: string) {
const order = await this.prisma.order.create({
data: {
message,
status: orderStatus.PENDING
}
})
return order
}
}
Trabalhando com Locks Inteligentes
Para alcançar esse objetivo, iremos explorar as funcionalidades do Postgres: as cláusulas FOR UPDATE
e SKIP LOCKED
. O FOR UPDATE
bloqueia as linhas retornadas por uma consulta, garantindo que nenhuma outra transação possa modificá-las até que a transação atual seja concluída. Por sua vez, o SKIP LOCKED
permite que uma consulta ignore linhas que estejam bloqueadas, possibilitando o salto sobre linhas já bloqueadas por outros consumidores.
Sabendo isto, iremos criar o módulo do nosso consumer, que consistirá num cron job_ que roda a cada 1min, obtendo as ordens de pagamento pendentes, processando e atualizando seu status:
nest g resource consumer
consumer.service.ts
import { Injectable } from '@nestjs/common';
import { PrismaService } from '../prisma/prisma.service';
import { orderStatus } from '../../common/orderStatus.enum';
import { Cron } from '@nestjs/schedule';
@Injectable()
export class ConsumerService {
constructor(private readonly prisma: PrismaService) {}
async getPendingOrders() {
const pendingOrders = await this.prisma.$queryRaw`
select o.*
from "Order" o
where o.status = 'PENDING'
order by o.created_at asc
for update skip locked
`;
return pendingOrders as any[];
}
@Cron('0 * * * * *')
async processOrders() {
const pendingOrders = await this.getPendingOrders();
for await (const order of pendingOrders) {
await this.prisma.order.update({
where: {
id: order.id,
},
data: {
status: orderStatus.COMPLETED,
},
});
console.log(`Order ${order.id} has been processed`);
}
}
}
Esta consulta permite que cada job consumer obtenha uma ordem de pagamento pendente e a bloqueie, impedindo que outros jobs acessem a mesma ordem simultaneamente. Uma vez que a ordem é processada e o status é atualizado, ela é liberada para futuros consumidores.
Importante mencionar que, num cenário real, é necessário ter cautela com a quantidade de itens carregados, por isso um LIMIT
para limitar a quantidade de itens obtidos por cada job pode evitar gargalos e erros de out of memory.
Rodando nossa aplicação
Por fim, basta realizar a build da aplicação.
npm run build
E executar os 3 clusters com o pm2:
pm2 start dist/main.js -i 3
É possível monitorar os logs dos clusters com seguinte comando:
pm2 logs main
Dessa forma podemos ver que cada ordem de pagamento é processada por apenas um cluster:
Conclusão
O uso do Postgres como um message broker alternativo pode ser uma solução valiosa em cenários onde a complexidade do Kafka ou do RabbitMQ não é justificada. Aproveitar as funcionalidades nativas do Postgres, como FOR UPDATE
e SKIP LOCKED
, possibilita a criação de um sistema robusto que garante a integridade e consistência das mensagens processadas.
Ao explorar essa abordagem, você poderá aprimorar a comunicação assíncrona em seus sistemas, evitando problemas de duplicação e corrupção de mensagens. Dessa forma, o Postgres se mostra não apenas como um excelente banco de dados, mas também como um aliado poderoso na construção de soluções de integração confiáveis. Experimente essa abordagem em seus projetos e desfrute dos benefícios de um message broker sólido e eficiente.
Para saber mais
Esse artigo foi fortemente inspirado pela palestra do Rafael Ponte no canal da ZUP, fica aqui a recomendação para entender a implementação com mais detalhes:
https://www.youtube.com/watch?v=FF6Am0N6eq4&t=3275s
Posted on August 13, 2023
Join Our Newsletter. No Spam, Only the good stuff.
Sign up to receive the latest update from our blog.