Quick profiling of data in Apache Kafka using kafkacat and visidata

rmoff

Robin Moffatt

Posted on March 30, 2021

Quick profiling of data in Apache Kafka using kafkacat and visidata

ksqlDB is a fantastically powerful tool for processing and analysing streams of data in Apache Kafka. But sometimes, you just want a quick way to profile the data in a topic in Kafka. I wrote about this previously with a convoluted (but effective) set of bash commands pipelined together to perform a GROUP BY on data. Then someone introduced me to visidata, which makes it all a lot quicker!

Let’s imagine we have data in Kafka, and we’re going to go and build some cool stuff with it. We’re going to process it and build a pipeline, and we need to know something about the data we’re working with. Visidata is a commandline tool to work with data in all sorts of formats, including from stdin. Coupled with kafkacat for consuming data from a topic to stdout they make a perfect pairing:

This samples 100000 JSON records from a topic and pipes it into visidata:

Once visidata is open, press Shift-F to create histogram

kafkacat -b localhost:9092 -t my_topic -C -e -o-100000 | \
  vd --filetype jsonl
Enter fullscreen mode Exit fullscreen mode

Once visidata is open, use the arrow keys to move to the column on which you want to build a histogram and press Shift-F. Since it works with pipes if you leave the -e off the kafkacat argument you get a live stream of messages from the Kafka topic and the visidata will continue to update as messages arrive (although I think you need to replot the histogram if you want it to refresh).

If your data is in Avro instead you can use kafkacat’s support for Avro conversion (-s avro) and JSON output (-J):

kafkacat -b localhost:9092 -t my_topic -C -e -o-100000 \
  -r http://schema-registry:8081 -s avro -J | \
  jq -c '.payload'| \
  vd --filetype jsonl
Enter fullscreen mode Exit fullscreen mode

The fields may well be nested - use g( in visidata to expand them.

asciicast

Using it with Confluent Cloud

Raw JSON messages:

kafkacat -X security.protocol=SASL_SSL -X sasl.mechanisms=PLAIN \
         -b BROKER.gcp.confluent.cloud:9092 \
         -X sasl.username="CCLOUD_API_KEY" \
         -X sasl.password="CCLOUD_API_PASSWORD" \
         -t my_topic -C -e -o-10000 | \
         vd --filetype jsonl
Enter fullscreen mode Exit fullscreen mode

Avro data (Schema Registry on Confluent Cloud):

kafkacat -X security.protocol=SASL_SSL -X sasl.mechanisms=PLAIN \
         -b BROKER.gcp.confluent.cloud:9092 \
         -X sasl.username="CCLOUD_API_KEY" \
         -X sasl.password="CCLOUD_API_PASSWORD" \
         -s avro \
         -r https://SR_API_KEY:SR_API_SECRET@SR_ENDPOINT.gcp.confluent.cloud \
         -t my_avro_topic -C -e -o-10000 | \
         vd --filetype jsonl
Enter fullscreen mode Exit fullscreen mode

Note: You need to URL encode your credentials when supplying them in the Schema Registry URL (thanks to a6kme for this tip!). If you don’t you may well get the error Avro/Schema-registry message deserialization: REST request failed (code -1): HTTP request failed: URL using bad/illegal format or missing URL : terminating.

So if your Schema Registry API key and Secret were key123! and S3cr3t/kjna%$!%dsf£ you’d URL encode it and use https://key123%21:S3cr3t%2Fkjna%25%24%21%25dsf%C2%A3@SR_ENDPOINT.gcp.confluent.cloud

💖 💪 🙅 🚩
rmoff
Robin Moffatt

Posted on March 30, 2021

Join Our Newsletter. No Spam, Only the good stuff.

Sign up to receive the latest update from our blog.

Related