Kafka on Kubernetes, the Strimzi way! (Part 2)
Abhishek Gupta
Posted on June 17, 2020
We kicked off the the first part of the series by setting up a single node Kafka cluster which was accessible to only internal clients within the same Kubernetes cluster, had no encryption, authentication or authorization and used temporary persistence. We will keep iterating/improving on this during the course of this blog series.
This part will cover these topics:
- Expose Kafka cluster to external applications
- Apply
TLS
encryption - Explore Kubernetes resources behind the scenes
- Use Kafka CLI and Go client applications to test our cluster setup
The code is available on GitHub - https://github.com/abhirockzz/kafka-kubernetes-strimzi/
What do I need to try this out?
kubectl
- https://kubernetes.io/docs/tasks/tools/install-kubectl/
I will be using Azure Kubernetes Service (AKS) to demonstrate the concepts, but by and large it is independent of the Kubernetes provider (e.g. feel free to use a local setup such as minikube
). If you want to use AKS
, all you need is a Microsoft Azure account which you can get for FREE if you don't have one already.
I will not be repeating some of the common sections (such as Installation/Setup for Helm, Strimzi, Azure Kubernetes Service as well as Strimzi overview) in this or subsequent part of this series and would request you to refer to part one for those details
Let's create an externally accessible Kafka cluster
To achieve this, we just need to tweak the Strimzi Kafka
resource a little bit. I am highlighting the key part below - here is the original manifest from part 1
spec:
kafka:
version: 2.4.0
replicas: 1
listeners:
plain: {}
external:
type: loadbalancer
tls: true
What changed?
To make Kafka accessible to external client applications, we added an external
listener of type loadbalancer
. Since we will exposing our application to the public Internet, we need additional layers of protection such as transport level (TLS/SSL
encryption) and application level security (authentication and authorization). In this part, we will just configure encryption and explore the other aspects in another blog. To configure end-to-end TLS encryption, we add tls: true
tls: true
config is actually used as a default, but I have added it explicitly for sake of clarity
To create the cluster:
kubectl apply -f https://github.com/abhirockzz/kafka-kubernetes-strimzi/raw/master/part-2/kafka.yaml
Kubernetes magic!
The Strimzi Operator kicks into action and does all the heavy lifting for us:
- It creates a Kubernetes
LoadBalancer
Service.. - .. and seeding the appropriate Kafka server configuration in a
ConfigMap
I will be highlighting the resources created corresponding to the external listener and TLS encryption. For a walk through of ALL the resources which are created as part of the Kafka cluster, please refer to part 1
If you look for the Service
s, you will see something similar to this:
kubectl get svc
my-kafka-cluster-kafka-0 LoadBalancer 10.0.162.98 40.119.233.2 9094:31860/TCP 60s
my-kafka-cluster-kafka-bootstrap ClusterIP 10.0.200.20 <none> 9091/TCP,9092/TCP 60s
my-kafka-cluster-kafka-brokers ClusterIP None <none> 9091/TCP,9092/TCP 60s
my-kafka-cluster-kafka-external-bootstrap LoadBalancer 10.0.122.211 20.44.239.202 9094:32267/TCP 60s
my-kafka-cluster-zookeeper-client ClusterIP 10.0.137.33 <none> 2181/TCP 82s
my-kafka-cluster-zookeeper-nodes ClusterIP None <none> 2181/TCP,2888/TCP,3888/TCP 82s
Notice the my-kafka-cluster-kafka-external-bootstrap
Service
of the type LoadBalancer
? Since I am using Azure Kubernetes Service, this is powered by an Azure Load Balancer which has a public IP (20.44.239.202
in this example) and exposes Kafka to external clients over port 9094
. You should be able to locate it using the Azure CLI (or the Azure portal if you prefer) by using the az network lb list
command
export AKS_RESOURCE_GROUP=[replace with resource group name]
export AKS_CLUSTER_NAME=[replace with AKS cluster name]
export AKS_LOCATION=[replace with region e.g. southeastasia]
az network lb list -g MC_${AKS_RESOURCE_GROUP}_${AKS_CLUSTER_NAME}_${AKS_LOCATION}
What about the encryption part?
To figure that out, let's introspect the Kafka server configuration:
As explained in the previous blog, this is stored in a
ConfigMap
export CLUSTER_NAME=my-kafka-cluster
kubectl get configmap/${CLUSTER_NAME}-kafka-config -o yaml
This is what the Common listener configuration
in server.config
reveals:
listeners=REPLICATION-9091://0.0.0.0:9091,PLAIN-9092://0.0.0.0:9092,EXTERNAL-9094://0.0.0.0:9094
advertised.listeners=REPLICATION-9091://my-kafka-cluster-kafka-${STRIMZI_BROKER_ID}.my-kafka-cluster-kafka-brokers.default.svc:9091,PLAIN-9092://my-kafka-cluster-kafka-${STRIMZI_BROKER_ID}.my-kafka-cluster-kafka-brokers.default.svc:9092,EXTERNAL-9094://${STRIMZI_EXTERNAL_9094_ADVERTISED_HOSTNAME}:${STRIMZI_EXTERNAL_9094_ADVERTISED_PORT}
listener.security.protocol.map=REPLICATION-9091:SSL,PLAIN-9092:PLAINTEXT,EXTERNAL-9094:SSL
Notice that in addition to inter-broker replication (over port 9091
) and un-encrypted internal (within Kubernetes cluster) client access over non TLS
port 9092
, appropriate listener config has been added for TLS encrypted access over port 9094
The moment of truth....
To confirm, let's try out a couple of client applications which will communicate with our freshly minted Kafka cluster on Kubernetes! We will produce and consume messages using the following:
- Kafka CLI (console) producer and consumer
- Go application (using the Confluent Kafka Go client)
Communication to our Kafka cluster has to be encrypted (non TLS client connections will be rejected). TLS/SSL
implicitly implies one way authentication, where the client validates the Kafka broker identity. In order to do this, client applications need to trust the cluster CA certificate. Remember that the cluster CA certificate is stored in a Kubernetes Secret
(refer to details in part 1). By default, these are auto-generated by Strimzi, but you can provide your own certificates as well (refer https://strimzi.io/docs/operators/master/using.html#kafka-listener-certificates-str)
Start by extracting the cluster CA certificate and password:
export CLUSTER_NAME=my-kafka-cluster
kubectl get secret $CLUSTER_NAME-cluster-ca-cert -o jsonpath='{.data.ca\.crt}' | base64 --decode > ca.crt
kubectl get secret $CLUSTER_NAME-cluster-ca-cert -o jsonpath='{.data.ca\.password}' | base64 --decode > ca.password
You should have two files:
ca.crt
andca.password
. Feel free to check out their contents
While some Kafka clients (e.g. Confluent Go client) use the CA certificate directly, others (e.g. Java client, Kafka CLI etc.) require access to the CA certificate via a truststore
. I am using the built-in truststore
which comes in with a JDK (Java) installation - but this is just for convenience and you're free to use other options (such as creating your own)
export CERT_FILE_PATH=ca.crt
export CERT_PASSWORD_FILE_PATH=ca.password
# replace this with the path to your truststore
export KEYSTORE_LOCATION=/Library/Java/JavaVirtualMachines/jdk1.8.0_221.jdk/Contents/Home/jre/lib/security/cacerts
export PASSWORD=`cat $CERT_PASSWORD_FILE_PATH`
export CA_CERT_ALIAS=strimzi-kafka-cert
# you will prompted for the truststore password. for JDK truststore, the default password is "changeit"
# Type yes in response to the 'Trust this certificate? [no]:' prompt
sudo keytool -importcert -alias $CA_CERT_ALIAS -file $CERT_FILE_PATH -keystore $KEYSTORE_LOCATION -keypass $PASSWORD
sudo keytool -list -alias $CA_CERT_ALIAS -keystore $KEYSTORE_LOCATION
That's it for the base setup - you are ready to try out the Kafka CLI client!
Please note that the configuration steps for the Kafka CLI as detailed below will also work for the Java clients as well - give it a try!
Extract the LoadBalancer
public IP for Kafka cluster
export KAFKA_CLUSTER_NAME=my-kafka-cluster
kubectl get service/${KAFKA_CLUSTER_NAME}-kafka-external-bootstrap --output=jsonpath={.status.loadBalancer.ingress[0].ip}
Create a file called client-ssl.properties
with the following contents:
bootstrap.servers=[LOADBALANCER_PUBLIC_IP]:9094
security.protocol=SSL
ssl.truststore.location=[TRUSTSTORE_LOCATION]
//for JDK truststore, the default password is "changeit"
ssl.truststore.password=changeit
To use the Kafka CLI, download Kafka
if you don't have it already - https://kafka.apache.org/downloads
All you need to do is use the kafka-console-producer
and kafka-console-consumer
by pointing it to the client-ssl.properties
file you just created
export KAFKA_HOME=[replace with Kafka installation path] e.g. /Users/foobar/kafka_2.12-2.3.0
export LOADBALANCER_PUBLIC_IP=[replace with public IP of Load Balancer]
export TOPIC_NAME=test-strimzi-topic
# on a terminal, start producer and send a few messages
$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list $LOADBALANCER_PUBLIC_IP:9094 --topic $TOPIC_NAME --producer.config client-ssl.properties
# on another terminal, start consumer
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server $LOADBALANCER_PUBLIC_IP:9094 --topic $TOPIC_NAME --consumer.config client-ssl.properties --from-beginning
You should see producer and consumer working in tandem. Great!
If you face SSL Handshake errors, please check whether the CA cert has been correctly imported along with its correct password. If the Kafka cluster is not reachable, ensure you are using the right value for the public IP
Now, let's try a programmatic client. Since the Java client behavior (required config properties) are same as the CLI, I am using a Go client to try something different. Don't worry, if you are not a Go programmer, it should be easy to follow along - I will not walk through the entire program, just the part where we create the connection related configuration.
Here is the snippet:
bootstrapServers = os.Getenv("KAFKA_BOOTSTRAP_SERVERS")
caLocation = os.Getenv("CA_CERT_LOCATION")
topic = os.Getenv("KAFKA_TOPIC")
config := &kafka.ConfigMap{"bootstrap.servers": bootstrapServers, "security.protocol": "SSL", "ssl.ca.location": caLocation}
Notice that the bootstrap.servers
and security.protocol
are the same as ones you used in the Kafka CLI client (same for Java as well). The only difference is that ssl.ca.location
is used to point to the CA certificate directly as opposed to a truststore
If you have Go
installed, you can try it out. Clone the Git repo...
git clone https://github.com/abhirockzz/kafka-kubernetes-strimzi
cd part-2/go-client-app
.. and run the program:
export KAFKA_BOOTSTRAP_SERVERS=[replace with loadbalancer_ip:9094] e.g. 42.42.424.424:9094
export CA_CERT_LOCATION=[replace with path to ca.crt file which you downloaded]
export KAFKA_TOPIC=test-strimzi-topic
go run kafka-client.go
You should see logs similar to this and confirm that messages are being produced and consumed
press
ctrl+c
to exit the app
started consumer
started producer delivery goroutine
started producer goroutine
delivered messaged test-strimzi-topic[0]@122
delivered messaged test-strimzi-topic[0]@123
delivered messaged test-strimzi-topic[0]@124
received message from test-strimzi-topic[0]@122: value-2020-06-08 16:23:05.913303 +0530 IST m=+0.020529419
received message from test-strimzi-topic[0]@123: value-2020-06-08 16:23:07.915252 +0530 IST m=+2.022455867
received message from test-strimzi-topic[0]@124: value-2020-06-08 16:23:09.915875 +0530 IST m=+4.023055601
received message from test-strimzi-topic[0]@125: value-2020-06-08 16:23:11.915977 +0530 IST m=+6.023134961
....
That's all for now, but there is more to come!
So we made some progress! We now have a Kafka cluster on Kubernetes which is publicly accessible but is (partially) secure thanks to TLS encryption. We also did some sanity testing using not one, but two (different) client applications. In the next part, we'll improve this further... stay tuned!
Posted on June 17, 2020
Join Our Newsletter. No Spam, Only the good stuff.
Sign up to receive the latest update from our blog.