Multithreading with Worker Threads in Node.js
Camilo Reyes
Posted on July 12, 2023
The beauty of Node is that while the main event loop runs in a single thread, some expensive tasks are automatically offloaded to a separate worker pool. These include the I/O-intensive DNS and File System (except fs.FSWatcher()) APIs, as well as the CPU-intensive Crypto and Zlib APIs. All of these also have a synchronous variation, which will block the main thread.
This is generally how Node accomplishes concurrency while running on a single thread. However, our code can contain some computationally complex tasks that will typically block the main thread.
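To make the difference concrete, here is a minimal sketch that hashes the same input with the asynchronous and the synchronous Crypto API. The password, salt, and iteration count are arbitrary values picked for illustration. With the async variant, a callback queued via setImmediate should get to run while the hash is still in flight on the worker pool.

```javascript
import { pbkdf2, pbkdf2Sync } from "node:crypto";

const order = [];

// Async variant: the hashing runs on the worker pool, so the event loop stays free
const asyncHash = new Promise((resolve, reject) => {
  pbkdf2("secret", "salt", 100_000, 64, "sha512", (err, key) => {
    if (err) return reject(err);
    order.push("async hash done");
    resolve(key);
  });
});

// Queued right after the hash starts; it can run while the hash is in flight
setImmediate(() => order.push("event loop tick"));

const asyncKey = await asyncHash;

// Sync variant: same work, but nothing else on the main thread runs until it returns
const syncKey = pbkdf2Sync("secret", "salt", 100_000, 64, "sha512");

console.log(order);
console.log("same key:", asyncKey.equals(syncKey));
```

Both calls produce the same key. The only difference is where the work happens and whether the main thread has to wait for it.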
This is where multithreading comes in handy because we can simply run such tasks in a worker thread. The main event loop will keep running and continuously check for incoming events or requests.
In this post, we will explore the ins and outs of multithreading in Node, including some potential pitfalls.
The Basics of Multithreading in Node.js
You can find the completed project that we will build on GitHub.
Also, if you want to learn more about the basics of multithreading before you dive into this post, check out An Introduction to Multithreading in Node.js.
Now, let's get started with the tutorial.
Starting Our Project: Subset Sum Algorithm
To get started, create a new project folder and generate a package.json file.
> mkdir node-worker-threads
> cd node-worker-threads
> npm init
Be sure to edit the package.json file and change the project type to module (add "type": "module") so that we can use the import/export syntax.
Next, write a CPU-bound algorithm that finds every subset of an array whose elements add up to a given sum (0 in our case). This is often referred to as the subset sum problem, and it is CPU intensive because the number of subsets to check doubles with every element added to the array.
Algorithmic optimizations are not the main focus here, so we can implement the code as succinctly as possible.
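To get a feel for how quickly the work grows, here is a small sketch that counts the subsets and brute-forces a tiny input. Note that it uses bitmasks instead of the recursive approach in this post, and the function names countSubsets and findSubsets are made up for illustration.

```javascript
// Number of non-empty subsets of an n-element set: 2^n - 1
function countSubsets(n) {
  return 2 ** n - 1;
}

// Brute force via bitmasks: bit i of `mask` decides whether set[i] is included.
// Only suitable for small arrays (fewer than 31 elements).
function findSubsets(set, targetSum) {
  const matches = [];
  for (let mask = 1; mask < 2 ** set.length; mask++) {
    const subset = set.filter((_, i) => mask & (1 << i));
    const total = subset.reduce((prev, item) => prev + item, 0);
    if (total === targetSum) matches.push(subset);
  }
  return matches;
}

console.log(countSubsets(4)); // 15
console.log(countSubsets(26)); // 67108863
console.log(findSubsets([1, -1, 2, -2], 0)); // [ [ 1, -1 ], [ 2, -2 ], [ 1, -1, 2, -2 ] ]
```

Going from 4 to 26 elements takes us from 15 subsets to more than 67 million, which is why a modest-looking array can keep a CPU busy for a while.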
Because the output can be very large for big arrays, we will implement event emitters to send matches back in chunks. This way, we avoid sending the end result in one big blob of data (which can cause a block).
Put this code in a file named subsetSum.js:
import { EventEmitter } from "events";

export class SubsetSum extends EventEmitter {
  #sum = 0;
  #set = [];

  constructor(sum, set) {
    super();
    this.#sum = sum;
    this.#set = set;
  }

  // run the search synchronously, then signal completion
  start() {
    this.#combine(this.#set, []);
    this.emit("end");
  }

  // recursively build every non-empty subset of `set`
  #combine(set, subset) {
    for (let i = 0; i < set.length; i++) {
      const newSubset = subset.concat(set[i]);
      this.#combine(set.slice(i + 1), newSubset);
      this.#processSubset(newSubset);
    }
  }

  // emit a "match" event when the subset adds up to the target sum
  #processSubset(subset) {
    const res = subset.reduce((prev, item) => prev + item, 0);
    if (res === this.#sum) {
      // send matches piecemeal
      this.emit("match", subset);
    }
  }
}
Note that processSubset and combine are marked as private methods. This is because we do not need to expose these members outside of the class. Everything in SubsetSum can be accessed via the public start method and the constructor.
Build the API Endpoint for Your Node Application
Now, build the API endpoint to play with this algorithm in Node. We can have two endpoints:
- One to check if the web server is still alive.
- The other to kick off the CPU-heavy calculation.
Put this code in index.js:
import { createServer } from "http";
import { SubsetSum } from "./subsetSum.js";

createServer((req, res) => {
  const url = new URL(req.url, "http://localhost");

  // any path other than /subsetSum acts as the health check
  if (url.pathname !== "/subsetSum") {
    res.writeHead(200);
    return res.end("I'm still here!\n");
  }

  const sum = 0;
  // the input array arrives as a JSON string in the `data` query parameter
  const data = JSON.parse(url.searchParams.get("data"));
  res.writeHead(200);

  const subsetSum = new SubsetSum(sum, data);
  // write each match to the response as soon as it is emitted
  subsetSum.on("match", (match) => {
    res.write(`Match: ${JSON.stringify(match)}\n`);
  });
  subsetSum.on("end", () => res.end());
  // start() may or may not return a promise, hence the optional chaining
  subsetSum.start()?.catch((err) => console.error(err));
}).listen(8080, () => console.log("Server ready on port 8080"));
Next, fire up cURL with the web server running and check the results.
> curl -G http://localhost:8080/subsetSum --data-urlencode "data=[116,119,101,101,-116,109,101,-105,-102,117,-115,-97,119,-116,-104,-105,115,116,119,101,101,-116,109,101,-105,-102]"
If this takes too long to execute on your machine, simply shorten the input array. While the algorithm is running, hit the health check endpoint. The health endpoint is http://localhost:8080 and returns a 200 with a message. Notice the web server chokes and does not respond until it finishes the calculation.
Also, the output comes out in one giant blob, even though we put event emitters in place. This is because CPU-bound operations block the main event loop. The event emitters do not get to send piecemeal data back to the main thread until the algorithm finishes.
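The blocking effect is easy to reproduce in isolation. In this minimal sketch, busyWait is a made-up helper that spins the CPU as a stand-in for the subset sum calculation, and a timer scheduled for 10 ms cannot fire until the busy loop lets go of the main thread.

```javascript
// Spin the CPU for `ms` milliseconds, standing in for a heavy calculation
function busyWait(ms) {
  const end = Date.now() + ms;
  while (Date.now() < end) {
    // nothing else can run on this thread in the meantime
  }
}

const start = Date.now();

const timerDelay = await new Promise((resolve) => {
  // ask to run after 10 ms
  setTimeout(() => resolve(Date.now() - start), 10);
  // block the event loop for roughly 200 ms
  busyWait(200);
});

console.log(`Timer scheduled for 10 ms fired after ${timerDelay} ms`);
```

The timer reports a delay of at least 200 ms. The health check and the buffered response writes in our web server are stuck behind the calculation in the same way.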
To tackle this bottleneck, we will look at worker threads next.
Worker Pool and Worker Threads in Node
First off, install the NPM package called workerpool because this minimizes the amount of code needed to put worker threads in place.
> npm i workerpool --save
The implementation is twofold: a worker pool and a worker thread. The worker pool limits the number of worker threads firing side-by-side. The worker thread executes our algorithm in a separate thread outside of the main event loop and proxies emitted events.
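For context, this is roughly what a library like workerpool saves us from writing by hand. The sketch below uses Node's built-in worker_threads module directly and is not how workerpool is implemented internally. The collectNegatives helper, the inline worker source, and the { event, data } message shape are assumptions made for illustration.

```javascript
import { Worker } from "node:worker_threads";

// Code that runs inside the worker; with `eval: true` it is evaluated as CommonJS
const workerSource = `
  const { parentPort, workerData } = require("node:worker_threads");
  for (const n of workerData) {
    if (n < 0) parentPort.postMessage({ event: "match", data: n });
  }
  parentPort.postMessage({ event: "end" });
`;

// Hypothetical helper: collects "match" payloads until the worker reports "end"
function collectNegatives(numbers) {
  return new Promise((resolve, reject) => {
    const matches = [];
    const worker = new Worker(workerSource, { eval: true, workerData: numbers });
    worker.on("message", ({ event, data }) => {
      if (event === "match") matches.push(data);
      if (event === "end") resolve(matches);
    });
    worker.on("error", reject);
  });
}

const negatives = await collectNegatives([3, -1, 4, -5]);
console.log(negatives); // [ -1, -5 ]
```

The worker posts messages piecemeal and the main thread listens for them, which is the same pattern we are about to build with workerpool, minus the pooling and queueing.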
Let’s tackle the worker thread first. Put this in subsetSumWorker.js:
import workerpool from "workerpool";
import { SubsetSum } from "./subsetSum.js";

function subsetSumWorker(sum, set) {
  const subsetSum = new SubsetSum(sum, set);

  // proxy every event from the algorithm to the main thread
  subsetSum.on("match", (data) => {
    workerpool.workerEmit({ event: "match", data });
  });
  subsetSum.on("end", (data) => {
    workerpool.workerEmit({ event: "end", data });
  });

  subsetSum.start();
}

// register worker thread
workerpool.worker({
  subsetSum: subsetSumWorker,
});
This mostly acts as a wrapper around SubsetSum and rebroadcasts events via workerEmit. Lastly, the worker call registers the subsetSum function so that the pool can invoke it by name.
To queue up the worker thread in the worker pool, put the following code in subsetSumPool.js:
import { EventEmitter } from "events";
import { dirname, join } from "path";
import { fileURLToPath } from "url";
import workerpool from "workerpool";

// __dirname is not defined in ES modules, so derive it from import.meta.url
const __dirname = dirname(fileURLToPath(import.meta.url));
const workerFile = join(__dirname, "subsetSumWorker.js");
const pool = workerpool.pool(workerFile);

export class SubsetSum extends EventEmitter {
  #sum = 0;
  #set = [];

  constructor(sum, set) {
    super();
    this.#sum = sum;
    this.#set = set;
  }

  // run the registered "subsetSum" function on a worker from the pool
  async start() {
    await pool.exec("subsetSum", [this.#sum, this.#set], {
      // retransmit event
      on: (payload) => this.emit(payload.event, payload.data),
    });
  }
}
Much like before, the worker pool acts as a wrapper around the worker thread. All emitted events get retransmitted via the on property in the exec method. This is how the main thread can listen for incoming events.
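The retransmit step can be tried out without any threads involved. In this sketch, fakeExec is a hypothetical stand-in for pool.exec that simply feeds a list of payloads to the on callback, and Retransmitter is a made-up class name. The payloads use the same { event, data } shape as our worker.

```javascript
import { EventEmitter } from "node:events";

// Hypothetical stand-in for pool.exec(): pushes each payload to the `on` callback
async function fakeExec(payloads, { on }) {
  for (const payload of payloads) on(payload);
}

class Retransmitter extends EventEmitter {
  async start(payloads) {
    await fakeExec(payloads, {
      // turn each payload back into a regular event on this emitter
      on: (payload) => this.emit(payload.event, payload.data),
    });
  }
}

const received = [];
const emitter = new Retransmitter();
emitter.on("match", (subset) => received.push(subset));
emitter.on("end", () => received.push("end"));

await emitter.start([{ event: "match", data: [1, -1] }, { event: "end" }]);
console.log(received); // [ [ 1, -1 ], 'end' ]
```

Because the class re-emits plain events, the web server can keep listening for match and end exactly as it did with the single-threaded version.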
Finally, in the web server index.js file, we only need a single line change:
import { SubsetSum } from "./subsetSumPool.js";
With worker threads working side-by-side, the matches stream rapidly through the console. If we hit the health check concurrently, we get an immediate response because nothing is blocking the main thread.
This implementation uses the default configuration with all CPU cores available minus one, which the main thread can take up.
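As a rough sketch of that arithmetic, the snippet below computes "cores minus one" with a floor of one worker. The helper name defaultWorkerCount is made up, and the exact formula inside workerpool is an assumption here, so check the library's documentation before relying on it.

```javascript
import os from "node:os";

// Assumption: "all CPU cores minus one", never dropping below a single worker
function defaultWorkerCount(cpuCount = os.cpus().length) {
  return Math.max(cpuCount - 1, 1);
}

console.log(defaultWorkerCount(8)); // 7
console.log(defaultWorkerCount(1)); // 1
console.log(`This machine: ${defaultWorkerCount()} worker(s)`);
```

If the default does not suit your server, workerpool accepts a maxWorkers option when creating the pool, for example workerpool.pool(workerFile, { maxWorkers: 2 }).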
Keep in mind that worker threads are an abstraction around the underlying OS threads running on multiple CPU cores. We do not get direct control over the low-level details here, but this is how Node achieves multithreading via worker threads.
The key takeaway here is that our algorithm no longer blocks the main event loop, and when several requests come in at once, the pool can put more than a single CPU core to work.
Pitfalls of Multithreading in Node.js
Slap a console.log somewhere in the algorithm, say, inside the if branch where it finds a match.
Change this code in subsetSum.js:
#processSubset(subset) {
  const res = subset.reduce((prev, item) => prev + item, 0);
  if (res === this.#sum) {
    // blocks!
    console.log("match found " + subset);
    this.emit("match", subset);
  }
}
Give this change a spin and notice how the main thread completely blocks as soon as the algorithm finishes. Attempting to hit the health check will also simply block until all the console messages execute.
This is because the main thread has to marshal the console.log messages coming from the worker synchronously. This behavior is part of the worker thread design.
A recommendation is to send debug information back to the main thread using an event emitter like debug instead. Then listen for this event and call console.log directly from the main thread.
For example:
// from the algorithm running in the worker thread - subsetSum.js
this.emit("debug", "match found " + subset);

// from the worker thread wrapper - subsetSumWorker.js
subsetSum.on("debug", (data) => {
  workerpool.workerEmit({ event: "debug", data });
});

// from main thread - index.js
subsetSum.on("debug", (message) => console.log(message));
This will keep broadcasting messages between the worker thread and the main thread asynchronously, so nothing is blocked.
Wrapping Up
In this post, we explored:
- What happens to CPU-heavy algorithms in Node.
- How to offload work from the main thread via worker threads and some gotchas.
In Node, worker threads are an exciting new way to offload CPU-bound calculations from the main thread. This unblocks the event loop so other tasks can run concurrently.
Happy multithreading!