Functional Kafka with Spring Cloud - Part 3
Anthony Ikeda
Posted on April 27, 2022
So, we have messages going to Kafka and messages coming off Kafka. Right now, though, those messages are rather unstructured.
There are a couple of approaches we can take:
- Serialize an object using JSON
- Serialize an object using Avro
Well, we've used JSON a lot in the past for our APIs and to be honest, it's boring hehe. So in this article we will take advantage of Apache Avro and create binary representations of the messages in Kafka!
Prerequisites:
- We will need our Employee API from the first article and the Consumer from the second article.
- Schema Registry, which we downloaded and configured in the first article but haven't used yet!
First let's update the Employee API and set up Avro!
Add Avro to the Employee API
We will need a new dependency this time, but first we need to reference a new Maven repository. Add the following tag under the project tag of the pom.xml:
<repositories>
    <repository>
        <id>confluent</id>
        <url>https://packages.confluent.io/maven/</url>
    </repository>
</repositories>
This will make the Confluent Maven repo available from which we will source the Avro Serializers.
Next, let's add the new dependency:
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>7.1.1</version>
</dependency>
As mentioned, this dependency supplies us with the tools to convert our Java objects to and from Avro. To complete this task we need one more change to our pom.xml:
<build>
    <plugins>
        ...
        <plugin>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-maven-plugin</artifactId>
            <version>1.11.0</version>
            <executions>
                <execution>
                    <id>schemas</id>
                    <phase>generate-sources</phase>
                    <goals>
                        <goal>schema</goal>
                        <goal>protocol</goal>
                        <goal>idl-protocol</goal>
                    </goals>
                    <configuration>
                        <sourceDirectory>${project.basedir}/src/main/resources/</sourceDirectory>
                        <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
The avro-maven-plugin performs some nifty functions for us. Primarily, it takes an Avro schema and generates the classes we need to convert our objects to and from Avro.
As you can see, we've configured it to read the JSON schemas from the ${project.basedir}/src/main/resources/ directory and publish the generated classes to ${project.basedir}/src/main/java/.
Now we can define our new Employee.java class.
Create a new file src/main/resources/employee.avsc:
{
  "type": "record",
  "name": "Employee",
  "namespace": "com.example.demo.model",
  "fields": [
    {
      "name": "employeeId",
      "type": "string"
    },
    {
      "name": "firstname",
      "type": "string"
    },
    {
      "name": "lastname",
      "type": "string"
    }
  ]
}
Here we have defined the structure of data we want to transport. There are 3 areas worth noting:
"name" : "Employee"
This is the name of the record as an entity. It's how we will refer to the record and it will end up being the name of the class that is generated.
"namespace" : "com.example.demo.model"
The namespace is synonymous with a Java package. It helps to organize the records and allows us to create other classes of the same name in different namespaces.
"fields" : [...]
Here we have the structure of the message, basically the record attributes.
When you build the application with:
$ ./mvnw clean verify
You will have a new class generated in the src/main/java directory: com.example.demo.model.Employee
It's too large a class to publish here, however, it contains logic to identify the fields, field types and the encoders/decoders from and to the binary representation.
Now that we have the Avro entity created, we want to update the code to publish it to Kafka.
Update the EmployeeController
The first change we want is to update the KafkaTemplate<String, String> to KafkaTemplate<String, Employee>:
@RequestMapping("/employee")
@RestController
public class EmployeeController {

    @Autowired
    protected KafkaTemplate<String, Employee> kafka;
This ensures the message ID remains a String value, but the Employee object gets recognized as a serialized/deserialized object.
Next let's change how we send the message in our @PostMapping:
    @PostMapping
    public ResponseEntity<Void> createEmployee(@RequestParam("firstname") String firstname,
                                               @RequestParam("lastname") String lastname) {
        String id = UUID.randomUUID().toString();
        // Build the Avro-generated record and publish it, keyed by the employee id
        Employee employee = new Employee(id, firstname, lastname);
        kafka.send("employee", id, employee);
        return ResponseEntity.created(URI.create(String.format("/employee/%s", id))).build();
    }
The only thing we've changed here is how we set up the Employee record and how we send it to the KafkaTemplate.
There is one last change to make, and that is to the src/main/resources/application.yml file.
We need to tell the application to handle the record value differently and treat it as an Avro record, so we need to change the value serializer/deserializer:
spring:
  kafka:
    properties:
      schema.registry.url: http://localhost:8081
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
    bootstrap-servers:
      - localhost:9092
server:
  port: 8070
As mentioned before, we want to preserve the message identifier as a string, but take the value and serialize it using Avro.
You may also notice a new entry:
spring:
  kafka:
    properties:
      schema.registry.url: http://localhost:8081
This is the Schema Registry we downloaded in the first article. It stores the schema of our Employee record so that consumers can perform the same transformation without having the actual schema file (employee.avsc).
With that, let's start the Schema Registry:
$ $CONFLUENT_HOME/bin/schema-registry-start $CONFLUENT_HOME/etc/schema-registry/schema-registry.properties
As the Schema Registry starts up, it will create its own topic in Kafka where it stores the schemas.
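Once the registry is running and the API has published its first record, you can ask the registry what it has stored. The sketch below is one way to do that from plain Java (11+). It assumes the registry address from application.yml and the serializer's default subject naming, where a topic's value schema is registered under "<topic>-value"; the class and method names are made up for illustration.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper, not part of the article's projects.
final class SchemaRegistryLookup {

    // Assumption: same address as schema.registry.url in application.yml.
    static final String REGISTRY_URL = "http://localhost:8081";

    // Assumption: default subject naming, i.e. "<topic>-value" for record values.
    static String valueSubject(String topic) {
        return topic + "-value";
    }

    // Schema Registry REST endpoint for the latest version of a subject.
    static URI latestSchemaUri(String registryUrl, String topic) {
        return URI.create(registryUrl + "/subjects/" + valueSubject(topic) + "/versions/latest");
    }

    public static void main(String[] args) throws Exception {
        HttpRequest request = HttpRequest.newBuilder(latestSchemaUri(REGISTRY_URL, "employee"))
                .header("Accept", "application/vnd.schemaregistry.v1+json")
                .GET()
                .build();

        // Needs a running registry; returns a 404 until a record has been published.
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());

        System.out.println(response.statusCode());
        System.out.println(response.body());
    }
}
```

If everything is wired up, the response body should contain the subject name, a version, a schema id and the Employee schema as an escaped JSON string.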
Now we can start publishing to Kafka using Avro!
Start the Employee API and send a call to the POST endpoint.
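Any HTTP client will do for that call. As a rough sketch, here is one using the JDK's built-in HttpClient (Java 11+). It assumes the port (8070) and the /employee path configured earlier; the class name, the helper method and the sample names are only for illustration.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

// Hypothetical client, not part of the article's projects.
final class CreateEmployeeClient {

    // The controller reads firstname and lastname as request parameters,
    // so they are passed on the query string here.
    static URI createEmployeeUri(String baseUrl, String firstname, String lastname) {
        String query = "firstname=" + URLEncoder.encode(firstname, StandardCharsets.UTF_8)
                + "&lastname=" + URLEncoder.encode(lastname, StandardCharsets.UTF_8);
        return URI.create(baseUrl + "/employee?" + query);
    }

    public static void main(String[] args) throws Exception {
        // Assumption: the Employee API is running locally on the configured port 8070.
        URI uri = createEmployeeUri("http://localhost:8070", "Andrew", "Marshall");

        HttpRequest request = HttpRequest.newBuilder(uri)
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();

        HttpResponse<Void> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.discarding());

        // The controller answers with 201 Created and a Location header for the new id.
        System.out.println("Status: " + response.statusCode());
        System.out.println("Location: " + response.headers().firstValue("Location").orElse("(none)"));
    }
}
```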
We really shouldn't see any difference from the application standpoint, but if you inspect the topic employee using the kafka-console-consumer you should see the message in a very different format:
❯ bin/kafka-console-consumer.sh --topic employee --bootstrap-server localhost:9092 --from-beginning
H0e426803-0903-43dd-a63a-a66734c52b0e
AndrewMarshall
Not much to see really, but this is our Avro representation of the message.
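If you are curious where that leading H comes from, the sketch below rebuilds the bytes by hand using only the JDK (Java 11+). It is not the Confluent serializer, just an approximation of its framing as I understand it: a zero "magic" byte, a 4-byte schema id, then the Avro binary body, where each string is written as a zig-zag varint length followed by its UTF-8 bytes. The schema id of 1 and all class and method names are assumptions for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hand-rolled illustration of the byte layout; use the real serializers in practice.
final class AvroWireFormatSketch {

    // Frame: magic byte 0, 4-byte big-endian schema id, then one Avro string per field.
    static byte[] encode(int schemaId, String... fields) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(0);
        out.writeBytes(ByteBuffer.allocate(4).putInt(schemaId).array());
        for (String field : fields) {
            byte[] utf8 = field.getBytes(StandardCharsets.UTF_8);
            writeLong(out, utf8.length);
            out.writeBytes(utf8);
        }
        return out.toByteArray();
    }

    // Avro long: zig-zag encode, then emit 7 bits at a time, lowest bits first.
    static void writeLong(ByteArrayOutputStream out, long value) {
        long n = (value << 1) ^ (value >> 63);
        while ((n & ~0x7FL) != 0) {
            out.write((int) ((n & 0x7F) | 0x80));
            n >>>= 7;
        }
        out.write((int) n);
    }

    // Inverse of writeLong.
    static long readLong(ByteBuffer in) {
        long n = 0;
        int shift = 0;
        byte b;
        do {
            b = in.get();
            n |= (long) (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (n >>> 1) ^ -(n & 1);
    }

    // Reads back fieldCount strings, skipping the 5-byte header.
    static List<String> decode(byte[] message, int fieldCount) {
        ByteBuffer in = ByteBuffer.wrap(message);
        if (in.get() != 0) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        in.getInt(); // schema id, ignored in this sketch
        List<String> fields = new ArrayList<>();
        for (int i = 0; i < fieldCount; i++) {
            byte[] utf8 = new byte[(int) readLong(in)];
            in.get(utf8);
            fields.add(new String(utf8, StandardCharsets.UTF_8));
        }
        return fields;
    }

    public static void main(String[] args) {
        byte[] message = encode(1, "0e426803-0903-43dd-a63a-a66734c52b0e", "Andrew", "Marshall");
        // The id is 36 characters long; zig-zag encoding turns 36 into 72, which is ASCII 'H'.
        System.out.println((char) message[5]);
        System.out.println(decode(message, 3));
    }
}
```

The header bytes and the short length prefixes in front of the two names are non-printable, which is why the console consumer only shows the H and the raw text.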
If you try to consume this with the consumer application we created earlier, it won't be able to read the data properly. So let's fix that!
Update the Kafka Consumer
In our second application these are the changes we need to make:
- Create a representation class for the Employee record
- Add the Avro serializers to the project
- Reconfigure the Serializers for the Record Value (not the message id)
- Reference the Employee attributes instead of printing a raw string to the log
Employee.java
We want to mirror the original Avro record by creating a simple class with the same namespace:
package com.example.demo.model;

public class Employee {

    private String employeeId;
    private String firstname;
    private String lastname;

    public Employee() {
    }

    public Employee(String employeeId, String firstname, String lastname) {
        this.employeeId = employeeId;
        this.firstname = firstname;
        this.lastname = lastname;
    }

    public String getEmployeeId() {
        return employeeId;
    }

    public void setEmployeeId(String employeeId) {
        this.employeeId = employeeId;
    }

    public String getFirstname() {
        return firstname;
    }

    public void setFirstname(String firstname) {
        this.firstname = firstname;
    }

    public String getLastname() {
        return lastname;
    }

    public void setLastname(String lastname) {
        this.lastname = lastname;
    }
}
Nothing crazy here, just a standard Plain Old Java Object.
Add Avro Serializers to the project
Much like we did with the API, we want to add Avro to our project. The only difference is that we won't be using the plugin.
In your pom.xml add the following entries:
<dependencies>
    ...
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-avro-serializer</artifactId>
        <version>7.1.1</version>
    </dependency>
    ...
</dependencies>
<repositories>
    <repository>
        <id>confluent</id>
        <url>https://packages.confluent.io/maven/</url>
    </repository>
</repositories>
Reconfigure the Serializers for the Record Value
Next let's change the src/main/resources/application.yml file:
spring:
  cloud:
    stream:
      kafka:
        binder:
          producer-properties:
            value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
            key.serializer: org.apache.kafka.common.serialization.StringSerializer
            schema.registry.url: http://localhost:8081
          consumer-properties:
            value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
            key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
            schema.registry.url: http://localhost:8081
            value.deserializer.specific.avro.reader: true
Here we have replaced the value serializers/deserializers with the io.confluent.kafka.serializers classes.
One other thing to note is that we specified the Schema Registry URL so the schemas can be located, and we've also specified that record values should be read with Avro's specific reader, that is, deserialized into a concrete class such as Employee rather than a generic Avro record:
spring:
  cloud:
    stream:
      kafka:
        binder:
          consumer-properties:
            value.deserializer.specific.avro.reader: true
Reference the Employee attributes instead of printing a raw string to the log
Here we will update our consumer code to use the Employee class instead of a string:
@Bean
public Function<Flux<Employee>, Flux<Employee>> employeeProcessor() {
    return employee -> employee.map(emp -> {
        log.info("Employee is {}, {}, {}", emp.getEmployeeId(), emp.getFirstname(), emp.getLastname());
        return emp;
    }).log();
}

@Bean
public Consumer<Employee> employeeConsumer() {
    return (value) -> log.info("Consumed {}", value.getEmployeeId());
}
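Because these beans are ordinary java.util.function types, the per-record logic can be tried out without Kafka at all. The sketch below is a simplified stand-in rather than the consumer application itself: it drops the Flux wrapper, uses a cut-down Employee class, and collects log lines in a list instead of calling a logger. All of the names in it are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Simplified stand-in for the consumer application's functions.
final class EmployeeFunctionsSketch {

    // Cut-down version of the consumer's Employee POJO.
    static final class Employee {
        private final String employeeId;
        private final String firstname;
        private final String lastname;

        Employee(String employeeId, String firstname, String lastname) {
            this.employeeId = employeeId;
            this.firstname = firstname;
            this.lastname = lastname;
        }

        String getEmployeeId() { return employeeId; }
        String getFirstname() { return firstname; }
        String getLastname() { return lastname; }
    }

    // Same per-element logic as employeeProcessor: record the attributes, pass the record on.
    static Function<Employee, Employee> processor(List<String> log) {
        return emp -> {
            log.add(String.format("Employee is %s, %s, %s",
                    emp.getEmployeeId(), emp.getFirstname(), emp.getLastname()));
            return emp;
        };
    }

    // Same logic as employeeConsumer: record the id of the consumed record.
    static Consumer<Employee> consumer(List<String> log) {
        return value -> log.add("Consumed " + value.getEmployeeId());
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        Employee employee = new Employee("42", "Andrew", "Marshall");

        // Chain the two steps by hand, the way the topics chain them at runtime.
        Employee processed = processor(log).apply(employee);
        consumer(log).accept(processed);

        log.forEach(System.out::println);
    }
}
```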
All good, we should now be ready.
Before we start the application we need to do 2 things:
- Start the Schema Registry
- Clear the current topics
Start the Schema Registry
Let's start it up with:
$ $CONFLUENT_HOME/bin/schema-registry-start $CONFLUENT_HOME/etc/schema-registry/schema-registry.properties
In Kafka, for now, we will just delete the topics we were using so they get recreated (make sure neither of the Spring apps is running):
$ $KAFKA_HOME/bin/kafka-topics.sh --delete --bootstrap-server=localhost:9092 --topic employees-processed
$ $KAFKA_HOME/bin/kafka-topics.sh --delete --bootstrap-server=localhost:9092 --topic employees
Start up the Employee API and the Consumer. Once you POST to the Employee API, you should see the consumers reading the message and printing out the attributes!