Ersin Buckley
Posted on March 22, 2022
RSMQ (Redis Simple Message Queue) is about the simplest queue implementation in the known universe. My own implementation here comes in at under 500 lines of code. Join me for a short tour of the code and how it works.
You might be asking why I went to this effort when there are already so many implementations. Educational reasons! Implementing RSMQ is a fine, tightly scoped project that can be completed in a couple of days of after-work effort. In the day job we have been using a PHP implementation of the library, but I wanted to deepen my knowledge of this simple tool by creating an implementation in my favorite language!
If you have Redis already, RSMQ is probably the easiest way to add messaging between different languages, processes and machines!
There are PHP/Python/JavaScript/Java/C#/Rust/Go implementations now available for you to use. Want to write your own? No problem! Read on and become convinced that this is a simple and basic implementation task.
How does RSMQ work?
You create queues, send messages to them, and receive those messages across a diaspora of devices, processes and languages. Queues contain messages. Messages are delivered on a first-in, first-out basis (think of the line at McDonald's).
A message is delivered to only one consumer at a time. A consumer is the thing that takes the message, finishes the work, and deletes the message at the end. There is a time limit on how long a message can be processed. When the time runs out, the message re-enters the queue and can be picked up by another consumer.
A typical worker implementation will receive a message, do the work, and then delete the message once it has been processed successfully. If the worker crashes during the work, the message will re-enter the queue automatically, and another consumer will be able to pick it up.
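The delivery rules above can be modelled without Redis at all. What follows is a minimal in-memory sketch in Go, not the Redis-backed implementation: memQueue and its Send, Receive and Delete methods are hypothetical names chosen for illustration, and timestamps are plain unix milliseconds.

```go
package main

import "fmt"

// memQueue is a toy in-memory stand-in for one RSMQ queue. It is not the
// Redis-backed implementation; it only models the delivery rules.
type memQueue struct {
	visibleAt map[string]int64  // message ID -> unix ms at which it is receivable
	body      map[string]string // message ID -> content
}

func newMemQueue() *memQueue {
	return &memQueue{visibleAt: map[string]int64{}, body: map[string]string{}}
}

// Send stores a message that becomes receivable at nowMs.
func (q *memQueue) Send(id, body string, nowMs int64) {
	q.visibleAt[id] = nowMs
	q.body[id] = body
}

// Receive hands out the receivable message with the lowest timestamp and
// hides it from other consumers for vtMs milliseconds.
func (q *memQueue) Receive(nowMs, vtMs int64) (id, body string, ok bool) {
	for cand, at := range q.visibleAt {
		if at > nowMs {
			continue // delayed, or currently held by another consumer
		}
		if !ok || at < q.visibleAt[id] || (at == q.visibleAt[id] && cand < id) {
			id, ok = cand, true
		}
	}
	if !ok {
		return "", "", false
	}
	q.visibleAt[id] = nowMs + vtMs
	return id, q.body[id], true
}

// Delete removes a message once its work has completed successfully.
func (q *memQueue) Delete(id string) {
	delete(q.visibleAt, id)
	delete(q.body, id)
}

func main() {
	q := newMemQueue()
	q.Send("m1", "resize image 42", 0)

	id, _, _ := q.Receive(0, 30_000)      // worker A takes m1
	_, _, ok := q.Receive(10_000, 30_000) // worker B sees nothing: m1 is hidden
	fmt.Println(id, ok)                   // m1 false

	// Worker A crashed without deleting m1, so it reappears after 30s.
	id, _, ok = q.Receive(31_000, 30_000)
	fmt.Println(id, ok) // m1 true

	q.Delete(id) // worker B finished the job
	_, _, ok = q.Receive(120_000, 30_000)
	fmt.Println(ok) // false
}
```

Nothing in this model ever moves a message back into the queue: a crashed worker simply never deletes it, and the message becomes receivable again once its timestamp has passed.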
Implementing our own.
Some Redis knowledge is assumed, so get familiar with the basics first. Today we will be using the following data structures.
- Sets (contain the list of queues)
- Hashes (contain the attributes and stats for a specific queue, and the message bodies)
- Sorted Sets (Zset) (contain the message IDs of a queue, scored by the time each message becomes receivable)
Creating a new message Queue
A new queue is simply a unique Hash and a member added to a Set.
key := rsmq.ns + ":" + opts.QName + ":Q"
A queue is uniquely identified by the key. This is how we reference the queue, send messages to it, pop from it, and update its attributes. You will be using identifiers like this one in a lot of places throughout your code.
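Since the same identifiers show up again and again, it can help to build them in one place. The sketch below is an assumption about how that might look: the helper names messagesKey, queueKey and queuesKey are hypothetical, while the key layouts themselves are the ones used in this post.

```go
package main

import "fmt"

// Hypothetical helpers: the function names are illustrative only, the key
// layouts are the ones used by the snippets in this post.

// messagesKey is the sorted set holding the message IDs of a queue.
func messagesKey(ns, qname string) string { return ns + ":" + qname }

// queueKey is the hash holding the queue attributes, stats and message bodies.
func queueKey(ns, qname string) string { return messagesKey(ns, qname) + ":Q" }

// queuesKey is the set listing every queue name in the namespace.
func queuesKey(ns string) string { return ns + ":QUEUES" }

func main() {
	fmt.Println(messagesKey("rsmq", "emails")) // rsmq:emails
	fmt.Println(queueKey("rsmq", "emails"))    // rsmq:emails:Q
	fmt.Println(queuesKey("rsmq"))             // rsmq:QUEUES
}
```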
_, err = rsmq.cl.HMSet(ctx, key, map[string]interface{}{
	"createdby": "ersin",
	"vt":        30, // TODO allow this to be set with CreateQueueRequestOptions
	"delay":     0,
	"maxsize":   65536,
	"created":   unixMilliseconds,
	"modified":  unixMilliseconds,
}).Result()
if err != nil {
	return fmt.Errorf("CreateQueue: set queue params: %w", err)
}
We define a hash for the attributes of the queue, including informative things like the visibility timeout (vt) in seconds, the delay in seconds before a message becomes receivable, and the maximum size in bytes of a message in the queue.
_, err = rsmq.cl.SAdd(ctx, rsmq.ns+":QUEUES", opts.QName).Result()
if err != nil {
	return fmt.Errorf("CreateQueue: add queue to QUEUES set: %w", err)
}
Finally, we add the name of the queue to a set key. This allows our library to list all queues that are created under a given namespace.
Sending a message to the queue
Code first, description second. It looks like this!
pipe := rsmq.cl.Pipeline()
pipe.ZAdd(ctx, key, &redis.Z{
	Score:  float64(q.TimeSent.Add(sendTime).UnixMilli()),
	Member: q.UID,
})
pipe.HSet(ctx, key+":Q", q.UID, opts.Message)
pipe.HIncrBy(ctx, key+":Q", "totalsent", 1)
_, err = pipe.Exec(ctx)
Sending a message is a three-step process that we issue as a Redis 'pipeline'. A pipeline batches the commands and sends them to the server together, so we do not pay a round trip between client and server for each one.
- Add the message ID to the Sorted Set rsmq.ns + ":" + opts.QName with a score being the unix milliseconds of when it should become receivable. Later on, we will re-score this message, which has the effect of hiding it from other Q consumers.
- Set the message content as a value on the rsmq.ns + ":" + opts.QName + ":Q" hash. The field name of the content is the message ID.
- Increment the totalsent field of the Q hash used in step (2).
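Note that we execute these in a 'pipe', so the 3 commands are sent in one batch and Redis runs them in the order given. A plain pipeline is not atomic by itself, though: commands from other clients can run in between ours. For all-or-nothing execution, wrap the commands in a MULTI/EXEC transaction, which go-redis exposes as TxPipeline().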
ReceiveMessage
Our final operation for this code review is to receive a message from the queue. This step would be called by the workers that do the actual processing.
When a message is received it comes with an explicit deadline for processing it. This is known as the 'Visibility Timeout', and it is the number of seconds for which the message is exclusively available to the one worker. If the visibility timeout expires before the message is deleted, the message becomes available to other workers again.
results, err := rsmq.cl.EvalSha(
	ctx,
	// this references the embedded Lua procedure
	*rsmq.receiveMessageSha1,
	// these parameters are presented as KEYS to the Lua script
	[]string{key, timeSentUnix, timeVisibilityExpiresUnix}).Slice()
There is only a single call on the Go side of things. We call a Lua script which runs on Redis to do the heavy lifting. An embedded Lua script runs atomically on the datastore: Redis executes no other command while the script is running. This is how we get the guarantee that only one worker receives a message at a time.
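EVALSHA identifies a script by the SHA-1 digest of its source text, the same value that SCRIPT LOAD returns. As a minimal sketch, assuming the Lua source is available to the Go program as a string, the digest can also be computed locally. scriptSHA1 is a hypothetical helper name, and the one-line script in main is a placeholder rather than the receive script from this post.

```go
package main

import (
	"crypto/sha1"
	"encoding/hex"
	"fmt"
)

// scriptSHA1 returns the lowercase hex SHA-1 digest of a script body, which
// is the identifier EVALSHA expects as its first argument.
func scriptSHA1(script string) string {
	sum := sha1.Sum([]byte(script))
	return hex.EncodeToString(sum[:])
}

func main() {
	// Placeholder script, used here only to have something to hash.
	const script = `return redis.call("PING")`
	fmt.Println(scriptSHA1(script))
}
```

Computing the digest does not register the script. Redis answers EVALSHA with a NOSCRIPT error until the script has been loaded, for example with SCRIPT LOAD at startup.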
The Lua script takes the following parameters:
- KEYS[1] is the name of the sorted set containing the Q, rsmq.ns + ":" + opts.QName
- KEYS[2] is the time when a message has been requested from the Q.
- KEYS[3] is the time when a message should become visible to other consumers again.
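Here is one way the three KEYS entries could be built on the Go side. This is a sketch under assumptions: receiveKeys is a hypothetical helper, timestamps are unix milliseconds to match the sorted set scores, and the visibility timeout is given in seconds as it is stored on the queue.

```go
package main

import (
	"fmt"
	"strconv"
)

// receiveKeys builds the three KEYS entries for the receive script.
// nowMs is the current unix time in milliseconds and vtSeconds is the
// visibility timeout in seconds.
func receiveKeys(ns, qname string, nowMs, vtSeconds int64) []string {
	return []string{
		ns + ":" + qname,                            // KEYS[1]: sorted set of message IDs
		strconv.FormatInt(nowMs, 10),                // KEYS[2]: time of the request
		strconv.FormatInt(nowMs+vtSeconds*1000, 10), // KEYS[3]: time the message is visible again
	}
}

func main() {
	keys := receiveKeys("rsmq", "emails", 1647907200000, 30)
	fmt.Println(keys) // [rsmq:emails 1647907200000 1647907230000]
}
```

The usual Redis convention is to pass only key names in KEYS and everything else in ARGV. The call in this post passes all three values as KEYS, so the sketch follows it.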
Now let's step through the Lua procedure.
local msg = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", KEYS[2], "LIMIT", "0", "1")
if #msg == 0 then
  return {}
end
Remember that each queue has a sorted set in which every message is scored by the timestamp at which it becomes receivable. The above code returns zero or exactly one message that is due, meaning its score lies between -inf and the current timestamp provided in KEYS[2].
redis.call("ZADD", KEYS[1], KEYS[3], msg[1])
Next we re-score the message we found with the new visibility timeout defined in the KEYS[3] parameter. This step is important, because it keeps the message in the queue while hiding it from other workers. They will be able to pick up this message again if it is not deleted before the visibility timeout has expired.
redis.call("HINCRBY", KEYS[1] .. ":Q", "totalrecv", 1)
Now we increment the totalrecv counter on the Hash which contains the Q.
The final block of code concerns returning the message.
local mbody = redis.call("HGET", KEYS[1] .. ":Q", msg[1])
local rc = redis.call("HINCRBY", KEYS[1] .. ":Q", msg[1] .. ":rc", 1)
local o = {msg[1], mbody, rc}
if rc==1 then
  redis.call("HSET", KEYS[1] .. ":Q", msg[1] .. ":fr", KEYS[2])
  table.insert(o, KEYS[2])
else
  local fr = redis.call("HGET", KEYS[1] .. ":Q", msg[1] .. ":fr")
  table.insert(o, fr)
end
return o
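An HINCRBY is called to increment our count of how many times this message has been received. On the first receive (rc == 1) the script also stores the request time in the :fr field of the message; on later receives it reads that stored value back instead.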
To wrap it all up, the message ID and content are returned along with a few additional informative attributes: the receive count and the time the message was first received.
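Back on the Go side, the reply arrives as a slice of untyped values. The sketch below shows one way to turn it into a struct. It rests on assumptions: Message, parseReceive and errNoMessage are hypothetical names, and the reply is assumed to decode as string, string, int64, string, which is how a Redis client would typically surface bulk strings and integers.

```go
package main

import (
	"errors"
	"fmt"
)

// Message is a hypothetical struct for the four values the script returns.
type Message struct {
	ID            string
	Body          string
	ReceiveCount  int64
	FirstReceived string // unix ms as a string, exactly as the script stored it
}

// errNoMessage signals the empty reply the script sends when nothing is due.
var errNoMessage = errors.New("no message available")

// parseReceive converts the script reply into a Message.
// Assumption: values decode as string, string, int64, string.
func parseReceive(results []interface{}) (*Message, error) {
	if len(results) == 0 {
		return nil, errNoMessage
	}
	if len(results) != 4 {
		return nil, fmt.Errorf("parseReceive: unexpected reply length %d", len(results))
	}
	id, ok1 := results[0].(string)
	body, ok2 := results[1].(string)
	rc, ok3 := results[2].(int64)
	fr, ok4 := results[3].(string)
	if !(ok1 && ok2 && ok3 && ok4) {
		return nil, errors.New("parseReceive: unexpected reply types")
	}
	return &Message{ID: id, Body: body, ReceiveCount: rc, FirstReceived: fr}, nil
}

func main() {
	// A hand-written reply standing in for what EvalSha(...).Slice() returns.
	reply := []interface{}{"abc", "hello", int64(1), "1647907200000"}
	msg, err := parseReceive(reply)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%+v\n", *msg)
}
```

Returning a dedicated error for the empty reply lets a worker tell "nothing to do right now" apart from a malformed response and simply poll again.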
Want to implement your own?
My dreams would come true if I inspired you to write something like this for your own learning purposes. It would make my week complete if you left a comment.
All credit for the original implementation of RSMQ goes to https://github.com/smrchy/rsmq :)