Getting started with Apache Kafka using Python

Rubens Barbosa

Posted on May 7, 2022

Apache Kafka is a distributed streaming system that provides real-time access to data. It lets us publish and subscribe to streams of data, store them, and process them.

Message

The unit of data within Kafka is called a message. A message is simply an array of bytes. A message can have an optional bit of metadata, which is referred to as a key.

For efficiency, messages are written into Kafka in batches. A batch is just a collection of messages, all of which are being produced to the same topic and partition.
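
To see how batching plays out in practice, here is a minimal sketch using the kafka-python client we'll install later in this post. batch_size and linger_ms are real KafkaProducer settings; the topic name is just a placeholder:

from kafka import KafkaProducer

# Allow batches of up to 32 KB and wait up to 50 ms for a batch to fill
# before sending, trading a little latency for better throughput.
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    batch_size=32768,
    linger_ms=50,
)

for i in range(100):
    producer.send('first-topic', f'message {i}'.encode('utf-8'))

producer.flush()  # block until all batched messages are delivered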

Topics

Messages in Kafka are categorized into topics. The closest analogies for a topic are a database table or a folder in a filesystem. Topics are additionally broken down into a number of partitions. Note that as a topic typically has multiple partitions, there is no guarantee of message time-ordering across the entire topic, just within a single partition.
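
As an illustration, a topic with several partitions can be created programmatically with kafka-python's admin client (a sketch; the example-topic name is purely illustrative):

from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers='localhost:9092')

# Three partitions: ordering is guaranteed within each partition,
# but not across the topic as a whole.
admin.create_topics([
    NewTopic(name='example-topic', num_partitions=3, replication_factor=1)
])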

Producers and Consumers

Producers in Kafka are the ones who produce and send the messages to the topics. In some cases, the producer will direct messages to specific partitions. This is typically done using the message key and a partitioner that will generate a hash of the key and map it to a specific partition.
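
A minimal sketch with kafka-python: attaching the same key to two messages routes both to the same partition via the default hash partitioner (the topic and key names here are illustrative):

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

# Both messages share the key 'user-42', so they are hashed to the
# same partition and will be read back in the order they were sent.
producer.send('first-topic', key=b'user-42', value=b'login')
producer.send('first-topic', key=b'user-42', value=b'logout')
producer.flush()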

The consumer subscribes to one or more topics and reads the messages in the order in which they were produced. The consumer keeps track of which messages it has already consumed by keeping track of the offset of messages. The offset is another bit of metadata an integer value that continually increases that Kafka adds to each message as it is produced. Each message in a given partition has a unique offset.
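
Each record handed to a kafka-python consumer exposes its partition and offset, so we can watch this bookkeeping directly (a sketch against the local setup used later in this post):

from kafka import KafkaConsumer

consumer = KafkaConsumer('first-topic',
                         bootstrap_servers=['localhost:9092'],
                         auto_offset_reset='earliest')

for record in consumer:
    # partition and offset uniquely identify this message within the topic
    print(record.partition, record.offset, record.value)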

Brokers and Clusters

A single Kafka server is called a broker. Depending on the specific hardware and its performance characteristics, a single broker can easily handle thousands of partitions and millions of messages per second. Kafka brokers are designed to operate as part of a cluster. Within a cluster of brokers, one broker will also function as the cluster controller.

Retention

A key feature of Apache Kafka is retention, the durable storage of messages for some period of time. Kafka brokers are configured with a default retention setting for topics, either retaining messages for some period of time (e.g., 7 days) or until the topic reaches a certain size in bytes (e.g., 1 GB). Once these limits are reached, messages are expired and deleted, so the retention configuration defines a minimum amount of data available at any time. Individual topics can also be configured with their own retention settings so that messages are stored for only as long as they are useful.
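
Per-topic retention can be set with the kafka-configs tool; retention.ms is a standard topic-level config. For example, to keep messages in first-topic (the topic we create later in this post) for one day:

$ kafka-configs --bootstrap-server localhost:9092 --alter \
  --entity-type topics --entity-name first-topic \
  --add-config retention.ms=86400000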

Now that we have an overview of Apache Kafka, let's install it.

Installing Kafka

I'll install Apache Kafka on macOS using Homebrew. To do so, I just need to type in my terminal:

$ brew install kafka

Apache Kafka uses Zookeeper to store metadata about the Kafka cluster, as well as consumer client details, so the installation will pull in Apache Zookeeper as well. Note that Java must already be installed on our machine.

After installing Kafka we can see something like this:

[Image: terminal output showing the Kafka installation logs]

Navigate to the Kafka bin directory in separate terminal sessions in order to run Zookeeper and Kafka. The path may differ depending on your machine and OS.

$ cd /usr/local/opt/kafka/bin

First, let's start the Apache Zookeeper server.

$ zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties

Now, in another terminal session, execute the command below to start the Kafka broker:

$ kafka-server-start /usr/local/etc/kafka/server.properties

All right, we have Apache Zookeeper and Apache Kafka running. What should we do now? Let's create a Kafka topic.

Creation of Kafka Topic

Now, let's create a topic called first-topic in a new terminal session.

$ kafka-topics --create --topic first-topic \
--bootstrap-server localhost:9092 \
--replication-factor 1 --partitions 1

Created topic first-topic.
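
We can double-check the new topic and inspect its partition layout with the same tool:

$ kafka-topics --describe --topic first-topic \
  --bootstrap-server localhost:9092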

Producer

Produce messages to the first-topic

$ kafka-console-producer --broker-list localhost:9092 \
--topic first-topic

>Sunday 1st May 2022
>Data Engineering    
>

Consumer

Consume messages from the first-topic

$ kafka-console-consumer --bootstrap-server localhost:9092 \
--topic first-topic --from-beginning

Sunday 1st May 2022
Data Engineering 

List Topics

Listing all the Kafka topics in a cluster

$ kafka-topics --list --bootstrap-server localhost:9092

Delete Topic

We might want to delete a specific topic

$ kafka-topics --bootstrap-server localhost:9092 \
--delete --topic first-topic

Producer & Consumer with Python

Let's create a producer and consumer using Python. First, we should create a virtual environment.

$ python3 -m venv env

or

$ python -m venv env

Activate the virtual environment in order to install the libraries:

$ source env/bin/activate

Let's install kafka-python, a Python client for Apache Kafka, and the Requests library:

$ pip install kafka-python
$ pip install requests 

Python Producer

Now, let's dive into our producer.py

#!/usr/local/bin/python
import sys
import json
import logging
import requests
from kafka import KafkaProducer
from datetime import datetime, timedelta

logging.basicConfig(stream=sys.stdout,
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")

logger = logging.getLogger(__name__)


def producer():
    producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

    # Fetch COVID-19 case data from a public API for two days ago.
    # strftime gives a zero-padded date string, e.g. 2022-05-05.
    date = datetime.today() - timedelta(days=2)
    previous_date = date.strftime("%Y-%m-%d")
    url = ('https://indicadores.integrasus.saude.ce.gov.br/api/casos-coronavirus'
           f'?dataInicio={previous_date}&dataFim={previous_date}')
    req = requests.get(url)
    covid_data = req.json()

    # Publish each record to covid-topic as UTF-8 encoded JSON.
    for data in covid_data:
        producer.send('covid-topic', json.dumps(data).encode('utf-8'))

    # Block until every buffered message has been delivered to the broker.
    producer.flush()
    logger.info("Sent %d records to covid-topic", len(covid_data))


if __name__ == "__main__":
    producer()
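
With Zookeeper and Kafka running, the script can be launched directly; with default broker settings, covid-topic is created automatically the first time it is written to:

$ python producer.py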

Python Consumer

Alright, let's have a look at our consumer.py

#!/usr/local/bin/python
from kafka import KafkaConsumer


if __name__ == "__main__":
    # Connects to localhost:9092 by default and subscribes to covid-topic.
    consumer = KafkaConsumer('covid-topic')

    # Each item is a ConsumerRecord carrying topic, partition, offset, and value.
    for data in consumer:
        print(data)
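
The raw ConsumerRecord output is fairly noisy. A slightly fuller consumer, sketched here with standard kafka-python options (the group name is just an example), decodes the JSON payloads and joins a consumer group so offsets are committed between runs:

#!/usr/local/bin/python
import json
from kafka import KafkaConsumer


if __name__ == "__main__":
    consumer = KafkaConsumer(
        'covid-topic',
        bootstrap_servers=['localhost:9092'],
        group_id='covid-readers',          # commit offsets under this group
        auto_offset_reset='earliest',      # start from the beginning on first run
        value_deserializer=lambda v: json.loads(v.decode('utf-8')),
    )

    for record in consumer:
        print(record.offset, record.value)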

You should have two terminal sessions open to run producer.py and consumer.py.

Conclusion

We learned the main concepts of Apache Kafka: message/record, producers, consumers, topics, brokers and retention. An event records the fact that "something happened". It is also called record or message. Producers are those client applications that publish (write) events to Kafka, and consumers are those that subscribe to (read and process) these events. Events are organized and durably stored in topics. Very simplified, a topic is similar to a folder in a filesystem, and the events are the files in that folder.
