Kafka Connect on Kubernetes, the easy way!
Abhishek Gupta
Posted on April 16, 2020
This is a tutorial that shows how to set up and use Kafka Connect on Kubernetes using Strimzi, with the help of an example.
Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other systems using source and sink connectors. Although it's not too hard to deploy a Kafka Connect cluster on Kubernetes (just "DIY"!), I love the fact that Strimzi enables a Kubernetes-native way of doing this using the Operator pattern with the help of Custom Resource Definitions.
In addition to bootstrapping/installing Kafka Connect, this also applies to operations such as scaling the Connect cluster, deploying and managing connectors, etc. (you will see this in action during the course of this blog post)
We will go through the process of deploying a Kafka Connect cluster on Kubernetes, installing a connector, and testing it out - all this using kubectl and some YAML (of course!). I will be using Azure Event Hubs as the Kafka broker and Azure Kubernetes Service as the Kubernetes cluster - feel free to use other alternatives (e.g. a local minikube cluster on your laptop).
All the artifacts are available on GitHub
Strimzi is responsible for all the heavy lifting. In case you don't already know it, here is the gist:
Strimzi overview
The Strimzi documentation is detailed yet very well organized and clear! Most of the below paragraph has been taken directly from the docs
Strimzi simplifies the process of running Apache Kafka in a Kubernetes cluster. Strimzi provides container images and Operators for running Kafka on Kubernetes. It is a part of the Cloud Native Computing Foundation as a Sandbox project (at the time of writing).
Strimzi Operators are fundamental to the project. These Operators are purpose-built with specialist operational knowledge to effectively manage Kafka. Operators simplify the process of:
- Deploying and running Kafka clusters and components
- Configuring and securing access to Kafka
- Upgrading and managing Kafka
- Managing topics and users
Here is a diagram which shows a 10,000-foot overview of the Operator roles:
I am not going to dive into the details of deploying Kafka using Strimzi in this post - probably something which I will tackle in future blogs
Pre-requisites
kubectl - https://kubernetes.io/docs/tasks/tools/install-kubectl/
If you choose to use Azure Event Hubs, Azure Kubernetes Service (or both) you will need a Microsoft Azure account. Go ahead and sign up for a free one!
Azure CLI or Azure Cloud Shell - you can either install the Azure CLI if you don't have it already (should be quick!) or just use the Azure Cloud Shell from your browser.
Helm
I will be using Helm to install Strimzi. Here is the documentation to install Helm itself - https://helm.sh/docs/intro/install/
You can also use the YAML files directly to install Strimzi. Check out the quick start guide here - https://strimzi.io/docs/quickstart/latest/#proc-install-product-str
Let's start by setting up the required Azure services (if you're not using Azure, skip this section but please ensure you have the details for your Kafka cluster i.e. broker URLs and authentication credentials, if applicable)
I recommend installing the below services as a part of a single Azure Resource Group which makes it easy to clean up these services
Azure Event Hubs
Azure Event Hubs is a data streaming platform and event ingestion service. It can receive and process millions of events per second. It also provides a Kafka endpoint that can be used by existing Kafka-based applications as an alternative to running your own Kafka cluster. Event Hubs supports Apache Kafka protocol 1.0 and later, and works with existing Kafka client applications and other tools in the Kafka ecosystem including Kafka Connect (demonstrated in this blog), MirrorMaker etc.
To set up an Azure Event Hubs cluster, you can choose from a variety of options including the Azure portal, Azure CLI, Azure PowerShell or an ARM template. Once the setup is complete, you will need the connection string for authenticating to Event Hubs (it will be used in subsequent steps) - use this guide to finish this step.
Please ensure that you also create an Event Hub (same as a Kafka topic) to act as the target for our Kafka Connect connector (details in subsequent sections)
Azure Kubernetes Service
Azure Kubernetes Service (AKS) makes it simple to deploy a managed Kubernetes cluster in Azure. It reduces the complexity and operational overhead of managing Kubernetes by offloading much of that responsibility to Azure. Here are examples of how you can set up an AKS cluster using Azure CLI, Azure portal or ARM template.
Base install
To start off, we will install Strimzi and Kafka Connect, followed by the File Stream Source Connector.
Install Strimzi
Installing Strimzi using Helm is pretty easy:
# add the Helm chart repo for Strimzi
helm repo add strimzi https://strimzi.io/charts/
# install it! (I have used strimzi-kafka as the release name)
helm install strimzi-kafka strimzi/strimzi-kafka-operator
This will install the Strimzi Operator (which is nothing but a Deployment), Custom Resource Definitions and other Kubernetes components such as Cluster Roles, Cluster Role Bindings and Service Accounts.
For more details, check out this link
To delete, simply
helm uninstall strimzi-kafka
To confirm that the Strimzi Operator has been deployed, check its Pod (it should transition to Running status after a while):
kubectl get pods -l=name=strimzi-cluster-operator
NAME READY STATUS RESTARTS AGE
strimzi-cluster-operator-5c66f679d5-69rgk 1/1 Running 0 43s
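If you would rather block until the Operator is ready than poll kubectl get pods by hand, kubectl wait can do that. The sketch below is one way to wrap it; the function name wait_for_strimzi_operator and the 120s default timeout are my own choices for illustration, and it assumes the Operator Pod carries the name=strimzi-cluster-operator label used above.

```shell
# Hypothetical helper (not part of the original walkthrough): blocks until the
# Strimzi Operator Pod reports the Ready condition, or the timeout expires.
# Assumes the Pod is labelled name=strimzi-cluster-operator, as queried above.
wait_for_strimzi_operator() {
  timeout="${1:-120s}"   # assumed default; pass e.g. 300s for slower clusters
  kubectl wait --for=condition=Ready pod \
    -l name=strimzi-cluster-operator \
    --timeout="$timeout"
}
```

Calling wait_for_strimzi_operator 300s would wait up to five minutes; a non-zero exit status means the Pod did not become Ready in time.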
Check the Custom Resource Definitions as well:
kubectl get crd | grep strimzi
kafkabridges.kafka.strimzi.io 2020-04-13T16:49:36Z
kafkaconnectors.kafka.strimzi.io 2020-04-13T16:49:33Z
kafkaconnects.kafka.strimzi.io 2020-04-13T16:49:36Z
kafkaconnects2is.kafka.strimzi.io 2020-04-13T16:49:38Z
kafkamirrormaker2s.kafka.strimzi.io 2020-04-13T16:49:37Z
kafkamirrormakers.kafka.strimzi.io 2020-04-13T16:49:39Z
kafkas.kafka.strimzi.io 2020-04-13T16:49:40Z
kafkatopics.kafka.strimzi.io 2020-04-13T16:49:34Z
kafkausers.kafka.strimzi.io 2020-04-13T16:49:33Z
I want to call out kafkas.kafka.strimzi.io which represents Kafka clusters in Kubernetes. We will focus on kafkaconnects.kafka.strimzi.io and kafkaconnectors.kafka.strimzi.io which represent Kafka Connect clusters and Connectors respectively.
I am going to skip over the other components but you can dig them out e.g. for Cluster Roles
kubectl get clusterrole | grep strimzi
Now that we have the "brain" (the Strimzi Operator) wired up, let's use it!
Kafka Connect
We will need to create some helper Kubernetes components before we deploy Kafka Connect itself.
Before you proceed, clone the GitHub project
git clone https://github.com/abhirockzz/strimzi-kafka-connect-eventhubs
cd strimzi-kafka-connect-eventhubs
Kafka Connect will need to reference an existing Kafka cluster (which in this case is Azure Event Hubs). We can store the authentication info for the cluster as a Kubernetes Secret which can later be used in the Kafka Connect definition.
Update the eventhubs-secret.yaml file to include the credentials for Azure Event Hubs. Enter the connection string in the eventhubspassword attribute.
e.g.
apiVersion: v1
kind: Secret
metadata:
  name: eventhubssecret
type: Opaque
stringData:
  eventhubsuser: $ConnectionString
  eventhubspassword: Endpoint=sb://<eventhubs-namespace>.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=<access-key>
Leave eventhubsuser: $ConnectionString unchanged.
To create the Secret:
kubectl apply -f eventhubs-secret.yaml
By default, Kafka Connect is configured to send logs to stdout. We will use a custom log4j configuration to ensure that logs are also written to /tmp/connect-worker.log (in addition to stdout) - you will understand why this is done in a moment.
The configuration itself is stored in a log4j.properties file.
The log configuration can be stored in a ConfigMap which will later be referenced by the Kafka Connect definition. For details, check https://strimzi.io/docs/latest/#con-kafka-connect-logging-deployment-configuration-kafka-connect
kubectl create configmap connect-logging-configmap --from-file=log4j.properties
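The log4j.properties file itself lives in the GitHub project and is not reproduced in this post. As a rough idea of what such a file can look like, here is a minimal sketch that logs to both stdout and /tmp/connect-worker.log using standard log4j 1.x appenders. Treat it as an assumption about the shape of the file, not a copy of the one in the repository; it is written to example-log4j.properties so that it does not overwrite the real one.

```shell
# Sketch only: writes an example log4j 1.x configuration with two appenders,
# one for stdout and one for the file that the connector will read later.
# The file name example-log4j.properties is chosen here to avoid clobbering
# the repository's own log4j.properties.
cat > example-log4j.properties <<'EOF'
log4j.rootLogger=INFO, stdout, file

# console appender (what kubectl logs shows)
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n

# file appender (source file for the File Stream connector)
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.File=/tmp/connect-worker.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=[%d] %p %m (%c)%n
EOF
```

If you wanted to try this sketch instead of the file from the repository, the ConfigMap key would still need to be log4j.properties, which kubectl supports with the --from-file=log4j.properties=example-log4j.properties form.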
Before we deploy Kafka Connect, let's look into its definition. You can see it in its entirety here, but I will go through the important bits.
Notice that the resource kind is KafkaConnect - it is a custom resource backed by one of the Custom Resource Definitions we listed earlier. Another interesting part is annotations (I will explain this in a bit).
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
bootstrapServers points to a Kafka broker. This could be a comma-separated value for nodes in an HA cluster. In this case it's a single Kafka endpoint for Azure Event Hubs (yes, that's all you need!)
spec:
  version: 2.4.0
  replicas: 1
  bootstrapServers: <eventhubs-namespace>.servicebus.windows.net:9093
config is just good old Kafka Connect configuration similar to what you would use in connect-distributed.properties
  config:
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
The authentication section simply refers to a Kubernetes Secret. In this case, we created one earlier with the name eventhubssecret which has the key eventhubspassword containing the connection string for Azure Event Hubs
  authentication:
    type: plain
    username: $ConnectionString
    passwordSecret:
      secretName: eventhubssecret
      password: eventhubspassword
This is where the ConfigMap with the log4j config is referenced. This will automatically configure Kafka Connect to use this configuration
  logging:
    type: external
    name: connect-logging-configmap
The tls section is used to configure TLS certificates (duh!). In the case of Event Hubs, although authentication uses the SASL PLAIN mechanism, the connection itself must be encrypted with TLS (i.e. security.protocol set to SASL_SSL). I initially faced an issue with this which was promptly clarified! Hence this piece of configuration was added:
  tls:
    trustedCertificates: []
Cool! We are ready to create a Kafka Connect instance. Before that, make sure that you update the bootstrapServers property with the Azure Event Hubs host name e.g.
spec:
  version: 2.4.0
  replicas: 1
  bootstrapServers: <replace-with-eventhubs-namespace>.servicebus.windows.net:9093
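The host name in bootstrapServers is the same one that appears in the Event Hubs connection string stored in the Secret, with the Kafka port 9093 appended. If you want to derive it instead of copying it by hand, a small shell helper is enough. This is a sketch, assuming the connection string starts with Endpoint=sb:// as in the Secret example; the function name eventhubs_bootstrap_server is made up for illustration.

```shell
# Hypothetical helper: extracts the host from an Event Hubs connection string
# of the form "Endpoint=sb://<host>/;SharedAccessKeyName=...;SharedAccessKey=..."
# and prints it as a Kafka bootstrap address on port 9093.
eventhubs_bootstrap_server() {
  conn="$1"
  host="${conn#Endpoint=sb://}"   # drop the "Endpoint=sb://" prefix
  host="${host%%/*}"              # drop everything from the first "/" onwards
  printf '%s:9093\n' "$host"
}
```

For example, passing a connection string for a namespace called myns would print myns.servicebus.windows.net:9093, which is the value to put in bootstrapServers.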
To create the Kafka Connect instance:
kubectl apply -f kafka-connect.yaml
To confirm:
kubectl get kafkaconnects
NAME DESIRED REPLICAS
my-connect-cluster 1
This will create a Deployment and a corresponding Pod:
kubectl get pod -l=strimzi.io/cluster=my-connect-cluster
NAME READY STATUS RESTARTS AGE
my-connect-cluster-connect-5bf9db5d9f-9ttg4 1/1 Running 0 1h
You have a Kafka Connect cluster in Kubernetes! Check out the logs using kubectl logs <pod name>
Install File Source connector
Let's deploy a connector! To keep things simple, we will use the File Stream Source Connector which comes bundled with Kafka Connect by default. A common way of installing and managing connectors is to use the Kafka Connect REST API, but there is another way that Strimzi offers. This is a Kubernetes-centric approach where a Kafka Connect connector is represented by a custom resource called KafkaConnector. All we need to do is create/update/delete KafkaConnector definitions with the details of our connectors, and Strimzi will take care of the rest!
Check out the details in the Strimzi docs https://strimzi.io/docs/latest/#con-creating-managing-connectors-str
Here is the definition of our connector:
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  name: my-source-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.kafka.connect.file.FileStreamSourceConnector
  tasksMax: 2
  config:
    file: "/tmp/connect-worker.log"
    topic: strimzi
Just like we did before, let's understand what each component means:
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  name: my-source-connector
This is a KafkaConnector resource (specified by kind) whose name is my-source-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
This is where we refer to the Kafka Connect cluster - remember this annotation in the Kafka Connect definition shown above?
  annotations:
    strimzi.io/use-connector-resources: "true"
This simply "activates" the feature and ensures that we are able to deploy connectors using the KafkaConnector CRD. We then simply refer to the name of our KafkaConnect resource using the strimzi.io/cluster label
spec:
  class: org.apache.kafka.connect.file.FileStreamSourceConnector
  tasksMax: 2
  config:
    file: "/tmp/connect-worker.log"
    topic: strimzi
Finally, in the connector spec, we define the attributes for our connector. Notice the file attribute under config, which points to the /tmp/connect-worker.log file? Recall that we modified our Kafka Connect instance to push logs to this file. Now, we have configured our File source connector to stream the contents of this (log) file and send them to a Kafka topic named strimzi. This makes for a nice demo since the file will keep getting updated and we should be able to see each line as a different message in the destination Kafka topic (in Azure Event Hubs).
I have used strimzi as the topic name. This needs to be the same as the Event Hub created in the previous section (while setting up Azure Event Hubs).
To see this in action, let's deploy the connector
kubectl apply -f filestream-source-connector.yaml
To confirm, simply list the connectors:
kubectl get kafkaconnectors
NAME AGE
my-source-connector 70s
You can install other connectors as well. One of the ways (and the easiest IMO) to do this is by extending the Strimzi base image and adding the required connector artifacts on top of it. Check out the documentation https://strimzi.io/docs/latest/#using-kafka-connect-with-plug-ins-str
Kafka Connect in action...
First, let's confirm that the Kafka Connect logs are being piped to the intended location. This is important since we're using the log file as a source for the File stream connector. For this, we need to peek inside the Kafka Connect Pod e.g.
kubectl exec -it <kafka_connect_pod_name> -- tail -f /tmp/connect-worker.log
To make this easier, simply use the below command:
kubectl exec -it $(kubectl get pod -l=strimzi.io/cluster=my-connect-cluster -o jsonpath='{.items[0].metadata.name}') -- tail -f /tmp/connect-worker.log
Now, you should see the Kafka Connect logs...
In a different terminal window, start a consumer process connecting to your Azure Event Hubs topic. I used kafkacat but there are other options such as the console consumer in the Kafka CLI itself or a programmatic consumer using Java, .NET, Go etc. (although it might be a bit of an overkill in this case)
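For reference, a kafkacat invocation against the Event Hubs Kafka endpoint needs the same SASL_SSL and PLAIN settings that the Kafka Connect definition uses. The wrapper below is a sketch rather than the exact command I ran: the function name consume_eventhub and the environment variables EVENTHUBS_NAMESPACE and EVENTHUBS_CONNECTION_STRING are assumptions introduced here, and depending on your OS you may also need to point kafkacat at a CA bundle.

```shell
# Hypothetical wrapper around kafkacat in consumer mode (-C).
# Assumed environment variables (not defined anywhere in this post):
#   EVENTHUBS_NAMESPACE          e.g. myns
#   EVENTHUBS_CONNECTION_STRING  the full "Endpoint=sb://..." connection string
consume_eventhub() {
  topic="${1:-strimzi}"   # defaults to the Event Hub used in this post
  kafkacat -C \
    -b "${EVENTHUBS_NAMESPACE}.servicebus.windows.net:9093" \
    -t "$topic" \
    -X security.protocol=SASL_SSL \
    -X sasl.mechanisms=PLAIN \
    -X sasl.username='$ConnectionString' \
    -X sasl.password="$EVENTHUBS_CONNECTION_STRING"
}
```

Note the single quotes around $ConnectionString: it is a literal user name expected by Event Hubs, not a shell variable.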
You should see the same logs here as well! For example:
...
{"schema":{"type":"string","optional":false},"payload":"[2020-04-16 04:59:25,731] INFO WorkerSourceTask{id=my-source-connector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)"}
{"schema":{"type":"string","optional":false},"payload":"[2020-04-16 04:59:25,785] INFO WorkerSourceTask{id=my-source-connector-0} Finished commitOffsets successfully in 55 ms (org.apache.kafka.connect.runtime.WorkerSourceTask)"}
...
The log itself is captured as part of the payload e.g. [2020-04-16 04:59:25,785] INFO WorkerSourceTask{id=my-source-connector-0} Finished commitOffsets successfully in 55 ms (org.apache.kafka.connect.runtime.WorkerSourceTask)
Managing Kafka Connect resources
To scale out Kafka Connect, simply update the no. of replicas in the spec e.g. from 1 to 2 in this case:
spec:
  version: 2.4.0
  replicas: 2
Apply the updated manifest:
kubectl apply -f kafka-connect.yaml
Please ensure that you increase the no. of replicas by updating the manifest and not by updating the Deployment using kubectl scale. This is because the Strimzi operator reconciliation loop will check the KafkaConnect resource, find that the replicas count is 1 and scale the Deployment back.
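If you prefer a one-liner over editing the file, the same rule applies: change the KafkaConnect resource, not the Deployment. One way to do that is a merge patch against the custom resource, sketched below. The function name scale_connect_cluster is made up for illustration, and keep in mind that the cluster state will then differ from kafka-connect.yaml until you update the file too.

```shell
# Hypothetical helper: scales Kafka Connect by patching the KafkaConnect
# custom resource, so the Strimzi Operator picks up the change and adjusts
# the Deployment itself (instead of reverting a manual kubectl scale).
scale_connect_cluster() {
  name="$1"       # e.g. my-connect-cluster
  replicas="$2"   # desired number of Kafka Connect workers
  kubectl patch kafkaconnect "$name" --type merge \
    -p "{\"spec\":{\"replicas\":${replicas}}}"
}
```

For example, scale_connect_cluster my-connect-cluster 2 should have the same effect as applying the manifest with replicas: 2.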
There should be two Pods now:
kubectl get pod -l=strimzi.io/cluster=my-connect-cluster
NAME READY STATUS RESTARTS AGE
my-connect-cluster-connect-5bf9db5d9f-9ttg4 1/1 Running 0 45m
my-connect-cluster-connect-5bf9db5d9f-pzn95 1/1 Running 0 1m5s
my-connect-cluster-connect-5bf9db5d9f-pzn95 is the new Pod.
You can update the connector specification as well. For example, to allocate more tasks, update tasksMax from 2 to 5 and apply the updated connector manifest:
...
spec:
  class: org.apache.kafka.connect.file.FileStreamSourceConnector
  tasksMax: 5
...
Note: this will restart the connector
Clean up
To delete the connector and the Kafka Connect instance:
kubectl delete -f filestream-source-connector.yaml
kubectl delete -f kafka-connect.yaml
To clean up the AKS cluster and Azure Event Hubs, simply delete the resource group:
az group delete --name $AZURE_RESOURCE_GROUP --yes --no-wait
That concludes this blog post!
Like I mentioned, the Strimzi documentation is detailed, yet very clear and easy to navigate. To wrap things up, I will leave you with additional references from the Strimzi docs which I found useful in addition to the ones I mentioned in the post:
Strimzi doc references
- Configuring Kafka Connect - https://strimzi.io/docs/latest/#proc-configuring-kafka-connect-deployment-configuration-kafka-connect
- KafkaConnect schema reference - https://strimzi.io/docs/latest/#type-KafkaConnect-reference
- KafkaConnector schema reference - https://strimzi.io/docs/latest/#type-KafkaConnector-reference
- Kafka SASL auth configuration - https://strimzi.io/docs/latest/#sasl_based_plain_authentication
I hope you find it useful for getting started with Kafka Connect on Kubernetes :)