Using Kafka in 2022
aseem wangoo
Posted on March 10, 2022
We will cover briefly:
- Intro to Kafka
- Setup Kafka on machine
- Create Kafka Producer
- Create Kafka Consumer
- Running Producer and Consumer
Intro to Kafka
As per the docs, Kafka is an event streaming platform that can be used for collecting, processing, storing, and analyzing data at scale. Kafka is used by thousands of companies, including over 60% of the Fortune 100.
Kafka is known for its excellent performance, low latency, fault tolerance, and high throughput; it is capable of handling thousands of messages per second.
Use cases:
- Process payments and financial transactions in real time
- Continuously capture and analyze sensor data from IoT devices
- Collect and immediately react to customer interactions and orders
- Track and monitor cars, trucks, fleets, and shipments in real time
Setup Kafka on machine
This section walks through setting up Kafka on your machine.
Note: We are using a Mac.
- To install Kafka, follow the official downloads page. Navigate to the latest binary downloads (as of today, Scala 2.13) and click the link. It opens a mirror page; click the download link there.
- Extract the archive and navigate into the folder (Downloads/kafka_2.12-3.1.0 in our case).
Note: Run the terminal commands below from the path where you extracted the Kafka zip.
Start Zookeeper
sh bin/zookeeper-server-start.sh config/zookeeper.properties
The above command starts Zookeeper. It is primarily used to track the status of nodes in the Kafka cluster and to maintain a list of Kafka topics and messages.
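For reference, the config/zookeeper.properties that ships with Kafka contains defaults along these lines (your copy may differ slightly):
# the directory where the snapshot is stored
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181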
Note: Starting with v2.8, Kafka can be run without Zookeeper. However, this update isn't ready for use in production.
Keep the above terminal open
Start Kafka broker service
- Open another terminal at the above path (Downloads/kafka_2.12-3.1.0) and type
sh bin/kafka-server-start.sh config/server.properties
This starts the Kafka Broker. A Kafka cluster is made up of multiple Kafka Brokers, and each Kafka Broker has a unique ID (number). Kafka Brokers contain topic log partitions, and connecting to one broker bootstraps a client to the entire Kafka cluster. A Kafka cluster can have 10, 100, or 1,000 brokers if needed.
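The broker's identity and storage locations come from config/server.properties; the shipped defaults look roughly like this (your copy may differ):
# the unique id of this broker
broker.id=0
# where the topic log partitions are stored on disk
log.dirs=/tmp/kafka-logs
# how the broker finds Zookeeper
zookeeper.connect=localhost:2181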
Keep this terminal open
Create a topic
- Open a new terminal at the above path and type
sh bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test-topic
Events are produced across different machines; Kafka organizes and stores these events inside topics. In the above command, we are saying that:
- the server is localhost:9092
- the topic created is test-topic
(a variant with more options is shown below)
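With no extra flags, the broker defaults apply (one partition and replication factor 1 out of the box). kafka-topics.sh also lets you set these explicitly; for example (on our single-broker setup the replication factor cannot exceed 1):
sh bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test-topic --partitions 3 --replication-factor 1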
Verify the topic was created
- Open a new terminal at the above path and type
sh bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
This should show you the test-topic
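You can also inspect the topic's partitions and replicas with the --describe flag:
sh bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic test-topic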
Create Kafka Producer
Note: Kafka Broker should be running in the terminal
Install dependencies
We will be using the Eclipse IDE for creating our Kafka Producer.
- Create a Maven project using Eclipse.
- Go to the pom.xml, put in the following, and update your Maven project.
<dependencies>
  <!-- Kafka Client -->
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.1.0</version>
  </dependency>
  <!-- Simple logging -->
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>2.0.0-alpha6</version>
  </dependency>
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-simple</artifactId>
    <version>2.0.0-alpha6</version>
  </dependency>
</dependencies>
This will add the Kafka client and slf4j to our project.
Create SampleProducer.java
- Create a class called SampleProducer.java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SampleProducer {
    public SampleProducer() throws InterruptedException {
        // Producer configuration: broker address and key/value serializers
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // kafka.topic is our own property, not a built-in producer config
        properties.put("kafka.topic", "test-topic");

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        String topic = properties.getProperty("kafka.topic");

        int i = 0;
        try {
            while (true) {
                i++;
                // Each record carries the topic name, a key, and a value
                ProducerRecord<String, String> producerRecord =
                        new ProducerRecord<>(topic, "key", "message-" + i);
                Thread.sleep(3000); // pause 3 seconds between sends
                kafkaProducer.send(producerRecord);
            }
        } finally {
            kafkaProducer.close();
        }
    }
}
We initialize the Properties with the following keys and values:
- bootstrap.servers: your local server host and port
- key.serializer: for serializing the key sent over the network
- value.serializer: for serializing the value sent over the network
- kafka.topic: your Kafka topic (a custom property our code reads; it is not a built-in producer config)
- Next, we create an instance of KafkaProducer with the properties specified above.
- We need to create a ProducerRecord for sending data to the Kafka broker. The ProducerRecord takes in the topic name, a key, and the value to be sent.
- We send the record using kafkaProducer.send().
- This block of code is placed inside a while loop, so every 3 seconds it sends a record to the Kafka broker (a delivery-callback variant is sketched below).
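send() is asynchronous and returns immediately. If you want confirmation that a record actually landed, the client's Callback interface can be passed as a second argument; a minimal sketch:
kafkaProducer.send(producerRecord, (metadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace(); // delivery failed
    } else {
        // metadata tells us where the record was written
        System.out.println("sent to partition " + metadata.partition()
                + " at offset " + metadata.offset());
    }
});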
Create KafkaProducerRunner.java
- This will be the class that invokes the above SampleProducer
public class KafkaProducerRunner {
    // SampleProducer's constructor throws InterruptedException, so main declares it too
    public static void main(String[] args) throws InterruptedException {
        SampleProducer sampleProducer = new SampleProducer();
    }
}
Create Kafka Consumer
Note: Kafka Broker should be running in the terminal
- Create a class called SampleConsumer.java
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SampleConsumer {
    public SampleConsumer() {
        // Consumer configuration: broker address, deserializers, and group id
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        // kafka.topic is our own property, not a built-in consumer config
        properties.put("kafka.topic", "test-topic");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("group.id", "my-group");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Arrays.asList(properties.getProperty("kafka.topic")));

        while (true) {
            // Wait up to 100 ms for new records on each poll
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("partition = %s, offset = %d, key = %s, value = %s\n",
                        record.partition(), record.offset(), record.key(), record.value());
            }
        }
    }
}
We initialize the Properties with the following keys and values:
- bootstrap.servers: your local server host and port
- kafka.topic: your Kafka topic (again a custom property that the code reads)
- key.deserializer: for deserializing the key received over the network
- value.deserializer: for deserializing the value received over the network
- group.id: the consumer group id, used only on the consumer side
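One more consumer setting worth knowing (not used in the snippet above): a brand-new consumer group starts from the latest offset by default, so it only sees records produced after it joins. To read the topic from the beginning instead, you can add:
properties.put("auto.offset.reset", "earliest");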
- Next, we create an instance of KafkaConsumer with the properties specified above. This KafkaConsumer consumes the records from a Kafka cluster.
- Next, we subscribe to the topic (the one the KafkaProducer writes to) using subscribe.
- We get the data in the form of ConsumerRecords by calling poll in an infinite loop, waiting up to 100 ms for new records on each call.
- For each consumer record, we extract the partition, offset, key, and value.
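If you want to sanity-check the topic without the Java consumer, the console consumer that ships with Kafka does the same job from the terminal:
sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning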
Create KafkaConsumerRunner.java
- This will be the class that invokes the above SampleConsumer
public class KafkaConsumerRunner {
    public static void main(String[] args) {
        SampleConsumer sampleConsumer = new SampleConsumer();
    }
}
Running Producer and Consumer
If you followed the above steps correctly, you should have four files: SampleProducer.java, KafkaProducerRunner.java, SampleConsumer.java, and KafkaConsumerRunner.java.
- Let's run KafkaProducerRunner first. Our Producer produces data every 3 seconds.
- Next, let's run KafkaConsumerRunner. Our consumer receives the data every 3 seconds and prints it to the console.
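Based on the printf format in SampleConsumer, each line of output should look roughly like this (the partition, offsets, and message counter will differ on your machine):
partition = 0, offset = 0, key = key, value = message-1
partition = 0, offset = 1, key = key, value = message-2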