How to develop a Kafka Streams application for data processing and deploy it to Kubernetes
Abhishek Gupta
Posted on September 16, 2019
This tutorial will guide you through how to build a stateless stream processing application using the Kafka Streams library and run it in a Kubernetes cluster on Azure (AKS).
As you go through this, you'll learn about the following:
- What is Kafka Streams?
- How to set up and configure a Docker container registry and Kubernetes cluster on Azure
- What's going on in the Java code for stream processing logic using Kafka Streams
- How to build & deploy our app to Kubernetes and finally test it out using the Kafka CLI
The source code is on GitHub, and a handy list of all the CLI commands is available at the end of this blog.
Before we dive in, here is a snapshot of what the end state looks like.
Overview of Kafka Streams
Kafka Streams is a simple and lightweight client library which can be easily embedded in any Java app or microservice, where the input and output data are stored in Kafka clusters. It has no external dependencies on systems other than Kafka itself, and it exploits Kafka's partitioning model to horizontally scale processing while maintaining strong ordering guarantees. It supports fault-tolerant local state, employs one-record-at-a-time processing to achieve millisecond processing latency, and offers the necessary stream processing primitives, along with a high-level Streams DSL and a low-level Processor API. The combination of "state stores" and "interactive queries" allows you to leverage the state of your application from outside your application.
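To make the "state stores" and "interactive queries" part concrete, here is a minimal, self-contained sketch. It is separate from the app built in this tutorial, and the topic name, store name, application ID and broker endpoint are all hypothetical. It counts records per key, materializes the counts in a named state store, and then queries that store directly from the application process:

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class WordCountQuery {
    public static void main(String[] args) throws Exception {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-query"); // hypothetical app ID
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical broker

        // count records per key and materialize the result in a named, queryable state store
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("words", Consumed.with(Serdes.String(), Serdes.String()))
               .groupByKey()
               .count(Materialized.as("word-counts"));

        KafkaStreams streams = new KafkaStreams(builder.build(), config);
        streams.start();
        Thread.sleep(5000); // crude wait; production code should retry on InvalidStateStoreException

        // interactive query: read the current count for a key straight from local state
        ReadOnlyKeyValueStore<String, Long> store =
                streams.store("word-counts", QueryableStoreTypes.keyValueStore());
        System.out.println("count for 'hello': " + store.get("hello"));

        streams.close();
    }
}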
Pre-requisites:
If you don't have them already, please install the Azure CLI and kubectl. The stream processing app is written in Java and uses Maven; you will also need Docker to build the app container image.
This tutorial assumes you have a Kafka cluster which is reachable from your Kubernetes cluster on Azure
AKS cluster setup
You need a single command to stand up a Kubernetes cluster on Azure. But, before that, we'll have to create a resource group
export AZURE_SUBSCRIPTION_ID=[to be filled]
export AZURE_RESOURCE_GROUP=[to be filled]
export AZURE_REGION=[to be filled e.g. southeastasia]
Switch to your subscription and invoke az group create
az account set -s $AZURE_SUBSCRIPTION_ID
az group create -l $AZURE_REGION -n $AZURE_RESOURCE_GROUP
You can now invoke az aks create to create the new cluster.
To keep things simple, the below command creates a single node cluster. Feel free to change the specification as per your requirements
export AKS_CLUSTER_NAME=[to be filled]
az aks create --resource-group $AZURE_RESOURCE_GROUP --name $AKS_CLUSTER_NAME --node-count 1 --node-vm-size Standard_B2s --node-osdisk-size 30 --generate-ssh-keys
Get the AKS cluster credentials using az aks get-credentials - as a result, kubectl will now point to your new cluster. You can confirm the same:
az aks get-credentials --resource-group $AZURE_RESOURCE_GROUP --name $AKS_CLUSTER_NAME
kubectl get nodes
If you are interested in learning Kubernetes and Containers using Azure, simply create a free account and get going! A good starting point is to use the quickstarts, tutorials and code samples in the documentation to familiarize yourself with the service. I also highly recommend checking out the 50 days Kubernetes Learning Path. Advanced users might want to refer to Kubernetes best practices or watch some of the videos for demos, top features and technical sessions.
Setup Azure Container Registry
Simply put, Azure Container Registry (ACR in short) is a managed private Docker registry in the cloud which allows you to build, store, and manage images for all types of container deployments.
Start by creating an ACR instance
export ACR_NAME=[to be filled]
az acr create --resource-group $AZURE_RESOURCE_GROUP --name $ACR_NAME --sku Basic
Valid SKU values are Basic, Classic, Premium and Standard. See the command documentation for details.
Configure ACR to work with AKS
To access images stored in ACR, you must grant the AKS service principal the correct rights to pull images from ACR.
Get the appId of the service principal which is associated with your AKS cluster
AKS_SERVICE_PRINCIPAL_APPID=$(az aks show --name $AKS_CLUSTER_NAME --resource-group $AZURE_RESOURCE_GROUP --query servicePrincipalProfile.clientId -o tsv)
Find the ACR resource ID
ACR_RESOURCE_ID=$(az acr show --resource-group $AZURE_RESOURCE_GROUP --name $ACR_NAME --query "id" --output tsv)
Grant acrpull permissions to the AKS service principal
az role assignment create --assignee $AKS_SERVICE_PRINCIPAL_APPID --scope $ACR_RESOURCE_ID --role acrpull
For some more details on this topic, check out one of my previous blog posts:
Quick tip: How to get your Kubernetes cluster service principal and use it to access other Azure services?
Abhishek Gupta for Microsoft Azure ・ Sep 11 '19
Alright, our AKS cluster along with ACR is ready to use! Let's shift gears and look at the Kafka Streams code - it's succinct and has been kept simple for the purposes of this tutorial.
Stream processing code
What the processing pipeline does is pretty simple. It makes use of the high-level Streams DSL API:
- receives words from an input/source Kafka topic
- converts them to upper case
- stores the records in an output Kafka topic (sink)
Please note that the latest Kafka Streams library version at the time of writing was 2.3.0 and that's what the app uses.
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.3.0</version>
</dependency>
We start off with an instance of StreamsBuilder and invoke its stream method to hook on to the source topic (name: lower-case). What we get is a KStream object, which is a representation of the continuous stream of records sent to the topic lower-case. Note that the input records are nothing but key-value pairs.
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> lowerCaseStrings = builder.stream(INPUT_TOPIC);
Ok, we have our records in the form of an object - what do we do with them? How do we process them? In this case, all we do is apply a simple transformation using mapValues to convert the record (the value, not the key) to upper case. This gives us another KStream instance - upperCaseStrings - whose records are pushed to a sink topic named upper-case by invoking the to method.
KStream<String, String> upperCaseStrings = lowerCaseStrings.mapValues(new ValueMapper<String, String>() {
    @Override
    public String apply(String str) {
        return str.toUpperCase();
    }
});
upperCaseStrings.to(OUTPUT_TOPIC);
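As an aside, since ValueMapper is a functional interface, the same transformation can be written more compactly with a Java 8 lambda (equivalent to the anonymous class above):

KStream<String, String> upperCaseStrings = lowerCaseStrings.mapValues(str -> str.toUpperCase());
upperCaseStrings.to(OUTPUT_TOPIC);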
That's all in terms of setting up the stream and defining the logic. We create a Topology object using the build method in StreamsBuilder and use this object to create a KafkaStreams instance, which is a representation of our application itself. We start the stream processing using the start method.
Topology topology = builder.build();
KafkaStreams streamsApp = new KafkaStreams(topology, getKafkaStreamsConfig());
streamsApp.start();
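Note that the snippet above never stops the application. A common pattern (an addition on top of this tutorial's code, not part of it) is to close the KafkaStreams instance gracefully when the JVM shuts down:

// close the streams app cleanly on JVM shutdown (e.g. when the pod is terminated)
Runtime.getRuntime().addShutdownHook(new Thread(streamsApp::close));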
getKafkaStreamsConfig() is just a helper method which creates a Properties object containing the Kafka Streams-specific configuration, including the Kafka broker endpoint, etc.
static Properties getKafkaStreamsConfig() {
    // the Kafka broker hostname is injected via an environment variable (set in the Kubernetes Deployment)
    String kafkaBroker = System.getenv().get(KAFKA_BROKER_ENV_VAR);
    Properties configurations = new Properties();
    configurations.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker + ":9092");
    configurations.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
    // both keys and values are plain strings
    configurations.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    configurations.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    // conservative timeout and backoff to tolerate transient broker unavailability
    configurations.put(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG, "20000");
    configurations.put(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, "500");
    return configurations;
}
That's it for the code. It's time to deploy it!
From your laptop to a Docker Registry in the cloud
Clone the GitHub repo, change to the correct directory and build the application JAR
git clone https://github.com/abhirockzz/kafka-streams-kubernetes
cd kafka-streams-kubernetes
mvn clean install
You should see kstreams-lower-to-upper-1.0.jar in the target directory.
Here is the Dockerfile for our stream processing app
FROM openjdk:8-jre
WORKDIR /
COPY target/kstreams-lower-to-upper-1.0.jar /
CMD ["java", "-jar","kstreams-lower-to-upper-1.0.jar"]
We will now build a Docker image ...
export DOCKER_IMAGE=kstreams-lower-to-upper:v1
export ACR_SERVER=$ACR_NAME.azurecr.io
docker build -t $DOCKER_IMAGE .
... and push it to Azure Container Registry
az acr login --name $ACR_NAME
docker tag $DOCKER_IMAGE $ACR_SERVER/$DOCKER_IMAGE
docker push $ACR_SERVER/$DOCKER_IMAGE
Once this is done, you can confirm using az acr repository list
az acr repository list --name $ACR_NAME --output table
Deploy to Kubernetes
Our application is a stateless processor and we will deploy it as a Kubernetes Deployment with two instances (replicas).
In case you're interested in diving deeper into native Kubernetes primitives for managing stateless applications, check out this blog
Tutorial: How to orchestrate stateless apps in Kubernetes?
Abhishek Gupta for ITNEXT ・ Sep 9 '19
The kstreams-deployment.yaml file contains the spec for the Deployment which will represent our stream processing app. You need to modify it to add the following information as per your environment:
- Azure Container Registry name (which you earlier specified using ACR_NAME)
- The endpoint for your Kafka broker e.g. my-kafka:9092
spec:
  containers:
    - name: kstreams-lower-to-upper
      image: [REPLACE_ACR_NAME].azurecr.io/kstreams-lower-to-upper:v1
      env:
        - name: KAFKA_BROKER
          value: [to be filled]
To deploy and confirm
kubectl apply -f kstreams-deployment.yaml
kubectl get pods -l=app=kstream-lower-to-upper
You should see two pods in the Running state.
The moment of truth!
It's time to test our end to end flow. Just to summarize:
- you will produce data to the input Kafka topic (lower-case) using the Kafka CLI locally
- the stream processing application in AKS will churn the data and put it back to another Kafka topic
- your local Kafka CLI based consumer process will get that data from the output topic (upper-case)
Let's create the Kafka topics first
export KAFKA_HOME=[kafka installation directory]
export KAFKA_BROKER=[kafka broker e.g. localhost:9092]
export INPUT_TOPIC=lower-case
export OUTPUT_TOPIC=upper-case
$KAFKA_HOME/bin/kafka-topics.sh --create --topic $INPUT_TOPIC --partitions 2 --replication-factor 1 --bootstrap-server $KAFKA_BROKER
$KAFKA_HOME/bin/kafka-topics.sh --create --topic $OUTPUT_TOPIC --partitions 2 --replication-factor 1 --bootstrap-server $KAFKA_BROKER
$KAFKA_HOME/bin/kafka-topics.sh --list --bootstrap-server $KAFKA_BROKER
Start consumer process
export KAFKA_HOME=[kafka installation directory]
export KAFKA_BROKER=[kafka broker e.g. localhost:9092]
export OUTPUT_TOPIC=upper-case
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server $KAFKA_BROKER --topic $OUTPUT_TOPIC --from-beginning
Start producer process (different terminal)
export KAFKA_HOME=[kafka installation directory]
export KAFKA_BROKER=[kafka broker e.g. localhost:9092]
export INPUT_TOPIC=lower-case
$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list $KAFKA_BROKER --topic $INPUT_TOPIC
You will get a prompt and you can start entering values e.g.
> foo
> bar
> baz
> john
> doe
Wait for a few seconds and check the consumer terminal window. You should see the upper case form of the above records i.e. FOO, BAR etc.
Clean up
To clean up your AKS cluster, ACR instance and related resources
az group delete --name $AZURE_RESOURCE_GROUP --yes --no-wait
Handy list of commands
As promised, here is a list of all the CLI commands for your reference.
Azure Kubernetes Service
- az aks create - Create a new managed Kubernetes cluster
- az aks get-credentials - Get access credentials for a managed Kubernetes cluster
- az aks show - Show the details for a managed Kubernetes cluster
Azure Container Registry
- az acr create - Creates an Azure Container Registry
- az acr show - Get the details of an Azure Container Registry
- az acr login - Log in to an Azure Container Registry through the Docker CLI
- az acr repository list - List repositories in an Azure Container Registry
General commands
- az account set - Set a subscription to be the current active subscription
- az group create - Create a new resource group
- az role assignment create - Create a new role assignment for a user, group, or service principal
If you found this article helpful, please like and follow! Happy to get feedback via @abhi_tweeter or just drop a comment :-)