Distributed query execution in CrateDB: What you need to know
marijaselakovic
Posted on July 20, 2022
This blog post is inspired by and based on content published by Mathias Fußenegger in a series of articles on CrateDB internals. For more examples of how CrateDB executes queries, please check the original posts.
The query language in CrateDB
CrateDB relies on SQL for data querying and manipulation. Using SQL as a query language reduces the learning curve, eases porting, and allows users to focus on query logic rather than dealing with low-level details of a distributed system. Furthermore, CrateDB enables users to write UDFs (user-defined functions) to manipulate data where needed.
SQL statements are internally transformed into a series of operators that have to be applied in order, and every operator can only process its input data. For instance, WHERE is one of the innermost operators, and SELECT is applied almost at the end. The order of operators to be executed depends on the optimizer. As in other database systems, the optimizer in CrateDB applies optimization rules to the tree of operators or to a subset of the tree to re-write the tree into an equivalent variant that is cheaper to execute.
This post explains how CrateDB generates execution plans and how optimizations influence the order of operators. A better understanding of the execution engine in CrateDB can help you design queries that fully benefit from the scalability and performance properties of CrateDB.
Query compilation and execution
Logical plan
Generally speaking, a logical plan is an abstraction of all transformation steps needed to execute a query. To inspect the logical plan in CrateDB, prefix the SQL query with the EXPLAIN statement (https://crate.io/docs/crate/reference/en/4.8/sql/statements/explain.html). For instance, consider the simple SELECT statement:
EXPLAIN SELECT name FROM users;
-> Collect[doc.users | [name] | true]
In this case, the logical plan consists of a single Collect operator. The parameters include doc.users (the name of the table), name (the attribute being collected), and true (a query expression). Adding a filter in the WHERE clause only changes the query expression, as follows:
EXPLAIN SELECT name FROM users WHERE name = 'name';
-> Collect[doc.users | [name] | (name = 'name')]
The shape of the plan stays the same because the Collect operator in CrateDB covers both cases: there is currently no distinction between table scans and index lookups at the logical planner level.
With more complex queries, more complex logical plans get generated. For instance, consider a SELECT statement with a subquery:
EXPLAIN SELECT name FROM (SELECT name FROM users WHERE name = 'name') AS n
WHERE n.name = 'foo';
And the corresponding logical plan:
Rename[name] AS n
  └ Collect[doc.users | [name] | ((name = 'foo') AND (name = 'name'))]
The root of the logical plan is a Rename operator. What’s interesting is that the inner WHERE clause and the outer WHERE clause got merged into (name = 'foo') AND (name = 'name'), and this expression is part of the Collect operator. This is the result of the filter pushdown optimization, which tries to move predicates as far down the tree as possible to reduce the number of rows that need to be examined.
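The pushdown described above can be sketched on a toy operator tree. This is only an illustration, not CrateDB's planner: the `Collect`, `Filter`, and `Rename` classes and the `push_down` function are hypothetical, predicates are plain strings, and the sketch assumes column names need no rewriting when a predicate crosses the `Rename`.

```python
from dataclasses import dataclass
from typing import Optional, Union


@dataclass
class Collect:
    """Leaf operator: reads `columns` from `table`, keeping rows matching `query`."""
    table: str
    columns: list
    query: str = "true"


@dataclass
class Filter:
    """Keeps only the rows of `source` that match `predicate`."""
    source: "Operator"
    predicate: str


@dataclass
class Rename:
    """Exposes the output of `source` under `alias`."""
    source: "Operator"
    alias: str


Operator = Union[Collect, Filter, Rename]


def combine(outer: Optional[str], inner: Optional[str]) -> Optional[str]:
    """AND two optional predicates together."""
    if outer is None:
        return inner
    if inner is None:
        return outer
    return f"({outer} AND {inner})"


def push_down(op: Operator, pending: Optional[str] = None) -> Operator:
    """Carry predicates down the tree until they reach a Collect."""
    if isinstance(op, Filter):
        # Drop the Filter node and carry its predicate further down.
        return push_down(op.source, combine(pending, op.predicate))
    if isinstance(op, Rename):
        # Assumption: predicates can cross the Rename unchanged in this toy.
        return Rename(push_down(op.source, pending), op.alias)
    existing = None if op.query == "true" else op.query
    return Collect(op.table, op.columns, combine(pending, existing) or "true")


# SELECT name FROM (SELECT name FROM users WHERE name = 'name') AS n
# WHERE n.name = 'foo'
plan = Filter(
    Rename(Filter(Collect("doc.users", ["name"]), "(name = 'name')"), "n"),
    "(name = 'foo')",
)
optimized = push_down(plan)
print(optimized)
```

After the rewrite, both predicates live inside the single `Collect`, which mirrors the shape of the EXPLAIN output shown above.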
If the Collect operator contains an expression, CrateDB will use the Lucene index if feasible. In the previous case, CrateDB will look for the terms foo and name in an inverted index. The inverted index maps terms to sets of document IDs, as illustrated in our previous article on Indexing and Storage in CrateDB. Using the Lucene index is significantly cheaper than loading rows and computing name = 'foo' with the concrete name value for each row.
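To make the difference between an index lookup and a row-by-row evaluation concrete, here is a minimal sketch of an inverted index in plain Python. It is not Lucene: the sample `rows`, `build_inverted_index`, `lookup_all`, and `scan` are hypothetical names chosen for illustration.

```python
from collections import defaultdict

# Hypothetical sample data: document ID -> value of the `name` column.
rows = {0: "foo", 1: "bar", 2: "foo", 3: "name"}


def build_inverted_index(docs):
    """Map every term to the set of document IDs that contain it."""
    index = defaultdict(set)
    for doc_id, term in docs.items():
        index[term].add(doc_id)
    return index


def lookup_all(index, *terms):
    """Document IDs matching all terms: intersect the posting sets."""
    postings = [index.get(term, set()) for term in terms]
    return set.intersection(*postings) if postings else set()


def scan(docs, predicate):
    """Baseline: load every row and evaluate the predicate on its value."""
    return {doc_id for doc_id, value in docs.items() if predicate(value)}


index = build_inverted_index(rows)

# name = 'foo' is answered from the posting set, without touching the rows.
foo_ids = lookup_all(index, "foo")

# (name = 'foo') AND (name = 'name') intersects two posting sets.
both_ids = lookup_all(index, "foo", "name")
```

In this sample, the merged predicate from the subquery example matches no document, because a single `name` value cannot equal both terms; the index answers that by intersecting two small sets.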
As illustrated, EXPLAIN prints a list of logical operators. Before executing the query, CrateDB transforms the logical plan into a physical execution plan.
Physical execution plan
A logical execution plan does not take into account the information about data distribution. CrateDB is a distributed database and data is sharded: a table can be split into many parts - so-called shards. Shards can be independently replicated and moved from one node to another. The number of shards a table can have is specified at the time the table is created.
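As a rough mental model of sharding, the sketch below assigns rows to a fixed number of shards by hashing a routing key. The hash function (`zlib.crc32`), the `shard_for` and `distribute` helpers, and the sample keys are assumptions made for illustration; CrateDB's actual routing hash is different.

```python
import zlib

NUM_SHARDS = 4  # fixed at table creation time in this sketch


def shard_for(routing_key: str, num_shards: int = NUM_SHARDS) -> int:
    """Pick a shard from a deterministic hash of the routing key."""
    return zlib.crc32(routing_key.encode("utf-8")) % num_shards


def distribute(keys, num_shards: int = NUM_SHARDS):
    """Group routing keys by the shard they are assigned to."""
    shards = {shard_id: [] for shard_id in range(num_shards)}
    for key in keys:
        shards[shard_for(key, num_shards)].append(key)
    return shards


keys = ["alice", "bob", "carol", "dave", "erin", "frank"]
placement = distribute(keys)
for shard_id, members in placement.items():
    print(shard_id, members)
```

Because the shard is derived from the key and the shard count, changing the number of shards would move rows around, which is one intuition for why the count is chosen when the table is created.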
The physical execution plan reasons about where the data is located and how it is supposed to arrive at the node with which the client communicates. We refer to this node as the handler node because it handles the communication with the client and because it initiates the execution of the query. The nodes from which data is gathered are called collect nodes, and sometimes there are also merge nodes involved, which merge the results from collect nodes.
Most physical execution plans share the same building blocks, but what a plan looks like depends on the exact query. In general, an execution plan consists of one or more execution phases. Each logical operator contains logic to either create an execution phase or to add additional transformations to an execution phase created by a child operator.
Let’s consider the physical execution plan for the simple SELECT name FROM users; statement:
RoutedCollectPhase:
  toCollect: [Ref{name}]
  routing:
    node-0:
      users: [0, 1]
    node-1:
      users: [2, 3]
MergePhase:
  executionNodes: [node-0]
  numUpstreams: 2
The plan consists of a RoutedCollectPhase and a MergePhase. In this example, the Collect operator creates the CollectPhase. A CollectPhase is a source, which means that it is supposed to output rows by reading them from disk or from memory. The Collect operator creates this CollectPhase by combining the information it holds itself with information from the cluster state. The cluster state is a snapshot that represents the current state of the cluster. This includes information like which tables exist, what columns they have, and on which node the different table shards are located.
Furthermore, the CollectPhase can also contain a list of columns or expressions to retrieve, an optional filter expression, and an optional ordering expression.
The toCollect property tells us which attributes should be collected, and the routing property tells us from where. The routing includes the shard IDs of the tables or partitions that should be queried and the nodes on which they reside. In this case, the executor has to retrieve the data from node-0 and node-1, as each of these nodes holds two shards of the table.
The MergePhase indicates that a node must merge data from all nodes involved in the CollectPhase. Usually, this merge phase is assigned to the handler node. In this case, it is node-0, and it expects results from two upstream nodes (numUpstreams: 2). In this scenario, node-0 is both the handler node and a collect node: it expects results from itself as well as from one other node.
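The way a routing table and the upstream count fall out of the cluster state can be sketched as follows. The `shard_locations` mapping, `build_routing`, and `plan_select` are hypothetical names used for illustration only; the resulting dictionaries merely imitate the fields of the plan shown above.

```python
from collections import defaultdict

# Hypothetical excerpt of a cluster state: (table, shard ID) -> node.
shard_locations = {
    ("users", 0): "node-0",
    ("users", 1): "node-0",
    ("users", 2): "node-1",
    ("users", 3): "node-1",
}


def build_routing(table, locations):
    """Group the shard IDs of `table` by the node that holds them."""
    routing = defaultdict(lambda: defaultdict(list))
    for (tbl, shard_id), node in sorted(locations.items()):
        if tbl == table:
            routing[node][tbl].append(shard_id)
    return {node: dict(tables) for node, tables in routing.items()}


def plan_select(table, columns, locations, handler_node):
    """Build a collect phase and a merge phase for SELECT columns FROM table."""
    routing = build_routing(table, locations)
    collect_phase = {"toCollect": columns, "routing": routing}
    merge_phase = {
        "executionNodes": [handler_node],
        # One upstream per node that takes part in the collect phase.
        "numUpstreams": len(routing),
    }
    return collect_phase, merge_phase


collect_phase, merge_phase = plan_select(
    "users", ["name"], shard_locations, handler_node="node-0"
)
print(collect_phase)
print(merge_phase)
```

The upstream count is two even though only one remote node is involved, because the handler node counts itself as one of the upstreams.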
Once the planner/optimizer finishes creating the physical execution plan, CrateDB executes it.
The execution
The execution layer looks at the routing information of the execution plan to figure out which nodes are involved in the query, and then sends each node the phases of the execution plan that it has to execute as part of a JobRequest. Each node contains a handler that is responsible for accepting and processing these JobRequests. To process a request, the handler looks at the phases of the plan and initiates their corresponding operations.
In the case of a CollectPhase, this includes creating a Lucene query out of the filter expression (e.g., name = 'name'), acquiring a Lucene reader for each shard, and iterating over the matching documents while applying all the transformations that are part of the phase before sending the result back to the handler node.
Finally, the results from execution phases might be pushed to merge nodes (e.g., node-0 in the previous example) before the final result is sent to the handler node and then back to the client.
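The flow described in this section can be imitated with a small, single-process simulation. Everything here is a stand-in: `shard_rows` replaces the Lucene readers, a Python lambda replaces the Lucene query, `handle_job_request` and `execute` are hypothetical names, and the loop over nodes replaces what would be asynchronous network requests in a real cluster.

```python
# Hypothetical shard contents: shard ID -> rows stored in that shard.
shard_rows = {
    0: [{"name": "name"}, {"name": "foo"}],
    1: [{"name": "bar"}],
    2: [{"name": "name"}],
    3: [],
}

collect_phase = {
    "toCollect": ["name"],
    "where": lambda row: row["name"] == "name",  # stands in for the Lucene query
    "routing": {"node-0": {"users": [0, 1]}, "node-1": {"users": [2, 3]}},
}


def handle_job_request(node, phase):
    """What one collect node does with its part of the plan."""
    results = []
    for _table, shard_ids in phase["routing"][node].items():
        for shard_id in shard_ids:
            for row in shard_rows[shard_id]:
                if phase["where"](row):
                    # Project only the requested columns.
                    results.append(tuple(row[col] for col in phase["toCollect"]))
    return results


def execute(phase):
    """Handler side: one job request per routed node, then merge the results."""
    merged = []
    for node in phase["routing"]:
        merged.extend(handle_job_request(node, phase))
    return merged


result = execute(collect_phase)
print(result)
```

Each node only touches the shards listed for it in the routing, and the handler's merge step is a plain concatenation because this query has no ordering or limit.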
Example: Query then fetch execution
Let’s extend the simple SELECT query with a LIMIT, and to make the example more realistic we also add a WHERE clause and select an additional column. The query now looks like the following:
SELECT name, age FROM users WHERE name LIKE 'A%' LIMIT 50;
Let’s explore the logical plan with the EXPLAIN statement:
Fetch[name, age]
  └ Limit[50::bigint;0]
    └ Collect[doc.users | [_fetchid] | (name LIKE 'A%')]
Starting from top to bottom, the plan contains the following operators:
The Fetch operator: Fetch takes a relation as input and expects it to contain one or more _fetchid columns. It uses these _fetchid values to retrieve the values of other attributes; in this example, it retrieves name and age.
The Limit operator: Limit takes a relation and limits it to at most 50 records.
The Collect operator: The Collect operator in this example indicates that an attribute called _fetchid should be retrieved from the doc.users table. The operator also includes the name LIKE 'A%' query expression, indicating that only records matching this expression should be included. _fetchid is a system column that can be used by the Fetch operator.
As the data in CrateDB is distributed across several shards and nodes, CrateDB cannot accurately know up-front how many records each shard holds and how many of these records will match the name LIKE 'A%' expression. Because of that, in this example CrateDB has to fetch at most 50 records from each node and then merge these together, stopping once the limit is hit.
The Fetch operation uses the readerId that is encoded into the _fetchid to figure out which node it has to contact to get the actual value. The whole Fetch operation works in batches, so multiple records are fetched at a time. It works a bit like an asynchronous flat map. The major advantage of this approach is that now each node only has to load the name and age values that are required. For instance, if data is stored on three nodes, the first node might load 16 records, the second node another 17, and the third node the remaining 17, making up the total 50.
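The query-then-fetch strategy can be sketched in two steps: collect lightweight fetch IDs under the limit, then load the real column values only for the IDs that survived. The bit layout in `encode_fetch_id` (reader ID in the upper 32 bits, document ID in the lower 32 bits), the `readers` table, and all function names are assumptions made for this illustration; the sketch also runs sequentially rather than in asynchronous batches.

```python
def encode_fetch_id(reader_id: int, doc_id: int) -> int:
    """Pack a reader ID and a document ID into one integer (assumed layout)."""
    return (reader_id << 32) | doc_id


def decode_fetch_id(fetch_id: int):
    """Split a fetch ID back into (reader ID, document ID)."""
    return fetch_id >> 32, fetch_id & 0xFFFFFFFF


# Hypothetical readers: reader ID -> (node name, {document ID: row}).
readers = {
    0: ("node-0", {0: {"name": "Alice", "age": 31},
                   1: {"name": "Bob", "age": 25},
                   2: {"name": "Anna", "age": 40}}),
    1: ("node-1", {0: {"name": "Arthur", "age": 52},
                   1: {"name": "Carl", "age": 19}}),
}


def collect_fetch_ids(reader_id, predicate, limit):
    """Query step on one reader: return at most `limit` matching fetch IDs."""
    _node, docs = readers[reader_id]
    ids = []
    for doc_id, row in docs.items():
        if predicate(row):
            ids.append(encode_fetch_id(reader_id, doc_id))
            if len(ids) == limit:
                break
    return ids


def query_then_fetch(predicate, columns, limit):
    # Query step: every reader contributes at most `limit` fetch IDs.
    candidates = []
    for reader_id in readers:
        candidates.extend(collect_fetch_ids(reader_id, predicate, limit))
    winners = candidates[:limit]  # the Limit operator on the merged stream

    # Fetch step: group the surviving IDs by reader, one batch per reader.
    batches = {}
    for fetch_id in winners:
        reader_id, doc_id = decode_fetch_id(fetch_id)
        batches.setdefault(reader_id, []).append(doc_id)

    fetched = {}
    for reader_id, doc_ids in batches.items():
        _node, docs = readers[reader_id]
        for doc_id in doc_ids:
            values = tuple(docs[doc_id][col] for col in columns)
            fetched[encode_fetch_id(reader_id, doc_id)] = values
    return [fetched[fetch_id] for fetch_id in winners]


starts_with_a = lambda row: row["name"].startswith("A")
top_two = query_then_fetch(starts_with_a, ["name", "age"], limit=2)
all_matches = query_then_fetch(starts_with_a, ["name", "age"], limit=50)
```

With a limit of 2 in this sample, the second reader's candidate is discarded before the fetch step, so its `name` and `age` values are never loaded, which is the saving the strategy aims for.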
What can we conclude?
This article gives a first introduction to how CrateDB executes distributed queries. We started with an overview of the logical plan, the physical execution plan, and the execution layer, using a simple SELECT statement. Then, we illustrated some of the optimizations and execution strategies with a more complex example involving the LIMIT operator.
Learn more about CrateDB in our official documentation, and get insights from our community.
To get more insights on how CrateDB executes other types of queries we refer to the original articles by Mathias Fußenegger.