Functional Kafka with Spring Cloud - Part 2

Anthony Ikeda

Posted on April 27, 2022

Previously we set up a basic Kafka environment and a Spring Boot Web API to push a simple string message to Kafka. In this article, let's spend some time reading that value.

Continuing from where we left off, we need to create another Spring Boot application using the https://start.spring.io website:

Spring Initializr

You can omit the Spring Web dependency this time around!

We aren't going to add anything fancy yet; just generate the code base and get it ready to run as a new application!

Set Up Spring Cloud

First, let's set up the Spring Cloud dependencies. In your pom.xml let's make the following changes:

  1. Add a new property indicating the version for Spring Cloud:
<properties>
    <java.version>18</java.version>
    <spring-cloud-release.version>2021.0.1</spring-cloud-release.version>
</properties>

Next we want to reference the Spring Cloud POM. Add the following XML block under the project tag:

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring-cloud-release.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

This imports the dependency management for Spring Cloud 2021.0.1, so we don't have to specify versions for the individual Spring Cloud artifacts.

For our String-based consumer we just need 2 core dependencies and 1 test dependency:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
    <scope>test</scope>
    <classifier>test-binder</classifier>
    <type>test-jar</type>
</dependency>

Consumers and Processors

Now we can add the code that forms a simple data pipeline: take a message from a topic, do something with it, and pass it on to another topic.

Create a new class KafkaConfiguration.java with the following beans:

package com.example.demo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.core.publisher.Flux;

import java.util.function.Consumer;
import java.util.function.Function;


@Configuration
public class KafkaConfiguration {

    private static final Logger log = LoggerFactory.getLogger(KafkaConfiguration.class);

    @Bean
    public Function<Flux<String>, Flux<String>> employeeProcessor() {
        return employee -> employee.map(emp -> {
           log.info("Employee is {}", emp);
           return emp;
        }).log();
    }

    @Bean
    public Consumer<String> employeeConsumer() {
        return (value) -> log.info("Consumed {}", value);
    }

}

Let's walk through the code!

We have a basic Logger in place to output values as they happen:

private static final Logger log = LoggerFactory.getLogger(KafkaConfiguration.class);

Next we have a bean that acts as the input/output processor for messages coming from the topic:

@Bean
public Function<Flux<String>, Flux<String>> employeeProcessor() {
    return employee -> employee.map(emp -> {
       log.info("Employee is {}", emp);
       return emp;
    }).log();
}

Our method signature represents a (data in/data out) pipeline:

Function<Flux<String>, Flux<String>>

Here we are saying that the messages we read in will be Strings (the first Flux&lt;String&gt;) and that the messages we send out will also be Strings (the second Flux&lt;String&gt;).

The body of our processor just reads the message and prints it to the log (a couple of times!):

return employee -> employee.map(emp -> {
       log.info("Employee is {}", emp);
       return emp;
    }).log();

Typically we might call another method here to do more with the incoming data, perhaps producing a different output. For now we just send the message on its way as-is, but a sketch of what a transformation could look like follows.
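As a purely hypothetical example, a processor that upper-cases the incoming name before forwarding it might look like this (the employeeUppercaser bean name is invented for illustration and would need its own bindings in application.yml):

@Bean
public Function<Flux<String>, Flux<String>> employeeUppercaser() {
    // Hypothetical transformation: upper-case each incoming name
    // before it is forwarded to the output topic.
    return employees -> employees.map(String::toUpperCase);
}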

The next bean is a consumer; for now it just logs the String message:

@Bean
public Consumer<String> employeeConsumer() {
    return (value) -> log.info("Consumed {}", value);
}

Before we can run this application you may notice something missing: the topics aren't yet defined.

When we use Spring Cloud Function to create these messaging apps, we defer to Spring Cloud Stream's binding capabilities, whereby we configure these beans and bind them to the underlying topics of the message bus.

Open the src/main/resources/application.yml file and let's do that now!

spring:
  cloud:
    function:
      definition: employeeConsumer;employeeProcessor
    stream:
      bindings:
        employeeProcessor-in-0:
          destination: employee
        employeeProcessor-out-0:
          destination: employees-processed
        employeeConsumer-in-0:
          destination: employees-processed

      kafka:
        binder:
          producer-properties:
            value.serializer: org.apache.kafka.common.serialization.StringSerializer
            key.serializer: org.apache.kafka.common.serialization.StringSerializer
          consumer-properties:
            value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
            key.deserializer: org.apache.kafka.common.serialization.StringDeserializer

First in our config, we've created function definitions:

spring:
  cloud:
    function:
      definition: employeeConsumer;employeeProcessor

As you can see these are simply the names of the beans we just defined. We will use these as reference points later on in the configuration.

Next we've defined the actual Bean-to-Topic bindings:

spring:
  cloud:
    stream:
      bindings:
        employeeProcessor-in-0:
          destination: employee
        employeeProcessor-out-0:
          destination: employees-processed
        employeeConsumer-in-0:
          destination: employees-processed

Using the function definitions (employeeProcessor, employeeConsumer) we have mapped the topics accordingly. The binding names follow Spring Cloud Stream's convention of &lt;functionName&gt;-in-&lt;index&gt; and &lt;functionName&gt;-out-&lt;index&gt;. So, the employeeProcessor has 2 mappings:

  • Message In: employee
  • Message Out: employees-processed

Meanwhile the employeeConsumer has 1 binding:

  • Message In: employees-processed

This means that anything that lands on the employee topic gets picked up by the employeeProcessor. The employeeProcessor prints out the message and then sends it on to the employees-processed topic.

Any messages arriving on the employees-processed topic then get picked up by the employeeConsumer which, again, just prints the message out to the logs.
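If you want to verify this flow independently of the employeeConsumer, you can watch the intermediate topic with Kafka's console consumer. The broker address below assumes the localhost setup from the previous article; if you ran Kafka in Docker, run the command inside the broker container instead:

$ kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic employees-processed --from-beginning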

Lastly in our configuration we are defining some basic properties for Kafka in general:

spring:
  cloud:
    stream:
      kafka:
        binder:
          producer-properties:
            value.serializer: org.apache.kafka.common.serialization.StringSerializer
            key.serializer: org.apache.kafka.common.serialization.StringSerializer
          consumer-properties:
            value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
            key.deserializer: org.apache.kafka.common.serialization.StringDeserializer

Quite simply put, we treat messages coming off and going onto the topics as Strings!
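One thing worth noting: nothing in this file tells the binder where the broker lives, so it falls back to the default of localhost:9092. If the Kafka instance from the previous article is listening somewhere else, you can point the binder at it explicitly (the address below is just an example):

spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092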

Starting the application

You should now be able to run the application with:

$ ./mvnw spring-boot:run

Provided the original web application from the previous article is still running, we should be able to send a message from the API and see it picked up by our consumer.
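For example, with curl (the endpoint path and payload below are hypothetical stand-ins for whatever your part 1 API actually exposes, so adjust them accordingly):

$ # Hypothetical endpoint from part 1 - substitute your actual path and payload
$ curl -X POST "http://localhost:8080/employee" \
    -H "Content-Type: text/plain" \
    -d "Marshall, Andrew"

Once the message is sent, the consumer application's log should show something like: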

Employee is Marshall, Andrew
Consumed Marshall, Andrew

In the next article we will introduce Avro as the serializer and show you how to send binary messages using the Schema Registry!
