Elixir Streams, Elasticsearch, & AWS S3

lob

Lob

Posted on August 1, 2022

Elixir Streams, Elasticsearch, & AWS S3

As Staff Engineer on Lob’s Address Verification team, I was recently tasked with standing up an endpoint for customers that supplied them with all the data we had stored for a particular zip code. We use Elasticsearch pretty heavily to store our data so this boiled down to querying ES by zip code and writing the data to Amazon S3. But both services have subtle limitations on how you read/write them: ES caps your result set to 10,000 records, and Amazon S3 insists that you write to your buckets in chunks of at least 4 Megabytes. This turned out to be a perfect use case for Elixir streams!

Elixir Streams

Before we dive into the nitty gritty of Elasticsearch and AWS S3, let’s first do a brief refresher on Elixir Streams.

What is a Stream? A Stream is a composable enumerable that is computed lazily. This is typically accomplished by maintaining a little bit of state that describes where you currently are in your enumerable and a method of computing the next item. A trivial example would be a Stream of natural numbers:

The gory details of how this is implemented under the hood is the topic for another day. You rarely need to get that low level — the Stream library has a wealth of functions similar to Enum that you can use as building blocks to construct your own Streams. And several Elixir library functions natively return streams: IO.stream will turn a file (or other source) into a Stream allowing you to process very large files without having to read them into memory first in their entirety. The lazy aspect of Streams means that the minimum amount of the Stream is computed that’s needed to generate your answer.

Imagine we want to find the 100th natural number that is palindromic and also divisible by 3:

What I find elegant (and efficient) about this solution, is that no number beyond the answer, 20202, will be computed. Neat!

If you’d like to dive in deeper, there’s a great section on the Elixir language website. The API documentation is also chock full of instructive examples.

Elasticsearch

Now onto Elasticsearch. The first thing to know is that ES caps the size of the result set at 10k items. After scrutinizing the documentation, it became clear that the original method that popped up on my Stack Overflow (Scroll API) had been deprecated and the new approved method was to use PointInTime IDs. PIT IDs essentially give you a snapshot of your index at one point in time so that you get a consistent view of your search results across multiple queries. The process is:

  • acquire a PIT for your index
  • perform your queries using the PIT (without the index — it’s now implicit in the PID)
  • delete the PIT

That last step is crucial — PITs are extremely resource hungry so it is vital that you delete them as soon as you’re done. I borked our staging ES in my early experiments with PITs because I wasn’t deleting them and the expiration parameter seemed to have no effect.

The other thing we’d like to do is abstract away the underlying PIT and paging mechanism for the end user so we provide an interface that just takes an ES query and generates a Stream of hits. Let’s start by creating and deleting the PID.

*Creating the PID is just a matter of requesting a PID associated with the index that we’re interested in running queries against.

Deleting a PID is a simple HTTP delete except that the PIDs are huge so you need to supply them in the body and not as URL params. Some HTTP libraries won’t let you do this. The HTTP spec is a little muddy on the matter and last I checked on Stack Overflow there was a healthy debate on the subject. This is why we use HTTPoison — it allows payloads on DELETE requests.

Now that the PIDs are sorted out, our next order of business is figuring out how to leverage them in our queries. Our basic query is enhanced with three extra parameters:

  • size of 10,000 (the maximum allowed by elastic search)
  • pit, a hash that contains %{id: <id>, keep_alive: “1m”}
  • sort, a hash that contains %{_shard_doc: “desc”}

(For sort, you need to provide something and _shard_doc is baked into every Elastic search index so that’s nice and portable.)

With our basic query down, we can focus on how to generate the sequence of queries that will extract all the items that satisfy our search criteria. The trick here is that we feed into the next query the value of our sort field from the last hit of the previous result, like so:

Armed with these two query-generating functions, we can code up Stream.iterator that will pull all the results out of our desired index:

We need a high-level function that takes the results from this function and produces a Stream of hits. But there’s a wrinkle — we have to delete the PIT when we are done with the Stream. But how will we know? The solution is to pass in a consumer that consumes the Stream and then we delete the PIT afterwards, like so:

But if our query returns less than 10k results, we don’t need the whole PID/sort/search_after machinery — one query will do the trick.

Now we need a top-level function that query the size of the result and chooses between stream_one or stream_many depending on the value, like so:

AWS S3

The library we use to access Amazon S3 is ex_aws_s3. It handles all the low-level details for us but it does have one requirement —the input data stream must be chunks of at least 4 Mbytes. To accomplish this, we use the Stream function chunk_while. This takes four inputs:

  • the enumerable to be chunked
  • the initial value of the accumulator
  • the step function
  • a finalizer function

Our first decision is what should the accumulator look like that gets passed forward by the step function? It obviously should contain the list of the items in the current chunk. But more subtly, it should also contain the size of the current chunk so we don’t waste resources recomputing it each time. So that gives us a two-element tuple containing a list and an integer.

Next, we turn our attention to the step function. It should check if the current size is greater than or equal to the desired chunk size. If it is, we should take the list of items from the accumulator and convert them into a chunk using convert; if not, we should add it to the current chunk (and update the size) using add_chunk.

What should add_chunk do? Just push the item onto the front of a list and increase the value of size by the current chunk’s size.

The behaviour of convert depends on whether we care about the order of the items in the chunk being preserved in the output because the items in the list will be in reverse order so need to be reversed. But if we don’t care, we can skip that transformation. Putting this all together gives us:

We can now combine all the components we have written to produce a simple controller action that requests an output stream of hits from our Elasticsearch section into 4M chunks that will satisfy the requirement of the ExAWS.S3 module:

And that wraps up our exploration of Elixir streams, Elasticsearch, and AWS S3. Hope you enjoyed it!

💖 💪 🙅 🚩
lob
Lob

Posted on August 1, 2022

Join Our Newsletter. No Spam, Only the good stuff.

Sign up to receive the latest update from our blog.

Related