Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@4c/graphql-subscription-server

Package Overview
Dependencies
Maintainers
5
Versions
30
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@4c/graphql-subscription-server - npm Package Compare versions

Comparing version 0.7.1 to 0.8.0

es/SubscriptionContext.js

8

CHANGELOG.md

@@ -0,1 +1,9 @@

<a name="0.8.0"></a>
# [0.8.0](https://github.com/4Catalyzer/graphql-subscription-server/compare/v0.7.1...v0.8.0) (2018-08-13)
### Features
* Use an explicitly closeable async iterator ([#20](https://github.com/4Catalyzer/graphql-subscription-server/issues/20)) ([4408ddf](https://github.com/4Catalyzer/graphql-subscription-server/commit/4408ddf))
<a name="0.7.1"></a>

@@ -2,0 +10,0 @@ ## [0.7.1](https://github.com/4Catalyzer/graphql-subscription-server/compare/v0.7.0...v0.7.1) (2018-08-13)

70

es/AsyncUtils.js

@@ -14,15 +14,22 @@ /* eslint-disable no-await-in-loop */

constructor(options = {}) {
this.closed = false;
this.close = async () => {
if (this.setupPromise) {
await this.setupPromise;
}
if (this.options.teardown) {
await this.options.teardown();
}
this.closed = true;
this.push(null);
};
this.options = options;
this.values = [];
this.options = options;
this.createPromise();
this.iterable = this.createIterable();
this.closed = false;
this.iterator = this.createIterator();
}
close() {
if (this.closed) return;
this.closed = true;
if (this.options.teardown) this.options.teardown();
}
createPromise() {

@@ -34,29 +41,36 @@ this.promise = new Promise(resolve => {

async *createIterableRaw() {
try {
if (this.options.setup) await this.options.setup();
yield null;
async createIterator() {
const iterator = this.createIteratorRaw(); // Wait for setup.
while (true) {
await this.promise;
await iterator.next();
return iterator;
}
for (const value of this.values) {
yield value;
async *createIteratorRaw() {
if (this.options.setup) {
this.setupPromise = this.options.setup();
}
if (this.setupPromise) {
await this.setupPromise;
}
yield null;
while (true) {
await this.promise;
for (const value of this.values) {
if (this.closed) {
return;
}
this.values.length = 0;
this.createPromise();
yield value;
}
} finally {
await this.close();
this.values.length = 0;
this.createPromise();
}
}
async createIterable() {
const iterableRaw = this.createIterableRaw(); // wait for the first synthetic yield after setup
await iterableRaw.next();
return iterableRaw;
}
push(value) {

@@ -63,0 +77,0 @@ this.values.push(value);

34

es/AuthorizedSocketConnection.js
import { createSourceEventStream, execute, GraphQLError, parse, specifiedRules, validate } from 'graphql';
import * as AsyncUtils from './AsyncUtils';
import SubscriptionContext from './SubscriptionContext';

@@ -56,3 +57,3 @@ const acknowledge = cb => {

try {
if (this.config.maxSubscriptionsPerConnection != null && this.subscriptions.size >= this.config.maxSubscriptionsPerConnection) {
if (this.config.maxSubscriptionsPerConnection != null && this.subscriptionContexts.size >= this.config.maxSubscriptionsPerConnection) {
this.log('debug', 'subscription limit reached', {

@@ -67,3 +68,3 @@ maxSubscriptionsPerConnection: this.config.maxSubscriptionsPerConnection

if (this.subscriptions.has(id)) {
if (this.subscriptionContexts.has(id)) {
this.log('debug', 'duplicate subscription attempted', {

@@ -94,6 +95,7 @@ id

const subscriptionContext = new SubscriptionContext(this.config.subscriber);
const sourcePromise = createSourceEventStream(this.config.schema, document, null, {
subscribe: async (...args) => AsyncUtils.filter((await this.config.subscriber.subscribe(...args)), this.isAuthorized)
subscribe: async (...args) => AsyncUtils.filter((await subscriptionContext.subscribe(...args)), this.isAuthorized)
}, variables);
this.subscriptions.set(id, sourcePromise);
this.subscriptionContexts.set(id, subscriptionContext);

@@ -108,3 +110,3 @@ try {

} else {
this.subscriptions.delete(id);
this.subscriptionContexts.delete(id);
throw err;

@@ -115,3 +117,3 @@ }

if (resultOrStream.errors != null) {
this.subscriptions.delete(id);
this.subscriptionContexts.delete(id);
this.emitError({

@@ -158,6 +160,6 @@ code: 'subscribe_failed.gql_error',

this.handleUnsubscribe = async id => {
const subscription = await this.subscriptions.get(id);
const subscriptionContext = this.subscriptionContexts.get(id);
if (subscription && typeof subscription.return === 'function') {
subscription.return();
if (!subscriptionContext) {
return;
}

@@ -168,3 +170,4 @@

});
this.subscriptions.delete(id);
await subscriptionContext.close();
this.subscriptionContexts.delete(id);
};

@@ -174,10 +177,3 @@

this.log('debug', 'client disconnected');
await this.config.credentialsManager.unauthenticate();
this.subscriptions.forEach(async subscriptionPromise => {
const subscription = await subscriptionPromise;
if (subscription && typeof subscription.return === 'function') {
subscription.return();
}
});
await Promise.all([this.config.credentialsManager.unauthenticate(), ...Array.from(this.subscriptionContexts.values(), subscriptionContext => subscriptionContext.close())]);
};

@@ -188,3 +184,3 @@

this.log = config.createLogger('@4c/SubscriptionServer::AuthorizedSocket');
this.subscriptions = new Map();
this.subscriptionContexts = new Map();
this.socket.on('authenticate', this.handleAuthenticate).on('subscribe', this.handleSubscribe).on('unsubscribe', this.handleUnsubscribe).on('disconnect', this.handleDisconnect);

@@ -191,0 +187,0 @@ }

@@ -55,6 +55,6 @@ import { AsyncQueue } from './AsyncUtils';

eventQueues.add(queue);
return queue.iterable;
return queue;
}
async close() {
close() {
this._listeners.forEach((fn, event) => {

@@ -61,0 +61,0 @@ this.emitter.removeListener(event, fn);

@@ -31,3 +31,3 @@ const SECONDS_TO_MS = 1000;

async unauthenticate() {
unauthenticate() {
if (this.renewHandle) {

@@ -34,0 +34,0 @@ clearTimeout(this.renewHandle);

@@ -16,3 +16,6 @@ import redis from 'redis';

if (!queues) return;
if (!queues) {
return;
}
queues.forEach(queue => {

@@ -24,15 +27,13 @@ queue.push(message);

_redisSubscribe(channel) {
return promisify(cb => this.redis.subscribe(channel, cb))();
}
async _subscribeToChannel(channel) {
if (this._channels.has(channel)) return;
if (this._channels.has(channel)) {
return;
}
this._channels.add(channel);
await this._redisSubscribe(channel);
await promisify(cb => this.redis.subscribe(channel, cb))();
}
async subscribe(channel, parseMessage = this._parseMessage) {
subscribe(channel, parseMessage = this._parseMessage) {
let channelQueues = this._queues.get(channel);

@@ -44,4 +45,2 @@

this._queues.set(channel, channelQueues);
await this._redisSubscribe(channel);
}

@@ -67,5 +66,14 @@

channelQueues.add(queue);
const iterable = await queue.iterable;
if (!parseMessage) return iterable;
return map(iterable, parseMessage);
let iteratorPromise = queue.iterator;
if (parseMessage) {
// Workaround for Flow.
const parseMessageFn = parseMessage;
iteratorPromise = iteratorPromise.then(iterator => map(iterator, parseMessageFn));
}
return {
iterator: iteratorPromise,
close: queue.close
};
}

@@ -72,0 +80,0 @@

@@ -23,15 +23,22 @@ "use strict";

constructor(options = {}) {
this.closed = false;
this.close = async () => {
if (this.setupPromise) {
await this.setupPromise;
}
if (this.options.teardown) {
await this.options.teardown();
}
this.closed = true;
this.push(null);
};
this.options = options;
this.values = [];
this.options = options;
this.createPromise();
this.iterable = this.createIterable();
this.closed = false;
this.iterator = this.createIterator();
}
close() {
if (this.closed) return;
this.closed = true;
if (this.options.teardown) this.options.teardown();
}
createPromise() {

@@ -43,29 +50,36 @@ this.promise = new Promise(resolve => {

async *createIterableRaw() {
try {
if (this.options.setup) await this.options.setup();
yield null;
async createIterator() {
const iterator = this.createIteratorRaw(); // Wait for setup.
while (true) {
await this.promise;
await iterator.next();
return iterator;
}
for (const value of this.values) {
yield value;
async *createIteratorRaw() {
if (this.options.setup) {
this.setupPromise = this.options.setup();
}
if (this.setupPromise) {
await this.setupPromise;
}
yield null;
while (true) {
await this.promise;
for (const value of this.values) {
if (this.closed) {
return;
}
this.values.length = 0;
this.createPromise();
yield value;
}
} finally {
await this.close();
this.values.length = 0;
this.createPromise();
}
}
async createIterable() {
const iterableRaw = this.createIterableRaw(); // wait for the first synthetic yield after setup
await iterableRaw.next();
return iterableRaw;
}
push(value) {

@@ -72,0 +86,0 @@ this.values.push(value);

@@ -10,2 +10,6 @@ "use strict";

var _SubscriptionContext = _interopRequireDefault(require("./SubscriptionContext"));
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; }
function _interopRequireWildcard(obj) { if (obj && obj.__esModule) { return obj; } else { var newObj = {}; if (obj != null) { for (var key in obj) { if (Object.prototype.hasOwnProperty.call(obj, key)) { var desc = Object.defineProperty && Object.getOwnPropertyDescriptor ? Object.getOwnPropertyDescriptor(obj, key) : {}; if (desc.get || desc.set) { Object.defineProperty(newObj, key, desc); } else { newObj[key] = obj[key]; } } } } newObj.default = obj; return newObj; } }

@@ -65,3 +69,3 @@

try {
if (this.config.maxSubscriptionsPerConnection != null && this.subscriptions.size >= this.config.maxSubscriptionsPerConnection) {
if (this.config.maxSubscriptionsPerConnection != null && this.subscriptionContexts.size >= this.config.maxSubscriptionsPerConnection) {
this.log('debug', 'subscription limit reached', {

@@ -76,3 +80,3 @@ maxSubscriptionsPerConnection: this.config.maxSubscriptionsPerConnection

if (this.subscriptions.has(id)) {
if (this.subscriptionContexts.has(id)) {
this.log('debug', 'duplicate subscription attempted', {

@@ -103,6 +107,7 @@ id

const subscriptionContext = new _SubscriptionContext.default(this.config.subscriber);
const sourcePromise = (0, _graphql.createSourceEventStream)(this.config.schema, document, null, {
subscribe: async (...args) => AsyncUtils.filter((await this.config.subscriber.subscribe(...args)), this.isAuthorized)
subscribe: async (...args) => AsyncUtils.filter((await subscriptionContext.subscribe(...args)), this.isAuthorized)
}, variables);
this.subscriptions.set(id, sourcePromise);
this.subscriptionContexts.set(id, subscriptionContext);

@@ -117,3 +122,3 @@ try {

} else {
this.subscriptions.delete(id);
this.subscriptionContexts.delete(id);
throw err;

@@ -124,3 +129,3 @@ }

if (resultOrStream.errors != null) {
this.subscriptions.delete(id);
this.subscriptionContexts.delete(id);
this.emitError({

@@ -167,6 +172,6 @@ code: 'subscribe_failed.gql_error',

this.handleUnsubscribe = async id => {
const subscription = await this.subscriptions.get(id);
const subscriptionContext = this.subscriptionContexts.get(id);
if (subscription && typeof subscription.return === 'function') {
subscription.return();
if (!subscriptionContext) {
return;
}

@@ -177,3 +182,4 @@

});
this.subscriptions.delete(id);
await subscriptionContext.close();
this.subscriptionContexts.delete(id);
};

@@ -183,10 +189,3 @@

this.log('debug', 'client disconnected');
await this.config.credentialsManager.unauthenticate();
this.subscriptions.forEach(async subscriptionPromise => {
const subscription = await subscriptionPromise;
if (subscription && typeof subscription.return === 'function') {
subscription.return();
}
});
await Promise.all([this.config.credentialsManager.unauthenticate(), ...Array.from(this.subscriptionContexts.values(), subscriptionContext => subscriptionContext.close())]);
};

@@ -197,3 +196,3 @@

this.log = config.createLogger('@4c/SubscriptionServer::AuthorizedSocket');
this.subscriptions = new Map();
this.subscriptionContexts = new Map();
this.socket.on('authenticate', this.handleAuthenticate).on('subscribe', this.handleSubscribe).on('unsubscribe', this.handleUnsubscribe).on('disconnect', this.handleDisconnect);

@@ -200,0 +199,0 @@ }

@@ -60,6 +60,6 @@ "use strict";

eventQueues.add(queue);
return queue.iterable;
return queue;
}
async close() {
close() {
this._listeners.forEach((fn, event) => {

@@ -66,0 +66,0 @@ this.emitter.removeListener(event, fn);

@@ -36,3 +36,3 @@ "use strict";

async unauthenticate() {
unauthenticate() {
if (this.renewHandle) {

@@ -39,0 +39,0 @@ clearTimeout(this.renewHandle);

@@ -26,3 +26,6 @@ "use strict";

if (!queues) return;
if (!queues) {
return;
}
queues.forEach(queue => {

@@ -34,15 +37,13 @@ queue.push(message);

_redisSubscribe(channel) {
return (0, _util.promisify)(cb => this.redis.subscribe(channel, cb))();
}
async _subscribeToChannel(channel) {
if (this._channels.has(channel)) return;
if (this._channels.has(channel)) {
return;
}
this._channels.add(channel);
await this._redisSubscribe(channel);
await (0, _util.promisify)(cb => this.redis.subscribe(channel, cb))();
}
async subscribe(channel, parseMessage = this._parseMessage) {
subscribe(channel, parseMessage = this._parseMessage) {
let channelQueues = this._queues.get(channel);

@@ -54,4 +55,2 @@

this._queues.set(channel, channelQueues);
await this._redisSubscribe(channel);
}

@@ -77,5 +76,14 @@

channelQueues.add(queue);
const iterable = await queue.iterable;
if (!parseMessage) return iterable;
return (0, _AsyncUtils.map)(iterable, parseMessage);
let iteratorPromise = queue.iterator;
if (parseMessage) {
// Workaround for Flow.
const parseMessageFn = parseMessage;
iteratorPromise = iteratorPromise.then(iterator => (0, _AsyncUtils.map)(iterator, parseMessageFn));
}
return {
iterator: iteratorPromise,
close: queue.close
};
}

@@ -82,0 +90,0 @@

{
"name": "@4c/graphql-subscription-server",
"version": "0.7.1",
"version": "0.8.0",
"author": "4Catalyzer",

@@ -5,0 +5,0 @@ "license": "MIT",

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc