ClickHouse Advanced Tutorial: Apply CDC from MySQL to ClickHouse


Hamed Karbasi

Posted on June 15, 2023


Introduction

Suppose that you have a database handling OLTP queries. To tackle intensive analytical BI reports, you set up an OLAP-friendly database such as ClickHouse. How do you synchronize your follower database (which is ClickHouse here)? What challenges should you be prepared for?

Synchronizing two or more databases in a data-intensive application is one of the usual routines you may have encountered before or are dealing with now. Thanks to Change Data Capture (CDC) and technologies such as Kafka, this process is not sophisticated anymore. However, depending on the databases you’re utilizing, it could be challenging if the source database works in the OLTP paradigm and the target in the OLAP. In this article, I will walk through this process from MySQL as the source to ClickHouse as the destination. Although I’ve limited this article to those technologies, it’s pretty generalizable to similar cases.

System Design Overview

Contrary to how it sounds, it's quite straightforward: database changes are captured by Debezium and published as events to Apache Kafka, and ClickHouse consumes those changes in partial order through its Kafka engine. The result is real-time and eventually consistent.


CDC Architecture

Case Study

Imagine that we have an orders table in MySQL with the following DDL:

CREATE TABLE `orders` (
    `id` int(11) NOT NULL,
    `status` varchar(50) NOT NULL,
    `price` varchar(50) NOT NULL,
    PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1

Users may create and delete records or update any of their columns. We want to capture these changes and sink them into ClickHouse to keep the two databases in sync.

We’re going to use Debezium v2.1 and the ReplacingMergeTree engine in ClickHouse.

Implementation

Step 1: CDC with Debezium

Most databases keep a log to which every operation is written before being applied to the data (the Write-Ahead Log, or WAL). In MySQL, this log is called the binlog. If you read that file, parse it, and apply the changes to your target database, you're following the Change Data Capture (CDC) approach.
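As a quick illustration (not part of the original setup), you can peek at that log directly from a MySQL client. These are standard MySQL statements; the binlog file name below is just an example:

SHOW VARIABLES LIKE 'log_bin';                      -- ON when the binary log is enabled
SHOW BINARY LOGS;                                   -- lists the current binlog files
SHOW BINLOG EVENTS IN 'mysql-bin.000001' LIMIT 10;  -- the raw change events Debezium parses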

CDC is one of the best ways to synchronize two or more heterogeneous databases. It's real-time, eventually consistent, and saves you from costlier approaches such as batch backfills with Airflow. No matter what happens on the source, you can capture it in order and stay consistent with the original (eventually, of course!).

Debezium is a well-known tool for reading and parsing the binlog. It integrates with Kafka Connect as a connector and produces every change as an event on a Kafka topic.

To do so, you have to enable log-bin on the MySQL database and set up Kafka, Kafka Connect, and Debezium accordingly. Since that setup is well explained in other articles, I'll focus only on the Debezium configuration customized for our purpose: capturing the changes in a form that ClickHouse can parse and work with.
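For reference, enabling the binlog usually comes down to a few server options. The snippet below is only a sketch with assumed values and file path; adjust it to your environment:

# my.cnf sketch for CDC (assumed values)
[mysqld]
server-id        = 1          # unique server ID, required for binary logging
log-bin          = mysql-bin  # turn on the binary log
binlog_format    = ROW        # Debezium needs row-based logging
binlog_row_image = FULL       # include all columns in each row event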

Before showing the whole configuration, we should discuss three necessary configs:

Extracting New Record State

By default, Debezium emits every record with both its before and after states for every operation, which is hard to parse with the ClickHouse Kafka table. Additionally, it creates tombstone records (i.e., records with a null value) for delete operations (again, unparsable by ClickHouse). The entire behavior is demonstrated in the table below.


Records state for different operations in the default configuration.

We use the ExtractNewRecordState transformer in the Debezium configuration to handle this problem. Thanks to this option, Debezium keeps only the after state for create/update operations and disregards the before state. As a drawback, it also drops the delete record containing the previous state, along with the tombstone record mentioned earlier. In other words, you won't capture delete operations anymore. Don't worry! We'll tackle that in the next section.

"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"

The picture below shows how the before state is dropped and the after state is flattened when using the ExtractNewRecordState configuration.


Left: record without the ExtractNewRecordState config; Right: record with the ExtractNewRecordState config

Rewriting Delete Events

To capture delete operations, we must add the rewrite config as below:

"transforms.unwrap.delete.handling.mode":"rewrite"

With this config, Debezium adds a __deleted field, which is true for delete operations and false for the others. Hence, a delete event contains the record's previous state along with a __deleted: true field.


The field __deleted is added after using the rewrite configuration
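Since the screenshot can't be reproduced here, a rough sketch of the flattened message values for our orders table (the concrete values are made up):

Create or update of order 1001:

{"id": 1001, "status": "shipped", "price": "120", "__deleted": "false"}

Delete of the same order:

{"id": 1001, "status": "shipped", "price": "120", "__deleted": "true"}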

Handling Non-Primary-Key Updates

With the configurations above, updating a record (any column except the primary key) emits a simple record with the new state. Having another relational database with the same DDL as the destination is fine, since the updated record replaces the previous one there. But in the case of ClickHouse, the story goes wrong!

In our example, the source uses id as the primary key, and ClickHouse uses id and status as its order key. Replacement and uniqueness are only guaranteed for records with the same id and status! So what happens if the source updates the status column? We end up with duplicate records in ClickHouse that have equal ids but different statuses!

Fortunately, there is a way. By default, Debezium emits a delete record and a create record when the primary key is updated. So if the source updates the id, it emits a delete record with the previous id and a create record with the new id. The previous one, carrying the __deleted=true field, replaces our stale record in ClickHouse, and the records marking deletion can then be filtered out in the view. We can extend this behavior to other columns with the option below:

"message.key.columns": "inventory.orders:id;inventory.orders:status"

Now, by putting together all the options above along with the usual ones, we have a fully functional Debezium configuration capable of handling every change ClickHouse needs:

{
    "name": "mysql-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.hostname": "mysql",
        "database.include.list": "inventory",
        "database.password": "mypassword",
        "database.port": "3306",
        "database.server.id": "2",
        "database.server.name": "dbz.inventory.v2",
        "database.user": "root",
        "message.key.columns": "inventory.orders:id;inventory.orders:status",
        "name": "mysql-connector-v2",
        "schema.history.internal.kafka.bootstrap.servers": "broker:9092",
        "schema.history.internal.kafka.topic": "dbz.inventory.history.v2",
        "snapshot.mode": "schema_only",
        "table.include.list": "inventory.orders",
        "topic.prefix": "dbz.inventory.v2",
        "transforms": "unwrap",
        "transforms.unwrap.delete.handling.mode": "rewrite",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
    }
}
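The post doesn't cover registering the connector; typically you POST this JSON to the Kafka Connect REST API. A sketch, assuming Connect listens on localhost:8083 and the configuration above is saved as mysql-connector.json:

# register the connector (host/port and file name are assumptions)
curl -X POST -H "Content-Type: application/json" \
     --data @mysql-connector.json \
     http://localhost:8083/connectors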

Important: How to choose the Debezium key columns?

By changing the key columns of the connector, Debezium uses those columns as the topic keys instead of the source table's default primary key. So different operations related to the same database record may end up in different partitions in Kafka. Since records lose their ordering across partitions, this can lead to inconsistency in ClickHouse unless you ensure that the ClickHouse order key and the Debezium message key are the same.

The rule of thumb is as below:

  1. Design the partition key and order key based on your desired table design.

  2. Extract the source columns behind the partition and sort keys, in case they are calculated during materialization.

  3. Take the union of all those columns.

  4. Define the result of step 3 as message.key.columns in the Debezium connector configuration.

  5. Check whether the ClickHouse sort key contains all of those columns. If not, add them.

Step 2: ClickHouse Tables

ClickHouse can sink Kafka records into a table by utilizing the Kafka engine. We need to define three tables: the Kafka table, the consumer materialized view, and the main table.

Kafka Table

The Kafka table defines the record structure and the Kafka topic to read from.

CREATE TABLE default.kafka_orders
(
    `id` Int32,
    `status` String,
    `price` String,
    `__deleted` Nullable(String)
)
ENGINE = Kafka('broker:9092', 'dbz.inventory.v2.inventory.orders', 'clickhouse', 'AvroConfluent')
SETTINGS format_avro_schema_registry_url = 'http://schema-registry:8081'

Consumer Materializer

Every record in the Kafka table is read only once, since its consumer group advances the offset and we can't read it twice. So we need to define a main table and materialize every Kafka table record into it via a materialized view:

CREATE MATERIALIZED VIEW default.consumer__orders TO default.stream_orders
(
    `id` Int32,
    `status` String,
    `price` String,
    `__deleted` Nullable(String)
) AS
SELECT
    id AS id,
    status AS status,
    price AS price,
    __deleted AS __deleted
FROM default.kafka_orders 

Main Table

The main table has the source structure plus the __deleted field. I'm using the ReplacingMergeTree engine since we need to replace stale records with their deleted or updated versions.

CREATE TABLE default.stream_orders
(
    `id` Int32,
    `status` String,
    `price` String,
    `__deleted` String
)
ENGINE = ReplacingMergeTree
ORDER BY (id, status)
SETTINGS index_granularity = 8192

View Table

Finally, we need to filter out deleted records (since we don't want to see them) and keep only the most recent record when several share the same sort key. The latter can be handled with the FINAL modifier. But to avoid writing the filter and FINAL in every query, we can define a simple view to do the job implicitly:

CREATE VIEW default.orders
(
    `id` Int32,
    `status` String,
    `price` String,
    `__deleted` String
) AS
SELECT *
FROM default.stream_orders
FINAL
WHERE __deleted = 'false'
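With the view in place, downstream queries can simply hit default.orders. A small illustrative example (price is stored as a String in this schema, so it is cast here):

SELECT
    status,
    count() AS orders,
    sum(toFloat64OrZero(price)) AS total_price
FROM default.orders
GROUP BY status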

Note: It’s inefficient to use Final for every query, especially in production. You can use aggregations to see the last records or wait for ClickHouse to merge records in the background.

Conclusion

In this article, we saw how we could synchronize the ClickHouse database with MySQL via CDC and prevent duplication using a soft-delete approach.
