Correlated sub-queries in CrateDB
marijaselakovic
Posted on January 18, 2023
Correlated sub-queries are a new feature that CrateDB 5.1 supports. This post will introduce you to the concept of a correlated sub-query, its usage, and how it's currently implemented in CrateDB!
Our main motivation for this implementation was to improve compatibility with tools that rely on correlated sub-queries. Grafana, for example, uses correlated sub-queries for the auto-completion of tables and columns.
Introduction: Sub-query and Correlated sub-query
Before we start with correlated sub-queries, we'll first have a look at how a sub-query works.
What is a sub-query?
A sub-query, also called an inner query or inner select, is a query embedded within another query. Let’s have a look at the following example:
Suppose we have a table with customers and a table that lists CrateDB clusters owned by those customers:
SELECT * FROM doc.customers ORDER BY id;
+----+-------+---------+
| id | name | country |
+----+-------+---------+
| 1 | Anton | Austria |
| 2 | Maria | Germany |
| 3 | Anna | Italy |
+----+-------+---------+
SELECT * FROM doc.clusters ORDER BY id;
+----+-------------+-----------------+-------------+
| id | customer_id | number_of_nodes | name |
+----+-------------+-----------------+-------------+
| 1 | 1 | 10 | cluster 1 |
| 2 | 2 | 15 | cluster 2 |
| 3 | 2 | 8 | cluster 3 |
| 4 | 3 | 12 | cluster 4 |
+----+-------------+-----------------+-------------+
We want to find all the clusters owned by customers from Austria. We can first start by finding all customers from Austria:
SELECT id, name FROM customers WHERE country = 'Austria';
+----+-------+
| id | name |
+----+-------+
| 1 | Anton |
+----+-------+
Now for each row in clusters, we want to look up whether the cluster’s customer_id matches a customer whose country is Austria. Therefore we can include the first query as a sub-query in the where-clause of another query, a so-called outer-query:
SELECT clusters.id,
clusters.name
FROM clusters
WHERE clusters.customer_id IN (SELECT customers.id
FROM customers
WHERE customers.country = 'Austria');
+----+-------------------+
| id | name |
+----+-------------------+
| 1 | cluster 1 |
+----+-------------------+
We can learn how the query is executed using the EXPLAIN command. With this command, we will retrieve the execution plan of the query:
EXPLAIN
SELECT clusters.id,
clusters.name
FROM clusters
WHERE clusters.customer_id IN (SELECT customers.id
FROM customers
WHERE customers.country = 'Austria');
+------------------------------------------------------------------------------------------------+
| EXPLAIN |
+------------------------------------------------------------------------------------------------+
| MultiPhase |
| └ Collect[doc.clusters | [id, name] | (customer_id = ANY((SELECT id FROM (doc.customers))))] |
| └ OrderBy[id ASC] |
| └ Collect[doc.customers | [id] | (country = 'Austria')] |
+------------------------------------------------------------------------------------------------+
Let’s dive into the details of the execution plan to understand what exactly is happening here. The execution plan is a tree of operators which gives us insights into how this query is handled internally. Each operator represents an operation the database will execute. We will be looking into each operation from the bottom to the top:
The operator Collect[doc.customers] collects the value id from the table doc.customers for the rows that match the query expression (country = 'Austria'). It represents the sub-query.
The operator OrderBy[id ASC] sorts the value id in ascending order. The query optimizer adds this operator so that the values are returned sorted, because that makes the outer-query’s filter faster.
The operator Collect[doc.clusters] collects the values id and name from the table doc.clusters where the customer_id matches the result of the sub-query SELECT id FROM doc.customers. It represents the outer-query.
The operator MultiPhase combines the operators OrderBy[id ASC] and Collect[doc.clusters], executes them in order, and injects the result of the first operator into the second operator.
The order of execution is the following:
1. Collect all ids from doc.customers where the country is Austria, ordered by id ascending.
2. Collect the values id and name from doc.clusters where the customer_id matches the ids collected in the first operation.
3. Return these values to the client.
Since the first operation, which represents the inner query, does not depend on any outer context, it can be executed once, standalone and the result can be injected into the outer-query.
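The two-phase execution described above can be sketched in a few lines of plain Python. This is only an illustration of the idea, not CrateDB's implementation: the function names run_subquery, run_outer_query and multi_phase are made up for this sketch, the rows are copied from the sample tables, and a Python set stands in for whatever lookup structure the real filter uses.

```python
# Sample rows copied from the tables shown earlier in the post.
customers = [
    {"id": 1, "name": "Anton", "country": "Austria"},
    {"id": 2, "name": "Maria", "country": "Germany"},
    {"id": 3, "name": "Anna", "country": "Italy"},
]
clusters = [
    {"id": 1, "customer_id": 1, "number_of_nodes": 10, "name": "cluster 1"},
    {"id": 2, "customer_id": 2, "number_of_nodes": 15, "name": "cluster 2"},
    {"id": 3, "customer_id": 2, "number_of_nodes": 8, "name": "cluster 3"},
    {"id": 4, "customer_id": 3, "number_of_nodes": 12, "name": "cluster 4"},
]


def run_subquery(country):
    """Inner query: collect the matching customer ids, sorted ascending."""
    return sorted(c["id"] for c in customers if c["country"] == country)


def run_outer_query(customer_ids):
    """Outer query: keep clusters whose customer_id is among the injected ids."""
    wanted = set(customer_ids)  # assumption: a set stands in for the real filter
    return [(c["id"], c["name"]) for c in clusters if c["customer_id"] in wanted]


def multi_phase(country):
    """Hypothetical stand-in for the MultiPhase operator."""
    ids = run_subquery(country)   # phase 1: executed exactly once
    return run_outer_query(ids)   # phase 2: the result is injected like a constant


result = multi_phase("Austria")
print(result)  # [(1, 'cluster 1')]
```

The important property is that run_subquery is called once per query, no matter how many rows doc.clusters contains.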
So what is then a correlated sub-query?
Imagine now the following use-case: We would like to know the total number of nodes each customer has in its clusters. Let’s try the same approach as before and break this down into multiple steps. We can first create a query retrieving the sum of all nodes for a single customer_id:
SELECT SUM(number_of_nodes) FROM clusters WHERE clusters.customer_id = 2;
+----------------------+
| sum(number_of_nodes) |
+----------------------+
| 23 |
+----------------------+
And then we embed this query again as a sub-query into an outer-query to generalize it for all customers:
SELECT customers.id,
customers.name,
(SELECT Sum(number_of_nodes)
FROM clusters
WHERE clusters.customer_id = customers.id) AS number_of_nodes
FROM customers;
+----+-------+-----------------+
| id | name | number_of_nodes |
+----+-------+-----------------+
| 1 | Anton | 10 |
| 3 | Anna | 12 |
| 2 | Maria | 23 |
+----+-------+-----------------+
Please note that the sub-query now uses a reference, customers.id, which correlates to the outer-query. That is why it's called a correlated sub-query. The result of the sub-query will be different for each value of customers.id from the outer-query. Therefore, the inner-query needs to be executed again for each value. Consequently, the execution of correlated sub-queries in a database is not a straightforward task.
Let’s now check the execution plan of the query using the EXPLAIN command again:
EXPLAIN
SELECT customers.id,
customers.name,
(SELECT Sum(number_of_nodes)
FROM clusters
WHERE clusters.customer_id = customers.id) AS number_of_nodes
FROM customers;
+-----------------------------------------------------------------------------------------+
| EXPLAIN |
+-----------------------------------------------------------------------------------------+
| Eval[id, name, (SELECT sum(number_of_nodes) FROM (doc.clusters)) AS number_of_nodes] |
| └ CorrelatedJoin[id, name, (SELECT sum(number_of_nodes) FROM (doc.clusters))] |
| └ Collect[doc.customers | [id, name] | true] |
| └ SubPlan |
| └ Limit[2::bigint;0::bigint] |
| └ HashAggregate[sum(number_of_nodes)] |
| └ Collect[doc.clusters | [number_of_nodes, customer_id] | (customer_id = id)] |
+-----------------------------------------------------------------------------------------+
Let’s look into the execution plan:
The operator Collect[doc.clusters] collects the values [number_of_nodes, customer_id] from the table doc.clusters for the rows where customer_id matches a given id. It represents the data collection part of the sub-query.
The operator HashAggregate[sum(number_of_nodes)] aggregates the values of number_of_nodes. It depends on the previous operation and represents the sum function of the sub-query.
The operator Limit[2::bigint;0::bigint] is used as an assertion. A sub-query in this position must return at most one result row, so if the sub-query has no limit of its own, it is limited to 2 rows: that is enough to detect a violation without retrieving the whole result.
The operator SubPlan marks the operator tree described so far as a correlated sub-query.
The operator Collect[doc.customers] collects the values [id, name] from the table doc.customers. It represents the outer-query.
The operator CorrelatedJoin performs the join between the outer-query results and the sub-query results. First, the outer-query is executed, and then the sub-query is executed for each row of the outer-query's result.
The operator Eval[id, name] combines the results from the underlying CorrelatedJoin and returns the values [id, name, number_of_nodes] to the client.
The order of execution is the following:
1. Collect all the values [id, name] from doc.customers.
2. For each id from the first operation, collect the values [number_of_nodes, customer_id] from doc.clusters where customer_id matches that id, and aggregate number_of_nodes.
3. Return the values id, name and the result of the aggregation to the client.
The inner-query now depends on the outer context. Therefore, it has to be executed again for every single row of the outer-query. This is the state of the current implementation in CrateDB. It works, but the execution is not optimal.
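To make the cost visible, here is a small Python sketch of the per-row execution, assuming the sample data from above. It is not CrateDB code: sub_plan, correlated_join and the counter sub_plan_runs are names invented for this illustration, and the Limit-2 check only mimics the assertion described for the Limit operator.

```python
# (id, name) rows from doc.customers and (customer_id, number_of_nodes)
# rows from doc.clusters, copied from the sample tables.
customers = [(1, "Anton"), (2, "Maria"), (3, "Anna")]
clusters = [(1, 10), (2, 15), (2, 8), (3, 12)]

sub_plan_runs = 0  # counts how often the sub-query is executed


def sub_plan(outer_id):
    """Hypothetical sub-plan: filter, aggregate, then assert at most one row."""
    global sub_plan_runs
    sub_plan_runs += 1
    # Collect: scan doc.clusters for rows matching the current outer id.
    nodes = [n for customer_id, n in clusters if customer_id == outer_id]
    # HashAggregate: SUM over zero rows yields NULL (None), otherwise the total.
    rows = [(sum(nodes) if nodes else None,)]
    # Limit 2: fetch at most two rows, just enough to notice a second one.
    rows = rows[:2]
    if len(rows) > 1:
        raise ValueError("sub-query returned more than one row")
    return rows[0][0] if rows else None


def correlated_join():
    """Run the outer query, then the sub-plan once per outer row."""
    return [(cid, name, sub_plan(cid)) for cid, name in customers]


result = correlated_join()
print(result)         # [(1, 'Anton', 10), (2, 'Maria', 23), (3, 'Anna', 12)]
print(sub_plan_runs)  # 3
```

With three customers the sub-plan runs three times, and each run scans clusters again. The work grows with the number of outer rows multiplied by the cost of one sub-query execution.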
Can we do better?
The same query can also be expressed as a join. Relational databases are good at running joins because there is plenty of room for optimizations. The question is: how do we convert a correlated sub-query into a join?
Let’s take our correlated sub-query example from the beginning again:
SELECT customers.id,
customers.name,
(SELECT Sum(number_of_nodes)
FROM clusters
WHERE clusters.customer_id = customers.id) AS number_of_nodes
FROM customers;
As mentioned above, the correlated sub-query has a reference, customers.id, to the outer-query. Hence the sub-query is executed once for every row of the outer-query. Let’s have a closer look at the sub-query:
SELECT Sum(number_of_nodes)
FROM clusters
WHERE clusters.customer_id = customers.id
The sub-query selects data from the table doc.clusters. So instead of using the sub-query in the outer-query, we can replace it with a join to the table doc.clusters. The where-clause of the sub-query, which includes the reference customers.id to the outer-query, becomes the join condition. We replace the sub-query in the outer-query with the aggregation function Sum(number_of_nodes) from the sub-query, and finally we add a group-by for the aggregation:
SELECT customers.id,
customers.name,
Sum(clusters.number_of_nodes) AS number_of_nodes
FROM clusters
JOIN customers
ON clusters.customer_id = customers.id
GROUP BY customers.id,
customers.name;
Voilà, we converted the correlated sub-query into a join. Let’s run this query again to see if it yields the same result.
+----+-------+-----------------+
| id | name | number_of_nodes |
+----+-------+-----------------+
| 3 | Anna | 12 |
| 1 | Anton | 10 |
| 2 | Maria | 23 |
+----+-------+-----------------+
This looks good. Let’s now check the logical plan using the EXPLAIN command:
EXPLAIN
SELECT customers.id,
customers.name,
Sum(clusters.number_of_nodes) AS number_of_nodes
FROM clusters
JOIN customers
ON clusters.customer_id = customers.id
GROUP BY customers.id,
customers.name;
+-----------------------------------------------------------------------+
| EXPLAIN |
+-----------------------------------------------------------------------+
| Eval[id, name, sum(number_of_nodes) AS number_of_nodes] |
| └ GroupHashAggregate[id, name | sum(number_of_nodes)] |
| └ HashJoin[(customer_id = id)] |
| ├ Collect[doc.clusters | [number_of_nodes, customer_id] | true] |
| └ Collect[doc.customers | [id, name] | true] |
+-----------------------------------------------------------------------+
Let’s look into the execution plan:
The operator Collect[doc.customers] collects the values [id, name] from the table doc.customers.
The operator Collect[doc.clusters] collects the values [number_of_nodes, customer_id] from the table doc.clusters.
The operator HashJoin[(customer_id = id)] performs a hash-join on the output of the previous two operators with the join condition customer_id = id.
The operator GroupHashAggregate[id, name | sum(number_of_nodes)] aggregates the values of number_of_nodes for each [id, name] pair.
The operator Eval[id, name, sum(number_of_nodes)] takes the results from the underlying operator tree and returns the values [id, name, number_of_nodes] to the client.
The order of execution is the following:
1. Collect the values [id, name] from doc.customers.
2. Collect the values [number_of_nodes, customer_id] from doc.clusters.
3. Join the previous two datasets on customer_id = id.
4. Aggregate the values of number_of_nodes for each [id, name] group.
5. Return the values id, name and the result of the aggregation to the client.
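The steps above can be sketched in Python as a single pass over each table. This is a simplified illustration under assumptions, not how CrateDB implements its operators: the function name hash_join_and_aggregate is invented, and which side the hash table is built on is an arbitrary choice made for this sketch.

```python
from collections import defaultdict

# (id, name) rows from doc.customers and (customer_id, number_of_nodes)
# rows from doc.clusters, copied from the sample tables.
customers = [(1, "Anton"), (2, "Maria"), (3, "Anna")]
clusters = [(1, 10), (2, 15), (2, 8), (3, 12)]


def hash_join_and_aggregate(customers, clusters):
    """Hash-join on customer_id = id, then sum number_of_nodes per (id, name)."""
    # Build phase: hash one input on the join key (here: customers.id).
    name_by_id = {cid: name for cid, name in customers}

    # Probe phase and grouping: each clusters row is looked at exactly once.
    totals = defaultdict(int)
    for customer_id, nodes in clusters:
        if customer_id in name_by_id:  # join condition customer_id = id
            totals[(customer_id, name_by_id[customer_id])] += nodes

    # Sorted only to make the output deterministic; the SQL has no ORDER BY.
    return sorted((cid, name, total) for (cid, name), total in totals.items())


result = hash_join_and_aggregate(customers, clusters)
print(result)  # [(1, 'Anton', 10), (2, 'Maria', 23), (3, 'Anna', 12)]
```

Both tables are read once, so the work grows roughly with the sum of the two table sizes rather than with their product.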
The new execution plan is much more efficient than the previous one: the expensive execution of the correlated sub-query for each row of the outer-query is eliminated. This transformation from a correlated sub-query to a regular join is called decorrelation. Of course, we expect the query optimizer to be clever enough to do the decorrelation for us automatically, so we don’t have to rearrange our query manually. In practice, generalizing this concept to a broader range of queries has more pitfalls, because queries can be much more complex. However, this gives you an idea of the concept and of the further work we have to put in to integrate it into CrateDB.
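One such pitfall already shows up with the small example. The sketch below runs the queries from this post with Python's built-in sqlite3 module, used here only as a convenient stand-in (it is not CrateDB, and its planner works differently). On the sample data both forms agree. After adding a hypothetical customer without clusters (the row for 'Ben' is invented for this illustration), the inner join silently drops that customer, while a LEFT JOIN variant keeps the row with a NULL sum, just like the correlated sub-query.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE customers (id INTEGER, name TEXT, country TEXT);
CREATE TABLE clusters (id INTEGER, customer_id INTEGER,
                       number_of_nodes INTEGER, name TEXT);
INSERT INTO customers VALUES
    (1, 'Anton', 'Austria'), (2, 'Maria', 'Germany'), (3, 'Anna', 'Italy');
INSERT INTO clusters VALUES
    (1, 1, 10, 'cluster 1'), (2, 2, 15, 'cluster 2'),
    (3, 2, 8, 'cluster 3'), (4, 3, 12, 'cluster 4');
""")

# ORDER BY is added to all three queries only to make results comparable.
CORRELATED = """
SELECT customers.id, customers.name,
       (SELECT SUM(number_of_nodes) FROM clusters
        WHERE clusters.customer_id = customers.id) AS number_of_nodes
FROM customers ORDER BY customers.id
"""
INNER_JOIN = """
SELECT customers.id, customers.name,
       SUM(clusters.number_of_nodes) AS number_of_nodes
FROM clusters JOIN customers ON clusters.customer_id = customers.id
GROUP BY customers.id, customers.name ORDER BY customers.id
"""
LEFT_JOIN = """
SELECT customers.id, customers.name,
       SUM(clusters.number_of_nodes) AS number_of_nodes
FROM customers LEFT JOIN clusters ON clusters.customer_id = customers.id
GROUP BY customers.id, customers.name ORDER BY customers.id
"""


def run(sql):
    return conn.execute(sql).fetchall()


# On the sample data from the post, both forms return the same rows.
same_on_sample = run(CORRELATED) == run(INNER_JOIN)

# Hypothetical customer with no clusters, added only for this illustration.
conn.execute("INSERT INTO customers VALUES (4, 'Ben', 'Spain')")
correlated_rows = run(CORRELATED)
inner_rows = run(INNER_JOIN)
left_rows = run(LEFT_JOIN)

print(same_on_sample)                # True
print(correlated_rows == left_rows)  # True
print(correlated_rows == inner_rows) # False: the inner join drops customer 4
```

An automatic decorrelation therefore has to preserve details such as this one, which is part of why generalizing the rewrite takes care.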
Summary
In this post, we covered what a correlated sub-query is and how it is implemented in CrateDB 5.1. We also showed where the bottleneck and limitations are with the current approach and how we can improve this in the future.
Are you interested in CrateDB? Have a look at the documentation CrateDB Reference. If you have questions, check out our CrateDB Community. We're happy to help!