Serverless Event-Driven Architecture on Azure: A Worked Example - Part 2
Mario Meyrelles
Posted on June 3, 2020
Introduction
In the first post of this series here, we discussed how we can design a serverless and event-driven architecture project with components that are fairly easy to understand and integrate. In this second part of the series, we focus on command handling and Aggregate Root design, illustrating how to process commands and generate events inside the business logic. We created a fictional business scenario to help illustrate the concepts.
Some Business Context
Our New Prepaid Card
We are considering in this example a hypothetical prepaid card that stores credits that users can load using their own money. These credits can be used to shop on physical stores using a mobile app.
For users, there is a fee of 0,02% when money is loaded into the card. If the user loads more than $500.00 in a single transaction there is no fee. Users can freely send credits to other users of the platform using the mobile application offered to the users. There are no limits or charges for credits transfers. Transactions involving a significant amount of money are subject to a validation process and can take longer to be processed. The users also can apply for a plastic card if they wish to do credit and debit transactions using the network of the common credit card operators. This card can also be used to do ATM withdrawals. The only costs that will be applied for plastic card transactions are the processing and operator costs, that are not under our control.
For merchants, there is only a 0,025% fee on each transaction for merchants using the platform and absolutely nothing more in the first year. In the next year, the fees will be still low but can be raised a little bit. Any merchant can use this platform to accept payments configuring an account on the platform. The setup process is online and usually, it`s possible to start selling in less than two working days. The merchant can see the money corresponding to each sale in less than 24 hours in their account. The merchant also can receive the payment in credits and make the conversion to money in the future. If the merchant waits 31 days to make the conversion, the transaction fee is removed for the given sale.
The scope of this sample is to model the process of transferring loaded credits from one user to another using the app. We will ignore the other ways to use credits and all the complexities involved. Also, we are ignoring the reload process and we are ignoring the fees management.
Exploring the Domain
As shown above, we have partitioned the problem in a more understandable structure. We have 3 bounded contexts here:
Bounded Context | Mission |
---|---|
User Management | Focus on the lifecycle of a user as a customer of our prepaid card service. In this sample, we will assume that all cardholders are already created, are valid, and have the app configured. We will only seed configured users and we won’t implement any use case. |
Balance Management | Controls the cardholder's balance. Any action like a reload, payment or credit transfer is ultimately done via a credit or debit transaction. |
App Credits Transfers | Controls free credit transfers done inside the platform. Collaborates with Balance Management to securely add or remove credits from the accounts involved. |
As you can imagine, the whole business problem space can be huge. For sake of simplicity we are only showing subdomains of interest in green:
Subdomain | Mission |
---|---|
Account Transactions | Assures that all the requests for debits and credits are fulfilled or rejected correctly, avoiding overdrafts and duplicated transactions. In this implementation, we show the details of working with event sourced aggregates. |
Credits Transfers | Responds for all the free credits transfers. The transfers can be done immediately and can be scheduled. Values considered too high are subject to human validation. Automatic rejection of transfers can occur when certain conditions are met. This subdomain shows an implementation of a Process Manager that effectively settles the credits transfers that are accepted after the validation process. We also implement the workflow for human validation in this project. |
Now it's time to go into the details of the aggregates, commands, and events we plan to implement in this sample project:
Aggregate | Commands | Events |
---|---|---|
Cardholder | ||
Prepaid Card Account (Account Transactions) | DecreaseCredits IncreaseCredits ReverseCreditsChange |
CreditsDecreased CreditsDecreaseRejected CreditsDecreaseReversed CreditsIncreased CreditsIncreaseRejected CreditsIncreaseReversed |
App Credits Transfer | CreateCreditsTransfer | CreditsTransferAccepted CreditsTransferFailed CreditsTransferManuallyApproved CreditsTransferManuallyRejected CreditsTransferRejected CreditsTransferScheduled (sprint 2) CreditsTransferSentToManualValidation CreditsTransferSucceeded |
As the reader can see, the Credits Transfer aggregate is controlling its state using events to represent most of the possible states of a credits transfer. The events in the Prepaid Card Account aggregate are supporting the credits transfer and may not be exhaustive. The Cardholder aggregate only supports the Prepaid Card Account during validation and won`t have any command or event considered in this partition of the platform design. We won’t show in this article how we work scheduled commands and events. This discussion will be done in a future post.
Technical Implementation
Remembering the general architecture of the last post
Before diving into more technical details and code implementation, let's remember the general architecture that we described in the last post:
HTTP Endpoints and Azure Functions
HTTP endpoints are the most common way to communicate with the external world. Often, HTTP endpoints and used inside microservices communication. Azure Functions are very useful to build very lightweight APIs that can be used as a backend. In more complex architectures, the API layer can be put behind Azure API Gateway.
In the context of event-driven architectures, logic inside HTTP endpoints is part of command processing and therefore, can and should help the client to issue correct commands. This means that they can have access to read models and they write to the command store.
Basic API with Azure Functions
The implementation of a typical HTTP that can be like the following:
// TransfersApi.cs
using FluentValidation;
using FluentValidation.Results;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.Documents;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using ServerlessStreaming.DomainModel.CreditsTransferAggregate;
using ServerlessStreaming.DomainModel.CreditsTransferAggregate.Commands;
using System;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
namespace ServerlessStreaming.Api
{
public static partial class TransfersApi
{
[FunctionName("TransfersApi")]
public static async Task<IActionResult> Run(
// be sure to add content-type: application/json on your request!
[HttpTrigger(AuthorizationLevel.Function, "post", Route = "create")]
HttpRequest req,
[CosmosDB(
databaseName: "CreditsTransfers",
collectionName: "Commands",
ConnectionStringSetting = "CosmosDBConnectionString")]
IAsyncCollector<CreateCreditsTransferCommand> commands,
ILogger log)
{
log.LogInformation("Starting validation of Credit transfer request...");
using var reader = new StreamReader(req.Body);
string requestBody = await reader.ReadToEndAsync();
var payload = JsonConvert.DeserializeObject<CreditsTransferModel>(requestBody);
if (payload == null)
{
return new BadRequestObjectResult("Invalid Credit Transfer Request.");
}
var validator = new CreditsTransferValidator();
var validationResult = validator.Validate(payload);
if (!validationResult.IsValid)
{
return new BadRequestObjectResult(validationResult.Errors.Select(e => new {
Field = e.PropertyName,
Error = e.ErrorMessage
}));
}
var command = new CreateCreditsTransferCommand(
aggregateId: payload.Id,
correlationId: Guid.NewGuid().ToString(),
createdAtUtc: DateTime.UtcNow,
payload: payload);
try
{
await commands.AddAsync(command);
await commands.FlushAsync();
log.LogWarning($"Request Accepted. CorrelationId: {command.CorrelationId}, AggregateId: {command.AggregateId}");
req.HttpContext.Response.Headers.Add("ETag", command.ETag);
return new AcceptedResult();
}
catch (DocumentClientException ex) when (ex.Error.Code == "Conflict")
{
log.LogWarning("Returning Not Modified: ", payload.Id);
req.HttpContext.Response.Headers.Add("ETag", command.ETag);
return new StatusCodeResult(304);
}
}
}
public class CreditsTransferValidator : AbstractValidator<CreditsTransferModel>
{
// see also: https://www.tomfaltesek.com/azure-functions-input-validation/
public CreditsTransferValidator()
{
RuleFor(x => x.Id).NotEmpty();
RuleFor(x => x.Sender.Id).NotEmpty();
RuleFor(x => x.Receiver.Id).NotEmpty();
RuleFor(x => x.Sender.AccountId).NotEmpty();
RuleFor(x => x.Receiver.AccountId).NotEmpty();
RuleFor(x => x.Sender.DocumentNumber).NotEmpty();
RuleFor(x => x.Receiver.DocumentNumber).NotEmpty();
// todo: validate if sender and receiver are different
// todo: validade if account and users are valid
RuleFor(x => x.Amount).GreaterThan(0m);
}
protected override bool PreValidate(ValidationContext<CreditsTransferModel> context, ValidationResult result)
{
if (context.InstanceToValidate == null)
{
result.Errors.Add(new ValidationFailure("", "Please ensure a model was supplied."));
return false;
}
return true;
}
}
}
The code above demonstrates how an Azure Function can react to HTTP requests and produce commands. The function is annotated with bindings and triggers. We can see a trigger carrying the payload of an HTTP request and we can see an output binding that sends the commands to CosmosDB, our command and event store in this project. We can also see an ILogger
parameter in the function signature. The logger is extensively used to send data to Application Insights and to debug locally. The extensive use of triggers and bindings makes Azure Functions very pleasant to use and in many cases, removes the need to manage instances of Http clients or CosmosDB clients in the function code.
The function implements basic validations and using FluentValidator library in .NET. Having a valid payload, we create a new command and send it to CosmosDB using a very convenient output binding. With this binding, we can just call CosmosDB. We handle a possible insertion conflict by calling FlushAsync
to be able to handle any exception during the insertion and possibly, return to the caller that a given request has already been processed. When the request is not valid, we use the validation messages to inform the caller about wrong or missing parameters. We expect that the payload conforms to a model that represents the payload of the data that contains all the information needed to process the command.
The API above is used to create a new credit transfer. It does not depend on external data to perform validation. In this example, we have another API that is exposed publicly, the Transactions API. We have a more complex situation, where data from the read model needs to be used to perform the validation process:
A more complex API
// TransactionsApi.cs
using FluentValidation;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.Documents;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using ServerlessStreaming.Common;
using ServerlessStreaming.DomainModel;
using ServerlessStreaming.DomainModel.PrepaidCardAccountAggregate;
using ServerlessStreaming.DomainModel.PrepaidCardAccountAggregate.Commands;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
namespace ServerlessStreaming.Api
{
public class TransactionsApi
{
// as a client to the application, the api can use the read model to validate commands.
private readonly ICosmosDbService<PrepaidCardAccount> prepaidAccountReadModel;
public TransactionsApi(ICosmosDbService<PrepaidCardAccount> prepaidAccountReadModel)
{
this.prepaidAccountReadModel = prepaidAccountReadModel;
}
[FunctionName("TransactionsApi")]
public async Task<IActionResult> Run(
// be sure to add content-type: application/json on your request!
[HttpTrigger(AuthorizationLevel.Function, "post", Route = "create")]
HttpRequest req,
[CosmosDB(
databaseName: "CreditsTransfers",
collectionName: "Commands",
ConnectionStringSetting = "CosmosDBConnectionString")]
IAsyncCollector<IncreaseCreditsCommand> increaseCreditsCommands,
[CosmosDB(
databaseName: "CreditsTransfers",
collectionName: "Commands",
ConnectionStringSetting = "CosmosDBConnectionString")]
IAsyncCollector<DecreaseCreditsCommand> decreaseCreditsCommands,
ILogger log)
{
log.LogInformation("Starting validation of Credit Change request...");
using var reader = new StreamReader(req.Body);
string requestBody = await reader.ReadToEndAsync();
var payload = JsonConvert.DeserializeObject<AccountTransactionModel>(requestBody);
// some validation rules here: https://docs.fluentvalidation.net/en/latest/advanced.html
var existingAccount = await prepaidAccountReadModel.GetItemAsync(payload.PrepaidCardAccountId, payload.PrepaidCardAccountId);
var validator = new TransactionValidator();
var validationContext = new ValidationContext<AccountTransactionModel>(payload);
validationContext.RootContextData["account"] = existingAccount; // using the account stored on read model to perform validation of business data.
var validationResult = validator.Validate(validationContext);
if (!validationResult.IsValid)
{
return new BadRequestObjectResult(validationResult.Errors.Select(e => new {
Field = e.PropertyName,
Error = e.ErrorMessage
}));
}
switch (payload.Operation)
{
case AccountTransactionModel.IncreaseOperation:
{
return await PerformIncreaseOperation(req, payload, increaseCreditsCommands, log);
}
case AccountTransactionModel.DecreaseOperation:
{
return await PerformDecreaseOperation(req, payload, decreaseCreditsCommands, log);
}
default:
{
return new BadRequestObjectResult("Invalid Request: please inform operation.");
}
}
}
private static async Task<IActionResult> PerformDecreaseOperation(HttpRequest req, AccountTransactionModel creditsChangeRequest, IAsyncCollector<DecreaseCreditsCommand> decreaseCreditsCommands, ILogger log)
{
var command = new DecreaseCreditsCommand(aggregateId: creditsChangeRequest.PrepaidCardAccountId,
correlationId: creditsChangeRequest.CorrelationId ?? Guid.NewGuid().ToString(),
createdAtUtc: DateTime.UtcNow,
payload: creditsChangeRequest);
try
{
await decreaseCreditsCommands.AddAsync(command);
await decreaseCreditsCommands.FlushAsync();
log.LogWarning($"Decrease Credits Command generated. CorrelationId: {command.CorrelationId}, AggregateId: {command.AggregateId}");
req.HttpContext.Response.Headers.Add("ETag", command.ETag);
return new AcceptedResult();
}
catch (DocumentClientException ex) when (ex.Error.Code == "Conflict")
{
log.LogWarning("Returning Not Modified: ", creditsChangeRequest.Id);
req.HttpContext.Response.Headers.Add("ETag", command.ETag);
return new StatusCodeResult(304);
}
}
private static async Task<IActionResult> PerformIncreaseOperation(HttpRequest req, AccountTransactionModel creditsChangeRequest, IAsyncCollector<IncreaseCreditsCommand> increaseCreditsCommands, ILogger log)
{
var command = new IncreaseCreditsCommand(aggregateId: creditsChangeRequest.PrepaidCardAccountId,
correlationId: creditsChangeRequest.CorrelationId ?? Guid.NewGuid().ToString(),
createdAtUtc: DateTime.UtcNow,
payload: creditsChangeRequest);
try
{
await increaseCreditsCommands.AddAsync(command);
await increaseCreditsCommands.FlushAsync();
req.HttpContext.Response.Headers.Add("ETag", command.ETag);
log.LogWarning($"Increase Credits Command generated. CorrelationId: {command.CorrelationId}, AggregateId: {command.AggregateId}");
return new AcceptedResult();
}
catch (DocumentClientException ex) when (ex.Error.Code == "Conflict")
{
log.LogWarning("Returning Not Modified: ", creditsChangeRequest.Id);
req.HttpContext.Response.Headers.Add("ETag", command.ETag);
return new StatusCodeResult(304);
}
}
}
}
In this second example, we are demonstrating an API to increase or decrease credits for a given account. These transactions can be called from other services like the Transfers API. This function is using dependency injection to receive a dependency on a service that can do queries in Cosmos DB. This mechanism is very similar to the ASP.NET Core dependency injection infrastructure you are probably used to work with.
The validation logic is more complex. We first go to CosmosDB and look for an account. Inside the validation process, we check if the informed account owner is valid in a custom validator:
using FluentValidation;
using FluentValidation.Results;
using ServerlessStreaming.DomainModel.PrepaidCardAccountAggregate;
namespace ServerlessStreaming.Api
{
public class TransactionValidator : AbstractValidator<AccountTransactionModel>
{
// see also: https://www.tomfaltesek.com/azure-functions-input-validation/
public TransactionValidator()
{
RuleFor(x => x).NotNull();
RuleFor(x => x.Id).NotEmpty();
RuleFor(x => x.PrepaidCardAccountId).NotEmpty();
RuleFor(x => x.CardholderId).NotEmpty();
RuleFor(x => x.Amount).GreaterThan(0m);
RuleFor(x => x.CardholderId).Custom((x, context) => {
var account = context.ParentContext.RootContextData["account"] as PrepaidCardAccount;
if (x != account.CardHolderId)
{
context.AddFailure("The informed account does not have a valid owner.");
}
});
}
protected override bool PreValidate(ValidationContext<AccountTransactionModel> context, ValidationResult result)
{
if (context.InstanceToValidate == null)
{
result.Errors.Add(new ValidationFailure("", "Please ensure a model was supplied."));
return false;
}
return true;
}
}
}
If there is a mismatch, we add an error to the list of validation errors and return them to the client. Another difference is that we are using two output bindings - one for each kind of transaction. The rest of the function is generally the same. We do the transformation of a request into a valid command that potentially will succeed and produce a change in the global state.
Commands and Aggregates in the world of CQRS and Event Sourcing
CQRS and Event Sourcing, as said in many other articles, work very well in combination. We will show here a complete example and we will describe how we can build our event sourced aggregates to properly organize our business logic.
Physical Organization
In our solution, we have a project to organize the domain and maintain the business rules together. In serverless architectures, it's often tempting to write business logic inside the functions. We should try to avoid this. Our domain logic should try to be host-agnostic.
The image below shows the physical structure of our example project. It's easy to see the aggregates, commands, and events.
A new developer can rapidly understand what are the main commands and events produced by the system. The structure of commands and events shown above can help to make things more intention revealing. The business rules are usually found in command handlers.
Command Structure
A command contains all the data necessary to be processed without any other dependencies. Commands are very simple structures and fairly easy to understand.
First, let's see our command implementation details:
using Newtonsoft.Json;
using System;
namespace ServerlessStreaming.Common
{
public interface ICommand
{
string Id { get; set; }
string CorrelationId { get; }
string AggregateId { get; }
DateTime CreatedAtUtc { get; }
string CommandName { get; set; }
}
public abstract class Command<TPayload> : ICommand where TPayload : IPayload
{
private string commandName;
protected Command(string aggregateId, string correlationId, DateTime createdAtUtc, TPayload payload)
{
CorrelationId = correlationId;
CreatedAtUtc = createdAtUtc;
AggregateId = aggregateId;
Payload = payload;
commandName = GetType().FullName;
}
[JsonProperty(PropertyName = "id")]
public string Id { get; set; }
public string CommandName
{
get
{
return commandName;
}
set
{
if (value == GetType().FullName)
{
commandName = value;
}
else
{
throw new TypeInitializationException(GetType().FullName, new Exception("The command name must match the C# full type name in order to prevent json desserialization mismatches."));
}
}
}
public string CorrelationId { get; }
public DateTime CreatedAtUtc { get; }
public string AggregateId { get; }
public TPayload Payload { get; }
public string ETag
{
get
{
//todo: needs to be opaque
return CommandHash;
}
}
public abstract string CommandHash { get; }
public override string ToString()
{
try
{
var json = JsonConvert.SerializeObject(this);
return json;
}
catch (Exception)
{
return $"AggregateId: {AggregateId}, CorrelationId: {CorrelationId}, Id: {Id}, CommandName: {CommandName}";
}
}
}
}
The structure is not complicated. We have a Command
class that requires a type parameter of type TPayload
that represents the model of the command. Another extremely important field is CorrelationId
. The correlation identifier is used to tie all the commands and events produced by a given request. This is mandatory for the troubleshooting of the multiple parts of the system and can help in auditing. With this unique identifier, we can query all the commands, events, and log messages on either CosmosDB or Application Insights. This is a big win for event-driven architectures. We can investigate and understand how the state was reached and we can see the motivations of each state change. A command encapsulates a motivation to change the global state. This is not true for common CRUD systems, where we only see the final state of an aggregate.
We also introduce the concept of Aggregate Id represented by another required field of the command. The aggregate id is the concrete subject of the command. The command must refer to an aggregate identified so it changes the state of this specific aggregate. The aggregate id for an account can be, for example, 1236-5649-9998-7721
.
All commands are required to have a unique name and we automate this by getting the C# class name as the command name in our example. We use the command name to partition the Commands
container in CosmosDB. Usually, we share the same container with multiple commands to reduce the overall solution's financial cost. Since we will have multiple commands with different JSON structures, the process of deserialization of the commands requires that we add an identifier on each command - we use the command name also to help in the deserialization process so we can find the correct deserializer. We use ETag to help the caller in client-side caching.
A concrete command should be something like this:
using ServerlessStreaming.Common;
using System;
namespace ServerlessStreaming.DomainModel.CreditsTransferAggregate.Commands
{
public class CreateCreditsTransferCommand : Command<CreditsTransferModel>
{
public CreateCreditsTransferCommand(
string aggregateId,
string correlationId,
DateTime createdAtUtc,
CreditsTransferModel payload)
: base(aggregateId, correlationId, createdAtUtc, payload) { }
public override string CommandHash => AggregateId + "_" + Payload.Id;
}
}
We don't expose an empty constructor and we use CreditsTransferModel
as our command payload. The type parameter of the base class informs what is the type acceptable for the model. The code for CreditsTransferModel
is:
using Newtonsoft.Json;
using ServerlessStreaming.Common;
using ServerlessStreaming.DomainModel.CardholderAggregate;
using System;
namespace ServerlessStreaming.DomainModel.CreditsTransferAggregate
{
public class CreditsTransferModel : IPayload
{
[JsonProperty(PropertyName = "id")]
public string Id { get; set; }
public CardholderInfo Sender { get; set; }
public CardholderInfo Receiver { get; set; }
public decimal Amount { get; set; }
public string TransferPurpose { get; set; }
public DateTime? DateScheduledUtc { get; set; }
public DateTime DateCreatedUtc { get; set; }
public DateTime? DateProcessedUtc { get; set; }
}
}
As we can see here, the model contains the fields needed to create a valid transfer operation. We force all the models to have an Id
property. The IPayload
interface enforces this.
Command Handling Process: Azure Functions as a Host
As we can recall, usually commands are generated by an API and then saved on a CosmosDB Collection in our architecture. The main reason for this is to be able to react to each command using the mechanisms available on the cloud for us. So we reactively receive a valid command from CosmosDB and trigger another Azure Function. In general, every command will have a corresponding command handler. The physical structure in the code is trivial:
The command handler for CreateCreditsTransferCommand
is here:
using Microsoft.Azure.Documents;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Polly;
using ServerlessStreaming.Common;
using ServerlessStreaming.DomainModel.CreditsTransferAggregate;
using ServerlessStreaming.DomainModel.CreditsTransferAggregate.Commands;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using static ServerlessStreaming.Common.UtilityFunctions;
namespace CreditsTransferProcessor
{
public class CreateCreditsTransferCommandHandler
{
private readonly CreditsTransferCommandHandlers commandHandlers;
public CreateCreditsTransferCommandHandler(CreditsTransferCommandHandlers commandHandlers)
{
this.commandHandlers = commandHandlers;
}
[FunctionName("CommandHandler_CreateCreditsTransfer")]
public async Task Run(
[CosmosDBTrigger(
databaseName: "CreditsTransfers",
collectionName: "Commands",
ConnectionStringSetting = "CosmosDBConnectionString",
LeaseCollectionName = "leases",
LeaseCollectionPrefix = "CH_CCT")] // abbr of this command handler
IReadOnlyList<Document> commandsReceived,
[CosmosDB(
databaseName: "CreditsTransfers",
collectionName: "Events",
ConnectionStringSetting = "CosmosDBConnectionString")]
IAsyncCollector<Event> outputEvents,
[CosmosDB(
databaseName: "CreditsTransfers",
collectionName: "Commands",
ConnectionStringSetting = "CosmosDBConnectionString")]
IAsyncCollector<CommandHandlerFailed> commandsFailed,
[CosmosDB(
databaseName: "CreditsTransfers",
collectionName: "Commands",
ConnectionStringSetting = "CosmosDBConnectionString")]
IAsyncCollector<CommandHandlerSucceeded> commandsSucceeded,
[Queue("command-handler-failed")]
IAsyncCollector<string> deadLetterQueue,
ILogger log)
{
if (commandsReceived == null)
{
log.LogWarning("No commands available to process!");
return;
}
log.LogInformation("Number of commands to process: " + commandsReceived.Count);
foreach (var c in commandsReceived)
{
CreateCreditsTransferCommand command = default;
try
{
log.LogDebug("Processing document id: {Id}...", c.Id);
var input = CanDesserializeAs<CreateCreditsTransferCommand>(c);
if (input.IsNullOrEmpty())
{
continue;
}
command = JsonConvert.DeserializeObject<CreateCreditsTransferCommand>(input);
var context = new Context().WithLogger(log);
context.Add("command", command);
await RetryPolicies.GetPolicy().ExecuteAsync(async (context) =>
{
if (command != null && command.CommandName == typeof(CreateCreditsTransferCommand).FullName)
{
// invoke domain model logic to decide if the command should be accepted.
await commandHandlers.HandleAsync(command, log, outputEvents);
// send command confirmation
var commandSucceeded = new CommandHandlerSucceeded(command.Id, command.CorrelationId, command.AggregateId, command.CommandHash);
await commandsSucceeded.AddAsync(commandSucceeded);
log.LogWarning("Command Handler succeeded: Command {CommandName}, CorrelationId: {CorrelationId}, AggregateId: {AggregateId}",
command.CommandName, command.CorrelationId, command.AggregateId);
}
}, context);
}
catch (JsonSerializationException ex) when (ex.InnerException is TypeInitializationException)
{
continue;
}
catch (Exception ex)
{
await DeadLettering.HandleFailure(commandsFailed, deadLetterQueue, log, command, ex);
}
log.LogDebug("Processed document id: {Id}...", c.Id);
}
}
}
}
The constructor is receiving a concrete instance of a command handler defined in our domain model. As we can see, there is a new trigger here: CosmosDBTrigger
. The Azure Function runtime uses the CosmosDB Change Feed mechanism to invoke this function when a new item is added on the Commands
container. There are some configurations and infrastructure to make this work but in general, we need an auxiliary collection to store some locks - the lease container - and we must make sure that we don't have a duplicated lease collection prefix.
We can also see that there are output bindings to Events
collection and other bindings to store commands that are succeeded and failed. We have also a binding can store the payload of failed commands.
The function can receive more than one command and we must not lose any command. For each command received, we check if the message is useful for us. We will receive all the commands in all command handlers and we must ignore commands that we don't handle so other functions can handle it. After successful deserialization, we start a retry context and try to perform our business operation, delegating this work to the command handler that was injected in the constructor. This function works as a gateway to our domain logic. After a successful operation, the CommandHandlerSucceeded
message is saved on CosmosDB.
When something fails, we do a retry. If it fails again, we first try to send a CommandHandlerFailed
message to CosmosDB. If it fails again, we store the command payload on a storage queue. These are very exceptional situations and must be monitored by the operations team. The error handling logic is as follows:
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;
using ServerlessStreaming.DomainModel.PrepaidCardAccountAggregate;
using System;
using System.Threading.Tasks;
namespace ServerlessStreaming.Common
{
public static class DeadLettering
{
public static async Task HandleFailure<T>(IAsyncCollector<CommandHandlerFailed> commandsFailed, IAsyncCollector<string> commandHandlerDlq, ILogger log, Command<T> cmd, Exception ex)
where T : IPayload
{
log.LogDebug("Starting command error handling using Cosmos: Command {CommandName}, CorrelationId: {CorrelationId}, AggregateId: {AggregateId}", cmd.CommandName, cmd.CorrelationId, cmd.AggregateId);
var commandHandlerFailed = new CommandHandlerFailed(cmd.Id, cmd.CorrelationId, cmd.AggregateId, cmd.CommandHash, ex);
try
{
await commandsFailed.AddAsync(commandHandlerFailed);
log.LogError("Command Handler failed: Command {CommandName}, CorrelationId: {CorrelationId}, AggregateId: {AggregateId}, Exception: {Exception}",
cmd.CommandName, cmd.CorrelationId, cmd.AggregateId, ex);
}
catch (Exception ex2)
{
log.LogDebug("Starting command error handling using Queue Storage after cosmos failure: Command {CommandName}, CorrelationId: {CorrelationId}, AggregateId: {AggregateId}",
cmd.CommandName, cmd.CorrelationId, cmd.AggregateId);
try
{
await commandHandlerDlq.AddAsync(cmd.ToString());
await commandHandlerDlq.FlushAsync();
log.LogDebug("Command sent to dead letter: Command {CommandName}, CorrelationId: {CorrelationId}, AggregateId: {AggregateId}", cmd.CommandName, cmd.CorrelationId, cmd.AggregateId);
}
catch (Exception ex3)
{
log.LogCritical("Error sending failed command to dead letter queue. Exception: {Exception}:", ex3);
throw;
}
log.LogCritical("Command Handler and command was deadlettered: Command {CommandName}, CorrelationId: {CorrelationId}, AggregateId: {AggregateId}, Exception: {Exception}", cmd.CommandName, cmd.CorrelationId, cmd.AggregateId, ex2);
}
}
}
}
As we can see above, we try our best to notify the failure to the external world. If we can use CosmosDB to communicate the command handling failure, we can react to these messages and perhaps, notify the customer using some sort of automatic notification. The storage queue is our last resort. We would have used other kinds of outputs here. The idea is to use a different persistence mechanism.
Please note again that we are using a single CosmosDB container to store all the commands and we do the same to store all the events. This is a cost optimization strategy to avoid extra costs. It would be possible to create a container for each type of command or event. This would be very analogous to a relational modeling strategy. As each new container can add cost to the solution, we generally try to avoid a bigger number of containers. The side-effect of this organization is that all the functions are called on each insert and we should ignore the messages that we don't want to handle. This kind of organization, of course, can be modified later if needed. You need to monitor the performance of your solution and find bottlenecks or cost pressure continuously.
Command Handling Process: The Command Handler
The command handler works as a small orchestrator. It receives input from the host, in our case, an Azure Function. The command handler is part of the domain logic. With a command in hands, it calls the Aggregate Root to get the command handled. It depends on an EventSourcedRepository
to persist the results of command handling: Events! In an event-sourced application using CQRS and DDD concepts, we ask for the domain to process a command and produce events.
using Microsoft.Extensions.Logging;
using ServerlessStreaming.DomainModel.CreditsTransferAggregate.Commands;
using System;
using ServerlessStreaming.Common;
using ServerlessStreaming.Infrastructure;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using System.Linq;
namespace ServerlessStreaming.DomainModel.CreditsTransferAggregate
{
public class CreditsTransferCommandHandlers
{
private readonly EventSourcedRepository eventSourcedRepository;
public CreditsTransferCommandHandlers(EventSourcedRepository eventSourcedRepository)
{
this.eventSourcedRepository = eventSourcedRepository;
}
public async Task HandleAsync(CreateCreditsTransferCommand command, ILogger hostLogger, IAsyncCollector<Event> azureOutputBinding = null)
{
if (azureOutputBinding != null)
{
eventSourcedRepository.SetWritePersistence(azureOutputBinding);
}
var eventStream = await eventSourcedRepository.LoadAggregateStreamAsync(command.AggregateId);
// this is applicable only when we are creating a new aggregate. We must have no prior events.
if (eventStream.Any())
{
throw new Exception("Aggregate has already been created"); // todo: better exceptions;
}
var creditsTransfer = new CreditsTransferAggregate(command, hostLogger);
creditsTransfer.DesignateCreditsTransferHandlingStrategy(command);
await eventSourcedRepository.SaveToAggregateStreamAsync(creditsTransfer.Changes);
}
public async Task HandleAsync(ManuallyApproveCreditsTransferCommand command, ILogger hostLogger, IAsyncCollector<Event> azureOutputBinding = null)
{
if (azureOutputBinding != null)
{
eventSourcedRepository.SetWritePersistence(azureOutputBinding);
}
var eventStream = await eventSourcedRepository.LoadAggregateStreamAsync(command.AggregateId);
var creditsTransfer = new CreditsTransferAggregate(eventStream, hostLogger);
creditsTransfer.PerformManualApproval(command);
await eventSourcedRepository.SaveToAggregateStreamAsync(creditsTransfer.Changes);
}
public async Task HandleAsync(ManuallyRejectCreditsTransferCommand command, ILogger azureFunctionsLogger, IAsyncCollector<Event> azureOutputBinding = null)
{
if (azureOutputBinding != null)
{
eventSourcedRepository.SetWritePersistence(azureOutputBinding);
}
var eventStream = await eventSourcedRepository.LoadAggregateStreamAsync(command.AggregateId);
var creditsTransfer = new CreditsTransferAggregate(eventStream, azureFunctionsLogger);
creditsTransfer.PerformManualRejection(command);
await eventSourcedRepository.SaveToAggregateStreamAsync(creditsTransfer.Changes);
}
}
}
As we can see in this code, the command handler knows how to call the aggregate. It calls a public method on the aggregate root and captures the Changes
property to persist the events to the event-sourced repository. Since we are using the output binding mechanism of the host, Azure Functions, we need to slightly change the command handler to receive this infrastructure dependency and configure the repository to understand IAsyncCollector<T>
. We are also using the logging infrastructure of Azure in the form of a common ILogger
.
Understanding the Event Stream
Before going into the demonstration of how an aggregate root is implemented, it's important to visualize a concrete example of what an event sourced aggregate means. It's far easier to explain using an example.
Consider that a new account is created. Then some credits are added. The account number created is 1234-4321-5678-0987
. The aggregate name is Account
.
Event Name | Aggregate Version | Event Body | Balance (current state of this aggregate) |
---|---|---|---|
Account Created | 0 | {AggregateId: 1234-4321-5678-0987} |
$ 0.00 |
Credits Increased | 1 | {AggregateId: 1234-4321-5678-0987, Amount: 1000.00} |
$ 1000.00 |
Credits Decreased | 2 | {AggregateId: 1234-4321-5678-0987, Amount: 10.00} |
$ 990.00 |
As we can see, we only save to the database these events. We don't perform an update. Never. A database that only appends events is called event store in the industry. We are using CosmosDB as our event store. As you can imagine, the current state of Account
aggregate is derived from the event stream. We must read the entire event stream to calculate the current state of the aggregate. This is a major mindset change. The repository code and aggregate code now need to understand how to work with an event stream. We admit that this is not trivial and requires some study time, but you should consider this strategy to create new systems that are not small. Using event sourcing is an advantage over traditional database-state based development because all the changes in the global state are documented in the event stream. The global state is the entire event stream. We can easily understand how a given aggregate evolved and we gain the ability to do time-travel debugging to identify and solve issues.
Aggregate Versions and Concurrency
Suppose there are two instances of the DecreaseCreditsCommandHandler that intend to decrease $500.00, almost at the same time (race condition). This would be possible if we just added events to the stream without checking anything, allowing the user to have then a negative balance. The situation would be like this:
Event Name | Aggregate Version | Event Body | Balance (current state of this aggregate) |
---|---|---|---|
Account Created | 0 | {AggregateId: 1234-4321-5678-0987} |
$ 0.00 |
Credits Increased | 1 | {AggregateId: 1234-4321-5678-0987, Amount: 1000.00} |
$ 1000.00 |
Credits Decreased | 2 | {AggregateId: 1234-4321-5678-0987, Amount: 10.00} |
$ 990.00 |
Credits Decreased | 3 | {AggregateId: 1234-4321-5678-0987, Amount: 500.00} |
$ 490.00 |
Credits Decreased | 3 | {AggregateId: 1234-4321-5678-0987, Amount: 500.00} |
$ -10.00 |
We can see that we are breaking a domain invariant that avoids accounts with a negative amount of credits. To avoid this problem we should reject the command that would lead to an invalid state. A solution to this problem would be to query the event stream and detect the current aggregate version. We then try to publish a new event incrementing the version. If there is an event with the same version, we do a retry a little bit later.
Command Name | Event Name | Aggregate Version | Event Body | Balance (current state of this aggregate) |
---|---|---|---|---|
CreateNewAccount | Account Created | 0 | {AggregateId: 1234-4321-5678-0987} |
$ 0.00 |
IncreaseCredits | Credits Increased | 1 | {AggregateId: 1234-4321-5678-0987, Amount: 1000.00} |
$ 1000.00 |
DecreaseCredits | Credits Decreased | 2 | {AggregateId: 1234-4321-5678-0987, Amount: 10.00} |
$ 990.00 |
DecreaseCredits thread#1 | Credits Decreased | 3 | {AggregateId: 1234-4321-5678-0987, Amount: 500.00} |
$ 490.00 |
DecreaseCredits thread#2 | - | still 3 because there is another event with version 3 | no event published | $ 490.00 |
DecreaseCredits thread#2 (retry) | Credits Decrease Rejected | 4 | {AggregateId: 1234-4321-5678-0987, Amount: 500.00, Reason: "Insufficient Funds"} |
$ 490.00 |
Thread #2 tries to decrease the credits and fails. Tries again and command succeeds, now, producing an event explaining that there were invalid funds.
We have added a uniqueness constraint on the Events
collection so we can get an exception upon the insertion of a duplicated version for an aggregate.
Command Handlers and Read Models
As we saw in the previous post, the read model is eventually consistent. It means that an event published will eventually be reflected in the read model. Usually, the read model update interval is very slow and is imperceptible to the users. Some developers can be tempted to access the read model to perform some additional validations and run business logic. This is not correct. There is always a risk of looking at stale data. The command handler should only look at the event stream of the aggregate it intends to affect. Deviations of this rule should be carefully discussed. Access to read models affected by other aggregates can be considered, but usually, the UI of the application should feed the command with data from diverse read models. The API that accepts requests from the UI can also do this kind of preparation work to issue correct commands, eventually, obtaining data from multiple read models.
An Event Sourced Repository
Our event-sourced repository is little bit unconventional:
using Microsoft.Azure.WebJobs;
using Newtonsoft.Json;
using ServerlessStreaming.Common;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace ServerlessStreaming.Infrastructure
{
public class EventSourcedRepository
{
private readonly ICosmosDBEventStore<Event> eventStore;
private IAsyncCollector<Event> azureOutputBinding;
public EventSourcedRepository(ICosmosDBEventStore<Event> eventStore)
{
this.eventStore = eventStore;
}
public async Task SaveToAggregateStreamAsync(IEnumerable<Event> events)
{
if (azureOutputBinding == null)
{
await SaveToAggregateStreamUsingEventStore(events);
}
else
{
await SaveToAggregateStreamUsingAzureOutputBinding(events, azureOutputBinding);
}
}
private async Task SaveToAggregateStreamUsingAzureOutputBinding(IEnumerable<Event> events, IAsyncCollector<Event> azureOutputBinding)
{
foreach (var item in events)
{
await azureOutputBinding.AddAsync(item);
}
await azureOutputBinding.FlushAsync();
}
private async Task SaveToAggregateStreamUsingEventStore(IEnumerable<Event> events)
{
// todo: optization & cache - not performant.
foreach (var e in events)
{
var concreteEventType = AppDomain.CurrentDomain.GetAssemblies().Select(x => x.GetType(e.EventName)).Where(x => x != null).First();
var concreteEvent = JsonConvert.DeserializeObject(e.ToString(), concreteEventType);
await eventStore.AddItemRawAsync(concreteEvent, e.EventName);
}
}
public async Task<IEnumerable<Event>> LoadAggregateStreamAsync(string aggregateId)
{
var eventStoreQuery = $@" SELECT c.id, c.EventName, c.EventDateTimeUtc, c.AggregateId, c.AggregateVersion, c.CorrelationId, c.Payload
FROM c
WHERE c.AggregateId = '{aggregateId}'
ORDER BY c.AggregateVersion";
// we get all the events still as Cosmos DB documents.
var events = await eventStore.GetItemsRawAsync(eventStoreQuery);
var concreteEvents = new List<Event>();
foreach (var ev in events)
{
var concreteEventTypeName = ev.GetPropertyValue<string>("EventName");
var concreteEventType = AppDomain.CurrentDomain.GetAssemblies()
.Select(x => x.GetType(concreteEventTypeName))
.Where(x => x != null).First();
if (JsonConvert.DeserializeObject(ev.ToString(), concreteEventType) is Event concreteEvent)
{
concreteEvents.Add(concreteEvent);
}
else
{
throw new InvalidOperationException("Could not process event or the event is unknown.");
}
}
return concreteEvents;
}
public EventSourcedRepository SetWritePersistence(IAsyncCollector<Event> outputBinding)
{
this.azureOutputBinding = outputBinding;
return this;
}
}
}
This repository works adding events to an aggregate's stream and loading the stream back when needed. The repository depends on a low-level service to interact with CosmosDB. We also can work adding items to either our CosmosDB service or a IAsyncCollector<Event>
.
We must be aware that most of the domain logic is placed on the write side of the application. The changes in the global state are controlled by the aggregates, command handlers, and specialized repositories that work with the event store. We should not be tempted to query directly the event store to get data, for example, to satisfy UI requests. Since the read and write sides of the application are physically separated, as CQRS recommends, we use the read side to query the read models and then, build the UI. This repository is used only to manage the events in the event store.
Understanding the Aggregate Root
Let's go back to the command handling logic. What we do is briefly is:
- Receive a command in an Azure Function that is triggered by the insertion of a new command in CosmosDB
- Call the Command Handler
- The Command Handler calls the Aggregate
- The Aggregate works with the command payload and produces events
- The Command Handler persists those events using the event-sourced repository.
We now need to understand the details of an Aggregate Root and how it effectively handles business logic. Let's start with the implementation of the base AggregateRoot
class:
using Microsoft.Extensions.Logging;
using ServerlessStreaming.Common;
using System;
using System.Collections.Generic;
using System.Linq;
namespace ServerlessStreaming.DomainModel
{
public abstract class AggregateRoot
{
protected AggregateRoot(IEnumerable<Event> eventStream, ILogger log)
{
this.Log = log;
if (eventStream.Any() == false)
{
throw new Exception("Could not load aggregate from the event stream because it is empty.");
}
this.EventStream.AddRange(eventStream);
LoadFromHistory(this.EventStream);
LoadAggregateIdFromStream();
Log.LogDebug("Aggregate State Restored.");
}
protected AggregateRoot(ILogger log) {
this.Log = log;
}
private string aggregateId;
public string AggregateId
{
get => aggregateId;
protected set
{
if (value.IsNullOrEmpty())
{
throw new ArgumentException("The Aggregate Id must not be empty.");
}
aggregateId = value;
}
}
protected List<Event> EventStream { get; } = new List<Event>();
public List<Event> Changes { get; } = new List<Event>();
protected ILogger Log { get; }
public abstract void Apply(Event evt);
protected abstract void LoadFromHistory(IEnumerable<Event> eventStream);
protected virtual void LoadAggregateIdFromStream()
{
this.AggregateId = EventStream.First().AggregateId;
}
}
}
The Aggregate Root is responsible to manage a group of related objects so they can work together as a unit to the external world. When we want to reconstruct the aggregate, we pass an event stream to the constructor so we can process each event ordered by the aggregate version. Sometimes we need to create a new aggregate, like a new user account, without any event stream. So we have two constructors available. The logging is required in our project.
During reconstruction, we process each event and update the state of the aggregate. This is done when LoadFromHistory
is called:
protected override void LoadFromHistory(IEnumerable<Event> eventStream)
{
foreach (var evt in eventStream)
{
_ = evt switch
{
CreditsDecreasedEvent e => Apply(e, false),
CreditsIncreasedEvent e => Apply(e, false),
CreditsDecreaseRejectedEvent e => Apply(e, false),
CreditsIncreaseRejectedEvent e => Apply(e, false),
CreditsDecreaseReversedEvent e => Apply(e, false),
CreditsIncreaseReversedEvent e => Apply(e, false),
};
currentAggregateVersion = evt.AggregateVersion;
}
}
We have to Apply
each event received. Like this:
private int Apply(CreditsDecreasedEvent e, bool isNew)
{
state.CurrentBalance -= (e.Payload as AccountTransactionModel).Amount;
state.BalanceUpdatedAtUtc = e.EventDateTimeUtc;
if (isNew)
{
Changes.Add(e);
}
return e.AggregateVersion;
}
We are applying each preexistent event and updating the local state of the Aggregate. With an updated state, the aggregate is capable of doing correct decisions and generate correct events. Please note that the current aggregate version is loaded during the LoadAggregateIdFromStream
call. The aggregate id is also obtained from any event on the stream. We use the first one. Please note that each event that changes the state must have a corresponding Apply
method.
Please note that this reconstruction of the state is something that only occurs in the memory of the host running the code of the Aggregate Root. The calculated state is usually not saved anywhere unless we do a process called "snapshot", which is a photography of the state ate a given moment that is also saved in the stream. A snapshot can be a performance optimization when the event stream is very long. Please also note that we can work on the raw events processing in any way we want - we can do different projections of the events in the event store for different purposes. This is pure gold. You can read the stream and use it for different use cases, potentially unknown yet.
When the reconstruction of the aggregate is completed, the command handler is ready to dispatch commands to the aggregate:
public void DecreaseCredits(DecreaseCreditsCommand command)
{
var currentBalance = state.CurrentBalance;
// check if we have enough credits to be accept this command:
var amountToDecrease = command.Payload.Amount;
if (amountToDecrease <= state.CurrentBalance)
{
// if so, we can do the debit.
ProceedWithCreditsDecrease(command, currentBalance, currentAggregateVersion);
}
else
{
// if not, we have to communicate the rejection using an Event
RejectCreditsDecrease(command, currentBalance, currentAggregateVersion);
}
}
private void ProceedWithCreditsDecrease(DecreaseCreditsCommand command, decimal currentBalance, int currentAggregateVersion)
{
var ev = new CreditsDecreasedEvent(
command.CorrelationId,
DateTime.UtcNow,
command.AggregateId,
currentAggregateVersion + 1,
command.Payload)
{
PreviousBalance = currentBalance,
CurrentBalance = currentBalance - command.Payload.Amount
};
Log.LogWarning("Credits Decrease will be accepted. CorrelationId: {CorrelationId}, AggregateId: {AggregateId}", command.CorrelationId, AggregateId);
Apply(ev);
}
The aggregate is simply exposing the DecreaseCredits
method and deciding the command should be accepted or rejected. When the credits decrease is accepted, we create CreditsDecreasedEvent
, incrementing the aggregate version and filling the necessary fields. We also log this to increase the visibility of the operations even during development time. The event can also contain custom fields. We tie the previous and current balance in the event. This is useful to troubleshoot the balance variation easily. And probably, this makes it more difficult to tamper the balance.
We have to communicate the existence of this new event to the world. We have to put all the new events on the Changes
list. The Apply
method again is called:
public override void Apply(Event evt)
{
var aggregateVersion = evt switch
{
CreditsDecreasedEvent e => Apply(e, true),
CreditsIncreasedEvent e => Apply(e, true),
CreditsDecreaseRejectedEvent e => Apply(e, true),
CreditsIncreaseRejectedEvent e => Apply(e, true),
CreditsDecreaseReversedEvent e => Apply(e, true),
CreditsIncreaseReversedEvent e => Apply(e, true),
};
currentAggregateVersion = aggregateVersion;
}
private int Apply(CreditsDecreasedEvent e, bool isNew)
{
state.CurrentBalance -= (e.Payload as AccountTransactionModel).Amount;
state.BalanceUpdatedAtUtc = e.EventDateTimeUtc;
if (isNew)
{
Changes.Add(e);
}
return e.AggregateVersion;
}
We apply the new event and update again the local state, as other calls to this aggregate can occur. The Changes
collection is used by the command handler logic to send the events to the repository:
eventSourcedRepository.SaveToAggregateStreamAsync(prepaidCardAccountAggregate.Changes);
The state of the aggregate can be any object. The PrepaidCardAccountAggregate
uses the entity PrepaidCardAccount
to store the current state of the aggregate.
private readonly PrepaidCardAccount state = new PrepaidCardAccount();
private int currentAggregateVersion = -1;
public PrepaidCardAccountAggregate(IEnumerable<Event> eventStream, ILogger log)
: base(eventStream, log) { }
// rest of the PrepaidCardAccountAggregate ommitted
An entity is just a simple class or a class that can collaborate with other entities and value objects. The code for PrepaidCardAccount
entity is as follows:
using Newtonsoft.Json;
using System;
namespace ServerlessStreaming.DomainModel.PrepaidCardAccountAggregate
{
public class PrepaidCardAccount
{
[JsonProperty(PropertyName = "id")]
public string Id { get; set; }
public string CardHolderId { get; set; }
public decimal CurrentBalance { get; set; }
public DateTime AccountCreatedAtUtc { get; set; }
public DateTime BalanceUpdatedAtUtc { get; set; }
public int AggregateVersion { get; set; }
}
}
Now we can see the whole code for PrepaidCardAccountAggregate
. Please forgive us, but we felt that for C# and .NET there are only a few complete examples of how to implement a realistic aggregate root.
using Microsoft.Extensions.Logging;
using ServerlessStreaming.Common;
using ServerlessStreaming.DomainModel.PrepaidCardAccountAggregate.Commands;
using ServerlessStreaming.DomainModel.PrepaidCardAccountAggregate.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace ServerlessStreaming.DomainModel.PrepaidCardAccountAggregate
{
public class PrepaidCardAccountAggregate : AggregateRoot
{
private readonly PrepaidCardAccount state = new PrepaidCardAccount();
private int currentAggregateVersion = -1;
public PrepaidCardAccountAggregate(IEnumerable<Event> eventStream, ILogger log)
: base(eventStream, log) { }
public void DecreaseCredits(DecreaseCreditsCommand command)
{
var currentBalance = state.CurrentBalance;
// check if we have enough credits to be accept this command:
var amountToDecrease = command.Payload.Amount;
if (amountToDecrease <= state.CurrentBalance)
{
// if so, we can do the debit.
ProceedWithCreditsDecrease(command, currentBalance, currentAggregateVersion);
}
else
{
// if not, we have to communicate the rejection using an Event
RejectCreditsDecrease(command, currentBalance, currentAggregateVersion);
}
}
public void IncreaseCredits(IncreaseCreditsCommand command)
{
var currentBalance = state.CurrentBalance;
var ev = new CreditsIncreasedEvent(command.CorrelationId, DateTime.UtcNow, command.AggregateId, currentAggregateVersion + 1, command.Payload)
{
PreviousBalance = currentBalance,
CurrentBalance = currentBalance + command.Payload.Amount
};
Log.LogWarning("Credits Increase will be accepted. CorrelationId: {CorrelationId}, AggregateId: {AggregateId}", command.CorrelationId, AggregateId);
Apply(ev);
}
public void ReverseCreditsChange(ReverseCreditsChangeCommand command)
{
var eventToRevert = EventStream.Single(x => (x.Payload as IPayload).Id == command.Payload.PayloadId);
if (eventToRevert.EventName.Contains(nameof(CreditsIncreasedEvent)))
{
ReverseCreditsIncrease(command, eventToRevert as CreditsIncreasedEvent);
}
else
{
ReverseCreditsDecrease(command, eventToRevert as CreditsDecreasedEvent);
}
}
private void ReverseCreditsIncrease(ReverseCreditsChangeCommand command, CreditsIncreasedEvent eventToRevert)
{
var currentBalance = state.CurrentBalance;
var amountToRevert = (eventToRevert.Payload as AccountTransactionModel).Amount;
var evt = new CreditsIncreaseReversedEvent(command.CorrelationId,
DateTime.UtcNow,
command.AggregateId,
currentAggregateVersion + 1,
command.Payload)
{
PreviousBalance = currentBalance,
AmountToRevert = amountToRevert,
CurrentBalance = currentBalance - amountToRevert
};
Log.LogWarning("Credits Increase will be reversed. CorrelationId: {CorrelationId}, AggregateId: {AggregateId}", command.CorrelationId, AggregateId);
Apply(evt);
}
private void ReverseCreditsDecrease(ReverseCreditsChangeCommand command, CreditsDecreasedEvent eventToRevert)
{
var currentBalance = state.CurrentBalance;
var amountToRevert = (eventToRevert.Payload as AccountTransactionModel).Amount;
var evt = new CreditsDecreaseReversedEvent(command.CorrelationId,
DateTime.UtcNow,
command.AggregateId,
currentAggregateVersion + 1,
command.Payload)
{
PreviousBalance = currentBalance,
AmountToRevert = amountToRevert,
CurrentBalance = currentBalance + amountToRevert
};
Log.LogWarning("Credits Decrease will be reversed. CorrelationId: {CorrelationId}, AggregateId: {AggregateId}", command.CorrelationId, AggregateId);
Apply(evt);
}
private void ProceedWithCreditsDecrease(DecreaseCreditsCommand command, decimal currentBalance, int currentAggregateVersion)
{
var ev = new CreditsDecreasedEvent(
command.CorrelationId,
DateTime.UtcNow,
command.AggregateId,
currentAggregateVersion + 1,
command.Payload)
{
PreviousBalance = currentBalance,
CurrentBalance = currentBalance - command.Payload.Amount
};
Log.LogWarning("Credits Decrease will be accepted. CorrelationId: {CorrelationId}, AggregateId: {AggregateId}", command.CorrelationId, AggregateId);
Apply(ev);
}
private void RejectCreditsDecrease(DecreaseCreditsCommand command, decimal currentBalance, int currentAggregateVersion)
{
var ev = new CreditsDecreaseRejectedEvent(
command.CorrelationId,
DateTime.UtcNow,
command.AggregateId,
currentAggregateVersion + 1,
command.Payload)
{
PreviousBalance = currentBalance,
AmountRequested = command.Payload.Amount,
Reason = "Insufficient Credits. Requested amount: " + command.Payload.Amount
};
Log.LogWarning("Credits Decrease will be rejected due to insufficiente funds. CorrelationId: {CorrelationId}, AggregateId: {AggregateId}", command.CorrelationId, AggregateId);
Apply(ev);
}
public override void Apply(Event evt)
{
var aggregateVersion = evt switch
{
CreditsDecreasedEvent e => Apply(e, true),
CreditsIncreasedEvent e => Apply(e, true),
CreditsDecreaseRejectedEvent e => Apply(e, true),
CreditsIncreaseRejectedEvent e => Apply(e, true),
CreditsDecreaseReversedEvent e => Apply(e, true),
CreditsIncreaseReversedEvent e => Apply(e, true),
};
currentAggregateVersion = aggregateVersion;
}
protected override void LoadFromHistory(IEnumerable<Event> eventStream)
{
foreach (var evt in eventStream)
{
_ = evt switch
{
CreditsDecreasedEvent e => Apply(e, false),
CreditsIncreasedEvent e => Apply(e, false),
CreditsDecreaseRejectedEvent e => Apply(e, false),
CreditsIncreaseRejectedEvent e => Apply(e, false),
CreditsDecreaseReversedEvent e => Apply(e, false),
CreditsIncreaseReversedEvent e => Apply(e, false),
};
currentAggregateVersion = evt.AggregateVersion;
}
}
private int Apply(CreditsDecreasedEvent e, bool isNew)
{
state.CurrentBalance -= (e.Payload as AccountTransactionModel).Amount;
state.BalanceUpdatedAtUtc = e.EventDateTimeUtc;
if (isNew)
{
Changes.Add(e);
}
return e.AggregateVersion;
}
private int Apply(CreditsIncreasedEvent e, bool isNew)
{
state.CurrentBalance += (e.Payload as AccountTransactionModel).Amount;
state.BalanceUpdatedAtUtc = e.EventDateTimeUtc;
if (isNew)
{
Changes.Add(e);
}
return e.AggregateVersion;
}
private int Apply(CreditsDecreaseRejectedEvent e, bool isNew)
{
// no state changes right now. But this can change in the future.
if (isNew)
{
Changes.Add(e);
}
return e.AggregateVersion;
}
private int Apply(CreditsIncreaseRejectedEvent e, bool isNew)
{
// no state changes right now. But this can change in the future.
if (isNew)
{
Changes.Add(e);
}
return e.AggregateVersion;
}
private int Apply(CreditsDecreaseReversedEvent e, bool isNew)
{
state.CurrentBalance += e.AmountToRevert;
state.BalanceUpdatedAtUtc = e.EventDateTimeUtc;
if (isNew)
{
Changes.Add(e);
}
return e.AggregateVersion;
}
private int Apply(CreditsIncreaseReversedEvent e, bool isNew)
{
state.CurrentBalance -= e.AmountToRevert;
state.BalanceUpdatedAtUtc = e.EventDateTimeUtc;
if (isNew)
{
Changes.Add(e);
}
return e.AggregateVersion;
}
}
}
The advantages of this design, despite the need to change the mindset, is a more testable aggregate root and the clarity of what the system can do, given certain inputs. This kind of code is very testable and the business logic is not coupled to the host. The use of either Azure Functions or other .NET host is irrelevant when we centralize the business rules in a cohesive domain model. It is possible to simulate many situations and validate if the domain invariants are still preserved.
Conclusion of Part 2
Wow! We were able to describe how the commands are processed by a simple cloud structure using serverless components: Azure Functions and CosmosDB. This article is an effort to explain concretely many concepts that are spread around many blogs and documentation on the web, giving some north for developers that are planning to create applications on Azure in a more structured way still keeping the joy of serverless technologies in terms of flexibility and costs. The use of DDD and CQRS/ES concepts are not restricted to monoliths or services running on-premises or on Kubernetes.
Having the right mindset it is viable to CQRS, DDD, Event Sourcing in a reactive and serverless environment. Azure is a great place
In part 3 we will describe how events are effectively published and handled.
Posted on June 3, 2020
Join Our Newsletter. No Spam, Only the good stuff.
Sign up to receive the latest update from our blog.