CQRS using Java and Axon - Command module
Fabio Hiroki
Posted on March 20, 2020
Introduction
In this second part of the series, we will implement the Command module, which is responsible for application state changes. The final code is on GitHub.
REST API Layer
We will start coding our application from the external layer and work our way inward. The ProductController class will be responsible for exposing the endpoints that request state changes. The only dependency of this class will be Axon's CommandGateway, which is responsible for dispatching command objects. The initial structure will be:
@RestController
@RequestMapping("/products")
public class ProductController {

    @Autowired
    public ProductController(final CommandGateway commandGateway) {
        this.commandGateway = commandGateway;
    }

    private CommandGateway commandGateway;

    @PostMapping
    public CompletableFuture<String> create(@RequestBody ProductDTO dto) {
        return null;
    }

    @PutMapping
    public CompletableFuture<String> update(@RequestBody ProductDTO dto) {
        return null;
    }
}
Here, the ProductDTO class is just a POJO that maps the JSON request.
public class ProductDTO {

    private Long id;
    private String name;
    private int quantity;

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getQuantity() {
        return quantity;
    }

    public void setQuantity(int quantity) {
        this.quantity = quantity;
    }
}
Command model
Command instances in this application represent intents to change state. A subset of the application state is represented by an Aggregate object. For example, the intent to add a new product to the cart is represented by the AddProductCommand class:
public class AddProductCommand {

    public AddProductCommand(
            final Long id,
            final String name,
            final int quantity) {
        this.id = id;
        this.name = name;
        this.quantity = quantity;
    }

    @TargetAggregateIdentifier
    private final Long id;
    private final String name;
    private final int quantity;

    public Long getId() {
        return id;
    }

    public String getName() {
        return name;
    }

    public int getQuantity() {
        return quantity;
    }
}
Here, the TargetAggregateIdentifier annotation identifies which instance of an Aggregate type should handle this command.
Now, to dispatch this command from our RestController, we just need to instantiate it and pass it as an argument to the send method of CommandGateway:
@RestController
@RequestMapping("/products")
public class ProductController {

    @Autowired
    public ProductController(final CommandGateway commandGateway) {
        this.commandGateway = commandGateway;
    }

    private CommandGateway commandGateway;

    @PostMapping
    public CompletableFuture<String> create(@RequestBody ProductDTO dto) {
        AddProductCommand command = new AddProductCommand(
                dto.getId(),
                dto.getName(),
                dto.getQuantity());
        return commandGateway.send(command);
    }

    // ....
}
Aggregate
As mentioned before, the Aggregate class will be responsible for representing part of the application state, plus handling the Command objects that target that Aggregate.
An example of an Aggregate could be:
{
  "id": 1,
  "name": "iPhone",
  "quantity": 2
}
Translating this into code results in:
@Aggregate
public class ProductAggregate {

    @AggregateIdentifier
    private Long id;
    private String name;
    private int quantity;

    // No-arg constructor, required by Axon to instantiate the aggregate
    protected ProductAggregate() {
    }

    @CommandHandler
    public ProductAggregate(AddProductCommand cmd) {
        // Verifies state consistency and applies events
    }
}
The CommandHandler annotation on the constructor means the AddProductCommand command is used to create a new Aggregate.
At this point, the state of your application hasn't changed yet. The command handler is the place to perform business logic (e.g., checking that quantity is a positive value) and possibly apply Events that will result in a state change.
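As a rough illustration of the kind of check mentioned above, here is a minimal sketch in plain Java. The class name ProductRules and the method requirePositiveQuantity are hypothetical; they are not part of Axon or of the original project. In the aggregate, a call like this would sit at the start of the command-handling constructor, before any event is applied.

```java
// Hypothetical helper for illustration only; not part of Axon or the original project.
// Declared package-private so the sketch fits in a single file.
final class ProductRules {

    private ProductRules() {
        // Utility class, not meant to be instantiated
    }

    // Returns the quantity unchanged when it is positive,
    // otherwise rejects it by throwing an exception.
    static int requirePositiveQuantity(int quantity) {
        if (quantity <= 0) {
            throw new IllegalArgumentException(
                    "Quantity must be positive, got " + quantity);
        }
        return quantity;
    }
}
```

Because the exception is thrown before any event is applied, a rejected command leaves the application state untouched.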
Event model
First, we create the AddProductEvent class with the attributes needed to trigger our desired state change. In this specific case, it will be very similar to its respective Command model.
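The event class is not listed in this article, so the following is only a sketch of what it might look like, assuming it carries the same three fields as AddProductCommand (id, name and quantity). The actual class in the repository may differ.

```java
// Sketch of the event class, assumed to mirror the fields of AddProductCommand.
// Declared package-private so the sketch fits in a single file;
// in a real project it would be a public class in its own file.
final class AddProductEvent {

    private final Long id;
    private final String name;
    private final int quantity;

    AddProductEvent(final Long id, final String name, final int quantity) {
        this.id = id;
        this.name = name;
        this.quantity = quantity;
    }

    public Long getId() {
        return id;
    }

    public String getName() {
        return name;
    }

    public int getQuantity() {
        return quantity;
    }
}
```

Keeping the fields final makes the event immutable, which suits an object that records something that has already happened.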
Now we will change the ProductAggregate constructor to dispatch an AddProductEvent whenever an AddProductCommand is sent. This same class can also act as the event sourcing handler for AddProductEvent, performing the change to the application state.
import static org.axonframework.modelling.command.AggregateLifecycle.apply;

@Aggregate
public class ProductAggregate {

    @AggregateIdentifier
    private Long id;
    private String name;
    private int quantity;

    // No-arg constructor, required by Axon to instantiate the aggregate
    protected ProductAggregate() {
    }

    @CommandHandler
    public ProductAggregate(AddProductCommand cmd) {
        // Publishes the event; the state itself is changed in the handler below
        apply(new AddProductEvent(cmd.getId(), cmd.getName(), cmd.getQuantity()));
    }

    @EventSourcingHandler
    public void on(AddProductEvent event) {
        this.id = event.getId();
        this.name = event.getName();
        this.quantity = event.getQuantity();
    }
}
So far, we have established a checkpoint that allows us to test the application and observe what's happening. Run the following terminal commands to start the commandside application:
docker-compose up -d
./gradlew clean assemble
java -jar commandside/build/libs/commandside.jar
Now we can test the endpoint by adding a new product:
curl -X POST http://localhost:8080/products -H 'Content-Type: application/json' -d '{"id": 1, "name": "iPhone", "quantity": 7}'
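The same request can also be built from Java. The sketch below uses the JDK's java.net.http API (Java 11 or later) and reuses the URL, header and JSON shape from the curl command. The class name AddProductRequestSketch is hypothetical, and the JSON formatting is deliberately naive.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper for illustration only; not part of the original project.
final class AddProductRequestSketch {

    private AddProductRequestSketch() {
        // Utility class, not meant to be instantiated
    }

    // Builds the same POST request as the curl command, without sending it.
    static HttpRequest build(long id, String name, int quantity) {
        // Naive JSON formatting: assumes name contains no quotes or backslashes
        String json = String.format(
                "{\"id\": %d, \"name\": \"%s\", \"quantity\": %d}",
                id, name, quantity);
        return HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8080/products"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }
}
```

With the commandside application running, the request could be sent with HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()).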
We can verify in the Mongo database, in the domainevents collection, that a new event has been stored:
{
  "_id":"5e0a2924b813b63783e1e092",
  "aggregateIdentifier":"1",
  "type":"ProductAggregate",
  "sequenceNumber":"0",
  "serializedPayload":"<com.example.project.command.addproduct.AddProductEvent><id>1</id><name>iPhone</name><quantity>7</quantity></com.example.project.command.addproduct.AddProductEvent>",
  "timestamp":"2019-12-30T16:43:16.851862731Z",
  "payloadType":"com.example.project.command.addproduct.AddProductEvent",
  "payloadRevision":null,
  "serializedMetaData":"<meta-data><entry><string>traceId</string><string>e62c8e0d-7505-4e99-ab7e-84b4619ee159</string></entry><entry><string>correlationId</string><string>e62c8e0d-7505-4e99-ab7e-84b4619ee159</string></entry></meta-data>",
  "eventIdentifier":"6eef19d8-b22a-4be6-9fd9-7681a31580b8"
}
From now on, each new product added through a POST request will create a new entry in domainevents. This collection acts as a history of what happened in our application.
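To make the idea of a history more concrete, here is a small hand-rolled sketch in plain Java that folds a list of events, in order, into a current state. It is illustrative only and is not how Axon reads its event store. The names EventReplaySketch and ProductAdded are hypothetical, with ProductAdded standing in for the stored AddProductEvent payload.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: a hand-rolled replay, not Axon's implementation.
final class EventReplaySketch {

    private EventReplaySketch() {
        // Utility class, not meant to be instantiated
    }

    // Simplified, hypothetical stand-in for the stored AddProductEvent payload
    static final class ProductAdded {
        final long id;
        final String name;
        final int quantity;

        ProductAdded(long id, String name, int quantity) {
            this.id = id;
            this.name = name;
            this.quantity = quantity;
        }
    }

    // Walks the history in order and builds a map of
    // product id -> "name x quantity" describing the resulting state.
    static Map<Long, String> replay(List<ProductAdded> history) {
        Map<Long, String> state = new LinkedHashMap<>();
        for (ProductAdded event : history) {
            state.put(event.id, event.name + " x" + event.quantity);
        }
        return state;
    }
}
```

Replaying the same history always produces the same state, which is what lets the stored events explain how the application got to where it is.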
Aggregate persistence
Besides persisting individual events, we also want to persist the aggregate on each change (State-Stored Aggregate). To achieve this, we just need to add JPA annotations to turn our aggregate class into an Entity:
@Aggregate
@Entity // This class can now be mapped to a table
public class ProductAggregate {

    @AggregateIdentifier
    @Id // Defines the primary key
    private Long id;

    @Column // Maps to a column with the same name
    private String name;

    @Column // Maps to a column with the same name
    private int quantity;

    // No-arg constructor, required by both JPA and Axon
    protected ProductAggregate() {
    }

    @CommandHandler
    public ProductAggregate(AddProductCommand cmd) {
        apply(new AddProductEvent(cmd.getId(), cmd.getName(), cmd.getQuantity()));
    }

    @EventSourcingHandler
    public void on(AddProductEvent event) {
        this.id = event.getId();
        this.name = event.getName();
        this.quantity = event.getQuantity();
    }
}
Now add a new product to your cart using the endpoint above, and check the product_table on your Postgres database to verify that a new entry matching the desired aggregate has been stored. Your Mongo database should also contain the new event.
Conclusion
In the Mongo database, we now have the history of all events, which we can use to understand how the application reached its current state. The Postgres database, on the other hand, holds the data we can display to end users, on a checkout screen, for example.
We could go all the way back and implement CQRS and event sourcing ourselves, but thankfully we can achieve the same result using a couple of annotations from Axon.