Async Elixir Telemetry
Christian Alexander
Posted on February 22, 2024
Learn how to use Telemetry and GenServer in Elixir to wire up analytics without sacrificing app responsiveness.
One of the most fantastic abstractions used in many Elixir applications is provided by the Telemetry library. It allows libraries to emit arbitrary events that other code can subscribe to. It's similar to a Node.js event emitter, but global to the entire application.
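To make that concrete, here's a minimal sketch of the emit/subscribe flow; the event name, handler ID, and payload are made up for illustration:

# Subscribe: the anonymous function runs for every matching event.
:telemetry.attach(
  "demo-handler",
  [:my_app, :demo],
  fn _event_name, measurements, metadata, _config ->
    IO.inspect({measurements, metadata}, label: "telemetry event")
  end,
  nil
)

# Emit: any process, anywhere in the application, can fire the event.
:telemetry.execute([:my_app, :demo], %{duration: 120}, %{user_id: 42})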
While this doesn't seem very important at first glance, the power of Telemetry comes from its vast adoption. Packages such as Phoenix (the popular web framework), Oban (the popular background job framework), and Finch (the HTTP request library used by Req) all emit events through Telemetry.
Typically, Telemetry is used to publish metrics through systems like Prometheus, StatsD, and CloudWatch. These solutions often rely on the telemetry_metrics library and consume numeric values included in many telemetry events.
Telemetry also tends to be used for request logging, since every Phoenix request emits a series of events containing the requested path, method, IP, and response status code. It's a really convenient way to plug in logging services without altering router and controller code.
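For example, with telemetry_metrics you declare which event measurements to aggregate, and a reporter (Prometheus, StatsD, CloudWatch) turns those declarations into metrics. A small sketch using Phoenix's standard router dispatch event:

import Telemetry.Metrics

# These definitions are handed to a reporter such as
# TelemetryMetricsPrometheus or Telemetry.Metrics.ConsoleReporter.
metrics = [
  counter("phoenix.router_dispatch.stop.duration"),
  summary("phoenix.router_dispatch.stop.duration", unit: {:native, :millisecond})
]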
The most important limitation to consider when using Telemetry can be found in the readme:

The handle_event callback of each handler is invoked synchronously on each telemetry:execute call. Therefore, it is extremely important to avoid blocking operations. If you need to perform any action that is not immediate, consider offloading the work to a separate process (or a pool of processes) by sending a message.
This brings me to the topic of today's post, but first I want to set some context.
Intent: Capture Product Analytics
Recently I've been working on a lightweight sprint point estimation tool using Phoenix LiveView. This tool is called Sprinty Points, and I think most agile teams will like it.
Sprinty Points doesn't yet have a database. Every user is anonymous and session state lives in GenServers. This has allowed me to build a performant prototype quickly, without worrying about schema evolution.
Unfortunately, this architecture makes it hard for me to keep track of user metrics and the growth of the application. Without a users table, it's hard for me to know how many users I have. Without an events or sessions table, it's hard for me to get an estimate on usage.
To get some very basic metrics, I looked around and found an analytics tool with a generous free tier. It's called PostHog (not sponsored, just a fan). They have a very simple HTTP API through which I can post user registrations and custom events. These events can be turned into dashboards.
Since I wasn't very attached to any specific analytics application, I decided not to call their API directly from my application code. Instead, I added some custom telemetry events to a few key controllers and wired up a custom Telemetry handler to send the events out.
Sample Code, Synchronous Approach
The first approach I took involved three parts: creating a simple PostHog client module, wiring up a simple Telemetry handler, and emitting events from Phoenix controllers.
PostHog Client Module
The client module is a basic wrapper around their capture endpoint, using Req for HTTP and an environment variable for the API key.
defmodule Posthog do
  require Logger

  @base_url "https://app.posthog.com"

  # Sends a single event to the PostHog capture endpoint.
  def capture(event, distinct_id, properties \\ %{}) do
    api_key = api_key()
    body = %{event: event, distinct_id: distinct_id, api_key: api_key, properties: properties}

    if is_nil(api_key) do
      Logger.info("Posthog API key not set, skipping event: #{Jason.encode!(body)}")
      {:ok, nil}
    else
      base_request()
      |> Req.post(url: "/capture", json: body)
      |> transform_response()
    end
  end

  # Sends a list of events to PostHog in a single request.
  def capture_batch(entries) do
    api_key = api_key()
    body = %{batch: entries, api_key: api_key}

    if is_nil(api_key) do
      Logger.info("Posthog API key not set, skipping batch: #{Jason.encode!(body)}")
      {:ok, nil}
    else
      base_request()
      |> Req.post(url: "/capture", json: body)
      |> transform_response()
    end
  end

  defp base_request do
    Req.new(base_url: base_url())
  end

  # Treat HTTP 4xx/5xx responses as errors.
  defp transform_response({:ok, %{status: status} = response}) when status >= 400 do
    {:error, response}
  end

  defp transform_response(response), do: response

  defp api_key do
    Application.get_env(:points, :posthog_api_key)
  end

  defp base_url do
    case Application.get_env(:points, :posthog_api_url) do
      nil -> @base_url
      url -> url
    end
  end
end
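With the client in place, a capture call is a one-liner (the values here are illustrative):

Posthog.capture("session_created", "user_123", %{"session_id" => "abc123"})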
The Synchronous Telemetry Handler
This handler takes whatever map is sent under the [:posthog, :event] key and forwards it to the capture function.
defmodule Points.TelemetryHandlers.SimplePosthog do
  def attach() do
    :telemetry.attach("posthog-events", [:posthog, :event], &__MODULE__.handle_event/4, nil)
  end

  def detach() do
    :telemetry.detach("posthog-events")
  end

  # Runs synchronously in the process that called :telemetry.execute.
  def handle_event([:posthog, :event], measure, _meta, _config) do
    Posthog.capture(measure.event, measure.distinct_id, Map.get(measure, :properties, %{}))
  end
end
To register the handler, I just called Points.TelemetryHandlers.SimplePosthog.attach() in my application's start/2 callback.
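For reference, that registration looks roughly like this in the application module (a sketch; the child list is elided):

defmodule Points.Application do
  use Application

  @impl true
  def start(_type, _args) do
    # Attach the handler before the endpoint starts serving requests.
    Points.TelemetryHandlers.SimplePosthog.attach()

    children = [
      # ...the Phoenix endpoint and friends...
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: Points.Supervisor)
  end
end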
Sending an Event
With the plumbing in place, all I had to do was send a Telemetry event under the expected key.
Here's a snippet from the controller that creates a new session:
defmodule PointsWeb.SessionController do
  use PointsWeb, :controller

  alias Points.SessionServer

  def new(conn, _params) do
    session_id = generate_new_session_id()
    SessionServer.ensure_started(session_id)

    :telemetry.execute([:posthog, :event], %{
      event: "session_created",
      distinct_id: conn.assigns.current_user.id,
      properties: %{
        "session_id" => session_id
      }
    })

    redirect(conn, to: ~p"/sessions/#{session_id}")
  end
end
The value passed as the second argument to :telemetry.execute makes it to PostHog.
Reflecting on the Approach
The biggest problem with this naive solution is that the :telemetry.execute call caused a meaningful, inconsistent delay every time a user created a session. The application had to wait for the registered event handler to finish before it could proceed, and that handler made a blocking HTTP call to an external service.
Clearly, there must be a better way.
Making it Better: Async
As with many other problems faced in Elixir, the solution may involve a GenServer. Before I get into that, I want to make an important note: I didn't reach for a GenServer until I found that I had a need for a background process with persistent state. It's valuable to try a simple approach before introducing another process to the supervision tree.
Knowing that the synchronous event handler caused a real delay in the application, I chose to follow the advice from the Telemetry readme:
If you need to perform any action that is not immediate, consider offloading the work to a separate process (or a pool of processes) by sending a message.
The new architecture includes a handler that stores each event in memory and periodically flushes the buffered events to PostHog. Initially, I was going to use GenServer state as the temporary store, but the handle_event callback is invoked by whichever process calls :telemetry.execute. To get an event into GenServer state, I'd have to use GenServer.cast, funneling every event through a single mailbox and creating a bottleneck.
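For contrast, here's a sketch of that rejected cast-based design (module boilerplate elided); every event in the system would serialize through this one mailbox:

# In the Telemetry handler, runs in the caller's process:
def handle_event([:posthog, :event], measure, _meta, _config) do
  GenServer.cast(Points.TelemetryReporters.Posthog, {:buffer, measure})
end

# In the GenServer, each buffered event is handled one at a time:
@impl GenServer
def handle_cast({:buffer, measure}, events) do
  {:noreply, [measure | events]}
end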
Instead of GenServer state, I realized a write-optimized ETS table would be a more appropriate temporary store. ETS is implemented natively in the BEAM virtual machine and scales much better than a single process.
The new Telemetry handler can be found below. To use it, I added Points.TelemetryReporters.Posthog to my application supervision tree. When it starts up, it attaches to the existing Telemetry event key and goes to work.
defmodule Points.TelemetryReporters.Posthog do
  use GenServer, shutdown: 2_000
  require Logger

  @default_publish_interval 10_000

  def start_link(opts) when is_list(opts) do
    GenServer.start_link(__MODULE__, opts)
  end

  @impl GenServer
  def init(opts) do
    name = opts[:name]
    table_id = create_table(String.to_atom("#{name}.Ets"))
    Process.flag(:trap_exit, true)
    attach(table_id)
    Logger.info("#{name} Started")
    {:ok, {name, table_id}, @default_publish_interval}
  end

  # Fires when the process has been idle for @default_publish_interval ms.
  @impl GenServer
  def handle_info(:timeout, {name, table_id} = state) do
    now = DateTime.to_unix(DateTime.utc_now(), :millisecond)

    # Select the values of all entries whose timestamp key is older than now.
    events = :ets.select(table_id, [{{:"$1", :"$2"}, [{:<, :"$1", now}], [:"$2"]}])

    if length(events) > 0 do
      case Posthog.capture_batch(events) do
        {:ok, _} ->
          # Delete only the entries that were just flushed.
          :ets.select_delete(table_id, [{{:"$1", :"$2"}, [{:<, :"$1", now}], [true]}])

        {:error, error} ->
          Logger.error("[#{name}] Failed to flush events to Posthog: #{inspect(error)}")
      end
    end

    {:noreply, state, @default_publish_interval}
  end

  # Called on shutdown because exits are trapped; makes a final flush attempt.
  @impl GenServer
  def terminate(reason, {name, table_id}) do
    Logger.warning("[#{name}] Stopped with reason #{inspect(reason)}")
    detach()
    Logger.info("[#{name}] Flushing final events to Posthog")

    events = :ets.select(table_id, [{{:"$1", :"$2"}, [], [:"$2"]}])

    if length(events) > 0 do
      case Posthog.capture_batch(events) do
        {:ok, _} -> Logger.info("[#{name}] Final events flushed to Posthog")
        {:error, _} -> Logger.error("[#{name}] Failed to flush final events to Posthog")
      end
    end

    :ets.delete(table_id)
  end

  def attach(table_id) do
    :telemetry.attach("posthog-events", [:posthog, :event], &__MODULE__.handle_event/4,
      table_id: table_id
    )
  end

  def detach() do
    :telemetry.detach("posthog-events")
  end

  # Runs in the calling process; a cheap, concurrent ETS insert.
  def handle_event([:posthog, :event], measure, _meta, config) do
    table_id = config[:table_id]
    :ets.insert(table_id, {DateTime.to_unix(DateTime.utc_now(), :millisecond), measure})
  end

  defp create_table(name) do
    :ets.new(name, [:named_table, :duplicate_bag, :public, {:write_concurrency, true}])
  end
end
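For completeness, wiring it into the supervision tree looks roughly like this sketch; the name option is what init/1 reads above:

# In the application's start/2 callback:
children = [
  # ...other children...
  {Points.TelemetryReporters.Posthog, name: Points.TelemetryReporters.Posthog}
]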
Every ten seconds, this process checks the ETS table for new events and conditionally posts them to PostHog as a batch. This makes heavy use of the third element of a GenServer callback's return tuple: the timeout. If no other message is processed by the GenServer within the specified number of milliseconds, handle_info is called with a :timeout message. It's a great way to create a background loop in an Elixir application.
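Stripped down to its essentials, the pattern looks like this sketch (flush/1 is a hypothetical work function):

@impl GenServer
def init(_opts) do
  # The third element schedules a :timeout message after 10 seconds of inactivity.
  {:ok, %{}, 10_000}
end

@impl GenServer
def handle_info(:timeout, state) do
  # Do the periodic work, then re-arm the timeout.
  {:noreply, flush(state), 10_000}
end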
To avoid losing events when the server shuts down, exits are trapped and the terminate callback makes a final attempt to send events out. The shutdown value of 2_000 indicates that the process wants two seconds to perform its cleanup task. This behavior is described well in the Elixir docs.
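Under the hood, the options passed to use GenServer end up in the generated child spec, roughly equivalent to:

def child_spec(opts) do
  %{
    id: __MODULE__,
    start: {__MODULE__, :start_link, [opts]},
    # The supervisor sends :shutdown and waits up to two seconds before
    # force-killing the process, giving terminate/2 time to run.
    shutdown: 2_000
  }
end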
Outcome
Now that events are buffered in memory in a write-optimized ETS table, I don't have to worry about the latency penalty of a blocking network call. The real beauty of this approach is that I didn't have to change any of my :telemetry.execute calls to get this performance improvement. Thanks to the Telemetry abstraction, it just works.
I'm able to measure the growth of the application I'm working on and may be motivated to continue working on it—knowing that it has a healthy user base that comes back sprint after sprint.