Inferring Architecture and Asynchronous Workflows using EventBridge Events

martzcodes

Matt Martz

Posted on June 30, 2022

Inferring Architecture and Asynchronous Workflows using EventBridge Events

In part 2 of my series on Event Driven Step Functions we'll create a completely separate CDK Application that will observe the event traffic generated from the project in part 1. We'll use event traffic to create a timeline of operations (even operations outside of the step function) and we'll also infer the architecture from events. We'll store the events in DynamoDB and also set up a live WebSocket api for a real-time view.

For a personal challenge I wanted to see how far I could get by ONLY using observed event data with no additional IAM access. You could extend the below with access to things like X-Ray / CloudWatch / etc.

We'll be making something that can do this:

demo gif

What this post will cover:

  • High level overview of the architecture
  • Lambda functions related to observability
  • Creating a useful event interface

What this post will NOT cover:

  • SST (in detail)*
  • React
  • D3.js
  • Event Schema Registry (I already wrote about that here and here)

*I was experimenting with SST for this post but don't want it to be a review / the focus of the post. SST is CDK-based

The code for this post is located here: https://github.com/martzcodes/blog-sst-eventdriven-observability

Architecture for Observing Orchestrated Choreography

Architecture Diagram

In part 1 all of the events being emitted were sent to the default bus of the AWS account. In a multi-account scenario we would just need rules in place to forward from the source bus to some observability bus in another account.

In order for us to process all of the events, we need a broad rule to process all events to our targets. In this case, we emitted all of the events with the source project, so we can specify that.

With SST rules and lambdas can be created as part of their EventBus construct:

const bus = new EventBus(stack, "Ordered", {
  cdk: {
    eventBus: CdkEventBus.fromEventBusName(stack, `DefaultBus`, "default"),
  },
  defaults: {
    function: {
      environment: {
        tableName: table.tableName,
        websocketUrl: socketApi.url
      },
    },
  },
  rules: {
    observer: {
      pattern: {
        source: ['project']
      },
      targets: {
        sendMessage: "functions/sendMessage.handler",
        eventTracker: "functions/eventTracker.handler",
      },
    },
  },
});
bus.attachPermissions([table, socketApi]);

Enter fullscreen mode Exit fullscreen mode

In this architecture we have two targets. A target to store the events into DynamoDB for tracking, and a target to emit the event to the Websocket API so that clients can consume them in real-time.

RestAPI

For our DynamoDB Table we'll be using the SST Table Construct:

const table = new Table(stack, "EventDrivenObservability", {
  fields: {
    pk: "string",
    sk: "string",
    ttl: "number",
  },
  timeToLiveAttribute: "ttl",
  primaryIndex: { partitionKey: "pk", sortKey: "sk", },
  cdk: {
    table: {
      removalPolicy: RemovalPolicy.DESTROY,
    }
  }
});

Enter fullscreen mode Exit fullscreen mode

This is a similar interface to the CDK Table construct but allows us to specify known fields. We have to pass in an override for the removal policy since SST doesn't have an equivalent.

šŸ’” CDK Overrides are typically passed in to SST constructs via a cdk property.

From there we will create the last two lambdas for the RestAPI:

const api = new Api(stack, "Api", {
  defaults: {
    function: {
      // Allow the API to access the table
      permissions: [table],
      // Pass in the table name to our API
      environment: {
        tableName: table.tableName,
      },
    },
  },
  routes: {
    "GET /latest": "functions/getLatestEvents.handler",
    "GET /job/{id}": "functions/getJobEvents.handler",
  },
});

Enter fullscreen mode Exit fullscreen mode

The eventTracker target in the EventBus construct and the getLatestEvents.handler and getJobEvents.handler routes SST will create lambdas at those handlers. They get DynamoDB permissions via api.attachPermissions([table]);.

Lambda: EventTracker

The eventTracker lambda will end up writing two DynamoDB records per event. This could probably be optimized to one recored and a GSI but this ends up being pretty simple.

The first event will be the "LATEST" record. the getLatestEvents endpoint will end up getting a list of these so you can pick from a list of jobs. We use the primary key (PK) LATEST along with an event identifier (which ends up being the same as the PK for the other item). We also add a time to live (TTL) of a day so the database ends up cleaning itself up. You could potentially store less data in the LATEST lambda but I kept it consistent with the regular event one.

const time = `${new Date().getTime()}`;
const detail = { ... event.detail };
const latestParams = {
  TableName: process.env.tableName,
  Item: {
    pk: `LATEST`,
    sk:`EVENT#${parseExecution(event.detail.execution)}`,
    latest: time,
    account: event.account,
    source: event.source,
    detailType: event['detail-type'],
    ttl: Math.ceil((new Date().getTime() + 24 * 60 * 60 * 1000) / 1000),
    ...detail,
  },
};
await dynamoDb.put(latestParams as PutItemInput).promise();

Enter fullscreen mode Exit fullscreen mode

For the actual storage event we store it on a PK that is tied to the job that executed. Here event.detail.execution is the state machine's execution. We included this execution id in every event that was emitted in the previous project. These records will be retrieved via this execution id for the getJobEvents endpoint.

const params = {
  TableName: process.env.tableName,
  Item: {
    pk: `EVENT#${parseExecution(event.detail.execution)}`,
    sk: time,
    account: event.account,
    source: event.source,
    detailType: event['detail-type'],
    ttl: Math.ceil((new Date().getTime() + 24 * 60 * 60 * 1000) / 1000),
    ...detail
  },
};
await dynamoDb.put(params as PutItemInput).promise();

Enter fullscreen mode Exit fullscreen mode

Lambda: GetLatestEvents

getLatestEvents simply returns a list of the LATEST records:

const latestJobs = await dynamoDb
    .query({
      TableName,
      KeyConditionExpression: "#pk = :pk",
      ExpressionAttributeNames: {
        "#pk": "pk",
      },
      ExpressionAttributeValues: {
        ":pk": "LATEST",
      },
    })
    .promise();

  return { statusCode: 200, body: JSON.stringify({ latest: latestJobs.Items }) };

Enter fullscreen mode Exit fullscreen mode

Lambda: GetJobEvents

While getJobEvents returns the job-specific records:

const jobEvents = await dynamoDb
    .query({
      TableName,
      KeyConditionExpression: "#pk = :pk",
      ExpressionAttributeNames: {
        "#pk": "pk",
      },
      ExpressionAttributeValues: {
        ":pk": `EVENT#${event.pathParameters!.id}`,
      },
    })
    .promise();

    console.log(JSON.stringify(jobEvents));
  return { statusCode: 200, body: JSON.stringify({ events: jobEvents.Items }) };

Enter fullscreen mode Exit fullscreen mode

Finishing up the RestAPI

If we run npm start SST will deploy a debug stack along with the actual code. After it's deployed if we run the CLI command from part 1 to start an event driven step function... we'll start seeing the items populate in the DynamoDB table that was created.

DynamoDB Items

Websocket API

The Websocket API has a similar construct and will create the remaining two lambda functions via routes:

  const socketApi = new WebSocketApi(stack, "SocketApi", {
    defaults: {
      function: {
        environment: {
          tableName: table.tableName,
        },
      },
    },
    routes: {
      $connect: "functions/connect.handler",
      $disconnect: "functions/disconnect.handler",
    },
  });
  socketApi.attachPermissions([table]);

Enter fullscreen mode Exit fullscreen mode

We'll re-use the same DynamoDB table for storing connected clients.

Lambda: Connect

The connect lambda will put a client connection item in the database:

const params = {
  TableName: process.env.tableName,
  Item: {
    pk: `CONNECTION`,
    sk: event.requestContext.connectionId,
    ttl: Math.ceil((new Date().getTime() + 24 * 60 * 60 * 1000) / 1000),
  },
};
await dynamoDb.put(params as PutItemInput).promise();

Enter fullscreen mode Exit fullscreen mode

Lambda: Disconnect

The disconnect lambda will delete connections:

const params = {
  TableName: process.env.tableName,
  Key: {
    pk: `CONNECTION`,
    sk: event.requestContext.connectionId,
  },
};
await dynamoDb.delete(params as DeleteItemInput).promise();

Enter fullscreen mode Exit fullscreen mode

Lambda: SendMessage

And the sendMessage lambda will query all of the connections from DynamoDB

const messageData = JSON.stringify({
  pk: `EVENT#${parseExecution(event.detail.execution)}`,
  sk: `${new Date().getTime()}`,
  account: event.account,
  source: event.source,
  detailType: event['detail-type'],
  ...event.detail,
});

// Get all the connections
const connections = await dynamoDb
  .query({
    TableName,
    ProjectionExpression: "sk",
    KeyConditionExpression: "#pk = :pk",
    ExpressionAttributeNames: {
      "#pk": "pk",
    },
    ExpressionAttributeValues: {
      ":pk": "CONNECTION",
    },
  })
  .promise();

Enter fullscreen mode Exit fullscreen mode

and then use those connections along with AWS SDK's ApiGatewayManagementApi to send the event to the connected clients. If it gets an error in response it will remove the client from DynamoDB.

const apiG = new ApiGatewayManagementApi({
  endpoint: `${websocketUrl.replace("wss://", "")}`,
});

const postToConnection = async function ({ sk }: { sk: string }) {
  console.log(`connection id: ${sk}`);
  try {
    // Send the message to the given client
    await apiG
      .postToConnection({ ConnectionId: sk, Data: messageData })
      .promise();
  } catch (e: any) {
    console.log(`caught: ${e}`);
    if (e.statusCode === 410) {
      // Remove stale connections
      await dynamoDb
        .delete({ TableName, Key: { pk: "CONNECTION", sk } })
        .promise();
    }
  }
};

// Iterate through all the connections
await Promise.all(
  (connections?.Items! as { sk: string }[]).map(postToConnection)
);

Enter fullscreen mode Exit fullscreen mode

Wrapping up the Websocket API

SST watches the directory structure and will continue to deploy things as changes are made. Depending on the changes it may ask you for confirmation. If you didn't have npm start running, run it now or confirm the changes.

With the changes made, you could use a websocket tester to connect to your Websocket API and if you kick off another step function in the part 1 project, you'd see the events in the logs.

Creating a useful event interface

All of this is great and all but raw events don't tell us much. Or do they? šŸ¤”

An event by itself won't tell us what emitted it or what caused the thing to emit it... to it emit it.

BUT we have access to that information. Each lambda in our event driven step function is essentially wrapped in an incoming and outgoing event. And the lambdas that are doing the work have access to these events... so we can use them.

By creating a common event metadata payload we can get enough information to paint a picture of the architecture and timelines. I extended part 1's putEvent utility with something to add in that incoming event metadata. So now each event will include what started it along with what it's trying to accomplish. Lambda functions also have an environment variable called AWS_LAMBDA_FUNCTION_NAME which is the lambda functions name. By updating this meta each time we emit the event from a lambda, we can gather all the information we need. We can even fill in the gaps left over from step function events that don't have that fidelity.

const execution = event.detail.execution.split(":");
return {
  ...event.detail,
  execution: event.detail.execution, // arn of the state machine job
  meta: {
    incoming: {
      account: event.account,
      source: event.source,
      detailType: event["detail-type"],
    },
    outgoing: {
      source: process.env.EVENT_SOURCE,
      detailType,
    },
    fn: process.env.AWS_LAMBDA_FUNCTION_NAME,
    stateMachine: execution[execution.length - 2], // 2nd to last is the state machine itself
    job: execution[execution.length - 1], // last is the job id
  },
};

Enter fullscreen mode Exit fullscreen mode

I also created a SST ViteStaticSite for the react app to act as a client for the Rest and Websocket APIs:

const site = new ViteStaticSite(stack, "SvelteJSSite", {
  path: "frontend",
  environment: {
    // Pass in the API endpoint to our app
    VITE_APP_API_URL: api.url,
    VITE_SOCKET_API_URL: socketApi.url,
  },
});

Enter fullscreen mode Exit fullscreen mode

This react app uses d3 to create the charts and does some post-processing in order to massage the event data into the demonstration below. I am by no means a d3 expert and the code is lengthy, so I'm going to hand wave it. The important part is to encode the data you need into the event payloads so that you can stitch together a picture without additional IAM access.

Demonstration

With the information we encoded in the events we can figure out how long the lambdas executed, which lambdas were invoked by what events and how everything was chained together. We can process this as the events flow in.

demo gif

Note: I added a random delay to the part1 putEvent to make these graphs more interesting. They're very fast without the delay šŸ˜…

A Timeline of Events

Since we know the start and stop time of the lambdas we can build a timeline of events. Here we can see the interactions between things like the getUserCookie events and the subsequent steps in the state machine.

Timeline

In this image the pink bars are the async side-effect actions. You can see that the generate report step started after the last purple (?) bar for the intermediate step... while the async step continued.

These charts can be customized to change the color coding or the lambdas. In the timeline I chose to focus on the event that started the action but these could be named with the ending event, function name, or anything we have the data for. In the tooltip I created I included more metadata.

Inferring Architecture

We also have enough data that we can infer the architecture of everything interacting with the default event bus (provided they have the metadata we want).

Inferring Architecture

Here we combine the incoming and outgoing events along with the lambda function names in the metadata to figure out what things are connected to each other.

What-if

These diagrams were created in a day with limited d3 experience and ONLY from event data. What if we extended this with X-Ray, CloudWatch, or endpoints with access to get information about the actual state machines.

What if we tied in something like OpenTelemetry... there's a lot of potential here for a decoupled Event Driven Architecture observer.

Hit me up on Twitter @martzcodes if you've got ideas on how YOU would extend this or what other information you could figure out only from using events. šŸ»

šŸ’– šŸ’Ŗ šŸ™… šŸš©
martzcodes
Matt Martz

Posted on June 30, 2022

Join Our Newsletter. No Spam, Only the good stuff.

Sign up to receive the latest update from our blog.

Related