Mastering Large Backfill Migrations in Rails and Sidekiq
Jan Bajena
Posted on March 25, 2024
Migrating large datasets within a Rails application can be a daunting task. We've recently learned that the hard way at Productboard when trying to backfill data from a 230GB PostgreSQL table into a new one. It took us a few weeks, 2 database incidents, and a lot of nerves to get the task done. Was it perfect? Definitely not, but the upside of this process is that we've learned a lot, and now we can share it with you, dear reader :)
In this post, I'll explore effective strategies and best safety practices for tackling big data migrations using Rails and Sidekiq. Hopefully, it'll help you avoid repeating the same mistakes we made during our adventure.
Why Use Rails at All?
Some could argue whether using Rails is necessary at all if the only thing we need to do is to extract the data from one PostgreSQL table into another. The thing is, this migration involved a lot of business logic, which is very well encapsulated in our application and would be very time-consuming and error-prone to duplicate into SQL, so we quickly abandoned this idea in favor of loading the records in the application, doing necessary transformations, and storing the data in the new format.
One additional argument for using Rails was the ability to use Sidekiq to parallelize the workload and process each of our DB tenants (called spaces in our case) in a separate job.
Rails Tips
ActiveRecord is a great pattern which abstracts a lot of DB complexity away from the programmer. However, when manipulating large data sets without a deep understanding of what's happening under the hood, it's easy to shoot yourself in the foot.
Below, I present a few tips that can help you avoid some head-scratching :)
Use DB Replica
In the first iteration we ran our migration on the primary database, which serves the most important user flows - the ones involving writes. There's nothing worse than e.g. writing a long comment just to see that saving failed when pressing ENTER.
That's why in the next iteration we switched from the primary database to a read replica for fetching the data.
ActiveRecord provides an easy way of querying data from read replicas. Follow the Rails guide to set up the connection to the secondary database and then wrap your selects in the connected_to method, as in the example below:
ActiveRecord::Base.connected_to(role: :reading, prevent_writes: true) do
  # Relations are lazy - execute the queries inside the block, otherwise they'd hit the primary when the scope is iterated later
  SourceModel.all.find_each { |record| } # Process the record set as always...
end
Use find_each / in_batches for fetching the data
This tip is quite basic for seasoned Rails developers, but still worth keeping in mind: when processing large amounts of records, always make sure to query your tables in batches. You really don't want to load all 10 million records from the table into memory at once ;)
Rails provides two methods for this purpose:
- in_batches - loads records from the table in batches (by default 1000 records per batch) and yields each batch to the block
- find_each - loads records from the table in batches and yields each record separately to the block
SourceModel.all.in_batches do |records|
  # Process batch of records at once
end

SourceModel.all.find_each do |record|
  # Process a single record
end
Warning: One commonly overlooked thing about these batch methods is that YOU CAN'T SORT the result set. Rails will always use the primary key for iterating. I've been hit by this problem quite a few times already 🙈
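For example, the explicit ordering in the snippet below would simply be ignored (the created_at column is just a hypothetical example here):
# Rails logs a warning and iterates by primary key anyway
SourceModel.order(created_at: :desc).find_each do |record|
  # records arrive in id order, not created_at order
end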
Use Rails' insert_all methods to speed up writes
Similarly to the read methods listed above, Rails supports batch inserts and upserts (from version 6.0.0). insert_all allows you to bulk insert records, significantly speeding up the process compared to individual INSERT statements. This method is ideal for quickly populating the new table with transformed data.
One great thing about this method is that if, for example, you're restarting the migration and some records already exist in a table with a unique index, insert_all will automatically skip the duplicate inserts. If you have multiple unique constraints on the same table, you can specify the desired one by providing the unique_by option (see the second snippet below).
models_data = [
  { first_name: "Foo", last_name: "Bar" },
  { first_name: "Eloquent Ruby", last_name: "Russ" }
]

DestinationModel.insert_all(models_data)
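And if the destination table has several unique indexes, you can target a specific one - a small sketch, assuming (for illustration only) a unique index on last_name:
# Only rows conflicting with the unique index on last_name are skipped
DestinationModel.insert_all(models_data, unique_by: :last_name)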
Warning: There are two caveats to using this method:
- Keep in mind that the array (models_data in the example above) you're passing to insert_all needs to have at least 1 element. Otherwise the call will crash.
- All the hashes in the array have to have the same set of attributes, so even if e.g. one of your columns has a default value defined, you'll still have to provide that attribute in every hash (see the snippet below).
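To illustrate the second caveat - a call like this one raises an error because the key sets differ, even if last_name had a database default:
DestinationModel.insert_all([
  { first_name: "Foo", last_name: "Bar" },
  { first_name: "Baz" } # missing :last_name - ActiveRecord will raise here
])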
Prevent N+1 queries when loading associated records
Often it happens that in your migrations you need access to multiple related records. For example, when migrating Comments you might also need their author information.
A naive implementation could look like this:
Comment.all.find_each do |comment|
  next if comment.author.inactive?
  # Other migration logic
end
Can you spot the problem already? Calling comment.author will fire a separate SQL query for each comment. This can be solved by calling includes(:author) on your ActiveRecord query:
Comment.all.includes(:author).find_each do |comment|
  next if comment.author.inactive?
  # Other migration logic
end
Sometimes calling includes might not be possible, for example when comments and authors exist in different databases. In such cases, you can simply fire a separate, explicit query to load the authors and use index_by(&:id) to speed up accessing them by ID in memory:
Comment.all.in_batches do |comments_batch|
  authors = User.where(id: comments_batch.map(&:author_id)).index_by(&:id)
  comments_batch.each do |comment|
    author = authors[comment.author_id]
    next if author.inactive?
    # Other migration logic
  end
end
Use select to Load Only Necessary Columns
Rails by default loads all the columns when querying the table. It can be unnecessarily heavy, e.g. when the table has text fields with potentially long contents. When you only need certain fields from the database, use select to load just those columns. This reduces memory usage and speeds up the data loading process.
SourceModel.all.select(:id, :name).find_each do |model|
  # migration logic
end
Sidekiq Tips
When processing large data sets in Ruby, Sidekiq can be extremely helpful for splitting the work into smaller, parallel chunks. Below you can find a few tips that can help you make your migrations bulletproof.
Use push_bulk to schedule effectively
Sometimes you'll need to push lots of jobs to Sidekiq in order to process your migration. Scheduling each job using the perform_async method is inefficient, as it causes a lot of unnecessary round trips to Redis.
Fortunately, Sidekiq has a way to push jobs in batches. You can do it by using the push_bulk method, as in the example below:
class MigrationJob
  include Sidekiq::Job

  def perform(record_id)
    # Migration logic for a single record
  end
end

class LoaderWorker
  include Sidekiq::Job

  SIZE = 1000

  def perform
    # Assume we want to create a job for each of 200,000 database records
    SourceModel.all.in_batches(of: SIZE) do |batch|
      array_of_args = batch.map { |record| [record.id] }
      # Push 1000 jobs in one network call to Redis, saving 999 round trips
      Sidekiq::Client.push_bulk('class' => MigrationJob, 'args' => array_of_args)
    end
  end
end
It's also worth mentioning that in version 6.3.0 Sidekiq introduced the perform_bulk method. The code below is equivalent to the one above and looks much cleaner:
class LoaderWorker
  include Sidekiq::Job

  def perform
    SourceModel.all.in_batches(of: 1000) do |batch|
      array_of_args = batch.map { |record| [record.id] }
      MigrationJob.perform_bulk(array_of_args)
    end
  end
end
Use Sidekiq Batches to monitor the progress
When processing a huge volume of migration jobs it might be difficult to track the overall progress. This is where Sidekiq batches (a Sidekiq Pro feature) come in handy. By scheduling jobs within a batch, you can later easily see the percentage of completion and view all the failed executions in the Sidekiq dashboard.
You can also define a callback job to be performed once the batch is finished. Such a callback job can, for example, send you a Slack notification saying whether the batch succeeded or failed, freeing you from constantly checking the Sidekiq dashboard.
Batches are especially useful for large migrations, as they provide visibility into the process and help identify issues early.
In the snippet below you can find an example of a generic job that allows us to schedule a defined job class for each of the Productboard spaces:
module Sidekiq::Migrations
  class ScheduleForAllSpacesJob < ApplicationJob
    sidekiq_options(queue: :low)

    def perform(job_class, *arguments)
      batch = Sidekiq::Batch.new
      batch.description = "#{job_class}Batch"
      batch.on(:complete, Sidekiq::Migrations::ScheduleForAllSpacesJob::Callback, 'job_class' => job_class)

      batch.jobs do
        Space.all.select(:id).find_in_batches do |spaces|
          Sidekiq::Client.push_bulk('class' => job_class.constantize, 'args' => spaces.map { |space| [space.id] + arguments })
        end
      end
    end
  end
end

class Sidekiq::Migrations::ScheduleForAllSpacesJob::Callback
  def on_complete(status, options)
    Rails.logger.info "[#{self.class.name}] Batch completed: #{status.as_json}, #{options}"
  end
end
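As mentioned above, instead of just logging you can have the callback notify you directly. Here's a minimal sketch of such a variant, assuming a hypothetical SlackNotifier wrapper around your Slack webhook:
class Sidekiq::Migrations::ScheduleForAllSpacesJob::Callback
  def on_complete(status, options)
    # `status` is a Sidekiq::Batch::Status - `failures` is the number of failed jobs
    message = if status.failures.zero?
                "✅ #{options['job_class']} batch finished successfully (#{status.total} jobs)"
              else
                "❌ #{options['job_class']} batch finished with #{status.failures} failed jobs"
              end
    SlackNotifier.post(message) # hypothetical client - swap in your own Slack integration
  end
end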
# And then use it like this:
Sidekiq::Migrations::ScheduleForAllSpacesJob.perform_async(
  'History::Sidekiq::Migrations::BackfillHistoryEntriesForFeaturesJob' # passed as a string, since the job constantizes it
)
Leverage Sidekiq::Iteration Gem
For processing large datasets in batches, the sidekiq-iteration gem is invaluable. It allows you to iterate over a table and process the records in manageable chunks.
What's great is that the gem takes care of handling job interruptions caused e.g. by new version deploys or DB connectivity issues. When a situation like this takes place, sidekiq-iteration saves the iteration cursor position and reschedules the job.
What you need to do is include the SidekiqIteration::Iteration module, define the collection to iterate over (the build_enumerator method) and define your logic for processing a batch of records (the each_iteration method):
class BackfillSourceRecordsJob
  include Sidekiq::Worker
  include SidekiqIteration::Iteration

  # After 5 minutes the job will be rescheduled with the current cursor position
  self.max_job_runtime = 5.minutes

  def build_enumerator(space_id, cursor: nil)
    scope = SourceRecord.where(space_id:)
    active_record_batches_enumerator(scope, cursor:, batch_size: 1000)
  end

  def each_iteration(records, *)
    # Process a batch of records
  end
end
Then simply schedule the job as you'd schedule any other Sidekiq job:
BackfillSourceRecordsJob.perform_async(123)
Control the number of concurrently processed jobs
Splitting a big migration into smaller chunks is a great way to process the data in a reasonable time. Unfortunately, if not done carefully, it can slow your system down significantly.
When scheduling jobs using the standard perform_async, Sidekiq will start picking them up from the queue as fast as it can, filling all the available worker threads. If your jobs execute heavy database queries or have a long runtime, this can slow the rest of your system down significantly.
That's why it's reasonable to limit the number of concurrent migration jobs. Below you can find a few strategies that can help you achieve that.
Option 1: Use perform_in
If you know that each of your jobs takes a similar amount of time, you can easily split the execution into time windows by scheduling jobs to run at a specific time, using perform_in.
PARALLEL_JOBS = 5
SPACE_IDS = [1, 2, 5245, ...]

perform_in = 0
SPACE_IDS.each_slice(PARALLEL_JOBS) do |space_ids|
  space_ids.each do |space_id|
    History::Sidekiq::Migrations::BackfillHistoryEntriesForFeaturesJob.perform_in(
      perform_in.minutes,
      space_id
    )
  end
  perform_in += 1
end
It's worth noting that push_bulk also supports the at option, so the snippet above could also look like this:
PARALLEL_JOBS = 5

perform_in = 0
SPACE_IDS.each_slice(PARALLEL_JOBS) do |space_ids|
  Sidekiq::Client.push_bulk(
    'class' => History::Sidekiq::Migrations::BackfillHistoryEntriesForFeaturesJob,
    'args' => space_ids.map { |space_id| [space_id] },
    'at' => [perform_in.minutes.from_now.to_f] * space_ids.length # `at` expects numeric timestamps
  )
  perform_in += 1
end
Option 2: Dedicated queue and instances
Another option to limit the concurrency could be to schedule all your migration jobs into a separate queue dedicated only to your task and spin up Sidekiq server instances (e.g. via a dedicated Kubernetes deployment) configured to process that queue only.
# config/sidekiq-migration.yml file:
:queues:
- "migration"
In initializers/sidekiq.rb:
Sidekiq.configure_server do |config|
  config.concurrency = ENV['SIDEKIQ_CONCURRENCY'].to_i
  # rest of your Sidekiq config
end
and then start your Sidekiq server, providing the desired concurrency and queue config:
SIDEKIQ_CONCURRENCY=10 bundle exec sidekiq -v -C config/sidekiq-migration.yml
Option 3: Use Sidekiq::Limiter
In our case each of the migration jobs had a different execution time, so using perform_in wasn't an option. We also didn't want to deal with introducing new Kubernetes deployments, so in the end we settled on a solution built around Sidekiq::Limiter.concurrent from Sidekiq Enterprise.
The idea is that each job's data processing section is wrapped in a Sidekiq::Limiter. At the same time the main, "scheduler" job is also guarded using the same limiter key, so it'd only schedule a new job if it sees that there's a free "spot" in the limiter.
We had to "hack" the sidekiq-iteration gem slightly in order to make this idea work, but in the end it served its purpose really well.
Here's what the "scheduler" job that enqueues the jobs for each of the Productboard workspaces looks like:
class ScheduleForAllSpacesThrottledJob < ApplicationJob
  include SidekiqIteration::Iteration

  # Sidekiq-iteration needs an object that responds to `backoff` when throttling
  ThrottleCondition = Struct.new(:backoff)

  MAX_WORKERS_COUNT = 5
  BACKOFF_SECONDS = 30

  self.max_job_runtime = 1.minute

  sidekiq_options(
    # These retries will just be caused by sidekiq-iteration re-enqueues, so we can retry indefinitely
    max_retries: 9999999
  )

  attr_reader :job_class

  def build_enumerator(job_class, cursor: nil)
    @job_class = job_class.constantize
    active_record_records_enumerator(Space.all.select(:id), cursor:, batch_size: MAX_WORKERS_COUNT)
  end

  def each_iteration(space, *)
    # Schedule a new job only if there are fewer than MAX_WORKERS_COUNT jobs being processed currently
    within_limit do
      job_class.perform_async(space.id)
    end
  rescue Sidekiq::Limiter::OverLimit
    @over_limit = true
  end

  # This overrides a `sidekiq-iteration` method
  def find_throttle_condition
    return unless @over_limit

    ThrottleCondition.new(BACKOFF_SECONDS.seconds)
  end

  def within_limit
    Sidekiq::Limiter.concurrent(
      job_class::CONCURRENCY_KEY,
      job_class::MAX_WORKERS_COUNT,
      wait_timeout: 0
    ).within_limit do
      yield
    end
  end
end
And here's the definition of the job that processes data for a single space:
class BackfillHistoryEntriesJob < ApplicationJob
  CONCURRENCY_KEY = 'backfill_history_entries'.freeze
  MAX_WORKERS_COUNT = 5
  BACKOFF_SECONDS = 30

  include SidekiqIteration::Iteration

  self.max_job_runtime = 1.minute

  sidekiq_options(
    # These retries will just be caused by sidekiq-iteration re-enqueues, so we can retry indefinitely
    max_retries: 999999
  )

  attr_reader :space_id

  # We've overridden this method from `sidekiq-iteration`:
  def iterate_with_enumerator(*args)
    within_limit do
      super(*args)
    end
  rescue Sidekiq::Limiter::OverLimit
    # Setting the following two instance variables tells sidekiq-iteration to re-enqueue the job and try processing again in a few seconds
    @job_iteration_retry_backoff = BACKOFF_SECONDS.seconds
    @needs_reenqueue = true
    false
  end

  def build_enumerator(space_id, cursor: nil)
    @space_id = space_id
    active_record_batches_enumerator(
      Note.all.select(:id).where(space_id:),
      cursor:
    )
  end

  def each_iteration(records, *)
    # Migration logic here
  end

  def within_limit
    Sidekiq::Limiter.concurrent(
      CONCURRENCY_KEY,
      MAX_WORKERS_COUNT
    ).within_limit do
      yield
    end
  end
end
Option 4: Use Sidekiq::Limiter and control it through feature flags
The solution from the previous section did the job and prevented us from overloading the database. However, while monitoring the DB CPU and IOPS we noticed that the database was underutilised, so we made one tweak to that solution. Instead of storing MAX_WORKERS_COUNT and BACKOFF_SECONDS in constants, we stored those values in variations of a LaunchDarkly flag and read them at runtime. This allowed us to dynamically adjust the job parameters according to the current system load.
Here's how the limiter definition looked after the changes:
def within_limit
  Sidekiq::Limiter.concurrent(
    CONCURRENCY_KEY,
    max_workers_count
  ).within_limit do
    yield
  end
end

def backoff_seconds
  job_config[:backoff_seconds] || 5
end

def max_workers_count
  job_config[:workers] || 5
end

def job_config
  # Our own implementation of a LaunchDarkly client
  PbLaunchdarklyClient.global.variation(
    'main.multiplayer.history.backfill-configuration',
    default_value: {}
  ).with_indifferent_access
end
In the screenshot below you can see how happy I was when I increased the workers count from 5 to 10 and the migration started picking up the pace 🎉
Implement a Kill-Switch
It's a good practice to have a kill-switch mechanism in place to immediately halt the migration if something goes wrong.
We've implemented our kill-switch using a LaunchDarkly flag. This little condition saved us a few times already. Highly recommended!
def build_enumerator(job_class, cursor: nil)
  if PbLaunchdarklyClient::Global.flag?('main.multiplayer.history.backfill-kill-switch')
    Rails.logger.info("[#{self.class.name}][cursor:#{cursor}] Killswitch enabled. Skipping.")
    return [].to_enum
  end

  active_record_records_enumerator(Space.all.select(:id), cursor:)
end
In Conclusion
Successfully navigating large backfill migrations in Rails, as illustrated by our experience at Productboard, requires a blend of strategic planning, robust tool utilization, and a deep understanding of both Rails and Sidekiq. From grappling with a massive 230GB PostgreSQL table migration to overcoming database incidents and performance bottlenecks, our journey was full of challenges but ultimately led to valuable insights.
I hope this post was useful and will serve you as a pre-mortem to a post-mortem that never has to be written :)