Jorge PM
Posted on February 19, 2021
Index
1. Introduction
This tutorial will guide you through how to setup up all that you need to stream changes happening to a postgres database table using kafka. There are a bunch of similar tutorials out there but I prepared this one to be very simple to setup (literally a single command) and bare bones, just what you need to get you started.
The idea is to capture all the changes (additions, deletions and updates) and stream them so you can do whatever you want with them, for example: archive them, modify them and add them to another table, analyse them etc.
We achieve this using a technique called Change Data Capture (CDC). We use our database (in this case PostgreSQL) ability to replicate to capture the changes through pgoutput, postgres' standard logical decoding output plug-in.
Once we have captured the changes we pass them to kafka using debezium's connector. It is debezium that implementes CDC in this particular architecture.
For more details, read this https://debezium.io/documentation/reference/1.4/connectors/postgresql.html
Once in kafka, the changes will be stored in a topic that we can read with a number of kafka connectors. We can change our offset (the number of the last message read) so we can read all the changes from the start, or re-start reading where we left it (after a crash or if you are not reading all the time).
2. Environment preparation and system setup
The only requirement for this to work is to have Docker (correctly authenticated) and docker-compose fully installed in your system.
To make it easy, you can clone this repository that I prepared. We will go through all the files so there's not (too much) magic involved in making this work.
https://github.com/zom-pro/kafka-cdc
The main file we are going to use is docker-compose.yml
. This file control what the docker-compose up
command does.
It's a normal docker-compose file but there are a couple of interesting things to note. First of all, we have postgres (our database from where we want to capture changes), zookeper (required by kafka. It allows kafka to run as a multi-node distributed system), kafka (the streaming platform we will use to store and stream the changes in our database), connect (debezium source connector that allows connecting the database to kakfa). This connector is built out of a "context", a folder where more information about how to build this service can be found. This particular implementation is recommended by debezium. They have a very comprehensive documentation (link at the beginning) so I won't go into more details.
The original code for the connector context can be found here: https://github.com/debezium/debezium-examples/tree/master/end-to-end-demo/debezium-jdbc
The link
sections show the required connections between components.
Another interesting detail is how postgres can take an init.sql
file to build our database. This means that by the time the container is built, we have a database setup and ready to go. This is a huge thing. If you have tried to do this with databases that don't support it, you will know how painful it could be to achieve just this.
To start our environment, run:
docker-compose up
And that's it! you know have a fully operational environment with a database connected to kafka listening to changes.
you can add -d
if you want to run and detach (otherwise when you close your window or ctrl-c
it, all the containers will be stopped).
To stop and destroy the containers run docker-compose down
. This will not only stop the containers but remove them and the used networks as well.
3. Using our setup
Let's explore what we have created. We will use the docker exec
command to run commands in our containers as well as the REST interfaces they expose. Start a new shell if you didn't use -d
.
We will request kafka its available connectors.
curl -H "Accept:application/json" localhost:8083/connectors/
The result should be []
and the reason is while our debezium connector container is running, we haven't connected it to kafka yet. So let's do that:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d @postgresql-connect.json
We are using the postgresql-connect.json
file that contains the configuration for our connector. The most important sections are the ones that point the connector to the right database and the right table (remember, we already created these through the init.sql file/command).
The result of this command should be something like HTTP/1.1 201 Created
and a bunch of extra information about the connector we just created.
Now that we have the source connector installed we are ready to start listening to our kafka topic with a console connector. This is the connector reading "from" kafka (debezium, the source connector, was reading from the database "to" kafka)
docker exec kafka bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic postgres.public.data --from-beginning | jq
The first part of this command will initialise and connect the connector. jq
is just a utility to make the output look prettier so you can run it without | jq
if you don't have it installed (or you can install it). Note also that kafka comes with a bunch of utilities in its bin
folder that are worth exploring.
That command will run and it will wait for something to happen (the table is empty in the database at this point).
Let's use psql
to add some data to the table (you need to run this in an additional terminal session. At this point, one session has docker-compose, the other one has our connector and we will run the command in a third one).
docker exec -it postgres psql -U kafkasandbox -d kafkasandboxdb -c 'INSERT INTO data(key, value) VALUES(1, 10), (2, 20), (3, 30), (4, 40), (5, 50), (6, 60), (7, 70), (8, 80), (9, 90), (10, 100)'
Now, go and have a look at the connector session and you should see all your inserts.
At this point you can start experimenting with deletions, changes etc. Pay attention to the payload section of each event being streamed by kafka.
For example, run
docker exec -it postgres psql -U kafkasandbox -d kafkasandboxdb -c "UPDATE data SET value=60 where key=3"
This is the resulting payload
"before": {
"id": 3,
"value": 30,
"key": 3
},
"after": {
"id": 3,
"value": 60,
"key": 3
},
"source": {
"version": "1.4.1.Final",
"connector": "postgresql",
"name": "postgres",
"ts_ms": 1613679449081,
"snapshot": "false",
"db": "kafkasandboxdb",
"schema": "public",
"table": "data",
"txId": 558,
"lsn": 23744400,
"xmin": null
},
"op": "u",
"ts_ms": 1613679449296,
"transaction": null
}
It contains everything to fully define the update we just did. Type of operation "u", the previous and now current value, the affected table and database, the timestamp, etc.
You can now do whatever you want/need with these changes. You could for example create and archive table if you want to be able to stream the changes made in a particular table in different clients. Or you could take the changes, do something with them like version them and push them back into another table.
You can use a variety of different connectors (we are using a console connector for convenience here). Normally, these connectors will be called sink connectors (remember debezium was our source connector)
Another interesting thing to try is killing and re-starting your connector. Stop it with control-c
and re-run the command
docker exec kafka bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic postgres.public.data --from-beginning | jq
Note we are reading back from "the beginning" which means all the changes will be re-played but we could have chosen a particular "offset" (location in the stream). The most common case is to run it without the --from-beginning
which will start reading from where we left it. Let's try offsetting the connector. Ctrl-c
your connector and change a couple of things in your database (use the update command above with a different value for example). Then re-start the connector with an specified offset (also note we need to specify the partition, we only have one partition so we will use 0).
docker exec kafka bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic postgres.public.data --offset 8 --partition 0 | jq
This will read from the message offset = 8 onward (including the changes you made if you made any of course)
To determine your latest offset run:
docker exec kafka bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka:9092 --topic postgres.public.data
In my case the result of this command is
postgres.public.data:0:14
Which means that if I offset to 13, I will see the last message only and whatever it happens from that point onward.
4. Conclusion and final thoughts
Hopefully you enjoyed this introduction to kafka. Kafka itself is big and it comes in different flavours (like confluent kafka for example) and a world of add-ons and related functionality. Running it in production is not for the faint-hearted neither but it's a really powerful platform.
Kafka implementing CDC as we are using here solves a number of problems that you would need to solve if you implement this from scratch. For example, without kafka you need to find a way to store the events in order, being able to reproduce them from any location to support clients crashing etc. Also kafka provides other useful tools such as logs compaction which will be very useful in a production-level solution.
As a way to understand this last thought, compare this implementation we described here with the alternative setup proposed in AWS here (without kafka implementing CDC with AWS streams)
As always, let me know if you have any issues or comments!
Posted on February 19, 2021
Join Our Newsletter. No Spam, Only the good stuff.
Sign up to receive the latest update from our blog.