Spring Kafka Streams playground with Kotlin - II
Marcos Maia
Posted on February 10, 2022
Context
This post is part of a series where we create a simple application with Kotlin using Spring Boot and Spring Kafka.
Please check the first part of the tutorial to get started and get further context of what we're building.
If you want to start from here you can clone the source code for this project
git clone git@github.com:mmaia/simple-spring-kafka-stream-kotlin.git
and then check out v4: git checkout v4
and follow from there continuing with this post.
In this second part we will build the producers LeveragePriceProducer
and StockQuoteProducer
and our controller QuotesController
and also use the Kafka Admin client to create the topics for us.
We will then be able to send some messages using the REST endpoints we created and check the messages in Kafka using the command line client or Conduktor. Let's go for it.
Setting up Kafka Admin to create topics
In this section we will create the spring-kafka setup so that the Kafka Admin client creates the topics for us on the broker on application startup.
For this sample application I will keep the package structure very simple and we will end up with two subpackages, api
where our rest endpoints will be created and repository
where we will put all Kafka configuration and Kafka client code. In real projects I usually have other layers and I tend to use business-related package paths to segregate subdomains, but that is a topic for another post.
- Create a package called
repository
and a new Kotlin class KafkaConfiguration
in this new package.
Add the Spring boot @Configuration
and Spring Kafka @EnableKafka
annotations to your class, create constants with the topic names and a Spring @Bean
with spring-kafka Admin code to create the two initial topics for leverage and quotes. See the code fragment below and notice that we create the leverage topic as a compacted Kafka topic, because we will later build a Global KTable to read this data. For everything else we use the default settings in this simple example.
KafkaConfiguration
Kotlin class:
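import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.annotation.EnableKafka
import org.springframework.kafka.config.TopicBuilder
import org.springframework.kafka.core.KafkaAdmin.NewTopics

@Configuration
@EnableKafka
class KafkaConfiguration {
    // TOPICS
    val quotesTopic = "stock-quotes-topic"
    val leveragePriceTopic = "leverage-prices-topic"

    // The Kafka Admin client creates these topics on startup if they don't exist yet
    @Bean
    fun appTopics(): NewTopics {
        return NewTopics(
            TopicBuilder.name(quotesTopic).build(),
            TopicBuilder.name(leveragePriceTopic)
                .compact().build(),
        )
    }
}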
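To make the compact setting a bit more concrete, here is a minimal sketch in plain Kotlin of what a compacted topic conceptually retains: the latest value per key. The Record type, the compact function and the sample values are hypothetical and only there for illustration. Real compaction is done asynchronously by the broker, so older records can stay visible for a while.

```kotlin
// Hypothetical record type for illustration only; not part of the project.
data class Record(val key: String, val value: Double)

// Keeps only the last value seen for each key, which is what a
// compacted topic converges to once the broker has cleaned the log.
fun compact(log: List<Record>): Map<String, Double> =
    log.associate { it.key to it.value }

// Three records for two keys; "APPL" is written twice.
val sampleLog = listOf(
    Record("APPL", 144.54),
    Record("GOOG", 99.10),
    Record("APPL", 150.25),
)
```

Calling compact(sampleLog) leaves one entry per symbol, with the second APPL value winning. That is why a compacted topic fits the Global KTable we will build later: only the current leverage per symbol matters.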
Add the following property to the application.yaml under src/main/resources. If your file is named application.properties you can just change the name / extension to yaml instead.
spring:
  kafka:
    bootstrap-servers: localhost:9092
You may git checkout v5
to check out the code up to this point.
Start local Kafka on compose or Kubernetes and then build and run this application from your IDE or from command line:
mvn clean package -DskipTests
run it:
mvn spring-boot:run
Now when the application starts you should see the Kafka Client Admin configuration info on the logs:
Once the application has started, you can check on the Kafka broker that the two topics are available thanks to the Admin client code we just created.
Enter the running container: kubectl exec -it kafka-0 -- bash
and then list the topics: kafka-topics --list --bootstrap-server kafka:29092
You should see the topics:
Use exit
to go back to the host command line.
Create Kafka producers
In this next section we will create the Kafka producers and API endpoints to send messages to the topics we just created.
Kafka producers are thread safe, so we could use a single producer and pass in different schema types and topics. In this tutorial, to avoid abstractions, we will make things explicit and create two producers.
First let's add the generic spring-kafka producer configuration to the application.yaml
file. Under spring.kafka
add:
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      properties:
        schema.registry.url: http://localhost:8081
The configurations tell our producer clients where to find Kafka and the Schema registry and the types of the key/value pairs we will be sending to our topics.
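As a rough guide to what Spring Boot does with this YAML, the sketch below lists the equivalent plain Kafka client property names as a Kotlin map. The keys are the standard Kafka producer and Confluent serializer property names; the producerProps name is just an assumption for this example, since Spring Boot builds the real configuration for us.

```kotlin
// Illustrative only: the YAML above expressed with raw Kafka client property names.
// Spring Boot performs this translation itself; we never need to write this map.
val producerProps: Map<String, String> = mapOf(
    // where the producer finds the Kafka broker
    "bootstrap.servers" to "localhost:9092",
    // message keys are plain strings
    "key.serializer" to "org.apache.kafka.common.serialization.StringSerializer",
    // message values are Avro records, serialized with the Confluent serializer
    "value.serializer" to "io.confluent.kafka.serializers.KafkaAvroSerializer",
    // where the Avro serializer looks up and registers schemas
    "schema.registry.url" to "http://localhost:8081",
)
```

Anything placed under properties in the YAML is passed to the client as is, which is why schema.registry.url keeps its dotted name there.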
Creating the producers:
The producers need to reference the topics they send messages to, so it is better to have a central place to get the topic names from. We just added the topic names to the KafkaConfiguration
class, but they are normal Kotlin val
properties. Let's turn them into constants instead so we can easily access them. Change the KafkaConfiguration
class to look like this:
@Configuration
@EnableKafka
class KafkaConfiguration {
    @Bean
    fun appTopics(): NewTopics {
        return NewTopics(
            TopicBuilder.name(STOCK_QUOTES_TOPIC).build(),
            TopicBuilder.name(LEVERAGE_PRICES_TOPIC).compact().build(),
        )
    }
}

// constants for topics
const val STOCK_QUOTES_TOPIC = "stock-quotes-topic"
const val LEVERAGE_PRICES_TOPIC = "leverage-prices-topic"
Notice that above we moved the constants outside of the class definition.
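A minimal sketch of why this change helps, using hypothetical names (TopicsAsVals and EXAMPLE_TOPIC are not part of the project): a val inside a class can only be read through an instance, while a top-level const val can be referenced directly from any file.

```kotlin
// Hypothetical class mirroring the first version of KafkaConfiguration.
class TopicsAsVals {
    val quotesTopic = "stock-quotes-topic"
}

// Top-level compile-time constant, as in the updated version.
const val EXAMPLE_TOPIC = "stock-quotes-topic"

// Reading the val requires an instance of the class.
fun topicFromInstance(): String = TopicsAsVals().quotesTopic

// Reading the constant needs only its name (plus an import from another package).
fun topicFromConstant(): String = EXAMPLE_TOPIC
```

Another benefit of const val is that the value is a compile-time constant, so it can also be used as an annotation argument.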
Under the repository
package create two new Kotlin classes called LeveragePriceProducer
and StockQuoteProducer
and add the following content to them:
LeveragePriceProducer
class:
@Repository
class LeveragePriceProducer(val leveragePriceProducer: KafkaTemplate<String, LeveragePrice>) {
    fun send(message: LeveragePrice) {
        leveragePriceProducer.send(LEVERAGE_PRICES_TOPIC, message.symbol.toString(), message)
    }
}
StockQuoteProducer
class:
@Repository
class StockQuoteProducer(val quoteProducer: KafkaTemplate<String, StockQuote>) {
    fun send(message: StockQuote) {
        quoteProducer.send(STOCK_QUOTES_TOPIC, message.symbol.toString(), message)
    }
}
That's it for the producers, pretty simple! This is possible because Spring Boot and Spring Kafka apply sensible defaults for any configuration we don't specify. If you need to, you can easily override them in configuration or using Spring @Bean
definitions.
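Coming back to the earlier remark that a single producer could serve both topics, here is a rough sketch of that idea. To keep it runnable without Kafka, RecordSender is a hypothetical stand-in for the three-argument send(topic, key, value) of KafkaTemplate, and GenericProducer and RecordingSender are illustrative names, not classes from the project.

```kotlin
// Hypothetical stand-in for KafkaTemplate's send(topic, key, value).
fun interface RecordSender {
    fun send(topic: String, key: String, value: Any)
}

// One producer for every topic: the caller chooses topic, key and payload.
class GenericProducer(private val sender: RecordSender) {
    fun send(topic: String, key: String, message: Any) =
        sender.send(topic, key, message)
}

// In-memory sender that just remembers what was sent, handy for trying this out.
class RecordingSender : RecordSender {
    val sent = mutableListOf<Triple<String, String, Any>>()

    override fun send(topic: String, key: String, value: Any) {
        sent += Triple(topic, key, value)
    }
}
```

The trade-off is that the compiler no longer stops us from sending a quote to the leverage topic, which is one reason to stick to two explicit, typed producers in this tutorial.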
Let's now build two REST endpoints so we can send messages to those topics using the producers we just created. To pass in the messages we will also create two wrapper DTOs which will be used to pass in data to our REST endpoints.
Create a package called api
and a Kotlin class called QuotesController
and, using constructor dependency injection, pass in a reference to our producers. This will be the initial content:
@Controller
@RequestMapping("api")
class QuotesController(val stockQuoteProducer: StockQuoteProducer, val leveragePriceProducer: LeveragePriceProducer) {}
Ok, now let's create a new package under the api package we just created,
called dto
where we will put our simple DTOs called LeveragePriceDTO
and StockQuoteDTO
with the following content:
LeveragePriceDTO
class:
class LeveragePriceDTO(val symbol: String, val leverage: BigDecimal)
StockQuoteDTO
class:
class StockQuoteDTO(val symbol: String, val tradeValue: BigDecimal, @JsonFormat(shape = JsonFormat.Shape.STRING, timezone = "UTC") val isoDateTime: Instant)
This is the package structure after creating these three new classes:
Let's now create two endpoints so we can process and send messages to Kafka. Create a new function newQuote
inside the QuotesController
class:
@PostMapping("/quotes")
fun newQuote(@RequestBody stockQuoteDTO: StockQuoteDTO): ResponseEntity<StockQuoteDTO> {
    val stockQuote = StockQuote(stockQuoteDTO.symbol, stockQuoteDTO.tradeValue.toDouble(), stockQuoteDTO.isoDateTime.toEpochMilli())
    stockQuoteProducer.send(stockQuote) // fire and forget
    return ResponseEntity.ok(stockQuoteDTO)
}
And for Leverage create a new function newLeveragePrice
with the following content:
@PostMapping("/leverage")
fun newLeveragePrice(@RequestBody leveragePriceDTO: LeveragePriceDTO): ResponseEntity<LeveragePriceDTO> {
    val leveragePrice = LeveragePrice(leveragePriceDTO.symbol, leveragePriceDTO.leverage.toDouble())
    leveragePriceProducer.send(leveragePrice) // fire and forget
    return ResponseEntity.ok(leveragePriceDTO)
}
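The fire and forget comments mean the controller returns without checking whether the broker accepted the message. If you'd rather react to the outcome, the future returned by send can be observed. The sketch below shows the idea with a plain java.util.concurrent.CompletableFuture; sendAsync and describeOutcome are hypothetical helpers standing in for the real producer call. As far as I know, KafkaTemplate.send returns a CompletableFuture from Spring Kafka 3.0 on, while the 2.x line returns a ListenableFuture, so check which version your project uses.

```kotlin
import java.util.concurrent.CompletableFuture

// Hypothetical stand-in for a producer send call: the returned future
// completes normally on success or exceptionally on failure.
fun sendAsync(payload: String, fail: Boolean = false): CompletableFuture<String> =
    if (fail) CompletableFuture.failedFuture(IllegalStateException("broker unavailable"))
    else CompletableFuture.completedFuture("ack:$payload")

// Turns the outcome of a send into a message we could log or return,
// instead of silently ignoring it.
fun describeOutcome(future: CompletableFuture<String>): CompletableFuture<String> =
    future.handle { result, error ->
        if (error != null) "failed: ${error.message}" else "sent: $result"
    }
```

To use something like this in the project, the send functions of our producers would have to return the future they get from the template instead of dropping it.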
Now we need to send some messages to these REST endpoints which will send them to our Kafka topics, in order to do that create a new folder called test-data
on the root of the project folder and add a few JSON files with data for leverage and quotes as the examples below:
leveragePrice1.json
file:
{
  "symbol": "APPL",
  "leverage": 144.54
}
stockQuote1.json
file:
{
  "symbol": "APPL",
  "tradeValue": 123.45,
  "isoDateTime": "2021-11-04T08:22:35.000000Z"
}
Note that we will be receiving an ISO date so make sure to follow the pattern when creating new files.
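If you want to check a date string before putting it into a file, here is a small sketch using java.time.Instant, the same type the DTO uses. The function names isoToEpochMillis and isValidIsoInstant are made up for this example, and Jackson's own parsing may be slightly more lenient or strict than Instant.parse, so treat it as an approximation.

```kotlin
import java.time.Instant

// Parses an ISO-8601 instant in UTC (ending in "Z") and returns epoch milliseconds,
// the same conversion the controller applies with toEpochMilli().
fun isoToEpochMillis(iso: String): Long = Instant.parse(iso).toEpochMilli()

// Returns true when the text can be parsed as an ISO-8601 instant.
fun isValidIsoInstant(text: String): Boolean =
    runCatching { Instant.parse(text) }.isSuccess
```

For example, isValidIsoInstant("2021-11-04T08:22:35.000000Z") holds, while a value like "2021-11-04 08:22:35" (no T separator and no zone) does not parse.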
The structure after creating a few sample data files should be like this:
Now let's create a file called test-data.http
on the root of our project where we will have some http calls to invoke our newly created endpoints:
test-data.http
file:
POST http://localhost:8080/api/quotes
Content-Type: application/json

< ./test-data/stockQuote1.json

###
POST http://localhost:8080/api/leverage
Content-Type: application/json

< ./test-data/leveragePrice1.json
You can use Postman or convert those commands to
curl
if you prefer. If you're using IntelliJ, the http
files should be recognized automatically and you can run them directly from the IDE. If you're using VSCode, you can install a REST extension which recognizes the http
file and lets you run those commands directly from the editor.
IntelliJ - You can run the calls by clicking the green arrow as seen below:
VSCode - You can run the calls clicking the Send Request
text on the http file as seen below:
Send some messages to the endpoints and after that let's check the messages on the broker. I will show how to do that using the Kubernetes setup which is in the project. If you're using docker compose, please check the article references at the beginning of part I of this series, where it's explained in detail how to do that using compose.
Checking the messages
First we need to enter the running schema-registry container, so:
- List the running pods:
kubectl get pods
you should find the schema-registry pod and copy its name:
- Enter the schema registry container (use the pod name from the command in step 1):
kubectl exec -it schema-registry-54bd4d4b4f-f5wm5 -- bash
- Run an avro console consumer to consume the messages from the topics, example below is showing the
stock-quotes-topic
, replace the topic name to check other topics.
kafka-avro-console-consumer --bootstrap-server kafka:29092 --topic stock-quotes-topic --from-beginning --property "schema.registry.url=http://schema-registry:30081"
After client initialization you should see the messages. I've sent some of them multiple times to illustrate:
A good alternative to using the command line and having to enter the running container is to use a Kafka client tool or kcat. Lately I have really come to enjoy Conduktor, which is free for personal use, has a lot of features and is very intuitive to use.
Same messages visualized using Conduktor:
That's it for this post. Congratulations if you made it this far: it means you're successfully sending messages to both topics, and we have prepared the ground so we can have some fun using Kafka Streams starting in the next post. Stay tuned.
As usual you can check out the code up to this stage from the project repo using: git checkout v6
Photo by Adi Goldstein on Unsplash