@4c/graphql-subscription-server
Advanced tools
Comparing version 3.0.4 to 4.0.0
import { Logger } from './Logger'; | ||
export declare function createAsyncIterable<T>(values: T[]): AsyncGenerator<T, void, unknown>; | ||
export declare function createAsyncIterable<T>(values: T[]): AsyncGenerator<Awaited<T>, void, unknown>; | ||
export declare function map<T, U>(iterable: AsyncIterable<T>, mapper: (value: T) => U): AsyncGenerator<U, void, void>; | ||
@@ -4,0 +4,0 @@ export declare function filter<T>(iterable: AsyncIterable<T>, predicate: (value: T) => Promise<boolean> | boolean): AsyncGenerator<T, void, void>; |
"use strict"; | ||
exports.__esModule = true; | ||
exports.AsyncQueue = void 0; | ||
exports.createAsyncIterable = createAsyncIterable; | ||
exports.filter = filter; | ||
exports.map = map; | ||
exports.filter = filter; | ||
exports.AsyncQueue = void 0; | ||
@@ -28,15 +28,2 @@ /* eslint-disable no-await-in-loop */ | ||
constructor(options = {}) { | ||
this.close = async () => { | ||
if (this.setupPromise) { | ||
await this.setupPromise; | ||
} | ||
if (this.options.teardown) { | ||
await this.options.teardown(); | ||
} | ||
this.closed = true; | ||
this.push(null); | ||
}; | ||
this.options = options; | ||
@@ -96,4 +83,16 @@ this.values = []; | ||
close = async () => { | ||
if (this.setupPromise) { | ||
await this.setupPromise; | ||
} | ||
if (this.options.teardown) { | ||
await this.options.teardown(); | ||
} | ||
this.closed = true; | ||
this.push(null); | ||
}; | ||
} | ||
exports.AsyncQueue = AsyncQueue; |
@@ -33,216 +33,222 @@ "use strict"; | ||
constructor(socket, config) { | ||
this.hasPermission = (data, credentials) => { | ||
return this.config.hasPermission(data, credentials); | ||
}; | ||
this.socket = socket; | ||
this.config = config; | ||
this.log = config.createLogger('AuthorizedSocket'); | ||
this.subscriptionContexts = new Map(); | ||
this.clientId = this.socket.id; | ||
this.socket.on('authenticate', this.handleAuthenticate).on('subscribe', this.handleSubscribe).on('unsubscribe', this.handleUnsubscribe).on('connect', this.handleConnect).on('disconnect', this.handleDisconnect); | ||
} | ||
this.handleAuthenticate = async (authorization, cb) => { | ||
try { | ||
this.log('debug', 'authenticating connection', { | ||
clientId: this.clientId | ||
}); | ||
await this.config.credentialsManager.authenticate(authorization); | ||
} catch (error) { | ||
this.log('error', error.message, { | ||
error, | ||
clientId: this.clientId | ||
}); | ||
this.emitError({ | ||
code: 'invalid_authorization' | ||
}); | ||
} | ||
emitError(error) { | ||
this.socket.emit('app_error', error); | ||
} | ||
acknowledge(cb); | ||
}; | ||
async isAuthorized(data, hasPermission) { | ||
const credentials = await this.config.credentialsManager.getCredentials(); | ||
const isAuthorized = !!credentials && hasPermission(data, credentials); | ||
this.handleSubscribe = async ({ | ||
id, | ||
query, | ||
variables | ||
}, cb) => { | ||
let document; | ||
let resultOrStream; | ||
if (!isAuthorized) { | ||
this.log('info', 'unauthorized', { | ||
payload: data, | ||
credentials | ||
}); | ||
} | ||
try { | ||
if (this.config.maxSubscriptionsPerConnection != null && this.subscriptionContexts.size >= this.config.maxSubscriptionsPerConnection) { | ||
this.log('debug', 'subscription limit reached', { | ||
clientId: this.clientId, | ||
maxSubscriptionsPerConnection: this.config.maxSubscriptionsPerConnection | ||
}); | ||
this.emitError({ | ||
code: 'subscribe_failed.subscription_limit' | ||
}); | ||
return; | ||
} | ||
return isAuthorized; | ||
} | ||
if (this.subscriptionContexts.has(id)) { | ||
this.log('debug', 'duplicate subscription attempted', { | ||
id, | ||
clientId: this.clientId | ||
}); | ||
this.emitError({ | ||
code: 'invalid_id.duplicate', | ||
data: id | ||
}); | ||
return; | ||
} | ||
hasPermission = (data, credentials) => { | ||
return this.config.hasPermission(data, credentials); | ||
}; | ||
handleAuthenticate = async (authorization, cb) => { | ||
try { | ||
this.log('debug', 'authenticating connection', { | ||
clientId: this.clientId | ||
}); | ||
await this.config.credentialsManager.authenticate(authorization); | ||
} catch (error) { | ||
this.log('error', error.message, { | ||
error, | ||
clientId: this.clientId | ||
}); | ||
this.emitError({ | ||
code: 'invalid_authorization' | ||
}); | ||
} | ||
document = (0, _graphql.parse)(query); | ||
const validationRules = [..._graphql.specifiedRules, ...(this.config.createValidationRules ? this.config.createValidationRules({ | ||
query, | ||
variables | ||
}) : [])]; | ||
const validationErrors = (0, _graphql.validate)(this.config.schema, document, validationRules); | ||
acknowledge(cb); | ||
}; | ||
/** | ||
* Handle user requesting a subscription. | ||
*/ | ||
if (validationErrors.length) { | ||
this.emitError({ | ||
code: 'subscribe_failed.document_error', | ||
data: validationErrors | ||
}); | ||
return; | ||
} | ||
handleSubscribe = async ({ | ||
id, | ||
query, | ||
variables | ||
}, cb) => { | ||
let document; | ||
let resultOrStream; | ||
const subscriptionContext = new _SubscriptionContext.default(this.config.subscriber); | ||
const sourcePromise = (0, _graphql.createSourceEventStream)(this.config.schema, document, null, { | ||
subscribe: async (topic, { | ||
hasPermission = this.config.hasPermission, | ||
...options | ||
} = {}) => { | ||
return AsyncUtils.filter(await subscriptionContext.subscribe(topic, options), data => this.isAuthorized(data, hasPermission)); | ||
}, | ||
subscriber: this.config.subscriber | ||
}, variables); | ||
this.subscriptionContexts.set(id, subscriptionContext); | ||
try { | ||
if (this.config.maxSubscriptionsPerConnection != null && this.subscriptionContexts.size >= this.config.maxSubscriptionsPerConnection) { | ||
this.log('debug', 'subscription limit reached', { | ||
clientId: this.clientId, | ||
maxSubscriptionsPerConnection: this.config.maxSubscriptionsPerConnection | ||
}); | ||
this.emitError({ | ||
code: 'subscribe_failed.subscription_limit' | ||
}); | ||
return; | ||
} | ||
try { | ||
resultOrStream = await sourcePromise; | ||
} catch (err) { | ||
if (err instanceof _graphql.GraphQLError) { | ||
resultOrStream = { | ||
errors: [err] | ||
}; | ||
} else { | ||
this.subscriptionContexts.delete(id); | ||
throw err; | ||
} | ||
} | ||
if (resultOrStream.errors != null) { | ||
// Do a full unsubscribe because the error may have occured | ||
// after the subscription was set up | ||
this.handleUnsubscribe(id); | ||
const { | ||
errors | ||
} = resultOrStream; | ||
this.log('error', 'client subscribe failed', { | ||
errors, | ||
clientId: this.clientId, | ||
query, | ||
variables, | ||
id | ||
}); | ||
this.emitError({ | ||
code: 'subscribe_failed.gql_error', | ||
data: resultOrStream.errors | ||
}); | ||
return; | ||
} | ||
this.log('debug', 'client subscribed', { | ||
if (this.subscriptionContexts.has(id)) { | ||
this.log('debug', 'duplicate subscription attempted', { | ||
id, | ||
query, | ||
variables, | ||
clientId: this.clientId | ||
}); | ||
} finally { | ||
acknowledge(cb); | ||
this.emitError({ | ||
code: 'invalid_id.duplicate', | ||
data: id | ||
}); | ||
return; | ||
} | ||
const stream = resultOrStream; | ||
document = (0, _graphql.parse)(query); | ||
const validationRules = [..._graphql.specifiedRules, ...(this.config.createValidationRules ? this.config.createValidationRules({ | ||
query, | ||
variables | ||
}) : [])]; | ||
const validationErrors = (0, _graphql.validate)(this.config.schema, document, validationRules); | ||
for await (const payload of stream) { | ||
const credentials = await this.config.credentialsManager.getCredentials(); | ||
let response; | ||
if (validationErrors.length) { | ||
this.emitError({ | ||
code: 'subscribe_failed.document_error', | ||
data: validationErrors | ||
}); | ||
return; | ||
} | ||
try { | ||
response = await (0, _graphql.execute)(this.config.schema, document, payload, this.config.createContext && this.config.createContext(credentials), variables); | ||
} catch (e) { | ||
if (e instanceof _graphql.GraphQLError) { | ||
response = { | ||
errors: [e] | ||
}; | ||
} else { | ||
throw e; | ||
} | ||
const subscriptionContext = new _SubscriptionContext.default(this.config.subscriber); | ||
const sourcePromise = (0, _graphql.createSourceEventStream)(this.config.schema, document, null, { | ||
subscribe: async (topic, { | ||
hasPermission = this.config.hasPermission, | ||
...options | ||
} = {}) => { | ||
return AsyncUtils.filter(await subscriptionContext.subscribe(topic, options), data => this.isAuthorized(data, hasPermission)); | ||
}, | ||
subscriber: this.config.subscriber | ||
}, variables); | ||
this.subscriptionContexts.set(id, subscriptionContext); | ||
try { | ||
resultOrStream = await sourcePromise; | ||
} catch (err) { | ||
if (err instanceof _graphql.GraphQLError) { | ||
resultOrStream = { | ||
errors: [err] | ||
}; | ||
} else { | ||
this.subscriptionContexts.delete(id); | ||
throw err; | ||
} | ||
} | ||
this.log('info', 'emit', { | ||
response, | ||
credentials, | ||
clientId: this.clientId | ||
if (resultOrStream.errors != null) { | ||
// Do a full unsubscribe because the error may have occured | ||
// after the subscription was set up | ||
this.handleUnsubscribe(id); | ||
const { | ||
errors | ||
} = resultOrStream; | ||
this.log('error', 'client subscribe failed', { | ||
errors, | ||
clientId: this.clientId, | ||
query, | ||
variables, | ||
id | ||
}); | ||
this.socket.emit('subscription update', { | ||
id, | ||
...response | ||
this.emitError({ | ||
code: 'subscribe_failed.gql_error', | ||
data: resultOrStream.errors | ||
}); | ||
} | ||
}; | ||
this.handleConnect = () => { | ||
this.log('debug', 'client connected', { | ||
clientId: this.clientId | ||
}); | ||
}; | ||
this.handleUnsubscribe = async (id, cb) => { | ||
const subscriptionContext = this.subscriptionContexts.get(id); | ||
if (!subscriptionContext) { | ||
return; | ||
} | ||
this.log('debug', 'client unsubscribed', { | ||
this.log('debug', 'client subscribed', { | ||
id, | ||
query, | ||
variables, | ||
clientId: this.clientId | ||
}); | ||
await subscriptionContext.close(); | ||
this.subscriptionContexts.delete(id); | ||
} finally { | ||
acknowledge(cb); | ||
}; | ||
} | ||
this.handleDisconnect = async reason => { | ||
this.log('debug', 'client disconnected', { | ||
reason, | ||
clientId: this.clientId | ||
}); | ||
await Promise.all([this.config.credentialsManager.unauthenticate(), ...Array.from(this.subscriptionContexts.values(), subscriptionContext => subscriptionContext.close())]); | ||
}; | ||
const stream = resultOrStream; | ||
this.socket = socket; | ||
this.config = config; | ||
this.log = config.createLogger('AuthorizedSocket'); | ||
this.subscriptionContexts = new Map(); | ||
this.clientId = this.socket.id; | ||
this.socket.on('authenticate', this.handleAuthenticate).on('subscribe', this.handleSubscribe).on('unsubscribe', this.handleUnsubscribe).on('connect', this.handleConnect).on('disconnect', this.handleDisconnect); | ||
} | ||
for await (const payload of stream) { | ||
const credentials = await this.config.credentialsManager.getCredentials(); | ||
let response; | ||
emitError(error) { | ||
this.socket.emit('app_error', error); | ||
} | ||
try { | ||
var _this$config$createCo, _this$config; | ||
async isAuthorized(data, hasPermission) { | ||
const credentials = await this.config.credentialsManager.getCredentials(); | ||
const isAuthorized = !!credentials && hasPermission(data, credentials); | ||
response = await (0, _graphql.execute)({ | ||
schema: this.config.schema, | ||
document, | ||
rootValue: payload, | ||
contextValue: (_this$config$createCo = (_this$config = this.config).createContext) == null ? void 0 : _this$config$createCo.call(_this$config, credentials), | ||
variableValues: variables | ||
}); | ||
} catch (e) { | ||
if (e instanceof _graphql.GraphQLError) { | ||
response = { | ||
errors: [e] | ||
}; | ||
} else { | ||
throw e; | ||
} | ||
} | ||
if (!isAuthorized) { | ||
this.log('info', 'unauthorized', { | ||
payload: data, | ||
credentials | ||
this.log('info', 'emit', { | ||
response, | ||
credentials, | ||
clientId: this.clientId | ||
}); | ||
this.socket.emit('subscription update', { | ||
id, | ||
...response | ||
}); | ||
} | ||
}; | ||
handleConnect = () => { | ||
this.log('debug', 'client connected', { | ||
clientId: this.clientId | ||
}); | ||
}; | ||
handleUnsubscribe = async (id, cb) => { | ||
const subscriptionContext = this.subscriptionContexts.get(id); | ||
return isAuthorized; | ||
} | ||
if (!subscriptionContext) { | ||
return; | ||
} | ||
this.log('debug', 'client unsubscribed', { | ||
id, | ||
clientId: this.clientId | ||
}); | ||
await subscriptionContext.close(); | ||
this.subscriptionContexts.delete(id); | ||
acknowledge(cb); | ||
}; | ||
handleDisconnect = async reason => { | ||
this.log('debug', 'client disconnected', { | ||
reason, | ||
clientId: this.clientId | ||
}); | ||
await Promise.all([this.config.credentialsManager.unauthenticate(), ...Array.from(this.subscriptionContexts.values(), subscriptionContext => subscriptionContext.close())]); | ||
}; | ||
} | ||
exports.default = AuthorizedSocketConnection; |
"use strict"; | ||
exports.__esModule = true; | ||
exports.AsyncQueue = exports.SubscriptionServer = exports.RedisSubscriber = exports.JwtCredentialsManager = exports.EventSubscriber = void 0; | ||
exports.SubscriptionServer = exports.RedisSubscriber = exports.JwtCredentialsManager = exports.EventSubscriber = exports.AsyncQueue = void 0; | ||
@@ -6,0 +6,0 @@ var _EventSubscriber = _interopRequireDefault(require("./EventSubscriber")); |
{ | ||
"name": "@4c/graphql-subscription-server", | ||
"version": "3.0.4", | ||
"version": "4.0.0", | ||
"author": { | ||
@@ -13,6 +13,4 @@ "name": "4Catalyzer" | ||
}, | ||
"husky": { | ||
"hooks": { | ||
"pre-commit": "lint-staged" | ||
} | ||
"gitHooks": { | ||
"pre-commit": "lint-staged" | ||
}, | ||
@@ -38,7 +36,7 @@ "lint-staged": { | ||
"dependencies": { | ||
"express": "^4.17.1", | ||
"redis": "^3.0.0" | ||
"express": "^4.17.2", | ||
"redis": "^3.1.2" | ||
}, | ||
"peerDependencies": { | ||
"graphql": ">=0.12.3", | ||
"graphql": ">=16.0.0", | ||
"socket.io": ">2.0.0" | ||
@@ -45,0 +43,0 @@ }, |
@@ -13,6 +13,7 @@ "use strict"; | ||
class SubscriptionContext { | ||
closed = false; | ||
constructor(subscriber, { | ||
log | ||
} = {}) { | ||
this.closed = false; | ||
this.subscriber = subscriber; | ||
@@ -19,0 +20,0 @@ this.closes = []; |
import type { GraphQLSchema } from 'graphql'; | ||
import type io from 'socket.io'; | ||
import type { Server, Socket } from 'socket.io'; | ||
import type { CreateValidationRules } from './AuthorizedSocketConnection'; | ||
@@ -17,3 +17,3 @@ import type { CredentialsManager } from './CredentialsManager'; | ||
createLogger?: CreateLogger; | ||
socketIoServer?: io.Server; | ||
socketIoServer?: Server; | ||
}; | ||
@@ -23,7 +23,7 @@ export default class SubscriptionServer<TContext, TCredentials> { | ||
log: Logger; | ||
io: io.Server; | ||
io: Server; | ||
constructor(config: SubscriptionServerConfig<TContext, TCredentials>); | ||
attach(httpServer: any): void; | ||
handleConnection: (socket: io.Socket) => void; | ||
handleConnection: (socket: Socket) => void; | ||
close(): Promise<void>; | ||
} |
@@ -18,41 +18,2 @@ "use strict"; | ||
constructor(config) { | ||
this.handleConnection = socket => { | ||
var _this$io$engine$clien, _this$io$engine; | ||
const clientId = socket.id; | ||
this.log('debug', 'new socket connection', { | ||
clientId, | ||
// @ts-expect-error private field | ||
numClients: (_this$io$engine$clien = (_this$io$engine = this.io.engine) == null ? void 0 : _this$io$engine.clientsCount) != null ? _this$io$engine$clien : 0 | ||
}); | ||
const request = Object.create(_express.default.request); | ||
Object.assign(request, socket.request); | ||
const { | ||
createContext | ||
} = this.config; // eslint-disable-next-line no-new | ||
new _AuthorizedSocketConnection.default(socket, { | ||
schema: this.config.schema, | ||
subscriber: this.config.subscriber, | ||
credentialsManager: this.config.createCredentialsManager(request), | ||
hasPermission: this.config.hasPermission, | ||
createContext: createContext && (credentials => createContext(request, credentials)), | ||
maxSubscriptionsPerConnection: this.config.maxSubscriptionsPerConnection, | ||
createValidationRules: this.config.createValidationRules, | ||
createLogger: this.config.createLogger || _Logger.noopCreateLogger | ||
}); // add after so the logs happen in order | ||
socket.once('disconnect', reason => { | ||
var _this$io$engine$clien2; | ||
this.log('debug', 'socket disconnected', { | ||
reason, | ||
clientId, | ||
// @ts-expect-error private field | ||
numClients: ((_this$io$engine$clien2 = this.io.engine.clientsCount) != null ? _this$io$engine$clien2 : 0) - 1 // number hasn't decremented at this point for this client | ||
}); | ||
}); | ||
}; | ||
this.config = config; | ||
@@ -71,3 +32,3 @@ const createLogger = config.createLogger || _Logger.noopCreateLogger; | ||
transports: ['websocket'], | ||
wsEngine: 'ws' | ||
allowEIO3: true | ||
}); | ||
@@ -83,2 +44,39 @@ } | ||
handleConnection = socket => { | ||
var _this$io$engine$clien, _this$io$engine; | ||
const clientId = socket.id; | ||
this.log('debug', 'new socket connection', { | ||
clientId, | ||
numClients: (_this$io$engine$clien = (_this$io$engine = this.io.engine) == null ? void 0 : _this$io$engine.clientsCount) != null ? _this$io$engine$clien : 0 | ||
}); | ||
const request = Object.create(_express.default.request); | ||
Object.assign(request, socket.request); | ||
const { | ||
createContext | ||
} = this.config; // eslint-disable-next-line no-new | ||
new _AuthorizedSocketConnection.default(socket, { | ||
schema: this.config.schema, | ||
subscriber: this.config.subscriber, | ||
credentialsManager: this.config.createCredentialsManager(request), | ||
hasPermission: this.config.hasPermission, | ||
createContext: createContext && (credentials => createContext(request, credentials)), | ||
maxSubscriptionsPerConnection: this.config.maxSubscriptionsPerConnection, | ||
createValidationRules: this.config.createValidationRules, | ||
createLogger: this.config.createLogger || _Logger.noopCreateLogger | ||
}); // add after so the logs happen in order | ||
socket.once('disconnect', reason => { | ||
var _this$io$engine$clien2; | ||
this.log('debug', 'socket disconnected', { | ||
reason, | ||
clientId, | ||
numClients: ((_this$io$engine$clien2 = this.io.engine.clientsCount) != null ? _this$io$engine$clien2 : 0) - 1 // number hasn't decremented at this point for this client | ||
}); | ||
}); | ||
}; | ||
async close() { | ||
@@ -85,0 +83,0 @@ // @ts-ignore |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
878
34591
Updatedexpress@^4.17.2
Updatedredis@^3.1.2