graphql-sse
Zero-dependency, HTTP/1 safe, simple, GraphQL over Server-Sent Events Protocol server and client
The graphql-sse package provides a way to implement GraphQL over Server-Sent Events (SSE). This allows for real-time updates and subscriptions in a GraphQL API using the SSE protocol, which is a simpler alternative to WebSockets.
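To make the transport concrete, here is a minimal sketch of how a single execution result could be framed as an SSE message. The helper name `formatSseMessage` is made up for illustration and is not part of graphql-sse; the `next` event name is the one the GraphQL over Server-Sent Events Protocol uses for results.

```javascript
// Serialize one Server-Sent Events message: an optional `event:` line,
// a `data:` line with the JSON payload, and a blank line as terminator.
function formatSseMessage(event, data) {
  const lines = [];
  if (event) lines.push(`event: ${event}`);
  lines.push(`data: ${JSON.stringify(data)}`);
  return lines.join('\n') + '\n\n';
}

const message = formatSseMessage('next', { data: { hello: 'world' } });
console.log(JSON.stringify(message));
```

Because every message is plain text over an ordinary HTTP response, no protocol upgrade is needed, which is the main simplification compared to WebSockets.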
Setting up a GraphQL server with SSE
This code sets up a basic GraphQL server with SSE support. It defines a schema with a query and a subscription, and starts an HTTP server that listens for GraphQL SSE requests.
const { createServer } = require('http');
const { makeExecutableSchema } = require('@graphql-tools/schema');
const { createHandler } = require('graphql-sse');

const typeDefs = `
  type Query {
    hello: String
  }
  type Subscription {
    time: String
  }
`;

const resolvers = {
  Query: {
    hello: () => 'Hello world!',
  },
  Subscription: {
    time: {
      // Emit the current time once per second
      subscribe: async function* () {
        while (true) {
          yield { time: new Date().toISOString() };
          await new Promise((resolve) => setTimeout(resolve, 1000));
        }
      },
    },
  },
};

const schema = makeExecutableSchema({ typeDefs, resolvers });

// graphql-sse exports `createHandler`, which returns a (req, res) request listener
const handler = createHandler({ schema });

const server = createServer((req, res) => {
  if (req.url.startsWith('/graphql')) return handler(req, res);
  res.writeHead(404).end();
});

server.listen(4000, () => {
  console.log('Server is running on http://localhost:4000/graphql');
});
Client-side subscription using EventSource
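This code demonstrates how to subscribe to a GraphQL subscription using the EventSource API on the client side. The server sends results as named `next` events and signals the end of the stream with a `complete` event, so the client listens for those event names rather than relying on `onmessage`, which only fires for unnamed events.
const query = encodeURIComponent('subscription { time }');
const eventSource = new EventSource(`http://localhost:4000/graphql?query=${query}`);
eventSource.addEventListener('next', (event) => {
  const result = JSON.parse(event.data);
  console.log(result);
});
eventSource.addEventListener('complete', () => {
  // Close explicitly, otherwise EventSource would try to reconnect
  eventSource.close();
});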
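Since EventSource can only issue GET requests, the operation has to travel in the query string and must be URL-encoded. The following is a small sketch of building such a URL; `buildSubscriptionUrl` is a hypothetical helper, and passing variables as a JSON-encoded `variables` parameter is an assumption based on the usual GraphQL over HTTP GET convention.

```javascript
// Build a GET URL carrying a GraphQL operation in its query string.
// URLSearchParams takes care of percent-encoding braces, spaces and quotes.
function buildSubscriptionUrl(endpoint, query, variables) {
  const url = new URL(endpoint);
  url.searchParams.set('query', query);
  if (variables) url.searchParams.set('variables', JSON.stringify(variables));
  return url.toString();
}

const subscriptionUrl = buildSubscriptionUrl(
  'http://localhost:4000/graphql',
  'subscription { time }',
);
console.log(subscriptionUrl);
```

The resulting string can be handed directly to `new EventSource(...)` in a browser.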
The subscriptions-transport-ws package provides WebSocket support for GraphQL subscriptions. It predates graphql-ws and is no longer actively maintained, so it is mainly relevant for existing deployments.
The graphql-ws package is a lightweight and modern alternative to subscriptions-transport-ws for implementing GraphQL subscriptions over WebSocket. It is designed to be simple and efficient, similar to graphql-sse but using WebSocket instead of SSE.
Apollo Server is a popular GraphQL server implementation. Subscriptions are served over WebSocket, which recent versions delegate to a separate library such as graphql-ws. It is a comprehensive solution for building GraphQL APIs and includes many features beyond just subscriptions.
$ yarn add graphql-sse
import { GraphQLSchema, GraphQLObjectType, GraphQLString } from 'graphql';
/**
* Construct a GraphQL schema and define the necessary resolvers.
*
* type Query {
* hello: String
* }
* type Subscription {
* greetings: String
* }
*/
const schema = new GraphQLSchema({
query: new GraphQLObjectType({
name: 'Query',
fields: {
hello: {
type: GraphQLString,
resolve: () => 'world',
},
},
}),
subscription: new GraphQLObjectType({
name: 'Subscription',
fields: {
greetings: {
type: GraphQLString,
subscribe: async function* () {
for (const hi of ['Hi', 'Bonjour', 'Hola', 'Ciao', 'Zdravo']) {
yield { greetings: hi };
}
},
},
},
}),
});
http
import http from 'http';
import { createHandler } from 'graphql-sse';
// Create the GraphQL over SSE handler
const handler = createHandler({
schema, // from the previous step
});
// Create a HTTP server using the handler on `/graphql/stream`
const server = http.createServer((req, res) => {
if (req.url.startsWith('/graphql/stream')) return handler(req, res);
return res.writeHead(404).end();
});
server.listen(4000);
console.log('Listening to port 4000');
http2
Browsers might complain about self-signed SSL/TLS certificates. Help can be found on StackOverflow.
$ openssl req -x509 -newkey rsa:2048 -nodes -sha256 -subj '/CN=localhost' \
-keyout localhost-privkey.pem -out localhost-cert.pem
import fs from 'fs';
import http2 from 'http2';
import { createHandler } from 'graphql-sse';
// Create the GraphQL over SSE handler
const handler = createHandler({
schema, // from the previous step
});
// Create a HTTP/2 server using the handler on `/graphql/stream`
const server = http2.createSecureServer(
{
key: fs.readFileSync('localhost-privkey.pem'),
cert: fs.readFileSync('localhost-cert.pem'),
},
(req, res) => {
if (req.url.startsWith('/graphql/stream')) return handler(req, res);
return res.writeHead(404).end();
},
);
server.listen(4000);
console.log('Listening to port 4000');
express
import express from 'express'; // yarn add express
import { createHandler } from 'graphql-sse';
// Create the GraphQL over SSE handler
const handler = createHandler({ schema });
// Create an express app serving all methods on `/graphql/stream`
const app = express();
app.use('/graphql/stream', handler);
app.listen(4000);
console.log('Listening to port 4000');
fastify
import Fastify from 'fastify'; // yarn add fastify
import { createHandler } from 'graphql-sse';
// Create the GraphQL over SSE handler
const handler = createHandler({ schema });
// Create a fastify instance serving all methods on `/graphql/stream`
const fastify = Fastify();
fastify.all('/graphql/stream', (req, res) =>
handler(
req.raw,
res.raw,
req.body, // fastify reads the body for you
),
);
fastify.listen({ port: 4000 }); // the options-object form; newer Fastify versions no longer accept a bare port number
console.log('Listening to port 4000');
import { createClient } from 'graphql-sse';
const client = createClient({
// singleConnection: true, use "single connection mode" instead of the default "distinct connection mode"
url: 'http://localhost:4000/graphql/stream',
});
// query
(async () => {
const result = await new Promise((resolve, reject) => {
let result;
client.subscribe(
{
query: '{ hello }',
},
{
next: (data) => (result = data),
error: reject,
complete: () => resolve(result),
},
);
});
expect(result).toEqual({ data: { hello: 'world' } }); // `next` receives the full execution result
})();
// subscription
(async () => {
const onNext = () => {
/* handle incoming values */
};
let unsubscribe = () => {
/* complete the subscription */
};
await new Promise((resolve, reject) => {
unsubscribe = client.subscribe(
{
query: 'subscription { greetings }',
},
{
next: onNext,
error: reject,
complete: resolve,
},
);
});
expect(onNext).toBeCalledTimes(5); // we say "Hi" in 5 languages
})();
import { createClient, SubscribePayload } from 'graphql-sse';
const client = createClient({
url: 'http://hey.there:4000/graphql/stream',
});
async function execute<T>(payload: SubscribePayload) {
return new Promise<T>((resolve, reject) => {
let result: T;
client.subscribe<T>(payload, {
next: (data) => (result = data),
error: reject,
complete: () => resolve(result),
});
});
}
// use
(async () => {
try {
const result = await execute({
query: '{ hello }',
});
// complete
// next = result = { data: { hello: 'world' } }
} catch (err) {
// error
}
})();
import { createClient, SubscribePayload } from 'graphql-sse';
const client = createClient({
url: 'http://iterators.ftw:4000/graphql/stream',
});
function subscribe<T>(payload: SubscribePayload): AsyncIterableIterator<T> {
let deferred: {
resolve: (done: boolean) => void;
reject: (err: unknown) => void;
} | null = null;
const pending: T[] = [];
let throwMe: unknown = null,
done = false;
const dispose = client.subscribe<T>(payload, {
next: (data) => {
pending.push(data);
deferred?.resolve(false);
},
error: (err) => {
throwMe = err;
deferred?.reject(throwMe);
},
complete: () => {
done = true;
deferred?.resolve(true);
},
});
return {
[Symbol.asyncIterator]() {
return this;
},
async next() {
// drain buffered results first so none are lost when `complete` arrives early
if (pending.length) return { value: pending.shift()! };
if (throwMe) throw throwMe;
if (done) return { done: true, value: undefined };
return (await new Promise<boolean>(
(resolve, reject) => (deferred = { resolve, reject }),
))
? { done: true, value: undefined }
: { value: pending.shift()! };
},
async return() {
dispose();
return { done: true, value: undefined };
},
};
}
(async () => {
const subscription = subscribe({
query: 'subscription { greetings }',
});
// subscription.return() to dispose
for await (const result of subscription) {
// next = result = { data: { greetings: 5x } }
}
// complete
})();
import { Observable } from 'relay-runtime';
// or
import { Observable } from '@apollo/client/core';
// or
import { Observable } from 'rxjs';
// or
import Observable from 'zen-observable';
// or any other lib which implements Observables as per the ECMAScript proposal: https://github.com/tc39/proposal-observable
import { createClient } from 'graphql-sse';
const client = createClient({
url: 'http://graphql.loves:4000/observables',
});
function toObservable(operation) {
return new Observable((observer) =>
client.subscribe(operation, {
next: (data) => observer.next(data),
error: (err) => observer.error(err),
complete: () => observer.complete(),
}),
);
}
const observable = toObservable({ query: `subscription { ping }` });
const subscription = observable.subscribe({
next: (data) => {
expect(data).toEqual({ data: { ping: 'pong' } }); // toEqual compares by value, toBe by reference
},
});
// ⏱
subscription.unsubscribe();
import { GraphQLError } from 'graphql';
import {
Network,
Observable,
RequestParameters,
Variables,
} from 'relay-runtime';
import { createClient } from 'graphql-sse';
const subscriptionsClient = createClient({
url: 'http://i.love:4000/graphql/stream',
headers: () => {
const session = getSession();
if (!session) return {};
return {
Authorization: `Bearer ${session.token}`,
};
},
});
// yes, both fetch AND subscribe can be handled in one implementation
function fetchOrSubscribe(operation: RequestParameters, variables: Variables) {
return Observable.create((sink) => {
if (!operation.text) {
return sink.error(new Error('Operation text cannot be empty'));
}
return subscriptionsClient.subscribe(
{
operationName: operation.name,
query: operation.text,
variables,
},
sink,
);
});
}
export const network = Network.create(fetchOrSubscribe, fetchOrSubscribe);
import { createClient, defaultExchanges, subscriptionExchange } from 'urql';
import { createClient as createSSEClient } from 'graphql-sse';
const sseClient = createSSEClient({
url: 'http://its.urql:4000/graphql/stream',
});
const client = createClient({
url: '/graphql/stream',
exchanges: [
...defaultExchanges,
subscriptionExchange({
forwardSubscription(operation) {
return {
subscribe: (sink) => {
const dispose = sseClient.subscribe(operation, sink);
return {
unsubscribe: dispose,
};
},
};
},
}),
],
});
import {
ApolloLink,
Operation,
FetchResult,
Observable,
} from '@apollo/client/core';
import { print } from 'graphql';
import { createClient, ClientOptions, Client } from 'graphql-sse';
class SSELink extends ApolloLink {
private client: Client;
constructor(options: ClientOptions) {
super();
this.client = createClient(options);
}
public request(operation: Operation): Observable<FetchResult> {
return new Observable((sink) => {
return this.client.subscribe<FetchResult>(
{ ...operation, query: print(operation.query) },
{
next: sink.next.bind(sink),
complete: sink.complete.bind(sink),
error: sink.error.bind(sink),
},
);
});
}
}
const link = new SSELink({
url: 'http://where.is:4000/graphql/stream',
headers: () => {
const session = getSession();
if (!session) return {};
return {
Authorization: `Bearer ${session.token}`,
};
},
});
import { createClient } from 'graphql-sse';
const client = createClient({
singleConnection: true, // this is literally it 😄
url: 'http://use.single:4000/connection/graphql/stream',
// lazy: true (default) -> connect on first subscribe and disconnect on last unsubscribe
// lazy: false -> connect as soon as the client is created
});
// The client will now run in "single connection mode". Meaning,
// a single SSE connection will be used to transmit all operation results
// while separate HTTP requests will be issued to dictate the behaviour.
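When one stream carries the results of every operation, each message has to be routed back to the subscriber that started it. Below is a rough sketch of that routing step, assuming each streamed message carries an operation `id` next to its `payload`, as the protocol describes for this mode. `createDemultiplexer` and its methods are hypothetical names for illustration; the library handles this internally.

```javascript
// Routes messages arriving on one shared stream to per-operation sinks.
function createDemultiplexer() {
  const sinks = new Map();
  return {
    // Register a sink for an operation ID; returns a function that removes it.
    register(id, sink) {
      sinks.set(id, sink);
      return () => sinks.delete(id);
    },
    // Deliver one streamed message; returns false when nobody is listening.
    dispatch(event, message) {
      const sink = sinks.get(message.id);
      if (!sink) return false;
      if (event === 'next') {
        sink.next(message.payload);
      } else if (event === 'complete') {
        sinks.delete(message.id);
        sink.complete();
      }
      return true;
    },
  };
}

const demux = createDemultiplexer();
const received = [];
demux.register('op-1', {
  next: (payload) => received.push(payload),
  complete: () => received.push('done'),
});
demux.dispatch('next', { id: 'op-1', payload: { data: { greetings: 'Hi' } } });
demux.dispatch('complete', { id: 'op-1' });
console.log(received);
```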
import { createClient } from 'graphql-sse';
import { waitForHealthy } from './my-servers';
const url = 'http://i.want.retry:4000/control/graphql/stream';
const client = createClient({
url,
retryWait: async function waitForServerHealthyBeforeRetry() {
// if you have a server healthcheck, you can wait for it to become
// healthy before retrying after an abrupt disconnect (most commonly a restart)
await waitForHealthy(url);
// after the server becomes ready, wait for a random 1-4s timeout
// (avoid DDoSing yourself) and try connecting again
await new Promise((resolve) =>
setTimeout(resolve, 1000 + Math.random() * 3000),
);
},
});
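If no health check is available, a capped exponential backoff with jitter is a common alternative. The sketch below only computes the delay; `backoffDelayMs` is a hypothetical helper, and wiring it into `retryWait` assumes the hook is handed the number of retries attempted so far.

```javascript
// Capped exponential backoff with up to one second of random jitter.
// `random` is injectable so the function can be tested deterministically.
function backoffDelayMs(retries, random = Math.random) {
  const base = Math.min(1000 * 2 ** retries, 30000); // 1s, 2s, 4s, ... capped at 30s
  const jitter = Math.floor(random() * 1000); // spread clients out to avoid reconnect storms
  return base + jitter;
}

// Possible wiring (assumption: `retryWait` receives the retry count):
// retryWait: (retries) =>
//   new Promise((resolve) => setTimeout(resolve, backoffDelayMs(retries))),

console.log(backoffDelayMs(0), backoffDelayMs(3), backoffDelayMs(10));
```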
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8" />
<title>GraphQL over Server-Sent Events</title>
<script
type="text/javascript"
src="https://unpkg.com/graphql-sse/umd/graphql-sse.min.js"
></script>
</head>
<body>
<script type="text/javascript">
const client = graphqlSse.createClient({
url: 'http://umdfor.the:4000/win/graphql/stream',
});
// consider other recipes for usage inspiration
</script>
</body>
</html>
const fetch = require('node-fetch'); // yarn add node-fetch@2 (v3 is ESM-only and cannot be loaded with require)
const { AbortController } = require('node-abort-controller'); // (node < v15) yarn add node-abort-controller
const Crypto = require('crypto');
const { createClient } = require('graphql-sse');
const client = createClient({
url: 'http://no.browser:4000/graphql/stream',
fetchFn: fetch,
abortControllerImpl: AbortController, // node < v15
/**
* Generates a v4 UUID to be used as the ID.
* Reference: https://gist.github.com/jed/982883
*/
generateID: () =>
([1e7] + -1e3 + -4e3 + -8e3 + -1e11).replace(/[018]/g, (c) =>
(c ^ (Crypto.randomBytes(1)[0] & (15 >> (c / 4)))).toString(16),
),
});
// consider other recipes for usage inspiration
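On Node versions that ship `crypto.randomUUID` (14.17 and later), the hand-rolled generator can be replaced with the built-in. This is a minimal sketch of such a `generateID` function; whether to pass it to `createClient` is up to you.

```javascript
const { randomUUID } = require('crypto');

// Generates a v4 UUID using Node's built-in implementation.
const generateID = () => randomUUID();

console.log(generateID());
```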
import { createHandler } from 'graphql-sse';
import {
schema,
getOrCreateTokenFromCookies,
customAuthenticationTokenDiscovery,
processAuthorizationHeader,
} from './my-graphql';
const handler = createHandler({
schema,
authenticate: async (req, res) => {
let token = req.headers['x-graphql-event-stream-token'];
if (token) {
// When the client is working in a "single connection mode"
// all subsequent requests for operations will have the
// stream token set in the `X-GraphQL-Event-Stream-Token` header.
//
// It is considered safe to accept the header token always
// because if a stream reservation does not exist, or is already
// fulfilled, the handler itself will reject the request.
//
// Read more: https://github.com/enisdenjo/graphql-sse/blob/master/PROTOCOL.md#single-connection-mode
return Array.isArray(token) ? token.join('') : token;
}
// It is necessary to generate a unique token when dealing with
// clients that operate in the "single connection mode". The process
// of generating the token is completely up to the implementor.
token = getOrCreateTokenFromCookies(req);
// or
token = processAuthorizationHeader(req.headers['authorization']);
// or
token = await customAuthenticationTokenDiscovery(req);
// Using the response argument the implementor may respond to
// authentication issues however they see fit.
if (!token) return res.writeHead(401, 'Unauthorized').end();
// Clients that operate in "distinct connections mode" don't
// need a unique stream token. It is completely ok to simply
// return an empty string for authenticated clients.
//
// Read more: https://github.com/enisdenjo/graphql-sse/blob/master/PROTOCOL.md#distinct-connections-mode
if (req.method === 'POST' && req.headers.accept === 'text/event-stream') {
// "distinct connections mode" requests an event-stream with a POST
// method. These two checks, together with the lack of `X-GraphQL-Event-Stream-Token`
// header, are sufficient for accurate detection.
return ''; // return token; is OK too
}
// On the other hand, clients operating in "single connection mode"
// need a unique stream token which will be provided alongside the
// incoming event stream request inside the `X-GraphQL-Event-Stream-Token` header.
//
// Read more: https://github.com/enisdenjo/graphql-sse/blob/master/PROTOCOL.md#single-connection-mode
return token;
},
});
// use `handler` with your favourite http library
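The mode detection described in the comments above can be isolated into a small pure function, which makes it easy to unit test. This is only a sketch: `detectRequestKind` and its return values are hypothetical names, and it assumes header names are already lower-cased, as Node's `http` module does.

```javascript
// Classify an incoming request following the rules from the comments above.
function detectRequestKind(method, headers) {
  // A stream token header is only sent by "single connection mode" clients.
  if (headers['x-graphql-event-stream-token']) return 'single-connection';
  // A POST asking for an event stream without a token is "distinct connections mode".
  if (method === 'POST' && headers.accept === 'text/event-stream') {
    return 'distinct-connection';
  }
  // Anything else needs a closer look, e.g. a client about to reserve a stream.
  return 'other';
}

console.log(detectRequestKind('POST', { accept: 'text/event-stream' }));
```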
import { createHandler } from 'graphql-sse';
import { schema, checkIsAdmin, getDebugSchema } from './my-graphql';
const handler = createHandler({
schema: async (req, executionArgsWithoutSchema) => {
// will be called on every subscribe request
// allowing you to dynamically supply the schema
// depending on the provided arguments
const isAdmin = await checkIsAdmin(req);
if (isAdmin) return getDebugSchema(req, executionArgsWithoutSchema);
return schema;
},
});
// use `handler` with your favourite http library
import { createHandler } from 'graphql-sse';
import { schema, getDynamicContext } from './my-graphql';
const handler = createHandler({
schema,
// or static context by supplying the value directly
context: async (req, args) => {
return getDynamicContext(req, args);
},
});
// use `handler` with your favourite http library
import { parse } from 'graphql';
import { createHandler } from 'graphql-sse';
import { getSchema } from './my-graphql';
const handler = createHandler({
onSubscribe: async (req, _res, params) => {
const schema = await getSchema(req);
const args = {
schema,
operationName: params.operationName,
document: parse(params.query),
variableValues: params.variables,
};
return args;
},
});
// use `handler` with your favourite http library
// 🛸 server
import { parse, ExecutionArgs } from 'graphql';
import { createHandler } from 'graphql-sse';
import { schema } from './my-graphql-schema';
// a unique GraphQL execution ID used for representing
// a query in the persisted queries store. when subscribing
// you should use `SubscribePayload.extensions.persistedQuery` to transmit the id
type QueryID = string;
const queriesStore: Record<QueryID, ExecutionArgs> = {
iWantTheGreetings: {
schema, // you may even provide different schemas in the queries store
document: parse('subscription Greetings { greetings }'),
},
};
const handler = createHandler({
  onSubscribe: (req, res, params) => {
    const persistedQuery = queriesStore[params.extensions?.persistedQuery];
    if (persistedQuery) {
      return {
        ...persistedQuery,
        variableValues: params.variables, // use the variables from the client
      };
    }
    // for extra security only allow the queries from the store
    return res.writeHead(404, 'Query Not Found').end();
  },
});
// use `handler` with your favourite http library
// 📺 client
import { createClient } from 'graphql-sse';
const client = createClient({
url: 'http://persisted.graphql:4000/queries',
});
(async () => {
const onNext = () => {
/**/
};
await new Promise((resolve, reject) => {
client.subscribe(
{
query: '', // query field is required, but you can leave it empty for persisted queries
extensions: {
persistedQuery: 'iWantTheGreetings',
},
},
{
next: onNext,
error: reject,
complete: resolve,
},
);
});
expect(onNext).toBeCalledTimes(5); // greetings in 5 languages
})();
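Because the persisted query ID comes from the client, the lookup is worth guarding so that inherited keys such as `constructor` never match. Here is a minimal sketch of such a guarded lookup, using plain query strings instead of parsed documents to stay dependency-free; `resolvePersistedQuery` is a hypothetical helper, not part of graphql-sse.

```javascript
const queriesStore = {
  iWantTheGreetings: 'subscription Greetings { greetings }',
};

// Look up a persisted query by the ID sent in `extensions.persistedQuery`.
// Returns null for missing, non-string or unknown IDs.
function resolvePersistedQuery(store, params) {
  const id = params.extensions && params.extensions.persistedQuery;
  if (typeof id !== 'string') return null;
  // Only accept the store's own keys, never inherited ones like `constructor`.
  if (!Object.prototype.hasOwnProperty.call(store, id)) return null;
  return { query: store[id], variables: params.variables };
}

console.log(
  resolvePersistedQuery(queriesStore, {
    query: '',
    extensions: { persistedQuery: 'iWantTheGreetings' },
  }),
);
```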
Check the docs folder out for TypeDoc generated documentation.
Read about the exact transport intricacies used by the library in the GraphQL over Server-Sent Events Protocol document.
Want to file a bug, contribute code, or improve the documentation? Read up on our guidelines for contributing and drive development with yarn test --watch away!
FAQs
We found that graphql-sse demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 1 open source maintainer collaborating on the project.