Lambda Router for Data Pipeline
Mohamed Radwan
Posted on July 1, 2022
In this article, I am going to show you how to use Lambda as a router in your data analytics projects.
Which service you use depends on the project. For example, if a project has small files and the processing job takes around 10 minutes, Lambda may be a good fit; if a project has large files and processing takes more than two hours, Fargate may be the better choice.
The data flow in the architecture diagram is as follows: users upload files to the S3 bucket, which invokes the Lambda router. The router reads the object key from the event and queries the matching path in the DynamoDB table. DynamoDB responds with the trigger, arn, and other attributes; the router checks whether the trigger is true and then inspects the arn to determine which service should receive the event.
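For context, the relevant part of the S3 event notification the router receives looks roughly like this (abbreviated; the values are illustrative):
{
    "Records": [
        {
            "s3": {
                "object": {
                    "key": "data/project1/file.csv",
                    "size": 2048,
                    "eTag": "d41d8cd98f00b204e9800998ecf8427e"
                }
            }
        }
    ]
}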
Lambda router can pass the event to the following services:
- Fargate
- SQS
- SNS
- Lambda
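Whichever targets you enable, the router's execution role needs matching permissions. A minimal sketch of the policy statements involved (the account ID and wildcard resources are placeholders; scope them down in a real deployment):
{
    "Version": "2012-10-17",
    "Statement": [
        {"Effect": "Allow", "Action": "dynamodb:Query",
         "Resource": "arn:aws:dynamodb:eu-west-1:123456789:table/lambda-router"},
        {"Effect": "Allow", "Action": "lambda:InvokeFunction", "Resource": "*"},
        {"Effect": "Allow", "Action": "sns:Publish", "Resource": "*"},
        {"Effect": "Allow", "Action": ["sqs:GetQueueUrl", "sqs:SendMessage"], "Resource": "*"},
        {"Effect": "Allow", "Action": ["ecs:RunTask", "iam:PassRole"], "Resource": "*"}
    ]
}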
The benefits of using this architecture:
- Serverless: you never pay for idle resources, only per event and for storage.
- Scales with resilience and flexibility. For example, files can be processed in parallel: if you run a Fargate task for each uploaded file and users upload 10 files at the same time, you get 10 tasks running in parallel, collect each result as soon as its job finishes, and pay the same amount as you would processing them in series.
The Lambda router looks like this:
import boto3
import json
from urllib.parse import unquote_plus
from boto3.dynamodb.conditions import Key

region_name = 'eu-west-1'
table_name = 'lambda-router'

def get_path(path):
    # look up the routing entry for this path in DynamoDB
    query_table = boto3.resource("dynamodb", region_name=region_name, verify=True).Table(table_name)
    return query_table.query(
        KeyConditionExpression=Key("path").eq(path)
    )

def lambda_handler(event, context):
    # S3 object keys arrive URL-encoded, so decode them first
    key = unquote_plus(event['Records'][0]['s3']['object']['key'])
    # build the table key from the first two path segments, e.g. 'data/project1/'
    directory = key.split('/')[:2]
    path = '/'.join(directory) + '/'
    items = get_path(path)['Items']
    if not items:
        return {
            'statusCode': 404,
            'body': json.dumps('No routing entry for ' + path)
        }
    data = items[0]
    data['object'] = event['Records'][0]['s3']['object']
    arn = data['arn']
    service = arn.split(':')[2]
    # check the trigger
    if data['trigger'] != 'true':
        return {
            'statusCode': 200,
            'body': json.dumps('Trigger is False')
        }
    # trigger lambda
    if service == 'lambda':
        return trigger_lambda(arn, data)
    # publish to sns topic
    if service == 'sns':
        return publish_sns(arn, data)
    # run ecs fargate task
    if service == 'ecs':
        task = arn.split(':')[-2]
        task_revision = arn.split(':')[-1]
        task_definition = task.split('/')[-1] + ':' + task_revision
        return run_ecs(data, data['cluster_name'], task_definition,
                       int(data['task_count']), data['subnets'], data['securitygroups'])
    # send to sqs
    if service == 'sqs':
        return message_sqs(arn.split(':')[-1], data)
    # unknown service in the arn
    return {
        'statusCode': 400,
        'body': json.dumps('Unsupported service: ' + service)
    }

def trigger_lambda(arn, data):
    # asynchronous invocation: the router does not wait for the result
    lambda_client = boto3.client('lambda')
    response = lambda_client.invoke(
        FunctionName=arn,
        InvocationType='Event',
        Payload=json.dumps(data),
    )
    # the raw response contains objects that are not JSON serializable
    return json.loads(json.dumps(response, default=str))

def publish_sns(arn, data):
    sns_client = boto3.client('sns')
    response = sns_client.publish(
        TargetArn=arn,
        Message=json.dumps({'default': json.dumps(data)}),
        MessageStructure='json'
    )
    return response

def run_ecs(event, cluster_name, task, count, subnets, securitygroups):
    ecs = boto3.client('ecs')
    response = ecs.run_task(
        cluster=cluster_name,
        launchType='FARGATE',
        taskDefinition=task,
        overrides={
            'containerOverrides': [
                {
                    # assumes the container is named after the task definition family
                    'name': task.split(':')[0],
                    # pass the routing entry to the task as environment variables
                    'environment': [
                        {'name': 'path', 'value': event['path']},
                        {'name': 'trigger', 'value': str(event['trigger'])},
                        {'name': 'project-name', 'value': event['project-name']},
                        {'name': 'arn', 'value': event['arn']},
                        {'name': 'key', 'value': event['object']['key']},
                        {'name': 'etag', 'value': event['object']['eTag']}
                    ]
                },
            ],
        },
        count=count,
        platformVersion='LATEST',
        networkConfiguration={
            'awsvpcConfiguration': {
                'subnets': subnets,
                'securityGroups': securitygroups,
                'assignPublicIp': 'DISABLED'
            }
        })
    return response

def message_sqs(queue, event):
    sqs = boto3.resource('sqs')
    queue = sqs.get_queue_by_name(QueueName=queue)
    response = queue.send_message(
        MessageBody=str(event),
        MessageAttributes={
            'path': {'DataType': 'String', 'StringValue': event['path']},
            'key': {'DataType': 'String', 'StringValue': event['object']['key']},
            'size': {'DataType': 'String', 'StringValue': str(event['object']['size'])},
            'eTag': {'DataType': 'String', 'StringValue': event['object']['eTag']}
        })
    return response
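To sanity-check the routing logic before wiring up the S3 notification, you can call the handler with a hand-built event like the abbreviated one shown earlier. A minimal sketch, assuming the lambda-router table already contains an item for the path data/project1/ (see the items below):

# local smoke test for the router; the key and eTag are illustrative
sample_event = {
    'Records': [
        {
            's3': {
                'object': {
                    'key': 'data/project1/file.csv',
                    'size': 2048,
                    'eTag': 'd41d8cd98f00b204e9800998ecf8427e'
                }
            }
        }
    ]
}

if __name__ == '__main__':
    print(lambda_handler(sample_event, None))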
Data in DynamoDB
Run ECS task:
{
    "path": "data/project1/",
    "arn": "arn:aws:ecs:eu-west-1:123456789:task-definition/gozeit:2",
    "cluster_name": "ecs-cluster",
    "project-name": "project1",
    "securitygroups": [
        "sg-04d32af1102317415"
    ],
    "subnets": [
        "subnet-68ac7a21",
        "subnet-d73ef6b0",
        "subnet-a74a5dff"
    ],
    "task_count": "1",
    "trigger": "true"
}
Publish to SNS:
{
    "path": "data/project2/",
    "arn": "arn:aws:sns:eu-west-1:123456789:secprod",
    "project-name": "project2",
    "trigger": "true"
}
Trigger Lambda:
{
    "path": "data/project3/",
    "arn": "arn:aws:lambda:eu-west-1:123456789:function:lambda-dummy",
    "project-name": "project3",
    "trigger": "false"
}
Trigger SQS:
{
    "path": "data/project4/",
    "arn": "arn:aws:sqs:eu-west-1:123456789:workers",
    "project-name": "project4",
    "trigger": "true"
}
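If you need to create the routing table itself, here is a minimal sketch with boto3; the table and attribute names match the router code above, while the on-demand billing mode and the seeded item values are assumptions:

import boto3

dynamodb = boto3.resource('dynamodb', region_name='eu-west-1')

# create the routing table with "path" as the partition key
table = dynamodb.create_table(
    TableName='lambda-router',
    KeySchema=[{'AttributeName': 'path', 'KeyType': 'HASH'}],
    AttributeDefinitions=[{'AttributeName': 'path', 'AttributeType': 'S'}],
    BillingMode='PAY_PER_REQUEST'
)
table.wait_until_exists()

# seed one routing item (values are illustrative)
table.put_item(Item={
    'path': 'data/project2/',
    'arn': 'arn:aws:sns:eu-west-1:123456789:secprod',
    'project-name': 'project2',
    'trigger': 'true'
})

Once the table exists, onboarding a new project is just a matter of inserting another item; the router code itself does not change.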