CQRS and Event Sourcing

atipij

atipiJ

Posted on December 21, 2020

CQRS and Event Sourcing

Introduction

The purpose of this article is to go through some basic concepts of Command Query Responsibility Segregation (CQRS) and Event Sourcing design patterns.
There's going to be an analysis of these patterns and then we will implement these to the use case.
We will apply CQRS to the use case and we will apply Event Sourcing to fix some shortcomings of CQRS.

Repository containing the Kata that we solved to implement the patterns: CherryChain bank account kata

CQRS

CQRS is a pattern used for modeling the domain of software. Its name stands for Command Query Responsibility Segregation and, as you can guess, its goal is to separate the writing part of the data from the reading part.
Often in projects, these two concepts are enclosed in a single domain model.
However, as the application begins to grow, it could easily become difficult to manage and expensive to maintain.
On the contrary, by separating these two concepts of commands and queries, it is possible to maintain a low complexity even in very large projects.

cqrs

Event Sourcing

Event Sourcing is an event-centric technique for persisting the Application State. In this approach, the state is stored as a series of events, in an event store. When the state is updated, events are emitted and get inserted into the event store.

event_sourcing

Loading the state consists of iterating through all the saved events and summing them to reconstruct the state in its entirety. Whenever the state gets modified, an event is emitted. This means that any consumer that is listening to these events can handle them accordingly, using the data inside the event. A consumer is responsible for listening and consuming events.

Event Sourcing has some inherent benefits when used as an application architecture, for example, it makes write operations much faster since it only has to generate and save a lightweight event.

Using Event Sourcing alongside CQRS brings some advantages. In CQRS there is an application write repository which normally saves as-is when commands arrive. By substituting this part of the application with Event Sourcing, writing operations will become faster, and it will also be possible to have application state snapshots at any point in time.

Use case

The scenario in which we're going to apply these patterns is the Bank account kata using the Kotlin programming language.
The goal is to create a service that allows basic "banking" operations such as deposit and withdrawal. Of course everything
has to be programmed following the patterns accordingly.

Here is a list of features that we are going to develop:

  • The bank can register a customer
  • The bank can create an account for a customer
  • The bank can deposit money to an account
  • The bank can withdraw money from an account

The bank can register a customer

register_customer

The first step was to create a class representing the bank and allowing it to register a customer.
Following the TDD (Test Driven Development) methodology we started by creating the following test and then proceeding with the implementation of the feature:

  @Test
  internal fun `it should register a customer`() {
    val customerId = bank.registerCustomer("Firstname Lastname")

    assertThat(bank.getCustomer(customerId).fullName).isEqualTo("Firstname Lastname")
  }
Enter fullscreen mode Exit fullscreen mode

So what we need is a function to register a customer and another one to read a customer, given his identifier.
From now we can already begin to think about the CQRS pattern, which allows us to keep the concepts of writing and reading data isolated.
When we want to register a customer we will have to launch a command, while when we want to read a customer we will have to make a query.

At the moment we can focus on the registration function, in which the customer's full name is passed as input. The function then will generate an id (the id will be then returned by the function) and launch a command to create the new user.

  fun registerCustomer(fullName: String): UUID {
    val id = UUID.randomUUID()
    commandBus.publish(RegisterCustomer(id, fullName))
    return id
  }
Enter fullscreen mode Exit fullscreen mode

In our kata, to implement the registration of a new customer, we created the RegisterCustomer command:

sealed class Command

data class RegisterCustomer(val id: UUID, val fullName: String): Command()
Enter fullscreen mode Exit fullscreen mode

and the DefaultCommandBus communication channel which is used to publish the commands that reach a class (which extends the CommandHandler interface) that knows how to handle them:

interface CommandBus {
  fun publish(command: Command)
}

class DefaultCommandBus(private val handler: CommandHandler) : CommandBus {
  override fun publish(command: Command) {
    handler.handle(command)
  }
}
Enter fullscreen mode Exit fullscreen mode

We have called the class used to manage these commands DefaultCommandHandler, which, upon receiving RegisterCustomer command, creates a new customer, registers it, and then saves it in the repository:

interface CommandHandler {
  fun handle(command: Command)
}

class DefaultCommandHandler(private val customerRepository: Repository<Customer>) : CommandHandler {
  override fun handle(command: Command) {
    when (command) {
      is RegisterCustomer -> registerCustomer(command)
    }
  }

  private fun registerCustomer(command: RegisterCustomer) {
    val customer = Customer()
    customer.register(command.id, command.fullName)
    customerRepository.put(customer)
  }
}
Enter fullscreen mode Exit fullscreen mode

The Customer state contains information regarding the bank's customer. There are an identification code and full name. Since we have used the Event Sourcing pattern to make sure our client's story is not lost, we don't update the information directly. We save it as a list of events. As we can see, the function used for registration adds the CustomerRegistered event
to the customer state's changes list:

class Customer {
  lateinit var id: UUID
  val changes = mutableListOf<Event>()

  fun register(id: UUID, fullName: String) {
    changes.add(CustomerRegistered(id, fullName))
  }
}

interface Event { val id: UUID }

data class CustomerRegistered(override val id: UUID, val fullName: String) : Event
Enter fullscreen mode Exit fullscreen mode

The Repository where our new customer is stored is made up of an EventStore and an EventBus. When the function to save the customer is called, each of the events regarding that particular customer (identified by an id) are saved in the Store and then sent via the dedicated Bus.

interface Repository<T> {
  fun put(entity: T)
}

class InMemoryCustomerRepository(private val eventStore: EventStore, private val eventBus: EventBus): Repository<Customer> {
  override fun put(customer: Customer) {
    customer.changes.forEach {
      eventStore.append(it)
      eventBus.emit(it)
    }
  }
}

Enter fullscreen mode Exit fullscreen mode

InMemoryEventStore is our class that takes care of keeping events in memory and then being subsequently interrogated when necessary. (It doesn't necessarily need to be an in-memory implementation, the EventStore interface can be implemented as any one wishes).

interface EventStore {
  fun append(event: Event)
}

class InMemoryEventStore : EventStore {
  private val list = mutableListOf<Event>()

  override fun append(event: Event) {
    list.add(event)
  }
}
Enter fullscreen mode Exit fullscreen mode

DefaultEventBus is the communication channel where all the events are sent which are then managed by the EventHandler:

interface EventBus {
  fun emit(event: Event)
}

class DefaultEventBus(private vararg val handlers: EventHandler) : EventBus {
  override fun emit(event: Event) {
      handlers.forEach { it.handle(event) }
  }
}
Enter fullscreen mode Exit fullscreen mode

CustomerViewsEventHandler is a class that handles customer events and is has a collaborator called CustomerViews. When this handler receives a CustomerRegistered event, it creates a CustomerView with the data it receives and projects it in the view:

interface EventHandler {
  fun handle(event: Event)
}

class CustomerViewsEventHandler(private val customerViews: CustomerViews) : EventHandler {
  override fun handle(event: Event) {
    when(event) {
      is CustomerRegistered -> handle(event)
    }
  }

  fun handle(event: CustomerRegistered) {
    customerViews.add(CustomerView(event.id, event.fullName))
  }
}

data class CustomerView(val id: UUID, val fullName: String)
Enter fullscreen mode Exit fullscreen mode

InMemoryCustomerViews is the view where all customer information is projected into. The views can then be queried by the bank using the get function. This keeps the command and query logic separate as indicated by the CQRS pattern:

interface CustomerViews {
  fun add(customerView: CustomerView)
  fun get(customerId: UUID): CustomerView
}

class InMemoryCustomerViews: CustomerViews {
  private val data = mutableMapOf<UUID, CustomerView>()

  override fun add(customerView: CustomerView) {
    data[customerView.id] = customerView
  }

  override fun get(customerId: UUID): CustomerView {
    return data[customerId]!!
  }
}
Enter fullscreen mode Exit fullscreen mode

In this way, the first point of the kata was completed following the CQRS and Event Sourcing pattern.

The bank can create an account for a customer

create_account

Now we want to make sure that the bank can create one or more accounts for each customer where the balance will be stored.
After creating the test we went on to write a function to create an account on a specific customer, in which an id is generated to identify the account and then publish the CreateAccount command on the command bus:

  fun createAccount(customerId: UUID): UUID {
    val id = UUID.randomUUID()
    commandBus.publish(CreateAccount(id, customerId));
    return id;
  }
Enter fullscreen mode Exit fullscreen mode

As we saw in the previous chapter, the class that will handle the command we published is DefaultCommandHandler.
So in addition to the management of the "RegisterCustomer" command, we have added the management of the new CreateAccount command.
A new account is created with the id and customer id passed by the command and finally stored in the repository in the same way we saw in the previous chapter (using EventStore and EventBus).

class DefaultCommandHandler(private val accountRepository: Repository<Account>, private val customerRepository: Repository<Customer>) : CommandHandler {

  override fun handle(command: Command) {
    when (command) {
      is RegisterCustomer -> registerCustomer(command)
      is CreateAccount -> create(command)
    }
  }

  private fun registerCustomer(command: RegisterCustomer) {
    val customer = Customer()
    customer.register(command.id, command.fullName, command.type)
    customerRepository.put(customer)
  }

  private fun create(createAccount: CreateAccount) {
    val currentAccount = Account()
    currentAccount.create(createAccount.id, createAccount.customerId)
    accountRepository.put(currentAccount)
  }
}
Enter fullscreen mode Exit fullscreen mode

The Account class is composed as follows: an identifier, a customer id, and a list of events.
When the creation method is used, the AccountCreated event is produced which will then be published by the Repository on the event bus.

class Account() {
  lateinit var id: UUID
  lateinit var customerId: UUID
  val change = mutableListOf<Event>()

  fun create(id: UUID, customerId: UUID) {
    change.add(AccountCreated(id, customerId))
  }
}
Enter fullscreen mode Exit fullscreen mode

At this point, the AccountViewsEventHandler class will receive the AccountCreated event with which the AccountViews projection will be populated. It will then be possible to make queries to find out the balance of each customer.
As you can see from the code, in the phase of creating the balance it is initialized to zero.

class AccountViewsEventHandler(private val accountViews: AccountViews) : EventHandler {
  override fun handle(event: Event) {
    when (event) {
      is AccountCreated -> handle(event)
    }
  }

  private fun handle(accountCreated: AccountCreated) {
    accountViews.insert(AccountView(
      id = accountCreated.id,
      customerId = accountCreated.customerId,
      balance = 0
    ))
  }
}
Enter fullscreen mode Exit fullscreen mode

Also in this new feature that we added to the bank, we were able to follow the CQRS and ES pattern.

The bank can deposit money to an account

deposit_money

To allow the bank to deposit money to an account, we allowed our application to handle the DepositMoney command. The handler of this command will update the Account state by increasing its balance based on the amount deposited. Before doing this update operation, it first has to load the existing account information. This is the function (present in the DefaultCommandHandler) that handles the command:

private fun deposit(depositMoney: DepositMoney) {
    val currentAccount = accountRepository.get(id = depositMoney.accountId)

    if (currentAccount!=null) {
      currentAccount.deposit(depositMoney.amount)
      accountRepository.put(currentAccount)
    } else {
      throw BankException("account does not exist")
    }
  }
Enter fullscreen mode Exit fullscreen mode

We know that this information is stored as a series of events in our event store. So before doing any update operation, the application first reads all the events, saved in the event store, relative to the specific account in which the money will be deposited.
We then create an empty instance of Account. These events are then used in the hydrate function present in the empty Account instance, which iterates through the events and creates the state information from scratch.

class InMemoryAccountRepository(private val eventStore: EventStore, private val eventBus: EventBus) : Repository<Account> {
  override fun get(id: UUID): Account? {
    val events = eventStore.getEventsBy(id = id)
    return if (events.isEmpty()) {
      null
    } else {
      val account = Account()
      account.hydrate(events)
      account
    }
  }
}
Enter fullscreen mode Exit fullscreen mode

The hydrate() function works as follows:

  fun hydrate(events: List<Event>) {
    events.forEach {event ->
      when (event) {
        is AccountCreated -> {
          this.id = event.id
          this.customerId = event.customerId
        }
        is MoneyDeposited -> this.balance += event.amount
      }
    }
  }
Enter fullscreen mode Exit fullscreen mode

This is a key step in Event Sourcing. As we can see, all the events are processed and based on their type, an update operation is applied to the Account instance. This means that the first event must be an AccountCreated event, which will set the account's id and customerId. Every other MoneyDeposited event that might have been emitted in the past will be iterated through, and will update the current Account instance. After the hydration is done, we will have in our hands the instance of the account that had been stored as a series of events in the event store. We update the changes of this instance by putting in our new MoneyDeposited event, which is then emitted to the event bus and appended to the event store.

class InMemoryAccountRepository(private val eventStore: EventStore, private val eventBus: EventBus) : Repository<Account> {
  override fun put(account: Account) {
    account.change.forEach { event ->
      eventStore.append(event)
      eventBus.emit(event)
    }
  }
}
Enter fullscreen mode Exit fullscreen mode

It is intercepted by the AccountViewsEventHandler. This handler is responsible for keeping the Account Views synchronized with the actual state. The Views are the part of the application that are visible externally. The views don't necessarily store the state as-is the state can receive any desired transformation before being exposed externally.

class AccountViewsEventHandler(private val accountViews: AccountViews) : EventHandler {
  private fun handle(moneyDeposited: MoneyDeposited) {
    val accountView = accountViews.get(moneyDeposited.id)
    val updatedView = accountView.copy(balance = accountView.balance + moneyDeposited.amount)
    accountViews.delete(moneyDeposited.id)
    accountViews.insert(updatedView)
  }
}
Enter fullscreen mode Exit fullscreen mode

With this feature, it is possible to deposit money into an account. We also encountered the hydrate() function, which helps us to update an existing account by iterating through the events present in the event store.

The bank can withdraw money from an account

withdraw_money

To allow the bank to withdraw money from an account, the application handles the WithdrawMoney command. This operation is analogous to the depositing of money. This function is present in the DefaultCommandHandler:

private fun withdraw(withdrawMoney: WithdrawMoney) {
  val currentAccount = accountRepository.get(id = withdrawMoney.accountId)

  if (currentAccount.hasEnough(withdrawMoney.amount)) {
    currentAccount.withdraw(withdrawMoney.amount)
    accountRepository.put(currentAccount)
  } else {
    throw BankException("not enough money")
  }
Enter fullscreen mode Exit fullscreen mode

It updates the Account by decreasing its balance based on the amount withdrawn. The withdraw process first loads the state information by iterating through its stored events (hydrate() function):

fun hydrate(events: List<Event>) {
  events.forEach {event ->
    when (event) {
      is AccountCreated -> {
        this.id = event.id
        this.customerId = event.customerId
      }
      is MoneyDeposited -> this.balance += event.amount
      is MoneyWithdrawn -> this.balance -= event.amount
    }
  }
}
Enter fullscreen mode Exit fullscreen mode

After the events have been iterated through completely, then we have the Account instance that we wanted and we can proceed with updating it. At this point a MoneyWithdrawn event is created, containing the necessary information of the withdrawal it is then appended to the changes of the account:

fun withdraw(amount: Int, fee: Int) {
  change.add(MoneyWithdrawn(id, amount, fee))
}
Enter fullscreen mode Exit fullscreen mode

This event is then saved to the event store, emitted to the event bus and intercepted by the AccountViewsEventHandler, which projects the changes to the AccountViews:

private fun handle(moneyWithdrawn: MoneyWithdrawn) {
  val accountView = accountViews.get(moneyWithdrawn.id)
  val updatedView = accountView.copy(balance = accountView.balance - moneyWithdrawn.amount)
  accountViews.delete(moneyWithdrawn.id)
  accountViews.insert(updatedView)
}
Enter fullscreen mode Exit fullscreen mode

With this feature, it is possible to withdraw money from an account.

Conclusions

As we have seen, by using the CQRS and ES pattern, we managed to solve this kata.
Some pros of this pattern are:

  • There is a single source of truth
  • Views can be generated with high customization
  • The separation of actions that change the state and queries can bring high optimization

However there are some obstacles (or opportunities!) that need to be conquered when working with this pattern:

  • Kickstarting a project can prove difficult
  • There is a necessity to have a good model of the domain of the project
  • Events might lose meaning over time if the domain isn't modeled properly
💖 💪 🙅 🚩
atipij
atipiJ

Posted on December 21, 2020

Join Our Newsletter. No Spam, Only the good stuff.

Sign up to receive the latest update from our blog.

Related

CQRS and Event Sourcing
kotlin CQRS and Event Sourcing

December 21, 2020