How to develop a Kafka Streams application for data processing and deploy it to Kubernetes

Abhishek Gupta (abhirockzz)

Posted on September 16, 2019


This tutorial will guide you through how to build a stateless stream processing application using the Kafka Streams library and run it in a Kubernetes cluster on Azure (AKS).

As you go through this, you'll learn about the following:

  • What is Kafka Streams?
  • How to set up and configure a Docker container registry and Kubernetes cluster on Azure
  • What's going on in the Java code for stream processing logic using Kafka Streams
  • How to build & deploy our app to Kubernetes and finally test it out using the Kafka CLI

The source code is on GitHub

A handy list of all the CLI commands is available at the end of this blog post

Before we dive in, here is a snapshot of what the end state looks like.

Overview of Kafka Streams

Kafka Streams is a simple and lightweight client library that can be easily embedded in any Java app or microservice, where the input and output data are stored in Kafka clusters. It has no external dependencies on systems other than Kafka itself, and it uses Kafka's partitioning model to horizontally scale processing while maintaining strong ordering guarantees. It supports fault-tolerant local state, employs one-record-at-a-time processing to achieve millisecond processing latency, and offers the necessary stream processing primitives, along with a high-level Streams DSL and a low-level Processor API. The combination of state stores and interactive queries allows you to leverage the state of your application from outside your application.
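As a quick illustration of that last point, here is a minimal sketch of an interactive query. Everything in it is hypothetical: it assumes a running KafkaStreams instance (streams) whose topology has materialized counts into a state store named "counts" - this tutorial's app doesn't use state stores.

    // hypothetical: "streams" is a running KafkaStreams instance whose topology
    // materialized counts into a state store named "counts"
    ReadOnlyKeyValueStore<String, Long> store =
            streams.store("counts", QueryableStoreTypes.keyValueStore());
    Long count = store.get("some-key"); // query local state from outside the topology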

Prerequisites:

If you don't have them already, please install the Azure CLI and kubectl. The stream processing app is written in Java and uses Maven; you will also need Docker to build the app's container image.

This tutorial assumes you have a Kafka cluster that is reachable from your Kubernetes cluster on Azure.

AKS cluster setup

You need a single command to stand up a Kubernetes cluster on Azure. But before that, we'll have to create a resource group

export AZURE_SUBSCRIPTION_ID=[to be filled]
export AZURE_RESOURCE_GROUP=[to be filled]
export AZURE_REGION=[to be filled] (e.g. southeastasia)

Switch to your subscription and invoke az group create

az account set -s $AZURE_SUBSCRIPTION_ID
az group create -l $AZURE_REGION -n $AZURE_RESOURCE_GROUP

You can now invoke az aks create to create the new cluster

To keep things simple, the command below creates a single-node cluster. Feel free to change the specification as per your requirements.

export AKS_CLUSTER_NAME=[to be filled]

az aks create --resource-group $AZURE_RESOURCE_GROUP --name $AKS_CLUSTER_NAME --node-count 1 --node-vm-size Standard_B2s --node-osdisk-size 30 --generate-ssh-keys

Get the AKS cluster credentials using az aks get-credentials - as a result, kubectl will now point to your new cluster. You can confirm this:

az aks get-credentials --resource-group $AZURE_RESOURCE_GROUP --name $AKS_CLUSTER_NAME
kubectl get nodes

If you are interested in learning Kubernetes and containers using Azure, simply create a free account and get going! A good starting point is to use the quickstarts, tutorials, and code samples in the documentation to familiarize yourself with the service. I also highly recommend checking out the 50 days Kubernetes Learning Path. Advanced users might want to refer to Kubernetes best practices or watch some of the videos for demos, top features, and technical sessions.

Setup Azure Container Registry

Simply put, Azure Container Registry (ACR in short) is a managed private Docker registry in the cloud which allows you to build, store, and manage images for all types of container deployments.

Start by creating an ACR instance

export ACR_NAME=[to be filled]
az acr create --resource-group $AZURE_RESOURCE_GROUP --name $ACR_NAME --sku Basic

Valid SKU values are Basic, Classic, Premium, and Standard. See the command documentation.

Configure ACR to work with AKS

To access images stored in ACR, you must grant the AKS service principal the correct rights to pull images from ACR.

Get the appId of the service principal which is associated with your AKS cluster

AKS_SERVICE_PRINCIPAL_APPID=$(az aks show --name $AKS_CLUSTER_NAME --resource-group $AZURE_RESOURCE_GROUP --query servicePrincipalProfile.clientId -o tsv)

Find the ACR resource ID

ACR_RESOURCE_ID=$(az acr show --resource-group $AZURE_RESOURCE_GROUP --name $ACR_NAME --query "id" --output tsv)

Grant acrpull permissions to the AKS service principal

az role assignment create --assignee $AKS_SERVICE_PRINCIPAL_APPID --scope $ACR_RESOURCE_ID --role acrpull

For some more details on this topic, check out one of my previous blog posts.

Alright, our AKS cluster along with ACR is ready to use! Let's shift gears and look at the Kafka Streams code - it's succinct and has been kept simple for the purposes of this tutorial.

Stream processing code

What the processing pipeline does is pretty simple. It makes use of the high-level Streams DSL API:

  • receives words from an input/source Kafka topic
  • converts them to upper case
  • stores the records in an output Kafka topic (sink)

Please note that the latest Kafka Streams library version at the time of writing was 2.3.0, and that's what the app uses

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>2.3.0</version>
    </dependency>

We start off with an instance of StreamsBuilder and invoke its stream method to hook on to the source topic (name: lower-case). What we get is a KStream object, which is a representation of the continuous stream of records sent to the topic lower-case. Note that the input records are nothing but key-value pairs.

    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> lowerCaseStrings = builder.stream(INPUT_TOPIC);
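By the way, the same thing can be written with the serdes stated explicitly using Consumed.with, instead of relying on the default serdes we'll configure in getKafkaStreamsConfig() later - just a small variation, not what the sample app does:

    // equivalent alternative: state the serdes explicitly instead of relying on
    // the defaults set in getKafkaStreamsConfig()
    KStream<String, String> lowerCaseStrings =
            builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String()));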

Ok, we have our records in the form of an object - what do we do with them? How do we process them? In this case, all we do is apply a simple transformation using mapValues to convert the record (the value, not the key) to upper case. This gives us another KStream instance - upperCaseStrings - whose records are pushed to a sink topic named upper-case by invoking the to method.

    KStream<String, String> upperCaseStrings = lowerCaseStrings.mapValues(new ValueMapper<String, String>() {
        @Override
        public String apply(String str) {
            return str.toUpperCase();
        }
    });
    upperCaseStrings.to(OUTPUT_TOPIC);
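Since ValueMapper is a functional interface, the same transformation can also be written more concisely as a Java 8 method reference - purely a stylistic alternative:

    // same logic, expressed as a method reference instead of an anonymous class
    KStream<String, String> upperCaseStrings = lowerCaseStrings.mapValues(String::toUpperCase);
    upperCaseStrings.to(OUTPUT_TOPIC);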

That's all in terms of setting up the stream and defining the logic. We create a Topology object using the build method in StreamsBuilder and use this object to create a KafkaStreams instance, which is a representation of our application itself. We start the stream processing using the start method.

    Topology topology = builder.build();
    KafkaStreams streamsApp = new KafkaStreams(topology, getKafkaStreamsConfig());
    streamsApp.start();

getKafkaStreamsConfig() is just a helper method that creates a Properties object with Kafka Streams-specific configuration, including the Kafka broker endpoint, etc.

static Properties getKafkaStreamsConfig() {
    String kafkaBroker = System.getenv().get(KAFKA_BROKER_ENV_VAR);
    Properties configurations = new Properties();
    configurations.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker + ":9092");
    configurations.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
    configurations.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    configurations.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    configurations.put(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG, "20000");
    configurations.put(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, "500");

    return configurations;
}
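One addition worth considering (it's not part of the snippet above): Kubernetes stops pods by sending SIGTERM, so registering a JVM shutdown hook lets the app close cleanly and leave its consumer group right away, instead of waiting for the broker to time out its session:

    // suggested addition: close the Streams app cleanly when the pod is stopped
    Runtime.getRuntime().addShutdownHook(new Thread(streamsApp::close));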

That's it for the code. It's time to deploy it!

From your laptop to a Docker Registry in the cloud

Clone the GitHub repo, change to the correct directory and build the application JAR

git clone https://github.com/abhirockzz/kafka-streams-kubernetes
cd kafka-streams-kubernetes
mvn clean install

You should see kstreams-lower-to-upper-1.0.jar in the target directory

Here is the Dockerfile for our stream processing app

FROM openjdk:8-jre
WORKDIR /
COPY target/kstreams-lower-to-upper-1.0.jar /
CMD ["java", "-jar","kstreams-lower-to-upper-1.0.jar"]

We will now build a Docker image ...

export DOCKER_IMAGE=kstreams-lower-to-upper:v1
export ACR_SERVER=$ACR_NAME.azurecr.io
docker build -t $DOCKER_IMAGE .

... and push it to Azure Container Registry

az acr login --name $ACR_NAME
docker tag $DOCKER_IMAGE $ACR_SERVER/$DOCKER_IMAGE
docker push $ACR_SERVER/$DOCKER_IMAGE

Once this is done, you can confirm using az acr repository list

az acr repository list --name $ACR_NAME --output table

Deploy to Kubernetes

Our application is a stateless processor, and we will deploy it as a Kubernetes Deployment with two instances (replicas). Since Kafka Streams distributes the partitions of the input topic across application instances, the two replicas will share the processing load (we create the input topic with two partitions later in this tutorial).

In case you're interested in diving deeper into native Kubernetes primitives for managing stateless applications, check out this blog post

The kstreams-deployment.yaml file contains the spec for the Deployment which will represent our stream processing app. You need to modify it to add the following information as per your environment:

  • Azure Container Registry name (which you earlier specified using ACR_NAME)
  • The endpoint for your Kafka broker e.g. my-kafka:9092

    spec:
      containers:
      - name: kstreams-lower-to-upper
        image: [REPLACE_ACR_NAME].azurecr.io/kstreams-lower-to-upper:v1
        env:
        - name: KAFKA_BROKER
          value: [to be filled]

To deploy and confirm

kubectl apply -f kstreams-deployment.yaml
kubectl get pods -l=app=kstream-lower-to-upper

You should see two pods in the Running state

The moment of truth!

It's time to test our end-to-end flow. Just to summarize:

  • you will produce data to the input Kafka topic (lower-case) using the Kafka CLI locally
  • the stream processing application in AKS will process the data and write it to another Kafka topic
  • your local Kafka CLI based consumer process will get that data from the output topic (upper-case)

Let's create the Kafka topics first

export KAFKA_HOME=[kafka installation directory]
export KAFKA_BROKER=[kafka broker e.g. localhost:9092]
export INPUT_TOPIC=lower-case
export OUTPUT_TOPIC=upper-case

$KAFKA_HOME/bin/kafka-topics.sh --create --topic $INPUT_TOPIC --partitions 2 --replication-factor 1 --bootstrap-server $KAFKA_BROKER
$KAFKA_HOME/bin/kafka-topics.sh --create --topic $OUTPUT_TOPIC --partitions 2 --replication-factor 1 --bootstrap-server $KAFKA_BROKER

$KAFKA_HOME/bin/kafka-topics.sh --list --bootstrap-server $KAFKA_BROKER
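If you'd rather create the topics programmatically than via the CLI, the Kafka AdminClient can do the same job. Here is a minimal sketch, assuming the same broker endpoint and topic settings as above (exception handling omitted):

    // sketch: create both topics (2 partitions, replication factor 1) via AdminClient
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // your broker
    try (AdminClient admin = AdminClient.create(props)) {
        admin.createTopics(Arrays.asList(
                new NewTopic("lower-case", 2, (short) 1),
                new NewTopic("upper-case", 2, (short) 1)
        )).all().get(); // blocks until the topics are created (throws on failure)
    }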

Start the consumer process

export KAFKA_HOME=[kafka installation directory]
export KAFKA_BROKER=[kafka broker e.g. localhost:9092]
export OUTPUT_TOPIC=upper-case

$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server $KAFKA_BROKER --topic $OUTPUT_TOPIC --from-beginning

Start the producer process (in a different terminal)

export KAFKA_HOME=[kafka installation directory]
export KAFKA_BROKER=[kafka broker e.g. localhost:9092]
export INPUT_TOPIC=lower-case

$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list $KAFKA_BROKER --topic $INPUT_TOPIC

You will get a prompt where you can start entering values, e.g.

> foo
> bar
> baz
> john
> doe

Wait for a few seconds and check the consumer terminal window. You should see the upper-case form of the above records, i.e. FOO, BAR, etc.

Clean up

To clean up your AKS cluster, ACR instance and related resources

az group delete --name $AZURE_RESOURCE_GROUP --yes --no-wait

(as promised)

Handy list of commands...

...for your reference

Azure Kubernetes Service

az aks create --resource-group $AZURE_RESOURCE_GROUP --name $AKS_CLUSTER_NAME --node-count 1 --node-vm-size Standard_B2s --node-osdisk-size 30 --generate-ssh-keys
az aks get-credentials --resource-group $AZURE_RESOURCE_GROUP --name $AKS_CLUSTER_NAME
az aks show --name $AKS_CLUSTER_NAME --resource-group $AZURE_RESOURCE_GROUP --query servicePrincipalProfile.clientId -o tsv

Azure Container Registry

az acr create --resource-group $AZURE_RESOURCE_GROUP --name $ACR_NAME --sku Basic
az acr login --name $ACR_NAME
az acr show --resource-group $AZURE_RESOURCE_GROUP --name $ACR_NAME --query "id" --output tsv
az acr repository list --name $ACR_NAME --output table

General commands

az account set -s $AZURE_SUBSCRIPTION_ID
az group create -l $AZURE_REGION -n $AZURE_RESOURCE_GROUP
az role assignment create --assignee $AKS_SERVICE_PRINCIPAL_APPID --scope $ACR_RESOURCE_ID --role acrpull
kubectl apply -f kstreams-deployment.yaml
kubectl get pods -l=app=kstream-lower-to-upper
az group delete --name $AZURE_RESOURCE_GROUP --yes --no-wait

If you found this article helpful, please like and follow! Happy to get feedback via @abhi_tweeter or just drop a comment :-)
