Testando Kafka no Spring Boot com Test-containers
Rafael Fantinel
Posted on June 27, 2023
Hoje, vamos falar sobre como testar uma aplicação Spring Boot que utiliza o Apache Kafka como sistema de mensageria. Para garantir que tudo funcione corretamente durante os testes de integração, vamos usar a biblioteca Testcontainers, que nos permite criar um ambiente de teste automatizado e isolado.
O Apache Kafka é uma plataforma popular para streaming distribuído, usada para troca de mensagens em tempo real. Quando estamos desenvolvendo uma aplicação que utiliza o Kafka, é fundamental verificar se os componentes do sistema estão funcionando adequadamente, mesmo durante os testes de integração.
O Testcontainers é uma biblioteca Java que simplifica a criação e o gerenciamento de contêineres Docker para testes automatizados. Ele facilita a configuração do ambiente de teste, garantindo a replicação e o isolamento do ambiente de produção.
Antes de começarmos, é importante ter o Docker instalado em sua máquina. Em seguida, adicione as dependências necessárias ao arquivo pom.xml do seu projeto:
<dependencies>
<!-- Dependências do Spring Boot e Kafka -->
<!-- ... -->
<!-- Dependência do Testcontainers para Kafka -->
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>1.16.0</version>
<scope>test</scope>
</dependency>
</dependencies>
Criando o teste com Testcontainers
Agora, vamos criar um teste de integração para verificar se a nossa aplicação Spring Boot é capaz de se conectar e enviar mensagens para um tópico do Kafka.
Começaremos criando classes genéricas para o consumidor e produtor
...
@Component
public class KafkaConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);
private CountDownLatch latch = new CountDownLatch(1);
private String payload;
@KafkaListener(topics = "${test.topic}")
public void receive(ConsumerRecord<?, ?> consumerRecord) {
LOGGER.info("received payload='{}'", consumerRecord.toString());
payload = consumerRecord.toString();
latch.countDown();
}
public CountDownLatch getLatch() {
return latch;
}
public void resetLatch() {
latch = new CountDownLatch(1);
}
public String getPayload() {
return payload;
}
}
...
@Component
public class KafkaProducer {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);
@Autowired private KafkaTemplate<String, String> kafkaTemplate;
public void send(String topic, String payload) {
LOGGER.info("sending payload='{}' to topic='{}'", payload, topic);
kafkaTemplate.send(topic, payload);
}
}
Logo em seguida crie uma classe de teste chamada KafkaProducerIntegrationTest e anote-a com @RunWith(SpringRunner.class) e @SpringBootTest. Essas anotações configuram o contexto de teste do Spring Boot:
...
@RunWith(SpringRunner.class)
@Import(com.fantinel.kafka.KafkaProducerIntegrationTest.KafkaTestContainerConfiguration.class)
@SpringBootTest(classes = Application.class)
@DirtiesContext
public class KafkaProducerIntegrationTest {
@ClassRule
public static KafkaContainer kafka =
new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"));
@Autowired public KafkaTemplate<String, String> template;
@Autowired private KafkaConsumer consumer;
@Autowired private KafkaProducer producer;
@Value("${test.topic}")
private String topic;
@Before
public void setup() {
consumer.resetLatch();
}
@Test
public void givenKafkaDockerContainer_whenSendingMessage_thenMessageReceived() throws Exception {
String data = "Hello DevTo";
producer.send(topic, data);
boolean messageConsumed = consumer.getLatch().await(10, TimeUnit.SECONDS);
assertTrue(messageConsumed);
assertThat(consumer.getPayload(), containsString(data));
}
@TestConfiguration
static class KafkaTestContainerConfiguration {
@Bean
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "devto");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
}
Por fim, adicione as configurações do consumidor e topico no arquivo application.yml
spring:
kafka:
consumer:
auto-offset-reset: earliest
group-id: dev-to-test-group
test:
topic: dev-to-test-topic
No exemplo acima, importamos as classes necessárias, incluindo classes do Kafka, classes de teste do JUnit e classes do Spring Boot. Em seguida, definimos a anotação @RunWith(SpringRunner.class) para indicar que o teste será executado usando o Spring Runner, que permite a inicialização do contexto do Spring.
O teste de integração é definido na classe KafkaProducerIntegrationTest. Primeiro, definimos um contêiner Kafka utilizando o Testcontainers. O contêiner Kafka é inicializado com a imagem "confluentinc/cp kafka:7.4.0". Essa configuração permite que o teste execute um ambiente Kafka totalmente funcional dentro de um contêiner Docker isolado.
Em seguida, usamos a anotação @Autowired para injetar automaticamente o KafkaTemplate, KafkaConsumer e KafkaProducer necessários para o teste. Também injetamos o nome do tópico Kafka a ser usado, que é especificado por meio da anotação @Value("${test.topic}").
Antes de cada teste, o método setup() é executado para redefinir o latch do consumidor. Essa preparação garante que o consumidor esteja pronto para receber mensagens.
Agora que o teste de integração está pronto, podemos executá-lo. O Testcontainers se encarregará de criar e gerenciar o contêiner Docker do Kafka durante a execução do teste.
Você pode executar o teste diretamente pela sua IDE ou utilizando o comando mvn test. Durante a execução, você poderá ver os logs do Kafka sendo exibidos.
Conclusão
Em resumo, o uso do Testcontainers facilita a criação de testes de integração para aplicações Spring Boot que utilizam o Apache Kafka. Com um ambiente de teste isolado e automatizado, podemos garantir que os componentes do sistema estejam funcionando corretamente e identificar problemas de integração antes de implantar a aplicação em produção.
Neste post, exploramos como configurar e escrever testes de integração com o Testcontainers e o Kafka no Spring Boot. Espero que você tenha achado este conteúdo útil e que possa aplicar esses conceitos em seus projetos. Se você tiver alguma dúvida ou sugestão, deixe seu comentário abaixo.
O código feito a cima está disponível no repositório GIT
Posted on June 27, 2023
Join Our Newsletter. No Spam, Only the good stuff.
Sign up to receive the latest update from our blog.