Functional Kafka with Spring Cloud - Part 1
Anthony Ikeda
Posted on April 27, 2022
I put this article together to build a working demo of Spring Cloud Kafka, since I have been unable to find one so far.
Prerequisites:
- Java 11+ (I'm using Java 18)
- Unix-like OS (I'm on macOS)
- Httpie
Setup:
- Spring Cloud 2021.0.1
- Confluent Schema Registry 7.1.0
- Apache Kafka 3.1.0 (Scala 2.13 build, kafka_2.13-3.1.0)
- Apache ZooKeeper 3.7.0
This first part sets up a web API that publishes events to Kafka as strings. The functional Kafka consumer, built with Spring Cloud Stream, follows in the next article.
Set up the environment
Download Apache ZooKeeper from here:
https://zookeeper.apache.org/releases.html#download
Decompress it and move it to a working folder (we will use $ZOOKEEPER_HOME)
Download Kafka from here:
https://kafka.apache.org/downloads
Again, decompress the archive and move it to a working folder; this time we will refer to the working folder as $KAFKA_HOME
Download and decompress the Schema Registry using the following:
$ wget https://packages.confluent.io/archive/7.0/confluent-community-7.0.1.tar.gz
$ tar -xf confluent-community-7.0.1.tar.gz
$ cd confluent-7.0.1
We will refer to this application's location as $CONFLUENT_HOME
Configure the environment
First let's set up ZooKeeper!
ZooKeeper Config
Create the config file by copying the example config:
ZOOKEEPER_HOME $> cp conf/zoo_sample.cfg conf/zoo.cfg
The only value you may want to edit is dataDir; point it at a location where you are comfortable storing the metadata:
...
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=./data
Save the file and we will move on to configuring Kafka.
Kafka Config
The file we are interested in here is:
$KAFKA_HOME/config/server.properties
There are 2 values we will most likely change:
log.dirs=./logs
zookeeper.connect=localhost:2181
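log.dirs is where Kafka stores its topic data (the commit log, not the application logs), so this keeps that data in an accessible location, and zookeeper.connect lets Kafka connect to ZooKeeper to persist its metadata.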
Configure Schema Registry
Just one file needs to be checked here: $CONFLUENT_HOME/etc/schema-registry/schema-registry.properties
All the defaults should be fine:
listeners=http://0.0.0.0:8081
kafkastore.bootstrap.servers=PLAINTEXT://localhost:9092
kafkastore.topic=_schemas
debug=false
Starting ZooKeeper and Kafka
We won't be using the Schema Registry to begin with, since we are just working with String values in Kafka, so let's get only ZooKeeper and Kafka started:
$ZOOKEEPER_HOME > bin/zkServer.sh start conf/zoo.cfg
ZooKeeper JMX enabled by default
Using config: conf/zoo.cfg
Starting zookeeper ... STARTED
$KAFKA_HOME > bin/kafka-server-start.sh config/server.properties
...
[2022-04-25 11:19:17,742] INFO Kafka version: 3.1.0 (org.apache.kafka.common.utils.AppInfoParser)
[2022-04-25 11:19:17,743] INFO Kafka commitId: 37edeed0777bacb3 (org.apache.kafka.common.utils.AppInfoParser)
[2022-04-25 11:19:17,743] INFO Kafka startTimeMs: 1650910757737 (org.apache.kafka.common.utils.AppInfoParser)
[2022-04-25 11:19:17,745] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
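If you want to confirm that both services are actually accepting connections before moving on, a small check like the one below can help. It is only a sketch: the PortCheck class and its isListening method are hypothetical names introduced here, and it assumes ZooKeeper is on its default client port 2181 and Kafka on 9092, as configured above.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper (not part of the original setup): checks whether a TCP port accepts connections.
final class PortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused or timed out: nothing is accepting connections there.
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed ports: 2181 for ZooKeeper, 9092 for the Kafka broker.
        System.out.println("ZooKeeper (2181): " + isListening("localhost", 2181, 1000));
        System.out.println("Kafka (9092): " + isListening("localhost", 9092, 1000));
    }
}
```

An open port only shows that something is listening; it does not prove the broker is healthy, so treat this as a quick sanity check.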
Okay, let's get programming!!!
Employee API
This will be a simple API that will have a single POST endpoint to create a Kafka message.
Just head to https://start.spring.io and create a basic web application:
Click download and expand the demo.zip file to a workspace.
To get started we are going to need some dependencies to publish to Kafka.
Add the following dependency to your pom.xml file:
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.8.4</version>
</dependency>
Next we will create the controller to serve our endpoint.
package com.example.demo;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.net.URI;
import java.util.UUID;

@RequestMapping("/employee")
@RestController
public class EmployeeController {

    @Autowired
    protected KafkaTemplate<String, String> kafka;

    // Handles POST /employee?firstname=...&lastname=...
    @PostMapping
    public ResponseEntity<Void> createEmployee(@RequestParam("firstname") String firstname,
                                               @RequestParam("lastname") String lastname) {
        // The random UUID doubles as the Kafka message key and the resource id.
        String id = UUID.randomUUID().toString();
        // Publish "lastname, firstname" to the employee topic.
        kafka.send("employee", id, String.format("%s, %s", lastname, firstname));
        // Respond with 201 Created and a Location header pointing at the new id.
        return ResponseEntity.created(URI.create(String.format("/employee/%s", id))).build();
    }
}
Let's quickly walk through the code...
First, we inject a KafkaTemplate<String, String>. Since we aren't doing anything fancy, both the message key and the message body are sent as java.lang.String.
Our @PostMapping is simply the endpoint, with 2 query parameters:
- firstname
- lastname
So the URL will have the format:
http://localhost:8070/employee?firstname=Paula&lastname=Abdul
And when we call the API we will use Httpie with the following syntax:
http POST http://localhost:8070/employee firstname==Paula lastname==Abdul
Next we generate a random UUID as the message id (it is used as the Kafka message key) and send it, along with a concatenated string of the lastname and firstname, to the employee topic:
String id = UUID.randomUUID().toString();
kafka.send("employee", id, String.format("%s, %s", lastname, firstname));
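To see exactly what ends up on the topic without needing a broker, the key and value construction can be pulled into plain Java. This is a minimal sketch: the EmployeeMessage class and its method names are hypothetical and exist only for illustration; the formatting logic mirrors the controller.

```java
import java.net.URI;
import java.util.UUID;

// Hypothetical helper that isolates the message-building logic from the controller.
final class EmployeeMessage {

    // A random UUID string, used as the Kafka message key.
    static String newKey() {
        return UUID.randomUUID().toString();
    }

    // The message body: "lastname, firstname".
    static String value(String firstname, String lastname) {
        return String.format("%s, %s", lastname, firstname);
    }

    // The Location URI returned to the caller for a given id.
    static URI location(String id) {
        return URI.create(String.format("/employee/%s", id));
    }

    public static void main(String[] args) {
        String key = newKey();
        System.out.println("key      = " + key);
        System.out.println("value    = " + value("Paula", "Abdul"));
        System.out.println("location = " + location(key));
    }
}
```

Keeping this logic free of Spring and Kafka types also makes it easy to unit test.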
Before we start the application, let's tweak the configuration. Make the src/main/resources/application.yml file look like this (create it if the generated project only contains application.properties):
spring:
  kafka:
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    bootstrap-servers:
      - localhost:9092
server:
  port: 8070
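For orientation, the producer settings in this configuration correspond roughly to the native Kafka client properties below. This is a sketch of the mapping, not what Spring Boot literally executes; the ProducerSettings class is a hypothetical name, while bootstrap.servers, key.serializer and value.serializer are the standard Kafka producer property names.

```java
import java.util.Properties;

// Hypothetical illustration of the native Kafka properties behind the YAML producer settings.
final class ProducerSettings {

    // Builds the three producer properties this article relies on.
    static Properties producerProperties(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        // Print each property so it can be compared with application.yml.
        producerProperties("localhost:9092")
                .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The consumer section of the YAML is not needed by this API yet; it only becomes relevant once a consumer is added.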
Now we can run the application and hit the endpoint:
./mvnw spring-boot:run
...
: Tomcat initialized with port(s): 8070 (http)
: Starting service [Tomcat]
: Starting Servlet engine: [Apache Tomcat/9.0.54]
: Initializing Spring embedded WebApplicationContext
: Root WebApplicationContext: initialization completed in 754 ms
: Tomcat started on port(s): 8070 (http) with context path ''
: Started DemoApplication in 1.522 seconds (JVM running for 1.841)
Call the endpoint:
$ http POST http://localhost:8070/employee firstname==Andrew lastname==Marshall
HTTP/1.1 201
Connection: keep-alive
Content-Length: 0
Date: Wed, 27 Apr 2022 17:30:39 GMT
Keep-Alive: timeout=60
Location: /employee/830da346-38b9-4d5b-a051-a302c395333e
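If you would rather call the endpoint from Java than from Httpie, the JDK's built-in HTTP client (Java 11+) can send the same request. This is a minimal sketch under the assumption that the API is running on port 8070 as configured above; EmployeeClient and its methods are hypothetical names.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

// Hypothetical client for the /employee endpoint, using only the JDK.
final class EmployeeClient {

    // Builds the endpoint URI, URL-encoding both query parameters.
    static URI employeeUri(String baseUrl, String firstname, String lastname) {
        String query = "firstname=" + URLEncoder.encode(firstname, StandardCharsets.UTF_8)
                + "&lastname=" + URLEncoder.encode(lastname, StandardCharsets.UTF_8);
        return URI.create(baseUrl + "/employee?" + query);
    }

    // Builds a POST request with an empty body, like the Httpie call.
    static HttpRequest createRequest(URI uri) {
        return HttpRequest.newBuilder(uri)
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    public static void main(String[] args) throws Exception {
        HttpRequest request =
                createRequest(employeeUri("http://localhost:8070", "Andrew", "Marshall"));
        HttpResponse<Void> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.discarding());
        // Print the status code and the Location header, if the server sent one.
        System.out.println(response.statusCode());
        System.out.println(response.headers().firstValue("Location").orElse("(none)"));
    }
}
```

Encoding the parameters matters as soon as a name contains a space or other reserved character.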
You can track the message in Kafka using the kafka-console-consumer.sh command:
$ $KAFKA_HOME/bin/kafka-console-consumer.sh --topic employee --bootstrap-server localhost:9092 --from-beginning
Marshall, Andrew
In the next article we will set up a functional consumer using Spring Cloud Stream...