Elixir Pubsub In Less Than 50 Lines

alexdesousa

Alex de Sousa

Posted on March 26, 2020

Elixir Pubsub In Less Than 50 Lines

:pg2 is a mostly unknown, but powerful Erlang module. It provides an API for creating process groups.

Process Group

So, what's a process group? Well... it's a group of Erlang/Elixir processes.

Perhaps, the correct question would be, why do we care about process groups? Well, process groups are the foundation for publisher-subscribers (pubsubs for short).

PG2

Understanding :pg2 API and how it relates to a pubsub API will make it easier to understand:

  • Every process group is a channel e.g. a group called :my_channel is created:
   iex> :pg2.create(:my_channel)
   :ok
  • Every process in a group is a subscriber e.g. self() is part of :my_channel group:
   iex> :pg2.join(:my_channel, self())
   :ok
  • A publisher can send/2 messages to a channel e.g. the publisher gets all the members of the group :my_channel and sends "Some message":
   iex> members = :pg2.get_members(:my_channel)
   :ok
   iex> for member <- members, do: send(member, "Some message")
  • A subscriber will receive the messages in its mailbox:
   iex> flush()
   "Some message"
   :ok
  • A subscriber can unsubscribe from a channel e.g. self() leaves the group :my_channel:
   iex> :pg2.leave(:my_channel, self())
   :ok
  • A channel can be deleted:
   iex> :pg2.delete(:my_channel)
   :ok

And that's it! That's the API. And you know what's the best thing about it? It can work between connected nodes. Keep reading and you'll see :)

Message in cereal

Implementing a PubSub

A PubSub has three main functions:

  • subscribe/1 for subscribing to a channel:
   def subscribe(channel) do
     pid = self()

     case :pg2.get_members(channel) do
       members when is_list(members) ->
         if pid in members do
           :ok                     # It's already subscribed.
         else
           :pg2.join(channel, pid) # Subscribes to channel
         end

       {:error, {:no_such_group, ^channel}} ->
         :pg2.create(channel)      # Creates channel
         :pg2.join(channel, pid)   # Subscribe to channel
     end
   end
  • unsubscribe/1 for unsubscribing from a channel.
    def unsubscribe(channel) do
      pid = self()

      case :pg2.get_members(channel) do
        [^pid] ->
          :pg2.leave(channel, pid)   # Unsubscribes from channel
          :pg2.delete(channel)       # Deletes the channel

        members when is_list(members) ->
          if pid in members do
            :pg2.leave(channel, pid) # Unsubscribes from channel
          else
            :ok                      # It's already unsubscribed
          end

        _ ->
          :ok
      end
    end
  • publish/2 for sending a message to a channel.
   def publish(channel, message) do
     case :pg2.get_members(channel) do
       [_ | _] = members ->
         for member <- members, do: send(member, message)
         :ok

       _ ->
         :ok
     end
   end

For a full implementation of PubSub you can check this gist.

I usually create a .iex.exs file in my $HOME folder and then run iex. You could do the same with the previous gist by doing the following:

~ $ PUBSUB="https://gist.githubusercontent.com/alexdesousa/4d592fe206cca17393affaefa4c8fd33/raw/4d84894f016bd9eef84bba647c77c62b9c9a6094/pub_sub.ex"
~ $ curl "$PUBSUB" -o .iex.exs
~ $ iex

It's that easy

Distributed PubSub

For our distributed experiment we'll need two nodes. My machine is called matrix and both nodes will be neo and trinity respectively:

  • :neo@matrix:
   alex@matrix ~ $ iex --sname neo
   iex(neo@matrix)1>
  • :trinity@matrix:
   alex@matrix ~ $ iex --sname trinity
   iex(trinity@matrix)1> Node.connect(:neo@matrix) # Connects both nodes

Now :neo@matrix can subscribe to :mainframe channel:

iex(neo@matrix)1> PubSub.subscribe(:mainframe)
:ok

And :trinity@matrix can send a message:

iex(trinity@matrix)2> PubSub.publish(:mainframe, "Wake up, Neo...")
:ok 

Note: Sometimes it takes a bit of time for nodes to synchronize their process groups, so you might need to publish/2 your message twice.

Finally, :neo@matrix should receive the message:

iex(neo@matrix)2> flush()
"Wake up, Neo..."
:ok

And that's it. A powerful pubsub in a few lines of code thanks to :pg2.

Follow the white rabbit

Conclusion

Erlang has several built-in hidden gems like :pg2 that make our lives easier.

gem

Happy coding!

Cover image by Nicolas Picard

💖 💪 🙅 🚩
alexdesousa
Alex de Sousa

Posted on March 26, 2020

Join Our Newsletter. No Spam, Only the good stuff.

Sign up to receive the latest update from our blog.

Related