Shalvah
Posted on November 1, 2022
This post is part 1 of a series where I build a task queue system.
Motivation
Some time ago, I came across a post suggesting that engineering teams should build their own task queues. I don't agree, but thinking about it made me realize that task queues are actually a great learning project, especially if you're learning a new language or stuck in the "advanced beginner" spot. Nearly every app that goes to production needs one. And there are many different challenges to tackle and different implementations to try out. Plus, knowing what goes on within a task queue helps you use your existing queues better.
I realized I could do with some challenges in these areas, so I'm going to take my own advice and build my own queueing system. This article aims to be sort of a journal of what I'm doing and learning.
Thoughts on queues
Types
I'm very far from being a queue expert, but I know that there are different types of queues. I know there are job queues and task queues (which might be the same thing?) and message queues. The idea is that job/task queues (such as Ruby's Sidekiq and Resque) are for pushing tasks you need done, and one or more workers pick them from the queue and execute them. A message queue (eg RabbitMQ, Kafka), by comparison, is for publishing messages, which can then be consumed (one or more times) by interested subscribers.
The two aren't strictly separated; people often use message queues as job queues too, and vice versa. But I'll be building a task queue here. I think a message queue is another interesting challenge, though.
Parts
I like to think of queue systems as having two main parts: the infrastructure and the interface. The infrastructure is what powers the queue, and is responsible for making sure your jobs run as you wish. The interface is how your application communicates with the infrastructure (for instance, to queue jobs). There could also be a web UI for viewing information about your queues, but I'm ignoring that for now.
A queuing system can come with one or both parts, and they can have different degrees of complexity. For example:
- Beanstalkd, RabbitMQ, and Kafka are mostly used as external infrastructure. You don't have to install Beanstalkd into your app, just start the process and install (or write) a library to talk to it.
- Rails' ActiveJob is an interface to different queuing backends.
- Sidekiq and Delayed Job are Ruby tools that are installed as part of your app. They come with their own interface, but you can also use them with other interfaces, such as Rails' ActiveJob.
- The Node.js libraries BullMQ and Bee Queue are also infrastructure and interface. However, you typically have to use their interface.
- Laravel's queuing system is an interface that comes with its own infrastructure, and its infra gives you different queuing backends as options, from database- and Redis-backed ones to services like Beanstalkd or Amazon Simple Queue Service (SQS).
Architecture
There are often two major architecture decisions:
- Where jobs are stored (and how long, and when)
- How jobs are executed
Two common options for job storage are a database and Redis. Redis is nice because it allows fast access, and you can easily retrieve new jobs without having to poll at intervals (for example, by using blocking commands). Databases are nice because they're simpler and allow a wider variety of queries, which is useful if you intend to store jobs or support querying them through the UI.
I think Redis is generally superior for this, especially for high-activity systems. With a database, the more jobs you insert, the longer your queries take. And even when you delete old records, they aren't necessarily immediately deleted on disk.
As for execution, most in-app queue systems seem to follow this basic process:
- you push a job to the queue by saving it to the datastore
- a process that's always listening (the worker) fetches it from the datastore and executes it
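Those two steps can be sketched inside a single process. This is only an illustration, not what I'm building in this series: Ruby's built-in Queue stands in for the datastore, and JOB_STORE, PROCESSED, and the :shutdown signal are names I made up for the example.

```ruby
# Thread::Queue from the standard library stands in for the datastore here.
JOB_STORE = Queue.new
PROCESSED = []

# The worker: a long-lived thread that blocks until a job is available.
worker = Thread.new do
  loop do
    job = JOB_STORE.pop          # blocks while the store is empty
    break if job == :shutdown    # hypothetical stop signal for this sketch
    PROCESSED << "welcome email sent to user #{job[:user_id]}"
  end
end

# The application: "pushing a job" is just a write to the store.
JOB_STORE << { user_id: 1 }
JOB_STORE << { user_id: 2 }
JOB_STORE << :shutdown

worker.join
puts PROCESSED
```

The blocking pop is the in-process cousin of the blocking commands mentioned earlier: the worker wakes up as soon as something arrives, without polling at intervals.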
But there's no rule that we have to do this. For instance, a basic queue system could be something like this:
- Put the code for your job in a separate file, say send_welcome_email.rb:
# send_welcome_email.rb
def send_welcome_email(user_id)
  # ...
end

input = ARGV
send_welcome_email(*input)
- Run the script in a separate process and don't wait for the result
# in your controller
user = ...
Process.spawn("ruby send_welcome_email.rb #{user.id}")
But this isn't a robust system. For starters, it takes time to start a process, we have no error handling or management capabilities, and we might end up overloading our machine with too many processes.
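Even the naive version needs a little care: a spawned child still has to be reaped once it exits, or it lingers as a zombie process. Here's a minimal sketch of fire-and-forget spawning with Process.detach. The helper name run_in_background is made up, and the one-liner stands in for the real job script.

```ruby
require "rbconfig"

# Runs a Ruby one-liner in a child process without blocking the caller.
# Process.detach starts a thread that reaps the child when it exits,
# and returns that thread.
def run_in_background(code)
  pid = Process.spawn(RbConfig.ruby, "-e", code)
  Process.detach(pid)
end

reaper = run_in_background("exit 0")
# The caller carries on immediately. Only wait on the reaper thread
# if you actually care about the child's exit status:
puts reaper.value.success?
```

This deals with zombies, but not with the other problems: there's still a process startup cost per job, and nothing limits how many children run at once.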
Using a worker makes things better:
- The worker is already active and listening, so there is no startup overhead and it can react to new jobs quickly.
- You can start multiple workers to improve concurrency (e.g. 1 process per CPU core).
- Jobs are pushed to a store, which is better for reliability
In practice, though, there are many different variations of this:
- The worker process could itself spawn multiple threads for more concurrency (Sidekiq, GoodJob). This means that your jobs must be thread-safe.
- The worker process could fork itself into a new process for each job (Resque).
- There could be a "master" worker, which serves as a coordinator and manager of the others. It can start or stop processes depending on your configuration. It can monitor the health of processes, kill those that go past a memory limit or execution time, and start new ones. (Sidekiq Enterprise)
- You might not talk to the datastore directly; instead, the master process has its internal storage, and merely exposes an API (such as localhost:2946). To enqueue a job, you send it as a request to that endpoint. This is typically used by external infra like Beanstalkd.
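To make the first variation a bit more concrete, here's a rough sketch of one worker process running jobs on several threads. It isn't how Sidekiq or GoodJob are implemented; ThreadedWorker and its methods are names I made up, and the "job" is just a number to double.

```ruby
# One worker, several threads pulling from a shared queue.
class ThreadedWorker
  attr_reader :processed

  def initialize(thread_count:)
    @thread_count = thread_count
    @jobs = Queue.new   # thread-safe FIFO from the standard library
    @processed = []
    @lock = Mutex.new   # guards @processed, which all threads write to
  end

  def enqueue(job)
    @jobs << job
  end

  # Processes everything enqueued so far, then lets the threads exit.
  def drain
    @jobs.close # once closed and empty, pop returns nil instead of blocking
    threads = Array.new(@thread_count) do
      Thread.new do
        while (job = @jobs.pop)
          result = job * 2 # stand-in for real work
          @lock.synchronize { @processed << result }
        end
      end
    end
    threads.each(&:join)
  end
end

worker = ThreadedWorker.new(thread_count: 3)
(1..10).each { |n| worker.enqueue(n) }
worker.drain
puts worker.processed.sort.inspect
```

The Mutex around @processed is the thread-safety requirement in miniature: anything the threads share has to be protected, and the same goes for shared state inside your jobs.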
What I'm building
Architecture
I'll be building something inspired by Sidekiq. I think its architecture is powerful and its interface is simple. In my case, this means:
- Data store: Redis (but I'll start out with a database implementation). Sidekiq doesn't store completed jobs, but I'd like to store them for a limited time
- Executors: worker processes which spawn threads. I do like the "master" model, though, so I'll see if I can give that a try too.
Features
I did some research to find out the most common and useful queue features, and I've highlighted some I think are a good start:
- dispatching jobs (with a delay, on specific queue, in bulk, on a schedule)
- giving queues names and priorities
- telling a worker which queues to process
- chaining jobs
- splitting a job into batches
- error handling
- retry policies
- job callbacks/middleware
- graceful shutdown of workers
- testing helpers
- web UI
- metrics
Setup
I've created a dummy Sinatra app with a route that queues jobs:
get "/queue/:count?" do
  # Enqueue jobs...
  "Queued #{params[:count] || 1} jobs"
end
Next up, configuring the database using the Sequel query builder.
# lib/db.rb
require "sequel"
DB = Sequel.sqlite(File.join(__dir__, '../database.db'))

# lib/db_migrate.rb
DB.drop_table? :jobs # drop if exists
DB.create_table :jobs do
  #
end
Nothing serious in the migration script yet; it's just a stub for when I'm ready to run migrations.
Okay, we're ready. Let's do some queueing!
Job interface
Most job libraries have you create a job class to contain the logic you want to execute. Then, when queueing, you either:
- pass an instance of the job to the library, which serializes it, stores it, and unserializes it when it's time to execute, or
- pass the job arguments to the library, and it stores those, then creates an instance of the job when it's ready to execute it.
I'll go with the second, because I don't have to worry about any gotchas in serialization.
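A minimal sketch of that second approach, using JSON as the storage format: only the class name and the arguments are stored, and the executor builds a fresh instance later. The helper names serialize_job and run_serialized_job are made up for illustration, and the job class here is a toy.

```ruby
require "json"

class SendWelcomeEmail
  def handle(user_id)
    "Welcome email sent to user #{user_id}"
  end
end

# What gets written to the store: plain data, no serialized object.
def serialize_job(job_class, args)
  JSON.generate({ "name" => job_class.name, "args" => args })
end

# What the executor does later: look the class up by name,
# create a new instance, and pass it the stored arguments.
def run_serialized_job(payload)
  data = JSON.parse(payload)
  Object.const_get(data["name"]).new.handle(*data["args"])
end

payload = serialize_job(SendWelcomeEmail, [42])
puts payload
puts run_serialized_job(payload)
```

The one thing to keep in mind is that arguments have to survive a JSON round trip: numbers, strings, arrays, and hashes are fine, but a symbol, for instance, comes back as a string.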
Here's what I want queuing a job to look like:
DoSomeStuff.dispatch(args, job_options)
# Example:
SendWelcomeEmail.dispatch(user.id, wait: 2.days)
args will be passed to the job when executing it, while job_options are meant for the executor. Here the caller can configure things like a delay, queue name, or priority. Some job libraries use additional methods for this (ActiveJob would be DoSomeStuff.set(options).perform_later(arguments), while Sidekiq would be DoSomeStuff.perform_in(interval, args)), but I'm sticking with this for a start.
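Here's a rough sketch of how a single dispatch call can tell the two apart in Ruby: positional arguments are collected by a splat, and keyword arguments become the options. DispatchSketch, its defaults, and SendInvoiceEmail are hypothetical names, and the delay is in plain seconds since something like 2.days needs ActiveSupport.

```ruby
class DispatchSketch
  # Hypothetical defaults; the real option names are still to be decided.
  DEFAULT_OPTIONS = { queue: "default", wait: 0 }.freeze

  # Positional arguments go to the job; keyword arguments configure the executor.
  def self.dispatch(*args, **job_options)
    options = DEFAULT_OPTIONS.merge(job_options)
    {
      name: name,                         # class name of the job being dispatched
      args: args,
      queue: options[:queue],
      run_at: Time.now + options[:wait],  # wait is a number of seconds
    }
  end
end

class SendInvoiceEmail < DispatchSketch; end

job = SendInvoiceEmail.dispatch(42, "monthly", wait: 60, queue: "mail")
puts job.inspect
```

Instead of saving anything, this version just returns a hash describing the job, which keeps the example self-contained.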
Now, to implement my job interface. We're starting out with the database as our store, so let's set up our jobs table. I haven't fully thought out the implementation yet, but at the least our jobs will need to store the job arguments and the execution details.
# lib/db_migrate.rb
DB.drop_table? :jobs
DB.create_table :jobs do
  String :id, primary_key: true, size: 28 # "job_" plus 24 hex characters
  String :name, null: false
  JSON :args, null: false
  String :queue, default: "default"
  DateTime :created_at, default: Sequel::CURRENT_TIMESTAMP, index: true
  DateTime :next_execution_at, null: true, index: true # for scheduled jobs and retries
  DateTime :last_executed_at, null: true
  Integer :attempts, default: 0
  String :state, default: "waiting", index: true
  String :error_details, null: true
  String :reserved_by, null: true
end
Since we've defined a state column, let's talk a bit about the job lifecycle. I've decided to go with these:
- waiting: The job is waiting to be picked up.
- executing: The job has been picked up by a worker.
- succeeded
- failed: The job failed, but it's going to be retried.
- dead: The job has failed and exhausted the maximum retry count. It won't be retried anymore.
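One way to read this list is as a small state machine. The transition table below is my own assumption about how the states connect (in particular, that a failed job goes back through executing when it's retried); nothing in the code enforces it yet, and TRANSITIONS and can_transition? are made-up names.

```ruby
# Assumed transitions between job states; not enforced anywhere yet.
TRANSITIONS = {
  "waiting"   => ["executing"],
  "executing" => ["succeeded", "failed", "dead"],
  "failed"    => ["executing"], # picked up again for a retry
  "succeeded" => [],            # terminal
  "dead"      => [],            # terminal
}.freeze

# Returns true if a job may move from one state to the other.
def can_transition?(from, to)
  TRANSITIONS.fetch(from).include?(to)
end

puts can_transition?("waiting", "executing")
puts can_transition?("dead", "executing")
```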
The reserved_by field is used for locks. It allows us to improve concurrency by having multiple queue workers without them clashing with each other. When a worker picks up a job, it sets that field so that other workers don't try to pick up the same job. A worker will only pick up a job that isn't reserved.
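The important detail is that "check that the job isn't reserved" and "mark it as reserved" have to happen as one atomic step, or two workers could both think they own the job. Here's a minimal in-memory sketch of that idea. FakeJobsTable is a made-up stand-in for the jobs table, and its Mutex plays the role of the database, which applies each UPDATE atomically.

```ruby
# In-memory stand-in for the jobs table.
class FakeJobsTable
  def initialize
    @rows = {}
    @lock = Mutex.new
  end

  def insert(id)
    @lock.synchronize { @rows[id] = { reserved_by: nil } }
  end

  # Mirrors "UPDATE jobs SET reserved_by = ? WHERE id = ? AND reserved_by IS NULL"
  # and returns the number of rows changed (0 or 1).
  def reserve(id, worker_id)
    @lock.synchronize do
      row = @rows[id]
      return 0 if row.nil? || row[:reserved_by]
      row[:reserved_by] = worker_id
      1
    end
  end

  def reserved_by(id)
    @lock.synchronize { @rows.dig(id, :reserved_by) }
  end
end

table = FakeJobsTable.new
table.insert("job_1")

# Five workers race for the same job; only one update reports a changed row.
counts = 5.times.map { |i| Thread.new { table.reserve("job_1", "wrk_#{i}") } }.map(&:value)
puts counts.sum # → 1
```

A worker that gets back 0 simply moves on and looks for another job.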
Run this with ruby lib/db_migrate.rb, and we're good.
Next, we'll provide a Queueable class with the dispatch method, which job classes like DoSomeStuff extend.
# gator/queueable.rb
module Gator
  class Queueable
    def self.dispatch(*args, **options)
      job = Gator::Models::Job.new(name: self.name, args: args)
      job.save
      Gator::Logger.new.info "Enqueued job id=#{job.id} args=#{job.args}"
    end

    attr_reader :logger

    def initialize
      super
      @logger = Gator::Logger.new
    end
  end
end

# jobs/do_some_stuff.rb
class Jobs::DoSomeStuff < Gator::Queueable
  def handle(arg1, arg2 = nil)
    logger.info "HIIII #{arg1} and #{arg2}"
  end
end
Zooming in on some details:
- Gator::Queueable is the class end-users will extend. Right now, it only provides two things: the static dispatch method, and a logger for when your job is being executed.
- Gator::Models::Job is a class that we use to interact with jobs in the database. Its contents:
require "json"
require "securerandom"

module Gator
  module Models
    class Job < Sequel::Model
      def before_create
        self.id = "job_" + SecureRandom.hex(12)
        self.args = self[:args].to_json
        super # let Sequel run the rest of the hook chain
      end

      def args = JSON.parse(self[:args])
    end
  end
end
- We store the class name as the name of the job, so the job executor can create an instance of the job class. Unfortunately, this means that if we rename the class after enqueuing a job, but before it is executed, our executor might crash, since the job class no longer exists. (This would also happen if we used the serialize-store-deserialize approach—there'd be no class to deserialize to.)
This is a known limitation of such job systems. It can be fixed by giving every job a name that's independent of the class name, and looking that up instead. Example:
jobs = {
  'do_stuff' => DoSomeStuff,
}
# We can safely rename the class
jobs = {
  'do_stuff' => DoStuff,
}
However, this pushes the problem to another layer. What happens if you want to change the custom name? Similar problem. There isn't a full solution for this, but it isn't a very frequent problem, and you can usually find several workarounds.
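One such workaround, sketched below, is to keep the old name around as an alias until every job stored under it has been processed. This is a hypothetical registry, not something Gator has: JOB_REGISTRY and resolve_job are names I made up, and DoStuff is a toy class.

```ruby
class DoStuff
  def handle(*args)
    "did stuff with #{args.inspect}"
  end
end

# Stored job names map to classes. An old name can stay as an alias
# until the jobs that were enqueued under it have drained.
JOB_REGISTRY = {
  "do_stuff"      => DoStuff,
  "do_some_stuff" => DoStuff, # previous name, kept temporarily
}.freeze

# Looks up the class for a stored job name, failing loudly on unknown names.
def resolve_job(name)
  JOB_REGISTRY.fetch(name) { raise ArgumentError, "Unknown job name: #{name}" }
end

puts resolve_job("do_some_stuff").new.handle(1, 2)
```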
And now updating our route, and visiting it:
get "/queue/:count?" do
  count = Integer(params[:count] || 1)
  count.times do |i|
    Jobs::DoSomeStuff.dispatch("Nellie #{i}", "Buster #{i}")
  end
  "Queued #{count} jobs"
end
The logs:
INFO [2022-10-27 12:31:37] Enqueued job id=job_0f25c864a7a16bb7a4c8ade6 args=["Nellie 0", "Buster 0"]
Okay, great! We can enqueue jobs now. That's the easy part.
The queue worker
Our queue worker will poll the database for new jobs. If there are any, it will try to reserve one.
loop do
  if (job = next_job)
    execute_job(job)
  else
    sleep(interval)
  end
end
Let's flesh this out into a Worker class:
require "securerandom"

module Gator
  class Worker
    attr_reader :polling_interval, :logger, :worker_id

    def initialize(**opts)
      super
      @polling_interval = 5 # Check for new jobs every 5s
      @logger = Gator::Logger.new
      @worker_id = "wrk_" + SecureRandom.hex(8)
    end

    def run
      logger.info "Worker #{worker_id} ready"
      loop do
        if (job = next_job)
          execute_job job
          cleanup job
        else
          sleep polling_interval
        end
      end
    end

    def next_job
      job = check_for_jobs
      return nil unless job
      reserve_job(job) || nil
    end

    def check_for_jobs
      query = Models::Job.where(state: "waiting", reserved_by: nil)
      query = query.where { (next_execution_at =~ nil) | (next_execution_at <= Time.now) }
      query.first
    end

    def reserve_job(job)
      # Important: we only reserve the job if it hasn't already been reserved by another worker
      updated_count = Models::Job.where(id: job.id, reserved_by: nil).update(reserved_by: worker_id)
      updated_count == 1 ? job : false
    end

    def execute_job(job)
      # Splat the stored array so each element becomes its own argument
      Object.const_get(job.name).new.handle(*job.args)
      logger.info "Processed job id=#{job.id} result=succeeded args=#{job.args}"
    end

    def cleanup(job)
      job.reserved_by = nil
      job.attempts += 1
      job.last_executed_at = Time.now
      job.state = "succeeded"
      job.save
    end
  end
end
Finally, a CLI script that instantiates and starts a Worker:
# gator/bin/work.rb
require 'optparse'
options = {}
OptionParser.new do |parser|
  parser.on("-r", "--require FILE", "File to load at startup. Use this to boot your app.")
end.parse!(into: options)
require options.delete(:require) if options[:require]
require_relative '../worker'
w = Gator::Worker.new
w.run
Note the --require option. I've added this so we can load the user's app before starting the worker; otherwise we'll get errors for undefined classes.
Now we start the worker (boot.rb is the file that loads my app):
ruby ./gator/bin/work.rb -r ./app/boot.rb
And it immediately executes our waiting job:
INFO [2022-10-27 13:31:41] Worker wrk_1c423b74b85c72d6 ready
INFO [2022-10-27 13:31:42] HIIII Nellie 0 and Buster 0
INFO [2022-10-27 13:31:42] Processed job id=job_0f25c864a7a16bb7a4c8ade6 result=succeeded args=["Nellie 0", "Buster 0"]
We can enqueue more jobs, and they get processed almost instantly.
One limitation of the reserved_by system is that, if a worker is somehow killed while processing a job, that job stays reserved and will never be picked up again. To fix this, we could add a reserved_at column that we use to decide if a job is still locked. But I won't bother with that, since we're switching to Redis later, which makes time-based locks easier.
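For the curious, the reserved_at idea could look roughly like this. It's only a sketch: the column doesn't exist in my schema, jobs are plain hashes here, and LOCK_TIMEOUT and available? are hypothetical names.

```ruby
# Hypothetical timeout; it should be longer than the slowest job you expect.
LOCK_TIMEOUT = 300 # seconds

# A job can be picked up if nobody holds it, or if the holder has had it
# for longer than the timeout (we then assume that worker died).
# Assumes reserved_at is always set together with reserved_by.
def available?(job, now: Time.now)
  return true if job[:reserved_by].nil?
  now - job[:reserved_at] > LOCK_TIMEOUT
end

now = Time.now
fresh = { reserved_by: "wrk_1", reserved_at: now - 10 }
stale = { reserved_by: "wrk_2", reserved_at: now - 600 }
puts available?(fresh, now: now) # → false
puts available?(stale, now: now) # → true
```

The trade-off is that a job which legitimately runs longer than the timeout could get picked up by a second worker and run twice.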
Concurrency
Let's test our concurrency setup. What happens when we have multiple queue workers? I'll try enqueueing three jobs with three workers listening.
Well, it's a bit disappointing. All three jobs are processed by the same worker.
There are a few reasons for this:
- The job doesn't do any real work; it just prints a log. That takes almost no time. If you look at the timestamps, you'll see that all three jobs are executed within a second.
- Our worker loop algorithm only sleeps if there weren't any jobs. As long as there's a job to process, the worker will keep processing.
These two factors combined mean that the worker that picks up the first job will finish it, and pick up the next, and so on, before the other workers even get to check for jobs again (after 5s). The impact of more workers will only be seen when we have a lot of jobs, or our jobs actually do some work that takes time.
Let's see what happens if I add a little "work" to the job:
class Jobs::DoSomeStuff < Gator::Queueable
  def handle(arg1, arg2 = nil)
+   sleep 1.5
    logger.info "HIIII #{arg1} and #{arg2}"
  end
end
Ah, that's better. One worker still has nothing to do, but the other two share the three jobs. And none is processed twice.
Handling failures
But what if a job fails? Right now, it would crash our worker. So one more thing for today: let's add some basic error handling.
For now, all we'll do is record it when an error happens. We just need to adjust a few methods in our worker:
module Gator
  class Worker
    # ...
    def run
      logger.info "Worker #{worker_id} ready"
      loop do
        if (job = next_job)
-         execute_job(job)
-         cleanup(job)
+         error = execute_job(job)
+         cleanup(job, error)
        else
          sleep polling_interval
        end
      end
    end

    def execute_job(job)
      Object.const_get(job.name).new.handle(*job.args)
      logger.info "Processed job id=#{job.id} result=succeeded args=#{job.args}"
+     nil
+   rescue => e
+     logger.info "Processed job id=#{job.id} result=failed args=#{job.args}"
+     e
    end

-   def cleanup(job)
+   def cleanup(job, error = nil)
      job.reserved_by = nil
      job.attempts += 1
      job.last_executed_at = Time.now
-     job.state = "succeeded"
+     job.state = error ? "failed" : "succeeded"
+     job.error_details = error if error
      job.save
    end
  end
end
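Assigning the exception object directly leaves it to the model layer to turn it into a string, which would typically keep only the message. If you want a bit more context in error_details, one option is to build the string yourself. This is a sketch, not part of Gator; format_error and its backtrace_lines parameter are made-up names.

```ruby
# Builds a compact string for the error_details column:
# the error class and message, then the first few backtrace lines.
def format_error(error, backtrace_lines: 5)
  lines = ["#{error.class}: #{error.message}"]
  # backtrace is nil for an exception that was never raised
  lines.concat((error.backtrace || []).first(backtrace_lines))
  lines.join("\n")
end

begin
  raise ArgumentError, "Oh no, a problem"
rescue => e
  puts format_error(e, backtrace_lines: 2)
end
```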
And I'll edit the job so it randomly fails:
class Jobs::DoSomeStuff < Gator::Queueable
  def handle(arg1, arg2 = nil)
    sleep 1.5
+   raise "Oh no, a problem" if rand > 0.8
    logger.info "HIIII #{arg1} and #{arg2}"
  end
end
And now, when I queue a couple of jobs, some fail, and we can see them marked as failed in the database, complete with the error details.
Okay, that's a good start. You can see the full code at this point on this commit.
Check out the next post in the series.