Process S3 Objects with Step Functions using CDK + TS


Marcos Henrique

Posted on May 27, 2024


Preface

Whenever a user needed to launch an automation, our tech lead had to dive into an extensive manual process that took about two days to configure everything.
Picture this: an Excel file with over six tabs and data manipulation using Excel functions (a spreadsheet nightmare 👻).
Seeing this madness, I realised we could automate this process (trust me, it was a heated debate to reach this conclusion). Thus, the architecture below was born.

Remember, we've got some proprietary code, so this is a simplified and slightly fictionalised version to illustrate the solution.

[architecture diagram]

The Solution

We needed to trigger a Step Function every time an object was created in S3. However, S3 event notifications can't invoke a Step Function directly, so I created an EventBridge rule to listen for object creation in the bucket. Whenever an object is created, the rule starts the Step Function, which processes the object in one of its Lambda tasks.
I used the Express type since I needed this to be a quick process. Because of that choice, I had to parallelise some steps (yes, my automation allowed for that), so I used the Parallel state. Finally, I set up an SNS topic to send a notification if any errors occurred.

Express vs. Standard
• Standard: for long-running processes, with exactly-once execution guarantees and advanced recovery features.
• Express: for fast, high-throughput processes, with at-least-once execution guarantees and billing based on duration and memory.

Getting your hands dirty

Let's implement it using CDK with TypeScript!

Let's start by creating our bucket and importing a topic to notify us of any errors. Note that the bucket needs eventBridgeEnabled set, so that S3 publishes object-created events to EventBridge; the rule we create later depends on it:



const userNotificationTopic = Topic.fromTopicArn(
  this,
  'userNotificationTopic',
  Fn.importValue('infrastructure::sns::user-notification-topic::arn'),
)

const bucket = new Bucket(this, 'bucket', {
  bucketName: 'automation-configuration',
  eventBridgeEnabled: true, // required so S3 sends object events to EventBridge
  removalPolicy: RemovalPolicy.DESTROY,
})



Now, let's create our Lambdas and grant read permissions to the Lambda that will access the S3 object:



const etlLambda = new NodejsFunction(this, 'etl-lambda', {
  entry: 'src/lambda/etl/etl.handler.ts',
  functionName: 'etl',
  timeout: Duration.seconds(30),
  architecture: Architecture.ARM_64,
})

bucket.grantRead(
  new Alias(this, id.concat('alias'), {
    aliasName: 'current',
    version: etlLambda.currentVersion,
  }),
)

const categoriesLambda = new NodejsFunction(this, 'insert-categories-lambda', {
  entry: 'src/lambda/categories/categories.handler.ts',
  functionName: 'insert-categories',
  timeout: Duration.seconds(30),
  architecture: Architecture.ARM_64,
})

const hospitalsLambda = new NodejsFunction(this, 'hospitals-categories-lambda', {
  entry: 'src/lambda/hospitals/hospitals.handler.ts', // assumed path; the original duplicated the categories entry
  functionName: 'insert-hospitals', // must differ from 'insert-categories', duplicate names fail to deploy
  timeout: Duration.seconds(30),
  architecture: Architecture.ARM_64,
})


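The handlers themselves are part of the proprietary code, but here's a minimal sketch of what the ETL one could look like, assuming the state machine receives the EventBridge "Object Created" detail that we forward to it later on. The event shape, the CSV parsing, and the returned payload are illustrative assumptions, not the real implementation:

// src/lambda/etl/etl.handler.ts (hypothetical sketch, not the original handler)
import { GetObjectCommand, S3Client } from '@aws-sdk/client-s3'

const s3 = new S3Client({})

// Assumed input: the EventBridge detail forwarded by the rule we create later
interface EtlEvent {
  detail: {
    bucket: { name: string }
    object: { key: string }
  }
}

export const handler = async (event: EtlEvent) => {
  const { bucket, object } = event.detail

  // Download the uploaded CSV from S3
  const response = await s3.send(new GetObjectCommand({ Bucket: bucket.name, Key: object.key }))
  const csv = (await response.Body?.transformToString()) ?? ''

  // Transform the rows here (omitted). Whatever we return becomes the task's
  // Payload, which we expose through outputPath: '$.Payload'.
  return { key: object.key, rows: csv.split('\n').length }
}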

To call a Lambda from within our Step Function, we need a LambdaInvoke task for each of the Lambdas. So I wrote a helper to avoid repeating that setup for every Lambda:



private mountLambdaInvokes(
    lambdasInvoke: Array<{
      function: IFunction
      name?: string
      output?: string
    }>,
  ) {
    return lambdasInvoke.map(lambdaInvoke => {
      return new LambdaInvoke(this, `${lambdaInvoke.name || lambdaInvoke.function.functionName}-task`, {
        lambdaFunction: lambdaInvoke.function,
        inputPath: '$',
        outputPath: lambdaInvoke?.output || '$',
      })
    })
  }



Using mountLambdaInvokes:



const [etlTask, categoriesTask, hospitalsTask] = this.mountLambdaInvokes([
  { function: etlLambda, output: '$.Payload' },
  { function: categoriesLambda },
  { function: hospitalsLambda },
])


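Because the ETL task sets outputPath: '$.Payload', each Parallel branch receives the ETL Lambda's return value directly as its event. Here's a minimal sketch of the categories handler under that assumption (the payload shape matches the hypothetical ETL sketch above; the persistence logic is omitted):

// src/lambda/categories/categories.handler.ts (hypothetical sketch)
// The event is whatever the ETL Lambda returned: the ETL task's outputPath
// ('$.Payload') strips the Lambda Invoke metadata before the Parallel state
// fans the result out to each branch.
interface EtlPayload {
  key: string
  rows: number
}

export const handler = async (event: EtlPayload) => {
  console.log(`Inserting categories derived from ${event.key} (${event.rows} rows)`)
  // Persist the categories (database call omitted)
  return { inserted: true }
}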

Next, we create our failure step and an SnsPublish task that sends failure notifications to the topic we imported earlier:



const errorTopicConfig = {
  topic: userNotificationTopic,
  subject: 'Automation Config Failed 😥',
  message: TaskInput.fromObject({
    message: 'Automation Config Failed due to an unexpected error.',
    cause: 'Unexpected Error',
    channel: 'email',
    destination: { ToAddresses: ['suporte@somemail.com'] },
  }),
}

const publishFailed = (publishFailedId: string) =>
  new SnsPublish(this, `automation-config-sns-failed-${publishFailedId}`, errorTopicConfig)

const jobFailed = new Fail(this, 'automation-config-job-failed', {
  cause: 'Unexpected Error',
})



Having done this, let's set up our Parallel state. Each branch invokes one of the Lambdas, and the branches run simultaneously. We'll also add a retry so the work is attempted again if any issue arises, and a catch to handle failures in this parallel process:



const hospitalsCategoriesParallel = new Parallel(this, 'auto-config-exams-parallel-map')
  .branch(categoriesTask)
  .branch(hospitalsTask)
  .addRetry({ errors: ['States.ALL'], interval: Duration.seconds(5), maxAttempts: 1 })
  .addCatch(publishFailed('exams').next(jobFailed), {
    errors: ['States.ALL'],
  })



At this point, we'll create the definition of our tasks, a log group, and the State Machine, which is essentially our Step Function:



const definition = etlTask.next(hospitalsCategoriesParallel)

const logGroup = new LogGroup(this, 'automation-configuration-log-group', {
  retention: RetentionDays.ONE_WEEK,
  removalPolicy: RemovalPolicy.DESTROY,
})

const stateMachine = new StateMachine(this, `${id}-state-machine`, {
  definition,
  timeout: Duration.minutes(5),
  stateMachineName: 'automation-configuration',
  stateMachineType: StateMachineType.EXPRESS,
  logs: {
    destination: logGroup,
    includeExecutionData: true,
    level: LogLevel.ALL,
  },
})



After this, we create the EventBridge rule that listens for S3 object creation, plus a role the rule assumes to start our Step Function:



const s3EventRule = new Rule(this, 'automation-config-s3-event-rule', {
  ruleName: 'automation-config-s3-event-rule',
})

const eventRole = new Role(this, 'eventRole', {
  assumedBy: new ServicePrincipal('events.amazonaws.com'),
})

stateMachine.grantStartExecution(eventRole)
s3EventRule.addTarget(
  new SfnStateMachine(stateMachine, {
    input: RuleTargetInput.fromObject({
      detail: EventField.fromPath('$.detail'),
    }),
    role: eventRole,
  }),
)
s3EventRule.addEventPattern({
  source: ['aws.s3'],
  detailType: ['Object Created'],
  detail: {
    bucket: {
      name: [bucket.bucketName],
    },
    object: {
      key: [
        {
          wildcard: 'csv/automation-configuration/*.csv',
        },
      ],
    },
  },
})


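For reference, the "Object Created" events that S3 emits through EventBridge look roughly like the literal below, trimmed to the fields our pattern cares about (the key is just an example). Since the target input forwards only detail, that object is what the state machine execution receives as input:

// Rough shape of an S3 "Object Created" event from EventBridge, shown as a
// TypeScript literal for illustration; real events carry more fields.
const sampleEvent = {
  source: 'aws.s3',
  'detail-type': 'Object Created',
  detail: {
    bucket: { name: 'automation-configuration' },
    object: { key: 'csv/automation-configuration/example.csv', size: 12345 },
    reason: 'PutObject',
  },
}
// The rule target forwards only `detail`, so the execution input is
// { detail: { bucket: ..., object: ... } }, which is exactly what the ETL task consumes.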

Finally, let's create a rule that listens for any unexpected failures in our Step Function, again through EventBridge, keeping the event-driven nature that we love so much:



const unexpectedFailRule = new Rule(this, 'exam-automation-config-unexpected-fail-rule', {
  ruleName: 'exam-automation-config-unexpected-fail-rule',
})

unexpectedFailRule.addTarget(
  new SnsTopic(userNotificationTopic, {
    message: RuleTargetInput.fromObject({
      subject: 'Exam Automation Config Failed 😥',
      message: 'Exam Automation Config Failed due to an unexpected error.',
      cause: 'Unexpected Error',
      channel: 'email',
      destination: { ToAddresses: ['it@somemail.com'] },
    }),
  }),
)

unexpectedFailRule.addEventPattern({
  source: ['aws.states'],
  detailType: ['Step Functions Execution Status Change'],
  detail: {
    stateMachineArn: [stateMachine.stateMachineArn],
    status: ['FAILED', 'TIMED_OUT', 'ABORTED'],
  },
})



Putting everything together in a class so you can understand the entire unified flow:



import { Duration, Fn, RemovalPolicy } from 'aws-cdk-lib'
import { Rule, RuleTargetInput, EventField } from 'aws-cdk-lib/aws-events'
import { SfnStateMachine, SnsTopic } from 'aws-cdk-lib/aws-events-targets'
import { Role, ServicePrincipal } from 'aws-cdk-lib/aws-iam'
import { Alias, Architecture, IFunction } from 'aws-cdk-lib/aws-lambda'
import { NodejsFunction } from 'aws-cdk-lib/aws-lambda-nodejs'
import { LogGroup, RetentionDays } from 'aws-cdk-lib/aws-logs'
import { Bucket } from 'aws-cdk-lib/aws-s3'
import { Topic } from 'aws-cdk-lib/aws-sns'
import { Fail, LogLevel, Parallel, StateMachine, StateMachineType, TaskInput } from 'aws-cdk-lib/aws-stepfunctions'
import { LambdaInvoke, SnsPublish } from 'aws-cdk-lib/aws-stepfunctions-tasks'
import { Construct } from 'constructs'

export default class AutomationConfiguration extends Construct {
  constructor(scope: Construct, id: string) {
    super(scope, id)

    const userNotificationTopic = Topic.fromTopicArn(
      this,
      'userNotificationTopic',
      Fn.importValue('infrastructure::sns::user-notification-topic::arn'),
    )

    const bucket = new Bucket(this, 'bucket', {
      bucketName: 'automation-configuration',
      eventBridgeEnabled: true, // required so S3 sends object events to EventBridge
      removalPolicy: RemovalPolicy.DESTROY,
    })

    const etlLambda = new NodejsFunction(this, 'etl-lambda', {
      entry: 'src/lambda/etl/etl.handler.ts',
      functionName: 'etl',
      timeout: Duration.seconds(30),
      architecture: Architecture.ARM_64,
    })

    bucket.grantRead(
      new Alias(this, id.concat('alias'), {
        aliasName: 'current',
        version: etlLambda.currentVersion,
      }),
    )

    const categoriesLambda = new NodejsFunction(this, 'insert-categories-lambda', {
      entry: 'src/lambda/categories/categories.handler.ts',
      functionName: 'insert-categories',
      timeout: Duration.seconds(30),
      architecture: Architecture.ARM_64,
    })

    const hospitalsLambda = new NodejsFunction(this, 'hospitals-categories-lambda', {
      entry: 'src/lambda/hospitals/hospitals.handler.ts', // assumed path; the original duplicated the categories entry
      functionName: 'insert-hospitals', // must differ from 'insert-categories', duplicate names fail to deploy
      timeout: Duration.seconds(30),
      architecture: Architecture.ARM_64,
    })

    const [etlTask, categoriesTask, hospitalsTask] = this.mountLambdaInvokes([
      { function: etlLambda, output: '$.Payload' },
      { function: categoriesLambda },
      { function: hospitalsLambda },
    ])

    const errorTopicConfig = {
      topic: userNotificationTopic,
      subject: 'Automation Config Failed 😥',
      message: TaskInput.fromObject({
        message: 'Automation Config Failed due to an unexpected error.',
        cause: 'Unexpected Error',
        channel: 'email',
        destination: { ToAddresses: ['suporte@somemail.com'] },
      }),
    }

    const publishFailed = (publishFailedId: string) =>
      new SnsPublish(this, `automation-config-sns-failed-${publishFailedId}`, errorTopicConfig)

    const jobFailed = new Fail(this, 'automation-config-job-failed', {
      cause: 'Unexpected Error',
    })

    const hospitalsCategoriesParallel = new Parallel(this, 'auto-config-exams-parallel-map')
      .branch(categoriesTask)
      .branch(hospitalsTask)
      .addRetry({ errors: ['States.ALL'], interval: Duration.seconds(5), maxAttempts: 1 })
      .addCatch(publishFailed('exams').next(jobFailed), {
        errors: ['States.ALL'],
      })
    const definition = etlTask.next(hospitalsCategoriesParallel)

    const logGroup = new LogGroup(this, 'automation-configuration-log-group', {
      retention: RetentionDays.ONE_WEEK,
      removalPolicy: RemovalPolicy.DESTROY,
    })

    const stateMachine = new StateMachine(this, `${id}-state-machine`, {
      definition,
      timeout: Duration.minutes(5),
      stateMachineName: 'automation-configuration',
      stateMachineType: StateMachineType.EXPRESS,
      logs: {
        destination: logGroup,
        includeExecutionData: true,
        level: LogLevel.ALL,
      },
    })

    const s3EventRule = new Rule(this, 'automation-config-s3-event-rule', {
      ruleName: 'automation-config-s3-event-rule',
    })

    const eventRole = new Role(this, 'eventRole', {
      assumedBy: new ServicePrincipal('events.amazonaws.com'),
    })

    stateMachine.grantStartExecution(eventRole)
    s3EventRule.addTarget(
      new SfnStateMachine(stateMachine, {
        input: RuleTargetInput.fromObject({
          detail: EventField.fromPath('$.detail'),
        }),
        role: eventRole,
      }),
    )
    s3EventRule.addEventPattern({
      source: ['aws.s3'],
      detailType: ['Object Created'],
      detail: {
        bucket: {
          name: [bucket.bucketName],
        },
        object: {
          key: [
            {
              wildcard: 'csv/automation-configuration/*.csv',
            },
          ],
        },
      },
    })

    const unexpectedFailRule = new Rule(this, 'exam-automation-config-unexpected-fail-rule', {
      ruleName: 'exam-automation-config-unexpected-fail-rule',
    })
    unexpectedFailRule.addTarget(
      new SnsTopic(userNotificationTopic, {
        message: RuleTargetInput.fromObject({
          subject: 'Exam Automation Config Failed 😥',
          message: 'Exam Automation Config Failed due to an unexpected error.',
          cause: 'Unexpected Error',
          channel: 'email',
          destination: { ToAddresses: ['it@somemail.com'] },
        }),
      }),
    )

    unexpectedFailRule.addEventPattern({
      source: ['aws.states'],
      detailType: ['Step Functions Execution Status Change'],
      detail: {
        stateMachineArn: [stateMachine.stateMachineArn],
        status: ['FAILED', 'TIMED_OUT', 'ABORTED'],
      },
    })
  }

  private mountLambdaInvokes(
    lambdasInvoke: Array<{
      function: IFunction
      name?: string
      output?: string
    }>,
  ) {
    return lambdasInvoke.map(lambdaInvoke => {
      return new LambdaInvoke(this, `${lambdaInvoke.name || lambdaInvoke.function.functionName}-task`, {
        lambdaFunction: lambdaInvoke.function,
        inputPath: '$',
        outputPath: lambdaInvoke?.output || '$',
      })
    })
  }
}


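To deploy it, the construct just needs to be instantiated inside a regular CDK stack. Here's a minimal sketch, with a placeholder app entry point, stack name, and import path:

// bin/app.ts (hypothetical wiring; names and paths are placeholders)
import { App, Stack } from 'aws-cdk-lib'
import AutomationConfiguration from '../lib/automation-configuration'

const app = new App()
const stack = new Stack(app, 'AutomationConfigurationStack')

// The construct id is reused for the alias id and the '${id}-state-machine' id
new AutomationConfiguration(stack, 'automation-configuration')

app.synth()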
