Streaming data into Kafka S01/E03 - Loading JSON file

fhussonnois

Florian Hussonnois

Posted on September 10, 2020


This is the third article in the "Streaming data into Kafka" series. In the first two, we saw how easy it is to use Kafka Connect to load records from CSV and XML files into Apache Kafka without writing a single line of code. To do this, we used the Kafka Connect FilePulse connector, which comes packed with a lot of nice features to parse and transform data.

Now, let's see how to integrate JSON data, another file format that is widely used on most projects (and much more appreciated than XML for web-based applications).

Kafka Connect File Pulse connector

If you have already read the previous articles, go directly to the next section (i.e., Ingesting Data).

The Kafka Connect FilePulse connector is a powerful source connector that makes it easy to parse, transform, and load data from the local file system into Apache Kafka. It offers built-in support for various file formats (e.g., CSV, XML, JSON, LOG4J, AVRO).

For a broad overview of FilePulse, I suggest you read this article:

For more information, you can check out the documentation here.

How to use the connector

The easiest and fastest way to get started with the Kafka Connect FilePulse connector is to use the Docker image available on Docker Hub.

$ docker pull streamthoughts/kafka-connect-file-pulse:1.6.3

You can download the docker-compose.yml file available on the GitHub project repository to quickly start a Confluent Platform with Kafka Connect and the FilePulse connector pre-installed.

$ wget https://raw.githubusercontent.com/streamthoughts/kafka-connect-file-pulse/v1.6.3/docker-compose.yml
$ docker-compose up -d

Once all Docker containers are started, you can check that the connector is installed on the Kafka Connect worker accessible on http://localhost:8083.

$ curl -s localhost:8083/connector-plugins|jq '.[].class'|egrep FilePulse

"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector"

Note: You can also install the connector from either the GitHub Releases page or Confluent Hub.

Ingesting Data

To read a file containing a single JSON document, we will use the BytesArrayInputReader. This reader creates a single record per source file. Each record it produces has a single field of type byte[] named message, whose value is the full content of the source file (i.e., the JSON document).

Then, to parse this field, we will use the processing filter mechanism provided by the FilePulse connector and more particularly the JSONFilter.
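
To make the two steps concrete, here is a minimal Python sketch of the idea. It is only a model of the behaviour described above, not the connector's code: the function names are hypothetical, and only the merge=true case used in this article is modelled.

```python
import json

def read_whole_file(content: bytes) -> dict:
    """Model of the reader: one record per file, raw bytes under 'message'."""
    return {"message": content}

def parse_json_field(record: dict, source: str = "message") -> dict:
    """Model of the filter with merge=true: parse record[source] as JSON
    and add the parsed fields next to the existing ones."""
    parsed = json.loads(record[source])
    return {**record, **parsed}

raw = b'{"track": {"title": "Star Wars (Main Theme)", "duration": "10:52"}}'
record = parse_json_field(read_whole_file(raw))

# The raw field is still there, next to the parsed one.
print(sorted(record))  # ['message', 'track']
```

Note that the original message field survives the parsing step, which matches the output we will get from the connector.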

So let's create the connector with this minimal configuration:

$ curl \
    -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/tracks-json-filepulse-00/config \
    -d '{
        "connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
        "fs.scan.directory.path":"/tmp/kafka-connect/examples/",
        "fs.scan.interval.ms":"10000",
        "fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
        "file.filter.regex.pattern":".*\\.json$",
        "task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.BytesArrayInputReader",
        "offset.strategy":"name",
        "topic":"tracks-filepulse-json-00",
        "internal.kafka.reporter.bootstrap.servers": "broker:29092",
        "internal.kafka.reporter.topic":"connect-file-pulse-status",
        "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.DeleteCleanupPolicy",
        "filters": "ParseJSON",
        "filters.ParseJSON.type":"io.streamthoughts.kafka.connect.filepulse.filter.JSONFilter",
        "filters.ParseJSON.source":"message",
        "filters.ParseJSON.merge":"true",
        "tasks.max": 1
    }'

Note: The Connect FilePulse connector periodically scans the input directory set through the fs.scan.directory.path property, then looks for files matching the pattern .*\\.json$. Each file is uniquely identified and tracked according to the value of offset.strategy. Here, the configuration specifies that a file is identified by its name.
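
If you are unsure which files the pattern will pick up, you can try it quickly outside the connector. The sketch below uses Python's re module, whereas the connector runs on the JVM; for a pattern this simple the two regex dialects should agree, but treat that as an assumption. The file names are made up for the example.

```python
import re

# The pattern from file.filter.regex.pattern, once JSON unescaping
# has turned "\\." into "\."
pattern = re.compile(r".*\.json$")

# Hypothetical directory listing
candidates = ["track-00.json", "tracks.json", "track.json.bak", "notes.txt", "trackjson"]

matched = [name for name in candidates if pattern.match(name)]
print(matched)  # ['track-00.json', 'tracks.json']
```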

Create a valid JSON file that looks like this:

$ cat <<EOF > track.json
{ 
  "track": {
     "title":"Star Wars (Main Theme)",
     "artist":"John Williams, London Symphony Orchestra",
     "album":"Star Wars",
     "duration":"10:52"
  }
}
EOF

Then copy this file from your host to the Docker container which runs the connector. You can run the following commands:

# Create the target directory
$ docker exec -it connect mkdir -p /tmp/kafka-connect/examples

# Copy host file to docker-container
$ docker cp track.json connect://tmp/kafka-connect/examples/track-00.json

Finally, consume the topic named tracks-filepulse-json-00 and verify that the connector has detected and processed the JSON file:

$ docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
        -b localhost:9092 \
        -t tracks-filepulse-json-00 \
        -C -J -q -o-1 -s key=s -s value=avro -r http://localhost:8081 | jq .payload.ConnectDefault

(output)

{
  "message": {
    "bytes": "{ \n  \"track\": {\n     \"title\":\"Star Wars (Main Theme)\",\n     \"artist\":\"John Williams, London Symphony Orchestra\",\n     \"album\":\"Star Wars\",\n     \"duration\":\"10:52\"\n  }\n}\n"
  },
  "track": {
    "Track": {
      "title": {
        "string": "Star Wars (Main Theme)"
      },
      "artist": {
        "string": "John Williams, London Symphony Orchestra"
      },
      "album": {
        "string": "Star Wars"
      },
      "duration": {
        "string": "10:52"
      }
    }
  }
}

Note: In the example above, we used kafkacat to consume the topic. The option -o-1 is used to consume only the latest message.
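
The extra "Track" and "string" levels in the output above appear to come from the way Avro values are rendered as JSON, where a value belonging to a union type is wrapped in an object keyed by its branch name. If you want to compare the result with the input file, a small heuristic can strip these wrappers. This is only a sketch: the unwrap function is hypothetical, and the branch names are assumptions taken from the output shown above (a genuine field named "string" would be unwrapped by mistake).

```python
def unwrap(node, branches):
    """Recursively drop single-key wrappers whose key is a known branch name."""
    if isinstance(node, dict):
        if len(node) == 1:
            (key, value), = node.items()
            if key in branches:
                return unwrap(value, branches)
        return {k: unwrap(v, branches) for k, v in node.items()}
    return node

payload = {
    "track": {
        "Track": {
            "title": {"string": "Star Wars (Main Theme)"},
            "duration": {"string": "10:52"},
        }
    }
}

# Branch names are assumptions based on the kafkacat output.
plain = unwrap(payload, {"Track", "string", "bytes", "int"})
print(plain)
# {'track': {'title': 'Star Wars (Main Theme)', 'duration': '10:52'}}
```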

Excluding Field

The JSONFilter does not automatically delete the original field containing the raw JSON string (i.e. the message). If you do not want to keep this field, you can remove it using the ExcludeFilter as follows:

$ curl \
    -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/tracks-json-filepulse-00/config \
    -d '{
        "connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
        "fs.scan.directory.path":"/tmp/kafka-connect/examples/",
        "fs.scan.interval.ms":"10000",
        "fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
        "file.filter.regex.pattern":".*\\.json$",
        "task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.BytesArrayInputReader",
        "offset.strategy":"name",
        "topic":"tracks-filepulse-json-01",
        "internal.kafka.reporter.bootstrap.servers": "broker:29092",
        "internal.kafka.reporter.topic":"connect-file-pulse-status",
        "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.DeleteCleanupPolicy",
        "filters": "ParseJSON, ExcludeFieldMessage",
        "filters.ParseJSON.type":"io.streamthoughts.kafka.connect.filepulse.filter.JSONFilter",
        "filters.ParseJSON.source":"message",
        "filters.ParseJSON.merge":"true",
        "filters.ExcludeFieldMessage.type":"io.streamthoughts.kafka.connect.filepulse.filter.ExcludeFilter",
        "filters.ExcludeFieldMessage.fields":"message",
        "tasks.max": 1
    }'

Copy the JSON file to the Docker container as previously:

$ docker cp track.json \
connect://tmp/kafka-connect/examples/track-01.json

Then consume the output topic tracks-filepulse-json-01 by running:

$ docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
        -b localhost:9092 \
        -t tracks-filepulse-json-01 \
        -C -J -q -o-1 -s key=s -s value=avro -r http://localhost:8081 | jq .payload.ConnectDefault

(output)

{
  "track": {
    "Track": {
      "title": {
        "string": "Star Wars (Main Theme)"
      },
      "artist": {
        "string": "John Williams, London Symphony Orchestra"
      },
      "album": {
        "string": "Star Wars"
      },
      "duration": {
        "string": "10:52"
      }
    }
  }
}

That's it! We have successfully produced a clean structured message similar to the one contained in our input file.
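
Conceptually, the two filters form a small pipeline. The following Python sketch models it under the assumption that filters run in the order they are listed in the filters property; the function names are hypothetical and stand in for the JSONFilter and the ExcludeFilter.

```python
import json
from functools import reduce

def parse_json(record: dict) -> dict:
    """Stand-in for ParseJSON: merge the parsed document into the record."""
    return {**record, **json.loads(record["message"])}

def exclude_message(record: dict) -> dict:
    """Stand-in for ExcludeFieldMessage: drop the raw 'message' field."""
    return {k: v for k, v in record.items() if k != "message"}

def run_chain(record: dict, filters: list) -> dict:
    """Apply each filter to the output of the previous one."""
    return reduce(lambda rec, f: f(rec), filters, record)

source = {"message": b'{"track": {"title": "Star Wars (Main Theme)"}}'}
result = run_chain(source, [parse_json, exclude_message])
print(result)  # {'track': {'title': 'Star Wars (Main Theme)'}}
```

In this model the order matters: dropping message before parsing it would leave nothing to parse.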

Now, let's go a step further.

Handling Null values

Sometimes you may have to process JSON documents with null values. By default, if we take the configuration used so far, the null values will be ignored during the serialization.

The main reason for this is that the connector cannot infer the type of a field containing a null value.

However, we can combine the AppendFilter and the Simple Connect Expression Language (SCEL) to both declare the type of a null value and set a default value.

Note: Simple Connect Expression Language (SCEL) is a basic expression language provided by the Connect FilePulse connector to access and manipulate record fields.
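
The expression we are going to use combines two functions, nlv and converts. Here is a rough Python model of what such a combination does: replace a null with a default, then convert the result to a declared type. The helper names mirror the expression but are hypothetical, and the type table is illustrative (only INTEGER is used in this article).

```python
def nlv(value, default):
    """Return `default` when `value` is None, otherwise `value`."""
    return default if value is None else value

# Illustrative type table; only INTEGER appears in this article.
CONVERTERS = {"INTEGER": int, "STRING": str}

def converts(value, type_name: str):
    """Convert `value` to the type designated by `type_name`."""
    return CONVERTERS[type_name](value)

track = {"title": "Duel of the Fates", "rank": None}
track["rank"] = converts(nlv(track["rank"], 0), "INTEGER")
print(track["rank"])  # 0
```

A non-null value is left alone by nlv and only goes through the conversion.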

Let's update the connector configuration:

$ curl \
    -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/tracks-json-filepulse-02/config \
    -d '{
        "connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
        "fs.scan.directory.path":"/tmp/kafka-connect/examples/",
        "fs.scan.interval.ms":"10000",
        "fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
        "file.filter.regex.pattern":".*\\.json$",
        "task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.BytesArrayInputReader",
        "offset.strategy":"name",
        "topic":"tracks-filepulse-json-02",
        "internal.kafka.reporter.bootstrap.servers": "broker:29092",
        "internal.kafka.reporter.topic":"connect-file-pulse-status",
        "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.DeleteCleanupPolicy",
        "filters": "ParseJSON, ExcludeFieldMessage, SetDefaultRank",
        "filters.ParseJSON.type":"io.streamthoughts.kafka.connect.filepulse.filter.JSONFilter",
        "filters.ParseJSON.source":"message",
        "filters.ParseJSON.merge":"true",
        "filters.ExcludeFieldMessage.type":"io.streamthoughts.kafka.connect.filepulse.filter.ExcludeFilter",
        "filters.ExcludeFieldMessage.fields":"message",
        "filters.SetDefaultRank.type":"io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter",
        "filters.SetDefaultRank.field":"$value.track.rank",
        "filters.SetDefaultRank.value":"{{ converts(nlv($value.track.rank, 0), '\''INTEGER'\'') }}",
        "filters.SetDefaultRank.overwrite": "true",
        "tasks.max": 1
    }'
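
A word on the odd-looking '\''INTEGER'\'' sequence in the command above: the whole JSON body is wrapped in single quotes for the shell, so each literal single quote has to close the string, add an escaped quote, and reopen the string. The sketch below uses Python's shlex module, which follows POSIX shell quoting rules, to show what the connector actually receives. The echo command and the shortened JSON body are placeholders for the example.

```python
import json
import shlex

# A shortened version of the curl body, quoted the same way.
command = r"""echo '{"value": "{{ converts(nlv($value.track.rank, 0), '\''INTEGER'\'') }}"}'"""

argv = shlex.split(command)
body = json.loads(argv[1])

print(body["value"])
# {{ converts(nlv($value.track.rank, 0), 'INTEGER') }}
```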

Create a second JSON document with the following content:

$ cat <<EOF > track-with-null.json
{ 
  "track": {
     "title":"Duel of the Fates",
     "artist":"John Williams, London Symphony Orchestra",
     "album":"Star Wars",
     "duration":"4:14",
     "rank": null
  }
}
EOF

Copy it to the Docker container as previously:

$ docker cp track-with-null.json \
connect://tmp/kafka-connect/examples/track-02.json

Next, consume the output topic tracks-filepulse-json-02:

$ docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
        -b localhost:9092 \
        -t tracks-filepulse-json-02 \
        -C -J -q -o-1 -s key=s -s value=avro -r http://localhost:8081 | jq .payload.ConnectDefault

(output)

{
  "track": {
    "Track": {
      "title": {
        "string": "Duel of the Fates"
      },
      "artist": {
        "string": "John Williams, London Symphony Orchestra"
      },
      "album": {
        "string": "Star Wars"
      },
      "duration": {
        "string": "4:14"
      },
      "rank": {
        "int": 0
      }
    }
  }
}

You should get an output message containing the field rank of type int, initialized with the default value 0.

Splitting JSON Array

Finally, it's also common to have to process JSON files containing a JSON array of records.

To produce one record per element in the array, you have to set the explode.array property of the JSONFilter to true.
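
In other words, a single input record carrying an array becomes several output records. A minimal Python model of that behaviour, with a hypothetical function name and no claim to match the filter's internals, could look like this:

```python
import json

def parse_and_explode(record: dict, source: str = "message") -> list:
    """Parse record[source]; emit one merged record per array element
    (or a single record when the document is not an array)."""
    parsed = json.loads(record[source])
    elements = parsed if isinstance(parsed, list) else [parsed]
    return [{**record, **element} for element in elements]

raw = b'[{"track": {"title": "Star Wars (Main Theme)"}}, {"track": {"title": "Duel of the Fates"}}]'
records = parse_and_explode({"message": raw})

print(len(records))  # 2
print([r["track"]["title"] for r in records])
# ['Star Wars (Main Theme)', 'Duel of the Fates']
```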

Let's update the connector configuration with the following:

$ curl \
    -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/tracks-json-filepulse-00/config \
    -d '{
        "connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
        "fs.scan.directory.path":"/tmp/kafka-connect/examples/",
        "fs.scan.interval.ms":"10000",
        "fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
        "file.filter.regex.pattern":".*\\.json$",
        "task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.BytesArrayInputReader",
        "offset.strategy":"name",
        "topic":"tracks-filepulse-json-03",
        "internal.kafka.reporter.bootstrap.servers": "broker:29092",
        "internal.kafka.reporter.topic":"connect-file-pulse-status",
        "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.DeleteCleanupPolicy",
        "filters": "ParseJSON, ExcludeFieldMessage",
        "filters.ParseJSON.type":"io.streamthoughts.kafka.connect.filepulse.filter.JSONFilter",
        "filters.ParseJSON.source":"message",
        "filters.ParseJSON.merge":"true",
        "filters.ParseJSON.explode.array":"true",
        "filters.ExcludeFieldMessage.type":"io.streamthoughts.kafka.connect.filepulse.filter.ExcludeFilter",
        "filters.ExcludeFieldMessage.fields":"message",
        "tasks.max": 1
    }'

Create a file containing two JSON objects:

$ cat <<EOF > tracks.json
[
  {
    "track": {
      "title": "Star Wars (Main Theme)",
      "artist":  "John Williams, London Symphony Orchestra",
      "album": "Star Wars",
      "duration": "10:52"
    }
  },
  {
    "track": {
      "title":  "Duel of the Fates",
      "artist": "John Williams, London Symphony Orchestra",
      "album": "Star Wars: The Phantom Menace (Original Motion Picture Soundtrack)",
      "duration": "4:14"
    }
  }
]
EOF

Copy it to the Docker container as previously:

$ docker cp tracks.json \
connect://tmp/kafka-connect/examples/tracks-00.json

Then consume the output topic tracks-filepulse-json-03:

$ docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
        -b localhost:9092 \
        -t tracks-filepulse-json-03 \
        -C -J -q -o0 -s key=s -s value=avro -r http://localhost:8081 | jq .payload.ConnectDefault

(output)

{
  "track": {
    "Track": {
      "title": {
        "string": "Star Wars (Main Theme)"
      },
      "artist": {
        "string": "John Williams, London Symphony Orchestra"
      },
      "album": {
        "string": "Star Wars"
      },
      "duration": {
        "string": "10:52"
      }
    }
  }
}
{
  "track": {
    "Track": {
      "title": {
        "string": "Duel of the Fates"
      },
      "artist": {
        "string": "John Williams, London Symphony Orchestra"
      },
      "album": {
        "string": "Star Wars: The Phantom Menace (Original Motion Picture Soundtrack)"
      },
      "duration": {
        "string": "4:14"
      }
    }
  }
}

And voilà! Now you know how to process JSON files with Kafka Connect.

Conclusion

We have seen in this article that it is very easy to load records from JSON files into Apache Kafka without writing a single line of code using Kafka Connect. The Connect File Pulse connector is a very powerful solution that allows you to easily manipulate your data before loading it into Apache Kafka.

Please share this article if you like this project. You can even add a ⭐ to the GitHub repository to support us.

Thank you very much.
