Usando postgres como message broker

antiduhring

Mateus Vinícius

Posted on August 13, 2023

Usando postgres como message broker

Em muitos cenários de desenvolvimento, a necessidade de um message broker é inegável. No entanto, adotar soluções como Kafka ou RabbitMQ nem sempre é a opção mais viável ou desejável. Neste tutorial, vamos explorar uma abordagem alternativa e interessante: transformar uma tabela comum do Postgres em um message broker robusto, garantindo a integridade e consistência das mensagens processadas.

O que é um message broker? Trata-se de um sistema que gerencia e armazena uma fila de mensagens, sendo crucial para a comunicação assíncrona entre diferentes partes de um sistema. No nosso caso, o objetivo é criar um sistema onde diferentes sistemas e threads possam consumir mensagens da fila, assegurando que cada mensagem seja processada apenas uma vez, evitando corrupções ou duplicações indesejadas.

Preparando o Terreno

A ideia central é simples: um producer envia ordens de pagamento para uma tabela que atua como fila, com status inicial "PENDING". Em seguida, três clusters de uma aplicação Node executam cron jobs a cada 1 minuto, desempenhando o papel de consumers. Cada job verifica as ordens de pagamento pendentes na tabela, processa o pagamento e atualiza o status para "COMPLETED". O desafio é garantir que uma ordem seja processada apenas uma vez, mesmo com múltiplos consumidores operando simultaneamente.

O objetivo deste artigo não é ensinar como criar uma aplicação com Node do zero, mas apenas mostrar como utilizar o Postgres como message broker numa API feita como Node, por isso iremos pular alguns passos de setup básico.

Também, para fins didáticos, usaremos NestJS como framework para a API, pm2 para o gerenciamento dos clusters da aplicação e prisma para modelagem e como camada de comunicação com nosso banco de dados.

Você pode obter o projeto-base no repositório anti-duhring/postgres-queue-broker.

Primeiro de tudo é necessário iniciar um novo projeto Nest:

nest new postgres-queue-broker
Enter fullscreen mode Exit fullscreen mode

Após isso, iremos instalar os pacotes pm2, @nestjs/schedule para gerenciamento dos cron jobs e o @prisma/client:

npm i pm2 @nestjs/schedule @prisma/client
Enter fullscreen mode Exit fullscreen mode

Esse será o schema da nossa tabela Order, que conterá as ordens de pagamentos:

// This is your Prisma schema file,
// learn more about it in the docs: https://pris.ly/d/prisma-schema

generator client {
  provider = "prisma-client-js"
}

datasource db {
  provider = "postgresql"
  url      = env("DATABASE_URL")
}

model Order {
  id        String    @id             @default(uuid())
  message   String
  status    String
  createdAt DateTime  @default(now()) @map("created_at")
  updatedAt DateTime  @updatedAt      @map("updated_at")
}
Enter fullscreen mode Exit fullscreen mode

Após isso, iremos criar um módulo para ser o nosso Producer, ele será responsável por criar uma nova ordem de pagamentos e enviar para a nossa tabela Order.

nest g resource order
Enter fullscreen mode Exit fullscreen mode

order.service.ts:

import { Injectable } from '@nestjs/common';
import { PrismaService } from '../prisma/prisma.service'
import { orderStatus } from '../../common/orderStatus.enum'

@Injectable()
export class OrderService {
    constructor(private readonly prisma: PrismaService) {}

    async create(message: string) {
        const order = await this.prisma.order.create({
            data: {
                message,
                status: orderStatus.PENDING
            }
        })

        return order
    }
}
Enter fullscreen mode Exit fullscreen mode

Trabalhando com Locks Inteligentes

Para alcançar esse objetivo, iremos explorar as funcionalidades do Postgres: as cláusulas FOR UPDATE e SKIP LOCKED. O FOR UPDATE bloqueia as linhas retornadas por uma consulta, garantindo que nenhuma outra transação possa modificá-las até que a transação atual seja concluída. Por sua vez, o SKIP LOCKED permite que uma consulta ignore linhas que estejam bloqueadas, possibilitando o salto sobre linhas já bloqueadas por outros consumidores.

Sabendo isto, iremos criar o módulo do nosso consumer, que consistirá num cron job_ que roda a cada 1min, obtendo as ordens de pagamento pendentes, processando e atualizando seu status:

nest g resource consumer
Enter fullscreen mode Exit fullscreen mode

consumer.service.ts

import { Injectable } from '@nestjs/common';
import { PrismaService } from '../prisma/prisma.service';
import { orderStatus } from '../../common/orderStatus.enum';
import { Cron } from '@nestjs/schedule';

@Injectable()
export class ConsumerService {
  constructor(private readonly prisma: PrismaService) {}

  async getPendingOrders() {
    const pendingOrders = await this.prisma.$queryRaw`
                select o.*
                from "Order" o 
                where o.status = 'PENDING'
                order by o.created_at asc 
                for update skip locked
        `;

    return pendingOrders as any[];
  }

  @Cron('0 * * * * *')
  async processOrders() {
    const pendingOrders = await this.getPendingOrders();

    for await (const order of pendingOrders) {
      await this.prisma.order.update({
        where: {
          id: order.id,
        },
        data: {
          status: orderStatus.COMPLETED,
        },
      });
      console.log(`Order ${order.id} has been processed`);
    }
  }
}
Enter fullscreen mode Exit fullscreen mode

Esta consulta permite que cada job consumer obtenha uma ordem de pagamento pendente e a bloqueie, impedindo que outros jobs acessem a mesma ordem simultaneamente. Uma vez que a ordem é processada e o status é atualizado, ela é liberada para futuros consumidores.

Importante mencionar que, num cenário real, é necessário ter cautela com a quantidade de itens carregados, por isso um LIMIT para limitar a quantidade de itens obtidos por cada job pode evitar gargalos e erros de out of memory.

Rodando nossa aplicação

Por fim, basta realizar a build da aplicação.

npm run build
Enter fullscreen mode Exit fullscreen mode

E executar os 3 clusters com o pm2:

pm2 start dist/main.js -i 3
Enter fullscreen mode Exit fullscreen mode

É possível monitorar os logs dos clusters com seguinte comando:

pm2 logs main
Enter fullscreen mode Exit fullscreen mode

Dessa forma podemos ver que cada ordem de pagamento é processada por apenas um cluster:

Log mostrando que cada ordem de pagamento foi processada por um cluster diferente

Conclusão

O uso do Postgres como um message broker alternativo pode ser uma solução valiosa em cenários onde a complexidade do Kafka ou do RabbitMQ não é justificada. Aproveitar as funcionalidades nativas do Postgres, como FOR UPDATE e SKIP LOCKED, possibilita a criação de um sistema robusto que garante a integridade e consistência das mensagens processadas.

Ao explorar essa abordagem, você poderá aprimorar a comunicação assíncrona em seus sistemas, evitando problemas de duplicação e corrupção de mensagens. Dessa forma, o Postgres se mostra não apenas como um excelente banco de dados, mas também como um aliado poderoso na construção de soluções de integração confiáveis. Experimente essa abordagem em seus projetos e desfrute dos benefícios de um message broker sólido e eficiente.

Para saber mais

Esse artigo foi fortemente inspirado pela palestra do Rafael Ponte no canal da ZUP, fica aqui a recomendação para entender a implementação com mais detalhes:
https://www.youtube.com/watch?v=FF6Am0N6eq4&t=3275s

💖 💪 🙅 🚩
antiduhring
Mateus Vinícius

Posted on August 13, 2023

Join Our Newsletter. No Spam, Only the good stuff.

Sign up to receive the latest update from our blog.

Related

Usando postgres como message broker
postgres Usando postgres como message broker

August 13, 2023