Acme Pub Sub, an Elixir exercise
Marcio Lopes de Faria
Posted on August 23, 2021
Recently I was rejected by a company that uses Elixir ostensively after expending a lot of energy engaging in the interview process, so I was thinking of ways to get something positive. I completed a specific exercise, but one hour is a small time-box to do something solid, and I think that the way I finished it was not what they expected, so I'm here learning in public, attempting a different approach.
The exercise mandates to transform the echo server https://elixir-lang.org/getting-started/mix-otp/task-and-gen-tcp.html#echo-server, into a broadcast pub-sub server.
My first approach was to make use of an Agent to maintain all accepted sockets. When some server receives a message from one client, it iterates over all other accepted sockets persisted on the Agent, sending each message back for each client socket. Pretty Simple!
This are all the Snippets of changes made to make this first attempt work.
# in lib/acme_pub_sub/application.ex
def start(_type, _args) do
port = String.to_integer(System.get_env("PORT") || "4040")
children = [
{Task.Supervisor, name: AcmePubSub.TaskSupervisor},
%{
id: AcmePubSub.ClientStorage,
start: {Agent, :start_link, [fn -> [] end, [name: AcmePubSub.ClientStorage]]}
}, # An supervised Agent was included to hold all clients
Supervisor.child_spec({Task, fn -> AcmePubSub.accept(port) end}, restart: :permanent)
]
opts = [strategy: :one_for_one, name: AcmePubSub.Supervisor]
Supervisor.start_link(children, opts)
end
# in lib/acme_pub_sub.ex
defp loop_acceptor(socket) do
{:ok, client} = :gen_tcp.accept(socket)
Logger.info("Accepted new connection")
{:ok, pid} = Task.Supervisor.start_child(AcmePubSub.TaskSupervisor, fn -> serve(client) end)
# All clients are stored here.
Agent.update(
AcmePubSub.ClientStorage,
fn clients ->
[client | clients]
end
)
:ok = :gen_tcp.controlling_process(client, pid)
loop_acceptor(socket)
end
defp serve(socket) do
received_message = read_line(socket)
Logger.info("received message #{received_message}")
# get all clients
clients = Agent.get(
AcmePubSub.ClientStorage,
&Function.identity/1
)
# iterate over all clients, broadcasting all messages
for client <- clients,
socket != client do
write_line(received_message, client)
end
after
serve(socket)
end
This first version is on github under the first-version branch:
https://github.com/marciol/acme_pub_sub/tree/first-version
A better approach
But it'd be better to encapsulate the access to client sockets so that only the specific process can read and write to their sockets.
An aside note: Writing tests for this specific kind of application is challenging, because of the need to ensure that all receiving test clients are set up just before starting to broadcast. Thanks to Elixir Tasks it is possible to start all receiving clients asynchronously and get all received messages directly on the caller test process, and with a little trick, get all moving parts synchronized.
So, with this working solution, it'd be nice to make it even better, establishing a way to broadcast messages without writing directly on those sockets.
One interesting approach would be to separate two kinds of servers, an input server responsible for listening to all received messages and dispatch them to the correspondent output servers.
The input server must know about the corresponding output server, so that if the client closes the socket, it sends a close signal to output server.
# in lib/acme_pub_sub.ex
defp loop_acceptor(socket) do
{:ok, client_socket} = :gen_tcp.accept(socket)
Logger.info("Accepted new connection")
# the output server task
{:ok, output_pid} =
Task.Supervisor.start_child(
AcmePubSub.TaskSupervisor,
fn ->
output_server(client_socket)
end
)
# the input server task
{:ok, input_pid} =
Task.Supervisor.start_child(
AcmePubSub.TaskSupervisor,
fn ->
input_server({client_socket, output_pid})
end
)
# all connected input and output corresponding clients
Agent.update(
AcmePubSub.ConnectedClients,
fn clients ->
[{input_pid, output_pid} | clients]
end
)
loop_acceptor(socket)
end
defp input_server({client_socket, output_pid}) do
case read_line(client_socket) do
{:ok, data} ->
# it will dipatch the message to all clients
dispatch(data)
input_server({client_socket, output_pid})
{:error, :closed} ->
# when closed, it will remove the client tasks
Agent.update(
AcmePubSub.ConnectedClients,
fn clients ->
List.delete(clients, {self(), output_pid})
end
)
# it will send the close signal to the
# corresponding output server
send(output_pid, :close)
:ok
end
# dispatching function
defp dispatch(data) do
clients = Agent.get(AcmePubSub.ConnectedClients, &Function.identity/1)
# for each input/output process, different from itself
# it will dispatch the message
for {input_pid, output_pid} <- clients,
input_pid != self() do
send(output_pid, data)
end
end
# the output server
defp output_server(client_socket) do
receive do
# it will close if the input server was closed
:close ->
:ok
# receive the dispatched message,
# sending out to client
data ->
write_line(data, client_socket)
output_server(client_socket)
end
end
One alternative approach could be to encapsulate all this stuff in a GenServer, but it's an exercise for the reader, but I like this last solution because it shows how much it's possible with the Elixir primitives, Agent and Tasks.
The final solution is on the main branch of this repo:
Posted on August 23, 2021
Join Our Newsletter. No Spam, Only the good stuff.
Sign up to receive the latest update from our blog.