Graphql Lambda Subscriptions
Amazon Lambda Powered GraphQL Subscriptions. This is an Amazon Lambda Serverless equivalent to graphql-ws
. It follows the graphql-ws prototcol
. It is tested with the Architect Sandbox against graphql-ws
directly and run in production today. For many applications graphql-lambda-subscriptions
should do what graphql-ws
does for you today without having to run a server. This started as fork of subscriptionless
another library with similar goals.
As subscriptionless
's tagline goes;
Have all the functionality of GraphQL subscriptions on a stateful server without the cost.
Why a fork?
I had different requirements and needed more features. This project wouldn't exist without subscriptionless
and you should totally check it out.
Features
- Only needs DynamoDB, API Gateway and Lambda (no app sync or other managed graphql platform required, can use step functions for ping/pong support)
- Provides a Pub/Sub system to broadcast events to subscriptions
- Provides hooks for the full lifecycle of a subscription
- Type compatible with GraphQL and
nexus.js
- Optional Logging
Quick Start
Since there are many ways to deploy to amazon lambda I'm going to have to get opinionated in the quick start and pick Architect. graphql-lambda-subscriptions
should work on Lambda regardless of your deployment and packaging framework. Take a look at the arc-basic-events mock used for integration testing for an example of using it with Architect.
API Docs
Can be found in our docs folder. You'll want to start with makeServer()
and subscribe()
.
Setup
Create a graphql-lambda-subscriptions server
import { makeServer } from 'graphql-lambda-subscriptions'
const subscriptionServer = makeServer({
dynamodb,
schema,
})
Export the handler
export const handler = subscriptionServer.webSocketHandler
Configure API Gateway
Set up API Gateway to route WebSocket events to the exported handler.
📖 Architect Example
@app
basic-events
@ws
📖 Serverless Framework Example
functions:
websocket:
name: my-subscription-lambda
handler: ./handler.handler
events:
- websocket:
route: $connect
- websocket:
route: $disconnect
- websocket:
route: $default
Create DynamoDB tables for state
In-flight connections and subscriptions need to be persisted.
Changing DynamoDB table names
Use the tableNames
argument to override the default table names.
const instance = makeServer({
tableNames: {
connections: 'my_connections',
subscriptions: 'my_subscriptions',
},
})
const fetchTableNames = async () => {
return {
connections,
subscriptions,
}
}
const instance = makeServer({
tableNames: fetchTableNames(),
})
💾 Architect Example
@tables
Connection
id *String
ttl TTL
Subscription
id *String
ttl TTL
@indexes
Subscription
connectionId *String
name ConnectionIndex
Subscription
topic *String
name TopicIndex
import { tables } from '@architect/functions'
const fetchTableNames = async () => {
const tables = await tables()
const ensureName = (table) => {
const actualTableName = tables.name(table)
if (!actualTableName) {
throw new Error(`No table found for ${table}`)
}
return actualTableName
}
return {
connections: ensureName('Connection'),
subscriptions: ensureName('Subscription'),
}
}
const subscriptionServer = makeServer({
dynamodb: tables.db,
schema,
tableNames: fetchTableNames(),
})
💾 Serverless Framework Example
resources:
Resources:
connectionsTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: ${self:provider.environment.CONNECTIONS_TABLE}
AttributeDefinitions:
- AttributeName: id
AttributeType: S
KeySchema:
- AttributeName: id
KeyType: HASH
ProvisionedThroughput:
ReadCapacityUnits: 1
WriteCapacityUnits: 1
subscriptionsTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: ${self:provider.environment.SUBSCRIPTIONS_TABLE}
AttributeDefinitions:
- AttributeName: id
AttributeType: S
- AttributeName: topic
AttributeType: S
- AttributeName: connectionId
AttributeType: S
KeySchema:
- AttributeName: id
KeyType: HASH
GlobalSecondaryIndexes:
- IndexName: ConnectionIndex
KeySchema:
- AttributeName: connectionId
KeyType: HASH
Projection:
ProjectionType: ALL
ProvisionedThroughput:
ReadCapacityUnits: 1
WriteCapacityUnits: 1
- IndexName: TopicIndex
KeySchema:
- AttributeName: topic
KeyType: HASH
Projection:
ProjectionType: ALL
ProvisionedThroughput:
ReadCapacityUnits: 1
WriteCapacityUnits: 1
ProvisionedThroughput:
ReadCapacityUnits: 1
WriteCapacityUnits: 1
💾 terraform example
resource "aws_dynamodb_table" "connections-table" {
name = "graphql_connections"
billing_mode = "PROVISIONED"
read_capacity = 1
write_capacity = 1
hash_key = "id"
attribute {
name = "id"
type = "S"
}
}
resource "aws_dynamodb_table" "subscriptions-table" {
name = "graphql_subscriptions"
billing_mode = "PROVISIONED"
read_capacity = 1
write_capacity = 1
hash_key = "id"
attribute {
name = "id"
type = "S"
}
attribute {
name = "topic"
type = "S"
}
attribute {
name = "connectionId"
type = "S"
}
global_secondary_index {
name = "ConnectionIndex"
hash_key = "connectionId"
write_capacity = 1
read_capacity = 1
projection_type = "ALL"
}
global_secondary_index {
name = "TopicIndex"
hash_key = "topic"
write_capacity = 1
read_capacity = 1
projection_type = "ALL"
}
}
PubSub
graphql-lambda-subscriptions
uses it's own PubSub implementation.
Subscribing to Topics
Use the subscribe
function to associate incoming subscriptions with a topic.
import { subscribe } from 'graphql-lambda-subscriptions'
export const resolver = {
Subscribe: {
mySubscription: {
subscribe: subscribe('MY_TOPIC'),
resolve: (event, args, context) => {}
}
}
}
📖 Filtering events
Use the subscribe
with SubscribeOptions
to allow for filtering.
Note: If a function is provided, it will be called on subscription start and must return a serializable object.
import { subscribe } from 'graphql-lambda-subscriptions'
subscribe('MY_TOPIC', {
filter: {
attr1: '`attr1` must have this value',
attr2: {
attr3: 'Nested attributes work fine',
},
}
})
subscribe('MY_TOPIC',{
filter: (root, args, context, info) => ({
userId: args.userId,
}),
})
Publishing events
Use the publish()
function on your graphql-lambda-subscriptions server to publish events to active subscriptions. Payloads must be of type Record<string, any>
so they can be filtered and stored.
subscriptionServer.publish({
topic: 'MY_TOPIC',
payload: {
message: 'Hey!',
},
})
Events can come from many sources
export const snsHandler = (event) =>
Promise.all(
event.Records.map((r) =>
subscriptionServer.publish({
topic: r.Sns.TopicArn.substring(r.Sns.TopicArn.lastIndexOf(':') + 1),
payload: JSON.parse(r.Sns.Message),
})
)
)
export const invocationHandler = (payload) => subscriptionServer.publish({ topic: 'MY_TOPIC', payload })
Completing Subscriptions
Use the complete
on your graphql-lambda-subscriptions server to complete active subscriptions. Payloads are optional and match against filters like events do.
subscriptionServer.complete({
topic: 'MY_TOPIC',
payload: {
message: 'Hey!',
},
})
Context
Context is provided on the ServerArgs
object when creating a server. The values are accessible in all callback and resolver functions (eg. resolve
, filter
, onAfterSubscribe
, onSubscribe
and onComplete
).
Assuming no context
argument is provided when creating the server, the default value is an object with connectionInitPayload
, connectionId
properties and the publish()
and complete()
functions. These properties are merged into a provided object or passed into a provided function.
Setting static context value
An object can be provided via the context
attribute when calling makeServer
.
const instance = makeServer({
context: {
myAttr: 'hello',
},
})
The default values (above) will be appended to this object prior to execution.
Setting dynamic context value
A function (optionally async) can be provided via the context
attribute when calling makeServer
.
The default context value is passed as an argument.
const instance = makeServer({
context: ({ connectionInitPayload }) => ({
myAttr: 'hello',
user: connectionInitPayload.user,
}),
})
Using the context
export const resolver = {
Subscribe: {
mySubscription: {
subscribe: subscribe('GREETINGS', {
filter(_, _, context) {
console.log(context.connectionId)
},
async onAfterSubscribe(_, _, { connectionId, publish }) {
await publish('GREETINGS', { message: `HI from ${connectionId}!` })
}
})
resolve: (event, args, context) => {
console.log(context.connectionInitPayload)
return event.payload.message
},
},
},
}
Side effects
Side effect handlers can be declared on subscription fields to handle onSubscribe
(start) and onComplete
(stop) events.
📖 Adding side-effect handlers
export const resolver = {
Subscribe: {
mySubscription: {
resolve: (event, args, context) => {
},
subscribe: subscribe('MY_TOPIC', {
}),
},
},
}
Events
Global events can be provided when calling makeServer
to track the execution cycle of the lambda.
📖 Connect (onConnect)
Called when a WebSocket connection is first established.
const instance = makeServer({
onConnect: ({ event }) => {
},
})
📖 Disconnect (onDisconnect)
Called when a WebSocket connection is disconnected.
const instance = makeServer({
onDisconnect: ({ event }) => {
},
})
📖 Authorization (connection_init)
onConnectionInit
can be used to verify the connection_init
payload prior to persistence.
Note: Any sensitive data in the incoming message should be removed at this stage.
const instance = makeServer({
onConnectionInit: ({ message }) => {
const token = message.payload.token
if (!myValidation(token)) {
throw Error('Token validation failed')
}
return {
...message.payload,
token: undefined,
}
},
})
By default, the (optionally parsed) payload will be accessible via context.
📖 Subscribe (onSubscribe)
Subscribe (onSubscribe)
Called when any subscription message is received.
const instance = makeServer({
onSubscribe: ({ event, message }) => {
},
})
📖 Complete (onComplete)
Called when any complete message is received.
const instance = makeServer({
onComplete: ({ event, message }) => {
},
})
📖 Error (onError)
Called when any error is encountered
const instance = makeServer({
onError: (error, context) => {
},
})
Caveats
Ping/Pong
For whatever reason, AWS API Gateway does not support WebSocket protocol level ping/pong. So you can use Step Functions to do this. See pingPong
.
Socket idleness
API Gateway considers an idle connection to be one where no messages have been sent on the socket for a fixed duration (currently 10 minutes). The WebSocket spec has support for detecting idle connections (ping/pong) but API Gateway doesn't use it. This means, in the case where both parties are connected, and no message is sent on the socket for the defined duration (direction agnostic), API Gateway will close the socket. A fix for this is to set up immediate reconnection on the client side.
Socket Close Reasons
API Gateway doesn't support custom reasons or codes for WebSockets being closed. So the codes and reason strings wont match graphql-ws
.