Spring Boot application with Apache Kafka
LehaUchicha
Posted on November 15, 2021
In this article we will:
- Run Zookeeper and kafka locally
- Write Spring boot application Kafka-Server. It's order service, which will produce and consume messages using orders topic. Also has entrypoint for order creation which will send message to the kafka topic
All examples available on github: https://github.com/LehaUchicha/Kafka-Example.git
Configure Zookeeper and kafka
Download Zookeeper and Kafka
Before writing our application we need to run zookeeper and kafka locally.
For that go to official site and download the latest version
Zookeeper
https://zookeeper.apache.org/releases.html
Kafka
https://kafka.apache.org/downloads
ps: On the moment of article writing the latest version was 3.0.0, but
this version not worked to me, so I download previous version - 2.8.1, which worked fine
Run Zookeeper
- Go to the downloaded and extracted zookeeper folder. You will see such folder structure:
- bin - folder, which contains utilities for zookeeper management
- conf - folder, which contains configuration files for zookeeper. In this folder you can find zoo_sample.cfg - example of zookeeper configuration.
- logs - folder, where zookeeper can write logs
First lets go to the conf folder, make a copy of zoo_sample.cfg and rename it to zoo.cfg.
In the zoo.cfg you need to override such value to dataDir=../logs or to any other directory which you like.
From the config file you can find othe important settings, like clientPort=2181, which said for us that zookeper by default run on port 2181.
You can override properties as you wish, but lets left them as is.
That's it! Now we need to run zookeeper server. For that go to the bin folder and find zkServer.cmd or
zkServer.sh. If you are the Windows user - use zkServer.cmd, otherwise zkServer.sh.
zkServer.sh start
or
zkServer.cmd
or logs without errors, like here in case if you use zkServer.sh start:
Great! Now we need to run kafka.
Run Kafka
- Go to the downloaded and extracted apache kafka folder. You will see such folder structure:
- bin - folder, which contains utilities for kafka management
- config - folder, which contains configuration files for kafka. In this folder you can find server.properties which should be modified
- logs - folder, where kafka can write logs
Go to the config folder and override log.dirs= property in the file server.properties to the any folder where you want to store logs.
I will use logs folder in the kafka root, so in my case it looks like this: log.dirs=../logs
Now time to run apache kafka. For that go to the bin folder.
For NOT Windows user you need to find kafka-server-start and execute script with the path to the server.properties file as an argument
./kafka-server-start.sh ../config/server.properties
For Windows users inside bin folder you can find windows folder which contains the same commands but for windows)
Go to the windows folder and run the same script. Don't forget to specify path to server.properties file as argument
kafka-server-start.bat ../../config/server.properties
If you don't see any exceptions in console, it means, that kafka is working!
Now befor start to writing Spring Boot application we need to create orders topic which our applications will use.
For that we need to execute command:
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic orders
or
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic orders
This command will create topic orders
Write Kafka Server Spring Boot application
You can download configured project from start.spring.io by link
or download ready project from github: https://github.com/LehaUchicha/Kafka-Example.git
Here in dependency you need to specify Spring for Apache Kafka for using kafka in our application.
pom.xml should contain:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
go to the KafkaServerApplication.java file and add @EnableKafka annotation
@EnableKafka
@SpringBootApplication
public class KafkaServerApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaServerApplication.class, args);
}
}
go to the application.properties file and specify such properties
server.port=9955
kafka.server=localhost:9092
kafka.group.id=group1
kafka.producer.id=producer1
Now we need to create configuration for consumer and producer.
Consumer configuration
@Configuration
public class KafkaConsumerConfiguration {
@Value("${kafka.server}")
private String kafkaServer;
@Value("${kafka.group.id}")
private String kafkaGroupId;
@Bean
public KafkaListenerContainerFactory<?> batchFactory() {
ConcurrentKafkaListenerContainerFactory<Long, OrderDto> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.setMessageConverter(new BatchMessagingMessageConverter(converter()));
return factory;
}
@Bean
public KafkaListenerContainerFactory<?> singleFactory() {
ConcurrentKafkaListenerContainerFactory<Long, OrderDto> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(false);
factory.setMessageConverter(new StringJsonMessageConverter());
return factory;
}
@Bean
public ConsumerFactory<Long, OrderDto> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
return new ConcurrentKafkaListenerContainerFactory<>();
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
return props;
}
@Bean
public StringJsonMessageConverter converter() {
return new StringJsonMessageConverter();
}
}
Producer configuration:
@Configuration
public class KafkaProducerConfiguration {
@Value("${kafka.server}")
private String kafkaServer;
@Value("${kafka.producer.id}")
private String kafkaProducerId;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
props.put(ProducerConfig.CLIENT_ID_CONFIG, kafkaProducerId);
return props;
}
@Bean
public ProducerFactory<Long, OrderDto> producerOrderFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<Long, OrderDto> kafkaTemplate() {
KafkaTemplate<Long, OrderDto> template = new KafkaTemplate<>(producerOrderFactory());
template.setMessageConverter(new StringJsonMessageConverter());
return template;
}
}
Note: needs to use org.springframework.kafka.support.serializer.JsonSerializer;, not from Jackson library
OrderService:
@Slf4j
@Service
@AllArgsConstructor
public class OrderServiceImpl implements OrderService {
private final KafkaTemplate<Long, OrderDto> kafkaOrderTemplate;
private final ObjectMapper objectMapper;
@Override
public void send(OrderDto dto) {
kafkaOrderTemplate.send("orders", dto);
}
@Override
@KafkaListener(id = "OrderId", topics = {"orders"}, containerFactory = "singleFactory")
public void consume(OrderDto dto) {
log.info("-> consumed {}", writeValueAsString(dto));
}
private String writeValueAsString(OrderDto dto) {
try {
return objectMapper.writeValueAsString(dto);
} catch (JsonProcessingException e) {
log.error("Error happens during json processing", e);
throw new RuntimeException("Error happens during json processing: " + dto.toString());
}
}
}
and OrderController:
@RestController
@AllArgsConstructor
public class OrderController {
private final OrderServiceImpl orderService;
@PostMapping("/order")
public void create() {
OrderDto order = OrderDto.builder()
.name("New Order #: " + System.nanoTime())
.description("Standard description")
.build();
orderService.send(order);
}
}
Now when you will run application - entrypoint will be available on the port 9955.
Make POST request using endpoint
localhost:9955/order
Message will be created and sended to the topic orders. After that our application will immediately read this
message through the method with annotation @KafkaListener:
@KafkaListener(id = "OrderId", topics = {"orders"}, containerFactory = "singleFactory")
public void consume(OrderDto dto) {
log.info("-> consumed {}", writeValueAsString(dto));
}
and in console you will see such result:
console output
That's it! The code available on github: https://github.com/LehaUchicha/Kafka-Example.git
As a bonus, which not explained in the article - you can find the second Spring boot application Kafka-Producer, which will push a message every 5 seconds after running to the orders topic.
And Kafka-Server application will read this message, which you can find in console. you need just run application.
Conclusion
In this article we successfully configured apache kafka and understand how to work with kafka wia Spring Boot.
Also figure out how services can communicate with each other wia apache kafka.
Thank you for reading. If you like an article, you can support me
Have a good day!
Posted on November 15, 2021
Join Our Newsletter. No Spam, Only the good stuff.
Sign up to receive the latest update from our blog.