Abhishek Gupta
Posted on October 9, 2019
This blog will demonstrate how to interact with an Event Hubs Kafka cluster using the Sarama Kafka client library. The sarama package provides a pure Go client that supports Kafka v0.8 and above.
Azure Event Hubs is a streaming platform and event ingestion service, capable of receiving and processing millions of events per second. It also provides a Kafka endpoint that can be used by your existing Kafka-based applications as an alternative to running your own Kafka cluster. Since Azure Event Hubs exposes a protocol that is binary compatible with Kafka versions 1.0 and above, you can start using the Kafka endpoint from your existing applications with no code changes and only a minimal configuration change. It also supports frameworks like Kafka Connect (currently in preview), MirrorMaker, etc.
What's covered?
- Code: how to configure and use the Sarama Go client to talk to Event Hubs Kafka endpoint and build producer, consumer apps
- Setup: use Azure CLI to quickly bootstrap an Event Hubs for Kafka instance
- Test: run the producer and consumer apps to try the end-to-end scenario
As always, the code is available on GitHub.
Prerequisites
If you don't have an Azure subscription, just create a free account and get going! You will also need the Azure CLI. Of course, you will need to have Go installed as well.
Code walkthrough
The app is pretty simple and consists of a producer and a consumer built using the Sarama Go client. Let's skim through the code real quick.
Configuration for connecting to Event Hubs for Kafka
You need to pass a sarama.Config object in order to create a producer or consumer instance.
func getConfig() *sarama.Config {
    config := sarama.NewConfig()
    config.Net.DialTimeout = 10 * time.Second

    // Event Hubs uses SASL PLAIN over TLS: the user is the literal string
    // "$ConnectionString" and the password is the Event Hubs connection string
    config.Net.SASL.Enable = true
    config.Net.SASL.User = "$ConnectionString"
    config.Net.SASL.Password = getEnv(eventHubsConnStringEnvVar)
    config.Net.SASL.Mechanism = "PLAIN"

    config.Net.TLS.Enable = true
    config.Net.TLS.Config = &tls.Config{
        InsecureSkipVerify: true,
        ClientAuth:         0,
    }

    // Event Hubs requires Kafka protocol version 1.0 or above
    config.Version = sarama.V1_0_0_0
    return config
}
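By the way, getEnv is just a tiny helper (not shown in the snippet above); a minimal sketch, assuming it reads an environment variable and fails fast if it's missing, could look like this:
func getEnv(key string) string {
    // os.LookupEnv distinguishes between "unset" and "set to an empty string"
    value, ok := os.LookupEnv(key)
    if !ok {
        fmt.Printf("environment variable %s is required\n", key)
        os.Exit(1)
    }
    return value
}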
Authentication is the key here - Azure Event Hubs (including its Kafka features) requires SSL or TLS for all communication and uses Shared Access Signatures (SAS) for authentication. Please refer to the documentation for details of the security model.
- SASL is enabled with config.Net.SASL.Enable = true and uses the PLAIN mechanism (config.Net.SASL.Mechanism)
- The SASL user is defined using config.Net.SASL.User and is set to $ConnectionString (yes, this is a static value)
- The SASL password (config.Net.SASL.Password) is set to the Event Hubs connection string (details to follow in the upcoming section)
- Finally, TLS is enabled (config.Net.TLS.Enable) and config.Net.TLS.Config is set to an instance of tls.Config
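For reference, the connection string that ends up as the SASL password has the following general shape (placeholders are mine; you will fetch the real value with the Azure CLI later in this post):
Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=<rule-name>;SharedAccessKey=<key>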
The configuration for the producer and consumer (in this example) is the same, except for config.Producer.Return.Successes = true, which is required for the synchronous version of the producer.
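In other words, the producer's configuration can be derived from the shared one along these lines (a sketch; the variable name is mine and the repository's structure may differ):
producerConfig := getConfig()
// the sync producer blocks until it receives the success/failure
// acknowledgement for each message, so this must be enabled
producerConfig.Producer.Return.Successes = true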
Things to watch out for:
- If you do not configure TLS properly, you might see this error: Failed to start Sarama producer: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)
- At the time of writing this article, Event Hubs does not support message compression, so please ensure that you use the default value of config.Producer.Compression (which is none) or set it explicitly using config.Producer.Compression = sarama.CompressionNone. If not, you might see this error: Failed to send msg: kafka server: The requested operation is not supported by the message format version.
- Make sure you set the Kafka protocol version to config.Version = sarama.V1_0_0_0 or higher
Event Hubs Kafka Producer
Create a producer instance, passing in the Event Hubs broker list and the required configuration (sarama.Config):
producer, err := sarama.NewSyncProducer(brokerList, getConfig())
if err != nil {
    fmt.Println("Failed to start Sarama producer:", err)
    os.Exit(1)
}
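Here, brokerList is just a single-element slice built from the broker environment variable; a sketch (assuming the EVENTHUBS_BROKER variable used later in this post):
// the Event Hubs Kafka endpoint, e.g. <namespace>.servicebus.windows.net:9093
brokerList := []string{getEnv("EVENTHUBS_BROKER")}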
Start a goroutine for producing messages. This happens in an infinite for loop, which can be stopped using ctrl+c.
go func() {
    for {
        // producerOpen guards message production (switched off during shutdown)
        if producerOpen {
            ts := time.Now().String()
            msg := &sarama.ProducerMessage{Topic: eventHubsTopic, Key: sarama.StringEncoder("key-" + ts), Value: sarama.StringEncoder("value-" + ts)}
            // partition and offset of the message that was just sent
            p, o, err := producer.SendMessage(msg)
            if err != nil {
                fmt.Println("Failed to send msg:", err)
                continue
            }
            fmt.Println("sent message to partition", p, "offset", o)
        }
        ...
    }
}()
For a clean exit, listen for the interrupt signal (ctrl+c) and close the producer:
close := make(chan os.Signal, 1) // buffered, as recommended for signal.Notify
signal.Notify(close, syscall.SIGTERM, syscall.SIGINT)
<-close
err = producer.Close()
Event Hubs Kafka Consumer
Create a consumer group instance, passing in the Event Hubs broker list:
consumer, err := sarama.NewConsumerGroup(brokerList, consumerGroupID, getConfig())
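As with the producer, it's worth failing fast if the consumer group can't be created; a minimal check (my addition, the repository may handle this differently):
if err != nil {
    fmt.Println("Failed to create consumer group:", err)
    os.Exit(1)
}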
Start consuming in a separate goroutine:
go func() {
    for {
        err = consumer.Consume(ctx, []string{getEnv(eventHubsTopicEnvVar)}, messageHandler{})
        ...
        if ctx.Err() != nil {
            return
        }
    }
}()
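The ctx passed to consumer.Consume above is a cancellable context from the standard context package; its cancel function is invoked during shutdown (shown further below). A minimal sketch:
// cancelling this context makes consumer.Consume return,
// allowing the consumer goroutine to exit cleanly
ctx, cancel := context.WithCancel(context.Background())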
messageHandler implements sarama.ConsumerGroupHandler (the Setup, Cleanup and ConsumeClaim functions). The ConsumeClaim function is the important one: it is where you specify what to do with each message. In this example, each message is logged to standard output and marked as consumed.
func (h messageHandler) ConsumeClaim(s sarama.ConsumerGroupSession, c sarama.ConsumerGroupClaim) error {
    for msg := range c.Messages() {
        fmt.Printf("Message topic:%q partition:%d offset:%d\n", msg.Topic, msg.Partition, msg.Offset)
        fmt.Println("Message content", string(msg.Value))
        s.MarkMessage(msg, "")
    }
    return nil
}
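Setup and Cleanup are not shown above; judging by the log output later in this post, they log the partition allocation and the clean-up event, respectively. A sketch (my reconstruction, not the verbatim code from the repository):
func (h messageHandler) Setup(s sarama.ConsumerGroupSession) error {
    // s.Claims() maps each topic to the partitions allocated to this instance
    fmt.Println("Partition allocation -", s.Claims())
    return nil
}

func (h messageHandler) Cleanup(s sarama.ConsumerGroupSession) error {
    fmt.Println("Consumer group clean up initiated")
    return nil
}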
The consumer can be stopped by exiting the program (pressing ctrl+c). For a clean exit, the consumer instance is closed:
close := make(chan os.Signal, 1) // buffered, as recommended for signal.Notify
signal.Notify(close, syscall.SIGTERM, syscall.SIGINT)
<-close
cancel()
if err := consumer.Close(); err != nil {
    ...
}
OK, let's create an Event Hubs cluster and try the end-to-end scenario.
Create your Kafka-enabled Event Hubs cluster
If you have a cluster already, skip this and go to the "Event Hubs connection details" sub-section.
Set environment variables:
export AZURE_SUBSCRIPTION=[to be filled]
export AZURE_RESOURCE_GROUP=[to be filled]
export AZURE_LOCATION=[to be filled]
export EVENT_HUBS_NAMESPACE=[name of the event hub namespace - to be filled]
export EVENT_HUB_NAME=[name of the event hub (topic) - to be filled]
Create the resource group if you don't have one already:
az account set --subscription $AZURE_SUBSCRIPTION
az group create --name $AZURE_RESOURCE_GROUP --location $AZURE_LOCATION
Create an Event Hubs namespace (similar to a Kafka cluster)
For details on Event Hubs namespace, please refer to the Event Hubs documentation
az eventhubs namespace create --name $EVENT_HUBS_NAMESPACE --resource-group $AZURE_RESOURCE_GROUP --location $AZURE_LOCATION --enable-kafka true --enable-auto-inflate false
Documentation for az eventhubs namespace create
And then create an Event Hub (equivalent to a Kafka topic):
az eventhubs eventhub create --name $EVENT_HUB_NAME --resource-group $AZURE_RESOURCE_GROUP --namespace-name $EVENT_HUBS_NAMESPACE --partition-count 10
Documentation for az eventhubs eventhub create
Event Hubs connection details
Get the connection string and credentials for your cluster. For details, read how Event Hubs uses Shared Access Signatures for authorization.
Start by fetching the Event Hub rule/policy name:
az eventhubs namespace authorization-rule list --resource-group $AZURE_RESOURCE_GROUP --namespace-name $EVENT_HUBS_NAMESPACE
Documentation for az eventhubs namespace authorization-rule list
You will get a JSON output similar to the one below:
[
  {
    "id": "/subscriptions/qwerty42-ae29-4924-b6a7-dda0ea91d347/resourceGroups/foobar-resource/providers/Microsoft.EventHub/namespaces/foobar-event-hub-ns/AuthorizationRules/RootManageSharedAccessKey",
    "location": "Southeast Asia",
    "name": "RootManageSharedAccessKey",
    "resourceGroup": "foobar-resource",
    "rights": [
      "Listen",
      "Manage",
      "Send"
    ],
    "type": "Microsoft.EventHub/Namespaces/AuthorizationRules"
  }
]
The authorization rule name is the value of the name attribute (without the quotes), which in this case is RootManageSharedAccessKey:
export EVENT_HUB_AUTH_RULE_NAME=RootManageSharedAccessKey
Then, make use of the rule name to extract the connection string:
az eventhubs namespace authorization-rule keys list --resource-group $AZURE_RESOURCE_GROUP --namespace-name $EVENT_HUBS_NAMESPACE --name $EVENT_HUB_AUTH_RULE_NAME
Documentation for az eventhubs namespace authorization-rule keys list
You'll get a JSON response like this:
{
  "aliasPrimaryConnectionString": null,
  "aliasSecondaryConnectionString": null,
  "keyName": "RootManageSharedAccessKey",
  "primaryConnectionString": "Endpoint=sb://foobar-eventhub-namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=Nbaz0D42MT7qwerty6D/W51ao42r6EJuxR/zEqwerty=",
  "primaryKey": "qwertyEiQHIirSNDPzqcqvZEUs6VAW+JIK3L46tqwerty",
  "secondaryConnectionString": "Endpoint=sb://abhishgu-temp-event-hub-ns.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=qwertyPF2/YRGzxKmb06Z8NBFLCjnX38O7ch6aiYkN0=",
  "secondaryKey": "qwertyPF2/YRGzxKmb06Z8NBqwertyX38O7ch6aiYk42="
}
The primary connection string is the value of the primaryConnectionString attribute (without the quotes), which in this case is Endpoint=sb://foobar-eventhub-namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=Nbaz0D42MT7qwerty6D/W51ao42r6EJuxR/zEqwerty=. Make a note of the connection string, as you will be using it in the next step.
Test producer and consumer
Clone the GitHub repository and navigate to the right directory:
git clone https://github.com/abhirockzz/eventhubs-kafka-go-sarama
cd eventhubs-kafka-go-sarama
Fetch the Sarama Kafka client library:
go get github.com/Shopify/sarama
Producer
Set environment variables:
export EVENTHUBS_CONNECTION_STRING=[value of primary connection string obtained in the previous step]
export EVENT_HUBS_NAMESPACE=[event hub namespace]
export EVENTHUBS_BROKER=$EVENT_HUBS_NAMESPACE.servicebus.windows.net:9093
export EVENTHUBS_TOPIC=[name of the event hub (topic)]
For the EVENTHUBS_CONNECTION_STRING variable, please ensure that you include the double quotes around the value received from the Azure CLI, e.g. export EVENTHUBS_CONNECTION_STRING="Endpoint=sb://foobar-eventhub-namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=Nbaz0D42MT7qwerty6D/W51ao42r6EJuxR/zEqwerty="
Start the producer:
go run producer/sarama-producer.go
Once it starts, you should see logs like these:
Event Hubs broker [foo-bar.servicebus.windows.net:9093]
Event Hubs topic testhub
Waiting for program to exit...
sent message to partition 0 offset 1
sent message to partition 7 offset 1
sent message to partition 6 offset 1
sent message to partition 8 offset 1
sent message to partition 2 offset 1
To stop, just press ctrl+c on your terminal.
Consumer
Start the consumer process in a different terminal. Set environment variables:
export EVENTHUBS_CONNECTION_STRING=[value of primary connection string obtained in the previous step]
export EVENT_HUBS_NAMESPACE=[event hub namespace]
export EVENTHUBS_BROKER=$EVENT_HUBS_NAMESPACE.servicebus.windows.net:9093
export EVENTHUBS_TOPIC=[name of the event hub (topic) - to be filled]
export EVENTHUBS_CONSUMER_GROUPID=[name of consumer group e.g. testgroup]
For the EVENTHUBS_CONNECTION_STRING variable, please ensure that you include the double quotes around the value received from the Azure CLI, e.g. export EVENTHUBS_CONNECTION_STRING="Endpoint=sb://foobar-eventhub-namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=Nbaz0D42MT7qwerty6D/W51ao42r6EJuxR/zEqwerty="
Start the consumer:
go run consumer/sarama-consumer.go
In the logs, you will see that the consumer group gets created and all the partitions (10 in this example) are allocated to it:
Event Hubs broker [foo-bar.servicebus.windows.net:9093]
Sarama client consumer group ID testgroup
new consumer group created
Event Hubs topic testhub
Waiting for program to exit
Partition allocation - map[testhub:[0 1 2 3 4 5 6 7 8 9]]
Message topic:"testhub" partition:9 offset:45
Message content value-2019-10-08 16:12:23.704802 +0530 IST m=+1.003667284
Message topic:"testhub" partition:3 offset:32
Message content value-2019-10-08 17:05:42.388301 +0530 IST m=+0.912420074
Scale out...
In a different terminal, start another instance of the consumer. This will trigger a rebalance of the partitions, and you will see that a few of them (5 in this case) get allocated to the new consumer instance:
Event Hubs broker [foo-bar.servicebus.windows.net:9093]
Sarama client consumer group ID testgroup
new consumer group created
Event Hubs topic testhub
Waiting for program to exit
Partition allocation - map[testhub:[0 1 2 3 4]]
If you go back to the terminal for the first consumer instance, you will see that a few partitions have been taken away as a result of the rebalancing:
Consumer group clean up initiated
Partition allocation - map[testhub:[5 6 7 8 9]]
To stop, just press ctrl+c on your terminal.
Now, both the consumers will share the workload and consume messages from Event Hubs. You can keep scaling out by starting more consumer instances, but this will only be useful until you reach the point where the number of consumer instances is equal to the number of partitions. In essence, the number of partitions of your Event Hub is the unit of parallelism and scale.
That's all for this blog. I would love to have your feedback and suggestions! Don't be shy, just tweet or drop a comment. And, if you found this article useful, please like and follow 😃😃