Testing stateful Flink applications locally
Mohammed
Posted on October 2, 2021
If you’re new to Flink, consider heading over to this link first, where I provide an overview of Flink and explain some basic concepts.
In this article, we’ll explore how to set up your Flink applications to run tests locally. As the logic of a Flink application grows more complex over time, it becomes harder to debug and to understand how the application behaves for each distinct input event it may receive. With stateful applications in particular, the ability to test locally speeds up development, because these tests emulate the complete end-to-end flow through Flink without actually deploying the project.
Set up your pom.xml
Assuming Maven is the build tool for your Flink project, add the following dependencies to your pom.xml. Note the scope of the dependencies.
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime_2.11</artifactId>
    <version>1.11.1</version>
    <scope>test</scope>
    <classifier>tests</classifier>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.11.1</version>
    <scope>test</scope>
    <classifier>tests</classifier>
</dependency>
<dependency>
    <groupId>org.assertj</groupId>
    <artifactId>assertj-core</artifactId>
    <version>3.16.1</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.junit.jupiter</groupId>
    <artifactId>junit-jupiter-api</artifactId>
    <version>5.7.1</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.junit.jupiter</groupId>
    <artifactId>junit-jupiter-engine</artifactId>
    <version>5.7.1</version>
    <scope>test</scope>
</dependency>
Writing tests
We leverage JUnit, a framework for writing tests for Java-based applications. Flink also provides additional classes to support testing, which we will explore in the samples below.
Let’s consider the sample tests under ../src/test/SingleEventTest.java of your project; sketches of both tests appear in the sections below.
We’ll get to what the tests do, but before that, they take the following into consideration:
- The input source of the events is Kafka. (Not directly relevant in the test itself, but something to note when it comes to deserializing the events.)
- The Flink application has side outputs and therefore makes use of the OutputTag utility class. Side outputs are great when your application needs to emit outputs that are not intended to be the main output. In this sample, the application sends events to a separate Kafka topic when processing doesn’t follow the intended path (see the sketch after this list).
- The events that the application consumes are JSON objects.
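To make that concrete, here is a minimal sketch of a ProcessorFunction along those lines. The article only names the class, so the String types, the failed-events tag, the validation rule, and the timer are placeholder assumptions:

import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class ProcessorFunction extends KeyedProcessFunction<String, String, String> {

    // The empty braces {} create an anonymous subclass so that Flink can
    // capture the side output's type information at runtime.
    public static final OutputTag<String> FAILED_EVENTS =
            new OutputTag<String>("failed-events") {};

    @Override
    public void processElement(String event, Context ctx, Collector<String> out) {
        if (event.contains("\"id\"")) { // placeholder validation rule
            out.collect(event); // main output
            // Register a timer so that onTimer() fires later in the flow.
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + 1000);
        } else {
            // Anything off the happy path goes to the side output,
            // which the job routes to a separate Kafka topic.
            ctx.output(FAILED_EVENTS, event);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
        // Timer logic elided in this sketch.
    }
}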
testFlinkJob():
This test focuses on validating the core logic of the application, i.e. the KeyedProcessFunction (ProcessorFunction). You run tests against that one operator alone to validate the flow of execution.
The harness classes used are part of the Flink framework and are geared towards exactly these kinds of tests. Use the appropriate class depending on whether the operator under test is set up for a keyed or a non-keyed stream (KeyedOneInputStreamOperatorTestHarness versus OneInputStreamOperatorTestHarness, respectively).
Success or failure of the test is determined with JUnit assertions. In this sample, we only check whether output was generated for a given event, and whether a side output was generated for a specific test event. We could go further and validate the individual keys of the output event with the appropriate assert statements.
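Putting this together, here is a minimal sketch of what testFlinkJob() could look like, using the harness classes pulled in by the tests-classifier dependencies from the pom.xml above. It assumes the ProcessorFunction sketch from earlier; the key selector and event payload are placeholders:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

class SingleEventTest {

    private KeyedOneInputStreamOperatorTestHarness<String, String, String> harness;

    @BeforeEach
    void setUp() throws Exception {
        harness = new KeyedOneInputStreamOperatorTestHarness<>(
                new KeyedProcessOperator<>(new ProcessorFunction()),
                event -> event, // placeholder; real code extracts a key from the JSON
                Types.STRING);
        harness.open();
    }

    @AfterEach
    void tearDown() throws Exception {
        harness.close();
    }

    @Test
    void testFlinkJob() throws Exception {
        // In the real test, createEvent() runs the raw bytes through the
        // custom deserializer (see the deserialization section below).
        harness.processElement("{\"id\":\"1\"}", 100L); // (value, timestamp)

        // A valid event should land on the main output, with no side output.
        assertThat(harness.extractOutputValues()).hasSize(1);
        assertThat(harness.getSideOutput(ProcessorFunction.FAILED_EVENTS)).isNull();
    }
}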
endToEndTest():
This test exercises the entire Flink job while executing the input events that you specify as part of the test.
The variation here is that we set up a fakeSource() and a fakeSink() using the SourceFunction and SinkFunction interfaces, both of which are part of the Flink APIs. The idea is to understand how the application would behave if it were deployed. You will notice that the inputs are sent within a loop, not through any of Flink’s test API classes, as if they were actually being emitted from the source.
Just like in the previous test, we use JUnit assertions to validate the success of a test.
One key difference you will notice is that in the endToEndTest() function, all Flink operations, such as the onTimer() method, are invoked as part of the normal flow. The same event, when tested through the testFlinkJob() function, will not actually trigger the timer.
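Here is a minimal sketch of endToEndTest() in that style, with the fake source and sink written inline. The event payloads, key selector, and expected count are placeholder assumptions:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;

class EndToEndTest {

    // A fake sink that collects results into a static list; Flink serializes
    // sink instances, so instance fields would not be visible to the test.
    private static class CollectSink implements SinkFunction<String> {
        static final List<String> VALUES = Collections.synchronizedList(new ArrayList<>());

        @Override
        public void invoke(String value, Context context) {
            VALUES.add(value);
        }
    }

    @Test
    void endToEndTest() throws Exception {
        CollectSink.VALUES.clear();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // A fake source that emits the test events in a loop, as if from Kafka.
        env.addSource(new SourceFunction<String>() {
                    @Override
                    public void run(SourceContext<String> ctx) {
                        for (String event : Arrays.asList("{\"id\":\"1\"}", "{\"id\":\"2\"}")) {
                            ctx.collect(event);
                        }
                    }

                    @Override
                    public void cancel() {
                    }
                })
                .keyBy(event -> event) // placeholder key selector
                .process(new ProcessorFunction())
                .addSink(new CollectSink());

        env.execute("end-to-end-test");

        // Both valid events should reach the sink via the main output.
        assertThat(CollectSink.VALUES).hasSize(2);
    }
}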
Something to note: if the application requires custom steps during deserialization, it is prudent to include those in your tests as well. You will notice the deserializer being called in createEvent() and in the SourceFunction, because my sample application has a custom Deserializer and Serializer. Since the events arrive from Kafka in the form of bytes, any transformations required on the JSON object after conversion to a JSON string should be performed in the Deserializer rather than in a separate function. This ensures the complete flow of an event, from the deserialization step to Flink generating an output, is tested even before your code reaches a running instance.
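As a sketch, a custom deserializer of this kind could look like the following, assuming String events and a trivial placeholder transformation:

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

import java.nio.charset.StandardCharsets;

public class EventDeserializer implements DeserializationSchema<String> {

    @Override
    public String deserialize(byte[] message) {
        // Kafka hands Flink raw bytes; convert to a JSON string first.
        String json = new String(message, StandardCharsets.UTF_8);
        // Any clean-up or enrichment of the JSON belongs here,
        // not in a separate downstream function.
        return transform(json);
    }

    private String transform(String json) {
        return json.trim(); // placeholder transformation
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}

A test helper such as createEvent() can then call new EventDeserializer().deserialize(rawBytes), so the exact same code path runs in the tests as in the deployed job.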
Optional — Test coverage:
If you would like to understand your test coverage and generate test reports, that can be done quite easily by adding these plugins to the plugins section of your pom.xml.
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-surefire-plugin</artifactId>
    <version>3.0.0-M5</version>
</plugin>
<plugin>
    <groupId>org.jacoco</groupId>
    <artifactId>jacoco-maven-plugin</artifactId>
    <version>0.8.6</version>
    <configuration>
        <excludes>
            <exclude>**/serializers/*</exclude>
            <exclude>**/FlinkJob.java</exclude>
        </excludes>
    </configuration>
    <executions>
        <execution>
            <goals>
                <goal>prepare-agent</goal>
            </goals>
        </execution>
        <execution>
            <id>jacoco-site</id>
            <phase>package</phase>
            <goals>
                <goal>report</goal>
            </goals>
        </execution>
    </executions>
</plugin>
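With the report goal bound to the package phase as above, running mvn clean package executes the tests via Surefire and generates an HTML coverage report under target/site/jacoco/index.html.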
Closing comments:
Writing tests as you work through the application logic ensures functional accuracy. Stateful applications can become complex to maintain because the execution of a Flink job may change based on historical events, and putting the business logic through its paces with varied tests provides confidence in the development process.
Adding log statements at info, error, or debug level at different places in your application makes a huge difference in following the flow while the tests execute.
Vary the test event data often to cover edge cases, since the events are keyed by specific values to enable stateful processing.
As a follow-up to the testing process, checkpoints in Flink are an area worth exploring: in the adverse situation where your Flink application hits a bump and restarts, the state is restored from the last successful checkpoint. It is possible to include the checkpointing mechanism in your tests and see how the overall application behaves when something goes wrong. I’ll follow up with an article on this front shortly. Until then, give these tests a go, and do reach out if you have any questions!
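As a small preview, enabling checkpointing and a restart strategy only takes a couple of lines; a minimal sketch, assuming the same StreamExecutionEnvironment used in the end-to-end test above:

import org.apache.flink.api.common.restartstrategy.RestartStrategies;

// Checkpoint frequently so a short test run exercises the mechanism.
env.enableCheckpointing(1000);
// Restart the job a few times on failure before giving up.
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100));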