RabbitMQ in NodeJS
RoeyBenHarushDev
Posted on February 25, 2023
**RabbitMQ** is an open source message broker that implements the Advanced Message Queuing Protocol (AMQP).
Advanced Message Queuing Protocol (AMQP) is an application layer protocol that focuses on process-to-process communication across IP networks. An encoding schema and a set of procedures allow two different applications to communicate regardless of the technology they are built with. Overall, the goal of AMQP is to enable message passing through broker services over TCP/IP connections. AMQP is considered a compact protocol because it is binary: everything sent over AMQP is binary data, which avoids the overhead of text-based framing on the wire.
RabbitMQ can be used with a variety of programming languages, including NodeJS, to provide reliable and scalable communication between microservices in a distributed system.
For example, let's say we're building a pizza delivery system that consists of several microservices. These microservices include:
- A menu service that provides information about available pizzas and their prices.
- An order service that takes customer orders and sends them to the kitchen service.
- A kitchen service that receives orders from the order service and prepares the pizzas.
- A delivery service that receives prepared orders from the kitchen service, checks the delivery address, and delivers the pizzas to the customer.
To handle communication between these services, we'll use a message queue. We'll use RabbitMQ as our message broker, and we'll use the amqplib library for NodeJS to interact with RabbitMQ.
Here's an overview of how our pizza delivery system will work:
- The customer will place an order through the order service. The order service will send the order to the kitchen service via the message queue.
- The kitchen service will receive the order from the message queue and begin preparing the pizzas.
- When the pizzas are ready, the kitchen service will send a message to the delivery service via the message queue.
- The delivery service will receive the message and check the delivery address. If the address is valid, the delivery service will deliver the pizzas to the customer.
Let's walk through the implementation of this system in NodeJS.
Here's the code for the order service:
const amqp = require('amqplib');

async function main() {
  const connection = await amqp.connect('amqp://localhost');
  const channel = await connection.createChannel();

  const orderQueue = 'orders';
  const pizzaExchange = 'pizzas';

  // Declare the queue and the exchange (created if they do not exist yet)
  await channel.assertQueue(orderQueue);
  await channel.assertExchange(pizzaExchange, 'direct');

  channel.consume(orderQueue, (message) => {
    if (message === null) return; // the broker cancelled this consumer
    const order = JSON.parse(message.content.toString());
    // Send the order to the kitchen service
    channel.publish(pizzaExchange, 'kitchen', Buffer.from(JSON.stringify(order)));
    // Acknowledge receipt of the message
    channel.ack(message);
  });
}

main().catch(console.error);
In this code, we connect to the RabbitMQ server and create a channel. We then define the name of the order queue and the name of the pizza exchange. We assert the existence of the order queue and the pizza exchange, and then we start consuming messages from the order queue. When we receive a message, we parse the order from the message payload and publish it to the pizza exchange with the routing key "kitchen". Finally, we acknowledge receipt of the message.
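Because message payloads travel as Buffers, the JSON conversion that appears in every service can be pulled into two small helpers. The following is a minimal sketch; `encodeMessage` and `decodeMessage` are hypothetical names, not part of amqplib.

```javascript
// Hypothetical helpers (not part of amqplib): convert between plain objects
// and the Buffer payloads that publish() and consume() work with.
function encodeMessage(payload) {
  return Buffer.from(JSON.stringify(payload), 'utf8');
}

// Returns the parsed value, or null when the payload is not valid JSON,
// so the caller can decide whether to reject the message.
function decodeMessage(content) {
  try {
    return JSON.parse(content.toString('utf8'));
  } catch (err) {
    return null;
  }
}

const sampleOrder = { id: 1, items: ['margherita'] };
const roundTrip = decodeMessage(encodeMessage(sampleOrder));
console.log(roundTrip.items[0]); // margherita
```

Returning null for malformed input keeps a single bad message from throwing inside the consumer callback.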
Next, let's look at the kitchen service:
const amqp = require('amqplib');

async function main() {
  const connection = await amqp.connect('amqp://localhost');
  const channel = await connection.createChannel();

  const pizzaExchange = 'pizzas';
  const kitchenQueue = 'kitchen';

  await channel.assertExchange(pizzaExchange, 'direct');
  // Declare the queue and bind it, so that messages published with the
  // routing key "kitchen" are actually delivered to it
  await channel.assertQueue(kitchenQueue);
  await channel.bindQueue(kitchenQueue, pizzaExchange, 'kitchen');

  channel.consume(kitchenQueue, (message) => {
    if (message === null) return; // the broker cancelled this consumer
    const order = JSON.parse(message.content.toString());
    // Prepare the pizzas
    const pizzas = preparePizzas(order);
    // Send a message to the delivery service
    channel.publish(pizzaExchange, 'delivery', Buffer.from(JSON.stringify(pizzas)));
    // Acknowledge receipt of the message
    channel.ack(message);
  });
}

function preparePizzas(order) {
  // ...
}

main().catch(console.error);
In this code, we connect to the RabbitMQ server and create a channel. We define the name of the pizza exchange and assert its existence, then declare the "kitchen" queue and bind it to the exchange with the routing key "kitchen". Without that binding, the direct exchange would have no queue to route the orders to. We start consuming messages from the "kitchen" queue, and when we receive a message, we parse the order from the message payload and prepare the pizzas by calling the preparePizzas function. Once the pizzas are ready, we publish a message to the pizza exchange with the routing key "delivery" and the payload containing the pizzas. Finally, we acknowledge receipt of the message.
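One detail worth spelling out: a direct exchange only delivers a message to queues that are bound to it with a matching routing key. A minimal sketch of that setup as a reusable helper, assuming `channel` is an amqplib channel from the Promise API; the helper name `bindConsumerQueue` is hypothetical.

```javascript
// Hypothetical helper: declares a direct exchange and a queue, then binds
// them so that messages published with `routingKey` reach `queue`.
// `channel` is assumed to be an amqplib channel (Promise API).
async function bindConsumerQueue(channel, exchange, queue, routingKey) {
  await channel.assertExchange(exchange, 'direct');
  await channel.assertQueue(queue);
  await channel.bindQueue(queue, exchange, routingKey);
  return queue;
}
```

A service would call it once at startup, for example `await bindConsumerQueue(channel, 'pizzas', 'kitchen', 'kitchen')`, before calling `channel.consume`.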
Next, let's look at the delivery service:
const amqp = require('amqplib');

async function main() {
  const connection = await amqp.connect('amqp://localhost');
  const channel = await connection.createChannel();

  const pizzaExchange = 'pizzas';
  const deliveryQueue = 'delivery';

  await channel.assertExchange(pizzaExchange, 'direct');
  await channel.assertQueue(deliveryQueue);
  // Bind the queue so that messages published with the routing key
  // "delivery" are routed to it
  await channel.bindQueue(deliveryQueue, pizzaExchange, 'delivery');

  channel.consume(deliveryQueue, (message) => {
    if (message === null) return; // the broker cancelled this consumer
    const pizzas = JSON.parse(message.content.toString());
    // Check the delivery address
    const address = pizzas[0].deliveryAddress;
    const isValidAddress = checkDeliveryAddress(address);
    if (isValidAddress) {
      // Deliver the pizzas
      deliverPizzas(pizzas);
    } else {
      console.log(`Invalid delivery address: ${address}`);
    }
    // Acknowledge receipt of the message
    channel.ack(message);
  });
}

function checkDeliveryAddress(address) {
  // ...
}

function deliverPizzas(pizzas) {
  // ...
}

main().catch(console.error);
In this code, we connect to the RabbitMQ server and create a channel. We define the name of the pizza exchange and the name of the delivery queue, assert their existence, and bind the queue to the exchange with the routing key "delivery". We start consuming messages from the "delivery" queue. When we receive a message, we parse the pizzas from the message payload and check the delivery address by calling the checkDeliveryAddress function. If the address is valid, we deliver the pizzas by calling the deliverPizzas function. Finally, we acknowledge receipt of the message.
And that's it! We've implemented a pizza delivery system using NodeJS and a message queue to handle communication between microservices. Of course, this is a simplified example, and in a real-world system, there would be many more details to consider, such as error handling, security, and scalability. However, this should give you a good idea of how message queues can be used to build reliable and scalable distributed systems.
To use RabbitMQ in NodeJS, the amqplib library can be installed. This library provides a simple API to interact with RabbitMQ, including sending and receiving messages.
To install the amqplib library in your NodeJS project, you can use npm (Node Package Manager).
Here are the steps:
1. Open your terminal and navigate to your NodeJS project directory.
2. Run the following command to install the amqplib library: npm install amqplib
3. This will download and install the latest version of the amqplib library in your project's node_modules folder, and add it as a dependency to your project's package.json file.
That's it! Now you can import the amqplib library in your NodeJS code and start using it to interact with RabbitMQ. Here's an example of how to import the library in your code:
const amqp = require('amqplib');
And you can use the amqp object to create a connection to RabbitMQ, create channels, declare queues and exchanges, and publish and consume messages.
An example of sending a message to a RabbitMQ queue in NodeJS:
const amqp = require('amqplib/callback_api');

amqp.connect('amqp://localhost', (connectionError, connection) => {
  if (connectionError) throw connectionError;
  connection.createChannel((channelError, channel) => {
    if (channelError) throw channelError;
    const queue = 'hello';
    const msg = 'Hello World!';

    channel.assertQueue(queue, {
      durable: false
    });
    channel.sendToQueue(queue, Buffer.from(msg));
    console.log(" [x] Sent %s", msg);
  });
  // Give the message time to leave before closing the connection
  setTimeout(() => {
    connection.close();
    process.exit(0);
  }, 500);
});
In this example, we're using the callback API to send a message to a RabbitMQ queue using NodeJS.
First, we use the connect function to establish a connection to RabbitMQ, passing in a connection string and a callback function. When the connection is established, the callback function is invoked with an error (if any) and a connection object.
Next, we use the createChannel method on the connection object to create a channel. Channels are used to communicate with RabbitMQ and perform operations like declaring queues and exchanges, and publishing and consuming messages.
We then declare a queue using the assertQueue method on the channel object, passing in a queue name and an options object that specifies whether the queue should be durable or not. We set durable to false in this case, which means that the queue will not survive a RabbitMQ server restart.
Finally, we use the sendToQueue method on the channel object to send a message to the queue, passing in the queue name and a buffer containing the message. In this case, we're sending the message "Hello World!".
We also use the console.log function to log a message to the console indicating that the message has been sent.
The setTimeout function is used to close the connection and exit the NodeJS process after a 500ms delay. This gives the message time to be sent before the process exits; it is a simple approach rather than a guarantee.
As for why we're using the callback API in this example: the code is based on the older, callback-based API of the amqplib library. The library also provides a Promise-based API, which is the one used in the pizza delivery services above. The Promise-based API can help simplify code and avoid callback hell, but the choice between the two APIs ultimately depends on the specific use case and personal preference.
RabbitMQ also provides a web-based management console that allows users to view and manage RabbitMQ objects, such as exchanges, queues, and bindings, from a graphical user interface.
The RabbitMQ Management Plugin is included with RabbitMQ and can be enabled by running the following command on the RabbitMQ server:
rabbitmq-plugins enable rabbitmq_management
Once enabled, you can access the management console by visiting http://localhost:15672 in your web browser. The default login credentials are guest/guest. By default this account can only log in from localhost, and it's recommended to change the password to something more secure.
Some benefits of using the RabbitMQ Management Plugin include:
- Easy monitoring: The management console provides real-time monitoring of your RabbitMQ instances and allows you to view various metrics, such as message rates and queue lengths, in a graphical format.
- Easy management: You can easily create and manage RabbitMQ objects using the web interface, which can be more user-friendly than using command-line tools or APIs.
- Security: The management console allows you to manage user accounts and permissions, ensuring that only authorized users can access and modify your RabbitMQ instances.
- Troubleshooting: The management console provides a lot of information on the status of your RabbitMQ instances, making it easier to troubleshoot issues and quickly resolve them.
Overall, the RabbitMQ Management Plugin can be a valuable tool for managing and monitoring RabbitMQ instances, especially in complex environments where multiple queues and exchanges are being used.
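The management plugin also serves an HTTP API on the same port as the console, which is handy for scripted monitoring. Below is a rough sketch of reading queue depths through its /api/queues endpoint. The host, port, and guest/guest credentials are assumptions that match the defaults mentioned above, and the function names are hypothetical.

```javascript
// Builds the URL and headers for GET /api/queues on the management plugin.
// Host, port, and credentials are assumptions matching the defaults above.
function buildQueuesRequest({ host = 'localhost', port = 15672, user = 'guest', pass = 'guest' } = {}) {
  const token = Buffer.from(`${user}:${pass}`).toString('base64');
  return {
    url: `http://${host}:${port}/api/queues`,
    headers: { Authorization: `Basic ${token}` },
  };
}

// Fetches queue names and message counts; needs Node 18+ for global fetch.
async function listQueues(options) {
  const { url, headers } = buildQueuesRequest(options);
  const response = await fetch(url, { headers });
  if (!response.ok) {
    throw new Error(`Management API returned ${response.status}`);
  }
  const queues = await response.json();
  return queues.map((q) => ({ name: q.name, messages: q.messages }));
}
```

With a broker running locally, `listQueues().then(console.log)` would print one entry per queue.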
Concepts and Methods
Message Broker
A message broker is an intermediary platform that enables communication between different applications by passing messages between them. It acts as a mediator for messages exchanged between clients, allowing for decoupled and asynchronous communication.
Channel
In RabbitMQ, a channel is a lightweight virtual connection that runs inside a single TCP connection to the broker, so one connection can carry many channels. Channels are used to communicate with RabbitMQ and perform operations like declaring queues and exchanges, and publishing and consuming messages.
Queue
A queue is a message buffer that stores messages produced by publishers and awaits processing by consumer applications. In RabbitMQ, a queue holds messages in memory or on disk and allows for efficient message distribution between different applications.
Queue Assertion
Queue assertion is a process of verifying if a queue exists and creating one if it does not exist. In RabbitMQ, assertQueue is used to declare and create a queue if it does not already exist.
Exchange
An exchange is a message routing agent that routes messages to one or more queues. In RabbitMQ, messages sent by publishers are first routed to an exchange, which then routes them to the appropriate queues based on their routing keys.
Binding
A binding is a relationship between an exchange and a queue that determines how messages are routed from the exchange to the queue. In RabbitMQ, a binding is established by specifying the exchange name, the queue name, and the routing key.
Routing Key
A routing key is a message attribute that is used by the exchange to route messages to the appropriate queue. In RabbitMQ, the routing key is used to determine the bindings that the message should be routed to.
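To make the relationship between exchanges, bindings, and routing keys concrete, here is a small in-memory model of the routing rules. It is only an illustration: it does not talk to a broker, the function names are hypothetical, and it covers the direct, fanout, and topic exchange types (topic patterns use "*" for exactly one word and "#" for zero or more words).

```javascript
// In-memory model of exchange routing. A binding is { queue, key }.

// Topic patterns: words separated by dots, "*" matches exactly one word,
// "#" matches zero or more words.
function topicMatches(pattern, routingKey) {
  const p = pattern.split('.');
  const k = routingKey === '' ? [] : routingKey.split('.');
  const match = (i, j) => {
    if (i === p.length) return j === k.length;
    if (p[i] === '#') {
      // "#" can absorb zero words (skip it) or one more word (stay on it).
      return match(i + 1, j) || (j < k.length && match(i, j + 1));
    }
    if (j === k.length) return false;
    return (p[i] === '*' || p[i] === k[j]) && match(i + 1, j + 1);
  };
  return match(0, 0);
}

function route(exchangeType, bindings, routingKey) {
  const matched = bindings.filter(({ key }) => {
    if (exchangeType === 'fanout') return true; // routing key is ignored
    if (exchangeType === 'direct') return key === routingKey; // exact match
    if (exchangeType === 'topic') return topicMatches(key, routingKey);
    throw new Error(`Unsupported exchange type: ${exchangeType}`);
  });
  // A queue receives one copy even if several of its bindings match.
  return [...new Set(matched.map(({ queue }) => queue))];
}

const bindings = [
  { queue: 'kitchen', key: 'kitchen' },
  { queue: 'delivery', key: 'delivery' },
];
console.log(route('direct', bindings, 'kitchen')); // [ 'kitchen' ]
```

With the same bindings, a fanout exchange would return both queues regardless of the routing key.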
Simplified Code using Async/Await
Here's the sending example from earlier, rewritten with the Promise-based API and the async/await syntax:
const amqp = require('amqplib');

const queue = 'hello';
const msg = 'Hello World!';

(async () => {
  try {
    const connection = await amqp.connect('amqp://localhost');
    const channel = await connection.createChannel();
    await channel.assertQueue(queue, { durable: false });
    // sendToQueue returns a boolean, not a promise, so it is not awaited
    channel.sendToQueue(queue, Buffer.from(msg));
    console.log(" [x] Sent %s", msg);
    setTimeout(() => {
      connection.close();
      process.exit(0);
    }, 500);
  } catch (error) {
    console.error(error);
    process.exit(1);
  }
})();
Here, we use an immediately invoked async function to wrap our code and take advantage of the await keyword to handle promises in a more readable way.
Instead of using a callback function with the connect and createChannel methods, we await their resolved promises and assign the resulting objects to connection and channel respectively.
The assertQueue method also returns a promise, so we await it to make sure the queue exists before we send to it. The sendToQueue method is different: it returns a boolean (false when the channel's write buffer is full) rather than a promise, so there is nothing to await. We also handle any errors that may occur by catching them with a try-catch block.
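If you want to know that the broker has actually taken the message before the process exits, rather than relying on timing, amqplib's confirm channels are one option. A minimal sketch, assuming `connection` comes from `require('amqplib').connect(...)`; the function name `sendConfirmed` is hypothetical.

```javascript
// Sketch: publish one message and wait for the broker to confirm it.
// `connection` is assumed to be an amqplib connection (Promise API).
async function sendConfirmed(connection, queue, text) {
  const channel = await connection.createConfirmChannel();
  try {
    await channel.assertQueue(queue, { durable: false });
    channel.sendToQueue(queue, Buffer.from(text));
    // Resolves once the broker has confirmed every pending publish.
    await channel.waitForConfirms();
  } finally {
    await channel.close();
  }
}
```

After `await sendConfirmed(connection, 'hello', 'Hello World!')` returns, the connection can be closed without a delay.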
An example of receiving messages from a RabbitMQ queue in NodeJS:
const amqp = require('amqplib/callback_api');

amqp.connect('amqp://localhost', (connectionError, connection) => {
  if (connectionError) throw connectionError;
  connection.createChannel((channelError, channel) => {
    if (channelError) throw channelError;
    const queue = 'hello';

    channel.assertQueue(queue, {
      durable: false
    });
    console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", queue);
    channel.consume(queue, (msg) => {
      console.log(" [x] Received %s", msg.content.toString());
    }, {
      noAck: true
    });
  });
});
In the code provided, the durable option is passed as false when creating the queue. This means that the queue will not survive a broker restart, and any messages in the queue will be lost.
If we set durable to true, the queue itself will survive a broker restart. For the messages in it to be retained as well, they also need to be published as persistent. This can be useful if we want to ensure that messages are not lost even in the case of a broker failure.
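A durable queue on its own keeps the queue definition; for the messages themselves to survive a restart, they generally also need to be published with the persistent option. A minimal sketch of both settings together, assuming `channel` is an amqplib Promise-API channel; the helper name is hypothetical.

```javascript
// Sketch: declare a durable queue and publish a persistent message.
// `channel` is assumed to be an amqplib channel (Promise API).
async function sendDurable(channel, queue, payload) {
  // durable: true keeps the queue definition across broker restarts.
  await channel.assertQueue(queue, { durable: true });
  // persistent: true asks the broker to store the message on disk.
  return channel.sendToQueue(queue, Buffer.from(JSON.stringify(payload)), {
    persistent: true,
  });
}
```

Note that asserting a queue that already exists with different options (for example the non-durable "hello" queue above) fails, so a durable variant needs a new queue name or the old queue deleted first.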
The noAck option is also passed as true when consuming from the queue. This means the broker treats each message as acknowledged as soon as it has been delivered, so the consumer never has to acknowledge it explicitly.
If we set noAck to false, the message will remain unacknowledged until the consumer explicitly acknowledges it using the channel.ack() method. This can be useful if we want to ensure that messages are not lost even in the case of a consumer failure.
Acknowledgement (ack) tells the broker that a message has been successfully delivered and processed by the consumer. If the consumer's channel or connection closes before a message is acknowledged, the broker requeues it and delivers it again, possibly to another consumer.
In contrast, rejection (reject or nack) tells the broker that the consumer could not handle the message, typically because it is malformed or contains invalid data. The consumer chooses whether the broker should requeue the message. When a message is rejected without requeueing, it is discarded, or dead-lettered if a dead-letter exchange has been configured.
A message that is neither acknowledged nor rejected stays unacknowledged for as long as the consumer's channel is open. Messages end up in a dead-letter queue (DLQ), if one has been configured, when they are rejected without requeueing, when their TTL expires, or when the queue exceeds its length limit. A DLQ is a queue that holds messages that could not be delivered or processed for some reason, and can be used for debugging and error handling purposes.
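The ack and reject behaviour described above can be wrapped around any processing function. This is a sketch under a few assumptions: `channel` is an amqplib channel consuming with noAck set to false, payloads are JSON, and `makeHandler` and `processFn` are hypothetical names.

```javascript
// Sketch: ack each message on success, reject it without requeue on failure.
// `channel` is assumed to be an amqplib channel consumed with noAck: false.
function makeHandler(channel, processFn) {
  return async (message) => {
    if (message === null) return; // the broker cancelled the consumer
    try {
      await processFn(JSON.parse(message.content.toString()));
      channel.ack(message);
    } catch (err) {
      // allUpTo = false, requeue = false: the message is discarded, or
      // dead-lettered if the queue has a dead-letter exchange.
      channel.nack(message, false, false);
    }
  };
}
```

It would be used as `channel.consume('orders', makeHandler(channel, handleOrder), { noAck: false })`, where `handleOrder` is whatever function processes an order. Rejecting without requeue avoids an endless redelivery loop for a message that can never be parsed.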
Comparison to SQS and Kafka
• Amazon Simple Queue Service (SQS) is a managed message queue service provided by Amazon Web Services (AWS). SQS offers a reliable, scalable, and distributed message queue, with features like automatic retries, dead-letter queues, and time-to-live (TTL) for messages.
• Amazon Simple Queue Service (SQS) and RabbitMQ (RMQ) are two different message brokers and cannot be directly integrated with each other. However, both services provide APIs that allow you to interact with them programmatically.
• To use SQS in your application, you need to create an SQS queue and configure it according to your needs. You can then use the AWS SDK for your preferred programming language to send and receive messages from the queue.
• SQS provides a number of features that can help you build reliable and scalable applications. One of these features is the dead-letter queue (DLQ). A DLQ is a special queue that holds messages that could not be processed successfully. In SQS, a message is moved there once it has been received more times than the source queue's maxReceiveCount allows without being deleted.
• In SQS, you configure a DLQ by attaching a redrive policy to the primary queue. This allows you to isolate and troubleshoot failed messages without affecting the main application flow.
• Another useful feature of SQS is the time limit on messages, which SQS calls the message retention period. It sets how long a message can stay in a queue before it is automatically deleted (4 days by default, configurable from 60 seconds to 14 days). This can help prevent message backlogs and ensure that your application is processing messages in a timely manner.
• In RabbitMQ, similar concepts exist but with different terminology. RabbitMQ provides the concept of exchanges and bindings, which allow messages to be routed to the appropriate queues. RabbitMQ also provides the ability to set time-to-live (TTL) for messages, and to define a dead-letter exchange for messages that cannot be delivered to their intended queue.
• Apache Kafka is a distributed streaming platform for building real-time data pipelines and streaming applications. Kafka can handle high-throughput and low-latency data streams and provides durable storage, replication, and fault tolerance.
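The RabbitMQ side of this comparison, a message TTL combined with a dead-letter exchange, can be sketched with amqplib's assertQueue options. All queue and exchange names and the 60-second TTL below are assumptions chosen for illustration, and `channel` is assumed to be an amqplib Promise-API channel.

```javascript
// Sketch: a work queue whose expired or rejected messages are re-routed
// to a dead-letter queue through a dead-letter exchange.
async function declareWithDeadLetter(channel) {
  // Exchange and queue that collect the dead-lettered messages.
  await channel.assertExchange('orders.dlx', 'fanout');
  await channel.assertQueue('orders.dead');
  await channel.bindQueue('orders.dead', 'orders.dlx', '');

  // Work queue: messages older than 60 s are dead-lettered to orders.dlx.
  await channel.assertQueue('orders.work', {
    messageTtl: 60000, // milliseconds a message may wait in the queue
    deadLetterExchange: 'orders.dlx',
  });
}
```

A consumer attached to `orders.dead` can then log or inspect whatever the main consumers could not handle in time.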
Comparing RabbitMQ, SQS, and Kafka, we can see that:
• RabbitMQ is a traditional message broker and focuses on reliable and scalable communication between microservices. It provides advanced features like exchanges, bindings, and routing keys for message routing.
• Reliability and scalability are important aspects of a message broker, as they help ensure that messages are delivered in a timely and accurate manner, even as the system grows and changes over time. A reliable message broker should be able to handle large volumes of messages without losing or dropping them, and should be able to recover quickly in the event of a failure or outage.
• RabbitMQ's routing model rests on three building blocks: exchanges, bindings, and routing keys. Together they enable flexible and efficient message routing within a system.
• Exchanges in RabbitMQ are responsible for routing messages to the appropriate queues. There are several types of exchanges available, including direct, topic, headers, and fanout exchanges, each with its own rules for message routing. Exchanges receive messages from producers and decide which queues the messages should be sent to: direct and topic exchanges use the routing key, headers exchanges use message headers, and fanout exchanges deliver to every bound queue.
• Bindings in RabbitMQ are used to bind a queue to an exchange, specifying the routing key or pattern that should be used for message routing. When a message is sent to an exchange with a matching routing key, it is delivered to all queues that are bound to that exchange with a matching routing key.
• Routing keys in RabbitMQ are used to route messages from exchanges to queues. They are a way of specifying the intended destination of a message, based on a set of rules defined by the exchange and its bindings.
• Overall, RabbitMQ's advanced features for message routing make it a powerful and flexible message broker that can be customized to fit a wide variety of use cases. Its focus on reliability and scalability makes it a popular choice for building distributed and microservices architectures.
• SQS is a fully managed message queue service with high scalability and reliability, but limited customization options.
• SQS is a managed service provided by AWS, which means that it abstracts away much of the underlying infrastructure and configuration details from the user. This level of abstraction makes SQS easier to use and manage, but also limits the amount of customization that can be done.
• For example, in SQS, you don't have direct control over the underlying infrastructure or message broker software, like you would with a self-hosted solution like RabbitMQ. Additionally, SQS has strict limits on message size, retention periods, and throughput that cannot be customized beyond the allowed limits.
• While these limitations may be a downside for some use cases, the tradeoff is that SQS provides a high level of scalability, reliability, and fault-tolerance without requiring extensive management and maintenance on the part of the user.
• Kafka is a distributed streaming platform designed for high-throughput and low-latency data streams. It provides a publish-subscribe model, a high-level API, and built-in support for stream processing.
• Kafka is a system that can handle large amounts of data in real-time. It lets you organize data into categories called "topics" and allows you to publish data to these topics or subscribe to them to receive the data. This makes it easy to process data in real-time without waiting for large batches to accumulate. Additionally, Kafka includes tools for analyzing and transforming data as it's being streamed.
Each of these services has its own strengths and weaknesses, and the choice depends on the specific requirements of the use case.
Let's say you're building a real-time system for processing financial transactions. In this system, you have a producer service that receives transactions from users and publishes them to a message queue, and multiple consumer services that process these transactions in different ways (e.g. fraud detection, analytics, etc.). The system needs to be highly reliable and scalable, and must handle large volumes of transactions in real-time.
For this use case, Kafka would be a good choice because it's designed for high-throughput and low-latency data streams, and can handle large volumes of data in real-time. Kafka's publish-subscribe model and built-in support for stream processing would allow you to easily publish transactions to multiple topics for different consumers to process in parallel. Kafka also includes tools for analyzing and transforming data as it's being streamed, which could be useful for real-time fraud detection.
On the other hand, if the system didn't require real-time processing and had lower throughput requirements, a message queue service like RabbitMQ or SQS might be a better choice. These services are generally easier to set up and use: SQS because it is fully managed, RabbitMQ because it offers fine-grained control over how messages are handled. Additionally, RabbitMQ's advanced features like exchanges and bindings could be useful for routing messages to specific consumers based on their routing keys or headers.
Here is a flowchart that summarizes the comparison between RabbitMQ, SQS, and Kafka: