AWS AppSync Subscriptions with DynamoDB Streams, Lambda and Serverless
Marco Streng
Posted on June 11, 2021
With AWS AppSync, a GraphQL Subscription always has to be triggered by a GraphQL Mutation. But there are use cases in which you would like to trigger Subscriptions from another kind of event, for example an update to a DynamoDB table made by an external service. In this article I'd like to show you how to do this with DynamoDB Streams, Lambda and the Serverless Framework.
Example
To keep this example simple, let's say we are developing a game with a list of users. Each user has a name and a current score. If a user's score changes in our DynamoDB table, we want to publish the new score to all subscribers.
Architecture
When a user item in our DynamoDB table is updated, DynamoDB Streams invokes a Lambda function. This Lambda function executes the GraphQL Mutation to which our clients subscribe. As the updated data is already stored in DynamoDB, the Mutation does not need a data source. It only has to pass the incoming data on to the subscribers.
Schema
First of all we create our schema. The Mutation executed by the Lambda function is called updateUserScore and takes the id and the new score of the user as arguments. It returns both fields within the UpdatedUserScore type.
The Subscription is called onUpdateUserScore. With the AppSync directive @aws_subscribe() we declare which Mutations trigger it; the directive takes a list, so one Subscription can listen to several Mutations. The optional id argument lets clients subscribe to all users or to a specific one.
type User {
  id: ID!
  name: String!
  score: Int
}
type Query {
  listUser: [User]
}
type UpdatedUserScore {
  id: ID!
  score: Int!
}
type Mutation {
  createUser(name: String!): User!
  updateUserScore(id: ID!, score: Int!): UpdatedUserScore!
}
type Subscription {
  onUpdateUserScore(id: ID): UpdatedUserScore
    @aws_subscribe(mutations: ["updateUserScore"])
}
schema {
  query: Query
  mutation: Mutation
  subscription: Subscription
}
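On the client side, the optional id argument translates into two possible subscription documents. The following is a minimal sketch, assuming a hypothetical helper buildSubscription (not part of the original project) that returns the document and variables a GraphQL client would send. To receive updates for all users the argument is left out entirely, which avoids passing an explicit null that AppSync may treat as a filter value.

```javascript
// Subscription document without an argument: updates for all users
const SUBSCRIBE_ALL = `subscription OnUpdateUserScore {
  onUpdateUserScore {
    id
    score
  }
}`

// Subscription document with the optional id argument: one specific user
const SUBSCRIBE_ONE = `subscription OnUpdateUserScore($id: ID) {
  onUpdateUserScore(id: $id) {
    id
    score
  }
}`

// Hypothetical helper: picks the document and variables for a client request
function buildSubscription (userId) {
  if (userId === undefined || userId === null) {
    return { query: SUBSCRIBE_ALL, variables: {} }
  }
  return { query: SUBSCRIBE_ONE, variables: { id: String(userId) } }
}

console.log(buildSubscription())
console.log(buildSubscription('42'))
```

The returned query and variables can be handed to whichever GraphQL client the app uses.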
Infrastructure
With the serverless-appsync-plugin it's easy to set up our AppSync API. To keep the example simple we use AWS_IAM for authentication. In a real-world project you would probably use AMAZON_COGNITO_USER_POOLS in addition, so that your client app can also authenticate against AppSync.
As mentioned before, the Mutation does not need a data source. Therefore we create a data source of type NONE.
Don't forget the role statement so that our Lambda function is allowed to execute the AppSync Mutation.
service: subscription-demo
plugins:
  - serverless-appsync-plugin
provider:
  name: aws
  runtime: nodejs12.x
  region: eu-central-1
  iamRoleStatements:
    - Effect: Allow
      Action:
        - dynamodb:Scan
        - dynamodb:Query
        - dynamodb:PutItem
      Resource:
        - !GetAtt DynamoDbTableUser.Arn
        - !Join [ '', [ !GetAtt DynamoDbTableUser.Arn, '/*' ] ]
    - Effect: Allow
      Action:
        - appsync:GraphQL
      Resource:
        - !GetAtt GraphQlApi.Arn
        - !Join [ '/', [ !GetAtt GraphQlApi.Arn, 'types', 'Mutation', 'fields', 'updateUserScore' ] ]
custom:
  appSync:
    name: ${self:service}
    authenticationType: AWS_IAM
    mappingTemplates:
      - dataSource: TableUserDS
        type: Query
        field: listUser
      - dataSource: None
        type: Mutation
        field: updateUserScore
      - dataSource: TableUserDS
        type: Mutation
        field: createUser
    dataSources:
      - type: NONE
        name: None
      - type: AMAZON_DYNAMODB
        name: TableUserDS
        description: 'DynamoDB DynamoDbTableUser table'
        config:
          tableName: !Ref DynamoDbTableUser
functions:
  handleDynamoDbStream:
    handler: backend/handleDynamoDbStream.handler
    environment:
      APP_SYNC_API_URL: !GetAtt GraphQlApi.GraphQLUrl
    events:
      - stream:
          type: dynamodb
          arn: !GetAtt DynamoDbTableUser.StreamArn
resources:
  Resources:
    DynamoDbTableUser:
      Type: AWS::DynamoDB::Table
      Properties:
        AttributeDefinitions:
          - AttributeName: id
            AttributeType: S
        KeySchema:
          - AttributeName: id
            KeyType: HASH
        BillingMode: PAY_PER_REQUEST
        StreamSpecification:
          StreamViewType: NEW_IMAGE
Mapping templates
The request and response templates for our Mutation are quite simple:
## Mutation.updateUserScore.request.vtl
{
  "version": "2017-02-28",
  "payload": $util.toJson($context.arguments)
}
## Mutation.updateUserScore.response.vtl
$util.toJson($ctx.result)
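To see why no data source is needed, the effect of both templates can be mimicked in plain JavaScript. This is only a sketch of the data flow, not how AppSync evaluates VTL, and all function names are hypothetical. The request template forwards the Mutation arguments as the payload, a NONE data source hands the payload back unchanged as the result, and the response template returns that result.

```javascript
// Request template: wrap the Mutation arguments as the payload
function requestTemplate (context) {
  return { version: '2017-02-28', payload: context.arguments }
}

// A NONE data source performs no I/O; the payload becomes the result
function noneDataSource (request) {
  return request.payload
}

// Response template: return the result as it is
function responseTemplate (context) {
  return context.result
}

// Hypothetical helper chaining the three steps
function resolveUpdateUserScore (args) {
  const request = requestTemplate({ arguments: args })
  const result = noneDataSource(request)
  return responseTemplate({ result })
}

console.log(resolveUpdateUserScore({ id: 'user-1', score: 120 }))
// prints { id: 'user-1', score: 120 }
```

The returned object has exactly the id and score fields of the UpdatedUserScore type, which is what the subscribers receive.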
Lambda function
Inside our Lambda function we create an AWSAppSyncClient and the Mutation. DynamoDB Streams delivers records in batches. Therefore we have to iterate over the incoming Records and execute the Mutation for each of them.
const AWS = require('aws-sdk')
const appsync = require('aws-appsync')
const gql = require('graphql-tag')
const fetch = require('node-fetch')
// aws-appsync expects a global fetch, which older Node.js runtimes lack
if (!globalThis.fetch) globalThis.fetch = fetch
// Sign requests with the credentials of the Lambda execution role
const graphqlClient = new appsync.AWSAppSyncClient({
  url: process.env.APP_SYNC_API_URL,
  region: process.env.AWS_REGION,
  auth: {
    type: 'AWS_IAM',
    credentials: {
      accessKeyId: process.env.AWS_ACCESS_KEY_ID,
      secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY,
      sessionToken: process.env.AWS_SESSION_TOKEN
    }
  },
  disableOffline: true
})
const mutation = gql`mutation UpdateUserScore($id: ID!, $score: Int!) {
  updateUserScore(id: $id, score: $score) {
    id
    score
  }
}`
exports.handler = async (event) => {
  // Only updates of existing items are published
  const updates = event.Records.filter((record) => record.eventName === 'MODIFY')
  // Await all Mutations so that a failed request fails the invocation
  await Promise.all(updates.map((record) => {
    const item = AWS.DynamoDB.Converter.unmarshall(record.dynamodb.NewImage)
    return graphqlClient.mutate({
      mutation,
      variables: {
        id: item.id,
        score: item.score
      }
    })
  }))
}
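For reference, a record delivered by the stream carries the item in DynamoDB's attribute-value format, which is why the handler unmarshalls NewImage first. The sketch below uses a hand-written sample record and a simplified stand-in for AWS.DynamoDB.Converter.unmarshall that only understands string (S) and number (N) attributes. Both are assumptions for illustration, and toMutationVariables is a hypothetical helper.

```javascript
// Hand-written sample record, reduced to the fields used by the handler
const sampleRecord = {
  eventName: 'MODIFY',
  dynamodb: {
    Keys: { id: { S: 'user-1' } },
    NewImage: {
      id: { S: 'user-1' },
      name: { S: 'Alice' },
      score: { N: '120' }
    }
  }
}

// Simplified stand-in for AWS.DynamoDB.Converter.unmarshall (S and N only)
function unmarshallSimple (image) {
  const item = {}
  for (const [key, attribute] of Object.entries(image)) {
    if ('S' in attribute) item[key] = attribute.S
    else if ('N' in attribute) item[key] = Number(attribute.N)
    else throw new Error(`Unsupported attribute type for "${key}"`)
  }
  return item
}

// Hypothetical helper: returns the Mutation variables, or null for skipped records
function toMutationVariables (record) {
  if (record.eventName !== 'MODIFY') return null
  const { id, score } = unmarshallSimple(record.dynamodb.NewImage)
  return { id, score }
}

console.log(toMutationVariables(sampleRecord))
// prints { id: 'user-1', score: 120 }
```

Because the stream is configured with NEW_IMAGE, the handler only sees the new state of the item. Every MODIFY event therefore triggers the Mutation, even if an attribute other than score was changed.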
Result
Suggestions or feedback
If you have any feedback, suggestions or ideas, feel free to write a comment below this article. There is always room for improvement!