Serverless file processing using Lambda and S3 on AWS
Posted on December 15, 2023
Unlock the potential of serverless architecture with AWS Lambda and DynamoDB to revolutionize your sales data processing.
When a large volume of files needs to be processed, costs can skyrocket, and so can the capacity required to handle that volume. Many options, such as a server-centric architecture or manual scaling, are not suitable in this case. Imagine being able to automate this process seamlessly, without the need for complex infrastructure. That’s the power of building a serverless file processing system using AWS Lambda and Amazon S3 Events.
These are powerful tools that, when combined, offer a serverless solution for file processing. In this article, we’ll walk through the step-by-step process of setting up such a system, using Terraform as the IaC tool.
You can find all the sources of this project on GitHub.
In the realm of data-driven enterprises, processing and extracting insights from sales data is a pivotal challenge. Imagine a scenario where CSV files containing sales details—client IDs, products, quantities, and prices—arrive in an Amazon S3 bucket. Traditional methods might involve complex setups, but we’re about to redefine efficiency with the simplicity of serverless architecture.
AWS Lambda and DynamoDB take center stage. Lambda responds to CSV uploads, transforming data seamlessly, while DynamoDB becomes the canvas for storing and enriching sales information.
In the next sections, we delve into the technical details of implementing this serverless file processing system. From configuring Lambda functions to designing DynamoDB tables, let’s unlock the potential of AWS Lambda, DynamoDB and S3 to elevate sales data processing.
To extract and process the CSV file in our Lambda function, we will use the Python runtime with the csv and boto3 libraries.
from datetime import datetime
import boto3
import botocore
import csv
import json
import os
import uuid

s3 = boto3.client('s3')

def lambda_handler(event, context):
    local_filename = "/tmp/filename.csv"
    dynamodb = boto3.resource('dynamodb')
    item_table = dynamodb.Table(os.environ['ITEM_TABLE'])
    command_table = dynamodb.Table(os.environ['COMMAND_TABLE'])

    bucket_name = event["Records"][0]["s3"]["bucket"]["name"]
    s3_key = event["Records"][0]["s3"]["object"]["key"]
    print(bucket_name)
    print(s3_key)

    try:
        s3.download_file(bucket_name, s3_key, local_filename)
    except botocore.exceptions.ClientError as e:
        if e.response['Error']['Code'] == "404":
            print("The object does not exist: s3://" + bucket_name + "/" + s3_key)
        # Without the file there is nothing to process, so stop here
        raise

    with open(local_filename, mode='r') as csv_file:
        csv_reader = csv.DictReader(csv_file)
        total_price = 0
        command_id = str(uuid.uuid1())
        for row in csv_reader:
            total_price += float(row["unit_price"]) * float(row["quantity"])
            item_table.put_item(
                Item={
                    'id': str(uuid.uuid1()),
                    'command_id': command_id,
                    'item_name': row["item_name"],
                    'unit_price': row["unit_price"],
                    'quantity': row["quantity"],
                    'customer': row["customer"],
                    'salesman': row["salesman"]
                }
            )

    command_table.put_item(
        Item={
            'id': command_id,
            'date': str(datetime.now()),
            # DynamoDB rejects floats, so store the computed total as a string
            'total_price': str(round(total_price, 2)),
            'status': "Pending"
        }
    )

    return {
        "statusCode": 200,
        "body": json.dumps({
            "message": "data processed with command_id " + command_id,
        }),
    }
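The handler reads the bucket and object key straight out of the S3 event payload. For clarity, here is a trimmed sketch of what such an ObjectCreated event looks like and the lookups the handler performs; the bucket and key names are placeholders for illustration:

```python
# A trimmed S3 ObjectCreated event, as Lambda receives it.
# Bucket and key names are made up for this example.
event = {
    "Records": [
        {
            "eventName": "ObjectCreated:Put",
            "s3": {
                "bucket": {"name": "my-sales-bucket"},
                "object": {"key": "sales_sample.csv"},
            },
        }
    ]
}

# The same lookups the handler performs:
bucket_name = event["Records"][0]["s3"]["bucket"]["name"]
s3_key = event["Records"][0]["s3"]["object"]["key"]
print(bucket_name, s3_key)
```

Note that a single event can carry several Records; the handler above only processes the first one, which is fine for one-file-at-a-time uploads.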
Next, build the zip package that will be uploaded to Lambda with this script:
# lambda_build_script.sh
python3 -m pip install --platform manylinux2014_x86_64 --implementation cp \
  --only-binary=:all: --upgrade \
  --target venv/lib/python3.11/site-packages/ -r requirements.txt
mkdir lambda_package
cp -r venv/lib/python3.11/site-packages/* lambda_package/
cp -r src lambda_package/
cd lambda_package && zip -r ../lambda.zip * && cd ../
We need two buckets here: one to collect the sales CSV files and one to store the Lambda code. We also define a bucket notification to trigger the Lambda function on certain events, in our case when a new file is uploaded to our S3 bucket.
# /tf/s3.tf
resource "aws_s3_bucket" "builds" {
  bucket = "${var.project_name}-lambda-builds"
}

resource "aws_s3_bucket" "bucket" {
  bucket = local.bucket_name
}

resource "aws_s3_bucket_notification" "s3-lambda-trigger" {
  bucket = aws_s3_bucket.bucket.id

  lambda_function {
    lambda_function_arn = module.lambda.lambda_function_arn
    events              = ["s3:ObjectCreated:*"]
  }

  # The invoke permission must exist before S3 can register the notification
  depends_on = [aws_lambda_permission.s3_lambda_permission]
}
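The notification above fires for every object created in the bucket. If the bucket may also receive non-CSV objects, the lambda_function block can be narrowed with the optional filter_suffix argument of aws_s3_bucket_notification (a sketch; the argument is part of the AWS provider, the ".csv" value is our assumption):

```hcl
resource "aws_s3_bucket_notification" "s3-lambda-trigger" {
  bucket = aws_s3_bucket.bucket.id

  lambda_function {
    lambda_function_arn = module.lambda.lambda_function_arn
    events              = ["s3:ObjectCreated:*"]
    # Only invoke the Lambda for objects whose key ends in .csv
    filter_suffix       = ".csv"
  }
}
```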
In this part, you will need to create the appropriate role to allow your Lambda function to access the S3 bucket, the DynamoDB tables, and CloudWatch Logs.
# /tf/iam.tf
data "aws_iam_policy_document" "assume_role_policy_lambda" {
  statement {
    sid    = ""
    effect = "Allow"

    principals {
      identifiers = ["lambda.amazonaws.com"]
      type        = "Service"
    }

    actions = ["sts:AssumeRole"]
  }
}

data "aws_iam_policy_document" "policy_s3" {
  statement {
    sid    = "S3AccessPolicy"
    effect = "Allow"
    actions = [
      "s3:*"
    ]
    resources = [
      "arn:aws:s3:::${local.bucket_name}",
      "arn:aws:s3:::${local.bucket_name}/*"
    ]
  }

  statement {
    sid    = "DynamoDbAccessPolicy"
    effect = "Allow"
    actions = [
      "dynamodb:PutItem",
      "dynamodb:GetItem"
    ]
    resources = [
      aws_dynamodb_table.command_table.arn,
      aws_dynamodb_table.item_table.arn
    ]
  }

  statement {
    sid    = "LambdaLoggingPolicy"
    effect = "Allow"
    actions = [
      "logs:CreateLogGroup",
      "logs:CreateLogStream",
      "logs:PutLogEvents"
    ]
    resources = [
      "arn:aws:logs:*:*:*"
    ]
  }
}

resource "aws_iam_role" "iam_for_lambda" {
  name               = "${var.project_name}-lambda-role"
  assume_role_policy = data.aws_iam_policy_document.assume_role_policy_lambda.json
}

resource "aws_iam_role_policy" "lambda_policy" {
  policy = data.aws_iam_policy_document.policy_s3.json
  role   = aws_iam_role.iam_for_lambda.id
}
Next, let’s define the Lambda function itself. We upload lambda.zip as an S3 object and reference it in the Lambda definition.
# /tf/lambda.tf
module "lambda" {
  source  = "terraform-aws-modules/lambda/aws"
  version = "5.2.0"

  function_name = "${var.project_name}-process-sales-data"
  runtime       = "python3.11"
  handler       = "src.file_processing_handler.lambda_handler"
  lambda_role   = aws_iam_role.iam_for_lambda.arn
  create_role   = false

  environment_variables = {
    "REGION"        = var.region,
    "ITEM_TABLE"    = aws_dynamodb_table.item_table.name,
    "COMMAND_TABLE" = aws_dynamodb_table.command_table.name
  }

  memory_size = 1028
  timeout     = 30

  create_package = false
  s3_existing_package = {
    bucket = aws_s3_bucket.builds.id
    key    = aws_s3_object.lambda_package.id
  }
}

resource "aws_s3_object" "lambda_package" {
  bucket = aws_s3_bucket.builds.id
  key    = "${filemd5(local.lambda_package)}.zip"
  source = local.lambda_package
}

resource "aws_lambda_permission" "s3_lambda_permission" {
  statement_id  = "AllowS3Invoke"
  action        = "lambda:InvokeFunction"
  function_name = module.lambda.lambda_function_name
  principal     = "s3.amazonaws.com"
  source_arn    = "arn:aws:s3:::${aws_s3_bucket.bucket.id}"
}
In our scenario, we will use two DynamoDB tables: a command table and an item table.
# /tf/dynamodb.tf
resource "aws_dynamodb_table" "command_table" {
  name         = "${var.project_name}-command-table"
  hash_key     = "id"
  billing_mode = "PAY_PER_REQUEST"

  attribute {
    name = "id"
    type = "S"
  }

  deletion_protection_enabled = true
}

resource "aws_dynamodb_table" "item_table" {
  name         = "${var.project_name}-item-table"
  hash_key     = "id"
  billing_mode = "PAY_PER_REQUEST"

  attribute {
    name = "id"
    type = "S"
  }

  deletion_protection_enabled = true
}
First, you will need to build the Lambda sources.
$ python3 -m venv venv
$ source ./venv/bin/activate
$ sh lambda_build_script.sh
Now you should have a lambda.zip archive.
Make sure to apply the Terraform configuration with terraform apply.
Now, we will take this CSV example referencing all the products that a client bought (example/sales_sample.csv).
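The handler expects the columns it reads above: item_name, unit_price, quantity, customer, salesman. Here is a minimal sketch of the same parsing and total computation, run against an in-memory sample instead of a real S3 download; the rows are made-up illustration data, not the contents of sales_sample.csv:

```python
import csv
import io

# Made-up sample rows matching the columns the handler reads.
sample_csv = """item_name,unit_price,quantity,customer,salesman
Keyboard,49.90,2,ACME Corp,Alice
Mouse,19.90,3,ACME Corp,Alice
"""

total_price = 0
for row in csv.DictReader(io.StringIO(sample_csv)):
    # Same per-row computation the Lambda performs
    total_price += float(row["unit_price"]) * float(row["quantity"])

print(round(total_price, 2))
```

Each row becomes one item-table record, and the summed total ends up on the single command-table record for the upload.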
And now, let the magic happen! Just drop the CSV file into the S3 bucket and take a look at your DynamoDB tables to see your processed data.
Thanks for reading! I hope this helped you use or understand the power of AWS services. Don’t hesitate to give me your feedback or suggestions.