Gorabbit: RabbitMQ supercharged for Go Applications

m3talux

Alex Khoury

Posted on April 9, 2024

Gorabbit: RabbitMQ supercharged for Go Applications

The tech team at Kardinal is proud to present their latest open-source library: Gorabbit. A RabbitMQ wrapper that brings ease of implementation and robustness to your Golang projects.

Image description


Introduction

In our event-driven project, where efficiency in event production and consumption is crucial, the common choices are Kafka and RabbitMQ. We opted for RabbitMQ due to its simplicity compared to the perceived overkill of Kafka for our needs. However, when exploring the official Go driver for RabbitMQ on GitHub, we found it to be feature-rich but overly low-level, offering complex methods with numerous undocumented arguments. This posed a challenge for our microservices architecture in the Kardinal project, as we needed a more streamlined and efficient way to implement the communicator without excessive boilerplate code.

Context

The primary objective behind implementing RabbitMQ communication was to facilitate the exchange of critical information essential for constructing core models in our product. The significance of this communication lies in the fact that the loss of even a single event could potentially fail a client's action.

RabbitMQ's inherent queuing system offers a layer of protection by ensuring that messages are stored in queues, thereby mitigating the impact of temporary outages or crashes in one or more microservices. However, this safeguard primarily benefits the receiving module. For the sending module, there remains a vulnerability: if the RabbitMQ server becomes unavailable, all outgoing messages risk being lost.

Additionally, the receiving module is only partially immune to risks. The act of consuming a message may fail unexpectedly, with no built-in mechanism for a subsequent retry. This introduces a potential point of failure in the communication process.

Problems with amqp091-go

The official RabbitMQ plugin, amqp091-go, maintained by the RabbitMQ team, boasts rich features. However, its complexity becomes apparent with intricate syntax and the need to manage connection and channel states manually. The abundance of methods and arguments for simple operations adds to the verbosity. Let's take a basic example: sending a message.

Here's a snippet using amqp091-go for a "simple" producer:

package main

import (
    "context"
    "log"

    amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
    uri := "amqp://guest:guest@localhost:5672/"

    config := amqp.Config{
        Vhost:      "/",
        Properties: amqp.NewConnectionProperties(),
    }

    conn, err := amqp.DialConfig(uri, config)
    if err != nil {
        log.Fatalf("producer: error in dial: %s", err)
    }

    defer conn.Close()

    channel, err := conn.Channel()
    if err != nil {
        log.Fatalf("error getting a channel: %s", err)
    }

    err = channel.PublishWithContext(
        context.Background(),
        "exchange",
        "routing_key",
        true,
        false,
        amqp.Publishing{
            Headers:         amqp.Table{},
            ContentType:     "application/json",
            ContentEncoding: "",
            DeliveryMode:    amqp.Persistent,
            Priority:        0,
            AppId:           "producer",
            Body:            []byte("body"),
        },
    )

    if err != nil {
        log.Fatalf("error publishing message: %s", err)
    }
}
Enter fullscreen mode Exit fullscreen mode

Now, compare this with the equivalent using Gorabbit:

package main

import (
    "log"

    "github.com/KardinalAI/gorabbit"
)

func main() {
    client := gorabbit.NewClient(gorabbit.DefaultClientOptions())

    err := client.Publish("exchange", "routing_key", "body")
    if err != nil {
        log.Fatalf("error publishing message: %s", err)
    }
}
Enter fullscreen mode Exit fullscreen mode

In just a few lines of code, Gorabbit simplifies the process of connecting to the RabbitMQ server and publishing a message to an existing exchange. This streamlined approach was a key goal in the development of Gorabbit.

Additionally, amqp091-go lacks built-in safeguards for connections and channels, as it is not designed to provide such features. Consequently, managing connection or channel losses becomes a manual and potentially messy process using Go channels, especially in larger applications. Gorabbit addresses this challenge by internally handling all connections and channels: it creates them as needed, re-creates essential components in the event of losses, maintains a record of active elements, and provides convenient health checks to monitor the overall status of the system. This approach ensures a more streamlined and efficient management of connections and channels in the Gorabbit library.

Introduction to Gorabbit

When working with RabbitMQ, there are typically two distinct aspects: configuring exchanges, queues, and bindings, and managing the sending and receiving of messages. Gorabbit was conceptualized to provide convenient access to both a "client" and a "manager," where the manager handles setup operations, and the client facilitates user-friendly interactions.

Client

Gorabbit's client simplifies complex functionalities into two main operations: publishing and consuming.

Despite the apparent simplicity, the mechanisms underlying these operations in Gorabbit are robust, secure, and error-resistant. Let's delve deeper into these operations.

Publishing

Publishing a message with Gorabbit's client is achieved with a single line of code:

err := client.Publish("exchange", "routing_key", "body")
Enter fullscreen mode Exit fullscreen mode
  • The first argument specifies the exchange to which the message will be published.
  • The second argument is the routing key associated with the publishing.
  • The third argument, the message body, can be of any type since it is expected to be of type interface{}. Internally, the body is JSON-encoded before transmission.

Note: Publishing messages is a blocking operation, so consider using goroutines in case this call should not be blocking.

If the client was initialized with the default behavior of the KeepAlive mechanism set to true, publishing is cached if the RabbitMQ server becomes unreachable. The visual below illustrates how this failsafe mechanism operates.

Producer Safeguard Schema

By default, the internal cache can support up to 128 publishing with a TTL of 60 seconds, but those parameters can easily be updated when initializing the client:

clientOptions := gorabbit.
        DefaultClientOptions().
        SetPublishingCacheSize(1024).
        SetPublishingCacheTTL(30 * time.Minute)

client := gorabbit.NewClient(clientOptions)
Enter fullscreen mode Exit fullscreen mode

Consuming

Consuming messages in Gorabbit diverges from the approach taken by the native plugin, providing a more streamlined and flexible experience.

Here's an example of message consumption using amqp091-go:

msgs, err := channel.Consume(
    "testing", // queue
    "",        // consumer
    true,      // auto ack
    false,     // exclusive
    false,     // no local
    false,     // no wait
    nil,       //args
)

if err != nil {
    panic(err)
}

forever := make(chan bool)

go func() {
    for msg := range msgs {
        fmt.Printf("Received Message: %s\n", msg.Body)
    }
}()

<-forever
Enter fullscreen mode Exit fullscreen mode

Contrastingly, Gorabbit adopts a different approach. It encourages the registration of consumers within the client, accompanied by a handler function, allowing the client to internally and automatically process events. This approach grants greater flexibility in handling incoming messages based on routing keys, mirroring the functionality of an exchange.

Here's what the process looks like using Gorabbit:

// handlers are a map[string]func(payload []byte) error
// where the key is a routing key in its full form or using wildcards.
messageHandlers := gorabbit.MQTTMessageHandlers{
    "event.message.key": func(payload []byte) error {
        // Process the payload
        return nil
    },
    "event.other_message.*": func(payload []byte) error {
        // Process the payload
        return nil
    },
}

queueName := "event_queue"
consumerName := "event_consumer"
prefetchCount := 10

err := client.RegisterConsumer(gorabbit.MessageConsumer{
    Queue:         queueName,
    Name:          consumerName,
    PrefetchCount: prefetchCount,
    Handlers:      messageHandlers,
})
Enter fullscreen mode Exit fullscreen mode

This approach facilitates the registration of consumers at the start or dynamically during runtime. The complexity of processing is abstracted, and safeguards are implemented to ensure the continuity of consumers even during temporary RabbitMQ server outages.

Here is an overview of how the consumer safeguard works:

Consumer Safeguard Schema

Manager

The Gorabbit manager serves a dual purpose: it facilitates the configuration of exchanges, queues, and bindings within an existing RabbitMQ server. Additionally, the manager proves invaluable in testing scenarios, ensuring the existence of a queue, tallying the number of messages within a queue, or extracting and analyzing specific messages. This dual functionality enhances the versatility of Gorabbit, making it a valuable tool for both production setups and testing environments.

Here is how a manager can be initialized:

manager := gorabbit.NewManager(gorabbit.DefaultManagerOptions())
Enter fullscreen mode Exit fullscreen mode

Setup

Gorabbit simplifies the setup process by allowing you to configure exchanges, queues, and bindings effortlessly, either through a RabbitMQ Schema Definition JSON file or individual entity-specific methods.

Using a Schema Definition File

You can automatically set up exchanges, queues, and bindings by referencing a RabbitMQ Schema Definition JSON file:

err := manager.SetupFromDefinitions("/path/to/definitions.json")
Enter fullscreen mode Exit fullscreen mode

Individual Entity Setup

Alternatively, you have the flexibility to set up each entity separately using their respective methods. Here are examples for creating exchanges, queues, and bindings:

Exchange Creation:

err := manager.CreateExchange(gorabbit.ExchangeConfig{
    Name:      "events_exchange",
    Type:      gorabbit.ExchangeTypeTopic,
    Persisted: false,
    Args:      nil,
})
Enter fullscreen mode Exit fullscreen mode

Queue Creation:

err := manager.CreateQueue(gorabbit.QueueConfig{
    Name:      "events_queue",
    Durable:   false,
    Exclusive: false,
    Args:      nil,
    Bindings: &[]gorabbit.BindingConfig{
        {
            RoutingKey: "event.foo.bar.created",
            Exchange:   "events_exchange",
        },
    },
})
Enter fullscreen mode Exit fullscreen mode

Binding Creation:

err := manager.BindExchangeToQueueViaRoutingKey("events_exchange", "events_queue", "event.foo.bar.created")
Enter fullscreen mode Exit fullscreen mode

Queue Deletion:

err := manager.DeleteQueue("events_queue")
Enter fullscreen mode Exit fullscreen mode

Exchange Deletion:

err := manager.DeleteExchange("events_exchange")
Enter fullscreen mode Exit fullscreen mode

This flexibility empowers users to choose the most convenient approach based on their specific setup requirements, providing a seamless and intuitive experience.

Testing

Gorabbit’s manager provides a suite of methods designed to facilitate testing and assess the state of existing queues. These include actions such as pushing a message, retrieving the latest message, counting the number of messages, purging a queue of all its messages, and ensuring the existence of a specified queue.

Queue messages count:

messageCount, err := manager.GetNumberOfMessages("events_queue")
Enter fullscreen mode Exit fullscreen mode

Push message:

err := manager.PushMessageToExchange("events_exchange", "event.foo.bar.created", "single_message_payload")
Enter fullscreen mode Exit fullscreen mode

Pop message:

message, err := manager.PopMessageFromQueue("events_queue", true)
Enter fullscreen mode Exit fullscreen mode

Purge queue:

err := manager.PurgeQueue("events_queue")
Enter fullscreen mode Exit fullscreen mode

Conclusion

In our pursuit of efficient event production and consumption within the Kardinal project, we chose RabbitMQ over Kafka for its simplicity. However, the native Go driver, amqp091-go, proved overly low-level and complex for our microservices architecture. This led us to develop Gorabbit, a library aimed at simplifying RabbitMQ communication in Go.

While Gorabbit has made significant strides in enhancing the developer experience, we acknowledge that there is always room for improvement and the addition of new functionalities. As the project evolves, we encourage the community to contribute ideas, suggestions, and code to make Gorabbit even more robust and user-friendly.

Gorabbit's GitHub repository is open to contributions, and we invite developers to join us in shaping the future of this library. Together, we can continue to refine and expand Gorabbit to meet the diverse needs of the Go and RabbitMQ community. Your involvement is key to making Gorabbit a go-to choice for seamless and efficient RabbitMQ communication in Go projects.

đź’– đź’Ş đź™… đźš©
m3talux
Alex Khoury

Posted on April 9, 2024

Join Our Newsletter. No Spam, Only the good stuff.

Sign up to receive the latest update from our blog.

Related