Learning by building, a Background Processing System in Ruby


Benedikt Deicke

Posted on April 10, 2019


In today's post, we are going to implement a naive background processing system for fun! We might learn some things along the way as a peek into the internals of popular background processing systems like Sidekiq. The product of this fun is by no means intended for production use.

Let’s imagine we have a task in our application that loads one or more websites and extracts their titles. As we don’t have any influence on the performance of these websites, we’d like to perform the task in the background, outside our main thread (or outside the current request, if we’re building a web application).

Encapsulating a Task

Before we get into background processing, let’s build a service object to perform the task at hand. We’ll use OpenURI and Nokogiri to extract the contents of the title tag.

require 'open-uri' 
require 'nokogiri' 

class TitleExtractorService
  def call(url)
    document = Nokogiri::HTML(URI.open(url)) # Kernel#open no longer accepts URLs as of Ruby 3.0
    title = document.css('html > head > title').first.content
    puts title.gsub(/[[:space:]]+/, ' ').strip
  rescue
    puts "Unable to find a title for #{url}" 
  end
end

Calling the service prints the title of the given URL.

TitleExtractorService.new.call('https://appsignal.com')
# AppSignal: Application Performance Monitoring for Ruby on Rails and Elixir

This works as expected, but let’s see if we can improve the syntax a little to make it look and feel a bit more like other background processing systems. By creating a Magique::Worker module, we can add some syntactic sugar to the service object.

module Magique
  module Worker
    def self.included(base)
      base.extend(ClassMethods)
    end

    module ClassMethods
      def perform_now(*args)
        new.perform(*args)
      end
    end

    def perform(*)
      raise NotImplementedError
    end
  end
end

The module adds a default perform method to the worker instance, which raises NotImplementedError until a worker overrides it, and a perform_now method to the worker class, so a task can be run without instantiating the worker by hand.
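To make the contract of the module concrete, here is a minimal sketch. EchoWorker and EmptyWorker are hypothetical classes invented for this illustration; only Magique::Worker comes from the article. Note that NotImplementedError is a ScriptError rather than a StandardError, so a bare rescue would not catch it.

```ruby
module Magique
  module Worker
    def self.included(base)
      base.extend(ClassMethods)
    end

    module ClassMethods
      def perform_now(*args)
        new.perform(*args)
      end
    end

    def perform(*)
      raise NotImplementedError
    end
  end
end

# Hypothetical worker that overrides perform.
class EchoWorker
  include Magique::Worker

  def perform(message)
    message.upcase
  end
end

# Hypothetical worker that forgets to override perform.
class EmptyWorker
  include Magique::Worker
end

echo_result = EchoWorker.perform_now('hello')

# NotImplementedError has to be rescued explicitly, because it does
# not inherit from StandardError.
empty_error = begin
  EmptyWorker.perform_now('hello')
  nil
rescue NotImplementedError => e
  e
end
```

The worker that defines perform returns its result through perform_now, while the one that doesn't fails loudly instead of silently doing nothing.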

Let’s include the module into our service object. While we’re at it, let’s also rename it to TitleExtractorWorker and change the call method to perform.

class TitleExtractorWorker
  include Magique::Worker

  def perform(url)
    document = Nokogiri::HTML(URI.open(url)) # Kernel#open no longer accepts URLs as of Ruby 3.0
    title = document.css('html > head > title').first.content
    puts title.gsub(/[[:space:]]+/, ' ').strip
  rescue
    puts "Unable to find a title for #{url}"
  end
end

The invocation still has the same result, but it’s a bit clearer what's going on.

TitleExtractorWorker.perform_now('https://appsignal.com')
# AppSignal: Application Performance Monitoring for Ruby on Rails and Elixir

Implementing Asynchronous Processing

Now that we have the title extraction working, we can grab all titles from past Ruby Magic articles. To do this, let’s assume we have a RUBYMAGIC constant with a list of all the URLs of past articles.

RUBYMAGIC.each do |url|
  TitleExtractorWorker.perform_now(url)
end

# Unraveling Classes, Instances and Metaclasses in Ruby | AppSignal Blog
# Bindings and Lexical Scope in Ruby | AppSignal Blog
# Building a Ruby C Extension From Scratch | AppSignal Blog
# Closures in Ruby: Blocks, Procs and Lambdas | AppSignal Blog
# ...

We get the titles of past articles, but it takes a while to extract them all. That’s because we wait until each request is completed before moving on to the next one.

Let’s improve that by introducing a perform_async method to our worker module. To speed things up, it creates a new thread for each URL.

module Magique
  module Worker
    module ClassMethods
      def perform_async(*args)
        Thread.new { new.perform(*args) }
      end
    end
  end
end

After changing the invocation to TitleExtractorWorker.perform_async(url), we get all the titles almost at once. However, this also means that we’re opening more than 20 connections to the Ruby Magic blog at once. (Sorry for messing with your blog, folks! 😅)

If you’re following along with your own implementation and testing this outside of a long-running process (like a web server), don’t forget to add something like loop { sleep 1 } to the end of your script to make sure the process doesn’t immediately terminate.
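In a short script, an alternative to sleeping forever is to keep the threads returned by perform_async and wait for them. The following is only a sketch under assumptions: SleepyWorker is a hypothetical stand-in that sleeps instead of fetching a website, so it runs without network access.

```ruby
module Magique
  module Worker
    def self.included(base)
      base.extend(ClassMethods)
    end

    module ClassMethods
      # Same thread-per-task approach as above; Thread.new returns
      # the thread, so the caller can hold on to it.
      def perform_async(*args)
        Thread.new { new.perform(*args) }
      end
    end
  end
end

# Hypothetical worker: simulates slow I/O with a short sleep.
class SleepyWorker
  include Magique::Worker

  def perform(number)
    sleep 0.05
    number * 2
  end
end

threads = [1, 2, 3].map { |number| SleepyWorker.perform_async(number) }

# Thread#value waits for the thread to finish (like Thread#join)
# and returns the result of its block.
results = threads.map(&:value)
```

Once all threads have finished, the script can exit on its own. This only works for a fixed batch of tasks; a long-running consumer still needs to stay alive some other way.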

Queueing up Tasks

With the approach of creating a new thread for every invocation, we’ll eventually hit resource limits (both on our side and on the websites we are accessing). As we’d like to be nice citizens, let’s change the implementation to something that is asynchronous but doesn’t feel like a denial-of-service attack.

A common way to solve this problem is to use the producer/consumer pattern. One or more producers push tasks onto a queue while one or more consumers take tasks from the queue and process them.

A queue is basically a list of elements. In theory, a simple array would do the job. However, as we’re dealing with concurrency, we need to make sure that only one producer or consumer can access the queue at a time. If we aren’t careful about this, things will end in chaos—just like two people trying to squeeze through a door at once.

This problem is known as the producer-consumer problem and there are multiple solutions to it. Luckily, it is a very common problem and Ruby ships with a proper Queue implementation that we can use without having to worry about thread synchronization.
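Before wiring Queue into Magique, here is a small standalone sketch of the producer/consumer pattern using nothing but Ruby's built-in Queue. The squaring "work" and the variable names are made up for illustration. It also uses Queue#close to tell the consumers that no more work is coming, which the article's implementation does not do.

```ruby
queue = Queue.new    # tasks waiting to be processed
results = Queue.new  # thread-safe place to collect the output

# Three consumers take numbers off the queue and square them.
consumers = Array.new(3) do
  Thread.new do
    # Queue#pop blocks while the queue is empty. Once the queue is
    # closed and drained, it returns nil, which ends the loop.
    while (number = queue.pop)
      results.push(number * number)
    end
  end
end

# The producer (here: the main thread) pushes ten tasks.
(1..10).each { |number| queue.push(number) }
queue.close

consumers.each(&:join)

squares = []
squares << results.pop until results.empty?
squares.sort!
```

No explicit locking is needed anywhere: Queue takes care of the synchronization between the producer and the three consumers.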

To use it, let’s make sure both producers and consumers can access the queue. We do this by adding a pair of class-level accessor methods to our Magique module and assigning an instance of Queue to it.

module Magique
  def self.backend
    @backend
  end

  def self.backend=(backend)
    @backend = backend
  end
end

Magique.backend = Queue.new

Next, we change our perform_async implementation to push a task onto the queue instead of creating its own new thread. A task is represented as a hash including a reference to the worker class as well as the arguments passed to the perform_async method.

module Magique
  module Worker
    module ClassMethods
      def perform_async(*args)
        Magique.backend.push(worker: self, args: args)
      end
    end
  end
end

With that, we’re done with the producer side of things. Next, let’s take a look at the consumer side.

Each consumer is a separate thread that takes tasks from the queue and performs them. Unlike the threads from the previous section, which stop after a single task, a consumer keeps going: once it finishes a task, it takes the next one from the queue and performs it, and so on. Here’s a basic implementation of a consumer called Magique::Processor. Each processor creates a new thread that loops infinitely. For every iteration, it tries to grab a new task from the queue, creates a new instance of the worker class, and calls its perform method with the given arguments.

module Magique
  class Processor
    def self.start(concurrency = 1)
      concurrency.times { |n| new("Processor #{n}") }
    end

    def initialize(name)
      thread = Thread.new do
        loop do
          # shift blocks until a task is available (Queue#pop is an alias of Queue#shift)
          payload = Magique.backend.shift
          worker_class = payload[:worker]
          worker_class.new.perform(*payload[:args])
        end
      end

      thread.name = name
    end
  end
end

In addition to the processing loop, we add a convenience method called Magique::Processor.start. This allows us to spin up multiple processors at once. While naming the thread isn’t really necessary, it will allow us to see if things are actually working as expected.

Let’s adjust the output of our TitleExtractorWorker to include the name of the current thread.

puts "[#{Thread.current.name}] #{title.gsub(/[[:space:]]+/, ' ').strip}"

To test our background processing setup, we first need to spin up a set of processors before enqueueing our tasks.

Magique.backend = Queue.new
Magique::Processor.start(5)

RUBYMAGIC.each do |url|
  TitleExtractorWorker.perform_async(url)
end

# [Processor 3] Bindings and Lexical Scope in Ruby | AppSignal Blog
# [Processor 4] Building a Ruby C Extension From Scratch | AppSignal Blog
# [Processor 1] Unraveling Classes, Instances and Metaclasses in Ruby | AppSignal Blog
# [Processor 0] Ruby's Hidden Gems, StringScanner | AppSignal Blog
# [Processor 2] Fibers and Enumerators in Ruby: Turning Blocks Inside Out | AppSignal Blog
# [Processor 4] Closures in Ruby: Blocks, Procs and Lambdas | AppSignal Blog
# ...

When this is run, we still get the titles of all articles. While it’s not as fast as just using a separate thread for every task, it’s still faster than the initial implementation that had no background processing. Thanks to the added processor names, we can also confirm that all processors are working through the queue. By tweaking the number of concurrent processors, it’s possible to find a balance between processing speed and existing resource limitations.
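To try the whole in-memory setup without hitting any websites, the pieces can be combined into one script. This is a sketch under assumptions: SquareWorker and the RESULTS queue are hypothetical and exist only so the outcome can be inspected, and the accessors are written with attr_accessor for brevity.

```ruby
module Magique
  class << self
    attr_accessor :backend
  end

  module Worker
    def self.included(base)
      base.extend(ClassMethods)
    end

    module ClassMethods
      def perform_async(*args)
        Magique.backend.push(worker: self, args: args)
      end
    end
  end

  class Processor
    def self.start(concurrency = 1)
      concurrency.times { |n| new("Processor #{n}") }
    end

    def initialize(name)
      thread = Thread.new do
        loop do
          # Blocks until a task is available.
          payload = Magique.backend.shift
          payload[:worker].new.perform(*payload[:args])
        end
      end

      thread.name = name
    end
  end
end

# Hypothetical: collects [thread name, result] pairs for inspection.
RESULTS = Queue.new

# Hypothetical worker that simulates slow work with a short sleep.
class SquareWorker
  include Magique::Worker

  def perform(number)
    sleep 0.01
    RESULTS.push([Thread.current.name, number * number])
  end
end

Magique.backend = Queue.new
Magique::Processor.start(3)

(1..6).each { |number| SquareWorker.perform_async(number) }

# Popping one result per task waits until every task has finished.
finished = Array.new(6) { RESULTS.pop }
squares = finished.map(&:last).sort
processor_names = finished.map(&:first).uniq
```

Which processor handles which task varies from run to run, but every task is performed exactly once, and all of them run on one of the named processor threads.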

Expanding to Multiple Processes and Machines

So far, the current implementation of our background processing system works well enough. It’s still limited to the same process, though. Resource-hungry tasks will still affect the performance of the entire process. As a final step, let’s look at distributing the workload across multiple processes and maybe even multiple machines.

The queue is the only connection between producers and consumers. Right now, it’s using an in-memory implementation. Let’s take more inspiration from Sidekiq and implement a queue using Redis.

Redis supports lists, which we can push tasks onto and fetch tasks from. Additionally, the Redis Ruby gem is thread-safe and the Redis commands to modify lists are atomic. These properties make it possible to use it for our asynchronous background processing system without running into synchronization problems.

Let’s create a Redis backed queue that implements the push and shift methods just like the Queue we used previously.

require 'json'
require 'redis'

module Magique
  module Backend
    class Redis
      def initialize(connection = ::Redis.new)
        @connection = connection
      end

      def push(job)
        @connection.lpush('magique:queue', JSON.dump(job))
      end

      def shift
        _queue, job = @connection.brpop('magique:queue')
        payload = JSON.parse(job, symbolize_names: true)
        payload[:worker] = Object.const_get(payload[:worker])
        payload
      end
    end
  end
end

As Redis doesn’t know anything about Ruby objects, we have to serialize our tasks into JSON before storing them in the database. We store them using the lpush command, which adds an element to the front of the list.

To fetch a task from the queue, we’re using the brpop command, which gets the last element from a list. If the list is empty, it’ll block until a new element is available. This is a nice way to pause our processors when no tasks are available. Finally, after getting a task out of Redis, we have to look up the real Ruby class based on the name of the worker using Object.const_get.
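The serialization round trip can be tried without a Redis server. The sketch below performs only the JSON steps from push and shift; the empty TitleExtractorWorker class is a stand-in so the example is self-contained.

```ruby
require 'json'

# Stand-in for the real worker, so the constant lookup has a target.
class TitleExtractorWorker
end

job = { worker: TitleExtractorWorker, args: ['https://appsignal.com'] }

# What push stores: the class is written out as its name, because
# objects without their own to_json are serialized via to_s.
serialized = JSON.dump(job)

# What shift does after reading the string back from the list.
payload = JSON.parse(serialized, symbolize_names: true)
worker_name = payload[:worker]
payload[:worker] = Object.const_get(worker_name)
```

After the round trip, payload[:worker] is the class again and payload[:args] is the original array. One consequence of going through JSON is that arguments have to be JSON-friendly values such as strings, numbers, arrays, and hashes; a symbol, for example, would come back as a string.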

As a final step, let’s split things up into multiple processes. On the producer side of things, the only thing we have to do is change the backend to our newly implemented Redis queue.

# ...

Magique.backend = Magique::Backend::Redis.new

RUBYMAGIC.each do |url|
  TitleExtractorWorker.perform_async(url)
end

On the consumer side of things, we can get away with a few lines like this:

# ...

Magique.backend = Magique::Backend::Redis.new
Magique::Processor.start(5)

loop { sleep 1 }

When executed, the consumer process will wait for new work to arrive in the queue. Once we start the producer process that pushes tasks into the queue, we can see that they get processed immediately.

Enjoy Responsibly and Don’t Use This in Production

While we kept it far from a real-world setup you would use in production (so don't!), we took a few steps in building a background processor. We started by wrapping a task in a worker object. Then we made it asynchronous with threads and used Queue to solve the producer-consumer problem. Finally, we expanded the system to multiple processes or machines by using Redis rather than an in-memory implementation.

As mentioned before, this is a simplified implementation of a background processing system. There are a lot of things missing and not explicitly dealt with. These include (but are not limited to) error handling, multiple queues, scheduling, connection pooling, and signal handling.

Nonetheless, we had fun writing this and hope you enjoyed a peek under the hood of a background processing system. Perhaps you even took away a thing or two.

Guest writer Benedikt Deicke is a software engineer and CTO of Userlist.io. On the side, he’s writing a book about building SaaS applications in Ruby on Rails. You can reach out to Benedikt via Twitter.
