Calvin Nguyen
Posted on August 22, 2020
Cover image source: CloudKarafka
In this tutorial/lab, you will get a quick, simple, hands-on introduction to:
- Java
- Spring Boot
- Apache Kafka
- ZooKeeper
Requirements
- Homebrew
- Postman, for testing the API endpoints
- A basic understanding of Kafka's core concepts
Installation
- Install Java 8
$ brew tap adoptopenjdk/openjdk
$ brew cask install adoptopenjdk8
- Install Kafka
$ brew install kafka
- Open the 1st terminal to start ZooKeeper
$ zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties
- Open the 2nd terminal to start Kafka
$ kafka-server-start /usr/local/etc/kafka/server.properties
- Open the 3rd terminal to create a Kafka topic
$ kafka-topics --bootstrap-server localhost:9092 --topic <enter-a-topic> --create --partitions 1 --replication-factor 1
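If you want to double-check that the topic was created, you can list the broker's topics:
$ kafka-topics --bootstrap-server localhost:9092 --list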
Create a Maven project with Spring Boot
- Open start.spring.io
- Choose the Spring Web and Spring for Apache Kafka dependencies
- Open the project in your IDE (I prefer IntelliJ)
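For reference, the generated project contains a main application class similar to the sketch below. The package and class names here are only examples; Spring Initializr names the class after your project.
package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

// Generated by start.spring.io; starts the embedded web server and Kafka auto-configuration
@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}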
Configure Kafka Producer and Consumer
- Add the following to application.properties in the src/main/resources folder, and replace the placeholder values in angle brackets:
# Database settings - optional; only needed if you wire up a database (not required for this tutorial)
spring.datasource.url=jdbc:mysql://localhost:3306/<your-db-name>?allowPublicKeyRetrieval=true&useSSL=false&serverTimezone=UTC
spring.datasource.username=<your-db-username>
spring.datasource.password=<your-db-password>
app.topic=<same-topic-created-in-terminal>
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=<any-group-id>
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
- If the topic set in app.topic does not exist yet, open a new terminal and create it:
$ kafka-topics --bootstrap-server localhost:9092 --topic <same-topic-in-application.properties> --create --partitions 1 --replication-factor 1
Initialize the Kafka Producer & Consumer Services
- Inside the package com.example, create these folders:
$ mkdir controller model repository service
- Create the Producer and Consumer services:
$ touch service/KafkaConsumer.java service/KafkaProducer.java
- Add this to KafkaConsumer.java:
package <enter-your-package-here>;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;

@Service
public class KafkaConsumer {

    // Messages received from the topic are collected here so the controller can expose them
    public static List<String> messages = new ArrayList<>();

    // Must match the topic and group id configured in application.properties
    private final static String topic = "<same-topic-in-resources>";
    private final static String groupId = "<same-group-id>";

    @KafkaListener(topics = topic, groupId = groupId)
    public void listen(String message) {
        messages.add(message);
    }
}
- Add this code to KafkaProducer.java:
package <enter-your-package-here>;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // Topic name read from app.topic in application.properties
    @Value("${app.topic}")
    private String topic;

    public void produce(String message) {
        kafkaTemplate.send(topic, message);
    }
}
Create Kafka API
- Create a Kafka controller file:
$ touch controller/KafkaController.java
and add this:
package server.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import java.util.List;

import server.service.KafkaConsumer;
import server.service.KafkaProducer;

@RestController
public class KafkaController {

    @Autowired
    private KafkaConsumer consumer;

    @Autowired
    private KafkaProducer producer;

    // Publish the request body to the Kafka topic
    @PostMapping("/send")
    public void send(@RequestBody String data) {
        producer.produce(data);
    }

    // Return every message the consumer has received so far
    @GetMapping("/receive")
    public List<String> receive() {
        return consumer.messages;
    }

    public KafkaConsumer getConsumer() {
        return consumer;
    }

    public KafkaProducer getProducer() {
        return producer;
    }

    public void setConsumer(KafkaConsumer consumer) {
        this.consumer = consumer;
    }

    public void setProducer(KafkaProducer producer) {
        this.producer = producer;
    }
}
NOTE: There are also model and repository packages, which you would use to connect the app to a database, but we don't need them in this tutorial.
Final Project Structure
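Based on the files created above, the layout should look roughly like this (the main class name depends on what you entered in Spring Initializr):
src/main/java/<your-base-package>/
├── <YourApplication>.java (generated main class)
├── controller/
│   └── KafkaController.java
├── model/ (unused here)
├── repository/ (unused here)
└── service/
    ├── KafkaConsumer.java
    └── KafkaProducer.java
src/main/resources/
└── application.properties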
Testing
There are two ways to send messages: via Postman (through the API) or directly from the terminal.
Open the 3rd terminal (the one you used to create the topic earlier) and start a console consumer:
$ kafka-console-consumer --bootstrap-server localhost:9092 --topic <enter-your-topic> --from-beginning
1. Postman
Send a POST request to localhost:8080/send with this JSON body:
{
"message": "Hello from Postman"
}
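If you prefer the command line over Postman, an equivalent request with curl would look something like this (assuming the app runs on Spring Boot's default port 8080):
$ curl -X POST localhost:8080/send -H "Content-Type: application/json" -d '{"message": "Hello from Postman"}'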
2. Terminal
Open the 4th terminal, and type:
$ kafka-console-producer --broker-list localhost:9092 --topic <enter-your-topic>
> Hello from the terminal
Check the consumer terminal; you will see the messages you sent via Postman and the terminal appear in real time.
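You can also read back everything the Spring consumer has picked up through the /receive endpoint we built in the controller, for example:
$ curl localhost:8080/receive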
Done & Congratulations!