Persistent Redis Connections in Sidekiq with Async::Redis: A Deep Dive.

alexey2257

Alexey

Posted on July 18, 2024


TL;DR: In this post I explain how to keep persistent connections to Redis when using the Async::Redis client inside Sidekiq, and why this doesn't work by default. Along the way there are some basic explanations of how Async and Sidekiq work, with some source code.

All example code can be found here. Note that we don't actually need Rails, but I used it for examples because Sidekiq is usually used with Rails.

Async is a composable asynchronous I/O framework for Ruby. It lets you do things concurrently using Fibers. Since Ruby 3.0, core Ruby provides a fiber scheduler interface, which Async implements. This means you can get non-blocking I/O without much effort, for example when using Net::HTTP. If a fiber running under the scheduler performs a blocking operation, such as an HTTP call, it yields immediately. Another fiber can then become active and do useful work, instead of the whole thread waiting for the HTTP call to complete.

Using Ruby Async::Redis gem

Besides relying on Ruby's fiber scheduler, for some kinds of I/O you might use dedicated gems such as Async::Redis. You can still use other Ruby gems built on native Ruby I/O with Async, and they will be non-blocking as well, but there are two reasons to prefer Async::Redis:

  1. Its author, Samuel Williams, worked hard to make the interface more consistent and easier to use, especially for pipelines, transactions and subscriptions.
  2. It uses Async::Pool, so you don't need to build a connection pool yourself. Async::Pool keeps connections persistent out of the box. It also creates them lazily: a new connection is opened only if there are no free ones (and the configured limit hasn't been reached). This is extremely handy when you use Async inside Sidekiq and can't calculate in advance how many connections you will need.
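To make the lazy-creation policy from point 2 concrete, here is a deliberately simplified, hypothetical pool in plain Ruby. It is not Async::Pool's implementation: the real pool is fiber-aware and makes callers wait when the limit is reached instead of raising. It only illustrates "create on demand, reuse when free, never exceed the limit".

```ruby
# A toy, single-threaded pool that creates resources lazily (hypothetical).
class LazyPool
  attr_reader :created

  def initialize(limit:, &constructor)
    @limit = limit
    @constructor = constructor
    @available = []
    @created = 0
  end

  # Lends a resource to the block: reuses a free one or creates a new one.
  def acquire
    resource = @available.pop || create
    yield resource
  ensure
    @available.push(resource) if resource # hand it back to the pool
  end

  private

  def create
    raise 'pool limit reached' if @created >= @limit

    @created += 1
    @constructor.call(@created)
  end
end

pool = LazyPool.new(limit: 10) { |n| "connection-#{n}" }

# Sequential use: one connection is created and then reused.
5.times { pool.acquire { |conn| conn } }
puts pool.created # => 1

# Overlapping use: a second connection is created only while the first is busy.
pool.acquire { |a| pool.acquire { |b| [a, b] } }
puts pool.created # => 2
```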

The usage is simple:

First, add the gem to your Gemfile:

gem 'async-redis'

Then create a client:

endpoint = Async::Redis.local_endpoint # localhost:6379
@client = Async::Redis::Client.new(endpoint, limit: 10)

The limit parameter is passed to Async::Pool and caps the maximum number of acquired connections. You can also check the short doc on another parameter you can pass, concurrency. As the source below shows, it defaults to limit (or to 1 if no limit is given).

# Excerpt from the source code of Async::Pool

module Async
  module Pool
    class Controller
      def self.wrap(**options, &block)
        self.new(block, **options)
      end

      def initialize(constructor, limit: nil, concurrency: (limit || 1), policy: nil)
        # ...
      end
    end
  end
end

Async::Redis.local_endpoint is just an IO::Endpoint.tcp with the default local Redis host and port, as we can see in the gem's source code:

def self.local_endpoint(port: 6379)
  ::IO::Endpoint.tcp('localhost', port)
end

Now we can call the client:

Sync do
  @client.set('key', 'value')
  puts @client.get('key') # => "value"
ensure
  @client.close
end

The resulting code:

# app/lib/redis_example.rb

require 'async'       # provides Kernel#Sync
require 'async/redis'

endpoint = Async::Redis.local_endpoint
@client = Async::Redis::Client.new(endpoint)

Sync do
  @client.set('key', 'value')
  puts @client.get('key') # => "value"
ensure
  @client.close
end

Why do we need the Sync do block? We'll get to that a little later.

Using Async::Redis gem with Ruby on Rails

This setup is pretty simple. However, most of the time we don't write pure Ruby programs; usually we have Rails. In Rails, it would be handy to have a persistent client that we can call from anywhere. Let's create one:

# app/lib/async_redis_client.rb

class AsyncRedisClient
  def self.instance
    Thread.current.thread_variable_get(:redis_client) || 
      Thread.current.thread_variable_set(:redis_client, new)
  end

  def initialize
    @client = Async::Redis::Client.new(Async::Redis.local_endpoint, limit: 10)
  end

  def with
    yield @client
  end
end

We use Thread.current.thread_variable_get and Thread.current.thread_variable_set to give each thread its own client, which makes it thread-safe. If you don't, you will soon get a "fiber called across threads" error.
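Why use thread variables rather than Thread.current[:redis_client]? In Ruby, Thread#[] is fiber-local, while thread variables are shared by all fibers of a thread. Every Async task runs in its own fiber, so a fiber-local client would be created once per task, each with its own pool. This standard-library snippet shows the difference:

```ruby
# Thread#[] is fiber-local; thread variables are shared by all fibers of a thread.
Thread.current[:fiber_local] = :main
Thread.current.thread_variable_set(:thread_local, :main)

# A new fiber on the same thread sees the thread variable, not the fiber-local value.
in_fiber = Fiber.new do
  [Thread.current[:fiber_local], Thread.current.thread_variable_get(:thread_local)]
end.resume
p in_fiber # => [nil, :main]

# A different thread sees neither value.
in_thread = Thread.new do
  [Thread.current[:fiber_local], Thread.current.thread_variable_get(:thread_local)]
end.value
p in_thread # => [nil, nil]
```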

Now we have a persistent client that should have persistent connections. We can use it in Sidekiq, right?

# app/workers/redis_worker.rb

class RedisWorker
  include Sidekiq::Job

  def perform
    AsyncRedisClient.instance.with do |redis|
      redis.set('key', 'value')
    end
  end
end

No. Remember the Sync do block from the beginning? Without it, we get this error:

No async task available! (RuntimeError)

A quick search shows that the Async::Redis client has to be called within an existing reactor. You can get one by running the code inside an Async do ... or Sync do ... block. Let's have a look at the source code of Sync:

module Kernel
  # Run the given block of code synchronously, but within a reactor if not already in one.
  #
  # @yields {|task| ...} The block that will execute asynchronously.
  #     @parameter task [Async::Task] The task that is executing the given block.
  #
  # @public Since `stable-v1`.
  # @asynchronous Will block until given block completes executing.
  def Sync(&block)
    if task = ::Async::Task.current?
      yield task
    else
      # This calls Fiber.set_scheduler(self):
      reactor = Async::Reactor.new

      begin
        return reactor.run(finished: ::Async::Condition.new, &block).wait
      ensure
        Fiber.set_scheduler(nil)
      end
    end
  end
end

Pretty straightforward: it creates a reactor if we are not already inside one. A typical pure Ruby program with Async looks like this:

Sync do
  # blabla
  Async do
    # some async stuff
  end
  Async do
    # some more async stuff
  end
  # some more blabla
end # waits for all the async tasks inside to finish

But we are in a Sidekiq job, not a standalone script, so there is no surrounding reactor. Let's do what the error tells us and wrap the call in a Sync block. You could also use Async here. In this example it doesn't matter much, because when no reactor is running, both create one and wait for the task to complete before closing the reactor. But if you already have a reactor (as we will by the end of this article), Async only schedules the task and returns, so the Sidekiq job can complete before our async task does. You can also use Async do ... end.wait.

# app/workers/redis_worker_with_sync.rb

class RedisWorkerWithSync
  include Sidekiq::Job

  def perform
    Sync do
      AsyncRedisClient.instance.with do |redis|
        redis.set('key', 'value')
      end
    end
  end
end

That works! You can also refactor it, for example by moving the Sync block into a Sidekiq middleware or into the with method of our client.

The only thing left is to check that it works as we want: with persistent connections to Redis.

Monitoring Redis connections with Ruby and Async

We could just use redis-cli for this, but we can also write simple monitoring using Async. Redis has an INFO command that reports total_connections_received. We can use it to track how many new connections have been received since the monitor started:

# app/lib/redis_monitor.rb

require 'async'
require 'async/redis'
require 'tty-sparkline'
require 'tty-screen'

endpoint = Async::Redis.local_endpoint
redis = Async::Redis::Client.new(endpoint)

connection_counts = []

interval = 1
max_width = TTY::Screen.width - 20

Sync do
  initial_connections_count = redis.info[:total_connections_received].to_i

  Async do |task|
    loop do
      new_connections = redis.info[:total_connections_received].to_i - initial_connections_count

      connection_counts << new_connections

      if connection_counts.size > max_width
        connection_counts.shift
      end

      task.sleep(interval)
    end
  end

  Async do |task|
    loop do
      print "\e[H\e[2J" # Clear the screen

      puts "Redis Connection Monitoring"
      puts "Total created connections: #{connection_counts.last}"

      sparkline = TTY::Sparkline.new(connection_counts, width: max_width, height: 10)
      puts sparkline.render

      task.sleep(interval)
    end
  end
end

Here we have two tasks. The first one reads total_connections_received and appends the number of new connections since startup to connection_counts, which feeds our chart. The second task uses the tty-sparkline gem to draw the chart in the console.

Let's also add a Rake task to fill our Sidekiq queue:

# lib/tasks/job_pusher.rake (Rails loads *.rake files from lib/tasks)

namespace :jobs do
  desc "Push jobs to the queue continuously"
  task push: :environment do
    loop do
      puts 'Pushing job to the queue'
      RedisWorkerWithSync.perform_async
      sleep 1
    end
  end
end

Now let's run all we need for our experiment:

  • bundle exec sidekiq
  • bundle exec rake jobs:push
  • ruby app/lib/redis_monitor.rb

And let's have a look at the monitoring result:

The connection count starts at 6 because Sidekiq creates and manages its own connection pool to Redis. In fact, Sidekiq also creates a 7th connection once it starts processing jobs. From there the count increases constantly, adding one new connection every second, because we push a new job every second. Why does this happen? I mentioned at the beginning that Async::Redis has persistent connections, didn't I?

It does, but only within a reactor. For a Ruby program using Async, the reactor defines its "lifetime". Once our top-level Sync or Async block finishes, the client's connections are closed (why would they be needed anymore?). So each job starts a new reactor and has to open a new connection.

How does Sidekiq process run

Okay, back to our Rails app. In our app, that lifetime should be the whole Sidekiq process. Luckily, Sidekiq has internal documentation describing how it runs. I won't copy all of it here, just the part we are interested in:

Sidekiq::Manager manages the resources for a given Sidekiq::Capsule. It creates and starts N Sidekiq::Processor instances. ... Each Sidekiq::Processor instance is a separate thread.

That's exactly what we need, because we need a separate reactor for each thread: a reactor is installed as the fiber scheduler of the thread it runs on.

In Sidekiq 7.3, Processor#run looks like this:

# frozen_string_literal: true

module Sidekiq
  class Processor
    def run
      # By setting this thread-local, Sidekiq.redis will access +Sidekiq::Capsule#redis_pool+
      # instead of the global pool in +Sidekiq::Config#redis_pool+.
      Thread.current[:sidekiq_capsule] = @capsule

      process_one until @done
      @callback.call(self)
    rescue Sidekiq::Shutdown
      @callback.call(self)
    rescue Exception => ex
      @callback.call(self, ex)
    end
  end
end

It processes jobs in a loop, process_one until @done, which is easy to wrap:

Sync do
  process_one until @done
end

We can put a file with our monkey-patch in app/lib/sidekiq/processor.rb:

# app/lib/sidekiq/processor.rb

# frozen_string_literal: true

module Sidekiq
  class Processor
    def run
      # By setting this thread-local, Sidekiq.redis will access +Sidekiq::Capsule#redis_pool+
      # instead of the global pool in +Sidekiq::Config#redis_pool+.
      Thread.current[:sidekiq_capsule] = @capsule

      Sync do
        process_one until @done
      end
      @callback.call(self)
    rescue Sidekiq::Shutdown
      @callback.call(self)
    rescue Exception => ex
      @callback.call(self, ex)
    end
  end
end

Then we require it from an initializer, so that our version is loaded instead of the original:

# config/initializers/sidekiq.rb

require_relative '../../app/lib/sidekiq/processor'

That's it! Each processor thread now runs its job loop inside a Sync block until Sidekiq shuts down. Now we can remove the Sync block from our worker, and everything works with persistent connections:

class RedisWorker
  include Sidekiq::Job

  def perform
    AsyncRedisClient.instance.with do |redis|
      redis.set('key', 'value')
    end
  end
end

Now that Sidekiq is wrapped, we can change the job pusher to use RedisWorker instead of RedisWorkerWithSync and run it. Let's have a look at the monitor:

We see that the number of connections rises slowly until it reaches 12. That's because Sidekiq's default concurrency is 5, which means we have 5 separate processor threads. Whenever a job lands on a thread that hasn't been used before, that thread has to open its first connection. Sidekiq's own 7 connections plus one per thread gives 12. Let's change our rake task to push more jobs:

namespace :jobs do
  desc "Push jobs to the queue continuously"
  task push: :environment do
    loop do
      puts 'Pushing job to the queue'
      20.times do
        RedisWorker.perform_async
      end
      sleep 1
    end
  end
end

It reaches 12 instantly and stops at this number as we wanted.
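The number follows a simple formula: Sidekiq's own connections plus whatever each processor thread's pool opens. Here is a tiny plain-Ruby helper (hypothetical, just for the arithmetic) using the figures from this experiment:

```ruby
# Sidekiq's own connections + (processor threads x connections opened per thread's pool)
def expected_connections(sidekiq_own:, threads:, per_thread:)
  sidekiq_own + threads * per_thread
end

# One Redis call per job: each thread's pool only ever opens one connection.
puts expected_connections(sidekiq_own: 7, threads: 5, per_thread: 1) # => 12
```

If every thread's pool were used up to a limit of 10, the same helper would give 7 + 5 × 10 = 57.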

Now, let's use Async inside Sidekiq. Currently, each worker performs only one Redis operation, which is why it keeps reusing the same connection from the pool. Imagine that each worker needs to do 100 different Redis operations. With Async, we can do them concurrently and more efficiently:

class RedisWorker
  include Sidekiq::Job

  def perform
    100.times do
      Async do
        rand = SecureRandom.hex
        AsyncRedisClient.instance.with do |redis|
          redis.set(rand, rand)
        end
      end
    end
  end
end

The math is simple. Sidekiq uses 7 connections by itself, and we have 5 Sidekiq threads, each with its own Redis connection pool limited to 10 connections. Eventually we reach 7 + 5 × 10 = 57 connections.

Note that the code above finishes the Sidekiq job as soon as all tasks have been scheduled, not when they complete. We can change the code to wait for completion, but then we need many more tasks to utilize all connections:

class RedisWorker
  include Sidekiq::Job

  def perform
    tasks = 10000.times.map do
      Async do
        rand = SecureRandom.hex
        AsyncRedisClient.instance.with do |redis|
          redis.set(rand, rand)
        end
      end
    end
    tasks.each(&:wait) # wait until every task has finished
  end
end


And this is what would happen if we rolled back our monkey-patch:

# config/initializers/sidekiq.rb
# require_relative '../../app/lib/sidekiq/processor'

class RedisWorker
  include Sidekiq::Job

  def perform
    100.times do
      Async do
        rand = SecureRandom.hex
        AsyncRedisClient.instance.with do |redis|
          redis.set(rand, rand)
        end
      end
    end
  end
end

Finally, let's wrap the whole worker in Sync as before and see what happens:

class RedisWorker
  include Sidekiq::Job

  def perform
    Sync do
      100.times do
        Async do
          rand = SecureRandom.hex
          AsyncRedisClient.instance.with do |redis|
            redis.set(rand, rand)
          end
        end
      end
    end
  end
end

This is a little better. Previously, each Async block created its own reactor, and therefore its own connections. Now a single Sync block owns one reactor, shared by all the Async tasks inside it. But it is still much worse than with the patched Sidekiq, because that reactor, and its connections, only live as long as one job.

This example is built around Redis, but the same is true for other Async I/O gems that need a reactor, for example Async::HTTP.

Using Async can be tricky, but I encourage you to try it. If you are not familiar with it (and for some reason have read this article to the end), I recommend watching this video from RailsConf 2024.
