jbebar
Posted on March 18, 2021
What is Rabbit MQ ๐?
Rabbit MQ is a broker of messages implementing the AMQP protocal. It makes in possible for application to communicate in synchronous as well as in aynchronous way. Each application can subscribe and wait for new messages to arrive.
AMQP involves three main actors: publishers, consumers and the broker.
A publisher is an application that pushes messages to a Rabbit MQ broker.
In the Rabbit MQ broker has two important elements: queues and exchanges binded by routing rules.
When Rabbit MQ receives a message, it will check the routing information of the message and drop it the right queue according to the routing rules. The messages will stay in the queue until an application subscribes to the queue and reads the messages.
In this article I will just talk about the consuming part based on a queue with existing messages and a consumer reading from it, so no need to understand the routing part :).
There are two ways a consumer can read a message from a queue:
- auto acknowledgement mode (autoAck): the messages are automatically removed from the queue as soon as they are sent to the consumer, this is
- manual Acknowledgement mode (manualAck): the consumer subscribes to a queue, and the messages are removed only when the consumer has sent back an acknowledgement (Ack) back to the broker.
We will experiment these two ways of consuming messages.
A simple consumer example ๐งช
Let's see the code of a simple consumer made in Kotlin:
package org.jbebar
import com.rabbitmq.client.ConnectionFactory
import com.rabbitmq.client.Delivery
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import kotlin.system.exitProcess
fun main(args: Array<String>) {
val arguments = args.map { it.split("=") }.map { Pair(it[0], it[1]) }.toMap()
val autoAck = arguments["autoAck"].toBoolean()
val qOS = arguments["qos"]?.toInt()
val processDurationSeconds = arguments["processDurationSeconds"]!!.toLong()
val countDownLatch = CountDownLatch(10)
var processedMessageCount = AtomicInteger(0)
val factory = ConnectionFactory()
factory.host = "localhost"
val connection = factory.newConnection()
val channel = connection.createChannel().apply { qOS?.let { basicQos(it) } }
val deliveryCallback = { _: String, message: Delivery ->
val receivedMessage = String(message.body)
println("Processing : $receivedMessage")
Thread.sleep(processDurationSeconds * 1000)
if (!autoAck) {
println("Acknowledging message $receivedMessage")
channel.basicAck(message.envelope.deliveryTag, false)
}
processedMessageCount.incrementAndGet()
countDownLatch.countDown()
println("Processed : $receivedMessage")
}
channel.basicConsume("demo-queue", autoAck, deliveryCallback, { _ -> })
countDownLatch.await(10, TimeUnit.SECONDS)
println("Processed ${processedMessageCount.get()} messages.")
exitProcess(0)
}
The first part of this code retrieves the command line arguments that will parameter our consumer:
"autoAck": parameters the way our consumers reads messages, if true it is in auto ack mode, if false we are in manualAck.
"qOS": quantity of outstanding messages, quantity of messages that the broker allows to be unacknowledged for one consumer. Also called the quantity of inflight messages.
This parameter is meaningful only if we are in manualAck mode"processDuration": This parameter sets the time or application takes to process each message. It is not related to rabbit mq configuration, but it is a way to simulate a lagging consumer or a fast one.
val autoAck = arguments["autoAck"].toBoolean()
val numberOfMessagesToProcess = arguments["numberOfMessagesToProcess"]?.toInt() ?: 1
val qOS = arguments["qos"]?.toInt()
val processDuration = Duration.ofSeconds(arguments["processDurationSeconds"]?.toLong() ?: 0L)
We create connection based on the Qos passed earlier:
val factory = ConnectionFactory()
factory.host = "localhost"
val connection = factory.newConnection()
val channel = connection.createChannel().apply {
qOS?.let { basicQos(it) }
}
In this deliveryCallback, we simulate a business process and pass it to the channel which will consume from a queue called "demo-queue".
val deliveryCallback = { _: String, message: Delivery ->
val receivedMessage = String(message.body)
println("Processing : $receivedMessage")
Thread.sleep(processDurationSeconds * 1000)
if (!autoAck) {
println("Acknowledging message $receivedMessage")
channel.basicAck(message.envelope.deliveryTag, false)
}
processedMessageCount.incrementAndGet()
countDownLatch.countDown()
println("Processed : $receivedMessage")
}
One important part of the code is the CountDownLatch
.
It prevents our application from exiting before receiving straight after it launched as it blocks and exits only on two conditions:
- countDownLatch.countDown() has been called 10 times, that means 10 messages have been processed.
- it passes a duration of 10 seconds
You can see the full code here :).
Let's try our consumer ๐งช !
3 steps:
- 1 Launch the rabbit Mq broker (check the admin interface on http://localhost:15672/#/ )
docker run -d --rm --hostname my-rabbit --name rabbit-broker -p 5672:5672 -p 15672:15672 rabbitmq:3-management
- 2 Build and launch the publisher to publish 10 messages:
git clone https://github.com/jbebar/rabbit-mq-sample-sender.git
mvn clean package
java -jar target/amqp-sender-jar-with-dependencies.jar 10
You should see the queue in the admin interface of rabbit mq, if you go in the detail of the queue you will see this:
- 3 Build and launch the app:
git clone https://github.com/jbebar/rabbit-mq-sample-receiver.git
mvn clean package
java -jar target/amqp-consumer-jar-with-dependencies.jar autoAck=true processDurationSeconds=0
Out put:
Processing : Message 1
Processed : Message 1
Processing : Message 2
Processed : Message 2
Processing : Message 3
Processed : Message 3
Processing : Message 4
Processed : Message 4
Processing : Message 5
Processed : Message 5
Processing : Message 6
Processed : Message 6
Processing : Message 7
Processed : Message 7
Processing : Message 8
Processed : Message 8
Processing : Message 9
Processed : Message 9
Processing : Message 10
Processed : Message 10
Processed 10 messages.
The process is fast enough to consume all messages before the 10 seconds time out.
But if we try with a business process duration of 5 seconds:
java -jar target/amqp-consumer-jar-with-dependencies.jar autoAck=true processDurationSeconds=5
Processing : Message_1
Processing : Message_2
Processing : Message_3
Processed 2 messages.
Our consumer has time to process only two messages before application exits and the queue is now empty meaning we lost 8 messages before they could be processed by our service.
Our consumer had all its messages in memory and they vanished when it shutdown.
In real life scenario this is dangerous: what if a payment service missed a payment because the service restarted or because our application simply crashes?
Using manual ack and Qos to the rescue
Manual ack
java -jar target/amqp-consumer-jar-with-dependencies.jar autoAck=false processDurationSeconds=5
Processing : Message_1
Acknowledging message Message_1
Processed : Message_1
Processing : Message_2
Acknowledging message Message_2
Processing : Message_3
Processed 1 messages.
If we look at the history of our queue we have 9 messages remaining, no more losses.
The broker behaviour with manualAck is to requeue the messages that have not been acknowledged after a certain period of time or because of a consumer disconnection.
In our case, our consumer was delivered all the messages, that's why the number of messages ready drops suddenly, then we had time to process just one of them and send the ack.
The message was then removed from the queue by the broker.
Finally, the number of ready messages goes back up to 9 because rabbit mq sees the connexion is lost with the consumer as the consumer shutdown.
You can see in the following graph, first the ready messages are dropping in favor of the unacknwoledged messages going up. Then when the consumer disconnects, the number of ready messages for other consumers goes back up.
Setting the Qos
The Qos limits the number of messages unacknowledged for a given consumer. This means it limits the number of messages buffered in the memory of the consumer and not yet processed.
This a way to protect our consumer from crashing permanently when reading from a queue with many messages. Also, if the consumer is too long to ack the messages, they are not available for other consumers.
The optimal value depends on the capacity of the consumer, the documentations advices a Qos from 100 to 300 :
To continue this article, an interesting experiment would be to upload a large amount of messages to the queue, reduce the memory available for a our consumer and see at which Qos it will explode ๐ฅ ๐ค.
Thanks for reading and feel free to comment or ask any question.
Posted on March 18, 2021
Join Our Newsletter. No Spam, Only the good stuff.
Sign up to receive the latest update from our blog.