Transactional Outbox: from idea to open-source

bibendi

Misha Merkushin

Posted on June 9, 2024

Hey there! Misha Merkushin here, Team Lead of the Ruby Platform team at Kuper Tech. We’re the crew behind the internal libraries and microservice architecture improvements for everything Ruby. This article dives into the Transactional Outbox pattern and a tool we built and iteratively developed in-house that we've just recently released to the world. It tackles the challenge of ensuring reliable and consistent message delivery from your application, guaranteeing messages are sent only after a database transaction is successfully completed.

The Quest for Reliable Delivery

Our architecture followed a classic pattern: a monolithic storefront for customers and a separate microservice backend to handle order fulfillment. Customers would browse the storefront, add items to their cart, and place orders. These orders were then sent to the backend via a REST API using asynchronous Sidekiq jobs.

While we steered clear of synchronous distributed transactions, we encountered an issue with order loss, inevitably leading to a poor customer experience. Sure, Sidekiq offers retry functionality to handle network hiccups, but that didn't address the root cause.

The problem lies in how the background job is initiated. If a job is queued before the database transaction commits, we risk sending incomplete data to the backend. There's no guarantee the transaction will succeed. Conversely, if we queue the job immediately after the transaction commits, the application process might unexpectedly terminate before the job is queued—exceeding memory limits and getting killed, for example. The result? A lost order and a frustrated customer.

Enter the Outbox Pattern

The Transactional Outbox pattern is an architectural pattern used in distributed systems to ensure reliable message delivery. It works by persisting messages to a data store (typically an "outbox" table in the database) before they are eventually delivered to a message broker. This approach guarantees data integrity—either everything is committed, or the entire operation is rolled back if an error occurs.
Here's how it works:

  1. Order Creation and Persistence: When the application receives an order creation request, it opens a database transaction. The order details are saved, and within the same transaction, a message for the other service is generated and persisted to a dedicated "outbox" table in the database. The transaction is then committed.
  2. Polling for New Messages: A separate process periodically polls the outbox table for new entries.
  3. Message Processing and Delivery: This process picks up messages from the outbox table and handles them. In our case, it means sending the messages to the message broker.

The outbox table usually includes the following fields:

  • primary key
  • payload (the message body)
  • status, to track the message's current state.

If there are network issues or the broker is unavailable, the process will retry, ensuring the message isn't lost.
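The polling and retry behavior can be sketched in a few lines. This is a minimal in-memory illustration, not our production code: `OutboxItem`, `FlakyBroker`, and `relay_once` are hypothetical names, the array stands in for the outbox table, and the broker is rigged to fail the first attempt for every message.

```ruby
# In-memory stand-in for a row in the outbox table (hypothetical).
OutboxItem = Struct.new(:id, :payload, :status)

# Hypothetical broker that fails the first delivery attempt for every message.
class FlakyBroker
  attr_reader :delivered

  def initialize
    @delivered = []
    @attempts = Hash.new(0)
  end

  def publish(item)
    @attempts[item.id] += 1
    raise IOError, "broker unavailable" if @attempts[item.id] == 1

    @delivered << item.payload
  end
end

# One polling pass: try every pending item and mark it delivered on success.
# Failed items stay pending, so the next pass picks them up again.
def relay_once(outbox, broker)
  outbox.select { |item| item.status == :pending }.each do |item|
    begin
      broker.publish(item)
      item.status = :delivered
    rescue IOError
      # Leave the item pending; nothing is lost.
    end
  end
end

outbox = [
  OutboxItem.new(1, "order 1 completed", :pending),
  OutboxItem.new(2, "order 2 completed", :pending)
]
broker = FlakyBroker.new

relay_once(outbox, broker) # every publish fails on the first attempt
relay_once(outbox, broker) # the retry succeeds
```

Note that the status is updated only after a successful publish. If the process dies between those two steps, the item is published again on the next pass, which is where duplicates can come from.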

Transactional outbox pattern

This pattern guarantees at-least-once message delivery, meaning messages will be delivered at least once but might be delivered more than once, potentially leading to duplicates. This approach ensures reliability in unstable network environments where messages might get lost.

Here's a simplified code example:

Order.transaction do
  order.complete!
  # create! raises on failure, so the order update is rolled back together with the outbox record
  OrderOutboxItem.create!(order)
end

When to Use the Outbox Pattern

Imagine a large distributed system with hundreds of services, all communicating with each other through messages. These messages trigger events and updates, keeping each service aligned and enabling them to execute their own business logic. The reliability requirements for message delivery between these services depend heavily on the specific business logic they govern. Let's illustrate this with a couple of examples.

Example 1: Real-time Location Tracking (Low Reliability Requirements)

A courier is delivering an order, and the system tracks their location to provide accurate delivery estimates to the customer. This generates a high volume of data as the courier's position constantly changes. However, losing a few location updates has minimal impact on the overall process.

Example 2: Food Delivery Order Placement (High Reliability Requirements)

A customer places an order for pizza delivery. The restaurant needs to receive the order promptly to start preparing the food. Here, timely and reliable messaging is critical:

  • Delayed order reception: Leads to a dissatisfied customer who receives their pizza late.
  • Lost order message: Results in the customer never receiving their order, costing the company both revenue and customer trust.

In this article, we'll focus on scenarios like the second example, where reliable data transmission is paramount, and message loss would severely disrupt business operations.

Our Outbox Journey

At the company, we have a range of services written in Ruby. During development, we needed a simple yet scalable solution for reliable data transfer between these services. We went through several development stages, tackling new challenges and overcoming obstacles along the way. Starting with basic approaches, we gradually refined and enhanced our solution. Examining these stages will provide a deeper understanding of the Outbox pattern and help you avoid some pitfalls we encountered.

Streaming Store Configurations

Stores are the heart of any marketplace. As a delivery service, we partner with a diverse range of stores, each with its own set of parameters. For instance:

  • A store can be open or closed.
  • A store might have specific operating hours.

Technically, this means changes to a store's settings in one microservice must be communicated to dozens of others. With thousands of stores in our ecosystem, settings are constantly being updated and fine-tuned. The traffic volume is low but consistent. Crucially, these setting changes must reach their destinations without any loss. A delay of up to 10 minutes is acceptable. This made the Outbox pattern a perfect fit for this task, ensuring reliable and secure data transmission.

Our First Foray

To test our concept, we opted for the path of least resistance: a Rake task triggered by a Cron job. The idea was simple: every minute, the task would gather recent store setting changes, along with any that hadn't been sent previously (due to network errors, for example), and deliver them to the message broker queue.

task :publish_store do
  StoreOutboxItem.pending.find_each(&:publish!)
end

While functional for a proof of concept, this approach had drawbacks:

  • Slow boot on every run: Each Cron invocation launched a new Ruby process and had to load the whole application before publishing anything.
  • Lack of scalability: The single-threaded nature of this solution limited its ability to handle growing data volumes efficiently.

A Step Towards Efficiency

Launching a new Ruby process for each task proved inefficient. Enter Schked, a job scheduler that allowed us to schedule a recurring task to send messages to the broker every minute. Sidekiq would then handle these tasks.

every "1m" do
  StoreOutboxItemsPublishJob.enqueue
end

class StoreOutboxItemsPublishJob < ApplicationJob
  def perform
    StoreOutboxItem.pending.find_each(&:publish!)
  end
end

This approach eliminated the overhead of environment initialization, making task execution nearly instantaneous. However, scalability remained a challenge. There was also a risk of concurrent task execution: if one run took longer than a minute, the next could start alongside it and publish the same items again or out of order.

To mitigate this, we ran Sidekiq in single-threaded mode, ensuring tasks were executed sequentially. This solved the concurrency issue but limited our ability to leverage Sidekiq's full potential for parallel processing.

Since scalability remained unaddressed, we borrowed a concept from Kafka. A Kafka topic is analogous to a message queue. Topics are divided into partitions, enabling parallel processing and enhanced throughput. When sending a message, we can choose which partition it goes to, allowing us to group events related to a specific order within the same partition. By having a single consumer process messages from a single partition, we achieve parallel processing while maintaining message order.

The idea was to parallelize event dispatching across different partitions using multiple threads. This required pre-calculating the partition when saving an outbox item.
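As a rough sketch of that pre-calculation, the partition can be derived from a stable hash of the event key at the moment the item is built. The names `OutboxRow` and `build_item`, the CRC32 hash, and the partition count of 4 are assumptions made for the example, not details of our implementation.

```ruby
require "zlib"

PARTITIONS_COUNT = 4 # assumed value for the example

# In-memory stand-in for an outbox record with a stored partition (hypothetical).
OutboxRow = Struct.new(:store_id, :payload, :partition)

# Derive the partition from the event key once, when the item is created.
def build_item(store_id, payload)
  partition = Zlib.crc32(store_id.to_s) % PARTITIONS_COUNT
  OutboxRow.new(store_id, payload, partition)
end

items = [
  build_item("store-1", "opened"),
  build_item("store-2", "hours changed"),
  build_item("store-1", "closed")
]

# Each partition's slice can be published by its own job or thread,
# while events for the same store stay together and in order.
by_partition = items.group_by(&:partition)
```

Because the hash is deterministic, both events for `store-1` land in the same slice, so publishing slices in parallel does not reorder events for a single store.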

Outbox partitions

Therefore, Schked schedules a Sidekiq job to send messages to a specific partition:

every "1m" do
  PARTITIONS_COUNT.times do |partition|
    StoreOutboxItemsPublishJob.enqueue(partition)
  end
end

This achieved parallel dispatching across different partitions.

Job Overlapping

As discussed, this task handling mechanism suffers from potential job accumulation and overlap. This can overwhelm the system, preventing it from processing all events within the allotted time. To ensure smoother and more reliable task processing, we turned to the popular sidekiq-unique-jobs gem. It prevents a new job from starting if a previous job with the same unique identifier is still running.

class StoreOutboxItemsPublishJob < ApplicationJob
  sidekiq_options lock: :until_executed

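  def perform(partition)
    # Publish only this partition's items (assumes the pre-calculated partition is stored in a `partition` column)
    StoreOutboxItem.pending.where(partition: partition).find_each(&:publish!)
  end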
end

This approach offers several advantages:

  • Reliance on standard tools: Simplifies support and deployment.
  • Good observability: Built-in metrics provide valuable insights.
  • Scalability: Increasing parallelism is as easy as adding more partitions to the Kafka topic and increasing the Sidekiq queue concurrency.

However, despite these benefits, the system remains complex, with multiple potential points of failure.
Everything ran relatively smoothly until one day... it broke. An experienced Ruby developer would quickly identify the weakest link 🙂. sidekiq-unique-jobs failed us. We faced a new challenge: reducing points of failure to pave the way for tackling larger-scale challenges.

Migrating Order Processing from Async HTTP to Outbox

Our system comprised two main components: a storefront and a back-office system, actively exchanging order information at a rate of around 100 messages per second.

Historically, we used a combination of Sidekiq and HTTP for inter-system communication. This approach worked adequately with moderate data volumes. However, as the load increased, we encountered issues with message ordering, system scalability, and message loss. Moreover, the system lacked extensibility—adding new consumers required modifications to the monolithic storefront.

Recognizing the criticality of timely order information exchange, the Ruby Platform team decided to migrate the order synchronization mechanism to Kafka. It became evident that our existing Schked/Sidekiq/sidekiq-unique-jobs setup couldn't deliver the required reliability and performance.

We realized the limitations of our current solution and decided to implement a dedicated outbox daemon. This approach aimed to:

  • Reduce points of failure to one.
  • Enable independent, replicable, and interchangeable daemons.

That way, the failure of one daemon wouldn't impact the others.

The new daemon's concept was straightforward and effective:

  • Dedicated Process: A separate process handles messages for all Kafka partitions in a multi-threaded manner.
  • Process Synchronization: Processes are synchronized through Redis using the Redlock algorithm, ensuring that only one process handles a specific partition at any given time and preventing data conflicts.
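To illustrate the locking idea, here is a minimal sketch with an in-memory store standing in for Redis. `PartitionLocks` is a hypothetical class: its `acquire` imitates what a single-node `SET key token NX PX ttl` does, and `release` only succeeds for the current owner. The real Redlock algorithm coordinates across several Redis nodes, which this example does not attempt.

```ruby
require "securerandom"

# In-memory stand-in for a Redis-backed lock store (hypothetical, illustration only).
class PartitionLocks
  def initialize
    @locks = {}
    @mutex = Mutex.new
  end

  # Returns a token if the partition was free (or its lock expired), nil otherwise.
  def acquire(partition, ttl:, now: Time.now)
    @mutex.synchronize do
      current = @locks[partition]
      return nil if current && current[:expires_at] > now

      token = SecureRandom.hex(8)
      @locks[partition] = { token: token, expires_at: now + ttl }
      token
    end
  end

  # Deletes the lock only if the caller still owns it.
  def release(partition, token)
    @mutex.synchronize do
      return false unless @locks.dig(partition, :token) == token

      @locks.delete(partition)
      true
    end
  end
end

locks = PartitionLocks.new
t0 = Time.now

first   = locks.acquire(0, ttl: 10, now: t0)      # partition 0 is free
second  = locks.acquire(0, ttl: 10, now: t0 + 1)  # nil: another process holds it
other   = locks.acquire(1, ttl: 10, now: t0 + 1)  # a different partition is free
expired = locks.acquire(0, ttl: 10, now: t0 + 11) # TTL elapsed, lock is available again
```

The TTL is what keeps a crashed daemon from holding a partition forever: once it expires, another process can take over, and the stale owner's token no longer releases anything.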

This new architecture uses a single Ruby process for the entire system, simplifying maintenance and management. Scalability is achieved by increasing the number of processes and threads, allowing the system to adapt to growing workloads.

A Safe Migration to Kafka

Our primary goal was to replace the existing Sidekiq+HTTP transport mechanism with Kafka while ensuring a seamless and stable transition. To achieve this, we decided to run both systems in parallel, gradually migrating from the old to the new. This required our outbox pattern implementation to support multiple transport mechanisms simultaneously.

Here's how we approached the migration:

  1. Outbox Integration: We made minor adjustments to our existing Sidekiq+HTTP implementation to integrate with the Outbox pattern.
  2. Parallel Order Dispatch: We started duplicating order dispatches through Kafka.
  3. Performance Comparison and Cutover: By monitoring and comparing performance metrics (e.g., order processing speed), we confirmed the superiority of our new Kafka-based solution. Once confident, we safely decommissioned the old synchronization system, leaving the new Kafka transport as the sole mechanism.

This approach allowed us to build and fine-tune the new synchronization process without jeopardizing the existing system's stability. This evolutionary migration strategy, characterized by minimal risks and zero downtime, can be applied beyond transport protocol replacements. It proves valuable for breaking down monolithic applications into smaller, manageable services. This experience reinforces the effectiveness of gradual and controlled transitions when implementing changes within critical systems.

Observability & Tracing

Our previous Sidekiq-based outbox implementation benefited from built-in metrics provided by tools like yabeda-schked and yabeda-sidekiq. These tools offered valuable insights into the system's health and performance with minimal effort. However, developing a custom outbox daemon from scratch meant we had to implement all observability, including metrics and distributed tracing, ourselves.

After deploying the new daemon, it became clear that its functionality extended beyond simple message delivery. To maintain the same level of observability we had with Sidekiq, we needed to integrate a robust metrics collection and tracing system. This would allow us to monitor performance, identify bottlenecks, and respond quickly to any arising issues.

During the development and launch of the new daemon, we focused on the following key aspects:

  • Performance Metrics: We implemented metrics tracking the number of processed messages, processing time, errors, and other crucial indicators to assess the system's efficiency.
  • Distributed Tracing: To understand the flow of execution and interactions between different system components, we integrated distributed tracing. This allowed us to follow each request across our microservices, simplifying debugging and performance optimization.

Outbox Grafana

While our new daemon delivered enhanced stability and scalability, it also demanded extra effort to ensure the same level of observability—a crucial aspect of maintaining a high-quality service.

Inbox Pattern

Let's shift our perspective to the message consumer, who faces similar challenges. For instance, Kafka doesn't inherently guarantee exactly-once message delivery semantics. To address this, we extend the concept of "Transactional Outbox" to "Transactional Inbox," mirroring the mechanism on the consumer side.

Here's how the Inbox Pattern works:

  1. Message Persistence: Upon arrival, messages are saved to a dedicated "inbox" table in the database. When using Kafka, this is handled by an "inbox-kafka-consumer," which commits the processed message offsets within a transaction.
  2. Inbox Processing: A separate inbox process fetches events from the database and triggers the corresponding business logic for each. This process resembles the outbox process, but instead of sending messages to a broker, it executes the business logic associated with the received message.

This approach mirrors the Outbox pattern, except that the consumer is the one creating the database records. Polling events from the database works the same way; only the action taken for each record differs.

This pattern effectively gives us exactly-once processing semantics. Inbox records are created with the same unique identifier (UUID) used for the corresponding outbox records, and a unique index on this column makes the database reject any message that is delivered a second time.
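The deduplication step can be sketched as follows. This is an in-memory illustration under stated assumptions: the `Inbox` class and `consume` method are hypothetical, and a `Set` plays the role of the unique database index on the UUID column.

```ruby
require "set"
require "securerandom"

# In-memory stand-in for an inbox table with a unique index on uuid (hypothetical).
class Inbox
  DuplicateMessage = Class.new(StandardError)

  attr_reader :rows

  def initialize
    @rows = []
    @uuids = Set.new
  end

  # Mimics an INSERT that violates the unique index when the UUID already exists.
  def insert!(uuid:, payload:)
    raise DuplicateMessage, uuid unless @uuids.add?(uuid)

    @rows << { uuid: uuid, payload: payload }
  end
end

# Store the message once; treat a repeated delivery as already handled.
def consume(inbox, message)
  inbox.insert!(**message)
  :stored
rescue Inbox::DuplicateMessage
  :skipped # already stored earlier, safe to acknowledge and move on
end

inbox = Inbox.new
message = { uuid: SecureRandom.uuid, payload: "order completed" }

# The broker delivers the same message twice.
results = [consume(inbox, message), consume(inbox, message)]
```

The second delivery is acknowledged without creating a second record, so the business logic behind the inbox runs once per UUID.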

It's important to note that this approach is particularly valuable when strong guarantees of correct message processing by the consumer are required.

Scaling

After implementing the Inbox pattern, we encountered a new challenge—ensuring its scalability. It's important to highlight that our architecture conceptually separates message consumption from actual message processing.

Consuming messages from Kafka is generally fast, as it mainly involves persisting received data to the database. However, message processing, which includes executing business logic, is significantly more time-consuming.

This disparity in processing speeds allows us to scale consumption and processing independently. For instance, if the consumer's throughput is sufficient for recording all incoming messages but business logic processing lags behind, we can scale processing separately by increasing the number of threads responsible for executing the business logic.

Increasing the number of processing threads beyond the number of partitions is counterproductive. Only one thread can process a given partition at a time, so any extra threads sit idle waiting for access. This limitation arises because our scaling approach relies on parallel processing of messages from different partitions.

Consequently, we encounter a limitation: message processing scalability is bound by the number of partitions in the Kafka topic. This can be problematic if message processing involves resource-intensive business logic and increasing Kafka partitions is challenging for various reasons.

Virtual Partitions

Previously, scaling our outbox/inbox daemon involved several cumbersome steps: increasing Kafka partitions, restarting message producers, and then restarting the daemon itself. This process was inefficient and required excessive manual intervention.

To address this, we introduced the concept of "buckets." Instead of using partition numbers, we now use bucket numbers when creating outbox/inbox records. The number of buckets is fixed in advance, and a record's bucket is calculated the same way as a partition number: as the remainder of dividing the event key (or its hash) by the number of buckets. However, multiple buckets can be mapped to a single Kafka partition.

This approach allows us to create a large number of buckets upfront while dynamically adjusting the number of Kafka partitions without restarting the entire system. Scaling the outbox/inbox daemon now simply requires a configuration change, significantly simplifying system operation.
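A small sketch of the bucket idea, assuming 16 buckets, a CRC32 hash of the key, and a simple modulo mapping from bucket to partition. All three are illustrative choices; `bucket_for` and `partition_for_bucket` are hypothetical helpers rather than the library's API.

```ruby
require "zlib"

BUCKETS_COUNT = 16 # fixed upfront; assumed value for the example

# Stored on the record when it is created and never recalculated.
def bucket_for(key)
  Zlib.crc32(key.to_s) % BUCKETS_COUNT
end

# Resolved at publish time from configuration (assumed mapping: simple modulo).
def partition_for_bucket(bucket, partitions_count)
  bucket % partitions_count
end

key = "order-42"
bucket = bucket_for(key)

# The stored bucket stays the same; only the bucket-to-partition mapping changes.
with_4_partitions = partition_for_bucket(bucket, 4)
with_8_partitions = partition_for_bucket(bucket, 8)

# With 4 partitions, each partition serves 4 of the 16 buckets.
buckets_per_partition = (0...BUCKETS_COUNT).group_by { |b| partition_for_bucket(b, 4) }
```

Since records only ever store the bucket, changing the partition count touches configuration alone and existing records need no migration.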

Outbox buckets

Through these efforts, we successfully tackled the challenge of streaming completed orders with high reliability and minimal latency.
Our solution boasts several key improvements:

  • Outbox/Inbox Daemon: This provides complete control over message processing and enables tailored optimizations.
  • Kafka Partition-Based Outbox Scaling: We can dynamically adjust system throughput by modifying the number of partitions.
  • Robust Inbox Pattern Implementation: This guarantees exactly-once message processing, crucial for data-sensitive business processes.
  • Independent Scaling with Virtual Partitions (Buckets): This allows for system scalability without modifying Kafka configurations or restarting producers.

Ultimately, we achieved a flexible, scalable, and fault-tolerant message processing system capable of handling high loads and ensuring reliable delivery of critical data.

Widespread Outbox Adoption

The Ruby Platform team's overarching goal was to create and standardize a universal message handling solution for all Ruby services at the company. A well-designed solution would free product teams from tackling technical complexities, allowing them to focus on product development and business logic implementation.
Our toolkit needed to be compatible with diverse Ruby services, adaptable to varying workloads, and equipped with comprehensive monitoring capabilities. From this perspective, our outbox-based solution had reached a certain maturity:

  • Successful Implementation: Deployed across multiple Ruby services.
  • Proven Stability: Demonstrated reliable performance in production environments.
  • Observability and Scalability: Offered robust monitoring and scaling capabilities.

However, widespread adoption presented new challenges. Despite independent scaling, fine-tuning the daemon for specific workloads required calculating the relationship between several parameters:

  • Number of Outbox partitions.
  • Number of buckets.
  • Number of daemon processes.
  • Number of threads per process.

This configuration proved too complex for users, forcing them to delve into technical implementation details. Users often misconfigured the system, setting too few partitions or too many daemon processes, leading to performance degradation.

For wider adoption, we needed to simplify configuration and make the scaling process more intuitive and user-friendly.

New Architecture

Let's recap how our daemon works: each thread sequentially polls the database for new messages within its assigned buckets. Once a message is fetched, the thread processes it, executing the associated business logic. Polling is generally quick, while message processing can take significantly longer.

Initially, our scaling unit for business logic was the thread itself, which handled both polling and processing. This approach led to an undesirable outcome: scaling up increased the intensity of both operations, even when unnecessary. For example, if business logic execution became a bottleneck, increasing the number of threads also amplified database polling frequency, even if the bottleneck wasn't related to message retrieval. This resulted in "overscaling."

Increasing the number of threads proportionally increased database queries. We constantly polled the database for new messages, unable to reduce the polling frequency without impacting processing speed.
Our new architecture addresses these issues by separating the daemon into two thread pools: a polling pool and a processing pool.

Divide and Conquer

Our new architecture decouples polling and processing responsibilities by utilizing two distinct thread pools and employing Redis as an intermediary message queue.

Polling Pool:

  • Threads in this pool are responsible for fetching new messages from the database for each partition.
  • When they discover new messages, they enqueue them into Redis using the LPUSH command.
  • To maintain data consistency and processing order, a lock is acquired on the partition during polling.

Processing Pool:

  • Threads in this pool dequeue messages from Redis using the BRPOP command.
  • Each thread processes messages from a specific bucket, acquiring a lock during processing to prevent concurrent access and preserve message order within the bucket.

Each daemon now comprises two thread pools: a polling pool and a processing pool. By default, we maintain a 2:1 ratio of processing threads to polling threads, but this ratio can be adjusted as needed.
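The interaction between the two pools can be sketched with Ruby's standard library alone. In this illustration `Thread::Queue` stands in for the Redis list (`push` in place of LPUSH, a blocking `pop` in place of BRPOP), the `pending` hash stands in for the database, and the `:stop` sentinel is just a way to end the example. None of this is the daemon's actual code.

```ruby
queue = Queue.new     # stand-in for the Redis list between the pools
processed = Queue.new # thread-safe collector for the results

# Hypothetical pending messages, already grouped by bucket.
pending = {
  0 => ["a1", "a2"],
  1 => ["b1"],
  2 => ["c1", "c2", "c3"]
}
bucket_locks = pending.keys.to_h { |bucket| [bucket, Mutex.new] }

PROCESSING_THREADS = 2 # two processing threads per polling thread

# Polling side: fetch each bucket's batch and hand it over through the queue.
poller = Thread.new do
  pending.each { |bucket, messages| queue.push([bucket, messages]) }
  PROCESSING_THREADS.times { queue.push(:stop) } # let the workers finish
end

# Processing side: take a batch, lock its bucket, handle messages in order.
processors = PROCESSING_THREADS.times.map do
  Thread.new do
    while (job = queue.pop) != :stop
      bucket, messages = job
      bucket_locks[bucket].synchronize do
        messages.each { |message| processed.push([bucket, message]) } # business logic goes here
      end
    end
  end
end

([poller] + processors).each(&:join)

results = []
results << processed.pop until processed.empty?
by_bucket = results.group_by(&:first).transform_values { |pairs| pairs.map(&:last) }
```

Handing over a whole batch per bucket, rather than single messages, is what keeps the order intact here: one thread works through the batch while holding the bucket's lock.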

Advantages of the New Architecture

Our revamped architecture delivers several key benefits:

  • Simplified Scaling: Since polling is a lightweight operation, the polling pool can have significantly fewer threads than the processing pool. Scaling the system now involves increasing the number of daemon replicas, each with its own polling and processing pools.
  • Enhanced Performance: Decoupling polling and processing enables more efficient resource utilization. Polling threads are not blocked during message processing, and processing threads don't idle waiting for new data.
  • Flexible Configuration: The ratio of polling to processing threads can be easily adjusted based on workload characteristics and performance requirements.
  • Automated Scaling with Kubernetes HPA: Utilizing Redis as a buffer between the polling and processing pools facilitates seamless autoscaling (HPA) in Kubernetes. The Redis queue size accurately reflects the processing pool's load. If the queue grows, indicating processing bottlenecks, HPA can automatically scale up daemon replicas. Conversely, if the queue is empty, HPA can scale down replicas to optimize resource consumption.
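To make the autoscaling reasoning concrete, the sketch below applies the scaling rule documented for the Kubernetes HPA, desired = ceil(current replicas × current metric / target metric), to a queue-size metric. The method name, the target queue size, and the replica bounds are assumptions for the example; a real HPA also applies a tolerance and stabilization windows, which are left out here.

```ruby
# desired = ceil(current_replicas * current_metric / target_metric),
# clamped to the configured minimum and maximum (assumed bounds).
def desired_replicas(current_replicas:, queue_size:, target_queue_size:, min: 1, max: 10)
  desired = (current_replicas * queue_size.to_f / target_queue_size).ceil
  desired.clamp(min, max)
end

growing = desired_replicas(current_replicas: 2, queue_size: 300, target_queue_size: 100)
idle    = desired_replicas(current_replicas: 4, queue_size: 0, target_queue_size: 100)
capped  = desired_replicas(current_replicas: 2, queue_size: 5000, target_queue_size: 100)
```

A queue three times larger than the target triples the replica count, an empty queue falls back to the minimum, and a sudden spike is capped at the maximum.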

This new architecture delivers a more flexible, scalable, and easily configurable message processing system.

Outbox full architecture

Evolutionary Journey

This story exemplifies the iterative development of a tool, with each stage driven by new challenges and escalating demands for performance and reliability.

The key takeaway? Ruby can handle almost any task. Many open-source projects start as solutions to specific business problems and evolve into versatile, battle-tested tools used across numerous production systems.

This article explored the three stages of our Inbox/Outbox tool's development, crafted by the Ruby Platform team to address message-handling challenges. Each stage focused on enhancing reliability, scalability, and user-friendliness.

Our final solution, refined through real-world deployments in both large monolithic applications and distributed systems spanning dozens of services, has proven its stability and effectiveness. Confident in its capabilities, we've decided to share it with the community as an open-source project.
