Spring Kafka Streams playground with Kotlin - V

thegroo

Marcos Maia

Posted on February 13, 2022

This post is the final part of a series where we create a simple Kafka Streams application with Kotlin using Spring Boot and Spring Kafka.

Please check the first part of the tutorial to get started and get further context of what we're building.

If you want to start from here, you can clone the source code for this project with git clone git@github.com:mmaia/simple-spring-kafka-stream-kotlin.git, check out v8 with git checkout v8, and continue from there with this post.

In this post we're going to take the QuoteStream we created in the last post, group it by key, and count the total volume per stock quote symbol. We will also materialize a windowed count so we can query the local WindowStore and calculate the volume for each instrument in specific intervals, and we will add an endpoint to our Controller to expose that information. Have fun.

Part V diagram

Grouping and counting

In this part of the series we will use Kafka Streams to keep track of the number of quotes for each key, and then expand that to check the number of quotes in a specific time interval. In finance lingo, where a new Quote represents an executed trade, this count is also known as Trade Volume.

For this part we will use two new topics: one to store the total count of quotes per symbol and another to store the total count per interval. Let's start by adding the following to the appTopics function on the KafkaConfiguration class so these topics are created the next time we start the application:


TopicBuilder
    .name(COUNT_TOTAL_QUOTES_BY_SYMBOL_TOPIC)
    .compact()
    .build(),

TopicBuilder
    .name(COUNT_WINDOW_QUOTES_BY_SYMBOL_TOPIC)
    .build()


Let's also add some configuration we will need at the bottom of the KafkaConfiguration class, and move serdeConfig from LeverageStream to the KafkaConfiguration global section so we can reuse it in our QuoteStream next:

const val COUNT_TOTAL_QUOTES_BY_SYMBOL_TOPIC = "count-total-by-symbol-topic"
const val COUNT_WINDOW_QUOTES_BY_SYMBOL_TOPIC = "count-window-by-symbol-topic"
const val QUOTES_BY_WINDOW_TABLE = "quotes-by-window-table"
val serdeConfig: MutableMap<String, String> = Collections.singletonMap(
   AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL
)

Now we declare a new Avro type, which we will use to store the quotes per window we will be counting. Create a new file called quote-per-window.avsc and add the following content to it:

{
  "namespace": "com.maia.springkafkastream.repository",
  "type": "record",
  "name": "QuotesPerWindow",
  "fields": [
    { "name": "symbol", "type": "string"},
    { "name": "startTime", "type": "long"},
    { "name": "endTime", "type":  "long"},
    { "name": "count", "type":  "long"}
  ]
}

Build your project so the new Java type is generated: mvn clean package -DskipTests

Grouping and counting

We start by doing a simple total count per symbol and send that to the newly created topic count-total-by-symbol-topic, so just after the segment where we did the branching in the last post add the following lines:

// group by key
val groupedBySymbol: KGroupedStream<String, ProcessedQuote> = resStream.groupByKey()

// count and send to compact topic so we can always quickly access the total count by symbol
val quotesCount: KTable<String, Long> = groupedBySymbol.count()
quotesCount.toStream().to(COUNT_TOTAL_QUOTES_BY_SYMBOL_TOPIC, Produced.with(Serdes.String(), Serdes.Long()))


That's it, quite simple. This keeps a running total of the quotes received for each key. To test it, run the app and consume messages from count-total-by-symbol-topic; after you send some quotes using the API we created in a previous post, you will see the updates on the topic:

Total count in topic
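
To make the behaviour of the count concrete, here is a minimal plain-Kotlin sketch with no Kafka involved. It only imitates what groupByKey().count() followed by toStream() conceptually produces: one update record per incoming quote, carrying the new running total for that key. The function name runningCounts and the sample symbols are made up for illustration, and a real Kafka Streams application may emit fewer intermediate updates because of record caching.

```kotlin
// Imitates groupByKey().count(): every incoming key bumps its counter and
// yields an update record (symbol, newTotal), like the records written to the count topic.
fun runningCounts(symbols: List<String>): List<Pair<String, Long>> {
    val totals = mutableMapOf<String, Long>()
    return symbols.map { symbol ->
        val newTotal = (totals[symbol] ?: 0L) + 1
        totals[symbol] = newTotal
        symbol to newTotal
    }
}

fun main() {
    // Hypothetical sequence of quote keys arriving on the stream.
    val incoming = listOf("APPL", "GOOGL", "APPL", "APPL", "GOOGL")
    runningCounts(incoming).forEach { (symbol, total) -> println("$symbol -> $total") }
}
```

Because the target topic is compacted, older updates for the same symbol can eventually be discarded by the broker, leaving the latest total per key.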

But to be honest, in this case having the total count doesn't seem too useful. It would be more useful if we could calculate the Trade Volume in a specific interval. Yeah, cool, let's do that!

Create a KTable and materialize it so we can then use an endpoint to query the trade volume per symbol per interval. In the same QuoteStream.quoteKStream method add the following code:

// count per 30-second window and materialize the result so we can query it directly from the local state store
val quotesWindowedKTable: KTable<Windowed<String>, Long> = groupedBySymbol
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(30)))
    .count(
        Materialized.`as`<String, Long, WindowStore<Bytes, ByteArray>>(QUOTES_BY_WINDOW_TABLE)
            .withValueSerde(Serdes.Long())
    )

The code above stores the count per 30-second interval for each key in a WindowStore, which we can query later to calculate volumes per interval.
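
If the windowing is hard to picture, the following plain-Kotlin sketch may help. It does not use Kafka Streams; it only imitates how fixed-size, non-overlapping (tumbling) windows bucket records: each timestamp falls into the window that starts at the nearest lower multiple of the window size and covers [start, start + size). QuoteEvent, windowStart and countPerWindow are hypothetical names introduced for this illustration, and timestamps are assumed to be non-negative epoch milliseconds.

```kotlin
import java.time.Duration

// Hypothetical stand-in for a quote record: only the key and the event timestamp matter here.
data class QuoteEvent(val symbol: String, val timestampMs: Long)

// Start of the tumbling window that contains the timestamp; the window covers [start, start + size).
fun windowStart(timestampMs: Long, windowSize: Duration): Long =
    timestampMs - (timestampMs % windowSize.toMillis())

// Counts events per (symbol, windowStart), similar in spirit to windowedBy(...).count().
fun countPerWindow(events: List<QuoteEvent>, windowSize: Duration): Map<Pair<String, Long>, Long> =
    events.groupingBy { it.symbol to windowStart(it.timestampMs, windowSize) }
        .eachCount()
        .mapValues { it.value.toLong() }

fun main() {
    val events = listOf(
        QuoteEvent("APPL", 5_000),
        QuoteEvent("APPL", 29_999),
        QuoteEvent("APPL", 30_000),
        QuoteEvent("GOOGL", 31_000)
    )
    println(countPerWindow(events, Duration.ofSeconds(30)))
}
```

The two APPL quotes at 5 s and 29.999 s share the window starting at 0, while the one at exactly 30 s opens the next window.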

Please make sure you understand the difference between a KTable and a GlobalKTable, and the consequences of using one or the other, especially if you have multiple instances of your client application running.
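
The reason this matters: the state behind a KTable is partitioned, so each application instance only holds the keys of the partitions assigned to it, whereas a GlobalKTable is fully replicated on every instance. The sketch below is a plain-Kotlin illustration of that idea only. partitionFor and ownerOf are hypothetical helpers, the assignment map is invented, and String.hashCode() is used as a stand-in for the hash Kafka actually applies to the serialized key.

```kotlin
// Illustration only: Kafka's default partitioner hashes the serialized key differently;
// the point is just that every key maps to exactly one partition.
fun partitionFor(key: String, partitions: Int): Int =
    Math.floorMod(key.hashCode(), partitions)

// Hypothetical assignment of partitions to application instances: instanceId -> owned partitions.
// Returns the instance whose local store would contain the key, or null if nobody owns it.
fun ownerOf(key: String, assignment: Map<Int, Set<Int>>, partitions: Int): Int? {
    val partition = partitionFor(key, partitions)
    return assignment.entries.firstOrNull { partition in it.value }?.key
}

fun main() {
    val assignment = mapOf(0 to setOf(0, 1), 1 to setOf(2, 3))
    listOf("APPL", "GOOGL", "MSFT").forEach { symbol ->
        println("$symbol is only queryable locally on instance ${ownerOf(symbol, assignment, 4)}")
    }
}
```

With a single instance, as in this playground, the local store sees every key; with several instances, an endpoint that queries only the local store would return counts just for the symbols that instance owns.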

To be able to query the created WindowStore we need to declare a ReadOnlyWindowStore and initialize it, declare it as an instance variable:

private lateinit var quotesPerWindowView: ReadOnlyWindowStore<String, Long>

And then declare a listener to initialize it:

@Bean
fun afterStartQuote(sbfb: StreamsBuilderFactoryBean): StreamsBuilderFactoryBean.Listener {
    val listener: StreamsBuilderFactoryBean.Listener = object : StreamsBuilderFactoryBean.Listener {
        override fun streamsAdded(id: String, streams: KafkaStreams) {
            quotesPerWindowView = streams
                .store(
                    StoreQueryParameters
                        .fromNameAndType(QUOTES_BY_WINDOW_TABLE, QueryableStoreTypes.windowStore())
                )
        }
    }
    sbfb.addListener(listener)
    return listener
}

Sweet, now that we have an initialized object for this WindowStore let's create a function that calculates the volume based on the quote symbol, start and end time specified:

fun quoteVolumePerInterval(key: String, start: Long, end: Long): QuotesPerWindow {
    var totalCountForInterval = 0L
    // the iterator holds store resources, so use {} closes it once we are done summing
    quotesPerWindowView
        .fetch(key, Instant.ofEpochMilli(start), Instant.ofEpochMilli(end))
        .use { countedQuotes ->
            countedQuotes.forEach { totalCountForInterval += it.value }
        }

    return QuotesPerWindow(key, start, end, totalCountForInterval)
}
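
One detail worth keeping in mind when reading the result: as far as I understand the ReadOnlyWindowStore contract, fetch selects windows by their start timestamp, with both ends of the range inclusive. The plain-Kotlin sketch below imitates that selection on an in-memory map so the effect is visible without a running store. volumeBetween and the sample data are hypothetical and only stand in for the real store.

```kotlin
// Hypothetical local view of the window store for one key: windowStartMs -> count.
// Sums the counts of all windows whose start timestamp lies in [fromMs, toMs].
fun volumeBetween(countsByWindowStart: Map<Long, Long>, fromMs: Long, toMs: Long): Long =
    countsByWindowStart
        .filterKeys { it in fromMs..toMs }
        .values
        .sum()

fun main() {
    // Three 30-second windows starting at 0 s, 30 s and 60 s.
    val windows = mapOf(0L to 2L, 30_000L to 1L, 60_000L to 4L)
    // The window starting at 0 s overlaps the requested range but started before it, so it is skipped.
    println(volumeBetween(windows, 10_000, 60_000))
}
```

Under that assumption, the volume for an arbitrary interval is accurate to within one window size, since a window that began before the requested start is not included even if some of its quotes fall inside the interval.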

We can then create a new REST endpoint to expose the volume. To keep it simple, the caller specifies a number of minutes and we calculate the volume for that instrument over the past specified minutes. Add the following function to the QuotesController class:

@GetMapping("/quotes/count/{symbol}")
fun getQuotesPerWindow(
    @PathVariable symbol: String,
    @RequestParam("pastMinutes") pastMinutes: Int
): ResponseEntity<QuotesPerWindowDTO> {
    val end = Instant.now()
    val start = end.minusSeconds(pastMinutes.toLong() * 60)
    val quotesPerWindow: QuotesPerWindow = quoteStream.quoteVolumePerInterval(symbol, start.toEpochMilli(), end.toEpochMilli())
    val result = QuotesPerWindowDTO(symbol, quotesPerWindow.count, start, end)
    return ResponseEntity.ok(result)
}
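
The QuotesPerWindowDTO returned by the endpoint is not shown in this post. A possible shape, inferred only from the constructor call above, could look like the sketch below. Treat the field names and types as assumptions and adjust them to whatever the project actually defines. The helper intervalFor is also hypothetical; it just isolates the time arithmetic the endpoint performs.

```kotlin
import java.time.Instant

// Assumed shape, inferred from QuotesPerWindowDTO(symbol, quotesPerWindow.count, start, end).
data class QuotesPerWindowDTO(
    val symbol: String,
    val count: Long,
    val start: Instant,
    val end: Instant
)

// Same arithmetic as the endpoint: the interval ends at `now` and starts pastMinutes earlier.
fun intervalFor(pastMinutes: Int, now: Instant): Pair<Instant, Instant> =
    now.minusSeconds(pastMinutes.toLong() * 60) to now

fun main() {
    val (start, end) = intervalFor(5, Instant.now())
    // The count value here is a placeholder, not real data.
    println(QuotesPerWindowDTO("APPL", 0L, start, end))
}
```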

Build and run the application and let's play around with it a bit. Create some quotes as before, send them to Kafka, and then with a new client call, like the ones used before, check the volume for those quotes in the specified interval:

GET http://localhost:8080/api/quotes/count/APPL?pastMinutes=5

Total count volume endpoint call print

That's really nice, but now consider that you might want to send those totals to Elasticsearch or MongoDB for further processing. The current KTable doesn't carry much context, because it is built on Kafka Streams elements such as the windowed key. What you want in this case is to send the data to a new topic that contains the total count, the interval and the symbol, so you can easily plug in a Kafka connector to synchronize those count aggregations to your external storage. Let's do that: add the following code to the QuoteStream.quoteKStream function just after the KTable definition we did above:

// and then transform and send it to a new topic which we could then use a connector and send to elastic or mongodb for example...
quotesWindowedKTable
    .toStream()
    .map(processedQuoteToQuotesPerWindowMapper::apply)
    .to(COUNT_WINDOW_QUOTES_BY_SYMBOL_TOPIC, Produced.with(Serdes.String(), quotesPerWindowSerde))

You will get an error at this point because processedQuoteToQuotesPerWindowMapper is not defined yet. It is a BiFunction that receives the key and value types of our KTable, Windowed<String> and Long, and transforms them into the Avro type we created earlier so we can send it to a topic with meaningful context. Let's add a property to represent that transformation and fix the error. Just after the quotesPerWindowView declaration at the class level add:

/**
 * From a windowed key and its count to quotes per window
 */
val processedQuoteToQuotesPerWindowMapper =
    BiFunction<Windowed<String>, Long, KeyValue<String, QuotesPerWindow>> { key, value ->
        // flatten the window boundaries into the Avro record so the topic is self-describing
        val quotesPerWindow = QuotesPerWindow(key.key(), key.window().start(), key.window().end(), value ?: 0)
        KeyValue(key.key(), quotesPerWindow)
    }

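We also need to configure the Serde we used, so add a new @PostConstruct function to take care of that. The snippet assumes quotesPerWindowSerde is declared at class level (for instance as a SpecificAvroSerde<QuotesPerWindow>; the declaration is not shown in this post):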

@PostConstruct
fun init() {
    quotesPerWindowSerde.configure(serdeConfig, false)
}

As usual, you can check out the project at this point using git checkout v9, or just check out the latest main with git checkout main. Have fun.

That's it, it's done. You can now play around with it and check the content of the new topic. You have a nice Kafka Streams playground to expand and practice with.

Thank you so much for reading this far. This is the end of a very long series of articles that took me a lot of effort to write. I really appreciate feedback on the issues and typos you will most likely find, so I can improve it. Constructive suggestions are also welcome!

Photo by Kevin Ku on Unsplash
