Alexey
Posted on July 18, 2024
TL;DR: In this post I explain how to keep persistent connections to Redis when using the Async::Redis client inside Sidekiq, and why this doesn't work by default. Along the way there are some basic explanations of how Async and Sidekiq work, with source code.
All example code can be found here. Note that Rails isn't actually required, but I used it in the examples because Sidekiq is usually used with Rails.
Async is a composable asynchronous I/O framework for Ruby. It lets you do things concurrently using Fibers. Since 3.0, Ruby provides a fiber scheduler interface that core I/O methods support, and Async implements that interface. This means you can get non-blocking I/O without much effort, for example when using Net::HTTP. If you perform a blocking operation, such as an HTTP call, inside a fiber, the fiber yields immediately, so another fiber can become active and do useful work instead of waiting for the HTTP call to complete.
Using the Async::Redis gem
Besides Ruby's support for the fiber scheduler, for some I/O operations you can use dedicated gems, like Async::Redis. Other Ruby gems that use native Ruby I/O also work with Async and give you non-blocking I/O as well, but there are two reasons to prefer Async::Redis:
- Its author, Samuel Williams, worked hard to make the interface more consistent and easier to use, especially its support for pipelines, transactions and subscriptions.
- It uses Async::Pool, so you don't need to build a connection pool yourself. Async::Pool keeps connections persistent out of the box and, conveniently, creates them lazily: a new connection is created only if there are no free ones (and the specified connection limit hasn't been reached). This is extremely handy if you use Async inside Sidekiq and can't calculate in advance how many connections you will need.
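To make the lazy-creation idea concrete, here is a tiny stdlib-only sketch. It is not Async::Pool's implementation: the LazyPool class, its acquire method and the Object.new stand-in for a connection are hypothetical, and it raises when the limit is reached, whereas a real pool would make the caller wait for a free connection.

```ruby
# Hypothetical, minimal lazily-filled pool. It only illustrates the idea;
# it is NOT how Async::Pool is implemented.
class LazyPool
  attr_reader :created

  def initialize(limit:, &constructor)
    @limit = limit
    @constructor = constructor # builds a new "connection" on demand
    @available = []            # idle connections ready for reuse
    @created = 0               # how many connections were ever built
    @mutex = Mutex.new
  end

  # Reuse an idle connection if there is one; otherwise build a new one,
  # but only while the limit has not been reached.
  def acquire
    connection = @mutex.synchronize do
      if !@available.empty?
        @available.pop
      elsif @created < @limit
        @created += 1
        @constructor.call
      else
        # A real pool would make the caller wait here instead of raising.
        raise "pool limit of #{@limit} reached"
      end
    end

    begin
      yield connection
    ensure
      # Return the connection so the next caller can reuse it.
      @mutex.synchronize { @available.push(connection) }
    end
  end
end

pool = LazyPool.new(limit: 10) { Object.new }

# Sequential use: the idle connection is reused, so only one is ever built.
3.times { pool.acquire { |conn| conn } }
puts pool.created # => 1

# Two connections held at the same time force a second one to be created.
pool.acquire { pool.acquire { } }
puts pool.created # => 2
```

Nothing is opened up front: the pool grows only when concurrent demand requires it, which is why you don't need to know the exact number of connections in advance.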
The usage is simple. First, add the gem to your Gemfile:
gem 'async-redis'
Then create a client:
endpoint = Async::Redis.local_endpoint # localhost:6379
@client = Async::Redis::Client.new(endpoint, limit: 10)
The limit parameter is passed to Async::Pool to cap the number of acquired connections. You can also check the short doc for another parameter you can pass, concurrency.
# source code of Async::Pool (excerpt from Async::Pool::Controller)
module Async
  module Pool
    class Controller
      def self.wrap(**options, &block)
        self.new(block, **options)
      end
      def initialize(constructor, limit: nil, concurrency: (limit || 1), policy: nil)
        # ...
      end
    end
  end
end
Async::Redis.local_endpoint is just an IO::Endpoint.tcp with the default local Redis host and port, as we can see in the gem's source code:
def self.local_endpoint(port: 6379)
  ::IO::Endpoint.tcp('localhost', port)
end
Now we can call the client:
Sync do
  @client.set('key', 'value')
  puts @client.get('key') # => "value"
ensure
  @client.close
end
The resulting code:
# app/lib/redis_example.rb
require 'async'       # defines Kernel#Sync and Kernel#Async
require 'async/redis'
endpoint = Async::Redis.local_endpoint
@client = Async::Redis::Client.new(endpoint)
Sync do
  @client.set('key', 'value')
  puts @client.get('key') # => "value"
ensure
  @client.close
end
Why do we need the Sync do block? We'll dig into that a little later.
Using the Async::Redis gem with Ruby on Rails
This setup is pretty simple. However, most of the time we don't write pure Ruby programs; usually we have Rails. With Rails, it is handy to have a persistent client that we can call from anywhere. Let's create one:
# app/lib/async_redis_client.rb
class AsyncRedisClient
  # One client per thread, memoized in a thread variable.
  def self.instance
    Thread.current.thread_variable_get(:redis_client) ||
      Thread.current.thread_variable_set(:redis_client, new)
  end
  def initialize
    @client = Async::Redis::Client.new(Async::Redis.local_endpoint, limit: 10)
  end
  def with
    yield @client
  end
end
We use Thread.current.thread_variable_get and Thread.current.thread_variable_set so that each thread gets its own client, which keeps it thread-safe. If you don't, you will soon get a "fiber called across threads" error.
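Why thread_variable_get rather than the more familiar Thread.current[:key]? In Ruby, Thread.current[:key] is fiber-local, while thread variables are shared by all fibers of a thread. Since every Async task runs in its own fiber, a fiber-local memo would hand each task its own client (and pool). The stdlib-only sketch below shows the difference; the PerThread and PerFiber modules and the Object.new stand-in for a client are hypothetical.

```ruby
# Hypothetical helper memoizing one object per thread, mirroring
# AsyncRedisClient.instance (Object.new stands in for the client).
module PerThread
  def self.instance
    Thread.current.thread_variable_get(:client) ||
      Thread.current.thread_variable_set(:client, Object.new)
  end
end

# Hypothetical fiber-local variant using Thread.current[], for comparison.
module PerFiber
  def self.instance
    Thread.current[:client] ||= Object.new
  end
end

main_thread = PerThread.instance
main_fiber  = PerFiber.instance

# Inside a new fiber (every Async task is one), the thread variable is shared...
same_in_fiber = Fiber.new { PerThread.instance.equal?(main_thread) }.resume
# ...but the fiber-local value is not: the new fiber builds its own object.
fiber_local_shared = Fiber.new { PerFiber.instance.equal?(main_fiber) }.resume
# A different thread gets its own thread variable, hence its own client.
other_thread = Thread.new { PerThread.instance }.value

puts same_in_fiber                    # => true
puts fiber_local_shared               # => false
puts other_thread.equal?(main_thread) # => false
```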
Now we have a persistent client that should have persistent connections. We can use it in Sidekiq, right?
# app/workers/redis_worker.rb
class RedisWorker
  include Sidekiq::Job
  def perform
    AsyncRedisClient.instance.with do |redis|
      redis.set('key', 'value')
    end
  end
end
No. Remember the Sync do block at the beginning? Without it, we get this error:
No async task available! (RuntimeError)
A quick search shows that the Async::Redis client needs to be called within an existing reactor. You can achieve that by running the code inside an Async do ... or Sync do ... block. Let's have a look at the source code of Sync:
module Kernel
  # Run the given block of code synchronously, but within a reactor if not already in one.
  #
  # @yields {|task| ...} The block that will execute asynchronously.
  # @parameter task [Async::Task] The task that is executing the given block.
  #
  # @public Since `stable-v1`.
  # @asynchronous Will block until given block completes executing.
  def Sync(&block)
    if task = ::Async::Task.current?
      yield task
    else
      # This calls Fiber.set_scheduler(self):
      reactor = Async::Reactor.new
      begin
        return reactor.run(finished: ::Async::Condition.new, &block).wait
      ensure
        Fiber.set_scheduler(nil)
      end
    end
  end
end
Pretty straightforward: it creates a reactor if we are not already inside one. A typical pure Ruby program with Async looks like this:
Sync do
  # blabla
  Async do
    # some async stuff
  end
  Async do
    # some more async stuff
  end
  # some more blabla
end # waits for all async tasks inside to finish
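Sync's rule, "reuse the current reactor, otherwise create one and tear it down afterwards", is what the rest of this article relies on. The stdlib-only sketch below imitates only that rule: with_context, the :context thread variable and the Hash standing in for a reactor are hypothetical, and no fibers or I/O are involved.

```ruby
# Hypothetical stand-in for Kernel#Sync: reuse the current context if we are
# already inside one; otherwise create it, run the block, and tear it down.
def with_context
  if (current = Thread.current.thread_variable_get(:context))
    return yield(current)
  end

  context = { id: Object.new } # stands in for a new Async::Reactor
  Thread.current.thread_variable_set(:context, context)
  begin
    yield context
  ensure
    # Like Fiber.set_scheduler(nil) in Sync: the context ends with the block.
    Thread.current.thread_variable_set(:context, nil)
  end
end

outer_id = inner_id = nil
with_context do |outer|
  outer_id = outer[:id]
  with_context { |inner| inner_id = inner[:id] } # nested call reuses `outer`
end

first  = with_context { |ctx| ctx[:id] }
second = with_context { |ctx| ctx[:id] } # a new top-level call, a new context

puts outer_id.equal?(inner_id) # => true
puts first.equal?(second)      # => false
```

Anything tied to the context (such as a connection pool) therefore lives only as long as the outermost block.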
But we are using Sidekiq, so let's just do what the error asks and wrap the code in a Sync block. You could also wrap it in Async; in this example it doesn't matter much, because both create a Reactor and wait for the task to complete before closing it. But if a Reactor already exists (as it will by the end of this article), Async only schedules the task, so the Sidekiq job can finish before our async task does. If you want Async there, use Async do ... end.wait.
# app/workers/redis_worker_with_sync.rb
class RedisWorkerWithSync
  include Sidekiq::Job
  def perform
    Sync do
      AsyncRedisClient.instance.with do |redis|
        redis.set('key', 'value')
      end
    end
  end
end
That works! You can also refactor it, for example by moving the Sync block into a Sidekiq middleware or into the with method of our client.
The only thing left is to check that it works as we want: with persistent connections to Redis.
Monitoring Redis connections with Ruby and Async
We could just use redis-cli for this, but we can also write a simple monitor using Async. Redis has an INFO command that reports total_connections_received. We can use it to track the connections received after the monitor has started:
# app/lib/redis_monitor.rb
require 'async'
require 'async/redis'
require 'tty-sparkline'
require 'tty-screen'
endpoint = Async::Redis.local_endpoint
redis = Async::Redis::Client.new(endpoint)
connection_counts = []
interval = 1
max_width = TTY::Screen.width - 20
Sync do
  initial_connections_count = redis.info[:total_connections_received].to_i
  # Task 1: sample the number of connections received since start-up.
  Async do |task|
    loop do
      new_connections = redis.info[:total_connections_received].to_i - initial_connections_count
      connection_counts << new_connections
      if connection_counts.size > max_width
        connection_counts.shift
      end
      task.sleep(interval)
    end
  end
  # Task 2: redraw the chart.
  Async do |task|
    loop do
      print "\e[H\e[2J" # Clear the screen
      puts "Redis Connection Monitoring"
      puts "Total created connections: #{connection_counts.last}"
      sparkline = TTY::Sparkline.new(connection_counts, width: max_width, height: 10)
      puts sparkline.render
      task.sleep(interval)
    end
  end
end
Here we have two tasks. The first one polls total_connections_received and appends the number of connections received since the monitor started to connection_counts, which feeds our chart. The second task uses the tty-sparkline gem to draw the chart in the console.
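The monitor only needs the difference between the current total_connections_received and the value seen at start-up. The raw INFO reply is plain text: "# Section" header lines followed by "field:value" lines separated by CRLF. The parse_info helper and the two sample replies below are hypothetical, written only to show that computation without a Redis server; with Async::Redis you read the field from redis.info, as the monitor does.

```ruby
# Hypothetical parser for the text returned by the Redis INFO command:
# skips blank lines and "# Section" headers and splits "field:value" pairs.
def parse_info(text)
  text.each_line.with_object({}) do |line, fields|
    line = line.strip
    next if line.empty? || line.start_with?("#")

    key, value = line.split(":", 2)
    fields[key.to_sym] = value
  end
end

# Two made-up INFO snapshots taken one interval apart.
before = parse_info("# Stats\r\ntotal_connections_received:6\r\n")
after  = parse_info("# Stats\r\ntotal_connections_received:9\r\n")

# New connections since the first snapshot, as the monitor computes it.
new_connections = after[:total_connections_received].to_i -
                  before[:total_connections_received].to_i
puts new_connections # => 3
```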
Let's also add a Rake task to fill our Sidekiq queue:
# lib/tasks/job_pusher.rake (Rails loads Rake tasks from lib/tasks/*.rake)
namespace :jobs do
  desc "Push jobs to the queue continuously"
  task push: :environment do
    loop do
      puts 'Pushing job to the queue'
      RedisWorkerWithSync.perform_async
      sleep 1
    end
  end
end
Now let's run everything we need for our experiment (each command is a long-running process, so use a separate terminal for each):
bundle exec sidekiq
bundle exec rake jobs:push
ruby app/lib/redis_monitor.rb
And let's look at the monitoring result:
The connection count starts at 6 (Sidekiq creates and manages its own connection pool to Redis; a 7th connection is also created by Sidekiq when it starts processing jobs) and increases steadily, by one new connection every second (because we push a new job every second). Why does this happen? I mentioned at the beginning that Async::Redis has persistent connections, didn't I?
Well, it does, but only within a reactor, because the reactor defines the "lifetime" of a Ruby program that uses Async. Once our top-level Sync or Async block finishes, the client closes all its connections: why would it need them anymore?
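A stdlib-only simulation makes this concrete. It models the behaviour described above, where a reactor's pool and its connections go away when the reactor finishes; run_reactor, perform_job and FakePool (which keeps a single lazily opened connection) are hypothetical stand-ins, not Async APIs.

```ruby
# Hypothetical model: every reactor gets a fresh pool, and the pool's
# connection is discarded when the reactor finishes.
class FakePool
  def initialize(counter)
    @counter = counter # shared count of connections ever opened
    @idle = nil
  end

  def with
    @idle ||= (@counter[:opened] += 1) # open lazily, then keep reusing it
    yield @idle
  end
end

def run_reactor(counter)
  pool = FakePool.new(counter)
  yield pool
  # Reactor finished: the pool and its connection go away with it.
end

def perform_job(pool)
  pool.with { |conn| conn } # one Redis command per job
end

# 1. Sync inside every job: one reactor (and one new connection) per job.
per_job = { opened: 0 }
10.times { run_reactor(per_job) { |pool| perform_job(pool) } }

# 2. One reactor around the whole processing loop.
per_process = { opened: 0 }
run_reactor(per_process) { |pool| 10.times { perform_job(pool) } }

puts per_job[:opened]     # => 10
puts per_process[:opened] # => 1
```

The first case matches what the monitor showed: one new connection per job. The second case is what we want from Sidekiq.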
How a Sidekiq process runs
Okay, back to our Rails app. In our app, the lifetime should be the whole Sidekiq process. Luckily, Sidekiq has internal documentation on how it runs. I won't copy the entire documentation here, just the part we are interested in:
Sidekiq::Manager manages the resources for a given Sidekiq::Capsule. It creates and starts N Sidekiq::Processor instances. ... Each Sidekiq::Processor instance is a separate thread.
That's exactly what we want, because we need a separate reactor for each thread.
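Putting the two ideas together: if each processor thread wraps its job loop in one long-lived reactor, the number of connections is bounded by the number of threads rather than growing with the number of jobs. The sketch below models 5 processor threads (Sidekiq's default concurrency) pulling 100 jobs from a Queue; the queue, the per-thread lazily opened "connection" and the counter are hypothetical stand-ins, not Sidekiq or Async code.

```ruby
# Hypothetical model of Sidekiq processors: each thread runs its job loop
# inside ONE long-lived context (our stand-in for a Sync reactor), which opens
# a connection lazily on the first job and then reuses it.
THREADS = 5
JOBS = 100

jobs = Queue.new
JOBS.times { |i| jobs << i }
THREADS.times { jobs << :done } # one stop marker per processor

opened = 0
lock = Mutex.new
processed = Array.new(THREADS, 0)

processors = THREADS.times.map do |n|
  Thread.new do
    connection = nil # lives as long as this thread's "reactor"
    while jobs.pop != :done
      # Only the first job on this thread opens a connection.
      connection ||= lock.synchronize { opened += 1 }
      processed[n] += 1
    end
  end
end
processors.each(&:join)

puts processed.sum     # => 100
puts opened <= THREADS # => true
```

A thread that happens to receive no jobs never opens a connection, so the count can stay below 5, but it can never exceed the number of threads.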
In the Sidekiq 7.3 source code, the Processor#run method looks like this:
# frozen_string_literal: true
module Sidekiq
  class Processor
    def run
      # By setting this thread-local, Sidekiq.redis will access +Sidekiq::Capsule#redis_pool+
      # instead of the global pool in +Sidekiq::Config#redis_pool+.
      Thread.current[:sidekiq_capsule] = @capsule
      process_one until @done
      @callback.call(self)
    rescue Sidekiq::Shutdown
      @callback.call(self)
    rescue Exception => ex
      @callback.call(self, ex)
    end
  end
end
It processes jobs in a loop, process_one until @done, which is easy to wrap:
Sync do
  process_one until @done
end
We can put our monkey-patch in app/lib/sidekiq/processor.rb:
# app/lib/sidekiq/processor.rb
# frozen_string_literal: true
module Sidekiq
  class Processor
    def run
      # By setting this thread-local, Sidekiq.redis will access +Sidekiq::Capsule#redis_pool+
      # instead of the global pool in +Sidekiq::Config#redis_pool+.
      Thread.current[:sidekiq_capsule] = @capsule
      # One reactor for the whole lifetime of this processor thread.
      Sync do
        process_one until @done
      end
      @callback.call(self)
    rescue Sidekiq::Shutdown
      @callback.call(self)
    rescue Exception => ex
      @callback.call(self, ex)
    end
  end
end
We also need to require it from an initializer so that our version of run replaces the original:
# config/initializers/sidekiq.rb
require_relative '../../app/lib/sidekiq/processor'
That's it! Each Sidekiq processor thread now runs inside a Sync block until we shut Sidekiq down. Now we can remove the Sync block from our worker, and everything works with persistent connections.
class RedisWorker
  include Sidekiq::Job
  def perform
    AsyncRedisClient.instance.with do |redis|
      redis.set('key', 'value')
    end
  end
end
Now that our Sidekiq processors are wrapped, we can change the job pusher to use RedisWorker instead of RedisWorkerWithSync and run it. Let's have a look at the monitor:
We see that the connection count rises slowly until it reaches 12. That's because Sidekiq runs with a default concurrency of 5, which means 5 separate threads: when a job lands on a thread that hasn't been used before, that thread has to create its first connection, so we end up with Sidekiq's own 7 connections plus 5. Let's change our rake task to push more jobs:
namespace :jobs do
  desc "Push jobs to the queue continuously"
  task push: :environment do
    loop do
      puts 'Pushing job to the queue'
      20.times do
        RedisWorker.perform_async
      end
      sleep 1
    end
  end
end
It reaches 12 instantly and stops at this number as we wanted.
Now, let's use Async inside Sidekiq. Currently, each worker performs only one Redis operation, which is why the same connection from the pool is used every time. Imagine that each worker needs to do 100 different Redis operations: with Async, we could run them concurrently and more efficiently:
class RedisWorker
  include Sidekiq::Job
  def perform
    100.times do
      Async do
        key = SecureRandom.hex # avoid shadowing Kernel#rand
        AsyncRedisClient.instance.with do |redis|
          redis.set(key, key)
        end
      end
    end
  end
end
The math is simple: Sidekiq uses 7 connections by itself, and we have 5 Sidekiq threads, each with its own Redis connection pool limited to 10 connections. Eventually we reach 7 + 5 × 10 = 57 connections.
Note that the code above finishes the Sidekiq job as soon as all tasks are scheduled, not when they complete. We can change it to wait for completion, but then we need many more tasks to use all the connections:
class RedisWorker
  include Sidekiq::Job
  def perform
    tasks = 10_000.times.map do
      Async do
        key = SecureRandom.hex # already a String, no to_s needed
        AsyncRedisClient.instance.with do |redis|
          redis.set(key, key)
        end
      end
    end
    # Block until every task has finished before the job completes.
    tasks.each(&:wait)
  end
end
And this is what would happen if we rolled back our monkey-patch:
# config/initializers/sidekiq.rb
# require_relative '../../app/lib/sidekiq/processor'
class RedisWorker
  include Sidekiq::Job
  def perform
    100.times do
      Async do
        key = SecureRandom.hex
        AsyncRedisClient.instance.with do |redis|
          redis.set(key, key)
        end
      end
    end
  end
end
Finally, let's wrap the whole worker in Sync, as before, and see what happens:
class RedisWorker
  include Sidekiq::Job
  def perform
    Sync do
      100.times do
        Async do
          key = SecureRandom.hex
          AsyncRedisClient.instance.with do |redis|
            redis.set(key, key)
          end
        end
      end
    end
  end
end
This is a little better: previously each Async created its own Reactor with a new connection pool, and now a single Sync owns one Reactor whose pool serves all the Async tasks inside it. But it is still much worse than with the patched Sidekiq.
This example is built around Redis, but the same is true for other Async I/O gems that need a Reactor, for example Async::HTTP.
Using Async can be tricky, but I encourage you to try it. If you are not familiar with it (and for some reason read this article to the end), I recommend watching this video from RailsConf 2024.