Handle Incoming Webhooks with LiteJob for Ruby on Rails

julianrubisch

Posted on November 22, 2023

In parts one and two of this series, we only dealt with the pure CRUD aspects of using SQLite as a production database.

In this post, we will explore the world of queue mechanisms, using SQLite as the backing store for ActiveJob's queue adapter.

Let's make use of LiteJob to handle incoming webhooks in our Rails application.

Our Ruby on Rails Use Case

Our use case is the image generation that we are currently doing synchronously in the PromptsController:

model = Replicate.client.retrieve_model("stability-ai/stable-diffusion-img2img")
version = model.latest_version
version.predict({prompt: prompt_params[:title], image: @prompt.data_url}, replicate_rails_url)

Clearly, there are some long-running processes here that should be deferred to a background job: the loading of the model, and the prediction itself.

Configuration

In your environment configuration files, ensure you reference LiteJob as the queue adapter:

# config/environments/development.rb
# config/environments/production.rb
Rails.application.configure do
  # ...

  config.active_job.queue_adapter = :litejob

  # ...
end

Now we're all set to continue our investigation. Don't forget to restart your development server, though!
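
If you want to double-check that the adapter has been picked up, a quick look in the Rails console helps (the output below is illustrative, and the adapter class name is simply what we would expect ActiveJob to resolve for :litejob):

Rails.application.config.active_job.queue_adapter
# => :litejob

ActiveJob::Base.queue_adapter
# => #<ActiveJob::QueueAdapters::LitejobAdapter ...>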

If you like, you can add more ActiveJob configuration in config/litejob.yml — for example, queue priorities:

queues:
  - [default, 1]
  - [urgent, 5]
  - [critical, 10, "spawn"]

Please refer to the README for further details.
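
With priorities configured, a job opts into one of these queues through ActiveJob's standard queue_as declaration. For example, a hypothetical SendAlertJob could target the urgent queue from the config above:

class SendAlertJob < ApplicationJob
  # Jobs enqueued here land in the "urgent" queue, which the config
  # above gives a higher priority than "default".
  queue_as :urgent

  def perform(message)
    # ...
  end
end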

That's set up, so let's move on to generating our images asynchronously.

Generating Images Asynchronously in Our Ruby on Rails Application

We'll start by scaffolding our job with the usual Rails generator command:

$ bin/rails g job GenerateImage

Let's move the image generation code from PromptsController into the perform method of this job:

class GenerateImageJob < ApplicationJob
  include Rails.application.routes.url_helpers

  queue_as :default

  def perform(prompt:)
    model = Replicate.client.retrieve_model("stability-ai/stable-diffusion-img2img")
    version = model.latest_version
    version.predict({prompt: prompt.title, image: prompt.data_url}, replicate_rails_url(host: Rails.application.config.action_mailer.default_url_options[:host]))
  end
end

Note that we have to include the URL helpers module here to access replicate_rails_url (as opposed to controllers, where this happens automatically). Furthermore, we also need to explicitly specify the host — we grab it off ActionMailer's default_url_options, in this case. Look at Andy Croll's excellent 'Use Rails URL helpers outside views and controllers' post for more sophisticated uses.

To make this work with the incoming webhook, make sure you also reference your ngrok URL in your app's configuration:

# config/environments/development.rb
config.action_mailer.default_url_options = { host: "YOUR_NGROK_URL", port: 3000 }

As the final step on the requesting side of our call to Replicate, we have to actually call the job from PromptsController:

  # app/controllers/prompts_controller.rb

  def create
    # ...
-   model = Replicate.client.retrieve_model("stability-ai/stable-diffusion-img2img")
-   version = model.latest_version
-   version.predict({prompt: prompt_params[:title],
-     image: @prompt.data_url}, replicate_rails_url)

    respond_to do |format|
      if @prompt.save
+       GenerateImageJob.perform_later(prompt: @prompt)
+
        format.html { redirect_to prompt_url(@prompt), notice: "Prompt was successfully created." }
        format.json { render :show, status: :created, location: @prompt }
      else
        format.html { render :new, status: :unprocessable_entity }
        format.json { render json: @prompt.errors, status: :unprocessable_entity }
      end
    end
  end

Persisting Replicate.com Predictions

Now it's time to take a look at the receiving end of our prediction request, i.e., the incoming webhook.

Right now, generated predictions are handed back to us, but we don't do anything with them. Since images are purged from Replicate's CDN periodically, we want to store them locally.

Let's start by generating a new child model, Prediction. We'll prepare it to store the generated image as a binary blob again:

$ bin/rails g model Prediction prompt:references replicate_id:string replicate_version:string prediction_image:binary logs:text
$ bin/rails db:migrate

On top of that, we create columns to store some metadata returned from Replicate.com: id, version, and logs (see the API docs).

Finally, we also have to register this new association in the Prompt model:

  # app/models/prompt.rb

  class Prompt < ApplicationRecord
    include AccountScoped

    belongs_to :account
+   has_many :predictions, dependent: :destroy

    # ...
  end

Now we just have to process the incoming predictions in our webhook, but we face an issue: we have to find a way to identify the specific Prompt instance the prediction was created for. We can solve this by adding a query param to the incoming webhook URL:

  # app/jobs/generate_image_job.rb

  class GenerateImageJob < ApplicationJob
    include Rails.application.routes.url_helpers

    queue_as :default

    def perform(prompt:)
      model = Replicate.client.retrieve_model("stability-ai/stable-diffusion-img2img")
      version = model.latest_version
-     version.predict({prompt: prompt.title, image: prompt.data_url}, replicate_rails_url(host: Rails.application.config.action_mailer.default_url_options[:host]))
+     version.predict({prompt: prompt.title, image: prompt.data_url}, replicate_rails_url(host: Rails.application.config.action_mailer.default_url_options[:host], params: {sgid: prompt.to_sgid.to_s}))
    end
  end

This will effectively send our incoming webhook to YOUR_NGROK_URL/replicate/webhook?sgid=YOUR_RECORD_SGID, making it easy to identify the prompt.
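
If signed GlobalIDs are new to you, here is a quick illustration of the round trip (console output abbreviated and purely illustrative):

sgid = @prompt.to_sgid.to_s
# => "BAh7CE..." (an opaque, signed token)

GlobalID::Locator.locate_signed(sgid)
# => #<Prompt id: 1, ...>

# Tampered or expired tokens resolve to nil instead of raising:
GlobalID::Locator.locate_signed("garbage")
# => nil

Note that signed GlobalIDs expire (after one month by default in Rails), which leaves plenty of headroom for a webhook that usually arrives within minutes.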

Obtaining the sgid in Ruby

Sadly, the replicate-rails gem's default controller doesn't pass the query parameters to the webhook handler, but it does pass the exact webhook URL.

So we can flex our Ruby standard library muscles to obtain the sgid from it:

# config/initializers/replicate.rb

# ...

require "open-uri"

class ReplicateWebhook
  def call(prediction)
    query = URI(prediction.webhook).query

    sgid = CGI.parse(query)["sgid"].first

    prompt = GlobalID::Locator.locate_signed(sgid)

    prompt.predictions.create(
      prediction_image: URI.parse(prediction.output.first).open.read,
      replicate_id: prediction.id,
      replicate_version: prediction.version,
      logs: prediction.logs
    )
  end
end

# ...

Let's dissect this:

  1. First, we pluck the exact webhook URL off the incoming prediction object and parse it, returning the query string.
  2. Next, we invoke CGI.parse to convert the query string into a hash. Accessing the "sgid" key returns a one-element array, which is why we have to send first to it.
  3. Then, we can use the obtained sgid to locate the prompt instance.
  4. Finally, we append a new prediction to our prompt using the properties returned from Replicate.com. We also download the prediction image and save it in SQLite. Note that we can skip resizing because the generated image will already have the correct dimensions.
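
For orientation, these are the attributes of the prediction object that our handler relies on (all values below are placeholders):

prediction.webhook  # => "https://YOUR_NGROK_URL/replicate/webhook?sgid=YOUR_RECORD_SGID"
prediction.output   # => ["https://..."] (URL(s) of the generated image(s))
prediction.id       # => "SOME_PREDICTION_ID"
prediction.version  # => "SOME_MODEL_VERSION"
prediction.logs     # => "..."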

There's one final bit we have to tend to: we have to make sure our webhook handler works in an idempotent way. Webhooks can arrive duplicated or out of order, so we have to prepare for this case.

Luckily, we can use the ID returned by Replicate to ensure idempotency. All we have to do is add a unique index to the replicate_id column, which makes it impossible to insert a duplicate prediction into our database:

$ bin/rails g migration AddUniqueIndexToPredictions

class AddUniqueIndexToPredictions < ActiveRecord::Migration[7.0]
  def change
    add_index :predictions, :replicate_id, unique: true
  end
end

$ bin/rails db:migrate
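
With the unique index in place, a duplicate delivery will raise ActiveRecord::RecordNotUnique when we try to insert the same prediction twice. A minimal sketch of treating such deliveries as no-ops in our handler (building on the ReplicateWebhook class from above) could look like this:

# config/initializers/replicate.rb

class ReplicateWebhook
  def call(prediction)
    query = URI(prediction.webhook).query
    sgid = CGI.parse(query)["sgid"].first
    prompt = GlobalID::Locator.locate_signed(sgid)

    prompt.predictions.create(
      prediction_image: URI.parse(prediction.output.first).open.read,
      replicate_id: prediction.id,
      replicate_version: prediction.version,
      logs: prediction.logs
    )
  rescue ActiveRecord::RecordNotUnique
    # We have already stored a prediction with this replicate_id,
    # so the duplicate delivery can safely be ignored.
  end
end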

Keep in mind that you would typically still want to store your assets and attachments in object storage buckets like Amazon S3 or DigitalOcean Spaces. To get a glimpse of what SQLite can do, we have opted to store and render them directly from the database here.

To complete our picture — pun intended — we again have to find a way to display our stored predictions as children of a prompt. We can do this using a data URL:

  class Prediction < ApplicationRecord
    belongs_to :prompt

+   def data_url
+     encoded_data = Base64.strict_encode64(prediction_image)
+
+     "data:image/png;base64,#{encoded_data}"
+   end
  end

  <!-- app/views/prompts/_prompt.html.erb -->
  <div id="<%= dom_id prompt %>">
    <p>
      <strong>Title:</strong>
      <%= prompt.title %>
    </p>

    <p>
      <strong>Description:</strong>
      <%= prompt.description %>
    </p>

    <p>
      <strong>Prompt image:</strong>
      <%= image_tag prompt.data_url %>
    </p>

+   <p>
+     <strong>Generated images:</strong>
+     <% prompt.predictions.each do |prediction| %>
+       <%= image_tag prediction.data_url %>
+     <% end %>
+   </p>
  </div>

A Look Behind the Scenes of LiteJob for Ruby on Rails

Let's quickly look into how LiteJob uses SQLite to implement a job queueing system. In essence, the class Litequeue interfaces with the SQLite queue table. This table's columns, like id, name, fire_at, value, and created_at, store and manage job details.

The push and repush methods of the Litequeue class add jobs to the queue, each backed by a corresponding SQL statement. When it's time for a job to run, the pop method in the same class retrieves and removes a job based on its scheduled time, and the delete method removes a specific job.
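
To make this more tangible, here is a rough sketch of what popping the next due job off such a table could look like. This is illustrative code rather than Litequeue's actual implementation, and it assumes a SQLite version new enough to support DELETE ... RETURNING (3.35+):

require "sqlite3"

db = SQLite3::Database.new("queue.sqlite3")

# Atomically claim the next job that is due on the "default" queue;
# the RETURNING clause hands the deleted row back in the same statement.
row = db.execute(<<~SQL, ["default", Time.now.to_f]).first
  DELETE FROM queue
  WHERE id = (
    SELECT id FROM queue
    WHERE name = ? AND fire_at <= ?
    ORDER BY fire_at ASC
    LIMIT 1
  )
  RETURNING id, value
SQL

# `value` holds the serialized job payload for a worker to execute.
id, value = row if row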

The Litejobqueue class is a subclass of Litequeue that, upon initialization, creates the worker(s). These workers contain the main run loop that oversees the actual job execution, continuously fetching and running jobs from the queue.
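
Conceptually, each worker then runs a loop along these lines (again a simplification rather than litestack's actual code, assuming queue is a Litequeue instance whose pop returns the job id and its serialized payload):

# Conceptual worker loop, not litestack's real implementation.
def run(queue)
  loop do
    # Grab the next due job from the queue table, if there is one.
    id, serialized_job = queue.pop

    if serialized_job
      # Hand the deserialized job data back to ActiveJob for execution.
      ActiveJob::Base.execute(JSON.parse(serialized_job))
    else
      # Back off briefly while the queue is empty.
      sleep 0.1
    end
  end
end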

Limitations of LiteJob

Now that we've broken down how to use LiteJob for asynchronous job processing, let's look at some potential drawbacks of this approach.

While an SQLite-based architecture provides simplicity and reduces the need for external dependencies, it also introduces several limitations.

Firstly, LiteJob's reliance on SQLite inherently restricts its horizontal scaling capabilities. Unlike other databases, SQLite is designed for single-machine use, making it challenging to distribute workload across multiple servers. This can certainly be done using novel technologies like LiteFS, but it is far from intuitive.

Additionally, even on a single machine, LiteJob contends with your web server (e.g., Puma) for concurrency: it spawns threads or fibers inside the same process, and these compete with the ones handling web requests.

On the other hand (and crucially, for the majority of smaller apps), you might never need such elaborate scaling concepts.

LiteJob's Performance in Benchmarks

On a more positive note, LiteJob seems to offer plenty of headroom before you have to scale horizontally: at least the benchmarks put me in a cautiously euphoric mood. Granted, the benchmarks don't carry a realistic payload, but they demonstrate that LiteJob makes very efficient use of threads and fibers.

Up Next: Streaming Updates with LiteCable

In this installment of our series, we delved deeper into the intricacies of using SQLite with LiteJob to handle asynchronous image generation and incoming webhooks in Rails applications.

To recap, while the SQLite-based architecture of LiteJob offers simplicity and reduced external dependencies, it also presents challenges in horizontal scaling. However, for smaller applications, LiteJob's efficiency with threads and fibers suggests it can handle a significant workload before necessitating more complex scaling solutions.

In the next post of this series, we'll look at providing reactive updates to our users using Hotwire powered by LiteCable.

Until then, happy coding!

P.S. If you'd like to read Ruby Magic posts as soon as they get off the press, subscribe to our Ruby Magic newsletter and never miss a single post!
