@4c/graphql-subscription-server
Advanced tools
Comparing version 0.7.1 to 0.8.0
@@ -0,1 +1,9 @@ | ||
<a name="0.8.0"></a> | ||
# [0.8.0](https://github.com/4Catalyzer/graphql-subscription-server/compare/v0.7.1...v0.8.0) (2018-08-13) | ||
### Features | ||
* Use an explicitly closeable async iterator ([#20](https://github.com/4Catalyzer/graphql-subscription-server/issues/20)) ([4408ddf](https://github.com/4Catalyzer/graphql-subscription-server/commit/4408ddf)) | ||
<a name="0.7.1"></a> | ||
@@ -2,0 +10,0 @@ ## [0.7.1](https://github.com/4Catalyzer/graphql-subscription-server/compare/v0.7.0...v0.7.1) (2018-08-13) |
@@ -14,15 +14,22 @@ /* eslint-disable no-await-in-loop */ | ||
constructor(options = {}) { | ||
this.closed = false; | ||
this.close = async () => { | ||
if (this.setupPromise) { | ||
await this.setupPromise; | ||
} | ||
if (this.options.teardown) { | ||
await this.options.teardown(); | ||
} | ||
this.closed = true; | ||
this.push(null); | ||
}; | ||
this.options = options; | ||
this.values = []; | ||
this.options = options; | ||
this.createPromise(); | ||
this.iterable = this.createIterable(); | ||
this.closed = false; | ||
this.iterator = this.createIterator(); | ||
} | ||
close() { | ||
if (this.closed) return; | ||
this.closed = true; | ||
if (this.options.teardown) this.options.teardown(); | ||
} | ||
createPromise() { | ||
@@ -34,29 +41,36 @@ this.promise = new Promise(resolve => { | ||
async *createIterableRaw() { | ||
try { | ||
if (this.options.setup) await this.options.setup(); | ||
yield null; | ||
async createIterator() { | ||
const iterator = this.createIteratorRaw(); // Wait for setup. | ||
while (true) { | ||
await this.promise; | ||
await iterator.next(); | ||
return iterator; | ||
} | ||
for (const value of this.values) { | ||
yield value; | ||
async *createIteratorRaw() { | ||
if (this.options.setup) { | ||
this.setupPromise = this.options.setup(); | ||
} | ||
if (this.setupPromise) { | ||
await this.setupPromise; | ||
} | ||
yield null; | ||
while (true) { | ||
await this.promise; | ||
for (const value of this.values) { | ||
if (this.closed) { | ||
return; | ||
} | ||
this.values.length = 0; | ||
this.createPromise(); | ||
yield value; | ||
} | ||
} finally { | ||
await this.close(); | ||
this.values.length = 0; | ||
this.createPromise(); | ||
} | ||
} | ||
async createIterable() { | ||
const iterableRaw = this.createIterableRaw(); // wait for the first synthetic yield after setup | ||
await iterableRaw.next(); | ||
return iterableRaw; | ||
} | ||
push(value) { | ||
@@ -63,0 +77,0 @@ this.values.push(value); |
import { createSourceEventStream, execute, GraphQLError, parse, specifiedRules, validate } from 'graphql'; | ||
import * as AsyncUtils from './AsyncUtils'; | ||
import SubscriptionContext from './SubscriptionContext'; | ||
@@ -56,3 +57,3 @@ const acknowledge = cb => { | ||
try { | ||
if (this.config.maxSubscriptionsPerConnection != null && this.subscriptions.size >= this.config.maxSubscriptionsPerConnection) { | ||
if (this.config.maxSubscriptionsPerConnection != null && this.subscriptionContexts.size >= this.config.maxSubscriptionsPerConnection) { | ||
this.log('debug', 'subscription limit reached', { | ||
@@ -67,3 +68,3 @@ maxSubscriptionsPerConnection: this.config.maxSubscriptionsPerConnection | ||
if (this.subscriptions.has(id)) { | ||
if (this.subscriptionContexts.has(id)) { | ||
this.log('debug', 'duplicate subscription attempted', { | ||
@@ -94,6 +95,7 @@ id | ||
const subscriptionContext = new SubscriptionContext(this.config.subscriber); | ||
const sourcePromise = createSourceEventStream(this.config.schema, document, null, { | ||
subscribe: async (...args) => AsyncUtils.filter((await this.config.subscriber.subscribe(...args)), this.isAuthorized) | ||
subscribe: async (...args) => AsyncUtils.filter((await subscriptionContext.subscribe(...args)), this.isAuthorized) | ||
}, variables); | ||
this.subscriptions.set(id, sourcePromise); | ||
this.subscriptionContexts.set(id, subscriptionContext); | ||
@@ -108,3 +110,3 @@ try { | ||
} else { | ||
this.subscriptions.delete(id); | ||
this.subscriptionContexts.delete(id); | ||
throw err; | ||
@@ -115,3 +117,3 @@ } | ||
if (resultOrStream.errors != null) { | ||
this.subscriptions.delete(id); | ||
this.subscriptionContexts.delete(id); | ||
this.emitError({ | ||
@@ -158,6 +160,6 @@ code: 'subscribe_failed.gql_error', | ||
this.handleUnsubscribe = async id => { | ||
const subscription = await this.subscriptions.get(id); | ||
const subscriptionContext = this.subscriptionContexts.get(id); | ||
if (subscription && typeof subscription.return === 'function') { | ||
subscription.return(); | ||
if (!subscriptionContext) { | ||
return; | ||
} | ||
@@ -168,3 +170,4 @@ | ||
}); | ||
this.subscriptions.delete(id); | ||
await subscriptionContext.close(); | ||
this.subscriptionContexts.delete(id); | ||
}; | ||
@@ -174,10 +177,3 @@ | ||
this.log('debug', 'client disconnected'); | ||
await this.config.credentialsManager.unauthenticate(); | ||
this.subscriptions.forEach(async subscriptionPromise => { | ||
const subscription = await subscriptionPromise; | ||
if (subscription && typeof subscription.return === 'function') { | ||
subscription.return(); | ||
} | ||
}); | ||
await Promise.all([this.config.credentialsManager.unauthenticate(), ...Array.from(this.subscriptionContexts.values(), subscriptionContext => subscriptionContext.close())]); | ||
}; | ||
@@ -188,3 +184,3 @@ | ||
this.log = config.createLogger('@4c/SubscriptionServer::AuthorizedSocket'); | ||
this.subscriptions = new Map(); | ||
this.subscriptionContexts = new Map(); | ||
this.socket.on('authenticate', this.handleAuthenticate).on('subscribe', this.handleSubscribe).on('unsubscribe', this.handleUnsubscribe).on('disconnect', this.handleDisconnect); | ||
@@ -191,0 +187,0 @@ } |
@@ -55,6 +55,6 @@ import { AsyncQueue } from './AsyncUtils'; | ||
eventQueues.add(queue); | ||
return queue.iterable; | ||
return queue; | ||
} | ||
async close() { | ||
close() { | ||
this._listeners.forEach((fn, event) => { | ||
@@ -61,0 +61,0 @@ this.emitter.removeListener(event, fn); |
@@ -31,3 +31,3 @@ const SECONDS_TO_MS = 1000; | ||
async unauthenticate() { | ||
unauthenticate() { | ||
if (this.renewHandle) { | ||
@@ -34,0 +34,0 @@ clearTimeout(this.renewHandle); |
@@ -16,3 +16,6 @@ import redis from 'redis'; | ||
if (!queues) return; | ||
if (!queues) { | ||
return; | ||
} | ||
queues.forEach(queue => { | ||
@@ -24,15 +27,13 @@ queue.push(message); | ||
_redisSubscribe(channel) { | ||
return promisify(cb => this.redis.subscribe(channel, cb))(); | ||
} | ||
async _subscribeToChannel(channel) { | ||
if (this._channels.has(channel)) return; | ||
if (this._channels.has(channel)) { | ||
return; | ||
} | ||
this._channels.add(channel); | ||
await this._redisSubscribe(channel); | ||
await promisify(cb => this.redis.subscribe(channel, cb))(); | ||
} | ||
async subscribe(channel, parseMessage = this._parseMessage) { | ||
subscribe(channel, parseMessage = this._parseMessage) { | ||
let channelQueues = this._queues.get(channel); | ||
@@ -44,4 +45,2 @@ | ||
this._queues.set(channel, channelQueues); | ||
await this._redisSubscribe(channel); | ||
} | ||
@@ -67,5 +66,14 @@ | ||
channelQueues.add(queue); | ||
const iterable = await queue.iterable; | ||
if (!parseMessage) return iterable; | ||
return map(iterable, parseMessage); | ||
let iteratorPromise = queue.iterator; | ||
if (parseMessage) { | ||
// Workaround for Flow. | ||
const parseMessageFn = parseMessage; | ||
iteratorPromise = iteratorPromise.then(iterator => map(iterator, parseMessageFn)); | ||
} | ||
return { | ||
iterator: iteratorPromise, | ||
close: queue.close | ||
}; | ||
} | ||
@@ -72,0 +80,0 @@ |
@@ -23,15 +23,22 @@ "use strict"; | ||
constructor(options = {}) { | ||
this.closed = false; | ||
this.close = async () => { | ||
if (this.setupPromise) { | ||
await this.setupPromise; | ||
} | ||
if (this.options.teardown) { | ||
await this.options.teardown(); | ||
} | ||
this.closed = true; | ||
this.push(null); | ||
}; | ||
this.options = options; | ||
this.values = []; | ||
this.options = options; | ||
this.createPromise(); | ||
this.iterable = this.createIterable(); | ||
this.closed = false; | ||
this.iterator = this.createIterator(); | ||
} | ||
close() { | ||
if (this.closed) return; | ||
this.closed = true; | ||
if (this.options.teardown) this.options.teardown(); | ||
} | ||
createPromise() { | ||
@@ -43,29 +50,36 @@ this.promise = new Promise(resolve => { | ||
async *createIterableRaw() { | ||
try { | ||
if (this.options.setup) await this.options.setup(); | ||
yield null; | ||
async createIterator() { | ||
const iterator = this.createIteratorRaw(); // Wait for setup. | ||
while (true) { | ||
await this.promise; | ||
await iterator.next(); | ||
return iterator; | ||
} | ||
for (const value of this.values) { | ||
yield value; | ||
async *createIteratorRaw() { | ||
if (this.options.setup) { | ||
this.setupPromise = this.options.setup(); | ||
} | ||
if (this.setupPromise) { | ||
await this.setupPromise; | ||
} | ||
yield null; | ||
while (true) { | ||
await this.promise; | ||
for (const value of this.values) { | ||
if (this.closed) { | ||
return; | ||
} | ||
this.values.length = 0; | ||
this.createPromise(); | ||
yield value; | ||
} | ||
} finally { | ||
await this.close(); | ||
this.values.length = 0; | ||
this.createPromise(); | ||
} | ||
} | ||
async createIterable() { | ||
const iterableRaw = this.createIterableRaw(); // wait for the first synthetic yield after setup | ||
await iterableRaw.next(); | ||
return iterableRaw; | ||
} | ||
push(value) { | ||
@@ -72,0 +86,0 @@ this.values.push(value); |
@@ -10,2 +10,6 @@ "use strict"; | ||
var _SubscriptionContext = _interopRequireDefault(require("./SubscriptionContext")); | ||
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; } | ||
function _interopRequireWildcard(obj) { if (obj && obj.__esModule) { return obj; } else { var newObj = {}; if (obj != null) { for (var key in obj) { if (Object.prototype.hasOwnProperty.call(obj, key)) { var desc = Object.defineProperty && Object.getOwnPropertyDescriptor ? Object.getOwnPropertyDescriptor(obj, key) : {}; if (desc.get || desc.set) { Object.defineProperty(newObj, key, desc); } else { newObj[key] = obj[key]; } } } } newObj.default = obj; return newObj; } } | ||
@@ -65,3 +69,3 @@ | ||
try { | ||
if (this.config.maxSubscriptionsPerConnection != null && this.subscriptions.size >= this.config.maxSubscriptionsPerConnection) { | ||
if (this.config.maxSubscriptionsPerConnection != null && this.subscriptionContexts.size >= this.config.maxSubscriptionsPerConnection) { | ||
this.log('debug', 'subscription limit reached', { | ||
@@ -76,3 +80,3 @@ maxSubscriptionsPerConnection: this.config.maxSubscriptionsPerConnection | ||
if (this.subscriptions.has(id)) { | ||
if (this.subscriptionContexts.has(id)) { | ||
this.log('debug', 'duplicate subscription attempted', { | ||
@@ -103,6 +107,7 @@ id | ||
const subscriptionContext = new _SubscriptionContext.default(this.config.subscriber); | ||
const sourcePromise = (0, _graphql.createSourceEventStream)(this.config.schema, document, null, { | ||
subscribe: async (...args) => AsyncUtils.filter((await this.config.subscriber.subscribe(...args)), this.isAuthorized) | ||
subscribe: async (...args) => AsyncUtils.filter((await subscriptionContext.subscribe(...args)), this.isAuthorized) | ||
}, variables); | ||
this.subscriptions.set(id, sourcePromise); | ||
this.subscriptionContexts.set(id, subscriptionContext); | ||
@@ -117,3 +122,3 @@ try { | ||
} else { | ||
this.subscriptions.delete(id); | ||
this.subscriptionContexts.delete(id); | ||
throw err; | ||
@@ -124,3 +129,3 @@ } | ||
if (resultOrStream.errors != null) { | ||
this.subscriptions.delete(id); | ||
this.subscriptionContexts.delete(id); | ||
this.emitError({ | ||
@@ -167,6 +172,6 @@ code: 'subscribe_failed.gql_error', | ||
this.handleUnsubscribe = async id => { | ||
const subscription = await this.subscriptions.get(id); | ||
const subscriptionContext = this.subscriptionContexts.get(id); | ||
if (subscription && typeof subscription.return === 'function') { | ||
subscription.return(); | ||
if (!subscriptionContext) { | ||
return; | ||
} | ||
@@ -177,3 +182,4 @@ | ||
}); | ||
this.subscriptions.delete(id); | ||
await subscriptionContext.close(); | ||
this.subscriptionContexts.delete(id); | ||
}; | ||
@@ -183,10 +189,3 @@ | ||
this.log('debug', 'client disconnected'); | ||
await this.config.credentialsManager.unauthenticate(); | ||
this.subscriptions.forEach(async subscriptionPromise => { | ||
const subscription = await subscriptionPromise; | ||
if (subscription && typeof subscription.return === 'function') { | ||
subscription.return(); | ||
} | ||
}); | ||
await Promise.all([this.config.credentialsManager.unauthenticate(), ...Array.from(this.subscriptionContexts.values(), subscriptionContext => subscriptionContext.close())]); | ||
}; | ||
@@ -197,3 +196,3 @@ | ||
this.log = config.createLogger('@4c/SubscriptionServer::AuthorizedSocket'); | ||
this.subscriptions = new Map(); | ||
this.subscriptionContexts = new Map(); | ||
this.socket.on('authenticate', this.handleAuthenticate).on('subscribe', this.handleSubscribe).on('unsubscribe', this.handleUnsubscribe).on('disconnect', this.handleDisconnect); | ||
@@ -200,0 +199,0 @@ } |
@@ -60,6 +60,6 @@ "use strict"; | ||
eventQueues.add(queue); | ||
return queue.iterable; | ||
return queue; | ||
} | ||
async close() { | ||
close() { | ||
this._listeners.forEach((fn, event) => { | ||
@@ -66,0 +66,0 @@ this.emitter.removeListener(event, fn); |
@@ -36,3 +36,3 @@ "use strict"; | ||
async unauthenticate() { | ||
unauthenticate() { | ||
if (this.renewHandle) { | ||
@@ -39,0 +39,0 @@ clearTimeout(this.renewHandle); |
@@ -26,3 +26,6 @@ "use strict"; | ||
if (!queues) return; | ||
if (!queues) { | ||
return; | ||
} | ||
queues.forEach(queue => { | ||
@@ -34,15 +37,13 @@ queue.push(message); | ||
_redisSubscribe(channel) { | ||
return (0, _util.promisify)(cb => this.redis.subscribe(channel, cb))(); | ||
} | ||
async _subscribeToChannel(channel) { | ||
if (this._channels.has(channel)) return; | ||
if (this._channels.has(channel)) { | ||
return; | ||
} | ||
this._channels.add(channel); | ||
await this._redisSubscribe(channel); | ||
await (0, _util.promisify)(cb => this.redis.subscribe(channel, cb))(); | ||
} | ||
async subscribe(channel, parseMessage = this._parseMessage) { | ||
subscribe(channel, parseMessage = this._parseMessage) { | ||
let channelQueues = this._queues.get(channel); | ||
@@ -54,4 +55,2 @@ | ||
this._queues.set(channel, channelQueues); | ||
await this._redisSubscribe(channel); | ||
} | ||
@@ -77,5 +76,14 @@ | ||
channelQueues.add(queue); | ||
const iterable = await queue.iterable; | ||
if (!parseMessage) return iterable; | ||
return (0, _AsyncUtils.map)(iterable, parseMessage); | ||
let iteratorPromise = queue.iterator; | ||
if (parseMessage) { | ||
// Workaround for Flow. | ||
const parseMessageFn = parseMessage; | ||
iteratorPromise = iteratorPromise.then(iterator => (0, _AsyncUtils.map)(iterator, parseMessageFn)); | ||
} | ||
return { | ||
iterator: iteratorPromise, | ||
close: queue.close | ||
}; | ||
} | ||
@@ -82,0 +90,0 @@ |
{ | ||
"name": "@4c/graphql-subscription-server", | ||
"version": "0.7.1", | ||
"version": "0.8.0", | ||
"author": "4Catalyzer", | ||
@@ -5,0 +5,0 @@ "license": "MIT", |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
77493
48
964