Making background jobs more resilient by default

doliveirakn

Kyle d'Oliveira

Posted on July 21, 2022

When it comes to job processing, timing is everything. Running jobs in the background helps us remove load from the web servers handling our customers' requests. However, we also want the background jobs to run in a reasonable amount of time for our customers. But what if a customer added so many background jobs that they used all of the background worker resources?

Example queue

The jobs will be processed by available workers in the order they come in, which works great for the customer that added these jobs. But unfortunately for other accounts, their segments won't refresh until the backlog of all of those jobs is finished. This could be a problem for those accounts: while the system waits for the queue to clear, it would look like it isn't functioning properly.

We have solved this problem at Aha! by creating a module that:

  1. Batches separate jobs together
  2. Limits the runtime of any single job
  3. Limits parallel processing

When a job utilizes this new module, it becomes resilient to this problem by default.

Let's explore this with an example to show how to prevent this slowdown for all accounts. Let's say we have Accounts with various Segments (i.e., "groups"). Whenever we update a Segment, we want to refresh that segment. This process could take a bit of time so we will put it into a background job. We may end up with some controller code that looks like this:

class SegmentsController < ApplicationController
  def update
    segment = current_account.segments.find(params[:id])
    segment.update!(segment_params)
    SegmentRefresher.perform_later(segment)
    head :ok
  end
end

It is usually good to process what can be done immediately and then delay some of the more expensive work. This allows us to give immediate responses back to our customers and still perform the work that needs to be done. However, a customer could put thousands of SegmentRefresher jobs onto our background job queue and prevent other customers' jobs from running.

Batching separate jobs together

Typically, when a background job is enqueued via ActiveJob, the parameters for the job are passed in as arguments to perform_later. This isn't quite what we want in order to batch jobs together. Instead, we create a new method perform_batch_later that puts the arguments into a data store such as Redis from which the job can later retrieve them.

So previously the job code may have looked like the following:

class SegmentRefresher < ApplicationJob
  def perform(segment)
    # Refresh the segment
  end
end

We now have something that looks like this:

module BatchByAccount
  extend ActiveSupport::Concern

  class_methods do
    # Push the data into Redis and then enqueue the job
    def perform_batch_later(data)
      data.each_slice(100) do |slice|
        Redis.current.rpush(data_key, slice.map(&:id))
      end
      perform_later
    end

    # Defined without `self.` so that class_methods exposes it as a class method
    def data_key
      "SegmentRefresher:#{Account.current.id}"
    end
  end
end

class SegmentRefresher < ApplicationJob
  include BatchByAccount

  def perform
    segment_ids = Redis.current.lpop(self.class.data_key, 100)
    Segment.where(id: segment_ids).each do |segment|
      # Refresh the segment
    end
  end
end

The controller can now call SegmentRefresher.perform_batch_later with one or more Segments, and their ids will be stored in Redis. Later, the job will run and grab up to 100 of those segment ids at a time to process.
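
To see the batching behavior without a Redis server, here is a minimal plain-Ruby sketch. InMemoryList is a hypothetical stand-in that mimics only the rpush and lpop-with-count behavior used above; it is not part of redis-rb, and the account id 42 is made up. The real job would talk to Redis instead.

```ruby
# Hypothetical stand-in for a Redis list; it only mimics rpush and lpop(key, count).
class InMemoryList
  def initialize
    @lists = Hash.new { |hash, key| hash[key] = [] }
  end

  # Append one or more values to the tail of the list stored at key.
  def rpush(key, values)
    @lists[key].concat(Array(values))
    @lists[key].length
  end

  # Remove up to count values from the head of the list.
  # Returns nil when the list is empty, as Redis does for a missing key.
  def lpop(key, count)
    return nil if @lists[key].empty?

    @lists[key].shift(count)
  end
end

store = InMemoryList.new
key = "SegmentRefresher:42" # 42 is a made-up account id

# Two unrelated requests enqueue work without knowing about each other.
store.rpush(key, (1..150).to_a)
store.rpush(key, (151..230).to_a)

# A single job run later drains everything in batches of up to 100 ids.
batch_sizes = []
while (ids = store.lpop(key, 100))
  batch_sizes << ids.length
end

puts batch_sizes.inspect
```

The two pushes are merged into one queue per account, so the consumer sees two full batches and one partial batch regardless of how many callers added data.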

This technique can be really powerful. It lets multiple processes batch their data together without knowing about one another. Further, we can utilize different Redis methods to get slightly different behaviors. For example:

  1. Using rpush to add records and lpop to remove them will give us a first in/first out queue. We can use this when the order of the jobs is important.
  2. Using sadd to add records and spop to remove them will give us an unordered set. This means that duplicate data is automatically filtered out, which prevents unnecessary work from being done. However, it is unordered so the jobs may be processed in a different order than they were enqueued.
  3. Using a timestamp as the score with zadd to add records and zrangebyscore / zrem to remove them lets us create a delayed set, ordered by timestamp rather than by insertion. This is useful for actions we want to perform in the future. For example, we might want to perform an action five minutes after a customer stops interacting with an object.
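
The differences between these three structures can be sketched in plain Ruby. Nothing below calls Redis: drain_list, drain_set, and DelayedSet are hypothetical stand-ins that only imitate the semantics of the commands named in the list, and the clock value is made up.

```ruby
require "set"

# 1. List (rpush + lpop): first in, first out, and duplicates are kept.
def drain_list(ids)
  list = []
  ids.each { |id| list.push(id) }
  list.shift(list.length)
end

# 2. Set (sadd + spop): duplicates collapse and order is not guaranteed.
def drain_set(ids)
  set = Set.new
  ids.each { |id| set.add(id) }
  set.to_a
end

# 3. Sorted set (zadd + zrangebyscore + zrem): every member carries a score,
#    used here as the time at which the work becomes due.
class DelayedSet
  def initialize
    @run_at = {}
  end

  # Re-adding a member only moves its due time, like zadd on an existing member.
  def add(member, run_at)
    @run_at[member] = run_at
  end

  # Return and remove every member whose due time has passed.
  def pop_due(now)
    due = @run_at.select { |_member, run_at| run_at <= now }.keys
    due.each { |member| @run_at.delete(member) }
    due
  end
end

now = 1_000 # made-up clock value, in seconds
delayed = DelayedSet.new
delayed.add(:segment_a, now + 300) # a customer touched the segment
delayed.add(:segment_a, now + 600) # touched again, so the timer restarts
delayed.add(:segment_b, now - 10)  # already due

puts drain_list([3, 1, 3]).inspect
puts drain_set([3, 1, 3]).sort.inspect # sorted only to make the output stable
puts delayed.pop_due(now).inspect
```

The third case is what makes "five minutes after the customer stops interacting" work: each new interaction overwrites the score, so the member only becomes due once the interactions stop.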

Limiting the runtime of any single job

Now that we are batching data together, we want to limit how long a single job can run. To tackle this, we leveraged functionality from the job-iteration gem. This gem provides an interface where we define an enumerator and what to do on each iteration. The gem handles the rest.
Utilizing this, our job and module will now look like this:
(For ease of reading, the bit of code already shown has been removed.)

module BatchByAccount
  extend ActiveSupport::Concern

  class_methods do
    def perform_batch_later(data)
      # ...
    end

    def data_key
      # ...
    end
  end

  included do
    include JobIteration::Iteration
  end

  def build_enumerator(*)
    Enumerator.new do |yielder|
      # We will pull 100 records out of the queue at a time and yield that to the enumerator.
      # lpop returns nil once the list is empty, so check with present? rather than any?
      while (segment_ids = Redis.current.lpop(self.class.data_key, 100)).present?
        yielder.yield segment_ids, nil
      end
    end
  end

end

class SegmentRefresher < ApplicationJob
  include BatchByAccount

  def each_iteration(segment_ids)
    Segment.where(id: segment_ids).each do |segment|
      # Refresh the segment
    end
  end
end

Notice that the SegmentRefresher's perform method has been swapped for an each_iteration method.

As long as there is data in Redis, this job will continue to perform until it hits the time threshold defined by JobIteration.max_job_runtime (five minutes in our configuration; the gem applies no limit unless this is set). Once we hit the threshold, the job will be interrupted and will re-queue itself. This ensures that even if it takes a long time to refresh all of the segments, the job won't monopolize a worker.
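
The idea of a runtime budget can be illustrated with a small plain-Ruby loop. This is a simplified sketch, not job-iteration's actual code: run_with_budget and its clock parameter are hypothetical names, and the clock is injected only so the example can run with a fake clock instead of waiting five real minutes.

```ruby
# Simplified sketch of an interruptible loop; not job-iteration's implementation.
# clock is any callable that returns the current time in seconds.
def run_with_budget(batches, max_runtime:, clock:)
  started_at = clock.call
  processed = []

  batches.each do |batch|
    processed << batch # stand-in for each_iteration(batch)

    if clock.call - started_at >= max_runtime
      # Out of time: stop here and tell the caller to re-enqueue the job.
      return { processed: processed, requeue: true }
    end
  end

  { processed: processed, requeue: false }
end

# Fake clock that advances 120 seconds every time it is read.
ticks = 0
fake_clock = -> { ticks += 120 }

queue = [[1, 2], [3, 4], [5, 6], [7, 8]]
result = run_with_budget(queue.each, max_runtime: 300, clock: fake_clock)
puts result.inspect
```

Because the batches are pulled lazily, anything not yet processed when the budget runs out simply stays in the queue for the next run.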

Limiting parallel processing

Now that data is batched together and individual jobs are handling things in batches, we want to prevent the race conditions that come from multiple jobs running at once. We solved this by using the activejob-uniqueness gem.

With this gem, we can make the jobs unique. Duplicate jobs for a single account will be ignored. Because only one job runs per account, we also have to handle the race condition where that job finishes at the same moment that new data is added.

The resulting job ends up looking like this:

module BatchByAccount
  extend ActiveSupport::Concern

  class_methods do
    def perform_batch_later(data)
      # ...
    end

    def data_key
      # ...
    end
  end

  included do
    include JobIteration::Iteration
    unique :until_expired, lock_ttl: 5.minutes

    rescue_from(StandardError, with: :handle_error)

    on_shutdown do
      # Ensure that when we are interrupting the job, we clear the lock so that it can be re-queued
      lock_strategy.unlock(resource: lock_key)
    end

    on_complete do
      if Redis.current.llen(self.class.data_key) > 0
        # This is the race condition
        # If we are complete, but there is still data in the queue, we need to enqueue a new job to process it
        self.class.perform_later
      end
    end
  end

  def handle_error(exception)
    # Ensure we unlock the job on error
    lock_strategy.unlock(resource: lock_key)
    raise exception
  end

  # These arguments can be tweaked or overridden to lock on different criteria or allow
  # some amount of parallelism
  def lock_key_arguments
    [Account.current.id]
  end

  def build_enumerator(*)
    # ...
  end
end

class SegmentRefresher < ApplicationJob
  include BatchByAccount

  def each_iteration(segment_ids)
    # ...
  end
end
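
The interplay between the uniqueness lock and the on_complete check can be modeled in plain Ruby. TinyScheduler below is a hypothetical in-memory model with one lock and one list of ids; it does not use activejob-uniqueness or Redis, and it assumes the lock is held from enqueue until the job finishes.

```ruby
# Hypothetical in-memory model: one lock per account, one shared list of ids.
class TinyScheduler
  attr_reader :enqueued_runs

  def initialize
    @locked = false
    @pending = []
    @enqueued_runs = 0
  end

  # Mirrors perform_batch_later: always store the data, but only enqueue a
  # job when no job currently holds the lock.
  def add(ids)
    @pending.concat(ids)
    enqueue
  end

  def enqueue
    return false if @locked

    @locked = true
    @enqueued_runs += 1
    true
  end

  # Runs one job. late_arrivals simulates data that is pushed after the job
  # has drained the list but before it has released its lock.
  def run(late_arrivals: [])
    processed = @pending.shift(@pending.length)
    add(late_arrivals)             # ignored as a duplicate: the lock is still held
    @locked = false                # the job finishes and the lock is released
    enqueue unless @pending.empty? # the on_complete check closes the gap
    processed
  end
end

scheduler = TinyScheduler.new
scheduler.add([1, 2])
scheduler.add([3]) # no second job: one is already enqueued
first = scheduler.run(late_arrivals: [4])
second = scheduler.run
puts first.inspect, second.inspect, scheduler.enqueued_runs
```

Without the final check in run, id 4 would sit in the list until some later request happened to enqueue another job.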

Resiliency by default

The job itself barely changed but it is now more resilient. By creating some easy-to-reuse patterns, engineers can focus more on their own features instead of worrying about common resiliency problems. We can put energy into making the right choice easy for everyone.

The final code will look like this:

module BatchByAccount
  extend ActiveSupport::Concern

  class_methods do
    def perform_batch_later(data)
      data.each_slice(100) do |slice|
        Redis.current.rpush(data_key, slice.map(&:id))
      end
      # If the job is already enqueued or running, this will be a no-op
      perform_later
    end

    def data_key
      "SegmentRefresher:#{Account.current.id}"
    end
  end

  included do
    include JobIteration::Iteration
    unique :until_expired, lock_ttl: 5.minutes

    rescue_from(StandardError, with: :handle_error)

    on_shutdown do
      # Ensure that when we are interrupting the job, we clear the lock so that it can be re-queued
      lock_strategy.unlock(resource: lock_key)
    end

    on_complete do
      if Redis.current.llen(self.class.data_key) > 0
        # This is the race condition
        # If we are complete, but there is still data in the queue, we need to enqueue a new job to process it
        self.class.perform_later
      end
    end
  end

  def build_enumerator(*)
    Enumerator.new do |yielder|
      # lpop returns nil once the list is empty, so check with present? rather than any?
      while (segment_ids = Redis.current.lpop(self.class.data_key, 100)).present?
        yielder.yield segment_ids, nil
      end
    end
  end

  def handle_error(exception)
    # Ensure we unlock the job on error
    lock_strategy.unlock(resource: lock_key)
    raise exception
  end

  # These arguments can be tweaked or overridden to lock on different criteria or allow
  # some amount of parallelism
  def lock_key_arguments
    [Account.current.id]
  end

end

class SegmentRefresher < ApplicationJob
  include BatchByAccount

  def each_iteration(segment_ids)
    Segment.where(id: segment_ids).each do |segment|
      # Refresh the segment
    end
  end
end

Sign up for a free trial of Aha! Develop

Aha! Develop is a fully extendable agile development tool. Prioritize the backlog, estimate work, and plan sprints. If you are interested in an integrated product development approach, use Aha! Roadmaps and Aha! Develop together. Sign up for a free 30-day trial or join a live demo to see why more than 5,000 companies trust our software to build lovable products and be happy doing it.
