How to Send a CSV File from S3 into Redshift with an AWS Lambda Function
Alexander Bolaño
Posted on March 26, 2022
Introduction
Nowadays is a must to automate everything and cloud jobs are not the exceptions, as Data Engineer We need to acquire the skill of move data wherever needed, if you want to know how to start facing AWS tools in your daily routine like a data professional, this post is for you.
Step By Step
After collecting data, the next step is to design an ETL in order to extract, transform and load your data before you want to move it into an analytics platform like Amazon Redshift but in this case, only We going to move data from S3 into a Redshift Cluster using for AWS free tier.
To do that, I’ve tried to approach the study case as follows :
- Create an S3 bucket.
- Create a Redshift cluster.
- Connect to Redshift from DBeaver or whatever you want.
- Create a table in your database.
- Create a virtual environment in Python with dependencies needed.
- Create your Lambda Function.
- Someone uploads data to S3.
- Query your data.
¡¡Let’s get started !!
Later you have finished step 1 and 2 let’s to connect to our database with the help of SQL client DBeaver or whatever you want, for this We need to remember the following data from Redshift Cluster configuration:
HOST = "xyz.redshift.amazonaws.com"
PORT = "5439"
DATABASE = "mydatabase"
USERNAME = "myadmin"
PASSWORD = "XYZ"
TABLE = "mytable"
Now when We connect to our database let’s create a new table
CREATE TABLE mytable (
id INT4 distkey sortkey,
col 1 VARCHAR (30) NOT NULL,
col 2 VARCHAR(100) NOT NULL,
col 3 VARCHAR(100) NOT NULL,
col 4 INTEGER NOT NULL,
col 5 INTEGER NOT NULL,
col 6 INTEGER NOT NULL);
For this tutorial, our Lambda function will need some Python libraries like Sqalchemy, Psycopg2, So you need to create a virtual environment in Python with these dependencies as well as Lambda Script before compressing the .zip file that you’ll upload into AWS.
At this point all you need to configure your Lambda Function into AWS is a Python Script and trigger your Lambda each time someone uploads a new object to an S3 bucket, you need to configure the following resources:
- Upload your lambda_function.zip (Python script and dependencies or yo can add aws custom layer) and use the code example from bellow to send data into redshift
lambda_function.py
. - Attach an IAM role to the Lambda function, which grants access to
AWSLambdaVPCAccesExcecutionRole
- For this case, you need to add VPC default in the Lambda function or any other you have.
- Add environment variables “CON” and “Table”
CON = "postgresql://USERNAME:PASSWORD@clustername.xyz.redshift.amazonaws.com:5439/DATABASE"
Table = "mytable"
- Create an S3 Event Notification that invokes the Lambda function each time someone uploads an object to your S3 bucket.
- You can configure a timeout ≥ 3 min.
Let's go to the code Here 👇
import sqlalchemy
import psycopg2
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
from datetime import datetime,timedelta
import os
def handler(event, context):
for record in event['Records']:
S3_BUCKET = record['s3']['bucket']['name']
S3_OBJECT = record['s3']['object']['key']
# Arguments
DBC= os.environ["CON"]
RS_TABLE = os.environ["Table"]
RS_PORT = "5439"
DELIMITER = "','"
REGION = "'us-east-1' "
# Connection
engine = create_engine(DBC)
db = scoped_session(sessionmaker(bind=engine))
# Send files from S3 into redshift
copy_query = "COPY "+RS_TABLE+" from 's3://"+ S3_BUCKET+'/'+S3_OBJECT+"' iam_role 'arn:aws:iam::11111111111:role/youroleredshift' delimiter "+DELIMITER+" IGNOREHEADER 1 REGION " + REGION
# Execute querie
db.execute(copy_query)
db.commit()
db.close()
Before you’re ready to upload a CSV file to your S3 bucket, keep in mind you’ve created a table first, so after you’ve implemented your lambda function and configured it correctly, you can upload data to S3 and go to DBeaver to query data in your table.
Summary
AWS Lambda is an easy way to automate your process but We need to understand which moment can’t use it, for example, AWS Lambda has a 6MB payload limit, so it is not practical to migrate very large tables this way.
On the other hand, the main advantage to use this service is that is a whole solution Serverless!! , So No need to manage any EC2 instances.
Thank you for reading this far. If you find this article useful, like and share this article. Someone could find it useful too and why not invite me for a coffee.
Follow me 👉 LinkedIn
Follow me 👉 Twitter
Contact: alexbonella2806@gmail.com
Posted on March 26, 2022
Join Our Newsletter. No Spam, Only the good stuff.
Sign up to receive the latest update from our blog.