Debezium Custom Converters - TimestampConverter

oryanmoshe

Oryan Moshe

Posted on May 26, 2020


Debezium Custom Converters

Creating custom converters using Debezium's new SPI to override value conversions

Introduction

Background about the TimestampConverter

Hey, my name is Oryan Moshe, and I started my own community for senior developers in Israel named in.dev.

I'm also a Lead Architect at Rivery, a fully managed data integration platform. Part of my job here is developing the ability to stream changes straight from our clients' databases to our platform using change data capture (CDC).

The last time I coded in Java was 7 years ago, so if you have any suggestions to improve the code shown here, please feel free to comment below!

You can find the converter right here:
https://github.com/oryanmoshe/debezium-timestamp-converter/

CDC — Change Data Capture

Before we talk about Debezium, we have to talk about CDC.

CDC is a way for us to get the changes happening in the database (as opposed to the actual data).

This means we can actually get every state that every record has been through in the database.

CDC is useful for a number of cases:

  • Compiling a log of record changes
  • Undoing (or reverting) a change
  • Tracking record deletion (which is not simply a matter of using SELECT)

What is Debezium Anyway?

Debezium is an open source platform, maintained by Red Hat, that allows developers to implement CDC on top of a Kafka infrastructure.

Debezium provides CDC through Kafka Connect source connectors that we configure for each database. At the time of writing, there's support for MySQL, PostgreSQL, Microsoft SQL Server, MongoDB, and even some limited support for Oracle.

What are converters, and why would we need a custom one?

All messages produced by Debezium are processed before entering the designated topic.

This ensures that all fields of a given type (defined by the schema) behave the same.

In other words, all DATE fields from all of the databases will be transformed into the same format, which by default is "days since epoch".

But this behavior isn't always what we want, especially for temporal types like these.

For our particular use case we need all temporal fields to be in the same format, whether the type is DATE, DATETIME, DATETIME2, TIME or TIMESTAMP.

The format we chose was yyyy-MM-dd'T'HH:mm:ss.SSS'Z'.
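This small JDK-only sketch shows the gap between the two representations. It takes a DATE value as Debezium emits it by default (an integer count of days since 1970-01-01) and renders it in the format above, in UTC. The class and method names are hypothetical. This isn't code from the converter.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;

class EpochDaysDemo {

    // The target pattern chosen above (lowercase "yyyy" is the calendar year)
    static final String TARGET_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";

    // Renders a DATE value delivered as "days since epoch" in the target format, in UTC
    static String formatEpochDays(long epochDays) {
        SimpleDateFormat formatter = new SimpleDateFormat(TARGET_FORMAT);
        formatter.setTimeZone(TimeZone.getTimeZone("UTC"));
        return formatter.format(new Date(TimeUnit.DAYS.toMillis(epochDays)));
    }

    public static void main(String[] args) {
        // 18408 days after 1970-01-01 is 2020-05-26
        System.out.println(formatEpochDays(18408)); // 2020-05-26T00:00:00.000Z
    }
}
```

The same day goes from an opaque `18408` to a string any downstream consumer can read without knowing Debezium's conventions.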


Creating a custom converter

Here's an explanation for each step needed to create our TimestampConverter.

The basics of custom converters

To allow this kind of behavior, Debezium 1.1 added an SPI (Service Provider Interface) for custom converters.

This allows developers to write their own converters in Java by creating a class that implements the io.debezium.spi.converter.CustomConverter interface.

The First Gotcha

What we didn't know when we started developing this converter is that once we registered a custom converter for a temporal column, Debezium's behavior became inconsistent. Sometimes it would pass a DATE column as "days since epoch", as expected, but sometimes it would pass it as a string, matching the date format of the database it came from.

This meant we had to cover all of our bases, both for numeric values (for example, "days since epoch") and for every date format the databases can produce (YYYY-MM-dd, dd/MM/YYYY, YYYY-MMM-dd, etc.).

Things got a bit complicated on the logic front, but let's not get into this right now.
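The converter's real handling of this lives in its getMillis method, which this post doesn't walk through. As a rough illustration of the idea only, here is a hypothetical, DATE-only helper. The name dateToEpochMillis and its short list of layouts are assumptions, not the converter's actual code. It accepts either a days-since-epoch number or one of a few string layouts and returns epoch milliseconds at UTC midnight.

```java
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.TimeUnit;

class DateInputNormalizer {

    // Hypothetical layouts to try; a real converter needs one per format its databases emit
    private static final List<DateTimeFormatter> DATE_PATTERNS = Arrays.asList(
            DateTimeFormatter.ofPattern("uuuu-MM-dd"),
            DateTimeFormatter.ofPattern("dd/MM/uuuu"),
            DateTimeFormatter.ofPattern("uuuu-MMM-dd", Locale.ENGLISH));

    // Returns epoch millis (UTC midnight) for a DATE value, or null if nothing matches
    static Long dateToEpochMillis(String raw) {
        // Numeric input: treat it as "days since epoch", Debezium's default DATE representation
        if (raw.matches("-?\\d+")) {
            return TimeUnit.DAYS.toMillis(Long.parseLong(raw));
        }
        // String input: try each known layout in turn
        for (DateTimeFormatter pattern : DATE_PATTERNS) {
            try {
                return LocalDate.parse(raw, pattern)
                        .atStartOfDay(ZoneOffset.UTC)
                        .toInstant()
                        .toEpochMilli();
            } catch (DateTimeParseException ignored) {
                // Not this layout; fall through to the next one
            }
        }
        return null;
    }
}
```

Every accepted input for the same day (`"18408"`, `"2020-05-26"`, `"26/05/2020"`, `"2020-May-26"`) ends up as the same number, so formatting only has to happen in one place.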

What's needed for our custom converter to work

Each converter has to implement at least two methods to be used by Debezium:

configure

This method runs when the connector is initialised. It accepts one argument:

props
An object of type java.util.Properties, containing all of the properties we passed to our converter instance.

converterFor

This method runs once for each column defined in our schema, and its job is to define (a.k.a "register") the converter for each. It accepts two arguments:

column
An object of type io.debezium.spi.converter.RelationalColumn, containing the definition of the column we're currently handling, including its name, type, size, table, etc.

registration
An object of type io.debezium.spi.converter.CustomConverter.ConverterRegistration, a nested interface with a single method: register.

Using the configure method

As stated above, we use the configure method to pass properties into our converter. This is important because we can use the same converter for multiple connectors, and change its behavior according to these properties.

For our TimestampConverter we wanted to pass four properties:

  • debug – Indicates whether to print debug messages. Defaults to false.
  • format.date – The format used for all columns of type DATE. Defaults to yyyy-MM-dd.
  • format.time – The format used for all columns of type TIME. Defaults to HH:mm:ss.SSS.
  • format.datetime – The format used for all other temporal columns. Defaults to yyyy-MM-dd'T'HH:mm:ss.SSS'Z'.

All of these properties are optional and have default values associated with them.
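A word of caution on date patterns. In SimpleDateFormat, uppercase YYYY is the week-based year, not the calendar year. For a few days around New Year, it can print the wrong year, so date patterns should use lowercase yyyy. This JDK-only sketch formats the same instant with both patterns. The class and method names are hypothetical.

```java
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

class WeekYearPitfall {

    // Formats a date in UTC, using US week rules (weeks start on Sunday)
    static String format(String pattern, Date date) {
        SimpleDateFormat formatter = new SimpleDateFormat(pattern, Locale.US);
        formatter.setTimeZone(TimeZone.getTimeZone("UTC"));
        return formatter.format(date);
    }

    public static void main(String[] args) {
        // Monday, December 30, 2019 falls in the first week of 2020
        Date lateDecember = Date.from(Instant.parse("2019-12-30T12:00:00Z"));
        System.out.println(format("yyyy-MM-dd", lateDecember)); // 2019-12-30 (calendar year)
        System.out.println(format("YYYY-MM-dd", lateDecember)); // 2020-12-30 (week-based year)
    }
}
```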

To support them, we declared a default value for each one as a class constant. Inside the configure method, we read each property from props, falling back to its default when it isn't set:

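import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import org.apache.kafka.connect.data.SchemaBuilder;

import java.text.SimpleDateFormat;
import java.time.Instant;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import java.util.TimeZone;

public class TimestampConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {

    public static final String DEFAULT_DATETIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
    // Lowercase "yyyy" is the calendar year; "YYYY" would be the week-based year
    public static final String DEFAULT_DATE_FORMAT = "yyyy-MM-dd";
    public static final String DEFAULT_TIME_FORMAT = "HH:mm:ss.SSS";

    public String strDatetimeFormat, strDateFormat, strTimeFormat;
    public Boolean debug;

    private SimpleDateFormat simpleDatetimeFormatter, simpleDateFormatter, simpleTimeFormatter;

    // Runs once when the connector is initialised; props holds this converter's settings
    @Override
    public void configure(Properties props) {
        // Each setting falls back to its default when the property isn't provided
        this.strDatetimeFormat = props.getProperty("format.datetime", DEFAULT_DATETIME_FORMAT);
        this.simpleDatetimeFormatter = new SimpleDateFormat(this.strDatetimeFormat);

        this.strDateFormat = props.getProperty("format.date", DEFAULT_DATE_FORMAT);
        this.simpleDateFormatter = new SimpleDateFormat(this.strDateFormat);

        this.strTimeFormat = props.getProperty("format.time", DEFAULT_TIME_FORMAT);
        this.simpleTimeFormatter = new SimpleDateFormat(this.strTimeFormat);

        this.debug = props.getProperty("debug", "false").equals("true");

        // Datetime and time values are rendered in UTC, matching the literal 'Z' suffix
        this.simpleDatetimeFormatter.setTimeZone(TimeZone.getTimeZone("UTC"));
        this.simpleTimeFormatter.setTimeZone(TimeZone.getTimeZone("UTC"));
    }

    // converterFor(...) is covered in the next section
}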

Using the converterFor method

Now it's time for the big moment. Each column must be converted to its respective format.

First of all, we have to know the type of the column we're currently handling. This is available via column.typeName().

If the type is any of the temporal types (defined as a class constant) we handle it accordingly. If it's not, we do nothing, and Debezium's default conversion applies.

To tell Debezium to convert a specific column to something else, we call register on the registration passed to us, providing a schema (in our case, an optional string schema) and a converter.

The converter is just a function, in our case a lambda, that receives an Object (the source value) and returns a value matching the schema we provided. For us, that means returning a String (or null, because we made the schema optional).

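// Class members used below: the temporal types this converter handles (a sketch based on
// the list above; the repository may define it differently), and the schema every
// converted column gets, an optional (nullable) string
private static final List<String> SUPPORTED_DATA_TYPES =
        Arrays.asList("date", "datetime", "datetime2", "time", "timestamp");
private final SchemaBuilder datetimeSchema = SchemaBuilder.string().optional();

@Override
public void converterFor(RelationalColumn column, ConverterRegistration<SchemaBuilder> registration) {
    String typeName = column.typeName().toLowerCase();

    // Not a temporal column: register nothing, so Debezium's default conversion applies
    if (!SUPPORTED_DATA_TYPES.contains(typeName)) {
        return;
    }

    boolean isTime = "time".equals(typeName);
    registration.register(datetimeSchema, rawValue -> {
        // The schema is optional, so null values pass through unchanged
        if (rawValue == null)
            return null;

        // getMillis (not shown in this post) turns the raw value, numeric or string,
        // into milliseconds since epoch
        Long millis = getMillis(rawValue.toString(), isTime);
        Date dateObject = Date.from(Instant.ofEpochMilli(millis));

        switch (typeName) {
            case "time":
                return simpleTimeFormatter.format(dateObject);
            case "date":
                return simpleDateFormatter.format(dateObject);
            default:
                return simpleDatetimeFormatter.format(dateObject);
        }
    });
}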

In this snippet, notice the two crucial parts mentioned earlier: the call to registration.register, and the return statements.
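One design note: SimpleDateFormat instances are mutable and not thread-safe, so each instance has to stay confined to a single thread. If you'd rather not depend on that, java.time's DateTimeFormatter is an immutable, thread-safe alternative. The sketch below is a swapped-in technique, not what the TimestampConverter uses. It mirrors the switch in converterFor with DateTimeFormatter and the same default patterns, and its class and method names are hypothetical.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

class JavaTimeFormatting {

    // Immutable formatters bound to UTC, using the converter's default patterns
    static final DateTimeFormatter DATETIME =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'").withZone(ZoneOffset.UTC);
    static final DateTimeFormatter DATE =
            DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);
    static final DateTimeFormatter TIME =
            DateTimeFormatter.ofPattern("HH:mm:ss.SSS").withZone(ZoneOffset.UTC);

    // Picks a formatter by column type, like the switch in converterFor
    static String format(String typeName, long epochMillis) {
        Instant instant = Instant.ofEpochMilli(epochMillis);
        switch (typeName.toLowerCase(Locale.ROOT)) {
            case "time":
                return TIME.format(instant);
            case "date":
                return DATE.format(instant);
            default:
                return DATETIME.format(instant);
        }
    }
}
```

Because these formatters are immutable, they can be static constants shared by every registered column without any synchronization.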

Using a Custom Converter with Debezium

Installation

Installation in our Debezium cluster is straightforward. We just need to add the converter's .jar file to the connector we want to use it in.

The Second Gotcha

Notice I said "... to the connector we want ...". This is something the Debezium documentation didn't make clear: the converter has to be added to every connector we want to use it in.

Let's say the base folder for connectors is /kafka/connect. Inside it, we'll find folders like debezium-connector-mysql or debezium-connector-postgres.

We need to add the converter's .jar file to each of those folders whose connector will use it.

Configuration

After adding the .jar file to our connector, we can configure our connectors to use it!

To do so, all we need is to add the following keys to our existing connector configuration:

"converters": "timestampConverter",
"timestampConverter.type": "oryanmoshe.kafka.connect.util.TimestampConverter"

If we need to customize the formats of specific data types, we can use these additional configuration keys:

"timestampConverter.format.time": "HH:mm:ss.SSS",
"timestampConverter.format.date": "yyyy-MM-dd",
"timestampConverter.format.datetime": "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'",
"timestampConverter.debug": "false"
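Note that configure reads keys like format.datetime, not timestampConverter.format.datetime. The converter receives its own settings with the converter-name prefix removed, as the configure code suggests. The JDK-only sketch below illustrates that mapping with a hypothetical helper, propertiesFor. It shows the behavior only and is not Debezium's own code.

```java
import java.util.Map;
import java.util.Properties;

class ConverterPropsDemo {

    // Illustrative only: keeps the keys that start with "<converterName>." and strips that
    // prefix, giving the shape of Properties that configure() reads ("format.date", "debug", ...)
    static Properties propertiesFor(String converterName, Map<String, String> connectorConfig) {
        String prefix = converterName + ".";
        Properties props = new Properties();
        for (Map.Entry<String, String> entry : connectorConfig.entrySet()) {
            if (entry.getKey().startsWith(prefix)) {
                props.setProperty(entry.getKey().substring(prefix.length()), entry.getValue());
            }
        }
        return props;
    }
}
```

With the configuration above, "timestampConverter.format.date" arrives in configure as "format.date", while unrelated connector keys (and the "converters" list itself) are not passed along.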

Conclusions

The addition of an SPI to Debezium brought a lot to the table in terms of custom converters.

It allows us to get a tailor-made CDC connector, with data streaming into our Kafka cluster in exactly the format we want.

I didn't include the actual logic that converts the values from their raw format into epoch time (that part lives in the getMillis method).

But I have published the TimestampConverter as open source, so anyone can read the code there, use the converter in an application (be it as a .jar file found in the releases section, or as a dependency found in the packages section), or contribute to its development!

Feel free to suggest contributions to this converter, and share with me what kind of converters you created using the new Debezium SPI, and which ones you wish were made!

Links

To read more about Debezium custom converters, visit the official documentation:
https://debezium.io/documentation/reference/1.1/development/converters.html

Link to the repository of my TimestampConverter:
https://github.com/oryanmoshe/debezium-timestamp-converter
