Connecting to Kafka cluster using SSL with Python

adityakanekar

Aditya Kanekar

Posted on January 19, 2021

This article explains how to write a producer and a consumer in Python for a Kafka cluster secured with SSL. I won't cover how to generate client certificates here; that's a topic reserved for another article :).

Pre-Requisites

  • Kafka Cluster with SSL
  • Client certificate (KeyStore) in JKS format
  • Linux environment with keytool and openssl installed
  • Python 3.6

Step 1 - Converting JKS to PEM file

Why do I need this step?

Unlike Java, Python and C# clients use .pem files to connect to Kafka. We therefore have to convert the JKS files to PEM with the help of the keytool and openssl commands. If you are working on Windows 10, you can refer to my article on how to run WSL on Windows here.

To make your life easy I have created a shell script to quickly convert JKS to PEM.

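#!/bin/bash
# Usage: ./jkstopem.sh <source_path_to_jks> <keystore_file_name> <keystore_password> <alias> <output_folder>
srcFolder="$1"
keyStore="$srcFolder/$2"
password="$3"
alias="$4"
outputFolder="$5"

echo "$keyStore"

# Export the client certificate in PEM format (-rfc)
echo "Generating certificate.pem"
keytool -exportcert -alias "$alias" -keystore "$keyStore" -rfc -file "$outputFolder/certificate.pem" -storepass "$password"

# Convert the entry to PKCS12, then extract the private key unencrypted (-nodes)
echo "Generating key.pem"
keytool -v -importkeystore -srckeystore "$keyStore" -srcalias "$alias" -destkeystore "$outputFolder/cert_and_key.p12" -deststoretype PKCS12 -storepass "$password" -srcstorepass "$password"
openssl pkcs12 -in "$outputFolder/cert_and_key.p12" -nodes -nocerts -out "$outputFolder/key.pem" -passin "pass:$password"

# Note: this exports the same alias as certificate.pem. If your CA certificate
# is stored under a different alias, pass that alias here instead.
echo "Generating CARoot.pem"
keytool -exportcert -alias "$alias" -keystore "$keyStore" -rfc -file "$outputFolder/CARoot.pem" -storepass "$password"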

The script generates the following files from the keystore file:

  • key.pem
  • certificate.pem
  • CARoot.pem
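
If you would rather drive the conversion from Python, the same steps can be sketched with the standard library. The function names build_conversion_commands and run_conversion are hypothetical helpers of my own; the keytool and openssl arguments simply mirror the shell script above, and both tools still need to be installed and on the PATH.

```python
import subprocess
from pathlib import Path


def build_conversion_commands(keystore, password, alias, output_dir):
    """Return the keytool/openssl command lines used by the shell script."""
    out = Path(output_dir)
    p12 = str(out / "cert_and_key.p12")
    # Shared prefix for the two certificate exports
    export = ["keytool", "-exportcert", "-alias", alias,
              "-keystore", str(keystore), "-rfc", "-storepass", password]
    return [
        export + ["-file", str(out / "certificate.pem")],
        ["keytool", "-v", "-importkeystore", "-srckeystore", str(keystore),
         "-srcalias", alias, "-destkeystore", p12, "-deststoretype", "PKCS12",
         "-storepass", password, "-srcstorepass", password],
        ["openssl", "pkcs12", "-in", p12, "-nodes", "-nocerts",
         "-out", str(out / "key.pem"), "-passin", f"pass:{password}"],
        export + ["-file", str(out / "CARoot.pem")],
    ]


def run_conversion(keystore, password, alias, output_dir):
    """Create the output folder and run each command, stopping on the first failure."""
    Path(output_dir).mkdir(parents=True, exist_ok=True)
    for cmd in build_conversion_commands(keystore, password, alias, output_dir):
        subprocess.run(cmd, check=True)
```

Passing the arguments as a list (rather than one shell string) avoids quoting problems when the password or paths contain spaces or special characters.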

How to run this script?

Save the script in a file, e.g. jkstopem.sh, and give it execute permission as shown below:

chmod +x jkstopem.sh

To generate the PEM files, run the shell script as shown below:

./jkstopem.sh <source_path_to_jks> <keystore_file_name> <keystore_password> <alias> <output_folder>

How to find the alias?

If you do not know which alias your certificate has, run the following command in the folder that contains the keystore file:

keytool -list -v -keystore kafka.client.keystore.jks

You will be prompted for a password. Enter the keystore password and the contents of the keystore file will be listed. Look for the "Alias name" entry.
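
If the keystore holds several entries, picking the aliases out of the listing by eye gets tedious. Here is a minimal sketch that extracts them from the command's text output. It assumes keytool prints in English with lines starting with "Alias name:"; the extract_aliases helper and the trimmed sample listing are illustrative, not real output from your keystore.

```python
import re


def extract_aliases(keytool_output):
    """Return the alias names found in `keytool -list -v` text output."""
    return re.findall(r"^Alias name:\s*(.+?)\s*$", keytool_output, flags=re.MULTILINE)


# Trimmed, made-up listing used only to show the shape of the input
sample = """\
Keystore type: JKS

Alias name: client-alias
Entry type: PrivateKeyEntry

Alias name: caroot
Entry type: trustedCertEntry
"""

print(extract_aliases(sample))  # ['client-alias', 'caroot']
```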

Here is an example of running the shell script:

./jkstopem.sh ~/client-cert kafka.client.keystore.jks welcome123 client-alias ~/client-cert/pem

You should now see the following files in the output folder:

  • key.pem
  • certificate.pem
  • CARoot.pem
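
Before moving on, it can be worth checking that each file really contains what its name suggests. The sketch below lists the PEM block labels found in each file; pem_block_labels and check_pem_files are hypothetical helper names. I would expect key.pem to report a private key label (for example PRIVATE KEY) and the other two to report CERTIFICATE, though the exact key label can vary with the openssl version.

```python
import re
from pathlib import Path

# A PEM block starts with a line such as "-----BEGIN CERTIFICATE-----"
PEM_BEGIN = re.compile(r"^-----BEGIN ([A-Z0-9 ]+)-----\s*$", re.MULTILINE)


def pem_block_labels(text):
    """Return the label of every PEM block in the given text."""
    return PEM_BEGIN.findall(text)


def check_pem_files(folder):
    """Map each expected file name to the PEM labels found in it."""
    names = ["key.pem", "certificate.pem", "CARoot.pem"]
    return {name: pem_block_labels((Path(folder) / name).read_text()) for name in names}
```

For the example above you would call check_pem_files with the output folder you passed to the script and inspect the returned dictionary.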

Now that we have all the PEM files, let's get cracking.

Step 2 - Writing Kafka Producer in Python

We will be using the 'kafka-python' package to connect to Kafka. You can install it using pip:

pip install kafka-python

Now, let's write our producer.

#Producer.py
from kafka import KafkaProducer

kafkaBrokers='kafka1.xyz.com:443,kafka2.xyz.com:443,kafka3.xyz.com:443'
caRootLocation='CARoot.pem'
certLocation='certificate.pem'
keyLocation='key.pem'
topic='test-topic'
password='welcome123'

producer = KafkaProducer(bootstrap_servers=kafkaBrokers,
                          security_protocol='SSL',
                          ssl_check_hostname=True,
                          ssl_cafile=caRootLocation,
                          ssl_certfile=certLocation,
                          ssl_keyfile=keyLocation,
                          ssl_password=password)

producer.send(topic, bytes('Hello Kafka!','utf-8'))

# Send to a particular partition
producer.send(topic, bytes('Hello Kafka!','utf-8'),partition=0)
producer.flush()

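In the above example we pass the PEM files generated in the previous step to the producer, together with the password used to read the key file. Since the script exports key.pem with -nodes (that is, unencrypted), ssl_password only matters if you encrypt the key file yourself. The relevant configuration is: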

kafkaBrokers='kafka1.xyz.com:443,kafka2.xyz.com:443,kafka3.xyz.com:443'
caRootLocation='CARoot.pem'
certLocation='certificate.pem'
keyLocation='key.pem'
password='welcome123'
producer = KafkaProducer(bootstrap_servers=kafkaBrokers,
                          security_protocol='SSL',
                          ssl_check_hostname=True,
                          ssl_cafile=caRootLocation,
                          ssl_certfile=certLocation,
                          ssl_keyfile=keyLocation,
                          ssl_password=password)
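
If you need finer control over the TLS settings, kafka-python also accepts a ready-made ssl.SSLContext through its ssl_context parameter (when it is given, the other ssl_* options are ignored). The following is a minimal sketch using only the standard library; build_ssl_context is a hypothetical helper name, and the file names in the usage comment are the ones generated in Step 1.

```python
import ssl


def build_ssl_context(cafile=None, certfile=None, keyfile=None, password=None):
    """Create a client-side context that verifies the broker certificate and hostname."""
    # With cafile=None the system's default CA certificates are loaded instead
    context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=cafile)
    if certfile:
        # Present our client certificate; password is only used for an encrypted key
        context.load_cert_chain(certfile=certfile, keyfile=keyfile, password=password)
    return context


# Usage (needs kafka-python and the PEM files from Step 1):
# from kafka import KafkaProducer
# context = build_ssl_context('CARoot.pem', 'certificate.pem', 'key.pem', 'welcome123')
# producer = KafkaProducer(bootstrap_servers=kafkaBrokers,
#                          security_protocol='SSL',
#                          ssl_context=context)
```

A default context created this way has hostname checking and certificate verification switched on, which matches ssl_check_hostname=True in the configuration above.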

Sending data to random topic partition

The snippet below sends data without specifying a partition or a key, so a partition is chosen at random for you.

producer.send(topic, bytes('Hello Kafka!','utf-8'))
producer.flush()
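
To make the "random partition" behaviour a little more concrete: as far as I know, the partition is picked on the client side by the producer's partitioner, which chooses randomly when the message has no key and hashes the key otherwise, so that equal keys land on the same partition. The sketch below is a simplified stand-in, not kafka-python's actual implementation: choose_partition is a hypothetical function, and zlib.crc32 replaces the murmur2 hash the real partitioner uses.

```python
import random
import zlib


def choose_partition(key, partitions):
    """Simplified stand-in for a producer-side partitioner (not the real algorithm)."""
    if key is None:
        # No key: any partition will do
        return random.choice(partitions)
    # Keyed message: the same key always maps to the same partition
    return partitions[zlib.crc32(key) % len(partitions)]


partitions = [0, 1, 2]
print(choose_partition(None, partitions))       # one of 0, 1 or 2
print(choose_partition(b"user-1", partitions))  # same value on every call
```

In the producer itself you would get the keyed behaviour by passing key=b'user-1' to producer.send alongside the value.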

Sending data to specific topic partition

To send data to a specific partition, pass the partition argument as shown in the snippet below:

producer.send(topic, bytes('Hello Kafka - Partition 0!','utf-8'),partition=0)
producer.flush()
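
producer.send is asynchronous and returns a future, so one way to confirm which partition a message actually landed on is to wait for that future and read the returned record metadata. Here is a small sketch; send_and_confirm is a hypothetical helper, and the 10-second timeout is an arbitrary choice.

```python
def send_and_confirm(producer, topic, value, partition=None, timeout=10):
    """Send one message, wait for the acknowledgement and report where it was stored."""
    future = producer.send(topic, value, partition=partition)
    # Blocks until the broker responds; raises if the send fails or times out
    metadata = future.get(timeout=timeout)
    return metadata.topic, metadata.partition, metadata.offset


# Usage with the producer created earlier:
# print(send_and_confirm(producer, topic, b'Hello Kafka - Partition 0!', partition=0))
```

Waiting on every message slows a producer down, so this is better suited to testing the connection than to high-volume sending.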

So we have built our Python producer for Kafka. In the next part we will write a consumer to read the messages from the topic.
