How to Keep Sync ElasticSearch with MySQL Relational Data

batuhannarci

İsmail Batuhan NARCI

Posted on December 29, 2022

Before diving deep into the subject, I want to say that I'm not an expert on ElasticSearch (ES) 😃. My motivation for writing this is that it is difficult to find comprehensive guides about parent/child relationships on ES.

On ES, we can find hundreds of posts and guides about indexing data. However, I want to concentrate on indexing relational data in this story.

Content

  • What is the problem with indexing relational data?
  • How can we solve this problem?
  • Proof of concept of my proposition
  • Resources

What is the problem with indexing relational data?

Our apps now retrieve data from many tables or even databases. All of this data must be combined to be displayed on a panel or dashboard. Doing this in an RDBMS gets slower as the data grows.

We want to filter or search this data in addition to showing it. If we have multiple text fields from different tables, joining those tables and performing a search can put a significant strain on our database.

This is where ES comes in. It's a distributed, free, and open search and analytics engine for all types of data, and it is how we can solve this issue.

How can we solve this problem?

ES can be used for searching, logging, analytics, and several other useful things 😃. We focus on its search capability in this story.

To search our data, we first need to index it on ES, and there are several patterns we can use to do so.

Denormalizing, nested or parent/child patterns are the most used ways to index relational data.

Let's say we have two tables: product and tag. A product can have zero or more tags, and each tag row points to its product through a product_id column. The entity relation looks like the one below.

Product-Tag Entity Relation

We can write a query like this to get a product and its tags.

SELECT p.id, p.name, t.id, t.name
FROM product p
         LEFT JOIN tag t ON t.product_id = p.id
WHERE p.id = 98452;

The result looks like this:

Query Response 1

The denormalized version of this relationship is:

{
  "product" : {
    "id" : 214673,
    "name" : "AliExpress Fitness Trackers - mint green",
    "tags" : "tag1, tag22"
  }
}

To achieve this we can modify the query above like the following.

-- Group by the product columns so that products without tags are kept
-- and the query is valid under MySQL's default ONLY_FULL_GROUP_BY mode.
SELECT p.id AS product_id, p.name AS product_name, GROUP_CONCAT(t.name SEPARATOR ', ') AS tags
FROM product p
         LEFT JOIN tag t ON t.product_id = p.id
WHERE p.id = 2141673
GROUP BY p.id, p.name;

This pattern is the easiest way to index relational data, but it has some caveats. If you have to join several tables with many columns, it is neither practical nor performant.
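To make the idea a bit more tangible, here is a minimal Python sketch of what denormalizing does: it collapses the joined rows into one flat document per product, much like GROUP_CONCAT does in SQL. The row layout and the helper name denormalize are assumptions made for this illustration only; they are not part of the example project.

```python
def denormalize(rows):
    """Collapse (product_id, product_name, tag_name) rows into flat documents.

    `rows` mimics the result of the LEFT JOIN query; tag_name is None
    when a product has no tags. (Hypothetical helper for illustration.)
    """
    products = {}
    for product_id, product_name, tag_name in rows:
        doc = products.setdefault(
            product_id, {"id": product_id, "name": product_name, "tags": []}
        )
        if tag_name is not None:
            doc["tags"].append(tag_name)
    # Join the tag names into a single string, as GROUP_CONCAT would
    return [
        {"product": {**doc, "tags": ", ".join(doc["tags"])}}
        for doc in products.values()
    ]


rows = [
    (214673, "AliExpress Fitness Trackers - mint green", "tag1"),
    (214673, "AliExpress Fitness Trackers - mint green", "tag22"),
]
docs = denormalize(rows)
```

Every extra joined table adds more columns to flatten this way, which is where the pattern starts to hurt.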

So there comes another approach: nested structure.

For the same example, the nested structure of this relationship is:

{
  "product" : {
    "id" : 214673,
    "name" : "AliExpress Fitness Trackers - mint green",
    "tags" : [
      {
        "id" : 1,
        "name" : "tag1"
      },
      {
        "id" : 2,
        "name" : "tag22"
      }
    ]
  }
}

This looks promising, but it has a caveat that may push us toward the parent/child structure. Elasticsearch stores nested objects together with their parent, so whenever a single nested entity changes, the whole parent document, including all of its nested objects, has to be reindexed. If you have nested entities that are frequently updated, you should use the next structure.
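Here is a small Python sketch of that cost, under the assumption that we build the documents ourselves. The helpers build_nested_product and rename_tag are hypothetical names for this illustration. Renaming a single tag still produces the full product document, and that full document is what has to be sent to ES again.

```python
import copy


def build_nested_product(product, tags):
    """Embed all tags inside the product document (nested pattern)."""
    return {"product": {**product, "tags": [dict(t) for t in tags]}}


def rename_tag(document, tag_id, new_name):
    """Return the document that has to be reindexed after renaming one tag.

    Nested objects live inside their parent, so the result is always
    the whole product document, not just the changed tag.
    """
    updated = copy.deepcopy(document)
    for tag in updated["product"]["tags"]:
        if tag["id"] == tag_id:
            tag["name"] = new_name
    return updated


doc = build_nested_product(
    {"id": 214673, "name": "AliExpress Fitness Trackers - mint green"},
    [{"id": 1, "name": "tag1"}, {"id": 2, "name": "tag22"}],
)
reindexed = rename_tag(doc, 2, "tag2")
```

With two tags this is harmless; with hundreds of frequently changing children per product it adds up quickly.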

For the same example, the parent/child structure of this relationship is:

{
  "product" : {
    "id" : 214673,
    "name" : "AliExpress Fitness Trackers - mint green"
  }
}

{
  "tag" : {
    "id" : 1,
    "name" : "tag1",
    "product_id" : 214673
  }
}

{
  "tag" : {
    "id" : 2,
    "name" : "tag22",
    "product_id" : 214673
  }
}

We can use any programming language to index data on ES.

In this story, I'll use Logstash with multiple pipelines to create the initial index and keep it in sync with our database.

After all of this summarized info, I'm going to demonstrate the parent/child concept with a proof of concept.

Proof of concept of my proposition

My technical stack is:

  • MySQL 8
  • ElasticSearch 8.5.3
  • Logstash 8.5.3 with JDBC plugin enabled
  • Kibana 8.5.3
  • Docker

You can find the complete code in the es_relations_example repo.

First, we are going to create a database and fill it with sample data. Then, we will add ES, Kibana, and Logstash step by step. Finally, we will look into the pipelines one by one to better understand how they work.

Step 1: Create MySQL and fill it with data.

After you create your project directory, create a directory named data and put this SQL file in it. Then, create a file named docker-compose.yml and put the following lines in it.

version: "3"
services:
  mysql:
    image: mysql:8
    ports:
      - 3306:3306
    environment:
      MYSQL_RANDOM_ROOT_PASSWORD: "yes"
      MYSQL_DATABASE: "es_relations_example"
      MYSQL_USER: "test"
      MYSQL_PASSWORD: "test"
    volumes:
      - ./data/:/docker-entrypoint-initdb.d/

This configuration creates a MySQL 8 server and runs the SQL files in the data directory to fill it.

If you want to set the root password yourself, remove the MYSQL_RANDOM_ROOT_PASSWORD setting and add MYSQL_ROOT_PASSWORD with a password of your choice.

Then, run the following commands to see that the database is set up correctly.

docker-compose up -d mysql

# Once the container is ready, run the following to find container ID
docker ps -l # -l for latest container

docker exec -it 136faa620a82 bash # Use the container ID that you get above

mysql -utest -ptest es_relations_example # Username, password and database name given in docker-compose.yml

show tables;

You should see output like the one below.

MySQL Table List

Yaay! 🎊 Our database is running.

Step 2: Set up ElasticSearch and Kibana.

First, create a directory with the name volumes and another one in it with the name elasticsearch. This directory will keep our indexed data. Then, add the following lines to the docker-compose.yml.

  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.5.3
    environment:
      discovery.type: "single-node"
      bootstrap.memory_lock: "true"
      xpack.security.enabled: "false"
      xpack.security.enrollment.enabled: "false"
      xpack.monitoring.collection.enabled: "true"
      ES_JAVA_OPTS: "-Xms1g -Xmx1g"
    volumes:
      - ./volumes/elasticsearch/data:/usr/share/elasticsearch/data
    ports:
      - 9200:9200

  kibana:
    image: docker.elastic.co/kibana/kibana:8.5.3
    environment:
      ELASTICSEARCH_HOSTS: "http://elasticsearch:9200"
    ports:
      - 5601:5601
    depends_on:
      - elasticsearch

You can run the following command to start our new containers.

docker-compose up -d elasticsearch kibana

# You can check if everything is fine after build is done
docker ps

Docker Container List

You should see something like the above. Everything is fine till now.

Step 3: Now, we can set up Logstash to send data from the database to ES.

We can connect Logstash to MySQL using the JDBC plugin.

So, create a file named Dockerfile-logstash at the root of your project and put the following in it.

FROM docker.elastic.co/logstash/logstash:8.5.3

# Download JDBC MySQL connector
RUN curl -L --output "mysql-connector-j-8.0.31.tar.gz" "https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-j-8.0.31.tar.gz" \
    && tar -xf "mysql-connector-j-8.0.31.tar.gz" "mysql-connector-j-8.0.31/mysql-connector-j-8.0.31.jar" \
    && mv "mysql-connector-j-8.0.31/mysql-connector-j-8.0.31.jar" "/usr/share/logstash/logstash-core/lib/jars/mysql-connector-j.jar" \
    && rm -r "mysql-connector-j-8.0.31" "mysql-connector-j-8.0.31.tar.gz"

ENTRYPOINT ["/usr/local/bin/docker-entrypoint"]

And, add the following lines to the docker-compose.yml.

  logstash:
    build:
      context: .
      dockerfile: ./Dockerfile-logstash
    environment:
      LS_JAVA_OPTS: "-Xmx1g -Xms1g"
    depends_on:
      - mysql
      - elasticsearch

Now we have everything we need, but we still have to configure Logstash to get data from the database and index it on ES.

Logstash uses pipelines to retrieve data from a source, process it, and finally send it to the output we choose.

We'll use three pipelines: one to create the initial index, one to keep updates synced, and one to keep deletions synced.

Let's create a directory named logstash under volumes. Create three more directories under logstash named config, pipeline, and templates.

Create a file named products.json under volumes/logstash/templates and put the following in it.

This will be our index mapping template. You can find details about the mapping here.

{
    "index_patterns": ["products"],
    "template": {
        "settings" : {
            "index" : {
                "number_of_shards" : "1",
                "number_of_replicas" : "1"
            }
        },
        "mappings": {
            "properties": {
                "@timestamp": {
                    "type": "date"
                },
                "barcode": {
                    "type": "text",
                    "fields": {
                        "keyword": {
                            "type": "keyword",
                            "ignore_above": 256
                        }
                    }
                },
                "brand_name": {
                    "type": "keyword",
                    "fields": {
                        "keyword": {
                            "type": "keyword",
                            "ignore_above": 256
                        }
                    }
                },
                "category_name": {
                    "type": "keyword",
                    "fields": {
                        "keyword": {
                            "type": "keyword",
                            "ignore_above": 256
                        }
                    }
                },
                "name": {
                    "type": "keyword",
                    "fields": {
                        "keyword": {
                            "type": "keyword",
                            "ignore_above": 256
                        }
                    }
                },
                "product_id": {
                    "type": "long"
                },
                "tag_name": {
                    "type": "keyword",
                    "fields": {
                        "keyword": {
                            "type": "keyword",
                            "ignore_above": 256
                        }
                    }
                },
                "join_field": { 
                    "type": "join",
                    "relations": {
                        "product": "tag"
                    }
                },
                "type": {
                    "type": "text",
                    "fields": {
                        "keyword": {
                            "type": "keyword",
                            "ignore_above": 256
                        }
                    }
                }
            }
        }
    }
}

Now, we will create our first pipeline to index data from scratch. Create a file named initial_index.conf under volumes/logstash/pipeline.

The content of this will be as below.

# This part contains database information with a query to get data. 
input {
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/logstash-core/lib/jars/mysql-connector-j.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://mysql:3306/es_relations_example"
    jdbc_user => "test"
    jdbc_password => "test"
    clean_run => true
    record_last_run => false
    statement => "SELECT product.id as id, product.name, category.name as category_name, brand.name as brand_name, barcode FROM product LEFT JOIN category ON product.category_id = category.id LEFT JOIN brand ON product.brand_id = brand.id"
    type => "product"
  }
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/logstash-core/lib/jars/mysql-connector-j.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://mysql:3306/es_relations_example"
    jdbc_user => "test"
    jdbc_password => "test"
    clean_run => true
    record_last_run => false
    statement => "SELECT id, product_id, name as tag_name FROM tag"
    type => "tag"
  }
}

# We are adding some fields to create parent/child structure and remove unnecessary field
filter {
  if [type] == "product" {
    mutate {
      add_field => {"join_field" => "product"}
      remove_field => ["@version"]
    }
  } else if [type] == "tag" {
    mutate {
      add_field => {
        "[join_field][name]" => "tag"
        "[join_field][parent]" => "%{product_id}"
      }
      remove_field => ["@version"]
    }
  }
}

# Send the data to ElasticSearch with our mapping template.
# The stdout outputs are for debugging only; you can delete them.
output {
  if [type] == "product" {
    elasticsearch {
      hosts => ["http://elasticsearch:9200"]
      index => "products"
      action => "index"
      document_id => "%{id}"
      routing => "%{id}"
      manage_template => true
      template => "/usr/share/logstash/templates/products.json"
      template_name => "products"
      template_overwrite => true
    }
    stdout {}
  } else if [type] == "tag" {
    elasticsearch {
      hosts => ["http://elasticsearch:9200"]
      index => "products"
      action => "index"
      document_id => "%{id}"
      routing => "%{product_id}"
      manage_template => true
      template => "/usr/share/logstash/templates/products.json"
      template_name => "products"
      template_overwrite => true
    }
    stdout {}
  }
}

We have to tell Logstash that we have a pipeline to execute.

Create a file named pipelines.yml in volumes/logstash/config.

Put the lines below in it.

- pipeline.id: initial_index-pipeline
  path.config: "/usr/share/logstash/pipeline/initial_index.conf"

And finally, update the logstash service in docker-compose.yml as follows.

  logstash:
    build:
      context: .
      dockerfile: ./Dockerfile-logstash
    environment:
      LS_JAVA_OPTS: "-Xmx1g -Xms1g"
    depends_on:
      - mysql
      - elasticsearch
    volumes:
      - ./volumes/logstash/pipeline/:/usr/share/logstash/pipeline/
      - ./volumes/logstash/config/pipelines.yml:/usr/share/logstash/config/pipelines.yml
      - ./volumes/logstash/templates/products.json:/usr/share/logstash/templates/products.json

Run the following command and Logstash will index our data into ES.

docker-compose up -d logstash

docker ps -l # Get the container ID of logstash

docker logs -f 7e22a14208fa # This will print out the logs of the Logstash container

In the end, you should see a line like this:
[2022-12-28T14:30:16,972][INFO ][logstash.runner] Logstash shut down.

At this point, our data should be indexed 😃

Let's check if it's there.

Go to http://localhost:5601/app/dev_tools

This will open a query console to make a search in our index.

We can get all indexed data using the query below.

GET products/_search
{
  "query": {
    "match_all": {}
  }
}

The result looks like this.

Dev Tools Query Console
As you can see in the response, hits.total.value is 104: we have 100 products and 4 tags in our database.

I actually use two patterns at the same time in this project: denormalized data to put category/brand names in the product object, and the parent/child pattern to attach tags to products.

Use the query below to see products with tags.

GET products/_search
{
  "query": {
    "has_child": {
      "type": "tag",
      "query": {
        "match_all": {}
      },
      "inner_hits": {}
    }
  }
}

This will return products and their tags.
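If you want to narrow this down to products that own a specific tag, the inner match_all can be swapped for a condition on the child documents. Here is a minimal Python sketch that builds such a request body. The helper name products_with_tag is made up for this example, and it assumes the tag_name keyword field from products.json.

```python
import json


def products_with_tag(tag_name):
    """Build a has_child query body for products that own a tag named `tag_name`.

    Assumes the mapping from products.json: child relation "tag" and a
    keyword field "tag_name" on the tag documents.
    """
    return {
        "query": {
            "has_child": {
                "type": "tag",
                "query": {"term": {"tag_name": tag_name}},
                "inner_hits": {},
            }
        }
    }


body = products_with_tag("tag1")
# Print the JSON so it can be pasted into Dev Tools under: GET products/_search
print(json.dumps(body, indent=2))
```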
The key point of this project so far is how we create the parent/child relationship.

  1. Check the products.json to see the join_field. This is how we define the relationship between product and tag.
  2. Check the initial_index.conf filter section to see how we add the necessary fields to each type of document. We simply set join_field to "product" on products, and set the name and parent sub-fields of join_field on tags.
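The same logic can be written down in a few lines of Python, which may help if you want to index from application code instead of Logstash. This is only a sketch that mirrors the filter and output sections of initial_index.conf; the function name to_es_action is hypothetical.

```python
def to_es_action(row, doc_type):
    """Mirror the Logstash filter/output: return (routing, document) for one row.

    `row` is a dict shaped like the JDBC query results; `doc_type` is
    "product" or "tag". (Hypothetical helper, not part of the project.)
    """
    doc = dict(row)
    if doc_type == "product":
        # Parents only name their side of the relation
        doc["join_field"] = "product"
        routing = str(row["id"])
    elif doc_type == "tag":
        # Children name their side and point at the parent document ID
        doc["join_field"] = {"name": "tag", "parent": str(row["product_id"])}
        # Children must be routed to the same shard as their parent
        routing = str(row["product_id"])
    else:
        raise ValueError(f"unknown document type: {doc_type}")
    return routing, doc


product_routing, product_doc = to_es_action(
    {"id": 214673, "name": "AliExpress Fitness Trackers - mint green"}, "product"
)
tag_routing, tag_doc = to_es_action(
    {"id": 1, "product_id": 214673, "tag_name": "tag1"}, "tag"
)
```

Note that the parent and its children end up with the same routing value. That is what lets has_child queries work, because ES resolves the relation inside a single shard.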

You can find out detailed information about join at this link.

We indexed our data already. But, what will happen when something is updated or deleted from the database?

I have a solution for you 😃 We need two more pipelines to keep updates/deletes synced with our database.

Let's create another pipeline named keep_sync.conf under volumes/logstash/pipeline and put the following in it.

input {
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/logstash-core/lib/jars/mysql-connector-j.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://mysql:3306/es_relations_example"
    jdbc_user => "test"
    jdbc_password => "test"
    tracking_column => "unix_ts_in_secs"
    use_column_value => true
    tracking_column_type => "numeric"
    schedule => "*/5 * * * * *"
    statement => "SELECT product.id as id, product.name, category.name as category_name, brand.name as brand_name, barcode, UNIX_TIMESTAMP(GREATEST(product.date_updated, brand.date_updated, category.date_updated)) as unix_ts_in_secs FROM product LEFT JOIN category ON product.category_id = category.id LEFT JOIN brand ON product.brand_id = brand.id WHERE UNIX_TIMESTAMP(GREATEST(product.date_updated, brand.date_updated, category.date_updated)) > :sql_last_value AND GREATEST(product.date_updated, brand.date_updated, category.date_updated) < NOW()"
    type => "product"
  }
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/logstash-core/lib/jars/mysql-connector-j.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://mysql:3306/es_relations_example"
    jdbc_user => "test"
    jdbc_password => "test"
    tracking_column => "unix_ts_in_secs"
    use_column_value => true
    tracking_column_type => "numeric"
    schedule => "*/5 * * * * *"
    statement => "SELECT id, product_id, name as tag_name, UNIX_TIMESTAMP(date_created) as unix_ts_in_secs FROM tag WHERE UNIX_TIMESTAMP(date_created) > :sql_last_value AND date_created < NOW()"
    type => "tag"
  }
}

filter {
  if [type] == "product" {
    mutate {
      add_field => {"join_field" => "product"}
      remove_field => ["@version", "unix_ts_in_secs"]
    }
  } else if [type] == "tag" {
    mutate {
      add_field => {
        "[join_field][name]" => "tag"
        "[join_field][parent]" => "%{product_id}"
      }
      remove_field => ["@version", "unix_ts_in_secs"]
    }
  }
}

output {
  if [type] == "product" {
    elasticsearch {
      hosts => ["http://elasticsearch:9200"]
      index => "products"
      action => "index"
      document_id => "%{id}"
      routing => "%{id}"
    }
  } else if [type] == "tag" {
    elasticsearch {
      hosts => ["http://elasticsearch:9200"]
      index => "products"
      action => "index"
      document_id => "%{id}"
      routing => "%{product_id}"
    }
  }
}

Did you notice the changes in the jdbc section?

We modified the SQL statement slightly so that it only returns rows changed since the last run (tracked through :sql_last_value), and added a cron-style schedule to run it automatically. The statement runs every 5 seconds and retrieves the updates, if there are any.
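If the tracking column idea is new to you, the following Python sketch simulates one scheduled run. It is a simplified model, not Logstash's actual implementation: the function poll_once is hypothetical, and it simply remembers the largest timestamp it has seen, which may differ in detail from Logstash's own bookkeeping.

```python
def poll_once(rows, last_value, now):
    """Simulate one scheduled run of a tracking-column query.

    rows: list of dicts with a numeric "unix_ts_in_secs" key.
    Returns (rows to index, new last_value).
    """
    # Same condition as the SQL: newer than the last run, older than NOW()
    changed = [r for r in rows if last_value < r["unix_ts_in_secs"] < now]
    if changed:
        last_value = max(r["unix_ts_in_secs"] for r in changed)
    return changed, last_value


rows = [
    {"id": 1, "unix_ts_in_secs": 100},
    {"id": 2, "unix_ts_in_secs": 105},
]
first, last_value = poll_once(rows, 0, 110)            # both rows are picked up
second, last_value = poll_once(rows, last_value, 115)  # nothing changed since
rows[0]["unix_ts_in_secs"] = 112                       # product 1 gets updated
third, last_value = poll_once(rows, last_value, 120)   # only product 1 returns
```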

And we'll tell Logstash to run this pipeline too by adding the following lines to pipelines.yml.

- pipeline.id: keep_sync-pipeline
  path.config: "/usr/share/logstash/pipeline/keep_sync.conf"

Now, build and run the Logstash container again.

docker-compose up -d logstash

When you check the Logstash logs, you will see that it won't shut down anymore. Instead, it will periodically run the statement.

Let's check if it is working.

Firstly, I'm going to check the indexed data for the product with ID 2141673.

Product with ID 2141673

Now, I'll update its name in the database.

Product with ID 2141673 Updated

A few seconds later, it was already synced.

We indexed our data and can keep updates synced. Lastly, we will keep deletions synced.

There are two ways to keep deletions synced.

The first one is to add a soft-delete column to the related tables, modify your queries to exclude soft-deleted rows from your results, and remove those rows later with a script.

The second one is to keep a log book of your deletions, read this table with Logstash, and delete the related documents from the index.

The latter is better suited to our case, so that's what we implemented. When you check the example database, you will see a table named sync_log. When rows are deleted from the other tables, their triggers insert a row into this table.
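To show the log book idea in isolation, here is a self-contained sketch that uses Python's built-in sqlite3 module as a stand-in for MySQL. The sync_log column names follow the query used in the deletion pipeline; the tag table layout, the trigger name, and the 'tag' value written to model_type are assumptions, and the MySQL triggers in the example database may look different.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE tag (id INTEGER PRIMARY KEY, product_id INTEGER, name TEXT);
    CREATE TABLE sync_log (
        model_id INTEGER,
        model_type TEXT,
        parent_id INTEGER,
        action_time TEXT DEFAULT CURRENT_TIMESTAMP
    );
    -- Every deleted tag leaves a row behind in the log book
    CREATE TRIGGER tag_after_delete AFTER DELETE ON tag
    BEGIN
        INSERT INTO sync_log (model_id, model_type, parent_id)
        VALUES (OLD.id, 'tag', OLD.product_id);
    END;
    """
)
conn.execute("INSERT INTO tag VALUES (1, 214673, 'tag1')")
conn.execute("DELETE FROM tag WHERE id = 1")
logged = conn.execute(
    "SELECT model_id, model_type, parent_id FROM sync_log"
).fetchall()
```

After the delete, the tag row is gone from the tag table, but sync_log still remembers its ID and its parent, which is everything needed to remove the document from ES.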

We are going to add a new pipeline that reads this table and deletes the related documents from ES.

Now, create a new pipeline named keep_sync_deletions.conf.

input {
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/logstash-core/lib/jars/mysql-connector-j.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://mysql:3306/es_relations_example"
    jdbc_user => "test"
    jdbc_password => "test"
    tracking_column => "unix_ts_in_secs"
    use_column_value => true
    tracking_column_type => "numeric"
    schedule => "*/5 * * * * *"
    statement => "SELECT model_id, model_type, parent_id, UNIX_TIMESTAMP(action_time) as unix_ts_in_secs FROM sync_log WHERE UNIX_TIMESTAMP(action_time) > :sql_last_value AND action_time < NOW()"
  }
}

filter {
  mutate {
    remove_field => ["@version", "unix_ts_in_secs"]
  }
}

output {
  # Branch on the model_type column read from sync_log.
  # This assumes it holds "product" or "tag"; adjust to the values your triggers write.
  if [model_type] == "product" {
    elasticsearch {
      hosts => ["http://elasticsearch:9200"]
      index => "products"
      action => "delete"
      document_id => "%{model_id}"
      routing => "%{model_id}"
    }
  } else if [model_type] == "tag" {
    elasticsearch {
      hosts => ["http://elasticsearch:9200"]
      index => "products"
      action => "delete"
      document_id => "%{model_id}"
      routing => "%{parent_id}"
    }
  }
}

The main difference in this pipeline is the action in the output section. The action was index in the other pipelines, but now it's delete.
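For comparison, this is roughly what the same deletions look like as a request body for the Elasticsearch _bulk API, built in Python. It is a sketch under assumptions: the helper bulk_delete_body is hypothetical, and it assumes model_type is "tag" for tags, with anything else treated as a parent document routed by its own ID.

```python
import json


def bulk_delete_body(log_rows, index="products"):
    """Turn sync_log rows into an NDJSON body for the Elasticsearch _bulk API.

    Each row is a dict with model_id, model_type and parent_id.
    A tag is routed by its parent product; a product by its own ID.
    """
    lines = []
    for row in log_rows:
        routing = row["parent_id"] if row["model_type"] == "tag" else row["model_id"]
        action = {
            "delete": {
                "_index": index,
                "_id": str(row["model_id"]),
                "routing": str(routing),
            }
        }
        lines.append(json.dumps(action))
    # The bulk API expects one JSON object per line and a trailing newline
    return "\n".join(lines) + "\n"


body = bulk_delete_body(
    [
        {"model_id": 1, "model_type": "tag", "parent_id": 214673},
        {"model_id": 214673, "model_type": "product", "parent_id": None},
    ]
)
```

The routing value matters here as well: a child document can only be deleted if the request is routed to the shard where it was indexed.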

Update pipelines.yml to tell Logstash that we have a new pipeline. Then, restart the Logstash container.

- pipeline.id: keep_sync_deletions-pipeline
  path.config: "/usr/share/logstash/pipeline/keep_sync_deletions.conf"

docker-compose restart logstash

You can delete something from your database and then try to find it in the index. Hopefully, you won't be able to find it anymore. 😃

Last Words

I tried to give a brief explanation of how we can keep our relational data in ES and keep it in sync afterwards. I hope you find this story useful. Keep in mind that this story assumes you already know the tech stack we used, so I tried not to dive deep into how the tools work or how to set them up.

This is my first story, by the way. Please let me know what you think of it.

Feel free to ask questions about ES, Logstash, or anything else.

Thank you for your time.

Resources
