Testando Kafka no Spring Boot com Test-containers

rafaelfantinel

Rafael Fantinel

Posted on June 27, 2023

Testando Kafka no Spring Boot com Test-containers

Hoje, vamos falar sobre como testar uma aplicação Spring Boot que utiliza o Apache Kafka como sistema de mensageria. Para garantir que tudo funcione corretamente durante os testes de integração, vamos usar a biblioteca Testcontainers, que nos permite criar um ambiente de teste automatizado e isolado.

O Apache Kafka é uma plataforma popular para streaming distribuído, usada para troca de mensagens em tempo real. Quando estamos desenvolvendo uma aplicação que utiliza o Kafka, é fundamental verificar se os componentes do sistema estão funcionando adequadamente, mesmo durante os testes de integração.

O Testcontainers é uma biblioteca Java que simplifica a criação e o gerenciamento de contêineres Docker para testes automatizados. Ele facilita a configuração do ambiente de teste, garantindo a replicação e o isolamento do ambiente de produção.

Antes de começarmos, é importante ter o Docker instalado em sua máquina. Em seguida, adicione as dependências necessárias ao arquivo pom.xml do seu projeto:

<dependencies>
    <!-- Dependências do Spring Boot e Kafka -->
    <!-- ... -->
    <!-- Dependência do Testcontainers para Kafka -->
    <dependency>
        <groupId>org.testcontainers</groupId>
        <artifactId>kafka</artifactId>
        <version>1.16.0</version>
        <scope>test</scope>
    </dependency>
</dependencies>
Enter fullscreen mode Exit fullscreen mode

Criando o teste com Testcontainers

Agora, vamos criar um teste de integração para verificar se a nossa aplicação Spring Boot é capaz de se conectar e enviar mensagens para um tópico do Kafka.

Começaremos criando classes genéricas para o consumidor e produtor

...
@Component
public class KafkaConsumer {

  private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);

  private CountDownLatch latch = new CountDownLatch(1);

  private String payload;

  @KafkaListener(topics = "${test.topic}")
  public void receive(ConsumerRecord<?, ?> consumerRecord) {
    LOGGER.info("received payload='{}'", consumerRecord.toString());

    payload = consumerRecord.toString();
    latch.countDown();
  }

  public CountDownLatch getLatch() {
    return latch;
  }

  public void resetLatch() {
    latch = new CountDownLatch(1);
  }

  public String getPayload() {
    return payload;
  }
}

Enter fullscreen mode Exit fullscreen mode
...

@Component
public class KafkaProducer {

  private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);

  @Autowired private KafkaTemplate<String, String> kafkaTemplate;

  public void send(String topic, String payload) {
    LOGGER.info("sending payload='{}' to topic='{}'", payload, topic);
    kafkaTemplate.send(topic, payload);
  }
}

Enter fullscreen mode Exit fullscreen mode

Logo em seguida crie uma classe de teste chamada KafkaProducerIntegrationTest e anote-a com @RunWith(SpringRunner.class) e @SpringBootTest. Essas anotações configuram o contexto de teste do Spring Boot:

...
@RunWith(SpringRunner.class)
@Import(com.fantinel.kafka.KafkaProducerIntegrationTest.KafkaTestContainerConfiguration.class)
@SpringBootTest(classes = Application.class)
@DirtiesContext
public class KafkaProducerIntegrationTest {

  @ClassRule
  public static KafkaContainer kafka =
      new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"));

  @Autowired public KafkaTemplate<String, String> template;

  @Autowired private KafkaConsumer consumer;

  @Autowired private KafkaProducer producer;

  @Value("${test.topic}")
  private String topic;

  @Before
  public void setup() {
    consumer.resetLatch();
  }

  @Test
  public void givenKafkaDockerContainer_whenSendingMessage_thenMessageReceived() throws Exception {
    String data = "Hello DevTo";

    producer.send(topic, data);

    boolean messageConsumed = consumer.getLatch().await(10, TimeUnit.SECONDS);
    assertTrue(messageConsumed);
    assertThat(consumer.getPayload(), containsString(data));
  }

  @TestConfiguration
  static class KafkaTestContainerConfiguration {

    @Bean
    ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
      ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
          new ConcurrentKafkaListenerContainerFactory<>();
      factory.setConsumerFactory(consumerFactory());
      return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
      return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
      Map<String, Object> props = new HashMap<>();
      props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
      props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
      props.put(ConsumerConfig.GROUP_ID_CONFIG, "devto");
      props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
      props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
      return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
      Map<String, Object> configProps = new HashMap<>();
      configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
      configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
      configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
      return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
      return new KafkaTemplate<>(producerFactory());
    }
  }
}

Enter fullscreen mode Exit fullscreen mode

Por fim, adicione as configurações do consumidor e topico no arquivo application.yml

spring:
  kafka:
    consumer:
      auto-offset-reset: earliest
      group-id: dev-to-test-group
test:
  topic: dev-to-test-topic

Enter fullscreen mode Exit fullscreen mode

No exemplo acima, importamos as classes necessárias, incluindo classes do Kafka, classes de teste do JUnit e classes do Spring Boot. Em seguida, definimos a anotação @RunWith(SpringRunner.class) para indicar que o teste será executado usando o Spring Runner, que permite a inicialização do contexto do Spring.

O teste de integração é definido na classe KafkaProducerIntegrationTest. Primeiro, definimos um contêiner Kafka utilizando o Testcontainers. O contêiner Kafka é inicializado com a imagem "confluentinc/cp kafka:7.4.0". Essa configuração permite que o teste execute um ambiente Kafka totalmente funcional dentro de um contêiner Docker isolado.

Em seguida, usamos a anotação @Autowired para injetar automaticamente o KafkaTemplate, KafkaConsumer e KafkaProducer necessários para o teste. Também injetamos o nome do tópico Kafka a ser usado, que é especificado por meio da anotação @Value("${test.topic}").

Antes de cada teste, o método setup() é executado para redefinir o latch do consumidor. Essa preparação garante que o consumidor esteja pronto para receber mensagens.

Agora que o teste de integração está pronto, podemos executá-lo. O Testcontainers se encarregará de criar e gerenciar o contêiner Docker do Kafka durante a execução do teste.

Você pode executar o teste diretamente pela sua IDE ou utilizando o comando mvn test. Durante a execução, você poderá ver os logs do Kafka sendo exibidos.

Conclusão

Em resumo, o uso do Testcontainers facilita a criação de testes de integração para aplicações Spring Boot que utilizam o Apache Kafka. Com um ambiente de teste isolado e automatizado, podemos garantir que os componentes do sistema estejam funcionando corretamente e identificar problemas de integração antes de implantar a aplicação em produção.

Neste post, exploramos como configurar e escrever testes de integração com o Testcontainers e o Kafka no Spring Boot. Espero que você tenha achado este conteúdo útil e que possa aplicar esses conceitos em seus projetos. Se você tiver alguma dúvida ou sugestão, deixe seu comentário abaixo.
O código feito a cima está disponível no repositório GIT

💖 💪 🙅 🚩
rafaelfantinel
Rafael Fantinel

Posted on June 27, 2023

Join Our Newsletter. No Spam, Only the good stuff.

Sign up to receive the latest update from our blog.

Related