How to Ingest Both Real-Time and Batch Data into AWS TimeStream Using Lambda and Execute Queries from PyCharm
Naveen B
Posted on February 15, 2024
I recently had a chance to work with this NoSQL time-series database, so I thought of sharing a simple framework for ingesting data from S3 into AWS TimeStream using Lambda and executing queries remotely from your local PyCharm.
Architecture Diagram:
What’s happening in the Architecture Diagram:
This is an elaborated view of how to ingest both real-time and batch data concurrently using a single Lambda.
First, let's look at the tech stack. It consists of:
- AWS S3
- AWS Lambda
- AWS CloudWatch
- AWS TimeStream
- PyCharm
Now, the source CSV data is unloaded into the S3 Raw Source bucket, which triggers the Splitter Lambda. We can attach the trigger to a specific S3 bucket using EventBridge or an S3 event notification in the Lambda console, as sketched below.
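As a rough illustration, here is a minimal Boto3 sketch that wires the trigger up with a direct S3 event notification, whose event shape matches the event['Records'] parsing used by the Lambdas in this post (the bucket name and function ARN are hypothetical placeholders, and the Lambda must already grant S3 permission to invoke it):
import boto3

s3 = boto3.client('s3')

# Invoke the Splitter Lambda whenever an object lands in the raw source bucket.
# Bucket name and function ARN below are placeholders.
s3.put_bucket_notification_configuration(
    Bucket='raw-source-bucket',
    NotificationConfiguration={
        'LambdaFunctionConfigurations': [{
            'LambdaFunctionArn': 'arn:aws:lambda:us-east-1:123456789012:function:splitter-lambda',
            'Events': ['s3:ObjectCreated:*']
        }]
    }
)
If you route through EventBridge instead, keep in mind that EventBridge delivers a different event shape than the S3 notification format parsed below.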
Purpose of Splitter Lambda:
When it comes to batch data, the file size can vary from a few MB to several GB. Lambda's maximum memory is 10,240 MB, and during transformation and ingestion it can safely process a file only up to about 200 MB. So this Lambda breaks a large file into processable chunks based on a user-provided size.
For example: the size criterion in the Lambda is 200 MB. If the file size exceeds it, the file is chunked and loaded into the S3 Structured Source bucket; otherwise the original file is copied into the S3 Structured Source bucket as-is, with date partitioning as folders.
Below is the Python script that splits large files into chunks based on the provided size.
import os
import datetime

import boto3
import awswrangler as wr
import numpy as np

s3 = boto3.client('s3')


def lambda_handler(event, context):
    ## Attributes extraction from parameters
    # Destination bucket name
    dest_bucket = os.environ['bucket_name']

    ## Extraction of the source path from the S3 event
    event_result = event['Records']
    bucket_name = ''.join([sub['s3']['bucket']['name'] for sub in event_result])
    object_name = ''.join([sub['s3']['object']['key'] for sub in event_result])
    path = "s3://" + bucket_name + "/" + object_name

    ## Output file name derived from the uploaded object's key (without extension)
    desired_name = os.path.splitext(os.path.basename(object_name))[0]

    ## Gets the metadata of the required file
    size_reader = s3.get_object(Bucket=bucket_name, Key=object_name)

    ## Providing the required size to chunk (200 MB)
    chunk_size = 200 * 1024 * 1024

    ## Reading data from the source
    df = wr.s3.read_csv(path)

    ## Date partition values for uploading the split DataFrame to S3
    today = datetime.datetime.now()
    year = today.strftime("%Y")
    month = today.strftime("%m")
    day = today.strftime("%d")

    ## Getting the size of the file
    file_size = size_reader['ContentLength']

    if file_size > chunk_size:
        ## Logic for splitting the actual file into chunks based on the chunk_size variable
        num_chunks = file_size // chunk_size + 1
        ## numpy's array_split splits the DataFrame into chunks of DataFrames
        chunks = np.array_split(df, num_chunks)
        ## The chunked DataFrames are uploaded to S3 inside the loop
        for i, chunk in enumerate(chunks):
            chunk_data = chunk.to_csv(index=False)
            output = "raw_source/" + year + "/" + month + "/" + day + "/" + desired_name + "/" + "part" + str(i + 1) + "-" + desired_name + ".csv"
            s3.put_object(Body=chunk_data, Bucket=dest_bucket, Key=output)
    else:
        ## When the file is smaller than the desired size, the whole file is copied to S3
        s3_c = boto3.resource('s3')
        copy_source = {
            'Bucket': bucket_name,
            'Key': object_name
        }
        output = "raw_source/" + year + "/" + month + "/" + day + "/" + desired_name + "/" + desired_name + ".csv"
        s3_c.meta.client.copy(copy_source, dest_bucket, output)

    return {
        'statusCode': 200
    }
After the chunked files (or the small original file) are loaded into S3 Structured Source, the Transformation Lambda is triggered.
Purpose of Transformation Lambda:
The core purpose of this Lambda is to handle both real-time and batch data, depending on the type of trigger that invokes it.
It is triggered every time a file lands in S3 Structured Source, and it then runs the real-time or batch logic based on the file size.
(Note: In the scenario I worked on, the real-time data was less than 1 MB and the batch files were between 50 and 200 MB.)
For example: if the file size is less than 1 MB, the real-time logic is executed; if it is larger, the batch logic is executed. A minimal sketch of that branching follows.
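The sketch below mirrors what the full script later in this post does: the trigger type is read from the event payload, and the file size decides which path runs (the 10 MB threshold here matches the script; adjust it to your own cut-off):
def identify_trigger(event):
    # S3 notifications carry a 'Records' list; scheduled CloudWatch/EventBridge
    # events carry 'source' == 'aws.events'
    try:
        return event['Records'][0]['eventSource']   # 'aws:s3'
    except KeyError:
        return event['source']                      # 'aws.events'


def pick_path(file_size, threshold=10 * 1024 * 1024):
    # Small files take the real-time write_records path,
    # larger files take the batch (CSV + batch load task) path
    return 'real-time' if file_size < threshold else 'batch'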
What does the real-time logic do?
Here, the transformation and mapping are done in a for loop, appending the transformed data to a list. That list is then ingested into AWS TimeStream using the Boto3 write_records function:
response = client.write_records(DatabaseName=DB_NAME, TableName=TBL_NAME, Records=record)
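For reference, a single multi-measure record passed to write_records looks roughly like this. This is a minimal, self-contained sketch with hypothetical database, table, dimension and measure values; the full script below builds these records inside the for loop:
import time
import boto3

client = boto3.client('timestream-write')

record = [{
    'Time': str(int(time.time() * 1000)),                      # record time in milliseconds
    'Dimensions': [{'Name': 'DimensionName', 'Value': 'sensor_01'}],
    'MeasureName': 'se',
    'MeasureValues': [
        {'Name': 'source_time', 'Value': '1518048000000', 'Type': 'TIMESTAMP'},
        {'Name': 'measure_value', 'Value': '42.5', 'Type': 'DOUBLE'}
    ],
    'MeasureValueType': 'MULTI'
}]

response = client.write_records(DatabaseName='timestream_db', TableName='timestream_table', Records=record)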
What does the batch logic do?
Here, the transformation and mapping are done with the same for-loop approach, the results are appended to a list, and the list is stored as a CSV file in a sub-folder of the S3 Structured Source bucket.
Now, for ingesting bulk data into AWS TimeStream there is a built-in feature called Batch Load Task, which can insert GBs of data into TimeStream from S3 in a single run.
For example: once all batch processing for the day is complete and the transformed data is stored under a sub-folder in the S3 Structured Source bucket, a cron job defined as an AWS CloudWatch (EventBridge) rule triggers the same Lambda at the end of the day. The Lambda then calls the create_batch_load_task function, which creates a Batch Load Task in AWS TimeStream and ingests all the data from the pointed source (S3) into the target TimeStream table. A sketch of scheduling that rule follows.
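As a rough sketch (the rule name, schedule and Lambda ARN are hypothetical), the end-of-day cron rule can be created with Boto3 like this; the same can of course be done from the CloudWatch/EventBridge console:
import boto3

events = boto3.client('events')

# Trigger the Transformation Lambda at 23:00 UTC every day
events.put_rule(
    Name='timestream-batch-load-daily',
    ScheduleExpression='cron(0 23 * * ? *)'
)

# Point the rule at the Lambda (the Lambda also needs a resource-based
# permission allowing events.amazonaws.com to invoke it)
events.put_targets(
    Rule='timestream-batch-load-daily',
    Targets=[{
        'Id': 'transformation-lambda',
        'Arn': 'arn:aws:lambda:us-east-1:123456789012:function:transformation-lambda'
    }]
)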
Below is the template for creating a Batch Load Task using the Python SDK:
response = client.create_batch_load_task(
    TargetDatabaseName=DB_NAME,
    TargetTableName=TBL_NAME,
    DataModelConfiguration={
        'DataModel': {
            "TimeColumn": "Timestamp",
            "TimeUnit": "MILLISECONDS",
            "DimensionMappings": [{
                "SourceColumn": "Dimension_Columnname_from_S3",
                "DestinationColumn": "Dimension_Columnname_to_TimeStream"
            }],
            "MultiMeasureMappings": {
                "MultiMeasureAttributeMappings": [
                    {
                        "SourceColumn": "MeasureValue_Column_from_S3",
                        "TargetMultiMeasureAttributeName": "MeasureValue_Column_to_TimeStream",
                        "MeasureValueType": "DOUBLE"
                    },
                    {
                        "SourceColumn": "source_time_from_S3",
                        "TargetMultiMeasureAttributeName": "source_time_to_TimeStream",
                        "MeasureValueType": "TIMESTAMP"
                    }
                ]
            },
            "MeasureNameColumn": "measure_name"
        }
    },
    DataSourceConfiguration={
        "DataSourceS3Configuration": {
            "BucketName": 'bucket_name',
            "ObjectKeyPrefix": output
        },
        "DataFormat": "CSV"
    },
    ReportConfiguration={
        "ReportS3Configuration": {
            "BucketName": 'bucket_name',
            "ObjectKeyPrefix": 'sub_folder',
            "EncryptionOption": 'SSE_S3'
        }
    }
)
print(response)
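The batch load task runs asynchronously. If you want to check its progress, the TaskId from the response above can be passed to describe_batch_load_task; a minimal sketch, reusing the response variable and the timestream-write client from the template:
# Poll the status of the batch load task created above
task_id = response['TaskId']
status = client.describe_batch_load_task(TaskId=task_id)
print(status['BatchLoadTaskDescription']['TaskStatus'])   # e.g. CREATED, IN_PROGRESS, SUCCEEDED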
Below is the complete Python script that handles both real-time and batch data based on the S3 and CloudWatch rule triggers:
import os
import io
import csv
import json
import datetime
from time import time

import boto3
import awswrangler as wr

s3 = boto3.client('s3')


def lambda_handler(event, context):
    ## Capture transformation start time
    start_time = datetime.datetime.now()
    print("Process started at", start_time.strftime("%d/%m/%Y %H:%M:%S"))

    ## Attributes extraction from parameters
    # TimeStream details
    DB_NAME = os.environ['DB_NAME']
    TBL_NAME = os.environ['TBL_NAME']

    ## TimeStream write client
    client = boto3.client('timestream-write')

    ## Identify the lambda purpose: S3 events carry 'Records',
    ## scheduled CloudWatch/EventBridge events carry 'source'
    try:
        event_trigger = event['Records'][0]['eventSource']
    except KeyError:
        event_trigger = event['source']

    if event_trigger == 'aws:s3':
        print("Entering record transformation and mapping process")

        ## Extraction of the source path from the S3 event
        event_result = event['Records']
        print(event_result)
        bucket_name = ''.join([sub['s3']['bucket']['name'] for sub in event_result])
        object_name = ''.join([sub['s3']['object']['key'] for sub in event_result])
        path = "s3://" + bucket_name + "/" + object_name

        ## Gets the metadata of the required file
        size_reader = s3.get_object(Bucket=bucket_name, Key=object_name)

        ## Size threshold that separates real-time files from batch files (10 MB)
        desired_size = 10 * 1024 * 1024

        ## Getting the size of the file
        file_size = size_reader['ContentLength']

        ## Reading data from the source
        df = wr.s3.read_csv(path)

        ## Records conversion from CSV to dictionaries
        json_df = df.to_json(orient="records")
        source = json.loads(json_df)

        ## Extraction, transformation and mapping of data from the source
        record = []
        for i in range(0, len(source)):
            Product_list = {key.strip(): value for key, value in source[i].items()}

            # ---Time transformation from string to milliseconds for the source data---#
            time_val = Product_list['Timestamp']
            timestamp_epoch = datetime.datetime.strptime(time_val, "%m/%d/%Y %H:%M:%S")
            timestamp_millisecond = timestamp_epoch.timestamp() * 1000
            valid_time = str(timestamp_millisecond).replace(".0", "")

            # ---Current time in milliseconds, used as the record time---#
            milliseconds = int(time() * 1000)

            ## The second column holds the dimension name; its value is the measure value
            key_list = list(Product_list.keys())
            dims = key_list[1]
            measure_name = str(dims[:2])

            if file_size < desired_size:
                ## Process to read and write the source data for the near-real-time use case
                time_column = {
                    'Name': 'source_time',
                    'Value': str(valid_time),
                    'Type': 'TIMESTAMP'
                }
                temp_val = {
                    'Name': 'measure_value',
                    'Value': str(Product_list[dims]),
                    'Type': 'DOUBLE'
                }
                dimension = [{'Name': 'DimensionName', 'Value': dims}]
                overall_value = {'Time': str(milliseconds), 'Dimensions': dimension,
                                 'MeasureName': measure_name,
                                 'MeasureValues': [time_column, temp_val],
                                 'MeasureValueType': 'MULTI'}
                record = [overall_value]
                print(record)
                try:
                    response = client.write_records(DatabaseName=DB_NAME, TableName=TBL_NAME, Records=record)
                    print("WriteRecords Status: [%s]" % response['ResponseMetadata']['HTTPStatusCode'])
                except client.exceptions.RejectedRecordsException as err:
                    print("RejectedRecords: ", err)
                    for rr in err.response["RejectedRecords"]:
                        print("Rejected Index " + str(rr["RecordIndex"]) + ": " + rr["Reason"])
                    print("Other records were written successfully.")
                except Exception as err:
                    print("Error:", err)
            else:
                ## Batch use case: collect the transformed rows to be written back to S3 as CSV
                overall_value = {'DimensionName': dims, 'Timestamp': str(milliseconds),
                                 'measure_value': str(Product_list[dims]),
                                 'source_time': valid_time, 'measure_name': measure_name}
                record.append(overall_value)

        if file_size > desired_size:
            ## Curation folder partition for the transformed values,
            ## since the batch load task only accepts flat files such as CSV
            today = datetime.datetime.now()
            datim = today.strftime("%Y%m%dT%H%M%S")
            year = today.strftime("%Y")
            month = today.strftime("%m")
            day = today.strftime("%d")
            output = "curated_source/" + year + "/" + month + "/" + day + "/" + dims + "/" + dims + "-" + datim + ".csv"
            stream = io.StringIO()
            headers = list(record[0].keys())
            writer = csv.DictWriter(stream, fieldnames=headers)
            writer.writeheader()
            writer.writerows(record)
            csv_string_object = stream.getvalue()
            s3_res = boto3.resource('s3')
            s3_res.Object('bucket_name', output).put(Body=csv_string_object)

    if event_trigger == 'aws.events':
        print("Entering record uploading to TimeStream process")
        # Process to write the curated data using the batch load task for the batch use case
        today = datetime.datetime.now()
        year = today.strftime("%Y")
        month = today.strftime("%m")
        day = today.strftime("%d")
        output = "curated_source/" + year + "/" + month + "/" + day + "/"
        response = client.create_batch_load_task(
            TargetDatabaseName=DB_NAME,
            TargetTableName=TBL_NAME,
            DataModelConfiguration={
                'DataModel': {
                    "TimeColumn": "Timestamp",
                    "TimeUnit": "MILLISECONDS",
                    "DimensionMappings": [{
                        "SourceColumn": "Dimension_Columnname_from_S3",
                        "DestinationColumn": "Dimension_Columnname_to_TimeStream"
                    }],
                    "MultiMeasureMappings": {
                        "MultiMeasureAttributeMappings": [
                            {
                                "SourceColumn": "MeasureValue_Column_from_S3",
                                "TargetMultiMeasureAttributeName": "MeasureValue_Column_to_TimeStream",
                                "MeasureValueType": "DOUBLE"
                            },
                            {
                                "SourceColumn": "source_time_from_S3",
                                "TargetMultiMeasureAttributeName": "source_time_to_TimeStream",
                                "MeasureValueType": "TIMESTAMP"
                            }
                        ]
                    },
                    "MeasureNameColumn": "measure_name"
                }
            },
            DataSourceConfiguration={
                "DataSourceS3Configuration": {
                    "BucketName": 'bucket_name',
                    "ObjectKeyPrefix": output
                },
                "DataFormat": "CSV"
            },
            ReportConfiguration={
                "ReportS3Configuration": {
                    "BucketName": 'bucket_name',
                    "ObjectKeyPrefix": 'sub_folder',
                    "EncryptionOption": 'SSE_S3'
                }
            }
        )
        print(response)

    ## Capture transformation end time
    end_time = datetime.datetime.now()
    print("Process ended at", end_time.strftime("%d/%m/%Y %H:%M:%S"))

    return {
        'statusCode': 200
    }
Interacting with AWS TimeStream from PyCharm:
To run queries on a TimeStream table from PyCharm, the AWS environment first has to be integrated with PyCharm. Get your AWS Access Key ID and Secret Access Key, and if it is an organisation account, get the Session Token as well.
Install the AWS Toolkit plugin in PyCharm and the AWS CLI on your machine, then open Terminal on a Mac or cmd on a Windows machine.
#Use the following commands to configure the AWS credentials.
#On Mac:
naveenchandarb@cmd ~ % vi ~/.aws/credentials
[default]
aws_access_key_id = ************************
aws_secret_access_key = ************************
aws_session_token = ************************
#On Windows:
C:\> aws configure
AWS Access Key ID [None]: ************************
AWS Secret Access Key [None]: ************************
Default region name [None]: ************************
Default output format [None]: json
C:\> aws configure set aws_session_token ************************
After configuring the AWS credentials locally, go to PyCharm, create a Python file and use the script below to interact with AWS TimeStream.
import boto3

timestream_client = boto3.client('timestream-query')
## The paginator handles the NextToken loop and yields one result page at a time
paginator = timestream_client.get_paginator('query')


def execute_query(query):
    rows = []
    column_names = []
    for page in paginator.paginate(QueryString=query):
        if not column_names:
            column_names = [column['Name'] for column in page['ColumnInfo']]
        if 'Rows' in page:
            rows.extend(page['Rows'])
    return rows, column_names


def parse_rows(rows):
    parsed_rows = []
    for row in rows:
        parsed_row = []
        for data in row['Data']:
            parsed_row.append(data.get('ScalarValue'))
        parsed_rows.append(parsed_row)
    return parsed_rows


def run_query(query):
    rows, column_names = execute_query(query)
    parsed_rows = parse_rows(rows)
    return parsed_rows, column_names
query = """
UNLOAD (
    SELECT Dimension_Name,
           AVG(measure_value) AS avg_value,
           bin(time, 15m) AS avg_time
    FROM "timestream_db"."timestream_table"
    WHERE time >= '2018-01-01 00:00:00'
      AND time <= '2018-01-10 00:00:00'
      AND measure_name = 'Medium'
    GROUP BY Dimension_Name, bin(time, 15m)
)
TO 's3://bucket_name/sub_folders/'
WITH (format = 'CSV', compression = 'NONE')
"""
result, column_names = run_query(query)
table = [column_names] + result
# Print the result
for row in table:
print(row)
I hope this article helps you perform TimeStream operations with Lambda without complex code, covering remote query execution and the ingestion of both real-time and batch data using the Python SDK.