Streaming data into Kafka S01/E03 - Loading JSON file
Florian Hussonnois
Posted on September 10, 2020
This is the third article in the "Streaming data into Kafka" series. In the first two, we saw how easy it is to use Kafka Connect to load records from CSV and XML files into Apache Kafka without writing a single line of code. To do this, we used the Kafka Connect FilePulse connector, which packs a lot of nice features for parsing and transforming data.
- Streaming data into Kafka S01/E01 - Loading CSV file
- Streaming data into Kafka S01/E02 - Loading XML file
Now, let's see how to integrate JSON data, another file format that is widely used on most projects (and much more appreciated than XML for web-based applications).
Kafka Connect File Pulse connector
If you have already read the previous articles, you can go directly to the next section (i.e., Ingesting Data).
The Kafka Connect FilePulse connector is a powerful source connector that makes it easy to parse, transform, and load data from the local file system into Apache Kafka. It offers built-in support for various file formats (e.g., CSV, XML, JSON, LOG4J, AVRO).
For a broad overview of FilePulse, I suggest you read this article:
For more information, you can check out the documentation here.
How to use the connector
The easiest and fastest way to get started with the Kafka Connect FilePulse connector is to use the Docker image available on Docker Hub.
$ docker pull streamthoughts/kafka-connect-file-pulse:1.6.3
You can download the docker-compose.yml file available on the GitHub project repository to quickly start a Confluent Platform with Kafka Connect and the FilePulse connector pre-installed.
$ wget https://raw.githubusercontent.com/streamthoughts/kafka-connect-file-pulse/v1.6.3/docker-compose.yml
$ docker-compose up -d
Once all Docker containers are started, you can check that the connector is installed on the Kafka Connect worker, which is accessible on http://localhost:8083.
$ curl -s localhost:8083/connector-plugins|jq '.[].class'|egrep FilePulse
"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector"
Note: You can also install the connector either from the GitHub Releases page or from Confluent Hub.
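If you manage your own Kafka Connect installation, the connector can also typically be installed with the confluent-hub CLI. The component coordinates below are an assumption, so check the Confluent Hub page for the current name and version:
$ confluent-hub install streamthoughts/kafka-connect-file-pulse:1.6.3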
Ingesting Data
To read a file containing a single JSON document, we will use the BytesArrayInputReader. This reader allows us to create a single record per source file. Each record produced by this reader has a single field of type byte[], named message. The byte[] value is the full content of the source file (i.e., the JSON document).
Then, to parse this field, we will use the processing filter mechanism provided by the FilePulse connector and more particularly the JSONFilter.
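To make this concrete, here is a rough sketch (not real connector output) of a record value before and after the JSONFilter runs, using the track example introduced below:
# Before: the BytesArrayInputReader produces a single byte[] field named 'message' holding the raw file content
{"message": "{ \"track\": { \"title\": \"Star Wars (Main Theme)\", ... } }"}
# After: with merge=true, the JSONFilter parses the bytes and merges the resulting fields into the record
{"message": "...", "track": {"title": "Star Wars (Main Theme)", "artist": "John Williams, London Symphony Orchestra", "album": "Star Wars", "duration": "10:52"}}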
So let's create the connector with this minimal configuration:
$ curl \
-i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/tracks-json-filepulse-00/config \
-d '{
"connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
"fs.scan.directory.path":"/tmp/kafka-connect/examples/",
"fs.scan.interval.ms":"10000",
"fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
"file.filter.regex.pattern":".*\\.json$",
"task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.BytesArrayInputReader",
"offset.strategy":"name",
"topic":"tracks-filepulse-json-00",
"internal.kafka.reporter.bootstrap.servers": "broker:29092",
"internal.kafka.reporter.topic":"connect-file-pulse-status",
"fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.DeleteCleanupPolicy",
"filters": "ParseJSON",
"filters.ParseJSON.type":"io.streamthoughts.kafka.connect.filepulse.filter.JSONFilter",
"filters.ParseJSON.source":"message",
"filters.ParseJSON.merge":"true",
"tasks.max": 1
}'
Note: The Connect FilePulse connector periodically scans the input directory that we set using the property fs.scan.directory.path. Then, it looks up files matching the pattern .*\\.json$. Each file is uniquely identified and tracked depending on the value of offset.strategy. Here, the configuration specifies that a file is identified by its name.
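Once the connector is created, you can also check that it is running (and see any task failures) using the standard Kafka Connect REST API:
$ curl -s localhost:8083/connectors/tracks-json-filepulse-00/status | jq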
Create a valid JSON file that looks like this:
$ cat <<EOF > track.json
{
"track": {
"title":"Star Wars (Main Theme)",
"artist":"John Williams, London Symphony Orchestra",
"album":"Star Wars",
"duration":"10:52"
}
}
EOF
Then copy this file from your host to the Docker container which runs the connector. You can run the following commands:
# Create the target directory
$ docker exec -it connect mkdir -p /tmp/kafka-connect/examples
# Copy the host file to the Docker container
$ docker cp track.json connect://tmp/kafka-connect/examples/track-00.json
Finally, consume the topic named tracks-filepulse-json-00 and verify that the connector has detected and processed the JSON file:
$ docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
-b localhost:9092 \
-t tracks-filepulse-json-00 \
-C -J -q -o-1 -s key=s -s value=avro -r http://localhost:8081 | jq .payload.ConnectDefault
(output)
{
"message": {
"bytes": "{ \n \"track\": {\n \"title\":\"Star Wars (Main Theme)\",\n \"artist\":\"John Williams, London Symphony Orchestra\",\n \"album\":\"Star Wars\",\n \"duration\":\"10:52\"\n }\n}\n"
},
"track": {
"Track": {
"title": {
"string": "Star Wars (Main Theme)"
},
"artist": {
"string": "John Williams, London Symphony Orchestra"
},
"album": {
"string": "Star Wars"
},
"duration": {
"string": "10:52"
}
}
}
}
Note: In the example above, we used kafkacat to consume the topics. The option -o-1 is used to consume only the latest message.
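As an alternative sketch, if you prefer the Confluent command-line tools, the same topic can be read with kafka-avro-console-consumer. The command below assumes the Schema Registry container from the docker-compose file is named schema-registry and that the broker is reachable as broker:29092 inside the Docker network:
$ docker exec -it schema-registry kafka-avro-console-consumer \
--bootstrap-server broker:29092 \
--topic tracks-filepulse-json-00 \
--from-beginning \
--property schema.registry.url=http://localhost:8081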
Excluding Field
The JSONFilter does not automatically delete the original field containing the raw JSON string (i.e., the message field). If you do not want to keep this field, you can remove it using the ExcludeFilter as follows:
$ curl \
-i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/tracks-json-filepulse-00/config \
-d '{
"connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
"fs.scan.directory.path":"/tmp/kafka-connect/examples/",
"fs.scan.interval.ms":"10000",
"fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
"file.filter.regex.pattern":".*\\.json$",
"task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.BytesArrayInputReader",
"offset.strategy":"name",
"topic":"tracks-filepulse-json-01",
"internal.kafka.reporter.bootstrap.servers": "broker:29092",
"internal.kafka.reporter.topic":"connect-file-pulse-status",
"fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.DeleteCleanupPolicy",
"filters": "ParseJSON, ExcludeFieldMessage",
"filters.ParseJSON.type":"io.streamthoughts.kafka.connect.filepulse.filter.JSONFilter",
"filters.ParseJSON.source":"message",
"filters.ParseJSON.merge":"true",
"filters.ExcludeFieldMessage.type":"io.streamthoughts.kafka.connect.filepulse.filter.ExcludeFilter",
"filters.ExcludeFieldMessage.fields":"message",
"tasks.max": 1
}'
Copy the JSON file to the Docker container as previously:
$ docker cp track.json \
connect://tmp/kafka-connect/examples/track-01.json
Then consume the output topic tracks-filepulse-json-01 by running:
$ docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
-b localhost:9092 \
-t tracks-filepulse-json-01 \
-C -J -q -o-1 -s key=s -s value=avro -r http://localhost:8081 | jq .payload.ConnectDefault
(output)
{
"track": {
"Track": {
"title": {
"string": "Star Wars (Main Theme)"
},
"artist": {
"string": "John Williams, London Symphony Orchestra"
},
"album": {
"string": "Star Wars"
},
"duration": {
"string": "10:52"
}
}
}
}
That's it! We have successfully produced a clean structured message similar to the one contained in our input file.
Now, let's go a step further.
Handling Null Values
Sometimes you may have to process JSON documents with null values. By default, with the configuration used so far, null values are ignored during serialization. The main reason for this is that the connector cannot infer the type of a field containing a null value.
However, we can combine the AppendFilter and the Simple Connect Expression Language (SCEL) to both define the type of a null field and set a default value.
Note: Simple Connect Expression Language (SCEL) is a basic expression language provided by the Connect FilePulse connector to access and manipulate record fields.
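For reference, here is a quick breakdown of the SCEL expression used in the configuration below (it is wrapped in {{ ... }} so that the AppendFilter evaluates it instead of treating it as a literal value):
# $value.track.rank                               accesses the rank field of the track object in the record value
# nlv($value.track.rank, 0)                       returns 0 when the field is null
# converts(nlv($value.track.rank, 0), 'INTEGER')  casts the result to the INTEGER type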
Let's update the connector configuration:
$ curl \
-i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/tracks-json-filepulse-02/config \
-d '{
"connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
"fs.scan.directory.path":"/tmp/kafka-connect/examples/",
"fs.scan.interval.ms":"10000",
"fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
"file.filter.regex.pattern":".*\\.json$",
"task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.BytesArrayInputReader",
"offset.strategy":"name",
"topic":"tracks-filepulse-json-02",
"internal.kafka.reporter.bootstrap.servers": "broker:29092",
"internal.kafka.reporter.topic":"connect-file-pulse-status",
"fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.DeleteCleanupPolicy",
"filters": "ParseJSON, ExcludeFieldMessage, SetDefaultRank",
"filters.ParseJSON.type":"io.streamthoughts.kafka.connect.filepulse.filter.JSONFilter",
"filters.ParseJSON.source":"message",
"filters.ParseJSON.merge":"true",
"filters.ExcludeFieldMessage.type":"io.streamthoughts.kafka.connect.filepulse.filter.ExcludeFilter",
"filters.ExcludeFieldMessage.fields":"message",
"filters.SetDefaultRank.type":"io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter",
"filters.SetDefaultRank.field":"$value.track.rank",
"filters.SetDefaultRank.value":"{{ converts(nlv($value.track.rank, 0), '\''INTEGER'\'') }}",
"filters.SetDefaultRank.overwrite": "true",
"tasks.max": 1
}'
Create a second JSON document with the following content:
$ cat <<EOF > track-with-null.json
{
"track": {
"title":"Duel of the Fates",
"artist":"John Williams, London Symphony Orchestra",
"album":"Star Wars",
"duration":"4:14",
"rank": null
}
}
EOF
Copy it to the Docker container as previously:
$ docker cp track-with-null.json \
connect://tmp/kafka-connect/examples/track-02.json
Next, consume the output topic tracks-filepulse-json-02:
$ docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
-b localhost:9092 \
-t tracks-filepulse-json-02 \
-C -J -q -o-1 -s key=s -s value=avro -r http://localhost:8081 | jq .payload.ConnectDefault
(output)
{
"track": {
"Track": {
"title": {
"string": "Duel of the Fates"
},
"artist": {
"string": "John Williams, London Symphony Orchestra"
},
"album": {
"string": "Star Wars"
},
"duration": {
"string": "4:14"
},
"rank": {
"int": 0
}
}
}
}
Finally, you should get an output message containing the field rank of type int, initialized with the default value 0.
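If you only want to check that field, you can adapt the jq filter at the end of the previous command, following the structure of the output above:
$ docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
-b localhost:9092 \
-t tracks-filepulse-json-02 \
-C -J -q -o-1 -s key=s -s value=avro -r http://localhost:8081 | jq .payload.ConnectDefault.track.Track.rank
(output)
{
  "int": 0
}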
Splitting a JSON Array
It is also common to have to process JSON files containing an array of JSON records. To produce one record per element in the array, you have to set the explode.array property of the JSONFilter to true.
Let's update the connector configuration with the following:
$ curl \
-i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/tracks-json-filepulse-00/config \
-d '{
"connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
"fs.scan.directory.path":"/tmp/kafka-connect/examples/",
"fs.scan.interval.ms":"10000",
"fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
"file.filter.regex.pattern":".*\\.json$",
"task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.BytesArrayInputReader",
"offset.strategy":"name",
"topic":"tracks-filepulse-json-03",
"internal.kafka.reporter.bootstrap.servers": "broker:29092",
"internal.kafka.reporter.topic":"connect-file-pulse-status",
"fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.DeleteCleanupPolicy",
"filters": "ParseJSON, ExcludeFieldMessage",
"filters.ParseJSON.type":"io.streamthoughts.kafka.connect.filepulse.filter.JSONFilter",
"filters.ParseJSON.source":"message",
"filters.ParseJSON.merge":"true",
"filters.ParseJSON.explode.array":"true",
"filters.ExcludeFieldMessage.type":"io.streamthoughts.kafka.connect.filepulse.filter.ExcludeFilter",
"filters.ExcludeFieldMessage.fields":"message",
"tasks.max": 1
}'
Create a file containing two JSON objects:
$ cat <<EOF > tracks.json
[
{
"track": {
"title": "Star Wars (Main Theme)",
"artist": "John Williams, London Symphony Orchestra",
"album": "Star Wars",
"duration": "10:52"
}
},
{
"track": {
"title": "Duel of the Fates",
"artist": "John Williams, London Symphony Orchestra",
"album": "Star Wars: The Phantom Menace (Original Motion Picture Soundtrack)",
"duration": "4:14"
}
}
]
EOF
Copy it to the Docker container as previously.
$ docker cp tracks.json \
connect://tmp/kafka-connect/examples/tracks-00.json
Then consume the output topic tracks-filepulse-json-03:
$ docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
-b localhost:9092 \
-t tracks-filepulse-json-03 \
-C -J -q -o0 -s key=s -s value=avro -r http://localhost:8081 | jq .payload.ConnectDefault
(output)
{
"track": {
"Track": {
"title": {
"string": "Star Wars (Main Theme)"
},
"artist": {
"string": "John Williams, London Symphony Orchestra"
},
"album": {
"string": "Star Wars"
},
"duration": {
"string": "10:52"
}
}
}
}
{
"track": {
"Track": {
"title": {
"string": "Duel of the Fates"
},
"artist": {
"string": "John Williams, London Symphony Orchestra"
},
"album": {
"string": "Star Wars: The Phantom Menace (Original Motion Picture Soundtrack)"
},
"duration": {
"string": "4:14"
}
}
}
}
And voilà! Now you know how to process JSON files with Kafka Connect.
Conclusion
In this article, we have seen that it is very easy to load records from JSON files into Apache Kafka using Kafka Connect, without writing a single line of code. The Connect FilePulse connector is a very powerful solution that allows you to easily manipulate your data before loading it into Apache Kafka.
Please, share this article if you like this project. You can even add a ⭐ to the GitHub repository to support us.
Thank you very much.