Implementing Message Outbox Pattern with Oban
calvinsadewa
Posted on February 8, 2020
Hey guys, today I want to share the Message Outbox pattern and how to implement it with Oban in Elixir.
I have used both extensively and hope they can benefit you too.
Problems
What happens if, as part of a database transaction, you need to send an email, a push notification, or some other outside communication?
Implemented naively, we could just send the email or push notification inside the transaction.
Imagine we have a transfer_money function which sends money from a sender to a receiver.
As part of the operation, transferring money also needs to send an email notification to the sender and the receiver.
Naively, we could make transfer_money a transaction which does the following:
- subtract the amount from the sender's balance
- send an email to the sender about the subtracted money
- add the amount to the receiver's balance
- send an email to the receiver about the added money
Here is a pseudo-code snippet implementing the transfer_money function in Elixir.
# model/payment.ex
defmodule Model.Payment do
  # Repo, Account and send_email/3 are assumed to be defined elsewhere in the application
  alias Repo
  import Ecto.Changeset, only: [change: 2]

  # Transfer money from sender to receiver.
  # As part of the transfer, we send email notifications.
  def transfer_money(%{
        sender: %Account{} = sender,
        receiver: %Account{} = receiver,
        amount: %Decimal{} = amount
      }) do
    Repo.transaction(fn ->
      Repo.update!(change(sender, balance: Decimal.sub(sender.balance, amount))) # operation 1
      send_transfer_email!(sender, "been debited by", amount) # operation 2
      Repo.update!(change(receiver, balance: Decimal.add(receiver.balance, amount))) # operation 3
      send_transfer_email!(receiver, "received", amount) # operation 4
    end)
  end

  # Send transfer email notification
  def send_transfer_email!(account, operation, amount) do
    to = account.email
    from = "payment@mycompany.com"

    body = """
    Your bank account #{account.account_number} has #{operation} #{amount}
    """

    {:ok, _} = send_email(to, from, body)
  end
end
Although this looks okay, there is a bug that can occur. Let me show a scenario:
- transfer_money function is initiated
- a transaction is opened
- the sender's balance is reduced by amount via a DB operation
- an email notification is sent to the sender saying that the balance has been reduced
- the receiver's balance is increased by amount via a DB operation
- sending the email notification to the receiver fails due to a transient network failure
- the transaction is rolled back
In the failure scenario above, even though the transfer_money transaction is rolled back,
the sender has already been sent an email saying that his or her bank account was debited.
While a mis-sent email may seem harmless (though it will probably shock the sender, who appears to have been charged twice if the transfer is retried),
the same kind of failure can happen in systems that publish to a message queue (RabbitMQ, Kafka, SQS).
There, a subscribing service would receive wrong data, which in turn may cause more problems.
The problem is that operations 2 and 4 cannot be rolled back. The Message Outbox pattern helps us with this.
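To make the failure concrete, here is a minimal sketch that simulates it without a database. Everything in it is hypothetical: NaiveDemo.transaction/2 stands in for Repo.transaction, the balances live in a plain map, and "sending an email" is just a message to the current process. It only aims to show that a rollback restores the balances but cannot take back a side effect that already happened.

```elixir
defmodule NaiveDemo do
  # Hypothetical stand-in for a database transaction:
  # if fun raises, the caller gets the original, untouched state back.
  def transaction(state, fun) do
    {:ok, fun.(state)}
  rescue
    error -> {:error, error, state}
  end

  # Hypothetical stand-in for a mailer. The "email" is a message sent to the
  # current process, a side effect that no rollback can undo.
  def send_email!(to, body, opts \\ []) do
    if opts[:fail], do: raise("network failure")
    send(self(), {:email_sent, to, body})
    :ok
  end

  def transfer(balances, amount) do
    transaction(balances, fn state ->
      state = Map.update!(state, :sender, &(&1 - amount))
      send_email!("sender@example.com", "debited #{amount}")
      state = Map.update!(state, :receiver, &(&1 + amount))
      # Simulate the transient network failure from the scenario above
      send_email!("receiver@example.com", "credited #{amount}", fail: true)
      state
    end)
  end
end

# The transfer fails and the balances are rolled back to their old values...
{:error, _reason, balances} = NaiveDemo.transfer(%{sender: 100, receiver: 0}, 30)
IO.inspect(balances)

# ...but the email to the sender has already left the system.
receive do
  {:email_sent, to, _body} -> IO.puts("already sent to #{to}")
after
  0 -> IO.puts("nothing was sent")
end
```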
Message Outbox
Message Outbox is a pattern in which we insert the intention to send a message into an outbox, in the same transaction as the rest of our changes.
The outbox is usually a table which is polled or listened to regularly by a worker. The worker sends any messages that are waiting to be sent.
With that in mind, let us patch transfer_money to:
- subtract the amount from the sender's balance
- insert an intent to send an email to the sender into the outbox
- add the amount to the receiver's balance
- insert an intent to send an email to the receiver into the outbox
We also need to implement a worker which polls or listens to our outbox and sends any pending email.
Here is a pseudo-code snippet implementing Message Outbox.
# model/payment.ex
defmodule Model.Payment do
  ...

  # Insert the intent to send a transfer email notification into the outbox.
  # SendMessage is assumed to be an Ecto schema backing the outbox table.
  def send_transfer_email!(account, operation, amount) do
    Repo.insert!(%SendMessage{account: account, operation: operation, amount: amount})
  end

  # Real implementation of sending email
  def real_send_transfer_email!(account, operation, amount) do
    to = account.email
    from = "payment@mycompany.com"

    body = """
    Your bank account #{account.account_number} has #{operation} #{amount}
    """

    {:ok, _} = send_email(to, from, body)
  end
end

# outbox/worker.ex
defmodule Outbox.Worker do
  # perform is called regularly by the worker, either via polling or listening
  def perform(%SendMessage{} = send_message) do
    Model.Payment.real_send_transfer_email!(
      send_message.account,
      send_message.operation,
      send_message.amount
    )
  end
end
With Message Outbox, the intent to communicate with the outside becomes part of the transaction, and is rolled back if the transaction fails.
However, implementing a worker which listens or polls for messages can be hard,
not to mention that we still need to handle failed sends and retry logic.
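As a rough illustration of both halves of the pattern, the sketch below keeps the balances and the outbox in one in-memory map, so a failed transaction discards the queued emails together with the balance changes, and a separate worker pass delivers what was committed. All names here (OutboxDemo, enqueue_email/3, deliver_pending/2) are hypothetical, and a real outbox would be a database table rather than a list.

```elixir
defmodule OutboxDemo do
  # Hypothetical stand-in for a database transaction:
  # if fun raises, the caller gets the original, untouched db back.
  def transaction(db, fun) do
    {:ok, fun.(db)}
  rescue
    error -> {:error, error, db}
  end

  # Recording the intent is just another write inside the transaction.
  def enqueue_email(db, to, body) do
    Map.update!(db, :outbox, &(&1 ++ [%{to: to, body: body, attempts: 0}]))
  end

  def transfer(db, amount, opts \\ []) do
    transaction(db, fn db ->
      db = update_in(db.balances.sender, &(&1 - amount))
      db = enqueue_email(db, "sender@example.com", "debited #{amount}")
      # Simulate a failure in the middle of the transaction
      if opts[:fail], do: raise("something went wrong")
      db = update_in(db.balances.receiver, &(&1 + amount))
      enqueue_email(db, "receiver@example.com", "credited #{amount}")
    end)
  end

  # One worker pass: delivered messages leave the outbox, failed ones stay
  # queued with an incremented attempt counter so a later pass can retry them.
  def deliver_pending(db, deliver_fun) do
    remaining =
      Enum.flat_map(db.outbox, fn message ->
        case deliver_fun.(message) do
          :ok -> []
          {:error, _reason} -> [%{message | attempts: message.attempts + 1}]
        end
      end)

    %{db | outbox: remaining}
  end
end

db = %{balances: %{sender: 100, receiver: 0}, outbox: []}

# A failed transfer leaves no trace, not even a queued email.
{:error, _reason, ^db} = OutboxDemo.transfer(db, 30, fail: true)

# A committed transfer queues both emails together with the balance changes.
{:ok, db} = OutboxDemo.transfer(db, 30)

# The first worker pass hits a transient failure for the receiver,
# so only that message stays in the outbox for a retry.
flaky = fn
  %{to: "receiver@example.com"} -> {:error, :timeout}
  _message -> :ok
end

db = OutboxDemo.deliver_pending(db, flaky)
IO.inspect(db.outbox)
```

Even in this toy version the worker has to track attempts and decide when to retry, which is the part a job queue can take off our hands.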
Oban
Oban is a transactional job queue for Elixir built on PostgreSQL. It handles many of the operations related to a job queue, such as enqueuing jobs,
distributing jobs to workers, handling job processing throughout the lifecycle, logging job status, and more. You can read more at https://hexdocs.pm/oban/1.0.0/Oban.html
Because Oban inserts jobs transactionally in PostgreSQL, we can use it as our outbox.
Using Oban, we will:
- Set up Oban in config.exs and add its migration (I left this part out, see https://hexdocs.pm/oban/1.0.0/Oban.html )
- Implement an Oban worker, Outbox.Email, which sends email for us by processing send_email jobs
- Modify send_transfer_email! to insert a send_email job for Outbox.Email
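For reference, a minimal setup might look roughly like the fragment below. It is only a sketch based on the Oban documentation linked above: :my_app, MyApp.Repo, the migration module name and the file paths are placeholders, and the exact options may differ between Oban versions, so check the docs for the version you use.

```elixir
# config/config.exs
import Config

# :my_app and MyApp.Repo are placeholder names for your application and Ecto repo.
# The :email queue matches the queue the worker will use, with 10 concurrent jobs.
config :my_app, Oban,
  repo: MyApp.Repo,
  queues: [email: 10]

# priv/repo/migrations/20200208000000_add_oban_jobs_table.exs (hypothetical file name)
defmodule MyApp.Repo.Migrations.AddObanJobsTable do
  use Ecto.Migration

  # Creates the oban_jobs table that serves as our outbox
  def up, do: Oban.Migrations.up()
  def down, do: Oban.Migrations.down(version: 1)
end

# lib/my_app/application.ex, inside start/2: start Oban after the repo
children = [
  MyApp.Repo,
  {Oban, Application.get_env(:my_app, Oban)}
]
```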
Let's modify our snippet to use Oban.
# model/payment.ex
defmodule Model.Payment do
  # Repo and Account are assumed to be defined elsewhere in the application
  alias Repo
  import Ecto.Changeset, only: [change: 2]

  # Transfer money from sender to receiver.
  # As part of the transfer, we queue email notifications.
  def transfer_money(%{
        sender: %Account{} = sender,
        receiver: %Account{} = receiver,
        amount: %Decimal{} = amount
      }) do
    Repo.transaction(fn ->
      Repo.update!(change(sender, balance: Decimal.sub(sender.balance, amount))) # operation 1
      send_transfer_email!(sender, "been debited by", amount) # operation 2
      Repo.update!(change(receiver, balance: Decimal.add(receiver.balance, amount))) # operation 3
      send_transfer_email!(receiver, "received", amount) # operation 4
    end)
  end

  # Insert the intent to send a transfer email notification
  def send_transfer_email!(account, operation, amount) do
    to = account.email
    from = "payment@mycompany.com"

    body = """
    Your bank account #{account.account_number} has #{operation} #{amount}
    """

    # Insert args as a job for Outbox.Email
    %{
      to: to,
      from: from,
      body: body,
      event: "send_email"
    }
    |> Outbox.Email.new()
    |> Oban.insert!()
  end
end
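# outbox/email.ex
defmodule Outbox.Email do
  use Oban.Worker, queue: :email

  # Process sending email.
  # perform/2 is the callback signature in Oban 1.0; Oban 2.0 and later
  # use perform(%Oban.Job{args: args}) instead.
  # send_email/3 is assumed to be defined elsewhere in the application.
  @impl Oban.Worker
  def perform(%{"event" => "send_email"} = args, _job) do
    {:ok, _} = send_email(args["to"], args["from"], args["body"])
  end
end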
It's pretty simple, isn't it? The args for an Oban worker are interesting, though.
When I insert args for Outbox.Email, I insert them as a map with atom keys. However, when I process args in the perform function, they have become a map with string keys.
This is because Oban serializes args to JSON when inserting the job into the database, and when the worker processes the job, args are parsed back from JSON.
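The sketch below imitates that effect without Oban or a JSON library, only to show why the worker has to match on string keys. ArgsDemo, json_round_trip/1 and handle/1 are hypothetical names, and json_round_trip/1 only mimics what a real JSON encode and decode does to the keys of a flat map.

```elixir
defmodule ArgsDemo do
  # Mimics what a JSON round trip does to the keys of a flat map:
  # atom keys come back as strings.
  def json_round_trip(args) do
    Map.new(args, fn {key, value} -> {to_string(key), value} end)
  end

  # Worker-side matching must therefore use string keys.
  def handle(%{"event" => "send_email", "to" => to}), do: {:ok, "would send to #{to}"}
  def handle(_other), do: {:error, :unknown_event}
end

args = %{to: "a@example.com", event: "send_email"}
stored = ArgsDemo.json_round_trip(args)

# The stored args match the string-key clause
IO.inspect(ArgsDemo.handle(stored))

# The original atom-key map does not match and falls through to the catch-all clause
IO.inspect(ArgsDemo.handle(args))
```

For the same reason, it is safest to put only JSON-friendly values (strings, numbers, booleans, lists, maps) into args.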
While Message Outbox is good, one weakness is that the transaction cannot depend on the result of the outside communication.
If your transaction really needs to depend on outside communication, you may consider implementing a two-phase (or three-phase) commit.
**Note: Oban is built upon PostgreSQL, so if you use MariaDB/MySQL you may need another approach, such as implementing polling with the cron-based Quantum scheduler. Rihanna is another job queue worth a look, but note that it is also PostgreSQL-backed.
**Special thanks to Nanda Girindratama and Rifki Adrian for their early review