Testing Micronaut Kafka Streams
Phil Hardwick
Posted on April 19, 2020
Here’s an example of integration testing Micronaut Kafka Streams applications - https://github.com/PhilHardwick/micronaut-avro-streams-example. It uses embedded Kafka so you can test things work with a real Kafka instance, but it could just as easily use test containers.
The example is a little contrived but the applications receives commands to create pots for a bank account (initialising the balance to 0) and commands for making a transfer between pots (which changes the balances of the two pots in the transfer). The application I’ve created isn’t necessarily how I would design something going into production - I think the data and partition keys need some more thought, especially when running multiple instances - but I wanted to create a non-trivial example to demonstrate testing.
There are a few key things needed to be able to integration test this.
Wait for streams to start
@BeforeEach
void setUp() {
await().atMost(10, TimeUnit.SECONDS).until(() -> stream.state().equals(KafkaStreams.State.RUNNING));
}
Waiting for the stream to start is essential because, by default, streams process exactly once. Our tests can plough ahead and send messages before the stream is ready, and so the stream never receives the message (because it is not reading from the earliest offset).
Create simple producers and consumers to exercise the stream
You need to send messages to the stream and listen for the results, so create some simple producers and consumers for this. Micronaut makes this easy with annotations or you can set up producers and consumers in code as shown in Confluent’s examples.
@Singleton
@KafkaListener(groupId = "all-event-listener", offsetReset = OffsetReset.EARLIEST, clientId = "all-event-test-listener")
public class EventsListener {
private BlockingQueue<PotEvent> potEvents = new LinkedBlockingDeque<>();
private BlockingQueue<PotTransferEvent> transferEvents = new LinkedBlockingDeque<>();
@Topic("pot-events")
public void potEventReceived(PotEvent accountEvent) {
potEvents.add(accountEvent);
}
@Topic("transfer-events")
public void transferEventReceived(PotTransferEvent transferEvent) {
transferEvents.add(transferEvent);
}
public BlockingQueue<PotEvent> getPotEvents() {
return potEvents;
}
public BlockingQueue<PotTransferEvent> getTransferEvents() {
return transferEvents;
}
}
@KafkaClient
public interface TestCommandSender {
@Topic("pot-commands")
void sendMakePotTransfer(@KafkaKey UUID accountId, MakePotTransfer makePotTransfer);
@Topic("pot-commands")
void sendCreatePot(@KafkaKey UUID accountId, CreatePot createPot);
}
The key to remember here is to set your event listener to listen from the earliest offset so it gets all the events outputted from the stream, no matter what time it gets subscribed to the topic.
Mock schema registry
From Confluent version 5.4.0 you can use a url beginning with mock://
for your schema registry url and it will inject a mock schema registry in your serdes. This is what I’ve set up in my application-kafka.yml
.
Serializers and Deserializers
Make sure your serializers and deserializers are set up as per Configuring Micronaut Kafka with serialisers and deserialisers, including your test producers and consumers. I used
kafka:
key:
serializer: org.apache.kafka.common.serialization.UUIDSerializer
deserializer: org.apache.kafka.common.serialization.UUIDDeserializer
value:
serializer: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer
deserializer: io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer
in application-kafka.yml in the test resources.
Conclusion
Integration testing is really useful for testing how your application will actually run. It also allows flexibility in implementation as I could now change how the stream works without changing the test and I can be confident I haven’t broken anything.
Posted on April 19, 2020
Join Our Newsletter. No Spam, Only the good stuff.
Sign up to receive the latest update from our blog.