Eventual Consistency Through Scheduled Jobs

peholmst

Petter Holmström

Posted on July 29, 2021

In my last blog post, I left you with a cliffhanger: how to recover from situations where individual domain event handlers fail or the entire system crashes after a transaction has committed, but before all domain event handlers have processed the event.

As with most (if not all?) problems in software, there is no silver-bullet, one-size-fits-all solution here. Instead, you have to find the solution that best meets the requirements of your particular system. In this blog post, we are going to look at an easy(ish) approach to guaranteeing eventual consistency even if we miss a domain event now and then. The examples assume we are using Spring and Java, but the principles apply to other frameworks and languages as well.

The General Idea

The idea behind this approach is to use scheduled jobs instead of (or in addition to) domain events to synchronise state between aggregates or even between bounded contexts. The jobs run automatically at different intervals or times of day, but can also be triggered by domain event handlers. This means that if all goes well, the data will become consistent across the system within seconds of the domain event firing. On the other hand, if something goes wrong, the data will become consistent after the next successful scheduled run.

[Diagram illustrating the general idea]

A system like this is not difficult to implement, but it is also not as trivial as it may seem at first glance, because of a few caveats. However, you can avoid them by following these guidelines:

Make Your Jobs Accept All or Some Inputs

When you run a scheduled job, it is typically some kind of batch operation that works on all applicable inputs. However, in this particular case, you also want to be able to run the job on just one particular input or a small set of inputs. This allows for a lot of flexibility and is quite easy to do if you design the job in this way from the start.

Example

Let's say you have a job that will create an invoice for an order that has been shipped. The class would have the following methods:

import static org.springframework.transaction.annotation.Propagation.REQUIRES_NEW;

import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

@Component
public class InvoiceCreationJob {

    // Creates invoices only for the given orders (provided they are shipped and not yet invoiced).
    @Transactional(propagation = REQUIRES_NEW)
    public void createInvoiceForOrders(OrderId... orders) {
        // ...
    }

    // Batch variant: creates invoices for all shipped orders that have not been invoiced yet.
    @Transactional(propagation = REQUIRES_NEW)
    public void createInvoiceForAllOrders() {
        // ...
    }
}
  • The first method creates invoices for the orders whose IDs have been passed as method parameters (provided, of course, that those orders have been shipped and not invoiced yet; more about that later).
  • The second method is a batch job that creates invoices for all orders that have been shipped and not invoiced yet.

Decouple the Job and the Triggering of the Job

When you design your jobs, think about when and how they will be triggered:

  • When the system starts up?
  • Manually by a user?
  • In response to a domain event?
  • As a scheduled job?
  • Through JMX?
  • In some other way you are not aware of yet?

I would recommend a design that looks like this:

[Diagram: recommended design — application service, domain event handler and worker all triggering the same job]

  • The job itself is a separate object that does its thing when told to by other objects.
  • The application service starts the job in response to human user actions.
  • The domain event handler starts the job in response to a domain event.
  • The worker handles scheduled job executions (such as once per hour, every ten minutes or every day at midnight).

Example

Let's continue with the invoice generation example. We start with a domain event handler that generates an invoice as soon as an order has been shipped:

import org.springframework.transaction.event.TransactionalEventListener;

@DomainEventHandler
public class InvoiceGenerationOnShipmentTrigger {

    private final InvoiceCreationJob job;

    InvoiceGenerationOnShipmentTrigger(
            InvoiceCreationJob job) {
        this.job = job;
    }

    // Runs after the order's transaction has committed and immediately triggers
    // invoice creation for that particular order.
    @TransactionalEventListener
    public void onOrderShipped(OrderShippedEvent evt) {
        job.createInvoiceForOrders(evt.getOrderId());
    }
}

Next, we want to trigger the job every day at midnight:

import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class InvoiceGenerationWorker {

    private final InvoiceCreationJob job;

    InvoiceGenerationWorker(InvoiceCreationJob job) {
        this.job = job;
    }

    // Spring cron expressions have six fields; this one fires every day at midnight.
    // Requires @EnableScheduling in the configuration.
    @Scheduled(cron = "0 0 0 * * *")
    public void atMidnight() {
        job.createInvoiceForAllOrders();
    }
}

Finally, we want administrators to be able to trigger the job at any time through an application service:

import org.springframework.scheduling.annotation.Async;
import org.springframework.security.access.annotation.Secured;
import org.springframework.stereotype.Service;

@Service
public class InvoiceGenerationService {

    private final InvoiceCreationJob job;

    InvoiceGenerationService(InvoiceCreationJob job) {
        this.job = job;
    }

    // Only administrators may trigger the batch job, and it runs in a background
    // thread so that it does not block the caller. Requires @EnableAsync in the configuration.
    @Secured("ROLE_ADMIN")
    @Async
    public void createInvoiceForAllOrders() {
        job.createInvoiceForAllOrders();
    }
}

Make Your Jobs Idempotent

When you write a job, you know what the intended end state of the system is going to be. However, you should never assume how close to that state the system already is, nor should you make assumptions about the start state. Your job should always check what the start state is, and then figure out what actions it needs to take in order to reach the desired end state. If the system is already in the desired state, your job should do nothing.

If your job is implemented like this, you can run it as many times as you want with the same inputs and it will not cause any side effects (like generating multiple invoices for the same order). If a job execution fails because of e.g. a network error or power outage, you can just re-run it later.

Example

Let's again continue with the invoice generation example. In the initial design, the states an order will traverse through are the following:

[Diagram: the states an order traverses]

The first step on our road to idempotence is therefore to check the state of the order and only take action if that state is SHIPPED. For each shipped order, the job will then perform the following:

[Diagram: the steps the job performs for each shipped order]

However, there is a problem here: what happens if the second transaction fails for some reason? That would mean that the next time the job runs, there is already an invoice even though the state is SHIPPED.

Thus, the next step on our road to idempotence is to detect whether an invoice has already been created for a particular order. We can do this by adding an OrderId field to the Invoice aggregate and using it to check that no invoice exists before creating a new one. If an invoice already exists, the job just proceeds to setting the state to INVOICED (you will need some database constraints or pessimistic locking to implement this properly, but that is outside the scope of this article).

In more complex situations, you may have to add some intermediate states (such as INVOICE_GENERATION_IN_PROGRESS) to be able to pick up where you left off in case of failures.
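To make the checks concrete, here is a minimal sketch of what an idempotent createInvoiceForOrders could look like. It assumes hypothetical OrderRepository and InvoiceRepository collaborators and an Invoice.createFor factory method (none of these appear in the original example), and it glosses over the exact transaction boundaries discussed above:

import static org.springframework.transaction.annotation.Propagation.REQUIRES_NEW;

import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

@Component
public class InvoiceCreationJob {

    private final OrderRepository orderRepository;     // hypothetical repository
    private final InvoiceRepository invoiceRepository; // hypothetical repository

    InvoiceCreationJob(OrderRepository orderRepository,
                       InvoiceRepository invoiceRepository) {
        this.orderRepository = orderRepository;
        this.invoiceRepository = invoiceRepository;
    }

    @Transactional(propagation = REQUIRES_NEW)
    public void createInvoiceForOrders(OrderId... orders) {
        for (OrderId orderId : orders) {
            orderRepository.findById(orderId)
                    // Only act on orders that are actually in the SHIPPED state.
                    .filter(order -> order.getState() == OrderState.SHIPPED)
                    .ifPresent(this::invoiceShippedOrder);
        }
    }

    private void invoiceShippedOrder(Order order) {
        // Create an invoice only if one does not already exist for this order
        // (a unique constraint on the order ID guards against races).
        if (invoiceRepository.findByOrderId(order.getId()).isEmpty()) {
            invoiceRepository.save(Invoice.createFor(order));
        }
        // Either way, move the order to the INVOICED state.
        order.setState(OrderState.INVOICED);
    }
}

Running this twice for the same order is harmless: the second run either finds the order already in the INVOICED state, or finds the existing invoice and merely updates the state.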

Avoid Simultaneous Executions of the Same Job

Since your job can be started in many different ways, there is a risk that it will be started while it is already running. If the job is idempotent, this should not cause any data consistency problems, but you may end up with transaction deadlocks or other concurrency issues. It is also a waste of resources and may slow down other parts of the system.

If simultaneous executions of the same job are a rare occurrence, you may choose to just live with it and let the idempotence handle it. However, if it happens frequently, you should deal with it in some way. The action you take depends on the use case, but typically it is one of the following:

  • Let the current job finish, do not start the new job at all.
  • Cancel the current job, then start the new job.
  • Let the current job finish, then start the new job.

Example

I could write an entire blog post about this particular problem alone, so for this example we are going to narrow down the scope quite a bit and continue with the invoice generation case.

Let's say we are dealing with a small application that is only deployed as a single instance, so we do not need to worry about situations where the same job runs at the same time on different machines.

The first thing we want to do is to make sure that only one instance of the batch operation can run at a time. The easiest way of doing this is with an AtomicBoolean (because InvoiceCreationJob is a singleton):

import static org.springframework.transaction.annotation.Propagation.REQUIRES_NEW;

import java.util.concurrent.atomic.AtomicBoolean;

import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

@Component
public class InvoiceCreationJob {

    // Works because Spring components are singletons by default.
    private final AtomicBoolean working
        = new AtomicBoolean(false);

    @Transactional(propagation = REQUIRES_NEW)
    public void createInvoiceForAllOrders() {
        // Atomically flip the flag; if it was already true, another execution is in progress.
        if (working.compareAndSet(false, true)) {
            try {
                // Do the work
            } finally {
                working.set(false);
            }
        }
    }
}
  • If the working variable is false, it will be set to true and the work is allowed to start. Once finished, the working variable is set back to false.
  • If the working variable is true, the method will do nothing.

This is good enough since the batch operation is only used as a fallback solution in case we fail to properly handle some OrderShippedEvents.

What about the createInvoiceForOrders method then? This method is likely to run simultaneously for different Orders, but that is not a problem, so we do not want to prevent it.

The method is unlikely to run simultaneously for the same Order, although it can still happen. In this case, we can rely on the idempotence of the operation. The same applies to the case where createInvoiceForOrders and createInvoiceForAllOrders are running simultaneously.

Use Threads Wisely

When you implement a job, you should think about which thread it is going to run in. Will it always run inside its own thread regardless of how it was started, or inside the thread that started it, or a combination of both?

Let's have a look at some examples to illustrate why this is important:

  • Let's say a job is scheduled to be continuously executed by a worker with a five-minute delay. This means that a new job should start five minutes after the previous one has finished. Now, if the job runs inside its own thread, the worker will think it has finished as soon as the job thread has started. If the job were to take longer than five minutes, the worker would start a new job even though the old one is still running. In this case, it is better to let the job run in the thread that started it.
  • Another job is started by the user through an application service. This is also a long running job. Now, if the job runs inside the thread that started it, it would block the UI until the job is finished. In this case, it is better to let the job run in its own thread.

Now let's say the jobs mentioned above are actually the same job. In order to keep your options open, you should always implement your jobs to run inside the calling thread and then let the callers worry about which thread to use.

Example

Scroll back up to the example that demonstrates different ways of triggering the job. Note that the application service uses the @Async annotation whereas the domain event handler and the worker do not. Why? Let's have a look at each case:

  • The application service is invoked through a UI and is used by administrators to manually trigger the batch job. This job can take some time to complete and we do not want this to block the UI. Therefore, we run it asynchronously inside a background thread.
  • The domain event handler is invoked when an event arrives and is a bit of an edge case: when you have a small number of event handlers that return quickly you can run them in the calling thread. However, if you have a large number of handlers or some of them take time to complete, you may want to run them in a background thread.
  • The worker is already invoked by a background thread so we can continue to use that.
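If the event handlers do turn out to be too slow to run in the publishing thread, one option in Spring is to combine @Async with @TransactionalEventListener so that the handler (and thereby the job) runs in a background thread. A minimal sketch based on the trigger from earlier (remember that @EnableAsync must be present in the configuration):

import org.springframework.scheduling.annotation.Async;
import org.springframework.transaction.event.TransactionalEventListener;

@DomainEventHandler
public class InvoiceGenerationOnShipmentTrigger {

    private final InvoiceCreationJob job;

    InvoiceGenerationOnShipmentTrigger(InvoiceCreationJob job) {
        this.job = job;
    }

    // @Async moves only the handler off the publishing thread; the job itself
    // still runs in whatever thread calls it, as recommended above.
    @Async
    @TransactionalEventListener
    public void onOrderShipped(OrderShippedEvent evt) {
        job.createInvoiceForOrders(evt.getOrderId());
    }
}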

Use Upper Limits in Your Queries

Your jobs will perform different queries to determine the state of the system. The amount of data that needs to be processed depends on many factors, such as how heavily the system has been used or when the job was last run.

Because of this, whenever you write a query, you should stick to the good ol' rule of thumb: any query will return either no records, exactly one record, or a huge load of records.

Unless you know for sure that the query falls in the first two categories, you should always add an upper limit to your query. You can either implement the job to run in batches until there is no more data to process, or just process the first N records and then wait for the next run.
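As an illustration, with Spring Data the batch variant of the invoice job could cap each run like this (the repository method, the page size and the helper method are assumptions made for this sketch):

import java.util.List;

import org.springframework.data.domain.Pageable;
import org.springframework.data.repository.Repository;

// Hypothetical repository with a capped query.
public interface OrderRepository extends Repository<Order, OrderId> {

    // The Pageable parameter limits how many shipped orders a single run will load.
    List<Order> findByState(OrderState state, Pageable pageable);
}

Inside the job, the batch method then processes at most one page per run and leaves the rest for the next execution:

    public void createInvoiceForAllOrders() {
        orderRepository
                .findByState(OrderState.SHIPPED, PageRequest.of(0, 1000))
                .forEach(this::invoiceShippedOrder);
    }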

Schedule Jobs Wisely

When you schedule a job, you typically have the following alternatives:

  • Fixed rate: the job is started at a fixed rate, such as once every 60 minutes.
  • Fixed delay: the job is started with a fixed delay after the previous execution finished.
  • Cron: the job is started at a particular time of the day, or day of the week, such as five minutes past every hour, or 2 o'clock in the morning on Sundays.
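With Spring's @Scheduled annotation, the three alternatives look roughly like this (the intervals, the cron expression and the class name are only examples for this sketch, and a real worker would of course use just one of them):

import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class InvoiceGenerationWorkerExamples {

    private final InvoiceCreationJob job;

    InvoiceGenerationWorkerExamples(InvoiceCreationJob job) {
        this.job = job;
    }

    // Fixed rate: a new run starts every 60 minutes, measured from the start of the previous run.
    @Scheduled(fixedRate = 60 * 60 * 1000)
    public void everyHour() {
        job.createInvoiceForAllOrders();
    }

    // Fixed delay: a new run starts 10 minutes after the previous run has finished.
    @Scheduled(fixedDelay = 10 * 60 * 1000)
    public void tenMinutesAfterPreviousRun() {
        job.createInvoiceForAllOrders();
    }

    // Cron: run at 02:00 every Sunday morning.
    @Scheduled(cron = "0 0 2 * * SUN")
    public void earlyOnSundays() {
        job.createInvoiceForAllOrders();
    }
}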

Picking the correct schedule for your job may require some trial and error. If you run too often, you could end up starting new jobs before the old ones have finished, slowing the entire system down and causing transaction deadlocks in other parts of the system. If you run too rarely, the users are unhappy. You may have to try out different schedules to find the best balance.

Finally, remember to look at all your jobs as a whole: is there a risk of running into conflicts between different jobs working on the same data? Are different heavy-weight jobs running simultaneously? Do some jobs need to run before others?

Architectural Considerations

Now that we have familiarised ourselves with the general idea, we need to look at how it fits into the system architecture (which is assumed to be hexagonal).

Application services and domain event handlers we already know, but where would the workers and the job go? Let's start with the worker, because it is the easiest one.

A worker is a component that triggers actions in response to events generated by a timer. It can control its own transactions and threads if needed. This puts it into the orchestrator category, together with e.g. domain event handlers, and thus it belongs in the application layer.

But what about the job itself? The answer to that is: it depends. Read on.

Single Bounded Context

When the job is about propagating changes inside the same bounded context, the job is most likely implemented as a domain service. The domain event handler, worker and application service live in the application layer and invoke the domain service as needed (and also control the transactions).

Multiple Bounded Contexts

When the job is about propagating changes from one bounded context to another, we are talking about context mapping. The implementation and location of the job depends on the integration pattern you end up choosing. Let's look at three examples (there are more solutions than that but we have to draw the line somewhere).

Customer-Supplier and Conformist

In this case, there is a one-directional relationship between both contexts where the downstream context wants the upstream context to either perform some action (push) or return something (pull). This typically leads to the following architecture:

[Diagram: customer-supplier/conformist integration through an adapter provided by the upstream context]

  • The upstream context provides an adapter that the downstream context can use to access the system.
    • In a customer-supplier relationship, the upstream team is required to design this adapter according to the needs of the downstream team.
    • In a conformist relationship, the downstream team will use whatever the upstream team chooses to put into the adapter without having any say.
  • The job itself is an application service delegate. This means that it lives in the application layer, but it is not a pure application service as it does not perform any security checks at all.
  • If the contexts are running inside the same monolithic application, the job can be triggered by domain events coming from both contexts.
  • If the contexts are running inside separate applications, the job can only be triggered by domain events coming from the downstream context because we have no way of distributing the events. Then again, that is one of the reasons why we are using scheduled jobs instead.
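As a rough sketch of the code, the upstream context could expose an adapter interface that the downstream job pulls from; all names here are hypothetical and only meant to illustrate the shape of the dependency:

import java.time.Instant;
import java.util.List;

// Provided by the upstream (order) context. In a customer-supplier relationship it is
// shaped to fit the downstream team's needs; in a conformist relationship it is not.
public interface ShippedOrdersAdapter {

    // Pull-style access: orders shipped since the given point in time.
    List<ShippedOrderDto> findOrdersShippedSince(Instant since);
}

The invoice creation job in the downstream context is then wired against this interface instead of a local repository, but it is triggered by workers, event handlers and application services exactly as before.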

Anticorruption Layer

Here, there is again a one-directional relationship between the contexts, but in this case the downstream team has decided to completely shield the application from the upstream context:

[Diagram: integration through an anticorruption layer provided by the downstream context]

  • The downstream context provides an adapter (the anticorruption layer) that adapts the upstream context to an API that has been declared by the downstream team.
  • The job is still an application service delegate, but it talks to the upstream context through the API.
  • If the anti-corruption layer includes support for propagating events from the upstream context to the downstream context, then the job can be triggered by events coming from both contexts. Otherwise, it is limited to events coming from the downstream context only.

Middleware

A middleware solution may come in handy in cases where you have no control over any of the involved contexts, or where the relationship is bidirectional even though the contexts themselves are not aware of each other's existence:

[Diagram: integration through a separate middleware component]

  • The job, the workers, any domain event handlers and other components are moved out from the bounded context and into a separate middleware component.
  • The middleware is responsible for moving data back and forth between the contexts, without the contexts even knowing about it.
  • This works both for monoliths and for distributed systems.

Pros and Cons

Now it is time to wrap things up by looking at the pros and cons of using scheduled jobs to achieve eventual consistency.

The main advantages of this approach are the following:

  • It works out of the box with @TransactionalEventListener and all the other Spring-based building blocks and tools we have covered in this blog post series so far.
  • You do not need to introduce any new moving parts (like a message broker) or write any infrastructure code for persisting and distributing domain events.
  • It is quite easy to implement and its complexity can be tweaked according to the use cases.
  • When done right it leads to a system that is resilient and robust.

However, it also has several drawbacks:

  • Your system is not actually event driven. Some events are used to trigger jobs early, but the jobs themselves are data-driven.
  • Additional state may have to be stored in the aggregates to direct the jobs (for example in order to achieve idempotence).
  • The coupling increases as the jobs must be fully aware of all the aggregates involved.
  • Not useful when changes need to be propagated quickly.
  • Every job you add increases the amount of work your database has to do. This increases the risk of transaction deadlocks and performance problems.
  • Scaling out or introducing redundancy needs some special attention: you do not want multiple servers to start running the same job on the same data at the same time (even though idempotent jobs should make sure the data does not get corrupted).

Clearly there are systems that are suitable for this approach, and I have successfully used it myself in customer projects. However, there are also lots of systems out there for which this approach would not be appropriate. In a future blog post (or possibly blog posts) we are going to look at other ways of making sure our domain events are not missed and how we can distribute them between systems. Stay tuned!
