Simple CQRS in NodeJS with Typescript

rusfighter (ilija)

Posted on January 25, 2023


Introduction to CQRS and why it matters

CQRS stands for Command Query Responsibility Segregation and is a pattern that splits read and write operations on data. It means you can use a different model to update information than the one you use to read it, and scale those models independently. In most applications there are more reads than writes. With CQRS we can potentially use a different datastore for the read models and apply additional optimizations. In this blog we will use PostgreSQL as both the read and write datastore, but in the next article I will show a way to sync it with other databases that can serve the reads for our queries.

Query Model

The most important rule of the query model is that reading must NOT introduce any side-effects (except logging and auditing).

Querying the database (PostgreSQL) is not groundbreaking. Personally I like full type-safety so we can catch bugs during development without writing tests that merely check that the data types from our datastore match the data types our API expects. I like to go database-schema first: we generate types from the database schema and work with those. Any change to the database schema is made with SQL migrations, after which the TypeScript types are regenerated. Another approach is to use a code-first tool like TypeORM or Prisma; however, in my experience such tools often produce inefficient SQL queries and are harder to extend. In my projects I use the kysely library (https://github.com/koskimas/kysely) with kysely-codegen (https://github.com/RobinBlomberg/kysely-codegen) to get a fully type-safe SQL builder.
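A minimal setup sketch, assuming kysely-codegen writes its generated DB interface to the default kysely-codegen module and that the connection string lives in DATABASE_URL:

import { Kysely, PostgresDialect } from 'kysely';
import { Pool } from 'pg';
// DB is the interface kysely-codegen generates from the live schema
import { DB } from 'kysely-codegen';

// one shared, fully typed query builder instance
export const db = new Kysely<DB>({
  dialect: new PostgresDialect({
    pool: new Pool({ connectionString: process.env.DATABASE_URL }),
  }),
});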

A simple Fastify query handler could look something like:

fastify.get('/get-accounts', async () => {
    const accounts = await db
        .selectFrom('accounts')
        .select(['id', 'first_name', 'last_name'])
        .execute();
    // additional logic can be used to transform the database response
    return accounts;
});

Since we are using a type-safe SQL builder, the return type is type-safe as well. If we are also using TypeScript on the frontend, we can leverage tools like tRPC to get full type-safety from the database all the way to our view layer.
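As a rough sketch (the router shape and procedure name here are my own, not from the original post), the whole chain could look like:

import { initTRPC } from '@trpc/server';

const t = initTRPC.create();

export const appRouter = t.router({
  // the kysely return type flows through tRPC to the client unchanged
  getAccounts: t.procedure.query(() =>
    db
      .selectFrom('accounts')
      .select(['id', 'first_name', 'last_name'])
      .execute()
  ),
});

// the frontend imports only this type, never the server code
export type AppRouter = typeof appRouter;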

Another solution I often use is a service like Hasura (https://hasura.io), which generates a GraphQL endpoint from our database. This service can be scaled independently.

The Write Model

Compared to the read side, this part is much harder to master. When modifying data we should consider several things that will likely determine how the model is implemented and used, including:

  • Concurrency: how should we treat concurrent writes to the same model?
  • Data consistency: should the model always adhere to its invariants?
  • Write throughput
  • Atomicity
  • Does mutating the model produce any side-effects (sending email, etc.)?

For many of these concerns the choice of datastore is important. Personally I will always choose an ACID-compliant SQL database like PostgreSQL. It is worth noting that applications that require high write throughput but not strong data consistency can be better off with other datastore types.

The rule I always try to adhere to is that every command handler should produce only a single side-effect/transaction.

This guarantees our system does not end up in an inconsistent state.

A bad example of a command handler would be something like:

// Function that can cause an inconsistent state
async function createAccount(account: Account, group_id: string) {
  // insert user account into database
  await db.insertAccount(account);
  // add user to a group; if this fails, the account insert is not rolled back
  await db.addToGroup(account.id, group_id);
  // send email; if this fails, the account exists without a welcome email
  await emailService.sendWelcomeEmail(account.email, account.first_name);
}

Let's improve the createAccount function so that it only has one side-effect:

// Function that can be slow
async function createAccount(account: Account, group_id: string) {
  await db.createTransaction(async (db) => {
    // insert user account into database
    await db.insertAccount(account);
    // add user to a group
    await db.addToGroup(account.id, group_id);
    // send email while the transaction is still open
    await emailService.sendWelcomeEmail(account.email, account.first_name);
  });
}

This will actually work and be consistent, but it is neither scalable nor extensible, and it has a lower commit success rate. Assume sendWelcomeEmail takes 10 seconds to process (due to network lag or a slow external service): the database connection is then kept open for at least 10 seconds and cannot be used by other processes. Additionally, what if we want to notify the administrators that a new account was created? We would need to modify the body of the transaction and possibly reduce the commit success rate further.

A better approach would be to insert the account, add it to the group, and schedule the sendWelcomeEmail in one atomic operation, executing the side-effects asynchronously in the background. Additionally we can write an event to the database to easily extend (non-critical) side-effects (discussed in the next article). An optimal function would look something like:

// Function we want
async function createAccount(account: Account, group_id: string) {
  // the magic happens here
  await execute(
    insertAccount(account),
    addToGroup(account.id, group_id),
    sendWelcomeEmail(account.email, account.first_name),
    // this is an integration event
    accountCreated(account.id)
  );
}

You might wonder what the return types of insertAccount, addToGroup, sendWelcomeEmail and accountCreated are. The input type of the execute function is the following:

export interface ICommand {
  readonly sql: string;
  readonly parameters: ReadonlyArray<any>;
}

export async function execute(...commands: (ICommand | ICommand[])[]): Promise<void> {
  // flatten so a handler can pass single commands or command arrays
  const queries = commands.flat();
  await db.createTransaction(async (db) => {
    for (const query of queries) {
      await db.query(query.sql, query.parameters);
    }
  });

  // we can further improve the performance with help of
  // https://vitaly-t.github.io/pg-promise/helpers.html concat function
  // const sql = pgpInstance.helpers.concat(commands.flat().map((q) => ({ query: q.sql, values: q.parameters })));
  // await db.query(sql);
}

An example implementation of insertAccount would be:

// client is the kysely query builder
function insertAccount(account: Account): ICommand[] {
  return [
    client
      .insertInto('accounts')
      .values({
        // the remaining account columns would be listed here as well
        id: account.id,
      })
      .compile(),
  ];
}

Because an array of ICommand is returned, we can easily attach additional database mutations to an action.
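The side-effect commands can take the same shape. A sketch of sendWelcomeEmail, assuming a hypothetical background_jobs table that a worker drains after the transaction commits (processing such jobs is the topic of the next article):

// schedules the email instead of sending it inline; a background
// worker picks the job up once the transaction has committed
function sendWelcomeEmail(email: string, first_name: string): ICommand[] {
  return [
    client
      .insertInto('background_jobs') // hypothetical outbox-style table
      .values({
        type: 'send_welcome_email',
        payload: JSON.stringify({ email, first_name }),
      })
      .compile(),
  ];
}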

Our command handler is now atomic and far more performant than before, with a higher success rate (it depends only on the database).

There is only one part missing: data consistency. Many write models have invariants that should always hold, even under concurrent writes. In the DDD paradigm they are implemented inside an aggregate root, a.k.a. a consistency boundary. A simple but not trivial example is the fulfillment of an order. A business requirement is that an order can only be paid once; once it is paid, the order is shipped. Let's write a fulfillOrder command handler with the current solution.

async function fulfillOrder(order_id: string) {
  const order = await db.selectFrom("orders")
                        .select(["state", "id"])
                        .where("id", "=", order_id)
                        .executeTakeFirst();

  if (!order) {
    throw new Error("order not found");
  }

  // this order state is now stale data;
  // with concurrent requests there is a possibility
  // that the state is already different in the database
  if (order.state === "paid") {
    throw new Error("order already paid");
  }

  await execute(
    pay(order_id),
    shipOrder(order_id),
  );
}

function pay(order_id: string): ICommand[] {
  return [
    client
      .updateTable('orders')
      .set({ state: "paid" })
      .where("id", "=", order_id)
      .compile(),
  ];
}
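shipOrder is not shown in the original, but under the same pattern it would simply emit commands as well; for example (assuming the same hypothetical background_jobs table):

// hypothetical: schedules the shipment as a background job
function shipOrder(order_id: string): ICommand[] {
  return [
    client
      .insertInto('background_jobs')
      .values({ type: 'ship_order', payload: JSON.stringify({ order_id }) })
      .compile(),
  ];
}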

The fulfillOrder handler looks fine at first glance, yet there is a small chance that we ship the order twice. By the time we retrieve the order, it may already have been fulfilled by another request. In general such bugs are hard to spot and hard to test. Fortunately there are several solutions to this kind of problem. The first is to wrap the whole handler in a transaction and use a strong transaction isolation level (the Serializable isolation level). This does the job but reduces performance, especially when the logic between reading and writing the data is complex.
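For completeness, a sketch of that first solution with kysely, assuming a version that supports setIsolationLevel on the transaction builder:

// the whole read-check-write cycle runs under SERIALIZABLE
async function fulfillOrderSerializable(order_id: string) {
  await db
    .transaction()
    .setIsolationLevel('serializable')
    .execute(async (trx) => {
      const order = await trx
        .selectFrom('orders')
        .select(['state', 'id'])
        .where('id', '=', order_id)
        .executeTakeFirst();

      if (!order || order.state === 'paid') {
        throw new Error('order already paid');
      }

      await trx
        .updateTable('orders')
        .set({ state: 'paid' })
        .where('id', '=', order_id)
        .execute();
      // PostgreSQL aborts one of two conflicting serializable
      // transactions with a serialization error (SQLSTATE 40001),
      // so callers should be prepared to retry
    });
}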

Another approach, which I personally use, is optimistic concurrency. Optimistic concurrency assumes that multiple transactions can frequently complete without interfering with each other, which improves performance. Implementing it requires a few modifications: first we introduce a version column on the orders table, then we ensure that the mutation only succeeds against the version for which the invariant was checked, updating the version accordingly. Lastly we must ensure the whole transaction cannot interfere with similar transactions.
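A sketch of the schema change as a kysely migration (the column name version matches the handler below):

import { Kysely } from 'kysely';

export async function up(db: Kysely<any>): Promise<void> {
  // every row starts at version 0; each successful command bumps it by one
  await db.schema
    .alterTable('orders')
    .addColumn('version', 'integer', (col) => col.notNull().defaultTo(0))
    .execute();
}

With the version column in place, the handler becomes: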

async function fulfillOrder(order_id: string) {
  const order = await db.selectFrom("orders")
                        .select(["state", "id", "version"])
                        .where("id", "=", order_id)
                        .executeTakeFirst();

  if (!order) {
    throw new Error("order not found");
  }

  // this order state is now stale data;
  // with concurrent requests there is a possibility
  // that the state is already different in the database
  if (order.state === "paid") {
    throw new Error("order already paid");
  }

  await execute(
    pay(order_id, order.version),
    shipOrder(order_id),
  );
}

function pay(order_id: string, expected_version: number): ICommand[] {
  return [
    // statements in a transaction run in order, so first take an advisory lock on this resource
    { sql: "select pg_advisory_xact_lock($1);", parameters: [stringToInt(order_id)] },
    // this statement makes the transaction fail if a newer version exists
    client
      .updateTable('orders')
      .set({ id: null })
      .where("id", "=", order_id)
      .where("version", ">", expected_version)
      .compile(),
    client
      .updateTable('orders')
      .set({ state: "paid", version: expected_version + 1 })
      .where("id", "=", order_id)
      .compile(),
  ];
}

The trick here is to update the row so that it violates a DB constraint (id is the primary key and thus cannot be null) whenever a newer version exists, which aborts the transaction. With the help of PostgreSQL advisory locks we ensure consistency between the update statements.
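The stringToInt helper is not shown in the original. pg_advisory_xact_lock takes a 64-bit key, so any stable string-to-bigint mapping works; one possible sketch:

import { createHash } from 'node:crypto';

// maps an arbitrary id to a stable signed 64-bit lock key;
// a hash collision only causes needless locking, never corruption
function stringToInt(id: string): string {
  const digest = createHash('sha256').update(id).digest();
  // return as a string so the driver keeps full 64-bit precision
  return digest.readBigInt64BE(0).toString();
}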

Summary

We now have a simple, framework-less solution that achieves high write throughput with strong consistency on a per-command-handler basis. An additional benefit of using ICommand is that testing becomes easier: we only need to mock the execute function and assert that all expected commands are passed to it.
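A minimal sketch of such a test, assuming vitest and that execute lives in its own module (the file names here are hypothetical):

import { expect, it, vi } from 'vitest';
import { execute } from './execute';
import { createAccount } from './accounts';

// replace the real executor so no database is touched
vi.mock('./execute', () => ({ execute: vi.fn().mockResolvedValue(undefined) }));

it('createAccount funnels all commands through a single execute call', async () => {
  const account = { id: 'a1', email: 'ann@example.com', first_name: 'Ann' };
  await createAccount(account as any, 'group-1'); // Account shape assumed
  // one call to execute means one atomic transaction
  expect(execute).toHaveBeenCalledTimes(1);
});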
In the next post we will look at integration events and how to process background tasks such as sending emails and notifications with the help of the outbox pattern and message brokers/task queues.

Thanks for reading.

Originally written on https://medium.com/
