Alex de Sousa
Posted on February 20, 2020
When I started coding in Elixir (around 2016), I was working for a financial company. Our product automatically invested money in the Forex market by copying traders' actions (market orders) in real time. We had the following:
In words, our system:
- Subscribed to PostgreSQL for receiving trader actions.
-
Published to RabbitMQ for:
- Categorizing trader actions.
- And enqueuing trader actions in the proper queue.
- Subscribed to Redis for receiving updates on prices.
-
Subscribed to several RabbitMQ queues for:
- Receiving the categorized trader actions.
- And deciding whether it should open/close some market orders or not.
- Opened and closed market orders.
We needed to be able to communicate with three systems (PostgreSQL, RabbitMQ and Redis). However, in general, we only needed three actions:
-
subscribe/1
to a channel. -
publish/2
a message in a channel. -
unsubscribe/1
from a channel.
If we could generalize those three actions into an API, we could then implement three individual adapters for every system to handle the annoying stuff like disconnections, failures, resource management, protocols, etc.
Meet Yggdrasil
Handling subscriptions should be easy and, in an ideal world, we would only need to know where to connect and start receiving messages right away.
We shouldn't need to worry about secondary (yet relevant) things like disconnections, failures and managing resources.
Yggdrasil is an immense mythical tree that connects the nine worlds in Norse cosmology.
Yggdrasil was our pub-sub generalization. Using the strong foundations of Phoenix pub-sub library, we built an agnostic publisher/subscriber application that has:
- Multi node support.
- Simple API:
subscribe/1
,unsubscribe/1
andpublish/2
. - A
GenServer
wrapper for handling subscriber events easily. - A basic adapter for using Elixir message distribution.
- Fault-tolerant adapters for:
One API to rule them all
Yggdrasil's API is very simple:
- A process subscribes to
"my_channel"
:
iex> :ok = Yggdrasil.subscribe(name: "my_channel")
iex> flush()
{:Y_CONNECTED, %Yggdrasil.Channel{...}}
- A process (in this case the same process) publishes the message
"my message"
in"my_channel"
.
iex> :ok = Yggdrasil.publish([name: "my_channel"], "my message")
- The message should be in the mailbox of the subscriber process:
iex> flush()
{:Y_EVENT, %Yggdrasil.Channel{...}, "my message"}
- The subscriber can unsubscribe from the channel to stop receiving messages:
iex> :ok = Yggdrasil.unsubscribe(name: "my_channel")
iex> flush()
{:Y_DISCONNECTED, %Yggdrasil.Channel{...}}
flush()
cleans the IEx process mailbox. In general, receiving Yggdrasil messages should be the same as receiving messages when the sender usessend/2
.
Yggdrasil behaviour
Yggdrasil provides a behaviour
for writing subscribers easily. Following the previous example, the subscriber could be written as follows:
defmodule Subscriber do
use Yggdrasil
def start_link do
channel = [name: "my_channel"]
Yggdrasil.start_link(__MODULE__, [channel])
end
@impl true
def handle_event(_channel, message, _state) do
IO.inspect {:mailbox, message}
{:ok, nil}
end
end
This subscriber will print the message as it receives it:
iex> {:ok, _pid} = Subscriber.start_link()
iex> :ok = Yggdrasil.publish([name: "my_channel"], "my message")
{:mailbox, "my_message"}
An interesting side-effect is that now we can send messages to any process as long as they are subscribed to the right channel without needing to know the process PID or name.
Conclusion
Yggdrasil hides the complexity of a pub/sub and let's you focus in what really matters: messages.
Hope you found this article useful. Happy coding!
Cover image by Todd Quackenbush
Posted on February 20, 2020
Join Our Newsletter. No Spam, Only the good stuff.
Sign up to receive the latest update from our blog.