Rails: Background Job Done Right (Case 1)

kputra

K Putra

Posted on December 27, 2019

Rails: Background Job Done Right (Case 1)

If you continue to read this article, I assume that you know Ruby, OOP in Ruby, RoR, Active Record, Sidekiq, and ruby metaprogramming.

This article series is the implementation of sidekiq's best practice using real case from my work.

Perhaps you should read these articles before we start:
Coding Sidekiq Workers the Right Way
Improving Rails Performance with Better Background Jobs

Let's start our journey!

Table of Contents:
1. Problem
2. First Solution (bad)
3. Second Solution (better)
4. Third Solution (much better)
5. Flow Comparison
6. The Code
7. Final Word

1. Problem

Every 4 hours, I have to check if there is any Invoice that need to be checked to 3rd party API. If there is any, I have to do http request to 3rd API per record found. There are 6 types of Invoices. They are unique, so there are 6 models: InvoiceA..InvoiceF.

To do this, I have to use background job. And Sidekiq is my choice.

For the cron job, I use sidekiq-cron gems

2. First Solution (bad)

The example of my initial solution for the worker was:

# app/workers/scheduled/check_status_worker.rb
class Scheduled::CheckStatusWorker
  include Sidekiq::Worker

  def perform
    invoice_as = InvoiceA.where(status: "unfinished")
    check_status_of_(invoice_as)

    # looping b, c, d ,e

    invoice_fs = InvoiceF.where(status: "unfinished")
    check_status_of_(invoice_fs)
  end

  def check_status_of_(invoices)
    invoices.each do |invoice|
      invoice.check_status
    end
  end
end
Enter fullscreen mode Exit fullscreen mode

Notice:

I don't use sidekiq_options retry: false. Well, if I use it, then if an exception happen in the middle of the job, the rest of the records won't be checked to 3rd party. Let say I have 10,000 records, and exception occured in 500th record, then 9,500 records won't be checked.

I use each method for each models. If there are 10,000 records per model, so this worker will be running for a loong time.

I call check_status from the instance model. This is not good. I should make a class to checked to 3rd party API. Remember this

I put the logic in the worker. This is also not good.

In this solution, I have 6 models and 1 worker.

What can we improve?

3. Second solution (better)

This solution consist of 3 phases.

First, I'll move the logic from worker to class. In this phase, I'll have 6 models, 1 worker, and 1 class (new).

The code of the worker and the class:

# app/workers/scheduled/check_status_worker.rb
class Scheduled::CheckStatusWorker
  include Sidekiq::Worker

  def perform
    StartScheduledCheckStatus.new.call
  end
end

# app/lib/start_scheduled_check_status.rb
class StartScheduledCheckStatus
  INVOICE = %w(invoice_a invoice_b invoice_c invoice_d invoice_e invoice_f)

  def call
    INVOICE.each { |inv| send("check_#{inv}" }
  end

  private

  INVOICE.each do |method|
    define_method "check_#{method}" do
      clazz = method.camelize
      invoices = clazz.constantize.where(status: "unfinished")
      check_status_of_(invoices)
    end
  end

  def check_status_of_(invoices)
    invoices.each do |invoice|
      invoice.check_status
    end
  end
end
Enter fullscreen mode Exit fullscreen mode

In this phase, no improvement in perfomance. Only clean code.

Second, I'll move check_status from every models to stand alone classes. The check_status method for every models is simulated like this:

# app/models/invoice_a.rb
class InvoiceA < ApplicationRecord
  def check_status
    # action-1. build message
    # action-2. do http request
    # action-3. action based on the response
  end
end
Enter fullscreen mode Exit fullscreen mode

The url of the API is the same (https://url/check_status), so action-2 is the same across models. Action-3 is the same across models. But the messages (action-1) is unique for each models.

Knowing this, I'll make 1 class to do action-2 and action-3, and 6 classes to do action-1.

In this phase, I'll have 6 models, 1 worker, and 1 class + 7 classes (new).

The code will be look like these:

# app/lib/start_scheduled_check_status.rb
class StartScheduledCheckStatus
  ...
  private

  INVOICE.each do |method|
    define_method "check_#{method}" do
      clazz = method.camelize
      invoices = clazz.constantize.where(status: "unfinished")
      check_status_of_(invoices, clazz: clazz)
    end
  end

  def check_status_of_(invoices, clazz: clazz)
    invoices.each do |invoice|
      CheckStatus.new(invoice, clazz).call
    end
  end
end

# app/lib/check_status.rb
class CheckStatus
  def initialize(invoice, clazz)
    @invoice = invoice
    @clazz   = clazz
  end

  def call
    message = "#{@clazz}MessageBuilder".constantize.new(@invoice).call
    # action-2: do http request using message
    # action-3: action based on the response
  end
end

# app/lib/invoice_a_message_builder.rb
class InvoiceAMessageBuilder
  def initialize(invoice)
    @invoice = invoice
  end

  def call
    # action-1: build message using @invoice
  end
end
Enter fullscreen mode Exit fullscreen mode

Phew, that was long (remember, there are 6 message builder class). But in this phase, no improvement in perfomance. Only clean code. Let's go to next phase.

Third, I'll make a worker for CheckStatus. This is the improvement in performance. We push CheckStatus to a worker per record, so Scheduled::CheckStatusWorker does not have to run http request at all! If there are 10,000 records found, so there will be 10,000 jobs. This is background job done right!!

In this phase, I'll have 6 models, 1 worker + 1 worker (new), and 8 classes.

So, the code will be like these:

# app/lib/start_scheduled_check_status.rb
class StartScheduledCheckStatus
  ...

  def check_status_of_(invoices, clazz: clazz)
    invoices.each do |invoice|
      CheckStatusWorker.perform_async(invoice.id, clazz)
    end
  end
end

# app/workers/check_status_worker.rb
class CheckStatusWorker
  include Sidekiq::Worker
  sidekiq_options retry: false

  def perform(invoice_id, clazz)
    CheckStatus.new(invoice_id, clazz).call
  end
end

# app/lib/check_status.rb
class CheckStatus
  def initialize(invoice_id, clazz)
    @invoice_id = invoice_id
    @clazz      = clazz
    @invoice    = invoice
  end

  def call
    message = "#{@clazz}MessageBuilder".constantize.new(@invoice).call
    # action-2: do http request using message
    # action-3: action based on the response
  end

  private

  def invoice
    @clazz.constantize.find(@invoice_id)
  end
end
Enter fullscreen mode Exit fullscreen mode

Notice:

Now I use sidekiq_options retry: false, because if one job is failed, it won't affect another jobs. Let say there are 10,000 jobs, 1 job failed the 9,999 other jobs will still running.

Can I improve again? Yes, we can!

4. Third Solution (much better)

This solution consist of 2 phases.

First, don't use each method. But use push_bulk method instead! Why should we use this?

The push_bulk method allows us to push a large number of jobs to Redis. This method cuts out the redis network round trip latency. I wouldn't recommend pushing more than 1000 per call but YMMV based on network quality, size of job args, etc. A large number of jobs can cause a bit of Redis command processing latency.

source: github

In this phase, I'll have 6 models, 2 workers, and 8 classes.

Let's upgrade our code a bit:

# app/lib/start_scheduled_check_status.rb
class StartScheduledCheckStatus
  INVOICE = %w(invoice_a invoice_b invoice_c invoice_d invoice_e invoice_f)
  SIZE = 1000

  def call
    INVOICE.each { |inv| send("check_#{inv}" }
  end

  private

  INVOICE.each do |method|
    define_method "check_#{method}" do
      clazz = method.camelize
      invoices_ids = get_invoices_ids_from_(clazz)
      check_status_of_(invoices_ids, clazz: clazz)
    end
  end

  def get_invoices_ids_from_(clazz)
    clazz.constantize.where(status: "unfinished").pluck(:id)
  end

  def check_status_of_(invoices_ids, clazz: clazz)
    args = get_array_of_args_from_(invoices_ids, clazz)
    args.each do |arg|
      Sidekiq::Client.push_bulk('class' => CheckStatusWorker, 'args' => arg)
    end
  end

  def get_array_of_args_from_(ids, clazz)
    arrs = ids.in_groups_of(SIZE, false)
    arrs.map! { |arr| arr.in_groups_of(1) }
    arrs.map! do |arr|
      arr.map! do |a|
        a << clazz
      end
    end
    arrs
  end
end
Enter fullscreen mode Exit fullscreen mode

Notice:

I still use each. Before, every each means I put 1 invoice into a single job per invoice. But this time, every each means I put 1,000 invoices into a single job per invoice (bulk push).

The size of invoices is divided into 1000, as it is the best practice of sidekiq based on the official guide (see quoted text aboout push_bulk)

Second, we can improve our first worker. Have a little bit logic in worker is not a crime.

In this phase, I'll have 6 models, 2 workers + 1 worker (new), and 8 classes.

# app/workers/scheduled/check_status_worker.rb
class Scheduled::CheckStatusWorker
  include Sidekiq::Worker

  INVOICE = %w(invoice_a invoice_b invoice_c invoice_d invoice_e invoice_f)

  def perform
    INVOICE.each do |inv|
      SeparatedScheduledCheckStatusWorker.perform_async(inv)
    end
  end
end

# app/workers/separated_scheduled_check_status_worker.rb
class SeparatedScheduledCheckStatusWorker
  include Sidekiq::Worker

  def perform(inv)
    StartScheduledCheckStatus.new(inv).call
  end
end

# app/lib/start_scheduled_check_status.rb
class StartScheduledCheckStatus
  INVOICE = %w(invoice_a invoice_b invoice_c invoice_d invoice_e invoice_f)
  SIZE = 1000

  # inv is one of INVOICE
  def initialize(inv)
    @inv = inv
  end

  def call
    send("check_#{@inv}")
  end
  ...
Enter fullscreen mode Exit fullscreen mode

Notice:

Before, we have 1 job for 6 models. Now we separate 1 job to become 6 jobs, which means 1 job per model.

This is another memory-usage improvement.

And that's all !

5. Flow Comparison

Flow in First Solution:

  1. Every 4 hour, cron will start 1 job: Scheduled::CheckStatusWorker
  2. This job will search unfinished invoice from Invoice A
  3. Every invoice found will be checked one-by-one to 3rd party API using http request
  4. The job will continue to search unfinished invoice from Invoice B
  5. Once again, every invoice found will be checked one-by-one to 3rd party API
  6. And over and over until Invoice F.

This is like The One Ring:

One Job to rule them all, One Job to find them, One Job to bring them all, and in the http request bind them.

Remember, The One Ring is the source of problems!

Flow in Third Solution:

  1. Every 4 hour, cron will start 1 job: Scheduled::CheckStatusWorker
  2. This job will start 6 other jobs: SeparatedScheduledCheckStatusWorker, one job for one model
  3. All the 6 jobs will search unfinished invoice from their respective model
  4. Every invoice found will be pushed bulk to job CheckStatusWorker. One job for one invoice

You can see the huge differences from First Solution and Third Solution. There will be huge improvement to our apps !

6. The Code

From 6 models and 1 worker, we end up have 6 models, 3 workers, and 8 classes.

This is the entire code:

# app/workers/scheduled/check_status_worker.rb
class Scheduled::CheckStatusWorker
  include Sidekiq::Worker

  INVOICE = %w(invoice_a invoice_b invoice_c invoice_d invoice_e invoice_f)

  def perform
    INVOICE.each do |inv|
      SeparatedScheduledCheckStatusWorker.perform_async(inv)
    end
  end
end
Enter fullscreen mode Exit fullscreen mode
# app/workers/separated_scheduled_check_status_worker.rb
class SeparatedScheduledCheckStatusWorker
  include Sidekiq::Worker

  def perform(inv)
    StartScheduledCheckStatus.new(inv).call
  end
end
Enter fullscreen mode Exit fullscreen mode
# app/lib/start_scheduled_check_status.rb
class StartScheduledCheckStatus
  INVOICE = %w(invoice_a invoice_b invoice_c invoice_d invoice_e invoice_f)
  SIZE = 1000

  def initialize(inv)
    @inv = inv
  end

  def call
    send("check_#{@inv}")
  end

  private

  INVOICE.each do |method|
    define_method "check_#{method}" do
      clazz = method.camelize
      invoices_ids = get_invoices_ids_from_(clazz)
      check_status_of_(invoices_ids, clazz: clazz)
    end
  end

  def get_invoices_ids_from_(clazz)
    clazz.constantize.where(status: "unfinished").pluck(:id)
  end

  def check_status_of_(invoices_ids, clazz: clazz)
    args = get_array_of_args_from_(invoices_ids, clazz)
    args.each do |arg|
      Sidekiq::Client.push_bulk('class' => CheckStatusWorker, 'args' => arg)
    end
  end

  def get_array_of_args_from_(ids, clazz)
    arrs = ids.in_groups_of(SIZE, false)
    arrs.map! { |arr| arr.in_groups_of(1) }
    arrs.map! do |arr|
      arr.map! do |a|
        a << clazz
      end
    end
    arrs
  end
end
Enter fullscreen mode Exit fullscreen mode
# app/workers/check_status_worker.rb
class CheckStatusWorker
  include Sidekiq::Worker
  sidekiq_options retry: false

  def perform(invoice_id, clazz)
    CheckStatus.new(invoice_id, clazz).call
  end
end
Enter fullscreen mode Exit fullscreen mode
# app/lib/check_status.rb
class CheckStatus
  def initialize(invoice_id, clazz)
    @invoice_id = invoice_id
    @clazz      = clazz
    @invoice    = invoice
  end

  def call
    message = "#{@clazz}MessageBuilder".constantize.new(@invoice).call
    # action-2: do http request using message
    # action-3: action based on the response
  end

  private

  def invoice
    @clazz.constantize.find(@invoice_id)
  end
end
Enter fullscreen mode Exit fullscreen mode
# app/lib/invoice_a_message_builder.rb
class InvoiceAMessageBuilder
  def initialize(invoice)
    @invoice = invoice
  end

  def call
    # action-1: build message using @invoice
  end
end
Enter fullscreen mode Exit fullscreen mode

Note: There are 6 models, so there are 6 message builder classes.

7. Final Word

Sorry for bad english. And sorry for bad naming classes, methods, and workers. I'll improve this article later.

source: myself

💖 💪 🙅 🚩
kputra
K Putra

Posted on December 27, 2019

Join Our Newsletter. No Spam, Only the good stuff.

Sign up to receive the latest update from our blog.

Related