Acme Pub Sub, an Elixir exercise

marciol

Marcio Lopes de Faria

Posted on August 23, 2021

Recently I was rejected by a company that uses Elixir extensively, after spending a lot of energy on the interview process, so I started thinking about ways to get something positive out of it. I completed the exercise they gave me, but one hour is a small time box for building something solid, and I suspect the way I finished it was not what they expected. So here I am, learning in public and attempting a different approach.

The exercise asks you to transform the echo server from the Elixir getting-started guide, https://elixir-lang.org/getting-started/mix-otp/task-and-gen-tcp.html#echo-server, into a broadcast pub-sub server.

My first approach was to use an Agent to hold all accepted sockets. When the server receives a message from one client, it iterates over all the other accepted sockets stored in the Agent and writes the message to each of them. Pretty simple!

These are the snippets of the changes that make this first attempt work.

# in lib/acme_pub_sub/application.ex
  def start(_type, _args) do
    port = String.to_integer(System.get_env("PORT") || "4040")

    children = [
      {Task.Supervisor, name: AcmePubSub.TaskSupervisor},
      %{
        id: AcmePubSub.ClientStorage,
        start: {Agent, :start_link, [fn -> [] end, [name: AcmePubSub.ClientStorage]]}
      }, # A supervised Agent was added to hold all client sockets
      Supervisor.child_spec({Task, fn -> AcmePubSub.accept(port) end}, restart: :permanent)
    ]

    opts = [strategy: :one_for_one, name: AcmePubSub.Supervisor]
    Supervisor.start_link(children, opts)
  end

# in lib/acme_pub_sub.ex

  defp loop_acceptor(socket) do
    {:ok, client} = :gen_tcp.accept(socket)
    Logger.info("Accepted new connection")

    {:ok, pid} = Task.Supervisor.start_child(AcmePubSub.TaskSupervisor, fn -> serve(client) end)

    # All clients are stored here.
    Agent.update(
      AcmePubSub.ClientStorage, 
      fn clients -> 
        [client | clients] 
      end
    )

    :ok = :gen_tcp.controlling_process(client, pid)

    loop_acceptor(socket)
  end

  defp serve(socket) do
    received_message = read_line(socket)
    Logger.info("received message #{received_message}")

    # get all clients
    clients = Agent.get(AcmePubSub.ClientStorage, &Function.identity/1)

    # broadcast the message to every client except the sender
    for client <- clients, socket != client do
      write_line(received_message, client)
    end

    # recurse as a tail call rather than in an `after` block, so that an
    # exception raised above ends this task instead of retrying forever
    serve(socket)
  end

This first version is on GitHub, under the first-version branch:

https://github.com/marciol/acme_pub_sub/tree/first-version

A better approach

It would be better, though, to encapsulate access to the client sockets so that only the process responsible for a socket reads from it and writes to it.

As an aside: writing tests for this kind of application is challenging, because all receiving test clients must be set up before the broadcast starts. Thanks to Elixir Tasks, it is possible to start all receiving clients asynchronously and collect the received messages directly in the calling test process and, with a little trick, keep all the moving parts synchronized.
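A minimal sketch of that testing idea, assuming a broadcast server is already listening on the given port in line mode. The module name `BroadcastProbe`, the function `run/4`, and the `{:ready, pid}` handshake are hypothetical names chosen for illustration; the handshake is only one possible synchronization trick and not necessarily the one used in the repository.

```elixir
defmodule BroadcastProbe do
  # Hypothetical test helper, not part of the original project.
  @connect_opts [:binary, packet: :line, active: false]

  # Connects `receiver_count` receiving clients plus one sender, sends
  # `message` from the sender, and returns the line each receiver got.
  def run(port, receiver_count, message, timeout \\ 5_000) do
    test_pid = self()

    # Each receiver runs in its own Task, so all of them connect and
    # block on recv concurrently.
    receivers =
      for _ <- 1..receiver_count do
        Task.async(fn ->
          {:ok, socket} = :gen_tcp.connect({127, 0, 0, 1}, port, @connect_opts)
          # Tell the test process that this client is connected.
          send(test_pid, {:ready, self()})
          {:ok, line} = :gen_tcp.recv(socket, 0, timeout)
          :gen_tcp.close(socket)
          line
        end)
      end

    # Wait until every receiver has reported in before broadcasting.
    for %Task{pid: pid} <- receivers do
      receive do
        {:ready, ^pid} -> :ok
      after
        timeout -> raise "receiver did not connect in time"
      end
    end

    {:ok, sender} = :gen_tcp.connect({127, 0, 0, 1}, port, @connect_opts)
    :ok = :gen_tcp.send(sender, message)

    # The Task results arrive in the calling (test) process.
    lines = Task.await_many(receivers, timeout)
    :gen_tcp.close(sender)
    lines
  end
end
```

One caveat with this sketch: a client being connected does not guarantee that the server has already registered it, so against a real server there is still a small window in which a broadcast could miss a receiver. A test could close that window with a short wait or a round trip before sending.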

With this working solution in place, the next step is to make it better by finding a way to broadcast messages without writing directly to other clients' sockets.

One interesting approach is to split the work between two kinds of servers per client: an input server that reads the messages its client sends and dispatches them to the other clients' output servers, and an output server that writes dispatched messages to its own client's socket.

The input server must know about its corresponding output server so that, when the client closes the socket, it can send a close signal to that output server.

# in lib/acme_pub_sub.ex

  defp loop_acceptor(socket) do
    {:ok, client_socket} = :gen_tcp.accept(socket)
    Logger.info("Accepted new connection")

    # the output server task
    {:ok, output_pid} =
      Task.Supervisor.start_child(
        AcmePubSub.TaskSupervisor,
        fn ->
          output_server(client_socket)
        end
      )

    # the input server task
    {:ok, input_pid} =
      Task.Supervisor.start_child(
        AcmePubSub.TaskSupervisor,
        fn ->
          input_server({client_socket, output_pid})
        end
      )

    # register the input/output pid pair of this connected client
    Agent.update(
      AcmePubSub.ConnectedClients,
      fn clients ->
        [{input_pid, output_pid} | clients]
      end
    )

    loop_acceptor(socket)
  end

  defp input_server({client_socket, output_pid}) do
    case read_line(client_socket) do
      {:ok, data} ->
        # it will dispatch the message to all other clients
        dispatch(data)
        input_server({client_socket, output_pid})

      {:error, :closed} ->
        # when closed, it will remove the client tasks
        Agent.update(
          AcmePubSub.ConnectedClients,
          fn clients ->
            List.delete(clients, {self(), output_pid})
          end
        )

        # it will send the close signal to the
        # corresponding output server
        send(output_pid, :close)

        :ok
    end
  end

  # dispatching function
  defp dispatch(data) do
    clients = Agent.get(AcmePubSub.ConnectedClients, &Function.identity/1)

    # for each input/output process, different from itself
    # it will dispatch the message
    for {input_pid, output_pid} <- clients,
        input_pid != self() do
      send(output_pid, data)
    end
  end


  # the output server
  defp output_server(client_socket) do
    receive do
      # it will close if the input server was closed
      :close ->
        :ok

      # receive the dispatched message, 
      # sending out to client
      data ->
        write_line(data, client_socket)
        output_server(client_socket)
    end
  end

An alternative would be to encapsulate all of this in a GenServer, which I leave as an exercise for the reader. I like this last solution because it shows how much is possible with just the Elixir primitives Agent and Task.
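As a starting point for that exercise, here is a rough sketch of what the GenServer variant might look like. Everything in it is an assumption made for illustration: the module name `AcmePubSub.Broker`, its functions, and the `{:broadcast, data}` message shape do not exist in the repository. The idea is that the GenServer replaces the Agent, and process monitors replace the manual removal of closed clients.

```elixir
defmodule AcmePubSub.Broker do
  # Hypothetical module: keeps a map of subscriber pid => monitor ref.
  use GenServer

  # Client API

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)

  # Registers `pid` (by default the caller) to receive broadcasts.
  def subscribe(broker, pid \\ self()), do: GenServer.call(broker, {:subscribe, pid})

  # Sends `data` to every subscriber except `from`.
  def broadcast(broker, data, from \\ self()) do
    GenServer.cast(broker, {:broadcast, from, data})
  end

  def subscribers(broker), do: GenServer.call(broker, :subscribers)

  # Server callbacks

  @impl true
  def init(:ok), do: {:ok, %{}}

  @impl true
  def handle_call({:subscribe, pid}, _from, subs) do
    if Map.has_key?(subs, pid) do
      {:reply, :ok, subs}
    else
      # Monitor the subscriber so it is removed when it exits.
      {:reply, :ok, Map.put(subs, pid, Process.monitor(pid))}
    end
  end

  def handle_call(:subscribers, _from, subs), do: {:reply, Map.keys(subs), subs}

  @impl true
  def handle_cast({:broadcast, from, data}, subs) do
    for {pid, _ref} <- subs, pid != from, do: send(pid, {:broadcast, data})
    {:noreply, subs}
  end

  @impl true
  def handle_info({:DOWN, _ref, :process, pid, _reason}, subs) do
    {:noreply, Map.delete(subs, pid)}
  end

  def handle_info(_msg, subs), do: {:noreply, subs}
end
```

With something like this in place, each output server would subscribe itself and match on `{:broadcast, data}`, while each input server would call `broadcast/3` with its paired output pid as `from`, so a client never receives its own messages.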

The final solution is on the main branch of this repo:

https://github.com/marciol/acme_pub_sub
