@4c/graphql-subscription-server
Advanced tools
Comparing version 3.0.1 to 3.0.2
@@ -0,1 +1,3 @@ | ||
import { Logger } from './Logger'; | ||
export declare function createAsyncIterable<T>(values: T[]): AsyncGenerator<T, void, unknown>; | ||
export declare function map<T, U>(iterable: AsyncIterable<T>, mapper: (value: T) => U): AsyncGenerator<U, void, void>; | ||
@@ -6,2 +8,3 @@ export declare function filter<T>(iterable: AsyncIterable<T>, predicate: (value: T) => Promise<boolean> | boolean): AsyncGenerator<T, void, void>; | ||
teardown?: () => void | Promise<void>; | ||
log?: Logger; | ||
}; | ||
@@ -8,0 +11,0 @@ export declare class AsyncQueue { |
"use strict"; | ||
exports.__esModule = true; | ||
exports.createAsyncIterable = createAsyncIterable; | ||
exports.map = map; | ||
@@ -9,2 +10,6 @@ exports.filter = filter; | ||
/* eslint-disable no-await-in-loop */ | ||
async function* createAsyncIterable(values) { | ||
for (const value of values) yield value; | ||
} | ||
async function* map(iterable, mapper) { | ||
@@ -64,4 +69,6 @@ for await (const value of iterable) { | ||
await this.setupPromise; | ||
} | ||
} // this is dummy value yielded so we can await the first `.next()` | ||
// call of the iterable, ensuring that the setup code has completed | ||
yield null; | ||
@@ -68,0 +75,0 @@ |
@@ -42,2 +42,3 @@ import { GraphQLSchema, ValidationContext } from 'graphql'; | ||
subscriptionContexts: Map<string, SubscriptionContext<SubscribeOptions<TCredentials>>>; | ||
readonly clientId: string; | ||
constructor(socket: io.Socket, config: AuthorizedSocketOptions<TContext, TCredentials>); | ||
@@ -57,4 +58,4 @@ emitError(error: { | ||
handleUnsubscribe: (id: string, cb?: (() => void) | undefined) => Promise<void>; | ||
handleDisconnect: () => Promise<void>; | ||
handleDisconnect: (reason?: string | undefined) => Promise<void>; | ||
} | ||
export {}; |
@@ -40,7 +40,10 @@ "use strict"; | ||
this.log('debug', 'authenticating connection', { | ||
clientId: this.socket.id | ||
clientId: this.clientId | ||
}); | ||
await this.config.credentialsManager.authenticate(authorization); | ||
} catch (err) { | ||
this.log('error', err.message, err); | ||
} catch (error) { | ||
this.log('error', error.message, { | ||
error, | ||
clientId: this.clientId | ||
}); | ||
this.emitError({ | ||
@@ -65,2 +68,3 @@ code: 'invalid_authorization' | ||
this.log('debug', 'subscription limit reached', { | ||
clientId: this.clientId, | ||
maxSubscriptionsPerConnection: this.config.maxSubscriptionsPerConnection | ||
@@ -76,3 +80,4 @@ }); | ||
this.log('debug', 'duplicate subscription attempted', { | ||
id | ||
id, | ||
clientId: this.clientId | ||
}); | ||
@@ -127,3 +132,15 @@ this.emitError({ | ||
if (resultOrStream.errors != null) { | ||
this.subscriptionContexts.delete(id); | ||
// Do a full unsubscribe because the error may have occured | ||
// after the subscription was set up | ||
this.handleUnsubscribe(id); | ||
const { | ||
errors | ||
} = resultOrStream; | ||
this.log('error', 'client subscribe failed', { | ||
errors, | ||
clientId: this.clientId, | ||
query, | ||
variables, | ||
id | ||
}); | ||
this.emitError({ | ||
@@ -135,4 +152,3 @@ code: 'subscribe_failed.gql_error', | ||
} | ||
} finally { | ||
acknowledge(cb); | ||
this.log('debug', 'client subscribed', { | ||
@@ -142,4 +158,6 @@ id, | ||
variables, | ||
clientId: this.socket.id | ||
clientId: this.clientId | ||
}); | ||
} finally { | ||
acknowledge(cb); | ||
} | ||
@@ -167,3 +185,4 @@ | ||
response, | ||
credentials | ||
credentials, | ||
clientId: this.clientId | ||
}); | ||
@@ -179,3 +198,3 @@ this.socket.emit('subscription update', { | ||
this.log('debug', 'client connected', { | ||
clientId: this.socket.id | ||
clientId: this.clientId | ||
}); | ||
@@ -193,3 +212,3 @@ }; | ||
id, | ||
clientId: this.socket.id | ||
clientId: this.clientId | ||
}); | ||
@@ -201,5 +220,6 @@ await subscriptionContext.close(); | ||
this.handleDisconnect = async () => { | ||
this.handleDisconnect = async reason => { | ||
this.log('debug', 'client disconnected', { | ||
clientId: this.socket.id | ||
reason, | ||
clientId: this.clientId | ||
}); | ||
@@ -211,4 +231,5 @@ await Promise.all([this.config.credentialsManager.unauthenticate(), ...Array.from(this.subscriptionContexts.values(), subscriptionContext => subscriptionContext.close())]); | ||
this.config = config; | ||
this.log = config.createLogger('@4c/SubscriptionServer::AuthorizedSocket'); | ||
this.log = config.createLogger('AuthorizedSocket'); | ||
this.subscriptionContexts = new Map(); | ||
this.clientId = this.socket.id; | ||
this.socket.on('authenticate', this.handleAuthenticate).on('subscribe', this.handleSubscribe).on('unsubscribe', this.handleUnsubscribe).on('connect', this.handleConnect).on('disconnect', this.handleDisconnect); | ||
@@ -215,0 +236,0 @@ } |
@@ -1,1 +0,3 @@ | ||
"use strict"; | ||
"use strict"; | ||
exports.__esModule = true; |
import { CredentialsManager } from './CredentialsManager'; | ||
import { CreateLogger, Logger } from './Logger'; | ||
declare type Seconds = number; | ||
@@ -8,2 +9,3 @@ export interface JwtCredentials { | ||
updateOnExpired?: boolean; | ||
createLogger?: CreateLogger; | ||
} | ||
@@ -14,3 +16,4 @@ export default abstract class JwtCredentialsManager<TCredentials extends JwtCredentials> implements CredentialsManager<TCredentials> { | ||
private updateOnExpired; | ||
constructor(config?: JwtCredentialsManagerConfig); | ||
readonly log: Logger; | ||
constructor({ createLogger, updateOnExpired, }?: JwtCredentialsManagerConfig); | ||
private isExpired; | ||
@@ -17,0 +20,0 @@ getCredentials(): Promise<TCredentials | null | undefined>; |
@@ -5,11 +5,16 @@ "use strict"; | ||
exports.default = void 0; | ||
var _Logger = require("./Logger"); | ||
const SECONDS_TO_MS = 1000; | ||
class JwtCredentialsManager { | ||
constructor(config = {}) { | ||
var _config$updateOnExpir; | ||
constructor({ | ||
createLogger = _Logger.noopCreateLogger, | ||
updateOnExpired | ||
} = {}) { | ||
this.token = null; | ||
this.credentialsPromise = null; | ||
this.updateOnExpired = (_config$updateOnExpir = config.updateOnExpired) != null ? _config$updateOnExpir : false; | ||
this.updateOnExpired = updateOnExpired != null ? updateOnExpired : false; | ||
this.log = createLogger('JwtCredentialsManager'); | ||
} | ||
@@ -25,3 +30,15 @@ | ||
if (credentials && this.isExpired(credentials)) { | ||
return this.updateOnExpired ? this.updateCredentials() : null; | ||
if (!this.updateOnExpired) { | ||
this.log('debug', 'request for expired credentials', { | ||
token: this.token, | ||
expiredCredentials: credentials | ||
}); | ||
return null; | ||
} | ||
this.log('silly', 'credentials expired: refreshing from token', { | ||
token: this.token, | ||
expiredCredentials: credentials | ||
}); | ||
return this.updateCredentials(); | ||
} | ||
@@ -53,2 +70,6 @@ | ||
if (!creds || this.isExpired(creds)) { | ||
this.log('silly', 'credentials expired after update', { | ||
token: this.token, | ||
credentials: creds | ||
}); | ||
return null; | ||
@@ -55,0 +76,0 @@ } |
export declare type LogLevels = 'error' | 'warn' | 'info' | 'verbose' | 'debug' | 'silly'; | ||
export declare type Logger = (level: LogLevels, message: string, meta?: Record<string, unknown>) => void; | ||
export declare type CreateLogger = (group?: string) => Logger; | ||
declare const noopCreateLogger: CreateLogger; | ||
export { noopCreateLogger }; |
@@ -1,1 +0,9 @@ | ||
"use strict"; | ||
"use strict"; | ||
exports.__esModule = true; | ||
exports.noopCreateLogger = void 0; | ||
// eslint-disable-next-line @typescript-eslint/no-empty-function | ||
const noopCreateLogger = () => () => {}; | ||
exports.noopCreateLogger = noopCreateLogger; |
{ | ||
"name": "@4c/graphql-subscription-server", | ||
"version": "3.0.1", | ||
"version": "3.0.2", | ||
"author": { | ||
@@ -5,0 +5,0 @@ "name": "4Catalyzer" |
import redis from 'redis'; | ||
import { AsyncQueue } from './AsyncUtils'; | ||
import { CreateLogger } from './Logger'; | ||
import type { Subscriber } from './Subscriber'; | ||
@@ -7,2 +8,3 @@ declare type Channel = string; | ||
parseMessage?: (msg: string) => any; | ||
createLogger?: CreateLogger; | ||
}; | ||
@@ -17,3 +19,4 @@ export declare type RedisSubscriberOptions = { | ||
_channels: Set<string>; | ||
constructor({ parseMessage, ...redisConfig }?: RedisConfigOptions); | ||
private readonly log; | ||
constructor({ parseMessage, createLogger, ...redisConfig }?: RedisConfigOptions); | ||
_subscribeToChannel(channel: string): Promise<void>; | ||
@@ -20,0 +23,0 @@ subscribe(channel: Channel, options?: TOptions): { |
@@ -12,2 +12,4 @@ "use strict"; | ||
var _Logger = require("./Logger"); | ||
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; } | ||
@@ -18,4 +20,6 @@ | ||
parseMessage, | ||
createLogger = _Logger.noopCreateLogger, | ||
...redisConfig | ||
} = {}) { | ||
this.log = createLogger('RedisSubscriber'); | ||
this.redis = _redis.default.createClient(redisConfig); | ||
@@ -26,2 +30,7 @@ this._queues = new Map(); | ||
this.redis.on('message', (channel, message) => { | ||
this.log('silly', 'message received', { | ||
channel, | ||
message | ||
}); | ||
const queues = this._queues.get(channel); | ||
@@ -41,2 +50,5 @@ | ||
if (this._channels.has(channel)) { | ||
this.log('debug', 'Channel already subscribed to', { | ||
channel | ||
}); | ||
return; | ||
@@ -49,2 +61,5 @@ } | ||
await (0, _util.promisify)(this.redis.subscribe).call(this.redis, channel); | ||
this.log('debug', 'Channel subscribed', { | ||
channel | ||
}); | ||
} | ||
@@ -76,2 +91,7 @@ | ||
} | ||
this.log('debug', 'Channel subscriber unsubscribed', { | ||
channel, | ||
numSubscribersForChannelRemaining: innerQueues.size | ||
}); | ||
} | ||
@@ -95,2 +115,6 @@ }); | ||
await (0, _util.promisify)(this.redis.quit).call(this.redis); | ||
this.log('silly', 'closed', { | ||
numQueus: this._queues.size, | ||
numChannels: this._channels.size | ||
}); | ||
} | ||
@@ -97,0 +121,0 @@ |
@@ -1,1 +0,3 @@ | ||
"use strict"; | ||
"use strict"; | ||
exports.__esModule = true; |
@@ -0,8 +1,14 @@ | ||
import { Logger } from './Logger'; | ||
import { Subscriber } from './Subscriber'; | ||
export interface SubscriptionContextOptions { | ||
log?: Logger; | ||
} | ||
export default class SubscriptionContext<TSubscriberOptions> { | ||
subscriber: Subscriber<TSubscriberOptions>; | ||
closes: Array<() => Promise<void>>; | ||
constructor(subscriber: Subscriber<any>); | ||
readonly subscriber: Subscriber<TSubscriberOptions>; | ||
readonly closes: Array<() => Promise<void>>; | ||
private readonly log?; | ||
closed: boolean; | ||
constructor(subscriber: Subscriber<any>, { log }?: SubscriptionContextOptions); | ||
subscribe(topic: string, options?: TSubscriberOptions): Promise<AsyncIterableIterator<any>>; | ||
close(): Promise<void>; | ||
} |
@@ -6,9 +6,28 @@ "use strict"; | ||
var _AsyncUtils = require("./AsyncUtils"); | ||
function emptySubscription() { | ||
return Promise.resolve((0, _AsyncUtils.createAsyncIterable)([])); | ||
} | ||
class SubscriptionContext { | ||
constructor(subscriber) { | ||
constructor(subscriber, { | ||
log | ||
} = {}) { | ||
this.closed = false; | ||
this.subscriber = subscriber; | ||
this.closes = []; | ||
this.log = log; | ||
} | ||
subscribe(topic, options) { | ||
if (this.closed) { | ||
var _this$log; | ||
(_this$log = this.log) == null ? void 0 : _this$log.call(this, 'debug', `Subscribe after closed`, { | ||
topic | ||
}); | ||
return emptySubscription(); | ||
} | ||
const { | ||
@@ -19,2 +38,7 @@ iterator, | ||
this.closes.push(close); | ||
if (this.closed) { | ||
return close().then(() => emptySubscription()); | ||
} | ||
return iterator; | ||
@@ -24,2 +48,3 @@ } | ||
async close() { | ||
this.closed = true; | ||
await Promise.all(this.closes.map(close => close())); | ||
@@ -26,0 +51,0 @@ } |
@@ -5,3 +5,3 @@ import type { GraphQLSchema } from 'graphql'; | ||
import type { CredentialsManager } from './CredentialsManager'; | ||
import type { CreateLogger, Logger } from './Logger'; | ||
import { CreateLogger, Logger } from './Logger'; | ||
import type { Subscriber } from './Subscriber'; | ||
@@ -18,3 +18,3 @@ export declare type SubscriptionServerConfig<TContext, TCredentials> = { | ||
createLogger?: CreateLogger; | ||
socketIoServer: io.Server; | ||
socketIoServer?: io.Server; | ||
}; | ||
@@ -21,0 +21,0 @@ export default class SubscriptionServer<TContext, TCredentials> { |
@@ -12,11 +12,17 @@ "use strict"; | ||
var _Logger = require("./Logger"); | ||
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; } | ||
// eslint-disable-next-line @typescript-eslint/no-empty-function | ||
const defaultCreateLogger = () => () => {}; | ||
class SubscriptionServer { | ||
constructor(config) { | ||
this.handleConnection = socket => { | ||
this.log('debug', 'new socket connection'); | ||
var _this$io$engine$clien, _this$io$engine; | ||
const clientId = socket.id; | ||
this.log('debug', 'SubscriptionServer: new socket connection', { | ||
clientId, | ||
// @ts-expect-error private field | ||
numClients: (_this$io$engine$clien = (_this$io$engine = this.io.engine) == null ? void 0 : _this$io$engine.clientsCount) != null ? _this$io$engine$clien : 0 | ||
}); | ||
const request = Object.create(_express.default.request); | ||
@@ -36,3 +42,15 @@ Object.assign(request, socket.request); | ||
createValidationRules: this.config.createValidationRules, | ||
createLogger: this.config.createLogger || defaultCreateLogger | ||
createLogger: this.config.createLogger || _Logger.noopCreateLogger | ||
}); // add after so the logs happen in order | ||
socket.once('disconnect', reason => { | ||
var _this$io$engine$clien2; | ||
this.log('debug', 'SubscriptionServer: socket disconnected', { | ||
reason, | ||
clientId, | ||
// @ts-expect-error private field | ||
numClients: ((_this$io$engine$clien2 = this.io.engine.clientsCount) != null ? _this$io$engine$clien2 : 0) - 1 // number hasn't decremented at this point for this client | ||
}); | ||
}); | ||
@@ -42,4 +60,4 @@ }; | ||
this.config = config; | ||
const createLogger = config.createLogger || defaultCreateLogger; | ||
this.log = createLogger('@4c/SubscriptionServer::Server'); | ||
const createLogger = config.createLogger || _Logger.noopCreateLogger; | ||
this.log = createLogger('SubscriptionServer'); | ||
this.io = config.socketIoServer; | ||
@@ -46,0 +64,0 @@ |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
34893
27
870
0