Debezium - CDC
Mustafa Karatas
Posted on May 4, 2024
CDC
Change Data Capture (CDC) is a design pattern for tracking row-level changes written to a source database and propagating them to other systems in near real time. It is commonly used to replicate data between databases and to keep downstream targets automatically in sync. Because only the changes are shipped, CDC enables incremental loading and eliminates the need for periodic bulk reloads.
Debezium
Debezium is an open-source CDC platform built on Apache Kafka. It reads each source database's transaction log, turns every committed row-level modification into a change event, and publishes those events to Kafka topics (one topic per table). Any application listening to these topics can then react to the incremental changes.
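To make "listening to these events" concrete, here is a minimal sketch of a plain Kafka consumer. The class name is mine; it assumes the org.apache.kafka:kafka-clients dependency and the product.public.product topic that the connector configured later in this post will produce. Each record value is a JSON change event like the ones shown at the end of this post.

import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ChangeEventListener {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "change-event-listener");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Debezium writes one topic per table: <topic.prefix>.<schema>.<table>
            consumer.subscribe(List.of("product.public.product"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("key=%s value=%s%n", record.key(), record.value());
                }
            }
        }
    }
}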
Project overview
docker-compose.yml
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 52181:2181
  kafka:
    image: confluentinc/cp-kafka:7.0.1
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://kafka:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
  kafka-admin:
    image: provectuslabs/kafka-ui
    container_name: kafka-admin
    ports:
      - "9091:8080"
    restart: always
    depends_on:
      - kafka
      - zookeeper
    environment:
      - KAFKA_CLUSTERS_0_NAME=local
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:29092
      - KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper:2181
  db:
    platform: linux/x86_64
    image: debezium/example-postgres
    restart: always
    environment:
      POSTGRES_PASSWORD: example
    ports:
      - 5432:5432
    extra_hosts:
      - "host.docker.internal:host-gateway"
    command:
      - "postgres"
      - "-c"
      - "wal_level=logical"
    volumes:
      - ./resource/init.sql:/docker-entrypoint-initdb.d/create-db-tables.sql
  elasticsearch:
    image: elasticsearch:8.8.0
    ports:
      - 9200:9200
      - 9300:9300
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
  adminer:
    image: adminer
    restart: always
    ports:
      - 8001:8080
  kafka_connect:
    container_name: kafka_connect
    image: debezium/connect
    links:
      - db
      - kafka
    ports:
      - '8083:8083'
    environment:
      - BOOTSTRAP_SERVERS=kafka:29092
      - GROUP_ID=medium_debezium
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
Kafka UI
http://localhost:9091/ui/clusters/local/all-topics?perPage=25
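The same check can also be done from code instead of the browser. A small sketch with Kafka's AdminClient (the class name TopicCheck is mine; it assumes the org.apache.kafka:kafka-clients dependency) lists the topics the broker knows about:

import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class TopicCheck {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        // 9092 is the listener the compose file advertises for the host machine
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            admin.listTopics().names().get().forEach(System.out::println);
        }
    }
}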
Database
- Connect with the username postgres and the password example, as set in the compose file (e.g. via Adminer at http://localhost:8001).
- Create the database by running:
create database debezium_demo;
- Example properties.yml for the application:
url: jdbc:postgresql://localhost:5432/debezium_demo
username: postgres
password: example
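A quick way to confirm these settings before wiring them into the application is a plain JDBC connection test. This is a sketch (the class name is mine) assuming the org.postgresql:postgresql driver is on the classpath:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class ConnectionCheck {
    public static void main(String[] args) throws SQLException {
        String url = "jdbc:postgresql://localhost:5432/debezium_demo";
        try (Connection conn = DriverManager.getConnection(url, "postgres", "example")) {
            // isValid() pings the server; 5 is the timeout in seconds
            System.out.println("connected: " + conn.isValid(5));
        }
    }
}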
Connector Requests
Connector information:
- The connector must be created first. The Debezium PostgreSQL connector captures row-level changes from the database and propagates them to Kafka in real time. To configure it, you provide the database connection details, a replication slot name, and a few connector-specific settings. Below is an example configuration:
{
  "name": "product-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.hostname": "db",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "example",
    "database.dbname": "debezium_demo",
    "database.server.name": "postgres",
    "tombstones.on.delete": false,
    "topic.prefix": "product",
    "table.include.list": "public.product",
    "heartbeat.interval.ms": "5000",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": false,
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,
    "plugin.name": "pgoutput",
    "slot.name": "unique_slot_name"
  }
}
- connector.class: Specifies the class for the PostgreSQL connector.
- tasks.max: Defines the maximum number of tasks to create for this connector.
- database.hostname, database.port, database.user, database.password, database.dbname: Connection details for PostgreSQL.
- database.server.name: A unique identifier for the connector instance; the server name can be found in the database properties.
- slot.name: The replication slot name used by the connector. Make sure it is unique for each connector.
- plugin.name: Specifies the logical decoding plugin Debezium uses to capture changes (pgoutput here).
- schema.include.list, table.include.list: Specify the schemas and tables to include in the CDC.
- snapshot.mode: Defines how the initial snapshot of data should be taken.
- heartbeat.interval.ms: Interval for sending heartbeat messages to PostgreSQL.
- heartbeat.action.query: SQL query used for heartbeat messages.
- snapshot.lock.timeout.ms: Timeout for acquiring locks during the snapshot.
- snapshot.select.statement.overrides: Overrides the default SELECT statement used during the snapshot.
- database.history.kafka.bootstrap.servers: Kafka broker information for the connector.
- key.converter and value.converter: Kafka serialization configs (org.apache.kafka.connect.json.JsonConverter here).
CURL Requests:
GET Connector:
curl --request GET \
--url http://localhost:8083/connectors/product-connector
POST - add a connector
localhost:8083/connectors
CURL Request:
curl --location 'localhost:8083/connectors/' \
--header 'Accept: application/json' \
--header 'Content-Type: application/json' \
--header 'Cookie: csrftoken=M7jtiBvQrP5MVhm9kN1nbq8hNFgi0lRS' \
--data '{
"name": "product-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.hostname": "db",
"database.port": "5432",
"database.user": "postgres",
"database.password": "example",
"database.dbname": "debezium_demo",
"database.server.name": "postgres",
"tombstones.on.delete" : false,
"topic.prefix" : "product",
"table.inclde.list" : "public.product",
"heartbeat.interval.ms" : "5000",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": false,
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false,
"plugin.name" : "pgoutput",
"slot.name": "unique_slot_name"
}
}'
DELETE Connector
curl --request DELETE \
--url http://localhost:8083/connectors/product-connector
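Kafka Connect also exposes a status endpoint (GET /connectors/<name>/status), which is the quickest way to confirm the connector and its task are RUNNING after the POST above. A sketch using the JDK's built-in HTTP client (Java 11+; the class name is mine):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ConnectorStatus {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8083/connectors/product-connector/status"))
                .GET()
                .build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        // Expect "state": "RUNNING" for both the connector and its task
        System.out.println(response.body());
    }
}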
Extra information for slot.name:
- If connector creation fails because a replication slot named "debezium" already exists, it usually means more than one connector was set up against the same database host with the same replication slot name.
- To resolve this, ensure that each connector you set up for the same database host uses a distinct replication slot name.
- The replication slot name is specified with the slot.name parameter in the Debezium PostgreSQL connector configuration; make sure it is set to a unique value for each connector.
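To see which slot names already exist on the host before picking a new one, you can query PostgreSQL's pg_replication_slots view. A small JDBC sketch (class name mine) reusing the connection settings from earlier:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class SlotCheck {
    public static void main(String[] args) throws Exception {
        String url = "jdbc:postgresql://localhost:5432/debezium_demo";
        try (Connection conn = DriverManager.getConnection(url, "postgres", "example");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(
                     "select slot_name, active from pg_replication_slots")) {
            while (rs.next()) {
                System.out.println(rs.getString("slot_name") + " active=" + rs.getBoolean("active"));
            }
        }
    }
}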
The null "before" problem
By default, PostgreSQL writes only the primary key of the old row to the WAL, so the "before" field of update and delete events arrives as null. To make Debezium populate it with the full old row state, run this command in the database console:
alter table product replica identity full;
Project Running Configuration
- Build the project.
- Start Docker on the local machine.
- Run docker-compose up -d for the docker-compose.yml file above (this can be changed for other configurations).
- Run the project.
- Open http://localhost:9091/ui/clusters/local/all-topics?perPage=25 to see the topics and messages on Kafka.
- Send the cURL request above to create (POST) the connector.
- Check the messages in the Kafka UI at localhost:9091; the product.public.product topic should now be visible.
- Send a request to create a Product:
curl --location 'localhost:8080/api/v1/product/add-product' \
--header 'Content-Type: application/json' \
--header 'Cookie: csrftoken=M7jtiBvQrP5MVhm9kN1nbq8hNFgi0lRS' \
--data '{
"name" : "pant",
"price" : 150,
"stock" : 4
}'
- Check the Debezium topic [CREATE]:
{
  "before": null,
  "after": {
    "id": 25,
    "name": "pant",
    "price": "Opg=",
    "stock": 4
  },
  "source": {
    "version": "2.5.0.Final",
    "connector": "postgresql",
    "name": "product",
    "ts_ms": 1710684511415,
    "snapshot": "false",
    "db": "debezium_demo",
    "sequence": "[\"25659272\",\"25659664\"]",
    "schema": "public",
    "table": "product",
    "txId": 771,
    "lsn": 25659664,
    "xmin": null
  },
  "op": "c",
  "ts_ms": 1710684511616,
  "transaction": null
}
- Make an update operation to product:
curl --location --request PUT 'localhost:8080/api/v1/product/update-product/14' \
--header 'Content-Type: application/json' \
--header 'Cookie: csrftoken=M7jtiBvQrP5MVhm9kN1nbq8hNFgi0lRS' \
--data '{
"name" : "pant",
"price" : 1512,
"stock" : 6
}'
- Check the messages in the Debezium topic product.public.product:
{
  "before": {
    "id": 14,
    "name": "pant",
    "price": "Opg=",
    "stock": 4
  },
  "after": {
    "id": 14,
    "name": "pant",
    "price": "Ak6g",
    "stock": 6
  },
  "source": {
    "version": "2.5.0.Final",
    "connector": "postgresql",
    "name": "product",
    "ts_ms": 1710680240132,
    "snapshot": "false",
    "db": "debezium_demo",
    "sequence": "[\"25645448\",\"25645504\"]",
    "schema": "public",
    "table": "product",
    "txId": 760,
    "lsn": 25645504,
    "xmin": null
  },
  "op": "u",
  "ts_ms": 1710680240470,
  "transaction": null
}
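One detail worth calling out in the events above: price is a numeric column, and with value.converter.schemas.enable set to false, Kafka Connect's Decimal logical type arrives as the Base64-encoded bytes of the unscaled value, which is why 150 shows up as "Opg=". A small decoding sketch, assuming the column's scale is 2 (which the values above are consistent with):

import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Base64;

public class DecimalDecode {
    public static void main(String[] args) {
        // "Opg=" decodes to the bytes 0x3A 0x98, i.e. the unscaled value 15000
        byte[] unscaled = Base64.getDecoder().decode("Opg=");
        // Applying the column's scale of 2 recovers the original price
        BigDecimal price = new BigDecimal(new BigInteger(unscaled), 2);
        System.out.println(price); // prints 150.00
    }
}

If the Base64 form is inconvenient, the connector option decimal.handling.mode can be set to string or double so numeric values are emitted in a directly readable form.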
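The compose file also starts Elasticsearch, which the walkthrough above never touches; these change events are exactly what a consumer needs to keep a search index in sync. The following is a sketch, not part of the project itself: it assumes kafka-clients and jackson-databind on the classpath plus a hypothetical products index, and applies each event to Elasticsearch through its plain REST API (security is disabled in the compose file, so no auth is needed).

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ProductIndexSync {
    private static final ObjectMapper MAPPER = new ObjectMapper();
    private static final HttpClient HTTP = HttpClient.newHttpClient();

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "product-index-sync");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("product.public.product"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    if (record.value() == null) continue; // skip tombstones, if any
                    apply(MAPPER.readTree(record.value()));
                }
            }
        }
    }

    private static void apply(JsonNode event) throws Exception {
        String op = event.path("op").asText();
        // c = create, u = update, r = snapshot read: index the "after" state
        if (op.equals("c") || op.equals("u") || op.equals("r")) {
            JsonNode after = event.get("after");
            String id = after.get("id").asText();
            send(HttpRequest.newBuilder()
                    .uri(URI.create("http://localhost:9200/products/_doc/" + id))
                    .header("Content-Type", "application/json")
                    .PUT(HttpRequest.BodyPublishers.ofString(after.toString()))
                    .build());
        } else if (op.equals("d")) {
            // d = delete: "before" carries the old row thanks to replica identity full
            String id = event.get("before").get("id").asText();
            send(HttpRequest.newBuilder()
                    .uri(URI.create("http://localhost:9200/products/_doc/" + id))
                    .DELETE()
                    .build());
        }
    }

    private static void send(HttpRequest request) throws Exception {
        HttpResponse<String> response = HTTP.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}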