@4c/graphql-subscription-server
Advanced tools
Comparing version 0.6.0 to 0.7.0
@@ -18,3 +18,3 @@ /* eslint-disable no-await-in-loop */ | ||
this.createPromise(); | ||
this.iterable = this.createIterable(options.setup); | ||
this.iterable = this.createIterable(); | ||
} | ||
@@ -34,5 +34,6 @@ | ||
async *createIterable(setup) { | ||
async *createIterableRaw() { | ||
try { | ||
if (setup) await setup(); | ||
if (this.options.setup) await this.options.setup(); | ||
yield null; | ||
@@ -54,2 +55,9 @@ while (true) { | ||
async createIterable() { | ||
const iterableRaw = this.createIterableRaw(); // wait for the first synthetic yield after setup | ||
await iterableRaw.next(); | ||
return iterableRaw; | ||
} | ||
push(value) { | ||
@@ -56,0 +64,0 @@ this.values.push(value); |
@@ -93,3 +93,3 @@ import { createSourceEventStream, execute, GraphQLError, parse, specifiedRules, validate } from 'graphql'; | ||
const sourcePromise = createSourceEventStream(this.config.schema, document, null, { | ||
subscribe: (...args) => AsyncUtils.filter(this.config.subscriber.subscribe(...args), this.isAuthorized) | ||
subscribe: async (...args) => AsyncUtils.filter((await this.config.subscriber.subscribe(...args)), this.isAuthorized) | ||
}, variables); | ||
@@ -96,0 +96,0 @@ this.subscriptions.set(id, sourcePromise); |
@@ -23,3 +23,7 @@ import redis from 'redis'; | ||
_subscribeToChannel(channel) { | ||
_redisSubscribe(channel) { | ||
return promisify(cb => this.redis.subscribe(channel, cb))(); | ||
} | ||
async _subscribeToChannel(channel) { | ||
if (this._channels.has(channel)) return; | ||
@@ -29,6 +33,6 @@ | ||
this.redis.subscribe(channel); | ||
await this._redisSubscribe(channel); | ||
} | ||
subscribe(channel, parseMessage = this._parseMessage) { | ||
async subscribe(channel, parseMessage = this._parseMessage) { | ||
let channelQueues = this._queues.get(channel); | ||
@@ -41,3 +45,3 @@ | ||
this.redis.subscribe(channel); | ||
await this._redisSubscribe(channel); | ||
} | ||
@@ -63,4 +67,5 @@ | ||
channelQueues.add(queue); | ||
if (!parseMessage) return queue.iterable; | ||
return map(queue.iterable, parseMessage); | ||
const iterable = await queue.iterable; | ||
if (!parseMessage) return iterable; | ||
return map(iterable, parseMessage); | ||
} | ||
@@ -67,0 +72,0 @@ |
@@ -27,3 +27,3 @@ "use strict"; | ||
this.createPromise(); | ||
this.iterable = this.createIterable(options.setup); | ||
this.iterable = this.createIterable(); | ||
} | ||
@@ -43,5 +43,6 @@ | ||
async *createIterable(setup) { | ||
async *createIterableRaw() { | ||
try { | ||
if (setup) await setup(); | ||
if (this.options.setup) await this.options.setup(); | ||
yield null; | ||
@@ -63,2 +64,9 @@ while (true) { | ||
async createIterable() { | ||
const iterableRaw = this.createIterableRaw(); // wait for the first synthetic yield after setup | ||
await iterableRaw.next(); | ||
return iterableRaw; | ||
} | ||
push(value) { | ||
@@ -65,0 +73,0 @@ this.values.push(value); |
@@ -101,3 +101,3 @@ "use strict"; | ||
const sourcePromise = (0, _graphql.createSourceEventStream)(this.config.schema, document, null, { | ||
subscribe: (...args) => AsyncUtils.filter(this.config.subscriber.subscribe(...args), this.isAuthorized) | ||
subscribe: async (...args) => AsyncUtils.filter((await this.config.subscriber.subscribe(...args)), this.isAuthorized) | ||
}, variables); | ||
@@ -104,0 +104,0 @@ this.subscriptions.set(id, sourcePromise); |
@@ -33,3 +33,7 @@ "use strict"; | ||
_subscribeToChannel(channel) { | ||
_redisSubscribe(channel) { | ||
return (0, _util.promisify)(cb => this.redis.subscribe(channel, cb))(); | ||
} | ||
async _subscribeToChannel(channel) { | ||
if (this._channels.has(channel)) return; | ||
@@ -39,6 +43,6 @@ | ||
this.redis.subscribe(channel); | ||
await this._redisSubscribe(channel); | ||
} | ||
subscribe(channel, parseMessage = this._parseMessage) { | ||
async subscribe(channel, parseMessage = this._parseMessage) { | ||
let channelQueues = this._queues.get(channel); | ||
@@ -51,3 +55,3 @@ | ||
this.redis.subscribe(channel); | ||
await this._redisSubscribe(channel); | ||
} | ||
@@ -73,4 +77,5 @@ | ||
channelQueues.add(queue); | ||
if (!parseMessage) return queue.iterable; | ||
return (0, _AsyncUtils.map)(queue.iterable, parseMessage); | ||
const iterable = await queue.iterable; | ||
if (!parseMessage) return iterable; | ||
return (0, _AsyncUtils.map)(iterable, parseMessage); | ||
} | ||
@@ -77,0 +82,0 @@ |
{ | ||
"name": "@4c/graphql-subscription-server", | ||
"version": "0.6.0", | ||
"version": "0.7.0", | ||
"author": "4Catalyzer", | ||
@@ -5,0 +5,0 @@ "license": "MIT", |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
New author
Supply chain riskA new npm collaborator published a version of the package for the first time. New collaborators are usually benign additions to a project, but do indicate a change to the security surface area of a package.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
72967
889
1