Generator Functions, um caso de uso

unimatrix2

Danny

Posted on August 6, 2024

Generator Functions, um caso de uso

Contexto

Oi! Antes de iniciar propriamente esse artigo, gostaria de apresentar o contexto de como cheguei a utilizar uma generator function para resolver um problema.

Tudo começa alguns anos atrás quando pensei no meu projeto final do bootcamp que participei. A ideia inicial era um portal que agregava dados sobre políticos como notícias, projetos, votações e etc. Isso evoluiu para uma plataforma que utiliza os bancos eleitorais do TSE para monitorar políticos que estão exercendo cargos eletivos diretos. Esses bancos do TSE são distribuídos em formato CSV, mas não são perfeitos. Só nesses anos tentando evoluir esse projeto, os bancos melhoraram muito, e no meio do caminho o Brasil API veio ao mundo também e me inspirou ainda mais.

Mesmo que a stack do projeto já estivesse definida, estar constantemente atualizando meu banco principal com as constantes atualizações dos CSVs, corrigindo inconsistências e o tempo para se importar tudo se tornaram grandes empecilhos e percebi que precisaria de um programa só para lidar com isso. Começa a aventura da CLI que montei para resolver esse problema.

Inicialmente, a ideia era simples: ler o CSV, lotear os dados e salvar os lotes no banco. Fui testando o tempo médio que levava para salvar tanto um a um, quanto vários de uma vez e qual tamanho de lote tinha o melhor tempo médio, mas nada muito criterioso ou exato (inclusive se alguém testar ou souber otimizações do tipo para MongoDB, aceito a crítica/sugestão). E funcionava legal, tinha agora um jeito rápido de passar o caminho do CSV para uma CLI que resolvia o meu problema.

Já nem pensava mais nessa CLI, até que em uma viagem queria codar nesse projeto, meu notebook não tinha o banco e só 8GB de RAM, ou seja, era um editor de código e um sonho. Obviamente ele me deixou na mão porque o código era bem básico e subia todo o arquivo na memória antes de salvar no banco. Nessa altura, já havia lido sobre funções que "retornavam mais de uma vez" e a palavra reservada yield, pensei que era uma oportunidade interessante, fazia sentido pois a biblioteca de leitura de CSV que utilizei na CLI já possuia uma API para processar o CSV linha por linha.

O problema

Com apenas 8GB de RAM, o desafio de otimização era refatorar a CLI para processar o arquivo linha por linha, em vez de carregar tudo na memória. Até então o fluxo era bem simples: carregar arquivo na memória, lotear e salvar os lotes no banco. Como repensar esse fluxo?

Pensando a solução

O primeiro passo foi decidir o que manter. Bom, toda a estrutura da CLI vai ficar igual, o tamanho do lote vai ficar igual, operações de banco, vou mexer apenas na função que faz a leitura e como as operações de banco vão ocorrer quando mexer nessa parte. O segundo passo foi juntar curiosidade com aplicabilidade: como usar uma generator function para implementar a solução?

Isso levou a uma série de considerações para escrever a solução:

  • O ideal é manter o loteamento de inserts no banco para garantir a performance dessa etapa, mas é necessário evitar a qualquer custo carregar mais dados na memória do que o essencial, então cada lote precisa ser preparado já no processamento do CSV
  • Será necessário pensar bem onde bloquear e não bloquear o loop de eventos para garantir que nada está se perdendo ou lotes incompletos sejam processados sem comprometer a eficiência de memória e velocidade
  • Será necessário usar async para juntar cada lote antes de entregar para o banco de dados salvar (para minha surpresa, existem Async Generators também!)
  • Cada lote que estiver pronto precisa ser entregue para o banco salvar

A cada consideração, ficou mais claro que uma Generator Function era exatamente o que eu precisava.

Detalhe

Pensar a solução não foi um processo muito custoso pois implementei na CLI uma estrutura parecida com as camadas de uma API convencional. Os comandos são como controllers e os serviços definem e executam o contrato. Em uma API existem mais camadas, mas como é uma CLI optei por não abstrair as operações de banco e usar a pasta /lib para organizar funcionalidades que são utilizadas nos serviços. É nesses momentos que o seu eu de agora agradece o seu eu do passado pelo código organizado que permite alterações de todos os tamanhos com pouco esforço ou retrabalho.

Implementando a solução

O primeiro passo aqui foi entender como e quais recursos de Async Generators vou utilizar. Os exemplos disponíveis na MDN já foram de grande ajuda para elaborar uma solução:

Particularmente o último link foi muito útil para pensar como estaria chamando essa função para salvar os dados no banco, utilizando um loop for...of assíncrono. Como essa parte da solução já era óbvia, implementei.

const promises = [];
  try {
    await mongoConnect(mongoUri);
    for await (const batch of parse(pathUri)) {
      promises.push(TseCandidate.insertMany(batch)
        .then(() => console.log(`${log.success}Batch entries imported with success`))
        .catch((err: any) => {
          console.log(log.error, err);
          throw new AppError({
            message: err.message || 'Error importing batch entries',
            method: 'parseAndImportDb',
            field: JSON.stringify(batch, null, 2)
          })
        })
      );
    }
    await Promise.all(promises);
    await mongoDisconnect();
  } catch (error) {
    throw error;
  }
Enter fullscreen mode Exit fullscreen mode

Uma coisa já estava garantida portanto: cada lote de arquivos que ficar pronto é resolvido e imediatamente enviado para o banco. O resto do código serve basicamente para reportar no terminal o progresso ou eventuais erros das operações e garantir controle de fluxo.

  • Utilizando o .then/.catch da promessa do banco, garanto que cada lote que for salvo com sucesso é independentemente reportado no terminal
  • Utilizando o Promise.all, garanto que todas as operações de banco terminem antes de encerrar a conexão.

É importante notar que o Promise.all joga um erro se qualquer uma das promessas falhar, então qualquer problema salvando um lote no banco compromete todo o resto da operação. Mas esse comportamento é desejado: preciso entender a razão do erro e corrigir o código ou entender uma inconsistência/corrupção de dados que possa estar ocorrendo e resolver isso. Como um banco incompleto é inútil e precisarei apagar tudo e começar novamente, não faz sentido correr o processo até o final, mas é importante conhecer o Promise.allSettled também!

Implementando o Async Generator

Até então, a implementação era baseada em "embalar" a API da bilbioteca de leitura de CSV em uma promessa:

new Promise((resolve, reject) => {
  Papa.parse<ICandidate>(fs.createReadStream(url), {
    encoding: 'latin1',
    header: true,
    skipEmptyLines: true,
    complete: (results: IParseResult) => {
      console.log(`${log.info}${String(results.data.length).green} entries parsed`);
      const resultsErrorsLength = results.errors.length;
      console.log(`${
        resultsErrorsLength < 1
          ? log.success + String(resultsErrorsLength).green
          : log.error + String(results.errors.length).red
      } parsing errors`);
      if (results.errors.length > 0) {
        reject(new AppError({
          message: 'Errors were found on the table, import cancelled'.red,
          method: 'parse',
          module: 'ParserLib',
          step: 'Complete',
          field: JSON.stringify(results.errors).gray,
          })
        );
      }
      resolve(results.data);
    },
    error: (err) =>
      reject(new AppError({
        message: err.message.red,
        method: 'Parse',
        module: 'ParserLib',
      })),
    })
});
Enter fullscreen mode Exit fullscreen mode

Essa implementação utilizava apenas a callback complete da biblioteca, a ideia agora é utilizar a step. Para quem estiver mais curioso sobre a biblioteca, chama-se Papaparse.

Vamos enfim ao esboço dessa refatoração:

// Note o asterisco caracterizando a função como um gerador assíncrono
export async function* parse(url: string): AsyncGenerator<ICandidate[]> {
  // Temos aqui nosso tamanho de lote e array que acumulará as linhas em um lote
  const batchSize = 5000;
  let batch: ICandidate[] = [];

  // Temos aqui duas lets com uma promise e um resolver.
  // Note que a promise recebe o resolver externo como resolver
  // dela.
  let resolve: (value: ICandidate[]) => void;
  let promise = new Promise<ICandidate[]>(r => resolve = r);

  Papa.parse<ICandidate>(createReadStream(url), {
    // Utilizei latin1 pois é a codificação dos bancos do TSE
    encoding: 'latin1',
    header: true,
    skipEmptyLines: true,
    delimiter: ';',
    step: (results) => {
      // nossa lógica 'linha por linha' vai aqui
    },
    complete: (results) => {
      // Nossa lógica para finalizar o processamento vai aqui
    }
  });

  // Aqui fica tudo mais interessante: utilizando de um
  // 'loop infinito', vamos travar o loop para aguardar a
  // promessa e se ele for um array (ou seja, um lote de dados)
  // rendemos ele ao loop do serviço para inserir no banco
  while (true) {
    const result = await promise;
    if (result instanceof Array) yield result;
  }
}
Enter fullscreen mode Exit fullscreen mode

Fora a lógica principal de processamento, ainda é preciso lidar com a condição de parada do loop.

É importante ressaltar como o fluxo dessa solução está estruturado: para cada lote de dados acumuluado, a ideia é utilizar o resolver para resolver a promessa, "entregando" o lote de dados para o loop, que vai render (yield) para outro loop. A utilização de lets tem uma dupla função:

  • permitir a redesignação do lote e da promessa, garantindo que o array está vazio e o loop está aguardando a promessa resolver ou rejeitar
  • dereferenciar o lote e a promessa antigos. Ao eliminar todas as referências ativas de um objeto, ele pode ser marcado para coleta de lixo, e ao usar redesignação, isso é realizado intencionalmente.

Um pouco mais a fundo - Coleta de lixo

Vou deixar a introdução desse tópico para um bom artigo que encontrei sobre coleta de lixo no Node:

Um aspecto importante para se notar é que ao dereferenciar constantemente os dados/promessa antigos, poucas ou até nenhuma referência sobrevive a um ciclo de coleta no espaço novo, a memória é liberada o quanto antes.

Processando as linhas

Cada linha processada dispara a callback definida em step. Basta então acessar o dado e acumular no lote. Por fim, quando o lote possuir o tamanho desejado, a promessa é resolvida com o lote e tanto o array quanto a promessa são redesignados.

step: (results) => {
  batch.push(results.data);
  if (batch.length >= batchSize) {
    resolve(batch);
    batch = [];
    promise = new Promise(r => resolve = r);
  }
},
Enter fullscreen mode Exit fullscreen mode

Processando o último lote e criando o sinal de parada

Agora a callback de complete apenas resolve o último lote de linhas. Para a condição de parada, é criada mais uma promise que resolve com o objeto { done: true }. Isso será avaliado no loop da função para encerrar.

complete: (results) => {
  if (batch.length > 0) {
    console.log(log.success + 'Last batch parsed with success!');
    resolve(batch);
    batch = [];
    promise = new Promise(r => resolve = r);
  };
  resolve({ done: true });
}
Enter fullscreen mode Exit fullscreen mode

Parando o loop

Para finalizar a implementação, basta validar no loop se há o sinal de parada e retornar. A implementação completa ficou assim:

export async function* parse(url: string): AsyncGenerator<ICandidate[]> {
  const batchSize = 5000;
  let batch: ICandidate[] = [];

  let resolve: (value: ICandidate[] | { done: boolean }) => void;
  let promise = new Promise<ICandidate[] | { done: boolean }>(r => resolve = r);
  Papa.parse<ICandidate>(createReadStream(url), {
    encoding: 'latin1',
    header: true,
    skipEmptyLines: true,
    delimiter: ';',
    step: (results) => {
      batch.push(results.data);
      if (batch.length >= batchSize) {
        resolve(batch);
        batch = [];
        promise = new Promise(r => resolve = r);
      }
    },
    complete: (results) => {
      if (batch.length > 0) {
        console.log(log.success + 'Last batch parsed with success!');
        resolve(batch);
        batch = [];
        promise = new Promise(r => resolve = r);
      };
      resolve({ done: true });
    }
  });

  while (true) {
    const result = await promise;
    if (result instanceof Array) yield result;
    if (!(result instanceof Array) && result.done) return;
  }
}
Enter fullscreen mode Exit fullscreen mode

Considerações finais

Essa não é a solução que melhor utiliza os recursos de async generators, mas foi a solução que resolveu meu problema. Com ela, todo o processamento de dados foi otimizado e um trabalho que exigia um montante cada vez maior de memória agora exige um montante quase constante, permitindo até um sofrido notebook de 8GB de RAM montar um banco com milhares ou até milhões de linhas, em questão de minutos.

Para complementar a funcionalidade e facilitar ainda mais, incluí um comando de Download na CLI, automatizando o trabalho de acessar e baixar os CSVs atualizados. Mas falta muita coisa. Ainda faltam funcionalidades de normalização de dados, descompactar seletivamente um CSV do arquivo fonte, um modo interativo e futuramente suporte a bancos relacionais.

O repositório está público, aberto a contribuições (embora a CLI ainda não esteja publicada, ainda falte um Código de Conduta e Regras de Contribuição) e possui uma licença CC BY-NC-ND 4.0 pois acredito firmemente que o trabalhar com dados públicos exige zelo com a análise que se pretende realizar e com o público que venha a consumir esses dados, portanto, essa licensa impede derivativos diretos da CLI, para evitar que a ferramenta seja utilizada para (por exemplo) difundir desinformação ou análises enviezadas. A ideia é construir uma ferramenta de qualidade para uso geral.

Peço perdão em adianto pela bagunça no código do repositório, nos momentos de luta implementei um logger da idade das pedras para persistir tudo em um arquivo de texto/JSON, mas é mais um dos itens que será bem trabalhado a seguir. Essa CLI ainda está meio longe de ser lançada.

Conclusão

Espero que esse artigo não seja tão maçante para o leitor e que ajude a esclarecer alguns recursos da língua ou ao menos sirva de inspiração para aventuras semelhantes de otimização. É minha primeira vez escrevendo um artigo do tipo, pretendo escrever outros e com cada vez mais qualidade.

Muito obrigado pelo seu tempo e espero que tenha curtido. Até a próxima!

💖 💪 🙅 🚩
unimatrix2
Danny

Posted on August 6, 2024

Join Our Newsletter. No Spam, Only the good stuff.

Sign up to receive the latest update from our blog.

Related