Limit concurrent asynchronous calls
YCM Jason
Posted on September 11, 2018
Although JavaScript is designed to be single-threaded, you can still do things concurrently.
For example, we can read multiple files concurrently.
const readFile = require('util').promisify(require('fs').readFile);

const readAllFiles = async (paths) => {
  return await Promise.all(paths.map(p => readFile(p, 'utf8')));
};
However, reading files can be quite expensive; if there are more than 10k paths, you will probably hear the fans on your machine speed up as it struggles. Your Node server/program will also respond significantly slower, because 10k+ file reads are queued on the thread pool that Node uses for file I/O, competing with everything else the process needs to do.
The solution is simple: limit the number of file-reading operations in the thread pool. In other words, limit the number of concurrent calls to readFile.
Let's define a generic function asyncLimit(fn, n) which returns a function that does exactly what fn does, but with the number of concurrent calls to fn limited to n. We will assume fn returns a Promise.
const asyncLimit = (fn, n) => {
  return function (...args) {
    return fn.apply(this, args);
  };
};
Since we know that asyncLimit returns a function that does whatever fn does, we write that out first. Note that we don't return an arrow function, as fn might need the binding to this. Arrow functions do not have their own this binding.
If you are not familiar with this in JavaScript, read my article explaining what this is later. For now, just ignore it.
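For readers who do want to see why the regular function matters, here is a minimal sketch. The names passThrough and account are hypothetical and only serve the illustration; the wrapper has the same shape as the first version of asyncLimit, without any limiting.

```javascript
// Hypothetical helper: same shape as the first asyncLimit, with no limit applied.
const passThrough = (fn) => {
  return function (...args) {
    // `this` is whatever the wrapper was called on, so it is forwarded to fn.
    return fn.apply(this, args);
  };
};

// Hypothetical object whose method depends on `this`.
const account = {
  balance: 100,
  async getBalance() {
    return this.balance;
  },
};

// Replace the method with its wrapped version.
account.getBalance = passThrough(account.getBalance);

const balance = account.getBalance();
balance.then(value => console.log(value)); // logs 100
```

Had passThrough returned an arrow function instead, this inside the wrapper would come from the enclosing scope rather than from account, so getBalance would no longer see account.balance.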
const asyncLimit = (fn, n) => {
  let pendingPromises = [];
  return function (...args) {
    const p = fn.apply(this, args);
    pendingPromises.push(p);
    return p;
  };
};
Since fn returns a Promise, we can keep track of the progress of each call by keeping the promise it returns. We keep those promises in the list pendingPromises.
const asyncLimit = (fn, n) => {
  let pendingPromises = [];
  return async function (...args) {
    if (pendingPromises.length >= n) {
      await Promise.race(pendingPromises);
    }
    const p = fn.apply(this, args);
    pendingPromises.push(p);
    return p;
  };
};
We mark the returned function as async, which enables us to use await inside it. We want to execute fn only if there are fewer than n concurrent calls going on. pendingPromises contains all previous promises, so we can just check pendingPromises.length to find out how many concurrent calls there are.
If pendingPromises.length >= n, we need to wait until one of the pendingPromises finishes before executing. So we add await Promise.race(pendingPromises).
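As a quick illustration of why Promise.race works as the "wait for a free slot" step, the sketch below shows that it settles as soon as the first of its inputs settles. The delay helper and the timings are made up for the example.

```javascript
// Hypothetical helper: resolves with `value` after `ms` milliseconds.
const delay = (ms, value) =>
  new Promise(resolve => setTimeout(() => resolve(value), ms));

const firstSettled = () => {
  const slow = delay(50, 'slow');
  const fast = delay(10, 'fast');
  // Settles with the outcome of whichever input settles first.
  return Promise.race([slow, fast]);
};

const winner = firstSettled();
winner.then(value => console.log(value)); // logs "fast"
```

Note that Promise.race also rejects if the first promise to settle is a rejection, which matters once fn can fail.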
const asyncLimit = (fn, n) => {
  let pendingPromises = [];
  return async function (...args) {
    if (pendingPromises.length >= n) {
      await Promise.race(pendingPromises);
    }
    const p = fn.apply(this, args);
    pendingPromises.push(p);
    await p;
    pendingPromises = pendingPromises.filter(pending => pending !== p);
    return p;
  };
};
We want to remove each promise from pendingPromises once it has finished. First we execute fn, which returns p. Then we add p to pendingPromises. After this, we can await p; p will have finished after this line. So we simply filter p out of pendingPromises.
We are almost done. Let's recap what we are doing here:
If pendingPromises.length < n:
- we call fn and obtain the promise p
- push p onto pendingPromises
- wait for p to finish
- remove p from pendingPromises
- return p
If pendingPromises.length >= n, we wait until one of the pendingPromises resolves/rejects before doing the above.
There is one problem with our code tho. Let's consider the following:
const f = asyncLimit(someFunction, 1);
f(); // 1st call, someFunction returns promise p1
f(); // 2nd call, someFunction returns promise p2
f(); // 3rd call, someFunction returns promise p3
The first call goes perfectly and pendingPromises.length becomes 1.
Since pendingPromises.length >= 1, both the 2nd and 3rd calls will be calling await Promise.race([p1]). This means that when p1 finishes, both the 2nd and 3rd calls get notified and execute someFunction concurrently.
Put simply, our code does not make the 3rd call wait until the 2nd call has finished!
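The problem can be observed directly with a small experiment. The sketch below copies the if-based version under the hypothetical name asyncLimitWithIf and uses a made-up someFunction that counts how many calls are in flight at once; the 10 ms timer is arbitrary.

```javascript
// The intermediate version from above (uses `if`), renamed for this experiment.
const asyncLimitWithIf = (fn, n) => {
  let pendingPromises = [];
  return async function (...args) {
    if (pendingPromises.length >= n) {
      await Promise.race(pendingPromises);
    }
    const p = fn.apply(this, args);
    pendingPromises.push(p);
    await p;
    pendingPromises = pendingPromises.filter(pending => pending !== p);
    return p;
  };
};

// Hypothetical task that records how many calls run at the same time.
let active = 0;
let peak = 0;
const someFunction = async () => {
  active += 1;
  peak = Math.max(peak, active);
  await new Promise(resolve => setTimeout(resolve, 10));
  active -= 1;
};

const f = asyncLimitWithIf(someFunction, 1);

// Three calls with a limit of 1: the peak should be 1, but it is not.
const measured = Promise.all([f(), f(), f()]).then(() => peak);
measured.then(value => console.log('peak concurrency:', value)); // logs 2
```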
We know that the 2nd call gets notified first and resumes from await Promise.race([p1]). The 2nd call executes someFunction and pushes its promise onto pendingPromises, then it does await p.
As the 2nd call does await p, the 3rd call resumes from await Promise.race([p1]). And here is where the problem is: the current implementation allows the 3rd call to execute someFunction and everything that follows.
What we want is for the 3rd call to check pendingPromises.length >= n again and do await Promise.race([p2]). To do this, we can simply change if to while.
So the final code would be:
const asyncLimit = (fn, n) => {
  let pendingPromises = [];
  return async function (...args) {
    // Re-check the limit every time a pending call settles.
    while (pendingPromises.length >= n) {
      await Promise.race(pendingPromises).catch(() => {});
    }
    const p = fn.apply(this, args);
    pendingPromises.push(p);
    // Wait for p to settle, whether it resolves or rejects, then free its slot.
    await p.catch(() => {});
    pendingPromises = pendingPromises.filter(pending => pending !== p);
    return p;
  };
};
Notice that I have added .catch(() => {}) to the Promise.race and to await p. This is because we don't care whether the promise resolves or rejects; we just want to know when it has finished. The caller still sees any rejection, because we return the original p.
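To check both claims, that the limit holds and that rejections still reach the caller, here is a small experiment against the final code. flakyTask is a hypothetical function that fails for odd ids; the limit of 2 and the 10 ms timer are arbitrary.

```javascript
// Final version from above, repeated so this block runs on its own.
const asyncLimit = (fn, n) => {
  let pendingPromises = [];
  return async function (...args) {
    while (pendingPromises.length >= n) {
      await Promise.race(pendingPromises).catch(() => {});
    }
    const p = fn.apply(this, args);
    pendingPromises.push(p);
    await p.catch(() => {});
    pendingPromises = pendingPromises.filter(pending => pending !== p);
    return p;
  };
};

// Hypothetical task: tracks in-flight calls and rejects for odd ids.
let active = 0;
let peak = 0;
const flakyTask = async (id) => {
  active += 1;
  peak = Math.max(peak, active);
  await new Promise(resolve => setTimeout(resolve, 10));
  active -= 1;
  if (id % 2 === 1) throw new Error(`task ${id} failed`);
  return id;
};

const limited = asyncLimit(flakyTask, 2);

const run = async () => {
  const results = await Promise.allSettled(
    [0, 1, 2, 3, 4, 5].map(id => limited(id))
  );
  return { peak, statuses: results.map(r => r.status) };
};

const outcome = run();
outcome.then(o => console.log(o));
// peak is 2; statuses alternate 'fulfilled', 'rejected' in id order
```

A rejected call frees its slot just like a successful one, so later calls are not blocked by earlier failures.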
I have published this to npm if you wish to use it. Here is the GitHub link if you want to see how I added tests for this function.
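To tie this back to the opening example, here is a sketch of readAllFiles built on a limited readFile. The name limitedReadFile, the limit of 2 and the temporary files are assumptions made for the demonstration; a real limit would be tuned to the workload.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');
const { promisify } = require('util');

// Final version from above, repeated so this block runs on its own.
const asyncLimit = (fn, n) => {
  let pendingPromises = [];
  return async function (...args) {
    while (pendingPromises.length >= n) {
      await Promise.race(pendingPromises).catch(() => {});
    }
    const p = fn.apply(this, args);
    pendingPromises.push(p);
    await p.catch(() => {});
    pendingPromises = pendingPromises.filter(pending => pending !== p);
    return p;
  };
};

const readFile = promisify(fs.readFile);

// At most 2 reads are in flight at any time (arbitrary limit for the demo).
const limitedReadFile = asyncLimit(readFile, 2);

const readAllFiles = (paths) =>
  Promise.all(paths.map(p => limitedReadFile(p, 'utf8')));

// Demo input: five small files in a fresh temporary directory.
const dir = fs.mkdtempSync(path.join(os.tmpdir(), 'async-limit-'));
const paths = ['a', 'b', 'c', 'd', 'e'].map((name) => {
  const file = path.join(dir, `${name}.txt`);
  fs.writeFileSync(file, name);
  return file;
});

const contents = readAllFiles(paths);
contents.then(result => console.log(result)); // logs [ 'a', 'b', 'c', 'd', 'e' ]
```

Promise.all keeps the results in the order of paths, so limiting concurrency does not change what the caller receives, only how many reads run at once.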
What do you think? Did you follow the tutorial?
EDIT:
- removed async for asyncLimit. Thanks to @benjaminblack