Fetching large result sets from CrateDB

marijaselakovic


Posted on January 19, 2023


As a distributed database system with support for the standard SQL query language, CrateDB is great for running aggregations server-side, working on huge datasets, and getting summarized results back. There are, however, cases where we may still want to retrieve lots of data from CrateDB, for instance to train a machine learning model.

CrateDB collects results in memory before sending them back to the client, so trying to run a SELECT statement that returns a very large result set in one go can trigger circuit breakers or result in an OutOfMemoryError. Receiving all the results in a single operation can also be a challenge client-side, so we need a mechanism to fetch results in manageable batches as we are ready to process them.

One option, for cases where we are looking at a single table and already know that we need all the records satisfying a condition, is to do a bulk export with the COPY TO command, which accepts a WHERE clause. In many cases, however, we may want to run more complex queries, or storing the results in files may simply not fit well into our application.
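
For illustration, a bulk export of a single device's readings could look roughly like this (the filter and the target directory are just example values):

COPY doc.observations WHERE device = 'dev1' TO DIRECTORY '/tmp/observations_export';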

The case for pagination

Another common requirement is pagination: presenting results to users in pages with a set number of results each, and allowing them to move between these pages. In this scenario, many users will only look at the first few pages of results, so we want to implement this in the most efficient way possible.

Let’s imagine we have a table called “observations” with the following data:

+---------------------+--------+---------+
| ts                  | device | reading |
+---------------------+--------+---------+
| 2021-10-14T09:39:19 | dev2   |   -1682 |
| 2022-02-02T00:33:47 | dev1   |     827 |
| 2022-06-11T21:49:53 | dev2   |    -713 |
| 2022-06-29T23:23:28 | dev1   |    1059 |
| 2022-07-01T09:22:56 | dev2   |    -689 |
| 2022-07-10T02:43:36 | dev2   |    -570 |
| 2022-09-18T03:28:02 | dev1   |     303 |
| 2022-10-14T20:34:10 | dev1   |    1901 |
+---------------------+--------+---------+
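
If you want to follow along, a minimal table definition along these lines would work (the column types here are an assumption, not taken from the original data set):

CREATE TABLE doc.observations (
    ts TIMESTAMP WITH TIME ZONE,
    device TEXT,
    reading INTEGER
);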

We will work with a very small number of records here to visualize how the different techniques work, but imagine that we have thousands or even millions of records. In particular, I will show examples of retrieving 2 rows at a time, but depending on the use case you would probably be looking at retrieving 50, 1000, or even 5000 rows at a time.

Using LIMIT + OFFSET

SELECT date_format(ts),device,reading 
FROM doc.observations 
WHERE ts BETWEEN '2022-01-01 00:00' AND '2022-10-01 00:00'
ORDER BY ts
LIMIT 2;

This returns:

+-----------------------------+--------+---------+
| date_format(ts)             | device | reading |
+-----------------------------+--------+---------+
| 2022-02-02T00:33:47.000000Z | dev1   |     827 |
| 2022-06-11T21:49:53.000000Z | dev2   |    -713 |
+-----------------------------+--------+---------+

We could then re-issue the query with LIMIT 2 OFFSET 2 and we would get:

+-----------------------------+--------+---------+
| date_format(ts)             | device | reading |
+-----------------------------+--------+---------+
| 2022-06-29T23:23:28.000000Z | dev1   |    1059 |
| 2022-07-01T09:22:56.000000Z | dev2   |    -689 |
+-----------------------------+--------+---------+
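
For reference, the full statement for that second page looks like this, keeping the same filter and ordering and only adding an OFFSET:

SELECT date_format(ts),device,reading 
FROM doc.observations 
WHERE ts BETWEEN '2022-01-01 00:00' AND '2022-10-01 00:00'
ORDER BY ts
LIMIT 2 OFFSET 2;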

There are a number of issues to be aware of with this approach.

Each new query is treated as a new request and looks at the current data. Consider what happens if the observation from 11 June 2022 above were deleted after we run the first query, but before we run the second one with OFFSET 2. By skipping 2 rows we would now also skip the observation from 29 June 2022, and users would never see it.

Another issue is that there is not always an efficient way for CrateDB to skip rows. For certain queries, as the OFFSET value grows, execution times grow larger and larger because the engine actually reads the rows that need to be skipped and simply discards them server-side.

Using LIMIT with a WHERE clause on a watermark field

Continuing from the example above, after we get the initial 2 rows, instead of using OFFSET 2 we could run a query like this:

SELECT date_format(ts),device,reading 
FROM doc.observations 
WHERE ts > '2022-06-11T21:49:53.000000Z' AND ts <= '2022-10-01 00:00'
ORDER BY ts
LIMIT 2;

That 11 June value is the last value we have observed so far on the ts column, which in this case we know to be always increasing. This approach is very efficient, but it can only be used if there is a suitable field in the data, which is not always the case.

Also, compared to the LIMIT + OFFSET approach we discussed earlier, we cannot use this to let users jump to a given page of results without first having obtained all the results for the previous pages; we cannot, for instance, jump directly to page 10, as we do not know the last value of ts on page 9.

Some people call the approach above “cursor pagination”, but the most common concept behind “cursors” is something a bit different, which we are going to discuss now.

Cursors

A cursor is like a bookmark pointing to a specific record in the result set of a query. This is a generic approach that is implemented efficiently and does not require a special anchor/watermark column.

In CrateDB we can use cursors at the protocol level or with SQL commands.

Cursors in CrateDB are INSENSITIVE, meaning that the client can take all the time it needs to retrieve the results, and the data will always reflect the status of the tables as it was at the time the cursor was declared, ignoring any records that were updated, deleted, or newly inserted.

Using cursors in Python

In Python one way to work with cursors is with asyncpg, taking advantage of CrateDB's compatibility with the PostgreSQL wire protocol.
First, we need to install the library:

pip install asyncpg

Then we can use it like this:

import asyncio
import asyncpg

# If you are using jupyter-notebook 
# remove this function definition line and the indentation in the block of code that follows
async def main():
    # we will then establish a connection    
    conn = await asyncpg.connect(host='localhost', user='crate')

    # and we need a “transaction” context, 
    # there are no transactions as such in CrateDB, 
    # but this gives a scope where the cursor lives:  
    async with conn.transaction():

        # and now we can declare the cursor 
        # specifying how many rows we want asyncpg to fetch at a time from CrateDB, 
        # and we can iterate over the results: 
        query = "SELECT ts,device,reading FROM doc.observations WHERE ts BETWEEN '2022-01-01 00:00' AND '2022-10-01 00:00';"
        async for record in conn.cursor(query, prefetch=1000):
            print(record)

# Remove this line if you are using jupyter-notebook
asyncio.run(main())

Just to clarify, our Python code works with one record at a time, but behind the scenes asyncpg is requesting 1000 records at a time from CrateDB.

Using cursors in Java

In Java, we can use the PostgreSQL JDBC driver.
In a Maven project add this to your pom.xml:

<dependencies>
    <dependency>
        <groupId>org.postgresql</groupId>
        <artifactId>postgresql</artifactId>
        <version>42.5.0</version>
    </dependency>
</dependencies>

Then we can use it like this:

import java.sql.*;
/* ... */   
/* we first establish a connection to CrateDB */
try (Connection conn = DriverManager.getConnection("jdbc:postgresql://localhost/", "crate", "")) {
    /* the PostgreSQL JDBC driver only uses a cursor behind the scenes
     * (and honours the fetch size) when autocommit is disabled */
    conn.setAutoCommit(false);
    try (Statement st = conn.createStatement()) {
        /* We will then open the cursor 
         * defining how many rows we want to retrieve at a time, 
         * in this case 1,000: 
         */
        st.setFetchSize(1000);
        String query = "SELECT ts,device,reading ";
        query += "FROM doc.observations ";
        query += "WHERE ts BETWEEN '2022-01-01 00:00' AND '2022-10-01 00:00';"; 
        try (ResultSet resultSet = st.executeQuery(query)) {
            /* and while there are rows available, we will iterate over them: */
            while (resultSet.next()) {                      
                System.out.println(resultSet.getTimestamp("ts").toString());
            }
        }
    }
}

This works like the Python case above: in our Java code we see one row at a time, but the rows are retrieved from CrateDB 1,000 at a time and kept in memory on the client side.

Using cursors with SQL commands

An approach that works with all clients is to use the cursor SQL commands (BEGIN, DECLARE, FETCH, CLOSE) supported since CrateDB 5.1.0.

First, we need to issue this command:

BEGIN;

This is a SQL command that would normally start a transaction. There are no transactions as such in CrateDB, but this creates a scope in which cursors can be created.

DECLARE observations_cursor NO SCROLL CURSOR FOR 
    SELECT ts,device,reading 
    FROM doc.observations 
    WHERE ts BETWEEN '2022-01-01 00:00' AND '2022-10-01 00:00';

This associates a cursor name with a query and determines the point in time at which data is “frozen” from the point of view of the cursor.

FETCH 10 FROM observations_cursor;

This retrieves 10 rows from the query, and when issued again it will retrieve the next 10 rows and so on. We can retrieve a different number of records each time and we know we have reached the end of the result set when FETCH returns zero rows.

Once the cursor is not needed anymore it can be closed with either END;, CLOSE ALL;, CLOSE observations_cursor;, COMMIT;, COMMIT TRANSACTION;, or COMMIT WORK;.
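
Putting it all together, a complete session that pulls the whole result set in batches of 10 could look like this; the FETCH is simply repeated until it returns zero rows:

BEGIN;

DECLARE observations_cursor NO SCROLL CURSOR FOR 
    SELECT ts,device,reading 
    FROM doc.observations 
    WHERE ts BETWEEN '2022-01-01 00:00' AND '2022-10-01 00:00';

FETCH 10 FROM observations_cursor;
-- repeat FETCH as needed until it returns zero rows

CLOSE observations_cursor;
END;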

Take a look at this short animation showing an example of how this works: Mathias Fußenegger on Twitter

We hope you find this useful, and we will be happy to hear about your experience in the Community.
