Repository Pattern with Typescript, Node.js and native PostgreSQL driver

fyapy

aabdullin

Posted on March 20, 2022

Repository Pattern with Typescript, Node.js and native PostgreSQL driver

Not too old times i worked with databases like MongoDB, PostgreSQL using Mongoose, TypeORM or Knex wrapped to repository, but... In one of the last projects i will need to build high-load, GEO distributed system.
Interesting task 😋

For this type of systems PostgreSQL not best solution, and for a number of reasons like lack of replication out of the box. And we strictly must not have «Vendor lock», and therefore also did not take modern SQL databases like Amazon Aurora. And end of the ends the choice was made in favor Cassandra, for this article where we will talking about low-lever implementation of Repository Pattern it is not important, in your case it can be any unpopular database like HBase for example.

Okay, database Cassandra chosen, but, how we will organize our Data Layer to interact with database?🤨
We don't able to use Knex, it is just not support CQL, and we don't have good alternatives. And i am clearly understand what to use pure CQL not good idea, because it is significantly make harder to support project in the future.

All source code can be found here.

What are the basic features we want to see from our Data Access Layer?

  • Provide ready to use CRUD operations
  • Good TypeScript support
  • Support transactions
  • Column aliases (in DB "created_at", in code "createdAt")
  • Hidden columns
  • Select specific columns
  • Where (For simplicity, I will show simple implementation without "or" operator support)
  • Support relations (For simplicity, I will skip this paragraph within the article, but it is easy to add)
  • Subscriptions on entity events like BeforeInsert, AfterInsert, BeforeDelete, AfterDelete, and etc. (same as relations)
  • Ability to easy extend API
  • Migrations (It is not part of this article)

In the absence of good ready-made solutions, an excellent option would be to implement the repository pattern and query builder like helpers.

Requirements:

  • Node.js 17.5.0+
  • PostgreSQL 14.2+

NPM packeges:

  • TypeScript 4.6.2+
  • PG 8.7.3+
  • Node PG migrate 6.2.1+

Why PG?
For the clarity of the article to a large circle of developers, the entire explanation will be built by PostgreSQL and PG package.
And in a real project, the database schema will change over time, and in order to be able to perform migrations, we will use Node PG migrate.

Setting up environment

Before we start we need to install our packages, i will use Yarn.

yarn add pg && yarn add -D typescript @types/pg node-pg-migrate
Enter fullscreen mode Exit fullscreen mode

Low-lever helpers to work with PG driver

First before start implementing out Repository we need to create some helper functions to make work with PG package easier.
We will have a queryRow function for situations where we want to get only one row from the database.

export const queryRow = async <T = any>(sql: string, values: any[] | null, tx?: PoolClient): Promise<T> => {
  // Get connection from PG Pool or use passed connection, will be explained below
  const client = await getConnect(tx)

  // I think will be better to separate handling query with passed values 
  if (Array.isArray(values)) {
    try {
      const res = await client.query(sql, values)

      return res.rows[0] as T
    } catch (e) {
      throw e
    } finally {
      // if we not have passed connection, we need close opened connection
      if (!tx) client.release()
    }
  }

  try {
    const res = await client.query(sql)

    return res.rows[0] as T
  } catch (e) {
    throw e
  } finally {
    if (!tx) client.release()
  }
}
Enter fullscreen mode Exit fullscreen mode

And we will have query function to work with the list of data returned by the database.

export const query = async <T = any>(sql: string, values?: any[] | null, tx?: PoolClient) => {
  const client = await getConnect(tx)

  if (Array.isArray(values)) {
    try {
      const res = await client.query(sql, values)

      return res.rows as T[]
    } catch (e) {
      throw e
    } finally {
      if (!tx) client.release()
    }
  }

  try {
    const res = await client.query(sql)

    return res.rows as T[]
  } catch (e) {
    throw e
  } finally {
    if (!tx) client.release()
  }
}
Enter fullscreen mode Exit fullscreen mode

Any functions responsible for working with the database or responsible for generating SQL can be added to the helpers.

For example getConnect, what will look if we don't have passed connection it will get fresh connection from Pool.

export const getConnect = (tx?: PoolClient): Promise<PoolClient> => {
  if (tx) {
    return tx as unknown as Promise<PoolClient>
  }
  // pool it is global connection variable
  // !!! Warning !!!
  // Be very-very carefully when working with global variables
  // And you should not access this variable from business logic
  return pool.connect()
}
Enter fullscreen mode Exit fullscreen mode

Or here is an example of functions for generating SQL code when working with transactions.

export const startTrx = async (pool: Pool) => {
  const tx = await pool.connect()
  await tx.query('BEGIN')
  return tx
}
export const commit = (pool: PoolClient) => pool.query('COMMIT')
export const rollback = (pool: PoolClient) => pool.query('ROLLBACK')
Enter fullscreen mode Exit fullscreen mode

Or functions what will help to determine error type when we dealing with error handling.

export const isUniqueErr = (error: any, table?: string) => {
  if (table) {
    // 23505 it is one of PostgreSQL error codes, what mean it is unique error
    // Read more here: https://www.postgresql.org/docs/14/errcodes-appendix.html
    return error.code === '23505' && error.severity === 'ERROR' && error.table === table
  }

  return error.code === '23505' && error.severity === 'ERROR'
}
Enter fullscreen mode Exit fullscreen mode

And finally

Repository implemetation

Firstly i will implement just create method to show its looks. Now need create interfaces what will cover our operations like Create and Read.

interface Writer<T, C> {
  create(value: Partial<T>, tx?: C): Promise<T>
}
Enter fullscreen mode Exit fullscreen mode

Where T it is a entity/model typing, and C it is database client type.
And after we need define base interface for any database dialect repository.

export type BaseRepository<T, C> = Writer<T, C>
Enter fullscreen mode Exit fullscreen mode

And here we able to create our database repository, in my case i will use PostgreSQL database with PG driver, but if you use other database you need to implement logic using API of your database.

import type { Pool, PoolClient } from 'pg'
import type {
  BaseRepository,
  ColumnData,
} from './types'
import { buildAliasMapper, insertValues } from './queryBuilder'
import { queryRow } from './utils'

export class PGRepository<T> implements BaseRepository<T, PoolClient> {
  readonly table: string
  readonly pool: Pool
  readonly columnAlias: (col: keyof T) => string
  readonly allColumns: string

  constructor({
    pool,
    table,
    mapping,
  }: {
    table: string
    pool: Pool
    // model/entity alias mapping map, will be explained below
    mapping: Record<keyof T, ColumnData>
  }) {
    // About aliasMapper will be explained below
    const aliasMapper = buildAliasMapper<T>(mapping)

    this.pool = pool
    this.table = `"${table}"`
    // About aliasMapper will be explained below
    this.columnAlias = aliasMapper

    // String with all of columns (SQL - '*'), it is will computed on class initialization
    // Example of output: "id" AS "id", "name" AS "name", "email" AS "email", "created_at" AS "createdAt"
    // It is just for optimization
    this.allColumns = Object.entries(mapping).reduce((acc, [key, value]: [string, ColumnData]) => {
      // skip hidden columns
      if (typeof value === 'object' && value.hidden) {
        return acc
      }

      const sql = `${aliasMapper(key as keyof T)} AS "${key}"`

      return acc
        ? acc += `, ${sql}`
        : sql
    }, '')
  }


  async create(value: Partial<T>, tx?: PoolClient): Promise<T> {
    // Here we will transform JavaScript object, to SQL columns string
    const _cols: string[] = []
    const _values: any[] = []

    for (const key of Object.keys(value) as Array<keyof T>) {
      // Extract from alias mapper original database columns
      _cols.push(this.columnAlias(key))
      _values.push(value[key])
    }
    // End of transform

    const cols = _cols.join(', ')
    // insertValues will create string with value bindings, to prevent SQL-injections
    // Example of output: $1, $2, $3
    const values = insertValues(_values)

    const row = await queryRow<T>(
      `INSERT INTO ${this.table} (${cols}) VALUES (${values}) RETURNING ${this.allColumns}`,
      _values,
      tx,
    )

    return row
  }
}
Enter fullscreen mode Exit fullscreen mode

Warning
Don't use arrow functions like this.
Because in future it will break overriding methods with super.create() calls.

create = async (value: Partial<T>, tx?: PoolClient): Promise<T> => {
  // code...
}
Enter fullscreen mode Exit fullscreen mode

Column alias mapper

Above you able to see magic functions const aliasMapper = buildAliasMapper<T>(mapping) and insertValues, I suggest looking at the code of the buildAliasMapper function.

export type ID = string | number
export type ColumnData = string | {
  name: string
  hidden?: boolean
}

export function buildAliasMapper<T extends AnyObject>(obj: Record<keyof T, ColumnData>) {
  // use ES6 Map structure for performance reasons
  // More here: https://www.measurethat.net/Benchmarks/Show/11290/4/map-vs-object-real-world-performance
  const _mapper = new Map<keyof T, string>()

  for (const [key, value] of Object.entries(obj)) {
    // Create mapping 
    // JS representation property name to PostgreSQL column name
    _mapper.set(key, typeof value === 'string'
      ? value
      : value.name)
  }

  // And return function what will get JS key and output PostgreSQL column name
  return (col: keyof T): string => `"${_mapper.get(col)!}"`
}

export const insertValues = (values: any[]) => values.map((_, index) => `$${index + 1}`).join(', ')
Enter fullscreen mode Exit fullscreen mode

Example how buildAliasMapper works:

export interface User {
  id: number
  name: string
  email: string
  hash?: string
  createdAt: string
}

const aliasMapper = buildAliasMapper<User>({
  id: 'id',
  name: 'name',
  email: 'email',
  hash: {
    name: 'password_hash',
    hidden: true,
  },
  createdAt: 'created_at',
})

aliasMapper('createdAt') // output: "created_at" (with double quotes)
Enter fullscreen mode Exit fullscreen mode

I think now you understand why constructor have property mapping: Record<keyof T, ColumnData> and how alias mapping works.

Now, we create the Repository file to specific entity.

import type { Pool, PoolClient } from 'pg'
import { PGRepository, queryRow, ID } from 'repository'

export interface User {
  id: number
  name: string
  email: string
  hash?: string
  createdAt: string
}

export class UserRepository extends PGRepository<User> {
  constructor(pool: Pool) {
    super({
      pool,
      table: 'users',
      mapping: {
        id: 'id',
        name: 'name',
        email: 'email',
        hash: {
          name: 'password_hash',
          hidden: true,
        },
        createdAt: 'created_at',
      },
    })
  }

  async isTodayCreated(id: ID, tx?: PoolClient) {
    const user = await this.findOne(id, {
      select: ['createdAt'],
      tx,
    })

    if (!user) {
      throw new Error(`User with id '${id}' don't exists`)
    }

    const userDate = new Date(user.createdAt).getTime()
    const todayDate = new Date().getTime()
    const dayOffset = 3600 * 1000 * 24

    return userDate + dayOffset > todayDate
  }
}
Enter fullscreen mode Exit fullscreen mode

Connect to database.

import { Pool } from 'pg'
import 'dotenv/config'

const parsePostgresUrl = (url: string) => {
  const sl1 = url.split(':')

  const firstPart = sl1[1].replace('//', '')
  const splittedFirstPart = firstPart.split('@')

  const host = splittedFirstPart[1]
  const userCredentials = splittedFirstPart[0].split(':')
  const user = userCredentials[0]
  const password = userCredentials[1]

  const splittedSecondPart = sl1[2].split('/')

  const port = Number(splittedSecondPart[0])
  const database = splittedSecondPart[1]

  return {
    host,
    user,
    password,
    port,
    database,
  }
}

// Global connections pool variable
// !!! Warning !!!
// Be very-very carefully when working with global variables
// And you should not access this variable from business logic
export let pool: Pool

export const connectPostgres = async () => {
  const config = parsePostgresUrl(process.env.DATABASE_URL!)
  const newPool = new Pool(config)

  await newPool.connect()

  pool = newPool
  return newPool
}
Enter fullscreen mode Exit fullscreen mode

Now let's go use our created repository.

import { connectPostgres } from 'db'
import { UserRepository } from 'modules/product'

(async () => {
    // connecting to database
    const pool = await connectPostgres()

    // initializing the repository
    const userRepository = new UserRepository(pool)

    // call create method from repository
    const user = await userRepository.create({
      name: 'fyapy',
      email: 'fyapy@gmail.com',
      hash: '123',
    });
    console.log(JSON.stringify(user, null, 2))

    if (user) {
      const isCreatedToday = await userRepository.isTodayCreated(user.id);
      console.log(`is user ${user.name} created today? ${isCreatedToday}`)
    }
})()
Enter fullscreen mode Exit fullscreen mode

Now let's create interfaces for the rest of the CRUD methods.

import type { PoolClient } from 'pg'

export type AnyObject = Record<string, any>
export type ColumnData = string | {
  name: string
  hidden?: boolean
}

export type ID = string | number

interface Writer<T, C> {
  create(value: Partial<T>, tx?: C): Promise<T>
  createMany(values: Partial<T>[], tx?: C): Promise<T[]>
  update(id: ID, newValue: Partial<T>, tx?: C): Promise<T>
  delete(id: ID, tx?: C): Promise<boolean>
}

export interface FindOptions<T, C> {
  select?: Array<keyof T>
  tx?: C
}

interface Reader<T, C> {
  find(value: Partial<T>, options?: FindOptions<T, C>): Promise<T[]>
  findOne(id: ID | Partial<T>, options?: FindOptions<T, C>): Promise<T>
  exist(id: ID | Partial<T>, tx?: PoolClient): Promise<boolean>
}

export type BaseRepository<T, C> = Writer<T, C> & Reader<T, C>
Enter fullscreen mode Exit fullscreen mode

Now, in accordance with the interface, we will write the implementation of the methods.

import { Pool, PoolClient } from 'pg'
import { buildAliasMapper, insertValues } from './queryBuilder'
import {
  BaseRepository,
  FindOptions,
  ID,
  ColumnData,
} from './types'
import { query, queryRow } from './utils'

export class PGRepository<T> implements BaseRepository<T, PoolClient> {
  readonly table: string
  readonly primaryKey: string
  readonly pool: Pool
  readonly columnAlias: (col: keyof T) => string
  readonly cols: (...args: Array<keyof T>) => string
  readonly allColumns: string
  readonly where: (values: Partial<T>, initialIndex?: number) => string

  constructor({
    pool,
    table,
    mapping,
    // variable for storing id/primaryKey, for situations when out 'id' columns have name like 'postId'.
    // by default we think what primaryKey is 'id'
    primaryKey = 'id',
  }: {
    table: string
    pool: Pool
    primaryKey?: string
    mapping: Record<keyof T, ColumnData>
  }) {
    const aliasMapper = buildAliasMapper<T>(mapping)

    this.pool = pool
    this.table = `"${table}"`
    this.columnAlias = aliasMapper
    this.primaryKey = primaryKey

    // select SQL-generator for only specific columns
    // example payload: ['createdAt']
    // output: '"created_at" as "createdAt"'
    this.cols = (...args: Array<keyof T>) => args.map(key => `${aliasMapper(key)} AS "${key}"`).join(', ')
    // Almost the same as this.cols, only predefined and for all columns except hidden columns
    this.allColumns = Object.entries(mapping).reduce((acc, [key, value]: [string, ColumnData]) => {
      if (typeof value === 'object' && value.hidden) {
        return acc
      }

      const sql = `${aliasMapper(key as keyof T)} AS "${key}"`

      return acc
        ? acc += `, ${sql}`
        : sql
    }, '')
    // SQL-generator for WHERE clause
    this.where = (values: Partial<T>, initialIndex = 0) => {
      const sql = Object.keys(values).reduce((acc, key, index) => {
        const condition = `${aliasMapper(key as keyof T)} = $${index + initialIndex + 1}`

        return acc === ''
          ? `${acc} ${condition}`
          : `${acc}AND ${condition}`
      }, '')

      return `WHERE ${sql}`
    }
  }


  async create(value: Partial<T>, tx?: PoolClient): Promise<T> {
    const _cols: string[] = []
    const _values: any[] = []

    for (const key of Object.keys(value) as Array<keyof T>) {
      _cols.push(this.columnAlias(key))
      _values.push(value[key])
    }

    const cols = _cols.join(', ')
    const values = insertValues(_values)

    const row = await queryRow<T>(
      `INSERT INTO ${this.table} (${cols}) VALUES (${values}) RETURNING ${this.allColumns}`,
      _values,
      tx,
    )

    return row
  }

  async createMany(values: Partial<T>[], tx?: PoolClient): Promise<T[]> {
    const _cols: string[] = []
    const _values: any[][] = []

    for (const value of values) {
      const keys = Object.keys(value) as Array<keyof T>

      for (const key of keys) {
        if (_cols.length !== keys.length) _cols.push(this.columnAlias(key))

        _values.push(value[key] as any)
      }
    }

    const cols = _cols.join(', ')
    const inlinedValues = values
      .map((_, index) => `(${_cols.map((_, cIndex) => {
        const offset = index !== 0
          ? _cols.length * index
          : 0

        return `$${cIndex + 1 + offset}`
      })})`)
      .join(', ')

    const rows = await query<T>(`
      INSERT INTO ${this.table} (${cols})
      VALUES ${inlinedValues}
      RETURNING ${this.allColumns}
    `, _values, tx)

    return rows
  }

  update(id: ID, newValue: Partial<T>, tx?: PoolClient): Promise<T> {
    const sqlSet = Object.keys(newValue).reduce((acc, key, index) => {
      const sql = `${this.columnAlias(key as keyof T)} = $${index + 2}`

      return acc !== ''
        ? `${acc}, ${sql}`
        : sql
    }, '')

    return queryRow<T>(
      `UPDATE ${this.table} SET ${sqlSet} WHERE "${this.primaryKey}" = $1 RETURNING ${this.allColumns}`,
      [id, ...Object.values(newValue)],
      tx,
    )
  }

  delete(id: ID, tx?: PoolClient): Promise<boolean> {
    return queryRow<boolean>(
      `DELETE FROM ${this.table} WHERE "${this.primaryKey}" = $1`,
      [id],
      tx,
    )
  }

  async find(value: Partial<T>, options: FindOptions<T, PoolClient> = {}): Promise<T[]> {
    const cols = options.select
      ? this.cols(...options.select)
      : this.allColumns

    const sql = `SELECT ${cols} FROM ${this.table} ${this.where(value)}`

    const res = await query<T>(sql, Object.values(value), options.tx)

    return res
  }

  async findOne(id: ID | Partial<T>, options: FindOptions<T, PoolClient> = {}): Promise<T> {
    const isPrimitive = typeof id !== 'object'
    const cols = options.select
      ? this.cols(...options.select)
      : this.allColumns
    const values = isPrimitive
      ? [id]
      : Object.values(id)

    let sql = `SELECT ${cols} FROM ${this.table}`

    if (isPrimitive) {
      sql += ` WHERE "${this.primaryKey}" = $1`
    } else {
      sql += ` ${this.where(id)}`
    }

    const res = await queryRow<T>(sql, values, options.tx)

    return res
  }

  async exist(id: ID | Partial<T>, tx?: PoolClient): Promise<boolean> {
    let sql = `SELECT COUNT(*)::integer as count FROM ${this.table}`
    const isPrimitive = typeof id !== 'object'
    const values = isPrimitive
      ? [id]
      : Object.values(id)

    if (isPrimitive) {
      sql += ` WHERE "${this.primaryKey}" = $1`
    } else {
      sql += ` ${this.where(id)}`
    }

    sql += ' LIMIT 1'

    const res = await queryRow<{ count: number }>(sql, values, tx)

    return res.count !== 0
  }
}
Enter fullscreen mode Exit fullscreen mode

Usage in real-world project

I suggest to start explain how it is work in real project from main.ts file.
For routing I like to use fastify.
But for example we will use architecture with repositories > handlers layers.
For real project you need to use repositories > services > handlers layers for easier code maintainability in future. All repository calls need to be proxied by services, no direct calls repositories in handlers must not be.

import type { Pool } from 'pg'
import fastify from 'fastify'
import { connectPostgres } from 'db'
import * as users from 'users'

// DI context analog, here repositories dependencies
// In this example I will skip services layer
// but repositories need to be passed to services
// and services will need to be passed to handlers
export interface Repositories {
  pool: Pool
  userRepository: users.UserRepository
}

const main = async () => {
  const app = fastify({
    trustProxy: true,
  })
  const pool = await connectPostgres()


  const repositories: Repositories = {
    pool,
    userRepository: new users.UserRepository(pool),
  }

  // In real project here will be passed services
  app.register(users.setupRoutes(repositories), {
    prefix: '/users',
  })


  try {
    const url = await app.listen(process.env.PORT || 8080, '0.0.0.0')

    console.log(`Server started: ${url}`)
  } catch (error) {
    console.error('Server starting error:\n', error)
  }
}

main()
Enter fullscreen mode Exit fullscreen mode

Let's create Controller/Handler, Fastify handler in my case.
I will skip of UserService Realization, in is will just inject UserRepository, and proxy call [method name].
And validation will be skipped.

import type { FastifyPluginCallback } from 'fastify'
import type { Repositories } from 'types'
import { commit, isUniqueErr, rollback, startTrx } from 'repository'

export const setupRoutes = ({
  pool,
  userRepository,
}: Repositories): FastifyPluginCallback => (fastify, otps, done) => {
  // select all columns
  fastify.get<{
    Params: { id: string }
  }>('/:id/all', async ({ params }) => {
    const user = await userRepository.findOne(params.id)

    return {
      user: user ?? null,
    }
  })
  // select certain columns
  fastify.get<{
    Params: { id: string }
  }>('/:id', async ({ params }) => {
    const user = await userRepository.findOne(params.id, {
      select: ['id', 'name', 'email'],
    })

    return {
      user: user ?? null,
    }
  })

  fastify.post<{
    Body: {
      name: string
      email: string
      password: string
    }
  }>('/', async ({ body }, res) => {
    const tx = await startTrx(pool)
    try {
      const user = await userRepository.create({
        name: body.name,
        email: body.email,
        hash: body.password,
      }, tx)

      await commit(tx)

      res.status(201)
      return {
        user: user ?? null,
      }
    } catch (e) {
      await rollback(tx)

      if (isUniqueErr(e)) {
        res.status(400)
        return {
          message: 'User aleady exist!',
        }
      }

      throw e
    } finally {
      // don't forget to close connection
      tx.release()
    }
  })

  done()
}
Enter fullscreen mode Exit fullscreen mode

Source code

All source code can be found here.
Also for fans of functional programming, which I am, I prepared my own version in folders/files with the prefix fp.

Conclusion

Advice from my side

  • Strongly recommend for better maintainability, move the base code of the repository into a separate NPM package. Especially if you have several back-end applications or you have a microservice architecture. Because after adding feature, finding and fixing a bug, you will need make changes to all projects, with NPM package you just need to update version of package.
  • I skipped many features like where or support, limit, subscriptions on entity events like BeforeInsert, AfterInsert, BeforeDelete, AfterDelete, and etc.
  • Write tests!!! I seriously, because Data Access Layer it is critical thing, you need to be sure what all works as expected after make changes in codebase.
  • And you able to implement more code optimizations for functions like this.cols or this.where and others, but keep a balance between optimizations and code readability.

In this article I explained how to implement Repository Pattern with low-level database driver.

But I strongly ask you to make sure before using this solution in production in such things as:

  • You don't have good open source ORM/QueryBuilder/Library alternatives for your database.
  • You have Experienced Developers who fully understand what they are doing and why.

But what if you can't answer yes to these questions?
I think you have taken on a task that is too difficult for you :)

💖 💪 🙅 🚩
fyapy
aabdullin

Posted on March 20, 2022

Join Our Newsletter. No Spam, Only the good stuff.

Sign up to receive the latest update from our blog.

Related