Quick profiling of data in Apache Kafka using kafkacat and visidata
Robin Moffatt
Posted on March 30, 2021
ksqlDB is a fantastically powerful tool for processing and analysing streams of data in Apache Kafka. But sometimes, you just want a quick way to profile the data in a topic in Kafka. I wrote about this previously with a convoluted (but effective) set of bash commands pipelined together to perform a GROUP BY on data. Then someone introduced me to visidata, which makes it all a lot quicker!
Let’s imagine we have data in Kafka, and we’re going to go and build some cool stuff with it. We’re going to process it and build a pipeline, and we need to know something about the data we’re working with. Visidata is a commandline tool to work with data in all sorts of formats, including from stdin. Coupled with kafkacat for consuming data from a topic to stdout, they make a perfect pairing.
This samples the most recent 100000 JSON records from a topic and pipes them into visidata:
kafkacat -b localhost:9092 -t my_topic -C -e -o-100000 | \
vd --filetype jsonl
Once visidata is open, use the arrow keys to move to the column on which you want to build a histogram and press Shift-F. Since it works with pipes, if you leave the -e off the kafkacat arguments you get a live stream of messages from the Kafka topic, and visidata will continue to update as messages arrive (although I think you need to replot the histogram if you want it to refresh).
If your data is in Avro instead, you can use kafkacat’s support for Avro conversion (-s avro) and JSON output (-J):
kafkacat -b localhost:9092 -t my_topic -C -e -o-100000 \
-r http://schema-registry:8081 -s avro -J | \
jq -c '.payload'| \
vd --filetype jsonl
The fields may well be nested - use g( in visidata to expand them.
Using it with Confluent Cloud
Raw JSON messages:
kafkacat -X security.protocol=SASL_SSL -X sasl.mechanisms=PLAIN \
-b BROKER.gcp.confluent.cloud:9092 \
-X sasl.username="CCLOUD_API_KEY" \
-X sasl.password="CCLOUD_API_PASSWORD" \
-t my_topic -C -e -o-10000 | \
vd --filetype jsonl
Avro data (Schema Registry on Confluent Cloud):
kafkacat -X security.protocol=SASL_SSL -X sasl.mechanisms=PLAIN \
-b BROKER.gcp.confluent.cloud:9092 \
-X sasl.username="CCLOUD_API_KEY" \
-X sasl.password="CCLOUD_API_PASSWORD" \
-s avro \
-r https://SR_API_KEY:SR_API_SECRET@SR_ENDPOINT.gcp.confluent.cloud \
-t my_avro_topic -C -e -o-10000 | \
vd --filetype jsonl
Note: You need to URL encode your credentials when supplying them in the Schema Registry URL (thanks to a6kme for this tip!). If you don’t, you may well get the error Avro/Schema-registry message deserialization: REST request failed (code -1): HTTP request failed: URL using bad/illegal format or missing URL : terminating.
So if your Schema Registry API key and secret were key123! and S3cr3t/kjna%$!%dsf£, you’d URL encode them and use https://key123%21:S3cr3t%2Fkjna%25%24%21%25dsf%C2%A3@SR_ENDPOINT.gcp.confluent.cloud
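If you’d rather not encode the credentials by hand, here is a minimal sketch of doing it in the shell. The urlencode function is a hypothetical helper of my own, not part of kafkacat or visidata; it percent-encodes every byte outside the RFC 3986 unreserved set (letters, digits, and - . _ ~), and it assumes your terminal and files are UTF-8, so £ becomes %C2%A3.

```shell
# Hypothetical helper: percent-encode a string for use in the
# user:password part of a URL.
urlencode() (
  out=""
  # od prints each input byte as two hex digits, so multi-byte characters
  # such as £ are encoded byte by byte regardless of locale.
  for hex in $(printf '%s' "$1" | od -An -v -tx1 | tr 'a-f' 'A-F'); do
    case "$hex" in
      3[0-9]|4[1-9A-F]|5[0-9A]|6[1-9A-F]|7[0-9A]|2D|2E|5F|7E)
        # Unreserved byte (0-9, A-Z, a-z, - . _ ~): convert the hex value
        # back to the literal character via an octal escape.
        out="$out$(printf "\\$(printf '%03o' "0x$hex")")" ;;
      *)
        # Anything else is emitted as %XX.
        out="$out%$hex" ;;
    esac
  done
  printf '%s\n' "$out"
)

# Single quotes stop the shell from interpreting $ and ! in the secret.
SR_KEY=$(urlencode 'key123!')
SR_SECRET=$(urlencode 'S3cr3t/kjna%$!%dsf£')
echo "https://${SR_KEY}:${SR_SECRET}@SR_ENDPOINT.gcp.confluent.cloud"
```

The resulting string is what you would pass to kafkacat’s -r argument. Working on raw bytes through od is a deliberate choice: it avoids relying on how a particular shell handles multi-byte characters.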