Spring Boot application with Apache Kafka

LehaUchicha

Posted on November 15, 2021

In this article we will:

  1. Run ZooKeeper and Kafka locally
  2. Write a Spring Boot application, Kafka-Server. It is an order service that produces and consumes messages on the orders topic, and it also exposes an endpoint for order creation that sends a message to the Kafka topic.

All examples available on github: https://github.com/LehaUchicha/Kafka-Example.git

Configure ZooKeeper and Kafka

Download ZooKeeper and Kafka

Before writing our application, we need to run ZooKeeper and Kafka locally.
To do that, go to the official sites and download the latest versions:

ZooKeeper

https://zookeeper.apache.org/releases.html

Kafka

https://kafka.apache.org/downloads

P.S.: At the time of writing, the latest version was 3.0.0, but that version did not work for me, so I downloaded the previous version, 2.8.1, which worked fine.

Run ZooKeeper

  1. Go to the downloaded and extracted ZooKeeper folder. You will see the following folder structure:
  • bin - contains utilities for managing ZooKeeper
  • conf - contains configuration files for ZooKeeper. In this folder you can find zoo_sample.cfg, an example ZooKeeper configuration.
  • logs - the folder where ZooKeeper can write logs

First, go to the conf folder, make a copy of zoo_sample.cfg, and rename it to zoo.cfg.
In zoo.cfg you need to set dataDir=../logs, or any other directory you like.
In the config file you can also find other important settings, such as clientPort=2181, which tells us that ZooKeeper runs on port 2181 by default.
You can override the properties as you wish, but let's leave them as they are; the sketch below shows roughly what the file ends up looking like.
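For reference, a minimal zoo.cfg consistent with the settings above might look like this (the exact values in your copy of zoo_sample.cfg may differ slightly):

# milliseconds per tick, used for heartbeats and timeouts
tickTime=2000
# where ZooKeeper stores its data (overridden as described above)
dataDir=../logs
# default client port
clientPort=2181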

That's it! Now we need to run the ZooKeeper server. Go to the bin folder and find zkServer.cmd or
zkServer.sh. If you are a Windows user, use zkServer.cmd; otherwise use zkServer.sh.

zkServer.sh start

or

zkServer.cmd

If it started successfully, you will see the ZooKeeper server running in a console window, or startup logs without errors if you used zkServer.sh start.

Great! Now we need to run Kafka.

Run Kafka

  1. Go to the downloaded and extracted Apache Kafka folder. You will see the following folder structure:
  • bin - contains utilities for managing Kafka
  • config - contains configuration files for Kafka. In this folder you can find server.properties, which we will modify
  • logs - the folder where Kafka can write logs

Go to the config folder and set the log.dirs= property in server.properties to any folder where you want Kafka to store its log data.
I will use a logs folder in the Kafka root, so in my case it looks like this: log.dirs=../logs (see the snippet below for the relevant lines).
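For reference, the relevant lines in server.properties would look roughly like this (the other values shown are the defaults that ship with Kafka, so treat them as assumptions about your setup):

# unique id of this broker
broker.id=0
# where Kafka stores its log segments
log.dirs=../logs
# how to reach ZooKeeper, which we started on the default port
zookeeper.connect=localhost:2181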

Now it's time to run Apache Kafka. Go to the bin folder.
If you are NOT a Windows user, find kafka-server-start.sh and execute the script with the path to the server.properties file as an argument:

./kafka-server-start.sh ../config/server.properties

For Windows users, inside the bin folder you can find a windows folder which contains the same commands, but for Windows.
Go to the windows folder and run the same script. Don't forget to specify the path to the server.properties file as an argument:

kafka-server-start.bat ../../config/server.properties

If you don't see any exceptions in the console, it means that Kafka is working!
Now, before we start writing the Spring Boot application, we need to create the orders topic which our applications will use.
To do that, execute the following command:

kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic orders

or

kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic orders

This command will create the orders topic.
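If you want a quick check that the topic exists, you can list the topics with the same tool (shown here with the .sh script; the .bat variant works the same way):

kafka-topics.sh --list --zookeeper localhost:2181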

Write the Kafka-Server Spring Boot application

You can download a pre-configured project from start.spring.io,

or download the finished project from GitHub: https://github.com/LehaUchicha/Kafka-Example.git

Among the dependencies you need to include Spring for Apache Kafka in order to use Kafka in our application.

pom.xml should contain:

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
</dependencies>

Go to the KafkaServerApplication.java file and add the @EnableKafka annotation:

@EnableKafka
@SpringBootApplication
public class KafkaServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaServerApplication.class, args);
    }

}

Go to the application.properties file and specify the following properties:

server.port=9955

kafka.server=localhost:9092
kafka.group.id=group1
kafka.producer.id=producer1

Now we need to create the configuration for the consumer and the producer.

Consumer configuration

@Configuration
public class KafkaConsumerConfiguration {

      @Value("${kafka.server}")
      private String kafkaServer;

      @Value("${kafka.group.id}")
      private String kafkaGroupId;

      @Bean
      public KafkaListenerContainerFactory<?> batchFactory() {
            ConcurrentKafkaListenerContainerFactory<Long, OrderDto> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            factory.setBatchListener(true);
            factory.setMessageConverter(new BatchMessagingMessageConverter(converter()));
            return factory;
      }

      @Bean
      public KafkaListenerContainerFactory<?> singleFactory() {
            ConcurrentKafkaListenerContainerFactory<Long, OrderDto> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            factory.setBatchListener(false);
            factory.setMessageConverter(new StringJsonMessageConverter());
            return factory;
      }

      @Bean
      public ConsumerFactory<Long, OrderDto> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs());
      }

      @Bean
      public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
            return new ConcurrentKafkaListenerContainerFactory<>();
      }

      @Bean
      public Map<String, Object> consumerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
            // keys are plain longs
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
            // values arrive as JSON strings; the StringJsonMessageConverter on the
            // listener factory converts them into OrderDto instances
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
            return props;
      }

      @Bean
      public StringJsonMessageConverter converter() {
            return new StringJsonMessageConverter();
      }
}

Producer configuration:

@Configuration
public class KafkaProducerConfiguration {

      @Value("${kafka.server}")
      private String kafkaServer;

      @Value("${kafka.producer.id}")
      private String kafkaProducerId;

      @Bean
      public Map<String, Object> producerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
            // keys are longs; values are written as JSON by Spring Kafka's JsonSerializer
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
            props.put(ProducerConfig.CLIENT_ID_CONFIG, kafkaProducerId);
            return props;
      }

      @Bean
      public ProducerFactory<Long, OrderDto> producerOrderFactory() {
            return new DefaultKafkaProducerFactory<>(producerConfigs());
      }

      @Bean
      public KafkaTemplate<Long, OrderDto> kafkaTemplate() {
            KafkaTemplate<Long, OrderDto> template = new KafkaTemplate<>(producerOrderFactory());
            template.setMessageConverter(new StringJsonMessageConverter());
            return template;
      }
}

Note: you need to use org.springframework.kafka.support.serializer.JsonSerializer here, not the JsonSerializer from the Jackson library.

OrderService:

@Slf4j
@Service
@AllArgsConstructor
public class OrderServiceImpl implements OrderService {

    private final KafkaTemplate<Long, OrderDto> kafkaOrderTemplate;

    private final ObjectMapper objectMapper;

    @Override
    public void send(OrderDto dto) {
        kafkaOrderTemplate.send("orders", dto);
    }

    @Override
    @KafkaListener(id = "OrderId", topics = {"orders"}, containerFactory = "singleFactory")
    public void consume(OrderDto dto) {
        log.info("-> consumed {}", writeValueAsString(dto));
    }

    private String writeValueAsString(OrderDto dto) {
        try {
            return objectMapper.writeValueAsString(dto);
        } catch (JsonProcessingException e) {
            log.error("Error happens during json processing", e);
            throw new RuntimeException("Error happens during json processing: " + dto.toString());
        }
    }
}
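The OrderService interface itself is not shown in the article; judging from the two overridden methods above, a minimal version (my assumption, the repository may differ) would be:

public interface OrderService {

    // publish an order to the orders topic
    void send(OrderDto dto);

    // handle an order read from the orders topic
    void consume(OrderDto dto);
}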

and OrderController:

@RestController
@AllArgsConstructor
public class OrderController {

    private final OrderServiceImpl orderService;

    @PostMapping("/order")
    public void create() {
        OrderDto order = OrderDto.builder()
                .name("New Order #: " + System.nanoTime())
                .description("Standard description")
                .build();
        orderService.send(order);
    }
}
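The OrderDto class is also not shown here; a minimal Lombok-based sketch consistent with the builder calls above (the field names come from the controller, everything else is an assumption) might look like this:

@Data
@Builder
@NoArgsConstructor  // needed so Jackson can deserialize incoming JSON
@AllArgsConstructor // required by @Builder when a no-args constructor is present
public class OrderDto {

    private String name;

    private String description;
}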

Now, when you run the application, the endpoint will be available on port 9955.
Make a POST request to the endpoint:

localhost:9955/order
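For example, with curl (assuming the application is running locally on the port configured above):

curl -X POST http://localhost:9955/order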

A message will be created and sent to the orders topic. After that, our application will immediately read this
message through the method annotated with @KafkaListener:

    @KafkaListener(id = "OrderId", topics = {"orders"}, containerFactory = "singleFactory")
    public void consume(OrderDto dto) {
        log.info("-> consumed {}", writeValueAsString(dto));
    }

and in the console you will see the consumed order logged by the listener.

That's it! The code is available on GitHub: https://github.com/LehaUchicha/Kafka-Example.git

As a bonus, not explained in this article, you can find a second Spring Boot application, Kafka-Producer, which pushes a message to the orders topic every 5 seconds after it starts.
The Kafka-Server application will read these messages, and you can see them in the console. You just need to run the application; a rough sketch of such a scheduled producer is shown below.
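The Kafka-Producer code lives only in the repository; a minimal sketch of a scheduled sender reusing the producer configuration above (the class name and message contents are my assumptions) could look like this:

@Slf4j
@Service
@AllArgsConstructor
public class ScheduledOrderProducer {

    private final KafkaTemplate<Long, OrderDto> kafkaOrderTemplate;

    // runs every 5 seconds; requires @EnableScheduling on a configuration class,
    // for example on the main application class
    @Scheduled(fixedRate = 5000)
    public void sendOrder() {
        OrderDto order = OrderDto.builder()
                .name("Scheduled Order #: " + System.nanoTime())
                .description("Sent every 5 seconds")
                .build();
        log.info("-> producing {}", order);
        kafkaOrderTemplate.send("orders", order);
    }
}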

Conclusion

In this article we successfully configured Apache Kafka and learned how to work with Kafka via Spring Boot.
We also figured out how services can communicate with each other via Apache Kafka.

Thank you for reading. If you liked the article, you can support me by buying me a coffee.

Have a good day!
