How To Convert a Lambda to a Step Function

martzcodes

Matt Martz

Posted on January 14, 2022

How To Convert a Lambda to a Step Function

I attended AWS re:Invent 2021 in-person and one of my big takeaways was the focus on AWS Step Functions. With the late September announcement that Step Functions now support 200+ AWS Services I could see why.

So, what does that look like? Check Out the Code

In this post I'll convert a Lambda to a Step Function without Lambdas (sometimes called lambda-less or function-less 😈). The original lambda will call an "External" API Gateway, do a GET from DynamoDB, make a decision and potentially WRITE to DynamoDB. The resulting step function will look like this:

#Step Function Workflow

Again... NO lambdas 🀯

This project uses CDK v2.3 and was initialized using cdk init --language typescript. It's not intended to be a step-by-step tutorial. If you need one of those, follow me on Twitter and keep an eye out for my upcoming CDK Crash Course video.

Creating the Mock External Service

To get started we're going to create two API Gateways: Our "actual" one and one to mock an External API Gateway, and avoid circular dependencies. If your external service wasn't hosted by AWS / API Gateway you'd probably need to use a small lambda to do the external API call.

The mock external service lives in lib/mock-external.ts. It's fairly unremarkable in that it's a Lambda with an API Gateway πŸ€·β€β™‚οΈ. The Lambda expects an APIGateway input with a userId pathParameter... so <mockApi>/asdf where asdf would be the "userId". From there it responds with:

  const mockUser = {
    userId: `EXTERNAL#${event.pathParameters.userId}`,
    name: (event.queryStringParameters || {}).name || name.findName(),
  };
Enter fullscreen mode Exit fullscreen mode

This simulates a "lookup" in an external service... here we just consistently change the userId and we generate a random name for the user. Every time the endpoint is called the same user will get a new name (this isn't intended to be a real service, just representative of something that could have changing data).

Building the Lambda-lith

The lambdalith lives in lambda/bigLambda.ts.

It calls the external URL...

  // make axios call to "external" api to get user name
  const { data: externalUser } = await axios.get(
    `${API_URL}/${event.pathParameters.userId}`
  );
Enter fullscreen mode Exit fullscreen mode

Gets the "internal" user matching the external userId:

  const userGetItemInput: DocumentClient.GetItemInput = {
    TableName: TABLE_NAME,
    Key: {
      PK: externalUser.userId,
    },
  };

  const { Item: internalUser } = await db.get(userGetItemInput).promise();
Enter fullscreen mode Exit fullscreen mode

It returns early if the internal user has a "LOCKED" status (set directly via DynamoDB)... otherwise it updates the DynamoDB entry (which would insert if needed).

All "names" that the user has had will also get stored in DynamoDB with the time they were inserted.

  // store user in dynamodb
  const updateItemInput: DocumentClient.UpdateItemInput = {
    TableName: TABLE_NAME,
    Key: {
      PK: externalUser.userId,
    },
    UpdateExpression: `SET #name = :name, #status = :status, ${
      !internalUser
        ? "#history = :history"
        : "#history.#historical = if_not_exists(#history.#historical, :historical)"
    }`,
    ExpressionAttributeNames: { "#name": "name", "#history": "history", "#status": "status" },
    ExpressionAttributeValues: {
      ":name": externalUser.name,
      ":status": "ACTIVE"
    },
    ReturnValues: "UPDATED_NEW",
  };
  if (!internalUser) {
    updateItemInput!.ExpressionAttributeValues![":history"] = {
      [externalUser.name]: new Date().toISOString(),
    };
  } else {
    updateItemInput!.ExpressionAttributeNames!["#historical"] =
      externalUser.name;
    updateItemInput!.ExpressionAttributeValues![":historical"] =
      new Date().getTime();
  }
  const updatedItem = await db.update(updateItemInput).promise();
Enter fullscreen mode Exit fullscreen mode

All in all this is a fairly simple example that represents a few different challenges.

Converting the Lambda-lith to a Step Function

I began the conversion by opening the AWS Step Function console and playing with the shiny "new" Workflow Studio. I used this to get a sense of the different kinds of steps I'd need.

In this case, since we want an immediate (synchronous) response from the Step Function we use an Express Step Function

Screen Shot 2022-01-04 at 11.22.08 AM.png

My first pass (pun intended) was simply this:

stepfunctions_graph.png

But ultimately I found out that there weren't enough Pass steps in there.

Pass Steps are handy... they act as easy placeholders while designing the step function and also do data transformations.

To get started with the implementation I began with the Step Function itself and tied in the API Integration later.

πŸ™ŒThe Data Flow Simulator was incredibly useful in piecing together the JSONPath Expressions and how they interact with the State Machine.

First Step (and Issue)

The first step (πŸ˜‰) of the State Machine is to make that external API Gateway call. I wasted a lot of time on this because it's not super well documented in the case of dynamic / proxied URLs. It seems like in most cases you'd use a POST with a body to hit an API Gateway to a pre-defined / static URL that points to a Step Function... and if you do that you can skip this first PASS step. There's also an open GitHub Issue related to this which is why I ultimately created the lib/ModifiedCallApiGatewayRestApiEndpoint.ts. The regular CallApiGatewayRestApiEndpoint method can't make use of JsonPath expressions to extend endpoints. What that means is in order to make use of my userId path parameter I need to first do a data transformation and then hack it in via the modified class.

    const pass = new Pass(scope, "RoutePass", {
      parameters: {
        "apiPath.$": "States.Format('/{}', $.userId)",
        "userId.$": "$.userId",
        "name.$": "$.name",
      },
    });
Enter fullscreen mode Exit fullscreen mode

The first Pass takes the userId via the input parameters from the API Gateway Integration, builds the API path that will be called (including the rendered path parameter) and then the ModifiedCallApiGatewayRestApiEndpoint puts it in the step's parameters via Parameters: { "Path.$": "$.apiPath", ...orig.Parameters },. Normally you'd be able to directly assign that, but it's not exposed as part of the L2 Construct for the CallApiGatewayRestApiEndpoint.

const callExternal = new ModifiedCallApiGatewayRestApiEndpoint(
    scope,
    "Call External APIGW",
    {
    api: props.mockExternalApi,
    stageName: "prod",
    method: HttpMethod.GET,
    resultPath: "$.external",
    resultSelector: {
        user: JsonPath.stringAt("$.ResponseBody"),
    },
    }
);
Enter fullscreen mode Exit fullscreen mode

I could not get queryParameters to work correctly... it didn't like the resulting object... wasn't worth the trouble for this demo but if you have any tips let me know.

From there the API step takes the passed in RestAPI and that's it. resultPath extends the previous input and places the external API's ResponseBody in the StateMachine's context ($).

Interacting with DynamoDB

Next we make the DynamoDB Get call that the lambdalith does. We can directly interact with DynamoDB without a lambda by using the DynamoGetItem construct.

const dynamoGet = new DynamoGetItem(scope, "Get Internal User", {
    key: {
    PK: DynamoAttributeValue.fromString(
        JsonPath.stringAt("$.external.user.userId")
    ),
    },
    table: props.table,
    resultPath: "$.internal",
});
Enter fullscreen mode Exit fullscreen mode

It uses the context provided by the external call $.external.user.userId to do the lookup and puts the result in the context's internal key.

If you were to sniff out the context at this point it'd look something like this (if DynamoDB already had the item*):

{
    "external": {
        "user": {
            "userId": "EXTERNAL#asdf",
            "name": "Matt Martz"
        }
    },
    "internal": {
        "Item": {
            "PK": {
                "S": "EXTERNAL#asdf"
            },
            "name": {
                "S": "Matt Martz"
            },
            ...
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

Choices

Before we can implement the Choice (if/switch statement) we need to define the destinations for those situations. We're going to use 3 types of results... Pass, DynamoDB PutItem and DynamoDB UpdateItem.

Pass

There are 4 end cases for the API... No Changes, Locked, Inserted, and Updated. All of the Pass steps look very similar and they all respond in the same general format:

const userWasLocked = new Pass(scope, "User Was Locked", {
    parameters: {
    id: JsonPath.stringAt("$.external.user.userId"),
    name: JsonPath.stringAt("$.internal.Item.name.S"),
    status: JsonPath.stringAt("$.internal.Item.status.S"),
    userLocked: true,
    nameChanged: false,
    inserted: false,
    },
});
Enter fullscreen mode Exit fullscreen mode

DynamoDB Writes

The DynamoPutItem and DynamoUpdateItem Constructs follow very closely to what you'd do in the lambda code using the DocumentClient. The aws-stepfunctions-tasks module also exposes a nice helper function to convert into the DynamoDB JSON format. Below is the PutItem while the UpdateItem follows a similar pattern.

const dynamoInsert = new DynamoPutItem(scope, "Add Internal User", {
    item: {
    PK: DynamoAttributeValue.fromString(
        JsonPath.stringAt("$.external.user.userId")
    ),
    name: DynamoAttributeValue.fromString(
        JsonPath.stringAt("$.external.user.name")
    ),
    status: DynamoAttributeValue.fromString("ACTIVE"),
    history: DynamoAttributeValue.fromMap({
        [JsonPath.stringAt("$.external.user.name")]:
        DynamoAttributeValue.fromString(
            JsonPath.stringAt("$$.State.EnteredTime")
        ),
    }),
    },
    table: props.table,
    resultPath: "$.inserted",
});
Enter fullscreen mode Exit fullscreen mode

The Choice

The Choice Construct decides which steps are executed based on Conditions. There are a lot of helper methods in the Condition class. In this case we use isNotPresent, not, and stringEquals. This routes the State Machine to the right step based on the condition logic provided.

const isUserLocked = new Choice(scope, "User Locked?", {})
    .when(Condition.isNotPresent("$.internal.Item"), dynamoInsert)
    .when(
    Condition.stringEquals(
        "$.internal.Item.name.S",
        "$.external.user.name"
    ),
    userExists
    )
    .when(
    Condition.not(
        Condition.stringEquals(
        "$.internal.Item.name.S",
        "$.external.user.name"
        )
    ),
    dynamoUpdate
    )
    .when(
    Condition.stringEquals("$.internal.Item.status.S", "LOCKED"),
    userWasLocked
    )
    .otherwise(userExists);
Enter fullscreen mode Exit fullscreen mode

The StateMachine

To define the state machine itself, you first connect the steps together by chaining them with their .next methods and then passing them into the StateMachine construct.

const definition = pass
    .next(callExternal)
    .next(dynamoGet)
    .next(isUserLocked);
    dynamoInsert.next(userInserted);
    dynamoUpdate.next(userWasUpdated);

const logGroup = new LogGroup(scope, "BlogLambdaStepLogs", {
    removalPolicy: RemovalPolicy.DESTROY,
    retention: RetentionDays.ONE_DAY,
});

const stateMachine = new StateMachine(scope, `BlogLambdaStep`, {
    definition,
    stateMachineType: StateMachineType.EXPRESS,
    logs: {
    destination: logGroup,
    level: LogLevel.ALL,
    },
});

props.table.grantReadWriteData(stateMachine);
Enter fullscreen mode Exit fullscreen mode

Here it's important to also make sure the state machine has Read/Write access to DynamoDB using the table's grantReadWriteData method.

🚨For Debugging it's useful to explicitly set the LogGroup for the StateMachine and set the LogLevel to ALL. 🚨

Getting API Gateway to Invoke the StateMachine

Finally the last step is to actually invoke the StateMachine using an API Gateway Integration. To do this we first need to provide a role for the API to be able to StartSyncExecution for the stateMachine:

const credentialsRole = new Role(scope, "getRole", {
    assumedBy: new ServicePrincipal("apigateway.amazonaws.com"),
});

credentialsRole.attachInlinePolicy(
    new Policy(scope, "getPolicy", {
    statements: [
        new PolicyStatement({
        actions: ["states:StartSyncExecution"],
        effect: Effect.ALLOW,
        resources: [stateMachine.stateMachineArn],
        }),
    ],
    })
);
Enter fullscreen mode Exit fullscreen mode

And then we use that credentialsRole in the AwsIntegration. We also have to do some Request and Response Mapping to make sure the API path and query parameters make it into the input, and the response only includes the payload we want to get to the user.

const stepApiResource = props.api.root.addResource("step");
stepApiResource
    .addResource("basic")
    .addResource("{userId}")
    .addMethod(
    "GET",
    new AwsIntegration({
        service: "states",
        action: "StartSyncExecution",
        integrationHttpMethod: "POST",
        options: {
        credentialsRole,
        integrationResponses: [
            {
            statusCode: "200",
            responseTemplates: {
                "application/json": `#set ($parsedPayload = $util.parseJson($input.json('$.output')))
$parsedPayload`,
            },
            },
        ],
        requestTemplates: {
            "application/json": `{
            "input": "{\\"userId\\": \\"$util.escapeJavaScript($input.params('userId'))\\", \\"name\\": \\"$util.escapeJavaScript($input.params('querystring').params('name'))\\"}",
            "stateMachineArn": "${stateMachine.stateMachineArn}"
            }`,
        },
        },
    }),
    {
        methodResponses: [{ statusCode: "200" }],
    }
    );
Enter fullscreen mode Exit fullscreen mode

Step Function-backed APIs?

But what about API Gateway's AWS Step Functions-backed APIs?. That's definitely useful but is similar to the LambdaRestApi construct in that it proxies ALL requests to the StepFunction. You can (theoretically) still add endpoints to it but if you want to add a different Step Function then you still have to go through an AwsIntegration anyways OR integrate the two StepFunctions together, but then you're tightly coupled 😒. In this case we were adding a Step Function to an existing API so that wasn't an option.

Comparing the API calls

When the Stack is deployed we get back the two RestAPIs back:

Outputs:
BlogLambdaToStepfunctionsStack.BlogLambdaSFAPIEndpointC31D91D6 = https://axg9uz4vxb.execute-api.us-east-1.amazonaws.com/prod/
BlogLambdaToStepfunctionsStack.MockExternalAPIEndpointCFF6F3A1 = https://ab1bk91hb3.execute-api.us-east-1.amazonaws.com/prod/
Enter fullscreen mode Exit fullscreen mode

For example, I can call the external API that both the Lambda-lith and Step Functions use:

$ curl https://ab1bk91hb3.execute-api.us-east-1.amazonaws.com/prod/asdf -w '\nTotal: %{time_total}s\n'
{"userId":"EXTERNAL#asdf","name":"Victor Roob"}
Total: 0.081028s
Enter fullscreen mode Exit fullscreen mode

If I call the Lambda-lith with /asdf you get

$ curl https://axg9uz4vxb.execute-api.us-east-1.amazonaws.com/prod/big/asdf -w '\nTotal: %{time_total}s\n'
{"id":"EXTERNAL#asdf","name":"Troy Hammes","status":"ACTIVE","userLocked":false,"nameChanged":true,"inserted":false}
Total: 1.473287s
Enter fullscreen mode Exit fullscreen mode

Which takes 1.5s because of the cold start time of the lambda! Running it again you get:

$ curl https://axg9uz4vxb.execute-api.us-east-1.amazonaws.com/prod/big/asdf -w '\nTotal: %{time_total}s\n'
{"id":"EXTERNAL#asdf","name":"Grady McGlynn","status":"ACTIVE","userLocked":false,"nameChanged":true,"inserted":false}
Total: 0.223742s
Enter fullscreen mode Exit fullscreen mode

Which very quickly drops down to 0.2 seconds!

The step-function, on the other hand, doesn't have cold start times!

$ curl https://axg9uz4vxb.execute-api.us-east-1.amazonaws.com/prod/step/basic/qwerty -w '\nTotal: %{time_total}s\n'
{"status":"ACTIVE","userLocked":false,"nameChanged":false,"inserted":true,"name":"Sheldon Pouros DDS","id":"EXTERNAL#qwerty"}
Total: 0.254265s
$ curl https://axg9uz4vxb.execute-api.us-east-1.amazonaws.com/prod/step/basic/qwerty -w '\nTotal: %{time_total}s\n'
{"userLocked":false,"nameChanged":true,"inserted":false,"name":"Randolph Padberg","id":"EXTERNAL#qwerty","status":"ACTIVE"}
Total: 0.256351s
Enter fullscreen mode Exit fullscreen mode

It's FAST! The API Responses are also equivalent and if we check DynamoDB we get the same behavior.

Summary

There's definitely a learning curve associated with configuring step functions because of the JSONPath Expressions / VTL required to get the steps working together correctly. Steps themselves aren't reusable but if you pair those with smart use of existing functionality (existing or smaller/reusable Lambdas) you can get a lot of benefit.

The tooling has gotten a lot better, and the Workflow Studio and Data Flow Simulator both made it much easier to troubleshoot.

It took slightly more configuration (largely Infrastructure as Code) to setup the Step Function vs just having a large lambda, but by shifting the logic to the State Machine (and therefore managed AWS Services) you're reducing your code ownership.

Steps can't be shared, but you can easily create your own constructs for frequently used AWS Service interactions that include customized JSONPath expressions for your/your organizations needs.

I definitely see a lot of promise here. What observations have you had related to step functions? Comment below or hit me up on Twitter

πŸ’– πŸ’ͺ πŸ™… 🚩
martzcodes
Matt Martz

Posted on January 14, 2022

Join Our Newsletter. No Spam, Only the good stuff.

Sign up to receive the latest update from our blog.

Related