Kafka Connect on Kubernetes, the easy way!

abhirockzz

Abhishek Gupta

Posted on April 16, 2020

Kafka Connect on Kubernetes, the easy way!

This is a tutorial that shows how to set up and use Kafka Connect on Kubernetes using Strimzi, with the help of an example.

Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other systems using source and sink connectors. Although it's not too hard to deploy a Kafka Connect cluster on Kubernetes (just "DIY"!), I love the fact that Strimzi enables a Kubernetes-native way of doing this using the Operator pattern with the help of Custom Resource Definitions.

In addition to bootstrapping/installing Kafka Connect, this also applies to operations such as scaling the Connect cluster, deploying and managing connectors, etc. (you will see this in action during the course of this blog post)

We will go through the process of deploying a Kafka Connect cluster on Kubernetes, installing a connector, and test it out - all this using kubectl and some YAML (of course!). I will be using Azure Event Hubs as the Kafka broker and Azure Kubernetes Service as the Kubernetes cluster - feel free to use other alternatives (e.g. with a local minikube cluster on your laptop)

All the artifacts are available on GitHub

Strimzi is responsible for all the heavy lifting.. In case you don't already know, here is a gist

Strimzi overview

The Strimzi documentation is detailed yet very well organized and clear! Most of the below paragraph has been taken directly from the docs

Strimzi simplifies the process of running Apache Kafka in a Kubernetes cluster. Strimzi provides container images and Operators for running Kafka on Kubernetes. It is a part of the Cloud Native Computing Foundation as a Sandbox project (at the time of writing)

Strimzi Operators are fundamental to the project. These Operators are purpose-built with specialist operational knowledge to effectively manage Kafka. Operators simplify the process of: Deploying and running Kafka clusters and components, Configuring and securing access to Kafka, Upgrading and managing Kafka and even taking care of managing topics and users.

Here is a diagram which shows a 10,000 feet overview of the Operator roles:

I am not going to dive into the details of deploying Kafka using Strimzi in this post - probably something which I will tackle in future blogs

Pre-requisites

kubectl - https://kubernetes.io/docs/tasks/tools/install-kubectl/

If you choose to use Azure Event Hubs, Azure Kubernetes Service (or both) you will need a Microsoft Azure account. Go ahead and sign up for a free one!

Azure CLI or Azure Cloud Shell - you can either choose to install the Azure CLI if you don't have it already (should be quick!) or just use the Azure Cloud Shell from your browser.

Helm

I will be using Helm to install Strimzi. Here is the documentation to install Helm itself - https://helm.sh/docs/intro/install/

You can also use the YAML files directly to install Strimzi. Check out the quick start guide here - https://strimzi.io/docs/quickstart/latest/#proc-install-product-str

Let's start by setting up the required Azure services (if you're not using Azure, skip this section but please ensure you have the details for your Kafka cluster i.e. broker URLs and authentication credentials, if applicable)

I recommend installing the below services as a part of a single Azure Resource Group which makes it easy to clean up these services

Azure Event Hubs

Azure Event Hubs is a data streaming platform and event ingestion service. It can receive and process millions of events per second. It also provides a Kafka endpoint that can be used by existing Kafka based applications as an alternative to running your own Kafka cluster. Event Hubs supports Apache Kafka protocol 1.0 and later, and works with existing Kafka client applications and other tools in the Kafka ecosystem including Kafka Connect (demonstrated in this blog), MirrorMaker etc.

To setup an Azure Event Hubs cluster, you can choose from a variety of options including the Azure portal, Azure CLI, Azure PowerShell or an ARM template. Once the setup is complete, you will need the connection string (that will be used in subsequent steps) for authenticating to Event Hubs - use this guide to finish this step.

Please ensure that you also create an Event Hub (same as a Kafka topic) to act as the target for our Kafka Connect connector (details in subsequent sections)

Azure Kubernetes Service

Azure Kubernetes Service (AKS) makes it simple to deploy a managed Kubernetes cluster in Azure. It reduces the complexity and operational overhead of managing Kubernetes by offloading much of that responsibility to Azure. Here are examples of how you can setup an AKS cluster using Azure CLI, Azure portal or ARM template

Base install

To start off, we will install Strimzi and Kafka Connect, followed by the File Stream Source Connector

Install Strimzi

Installing Strimzi using Helm is pretty easy:

//add helm chart repo for Strimzi
helm repo add strimzi https://strimzi.io/charts/

//install it! (I have used strimzi-kafka as the release name)
helm install strimzi-kafka strimzi/strimzi-kafka-operator
Enter fullscreen mode Exit fullscreen mode

This will install the Strimzi Operator (which is nothing but a Deployment), Custom Resource Definitions and other Kubernetes components such as Cluster Roles, Cluster Role Bindings and Service Accounts

For more details, check out this link

To delete, simply helm uninstall strimzi-kafka

To confirm that the Strimzi Operator had been deployed, check it's Pod (it should transition to Running status after a while)

kubectl get pods -l=name=strimzi-cluster-operator

NAME                                        READY   STATUS    RESTARTS   AGE
strimzi-cluster-operator-5c66f679d5-69rgk   1/1     Running   0          43s
Enter fullscreen mode Exit fullscreen mode

Check the Custom Resource Definitions as well:

kubectl get crd | grep strimzi

kafkabridges.kafka.strimzi.io           2020-04-13T16:49:36Z
kafkaconnectors.kafka.strimzi.io        2020-04-13T16:49:33Z
kafkaconnects.kafka.strimzi.io          2020-04-13T16:49:36Z
kafkaconnects2is.kafka.strimzi.io       2020-04-13T16:49:38Z
kafkamirrormaker2s.kafka.strimzi.io     2020-04-13T16:49:37Z
kafkamirrormakers.kafka.strimzi.io      2020-04-13T16:49:39Z
kafkas.kafka.strimzi.io                 2020-04-13T16:49:40Z
kafkatopics.kafka.strimzi.io            2020-04-13T16:49:34Z
kafkausers.kafka.strimzi.io             2020-04-13T16:49:33Z
Enter fullscreen mode Exit fullscreen mode

I want to call out kafkas.kafka.strimzi.io which represents Kafka clusters in Kubernetes. We will focus on kafkaconnects.kafka.strimzi.io and kafkaconnectors.kafka.strimzi.io which represent Kafka Connect clusters and Connectors respectively.

I am going to skip over the other components but you can dig them out e.g. for Cluster Roles kubectl get clusterrole | grep strimzi

Now that we have the "brain" (the Strimzi Operator) wired up, let's use it!

Kafka Connect

We will need to create some helper Kubernetes components before we deploy Kafka Connect itself.

Before you proceed, clone the GitHub project

git clone https://github.com/abhirockzz/strimzi-kafka-connect-eventhubs
cd strimzi-kafka-connect-eventhubs
Enter fullscreen mode Exit fullscreen mode

Kafka Connect will need to reference an existing Kafka cluster (which in this case is Azure Event Hubs). We can store the authentication info for the cluster as a Kubernetes Secret which can later be used in the Kafka Connect definition.

Update the eventhubs-secret.yaml file to include the credentials for Azure Event Hubs. Enter the connection string in the eventhubspassword attribute.

e.g.

apiVersion: v1
kind: Secret
metadata:
  name: eventhubssecret
type: Opaque
stringData:
  eventhubsuser: $ConnectionString
  eventhubspassword: Endpoint=sb://<eventhubs-namespace>.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=<access-key>
Enter fullscreen mode Exit fullscreen mode

Leave eventhubsuser: $ConnectionString unchanged

To create the Secret:

kubectl apply -f eventhubs-secret.yaml
Enter fullscreen mode Exit fullscreen mode

By default, Kafka Connect is configured to send logs to stdout. We will use a custom configuration (log4j) to ensure that logs are stored to /tmp/connect-worker.log (in addition to stdout) - you will understand why this is done, in a moment

the configuration itself is stored in a log4j.properties

The log configuration can be stored in a ConfigMap which will later be referenced by the Kafka Connect definition. For details, check https://strimzi.io/docs/latest/#con-kafka-connect-logging-deployment-configuration-kafka-connect

kubectl create configmap connect-logging-configmap --from-file=log4j.properties
Enter fullscreen mode Exit fullscreen mode

Before we deploy Kafka Connect, let's look into its definition. You can see it in its entirety here, but I will go through the important bits.

Notice that the resource kind is KafkaConnect - it is a Custom Resource Definition. Another interesting part is annotations (I will explain this in a bit)

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
Enter fullscreen mode Exit fullscreen mode

bootstrapServers points to a Kafka broker. This could be a comma-separated value for nodes in a HA cluster. In this case its a single Kafka endpoint for Azure Event Hubs (yes, that's all you need!)

spec:
  version: 2.4.0
  replicas: 1
  bootstrapServers: <eventhubs-namespace>.servicebus.windows.net:9093
Enter fullscreen mode Exit fullscreen mode

config is just good old Kafka Connect configuration similar to what you would use in connect-distributed.properties

  config:
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
Enter fullscreen mode Exit fullscreen mode

The authentication section simply refers to a Kubernetes Secret. In this case, we created one earlier with the name eventhubssecret which has the key eventhubspassword containing the connection string for azure event hubs

  authentication:
    type: plain
    username: $ConnectionString
    passwordSecret:
      secretName: eventhubssecret
      password: eventhubspassword
Enter fullscreen mode Exit fullscreen mode

This is where the ConfigMap with log4j config is referenced. This will automatically configure Kafka Connect to use this configuration

  logging:
    type: external
    name: connect-logging-configmap
Enter fullscreen mode Exit fullscreen mode

tls section is used to configure TLS certificates (duh!). In case of event hubs, although we use SASL over PLAINTEXT, it required you to use SSL (i.e. set security.protocol to SASL_SSL). I initially faced an issue with this which was promptly clarified! Hence this piece of configuration was added:

  tls:
    trustedCertificates: []
Enter fullscreen mode Exit fullscreen mode

Cool! We are ready to create a Kafka Connect instance. Before that, make sure that you update the bootstrapServers property with the Azure Event Hubs host name e.g.

spec:
  version: 2.4.0
  replicas: 1
  bootstrapServers: <replace-with-eventhubs-namespace>.servicebus.windows.net:9093
Enter fullscreen mode Exit fullscreen mode

To create the Kafka Connect instance:

kubectl apply -f kafka-connect.yaml
Enter fullscreen mode Exit fullscreen mode

To confirm:

kubectl get kafkaconnects

NAME                 DESIRED REPLICAS
my-connect-cluster   1
Enter fullscreen mode Exit fullscreen mode

This will create a Deployment and a corresponding Pod

kubectl get pod -l=strimzi.io/cluster=my-connect-cluster

NAME                                          READY   STATUS    RESTARTS   AGE
my-connect-cluster-connect-5bf9db5d9f-9ttg4   1/1     Running   0          1h
Enter fullscreen mode Exit fullscreen mode

You have a Kafka Connect cluster in Kubernetes! Check out the logs using kubectl logs <pod name>

Install File Source connector

Let's deploy a connector! To keep things simple, we will use the File Stream Source Connector which comes bundled with Kafka Connect by default. A common way of installing and managing connectors is to use the Kafka Connect REST API, but there is another way that Strimzi offers. This is a Kubernetes-centric approach where a Kakfa Connect connector is represented by a custom resource definition called KafkaConnector. All we need to do is create/update/delete KafkaConnector definitions with the details of our connectors, and Strimzi will take care of the rest!

check out the details in the Strimzi docs https://strimzi.io/docs/latest/#con-creating-managing-connectors-str

Here the definition of our connector:

apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  name: my-source-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.kafka.connect.file.FileStreamSourceConnector
  tasksMax: 2
  config:
    file: "/tmp/connect-worker.log"
    topic: strimzi
Enter fullscreen mode Exit fullscreen mode

Just like we did before, let's understand what each component means:

apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  name: my-source-connector
Enter fullscreen mode Exit fullscreen mode

This is a KafkaConnector resource (specified by kind) whose name is my-source-connector

  labels:
    strimzi.io/cluster: my-connect-cluster
Enter fullscreen mode Exit fullscreen mode

This is where we refer to the Kafka Connect cluster - remember this annotation in the Kafka Connect definition shown above?

  annotations:
    strimzi.io/use-connector-resources: "true"
Enter fullscreen mode Exit fullscreen mode

This simply "activates" the feature and ensures that we are able to deploy connectors using the KafkaConnector CRD and we simply refer to the name of our kafkaconnect resource using the strimzi.io/cluster label

spec:
  class: org.apache.kafka.connect.file.FileStreamSourceConnector
  tasksMax: 2
  config:
    file: "/tmp/connect-worker.log"
    topic: strimzi
Enter fullscreen mode Exit fullscreen mode

Finally, in the connector spec, we define the attributes for our connector. Notice the config property which points to the /tmp/connect-worker.log file? Recall that we modified our Kafka Connect instance to push logs to this file. Now, we have configured our File source connector to stream contents of this (log) file and send it to a Kafka topic named strimzi. This makes for a nice demo since the file will keep getting updated and we should be able to see each line as a different message in the destination Kafka topic (in Azure Event Hubs)

I have used strimzi as the topic name. This needs to be the same as the Event Hub created in the previous section (while setting up Azure Event Hubs)

To see this in action, let's deploy the connector

kubectl apply -f filestream-source-connector.yaml
Enter fullscreen mode Exit fullscreen mode

To confirm, simply list the connectors:

kubectl get kafkaconnectors

NAME                  AGE
my-source-connector   70s
Enter fullscreen mode Exit fullscreen mode

You can install other connectors as well. One of the ways (and the easiest IMO) to do this is by extending the Strimzi base image and adding the required connector artifacts on top of it. Check out the documentation https://strimzi.io/docs/latest/#using-kafka-connect-with-plug-ins-str

Kafka Connect in action...

First, let's confirm that the Kafka Connect logs are being piped to the intended location. This is important since we're using the log file as a source for the File stream connector. For this, we need to peek inside the Kafka Connect Pod e.g.

kubectl exec -it <kafka_connect_pod_name> -- tail -f /tmp/connect-worker.log
Enter fullscreen mode Exit fullscreen mode

To make this easier, simply use the below command:

kubectl exec -it $(kubectl get pod -l=strimzi.io/cluster=my-connect-cluster -o jsonpath='{.items[0].metadata.name}') -- tail -f /tmp/connect-worker.log
Enter fullscreen mode Exit fullscreen mode

Now, you should see the Kafka Connect logs...

In a different terminal window, start a consumer process connecting to your Azure Event Hubs topic. I used kafkacat but there are other options such as the console consumer in the Kafka CLI itself or a programmatic consumer using Java, .NET, Go etc. (although it might a bit of an overkill in this case)

You should see the same logs here as well! For e.g.

...
{"schema":{"type":"string","optional":false},"payload":"[2020-04-16 04:59:25,731] INFO WorkerSourceTask{id=my-source-connector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)"}
{"schema":{"type":"string","optional":false},"payload":"[2020-04-16 04:59:25,785] INFO WorkerSourceTask{id=my-source-connector-0} Finished commitOffsets successfully in 55 ms (org.apache.kafka.connect.runtime.WorkerSourceTask)"}
...
Enter fullscreen mode Exit fullscreen mode

The log itself is captured as part of the payload e.g. [2020-04-16 04:59:25,785] INFO WorkerSourceTask{id=my-source-connector-0} Finished commitOffsets successfully in 55 ms (org.apache.kafka.connect.runtime.WorkerSourceTask)

Managing Kafka Connect resources

To scale out Kafka Connect, simply update the no. of replicas in the spec e.g. from 1 to 2 in this case:

spec:
  version: 2.4.0
  replicas: 2
Enter fullscreen mode Exit fullscreen mode

Appy the updated manifest

kubectl apply -f kafka-connect.yaml
Enter fullscreen mode Exit fullscreen mode

Please ensure that you increase the no. of replicas by updating the manifest and not by updating the Deployment using kubectl scale. This is because, the Strimzi operator reconciliation loop will check the KafkaConnect resource, find that the replicas count is 1 and scale the Deployment back

There should be two Pods now:

kubectl get pod -l=strimzi.io/cluster=my-connect-cluster


NAME                                          READY   STATUS    RESTARTS   AGE
my-connect-cluster-connect-5bf9db5d9f-9ttg4   1/1     Running   0          45m
my-connect-cluster-connect-5bf9db5d9f-pzn95   1/1     Running   0          1m5s
Enter fullscreen mode Exit fullscreen mode

my-connect-cluster-connect-5bf9db5d9f-pzn95 is the new Pod

You can update the connector specification. For e.g. to allocate more tasks, update tasksMax from 2 to 5

...
spec:
  class: org.apache.kafka.connect.file.FileStreamSourceConnector
  tasksMax: 5
...
Enter fullscreen mode Exit fullscreen mode

Note: this will restart the connector

Clean up

To delete the connector and the Kafka Connect instance:

kubectl delete -f filestream-source-connector.yaml
kubectl delete -f kafka-connect.yaml
Enter fullscreen mode Exit fullscreen mode

To clean up the AKS cluster and Azure Event Hubs, simply delete the resource group:

az group delete --name $AZURE_RESOURCE_GROUP --yes --no-wait
Enter fullscreen mode Exit fullscreen mode

That concludes this blog post!

Like I mentioned, the Strimzi documentation is detailed, yet very clear and easy to navigate. To wrap things up, I will leave you with additional references from the Strimzi docs which I found useful in addition to the ones I mentioned in the post:

Strimzi doc references

I hope you find it useful for getting started with Kafka Connect on Kubernetes :)

💖 💪 🙅 🚩
abhirockzz
Abhishek Gupta

Posted on April 16, 2020

Join Our Newsletter. No Spam, Only the good stuff.

Sign up to receive the latest update from our blog.

Related