AWS Step Functions workflow for an ETL Job on COVID-19 and deploying it with Terraform (#CloudGuruChallenge Series) (Part 2/3)

thakurrishabh

Thakur Rishabh Singh

Posted on April 26, 2021

This post covers building an AWS Step Functions workflow that runs an ETL job on the daily COVID-19 count, and deploying the infrastructure with Terraform.

Contents

  1. Project Overview
  2. Architecture
  3. Step Functions Workflow
    • What is AWS Step Functions?
    • Creating Step Functions Workflow
  4. Deploying Infrastructure with Terraform
  5. Conclusion

1. Project Overview

This project is the second part of the series #CloudGuruChallenge – Event-Driven Python on AWS. Here we deploy an AWS Step Functions workflow along with the other components required for error handling. The workflow will process an ETL job on the daily COVID-19 count, which will be demonstrated in the next and final part of this series. Note: appropriate permissions must be configured in IAM for the code below to work.
To automate the process, Terraform is used for IaC (Infrastructure as Code) and AWS CodePipeline is used for CI/CD. Setting up Terraform and CodePipeline is covered in detail in Part 1 of the series.

2. Architecture

[Figure: architecture diagram]

The above architecture works as follows:

  • An EventBridge rule triggers the AWS Step Functions workflow once daily at 1 pm.
  • On success, the Step Functions workflow sends an email to the owner through Amazon SNS.
  • On failure, another EventBridge rule picks up the failed-execution event and sends it to SNS, SQS and CloudWatch Logs. The owner is notified of the failure by email through SNS.
  • An EventBridge rule triggers a Lambda function once daily at 7 am. This function retrieves the failed events from SQS and then triggers the Step Functions workflow with an input containing all the failed dates.
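
To make the last step more concrete, here is a minimal Python sketch of how the retry Lambda could turn the queued failure events into a workflow input. It is not the code used in this project. It assumes each SQS message body is the JSON of a Step Functions "Execution Status Change" event whose `detail.startDate` is in epoch milliseconds, and the `failed_dates` key is a hypothetical name chosen for illustration. Reading the messages from SQS and starting the execution are left out.

```python
import json
from datetime import datetime, timezone


def build_retry_input(message_bodies):
    """Collect the dates of failed executions from queued EventBridge events."""
    failed_dates = set()
    for body in message_bodies:
        event = json.loads(body)
        detail = event.get("detail", {})
        if detail.get("status") != "FAILED":
            continue  # skip anything that is not a failure event
        # Assumption: startDate is the execution start time in epoch milliseconds
        started = datetime.fromtimestamp(detail["startDate"] / 1000, tz=timezone.utc)
        failed_dates.add(started.date().isoformat())
    # "failed_dates" is a hypothetical key; use whatever the workflow expects
    return {"failed_dates": sorted(failed_dates)}
```

Using a set means that several failures on the same day produce a single retry date.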

3. Step Functions Workflow

What is AWS Step Functions?

It is a serverless orchestration service that coordinates other AWS services, such as Lambda functions, as the steps of a workflow, passing data between the steps in JSON format.

Creating Step Functions Workflow

[Figure: Step Functions workflow graph]

The above workflow works as follows:

  • Each rectangular block represents a Lambda function task, except for CheckETLStatus and Notify, which are a Choice state and an SNS task respectively.
  • The workflow starts by retrieving the ETL status, which can be either initial load or update.
  • If it is an initial load, all the data is retrieved from the source as a batch.
  • If it is an update, only a single row of data is retrieved from the source (the case count for the current day).
  • The retrieved data is transformed and loaded by the Transform and Load tasks respectively.
  • Finally, an email is sent to the owner to notify them that the ETL job succeeded.

The code to create the above workflow is given below.

{
  "StartAt": "GetETLStatus",
  "States": {
    "GetETLStatus": {
        "Type": "Task",
        "Resource": "YOUR-LAMBDA-ARN",
        "Next": "CheckETLStatus",
        "TimeoutSeconds": 3,
        "ResultPath": "$.result"
    },
    "CheckETLStatus": { 
        "Type": "Choice",
        "Choices": [
          {
            "Variable": "$.result.status",
            "StringEquals": "InitialLoad",
            "Next": "InitialLoad"
          },
          {
            "Variable": "$.result.status",
            "StringEquals": "Update",
            "Next": "Update"
          }
        ],
        "Default": "InitialLoad"
     },
    "InitialLoad": {
        "Type": "Task",
        "Resource": "YOUR-LAMBDA-ARN",
        "Next": "Transform",
        "TimeoutSeconds": 3,
        "ResultPath": "$.result"
    },
    "Update": {
        "Type": "Task",
        "Resource": "YOUR-LAMBDA-ARN",
        "Next": "Transform",
        "TimeoutSeconds": 3,
        "ResultPath": "$.result"
    },
    "Transform": {
        "Type": "Task",
        "Resource": "YOUR-LAMBDA-ARN",
        "Next": "Load",
        "TimeoutSeconds": 3,
        "ResultPath": "$.result"
    },
    "Load": {
        "Type": "Task",
        "Resource": "YOUR-LAMBDA-ARN",
        "TimeoutSeconds": 3,
        "ResultPath": "$.result",
        "Next": "Notify"
    },
    "Notify": {
        "Type": "Task",
        "Resource": "arn:aws:states:::sns:publish",
        "Parameters": {
          "TopicArn": "YOUR-TOPIC-ARN",
          "Message.$": "$"
        },
        "End": true
    }
  }
}
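
To see how `ResultPath` and the Choice state work together, the following Python sketch mimics the first two states locally. It is only an illustration of the data flow, not how Step Functions is implemented, and the sample input and Lambda output are hypothetical.

```python
def apply_result_path(state_input, task_output, key="result"):
    """Mimic ResultPath "$.result": keep the input and attach the task output."""
    merged = dict(state_input)
    merged[key] = task_output
    return merged


def check_etl_status(state_input):
    """Mimic the CheckETLStatus Choice state, including its Default branch."""
    # A missing key raises KeyError here; in Step Functions an unresolvable
    # Variable path would likewise fail the execution rather than hit Default.
    status = state_input["result"]["status"]
    if status == "Update":
        return "Update"
    # "InitialLoad" matches the first rule; any other value falls to Default
    return "InitialLoad"


# Hypothetical execution input and GetETLStatus Lambda output
after_task = apply_result_path({"date": "2021-04-26"}, {"status": "Update"})
next_state = check_etl_status(after_task)
```

Because every task writes to the same `$.result` path, each task's output replaces the previous one while the rest of the original input is carried through to Notify.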

4. Deploying Infrastructure with Terraform

The code to deploy the above infrastructure with Terraform is shown below. For more details on setting up Terraform and CodePipeline, see Part 1 of the series. I have omitted the code for the other two scheduled events, as I will show them in the next part of this series.
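
For reference, the two daily schedules from the architecture can be written as EventBridge cron expressions. The small Python helper below is hypothetical and only builds the expression string. It assumes the times are given in UTC, which is how EventBridge rule schedules are evaluated, so 1 pm or 7 am local time would need converting first.

```python
def daily_cron(hour_utc, minute=0):
    """Build an EventBridge cron expression that fires once a day."""
    if not (0 <= hour_utc <= 23 and 0 <= minute <= 59):
        raise ValueError("hour must be 0-23 and minute 0-59")
    # Fields: minutes, hours, day-of-month, month, day-of-week, year
    return f"cron({minute} {hour_utc} * * ? *)"


workflow_schedule = daily_cron(13)  # daily workflow run
retry_schedule = daily_cron(7)      # daily retry Lambda run
```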

state_machine.tf

resource "aws_sfn_state_machine" "sfn_state_machine" {
  name     = "YOUR-STATE-MACHINE-NAME"
  role_arn = "YOUR-ROLE-ARN"

  definition = <<EOF
{
  "StartAt": "GetETLStatus",
  "States": {
    "GetETLStatus": {
      "Type": "Task",
      "Resource": "YOUR-LAMBDA-ARN",
      "Next": "CheckETLStatus",
      "TimeoutSeconds": 3,
      "ResultPath": "$.result"
    },
    "CheckETLStatus": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.result.status",
          "StringEquals": "InitialLoad",
          "Next": "InitialLoad"
        },
        {
          "Variable": "$.result.status",
          "StringEquals": "Update",
          "Next": "Update"
        }
      ],
      "Default": "InitialLoad"
    },
    "InitialLoad": {
      "Type": "Task",
      "Resource": "YOUR-LAMBDA-ARN",
      "Next": "Transform",
      "TimeoutSeconds": 3,
      "ResultPath": "$.result"
    },
    "Update": {
      "Type": "Task",
      "Resource": "YOUR-LAMBDA-ARN",
      "Next": "Transform",
      "TimeoutSeconds": 3,
      "ResultPath": "$.result"
    },
    "Transform": {
      "Type": "Task",
      "Resource": "YOUR-LAMBDA-ARN",
      "Next": "Load",
      "TimeoutSeconds": 3,
      "ResultPath": "$.result"
    },
    "Load": {
      "Type": "Task",
      "Resource": "YOUR-LAMBDA-ARN",
      "TimeoutSeconds": 3,
      "ResultPath": "$.result",
      "Next": "Notify"
    },
    "Notify": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sns:publish",
      "Parameters": {
        "TopicArn": "YOUR-SNS-TOPIC-ARN",
        "Message.$": "$"
      },
      "End": true
    }
  }
}
EOF

  depends_on = [
    aws_sns_topic.ETLJobStatus,
    aws_lambda_function.state_machine_lambdas
  ]
}

sqs.tf

resource "aws_sqs_queue" "Events_DLQ" {
  name = "Events_DLQ_T"
}

sns.tf

resource "aws_sns_topic" "ETLJobStatus" {
  name = "ETLJobStatus_T"
}

resource "aws_sns_topic" "ETLErrorMessages" {
  name = "ETLErrorMessages_T"
}

resource "aws_sns_topic_subscription" "ETLJobStatus_target" {
  topic_arn = aws_sns_topic.ETLJobStatus.arn
  protocol  = "email"
  endpoint  = "YOUR-EMAIL-ID"

  depends_on = [
    aws_sns_topic.ETLJobStatus
  ]
}

resource "aws_sns_topic_subscription" "ETLErrorMessages_target" {
  topic_arn = aws_sns_topic.ETLErrorMessages.arn
  protocol  = "email"
  endpoint  = "YOUR-EMAIL-ID"

  depends_on = [
    aws_sns_topic.ETLErrorMessages
  ]
}

cloudwatch.tf

#EventBridge Events

resource "aws_cloudwatch_event_rule" "state_machine_events_failed" {
  name        = "state_machine_events_failed_t"
  description = "This event is triggered when the state machine fails."

  event_pattern = <<EOF
{
  "source": ["aws.states"],
  "detail-type": ["Step Functions Execution Status Change"],
  "detail": {
    "status": ["FAILED"],
    "stateMachineArn": ["${aws_sfn_state_machine.sfn_state_machine.arn}"]
  }
}
EOF

  depends_on = [
    aws_sfn_state_machine.sfn_state_machine
  ]
}

#EventBridge Event Targets

resource "aws_cloudwatch_event_target" "sns" {
  rule      = aws_cloudwatch_event_rule.state_machine_events_failed.name
  target_id = "SendToSNS"
  arn       = aws_sns_topic.ETLErrorMessages.arn

  depends_on = [
    aws_cloudwatch_event_rule.state_machine_events_failed,
    aws_sns_topic.ETLErrorMessages
  ]
}

resource "aws_cloudwatch_event_target" "sqs" {
  rule      = aws_cloudwatch_event_rule.state_machine_events_failed.name
  target_id = "SendToSQS"
  arn       = aws_sqs_queue.Events_DLQ.arn

  depends_on = [
    aws_cloudwatch_event_rule.state_machine_events_failed,
    aws_sqs_queue.Events_DLQ
  ]
}

resource "aws_cloudwatch_event_target" "cloudwatch_logs" {
  rule      = aws_cloudwatch_event_rule.state_machine_events_failed.name
  target_id = "SendToCloudwatchLogs"
  arn       = aws_cloudwatch_log_group.log_group.arn

  depends_on = [
    aws_cloudwatch_event_rule.state_machine_events_failed,
    aws_cloudwatch_log_group.log_group
  ]
}

#Cloudwatch Log Group

resource "aws_cloudwatch_log_group" "log_group" {
  name = "state_machine_events_failed_t"
}
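
To illustrate what the event pattern in cloudwatch.tf selects, here is a simplified Python sketch of the matching rule: every field listed in the pattern must be present in the event, and its value must be one of the listed values. This is a deliberately reduced model that handles exact-value matching only, not the full EventBridge pattern syntax, and the function names are hypothetical.

```python
def matches_failed_pattern(event, state_machine_arn):
    """Check an event against the same pattern as the Terraform rule."""
    pattern = {
        "source": ["aws.states"],
        "detail-type": ["Step Functions Execution Status Change"],
        "detail": {
            "status": ["FAILED"],
            "stateMachineArn": [state_machine_arn],
        },
    }
    return _matches(pattern, event)


def _matches(pattern, event):
    """Exact-value matching only; extra fields in the event are ignored."""
    for key, expected in pattern.items():
        if key not in event:
            return False
        if isinstance(expected, dict):
            # Nested pattern: recurse into the corresponding event object
            if not isinstance(event[key], dict) or not _matches(expected, event[key]):
                return False
        elif event[key] not in expected:
            return False
    return True
```

Only events from this state machine with status FAILED pass, so successful runs and failures of other state machines never reach SNS, SQS or the log group.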

5. Conclusion

In this post we have seen how to build a Step Functions workflow for an ETL job and deploy it with Terraform. I know that AWS Glue could be a better approach, but I wanted to explore Step Functions, so the challenge was modified to accommodate it. In the next and final part of this series I'll combine everything to complete the #CloudGuruChallenge – Event-Driven Python on AWS: I'll run the ETL job on daily COVID-19 cases and display the results in Amazon QuickSight.
