Supercharge File Processing with AWS Lambda, S3 and DynamoDB


Donovan HOANG

Posted on December 15, 2023


TL;DR

Unlock the potential of serverless architecture with AWS Lambda and DynamoDB to revolutionize your sales data processing:

  • Streamline data transformations: Say goodbye to complex ETL pipelines.
  • Enrich data dynamically: Calculate prices with taxes, totals, and unique IDs automatically.
  • Scalability and cost-efficiency: AWS Lambda adapts seamlessly to your workload.
  • Embrace serverless efficiency for transformative insights into your sales data.
  • Ready to transform your data? Dive into the article for a step-by-step guide!

S3, Lambda, and DynamoDB logos


Introduction

When a large volume of files needs to be processed, costs can skyrocket and capacity quickly becomes the bottleneck. Many options, such as a server-centric architecture or manual scaling, are simply not suitable in this case. Imagine being able to automate this processing seamlessly, without the need for complex infrastructure. That’s the power of building a serverless file processing system using AWS Lambda and Amazon S3 Events.

AWS Lambda and Amazon S3 Events are powerful tools that, combined, offer a fully serverless solution for file processing. In this article, we’ll explore the step-by-step process of setting up such a system. I will use Terraform as the IaC tool.

You can find all the sources of this project on GitHub: Donovan1905/s3-serverless-file-processing.

Use case

In the realm of data-driven enterprises, processing and extracting insights from sales data is a pivotal challenge. Imagine a scenario where CSV files containing sales details—client IDs, products, quantities, and prices—arrive in an Amazon S3 bucket. Traditional methods might involve complex setups, but we’re about to redefine efficiency with the simplicity of serverless architecture.
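
To make that concrete, here is the shape of CSV the handler below expects. The column names come straight from the handler code; the rows are made up for illustration (the repository ships its own sample at example/sales_sample.csv):

item_name,unit_price,quantity,customer,salesman
Laptop,1200.00,1,ACME Corp,Alice
USB-C cable,19.90,3,ACME Corp,Alice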

The Challenge:

  • Data Transformation Complexity: Simplify the transformation of CSV data into a structured format.
  • Dynamic Data Enrichment: Automate tasks like calculating prices with taxes, determining total transaction values, and assigning unique command IDs.
  • Scalability and Cost Efficiency: Ensure scalability without the overhead of managing idle infrastructure.

The Solution:

AWS Lambda and DynamoDB take center stage. Lambda responds to CSV uploads, transforming data seamlessly, while DynamoDB becomes the canvas for storing and enriching sales information.

The Use Case:

  • CSV Upload to S3: Sales data lands in an S3 bucket, triggering serverless processing.
  • Lambda Takes the Stage: Responding to S3 events, Lambda transforms raw CSV data.
  • DynamoDB, the Canvas of Transformation: Enriched data is stored with additional computed information.
  • Scalability in Action: Lambda scales dynamically, ensuring efficiency and cost-effectiveness.

In the next sections, we delve into the technical details of implementing this serverless file processing system. From configuring Lambda functions to designing DynamoDB tables, let’s unlock the potential of AWS Lambda, DynamoDB and S3 to elevate sales data processing.

CSV processing with Python

To extract and process the CSV file in our Lambda, we are going to use the Python runtime with the csv and boto3 libraries.

from datetime import datetime
from decimal import Decimal
from urllib.parse import unquote_plus

import boto3
import botocore.exceptions
import csv
import json
import os
import uuid

# Clients and resources are created once per execution environment
# and reused across invocations.
s3 = boto3.client('s3')
dynamodb = boto3.resource('dynamodb')


def lambda_handler(event, context):
    local_filename = "/tmp/filename.csv"

    item_table = dynamodb.Table(os.environ['ITEM_TABLE'])
    command_table = dynamodb.Table(os.environ['COMMAND_TABLE'])

    bucket_name = event["Records"][0]["s3"]["bucket"]["name"]
    # Object keys are URL-encoded in S3 event notifications.
    s3_key = unquote_plus(event["Records"][0]["s3"]["object"]["key"])

    print(bucket_name)
    print(s3_key)

    try:
        s3.download_file(bucket_name, s3_key, local_filename)
    except botocore.exceptions.ClientError as e:
        if e.response['Error']['Code'] == "404":
            print("The object does not exist: s3://" + bucket_name + "/" + s3_key)
        raise

    with open(local_filename, mode='r') as csv_file:
        csv_reader = csv.DictReader(csv_file)
        total_price = 0
        command_id = str(uuid.uuid1())

        # One item per CSV row, all linked to the same command_id.
        for row in csv_reader:
            total_price += float(row["unit_price"]) * float(row["quantity"])
            item_table.put_item(
                Item={
                    'id': str(uuid.uuid1()),
                    'command_id': command_id,
                    'item_name': row["item_name"],
                    'unit_price': row["unit_price"],
                    'quantity': row["quantity"],
                    'customer': row["customer"],
                    'salesman': row["salesman"]
                }
            )

        # One command record summarizing the whole file. DynamoDB does not
        # accept floats, hence the Decimal conversion for the total.
        command_table.put_item(
            Item={
                'id': command_id,
                'date': str(datetime.now()),
                'total_price': Decimal(str(round(total_price, 2))),
                'status': "Pending"
            }
        )

    return {
        "statusCode": 200,
        "body": json.dumps({
            "message": "data processed with command_id " + command_id,
        }),
    }
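Before packaging anything, you can sanity-check the handler locally by calling it with a hand-built event. Below is a minimal sketch, assuming your local AWS credentials can reach the bucket and the tables; the bucket name, object key, and table names are placeholders, and the call really downloads the object and writes to DynamoDB:

# local_invoke.py (hypothetical helper, not part of the repository)

import os

# Placeholder table names: point these at your real DynamoDB tables.
os.environ['ITEM_TABLE'] = 'my-project-item-table'
os.environ['COMMAND_TABLE'] = 'my-project-command-table'

from src.file_processing_handler import lambda_handler

# Minimal S3 event notification payload: only the fields the handler reads.
fake_event = {
    "Records": [
        {
            "s3": {
                "bucket": {"name": "my-project-sales-bucket"},
                "object": {"key": "sales_sample.csv"},
            }
        }
    ]
}

print(lambda_handler(fake_event, None))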

Next, build the zip package that will be uploaded to Lambda with this script:

# lambda_build_script.sh

# Install the dependencies built for the Lambda runtime platform (CPython wheels for manylinux2014 / x86_64).
python3 -m pip install --platform manylinux2014_x86_64 --implementation cp --only-binary=:all: --upgrade --target venv/lib/python3.11/site-packages/ -r requirements.txt

# Bundle the dependencies and the handler sources into lambda.zip.
mkdir -p lambda_package
cp -r venv/lib/python3.11/site-packages/* lambda_package/
cp -r src lambda_package/
cd lambda_package && zip -r ../lambda.zip * && cd ../
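The script installs whatever is listed in requirements.txt. The article does not show that file, but for this handler it only needs boto3, which is already bundled in the Lambda Python runtime; pinning it mainly keeps the local and deployed versions consistent. An assumed minimal version:

# requirements.txt (assumed contents, not taken from the repository)
boto3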

Deploy with Terraform

S3 Buckets

We need two buckets here: one to collect the sales CSV files and one to store the Lambda code. We also define a bucket notification to trigger the Lambda function on certain events, in our case whenever a new file is uploaded to the bucket.

# /tf/s3.tf

resource "aws_s3_bucket" "builds" {
  bucket = "${var.project_name}-lambda-builds"
}

resource "aws_s3_bucket" "bucket" {
  bucket = local.bucket_name
}

resource "aws_s3_bucket_notification" "s3-lambda-trigger" {
  bucket = aws_s3_bucket.bucket.id
  lambda_function {
    lambda_function_arn = module.lambda.lambda_function_arn
    events              = ["s3:ObjectCreated:*"]
  }
}

IAM roles and policies

In this part, you will need to create the appropriate role and policy to allow your Lambda function to access the S3 bucket, the DynamoDB tables, and CloudWatch Logs.

# /tf/iam.tf

data "aws_iam_policy_document" "assume_role_policy_lambda" {
  statement {
    sid    = ""
    effect = "Allow"
    principals {
      identifiers = ["lambda.amazonaws.com"]
      type        = "Service"
    }
    actions = ["sts:AssumeRole"]
  }
}

data "aws_iam_policy_document" "policy_s3" {
  statement {
    sid    = "S3AccessPolicy"
    effect = "Allow"
    actions = [
      "s3:*"
    ]
    resources = [
      "arn:aws:s3:::${local.bucket_name}",
      "arn:aws:s3:::${local.bucket_name}/*"
    ]
  }

  statement {
    sid    = "DynamoDbAccessPolicy"
    effect = "Allow"
    actions = [
      "dynamodb:PutItem",
      "dynamodb:GetItem"
    ]
    resources = [
      aws_dynamodb_table.command_table.arn,
      aws_dynamodb_table.item_table.arn
    ]
  }

  statement {
    sid    = "LambdaLoggingPolicy"
    effect = "Allow"
    actions = [
      "logs:CreateLogGroup",
      "logs:CreateLogStream",
      "logs:PutLogEvents"
    ]
    resources = [
      "arn:aws:logs:*:*:*"
    ]
  }
}

resource "aws_iam_role" "iam_for_lambda" {
  name               = "${var.project_name}-lambda-role"
  assume_role_policy = data.aws_iam_policy_document.assume_role_policy_lambda.json
}

resource "aws_iam_role_policy" "lambda_policy" {
  policy = data.aws_iam_policy_document.policy_s3.json
  role   = aws_iam_role.iam_for_lambda.id
}
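For reference, the assume_role_policy_lambda document above renders to the standard Lambda trust policy, roughly the following JSON (shown only for illustration, Terraform generates it for you):

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "lambda.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}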

Lambda function

Next, let’s define the Lambda function itself. We upload lambda.zip as an S3 object and reference it in the Lambda definition.

# /tf/lambda.tf

module "lambda" {
  source  = "terraform-aws-modules/lambda/aws"
  version = "5.2.0"

  function_name = "${var.project_name}-process-sales-data"
  runtime       = "python3.11"
  handler       = "src.file_processing_handler.lambda_handler"
  lambda_role   = aws_iam_role.iam_for_lambda.arn
  create_role   = false
  environment_variables = {
    "REGION"        = var.region,
    "ITEM_TABLE"    = aws_dynamodb_table.item_table.name,
    "COMMAND_TABLE" = aws_dynamodb_table.command_table.name
  }

  memory_size = 1028
  timeout     = 30

  create_package = false
  s3_existing_package = {
    bucket = aws_s3_bucket.builds.id
    key    = aws_s3_object.lambda_package.id
  }
}

resource "aws_s3_object" "lambda_package" {
  bucket = aws_s3_bucket.builds.id
  key    = "${filemd5(local.lambda_package)}.zip"
  source = local.lambda_package
}

resource "aws_lambda_permission" "s3_lambda_permission" {
  statement_id  = "AllowS3Invoke"
  action        = "lambda:InvokeFunction"
  function_name = module.lambda.lambda_function_name
  principal     = "s3.amazonaws.com"
  source_arn    = "arn:aws:s3:::${aws_s3_bucket.bucket.id}"
}

DynamoDB tables

In our scenario, we will use two DynamoDB tables: one command table and one item table.

# /tf/dynamodb.tf

resource "aws_dynamodb_table" "command_table" {
  name         = "${var.project_name}-command-table"
  hash_key     = "id"
  billing_mode = "PAY_PER_REQUEST"

  attribute {
    name = "id"
    type = "S"
  }

  deletion_protection_enabled = true
}

resource "aws_dynamodb_table" "item_table" {
  name         = "${var.project_name}-item-table"
  hash_key     = "id"
  billing_mode = "PAY_PER_REQUEST"

  attribute {
    name = "id"
    type = "S"
  }

  deletion_protection_enabled = true
}
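A note on this design: both tables are keyed only on id, so fetching every item of a given command later means scanning with a filter; for a production workload you would typically add a global secondary index on command_id instead. A minimal read-back sketch with boto3, where the table names and command_id are placeholders:

# read_command.py (hypothetical helper, not part of the repository)

import boto3
from boto3.dynamodb.conditions import Attr

dynamodb = boto3.resource('dynamodb')

# Placeholder table names: "<project_name>-command-table" and "<project_name>-item-table".
command_table = dynamodb.Table('my-project-command-table')
item_table = dynamodb.Table('my-project-item-table')

command_id = 'PUT-A-REAL-COMMAND-ID-HERE'

# The command record is fetched directly by its hash key.
command = command_table.get_item(Key={'id': command_id}).get('Item')
print(command)

# The item table is keyed on 'id', so filtering by command_id requires a scan here.
items = item_table.scan(FilterExpression=Attr('command_id').eq(command_id))['Items']
print(str(len(items)) + " item(s) for command " + command_id)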

Let’s test it!

First, you will need to build the Lambda sources.

$ python3 -m venv venv
$ source ./venv/bin/activate
$ sh lambda_build_script.sh

You should now have a lambda.zip archive.

Make sure to apply the Terraform configuration with terraform apply.

Now, take the example CSV referencing all the products that a client bought (example/sales_sample.csv).

And now, let the magic happen! Just drop the CSV file into the S3 bucket and take a look at your DynamoDB tables to see your processed data.
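
If you prefer to trigger it from code rather than the console, a minimal boto3 upload sketch looks like this (the bucket name and file path are placeholders; the upload itself is what fires the S3 notification):

# upload_sample.py (hypothetical helper, not part of the repository)

import boto3

s3 = boto3.client('s3')

# Placeholder bucket name: use the sales bucket created by the Terraform configuration.
s3.upload_file('example/sales_sample.csv', 'my-project-sales-bucket', 'sales_sample.csv')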

Content of the command table after lambda invocation

Content of the item table after lambda invocation


Thanks for reading! I hope this helped you use or understand the power of AWS services. Don’t hesitate to give me your feedback or suggestions.
