Concurrency in JavaScript and the power of Web Workers

olyop

Oliver Plummer

Posted on April 3, 2024


Introduction

Recently, while building a browser application, I ran into what seemed to be the limits of JavaScript's single-threaded execution environment.

My application needed to process potentially thousands of files entirely in the browser, scanning their metadata and saving it locally to IndexedDB. The problem was that the library function I used to process each file was an expensive call, so it blocked the browser and most of the UI's interactivity. That is not acceptable in a UI where the user may want to keep using the application smoothly or, in my case, be able to cancel the file processing.

The first thing most JavaScript developers learn is that JavaScript runs on a single thread. Whether we start out in Node.js or by writing front-end browser applications, we are told this is by design: the event loop (in Node.js and in the browser) queues up pending work and runs each task to completion, one at a time. This is why the page blocks on expensive synchronous tasks: until such a task returns, nothing else gets a turn, including rendering and input handling.
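To make the blocking concrete, here is a small sketch that runs in Node.js or a browser console. The helper name measureTimerDelay is made up for illustration, and the busy-wait loop stands in for an expensive synchronous call: a timer scheduled with a 0 ms delay cannot fire until the synchronous loop hands control back to the event loop.

```typescript
// Hypothetical helper: schedules a 0 ms timer, then blocks the thread with a
// busy-wait, and reports how long the timer actually had to wait.
function measureTimerDelay(blockForMs: number): Promise<number> {
  return new Promise<number>(resolve => {
    const scheduledAt = Date.now();

    // Queue a callback to run "as soon as possible".
    setTimeout(() => resolve(Date.now() - scheduledAt), 0);

    // Stand-in for an expensive synchronous call: the event loop cannot run
    // the timer callback (or, in a browser, repaint or handle clicks) until
    // this loop returns.
    while (Date.now() - scheduledAt < blockForMs) {
      // busy-wait
    }
  });
}

measureTimerDelay(200).then(delay => {
  console.log(`0 ms timer fired after ${delay} ms`); // delay is at least 200
});
```

Moving the busy-wait into a worker is exactly what lets the timer (and the UI) run on time.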

Since I needed to process the files locally, and uploading them to a server for processing was not an option, I turned to Web Workers. They let you launch a separate thread where you can run expensive function calls, freeing up the main window thread.

Web Workers Introduction

Here are the basics of using Web Workers in the browser, with the implementation details left out:

// main.ts

// Create a new worker
const worker = new Worker("my-worker.js");

// Listen for responses from the worker
worker.addEventListener("message", event => {
  const data = event.data as MyResponse;

  // do something with the response
});

let message: MyMessage;

// ...

// Sending a message to the worker thread
worker.postMessage(message);

// ... later in the program
worker.terminate();
// my-worker.ts

addEventListener("message", event => {
  const data = event.data as MyMessage;

  // ...process the message

  let response: MyResponse;

  // Send the response back to the main window thread
  postMessage(response);
});

The best way to think about a Web Worker is as a plain old function. You send the worker a message (the function arguments), the worker processes the input (the worker file is the function body), and then it posts back a response (the return value).

You can combine creating the worker, sending the message, and receiving the response into one function that returns a promise. Alternatively, you can launch a long-lived worker that sits in the background waiting for messages. Note that creating a worker (new Worker(...)) is a relatively costly operation, but whichever approach you choose, make sure to terminate the worker once you are done with it.
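For the long-lived option, one worker serves many calls, so each reply has to be matched to the request that caused it. Below is a minimal sketch of one way to do that, assuming a hypothetical message protocol ({ id, payload } in, { id, result } out). PersistentWorkerClient, MessagingWorker and createFakeSquareWorker are illustrative names, and the fake worker only exists so the example runs outside a browser.

```typescript
// Hypothetical message envelopes: every request carries an id, and the worker
// script is assumed to echo that id back alongside its result.
interface WorkerRequest<P> {
  id: number;
  payload: P;
}

interface WorkerReply<R> {
  id: number;
  result: R;
}

// The subset of the Worker API this sketch uses. A browser Worker should fit
// this shape; the fake below stands in for one outside the browser.
interface MessagingWorker {
  postMessage(message: unknown): void;
  addEventListener(type: "message", listener: (event: { data: unknown }) => void): void;
  terminate(): void;
}

class PersistentWorkerClient<P, R> {
  private readonly worker: MessagingWorker;
  private nextId = 0;
  // Calls still waiting for a reply, keyed by request id.
  private readonly pending = new Map<number, { resolve: (result: R) => void; reject: (error: Error) => void }>();

  constructor(worker: MessagingWorker) {
    this.worker = worker;
    // A single listener for the worker's whole lifetime routes each reply
    // back to the call that sent the matching request.
    worker.addEventListener("message", event => {
      const reply = event.data as WorkerReply<R>;
      const call = this.pending.get(reply.id);
      if (call === undefined) return;
      this.pending.delete(reply.id);
      call.resolve(reply.result);
    });
  }

  call(payload: P): Promise<R> {
    const id = this.nextId++;
    return new Promise<R>((resolve, reject) => {
      this.pending.set(id, { resolve, reject });
      const request: WorkerRequest<P> = { id, payload };
      this.worker.postMessage(request);
    });
  }

  // Terminate the worker and fail any calls that will now never be answered.
  close(): void {
    this.worker.terminate();
    for (const call of this.pending.values()) call.reject(new Error("worker terminated"));
    this.pending.clear();
  }
}

// Fake worker for illustration: squares each payload after a short delay,
// following the { id, payload } -> { id, result } protocol assumed above.
function createFakeSquareWorker(): MessagingWorker {
  let onMessage: ((event: { data: unknown }) => void) | undefined;
  return {
    postMessage(message) {
      const { id, payload } = message as WorkerRequest<number>;
      const reply: WorkerReply<number> = { id, result: payload * payload };
      setTimeout(() => onMessage?.({ data: reply }), 0);
    },
    addEventListener(_type, listener) {
      onMessage = listener;
    },
    terminate() {
      onMessage = undefined;
    },
  };
}

const squares = new PersistentWorkerClient<number, number>(createFakeSquareWorker());
Promise.all([squares.call(3), squares.call(4)]).then(results => {
  console.log(results); // logs the array [9, 16]
  squares.close();
});
```

In a browser you would pass new Worker(...) instead of the fake, and the worker script would need to echo the id back, for example postMessage({ id: event.data.id, result: isPrime(event.data.payload) }). Keeping the worker alive avoids paying the creation cost on every call, at the price of having to remember to call close().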

Here is an example of using a Web Worker to check whether a number is prime.

// main.ts

function isPrime(value: number) {
  return new Promise<boolean>(resolve => {
    const worker = new Worker("is-prime-worker.js");

    worker.addEventListener("message", event => {
      const data = event.data as boolean;

      resolve(data);

      worker.terminate();
    });

    worker.postMessage(value);
  });
}
// is-prime-worker.ts

addEventListener("message", event => {
  const data = event.data as number;

  postMessage(isPrime(data));
});

function isPrime(num: number) {
  for (let i = 2, s = Math.sqrt(num); i <= s; i++) {
    if (num % i === 0) return false;
  }
  return num > 1;
}
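One gap in the isPrime wrapper above is failure handling: if the worker script throws, no message ever arrives, so the promise never settles and the worker is never terminated. Here is a sketch of a more defensive one-shot helper. runInWorker, OneShotWorker and fakeWorker are hypothetical names; the factory parameter lets a fake stand in for new Worker(...) outside the browser, and isPrime is the worker-side check with its parameter named consistently with its body.

```typescript
// Shape of the events this sketch reads: "message" events carry `data`,
// "error" events carry a `message` string (as a browser ErrorEvent does).
interface OneShotEvent {
  data?: unknown;
  message?: string;
}

// The subset of the Worker API used here; a browser Worker should fit it.
interface OneShotWorker {
  postMessage(message: unknown): void;
  addEventListener(type: "message" | "error", listener: (event: OneShotEvent) => void): void;
  terminate(): void;
}

// Run one input through a freshly created worker. The worker is terminated
// whether it replies or fails, and a failure rejects the promise instead of
// leaving it pending forever.
function runInWorker<I, O>(createWorker: () => OneShotWorker, input: I): Promise<O> {
  return new Promise<O>((resolve, reject) => {
    const worker = createWorker();

    worker.addEventListener("message", event => {
      worker.terminate();
      resolve(event.data as O);
    });

    worker.addEventListener("error", event => {
      worker.terminate();
      reject(new Error(event.message ?? "worker failed"));
    });

    worker.postMessage(input);
  });
}

// The worker-side primality check.
function isPrime(num: number): boolean {
  for (let i = 2, s = Math.sqrt(num); i <= s; i++) {
    if (num % i === 0) return false;
  }
  return num > 1;
}

// Fake worker for running this outside a browser: calls `handler` on a later
// tick and reports a thrown exception as an "error" event.
function fakeWorker(handler: (input: unknown) => unknown): OneShotWorker {
  const listeners: Record<string, (event: OneShotEvent) => void> = {};
  return {
    postMessage(message) {
      setTimeout(() => {
        let result: unknown;
        try {
          result = handler(message);
        } catch (err) {
          listeners.error?.({ message: String(err) });
          return;
        }
        listeners.message?.({ data: result });
      }, 0);
    },
    addEventListener(type, listener) {
      listeners[type] = listener;
    },
    terminate() {},
  };
}

runInWorker<number, boolean>(() => fakeWorker(n => isPrime(n as number)), 97).then(prime => {
  console.log(prime); // true
});
```

In the browser, the call would look something like runInWorker<number, boolean>(() => new Worker("is-prime-worker.js"), 97).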

Concurrency

Concurrency is therefore possible in JavaScript if we use multiple web workers, with the main window thread acting as the communicator for all worker threads.

Concurrent web worker basic setup

When building my application, I needed to send an array of ArrayBuffers to the web workers for them to process, with each worker sending back a result.

Because each item is processed on another thread and finishes at an unpredictable time, we can't simply iterate over the work in a synchronous loop. Instead I used a recursive function: each worker's message event is what triggers the next item to be processed. Let me explain:

Concurrent web worker setup

First of all, we need a pool of workers. The first question I had was: how many should we create? For optimal performance we should use all the logical processors the user has available. Luckily, the browser exposes this number through a global property, navigator.hardwareConcurrency. Since the main thread already occupies one of them and we don't want to block it, we use navigator.hardwareConcurrency - 1.

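// leave one logical processor for the main thread, but always start at least one worker
const MAX_WORKERS = Math.max(1, navigator.hardwareConcurrency - 1);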
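One edge case is worth guarding against: on a machine that reports a single logical processor, navigator.hardwareConcurrency - 1 is 0. With an empty pool, batchProcessor finds no available workers while pool.every(...) is vacuously true, so the run would report completion without processing anything. The hypothetical chooseWorkerCount helper below is one way to keep at least one worker while also not starting more workers than there are items; the exact policy is an assumption.

```typescript
// Hypothetical helper: how many workers to start for `itemCount` items.
// Assumptions: keep one logical processor free for the main thread, start at
// least one worker whenever there is work, and never more workers than items.
function chooseWorkerCount(hardwareConcurrency: number | undefined, itemCount: number): number {
  if (itemCount <= 0) return 0;
  // Fall back to 1 if the value is missing or nonsensical.
  const logicalProcessors =
    hardwareConcurrency !== undefined && hardwareConcurrency > 0 ? hardwareConcurrency : 1;
  const available = Math.max(1, logicalProcessors - 1);
  return Math.min(available, itemCount);
}

// In the browser: chooseWorkerCount(navigator.hardwareConcurrency, files.length)
console.log(chooseWorkerCount(8, 100)); // 7
console.log(chooseWorkerCount(1, 100)); // 1
console.log(chooseWorkerCount(8, 3)); // 3
```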

Now let's create the pool, giving each worker a flag that records whether it is currently processing.

interface WorkerPoolItem {
  value: Worker;
  isProcessing: boolean;
}

const pool: WorkerPoolItem[] = [];

for (let i = 0; i < MAX_WORKERS; i += 1) {
  const worker: WorkerPoolItem = {
    isProcessing: false,
    value: new Worker(workerURL),
  };

  pool.push(worker);
}

Next we will create a function called batchProcessor. It is responsible for finding free workers in the pool and sending them work to process. Please note that this function mutates the work array passed in because it uses pop() (which also means items are taken from the end of the array), but you could of course do this in many other ways.

function batchProcessor<T>(
  pool: WorkerPoolItem[],
  work: T[],
  onComplete: () => void,
) {
  const batch: [T, WorkerPoolItem][] = [];

  // find available workers
  const availableWorkers =
    pool.filter(worker => !worker.isProcessing);

  // assign work to available workers
  for (const worker of availableWorkers) {
    const value = work.pop();

    // if value is undefined this means
    // there is no more work to do
    if (value === undefined) break;

    batch.push([value, worker]);
  }

  // no work left?
  if (batch.length === 0) {
    // are any workers still processing?
    if (pool.every(worker => !worker.isProcessing)) {
      // now we are done
      onComplete();
    }

    // always return as there is no more work to do
    return;
  }

  // send work to workers
  for (const item of batch) {
    const [value, worker] = item;

    // mark worker as processing
    worker.isProcessing = true;

    // send work to worker
    worker.value.postMessage(value);
  }
}
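As noted, batchProcessor consumes work with pop(), which mutates the caller's array, hands out items last-first, and treats any undefined element as the end of the work. If any of that matters for your data, a cursor-based queue is one alternative. WorkQueue below is a hypothetical helper, not part of the article's code.

```typescript
// Hypothetical helper: hands out items front to back through a cursor rather
// than pop(), so the caller's array is never mutated and items are dispatched
// in their original order.
class WorkQueue<T> {
  private readonly items: readonly T[];
  private cursor = 0;

  constructor(items: readonly T[]) {
    this.items = items;
  }

  // Wraps the item in an object so that an `undefined` element is still
  // treated as work, and only a missing result means "no more work".
  next(): { value: T } | undefined {
    if (this.cursor >= this.items.length) return undefined;
    const value = this.items[this.cursor] as T;
    this.cursor += 1;
    return { value };
  }

  get remaining(): number {
    return this.items.length - this.cursor;
  }
}

// Inside batchProcessor, `work.pop()` would become:
//   const item = queue.next();
//   if (item === undefined) break;
//   batch.push([item.value, worker]);
const queue = new WorkQueue(["first", "second", "third"]);
console.log(queue.next()?.value, queue.remaining); // first 2
```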

The first time this function is called on the pool, it gets every worker started with some work. As we will see, the function is then called recursively (from each worker's message handler), so it has to handle both the initial case (all workers are free) and the ending case (more workers are free than there is work left). It also handles a scenario we will get to later: we may want to do some asynchronous processing before sending a message to a worker, and while that is happening several workers may finish, leaving a whole batch of work to send out at once. This is a consequence of the way the event loop works.
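Re-invoking batchProcessor from each worker's message event is one way to keep the pool busy. For comparison, the same idea can also be written with one async loop per worker: a plain synchronous loop cannot wait for a worker's reply, but a loop that awaits each reply can. The sketch below is that alternative technique, not the approach this article builds on. processWithLanes and processItem are hypothetical names, and processItem stands in for posting an item to a worker and awaiting its reply.

```typescript
// Alternative scheduling sketch: one async loop per worker "lane". Each lane
// claims the next unprocessed index, awaits the result, and repeats until the
// work runs out.
async function processWithLanes<T, V>(
  items: readonly T[],
  laneCount: number,
  processItem: (lane: number, item: T) => Promise<V>,
): Promise<V[]> {
  const results: V[] = new Array<V>(items.length);
  let next = 0; // index of the next item no lane has claimed yet

  async function runLane(lane: number): Promise<void> {
    // Claiming an index (read, then increment) happens synchronously, so two
    // lanes can never take the same item.
    while (next < items.length) {
      const index = next++;
      results[index] = await processItem(lane, items[index] as T);
    }
  }

  const lanes = Math.max(1, Math.min(laneCount, items.length));
  await Promise.all(Array.from({ length: lanes }, (_, lane) => runLane(lane)));
  return results; // in input order, regardless of completion order
}

// Usage with simulated work: each "item" is how long it takes, in ms.
const wait = (ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms));
processWithLanes([30, 10, 20, 5], 2, async (_lane, ms) => {
  await wait(ms);
  return ms * 2;
}).then(results => console.log(results)); // logs the array [60, 20, 40, 10]
```

The trade-off: the loops replace the isProcessing flags and the completion check, but each lane needs its own request/response wrapper around its worker.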

Combining the worker pool initialization and the batchProcessor function gives us concurrentProcessor, a function that processes an array of arbitrary values by sending them concurrently to a pool of workers. Note how each worker's message event is what triggers the rest of the data to be processed.

async function concurrentProcessor<T, V>(
  workerURL: string,
  work: T[],
  onProcess: (value: V) => void,
) {
  const pool: WorkerPoolItem[] = [];

  try {
    // promisify the recursive function
    await new Promise<void>(resolve => {
      // initialize worker pool
      for (let i = 0; i < MAX_WORKERS; i += 1) {
        const worker: WorkerPoolItem = {
          isProcessing: false,
          value: new Worker(workerURL),
        };

        // on receiving a message from the worker
        worker.value.addEventListener("message", event => {
          onProcess(event.data as V);

          worker.isProcessing = false;

          // Since a worker has just completed processing
          // a value, we can start processing another batch
          batchProcessor(pool, work, resolve);
        });

        pool.push(worker);
      }

      // start processing first batch of work
      batchProcessor(pool, work, resolve);
    });
  } finally {
    // terminate workers
    for (const worker of pool) {
      worker.value.terminate();
    }
  }
}
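concurrentProcessor needs a real browser Worker, and if a worker script throws, no error listener is registered, so the worker's isProcessing flag never resets and the await (and with it the finally block that terminates the pool) never finishes. The sketch below keeps the same scheduling idea but assumes two changes so it can be exercised anywhere: workers come from a factory, and an error event rejects the run. concurrentProcessorSketch, PoolWorker and fakeDoubler are hypothetical names.

```typescript
// Events as this sketch reads them: "message" carries `data`, "error" carries
// a `message` string (like a browser ErrorEvent).
interface PoolEvent {
  data?: unknown;
  message?: string;
}

// The subset of the Worker API used here; a browser Worker should fit it.
interface PoolWorker {
  postMessage(message: unknown): void;
  addEventListener(type: "message" | "error", listener: (event: PoolEvent) => void): void;
  terminate(): void;
}

interface PoolSlot {
  worker: PoolWorker;
  isProcessing: boolean;
}

// Same scheduling as concurrentProcessor, with two assumed changes: workers come
// from a factory (so fakes can stand in for them), and an "error" event rejects
// the run instead of leaving the promise pending forever.
async function concurrentProcessorSketch<T, V>(
  createWorker: () => PoolWorker,
  workerCount: number,
  work: T[],
  onProcess: (value: V) => void,
): Promise<void> {
  const pool: PoolSlot[] = [];

  try {
    await new Promise<void>((resolve, reject) => {
      // Hand work to every idle slot, then resolve once nothing is in flight.
      const dispatch = () => {
        for (const slot of pool) {
          if (slot.isProcessing) continue;
          const value = work.pop();
          if (value === undefined) break;
          slot.isProcessing = true;
          slot.worker.postMessage(value);
        }
        if (pool.every(slot => !slot.isProcessing)) resolve();
      };

      for (let i = 0; i < Math.max(1, workerCount); i += 1) {
        const slot: PoolSlot = { worker: createWorker(), isProcessing: false };
        slot.worker.addEventListener("message", event => {
          onProcess(event.data as V);
          slot.isProcessing = false;
          dispatch();
        });
        slot.worker.addEventListener("error", event => {
          reject(new Error(event.message ?? "worker error"));
        });
        pool.push(slot);
      }

      dispatch();
    });
  } finally {
    for (const slot of pool) slot.worker.terminate();
  }
}

// Fake worker that doubles numbers after a short delay and reports negative
// inputs as errors, standing in for a real worker script.
function fakeDoubler(): PoolWorker {
  const listeners: Record<string, (event: PoolEvent) => void> = {};
  return {
    postMessage(message) {
      const n = message as number;
      setTimeout(() => {
        if (n < 0) listeners.error?.({ message: `negative input ${n}` });
        else listeners.message?.({ data: n * 2 });
      }, Math.abs(n) % 3);
    },
    addEventListener(type, listener) {
      listeners[type] = listener;
    },
    terminate() {
      // Drop listeners so late replies are ignored, as with a terminated Worker.
      for (const type of Object.keys(listeners)) delete listeners[type];
    },
  };
}

const doubled: number[] = [];
concurrentProcessorSketch<number, number>(fakeDoubler, 3, [1, 2, 3, 4, 5], value => {
  doubled.push(value);
}).then(() => console.log(doubled.sort((a, b) => a - b))); // logs the array [2, 4, 6, 8, 10]
```

In a browser, the factory would be something like () => new Worker(workerURL), with workerCount derived from navigator.hardwareConcurrency as above.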

Final Implementation

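This implementation goes a bit further: an optional onRead callback lets us transform each value (synchronously or asynchronously) just before it is sent to a worker, and an optional transfer flag sends transferable values, such as ArrayBuffers, without copying them.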

GitHub

npm install concurrent-processor

In my case I needed to send the ArrayBuffer of each File object to a worker, but I did not want to read everything into an array of ArrayBuffers up front, as that would use far too much memory. With onRead, a file is only read when a worker is free to process it.

const workerURL = "metadata-scanner.js";

const values: File[] = [
  // ...
];

// to be passed into options.onRead
function onRead(file: File): Promise<ArrayBuffer> {
  return file.arrayBuffer();
}

type MyData = unknown;

const data: MyData[] = [];

function onProcess(value: MyData) {
  data.push(value);
}

await concurrentProcessor<File, MyData, ArrayBuffer>({
  workerURL,
  onRead,
  onProcess,
  values,
});
// metadata-scanner.ts

addEventListener("message", event => {
  const data = event.data as ArrayBuffer;

  postMessage(scanMetadata(data));
});

function scanMetadata(data: ArrayBuffer): MyData {
  // ...
}
// concurrent-processor.ts

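// leave one logical processor for the main thread, but always start at least one worker
const MAX_WORKERS = Math.max(1, navigator.hardwareConcurrency - 1);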

export async function concurrentProcessor<T, V, R = T>(
  options: ConcurrentProcessorOptions<T, V, R>,
) {
  const pool: WorkerPoolItem[] = [];

  try {
    // promisify the recursive function
    await new Promise<void>(resolve => {
      // initialize worker pool
      for (let i = 0; i < MAX_WORKERS; i += 1) {
        const worker: WorkerPoolItem = {
          isProcessing: false,
          value: new Worker(options.workerURL, options.workerOptions),
        };

        worker.value.addEventListener("message", event => {
          options.onProcess(event.data as V);

          worker.isProcessing = false;

          // Since a worker has just completed processing
          // a value, we can start processing another batch
          void batchProcessor<T, V, R>(pool, options, resolve);
        });

        pool.push(worker);
      }

      // Start processing first batch of work
      void batchProcessor<T, V, R>(pool, options, resolve);
    });
  } finally {
    // terminate workers
    for (const worker of pool) {
      worker.value.terminate();
    }
  }
}

async function batchProcessor<T, V, R = T>(
  pool: WorkerPoolItem[],
  options: ConcurrentProcessorOptions<T, V, R>,
  onComplete: () => void,
) {
  const batch: [T, WorkerPoolItem][] = [];

  // find available workers
  const availableWorkers = pool.filter(worker => !worker.isProcessing);

  // assign work to available workers
  for (const worker of availableWorkers) {
    const value = options.values.pop();

    // if value is undefined this means there is no more work to do
    if (value === undefined) break;

    batch.push([value, worker]);
  }

  // no work left?
  if (batch.length === 0) {
    // are any workers still processing?
    if (pool.every(worker => !worker.isProcessing)) {
      // now we are done
      onComplete();
    }

    // always return as there is no more work to do
    return;
  }

  // mark workers as processing
  // we should do this here as below we may have to wait for promises to resolve
  // and we don't want another batch to be processed in the meantime
  for (const item of batch) {
    const worker = item[1];

    worker.isProcessing = true;
  }

  let values: [R, WorkerPoolItem][] = [];

  if (options.onRead) {
    const promises: [Promise<R>, WorkerPoolItem][] = [];

    for (const item of batch) {
      const [value, worker] = item;

      const result = options.onRead(value);

      if (result instanceof Promise) {
        promises.push([result, worker]);
      } else {
        values.push([result, worker]);
      }
    }

    if (promises.length > 0) {
      const promiseValues = await Promise.all(promises.map(([promise]) => promise));

      for (let i = 0; i < promiseValues.length; i += 1) {
        // @ts-expect-error
        values.push([promiseValues[i], promises[i][1]]);
      }
    }
  } else {
    // if there is no onRead function, it just passes the values through
    values = batch as unknown as [R, WorkerPoolItem][];
  }

  // send work to workers
  for (const item of values) {
    const [value, worker] = item;

    if (options.transfer && isTransferable(value)) {
      worker.value.postMessage(value, [value]);
    } else {
      worker.value.postMessage(value);
    }
  }
}

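// Transferable types the browser may support. Some constructors (for example
// VideoFrame or OffscreenCanvas) are missing in some browsers, and using
// `instanceof` with an undefined global throws a ReferenceError, so each one is
// looked up on globalThis and skipped if it does not exist.
const TRANSFERABLE_TYPES = [
  "ArrayBuffer",
  "OffscreenCanvas",
  "ImageBitmap",
  "MessagePort",
  "ReadableStream",
  "WritableStream",
  "TransformStream",
  "VideoFrame",
];

function isTransferable(value: unknown): value is Transferable {
  const globals = globalThis as unknown as Record<string, unknown>;

  return TRANSFERABLE_TYPES.some(typeName => {
    const ctor = globals[typeName];
    return typeof ctor === "function" && value instanceof ctor;
  });
}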

interface WorkerPoolItem {
  value: Worker;
  isProcessing: boolean;
}

export interface ConcurrentProcessorOptions<T, V, R = T> {
  workerURL: string;
  workerOptions?: WorkerOptions;
  transfer?: boolean;
  values: T[];
  onRead?: (value: T) => Promise<R> | R;
  onProcess: (value: V) => void;
}
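The transfer option relies on the structured clone algorithm that postMessage uses: a transferred ArrayBuffer is moved to the receiver rather than copied, and the sender's buffer becomes detached (its byteLength drops to 0). Because structuredClone runs the same algorithm and accepts the same transfer list, the sketch below uses it as a stand-in that also runs in Node.js 17+; sendBuffer is a hypothetical name.

```typescript
// Hypothetical helper: "send" a buffer the way postMessage would, either by
// copying it or by transferring ownership. structuredClone applies the same
// structured clone algorithm (and accepts the same transfer list) as
// postMessage, so it works as a stand-in outside a worker setup.
function sendBuffer(buffer: ArrayBuffer, transfer: boolean): ArrayBuffer {
  return transfer ? structuredClone(buffer, { transfer: [buffer] }) : structuredClone(buffer);
}

const original = new ArrayBuffer(4);
new Uint8Array(original).set([1, 2, 3, 4]);

// Copy: both sides now hold 4 bytes each.
const copied = sendBuffer(original, false);
console.log(original.byteLength, copied.byteLength); // 4 4

// Transfer: the bytes move, and the sender's buffer is detached.
const moved = sendBuffer(original, true);
console.log(original.byteLength, moved.byteLength); // 0 4
console.log(new Uint8Array(moved)[3]); // 4
```

For the file-scanning case this pairs well with onRead, since each ArrayBuffer is created just for its worker and is not needed on the main thread afterwards; passing transfer: true in the options would be the way to turn it on.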