Mastering Large Backfill Migrations in Rails and Sidekiq


Jan Bajena

Posted on March 25, 2024


Migrating large datasets within a Rails application can be a daunting task. We've recently learned that the hard way at Productboard when trying to backfill data from a 230GB PostgreSQL table into a new one. It took us a few weeks, 2 database incidents, and a lot of nerves to get the task done. Was it perfect? Definitely not, but the upside of this process is that we've learned a lot, and now we can share it with you, dear reader :)

In this post, I'll explore effective strategies and best safety practices for tackling big data migrations using Rails and Sidekiq. Hopefully, it'll help you avoid repeating the same mistakes we made during our adventure.

Why Use Rails at All?

Some could argue whether using Rails is necessary at all if the only thing we need to do is extract the data from one PostgreSQL table into another. The thing is, this migration involved a lot of business logic that is well encapsulated in our application and would be time-consuming and error-prone to duplicate in SQL. So we quickly abandoned that idea in favor of loading the records in the application, doing the necessary transformations, and storing the data in the new format.

One additional argument for using Rails was the ability to use Sidekiq to parallelize the workload and process each of our DB tenants (called spaces in our case) in a separate job.

Rails Tips

ActiveRecord is a great pattern that abstracts a lot of DB complexity away from the programmer. However, when manipulating large data sets without a deep understanding of what's happening under the hood, it's easy to shoot yourself in the foot.

Below, I present a few tips that can help you avoid some head-scratching :)

Use DB Replica

In the first iteration, we ran our migration against the primary database, which serves our most important user flows - the ones involving writes. There's nothing worse than e.g. writing a long comment just to see the save fail when you press ENTER.

That's why in the next iteration we switched to a read replica for fetching the data.

ActiveRecord provides an easy way of querying data from read replicas. Follow the Rails guide to set up the connection to the secondary database and then wrap your selects in the connected_to method, as in the example below:

# Run your reads inside the block - ActiveRecord relations are lazy,
# so a scope returned from the block would still hit the primary when
# it's eventually loaded outside of it.
ActiveRecord::Base.connected_to(role: :reading, prevent_writes: true) do
  SourceModel.all.find_each do |record|
    # Process the record set as always...
  end
end

Use find_each / in_batches for fetching the data

This tip is quite basic for seasoned Rails developers, but still worth keeping in mind: when processing large amounts of records always make sure to query your tables in batches. You really don't want to run a single DB query for each of your 10 million records in the table ;)

Rails provides two methods for this purpose:

  • in_batches - loads records from the table in batches (1,000 records per batch by default) and yields each batch as a relation to the block
  • find_each - loads records from the table in batches and yields each record separately to the block

SourceModel.all.in_batches do |records|
  # Process batch of records at once
end

SourceModel.all.find_each do |record|
  # Process a single record
end

Warning: One commonly overlooked thing about these batch methods is that you can't apply a custom order to the result set - Rails ignores any ORDER BY and always iterates by primary key. I've been hit by this problem quite a few times already 🙈
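
Here's a quick sketch of the gotcha, using the same hypothetical SourceModel as above:

# The custom order below is silently dropped - find_each always
# iterates by primary key (Rails logs a warning about the ignored scope)
SourceModel.order(:created_at).find_each do |record|
  # Records arrive in primary key order, NOT created_at order
end

# Since Rails 6.1 you can at least flip the primary key direction
SourceModel.find_each(order: :desc) do |record|
  # Records arrive in descending primary key order
end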

Use Rails' insert_all methods to speed up writes

Similar to the read methods listed above, Rails supports batch inserts and upserts (since version 6.0.0).

insert_all allows you to bulk insert records, significantly speeding up the process compared to individual INSERT statements. This method is ideal for quickly populating the new table with transformed data.

One great thing about this method: if e.g. you're restarting the migration and some records are already present in a table with a unique index, insert_all will automatically skip the duplicate inserts. If you have multiple unique constraints on the same table, you can specify the desired one by providing the unique_by option.

models_data = [
  { first_name: "Foo", last_name: "Bar" },
  { first_name: "Eloquent Ruby", last_name: "Russ" }
]

DestinationModel.insert_all(models_data)

Warning: There are two caveats to using this method:

  1. The array you pass to insert_all (models_data in the example above) needs to have at least one element. Otherwise the call will crash.
  2. All the hashes in the array have to have the same set of keys, so even if one of your columns has a default value defined, you still have to provide that attribute in every hash.
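
To illustrate the unique_by option mentioned above, here's a short sketch. It assumes the destination table has a unique index on last_name - the index name below is hypothetical:

# Skip rows that collide on the unique index over last_name.
# unique_by accepts a column list or the index name:
DestinationModel.insert_all(
  models_data,
  unique_by: :index_destination_models_on_last_name
)

# And a guard against caveat 1 - never call insert_all with an empty array:
DestinationModel.insert_all(models_data) if models_data.any?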

Prevent N+1 queries when loading associated records

It often happens that your migration needs access to related records. For example, when migrating Comments you might also need their author information.

Naive implementation could look like this:

Comment.all.find_each do |comment|
  next if comment.author.inactive?

  # Other migration logic
end

Can you spot the problem already? Calling comment.author will fire a separate SQL query for each comment.

This can be solved by calling includes(:author) on your ActiveRecord query:

Comment.includes(:author).find_each do |comment|
  next if comment.author.inactive?

  # Other migration logic
end

Sometimes calling includes might not be possible, for example when comments and authors exist in different databases.

In such cases, you can simply fire a separate, explicit query to load the authors and use index_by(&:id) to speed up looking them up by ID in memory:

Comment.all.in_batches do |comments_batch|
  authors = User.where(id: comments_batch.map(&:author_id)).index_by(&:id)

  comments_batch.each do |comment|
    author = authors[comment.author_id]
    next if author.inactive?

    # Other migration logic
  end
end

Use select to Load Only Necessary Columns

Rails loads all the columns by default when querying a table. This can be unnecessarily heavy, e.g. when the table has text fields with potentially long contents.

When you only need certain fields from the database, use select to load just those columns. This reduces memory usage and speeds up the data loading process.

SourceModel.all.select(:id, :name).find_each do |model|
  # migration logic
end

Sidekiq Tips

When processing large data sets in Ruby, Sidekiq can be extremely helpful for splitting the work into smaller, parallel chunks. Below you can find a few tips that can help you make your migrations bulletproof.

Use push_bulk to schedule effectively

Sometimes you'll need to push lots of jobs to Sidekiq in order to process your migration. Scheduling each job with the perform_async method is inefficient, as it causes a lot of unnecessary round-trips to Redis.

Fortunately, Sidekiq has a way to push jobs in batches: the push_bulk method, shown in the example below:

class MigrationJob
  include Sidekiq::Job

  def perform(record_id)
  end
end

class LoaderWorker
  include Sidekiq::Job

  BATCH_SIZE = 1000

  def perform
    SourceModel.all.in_batches(of: BATCH_SIZE) do |batch|
      array_of_args = batch.map { |record| [record.id] }

      # Push 1000 jobs in one network call to Redis, saving 999 round trips
      Sidekiq::Client.push_bulk('class' => MigrationJob, 'args' => array_of_args)
    end
  end
end

It's also worth mentioning that in version 6.3.0 Sidekiq introduced the perform_bulk method.

The code below is equivalent to the one above, just much cleaner:

class LoaderWorker
  include Sidekiq::Job

  def perform
    SourceModel.all.in_batches(of: 1000) do |batch|
      array_of_args = batch.map { |record| [record.id] }

      MigrationJob.perform_bulk(array_of_args)
    end
  end
end

Use Sidekiq Batches to monitor the progress

When processing a huge volume of migration jobs, it might be difficult to track the overall progress. This is where Sidekiq batches (a Sidekiq Pro feature) come in handy. By scheduling jobs within a batch, you get visibility into the process early: you can easily see the percentage of completion and view all the failed executions in the Sidekiq dashboard.

You can also define a callback job to be performed once a batch finishes. Such a callback can, for example, send you a Slack notification saying whether the batch succeeded or failed, freeing you from constantly checking the dashboard.

In the snippet below you can find an example of a generic job that allows us to schedule a defined job class for each of the Productboard spaces:

module Sidekiq::Migrations
  class ScheduleForAllSpacesJob < ApplicationJob
    sidekiq_options(queue: :low)

    def perform(job_class, *arguments)
      batch = Sidekiq::Batch.new

      batch.description = "#{job_class}Batch"
      batch.on(:complete, Sidekiq::Migrations::ScheduleForAllSpacesJob::Callback, 'job_class' => job_class)
      batch.jobs do
        Space.all.select(:id).find_in_batches do |spaces|
          Sidekiq::Client.push_bulk('class' => job_class.constantize, 'args' => spaces.map { |space| [space.id] + arguments })
        end
      end
    end
  end
end

class Sidekiq::Migrations::ScheduleForAllSpacesJob::Callback
  def on_complete(status, options)
    Rails.logger.info "[#{self.class.name}] Batch completed: #{status.as_json}, #{options}"
  end
end

# And then use it like this:
Sidekiq::Migrations::ScheduleForAllSpacesJob.perform_async(
  'History::Sidekiq::Migrations::BackfillHistoryEntriesForFeaturesJob'
)

(Screenshot: a Sidekiq batch's progress in the dashboard)

Leverage Sidekiq::Iteration Gem

For processing large datasets in batches, the sidekiq-iteration gem is invaluable. It allows you to iterate over a table and process the records in manageable chunks.

What's great is that the gem takes care of handling job interruptions caused e.g. by deploys of a new version or DB connectivity issues. When that happens, sidekiq-iteration saves the current cursor position and reschedules the job.

All you need to do is include the SidekiqIteration::Iteration module, define the collection to iterate over (the build_enumerator method), and define your logic for processing a batch of records (the each_iteration method):

class BackfillSourceRecordsJob
  include Sidekiq::Worker
  include SidekiqIteration::Iteration

  # After 5 minutes the job will be rescheduled with the current cursor position
  self.max_job_runtime = 5.minutes

  def build_enumerator(space_id, cursor: nil)
    scope = SourceRecord.where(space_id:)

    active_record_batches_enumerator(scope, cursor:, batch_size: 1000)
  end

  def each_iteration(records, *)
    # Process a batch of records
  end
end

Then simply schedule the job as you'd schedule any other Sidekiq job:

BackfillSourceRecordsJob.perform_async(123)

Control the number of concurrently processed jobs

Splitting a big migration into smaller chunks is a great way to process the data in a reasonable time. Unfortunately, if not done carefully, it can slow your system down significantly.

When scheduling jobs using the standard perform_async, Sidekiq will start picking them up from the queue as fast as it can, filling all the available worker threads.

This is especially painful when your jobs execute heavy database queries or have long runtimes.

That's why it's reasonable to limit the number of concurrent migration jobs. Below you can find a few strategies that can help you achieve that.

Option 1: Use perform_in

If you know that each of your jobs takes a similar amount of time, you can easily split the execution into time windows by scheduling the jobs to run at specific times using perform_in.

PARALLEL_JOBS = 5
SPACE_IDS = [1, 2, 5245, ...]

perform_in = 0

SPACE_IDS.each_slice(PARALLEL_JOBS) do |space_ids|
  space_ids.each do |space_id| 
    History::Sidekiq::Migrations::BackfillHistoryEntriesForFeaturesJob.perform_in(
      perform_in.minutes, 
      space_id
    )
  end

  perform_in += 1
end

It's worth noting that push_bulk also supports the at option, so the snippet above could look like this:

PARALLEL_JOBS = 5

perform_in = 0

SPACE_IDS.each_slice(PARALLEL_JOBS) do |space_ids|
  Sidekiq::Client.push_bulk(
    'class' => History::Sidekiq::Migrations::BackfillHistoryEntriesForFeaturesJob,
    'args' => space_ids.map { |space_id| [space_id] },
    # `at` expects epoch timestamps, one per job
    'at' => [perform_in.minutes.from_now.to_f] * space_ids.length
  )

  perform_in += 1
end

Option 2: Dedicated queue and instances

Another option to limit the concurrency is to schedule all your migration jobs into a separate queue dedicated only to your task, and to spin up Sidekiq server instances (e.g. via a dedicated Kubernetes deployment) configured to process only that queue.

# config/sidekiq-migration.yml file:
:queues:
  - "migration"

In config/initializers/sidekiq.rb:

Sidekiq.configure_server do |config|
  # ENV values are strings, so cast the concurrency to an Integer
  config.concurrency = ENV.fetch('SIDEKIQ_CONCURRENCY', 10).to_i

  # rest of your Sidekiq config
end

and then start your server, providing the desired concurrency and queue config:

SIDEKIQ_CONCURRENCY=10 bundle exec sidekiq -v -C config/sidekiq-migration.yml

Option 3: Use Sidekiq::Limiter

In our case each of the migration jobs had a different execution time, so using perform_in wasn't an option. We also didn't want to deal with introducing new Kubernetes deployments, so we ended up with a solution built around Sidekiq::Limiter.concurrent from Sidekiq Enterprise.

The idea is that each job's data-processing section is wrapped in a Sidekiq::Limiter. At the same time, the main "scheduler" job is guarded by the same limiter key, so it only schedules a new job when it sees a free "spot" in the limiter.

We had to "hack" the sidekiq-iteration gem slightly to make this idea work, but in the end it served its purpose really well.
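
Before diving into the full jobs, here's a minimal sketch of the concurrent limiter API on its own (the limiter name and size below are arbitrary):

# Allow at most 5 concurrent holders of the 'heavy_migration' slot
# across all Sidekiq processes. With wait_timeout: 0 the call raises
# Sidekiq::Limiter::OverLimit immediately when no slot is free.
limiter = Sidekiq::Limiter.concurrent('heavy_migration', 5, wait_timeout: 0)

limiter.within_limit do
  # Throttled work goes here
end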

Here's what the "scheduler" job that enqueues the jobs for each of the Productboard spaces looks like:

class ScheduleForAllSpacesThrottledJob < ApplicationJob
  include SidekiqIteration::Iteration

  # sidekiq-iteration needs an object that responds to `backoff` when throttling
  ThrottleCondition = Struct.new(:backoff)

  MAX_WORKERS_COUNT = 5
  BACKOFF_SECONDS = 30

  self.max_job_runtime = 1.minute

  sidekiq_options(
    # These retries will only be caused by sidekiq-iteration, so we can retry indefinitely
    max_retries: 9999999
  )

  attr_reader :job_class

  def build_enumerator(job_class, cursor: nil)
    @job_class = job_class.constantize

    active_record_records_enumerator(Space.all.select(:id), cursor:, batch_size: MAX_WORKERS_COUNT)
  end

  def each_iteration(space, *)
    # Schedule a new job only if fewer than MAX_WORKERS_COUNT jobs are currently being processed
    within_limit do
      job_class.perform_async(space.id)
    end
  rescue Sidekiq::Limiter::OverLimit
    @over_limit = true
  end

  # This overrides a `sidekiq-iteration` method
  def find_throttle_condition
    return unless @over_limit

    ThrottleCondition.new(BACKOFF_SECONDS.seconds)
  end

  def within_limit
    Sidekiq::Limiter.concurrent(
      job_class::CONCURRENCY_KEY,
      job_class::MAX_WORKERS_COUNT,
      wait_timeout: 0
    ).within_limit do
      yield
    end
  end
end

And here's the definition of the job that processes data for a single space:

class BackfillHistoryEntriesJob < ApplicationJob
  CONCURRENCY_KEY = 'backfill_history_entries'.freeze
  MAX_WORKERS_COUNT = 5
  BACKOFF_SECONDS = 30

  include SidekiqIteration::Iteration

  self.max_job_runtime = 1.minute

  sidekiq_options(
    # These retries will only be caused by sidekiq-iteration, so we can retry indefinitely
    max_retries: 999999
  )

  attr_reader :space_id

  # We've overridden this method from `sidekiq-iteration`:
  def iterate_with_enumerator(*args)
    within_limit do
      super(*args)
    end
  rescue Sidekiq::Limiter::OverLimit
    # Setting the following two instance variables tells sidekiq-iteration to
    # re-enqueue the job and try processing again in a few seconds.
    @job_iteration_retry_backoff = BACKOFF_SECONDS.seconds
    @needs_reenqueue = true

    false
  end

  def build_enumerator(space_id, cursor: nil)
    @space_id = space_id

    active_record_batches_enumerator(
      Note.all.select(:id).where(space_id:),
      cursor:
    )
  end

  def each_iteration(records, *)
    # Migration logic here
  end

  def within_limit
    Sidekiq::Limiter.concurrent(
      CONCURRENCY_KEY,
      MAX_WORKERS_COUNT
    ).within_limit do
      yield
    end
  end
end

Option 4: Use Sidekiq::Limiter and control it through feature flags

The solution from the previous section did the job and prevented us from overloading the database. However, while monitoring the DB CPU and IOPS we noticed that the database was underutilised, so we made one more tweak. Instead of storing MAX_WORKERS_COUNT and BACKOFF_SECONDS in constants, we stored those values in variations of a LaunchDarkly flag and read them at runtime. This allowed us to dynamically adjust the job parameters according to the current system load.


Here's how the limiter definition looked after the changes:

def within_limit
  Sidekiq::Limiter.concurrent(
    CONCURRENCY_KEY, 
    max_workers_count
  ).within_limit do
    yield
  end
end

def backoff_seconds
  job_config[:backoff_seconds] || 5
end

def max_workers_count
  job_config[:workers] || 5
end

def job_config
  # Our own implementation of LaunchDarkly client 
  PbLaunchdarklyClient.global.variation(
    'main.multiplayer.history.backfill-configuration', 
    default_value: {}
  ).with_indifferent_access
end

In the screenshot below you can see how happy I was when I increased the workers count from 5 to 10 and the migration started picking up the pace 🎉

(Screenshot: the migration picking up the pace after the workers count was raised from 5 to 10)

Implement a Kill-Switch

It's a good practice to have a kill-switch mechanism in place to immediately halt the migration if something goes wrong.

We implemented our kill-switch as a LaunchDarkly flag. This little condition has already saved us a few times. Highly recommended!


def build_enumerator(job_class, cursor: nil)
  if PbLaunchdarklyClient::Global.flag?('main.multiplayer.history.backfill-kill-switch')
    Rails.logger.info("[#{self.class.name}][cursor:#{cursor}] Killswitch enabled. Skipping.")

    return [].to_enum
  end

  active_record_records_enumerator(Space.all.select(:id), cursor:)
end

In Conclusion

Successfully navigating large backfill migrations in Rails, as illustrated by our experience at Productboard, requires a blend of strategic planning, robust tool utilization, and a deep understanding of both Rails and Sidekiq. From grappling with a massive 230GB PostgreSQL table migration to overcoming database incidents and performance bottlenecks, our journey was full of challenges but ultimately led to valuable insights.

I hope this post was useful and will serve you as a pre-mortem to a never-existing post-mortem :)
