AWS SQS Events On AWS Lambda
Ivonne Roberts
Posted on August 26, 2020
In 2018, AWS announced support for SQS-triggered events on AWS Lambda. For companies embracing a serverless architecture, this opened up new possibilities for event-driven architecture, streamlining batch infrastructure, and much more.
Before the feature launched, if you were a serverless shop that needed to process SQS messages, the only option was to use CloudWatch to trigger a Lambda function that polled for messages and then either fanned out workers or chewed through batches of SQS messages. While this worked, it was prone to error. At the time, a Lambda function could only live for 5 minutes, at which point it could either spin up another Lambda function and pass the torch or simply wait for CloudWatch to trigger another Lambda function. Depending on how long the Lambda function ran, you either lost time waiting for the next CloudWatch trigger or ended up with a high number of messages with multiple receives as the Lambda function timed out.
At Edelman Financial Engines, we put a stake in the ground to embrace two things: event-driven architecture and serverless architecture. These two designs have allowed us to create more scalable applications as well as focus on the things that are important to us and have a direct impact on our clients. With the recent Lambda and SQS announcement, we now have a new option to process our queues in a serverless fashion: simply configure an event source mapping on a Lambda function for a queue.
Event-Driven Architecture
When a client or a planner does something on our site, a cascade of decisions and processes is run. An event-driven architecture enables us to do that work in a reasonable amount of time. Take, for example, a client logging in. When a client presses that login button, we have to do a series of things: get the client’s latest holdings from the provider, validate the client’s information, generate a snapshot of the client’s progress, run Monte Carlo simulations, and the list goes on.
With event-driven architecture, as soon as the login happens, an event is sent to an event bus where consumers are standing by waiting to process certain events. In our use case, an analytics consumer could process the login event and trigger a client engagement metric for reporting. Soon after, another event is published saying the client’s latest holdings are updated. Multiple consumers of that event can then pre-calculate the client’s current progress and run Monte Carlo simulations simultaneously and cache the results. Now, as the client navigates to our site landing page, their results show up almost instantaneously instead of the 10+ seconds it would normally take if we did this all sequentially.
Having services like SNS and SQS greatly facilitates this architecture model. We can have consumers (AWS Lambda, SQS queue, HTTP endpoint, etc) set as endpoints to SNS topics that do asynchronous heavy lifting on the inputs. For SQS, queues can be processed by a consumer (formerly only EC2 instances, but now, Lambda functions as well) chewing through the messages in a batch fashion.
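To make the fan-out idea concrete, here is a minimal in-memory sketch. It is not how SNS is implemented and not our production code: the FanOutSketch class and its methods are hypothetical stand-ins for a topic with several subscribers, and, unlike a real publisher, it waits for the consumers so the result can be printed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Function;

// Hypothetical in-memory stand-in for a topic: every subscriber receives every event.
final class FanOutSketch {
    private final List<Function<String, String>> subscribers = new CopyOnWriteArrayList<>();

    void subscribe(Function<String, String> consumer) {
        subscribers.add(consumer);
    }

    // Hands the event to all subscribers concurrently, then collects their results.
    List<String> publish(String event) {
        List<CompletableFuture<String>> pending = new ArrayList<>();
        for (Function<String, String> consumer : subscribers) {
            pending.add(CompletableFuture.supplyAsync(() -> consumer.apply(event)));
        }
        List<String> results = new ArrayList<>();
        for (CompletableFuture<String> future : pending) {
            results.add(future.join()); // results are collected in subscription order
        }
        return results;
    }

    public static void main(String[] args) {
        FanOutSketch topic = new FanOutSketch();
        topic.subscribe(event -> "progress cached for " + event);
        topic.subscribe(event -> "simulation cached for " + event);
        System.out.println(topic.publish("holdings-updated:client-42"));
    }
}
```

The point of the sketch is only that the two consumers run side by side instead of one after the other, which is where the time savings described above come from.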
Serverless Architecture
At Financial Engines, we have embraced a DevOps culture where decisions are shifted to the functional development team level providing greater choice of tools and technology to match various problems at hand. However, what we don’t want is development teams to be bogged down with tasks like OS patching, auto-scaling servers, and so on. To that end, we have set a standard for teams to use managed services when possible.
Teams can spin up complete microservices using AWS Lambda, DynamoDB, S3, etc., without provisioning, managing, or patching any servers, and still get the power they need. In addition to ease of use, we have seen significant cost savings from switching to serverless computing. (Check out this Financial Engines AWS Case Study for more details on that.)
Using SQS & AWS Lambda to Pre-Calculate Progress
For an example of this new feature/architecture, I am going to dive deep into one of the scenarios mentioned above. When a client’s holdings have been updated, whether the client triggered the update or a job ran to update all client holdings, an event is published with a few details on that client and their holdings. We then have consumers of that event pre-calculate the client’s current progress and cache the results.
There are a couple of considerations we needed to make.
- If the client triggered the update (e.g. the client logged in), then it is likely the progress score will be consumed soon and the pre-calculation needs to happen as soon as possible.
- If a job ran to update all client holdings, then the possibility of that progress score being consumed soon is fairly low and therefore we can take our time doing the pre-calculation.
To accomplish the requirements we created the following architecture.
Our event bus publishes events to SNS topics. From there, consumers decide how to process the messages. For our client-triggered updates, we have an AWS Lambda function set as an endpoint to the holdings updated topic. We also configured message attribute filtering so that this Lambda function only receives client-triggered events. For our job-triggered events, we used an SQS queue to consume filtered events. We then used the new SQS event source on AWS Lambda to process that queue.
To follow, I’ll walk you through how that SQS-to-AWS Lambda connection was configured.
Step 1:
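The first thing you need is a Lambda function that accepts SQS events.
For our Lambda function we need the following dependencies via Gradle (the RequestHandler and SQSEvent types used below ship in the separate aws-lambda-java-core and aws-lambda-java-events artifacts, which also need to be on the classpath):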
compile 'com.amazonaws:aws-java-sdk-lambda:1.11.321'
compile 'com.amazonaws:aws-java-sdk-sqs:1.11.321'
At the very basic level, the handler code in Java 8 looks something like this:
package com.fngn.samples;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import com.fngn.samples.api.dto.EventDto;
import com.fngn.samples.api.dto.ResponseDto;
import com.fngn.samples.application.service.EventConsumer;

public class QueueConsumerLambda implements RequestHandler<SQSEvent, Void> {

    @Override
    public Void handleRequest(SQSEvent event, Context context) {
        try {
            for (SQSEvent.SQSMessage message : event.getRecords()) {
                EventDto eventDto = null;
                ResponseDto responseDto = null;
                String input = message.getBody();
                eventDto = unmarshalEventBody(input);
                responseDto = eventConsumer.processEvent(eventDto);
                handleFailures(responseDto, message);
            }
        } catch (Exception ex) {
            logger.error("Exception handling batch seed request.", ex);
            throw ex;
        }
        return null;
    }
    ...
}
Lastly, we bring this together with the following CloudFormation definition for our Lambda function and our execution role (for bonus points, we have added dead letter queue configuration):
queueConsumerLambda:
  Type: 'AWS::Lambda::Function'
  Properties:
    Handler: 'com.fngn.samples.QueueConsumerLambda::handleRequest'
    # The role ARN must carry a real account ID, so resolve it from the role resource.
    Role: !GetAtt queueConsumerExecutionRole.Arn
    Description: !Sub "Cloud formation lambda for ${projectName}"
    FunctionName: !Sub "${projectName}-queue-consumer"
    MemorySize: 512
    Timeout: 30
    Code:
      S3Bucket: "com.fngn.samples"
      S3Key: !Sub "${projectName}/${lambdaCode}"
    Runtime: java8
    TracingConfig:
      Mode: Active
    Environment:
      Variables:
        REGION: !Ref "AWS::Region"
        APP_ID: !Sub ${projectName}-event-consumer
        ENV_REALM: !Sub ${accountType}
  DependsOn:
    - queueConsumerExecutionRole
queueConsumerExecutionRole:
  Type: 'AWS::IAM::Role'
  Properties:
    AssumeRolePolicyDocument:
      Version: '2012-10-17'
      Statement:
        - Action: 'sts:AssumeRole'
          Principal:
            Service: lambda.amazonaws.com
          Effect: Allow
          Sid: ''
    Policies:
      - PolicyName: !Sub "${projectName}-queue-consumer-policy"
        PolicyDocument:
          Version: '2008-10-17'
          Statement:
            ...
            - Action:
                - sqs:ChangeMessageVisibility
                - sqs:DeleteMessage
                - sqs:GetQueueAttributes
                - sqs:ReceiveMessage
                - sqs:SendMessage
              Effect: Allow
              Resource:
                - !GetAtt sqsQueue.Arn
                - !GetAtt sqsQueueConsumerDLQ.Arn
              Sid: 'SQSPermissions'
Step 2:
We can now create an SQS queue via CloudFormation that subscribes to our SNS topic.
sqsQueue:
  Type: 'AWS::SQS::Queue'
  Properties:
    QueueName: !Sub '${projectName}-ConsumerQueue'
    RedrivePolicy:
      deadLetterTargetArn: !GetAtt sqsQueueConsumerDLQ.Arn
      maxReceiveCount: 5
    VisibilityTimeout: 600
policyQueueConsumer:
  Type: 'AWS::SQS::QueuePolicy'
  Properties:
    PolicyDocument:
      Id: !Sub '${projectName}-ConsumerQueuePolicy'
      Version: '2012-10-17'
      Statement:
        - Sid: 'AllowConsumerSnsToSqsPolicy'
          Effect: 'Allow'
          Principal:
            AWS: '*'
          Action:
            - 'sqs:SendMessage'
          Resource: !GetAtt sqsQueue.Arn
          Condition:
            ArnEquals:
              aws:SourceArn: 'arn:aws:sns:*:*:holding-events'
    Queues:
      - !Ref sqsQueue
Step 3:
Now that those two are in place we can configure the trigger:
sqsEventTrigger:
  Type: "AWS::Lambda::EventSourceMapping"
  Properties:
    BatchSize: 5
    Enabled: true
    EventSourceArn: !GetAtt sqsQueue.Arn
    FunctionName: !Sub "${projectName}-queue-consumer"
  DependsOn:
    - queueConsumerLambda
    - queueConsumerExecutionRole
Step 4:
Send messages to your SQS queue or, in our case, SNS topic and sit back and watch the Lambda function chew through messages as they come in. Honestly, it is as simple as that.
With this configuration, here’s what’s happening. AWS Lambda takes over polling the queue and invokes concurrent instances of the Lambda function, each with a chunk of messages up to the configured batch size (5 in the mapping above; our scheduled configuration, described later, raises it to 10). The Lambda function processes each message in the SQSEvent. If any of those messages fails, the invocation fails and the whole batch stays on the queue, becoming visible again once the visibility timeout expires. If all the messages succeed, AWS Lambda deletes them from the queue. If the queue has a dead letter queue configured (the redrive policy above), a failing message is retried until it reaches the max receive count of 5. After that, once the visibility timeout expires and the message is still on the queue, SQS moves it to the dead letter queue.
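The retry behavior is easier to reason about with a toy model. The sketch below is a simplified in-memory simulation, not the SQS or Lambda API: RedriveSketch, Message, and pollOnce are hypothetical names, visibility timeouts are ignored (a failed batch goes straight back on the queue), and a message is redriven when a receive would exceed the max receive count.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;

// Simplified in-memory model of a queue with a redrive policy, drained in batches.
final class RedriveSketch {

    static final class Message {
        final String body;
        int receiveCount;

        Message(String body) {
            this.body = body;
        }
    }

    final Deque<Message> queue = new ArrayDeque<>();
    final List<Message> deadLetters = new ArrayList<>();
    private final int batchSize;
    private final int maxReceiveCount;

    RedriveSketch(int batchSize, int maxReceiveCount) {
        this.batchSize = batchSize;
        this.maxReceiveCount = maxReceiveCount;
    }

    void send(String body) {
        queue.add(new Message(body));
    }

    // One poll-and-invoke cycle; returns false once there is nothing left to receive.
    boolean pollOnce(Predicate<String> handler) {
        List<Message> batch = new ArrayList<>();
        while (batch.size() < batchSize && !queue.isEmpty()) {
            Message message = queue.poll();
            if (message.receiveCount >= maxReceiveCount) {
                deadLetters.add(message); // receive limit used up: move to the DLQ
            } else {
                message.receiveCount++;
                batch.add(message);
            }
        }
        if (batch.isEmpty()) {
            return false;
        }
        boolean allSucceeded = true;
        for (Message message : batch) {
            if (!handler.test(message.body)) {
                allSucceeded = false; // one failure fails the whole invocation
                break;
            }
        }
        if (!allSucceeded) {
            queue.addAll(batch); // nothing is deleted; the whole batch is retried
        }
        return true;
    }

    public static void main(String[] args) {
        RedriveSketch sim = new RedriveSketch(2, 5);
        sim.send("ok-1");
        sim.send("poison");
        sim.send("ok-2");
        while (sim.pollOnce(body -> !body.equals("poison"))) {
            // keep polling until the queue is drained
        }
        System.out.println("dead letters: " + sim.deadLetters.size());
    }
}
```

In this run the healthy message that shared a batch with the failing one is handed to the handler twice, which is why it pays to make message processing idempotent when a whole batch is retried.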
Monitoring
You can use the dashboards in the AWS Lambda and SQS service consoles as well as configure your own CloudWatch dashboards to monitor the progress. We also used AWS X-Ray, which really helped us early on to identify issues with downstream resources in our microservice.
In the AWS Lambda dashboard, you can quickly see how the service is invoking your Lambda function pretty consistently until the queue is empty. In the below case, we limited our execution to 200 “workers.” So, you can also see throttles as AWS Lambda enforced the limit on concurrent executors.
In the SQS dashboard, you can see the rate of messages coming into the queue, the rate of receives, and other interesting tidbits. As you can see in the below chart, our job was generating events faster than our 200 concurrent executors were processing them: the age of the oldest messages goes up and then drops precipitously as the Lambda functions finish processing all the messages.
Here in X-Ray, you can see all the components that make up this microservice: the Lambda function that processes the SNS messages, the Lambda function that processes the SQS queue, and the Lambda function that serves up the cached response.
Advanced Configuration
Concurrency and EventSourceMapping Config
With our use case, we took configuration a bit further. For example, if a job ran to update all client holdings, then the rate of events per minute spikes fairly high in a short period of time. Downstream resources (e.g. database) used to calculate the progress score might not be able to support that load.
In this case, what we did was create a schedule with subscription details given the day of the week or time of day. We then configured a CloudWatch event that triggers a Lambda function hourly. Based on the schedule’s subscription details, that Lambda function updates either the SQS consumer Lambda function config or its SQS event mapping.
{
  "MON-FRI_09:00-04:00_11:00-04:00": {
    "queueEnabled": false
  },
  "MON-FRI_11:00-04:00_22:00-04:00": {
    "queueEnabled": true,
    "workers": 15,
    "batchSize": 10
  },
  "MON-FRI_22:00-04:00_09:00-04:00": {
    "queueEnabled": true,
    "workers": 100,
    "batchSize": 10
  }
}
This level of control proved to be quite powerful. During the day, at peak load, it was prudent not to risk stressing downstream resources while they were simultaneously serving client traffic. We could either completely disable processing (i.e. disable the EventSourceMapping) or set the Lambda function’s execution concurrency to a low number (e.g. 15). In the evenings, we could ramp up the number of concurrent executions to 100. On the weekends, we could pump that up even higher to, say, 200 concurrent executions.
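As an illustration of the lookup step only, here is a sketch of how the hourly function might pick the active entry from a schedule like the JSON above. It is an assumption-laden sketch rather than our actual code: the ScheduleSketch class is hypothetical, the key layout is assumed to be days, start time with UTC offset, and end time with UTC offset separated by underscores, the day range is checked against the current day only, and the schedule is entered by hand instead of being parsed from JSON.

```java
import java.time.DayOfWeek;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.OffsetTime;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: resolves which schedule entry applies at a given instant.
final class ScheduleSketch {

    // Settings carried by one schedule entry (names mirror the JSON fields).
    static final class Settings {
        final boolean queueEnabled;
        final int workers;
        final int batchSize;

        Settings(boolean queueEnabled, int workers, int batchSize) {
            this.queueEnabled = queueEnabled;
            this.workers = workers;
            this.batchSize = batchSize;
        }
    }

    // Maps "MON" to DayOfWeek.MONDAY and so on.
    static DayOfWeek day(String abbreviation) {
        for (DayOfWeek candidate : DayOfWeek.values()) {
            if (candidate.name().startsWith(abbreviation)) {
                return candidate;
            }
        }
        throw new IllegalArgumentException("Unknown day: " + abbreviation);
    }

    // Assumed key layout: <firstDay>-<lastDay>_<start with offset>_<end with offset>.
    static boolean matches(String key, OffsetDateTime now) {
        String[] parts = key.split("_");
        String[] days = parts[0].split("-");
        OffsetTime start = OffsetTime.parse(parts[1]);
        OffsetTime end = OffsetTime.parse(parts[2]);

        // Evaluate the instant in the window's own UTC offset.
        OffsetDateTime local = now.withOffsetSameInstant(start.getOffset());
        int today = local.getDayOfWeek().getValue();
        boolean dayMatches = today >= day(days[0]).getValue() && today <= day(days[1]).getValue();

        LocalTime time = local.toLocalTime();
        LocalTime from = start.toLocalTime();
        LocalTime to = end.toLocalTime();
        boolean timeMatches = from.isBefore(to)
                ? !time.isBefore(from) && time.isBefore(to)   // same-day window
                : !time.isBefore(from) || time.isBefore(to);  // window wraps past midnight
        return dayMatches && timeMatches;
    }

    // Returns the first matching entry, or null when no window covers the instant.
    static Settings lookup(Map<String, Settings> schedule, OffsetDateTime now) {
        for (Map.Entry<String, Settings> entry : schedule.entrySet()) {
            if (matches(entry.getKey(), now)) {
                return entry.getValue();
            }
        }
        return null;
    }

    // The sample schedule, entered by hand instead of parsed from JSON.
    static Map<String, Settings> sampleSchedule() {
        Map<String, Settings> schedule = new LinkedHashMap<>();
        schedule.put("MON-FRI_09:00-04:00_11:00-04:00", new Settings(false, 0, 0));
        schedule.put("MON-FRI_11:00-04:00_22:00-04:00", new Settings(true, 15, 10));
        schedule.put("MON-FRI_22:00-04:00_09:00-04:00", new Settings(true, 100, 10));
        return schedule;
    }

    public static void main(String[] args) {
        Settings active = lookup(sampleSchedule(), OffsetDateTime.parse("2020-08-26T12:00:00-04:00"));
        System.out.println(active.queueEnabled + " " + active.workers + " " + active.batchSize);
    }
}
```

With the resolved settings in hand, the scheduled function would then apply them through the Lambda API, for example the UpdateEventSourceMapping operation for the enabled flag and batch size and the PutFunctionConcurrency operation for the worker count; those calls are left out here to keep the sketch free of SDK dependencies.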
It took a cross-functional team effort and several runs to determine our concurrency limit. With resources both in the cloud and on-premises, we needed to closely monitor them as we kept ramping up the numbers. At the end of the day, our on-premises relational database ended up limiting the microservice to at most 200 concurrent executions, roughly equaling 5,000 messages per minute and 1.4 million database calls.
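As a quick back-of-the-envelope check on those figures, the per-executor rate follows directly from the two numbers quoted above (200 concurrent executions and roughly 5,000 messages per minute). The ThroughputSketch class is only an illustrative helper, and it assumes the load is spread evenly across executors.

```java
// Back-of-the-envelope throughput arithmetic, assuming load is spread evenly.
final class ThroughputSketch {

    // Messages each executor handles per minute at a given concurrency.
    static double perExecutorPerMinute(double messagesPerMinute, int concurrency) {
        return messagesPerMinute / concurrency;
    }

    // Average seconds an executor spends on one message.
    static double secondsPerMessage(double messagesPerMinute, int concurrency) {
        return 60.0 / perExecutorPerMinute(messagesPerMinute, concurrency);
    }

    public static void main(String[] args) {
        System.out.println(perExecutorPerMinute(5000, 200)); // prints 25.0
        System.out.println(secondsPerMessage(5000, 200));    // prints 2.4
    }
}
```

In other words, each executor was working through about 25 messages a minute, or one message every 2.4 seconds on average.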
Filtering SNS messages
As mentioned previously, our use case had two flavors of the holdings updated event: events triggered by users and events triggered by jobs. Those events also carry their metadata in the SNS message attributes field.
In this case, you can use the SDK to configure message filtering for a given endpoint. For our SNS-to-AWS Lambda flow, we could have it receive only events that had applicationType=WEB. For our SNS-to-SQS flow, we configured it to receive only events that had applicationType=JOB.
Subscription subscription = getSubscriptionForArn(environment.getenv("BATCH_SNS_ENDPOINT"));
if (subscription != null) {
    String jsonPolicy = "{\"applicationType\": [\"JOB\"]}";
    setFilterPolicy(subscription.getSubscriptionArn(), jsonPolicy);
}
We took it a step further and also filtered the messages on the financial institutions or employers that were enabled. This helped us significantly reduce the cost of message processing and simplify our code, as the Lambda function no longer had to do its own filtering.
String getBatchSubscriptionFilterPolicy() throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    JsonGenerator generator = (new JsonFactory()).createGenerator(baos, JsonEncoding.UTF8);
    generator.writeStartObject();

    // Always restrict the subscription to job-triggered events.
    generator.writeFieldName("applicationType");
    generator.writeStartArray();
    generator.writeString("JOB");
    generator.writeEndArray();

    // Optionally restrict to the enabled financial institutions.
    Set<String> financialInstitutions = getFinancialInstitutions();
    if (!financialInstitutions.isEmpty()) {
        generator.writeFieldName("financialInstitutionId");
        generator.writeStartArray();
        for (String financialInstitution : financialInstitutions) {
            generator.writeString(financialInstitution);
        }
        generator.writeEndArray();
    }

    // Optionally restrict to the enabled employers.
    Set<String> employers = getEmployers();
    if (!employers.isEmpty()) {
        generator.writeFieldName("employerId");
        generator.writeStartArray();
        for (String employer : employers) {
            generator.writeString(employer);
        }
        generator.writeEndArray();
    }

    generator.writeEndObject();
    generator.close();
    // Decode with the same charset the generator wrote with.
    return baos.toString("UTF-8");
}
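To show what such a policy means for an individual message, here is a small sketch of exact-match string filtering as I understand it: every attribute named in the policy must be present on the message, and its value must be one of the listed values. The FilterPolicySketch class is hypothetical and models only this simple case, and the employer IDs are made up for the example.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of exact-match filtering on string message attributes.
final class FilterPolicySketch {

    // A message is accepted only if every policy key matches one of its allowed values.
    static boolean accepts(Map<String, Set<String>> policy, Map<String, String> attributes) {
        for (Map.Entry<String, Set<String>> rule : policy.entrySet()) {
            String actual = attributes.get(rule.getKey());
            if (actual == null || !rule.getValue().contains(actual)) {
                return false;
            }
        }
        return true;
    }

    // Example policy: job-triggered events for two made-up employer IDs.
    static Map<String, Set<String>> samplePolicy() {
        Map<String, Set<String>> policy = new HashMap<>();
        policy.put("applicationType", new HashSet<>(Arrays.asList("JOB")));
        policy.put("employerId", new HashSet<>(Arrays.asList("E1", "E2")));
        return policy;
    }

    public static void main(String[] args) {
        Map<String, String> attributes = new HashMap<>();
        attributes.put("applicationType", "JOB");
        attributes.put("employerId", "E1");
        System.out.println(accepts(samplePolicy(), attributes)); // prints true
    }
}
```

One consequence worth keeping in mind is that keys combine with AND: once a policy lists both financialInstitutionId and employerId, a message has to match on both to be delivered.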
Notes/Tips/Feature Requests
While this is a fairly strong feature, there are a few things we would like to see in the future.
For example, when processing the dead letter queue with this feature, you actually have to manually delete failed messages, or they very quickly become “poison pills.” If you don’t manually delete the messages, then you need to create a dead letter queue for your dead letter queue so that you can set the max receive count to 2 for example, and just ignore what gets into that final queue. It’s a little more heavy lifting than I had expected, when I thought I could just re-use the same Lambda function processing the main queue. It would be nice if AWS would allow you to set a redrive policy without specifying another dead letter queue.
The other issue was the rate of throttling in the Lambda function. At times, we saw several hundred invocations a minute getting throttled. The documentation says that the concurrent execution limit is honored, and we had assumed that the rate at which AWS Lambda invoked the function would likewise be limited to the number of concurrent executors. I imagine that, long-term, this can get pricey and somewhat wasteful if you limit concurrent executions. I’d like to see AWS address this or try to minimize the number of throttled invocations.
Lastly, while not really an issue that AWS could address, we struggled with configuring auto scaling for downstream resources. As mentioned before, jobs can start at any time of day and cause a sudden, significant spike of published events in a short period of time. Auto scaling for downstream services, such as DynamoDB’s read/write capacity, always lagged for the first few minutes. This would create a spike in throttles and therefore failed messages, which is another reason why configuring a dead letter queue is important. (Note that the dashboard shows a per-minute average, so it doesn’t always reflect actual throttles.)
Conclusion
I hope that through this article you can see how powerful this new feature is and how it can greatly reduce code/management of infrastructure to support queue processing.
Feel free to ask any questions and reach out with any comments you may have. Also, come back and check out any new content we publish. We are always playing around with some of the latest and greatest features with the goal of figuring out all the nuances before teams start using the features.
Happy Coding!