Ordered queue processing in Azure Functions with Sessions

jeffhollan

Jeff Hollan

Posted on May 30, 2019

Let's chat about ordering. It's one of my favorite topics, and something I've blogged about extensively before. Previously, ordered processing in Azure Functions was only possible with event streams like Azure Event Hubs, but today I want to show how you can preserve order for Service Bus queues and topics as well.

On the surface it seems pretty straightforward: I want to be able to process messages from a queue in the exact order that I received them. For a simple service running on a single machine, it's pretty easy to achieve. However, how do I preserve the ordering of queue messages when I want to process at scale? With something like Azure Functions I may be processing messages across dozens of active instances, so how can I preserve ordering?

Let's use a simple example of a messaging system that deals with patients at a hospital. Imagine I have a few events for each patient:

  1. Patient arrives
  2. Patient assigned a room
  3. Patient receives treatment
  4. Patient is discharged

I want to make sure I never process a message out of order and potentially discharge a patient before I've processed their treatment!

Let's run some quick experiments to see what happens. For this I'm going to simulate 1000 patients each sending these 4 messages (in order) and processing them (ideally in order as well).

Default and out of order

Let's try this with a simple Azure Function that just triggers on a queue. I'm not going to do anything special, just trigger on the queue and push the operation it's processing to a list on Redis Cache.



// Runs once per queue message; by default an instance handles many messages concurrently.
public async Task Run(
    [ServiceBusTrigger("queue", Connection = "ServiceBusConnectionString")]Message message,
    ILogger log)
{
    string body = Encoding.UTF8.GetString(message.Body);
    log.LogInformation($"C# ServiceBus queue trigger function processed message: {body}");
    // Record the event in the patient's Redis list so the processing order can be inspected later.
    await _client.PushData((string)message.UserProperties["patientId"], body);
}



After sending 1000 patients' worth of data (4 messages each) to this queue, what does the Redis Cache look like after processing? Well, some of the patients look great. When I look up Patient #4 I see:



>lrange Patient-$4 0 -1
1) "Message-0"
2) "Message-1"
3) "Message-2"
4) "Message-3"



Great! All 4 events were sent for Patient 4, and got processed in order. But if I look at patient 2:



>lrange Patient-$2 0 -1
1) "Message-1"
2) "Message-2"
3) "Message-0"
4) "Message-3"



In this case it didn't finish processing the "patient arrives" message until after 2 other messages had already been processed. So what happened here? Azure Service Bus does guarantee ordering, so why are my messages out of order?

Well, by default, the queue trigger will do a few things. First, for every instance that spins up, it will process a set of messages concurrently. By default an instance concurrently processes 32 messages. That means it may be processing all 4 messages for a patient at the same time, and they can finish in a different order than they were sent. Well, that seems easy enough to fix: let's just limit the concurrency to 1.

Anti-pattern: limit scale out and concurrency

Here's maybe the most common solution to the above problem that I see. Let's limit the concurrency to only process 1 message at a time instead of 32. For that I modify my host.json file and set the maxConcurrentCalls to 1. Now each instance will only process 1 message at a time. I run the same test again.
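
As a rough sketch of that change, assuming the 3.x Service Bus extension on the Functions 2.x host (the exact layout can differ between extension versions, so check the one you have installed), the host.json could look something like this:

```json
{
  "version": "2.0",
  "extensions": {
    "serviceBus": {
      "messageHandlerOptions": {
        "maxConcurrentCalls": 1
      }
    }
  }
}
```

Note that this only caps concurrency inside a single instance. It says nothing about how many instances the app scales out to.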

First off, it's super slow. It takes me a long time to chew through the 4000 queue messages because each instance only processes 1 at a time. And worse yet? When I check the results afterwards, some of the patients are still out of order! What's going on here? Even though I limited the instance concurrency to 1, Azure Functions has scaled me out to multiple instances. So if I have 20 function app instances that have scaled, I have 20 messages being processed concurrently (1 per instance). That means I still get into a spot where messages from the same patient could be processed at the same time - just on different instances. I'm still not guaranteed ordered processing.

The fix here? Many people want to limit the scale-out of Azure Functions to a single instance as well. While that's technically possible, it would hurt my throughput even more. Now only one message could be processed at a time globally, meaning that during high traffic I'm going to get a large backlog of patient events that my function may not be able to keep up with.

Sessions to the rescue

Wouldn't this be such a sad blog post if I ended it here? There is a better way! Previously I would have said your best bet here may be to use Event Hubs, where partitions and batches let you guarantee ordering. The challenge, though, is that sometimes a queue is the right message broker for the job given its transactional qualities like retries and dead-lettering. And now you can use queues and get ordering with Service Bus sessions 🎉.

So what are sessions? Sessions enable you to set an identifier for a group of messages. In order to process messages from a session, you first have to "lock" the session. You can then start to process each message from the session individually (using the same lock / complete semantics of a regular queue). The benefit of sessions is that they let you preserve order even when processing at high scale across multiple instances. Think of before, where we had something like 20 Azure Function app instances all competing for the same queue. Rather than giving up on scaling to 20, each of the 20 instances now "locks" its own available session and only processes events from that session. Sessions also ensure that messages from a session are processed in order.

Sessions can be dynamically created at any time. An instance of Azure Functions spins up and first asks "are there any messages that have a session ID that hasn't been locked?" If so, it locks the session and starts processing in order. When a session no longer has any available messages, Azure Functions will release the lock and move on to the next available session. No message will be processed without first having to lock the session the message belongs to.

For our example above, I'm going to send the same 4000 messages (4 patient events for 1000 patients). In this case, I'm going to set the patient ID as the session ID. Each Azure Functions instance will acquire a lock on a session (patient), process any messages that are available, and then move on to another patient that has messages available.
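
To make the locking idea a bit more concrete, here is a toy in-memory model of it. This is not the Service Bus client or the Functions runtime, and `SessionDemo`, `Process` and the `workers` parameter are names I made up for illustration. Each worker claims a whole session, drains it in arrival order, and only then moves on to the next unclaimed session:

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Toy model only: an in-memory stand-in for session locking, not the real broker.
public static class SessionDemo
{
    public static Dictionary<string, List<string>> Process(
        IEnumerable<(string SessionId, string Body)> queue, int workers)
    {
        // Group messages by session ID, keeping arrival order inside each session.
        var sessions = new ConcurrentQueue<KeyValuePair<string, Queue<string>>>(
            queue.GroupBy(m => m.SessionId)
                 .Select(g => new KeyValuePair<string, Queue<string>>(
                     g.Key, new Queue<string>(g.Select(m => m.Body)))));

        var processed = new ConcurrentDictionary<string, List<string>>();

        var tasks = Enumerable.Range(0, workers).Select(worker => Task.Run(() =>
        {
            // TryDequeue hands each session to exactly one worker: the "lock".
            while (sessions.TryDequeue(out var session))
            {
                var handled = new List<string>();
                while (session.Value.Count > 0)
                {
                    // Only the owning worker touches this session, so order is kept.
                    handled.Add(session.Value.Dequeue());
                }
                processed[session.Key] = handled;
            }
        })).ToArray();

        Task.WaitAll(tasks);
        return processed.ToDictionary(p => p.Key, p => p.Value);
    }
}
```

Feeding this the interleaved events of many patients with something like `SessionDemo.Process(messages, workers: 20)` keeps every patient's events in the order they were enqueued, while different patients are still handled in parallel. The real service adds things this sketch ignores, such as lock renewal and sessions that receive more messages later.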

Using sessions in Azure Functions

Sessions are currently available in the Microsoft.Azure.WebJobs.Extensions.ServiceBus extension using version >= 3.1.0, and at the time of writing this is in preview. So first I'll pull in the extension.



Install-Package Microsoft.Azure.WebJobs.Extensions.ServiceBus -Pre



And then make the tiniest code change to my function code to enable sessions (IsSessionsEnabled = true):



// IsSessionsEnabled = true makes the trigger lock a session before processing its messages.
public async Task Run(
    [ServiceBusTrigger("queue", Connection = "ServiceBusConnectionString", IsSessionsEnabled = true)]Message message,
    ILogger log)
{
    string body = Encoding.UTF8.GetString(message.Body);
    log.LogInformation($"C# ServiceBus queue trigger function processed message: {body}");
    // The session ID (the patient) is now the Redis key, instead of a custom user property.
    await _client.PushData(message.SessionId, body);
}



I also need to make sure I'm using a session-enabled queue or topic.
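
The portal has a checkbox for this when you create the queue. If you would rather create it from code, a minimal sketch using the management client from the Microsoft.Azure.ServiceBus package might look like the following. The `QueueSetup` class and its method names are my own, hypothetical helpers, and sessions have to be turned on when the queue is created because the setting can't be changed afterwards:

```csharp
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus.Management;

// Hypothetical helper, not part of the original sample.
public static class QueueSetup
{
    // Describes a queue that only accepts session-aware senders and receivers.
    public static QueueDescription SessionQueue(string queueName) =>
        new QueueDescription(queueName) { RequiresSession = true };

    public static async Task EnsureSessionQueueAsync(string connectionString, string queueName)
    {
        var management = new ManagementClient(connectionString);
        try
        {
            // Create the queue only if it is missing; an existing queue is left untouched.
            if (!await management.QueueExistsAsync(queueName))
            {
                await management.CreateQueueAsync(SessionQueue(queueName));
            }
        }
        finally
        {
            await management.CloseAsync();
        }
    }
}
```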

And when I push the messages to the queue, I'll set the right sessionId for each patient message I send.
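
A minimal sketch of the sending side, assuming the same Microsoft.Azure.ServiceBus `Message` type the function receives. `PatientSender`, its method names and the `Message-{i}` bodies are illustrative and not lifted from my load-testing code. The only part that matters for ordering is setting `SessionId` on every message:

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;

// Hypothetical sender, for illustration only.
public static class PatientSender
{
    // Builds the events for one patient; the shared SessionId puts them in one session.
    public static IList<Message> BuildPatientMessages(string patientId, int count = 4) =>
        Enumerable.Range(0, count)
            .Select(i => new Message(Encoding.UTF8.GetBytes($"Message-{i}")) { SessionId = patientId })
            .ToList();

    public static async Task SendPatientEventsAsync(
        string connectionString, string queueName, string patientId)
    {
        var client = new QueueClient(connectionString, queueName);
        try
        {
            // Await each send so the broker receives the events in the intended order.
            foreach (var message in BuildPatientMessages(patientId))
            {
                await client.SendAsync(message);
            }
        }
        finally
        {
            await client.CloseAsync();
        }
    }
}
```

Sending to a session-enabled queue without a `SessionId` is rejected by the broker, so every message needs one.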

After publishing the function I push the 4000 messages. The queue gets drained pretty quickly, because I'm able to process multiple sessions concurrently across scaled-out instances. After running the test I check Redis Cache. As expected, I see all messages were processed, and for every single patient I see they were processed in order:



>lrange Patient-$10 0 -1
1) "Message-0"
2) "Message-1"
3) "Message-2"
4) "Message-3"

>lrange Patient-$872 0 -1
1) "Message-0"
2) "Message-1"
3) "Message-2"
4) "Message-3"



So with the new Azure Functions support for sessions, I can process messages from a Service Bus queue or topic in order without sacrificing overall throughput. I can dynamically add messages to a new or existing session, and have confidence that messages in a session will be processed in the order they are received by Service Bus.

You can see the full sample I used for testing and loading messages in my GitHub repo. The master branch has the in-order version that uses sessions, and the out-of-order branch has the default, out-of-order experiment.
