Getting started with Apache Kafka using Python
Rubens Barbosa
Posted on May 7, 2022
Apache Kafka is a distributed streaming system that provides real-time access to data. It lets us publish and subscribe to streams of data, store them, and process them.
Message
The unit of data within Kafka is called a message. A message is simply an array of bytes. A message can have an optional bit of metadata, which is referred to as a key.
For efficiency, messages are written into Kafka in batches. A batch is just a collection of messages, all of which are being produced to the same topic and partition.
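As a quick illustration, here is a minimal sketch using the kafka-python client we will install later in this post. It assumes a broker running on localhost:9092 and the first-topic we create below; the key is optional, and both key and value are plain bytes:
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
# key and value are just byte arrays from Kafka's point of view
producer.send('first-topic', key=b'sensor-1', value=b'21.5')
producer.flush()  # send any messages still sitting in a batch
Behind the scenes, the client accumulates messages into batches per topic and partition before sending them over the network.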
Topics
Messages in Kafka are categorized into topics. The closest analogies for a topic are a database table or a folder in a filesystem. Topics are additionally broken down into a number of partitions. Note that as a topic typically has multiple partitions, there is no guarantee of message time-ordering across the entire topic, just within a single partition.
Producers and Consumers
Producers create new messages and send them to topics. In some cases, the producer will direct messages to specific partitions. This is typically done using the message key and a partitioner that generates a hash of the key and maps it to a specific partition.
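The idea behind the default partitioner can be sketched in a few lines of Python. Note this is a simplification: Kafka's default partitioner uses a murmur2 hash rather than Python's built-in hash, but the principle is the same, a given key always maps to the same partition:
def choose_partition(key: bytes, num_partitions: int) -> int:
    # simplified stand-in for Kafka's murmur2-based default partitioner
    return hash(key) % num_partitions

# all messages with this key end up in the same partition,
# which is what preserves their relative order
choose_partition(b'sensor-1', 3)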
The consumer subscribes to one or more topics and reads the messages in the order in which they were produced. The consumer keeps track of which messages it has already consumed by keeping track of the offset of messages. The offset is another bit of metadata, an integer value that continually increases, which Kafka adds to each message as it is produced. Each message in a given partition has a unique offset.
Brokers and Clusters
A single Kafka server is called a broker. Depending on the specific hardware and its performance characteristics, a single broker can easily handle thousands of partitions and millions of messages per second. Kafka brokers are designed to operate as part of a cluster. Within a cluster of brokers, one broker will also function as the cluster controller.
Retention
A key feature of Apache Kafka is that of retention, which is the durable storage of messages for some period of time. Kafka brokers are configured with a default retention setting for topics, either retaining messages for some period of time (e.g., 7 days) or until the topic reaches a certain size in bytes (e.g., 1 GB). Once these limits are reached, messages are expired and deleted, so the retention configuration defines a minimum amount of data available at any time. Individual topics can also be configured with their own retention settings so that messages are stored for only as long as they are useful.
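For example, once the broker from the steps below is running, the kafka-configs tool that ships with Kafka can override retention for a single topic; here we keep messages in first-topic for one day (86,400,000 ms):
$ kafka-configs --bootstrap-server localhost:9092 \
  --alter --entity-type topics --entity-name first-topic \
  --add-config retention.ms=86400000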
Now that we have an overview of Apache Kafka, let's install it.
Installing Kafka
I'll install Apache Kafka on macOS using Homebrew. To do so, I just need to type in my terminal:
$ brew install kafka
Apache Kafka uses Zookeeper to store metadata about the Kafka cluster, as well as consumer client details, so the installation will set up Apache Zookeeper as well. We must already have Java installed on our machine.
After installing Kafka, the command-line scripts live in the Homebrew installation directory. Navigate to this directory in separate terminal sessions in order to execute Zookeeper and Kafka; it might be a different path depending on your machine and OS.
$ cd /usr/local/opt/kafka/bin
First, let's start the Apache Zookeeper server.
$ zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties
Now, in another terminal session, execute the command below:
$ kafka-server-start /usr/local/etc/kafka/server.properties
All right, we have Apache Zookeeper and Apache Kafka running. What should we do now? Let's create a Kafka topic.
Creation of Kafka Topic
Now, in a new terminal session, let's create a topic called first-topic.
$ kafka-topics --create --topic first-topic \
--bootstrap-server localhost:9092 \
--replication-factor 1 --partitions 1
Created topic first-topic.
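We can verify the topic and inspect its partition and replica assignment with --describe:
$ kafka-topics --describe --topic first-topic \
  --bootstrap-server localhost:9092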
Producer
Produce messages to first-topic:
$ kafka-console-producer --broker-list localhost:9092 \
--topic first-topic
>Sunday 1st May 2022
>Data Engineering
>
Consumer
Consume the messages from first-topic:
$ kafka-console-consumer --bootstrap-server localhost:9092 \
--topic first-topic --from-beginning
Sunday 1st May 2022
Data Engineering
List Topics
Listing all the Kafka topics in a cluster
$ kafka-topics --list --bootstrap-server localhost:9092
Delete Topic
We might want to delete a specific topic
$ kafka-topics --bootstrap-server localhost:9092 \
--delete --topic first-topic
Producer & Consumer with Python
Let's create a producer and consumer using Python. First, we should create a virtual environment.
$ python3 -m venv env
or
$ python -m venv env
Activate the virtual environment in order to install the libraries:
$ source env/bin/activate
Let's install the Python client for Apache Kafka and the Requests library:
$ pip install kafka-python
$ pip install requests
Python Producer
Now, let's dive into our producer.py
#!/usr/local/bin/python
import sys
import json
import logging
import requests
from kafka import KafkaProducer
from datetime import datetime, timedelta

logging.basicConfig(stream=sys.stdout,
                    level=logging.INFO,
                    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)


def produce():
    producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
    # get data from a public API (Covid-19 cases from Ceará, Brazil)
    date = datetime.today() - timedelta(days=2)
    previous_date = date.strftime("%Y-%m-%d")  # zero-padded, e.g. 2022-05-05
    url = ('https://indicadores.integrasus.saude.ce.gov.br/api/casos-coronavirus'
           '?dataInicio=' + previous_date + '&dataFim=' + previous_date)
    req = requests.get(url)
    covid_data = req.json()
    for data in covid_data:
        # Kafka messages are byte arrays, so serialize each record as JSON
        producer.send('covid-topic', json.dumps(data).encode('utf-8'))
    logger.info("Sent %d records to covid-topic", len(covid_data))
    producer.flush()  # block until all buffered messages are delivered


if __name__ == "__main__":
    produce()
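The script publishes to a topic called covid-topic. With the broker's default settings (auto.create.topics.enable=true), the topic is created automatically on the first send; otherwise, create it beforehand just like first-topic. Then run the producer:
$ python producer.py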
Python Consumer
Alright, let's have a look at our consumer.py
#!/usr/local/bin/python
from kafka import KafkaConsumer

if __name__ == "__main__":
    # connects to localhost:9092 by default and subscribes to covid-topic
    consumer = KafkaConsumer('covid-topic')
    for data in consumer:
        print(data)
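Each item the consumer yields is a ConsumerRecord carrying the offset metadata we discussed earlier. A slightly more informative loop, under the same assumptions (local broker, covid-topic), could print exactly where each message came from:
from kafka import KafkaConsumer

consumer = KafkaConsumer('covid-topic',
                         bootstrap_servers=['localhost:9092'],
                         auto_offset_reset='earliest')  # start at the oldest message
for msg in consumer:
    # topic, partition, and offset uniquely identify a message
    print(f"{msg.topic}[{msg.partition}] offset={msg.offset}: {msg.value.decode('utf-8')}")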
You should have two terminal sessions open, one running producer.py and the other running consumer.py.
Conclusion
We learned the main concepts of Apache Kafka: messages/records, producers, consumers, topics, brokers, and retention. An event records the fact that "something happened"; it is also called a record or a message. Producers are the client applications that publish (write) events to Kafka, and consumers are those that subscribe to (read and process) these events. Events are organized and durably stored in topics. Very simplified, a topic is similar to a folder in a filesystem, and the events are the files in that folder.