Async Elixir Telemetry

christianalexander

Christian Alexander

Posted on February 22, 2024

Learn how to use Telemetry and GenServer in Elixir to wire up analytics without sacrificing app responsiveness.


One of the most fantastic abstractions used in many Elixir applications is provided by the Telemetry library. It allows libraries to emit arbitrary events that other code can subscribe to. This is similar to a Node.js event emitter, but global.

While this doesn't seem very important at first glance, the power of Telemetry comes from its vast adoption. Packages such as Phoenix (the popular web framework), Oban (the popular background job framework), and Finch (the HTTP request library used by Req) all emit events through Telemetry.

Typically, Telemetry is used to publish metrics through systems like Prometheus, StatsD, and CloudWatch. These solutions often rely on the telemetry_metrics library and consume numeric values included in many telemetry events.

Telemetry also tends to be used for request logging, since every Phoenix request emits a series of events containing the requested path, method, IP, and response status code. It's a really convenient way to plug in logging services without altering router and controller code.

The most important limitation to consider when using Telemetry can be found in its readme:

The handle_event callback of each handler is invoked synchronously on each telemetry:execute call. Therefore, it is extremely important to avoid blocking operations. If you need to perform any action that is not immediate, consider offloading the work to a separate process (or a pool of processes) by sending a message.

This brings me to the topic of today's post, but first I want to set some context.

Intent: Capture Product Analytics

Recently I've been working on a lightweight sprint point estimation tool using Phoenix LiveView. This tool is called Sprinty Points, and I think most agile teams will like it.

Sprinty Points doesn't yet have a database. Every user is anonymous and session state lives in GenServers. This has allowed me to build a very performant prototype really quickly without worrying about schema evolution.

Unfortunately, this architecture makes it hard for me to keep track of user metrics and the growth of the application. Without a users table, it's hard for me to know how many users I have. Without an events or sessions table, it's hard for me to get an estimate on usage.

To get some very basic metrics, I looked around and found an analytics tool with a generous free tier. It's called PostHog (not sponsored, just a fan). They have a very simple HTTP API through which I can post user registrations and custom events. These events can be turned into dashboards.

Since I wasn't very attached to any specific analytics application, I decided not to call their API directly from my controllers. Instead, I added some custom telemetry events to a few key controllers and wired up a custom Telemetry handler to send the events out.

Sample Code, Synchronous Approach

The first approach I took involved three parts: create a simple PostHog client module, wire up a simple Telemetry handler, and start emitting events from Phoenix controllers.

PostHog Client Module

The client module is a basic wrapper around their capture endpoint, using Req for HTTP and an environment variable for the API key.

defmodule Posthog do
  require Logger

  @base_url "https://app.posthog.com"

  def capture(event, distinct_id, properties \\ %{}) do
    api_key = api_key()
    body = %{event: event, distinct_id: distinct_id, api_key: api_key, properties: properties}

    if is_nil(api_key) do
      Logger.info("Posthog API key not set, skipping event: #{Jason.encode!(body)}")
      {:ok, nil}
    else
      base_request()
      |> Req.post(url: "/capture", json: body)
      |> transform_response()
    end
  end

  def capture_batch(entries) do
    api_key = api_key()
    body = %{batch: entries, api_key: api_key}

    if is_nil(api_key) do
      Logger.info("Posthog API key not set, skipping batch: #{Jason.encode!(body)}")
      {:ok, nil}
    else
      base_request()
      |> Req.post(url: "/capture", json: body)
      |> transform_response()
    end
  end

  defp base_request do
    Req.new(base_url: base_url())
  end

  defp transform_response({:ok, %{status: status} = response}) when status >= 400 do
    {:error, response}
  end

  defp transform_response(response), do: response

  defp api_key do
    Application.get_env(:points, :posthog_api_key)
  end

  defp base_url do
    case Application.get_env(:points, :posthog_api_url) do
      nil -> @base_url
      url -> url
    end
  end
end

The Synchronous Telemetry Handler

This handler just forwards whatever is sent under the [:posthog, :event] key to the capture function.

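defmodule Points.TelemetryHandlers.SimplePosthog do
  def attach() do
    # :telemetry.attach/4 requires a config argument; this handler doesn't need one.
    :telemetry.attach("posthog-events", [:posthog, :event], &__MODULE__.handle_event/4, nil)
  end

  def detach() do
    :telemetry.detach("posthog-events")
  end

  # Runs synchronously in the process that called :telemetry.execute.
  def handle_event([:posthog, :event], measure, _meta, _config) do
    Posthog.capture(measure.event, measure.distinct_id, Map.get(measure, :properties, %{}))
  end
end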

To register the handler, I just called Points.TelemetryHandlers.SimplePosthog.attach() in my application's start callback.

Sending an Event

With the plumbing in place, all I had to do was send a Telemetry event under the expected key.

Here's a snippet from the controller that creates a new session:

defmodule PointsWeb.SessionController do
  alias Points.SessionServer
  use PointsWeb, :controller

  def new(conn, _params) do
    session_id = generate_new_session_id()
    SessionServer.ensure_started(session_id)

    :telemetry.execute([:posthog, :event], %{
      event: "session_created",
      distinct_id: conn.assigns.current_user.id,
      properties: %{
        "session_id" => session_id
      }
    })

    redirect(conn, to: ~p"/sessions/#{session_id}")
  end
end

The value passed as the second argument to :telemetry.execute makes it to PostHog.

Reflecting on the Approach

The biggest problem with this naive solution is that the :telemetry.execute call caused a meaningful, inconsistent delay every time a user created a session. This is because the application had to wait for the registered event handler to execute before the application could proceed, and the event handler made a blocking call to an external service.
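To make the delay concrete, here is a minimal sketch that stands in for the blocking handler. It assumes a standalone script that pulls in the telemetry package via Mix.install; the SlowHandler module, the [:demo, :event] key, and the 200 ms sleep are hypothetical stand-ins for the real handler and its network call.

```elixir
# Assumption: run as a standalone script with network access to fetch :telemetry.
Mix.install([{:telemetry, "~> 1.0"}])

defmodule SlowHandler do
  # Hypothetical handler: the sleep stands in for a blocking HTTP request.
  def handle_event(_event, _measurements, _metadata, reply_to) do
    Process.sleep(200)
    send(reply_to, {:handled_in, self()})
  end
end

:ok = :telemetry.attach("slow-demo", [:demo, :event], &SlowHandler.handle_event/4, self())

# Time the execute call from the caller's point of view.
{micros, :ok} =
  :timer.tc(fn -> :telemetry.execute([:demo, :event], %{value: 1}, %{}) end)

handler_pid =
  receive do
    {:handled_in, pid} -> pid
  after
    0 -> nil
  end

:telemetry.detach("slow-demo")

IO.puts("execute took #{div(micros, 1000)} ms")
IO.puts("handler ran in the calling process: #{handler_pid == self()}")
```

The caller pays for the full sleep because the handler runs inside the process that called :telemetry.execute, which is exactly what happens to a Phoenix request process in the synchronous approach.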

Clearly, there must be a better way.

Making it Better: Async

As with many other problems faced in Elixir, the solution may involve a GenServer. Before I get into that, I want to make an important note: I didn't reach for a GenServer until I found that I had a need for a background process with persistent state. It's valuable to try a simple approach before introducing another process to the supervision tree.

Knowing that the synchronous event handler caused a real delay in the application, I chose to follow the advice from the Telemetry readme:

If you need to perform any action that is not immediate, consider offloading the work to a separate process (or a pool of processes) by sending a message.

The new architecture includes a handler that stores each event in memory and periodically flushes the received events to PostHog. Initially, I was going to use GenServer state to temporarily store events, but the handle_event function is called by the process that invokes :telemetry.execute. To get the event into the GenServer state, I'd have to use GenServer.cast, which would funnel every event through a single mailbox and create a bottleneck.

Instead of GenServer state, I realized a write-optimized ETS table would be a more appropriate temporary storage location. ETS is implemented natively within the BEAM virtual machine and scales much better than a single process.
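As a rough illustration of why a public ETS table suits this job, the sketch below has several processes write into one table at the same time without going through any owner process. The table name, the event shape, and the concurrency level are assumptions made for the demo.

```elixir
# A :public table can be written to by any process, not just its owner.
# :duplicate_bag keeps every row, even when timestamps collide.
table = :ets.new(:event_buffer, [:duplicate_bag, :public, {:write_concurrency, true}])

1..100
|> Task.async_stream(
  fn i ->
    # Each task runs in its own process and inserts directly into the table.
    :ets.insert(table, {System.system_time(:millisecond), %{event: "demo", n: i}})
  end,
  max_concurrency: 8
)
|> Stream.run()

count = :ets.info(table, :size)
IO.puts("buffered #{count} events")
```

No message is sent to the table's owner here, so the writers never queue up behind a single mailbox.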

The new Telemetry handler can be found below. To use it, I added Points.TelemetryReporters.Posthog to my application supervision tree. When it starts up, it attaches to the existing Telemetry event key and goes to work.

defmodule Points.TelemetryReporters.Posthog do
  use GenServer, shutdown: 2_000

  require Logger

  @default_publish_interval 10_000

  def start_link(opts) when is_list(opts) do
    GenServer.start_link(__MODULE__, opts)
  end

  @impl GenServer
  def init(opts) do
    name = opts[:name]
    table_id = create_table(String.to_atom("#{name}.Ets"))

    Process.flag(:trap_exit, true)

    attach(table_id)

    Logger.info("#{name} Started")

    {:ok, {name, table_id}, @default_publish_interval}
  end

  @impl GenServer
  def handle_info(:timeout, {name, table_id} = state) do
    now = DateTime.to_unix(DateTime.utc_now(), :millisecond)

    # Match spec: for each {timestamp, event} row with timestamp < now, return the event.
    events = :ets.select(table_id, [{{:"$1", :"$2"}, [{:<, :"$1", now}], [:"$2"]}])

    if events != [] do
      case Posthog.capture_batch(events) do
        {:ok, _} ->
          :ets.select_delete(table_id, [{{:"$1", :"$2"}, [{:<, :"$1", now}], [true]}])

        {:error, error} ->
          Logger.error("[#{name}] Failed to flush events to Posthog: #{inspect(error)}")
      end
    end

    {:noreply, state, @default_publish_interval}
  end

  @impl GenServer
  def terminate(reason, {name, table_id}) do
    Logger.warning("[#{name}] Stopped with reason #{inspect(reason)}")
    detach()

    Logger.info("[#{name}] Flushing final events to Posthog")
    events = :ets.select(table_id, [{{:"$1", :"$2"}, [], [:"$2"]}])

    if events != [] do
      case Posthog.capture_batch(events) do
        {:ok, _} -> Logger.info("[#{name}] Final events flushed to Posthog")
        {:error, _} -> Logger.error("[#{name}] Failed to flush final events to Posthog")
      end
    end

    :ets.delete(table_id)
  end

  def attach(table_id) do
    :telemetry.attach("posthog-events", [:posthog, :event], &__MODULE__.handle_event/4,
      table_id: table_id
    )
  end

  def detach() do
    :telemetry.detach("posthog-events")
  end

  def handle_event([:posthog, :event], measure, _meta, config) do
    table_id = config[:table_id]
    :ets.insert(table_id, {DateTime.to_unix(DateTime.utc_now(), :millisecond), measure})
  end

  defp create_table(name) do
    :ets.new(name, [:named_table, :duplicate_bag, :public, {:write_concurrency, true}])
  end
end
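The match specifications in handle_info can be hard to read at first. Here is a small sketch of the same select-then-delete pattern on a throwaway table, with made-up timestamps and atoms in place of real events, to show that only rows older than the cutoff are read and removed.

```elixir
table = :ets.new(:flush_demo, [:duplicate_bag, :public])
:ets.insert(table, [{100, :a}, {200, :b}, {300, :c}])

cutoff = 250

# Guard shared by both specs: the first tuple element ($1) is below the cutoff.
older = [{:<, :"$1", cutoff}]

# Return only the second tuple element ($2) of each matching row.
events = :ets.select(table, [{{:"$1", :"$2"}, older, [:"$2"]}])

# select_delete removes every row whose match spec body evaluates to true
# and returns how many rows were deleted.
deleted = :ets.select_delete(table, [{{:"$1", :"$2"}, older, [true]}])

remaining = :ets.tab2list(table)

IO.inspect(Enum.sort(events), label: "flushed")
IO.inspect(deleted, label: "deleted")
IO.inspect(remaining, label: "still buffered")
```

Using the same cutoff for both calls means a row written while the batch is in flight stays in the table for the next flush instead of being deleted unsent.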

Every ten seconds, this process checks the ETS table for new events and, if there are any, posts them to PostHog as a batch. This relies on the optional third element of a GenServer callback's return tuple: the timeout. If the GenServer receives no message within that many milliseconds, handle_info is called with the message :timeout. It's a great way to create a background loop in an Elixir application.
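Stripped of the PostHog details, the timeout loop can be sketched as follows. The Ticker module and its 50 ms interval are hypothetical and exist only to show the mechanism.

```elixir
defmodule Ticker do
  use GenServer

  @interval 50

  def start_link(parent), do: GenServer.start_link(__MODULE__, parent)

  @impl GenServer
  # The third element asks for a :timeout message if nothing arrives in time.
  def init(parent), do: {:ok, {parent, 0}, @interval}

  @impl GenServer
  def handle_info(:timeout, {parent, count}) do
    send(parent, {:tick, count + 1})
    # Returning the timeout again re-arms it, which is what forms the loop.
    {:noreply, {parent, count + 1}, @interval}
  end
end

{:ok, pid} = Ticker.start_link(self())

ticks =
  for _ <- 1..3 do
    receive do
      {:tick, n} -> n
    after
      1_000 -> :missed
    end
  end

GenServer.stop(pid)
IO.inspect(ticks, label: "ticks")
```

One thing to keep in mind with this design: any message arriving at the GenServer cancels the pending timeout, so the loop only ticks reliably when the process is otherwise idle. That holds for the reporter because events go to the ETS table rather than to the process mailbox.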

To avoid losing events when the server shuts down, exits are trapped and the terminate callback makes a final attempt to send events out. The shutdown value of 2_000 tells the supervisor to give the process up to two seconds to finish this cleanup before killing it. This behavior is described well in the Elixir docs.
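The shutdown behavior can be checked in isolation with a small sketch. The FlushOnExit module is hypothetical; it reports back to the calling process instead of flushing events, so the effect of trapping exits is visible.

```elixir
defmodule FlushOnExit do
  # The :shutdown option ends up in the child spec the supervisor uses.
  use GenServer, shutdown: 2_000

  def start_link(parent), do: GenServer.start_link(__MODULE__, parent)

  @impl GenServer
  def init(parent) do
    # Without trapping exits, the supervisor's shutdown signal would end the
    # process immediately and terminate/2 would not be called.
    Process.flag(:trap_exit, true)
    {:ok, parent}
  end

  @impl GenServer
  def terminate(reason, parent) do
    # The real reporter flushes its buffered events here.
    send(parent, {:flushed, reason})
  end
end

spec = FlushOnExit.child_spec(self())
{:ok, sup} = Supervisor.start_link([{FlushOnExit, self()}], strategy: :one_for_one)
Supervisor.stop(sup)

result =
  receive do
    {:flushed, reason} -> {:flushed, reason}
  after
    1_000 -> :not_called
  end

IO.inspect(spec.shutdown, label: "shutdown timeout (ms)")
IO.inspect(result, label: "terminate/2")
```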

Outcome

Now that events are buffered in memory through a write-optimized ETS table, I don't have to worry about the latency penalty of a blocking network call. The real beauty of this approach is that I didn't have to change any of my :telemetry.execute calls to get this performance improvement. Thanks to the Telemetry abstraction, it just works.

I'm able to measure the growth of the application I'm working on and may be motivated to continue working on it, knowing that it has a healthy user base that comes back sprint after sprint.
