Alex de Sousa
Posted on March 26, 2020
:pg2 is a little-known but powerful Erlang module. It provides an API for creating process groups.
Process Group
So, what's a process group? Well... it's a group of Erlang/Elixir processes.
Perhaps the better question is: why do we care about process groups? Well, process groups are the foundation for publisher-subscriber systems (pubsubs for short).
PG2
Seeing how the :pg2 API maps onto a pubsub API makes this easier to understand:
- Every process group is a channel, e.g. a group called :my_channel is created:
iex> :pg2.create(:my_channel)
:ok
- Every process in a group is a subscriber, e.g. self() joins the :my_channel group:
iex> :pg2.join(:my_channel, self())
:ok
- A publisher can send/2 messages to a channel, e.g. the publisher gets all the members of the group :my_channel and sends "Some message":
iex> members = :pg2.get_members(:my_channel) # Returns the list of member pids (here, only self())
iex> for member <- members, do: send(member, "Some message")
["Some message"]
- A subscriber will receive the messages in its mailbox:
iex> flush()
"Some message"
:ok
- A subscriber can unsubscribe from a channel, e.g. self() leaves the group :my_channel:
iex> :pg2.leave(:my_channel, self())
:ok
- A channel can be deleted:
iex> :pg2.delete(:my_channel)
:ok
And that's it! That's the API. And you know what's the best thing about it? It can work between connected nodes. Keep reading and you'll see :)
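Outside of iex there is no flush(), so a subscriber reads its mailbox with receive. Here is a minimal sketch of the publisher and subscriber steps using plain processes. It does not call :pg2 at all: the members list is built by hand as a stand-in for :pg2.get_members/1, and the Mailbox module name is hypothetical.

```elixir
defmodule Mailbox do
  # Hypothetical helper: waits for one message or until the timeout (in
  # milliseconds) expires, and tags the result so the caller can tell
  # which of the two happened.
  def next(timeout \\ 1_000) do
    receive do
      message -> {:ok, message}
    after
      timeout -> :timeout
    end
  end
end

# Stand-in for :pg2.get_members/1: the only member is this process.
members = [self()]

# The publisher step from the list above: send the message to every member.
for member <- members, do: send(member, "Some message")

IO.inspect(Mailbox.next())   # prints {:ok, "Some message"}
IO.inspect(Mailbox.next(10)) # the mailbox is empty now, so this prints :timeout
```

The timeout matters in real subscribers: a bare receive blocks forever if nobody publishes.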
Implementing a PubSub
A PubSub has three main functions:
- subscribe/1 for subscribing to a channel:
def subscribe(channel) do
  pid = self()
  case :pg2.get_members(channel) do
    members when is_list(members) ->
      if pid in members do
        :ok # It's already subscribed.
      else
        :pg2.join(channel, pid) # Subscribes to channel
      end
    {:error, {:no_such_group, ^channel}} ->
      :pg2.create(channel) # Creates channel
      :pg2.join(channel, pid) # Subscribes to channel
  end
end
- unsubscribe/1 for unsubscribing from a channel:
def unsubscribe(channel) do
  pid = self()
  case :pg2.get_members(channel) do
    [^pid] ->
      :pg2.leave(channel, pid) # Unsubscribes from channel
      :pg2.delete(channel) # Deletes the channel
    members when is_list(members) ->
      if pid in members do
        :pg2.leave(channel, pid) # Unsubscribes from channel
      else
        :ok # It's already unsubscribed
      end
    _ ->
      :ok # The channel doesn't exist
  end
end
- publish/2 for sending a message to a channel:
def publish(channel, message) do
  case :pg2.get_members(channel) do
    [_ | _] = members ->
      for member <- members, do: send(member, message)
      :ok
    _ ->
      :ok # No subscribers or no such channel
  end
end
For a full implementation of PubSub, you can check this gist.
I usually create a .iex.exs file in my $HOME folder and then run iex. You could do the same with the previous gist by doing the following:
~ $ PUBSUB="https://gist.githubusercontent.com/alexdesousa/4d592fe206cca17393affaefa4c8fd33/raw/4d84894f016bd9eef84bba647c77c62b9c9a6094/pub_sub.ex"
~ $ curl "$PUBSUB" -o .iex.exs
~ $ iex
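One caveat if you try this on a recent Erlang: :pg2 was deprecated in OTP 23 and removed in OTP 24, so the functions above need an older release. As a rough sketch, the same three functions could be written on top of :pg, the module that replaced it. This is not the code from the gist: the PgPubSub name is hypothetical, and it assumes OTP 23 or later. In :pg, groups are created on the first join and vanish when the last member leaves, so there is nothing to create or delete.

```elixir
defmodule PgPubSub do
  # Hypothetical module name. Assumes OTP 23 or later, where :pg exists.

  # Starts the default :pg scope unless it is already running.
  def start do
    case :pg.start_link() do
      {:ok, _pid} -> :ok
      {:error, {:already_started, _pid}} -> :ok
    end
  end

  # Joins the calling process to the channel, at most once.
  def subscribe(channel) do
    pid = self()

    if pid in :pg.get_members(channel) do
      :ok
    else
      :pg.join(channel, pid)
    end
  end

  # Leaves the channel; leaving a channel we never joined is not an error.
  def unsubscribe(channel) do
    case :pg.leave(channel, self()) do
      :ok -> :ok
      :not_joined -> :ok
    end
  end

  # Sends the message to every member on every connected node.
  def publish(channel, message) do
    for member <- :pg.get_members(channel), do: send(member, message)
    :ok
  end
end

:ok = PgPubSub.start()
:ok = PgPubSub.subscribe(:my_channel)
:ok = PgPubSub.publish(:my_channel, "Some message")

receive do
  message -> IO.inspect(message) # prints "Some message"
after
  1_000 -> IO.puts("no message received")
end
```

For the distributed experiment, the scope has to be started on every node (here, by calling PgPubSub.start/0 on each one).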
Distributed PubSub
For our distributed experiment we'll need two nodes. My machine is called matrix and the nodes will be neo and trinity:
- :neo@matrix:
alex@matrix ~ $ iex --sname neo
iex(neo@matrix)1>
- :trinity@matrix:
alex@matrix ~ $ iex --sname trinity
iex(trinity@matrix)1> Node.connect(:neo@matrix) # Connects both nodes
Now :neo@matrix can subscribe to the :mainframe channel:
iex(neo@matrix)1> PubSub.subscribe(:mainframe)
:ok
And :trinity@matrix can send a message:
iex(trinity@matrix)2> PubSub.publish(:mainframe, "Wake up, Neo...")
:ok
Note: Sometimes it takes a bit of time for nodes to synchronize their process groups, so you might need to publish/2 your message twice.
Finally, :neo@matrix should receive the message:
iex(neo@matrix)2> flush()
"Wake up, Neo..."
:ok
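About the synchronization note above: instead of publishing twice by hand, the publisher could poll until the channel has at least one member. The sketch below is only one way to do that. WaitFor is a hypothetical helper that is not part of :pg2 or the gist, and it takes the lookup as a function so it does not depend on any particular process group module.

```elixir
defmodule WaitFor do
  # Hypothetical helper. Calls `fetch` (a zero-arity function that returns a
  # list of members) up to `attempts` times, sleeping `delay` milliseconds
  # after each try that finds nobody.
  def members(fetch, attempts \\ 10, delay \\ 100)

  def members(_fetch, 0, _delay), do: {:error, :no_members}

  def members(fetch, attempts, delay) when attempts > 0 do
    case fetch.() do
      [_ | _] = found ->
        {:ok, found}

      _ ->
        # Empty list or an error tuple such as {:error, {:no_such_group, _}}.
        Process.sleep(delay)
        members(fetch, attempts - 1, delay)
    end
  end
end

# Demo with stand-in lookups instead of a real process group:
IO.inspect(WaitFor.members(fn -> [:some_member] end)) # prints {:ok, [:some_member]}
IO.inspect(WaitFor.members(fn -> [] end, 3, 10))      # prints {:error, :no_members}
```

On :trinity@matrix the lookup would be fn -> :pg2.get_members(:mainframe) end, followed by PubSub.publish/2 once the result is {:ok, members}.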
And that's it. A powerful pubsub in a few lines of code thanks to :pg2.
Conclusion
Erlang has several built-in hidden gems like :pg2 that make our lives easier.
Happy coding!
Cover image by Nicolas Picard