Correlated sub-queries in CrateDB

marijaselakovic

Posted on January 18, 2023

CrateDB 5.1 adds support for correlated sub-queries. This post introduces the concept of a correlated sub-query, shows how it is used, and explains how it's currently implemented in CrateDB!

Our main motivation for this implementation was to improve compatibility with tools that rely on correlated sub-queries. Grafana, for example, uses correlated sub-queries for the auto-completion of tables and columns.

Introduction: Sub-query and Correlated sub-query

Before we start with correlated sub-queries, we'll first have a look at how a sub-query works.

What is a sub-query?

A sub-query, also called an inner query or inner select, is a query embedded within another query. Let’s have a look at the following example:

Suppose we have a table with customers and a table that lists CrateDB clusters owned by those customers:

SELECT * FROM doc.customers ORDER BY id;
+----+-------+---------+
| id | name  | country |
+----+-------+---------+
|  1 | Anton | Austria |
|  2 | Maria | Germany |
|  3 | Anna  | Italy   |
+----+-------+---------+
SELECT * FROM doc.clusters ORDER BY id;
+----+-------------+-----------------+-------------+
| id | customer_id | number_of_nodes | name        |
+----+-------------+-----------------+-------------+
|  1 |           1 |              10 | cluster 1   |
|  2 |           2 |              15 | cluster 2   |
|  3 |           2 |               8 | cluster 3   |
|  4 |           3 |              12 | cluster 4   |
+----+-------------+-----------------+-------------+

We want to find all the clusters owned by customers from Austria. We can start by finding all customers from Austria:

SELECT id, name FROM customers WHERE country = 'Austria';
+----+-------+
| id | name  |
+----+-------+
|  1 | Anton |
+----+-------+

Now, for each row in clusters, we want to check whether the cluster’s customer_id matches a customer whose country is Austria. To do that, we can embed the first query (reduced to the id column) as a sub-query in the WHERE clause of another query, the so-called outer-query:

SELECT clusters.id,
       clusters.name
FROM   clusters
WHERE  clusters.customer_id IN (SELECT customers.id
                                FROM   customers
                                WHERE  customers.country = 'Austria');
+----+-------------------+
| id | name              |
+----+-------------------+
|  1 | cluster 1         |
+----+-------------------+
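
If you want to experiment with this example without a running cluster, the following is a minimal sketch that rebuilds the sample data and runs both queries. It assumes Python's built-in sqlite3 module with an in-memory SQLite database as a stand-in, so it is not CrateDB, there is no doc schema, and the column types are our own guess.

```python
import sqlite3

# Assumption: an in-memory SQLite database stands in for CrateDB here.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE customers (id INTEGER PRIMARY KEY, name TEXT, country TEXT);
    CREATE TABLE clusters (
        id INTEGER PRIMARY KEY,
        customer_id INTEGER,
        number_of_nodes INTEGER,
        name TEXT
    );
    INSERT INTO customers VALUES
        (1, 'Anton', 'Austria'), (2, 'Maria', 'Germany'), (3, 'Anna', 'Italy');
    INSERT INTO clusters VALUES
        (1, 1, 10, 'cluster 1'), (2, 2, 15, 'cluster 2'),
        (3, 2, 8, 'cluster 3'), (4, 3, 12, 'cluster 4');
""")

# The inner query has no reference to the outer query, so it can run on its own.
inner_rows = conn.execute(
    "SELECT id FROM customers WHERE country = 'Austria'"
).fetchall()

# The same inner query, embedded as a sub-query in the outer query's WHERE clause.
outer_rows = conn.execute("""
    SELECT clusters.id,
           clusters.name
    FROM   clusters
    WHERE  clusters.customer_id IN (SELECT customers.id
                                    FROM   customers
                                    WHERE  customers.country = 'Austria')
""").fetchall()

print(inner_rows)  # [(1,)]
print(outer_rows)  # [(1, 'cluster 1')]
```

The important detail is that the inner query returns the same result whether it runs alone or embedded, because it does not look at the outer query at all.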

We can learn how the query is executed using the EXPLAIN command. With this command, we will retrieve the execution plan of the query:

EXPLAIN
SELECT clusters.id,
       clusters.name
FROM   clusters
WHERE  clusters.customer_id IN (SELECT customers.id
                                FROM   customers
                                WHERE  customers.country = 'Austria');
+------------------------------------------------------------------------------------------------+
| EXPLAIN                                                                                        |
+------------------------------------------------------------------------------------------------+
| MultiPhase                                                                                     |
|   └ Collect[doc.clusters | [id, name] | (customer_id = ANY((SELECT id FROM (doc.customers))))] |
|   └ OrderBy[id ASC]                                                                            |
|     └ Collect[doc.customers | [id] | (country = 'Austria')]                                    |
+------------------------------------------------------------------------------------------------+

Let’s dive into the details of the execution plan to understand what exactly is happening here. The execution plan is a tree of operators which gives us insights into how this query is handled internally. Each operator represents an operation the database will execute. We will be looking into each operation from the bottom to the top:

  • The operator Collect[doc.customers] collects from the table doc.customers the value id where the query expression (country = 'Austria') matches on its rows. It represents the sub-query.

  • The operator OrderBy[id ASC] sorts the id values in ascending order. The query optimizer adds this operator because sorted values make the outer-query’s filter faster.

  • The operator Collect[doc.clusters] collects the values id, name from the table doc.clusters where the customer_id matches the result from the sub-query select id from doc.customers. It represents the outer-query.

  • The operator MultiPhase combines the operators OrderBy[id ASC] and Collect[doc.clusters], executes them in order, and injects the result of the first operator into the second operator.

The order of execution is the following:

  • Collect all ids from doc.customers where the country is Austria ordered by id ascending.

  • Collect the values id and name from doc.clusters where the customer_id matches one of the ids collected in the first operation.

  • Return these values back to the client.

Since the first operation, which represents the inner query, does not depend on any outer context, it can be executed once on its own, and its result can be injected into the outer-query.
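
The two-phase execution described above can be sketched in a few lines of Python. This is only an illustration of the idea, not CrateDB's code: the function names are hypothetical, the tables are plain lists, and the binary search is our assumption about why a sorted sub-query result could make the outer filter faster.

```python
from bisect import bisect_left

# Sample data from the post, as plain Python rows.
customers = [
    {"id": 1, "name": "Anton", "country": "Austria"},
    {"id": 2, "name": "Maria", "country": "Germany"},
    {"id": 3, "name": "Anna", "country": "Italy"},
]
clusters = [
    {"id": 1, "customer_id": 1, "number_of_nodes": 10, "name": "cluster 1"},
    {"id": 2, "customer_id": 2, "number_of_nodes": 15, "name": "cluster 2"},
    {"id": 3, "customer_id": 2, "number_of_nodes": 8, "name": "cluster 3"},
    {"id": 4, "customer_id": 3, "number_of_nodes": 12, "name": "cluster 4"},
]


def run_inner_query():
    """Phase 1: Collect[doc.customers] followed by OrderBy[id ASC]."""
    ids = [c["id"] for c in customers if c["country"] == "Austria"]
    return sorted(ids)


def contains_sorted(sorted_ids, value):
    """Membership test on a sorted list via binary search (our assumption)."""
    pos = bisect_left(sorted_ids, value)
    return pos < len(sorted_ids) and sorted_ids[pos] == value


def run_outer_query(sorted_ids):
    """Phase 2: Collect[doc.clusters], filtered by the injected result."""
    return [
        (cl["id"], cl["name"])
        for cl in clusters
        if contains_sorted(sorted_ids, cl["customer_id"])
    ]


def multi_phase():
    """MultiPhase: run the inner query exactly once, then feed the outer query."""
    inner_result = run_inner_query()
    return run_outer_query(inner_result)


result = multi_phase()
print(result)  # [(1, 'cluster 1')]
```

Note that run_inner_query is called a single time, no matter how many rows doc.clusters contains.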

So what is then a correlated sub-query?

Imagine now the following use-case: We would like to know the total number of nodes each customer has in its clusters. Let’s try the same approach as before and break this down into multiple steps: We can first create a query retrieving the sum of all nodes for a single customer_id:

SELECT SUM(number_of_nodes) FROM clusters WHERE clusters.customer_id = 2;
+----------------------+
| sum(number_of_nodes) |
+----------------------+
|                   23 |
+----------------------+

And then we embed this query again as a sub-query into an outer-query to generalize it for all customers:

SELECT customers.id,
       customers.name,
       (SELECT Sum(number_of_nodes)
        FROM   clusters
        WHERE  clusters.customer_id = customers.id) AS number_of_nodes
FROM   customers;
+----+-------+-----------------+
| id | name  | number_of_nodes |
+----+-------+-----------------+
|  1 | Anton |              10 |
|  3 | Anna  |              12 |
|  2 | Maria |              23 |
+----+-------+-----------------+

Note that the sub-query now uses the reference customers.id, which belongs to the outer-query. That is why it's called a correlated sub-query. The result of the sub-query differs for each value of customers.id from the outer-query, so the inner-query has to be executed again for each value. Consequently, executing correlated sub-queries in a database is not a straightforward task.
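
One way to see the correlation is to try running the sub-query on its own. The sketch below again assumes an in-memory SQLite database via Python's sqlite3 module as a stand-in for CrateDB (the error type is SQLite's, and CrateDB would report the problem differently). We also add an ORDER BY, which the original query does not have, only to make the output deterministic.

```python
import sqlite3

# Assumption: an in-memory SQLite database stands in for CrateDB here.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE customers (id INTEGER PRIMARY KEY, name TEXT, country TEXT);
    CREATE TABLE clusters (
        id INTEGER PRIMARY KEY,
        customer_id INTEGER,
        number_of_nodes INTEGER,
        name TEXT
    );
    INSERT INTO customers VALUES
        (1, 'Anton', 'Austria'), (2, 'Maria', 'Germany'), (3, 'Anna', 'Italy');
    INSERT INTO clusters VALUES
        (1, 1, 10, 'cluster 1'), (2, 2, 15, 'cluster 2'),
        (3, 2, 8, 'cluster 3'), (4, 3, 12, 'cluster 4');
""")

sub_query = """
    SELECT SUM(number_of_nodes)
    FROM   clusters
    WHERE  clusters.customer_id = customers.id
"""

# Standalone, the sub-query cannot run: customers.id only exists in the outer query.
try:
    conn.execute(sub_query)
    runs_standalone = True
except sqlite3.OperationalError:
    runs_standalone = False

# Embedded in the outer query, customers.id is resolved per outer row.
rows = conn.execute(f"""
    SELECT customers.id,
           customers.name,
           ({sub_query}) AS number_of_nodes
    FROM   customers
    ORDER  BY customers.id
""").fetchall()

print(runs_standalone)  # False
print(rows)  # [(1, 'Anton', 10), (2, 'Maria', 23), (3, 'Anna', 12)]
```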

Let’s now check the execution plan of the query using the EXPLAIN command again:

EXPLAIN
SELECT customers.id,
       customers.name,
       (SELECT Sum(number_of_nodes)
        FROM   clusters
        WHERE  clusters.customer_id = customers.id) AS number_of_nodes
FROM   customers;
+-----------------------------------------------------------------------------------------+
| EXPLAIN                                                                                 |
+-----------------------------------------------------------------------------------------+
| Eval[id, name, (SELECT sum(number_of_nodes) FROM (doc.clusters)) AS number_of_nodes]    |
|   └ CorrelatedJoin[id, name, (SELECT sum(number_of_nodes) FROM (doc.clusters))]         |
|     └ Collect[doc.customers | [id, name] | true]                                        |
|     └ SubPlan                                                                           |
|       └ Limit[2::bigint;0::bigint]                                                      |
|         └ HashAggregate[sum(number_of_nodes)]                                           |
|           └ Collect[doc.clusters | [number_of_nodes, customer_id] | (customer_id = id)] |
+-----------------------------------------------------------------------------------------+

Let’s look into the execution plan:

  • The operator Collect[doc.clusters] collects from the table doc.clusters the values [number_of_nodes, customer_id] from the rows where customer_id matches a given id. It represents the data collection part of the sub-query.

  • The operator HashAggregate[sum(number_of_nodes)] aggregates the values of number_of_nodes. It depends on the previous operation and represents the sum function of the sub-query.

  • The operator Limit[2::bigint;0::bigint] is used as an assertion. A sub-query must return at most 1 result row, so if the sub-query has no limit of its own, we limit it to 2 rows: a second row is enough to detect a violation, and we avoid retrieving the complete result.

  • The operator SubPlan marks the operator tree described above as a correlated sub-query.

  • The operator Collect[doc.customers] collects the values [id, name] from the table doc.customers. It represents the outer query.

  • The operator CorrelatedJoin performs the join between the outer-query results and the sub-query results. First, the outer-query is executed, and then the sub-query is executed for each row of the outer-query’s result.

  • The operator Eval[id, name] combines the results from the underlying CorrelatedJoin and returns the values [id, name, number_of_nodes] back to the client.

The order of execution is the following:

  • Collect all the values [id, name] from doc.customers.

  • For each id from the first operation, collect the values [number_of_nodes, customer_id] from doc.clusters where customer_id matches that id, and aggregate number_of_nodes.

  • Return the values id, name and the result of the aggregation back to the client.

The inner-query now depends on the outer context. Therefore, it has to be executed again for every single row of the outer-query. This is the state of the current implementation in CrateDB. It works, but the execution is not optimal.
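
To make the cost visible, here is a small Python sketch of this row-by-row execution. It is a simplified model under our own assumptions, not CrateDB's implementation: the function names are hypothetical, the tables are plain lists, and the counter only exists to show how often the sub-plan runs.

```python
# Sample data from the post, reduced to the columns the query needs.
customers = [
    {"id": 1, "name": "Anton"},
    {"id": 2, "name": "Maria"},
    {"id": 3, "name": "Anna"},
]
clusters = [
    {"customer_id": 1, "number_of_nodes": 10},
    {"customer_id": 2, "number_of_nodes": 15},
    {"customer_id": 2, "number_of_nodes": 8},
    {"customer_id": 3, "number_of_nodes": 12},
]


def sub_plan(outer_id):
    """SubPlan for one outer row: Collect, then HashAggregate, then Limit 2."""
    # Collect[doc.clusters | ... | (customer_id = id)]: a full scan per call.
    collected = [
        cl["number_of_nodes"] for cl in clusters if cl["customer_id"] == outer_id
    ]
    # HashAggregate[sum(number_of_nodes)]: SUM over no rows is NULL (None).
    aggregated = [sum(collected) if collected else None]
    # Limit 2: fetch at most two rows, then assert that there is at most one.
    limited = aggregated[:2]
    if len(limited) > 1:
        raise ValueError("sub-query returned more than one row")
    return limited[0] if limited else None


def correlated_join():
    """CorrelatedJoin: run the sub-plan once per row of the outer query."""
    executions = 0
    output = []
    for customer in customers:  # Collect[doc.customers]
        value = sub_plan(customer["id"])
        executions += 1
        output.append((customer["id"], customer["name"], value))
    return output, executions


rows, executions = correlated_join()
print(rows)        # [(1, 'Anton', 10), (2, 'Maria', 23), (3, 'Anna', 12)]
print(executions)  # 3
```

With three customers the sub-plan runs three times, and every run scans the clusters list again. In this model the work grows with the number of outer rows multiplied by the size of the inner table.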

Can we do better?

The same query can also be expressed as a join. Relational databases are good at running joins because there is plenty of room for optimizations. The question is: how do we convert a correlated sub-query into a join?

Let’s take our correlated sub-query example from the beginning again:

SELECT customers.id,
       customers.name,
       (SELECT Sum(number_of_nodes)
        FROM   clusters
        WHERE  clusters.customer_id = customers.id) AS number_of_nodes
FROM   customers;

As mentioned above, the correlated sub-query has a reference customers.id to the outer-query. Hence the sub-query is executed once for every row of the outer-query. Let’s have a closer look at the sub-query:

SELECT Sum(number_of_nodes)
  FROM   clusters
  WHERE  clusters.customer_id = customers.id

The sub-query selects data from the table doc.clusters. So instead of using the sub-query in the outer-query, we can replace it with a join on the table doc.clusters. The WHERE clause of the sub-query, which includes the reference customers.id to the outer-query, becomes the join condition. We replace the sub-query in the outer-query with the aggregation function Sum(number_of_nodes) from the sub-query, and finally we add a GROUP BY for the aggregation:

SELECT customers.id,
       customers.name,
       Sum(clusters.number_of_nodes) AS number_of_nodes
FROM   clusters
       JOIN customers
         ON clusters.customer_id = customers.id
GROUP  BY customers.id,
          customers.name;

Voilà, we converted the correlated sub-query into a join. Let’s run this query to see if it yields the same result (the row order may differ, since neither query has an ORDER BY):

+----+-------+-----------------+
| id | name  | number_of_nodes |
+----+-------+-----------------+
|  3 | Anna  |              12 |
|  1 | Anton |              10 |
|  2 | Maria |              23 |
+----+-------+-----------------+


This looks good. Let’s now check the logical plan using the EXPLAIN command:

EXPLAIN
SELECT customers.id,
       customers.name,
       Sum(clusters.number_of_nodes) AS number_of_nodes
FROM   clusters
       JOIN customers
         ON clusters.customer_id = customers.id
GROUP  BY customers.id,
          customers.name;
+-----------------------------------------------------------------------+
| EXPLAIN                                                               |
+-----------------------------------------------------------------------+
| Eval[id, name, sum(number_of_nodes) AS number_of_nodes]               |
|   └ GroupHashAggregate[id, name | sum(number_of_nodes)]               |
|     └ HashJoin[(customer_id = id)]                                    |
|       ├ Collect[doc.clusters | [number_of_nodes, customer_id] | true] |
|       └ Collect[doc.customers | [id, name] | true]                    |
+-----------------------------------------------------------------------+

Let’s look into the execution plan:

  • The operator Collect[doc.customers] collects from the table doc.customers the values [id, name].

  • The operator Collect[doc.clusters] collects the values [number_of_nodes, customer_id] from the table doc.clusters.

  • The operator HashJoin[(customer_id = id)] performs a hash-join on the output from the previous two operators with the join condition customer_id = id.

  • The operator GroupHashAggregate[id, name | sum(number_of_nodes)] aggregates the values of number_of_nodes for each [id, name] pair.

  • The operator Eval[id, name, sum(number_of_nodes)] takes the results from the underlying operator tree and returns the values [id, name, number_of_nodes] back to the client.

The order of execution is the following:

  • Collect the values [id, name] from doc.customers.

  • Collect the values [number_of_nodes, customer_id] from doc.clusters.

  • Join the previous two datasets together on customer_id = id.

  • Aggregate the values of number_of_nodes for each [id, name] group.

  • Return the values id, name and the result of the aggregation back to the client.
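
The steps above can be sketched in Python as well. As before, this is a simplified model and not CrateDB's implementation: the function name is hypothetical, and which table serves as the build side of the hash join is our assumption. The final sort is only there to make the output deterministic.

```python
from collections import defaultdict

# Sample data from the post, reduced to the columns the query needs.
customers = [
    {"id": 1, "name": "Anton"},
    {"id": 2, "name": "Maria"},
    {"id": 3, "name": "Anna"},
]
clusters = [
    {"customer_id": 1, "number_of_nodes": 10},
    {"customer_id": 2, "number_of_nodes": 15},
    {"customer_id": 2, "number_of_nodes": 8},
    {"customer_id": 3, "number_of_nodes": 12},
]


def hash_join_then_aggregate():
    """HashJoin[(customer_id = id)] followed by GroupHashAggregate."""
    # Build phase: scan doc.customers once and index the rows by join key.
    build_side = {customer["id"]: customer for customer in customers}

    # Probe phase: scan doc.clusters once and look up each row's partner.
    sums = defaultdict(int)
    for cluster in clusters:
        customer = build_side.get(cluster["customer_id"])
        if customer is None:
            continue  # an inner join drops rows without a partner
        # GroupHashAggregate[id, name | sum(number_of_nodes)]
        sums[(customer["id"], customer["name"])] += cluster["number_of_nodes"]

    return sorted((cid, name, total) for (cid, name), total in sums.items())


rows = hash_join_then_aggregate()
print(rows)  # [(1, 'Anton', 10), (2, 'Maria', 23), (3, 'Anna', 12)]
```

Each table is scanned exactly once here, regardless of how many customers there are.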

The new execution plan is much more efficient than the previous one: each table is collected once, and the expensive execution of the correlated sub-query for each row of the outer-query is eliminated. This transformation from a correlated join to a regular join is called decorrelation. Of course, we expect the query optimizer to be clever enough to do the decorrelation for us automatically, so we don’t have to rearrange our query manually. In practice, generalizing this concept to a broader range of queries has more pitfalls, because the queries can be much more complex. However, this gives you an idea of the concept and of the further work we have to put in to integrate this into CrateDB.
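
One such pitfall already shows up with our small example. The sketch below compares the two formulations, again assuming an in-memory SQLite database via Python's sqlite3 module as a stand-in for CrateDB. The fourth customer (Paul) is hypothetical and not part of the post's data, and the ORDER BY clauses are added only to make the comparison deterministic.

```python
import sqlite3

# Assumption: an in-memory SQLite database stands in for CrateDB here.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE customers (id INTEGER PRIMARY KEY, name TEXT, country TEXT);
    CREATE TABLE clusters (
        id INTEGER PRIMARY KEY,
        customer_id INTEGER,
        number_of_nodes INTEGER,
        name TEXT
    );
    INSERT INTO customers VALUES
        (1, 'Anton', 'Austria'), (2, 'Maria', 'Germany'), (3, 'Anna', 'Italy');
    INSERT INTO clusters VALUES
        (1, 1, 10, 'cluster 1'), (2, 2, 15, 'cluster 2'),
        (3, 2, 8, 'cluster 3'), (4, 3, 12, 'cluster 4');
""")

CORRELATED = """
    SELECT customers.id, customers.name,
           (SELECT SUM(number_of_nodes)
            FROM   clusters
            WHERE  clusters.customer_id = customers.id) AS number_of_nodes
    FROM   customers
    ORDER  BY customers.id
"""
INNER_JOIN = """
    SELECT customers.id, customers.name,
           SUM(clusters.number_of_nodes) AS number_of_nodes
    FROM   clusters
           JOIN customers ON clusters.customer_id = customers.id
    GROUP  BY customers.id, customers.name
    ORDER  BY customers.id
"""
LEFT_JOIN = """
    SELECT customers.id, customers.name,
           SUM(clusters.number_of_nodes) AS number_of_nodes
    FROM   customers
           LEFT JOIN clusters ON clusters.customer_id = customers.id
    GROUP  BY customers.id, customers.name
    ORDER  BY customers.id
"""


def run(sql):
    return conn.execute(sql).fetchall()


# On the sample data, the correlated sub-query and the join agree.
same_on_sample = run(CORRELATED) == run(INNER_JOIN)

# Hypothetical customer without any cluster (not in the post's data).
conn.execute("INSERT INTO customers VALUES (4, 'Paul', 'France')")

correlated_rows = run(CORRELATED)  # keeps Paul, with NULL (None) as the sum
inner_rows = run(INNER_JOIN)       # drops Paul: no matching cluster row
left_rows = run(LEFT_JOIN)         # keeps Paul, like the correlated form

print(same_on_sample)                # True
print(correlated_rows[-1])           # (4, 'Paul', None)
print(len(inner_rows))               # 3
print(left_rows == correlated_rows)  # True
```

So the inner join from the manual rewrite is only equivalent as long as every customer owns at least one cluster. For this particular query, a left outer join starting from customers preserves the behaviour of the correlated sub-query, which is the kind of detail an automatic decorrelation has to get right.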

Summary

In this post, we covered what a correlated sub-query is and how it is implemented in CrateDB 5.1. We also showed where the bottlenecks and limitations of the current approach lie and how we can improve it in the future.

Are you interested in CrateDB? Have a look at the CrateDB Reference documentation. If you have questions, check out our CrateDB Community. We're happy to help!
