ledux
Posted on March 24, 2023
Introduction
We once couldn't process data records from a Kinesis data stream that had a certain value in one of their fields. After we fixed the bug, we needed to reprocess these records.
This blog post describes how I read the data records from the stream, filtered them by the value in question and prepared them for reprocessing.
In this post I will cover the following topics:
- What is Kinesis
- How to read records from Kinesis using the aws cli
- How to process JSON using the jq cli
- How to read only records that match certain criteria
What is Kinesis
Kinesis Data Streams is an AWS service for processing streaming data. Producers send data records to a data stream, and consumers can then read and process them.
A data stream consists of one or more shards. A shard is an ordered sequence of data records, and shards are the unit used to scale the throughput of a stream. Ideally, every shard has its own consumer.
The shard a data record is stored on is determined by its PartitionKey. The partition key is set by the producer for each record; it is often taken from a field in the payload, but it can be any string. Kinesis hashes this value with MD5 into a 128-bit integer and, based on the result, assigns the record to the shard that owns that hash range.
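The mapping from partition key to shard can be sketched in a few lines of bash. This is a minimal sketch, assuming GNU coreutils (md5sum) and assuming that the shards split the hash key space evenly, which as far as I know is the case right after a stream is created with a fixed number of shards. The function name shard_for_key is my own. Because bash integers are only 64 bits wide, only the top 32 bits of the 128-bit MD5 value are used:

```shell
#!/usr/bin/env bash
# Sketch: estimate which shard a partition key maps to.
# ASSUMPTION: the shards split the 128-bit hash key space evenly.
# Requires GNU coreutils (md5sum).
shard_for_key() {
  local key=$1 shard_count=$2 hex top32
  # Kinesis hashes the partition key with MD5 into a 128-bit integer
  hex=$(printf '%s' "$key" | md5sum | cut -d' ' -f1)
  # Bash integers are 64 bit, so only the top 32 bits of the hash are used:
  # exact for power-of-two shard counts, approximate near range boundaries otherwise
  top32=$((16#${hex:0:8}))
  # Scale the top bits to a shard index (0 .. shard_count-1, in hash key order)
  echo $(( (top32 * shard_count) >> 32 ))
}

shard_for_key "0" 4
```

After resharding, the ranges are no longer guaranteed to be even; `aws kinesis list-shards` shows the actual HashKeyRange of each shard.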
Every data record is assigned a SequenceNumber, which identifies the data record within the shard. Sequence numbers increase over time, but they are not consecutive.
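Because sequence numbers increase, a SequenceNumber can also serve as a stop condition when reading a shard. In the sample response further down they have 56 digits, far too many for shell arithmetic. A minimal sketch of a comparison that avoids integer overflow, assuming the numbers have no leading zeros; seq_lt is a hypothetical helper, not part of the aws cli:

```shell
#!/usr/bin/env bash
# Sketch: is sequence number $1 smaller than $2?
# Sequence numbers are decimal strings far beyond 64 bits, so $(( )) would overflow.
# ASSUMPTION: no leading zeros, so the longer string is the larger number;
# for equal lengths, a string comparison of the digits gives the numeric order.
seq_lt() {
  local a=$1 b=$2
  if [ "${#a}" -ne "${#b}" ]; then
    [ "${#a}" -lt "${#b}" ]
  else
    [[ "$a" < "$b" ]]
  fi
}

# The two sequence numbers from the sample get-records response
first=49633478129523018598145441545878647674003909456640868354
second=49633478129523018598145441601514622818488773411761815554
seq_lt "$first" "$second" && echo "first record came earlier in the shard"
```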
ShardIterator
Kinesis is designed for reading the ingested data in the order it came in. Kinesis itself offers no mechanism to search or filter the data; a consumer can only read the data records forward, in sequence.
To read the data, the consumer needs a ShardIterator. It represents the position in the shard from which the consumer will start reading.
Depending on where reading should start, there are different iterator types:
- AT_SEQUENCE_NUMBER: starts at the record with the exact number provided or the record with the next higher number
- AFTER_SEQUENCE_NUMBER: starts at the record with the next higher number than the provided one
- AT_TIMESTAMP: starts at the record with the exact timestamp provided or the next higher one
- LATEST: starts with the next record created
- TRIM_HORIZON: starts at the oldest available record in the shard
AWS CLI
AWS provides a command line interface (CLI) to interact with its services, including Kinesis Data Streams.
To read from Kinesis, a ShardIterator is needed. Here I create one of type AT_TIMESTAMP:
stream=stream-name
timestamp=$(date --date='2023-03-13 12:00:00' +%s)
# jq -r prints the raw iterator string, without the surrounding JSON quotes
sharditerator=$(aws kinesis get-shard-iterator --stream-name "$stream" --shard-id 0 --shard-iterator-type AT_TIMESTAMP --timestamp "$timestamp" | jq -r .ShardIterator)
With this shard iterator, we can read data records from Kinesis:
aws kinesis get-records --shard-iterator "$sharditerator" --limit 10
This returns an object with an array called Records, which contains the Kinesis data records. In addition, it contains the next shard iterator (NextShardIterator) and how many milliseconds this batch is behind the latest data record in the shard (MillisBehindLatest).
A data record contains its identifier (SequenceNumber), the approximate time the record arrived in Kinesis (ApproximateArrivalTimestamp), its partition key (PartitionKey) and the actual payload (Data). The payload is base64 encoded.
{
"Records": [
{
"SequenceNumber": "49633478129523018598145441545878647674003909456640868354",
"ApproximateArrivalTimestamp": "2023-03-03T13:22:56.498000+01:00",
"Data": "ewogICJmb3Jtc0RhdGEiOiB7CiAgICAiZmlyc3ROYW1lIjogIkVyaWMiLAogICAgImxhc3ROYW1lIjogIklkbGUiCiAgfSwKICAibWV0YWRhdGEiOiB7CiAgICAicGFyZW50SWQiOiAiNzgwN2IzNzMtNTRlNy00ZTAzLWE1ZGUtMGI0MDE2OGFmNTRiIgogIH0KfQo=",
"PartitionKey": "0"
},
{
"SequenceNumber": "49633478129523018598145441601514622818488773411761815554",
"ApproximateArrivalTimestamp": "2023-03-03T13:26:30.057000+01:00",
"Data": "ewogICJmb3Jtc0RhdGEiOiB7CiAgICAiZmlyc3ROYW1lIjogIkdyYWhhbSIsCiAgICAibGFzdE5hbWUiOiAiQ2hhcG1hbiIKICB9LAogICJtZXRhZGF0YSI6IHsKICAgICJpbmZvbWVldGluZ0lkIjogImt3MzlrenNhZjMiCiAgfQp9Cg==",
"PartitionKey": "0"
}
],
"NextShardIterator": "AAAAAAAAAAHrZiW4CusNNQOgyPkpxttbi9hAPH35qT91FVx7RcmZKmSuzulLh0t16SAlG9jPUO+NJ0RPxfaWZaCjwusIjzxI3MBdGvKbJt/MX2bJHv2FTqiyArEDvuFBI0cvdNeX+T18wcnljCEZ3etm7tBkr9l84O0+1KakYygljotcBba49QLuvW3f90OXxXV9bam5HY3CmbxEr5fK5quRhoBgvhrvxBXUCvMoRCGzVn7krSr9EhZD79DwynYJ9qL3JY5/ZyVAMeh4a20ENkt6PR7MdUikElbjeyuvmeLBpOj+demEps/1NaHh2i5r1i/BRiemgj/5sii+bKcWGPqhkeEujl5+",
"MillisBehindLatest": 953238000
}
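To see what is inside a record, the first sample record above can be decoded locally. A minimal sketch, assuming GNU coreutils: base64 -d decodes the payload, and GNU date parses the ISO 8601 arrival timestamp into epoch seconds, the same conversion a read loop can use for comparisons:

```shell
#!/usr/bin/env bash
# Fields of the first sample record from the response above
data='ewogICJmb3Jtc0RhdGEiOiB7CiAgICAiZmlyc3ROYW1lIjogIkVyaWMiLAogICAgImxhc3ROYW1lIjogIklkbGUiCiAgfSwKICAibWV0YWRhdGEiOiB7CiAgICAicGFyZW50SWQiOiAiNzgwN2IzNzMtNTRlNy00ZTAzLWE1ZGUtMGI0MDE2OGFmNTRiIgogIH0KfQo='
arrival='2023-03-03T13:22:56.498000+01:00'

# The decoded payload is the JSON document the producer sent
printf '%s' "$data" | base64 -d

# The arrival time in seconds since the epoch (GNU date assumed)
date --date="$arrival" +%s   # 1677846176
```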
Each call returns only the data records from a limited part of the shard, often fewer than the number specified with the --limit parameter; a batch can even be empty although more records follow. To get the next batch of records, the same request must be sent again, but with the NextShardIterator. Also, to filter the data records on a field in the Data, we need to decode it and act on the JSON.
A small but powerful tool called jq can do that.
Excursion: jq
jq is a lightweight and flexible command-line JSON processor.
There is a nice tutorial, but I'll give you the basics here:
If you just pass JSON to jq with the identity filter ., it pretty-prints it, including syntax highlighting in a terminal:
json='[{"name":"John","age":30,"address":{"street":"123 Main St","city":"Anytown","country":"USA"}},{"name":"Jane","age":25},{"name":"Bob","age":40,"address":{"street":"456 Oak St","city":"Othertown","country":"USA"}}]'
echo "$json" | jq .
To get an entry of the array, use the index notation; indices start at 0:
echo "$json" | jq '.[0]'
To count from the end, use negative numbers (-1 for the last entry, -2 for the second to last, etc.).
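Negative indices work the same way. The sketch below reuses the sample array; the -r flag, which prints strings without their JSON quotes, becomes important later when jq output is passed on to other commands:

```shell
#!/usr/bin/env bash
json='[{"name":"John","age":30,"address":{"street":"123 Main St","city":"Anytown","country":"USA"}},{"name":"Jane","age":25},{"name":"Bob","age":40,"address":{"street":"456 Oak St","city":"Othertown","country":"USA"}}]'

# -1 is the last entry, -2 the second to last
echo "$json" | jq '.[-1].name'     # "Bob"
echo "$json" | jq '.[-2].name'     # "Jane"

# -r prints the raw string without quotes
echo "$json" | jq -r '.[-1].name'  # Bob
```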
We can also transform the data, for example to get just the first name and the city. The | operator in jq feeds the output of one filter into the input of the next:
echo "$json" | jq '.[0] | { firstname: .name, city: .address.city }'
To act on all entries in the array, just omit the index:
echo "$json" | jq '.[] | { firstname: .name, city: .address.city }'
To collect the results into one array instead of emitting independent objects, wrap the whole query in brackets:
echo "$json" | jq '[.[] | { firstname: .name, city: .address.city }]'
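jq also has a shorthand for this pattern: map(f) is defined as [.[] | f]. A small sketch showing that both forms produce the same array (-c prints it compactly on one line); Jane has no address, so her city is null:

```shell
#!/usr/bin/env bash
json='[{"name":"John","age":30,"address":{"street":"123 Main St","city":"Anytown","country":"USA"}},{"name":"Jane","age":25},{"name":"Bob","age":40,"address":{"street":"456 Oak St","city":"Othertown","country":"USA"}}]'

# Both commands print:
# [{"firstname":"John","city":"Anytown"},{"firstname":"Jane","city":null},{"firstname":"Bob","city":"Othertown"}]
echo "$json" | jq -c '[.[] | { firstname: .name, city: .address.city }]'
echo "$json" | jq -c 'map({ firstname: .name, city: .address.city })'
```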
Besides filtering and transforming JSON, jq offers numerous built-in functions and operators.
In our example, we need to select data records that have a certain property.
The first function we need is select(boolean_expression), which passes its input through only if the expression is true. Applied to each entry of a list, it acts as a filter:
echo "$json" | jq '.[] | select(.age > 26)'
The second one is has(key), which checks whether a JSON object contains a given property:
echo "$json" | jq '.[1] | has("address")'
To get all objects in the array that have an address, we can combine those filters:
echo "$json" | jq '.[] | select(has("address"))'
These are the jq basics we need to filter all data records that have a certain property.
Read and filter records from Kinesis
To filter the Kinesis data records by the presence of a property in the payload, the payload must be decoded first. Then we can filter it with jq as described above.
Afterwards, we use the next shard iterator to fetch the next batch.
Both the records and the next shard iterator come from the same response, so instead of calling Kinesis twice, I cache each batch in a temporary file.
kinesisBatch=$(mktemp)
aws kinesis get-records --shard-iterator "$sharditerator" --limit 50 > "$kinesisBatch"
# -r prints the raw base64 strings (no JSON quotes), so base64 -d can decode them;
# jq -s then reads all decoded payloads into one array before filtering
jq -r '.Records[].Data' "$kinesisBatch" | base64 -d | jq -s '.[] | select(.metadata | has("parentId"))'
sharditerator=$(jq -r '.NextShardIterator' "$kinesisBatch")
aws kinesis get-records --shard-iterator "$sharditerator" --limit 50 > "$kinesisBatch"
# etc.
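To try the filter without access to a stream, the two sample records from the get-records response above can be written to a file and piped through the same commands. A minimal sketch assuming GNU base64 and jq 1.6 or newer; the second variant, which decodes inside jq with @base64d and parses with fromjson, is an alternative I'm adding here, not the approach used in this post:

```shell
#!/usr/bin/env bash
kinesisBatch=$(mktemp)
# The Records array of the sample response shown earlier
cat > "$kinesisBatch" <<'EOF'
{
  "Records": [
    {
      "SequenceNumber": "49633478129523018598145441545878647674003909456640868354",
      "ApproximateArrivalTimestamp": "2023-03-03T13:22:56.498000+01:00",
      "Data": "ewogICJmb3Jtc0RhdGEiOiB7CiAgICAiZmlyc3ROYW1lIjogIkVyaWMiLAogICAgImxhc3ROYW1lIjogIklkbGUiCiAgfSwKICAibWV0YWRhdGEiOiB7CiAgICAicGFyZW50SWQiOiAiNzgwN2IzNzMtNTRlNy00ZTAzLWE1ZGUtMGI0MDE2OGFmNTRiIgogIH0KfQo=",
      "PartitionKey": "0"
    },
    {
      "SequenceNumber": "49633478129523018598145441601514622818488773411761815554",
      "ApproximateArrivalTimestamp": "2023-03-03T13:26:30.057000+01:00",
      "Data": "ewogICJmb3Jtc0RhdGEiOiB7CiAgICAiZmlyc3ROYW1lIjogIkdyYWhhbSIsCiAgICAibGFzdE5hbWUiOiAiQ2hhcG1hbiIKICB9LAogICJtZXRhZGF0YSI6IHsKICAgICJpbmZvbWVldGluZ0lkIjogImt3MzlrenNhZjMiCiAgfQp9Cg==",
      "PartitionKey": "0"
    }
  ]
}
EOF

# Variant 1, as in the post: decode with base64 -d, then slurp the payloads with jq -s
found=$(jq -r '.Records[].Data' "$kinesisBatch" | base64 -d | jq -s '.[] | select(.metadata | has("parentId"))')

# Variant 2: decode and parse inside a single jq call (@base64d needs jq 1.6+)
found2=$(jq '.Records[].Data | @base64d | fromjson | select(.metadata | has("parentId"))' "$kinesisBatch")

rm "$kinesisBatch"
printf '%s\n' "$found"
```

Both variants should yield only the first record (Eric Idle), because the metadata of the second record only contains an infomeetingId.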
Now we need to repeat this until a stop condition is reached. This can be a timestamp or a SequenceNumber; the loop below stops at a timestamp:
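stream=stream-name
fromTime=$(date --date='2023-03-13 12:00:00' +%s)
toTime=$(date --date='2023-03-15 15:00:00' +%s)
timeOfLastEntry=$fromTime
sharditerator=$(aws kinesis get-shard-iterator --stream-name "$stream" --shard-id 0 --shard-iterator-type AT_TIMESTAMP --timestamp "$fromTime" | jq -r .ShardIterator)
kinesisBatch=$(mktemp)
while [ "$toTime" -ge "$timeOfLastEntry" ]; do
  aws kinesis get-records --shard-iterator "$sharditerator" --limit 50 > "$kinesisBatch"
  jq -r '.Records[].Data' "$kinesisBatch" | base64 -d | jq -s '.[] | select(.metadata | has("parentId"))' >> foundEntries.json
  # A batch can be empty; only advance the time when records were returned
  lastArrival=$(jq -r '.Records[-1].ApproximateArrivalTimestamp // empty' "$kinesisBatch")
  if [ -n "$lastArrival" ]; then
    timeOfLastEntry=$(date --date="$lastArrival" +%s)
  elif [ "$(jq -r '.MillisBehindLatest' "$kinesisBatch")" = "0" ]; then
    break # caught up with the latest record, nothing more to read for now
  fi
  sharditerator=$(jq -r '.NextShardIterator // empty' "$kinesisBatch")
  [ -n "$sharditerator" ] || break # the shard is closed, no more records
  sleep 0.2 # stay below the limit of 5 GetRecords calls per second per shard
done
rm "$kinesisBatch"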
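The loop above reads only shard 0. A stream with several shards needs one pass per shard. A minimal sketch of that outer loop, assuming that a single list-shards call returns all shards of the stream; read_shard is a hypothetical wrapper around the loop above that takes the stream name and a shard id, and list_shard_ids and read_all_shards are names of my own:

```shell
#!/usr/bin/env bash
# Print the ids of all shards of a stream, one per line.
# ASSUMPTION: a single list-shards call returns every shard of the stream.
list_shard_ids() {
  aws kinesis list-shards --stream-name "$1" | jq -r '.Shards[].ShardId'
}

# Run the (hypothetical) read_shard function once per shard
read_all_shards() {
  local stream=$1 shardId
  for shardId in $(list_shard_ids "$stream"); do
    read_shard "$stream" "$shardId"
  done
}
```

Called as `read_all_shards stream-name`, this processes the shards one after another, so records from different shards end up in foundEntries.json grouped by shard rather than in global arrival order.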
Wrap up
Even though Kinesis stores the data durably for the stream's retention period, it is not a database. It stores all records in sequence, and there is no way to query the data in Kinesis itself. If you want to act on only a subset of the records, you need to consume all records and filter them locally. In this blog post, I outlined how this can be done using the aws cli in conjunction with jq.
The above snippet can be put in a file and be called from the console. The next step would be to add parameters so it can be used more interactively, for example:
--stream-name stream-name
--starting-timestamp yyyy-mm-dd
--end-timestamp yyyy-mm-dd
--jq-filter-on-data 'select(.metadata | has("parentId"))'
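Parsing these parameters could look roughly like the sketch below. Only the option names come from the list above; the function name parse_args, the variable names and the default filter are assumptions of mine:

```shell
#!/usr/bin/env bash
# Sketch: parse the suggested options into global variables.
# Sets stream, jqFilter, fromTime and toTime (epoch seconds, GNU date assumed).
parse_args() {
  stream='' startTs='' endTs='' jqFilter='.'
  while [ $# -gt 0 ]; do
    # every option expects a value
    if [ $# -lt 2 ]; then
      echo "missing value for $1" >&2
      return 1
    fi
    case $1 in
      --stream-name)        stream=$2 ;;
      --starting-timestamp) startTs=$2 ;;
      --end-timestamp)      endTs=$2 ;;
      --jq-filter-on-data)  jqFilter=$2 ;;
      *) echo "unknown option: $1" >&2; return 1 ;;
    esac
    shift 2
  done
  if [ -z "$stream" ] || [ -z "$startTs" ] || [ -z "$endTs" ]; then
    echo "usage: $0 --stream-name NAME --starting-timestamp yyyy-mm-dd --end-timestamp yyyy-mm-dd [--jq-filter-on-data FILTER]" >&2
    return 1
  fi
  # convert the dates into epoch seconds, as the read loop expects
  fromTime=$(date --date="$startTs" +%s) || return 1
  toTime=$(date --date="$endTs" +%s) || return 1
}
```

A script would call `parse_args "$@" || exit 1`, and the hard-coded filter in the loop could then become `jq -s ".[] | $jqFilter"`.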