Spring Kafka Streams playground with Kotlin - V
Marcos Maia
Posted on February 13, 2022
This post is the final part of a series where we create a simple Kafka Streams application with Kotlin using Spring Boot and Spring Kafka.
Please check the first part of the tutorial to get started and for further context on what we're building.
If you want to start from here you can clone the source code for this project:
git clone git@github.com:mmaia/simple-spring-kafka-stream-kotlin.git
then check out tag v8 with git checkout v8 and follow along from there with this post.
In this post we're going to take the QuoteStream we created in the last post, group records by key to aggregate and count the total volume (count) per stock quote symbol, and also materialize the count with windowing so we can query the local WindowStore for the volume of each instrument over specific intervals. We will then create an endpoint on our Controller to expose that information. Have fun.
Grouping and counting
In this part of the series we will use Kafka Streams to keep track of the number of quotes for each key, and then expand that to check the number of quotes for a specific interval in time. In finance lingo, where a new quote represents an executed trade, this count is also known as Trade Volume.
For this part we will use two new topics: one to store the total count of quotes per symbol and another to store the count per interval. Let's start by adding the following to the appTopics function in the KafkaConfiguration class, so these topics are created for us the next time we start the application:
TopicBuilder
.name(COUNT_TOTAL_QUOTES_BY_SYMBOL_TOPIC)
.compact()
.build(),
TopicBuilder
.name(COUNT_WINDOW_QUOTES_BY_SYMBOL_TOPIC)
.build()
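In case it helps to see it in context, here is a hedged sketch of how these entries could sit inside the full appTopics bean, assuming it returns NewTopics from org.springframework.kafka.core.KafkaAdmin (the topics from the previous posts are elided):

@Bean
fun appTopics(): KafkaAdmin.NewTopics =
    KafkaAdmin.NewTopics(
        // ...topics created in the previous posts...
        TopicBuilder
            .name(COUNT_TOTAL_QUOTES_BY_SYMBOL_TOPIC)
            .compact() // compacted so only the latest count per symbol is retained
            .build(),
        TopicBuilder
            .name(COUNT_WINDOW_QUOTES_BY_SYMBOL_TOPIC)
            .build()
    )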
Next, let's add some configuration we will need to the bottom of the KafkaConfiguration class, and also move the serdeConfig from LeverageStream to the global section of KafkaConfiguration so we can reuse it in our QuoteStream:
const val COUNT_TOTAL_QUOTES_BY_SYMBOL_TOPIC = "count-total-by-symbol-topic"
const val COUNT_WINDOW_QUOTES_BY_SYMBOL_TOPIC = "count-window-by-symbol-topic"
const val QUOTES_BY_WINDOW_TABLE = "quotes-by-window-table"
val serdeConfig: MutableMap<String, String> = Collections.singletonMap(
AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL
)
Now we declare a new Avro type which we will use to store the per-window quote counts. Create a new file called quote-per-window.avsc and add the following content to it:
{
"namespace": "com.maia.springkafkastream.repository",
"type": "record",
"name": "QuotesPerWindow",
"fields": [
{ "name": "symbol", "type": "string"},
{ "name": "startTime", "type": "long"},
{ "name": "endTime", "type": "long"},
{ "name": "count", "type": "long"}
]
}
Build your project so the new Java type is generated: mvn clean package -DskipTests
Counting total quotes per symbol
We start by doing a simple total count per symbol and sending it to the newly created topic count-total-by-symbol-topic. Just after the segment where we did the branching in the last post, add the following lines:
// group by key
val groupedBySymbol: KGroupedStream<String, ProcessedQuote> = resStream.groupByKey()
// count and send to compact topic so we can always quickly access the total count by symbol
val quotesCount: KTable<String, Long> = groupedBySymbol.count()
quotesCount.toStream().to(COUNT_TOTAL_QUOTES_BY_SYMBOL_TOPIC, Produced.with(Serdes.String(), Serdes.Long()))
That's it, quite simple: this will count every quote by its key and keep track of the total count. To test it, run the app, consume messages from the count-total-by-symbol-topic topic, and send some quotes using the API we created in a previous post; you will then see the updates on the topic.
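One way to watch those updates is with the console consumer that ships with Kafka; a sketch, assuming a local broker on the default port:

kafka-console-consumer --bootstrap-server localhost:9092 \
  --topic count-total-by-symbol-topic \
  --from-beginning \
  --property print.key=true \
  --key-deserializer org.apache.kafka.common.serialization.StringDeserializer \
  --value-deserializer org.apache.kafka.common.serialization.LongDeserializer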
But to be honest, in this case having the total count doesn't seem too useful; it would be more useful if we could calculate the Trade Volume over a specific interval. Yeah, cool, let's do that!
Create a KTable and materialize it so we can then use an endpoint to query the trade volume per symbol per interval. In the same QuoteStream.quoteKStream method add the following code:
// count per window and materialize it so we can query it directly from kafka
val quotesWindowedKTable: KTable<Windowed<String>, Long> = groupedBySymbol
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(30)))
.count(
Materialized.`as`<String, Long, WindowStore<Bytes, ByteArray>>(QUOTES_BY_WINDOW_TABLE)
.withValueSerde(Serdes.Long())
)
This piece of code stores the count per 30-second interval for each key using a WindowStore, which we can then query to calculate volumes per interval.
Please make sure you understand the difference and consequences of using a KTable vs a GlobalKTable, especially if you have multiple instances of your client application running: the store backing a KTable is partitioned, so each instance only holds the keys of the partitions assigned to it, and a query for a given key may need to be routed to the instance that owns it.
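As a hedged sketch of what discovering the owning instance could look like (assuming each instance sets the application.server property so Kafka Streams knows which host serves which partitions):

// hypothetical helper: ask Kafka Streams which instance hosts the data for a key,
// so a REST call can be forwarded there when the local instance doesn't own it
fun findHostForSymbol(streams: KafkaStreams, symbol: String): String {
    val metadata = streams.queryMetadataForKey(
        QUOTES_BY_WINDOW_TABLE, symbol, Serdes.String().serializer()
    )
    return "${metadata.activeHost().host()}:${metadata.activeHost().port()}"
}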
To be able to query the created WindowStore we need to declare a ReadOnlyWindowStore and initialize it. Declare it as an instance variable:
private lateinit var quotesPerWindowView: ReadOnlyWindowStore<String, Long>
And then declare a listener to initialize it:
@Bean
fun afterStartQuote(sbfb: StreamsBuilderFactoryBean): StreamsBuilderFactoryBean.Listener {
val listener: StreamsBuilderFactoryBean.Listener = object : StreamsBuilderFactoryBean.Listener {
override fun streamsAdded(id: String, streams: KafkaStreams) {
quotesPerWindowView = streams
.store(
StoreQueryParameters
.fromNameAndType(QUOTES_BY_WINDOW_TABLE, QueryableStoreTypes.windowStore())
)
}
}
sbfb.addListener(listener)
return listener
}
Sweet, now that we have an initialized object for this WindowStore, let's create a function that calculates the volume based on the quote symbol and the specified start and end times:
fun quoteVolumePerInterval(key: String, start: Long, end: Long): QuotesPerWindow {
    var totalCountForInterval = 0L
    // the iterator holds resources in the state store, so close it when done
    quotesPerWindowView.fetch(key, Instant.ofEpochMilli(start), Instant.ofEpochMilli(end))
        .use { countedQuotes ->
            countedQuotes.forEach { totalCountForInterval += it.value }
        }
    return QuotesPerWindow(key, start, end, totalCountForInterval)
}
We can then create a new REST endpoint to expose the volume. To keep it simple, the caller specifies a number of minutes before now, and we calculate the volume over that past interval for the given instrument. Add the following function to the QuotesController class:
@GetMapping("/quotes/count/{symbol}")
fun getQuotesPerWindow(
@PathVariable symbol: String,
@RequestParam("pastMinutes") pastMinutes: Int
): ResponseEntity<QuotesPerWindowDTO> {
val end = Instant.now()
val start = end.minusSeconds(pastMinutes.toLong() * 60)
val quotesPerWindow: QuotesPerWindow = quoteStream.quoteVolumePerInterval(symbol, start.toEpochMilli(), end.toEpochMilli())
val result = QuotesPerWindowDTO(symbol, quotesPerWindow.count, start, end)
return ResponseEntity.ok(result)
}
Build and run the application and let's play around with it a bit. Create some quotes as before, send them to Kafka, and with a client call like the one below check the volume for those quotes in the specified interval:
GET http://localhost:8080/api/quotes/count/APPL?pastMinutes=5
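An illustrative response could look like the following, assuming the DTO fields keep their constructor names; the count here is made up and the exact rendering of the Instant fields depends on your Jackson configuration:

{
  "symbol": "APPL",
  "count": 42,
  "start": "2022-02-13T10:00:00Z",
  "end": "2022-02-13T10:05:00Z"
}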
That's really nice, but now consider that you might want to calculate those totals and then send them to Elasticsearch or MongoDB for further processing. The current KTable doesn't carry much context, as it's built on internal Kafka Streams elements like the windowed store. What you want in this case is to send the data to a new topic carrying the total count, the interval and the symbol, so you can easily plug in a Kafka connector to synchronize those count aggregations to your external storage. Let's do that: add the following code to the QuoteStream.quoteKStream function, just after the KTable definition we did above:
// and then transform and send it to a new topic which we could then use a connector and send to elastic or mongodb for example...
quotesWindowedKTable
.toStream()
.map(processedQuoteToQuotesPerWindowMapper::apply)
.to(COUNT_WINDOW_QUOTES_BY_SYMBOL_TOPIC, Produced.with(Serdes.String(), quotesPerWindowSerde))
You will get an error at this point due to the missing definition of processedQuoteToQuotesPerWindowMapper, a BiFunction which receives as input the key and value types of our KTable, Windowed<String> and Long, and transforms them into the Avro type we created earlier so we can send it to a topic with meaningful context. Let's add a property holding that transformation to fix the error. Just after the quotesPerWindowView declaration at the class level add:
/**
* From processed quote to quotes per window
*/
var processedQuoteToQuotesPerWindowMapper =
BiFunction<Windowed<String>, Long, KeyValue<String, QuotesPerWindow>> { key, value ->
val quotesPerWindow = QuotesPerWindow(key.key(), key.window().start(), key.window().end(), value ?: 0)
KeyValue(key.key(), quotesPerWindow)
}
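If quotesPerWindowSerde isn't declared yet in your QuoteStream, a minimal sketch of the declaration the snippets above assume, using Confluent's SpecificAvroSerde (from io.confluent.kafka.streams.serdes.avro), would be:

// serde for the generated QuotesPerWindow Avro type; configured in the init() below
private val quotesPerWindowSerde = SpecificAvroSerde<QuotesPerWindow>()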
And we also need to initialize the Serde
we used, so add a new @PostConstruct
function to take care of that:
@PostConstruct
fun init() {
quotesPerWindowSerde.configure(serdeConfig, false)
}
As usual you can check out the project at this point using git checkout v9, or just check out the latest main with git checkout main. Have fun.
That's it, it's done! You can now play around with it, check the content of the new topic, and you have a nice Kafka Streams playground to expand and practice with.
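To peek at the new topic, which carries Avro values, you could use Confluent's Avro console consumer; a sketch, assuming the Schema Registry runs on localhost:8081:

kafka-avro-console-consumer --bootstrap-server localhost:9092 \
  --topic count-window-by-symbol-topic \
  --from-beginning \
  --property print.key=true \
  --property schema.registry.url=http://localhost:8081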
Thank you so much for reaching this far; this is the end of a very long series of articles that took me a lot of effort to write. I really appreciate feedback about the issues and typos you will most likely find, so I can improve it. Constructive suggestions are also welcome!