AWS AppSync Subscriptions with DynamoDB Streams, Lambda and Serverless

Marco Streng

Posted on June 11, 2021

With AWS AppSync, a GraphQL Subscription always has to be triggered by a GraphQL Mutation. But there are use cases in which you would like to trigger Subscriptions from another kind of event, for example an update to your DynamoDB table made by an external service. In this article I'd like to show you how to do this with DynamoDB Streams, Lambda and the Serverless Framework.

Example

To keep this example simple, let's say we are developing a game with a list of users. Each user has a name and a current score. If a user's score changes in our DynamoDB table, we want to publish the new score to all subscribers.

Architecture

When a user item in our DynamoDB table gets updated, DynamoDB Streams invokes a Lambda function. This Lambda function executes the GraphQL Mutation to which our clients can subscribe. As the updated data is already stored in DynamoDB, the Mutation does not need a data source. It only has to pass the incoming data on to the subscribers.

Schema

First of all we create our Schema. The Mutation which will be executed by the Lambda function is called updateUserScore and takes the id and the new score of the user as arguments. It returns both fields within the UpdatedUserScore type.

The Subscription is called onUpdateUserScore. The AppSync directive @aws_subscribe() defines which Mutations trigger the Subscription. It accepts a list, so we could easily subscribe to multiple Mutations. The optional id argument lets clients subscribe either to all users or to a specific one.

type User {
  id: ID!
  name: String!
  score: Int
}

type Query {
  listUser: [User]
}

type UpdatedUserScore {
  id: ID!
  score: Int!
}

type Mutation {
  createUser(name: String!): User!
  updateUserScore(id: ID!, score: Int!): UpdatedUserScore!
}

type Subscription {
  onUpdateUserScore(id: ID): UpdatedUserScore
    @aws_subscribe(mutations: ["updateUserScore"])
}

schema {
  query: Query
  mutation: Mutation
  subscription: Subscription
}
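
To make the effect of the optional id argument more concrete, here is a minimal sketch of how argument-based filtering can be thought of. It is a simplified model, not AppSync's implementation: shouldDeliver is a hypothetical helper, and the matching itself is done by AppSync. It assumes that a filter argument is compared against the field of the same name in the Mutation result, which is why updateUserScore returns the id.

```javascript
// Simplified model of argument-based subscription filtering.
// shouldDeliver is a hypothetical helper for illustration only;
// AppSync performs this matching itself.
function shouldDeliver(subscriptionArgs, mutationResult) {
  // Every argument supplied by the subscriber must equal the field of the
  // same name in the Mutation result. Omitted arguments are not compared.
  return Object.entries(subscriptionArgs).every(
    ([name, value]) => mutationResult[name] === value
  )
}

const update = { id: 'user-1', score: 42 }

console.log(shouldDeliver({}, update))               // true: subscribed to all users
console.log(shouldDeliver({ id: 'user-1' }, update)) // true: subscribed to user-1
console.log(shouldDeliver({ id: 'user-2' }, update)) // false: subscribed to another user
```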

Infrastructure

With the serverless-appsync-plugin it's easy to set up our AppSync instance. To keep the example simple, we use AWS_IAM for authentication. In a real-world application you would probably use AMAZON_COGNITO_USER_POOLS in addition, so that your client app can also authenticate against AppSync.

As mentioned before, the Mutation does not need a data source. Therefore we can create a data source with type NONE.

Don't forget the role statement so that our Lambda function is allowed to execute the AppSync Mutation.

service: subscription-demo

plugins:
  - serverless-appsync-plugin

provider:
  name: aws
  runtime: nodejs12.x
  region: eu-central-1
  iamRoleStatements:
    - Effect: Allow
      Action:
        - dynamodb:Scan
        - dynamodb:Query
        - dynamodb:PutItem
      Resource:
        - !GetAtt DynamoDbTableUser.Arn
        - !Join [ '', [ !GetAtt DynamoDbTableUser.Arn, '/*' ] ]
    - Effect: Allow
      Action:
        - appsync:GraphQL
      Resource:
        - !GetAtt GraphQlApi.Arn
        - !Join [ '/', [ !GetAtt GraphQlApi.Arn, 'types', 'Mutation', 'fields', 'updateUserScore' ] ]

custom:
  appSync:
    name: ${self:service}
    authenticationType: AWS_IAM
    mappingTemplates:
      - dataSource: TableUserDS
        type: Query
        field: listUser
      - dataSource: None
        type: Mutation
        field: updateUserScore
      - dataSource: TableUserDS
        type: Mutation
        field: createUser
    dataSources:
      - type: NONE
        name: None
      - type: AMAZON_DYNAMODB
        name: TableUserDS
        description: 'DynamoDB DynamoDbTableUser table'
        config:
          tableName: !Ref DynamoDbTableUser

functions:
  handleDynamoDbStream:
    handler: backend/handleDynamoDbStream.handler
    environment:
      APP_SYNC_API_URL: !GetAtt GraphQlApi.GraphQLUrl
    events:
      - stream:
          type: dynamodb
          arn: !GetAtt DynamoDbTableUser.StreamArn

resources:
  Resources:
    DynamoDbTableUser:
      Type: AWS::DynamoDB::Table
      Properties:
        AttributeDefinitions:
          - AttributeName: id
            AttributeType: S
        KeySchema:
          - AttributeName: id
            KeyType: HASH
        BillingMode: PAY_PER_REQUEST
        StreamSpecification:
          StreamViewType: NEW_IMAGE
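
The second role statement limits the Lambda function to a single Mutation field. As a small sketch of what the !Join expression evaluates to, the following function builds the same string. The API ARN used here is a made-up placeholder, and mutationFieldArn is a hypothetical helper, not part of any library.

```javascript
// Builds the same string as the !Join expression in the role statement.
// mutationFieldArn is a hypothetical helper for illustration.
function mutationFieldArn(apiArn, fieldName) {
  return [apiArn, 'types', 'Mutation', 'fields', fieldName].join('/')
}

// Placeholder API ARN (account id and API id are invented)
const apiArn = 'arn:aws:appsync:eu-central-1:123456789012:apis/exampleapiid'

console.log(mutationFieldArn(apiArn, 'updateUserScore'))
// arn:aws:appsync:eu-central-1:123456789012:apis/exampleapiid/types/Mutation/fields/updateUserScore
```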

Mapping templates

The request and response templates for our Mutation are quite simple:

## Mutation.updateUserScore.request.vtl

{
    "version": "2017-02-28",
    "payload": $util.toJson($context.arguments)
}

## Mutation.updateUserScore.response.vtl

$util.toJson($ctx.result)
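
To illustrate what these two templates do together with the NONE data source, here is a rough model in plain JavaScript. The function names are stand-ins invented for this sketch. In AppSync the templates are evaluated as VTL and no such functions exist.

```javascript
// Rough model of a resolver backed by a NONE data source.
// All function names are illustrative stand-ins for the VTL templates.
function requestTemplate(context) {
  // Forward the Mutation arguments as the payload
  return { version: '2017-02-28', payload: context.arguments }
}

function localResolve(request) {
  // A NONE data source performs no call; the payload becomes the result
  return request.payload
}

function responseTemplate(context) {
  // Return the result unchanged
  return context.result
}

function resolveUpdateUserScore(args) {
  const request = requestTemplate({ arguments: args })
  const result = localResolve(request)
  return responseTemplate({ result })
}

console.log(resolveUpdateUserScore({ id: 'user-1', score: 42 }))
```

The arguments come out exactly as they went in, which is all the Subscription needs.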

Lambda function

Inside our Lambda function we create an AWSAppSyncClient and the Mutation. DynamoDB Streams deliver records in batches. Therefore we have to iterate over the incoming Records and execute the Mutation for each one.

const AWS = require('aws-sdk')
const appsync = require('aws-appsync')
const gql = require('graphql-tag')
const fetch = require('node-fetch')

if (!globalThis.fetch) globalThis.fetch = fetch

const graphqlClient = new appsync.AWSAppSyncClient({
  url: process.env.APP_SYNC_API_URL,
  region: process.env.AWS_REGION,
  auth: {
    type: 'AWS_IAM',
    credentials: {
      accessKeyId: process.env.AWS_ACCESS_KEY_ID,
      secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY,
      sessionToken: process.env.AWS_SESSION_TOKEN
    }
  },
  disableOffline: true
})

const mutation = gql`mutation UpdateUserScore($id: ID!, $score: Int!) {
  updateUserScore(id: $id, score: $score) {
    id
    score
  }
}`

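exports.handler = async (event) => {
  const mutations = event.Records
    // Only updates to existing users are relevant
    .filter((record) => record.eventName === 'MODIFY')
    .map((record) => {
      // Convert DynamoDB's typed attribute format into a plain object
      const item = AWS.DynamoDB.Converter.unmarshall(record.dynamodb.NewImage)

      return graphqlClient.mutate({
        mutation,
        variables: {
          id: item.id,
          score: item.score
        }
      })
    })

  // Wait for all Mutations to complete; a rejected call fails the invocation
  await Promise.all(mutations)
}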
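
For reference, this is roughly what the handler receives. The event below is a hand-written sample trimmed to the fields used here, and toMutationVariables is a hypothetical helper. To keep the sketch free of dependencies it reads the typed attributes directly instead of using AWS.DynamoDB.Converter.unmarshall.

```javascript
// Hand-written sample of a DynamoDB Streams event, trimmed to the fields
// the handler uses. Attribute values arrive in DynamoDB's typed format.
const sampleEvent = {
  Records: [
    {
      eventName: 'INSERT',
      dynamodb: {
        NewImage: { id: { S: 'user-1' }, name: { S: 'Ada' }, score: { N: '0' } }
      }
    },
    {
      eventName: 'MODIFY',
      dynamodb: {
        NewImage: { id: { S: 'user-1' }, name: { S: 'Ada' }, score: { N: '42' } }
      }
    }
  ]
}

// Hypothetical helper: collects the variables for each Mutation call.
// Only MODIFY records are kept; numbers are stored as strings under N.
function toMutationVariables(event) {
  return event.Records
    .filter((record) => record.eventName === 'MODIFY')
    .map((record) => ({
      id: record.dynamodb.NewImage.id.S,
      score: Number(record.dynamodb.NewImage.score.N)
    }))
}

console.log(toMutationVariables(sampleEvent))
```

Only the second record results in a Mutation, because the INSERT record is skipped.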

Suggestions or feedback

If you have any feedback, suggestions or ideas, feel free to leave a comment below this article. There is always room for improvement!
