Getting started
Install
yarn add graphql-sse
Create a GraphQL schema
import { GraphQLSchema, GraphQLObjectType, GraphQLString } from 'graphql';
/** Root `Query` type: a single `hello` field that always resolves to `'world'`. */
const queryType = new GraphQLObjectType({
  name: 'Query',
  fields: {
    hello: { type: GraphQLString, resolve: () => 'world' },
  },
});

/** Root `Subscription` type: `greetings` emits five greetings and then completes. */
const subscriptionType = new GraphQLObjectType({
  name: 'Subscription',
  fields: {
    greetings: {
      type: GraphQLString,
      subscribe: async function* () {
        const salutations = ['Hi', 'Bonjour', 'Hola', 'Ciao', 'Zdravo'];
        for (const greetings of salutations) {
          // Each event is keyed by the subscription field name.
          yield { greetings };
        }
      },
    },
  },
});

/** Executable schema used by the server examples. */
const schema = new GraphQLSchema({
  query: queryType,
  subscription: subscriptionType,
});
Start the server
import http from 'http';
import { createHandler } from 'graphql-sse';
// Build the GraphQL over SSE request handler for the schema.
const handler = createHandler({
  schema,
});

// Serve GraphQL over SSE under `/graphql/stream`; every other path is a 404.
const server = http.createServer((req, res) => {
  // `req.url` is typed `string | undefined` on `http.IncomingMessage`, so the
  // access is guarded instead of risking a TypeError on a URL-less request.
  if (req.url?.startsWith('/graphql/stream')) return handler(req, res);
  return res.writeHead(404).end();
});

server.listen(4000);
console.log('Listening to port 4000');
Browsers might complain about the self-signed SSL/TLS certificate used by the HTTP/2 example below. Help with trusting it can be found on StackOverflow.
$ openssl req -x509 -newkey rsa:2048 -nodes -sha256 -subj '/CN=localhost' \
-keyout localhost-privkey.pem -out localhost-cert.pem
import fs from 'fs';
import http2 from 'http2';
import { createHandler } from 'graphql-sse';
// GraphQL over SSE request handler used for every incoming request.
const handler = createHandler({ schema });

// Key pair read from the files written by the `openssl` command above.
const tlsOptions = {
  key: fs.readFileSync('localhost-privkey.pem'),
  cert: fs.readFileSync('localhost-cert.pem'),
};

// Secure HTTP/2 server: `/graphql/stream` goes to the handler, the rest is a 404.
const server = http2.createSecureServer(tlsOptions, (req, res) => {
  const isGraphQLStream = req.url.startsWith('/graphql/stream');
  if (!isGraphQLStream) {
    return res.writeHead(404).end();
  }
  return handler(req, res);
});

server.listen(4000);
console.log('Listening to port 4000');
import express from 'express';
import { createHandler } from 'graphql-sse';
// Express application hosting the GraphQL over SSE endpoint.
const app = express();

// The handler is mounted as regular middleware on `/graphql/stream`.
const handler = createHandler({ schema });
app.use('/graphql/stream', handler);

app.listen(4000);
console.log('Listening to port 4000');
import Fastify from 'fastify';
import { createHandler } from 'graphql-sse';
const fastify = Fastify();
const handler = createHandler({ schema });

// Every HTTP method on `/graphql/stream` is forwarded to the handler.
fastify.all('/graphql/stream', (req, res) => {
  // The handler is given the underlying raw request/response objects plus the
  // request body exposed by Fastify (presumably already parsed — confirm).
  return handler(req.raw, res.raw, req.body);
});

fastify.listen(4000);
console.log('Listening to port 4000');
Use the client
import { createClient } from 'graphql-sse';
// Client pointed at the local server's streaming endpoint.
const client = createClient({ url: 'http://localhost:4000/graphql/stream' });

// Query: keep the most recent value and resolve with it once the operation completes.
(async () => {
  const result = await new Promise((resolve, reject) => {
    let lastValue;
    const sink = {
      next: (data) => (lastValue = data),
      error: reject,
      complete: () => resolve(lastValue),
    };
    client.subscribe({ query: '{ hello }' }, sink);
  });

  expect(result).toEqual({ hello: 'world' });
})();
// Subscription: wait until the `greetings` stream completes.
(async () => {
  // Placeholder callback invoked for every incoming value.
  const onNext = () => {};

  // Receives the function returned by `client.subscribe`; a no-op until then.
  let unsubscribe = () => {};

  await new Promise((resolve, reject) => {
    const payload = { query: 'subscription { greetings }' };
    unsubscribe = client.subscribe(payload, {
      next: onNext,
      error: reject,
      complete: resolve,
    });
  });

  expect(onNext).toBeCalledTimes(5);
})();
Recipes
🔗 Client usage with Promise
import { createClient, SubscribePayload } from 'graphql-sse';
// Client used by the Promise-based `execute` helper.
const client = createClient({ url: 'http://hey.there:4000/graphql/stream' });
/**
 * Runs an operation through `client.subscribe` and exposes it as a Promise.
 *
 * @param payload - Operation to execute.
 * @returns A promise resolving, on completion, with the last value delivered to
 * `next` (left unassigned if nothing was delivered) and rejecting with whatever
 * the sink's `error` callback receives.
 */
async function execute<T>(payload: SubscribePayload) {
  let latest: T;
  const settled = new Promise<T>((resolve, reject) => {
    client.subscribe<T>(payload, {
      next(data) {
        // Later values overwrite earlier ones; only the last is kept.
        latest = data;
      },
      error(err) {
        reject(err);
      },
      complete() {
        resolve(latest);
      },
    });
  });
  return settled;
}
// Example call site for `execute`.
(async () => {
  const payload = { query: '{ hello }' };
  try {
    // `result` holds the value `execute` resolved with; consume it here.
    const result = await execute(payload);
  } catch (err) {
    // Rejections from `execute` land here; the example leaves handling to the reader.
  }
})();
🔗 Client usage with AsyncIterator
import { createClient, SubscribePayload } from 'graphql-sse';
// Client used by the async-iterator `subscribe` helper.
const client = createClient({ url: 'http://iterators.ftw:4000/graphql/stream' });
/**
 * Adapts `client.subscribe` to an async generator usable with `for await`.
 *
 * Values delivered by the sink are buffered and handed out in arrival order.
 * Buffered values are always drained before completion or an error is
 * reported, so nothing received ahead of `complete`/`error` is lost.
 *
 * @param payload - Operation to subscribe to.
 * @returns An async generator yielding each delivered value. `next()` rejects
 * with the sink's error once the buffer is empty; `return()` and `throw()`
 * dispose the underlying subscription.
 */
function subscribe<T>(payload: SubscribePayload): AsyncGenerator<T> {
  // Settles the consumer currently parked inside `next()`, if any.
  let deferred: {
    resolve: (done: boolean) => void;
    reject: (err: unknown) => void;
  } | null = null;
  // Values delivered by the sink but not yet handed to the consumer.
  const pending: T[] = [];
  let throwMe: unknown = null,
    // Separate flag so that a falsy error value is still reported.
    failed = false,
    done = false;
  const dispose = client.subscribe<T>(payload, {
    next: (data) => {
      pending.push(data);
      deferred?.resolve(false);
    },
    error: (err) => {
      throwMe = err;
      failed = true;
      deferred?.reject(throwMe);
    },
    complete: () => {
      done = true;
      deferred?.resolve(true);
    },
  });

  // Marks the iterator finished, drops buffered values and releases the subscription.
  const finish = () => {
    done = true;
    pending.length = 0;
    dispose();
    deferred?.resolve(true);
  };

  return {
    [Symbol.asyncIterator]() {
      return this;
    },
    async next() {
      for (;;) {
        // Drain the buffer first: several values, and even completion, may
        // arrive before the consumer asks for the next item.
        if (pending.length) return { value: pending.shift()! };
        if (failed) throw throwMe;
        if (done) return { done: true, value: undefined };
        try {
          // Park until the sink reports a value, completion or an error,
          // then loop to re-evaluate the state above.
          await new Promise<boolean>(
            (resolve, reject) => (deferred = { resolve, reject }),
          );
        } finally {
          deferred = null;
        }
      }
    },
    async throw(err) {
      // Release the subscription before propagating, otherwise it would leak.
      finish();
      throw err;
    },
    async return() {
      finish();
      return { done: true, value: undefined };
    },
  };
}
// Example call site: consume the subscription with `for await`.
(async () => {
  const payload = { query: 'subscription { greetings }' };
  for await (const result of subscribe(payload)) {
    // `result` is the next value yielded by the generator; consume it here.
  }
})();
🔗 Client usage with Observable
import { Observable } from 'relay-runtime';
import { Observable } from '@apollo/client/core';
import { Observable } from 'rxjs';
import Observable from 'zen-observable';
// Client wrapped by `toObservable`.
const client = createClient({ url: 'http://graphql.loves:4000/observables' });
/**
 * Wraps `client.subscribe` in an `Observable`.
 *
 * @param operation - Payload handed to `client.subscribe` unchanged.
 * @returns An Observable that forwards `next`, `error` and `complete` to its
 * observer; the subscriber function returns whatever `client.subscribe` returns.
 */
function toObservable(operation) {
  return new Observable((observer) => {
    const sink = {
      next(data) {
        observer.next(data);
      },
      error(err) {
        observer.error(err);
      },
      complete() {
        observer.complete();
      },
    };
    return client.subscribe(operation, sink);
  });
}
// Turn a subscription operation into an Observable and listen to it.
const observable = toObservable({ query: `subscription { ping }` });

const subscription = observable.subscribe({
  next: (data) => {
    // `toEqual` compares structurally. `toBe` checks identity (`Object.is`)
    // and could never pass against a freshly created object literal.
    expect(data).toEqual({ data: { ping: 'pong' } });
  },
});

// Stop listening via the subscription handle.
subscription.unsubscribe();
🔗 Client usage with Relay
import { GraphQLError } from 'graphql';
import {
Network,
Observable,
RequestParameters,
Variables,
} from 'relay-runtime';
import { createClient } from 'graphql-sse';
// Client whose request headers are computed by a function.
const subscriptionsClient = createClient({
  url: 'http://i.love:4000/graphql/stream',
  headers: () => {
    const session = getSession();
    // No session means no extra headers; otherwise send its token as a Bearer credential.
    return session ? { Authorization: `Bearer ${session.token}` } : {};
  },
});
/**
 * Relay network function that runs an operation through `subscriptionsClient`.
 *
 * @param operation - Relay request parameters; `text` is sent as the query and
 * `name` as the operation name.
 * @param variables - Variables forwarded with the operation.
 * @returns An Observable that errors immediately when the operation has no
 * text, and otherwise forwards the client's events to the Relay sink.
 */
function fetchOrSubscribe(operation: RequestParameters, variables: Variables) {
  return Observable.create((sink) => {
    const { name: operationName, text: query } = operation;
    if (query) {
      return subscriptionsClient.subscribe(
        { operationName, query, variables },
        sink,
      );
    }
    return sink.error(new Error('Operation text cannot be empty'));
  });
}

// The same function is passed for both arguments of `Network.create`.
export const network = Network.create(fetchOrSubscribe, fetchOrSubscribe);
🔗 Client usage with urql
import { createClient, defaultExchanges, subscriptionExchange } from 'urql';
import { createClient as createWSClient } from 'graphql-sse';
// graphql-sse client that carries the subscriptions.
const sseClient = createWSClient({ url: 'http://its.urql:4000/graphql/stream' });

// urql exchange that forwards each subscription operation to `sseClient`.
const sseSubscriptionExchange = subscriptionExchange({
  forwardSubscription: (operation) => ({
    subscribe: (sink) => {
      // Expose the function returned by `sseClient.subscribe` as `unsubscribe`.
      const unsubscribe = sseClient.subscribe(operation, sink);
      return { unsubscribe };
    },
  }),
});

const client = createClient({
  url: '/graphql/stream',
  exchanges: [...defaultExchanges, sseSubscriptionExchange],
});
🔗 Client usage with Apollo
import {
ApolloLink,
Operation,
FetchResult,
Observable,
} from '@apollo/client/core';
import { print, GraphQLError } from 'graphql';
import { createClient, ClientOptions, Client } from 'graphql-sse';
/**
 * Apollo link that sends every operation through a graphql-sse client.
 */
class SSELink extends ApolloLink {
  private client: Client;

  /**
   * @param options - Options forwarded unchanged to `createClient`.
   */
  constructor(options: ClientOptions) {
    super();
    this.client = createClient(options);
  }

  /**
   * Executes `operation` via the client.
   *
   * @param operation - Apollo operation; its document is printed to a string
   * before being sent.
   * @returns An Observable forwarding the client's `next`, `complete` and
   * `error` events to the Apollo sink.
   */
  public request(operation: Operation): Observable<FetchResult> {
    return new Observable((sink) => {
      const payload = { ...operation, query: print(operation.query) };
      return this.client.subscribe<FetchResult>(payload, {
        next: (value) => sink.next(value),
        complete: () => sink.complete(),
        error: (err) => sink.error(err),
      });
    });
  }
}
// Link instance whose request headers are computed by a function.
const link = new SSELink({
  url: 'http://where.is:4000/graphql/stream',
  headers: () => {
    const session = getSession();
    // Without a session no headers are added; otherwise send its token as a Bearer credential.
    return session ? { Authorization: `Bearer ${session.token}` } : {};
  },
});
🔗 Client usage for HTTP/1 (aka. single connection mode)
import { createClient } from 'graphql-sse';
// Client created with the `singleConnection` option switched on.
const client = createClient({
  url: 'http://use.single:4000/connection/graphql/stream',
  singleConnection: true,
});
🔗 Client usage with custom retry timeout strategy
import { createClient } from 'graphql-sse';
import { waitForHealthy } from './my-servers';
const url = 'http://i.want.retry:4000/control/graphql/stream';

const client = createClient({
  url,
  // Custom wait: first until `waitForHealthy(url)` settles, then a random extra delay.
  retryWait: async function waitForServerHealthyBeforeRetry() {
    await waitForHealthy(url);

    // Random delay between 1 and 4 seconds.
    const delayMs = 1000 + Math.random() * 3000;
    await new Promise((resolve) => setTimeout(resolve, delayMs));
  },
});
🔗 Client usage in browser
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8" />
<title>GraphQL over Server-Sent Events</title>
<!-- Loads the minified UMD build of graphql-sse from unpkg. -->
<script
type="text/javascript"
src="https://unpkg.com/graphql-sse/umd/graphql-sse.min.js"
></script>
</head>
<body>
<script type="text/javascript">
// The library is reached through the `graphqlSse` global in this page.
const client = graphqlSse.createClient({
url: 'http://umdfor.the:4000/win/graphql/stream',
});
</script>
</body>
</html>
🔗 Client usage in Node
const ws = require('ws');
const fetch = require('node-fetch');
const { AbortController } = require('node-abort-controller');
const Crypto = require('crypto');
const { createClient } = require('graphql-sse');
// Client configured with the `node-fetch` and `node-abort-controller`
// implementations required above, plus a custom ID generator.
const client = createClient({
url: 'http://no.browser:4000/graphql/stream',
fetchFn: fetch,
abortControllerImpl: AbortController,
// Produces a UUID-shaped string: the concatenation below yields the template
// "10000000-1000-4000-8000-100000000000", and every `0`, `1` and `8` in it is
// replaced by a hex digit derived from one random byte.
generateID: () =>
([1e7] + -1e3 + -4e3 + -8e3 + -1e11).replace(/[018]/g, (c) =>
(c ^ (Crypto.randomBytes(1)[0] & (15 >> (c / 4)))).toString(16),
),
});
🔗 Server handler usage with custom authentication
import { createHandler } from 'graphql-sse';
import {
schema,
getOrCreateTokenFromCookies,
customAuthenticationTokenDiscovery,
processAuthorizationHeader,
} from './my-graphql';
const handler = createHandler({
  schema,
  /**
   * Resolves the token identifying the requester.
   *
   * A token supplied in the `x-graphql-event-stream-token` header wins.
   * Otherwise the discovery strategies are tried in order and the first one
   * that yields a token is used; previously each result overwrote the one
   * before it, so only the last strategy ever counted.
   *
   * @returns The token, `''` for a `POST` request that accepts
   * `text/event-stream`, or the ended response (401) when no token is found.
   */
  authenticate: async (req, res) => {
    const headerToken = req.headers['x-graphql-event-stream-token'];
    if (headerToken) {
      // A repeated header arrives as an array; join it into a single string.
      return Array.isArray(headerToken) ? headerToken.join('') : headerToken;
    }

    // Awaiting each helper is harmless if it is synchronous and keeps a
    // promise from being mistaken for a truthy token.
    const token =
      (await getOrCreateTokenFromCookies(req)) ||
      (await processAuthorizationHeader(req.headers['authorization'])) ||
      (await customAuthenticationTokenDiscovery(req));

    if (!token) return res.writeHead(401, 'Unauthorized').end();

    // NOTE(review): presumably a blank token tells the handler that this
    // request does not need one — confirm against the handler documentation.
    if (req.method === 'POST' && req.headers.accept === 'text/event-stream') {
      return '';
    }

    return token;
  },
});
🔗 Server handler usage with dynamic schema
import { createHandler } from 'graphql-sse';
import { schema, checkIsAdmin, getDebugSchema } from './my-graphql';
const handler = createHandler({
  // Pick the schema per request: admins get the debug schema, everyone else the default one.
  schema: async (req, executionArgsWithoutSchema) =>
    (await checkIsAdmin(req))
      ? getDebugSchema(req, executionArgsWithoutSchema)
      : schema,
});
🔗 Server handler usage with custom context value
import { createHandler } from 'graphql-sse';
import { schema, getDynamicContext } from './my-graphql';
const handler = createHandler({
  schema,
  // The context value is produced per request by `getDynamicContext`.
  context: async (req, args) => getDynamicContext(req, args),
});
🔗 Server handler usage with custom execution arguments
import { parse } from 'graphql';
import { createHandler } from 'graphql-sse';
import { getSchema, myValidationRules } from './my-graphql';
const handler = createHandler({
  // Builds the execution arguments by hand from the incoming request parameters.
  onSubscribe: async (req, _res, params) => {
    const { operationName, query, variables } = params;
    return {
      schema: await getSchema(req),
      operationName,
      document: parse(query),
      variableValues: variables,
    };
  },
});
🔗 Server handler and client usage with persisted queries
import { parse, ExecutionArgs } from 'graphql';
import { createHandler } from 'graphql-sse';
import { schema } from './my-graphql-schema';
type QueryID = string;

// Pre-parsed operations, keyed by the identifier clients send in
// `extensions.persistedQuery`.
const queriesStore: Record<QueryID, ExecutionArgs> = {
  iWantTheGreetings: {
    schema,
    document: parse('subscription Greetings { greetings }'),
  },
};

const handler = createHandler({
  /**
   * Resolves the persisted operation named by `extensions.persistedQuery`.
   *
   * @returns Execution arguments for a known identifier (with the request's
   * variables attached), or the ended response (404) for an unknown one.
   */
  onSubscribe: (_req, res, params) => {
    const queryId = params.extensions?.persistedQuery;

    // The identifier is client-supplied: accept only strings and only the
    // store's own keys, so inherited names such as `constructor` never match.
    const persistedQuery =
      typeof queryId === 'string' &&
      Object.prototype.hasOwnProperty.call(queriesStore, queryId)
        ? queriesStore[queryId]
        : undefined;

    if (persistedQuery) {
      return {
        ...persistedQuery,
        variableValues: params.variables,
      };
    }

    return res.writeHead(404, 'Query Not Found').end();
  },
});
import { createClient } from 'graphql-sse';
// Client that requests operations by persisted identifier.
const client = createClient({ url: 'http://persisted.graphql:4000/queries' });

(async () => {
  // Placeholder callback invoked for every incoming value.
  const onNext = () => {};

  await new Promise((resolve, reject) => {
    // The query text is left blank; the operation is named through `extensions`.
    const payload = {
      query: '',
      extensions: {
        persistedQuery: 'iWantTheGreetings',
      },
    };
    client.subscribe(payload, {
      next: onNext,
      error: reject,
      complete: resolve,
    });
  });

  expect(onNext).toBeCalledTimes(5);
})();
Check out the docs folder for the TypeDoc-generated documentation.
Read about the exact transport intricacies used by the library in the GraphQL over Server-Sent Events Protocol document.
Want to file a bug, contribute code, or improve the documentation? Read up on our guidelines for contributing and drive development with `yarn test --watch` away!