Synchronization of concurrent HTTP requests in NodeJS

Alex Rudenko

Posted on January 2, 2018

As a follow-up to my previous post "Making Better HTTP APIs", I wrote a simple NodeJS server which demonstrates how to synchronize concurrent requests so that certain parts of the business logic are not executed twice.

I used the example from the previous post, a Payment API, and wrote a simple server which follows the POST/PUT pattern for resource creation but does not handle concurrent PUT requests correctly (yet). First, let's take a look at the basic implementation and, afterwards, extend the server to synchronize concurrent requests.

The server has two handlers: POST /payments and PUT /payments/:id.
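
The handlers below assume an Express app and two in-memory counters for payment and request IDs; this setup is not shown in the article itself, so here is a minimal sketch of what it could look like:

const express = require('express'); // assuming Express, as the handler API suggests
const app = express();

// In-memory counters used by the handlers below.
let nextPaymentId = 1;
let nextRequestId = 1;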

app.post('/payments', (req, res) => {
  const paymentId = nextPaymentId++;
  const context = `request(post) #${nextRequestId++}`;
  handle(() => createPayment(context, paymentId), res);
});

app.put('/payments/:id', (req, res) => {
  const context = `request(put) #${nextRequestId++}`;
  const paymentId = req.params.id;
  handle(() => conductPayment(context, paymentId), res);
});

Both handlers define a context variable which includes the request ID. The context is useful for grouping log messages produced by the same request. Additionally, the POST /payments handler generates a new payment ID. After that, both handlers delegate the execution to the handle function, which invokes the correct business logic function and handles the HTTP response.

The handle function is quite simple too. It assumes that the business function either returns an object to be sent to the client or throws an error. (Error handling could be improved by using extended error classes.)

async function handle(fn, res) {
  try {
    const result = await fn();
    if (result) return res.status(200).json(result);
    res.status(204).end();
  } catch (err) {
    res.status(409).json({
      error: err.message,
    });
  }
}

Now let's examine the business logic. The createPayment function does nothing more than storing the payment ID with an indication that it is an empty one. The full code is in the gist linked below; a minimal sketch of createPayment (and the getPayment helper used later), assuming an in-memory Map as the data store, could look like this:
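
const payments = new Map(); // assumed in-memory data store

async function createPayment(context, paymentId) {
  console.log(`${context}: creating payment #${paymentId}`);
  const payment = { id: paymentId, state: 'EMPTY' };
  payments.set(paymentId, payment);
  return payment;
}

async function getPayment(context, paymentId) {
  return payments.get(paymentId);
}

The conductPayment function is more complex than createPayment: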

async function conductPayment(context, paymentId) {
  const payment = await getPayment(context, paymentId);
  if (!payment) {
    throw new Error('Payment does not exist');
  }
  if (payment.state === 'PROCESSING') {
    throw new Error('Payment is in progress. Try again later.');
  }
  if (payment.state === 'PAID') {
    return payment;
  }
  if (payment.state === 'EMPTY') {
    // Return the processed payment; without this return, the function
    // would fall through and throw 'Payment is in bad state'.
    return processPayment(context, paymentId);
  }
  throw new Error('Payment is in bad state');
}

This function retrieves the payment object first and then examines the state of the payment. If the payment is not paid and not being processed at the moment, the function invokes the processPayment method. As it is a lengthy operation in the real world and typically involves a call to a 3rd-party service, it can take a while. I have simulated this using setTimeout. The execution of processPayment takes about 3 seconds.
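For reference, a minimal sketch of this simulation, reusing the in-memory store from above, could look like this:

async function processPayment(context, paymentId) {
  console.log(`${context}: processing payment #${paymentId}`);
  payments.set(paymentId, { id: paymentId, state: 'PROCESSING' });
  // Simulate a slow 3rd-party call with a ~3 second delay.
  await new Promise((resolve) => setTimeout(resolve, 3000));
  const payment = { id: paymentId, state: 'PAID' };
  payments.set(paymentId, payment);
  return payment;
}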

Let's summarize what the server is capable of at the moment:

  1. It can handle concurrent POST /payments requests. Empty payments stored in the database have no external side effects, and we can clean them up later.

  2. It can correctly handle PUT /payments/:id requests for the same ID only when they arrive sequentially.

Point #2 may not be evident at first glance, but if we examine the code of the conductPayment function, we notice that there is a time gap between await getPayment and await processPayment. Between those two calls, a concurrent request can arrive and read the same payment state, so it can start a parallel (and duplicate) payment process. One possible interleaving of two concurrent PUT requests for the same payment:
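
// request A: await getPayment(...)     -> reads state EMPTY
// request B: await getPayment(...)     -> also reads state EMPTY
// request A: await processPayment(...) -> calls the 3rd-party service
// request B: await processPayment(...) -> duplicate call to the 3rd party!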

Synchronization of concurrent HTTP requests

To avoid problems with concurrent requests, we need to make sure that no request for the same payment ID can enter the code section between await getPayment and await processPayment while another request is already there. There are several ways to achieve this:
1) Queuing. We could ensure that instead of executing the conductPayment function immediately, the server puts a message into a queue specific to the corresponding payment. Another process (a worker) would fetch the messages for a payment ID one at a time, thus eliminating the problem of concurrent execution. This approach is a good and scalable solution with one drawback: it makes the architecture more complicated, with several processes to manage and a message broker to maintain.
2) Locking. We could leverage either an optimistic or a pessimistic locking strategy. With pessimistic locking, we could use the database or something else, for example, Redis, to ensure that no concurrent request can enter conductPayment while another request is in progress. With optimistic locking, we could check if the payment state is still EMPTY while trying to change its state to PROCESSING (in an atomic way). If this fails, we could throw an Error and not send the payment to the 3rd party (see the sketch after this list).
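
For illustration, the optimistic check-and-set could look like the following hypothetical helper. In a real database, this would be a single atomic statement, for example UPDATE payments SET state = 'PROCESSING' WHERE id = ? AND state = 'EMPTY':

function tryMarkProcessing(paymentId) {
  // Synchronous check-and-set on the in-memory store: within one Node
  // process this cannot be interleaved with another request.
  const payment = payments.get(paymentId);
  if (!payment || payment.state !== 'EMPTY') return false;
  payment.state = 'PROCESSING';
  return true;
}

If tryMarkProcessing returns false, the request has lost the race and should fail without calling the 3rd party.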

Since this is not an article about locking or queuing, I will show how the pessimistic locking strategy could look in the code.

app.put('/payments/:id', (req, res) => {
  const context = `request(put) #${nextRequestId++}`;
  const paymentId = req.params.id;
  handleWithLock(context, paymentId, () => conductPayment(context, paymentId), res);
});

Here, the handleWithLock function works like handle but additionally takes the context and a lock ID (the payment ID) and ensures that only one instance of the business logic can run at a time for a given ID. This is how one could implement it:

async function handleWithLock(context, lockId, fn, res) {
  let acquired = false;
  try {
    const lockState = await lock(context, lockId); // per paymentId
    if (lockState === 'locked') throw new Error('Resource is locked.');
    acquired = true;
    const result = await fn();
    if (result) {
      return res.status(200).json(result);
    }
    res.status(204).end();
  } catch (err) {
    res.status(409).json({
      error: err.message,
    });
  } finally {
    // Only release the lock if this request actually acquired it;
    // otherwise we would free a lock held by another request.
    if (acquired) await unlock(context, lockId);
  }
}

It is essential that the lock function is implemented so that only one process can acquire the lock at a time. It is also vital that the lock is released if the Node process crashes (or that the lock expires after some time). In this simple example, I implemented basic in-memory locks. For production-ready implementations that are supposed to work across a cluster of Node processes, something like PostgreSQL advisory locks or Redlock can be used. Once the processing finishes, the lock is released using the unlock function.
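
A minimal sketch of such in-memory locks for a single Node process could look like this:

const locks = new Set();

async function lock(context, lockId) {
  // Synchronous check-and-set: within one Node process this cannot be
  // interleaved with another request, so the lock is effectively atomic.
  if (locks.has(lockId)) return 'locked';
  locks.add(lockId);
  return 'acquired';
}

async function unlock(context, lockId) {
  locks.delete(lockId);
}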

In this implementation, the handleWithLock function throws an error if the resource is locked. Alternatively, the server could wait until the resource is free again, for example, with a spinlock that retries until the lock is acquired. A minimal sketch of such a wrapper, assuming the lock and unlock functions above, could look like this:
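
async function lockWithWait(context, lockId) {
  // Retry until the lock is acquired, backing off 100 ms between attempts.
  while ((await lock(context, lockId)) === 'locked') {
    await new Promise((resolve) => setTimeout(resolve, 100));
  }
  return 'acquired';
}

Below you can see the simple server in action.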

https://60devs.com/img/article-synchronization-of-http-requests-in-NodeJS/concurrent-requests.gif

The full server code can be found here: gist.

If I missed some of the ways to implement the synchronization or you spot a mistake in the code, please let me know and follow me on Twitter.

Originally published on my blog at 60devs.
