Async iterators in JS - database iteration in NodeJS

kmoskwiak

Kasper Moskwiak

Posted on February 24, 2020

Async iterators in JS - database iteration in NodeJS

In this article I will give a quick example how to process entire database (let it be MongoDB, Elasticsearch or RDBS like PostgreSQL) using modern JavaScript tools. We will combine async/await, iterators and function generator in a simple yet powerful pattern which can be used in many cases.

See working example on Runkit.

The problem

Our case is to process entire database (table in RDB, collection in Mongo) entry by entry.

The easiest way would be to select all documents, put them into runtime memory and do all operations. However that’s not the answer, especially when the dataset is large and our memory limited.

Classic approach: limit and offset

Limit and offset
There is an option to use limit combined with offset in SQL:

SELECT * FROM my_table LIMIT 50 OFFSET 100;
Enter fullscreen mode Exit fullscreen mode

which in Mongo would look like this:

db.my_collection.find({}).skip(100).limit(50)
Enter fullscreen mode Exit fullscreen mode

Note that in most databases the performance of this technique changes based on offset value. The bigger the offset is the slower the query will become. This is because in most cases database will process all skipped entries before returning those which interest us. Due to this obstacle some databases specify maximum value of offset, so it may not be possible to process entire database without some extra tweaking with database settings.

Stateful cursors

We could take advantage of stateful technique using cursors. Cursor is an object created by database and kept in its memory. It remembers context of a query, e.g. current state of pagination. Usually we have to create cursor and then use some reference to it in further queries.

Cursors

Note that in some databases cursor has its lifetime. For example in Elasticsearch we can specify expiry time for search context in scroll parameter.

You may also come across limitation of maximum amount of simultaneously opened cursors.

An alternative: range query

Next technique - range query - has couple of interesting features.

  • It is stateless. That means you do not have to care for state expiration or state synchronization between all database nodes in your network.
  • It has constant and controllable memory usage. That means it can be successfully used on low memory machines and its performance does not vary on dataset size or pagination progress.
  • It is not database specific. After all it is just an approach how to construct a query so it can be used in most databases.

Range query is very similar to combination of limit and offset. However instead of providing number of documents to be skipped we specify boundary condition which eliminates already processed documents.
In example dataset (visualized below) documents are sorted ascending by id. Condition id > 16 skips 6 documents with ids: 1, 2, 11, 13, 14, 16. Result is identical to providing offset equal 6.

Range query

Iterating over dataset in batches of size 6 comes down to 3 steps:

  1. request first 6 documents and remember id of last document (16),
  2. request next 6 documents with condition id > 16, remember last id (35),
  3. request next 6 documents with condition id > 35. Note that this time only 3 documents were returned which means it is the end of our dataset.

Things to remember:

  • dataset must be sorted by key which our condition is applied to,
  • to achieve best performance and keep constant memory usage field used in condition should be indexed,
  • also values under this field must be unique. If not range query may “lose” some documents.

We will use range query in our example.

Ooooh yeah! Can do!

Async iteration in JavaScript

We will be using async-await combined with functions generator and custom iterator.
This combination is called async iteration and its proposal can be found here. However individual parts were in NodeJS earlier the whole combo can be used inside for loop since version 10.

In the end we want to be able to use for-loop to synchronously iterate over database.

In code snippets in this article I assume we have Database object in scope with method select, which returns Promise resolving to array of documents. It accepts as argument object with two properties: idGreaterThan - which determines condition in database, and limit - the number of returned documents, e.g db.select({ idGreaterThan: 6, limit: 2}) is equivalent to SQL query SELECT * FROM exoplanets WHERE id > 6 ORDER BY id ASC LIMIT 2. In this article I have omitted implementation for Database class however simple mock can be found in this RunKit Notebook. This notebook contains also working example of code we are about to write.

readDocuments function generator

Generators are functions which can be exited and later re-entered. Their context (variable bindings) will be saved across re-entrances.
(source: function* - JavaScript | MDN)

MDN Web Docs gives excellent explanation about function generators and iterators in Javascript. For sake of this article what we must know is that generators after being called return an iterator. And iterator is an object which can be used in for..of loop.

Let's write our generator readDocuments.

/**
* Iterate over database
* @generator
* @function readDocuments
* @param {number} limit maximum number of documents
* @yields {array} list of planets
*/
async function* readDocuments(limit) {
   const db = new Database();
   let lastId = 0; // initialize with min value
   let done = false; // indicates end of iteration

   while(!done) {
       // select batch of documents and wait for database response
       // TODO: error handling
       const result = await db.select({
           idGreaterThan: lastId,
           limit: limit
       });

       // get id of last document
       lastId = result[result.length - 1].id;

       // end iteration if there are less documents than limit
       if(result.length < limit) {
           done = true;
       }

       // yield result
       yield result
   }
};
Enter fullscreen mode Exit fullscreen mode

Notice two important things in the code above: readDocuments is declared both with async keyword and function* expression. This function is a hybrid of asynchronous function and function generator and it has power of both. Let's analyze it from two perspectives.

As an async function it allows us to await an asynchronous database operation. This means that the while loop will behave synchronously. Each select from database will be executed only after the previous has been finished.

Instead of returning result we yield it. That's a part of being the function generator. As I mentioned above, generators return an iterator which can be used in for loops. Each time generator yields something program breaks up from function and goes to body of loop.

This brings us to well known for..of loop, but in async variation.

async function run() {
    // We need to place our loop inside another async function 
    // so we can use await keyword

    for await (let documents of readDocuments(4)) { 
        // This for loop behaves synchronously. 
        // Next iteration will begin after execution of code inside this loop
        await doSomethingWithDocuments(documents);
    }

}
run();
Enter fullscreen mode Exit fullscreen mode

That’s it! In Runkit example I just log planets to console every iteration. You should see output similar to the one below.

DATABASE:  SELECT * FROM exoplanets ORDER BY id ASC LIMIT 4
APP: Got 4 items from database: Proxima Centauri b, Gliese 667 Cc, Kepler-442b, Kepler-452b. Done: false

DATABASE:  SELECT * FROM exoplanets WHERE id > 7 ORDER BY id ASC LIMIT 4
APP: Got 4 items from database: Wolf 1061c, Kepler-1229b, Kapteyn b, Kepler-62f. Done: false

DATABASE:  SELECT * FROM exoplanets WHERE id > 14 ORDER BY id ASC LIMIT 4
APP: Got 4 items from database: Kepler-186f, Luyten b, TRAPPIST-1d, TRAPPIST-1e. Done: false

DATABASE:  SELECT * FROM exoplanets WHERE id > 18 ORDER BY id ASC LIMIT 4
APP: Got 4 items from database: TRAPPIST-1f, TRAPPIST-1g, LHS 1140 b, Kepler-1638b. Done: false

DATABASE:  SELECT * FROM exoplanets WHERE id > 24 ORDER BY id ASC LIMIT 4
APP: Got 1 items from database: Teegarden c*. Done: true
Enter fullscreen mode Exit fullscreen mode

I hope you enjoyed this article. In next article in this series I will show how to use this tool to read large files when memory is limited. Stay tuned!


Acks 🦄

💖 💪 🙅 🚩
kmoskwiak
Kasper Moskwiak

Posted on February 24, 2020

Join Our Newsletter. No Spam, Only the good stuff.

Sign up to receive the latest update from our blog.

Related