Loading CSV data into Kafka - video walkthrough


Robin Moffatt

Posted on June 23, 2020


For whatever reason, CSV still exists as a ubiquitous data interchange format. It doesn’t get much simpler: chuck some plain text with fields separated by commas into a file and stick .csv on the end. If you’re feeling helpful you can include a header row with field names in it.

order_id,customer_id,order_total_usd,make,model,delivery_city,delivery_company,delivery_address
1,535,190899.73,Dodge,Ram Wagon B350,Sheffield,DuBuque LLC,2810 Northland Avenue
2,671,33245.53,Volkswagen,Cabriolet,Edinburgh,Bechtelar-VonRueden,1 Macpherson Crossing

In this article we’ll see how to load this CSV data into Kafka without even needing to write any code.

Importantly, we’re not going to reinvent the wheel by trying to write some code to do it ourselves - Kafka Connect (which is part of Apache Kafka) already exists to do all of this for us; we just need the appropriate connector.

Schemas?

Yeah, schemas. CSV files might not care about them much, but the users of your data in Kafka will. Ideally we want a way to define the schema of the data that we ingest so that it can be stored and read by anyone who wants to use the data. To understand why this is such a big deal, check out:

If you are going to define a schema at ingest (and I hope you do), use Avro, Protobuf, or JSON Schema, as described here.
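The examples in this article end up writing Avro to Kafka, because the Kafka Connect worker in the accompanying Docker environment defaults to the Avro converter. If yours doesn’t, you can set it per connector by adding something like this to the connector config (the Schema Registry URL shown matches the Docker environment used here; adjust to taste):

        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://schema-registry:8081",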

Kafka Connect SpoolDir connector

The Kafka Connect SpoolDir connector supports various flat-file formats, including CSV. Get it from Confluent Hub, and check out the docs here. Once you’ve installed it in your Kafka Connect worker, make sure you restart the worker so that it picks the connector up.
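If your worker has the Confluent Hub client available, installation is a one-liner along these lines (pin a specific version rather than latest if you want repeatable builds):

confluent-hub install jcustenborder/kafka-connect-spooldir:latest

Once the worker has restarted, you can check that the connector plugins are available by running: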

$ curl -s localhost:8083/connector-plugins|jq '.[].class'|egrep 'SpoolDir'

"com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector"
"com.github.jcustenborder.kafka.connect.spooldir.SpoolDirJsonSourceConnector"
"com.github.jcustenborder.kafka.connect.spooldir.SpoolDirLineDelimitedSourceConnector"
"com.github.jcustenborder.kafka.connect.spooldir.SpoolDirSchemaLessJsonSourceConnector"
"com.github.jcustenborder.kafka.connect.spooldir.elf.SpoolDirELFSourceConnector"

Loading data from CSV into Kafka and applying a schema

If you have a header row with field names you can take advantage of these to define the schema at ingestion time (which is a good idea).

Create the connector:

curl -i -X PUT -H "Accept:application/json" \
    -H "Content-Type:application/json" http://localhost:8083/connectors/source-csv-spooldir-00/config \
    -d '{
        "connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
        "topic": "orders_spooldir_00",
        "input.path": "/data/unprocessed",
        "finished.path": "/data/processed",
        "error.path": "/data/error",
        "input.file.pattern": ".*\\.csv",
        "schema.generation.enabled":"true",
        "csv.first.row.as.header":"true"
        }'
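Before consuming anything, it’s worth checking that the connector and its task are actually running:

$ curl -s http://localhost:8083/connectors/source-csv-spooldir-00/status | \
    jq '.connector.state, .tasks[].state'

Both should report RUNNING; if a task has failed, the trace field in the same status output gives you the stack trace to work from.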

Now head over to a Kafka consumer and observe our data. Here I’m using kafkacat cos it’s great :)

$ docker exec kafkacat \
    kafkacat -b kafka:29092 -t orders_spooldir_00 \
             -C -o-1 -J \
             -s key=s -s value=avro -r http://schema-registry:8081 | \
             jq '.payload'
{
  "order_id": {
    "string": "500"
  },
  "customer_id": {
    "string": "424"
  },
  "order_total_usd": {
    "string": "160312.42"
  },
  "make": {
    "string": "Chevrolet"
  },
  "model": {
    "string": "Suburban 1500"
  },
  "delivery_city": {
    "string": "London"
  },
  "delivery_company": {
    "string": "Predovic LLC"
  },
  "delivery_address": {
    "string": "2 Sundown Drive"
  }
}

What’s more, the headers of the Kafka message carry metadata from the file itself:

$ docker exec kafkacat \
    kafkacat -b kafka:29092 -t orders_spooldir_00 \
             -C -o-1 -J \
             -s key=s -s value=avro -r http://schema-registry:8081 | \
             jq '.headers'
[
  "file.name",
  "orders.csv",
  "file.path",
  "/data/unprocessed/orders.csv",
  "file.length",
  "39102",
  "file.offset",
  "501",
  "file.last.modified",
  "2020-06-17T13:33:50.000Z"
]

Setting the message key

Assuming you have a header row to provide field names, you can set schema.generation.key.fields to the name of the field(s) you’d like to use for the Kafka message key. If you’re running this after the first example above, remember that the connector relocates your file once it’s processed, so you need to move it back to the input.path location for it to be processed again.
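On the Kafka Connect worker’s filesystem (inside the container, if you’re following along with the Docker environment) that looks something like this, assuming the orders.csv file used above:

mv /data/processed/orders.csv /data/unprocessed/

Then create the connector: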

curl -i -X PUT -H "Accept:application/json" \
    -H "Content-Type:application/json" http://localhost:8083/connectors/source-csv-spooldir-01/config \
    -d '{
        "connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
        "topic": "orders_spooldir_01",
        "input.path": "/data/unprocessed",
        "finished.path": "/data/processed",
        "error.path": "/data/error",
        "input.file.pattern": ".*\\.csv",
        "schema.generation.enabled":"true",
        "schema.generation.key.fields":"order_id",
        "csv.first.row.as.header":"true"
        }'

The resulting Kafka message has the order_id set as the message key:

docker exec kafkacat \
    kafkacat -b kafka:29092 -t orders_spooldir_01 -o-1 \
             -C -J \
             -s key=s -s value=avro -r http://schema-registry:8081 | \
             jq '{"key":.key,"payload": .payload}'
{
  "key": "Struct{order_id=3}",
  "payload": {
    "order_id": {
      "string": "3"
    },
    "customer_id": {
      "string": "695"
    },
    "order_total_usd": {
      "string": "155664.90"
    },
    "make": {
      "string": "Toyota"
    },
    "model": {
      "string": "Avalon"
    },
    "delivery_city": {
      "string": "Brighton"
    },
    "delivery_company": {
      "string": "Jacobs, Ebert and Dooley"
    },
    "delivery_address": {
      "string": "4 Loomis Crossing"
    }
  }
}

Changing the schema field types

The connector does a fair job of inferring the schema, but maybe you want to override it. You can declare the whole thing upfront using the value.schema configuration, but perhaps you’re happy with it inferring the whole schema except for a couple of fields. Here you can use a Single Message Transform (SMT) to munge it:

curl -i -X PUT -H "Accept:application/json" \
    -H "Content-Type:application/json" http://localhost:8083/connectors/source-csv-spooldir-02/config \
    -d '{
        "connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
        "topic": "orders_spooldir_02",
        "input.path": "/data/unprocessed",
        "finished.path": "/data/processed",
        "error.path": "/data/error",
        "input.file.pattern": ".*\\.csv",
        "schema.generation.enabled":"true",
        "schema.generation.key.fields":"order_id",
        "csv.first.row.as.header":"true",
        "transforms":"castTypes",
        "transforms.castTypes.type":"org.apache.kafka.connect.transforms.Cast$Value",
        "transforms.castTypes.spec":"order_id:int32,customer_id:int32,order_total_usd:float32"
        }'

If you go and look at the schema that’s been created and stored in the Schema Registry, you can see the field data types have been set as specified:

➜ curl --silent --location --request GET 'http://localhost:8081/subjects/orders_spooldir_02-value/versions/latest' |jq '.schema|fromjson'
{
  "type": "record", "name": "Value", "namespace": "com.github.jcustenborder.kafka.connect.model",
  "fields": [
    { "name": "order_id", "type": ["null", "int"], "default": null },
    { "name": "customer_id", "type": ["null", "int"], "default": null },
    { "name": "order_total_usd", "type": ["null", "float"], "default": null },
    { "name": "make", "type": ["null", "string"], "default": null },
    { "name": "model", "type": ["null", "string"], "default": null },
    { "name": "delivery_city", "type": ["null", "string"], "default": null },
    { "name": "delivery_company", "type": ["null", "string"], "default": null },
    { "name": "delivery_address", "type": ["null", "string"], "default": null }
  ],
  "connect.name": "com.github.jcustenborder.kafka.connect.model.Value"
}

Just gimme the plain text! 😢

All of this schema stuff seems like a bunch of fuss really, doesn’t it? Well, not really. But if you absolutely must have just CSV in your Kafka topic, then here’s how. Note that we’re using a different connector class and we’re using org.apache.kafka.connect.storage.StringConverter to write the values. If you want to learn more about serialisers and converters, see here.

curl -i -X PUT -H "Accept:application/json" \
    -H "Content-Type:application/json" http://localhost:8083/connectors/source-csv-spooldir-03/config \
    -d '{
        "connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirLineDelimitedSourceConnector",
        "value.converter":"org.apache.kafka.connect.storage.StringConverter",
        "topic": "orders_spooldir_03",
        "input.path": "/data/unprocessed",
        "finished.path": "/data/processed",
        "error.path": "/data/error",
        "input.file.pattern": ".*\\.csv"
        }'

The result? Just CSV.

➜ docker exec kafkacat \
    kafkacat -b kafka:29092 -t orders_spooldir_03 -o-5 -C -u -q
496,456,80466.80,Volkswagen,Touareg,Leeds,Hilpert-Williamson,96 Stang Junction
497,210,57743.67,Dodge,Neon,London,Christiansen Group,7442 Algoma Hill
498,88,211171.02,Nissan,370Z,York,"King, Yundt and Skiles",3 1st Plaza
499,343,126072.73,Chevrolet,Camaro,Sheffield,"Schiller, Ankunding and Schumm",8920 Hoffman Place
500,424,160312.42,Chevrolet,Suburban 1500,London,Predovic LLC,2 Sundown Drive

Side-bar: Schemas in action

So we’ve read some CSV data into Kafka. That’s not the end of its journey. It’s going to be used for something! Let’s do that.

Here’s ksqlDB, in which we declare the orders topic we wrote to with a schema as a stream:

ksql> CREATE STREAM ORDERS_02 WITH (KAFKA_TOPIC='orders_spooldir_02',VALUE_FORMAT='AVRO');

 Message
----------------
 Stream created
----------------

Having done that—and because there’s a schema that was created at ingestion time—we can see all of the fields available to us:

ksql> DESCRIBE ORDERS_02;

Name : ORDERS_02
 Field            | Type
-------------------------------------------
 ROWKEY           | VARCHAR(STRING)  (key)
 ORDER_ID         | INTEGER
 CUSTOMER_ID      | INTEGER
 ORDER_TOTAL_USD  | DOUBLE
 MAKE             | VARCHAR(STRING)
 MODEL            | VARCHAR(STRING)
 DELIVERY_CITY    | VARCHAR(STRING)
 DELIVERY_COMPANY | VARCHAR(STRING)
 DELIVERY_ADDRESS | VARCHAR(STRING)
-------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
ksql>

and run queries against the data that’s in Kafka:

ksql> SELECT DELIVERY_CITY, COUNT(*) AS ORDER_COUNT, MAX(CAST(ORDER_TOTAL_USD AS DECIMAL(9,2))) AS BIGGEST_ORDER_USD FROM ORDERS_02 GROUP BY DELIVERY_CITY EMIT CHANGES;
+---------------+-------------+---------------------+
|DELIVERY_CITY  |ORDER_COUNT  |BIGGEST_ORDER_USD    |
+---------------+-------------+---------------------+
|Bradford       |13           |189924.47            |
|Edinburgh      |13           |199502.66            |
|Bristol        |16           |213830.34            |
|Sheffield      |74           |216233.98            |
|London         |160          |219736.06            |

What about our data that we just ingested into a different topic as straight-up CSV? Because, like, schemas aren’t important?

ksql> CREATE STREAM ORDERS_03 WITH (KAFKA_TOPIC='orders_spooldir_03',VALUE_FORMAT='DELIMITED');
No columns supplied.

Yeah, no columns supplied. No schema, no bueno. If you want to work with the data, whether that’s querying it in SQL, streaming it to a data lake, or doing anything else with it, at some point you’re going to have to declare that schema. That’s why CSV, as a schemaless serialisation format, is a bad way to exchange data between systems.

If you really want to use your CSV data in ksqlDB, you can; you just need to enter the schema by hand, which is error-prone and tedious. You enter it each time you use the data, and every other consumer of the data has to enter it each time too. Declaring it once at ingest, so that it’s available for everyone to use, makes a lot more sense.
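For the record, that manual declaration would look something like this (column names taken from the CSV header, with the types we’d like them to have):

ksql> CREATE STREAM ORDERS_03 (ORDER_ID INT,
                               CUSTOMER_ID INT,
                               ORDER_TOTAL_USD DOUBLE,
                               MAKE VARCHAR,
                               MODEL VARCHAR,
                               DELIVERY_CITY VARCHAR,
                               DELIVERY_COMPANY VARCHAR,
                               DELIVERY_ADDRESS VARCHAR)
        WITH (KAFKA_TOPIC='orders_spooldir_03', VALUE_FORMAT='DELIMITED');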

Regex and JSON

If you’re using the REST API to submit configuration, you might run up against errors sending regex values within the JSON. For example, if you want to set input.file.pattern to .*\.csv and you put that in your JSON literally:

    "input.file.pattern": ".*\.csv",

You’ll get this error back if you submit it as inline data with curl:

com.fasterxml.jackson.core.JsonParseException: Unrecognized character escape '.' (code 46) at [Source: (org.glassfish.jersey.message.internal.ReaderInterceptorExecutor$UnCloseableInputStream); line: 7, column: 36]

The solution is to escape the escape character (the backslash):

    "input.file.pattern": ".*\\.csv",
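As an aside, the same escaping is needed if you keep the connector config in a JSON file and pass it to curl instead of inlining it, which some folk prefer anyway (spooldir.json here is just a hypothetical filename):

curl -i -X PUT -H "Content-Type:application/json" \
    http://localhost:8083/connectors/source-csv-spooldir-00/config \
    -d @spooldir.json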

Streaming CSV data from Kafka to a database (or anywhere else…)

Since you’ve got a schema for the data, you can easily sink it to a database such as Postgres:

curl -X PUT http://localhost:8083/connectors/sink-postgres-orders-00/config \
    -H "Content-Type: application/json" \
    -d '{
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url": "jdbc:postgresql://postgres:5432/",
        "connection.user": "postgres",
        "connection.password": "postgres",
        "tasks.max": "1",
        "topics": "orders_spooldir_02",
        "auto.create": "true",
        "auto.evolve":"true",
        "pk.mode":"record_value",
        "pk.fields":"order_id",
        "insert.mode": "upsert",
        "table.name.format":"orders"
    }'
Over in Postgres, the target table has been created automatically (thanks to "auto.create": "true" in the sink config) and populated:

postgres=# \dt
         List of relations
 Schema |  Name  | Type  |  Owner
--------+--------+-------+----------
 public | orders | table | postgres
(1 row)

postgres=# \d orders;
                    Table "public.orders"
      Column      |  Type   | Collation | Nullable | Default
------------------+---------+-----------+----------+---------
 order_id         | integer |           | not null |
 customer_id      | integer |           |          |
 order_total_usd  | real    |           |          |
 make             | text    |           |          |
 model            | text    |           |          |
 delivery_city    | text    |           |          |
 delivery_company | text    |           |          |
 delivery_address | text    |           |          |
Indexes:
    "orders_pkey" PRIMARY KEY, btree (order_id)

postgres=# SELECT * FROM orders FETCH FIRST 10 ROWS ONLY;
 order_id | customer_id | order_total_usd |    make    |     model      | delivery_city |     delivery_company     |     delivery_address
----------+-------------+-----------------+------------+----------------+---------------+--------------------------+--------------------------
        1 |         535 |       190899.73 | Dodge      | Ram Wagon B350 | Sheffield     | DuBuque LLC              | 2810 Northland Avenue
        2 |         671 |        33245.53 | Volkswagen | Cabriolet      | Edinburgh     | Bechtelar-VonRueden      | 1 Macpherson Crossing
        3 |         695 |        155664.9 | Toyota     | Avalon         | Brighton      | Jacobs, Ebert and Dooley | 4 Loomis Crossing
        4 |         366 |        149012.9 | Hyundai    | Santa Fe       | Leeds         | Kiehn Group              | 538 Burning Wood Alley
        5 |         175 |        63274.18 | Kia        | Sportage       | Leeds         | Miller-Hudson            | 6 Kennedy Court
        6 |          37 |        97790.04 | BMW        | 3 Series       | Bristol       | Price Group              | 21611 Morning Trail
        7 |         644 |        76240.84 | Mazda      | MPV            | Leeds         | Kihn and Sons            | 9 Susan Street
        8 |         973 |       216233.98 | Hyundai    | Elantra        | Sheffield     | Feeney, Howe and Koss    | 07671 Hazelcrest Terrace
        9 |         463 |        162589.1 | Chrysler   | Grand Voyager  | York          | Fay, Murazik and Schumm  | 42080 Pawling Circle
       10 |         863 |       111208.24 | Ford       | Laser          | Leeds         | Boehm, Mohr and Doyle    | 0919 International Trail
(10 rows)

To learn more about writing data from Kafka to a database see this tutorial.

For more tutorials on Kafka Connect see 🎥 this playlist.

Try it out!

All the code for this article is on GitHub, and you just need Docker and Docker Compose to spin it up and give it a try. The command-line examples in this article are based on this Docker environment.

To spin it up, clone the repository, change to the correct folder, and launch the stack:

git clone https://github.com/confluentinc/demo-scene.git
cd csv-to-kafka
docker-compose up -d

Wait for Kafka Connect to launch and then off you go!

bash -c ' \
echo -e "\n\n=============\nWaiting for Kafka Connect to start listening on localhost ⏳\n=============\n"
while [ $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) -ne 200 ] ; do
  echo -e "\t" $(date) " Kafka Connect listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) " (waiting for 200)"
  sleep 5
done
echo -e $(date) "\n\n--------------\n\o/ Kafka Connect is ready! Listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) "\n--------------\n"
'

The examples in this article are based on the data folder mapped to /data on the Kafka Connect worker.
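In the Docker Compose file that mapping is just a volume on the Kafka Connect container, conceptually something like this (a sketch rather than the literal file; the service name is illustrative):

  kafka-connect:
    volumes:
      - ./data:/data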

