Aditya Kanekar
Posted on January 19, 2021
This article specifically talks about how to write a producer and consumer for a Kafka cluster secured with SSL using Python. I won't be getting into how to generate client certificates in this article; that's a topic reserved for another article :).
Pre-Requisites
- Kafka Cluster with SSL
- Client certificate (KeyStore) in JKS format
- Linux environment with keytool and openssl installed
- Python 3.6
Step 1 - Converting JKS to PEM file
Why do I need this step?
Unlike Java, Python and C# use PEM files to connect to Kafka. For this purpose we will convert the JKS keystore to PEM files with the help of the keytool and openssl commands. If you are working on Windows 10, you can refer to my article on how to run WSL on Windows here.
To make your life easy, I have created a shell script to quickly convert JKS to PEM.
#!/bin/bash
# Usage: ./jkstopem.sh <source_path_to_jks> <keystore_file_name> <keystore_password> <alias> <output_folder>
srcFolder=$1
keyStore=$srcFolder/$2
password=$3
alias=$4
outputFolder=$5
echo $keyStore
echo "Generating certificate.pem"
keytool -exportcert -alias $alias -keystore $keyStore -rfc -file $outputFolder/certificate.pem -storepass $password
echo "Generating key.pem"
keytool -v -importkeystore -srckeystore $keyStore -srcalias $alias -destkeystore $outputFolder/cert_and_key.p12 -deststoretype PKCS12 -storepass $password -srcstorepass $password
openssl pkcs12 -in $outputFolder/cert_and_key.p12 -nodes -nocerts -out $outputFolder/key.pem -passin pass:$password
echo "Generating CARoot.pem"
keytool -exportcert -alias $alias -keystore $keyStore -rfc -file $outputFolder/CARoot.pem -storepass $password
The script generates the following files from the keystore file:
- key.pem
- certificate.pem
- CARoot.pem
How to run this script?
Save the script in a file, e.g. jkstopem.sh, and give it execute permissions as shown below:
chmod +x jkstopem.sh
To generate the PEM files, run the shell script as shown in the example below:
./jkstopem.sh <source_path_to_jks> <keystore_file_name> <keystore_password> <alias> <output_folder>
How to find Alias?
If you are not sure which alias your certificate has, run the following command in the folder containing the keystore file:
keytool -list -v -keystore kafka.client.keystore.jks
You will be prompted for the password. Enter the keystore password and the contents of the keystore file will be listed; you will be able to see the Alias name.
Following is an example of running the shell script:
./jkstopem.sh ~/client-cert kafka.client.keystore.jks welcome123 client-alias ~/client-cert/pem
Now you should be able to see the following files in the output folder:
- key.pem
- certificate.pem
- CARoot.pem
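Before moving on, you can optionally sanity-check the generated files from Python. This is a minimal sketch using only the standard library ssl module; it assumes the three PEM files are in the current folder and reuses the example password welcome123 (only relevant if key.pem is password-protected).

#verify_pem.py
import ssl

# Build a context that trusts our CA, then load the client certificate and key
context = ssl.create_default_context(cafile='CARoot.pem')
context.load_cert_chain(certfile='certificate.pem',
                        keyfile='key.pem',
                        password='welcome123')  # ignored if key.pem is unencrypted
print('certificate.pem and key.pem loaded successfully')

If this runs without raising an SSLError, the PEM files are well-formed and the certificate and key load together as a pair.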
Now that we have all the PEM files, let's get cracking.
Step 2 - Writing Kafka Producer in Python
We will be using the 'kafka-python' package to connect to Kafka. You can install it using pip:
pip install kafka-python
Now, let's write our producer.
#Producer.py
from kafka import KafkaProducer

# Broker endpoints and the PEM files generated in Step 1
kafkaBrokers='kafka1.xyz.com:443,kafka2.xyz.com:443,kafka3.xyz.com:443'
caRootLocation='CARoot.pem'
certLocation='certificate.pem'
keyLocation='key.pem'
topic='test-topic'
password='welcome123'

producer = KafkaProducer(bootstrap_servers=kafkaBrokers,
                         security_protocol='SSL',
                         ssl_check_hostname=True,
                         ssl_cafile=caRootLocation,
                         ssl_certfile=certLocation,
                         ssl_keyfile=keyLocation,
                         ssl_password=password)

# Let Kafka pick the partition
producer.send(topic, bytes('Hello Kafka!','utf-8'))
# Send to a particular partition
producer.send(topic, bytes('Hello Kafka!','utf-8'), partition=0)
producer.flush()
In the above example we are using the PEM files we generated in the previous step, along with the password needed to read the key file.

kafkaBrokers='kafka1.xyz.com:443,kafka2.xyz.com:443,kafka3.xyz.com:443'
caRootLocation='CARoot.pem'
certLocation='certificate.pem'
keyLocation='key.pem'
password='welcome123'

producer = KafkaProducer(bootstrap_servers=kafkaBrokers,
                         security_protocol='SSL',
                         ssl_check_hostname=True,
                         ssl_cafile=caRootLocation,
                         ssl_certfile=certLocation,
                         ssl_keyfile=keyLocation,
                         ssl_password=password)
Sending data to a random topic partition
The below code snippet sends data to a partition chosen by Kafka, since no key or partition is specified.
producer.send(topic, bytes('Hello Kafka!','utf-8'))
producer.flush()
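If you want to confirm which partition and offset Kafka actually assigned, note that send() returns a future you can block on. A minimal sketch, reusing the producer and topic defined above:

# send() returns a future; get() blocks until the broker acknowledges the write
future = producer.send(topic, bytes('Hello Kafka!','utf-8'))
record_metadata = future.get(timeout=10)
print(record_metadata.partition, record_metadata.offset)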
Sending data to specific topic partition
To send data to a specific partition, you just need to specify the partition as shown in the code snippet below:
producer.send(topic, bytes('Hello Kafka - Partition 0!','utf-8'),partition=0)
producer.flush()
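As an alternative to hard-coding a partition number, you can also pass a key; kafka-python's default partitioner hashes the key, so messages with the same key consistently land on the same partition. A minimal sketch, reusing the producer and topic from above (the key 'order-123' is just an illustrative value):

# Messages sharing the same key are routed to the same partition
producer.send(topic, bytes('Hello Kafka - keyed message!','utf-8'), key=b'order-123')
producer.flush()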
So we have built our Python producer for Kafka. In the next part we will write a consumer to consume messages from the topic.