Creating AWS Timestream Database Using A Lambda Function
Randika Madhushan Perera
Posted on January 14, 2024
Introduction
AWS Timestream is a fast, scalable, serverless time series database service for IoT and operational applications. AWS Lambda is a compute service that lets you run code without provisioning or managing servers. This guide walks you through creating a Timestream database and inserting data from an Excel file stored in an S3 bucket using a Lambda function.
1. Create an initial Lambda function
Follow Developing AWS Lambda Functions In Locally to create the initial Lambda function.
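If you want to scaffold the same project structure from scratch, a minimal sketch with the SAM CLI (assuming it is installed and your AWS credentials are configured) looks like this; the project name matches the one used in the rest of this post:
sam init --name CSVTimestream --runtime python3.9 --app-template hello-world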
2. Create additional scripts
I have created a Python script that creates an S3 bucket and uploads the Excel file to it.
create_and_upload.py
import boto3
import botocore
import os
def create_s3_bucket(bucket_name, region=None):
    s3_client = boto3.client('s3', region_name=region)
    try:
        if region is None:
            s3_client.create_bucket(Bucket=bucket_name)
        else:
            location = {'LocationConstraint': region}
            s3_client.create_bucket(Bucket=bucket_name, CreateBucketConfiguration=location)
    except botocore.exceptions.ClientError as e:
        print(f"Error occurred while creating bucket: {e}")
        return False
    return True


def upload_file_to_s3(file_name, bucket_name, object_name=None):
    # Check if the file exists
    if not os.path.exists(file_name):
        print(f"Error: The file {file_name} does not exist.")
        return False

    if object_name is None:
        object_name = os.path.basename(file_name)  # Extracts just the file name from the full file path

    s3_client = boto3.client('s3')
    try:
        s3_client.upload_file(file_name, bucket_name, object_name)
        print(f"File '{file_name}' has been uploaded to bucket '{bucket_name}' as '{object_name}'")
        return True
    except Exception as e:
        print(f"Error occurred while uploading file: {str(e)}")
        return False


def main():
    bucket_name = 's3-bucket-name'  # Replace with your unique bucket name
    region = 'region-name'  # Replace with your desired region
    excel_file_path = r'excel-file-path.xlsx'  # Replace with your local Excel file path

    # Create the S3 bucket
    if create_s3_bucket(bucket_name, region):
        print(f"Bucket '{bucket_name}' created successfully.")

        # Upload the file to S3
        if upload_file_to_s3(excel_file_path, bucket_name):
            print(f"File '{excel_file_path}' uploaded successfully to '{bucket_name}'.")


if __name__ == '__main__':
    main()
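To run the script locally, install boto3 and execute it. This assumes your AWS credentials are already configured (for example via aws configure):
pip install boto3
python create_and_upload.py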
3. Write the Lambda function
3.1 File Structure
CSVTimestream
|
|--events
| |---event.json
|
|--timestream
| |---app.py
|
|---samconfig.toml
|---template.yaml
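One thing to note about this structure: app.py (shown below) imports pandas, and reading .xlsx files with pandas also requires openpyxl, so the timestream/ directory should contain a requirements.txt for sam build to package those dependencies with the function. A minimal sketch (versions unpinned; pin them as you see fit):
requirements.txt
boto3
pandas
openpyxl
The events/event.json file can simply contain an empty JSON object ({}), since the handler does not read anything from the event.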
3.2 Code
My template.yaml file will be as follows.
template.yaml
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
  CSVTimestream
  Sample SAM Template for CSVTimestream

# More info about Globals: https://github.com/awslabs/serverless-application-model/blob/master/docs/globals.rst
Globals:
  Function:
    Timeout: 300
    MemorySize: 128

Resources:
  TimestreamLambdaFunction:
    Type: AWS::Serverless::Function # More info about Function Resource: https://github.com/awslabs/serverless-application-model/blob/master/versions/2016-10-31.md#awsserverlessfunction
    Properties:
      CodeUri: timestream/
      Handler: app.lambda_handler
      Runtime: python3.9
      Architectures:
        - x86_64
      Role: !GetAtt LambdaExecutionRole.Arn

  LambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: [ lambda.amazonaws.com ]
            Action: [ 'sts:AssumeRole' ]
      Policies:
        - PolicyName: TimestreamAccessPolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action: [ 'timestream:*' ]
                Resource: '*'
        - PolicyName: S3BucketAccessPolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action: [ 's3:GetObject' ]
                Resource: '*'
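Both policies use Resource: '*' to keep the example simple. If you want to tighten the S3 permission to just the bucket from step 2, a sketch of the same policy scoped to that bucket (the bucket name is a placeholder) would be:
- PolicyName: S3BucketAccessPolicy
  PolicyDocument:
    Version: '2012-10-17'
    Statement:
      - Effect: Allow
        Action: [ 's3:GetObject' ]
        Resource: 'arn:aws:s3:::s3-bucket-name/*'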
My Lambda function will be as follows.
app.py
import boto3
import pandas as pd
from botocore.exceptions import ClientError
from io import BytesIO

# Initialize clients
s3_client = boto3.client('s3')
timestream_write = boto3.client('timestream-write')

# Constants
database_name = 'timestream-db-name'
BUCKET_NAME = 's3-bucket-name'
OBJECT_KEY = 'excel-file-name.xlsx'


def create_database(database_name):
    try:
        timestream_write.create_database(DatabaseName=database_name)
        print(f"Database {database_name} created.")
    except ClientError as e:
        print(f"Database creation failed: {e}")


def create_table(table_name):
    try:
        retention_properties = {
            'MemoryStoreRetentionPeriodInHours': 24,
            'MagneticStoreRetentionPeriodInDays': 7
        }
        timestream_write.create_table(DatabaseName=database_name, TableName=table_name,
                                      RetentionProperties=retention_properties)
        print(f"Table {table_name} created.")
    except ClientError as e:
        print(f"Table creation failed: {e}")


def get_excel_file(bucket, key):
    s3_client = boto3.client('s3')
    response = s3_client.get_object(Bucket=bucket, Key=key)
    return BytesIO(response['Body'].read())


def process_excel_file(excel_file):
    # Read the Excel file
    xls = pd.ExcelFile(excel_file)

    # Process each sheet in the Excel file
    for sheet_name in xls.sheet_names:
        df = pd.read_excel(xls, sheet_name=sheet_name)

        # Create a table for each sheet
        create_table(sheet_name)

        # Write records to Timestream
        write_records(df, sheet_name)


def write_records(df, table_name):
    records = []
    version = 1  # Start with a base version number

    for index, row in df.iterrows():
        time_str = str(row['time']).replace('"', '')
        time_dt = pd.to_datetime(time_str)
        timestamp_ms = int(time_dt.timestamp() * 1000)
        # measure_value = row['measure_value::double']

        # Build the list of dimensions.
        dimensions = [
            {'Name': 'col_1_name', 'Value': str(row['col_1_name'])},
            {'Name': 'col_2_name', 'Value': str(row['col_2_name'])},
            # ... continue this based on your Excel file columns
        ]

        # Include additional dimensions based on the sheet structure.
        if 'addi_col' in df.columns:
            dimensions.append({'Name': 'addi_col', 'Value': str(row['addi_col'])})

        record = {
            'Dimensions': dimensions,
            'MeasureName': row['col_name'],
            'MeasureValue': str(row['col_name::double']),  # I have added this based on my Excel file
            'MeasureValueType': 'DOUBLE',
            'Time': str(timestamp_ms),
            'Version': version  # Adding a version number
        }
        records.append(record)
        version += 1  # Increment the version for each record

    try:
        result = timestream_write.write_records(DatabaseName=database_name, TableName=table_name,
                                                Records=records, CommonAttributes={})
        print(f"Records written to table {table_name} successfully with status: {result['ResponseMetadata']['HTTPStatusCode']}")
    except timestream_write.exceptions.RejectedRecordsException as e:
        print("Error writing records:", e)
        for rejected_record in e.response['RejectedRecords']:
            print("Rejected Record:", rejected_record)
    except ClientError as e:
        print(f"Error writing records: {e}")


def lambda_handler(event, context):
    # Create the Timestream database
    create_database(database_name)

    # Get the Excel file from S3
    excel_file = get_excel_file(BUCKET_NAME, OBJECT_KEY)

    # Process the Excel file
    process_excel_file(excel_file)

    return {
        'statusCode': 200,
        'body': "Data loaded to Timestream successfully."
    }
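A caveat on write_records: the Timestream WriteRecords API accepts at most 100 records per call, so a large sheet will be rejected. A minimal batching sketch (the helper name and chunk size are my own additions) that reuses the same client and database:
def write_in_batches(records, table_name, batch_size=100):
    # Timestream accepts at most 100 records per WriteRecords call,
    # so split the record list into chunks before writing.
    for start in range(0, len(records), batch_size):
        batch = records[start:start + batch_size]
        timestream_write.write_records(DatabaseName=database_name, TableName=table_name,
                                       Records=batch, CommonAttributes={})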
samconfig.toml will be as follows.
samconfig.toml
# More information about the configuration file can be found here:
# https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-config.html
version = 0.1
[default]
[default.global.parameters]
stack_name = "CSVTimestream"
[default.build.parameters]
cached = true
parallel = true
[default.validate.parameters]
lint = true
[default.deploy.parameters]
capabilities = "CAPABILITY_IAM"
confirm_changeset = true
resolve_s3 = true
s3_prefix = "CSVTimestream"
region = "aws-region"
image_repositories = []
[default.package.parameters]
resolve_s3 = true
[default.sync.parameters]
watch = true
[default.local_start_api.parameters]
warm_containers = "EAGER"
[default.local_start_lambda.parameters]
warm_containers = "EAGER"
4. Finally
Deploy the Lambda function and test it using the local invoke command, as shown below. You'll see that the Timestream database and its tables have been created and populated with data.
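Assuming the SAM CLI setup from step 1, the commands look roughly like this; the function's logical ID and the event file come from the template and folder structure above:
sam build
sam deploy
sam local invoke TimestreamLambdaFunction --event events/event.json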