Simplest Spring Kafka Producer and Consumer


Marcos Maia

Posted on July 13, 2019


Let's now build and run the simplest example of a Kafka consumer and then a Kafka producer using spring-kafka. If you need assistance with Kafka, Spring Boot or Docker, which are used in this article, or want to check out the sample application from this post, please check the References section below.

The first step is to create a simple Spring Boot Maven application and add the spring-kafka dependency to pom.xml:

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>
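Note that the REST endpoint we add later relies on Spring MVC, so if your project doesn't already include it, the pom.xml will most likely also need the web starter (a sketch, assuming a standard spring-boot-starter-parent setup):

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
</dependency>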

Create a Spring Kafka Consumer

Let's now write the simplest possible Kafka consumer with spring-kafka using spring-boot default configurations.

Create a class called SimpleConsumer and add a method with the @KafkaListener annotation.

package io.stockgeeks.springkafka.springkafkaapp;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class SimpleConsumer {
  @KafkaListener(id = "simple-consumer", topics = "simple-message")
  public void consumeMessage(String message) {
    System.out.println("Got message: " + message);
  }
}

That's it; this is all it takes because we are relying on Spring Boot's default configurations.
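Out of the box, Spring Boot's Kafka auto-configuration expects a broker on localhost:9092, which matches the Docker setup below. If your broker runs somewhere else, you can override that in application.properties; a minimal sketch (the host name here is just a placeholder):

spring.kafka.bootstrap-servers=my-kafka-host:9092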

Start Kafka and Zookeeper

As we've seen in detail in this other article, we're going to use docker-compose to run our local Kafka for development. Let's start our Kafka and Zookeeper containers:

docker-compose up -d
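For reference, a single-broker docker-compose.yml along the lines used here might look roughly like the sketch below. The image names and listener setup are assumptions based on the details mentioned later in this post; the actual file ships with the sample repository linked in the References.

version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper   # assumption: the images used in the referenced setup
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka       # assumption
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # the external listener advertises the host passed in DOCKER_HOST_IP, or "kafka" by default
      KAFKA_LISTENERS: LISTENER_DOCKER_INTERNAL://:19092,LISTENER_DOCKER_EXTERNAL://:9092
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-kafka}:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL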

Make sure the containers are running:

docker ps

You should see Kafka and Zookeeper running:

[Image: Kafka and Zookeeper containers running in the docker ps output]

Run the app

Let's now compile and run the application. If you need more detailed instructions, please check this post. Run the following command to build the application:

mvn clean package 


and let's run it:

mvn spring-boot:run

The application will start and you will see on the standard output the consumer configuration, the Kafka version being used and a message like Started SpringKafkaApplication in x seconds.

[Image: application started]

Make sure to keep the application running; don't close the terminal window where it's running. Let's now produce a few messages with the Kafka console producer and watch our consumer process and log them.

Produce message using the Kafka console producer

Open a new terminal and enter the running Kafka container so we can use the console producer:

docker exec -it kafka /bin/bash


Once inside the container, cd to /opt/kafka/bin; the Kafka command-line scripts in the specific image we're using are located in this folder. If you're using a different Docker image, those scripts might be in some other location.

Run the console producer which will enable you to send messages to Kafka:

./kafka-console-producer.sh --broker-list localhost:9092 --topic simple-message

[Image: Kafka console producer]

The console will now block; type a message and hit Enter, and each time you do this one message will be produced to the simple-message topic. Try sending a few messages and watch the standard output in the shell where your Spring Boot application is running: the consumer processes the messages and prints them.

[Image: console producer and Spring Kafka consumer output]
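If you want to double-check that the topic was auto-created, you can also list the topics from inside the same container. The exact flag depends on your Kafka version; recent versions accept --bootstrap-server, older ones use --zookeeper instead:

./kafka-topics.sh --bootstrap-server localhost:9092 --list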

Write a simple producer

Time to create our spring-kafka producer. Create a class called SimpleProducer; we will again rely on the defaults, as we did for the consumer.

package io.stockgeeks.springkafka.springkafkaapp;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class SimpleProducer {

  private final KafkaTemplate<String, String> simpleProducer;

  public SimpleProducer(KafkaTemplate<String, String> simpleProducer) {
    this.simpleProducer = simpleProducer;
  }

  public void send(String message) {
    simpleProducer.send("simple-message", message);
  }
}

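One thing worth knowing: KafkaTemplate.send is asynchronous, so the send method above fires and forgets. If you want to log whether the broker accepted the record, a sketch along these lines can be used instead (assuming spring-kafka 2.x, where send returns a ListenableFuture):

  public void send(String message) {
    simpleProducer.send("simple-message", message)
        .addCallback(
            result -> System.out.println("Sent: " + message),                    // success: record acknowledged
            ex -> System.err.println("Failed to send: " + ex.getMessage()));     // failure: log the reason
  }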

Write an endpoint

Let's now create a simple endpoint which will receive a text message and publish it to Kafka; we're always returning 200 OK for now.

package io.stockgeeks.springkafka.springkafkaapp;

import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/api")
public class MessageApi {

  private final SimpleProducer simpleProducer;

  public MessageApi(SimpleProducer simpleProducer) {
    this.simpleProducer = simpleProducer;
  }

  @PostMapping("/message")
  public ResponseEntity<String> message(@RequestBody String message) {
    simpleProducer.send(message);
    return ResponseEntity.ok("Message received: " + message);
  }
}


Build and run the application.

mvn clean package && mvn spring-boot:run

There's a good chance you will get an error when running the application on your development machine now. This happens because your application is running inside your normal host network, while Kafka and Zookeeper are running inside the Docker network.

There are a few ways to solve this; the best is to pass your development machine's hostname to docker-compose when starting the containers. If you open the docker-compose file from this project, the KAFKA_ADVERTISED_LISTENERS entry for LISTENER_DOCKER_EXTERNAL contains ${DOCKER_HOST_IP:-kafka}:9092, which tells Compose to use the passed-in hostname, or kafka by default. Check the comments in the compose file to find out how to fix it, and check the References section below for more details.
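In practice, that usually means exporting the variable before bringing the containers back up, something like this (the IP is just a placeholder; use your machine's actual address or hostname):

export DOCKER_HOST_IP=192.168.0.10
docker-compose up -d --force-recreate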

Send some messages with curl

Now make sure to watch the application terminal and in another terminal window let's use curl to send some messages:

curl -X POST http://localhost:8080/api/message -d "yet more fun" -H "Content-Type: text/plain"

You should see the response in the same terminal where curl was executed. Also check the consumer processing the message and printing it to the terminal where the application is running.

[Image: curl request and Spring Kafka consumer output]

Done

This is it, it's done. You have now created the simplest possible Spring Boot application that produces and consumes Kafka messages. The reason it looks so simple is that we are relying on Spring Boot and spring-kafka default configurations.

If you want to know more about how Spring Boot or Kafka works, please take a look at the links in the next section, where you'll find some references with further details.

We're going to cover testing your consumer and producers in another post. Happy Coding.

References

Source code with the application created in this post.

To set up your environment with java, maven, docker and docker-compose, please check how to set up your environment for the example tutorials.

If you need some quick introduction to Kafka: Kafka - Crash Course

For some insights on how to use docker-compose for local development, please check this post: One to run them all where you will also learn some useful Kafka commands.

If you're new to Spring Boot, please take a look at Spring Boot - Crash Course

Docker Compose environment variables, to understand the configuration of the Kafka advertised listener.
