Functional Kafka with Spring Cloud - Part 3

Anthony Ikeda

Posted on April 27, 2022

So, we have messages going to Kafka and messages coming off Kafka. Right now, though, those messages are rather unstructured.

There are a couple of approaches we can take:

  • Serialize an object using JSON
  • Serialize an object using Avro

Well, we've used JSON a lot in the past for our APIs and to be honest, it's boring hehe. So in this article we will take advantage of Apache Avro and create binary representations of the messages in Kafka!

Prerequisites:

  • We will need our Employee API from the first article and the Consumer from the second article.
  • The Schema Registry, which we downloaded and configured in the first article but haven't touched just yet!

First let's update the Employee API and set up Avro!

Add Avro to the Employee API

We will need a new dependency this time, but first we need to reference a new Maven repository. Add the following under the <project> tag in pom.xml:

<repositories>
    <repository>
        <id>confluent</id>
        <url>https://packages.confluent.io/maven/</url>
    </repository>
</repositories>

This will make the Confluent Maven repo available from which we will source the Avro Serializers.

Next, let's add the new dependency:

<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>7.1.1</version>
</dependency>

As mentioned, this dependency supplies us with the tools to convert our Java objects to and from Avro. To complete this task we need one more change to our pom.xml:

<build>
    <plugins>
...
        <plugin>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-maven-plugin</artifactId>
            <version>1.11.0</version>
            <executions>
                <execution>
                    <id>schemas</id>
                    <phase>generate-sources</phase>
                    <goals>
                        <goal>schema</goal>
                        <goal>protocol</goal>
                        <goal>idl-protocol</goal>
                    </goals>
                    <configuration>
                        <sourceDirectory>${project.basedir}/src/main/resources/</sourceDirectory>
                        <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>


The avro-maven-plugin performs some nifty functions for us; primarily, it takes an Avro schema and generates the Java classes we need to convert our objects to and from Avro.

As you can see, we've configured it to read the Avro schema files (which are written as JSON) from the ${project.basedir}/src/main/resources/ directory and write the generated classes to ${project.basedir}/src/main/java/.

Now we can define our new Employee.java class.

Create a new file src/main/resources/employee.avsc:

{
  "type" : "record",
  "name" : "Employee",
  "namespace": "com.example.demo.model",
  "fields": [
    {
      "name" : "employeeId",
      "type" : "string"
    },
    {
      "name": "firstname",
      "type": "string"
    },
    {
      "name": "lastname",
      "type": "string"
    }]
}

Here we have defined the structure of the data we want to transport. There are three areas worth noting:

"name" : "Employee"

This is the name of the record as an entity. It's how we will refer to the record and it will end up being the name of the class that is generated.

"namespace" : "com.example.demo.model"

The namespace is synonymous with a Java package; it helps organize the records and allows us to create other records of the same name in different namespaces.

"fields" : [...]

Here we have the structure of the message, basically the record attributes.

When you build the application with:

$ ./mvnw clean verify

You will have a new class generated in the src/main/java directory: com.example.demo.model.Employee

It's too large a class to publish here; however, it contains logic to identify the fields and field types, as well as the encoders/decoders to and from the binary representation.

Now that we have the Avro entity created, we want to update the code to publish it to Kafka.

Update the EmployeeController

The first change we want to make is to update the KafkaTemplate<String, String> to KafkaTemplate<String, Employee>:

@RequestMapping("/employee")
@RestController
public class EmployeeController {

    @Autowired
    protected KafkaTemplate<String, Employee>  kafka;

This ensures the message ID remains a String value, while the Employee object is recognized as the value to be serialized/deserialized.

Next let's change how we send the message in our @PostMapping:

    @PostMapping
    public ResponseEntity<Void> createEmployee(@RequestParam("firstname") String firstname,
                                               @RequestParam("lastname") String lastname) {

        String id = UUID.randomUUID().toString();
        Employee employee = new Employee(id, firstname, lastname);
        kafka.send("employee", id, employee);

        return ResponseEntity.created(URI.create(String.format("/employee/%s", id))).build();
    }

The only things we've changed here are creating the Employee record and how we send it via the KafkaTemplate.

There is one last change to make and that is to the src/main/resources/application.yml file.

We need to tell the application to handle the record value differently and treat it as an Avro record, so we need to change the value serializer/deserializer:

spring:
  kafka:
    properties:
      schema.registry.url: http://localhost:8081
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
    bootstrap-servers:
      - localhost:9092

server:
  port: 8070

As mentioned before, we want to preserve the message identifier as a string, but take the value and serialize it using Avro.

You may also notice a new entry:

spring:
  kafka:
    properties:
      schema.registry.url: http://localhost:8081

This is the Schema Registry we downloaded in the first article. It stores the schema of our Employee record so that other consumers can perform the same transformation without having the actual schema file (employee.avsc) on hand.
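As a quick aside: once the registry is running and we've published our first message (both coming up shortly), you can verify the schema was stored by querying the registry's REST API. This sketch assumes the default subject naming strategy, where the value schema for the employee topic is registered under the subject employee-value:

$ curl http://localhost:8081/subjects
$ curl http://localhost:8081/subjects/employee-value/versions/latest

The first call lists the registered subjects; the second returns the latest version of the stored Employee schema, which should match our employee.avsc.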

With that, let's start the Schema Registry:

$ $CONFLUENT_HOME/bin/schema-registry-start $CONFLUENT_HOME/etc/schema-registry/schema-registry.properties

As the Schema Registry starts up, it will create its own topic in Kafka where it stores the schemas.
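If you want to see this for yourself, list the topics once the registry is up; by default the registry stores its schemas in a topic named _schemas (the name is configurable via kafkastore.topic in schema-registry.properties):

$ $KAFKA_HOME/bin/kafka-topics.sh --list --bootstrap-server localhost:9092

You should see _schemas alongside our application topics.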

Now we can start publishing to Kafka using Avro!

Start the Employee API and send a request to the POST endpoint.
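If you don't have a REST client handy, curl works fine; this assumes the API is still on port 8070 as configured in application.yml, and the first and last names are just sample values:

$ curl -X POST "http://localhost:8070/employee?firstname=Andrew&lastname=Marshall"

You should get back a 201 Created with a Location header pointing at the new employee resource.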

We really shouldn't see any difference from the application standpoint, but if you inspect the topic employee using the kafka-console-consumer you should see the message in a very different format:

❯ bin/kafka-console-consumer.sh --topic employee --bootstrap-server localhost:9092 --from-beginning
H0e426803-0903-43dd-a63a-a66734c52b0e
                                     AndrewMarshall

Not much to see really, but this is our Avro representation of the message.
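If you'd rather see the records decoded back into something readable, the Confluent distribution we downloaded should also include an Avro-aware console consumer that fetches the schema from the registry for you:

$ $CONFLUENT_HOME/bin/kafka-avro-console-consumer --topic employee --bootstrap-server localhost:9092 --from-beginning --property schema.registry.url=http://localhost:8081

Each record should print as JSON with the employeeId, firstname and lastname fields.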

If you try to consume this with the current application consumer we created, it won't be able to read the data properly. So let's fix that!

Update the Kafka Consumer

In our second application these are the changes we need to make:

  • Create a representation class for the Employee record
  • Add the Avro serializers to the project
  • Reconfigure the Serializers for the Record Value (not the message id)
  • Reference the Employee attributes instead of printing a raw string to the log

Employee.java

We want to mirror the original Avro record by creating a simple class with the same namespace:

package com.example.demo.model;

public class Employee {

    private String employeeId;

    private String firstname;

    private String lastname;

    public Employee() {
    }

    public Employee(String employeeId, String firstname, String lastname) {
        this.employeeId = employeeId;
        this.firstname = firstname;
        this.lastname = lastname;
    }

    public String getEmployeeId() {
        return employeeId;
    }

    public void setEmployeeId(String employeeId) {
        this.employeeId = employeeId;
    }

    public String getFirstname() {
        return firstname;
    }

    public void setFirstname(String firstname) {
        this.firstname = firstname;
    }

    public String getLastname() {
        return lastname;
    }

    public void setLastname(String lastname) {
        this.lastname = lastname;
    }
}

Nothing crazy here, just a standard Plain Old Java Object.

Add Avro Serializers to the project

Much like we did with the API, we want to add Avro to our project; the only difference is that we won't be using the plugin.

In your pom.xml add the following entries:

<dependencies>
...
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-avro-serializer</artifactId>
        <version>7.1.1</version>
    </dependency>
...
</dependencies>

<repositories>
    <repository>
        <id>confluent</id>
        <url>https://packages.confluent.io/maven/</url>
    </repository>
</repositories>

Reconfigure the Serializers for the Record Value

Next let's change the src/main/resources/application.yml file:

spring:
  cloud:
    stream:
      kafka:
        binder:
          producer-properties:
            value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
            key.serializer: org.apache.kafka.common.serialization.StringSerializer
            schema.registry.url: http://localhost:8081
          consumer-properties:
            value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
            key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
            schema.registry.url: http://localhost:8081
            value.deserializer.specific.avro.reader: true


Here we have replaced the value serializers/deserializers with the io.confluent.kafka.serializers classes.

One other thing to note is that we pointed at the Schema Registry to locate the schemas, and we've also specified that record values should be read as our specific Avro record type rather than as generic records:

spring:
  cloud:
    stream:
      kafka:
        binder:
          consumer-properties:
            value.deserializer.specific.avro.reader: true

Reference the Employee attributes instead of printing a raw string to the log

Here we will update our consumer code to use the Employee class instead of a string:

@Bean
public Function<Flux<Employee>, Flux<Employee>> employeeProcessor() {
    return employee -> employee.map(emp -> {
       log.info("Employee is {}, {}, {}", emp.getEmployeeId(), emp.getFirstname(), emp.getLastname());
       return emp;
    }).log();
}

@Bean
public Consumer<Employee> employeeConsumer() {
    return (value) -> log.info("Consumed {}", value.getEmployeeId());
}

All good, we should now be ready.

Before we start the applications we need to do two things:

  1. Start the Schema Registry
  2. Clear the current topics

Start the Schema Registry

Let's start it up with:

$ $CONFLUENT_HOME/bin/schema-registry-start $CONFLUENT_HOME/etc/schema-registry/schema-registry.properties

In Kafka, for now, we will just delete the topics we were using so they can be recreated (make sure neither of the Spring apps is running):

$ $KAFKA_HOME/bin/kafka-topics.sh --delete  --bootstrap-server=localhost:9092 --topic employees-processed
$ $KAFKA_HOME/bin/kafka-topics.sh --delete --bootstrap-server=localhost:9092 --topic employees

Start up the Employee API and the Consumer, and once you POST to the Employee API you should see the consumers reading the message and printing out the attributes!

Avro output
