Building a Serverless Program with AWS Lambda, DynamoDB, and SQS

vedvaghela

VedVaghela11

Posted on June 29, 2023


Introduction:

Serverless computing has revolutionized the way we develop and deploy applications, offering scalability, cost-efficiency, and simplified management. In this blog, we'll explore how to build a serverless program using AWS Lambda, DynamoDB, and SQS (Simple Queue Service). By leveraging these powerful services, we'll create three Lambda functions to automate the processing of managing active and inactive accounts.

Why Serverless?

By adopting a serverless architecture, we can enjoy several advantages. Firstly, we don't need to manage any server infrastructure as AWS Lambda automatically handles the scaling, capacity provisioning, and fault tolerance. Secondly, we only pay for the compute resources used during the execution of the Lambda functions, allowing for cost optimization. Lastly, the decoupled nature of serverless components provides flexibility and enables independent development and scaling.

The Problem:

For example, suppose we have data for 10 accounts in DynamoDB: 4 are active and 6 are inactive. We need the IDs of the inactive accounts so we can mark them active and allocate them to their further use cases. When multiple users trigger Lambdas that fetch temporary account IDs directly from DynamoDB, two Lambdas may receive the same ID, since both read the same set of items marked Inactive.

Problem diagram

Solution:

This is solved by a simple method: using the Amazon SQS service.
Amazon SQS is a reliable, highly scalable hosted queue for storing messages as they travel between applications or microservices. Amazon SQS moves data between distributed application components and helps you decouple these components.

Solution Diagram

Setup :

DynamoDB:
For simplicity, in this example we are designing a simple DynamoDB table with only 2 attributes:

  1. AccountID (String) => Partition Key
  2. Status (String)

Example Table:

Dynamo DB Structure
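To make the table structure concrete, here is a small sketch (with made-up account IDs) of what items in this table look like when read through the low-level boto3 DynamoDB client, which wraps every value in an attribute-value map such as `{'S': ...}` for strings:

```python
# Items as returned by the low-level boto3 DynamoDB client; the account
# IDs below are made-up examples, not real data.
example_items = [
    {'AccountID': {'S': 'acc-001'}, 'Status': {'S': 'Active'}},
    {'AccountID': {'S': 'acc-002'}, 'Status': {'S': 'Inactive'}},
    {'AccountID': {'S': 'acc-003'}, 'Status': {'S': 'Inactive'}},
]

# Unwrapping the IDs of inactive accounts, the same filtering our scan
# will later perform on the server side:
inactive_ids = [item['AccountID']['S']
                for item in example_items
                if item['Status']['S'] == 'Inactive']
print(inactive_ids)
```

Keeping this attribute-value shape in mind helps when reading the `acc['AccountID']['S']` lookups in the Lambda code below.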

SQS:

Go to the SQS service and click on the Create queue option.
Select Standard as the queue type, keep everything else as default, and proceed to create the queue.

Queue Creation

After creation of the queue, go into "myTestQueue".

Queue Details

In the Details block, take note of the SQS URL, which will be used later in our serverless functions.
We can later check the contents of our queue by clicking on the "Send and receive messages" button.

We will be creating 3 lambda functions:

1) dumpIntoSQS
2) receiveID
3) pushIntoSQS_uponChange

We will be using the AWS SDK for Python: Boto3.

1) dumpIntoSQS

This function will push the IDs of all the inactive accounts in our DynamoDB table into the queue. This sets the automation in motion.

import json
import boto3
import os

db_client = boto3.client('dynamodb')
sqs = boto3.client('sqs')
queue_url  = os.environ['SQS_URL']
dynamodb_name = os.environ['DYNAMO_DB_NAME']

def lambda_handler(event, context):
    #getting the Inactive account Ids from dynamodb
    response = db_client.scan(
        TableName=dynamodb_name,
        FilterExpression='#st = :s_val',
        ExpressionAttributeNames={
            "#st" : "Status"
        },
        ExpressionAttributeValues={
            ':s_val': {'S':'Inactive'}
        }
    )

    items = response['Items']
    for acc in items:
        #sending individual accounts to the queue        
        response_q = sqs.send_message(
            QueueUrl=queue_url,
            DelaySeconds=10,
            MessageAttributes={
                'AccountID': {
                    'DataType': 'String',
                    'StringValue': acc['AccountID']['S']
                }
            },
            MessageBody='Inactive account id info'
        )

        print(response_q['MessageId'])

    return {
        'statusCode': 200,
        'body': json.dumps('All Inactive Account Ids Added into SQS queue')
    }

  1. Use environment variables for the SQS URL and DynamoDB table name. These can be set up under Lambda function -> Configuration -> Environment variables (use environment variables in all the Lambda functions for sensitive information such as URLs, ARN values, etc.).
  2. "Status" is a reserved keyword in DynamoDB, so we use ExpressionAttributeNames to map the placeholder #st to the Status attribute, and ExpressionAttributeValues to supply :s_val as the value placeholder in the FilterExpression.
  3. For the Lambda to execute, we need to give appropriate permissions to our Lambda role. Go to Functions > dumpIntoSQS > Configuration > Permissions > Execution role, which will redirect you to the IAM policies of our Lambda role.
  • Click on Add Permissions > Attach Policy. Attach the following policies to our role.
    • AmazonDynamoDBFullAccess
    • AmazonSQSFullAccess
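One caveat worth noting: a single Scan call returns at most 1 MB of data per request, so for larger tables the code above could miss items. A minimal sketch of a pagination helper (scan_all is a hypothetical name, not part of boto3) that follows LastEvaluatedKey:

```python
def scan_all(db_client, **scan_kwargs):
    """Run DynamoDB Scan repeatedly, following LastEvaluatedKey,
    and return the combined list of items."""
    items = []
    while True:
        response = db_client.scan(**scan_kwargs)
        items.extend(response['Items'])
        last_key = response.get('LastEvaluatedKey')
        if last_key is None:
            return items
        # resume the scan where the previous page stopped
        scan_kwargs['ExclusiveStartKey'] = last_key
```

In dumpIntoSQS, the `db_client.scan(...)` call could then be swapped for `scan_all(db_client, TableName=dynamodb_name, ...)` with the same filter arguments.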

2) receiveID

This function performs three tasks:

  • Receive the account ID of an inactive account from the queue.

  • Change the status of the account from Inactive to Active in the table once it is assigned from the queue.

  • Delete the assigned ID from the queue.

import json
import boto3
import os

sqs = boto3.client('sqs')
dynamodb = boto3.resource('dynamodb')
queue_url = os.environ['SQS_URL']
dynamoDB_name = os.environ['DYNAMO_DB_NAME']

def lambda_handler(event, context):

    # Receive a message from the SQS queue
    response_sqs = sqs.receive_message(
        QueueUrl=queue_url,
        AttributeNames=[
            'SentTimestamp'
        ],
        MaxNumberOfMessages=1,
        MessageAttributeNames=[
            'All'
        ],
        # a non-zero visibility timeout hides the message from other
        # consumers while we process it, so two users cannot be handed
        # the same account ID
        VisibilityTimeout=30,
        WaitTimeSeconds=0
    )

    # receive_message omits 'Messages' entirely when the queue is empty
    if 'Messages' not in response_sqs:
        return {
            'statusCode': 404,
            'body': json.dumps('No inactive account ID available in the queue')
        }

    message = response_sqs['Messages'][0]
    receipt_handle = message['ReceiptHandle']
    accountIdForUpdate = message['MessageAttributes']['AccountID']['StringValue']

    #update dynamoDB before deletion from queue
    table = dynamodb.Table(dynamoDB_name)
    response_db = table.update_item(
        Key={
            'AccountID': accountIdForUpdate,
        },
        ExpressionAttributeNames={
            "#st" : "Status"
        },
        UpdateExpression='SET #st = :s_val',
        ExpressionAttributeValues={
            ':s_val': 'Active'
        }
    )
    # Delete element from queue
    sqs.delete_message(
        QueueUrl=queue_url,
        ReceiptHandle=receipt_handle
    )

    print('Received message & deleted: %s \n' % message )
    print('Item updated: ',response_db)

    return {
        'statusCode': 200,
        'body': accountIdForUpdate
    }



For the Lambda to execute, we need to give appropriate permissions to our Lambda role. Go to Functions > receiveID > Configuration > Permissions > Execution role, which will redirect you to the IAM policies of our Lambda role.

  • Click on Add Permissions > Attach Policy. Attach the following policies to our role.
    • AmazonDynamoDBFullAccess
    • AmazonSQSFullAccess
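For reference, this is roughly the shape of the receive_message response that receiveID unpacks (the message ID, receipt handle, and account ID below are made-up examples), together with a small helper, extract_account_id (a hypothetical name), that tolerates an empty queue:

```python
# Simplified receive_message response; real responses contain more
# fields, and all values here are made-up examples.
sample_response = {
    'Messages': [{
        'MessageId': 'msg-0001',
        'ReceiptHandle': 'AQEB-example-handle',
        'Body': 'Inactive account id info',
        'MessageAttributes': {
            'AccountID': {'DataType': 'String', 'StringValue': 'acc-002'}
        }
    }]
}

def extract_account_id(response):
    """Return (account_id, receipt_handle), or (None, None) when the
    queue returned no messages ('Messages' key absent)."""
    messages = response.get('Messages', [])
    if not messages:
        return None, None
    message = messages[0]
    account_id = message['MessageAttributes']['AccountID']['StringValue']
    return account_id, message['ReceiptHandle']

print(extract_account_id(sample_response))
```

The receipt handle must be kept around because delete_message identifies the message by it, not by MessageId.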

3) pushIntoSQS_uponChange

This Lambda sends a message to SQS (i.e. pushes it into the queue) containing the ID of an account whose status has been changed from Active to Inactive after it has completed its usage.

import json
import boto3
import os

sqs = boto3.client('sqs')
queue_url  = os.environ['SQS_URL']

def lambda_handler(event, context):

    # a stream batch may contain several records
    for record in event['Records']:
        # REMOVE events carry no NewImage, so skip them
        new_image = record['dynamodb'].get('NewImage')
        if new_image is None:
            continue

        accountID = new_image['AccountID']['S']
        status = new_image['Status']['S']

        if status == 'Inactive':
            # send the changed id to the queue
            response = sqs.send_message(
                QueueUrl=queue_url,
                DelaySeconds=10,
                MessageAttributes={
                    'AccountID': {
                        'DataType': 'String',
                        'StringValue': accountID
                    }
                },
                MessageBody='Inactive account id info'
            )
            print(response['MessageId'])

        print('Status changed to', status)
    return {
        'statusCode': 200,
        'body': json.dumps('Status is changed in DynamoDB')
    }


For the Lambda to execute, we need to give appropriate permissions to our Lambda role. Go to Functions > pushIntoSQS_uponChange > Configuration > Permissions > Execution role, which will redirect you to the IAM policies of our Lambda role.

  • Click on Add Permissions > Attach Policy. Attach the following policies to our role.
    • AWSLambdaDynamoDBExecutionRole (needed to read from the DynamoDB Stream that triggers this function)
    • AmazonSQSFullAccess

To trigger the Lambda, we need to set up a DynamoDB Stream.

To set up the DynamoDB Stream, go to the table -> Exports and streams.

DB Stream

Stream details

Once the DynamoDB Stream is turned on, we need to add a trigger so that the stream can invoke the Lambda function.

create trigger

select function

Now we have created the DB Stream and have set it as the trigger for our lambda.
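To see what pushIntoSQS_uponChange actually receives from the stream, here is a trimmed-down sketch of a DynamoDB Streams event (real events carry additional fields such as Keys, OldImage, and sequence numbers; the values below are made up):

```python
# Trimmed-down DynamoDB Streams event as delivered to the Lambda;
# all values are illustrative examples.
sample_event = {
    'Records': [{
        'eventName': 'MODIFY',
        'dynamodb': {
            'NewImage': {
                'AccountID': {'S': 'acc-002'},
                'Status': {'S': 'Inactive'}
            }
        }
    }]
}

# The same extraction the Lambda performs on each record:
account_id = sample_event['Records'][0]['dynamodb']['NewImage']['AccountID']['S']
status = sample_event['Records'][0]['dynamodb']['NewImage']['Status']['S']
print(account_id, status)
```

Note that NewImage is only present when the stream view type includes new images (e.g. NEW_AND_OLD_IMAGES), which is why the Lambda guards against its absence.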

After setting up the Lambda, test it by changing the Status of an active account to Inactive or by creating a new account ID with Inactive status. Then check your SQS queue for messages; you should see a received message that looks like the following:

SQS message example

Conclusion:

In this blog, we explored how to build a serverless program using AWS Lambda, DynamoDB, and SQS. By implementing three Lambda functions, we automated the processing of managing active and inactive accounts. We utilized DynamoDB Streams to trigger a Lambda function whenever an account's status changed and added the account ID to an SQS queue. Finally, we created a Lambda function to consume the account IDs from the queue and perform necessary actions.

Documentation and references:
Dynamo DB
SQS
Code(github)

Note:
The policies attached to the Lambda roles grant full access for the sake of smooth operation in this example. However, one should follow the principle of least privilege and attach only the necessary policies to each role.
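As a sketch of what least privilege could look like here, an inline policy scoped to just the operations these functions use might resemble the following (the table name myTestTable, the region, and the account ID are placeholders to replace with your own values):

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["dynamodb:Scan", "dynamodb:UpdateItem"],
      "Resource": "arn:aws:dynamodb:us-east-1:123456789012:table/myTestTable"
    },
    {
      "Effect": "Allow",
      "Action": [
        "sqs:SendMessage",
        "sqs:ReceiveMessage",
        "sqs:DeleteMessage"
      ],
      "Resource": "arn:aws:sqs:us-east-1:123456789012:myTestQueue"
    }
  ]
}
```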

This blog is for knowledge purposes only. Use the resources wisely and responsibly.
