Creating AWS Timestream Database Using A Lambda Function


Randika Madhushan Perera

Posted on January 14, 2024


Introduction

AWS Timestream is a fast, scalable, and serverless time series database service for IoT and operational applications. AWS Lambda is a compute service that lets you run code without provisioning or managing servers. This guide walks you through creating a Timestream database and loading it with data from an Excel file stored in an S3 bucket, using a Lambda function.

(Diagram: Lambda reads an Excel file from S3 and writes the data to Timestream)

1. Create an initial Lambda function

Follow Developing AWS Lambda Functions In Locally to create the initial Lambda function.

2. Create additional scripts

I have created a Python script that creates an S3 bucket and uploads the Excel file to it.

create_and_upload.py

import boto3
import botocore
import os


def create_s3_bucket(bucket_name, region=None):
    s3_client = boto3.client('s3', region_name=region)
    try:
        if region is None:
            # us-east-1 is the default and must be created without a LocationConstraint
            s3_client.create_bucket(Bucket=bucket_name)
        else:
            location = {'LocationConstraint': region}
            s3_client.create_bucket(Bucket=bucket_name, CreateBucketConfiguration=location)
    except botocore.exceptions.ClientError as e:
        print(f"Error occurred while creating bucket: {e}")
        return False
    return True


def upload_file_to_s3(file_name, bucket_name, object_name=None):
    # Check if the file exists
    if not os.path.exists(file_name):
        print(f"Error: The file {file_name} does not exist.")
        return False

    if object_name is None:
        object_name = os.path.basename(file_name)  # Extracts just the file name from the full file path

    s3_client = boto3.client('s3')
    try:
        s3_client.upload_file(file_name, bucket_name, object_name)
        print(f"File '{file_name}' has been uploaded to bucket '{bucket_name}' as '{object_name}'")
        return True
    except Exception as e:
        print(f"Error occurred while uploading file: {str(e)}")
        return False


def main():
    bucket_name = 's3-bucket-name'  # Replace with your unique bucket name
    region = 'region-name'  # Replace with your desired region
    excel_file_path = r'excel-file-path.xlsx'  # Replace with your local Excel file path

    # Create S3 bucket
    if create_s3_bucket(bucket_name, region):
        print(f"Bucket '{bucket_name}' created successfully.")

    # Upload file to S3
    if upload_file_to_s3(excel_file_path, bucket_name):
        print(f"File '{excel_file_path}' uploaded successfully to '{bucket_name}'.")


if __name__ == '__main__':
    main()

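To run the script locally, you'll need boto3 installed and AWS credentials configured (for example, via aws configure). Assuming the placeholders above have been replaced, a typical run looks like this:

pip install boto3
python create_and_upload.py

Note that if the bucket already exists, create_s3_bucket prints an error and returns False, but the upload step still runs and can succeed against the existing bucket.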

3. Write the lambda function

3.1 File Structure

CSVTimestream
      |
      |--events
      |     |---event.json
      |
      |--timestream
      |     |---app.py
      |
      |---samconfig.toml
      |---template.yaml
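Note that app.py (shown below) depends on pandas and openpyxl (the engine pandas uses to read .xlsx files), neither of which is included in the Lambda Python runtime. Assuming you build with sam build, adding a requirements.txt under timestream/ lets SAM bundle them into the deployment package:

timestream/requirements.txt

pandas
openpyxl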

3.2 Code

My template.yaml file is as follows.

template.yaml

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
  CSVTimestream

  Sample SAM Template for CSVTimestream

# More info about Globals: https://github.com/awslabs/serverless-application-model/blob/master/docs/globals.rst
Globals:
  Function:
    Timeout: 300
    MemorySize: 128

Resources:
  TimestreamLambdaFunction:
    Type: AWS::Serverless::Function # More info about Function Resource: https://github.com/awslabs/serverless-application-model/blob/master/versions/2016-10-31.md#awsserverlessfunction
    Properties:
      CodeUri: timestream/
      Handler: app.lambda_handler
      Runtime: python3.9
      Architectures:
        - x86_64
      Role: !GetAtt LambdaExecutionRole.Arn

  LambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: [ lambda.amazonaws.com ]
            Action: [ 'sts:AssumeRole' ]
      Policies:
        - PolicyName: TimestreamAccessPolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action: [ 'timestream:*' ]
                Resource: '*'
        - PolicyName: S3BucketAccessPolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action: [ 's3:GetObject' ]
                Resource: '*'
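One note on the IAM role: both policies above use Resource: '*' for simplicity. In a real deployment, you would typically scope the S3 policy down to the bucket from step 2. A minimal sketch, assuming the placeholder bucket name:

                Resource: 'arn:aws:s3:::s3-bucket-name/*'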

My Lambda function is as follows.

app.py

import boto3
import pandas as pd
from botocore.exceptions import ClientError
from io import BytesIO

# Initialize clients
s3_client = boto3.client('s3')
timestream_write = boto3.client('timestream-write')

# Constants: replace these placeholders with your own resource names
database_name = 'timestream-db-name'
BUCKET_NAME = 's3-bucket-name'
OBJECT_KEY = 'excel-file-name.xlsx'


def create_database(database_name):
    try:
        timestream_write.create_database(DatabaseName=database_name)
        print(f"Database {database_name} created.")
    except ClientError as e:
        print(f"Database creation failed: {e}")


def create_table(table_name):
    try:
        retention_properties = {
            'MemoryStoreRetentionPeriodInHours': 24,
            'MagneticStoreRetentionPeriodInDays': 7
        }
        timestream_write.create_table(DatabaseName=database_name, TableName=table_name,
                            RetentionProperties=retention_properties)
        print(f"Table {table_name} created.")
    except ClientError as e:
        print(f"Table creation failed: {e}")


def get_excel_file(bucket, key):
    # Reuse the module-level S3 client to fetch the file into memory
    response = s3_client.get_object(Bucket=bucket, Key=key)
    return BytesIO(response['Body'].read())


def process_excel_file(excel_file):
    # Read the Excel file
    xls = pd.ExcelFile(excel_file)

    # Process each sheet in the Excel file
    for sheet_name in xls.sheet_names:
        df = pd.read_excel(xls, sheet_name=sheet_name)

        # Create a table for each sheet
        create_table(sheet_name)

        # Write records to Timestream
        write_records(df, sheet_name)


def write_records(df, table_name):
    records = []
    version = 1  # Start with a base version number

    for index, row in df.iterrows():
        # Strip any stray quotes and convert the timestamp to epoch milliseconds
        time_str = str(row['time']).replace('"', '')
        time_dt = pd.to_datetime(time_str)
        timestamp_ms = int(time_dt.timestamp() * 1000)

        # Build the list of dimensions.
        dimensions = [
            {'Name': 'col_1_name', 'Value': str(row['col_1_name'])},
            {'Name': 'col_2_name', 'Value': str(row['col_2_name'])},
            # ... continue this based on your Excel file's columns
        ]

        # Include additional dimensions based on the sheet structure.
        if 'addi_col' in df.columns:
            dimensions.append({'Name': 'addi_col', 'Value': str(row['addi_col'])})


        record = {
            'Dimensions': dimensions,
            'MeasureName': row['col_name'],
            'MeasureValue': str(row['col_name::double']),  # adjust based on your Excel file's columns
            'MeasureValueType': 'DOUBLE',
            'Time': str(timestamp_ms),
            'Version': version  # Adding a version number
        }
        records.append(record)
        version += 1  # Increment the version for each record

    # Timestream accepts at most 100 records per WriteRecords call, so write in batches
    for i in range(0, len(records), 100):
        batch = records[i:i + 100]
        try:
            result = timestream_write.write_records(DatabaseName=database_name, TableName=table_name,
                                          Records=batch, CommonAttributes={})
            print(
                f"Records written to table {table_name} successfully with status: {result['ResponseMetadata']['HTTPStatusCode']}")
        except timestream_write.exceptions.RejectedRecordsException as e:
            print("Error writing records:", e)
            for rejected_record in e.response['RejectedRecords']:
                print("Rejected Record:", rejected_record)
        except ClientError as e:
            print(f"Error writing records: {e}")


def lambda_handler(event, context):
    # Create the Timestream database
    create_database(database_name)

    # Get the Excel file from S3
    excel_file = get_excel_file(BUCKET_NAME, OBJECT_KEY)

    # Process the Excel file
    process_excel_file(excel_file)

    return {
        'statusCode': 200,
        'body': "Data loaded to Timestream successfully."
    }

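Since lambda_handler ignores the incoming event, the events/event.json used for local testing can simply be an empty JSON object:

event.json

{}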

My samconfig.toml file is as follows.

samconfig.toml

# More information about the configuration file can be found here:
# https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-config.html
version = 0.1

[default]
[default.global.parameters]
stack_name = "CSVTimestream"

[default.build.parameters]
cached = true
parallel = true

[default.validate.parameters]
lint = true

[default.deploy.parameters]
capabilities = "CAPABILITY_IAM"
confirm_changeset = true
resolve_s3 = true
s3_prefix = "CSVTimestream"
region = "aws-region"
image_repositories = []

[default.package.parameters]
resolve_s3 = true

[default.sync.parameters]
watch = true

[default.local_start_api.parameters]
warm_containers = "EAGER"

[default.local_start_lambda.parameters]
warm_containers = "EAGER"

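Remember to replace the region placeholder ("aws-region") with your actual deployment region, for example us-east-1.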

4. Finally

Deploy the Lambda function and test it using the sam local invoke command. You'll see that the Timestream database has been created, along with a table for each Excel sheet, populated with data.
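Assuming the SAM CLI is installed, the build, deploy, and local test steps look like this:

sam build
sam deploy
sam local invoke TimestreamLambdaFunction --event events/event.json

To double-check that the data landed, you can also query Timestream with boto3's timestream-query client. A minimal sketch, assuming the placeholder database name and one of the sheet/table names from above:

import boto3

query_client = boto3.client('timestream-query')

# Placeholder names: replace with your database and table (sheet) names
query = 'SELECT * FROM "timestream-db-name"."sheet_name" ORDER BY time DESC LIMIT 10'

# The Query API is paginated; iterate over all result pages
paginator = query_client.get_paginator('query')
for page in paginator.paginate(QueryString=query):
    for row in page['Rows']:
        print(row)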
