Socket
Socket
Sign inDemoInstall

graphql-ws

Package Overview
Dependencies
Maintainers
1
Versions
103
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

graphql-ws - npm Package Compare versions

Comparing version 0.1.4 to 0.1.5

dist/Binary.d.ts

14

dist/client.d.ts
import { ListenerFn } from 'eventemitter3';
import { ExecutionResult } from 'graphql/execution/execute';
import { DocumentNode } from 'graphql/language/ast';
import { Binary } from './types/Binary';
import { IncomingFile } from './common';
import { Binary } from './Binary';
export { Readable, Writable, Buffer } from 'readable-stream';
export { Binary };

@@ -33,5 +33,2 @@ export interface Observer<T> {

}
export interface IncomingFiles {
[id: number]: IncomingFile[];
}
export interface OperationFiles {

@@ -100,7 +97,2 @@ [id: number]: Binary[];

private checkOperationOptions(options, handler);
private sendSingleFile(opId, file, offset?);
private extractFiles(variables?);
private binaryReader(resource, offset, onData, onComplete);
private processFiles(opId, response?);
private buildMessage(id, type, payload);
private formatErrors(errors);

@@ -114,6 +106,6 @@ private sendMessage(id, type, payload);

private checkMaxConnectTimeout();
private patchSocket();
private connect();
private parseMessage(buffer);
private processReceivedData(receivedData);
private unsubscribe(opId);
}

@@ -11,3 +11,2 @@ "use strict";

Object.defineProperty(exports, "__esModule", { value: true });
const Subject_1 = require("rxjs/Subject");
const _global = typeof global !== 'undefined' ? global : (typeof window !== 'undefined' ? window : {});

@@ -19,3 +18,2 @@ const NativeWebSocket = _global.WebSocket || _global.MozWebSocket;

const isObject = require("lodash.isobject");
const printer_1 = require("graphql/language/printer");
const getOperationAST_1 = require("graphql/utilities/getOperationAST");

@@ -26,5 +24,10 @@ const symbol_observable_1 = require("symbol-observable");

const message_type_1 = require("./message-type");
const Binary_1 = require("./types/Binary");
const Binary_1 = require("./Binary");
exports.Binary = Binary_1.Binary;
const BinarySender_1 = require("./BinarySender");
const common_1 = require("./common");
var readable_stream_1 = require("readable-stream");
exports.Readable = readable_stream_1.Readable;
exports.Writable = readable_stream_1.Writable;
exports.Buffer = readable_stream_1.Buffer;
class SubscriptionClient {

@@ -195,3 +198,3 @@ constructor(url, options, webSocketImpl) {

this.checkOperationOptions(processedOptions, handler);
this.filesOut[opId] = yield this.extractFiles(processedOptions.variables);
this.filesOut[opId] = common_1.extractOutgoingFiles(processedOptions.variables);
if (this.operations[opId]) {

@@ -260,93 +263,2 @@ this.operations[opId] = { options: processedOptions, handler };

}
sendSingleFile(opId, file, offset = 0) {
return __awaiter(this, void 0, void 0, function* () {
const headerSize = 4 * 3;
const chunkSize = common_1.BINARY_CHUNK_SIZE;
const buffer = new DataView(new ArrayBuffer(headerSize + chunkSize));
buffer.setUint32(0, opId, true);
buffer.setUint32(4, message_type_1.MessageType.GQL_BINARY, true);
buffer.setUint32(8, file.getId(), true);
file.initRead(offset);
let read;
do {
read = yield file.readInto(buffer.buffer, chunkSize, headerSize);
let buf = buffer.buffer;
if (read < chunkSize) {
buf = buffer.buffer.slice(0, headerSize + read);
}
this.sendMessageRaw(buf);
} while (read === chunkSize);
this.sendMessageRaw(buffer.buffer.slice(0, headerSize));
});
}
extractFiles(variables) {
return __awaiter(this, void 0, void 0, function* () {
const files = [];
for (let file of common_1.findBinaries(variables || {})) {
yield file.initMetadata();
const found = files.find(f => f.equal(file));
if (found) {
file.clone(found);
}
else {
files.push(file);
}
}
return files;
});
}
binaryReader(resource, offset, onData, onComplete) {
const { opId, fileId } = resource;
const file = this.filesIn[opId].find(f => f.binary.getId() === fileId);
const reader = file.reader;
const request = {
id: fileId,
offset,
};
this.sendMessage(opId, message_type_1.MessageType.GQL_BINARY_REQUEST, request);
reader.subscribe((data) => __awaiter(this, void 0, void 0, function* () { return onData(data); }), (error) => __awaiter(this, void 0, void 0, function* () { return onComplete(error); }), () => __awaiter(this, void 0, void 0, function* () { return onComplete(); }));
}
processFiles(opId, response) {
common_1.deserializeBinaries(response || {}, (file) => {
const obj = { opId, fileId: file.id };
const binary = new Binary_1.Binary(obj, { onRead: this.binaryReader.bind(this) }, file);
const found = this.filesIn[opId].find(f => f.binary.equal(binary));
if (found) {
binary.clone(found.binary);
}
else {
const reader = new Subject_1.Subject();
this.filesIn[opId].push({
binary,
reader,
});
}
return binary;
});
}
buildMessage(id, type, payload) {
let serializedMessage;
if (payload) {
const payloadToReturn = payload && payload.query ? Object.assign({}, payload, { query: typeof payload.query === 'string' ? payload.query : printer_1.print(payload.query) }) :
payload;
serializedMessage = JSON.stringify(payloadToReturn);
let parsedMessage;
try {
parsedMessage = JSON.parse(serializedMessage);
}
catch (e) {
throw new Error(`Message must be JSON-serializable. Got: ${payloadToReturn}`);
}
}
else {
serializedMessage = '';
}
const enc = new window.TextEncoder('utf-8');
const headerSize = 4 * 2;
const message = new DataView(new ArrayBuffer(headerSize + serializedMessage.length));
message.setUint32(0, id, true);
message.setUint32(4, type, true);
new Uint8Array(message.buffer).set(enc.encode(serializedMessage), 8);
return message.buffer;
}
formatErrors(errors) {

@@ -368,3 +280,3 @@ if (Array.isArray(errors)) {

sendMessage(id, type, payload) {
this.sendMessageRaw(this.buildMessage(id, type, payload));
this.sendMessageRaw(common_1.buildMessage(id, type, payload));
}

@@ -396,3 +308,3 @@ sendMessageRaw(message) {

const opId = parseInt(key, 10);
this.unsentMessagesQueue.push(this.buildMessage(opId, message_type_1.MessageType.GQL_START, this.operations[opId].options));
this.unsentMessagesQueue.push(common_1.buildMessage(opId, message_type_1.MessageType.GQL_START, this.operations[opId].options));
});

@@ -430,5 +342,27 @@ this.reconnecting = true;

}
patchSocket() {
this.client.removeListener = this.client.removeEventListener;
let sockSend = this.client.send;
sockSend = sockSend.bind(this.client);
this.client.send = (data, callback) => {
try {
sockSend(data);
if (callback) {
callback();
}
}
catch (err) {
if (callback) {
callback(err);
}
else {
throw err;
}
}
};
}
connect() {
this.client = new this.wsImpl(this.url, protocol_1.GRAPHQL_WS);
this.client.binaryType = 'arraybuffer';
this.patchSocket();
this.checkMaxConnectTimeout();

@@ -454,36 +388,9 @@ this.client.onopen = () => {

}
parseMessage(buffer) {
const message = new DataView(buffer);
let result;
let base = {
id: message.getUint32(0, true),
type: message.getUint32(4, true),
};
if (base.type === message_type_1.MessageType.GQL_BINARY) {
const payload = {
fileId: message.getUint32(8, true),
buffer: buffer.slice(12),
};
result = Object.assign({}, base, { payload });
}
else {
const dec = new window.TextDecoder('utf-8');
let payload = dec.decode(buffer.slice(8));
result = Object.assign({}, base, { payload: payload.length ? JSON.parse(payload) : null });
}
return result;
}
processReceivedData(receivedData) {
return __awaiter(this, void 0, void 0, function* () {
const parsedMessage = this.parseMessage(receivedData);
const opId = parsedMessage.id;
if ([message_type_1.MessageType.GQL_DATA,
message_type_1.MessageType.GQL_COMPLETE,
message_type_1.MessageType.GQL_ERROR,
].indexOf(parsedMessage.type) !== -1 && !this.operations[opId]) {
console.log('DEBUGGER (yki)');
debugger;
this.unsubscribe(opId);
const parsedMessage = common_1.parseMessage(receivedData);
if (!parsedMessage) {
return;
}
const opId = parsedMessage.id;
switch (parsedMessage.type) {

@@ -516,14 +423,27 @@ case message_type_1.MessageType.GQL_CONNECTION_ERROR:

parsedMessage.payload : Object.assign({}, parsedMessage.payload, { errors: this.formatErrors(parsedMessage.payload.errors) });
this.processFiles(opId, parsedPayload);
this.filesIn[opId] = common_1.extractIncomingFiles(opId, this.client, parsedPayload);
this.operations[opId].handler(null, parsedPayload);
break;
}
case message_type_1.MessageType.GQL_BINARY: {
case message_type_1.MessageType.GQL_BINARY_REQUEST: {
const payload = parsedMessage.payload;
if (this.filesIn[opId]) {
const file = this.filesIn[opId].find(f => f.binary.getId() === payload.fileId);
if (this.filesOut[opId]) {
const { id, offset } = payload;
const file = this.filesOut[opId].find(f => f.getId() === id);
if (file) {
file.reader.next(payload.buffer);
if (payload.buffer.byteLength === 0) {
file.reader.complete();
const reader = file.createReadStream(offset);
const writer = new BinarySender_1.BinarySender({
opId,
fileId: file.getId(),
socket: this.client,
});
const finish = new Promise((resolve, reject) => {
reader.pipe(writer)
.on('finish', resolve)
.on('error', reject);
});
yield finish;
if (this.filesOut[opId]) {
const index = this.filesOut[opId].findIndex(f => f === file);
delete this.filesOut[opId][index];
}

@@ -534,13 +454,2 @@ }

}
case message_type_1.MessageType.GQL_BINARY_REQUEST: {
const payload = parsedMessage.payload;
const { id, offset } = payload;
const file = this.filesOut[opId].find(f => f.getId() === id);
if (file) {
yield this.sendSingleFile(opId, file, offset);
const index = this.filesOut[opId].findIndex(f => f === file);
delete this.filesOut[opId][index];
}
break;
}
case message_type_1.MessageType.GQL_CONNECTION_KEEP_ALIVE:

@@ -547,0 +456,0 @@ const firstKA = typeof this.wasKeepAliveReceived === 'undefined';

@@ -1,15 +0,2 @@

import { Binary, SerializedBinary } from './types/Binary';
import { Subject } from 'rxjs';
export declare const BINARY_CHUNK_SIZE: number;
export declare function repeatPromise(promise: () => Promise<boolean>): any;
export declare function findBinaries(object: any): Binary[];
export declare function deserializeBinaries(object: any, callback: (value: SerializedBinary) => Binary): void;
export interface IncomingFile {
binary: Binary;
reader: Subject<any>;
}
export interface FilePayload {
fileId: number;
buffer: ArrayBuffer;
}
import { Binary } from './Binary';
export interface FileRequestPayload {

@@ -19,1 +6,14 @@ id: number;

}
export declare function repeatPromise(promise: () => Promise<boolean>): any;
export declare function extractIncomingFiles(opId: number, socket: any, obj?: {
[key: string]: any;
}): Binary[];
export declare function extractOutgoingFiles(obj?: {
[key: string]: any;
}): Binary[];
export declare function parseMessage(buffer: ArrayBuffer): {
payload: any;
id: number;
type: number;
};
export declare function buildMessage(id: number, type: number, payload: any): ArrayBuffer;
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const Binary_1 = require("./types/Binary");
exports.BINARY_CHUNK_SIZE = 100 * 1024;
const Binary_1 = require("./Binary");
const BinaryReceiver_1 = require("./BinaryReceiver");
const message_type_1 = require("./message-type");
function repeatPromise(promise) {

@@ -16,3 +17,3 @@ return promise().then((repeat) => repeat && repeatPromise(promise));

else if (object[k] && typeof object[k] === 'object') {
value.push(...this.findBinaries(object[k]));
value.push(...findBinaries(object[k]));
}

@@ -22,3 +23,2 @@ });

}
exports.findBinaries = findBinaries;
function deserializeBinaries(object, callback) {

@@ -30,7 +30,88 @@ Object.keys(object).forEach((k) => {

else if (object[k] && typeof object[k] === 'object') {
this.deserializeBinaries(object[k], callback);
deserializeBinaries(object[k], callback);
}
});
}
exports.deserializeBinaries = deserializeBinaries;
function extractIncomingFiles(opId, socket, obj) {
const files = [];
deserializeBinaries(obj || {}, (file) => {
const onStream = (offset) => new BinaryReceiver_1.BinaryReceiver({
opId,
fileId: file.id,
offset,
socket,
});
const binary = new Binary_1.Binary(onStream, file);
const found = files.find(f => f.equal(binary));
if (found) {
binary.clone(found);
}
else {
files.push(binary);
}
return binary;
});
return files;
}
exports.extractIncomingFiles = extractIncomingFiles;
function extractOutgoingFiles(obj) {
const files = [];
for (let file of findBinaries(obj || {})) {
const found = files.find(f => f.equal(file));
if (found) {
file.clone(found);
}
else {
files.push(file);
}
}
return files;
}
exports.extractOutgoingFiles = extractOutgoingFiles;
function parseMessage(buffer) {
const message = Buffer.from(buffer);
let result;
let payloadBase = {
id: message.readUInt32LE(0),
type: message.readUInt32LE(4),
};
if (payloadBase.type === message_type_1.MessageType.GQL_BINARY_REQUEST) {
const payload = {
id: message.readUInt32LE(8),
offset: message.readUInt32LE(12),
};
result = Object.assign({}, payloadBase, { payload });
}
else {
const availableTypes = [
message_type_1.MessageType.GQL_CONNECTION_INIT,
message_type_1.MessageType.GQL_START,
message_type_1.MessageType.GQL_STOP,
message_type_1.MessageType.GQL_CONNECTION_TERMINATE,
message_type_1.MessageType.GQL_CONNECTION_ERROR,
message_type_1.MessageType.GQL_CONNECTION_ACK,
message_type_1.MessageType.GQL_COMPLETE,
message_type_1.MessageType.GQL_ERROR,
message_type_1.MessageType.GQL_DATA,
message_type_1.MessageType.GQL_CONNECTION_KEEP_ALIVE,
];
if (availableTypes.includes(payloadBase.type)) {
const payloadStr = Buffer.from(buffer, 8).toString();
const payload = payloadStr.length > 0 ? JSON.parse(payloadStr) : null;
result = Object.assign({}, payloadBase, { payload });
}
}
return result;
}
exports.parseMessage = parseMessage;
function buildMessage(id, type, payload) {
let serializedMessage = JSON.stringify(payload) || '';
const headerSize = 8;
const message = new Buffer(headerSize + serializedMessage.length);
message.writeUInt32LE(id, 0);
message.writeUInt32LE(type, 4);
Buffer.from(serializedMessage).copy(message, headerSize);
return message.buffer;
}
exports.buildMessage = buildMessage;
//# sourceMappingURL=common.js.map

@@ -6,2 +6,2 @@ export * from './client';

export * from './types/BinaryType';
export * from './types/Binary';
export * from './Binary';

@@ -11,3 +11,3 @@ "use strict";

__export(require("./types/BinaryType"));
__export(require("./types/Binary"));
__export(require("./Binary"));
//# sourceMappingURL=index.js.map

@@ -10,6 +10,7 @@ export declare enum MessageType {

GQL_BINARY = 8,
GQL_BINARY_REQUEST = 9,
GQL_ERROR = 10,
GQL_COMPLETE = 11,
GQL_STOP = 12,
GQL_BINARY_ACK = 9,
GQL_BINARY_REQUEST = 10,
GQL_ERROR = 11,
GQL_COMPLETE = 12,
GQL_STOP = 13,
}

@@ -13,7 +13,8 @@ "use strict";

MessageType[MessageType["GQL_BINARY"] = 8] = "GQL_BINARY";
MessageType[MessageType["GQL_BINARY_REQUEST"] = 9] = "GQL_BINARY_REQUEST";
MessageType[MessageType["GQL_ERROR"] = 10] = "GQL_ERROR";
MessageType[MessageType["GQL_COMPLETE"] = 11] = "GQL_COMPLETE";
MessageType[MessageType["GQL_STOP"] = 12] = "GQL_STOP";
MessageType[MessageType["GQL_BINARY_ACK"] = 9] = "GQL_BINARY_ACK";
MessageType[MessageType["GQL_BINARY_REQUEST"] = 10] = "GQL_BINARY_REQUEST";
MessageType[MessageType["GQL_ERROR"] = 11] = "GQL_ERROR";
MessageType[MessageType["GQL_COMPLETE"] = 12] = "GQL_COMPLETE";
MessageType[MessageType["GQL_STOP"] = 13] = "GQL_STOP";
})(MessageType = exports.MessageType || (exports.MessageType = {}));
//# sourceMappingURL=message-type.js.map

@@ -5,4 +5,4 @@ /// <reference types="node" />

import { ExecutionResult, GraphQLSchema, DocumentNode, ValidationContext, GraphQLFieldResolver } from 'graphql';
import { Binary } from './types/Binary';
import { FilePayload, FileRequestPayload, IncomingFile } from './common';
import { Binary } from './Binary';
import { FileRequestPayload } from './common';
export declare type ExecutionIterator = AsyncIterator<ExecutionResult>;

@@ -24,3 +24,3 @@ export interface ExecutionParams<TContext = any> {

filesIn: {
[id: number]: IncomingFile[];
[id: number]: Binary[];
};

@@ -44,3 +44,3 @@ filesOut: {

export interface OperationMessage {
payload?: QueryPayload | FilePayload | FileRequestPayload;
payload?: QueryPayload | FileRequestPayload;
id?: string;

@@ -60,2 +60,3 @@ type: number;

subscribe?: SubscribeFunction;
formatError?: Function;
validationRules?: Array<(context: ValidationContext) => any>;

@@ -77,2 +78,3 @@ onOperation?: Function;

private schema;
private formatError;
private rootValue;

@@ -83,2 +85,4 @@ private keepAlive;

static create(options: ServerOptions, socketOptions: WebSocket.IServerOptions): SubscriptionServer;
private static sendMessage(connectionContext, opId, type, payload);
private static sendError(connectionContext, opId, errorPayload, overrideDefaultErrorType?);
constructor(options: ServerOptions, socketOptions: WebSocket.IServerOptions);

@@ -90,11 +94,3 @@ readonly server: WebSocket.Server;

private onClose(connectionContext);
private parseMessage(buffer);
private binaryReader(resource, offset, onData, onComplete);
private processFiles(ctx, opId, variables?);
private extractFiles(response?);
private sendSingleFile(connectionContext, opId, file, offset?);
private onMessage(connectionContext);
private buildMessage(id, type, payload);
private sendMessage(connectionContext, opId, type, payload);
private sendError(connectionContext, opId, errorPayload, overrideDefaultErrorType?);
}

@@ -21,5 +21,4 @@ "use strict";

const is_subscriptions_1 = require("./utils/is-subscriptions");
const rxjs_1 = require("rxjs");
const Binary_1 = require("./types/Binary");
const common_1 = require("./common");
const BinarySender_1 = require("./BinarySender");
class SubscriptionServer {

@@ -29,2 +28,19 @@ static create(options, socketOptions) {

}
static sendMessage(connectionContext, opId, type, payload) {
const message = common_1.buildMessage(opId, type, payload);
if (connectionContext.socket.readyState === WebSocket.OPEN) {
connectionContext.socket.send(message);
}
}
static sendError(connectionContext, opId, errorPayload, overrideDefaultErrorType) {
const sanitizedOverrideDefaultErrorType = overrideDefaultErrorType || message_type_1.MessageType.GQL_ERROR;
if ([
message_type_1.MessageType.GQL_CONNECTION_ERROR,
message_type_1.MessageType.GQL_ERROR,
].indexOf(sanitizedOverrideDefaultErrorType) === -1) {
throw new Error('overrideDefaultErrorType should be one of the allowed error messages' +
' GQL_CONNECTION_ERROR or GQL_ERROR');
}
SubscriptionServer.sendMessage(connectionContext, opId, sanitizedOverrideDefaultErrorType, errorPayload);
}
constructor(options, socketOptions) {

@@ -57,3 +73,3 @@ const { onOperation, onOperationComplete, onConnect, onDisconnect, keepAlive, } = options;

if (socket.readyState === WebSocket.OPEN) {
this.sendMessage(connectionContext, undefined, message_type_1.MessageType.GQL_CONNECTION_KEEP_ALIVE, undefined);
SubscriptionServer.sendMessage(connectionContext, undefined, message_type_1.MessageType.GQL_CONNECTION_KEEP_ALIVE, undefined);
}

@@ -67,3 +83,3 @@ else {

if (error) {
this.sendError(connectionContext, 0, { message: error.message ? error.message : error }, message_type_1.MessageType.GQL_CONNECTION_ERROR);
SubscriptionServer.sendError(connectionContext, 0, { message: error.message ? error.message : error }, message_type_1.MessageType.GQL_CONNECTION_ERROR);
setTimeout(() => {

@@ -95,3 +111,3 @@ connectionContext.socket.close(1011);

loadExecutor(options) {
const { execute, subscribe, schema, rootValue } = options;
const { execute, subscribe, schema, rootValue, formatError } = options;
if (!execute) {

@@ -105,2 +121,3 @@ throw new Error('Must provide `execute` for websocket server constructor.');

this.rootValue = rootValue;
this.formatError = formatError;
this.execute = execute;

@@ -127,92 +144,2 @@ this.subscribe = subscribe;

}
parseMessage(buffer) {
const message = new DataView(buffer);
let result;
let payloadBase = {
id: message.getUint32(0, true),
type: message.getUint32(4, true),
};
if (payloadBase.type === message_type_1.MessageType.GQL_BINARY) {
const payload = {
fileId: message.getUint32(8, true),
buffer: buffer.slice(12),
};
result = Object.assign({}, payloadBase, { payload });
}
else {
const payloadStr = Buffer.from(buffer, 8).toString();
const payload = payloadStr.length > 0 ? JSON.parse(payloadStr) : null;
result = Object.assign({}, payloadBase, { payload });
}
return result;
}
binaryReader(resource, offset, onData, onComplete) {
const { connectionContext, opId, fileId } = resource;
const file = connectionContext.filesIn[opId].find((f) => f.binary.getId() === fileId);
const reader = file.reader;
const request = {
id: fileId,
offset,
};
this.sendMessage(connectionContext, opId, message_type_1.MessageType.GQL_BINARY_REQUEST, request);
reader.subscribe((data) => __awaiter(this, void 0, void 0, function* () { return onData(data); }), (error) => __awaiter(this, void 0, void 0, function* () { return onComplete(error); }), () => __awaiter(this, void 0, void 0, function* () { return onComplete(); }));
}
processFiles(ctx, opId, variables) {
common_1.deserializeBinaries(variables || {}, (file) => {
const obj = { connectionContext: ctx, opId, fileId: file.id };
const binary = new Binary_1.Binary(obj, { onRead: this.binaryReader.bind(this) }, file);
const found = ctx.filesIn[opId].find(f => f.binary.equal(binary));
if (found) {
binary.clone(found.binary);
}
else {
const reader = new rxjs_1.Subject();
ctx.filesIn[opId].push({
binary,
reader,
});
}
return binary;
});
}
extractFiles(response) {
return __awaiter(this, void 0, void 0, function* () {
const files = [];
for (let file of common_1.findBinaries(response || {})) {
yield file.initMetadata();
const found = files.find((f) => f.equal(file));
if (!found) {
files.push(file);
}
else {
file.clone(found);
}
}
return files;
});
}
sendSingleFile(connectionContext, opId, file, offset = 0) {
return __awaiter(this, void 0, void 0, function* () {
const headerSize = 4 * 3;
const chunkSize = common_1.BINARY_CHUNK_SIZE;
const buffer = new DataView(new ArrayBuffer(headerSize + chunkSize));
buffer.setUint32(0, opId, true);
buffer.setUint32(4, message_type_1.MessageType.GQL_BINARY, true);
buffer.setUint32(8, file.getId(), true);
file.initRead(offset);
let read;
do {
read = yield file.readInto(buffer.buffer, chunkSize, headerSize);
if (connectionContext.socket.readyState !== WebSocket.OPEN) {
break;
}
let buf = buffer.buffer;
if (read < chunkSize) {
buf = buffer.buffer.slice(0, headerSize + read);
}
connectionContext.socket.send(buf);
} while (read === chunkSize);
connectionContext.socket.send(buffer.buffer.slice(0, headerSize));
});
}
onMessage(connectionContext) {

@@ -225,3 +152,6 @@ let onInitResolve = null, onInitReject = null;

return (message) => {
const parsedMessage = this.parseMessage(message);
const parsedMessage = common_1.parseMessage(message);
if (!parsedMessage) {
return;
}
const opId = parsedMessage.id;

@@ -246,8 +176,8 @@ switch (parsedMessage.type) {

}
this.sendMessage(connectionContext, undefined, message_type_1.MessageType.GQL_CONNECTION_ACK, undefined);
SubscriptionServer.sendMessage(connectionContext, undefined, message_type_1.MessageType.GQL_CONNECTION_ACK, undefined);
if (this.keepAlive) {
this.sendMessage(connectionContext, undefined, message_type_1.MessageType.GQL_CONNECTION_KEEP_ALIVE, undefined);
SubscriptionServer.sendMessage(connectionContext, undefined, message_type_1.MessageType.GQL_CONNECTION_KEEP_ALIVE, undefined);
}
}).catch((error) => {
this.sendError(connectionContext, opId, { message: error.message }, message_type_1.MessageType.GQL_CONNECTION_ERROR);
SubscriptionServer.sendError(connectionContext, opId, { message: error.message }, message_type_1.MessageType.GQL_CONNECTION_ERROR);
setTimeout(() => {

@@ -287,3 +217,3 @@ connectionContext.socket.close(1011);

const error = `Invalid params returned from onOperation! return values must be an object!`;
this.sendError(connectionContext, opId, { message: error });
SubscriptionServer.sendError(connectionContext, opId, { message: error });
throw new Error(error);

@@ -302,3 +232,3 @@ }

}
this.processFiles(connectionContext, opId, params.variables);
connectionContext.filesIn[opId] = common_1.extractIncomingFiles(opId, connectionContext.socket, params.variables);
const promiseOrIterable = executor(this.schema, document, this.rootValue, params.context, params.variables, params.operationName);

@@ -313,3 +243,3 @@ if (!iterall_1.isAsyncIterable(promiseOrIterable) && promiseOrIterable instanceof Promise) {

console.error('Invalid `execute` return type! Only Promise or AsyncIterable are valid values!');
this.sendError(connectionContext, opId, {
SubscriptionServer.sendError(connectionContext, opId, {
message: 'GraphQL execute engine is not available',

@@ -327,3 +257,3 @@ });

let result = value;
connectionContext.filesOut[opId] = yield this.extractFiles(result.data);
connectionContext.filesOut[opId] = common_1.extractOutgoingFiles(result.data);
if (params.formatResponse) {

@@ -337,5 +267,5 @@ try {

}
this.sendMessage(connectionContext, opId, message_type_1.MessageType.GQL_DATA, result);
SubscriptionServer.sendMessage(connectionContext, opId, message_type_1.MessageType.GQL_DATA, result);
})).then(() => __awaiter(this, void 0, void 0, function* () {
while (connectionContext.filesOut[opId].length > 0) {
while (connectionContext.filesOut[opId] && connectionContext.filesOut[opId].length > 0) {
yield new Promise(resolve => connectionContext.filesOutEvent.once(opId.toString(), resolve));

@@ -345,3 +275,3 @@ }

.then(() => {
this.sendMessage(connectionContext, opId, message_type_1.MessageType.GQL_COMPLETE, null);
SubscriptionServer.sendMessage(connectionContext, opId, message_type_1.MessageType.GQL_COMPLETE, null);
this.unsubscribe(connectionContext, opId);

@@ -362,3 +292,3 @@ })

}
this.sendError(connectionContext, opId, error);
SubscriptionServer.sendError(connectionContext, opId, error);
});

@@ -371,6 +301,6 @@ return executionIterable;

if (e.errors) {
this.sendMessage(connectionContext, opId, message_type_1.MessageType.GQL_DATA, { errors: e.errors });
SubscriptionServer.sendMessage(connectionContext, opId, message_type_1.MessageType.GQL_DATA, { errors: e.errors });
}
else {
this.sendError(connectionContext, opId, { message: e.message });
SubscriptionServer.sendError(connectionContext, opId, { message: e.message });
}

@@ -382,16 +312,2 @@ this.unsubscribe(connectionContext, opId);

break;
case message_type_1.MessageType.GQL_BINARY:
connectionContext.initPromise.then(() => {
const payload = parsedMessage.payload;
if (connectionContext.filesIn[opId]) {
const file = connectionContext.filesIn[opId].find(f => f.binary.getId() === payload.fileId);
if (file) {
file.reader.next(payload.buffer);
if (payload.buffer.byteLength === 0) {
file.reader.complete();
}
}
}
});
break;
case message_type_1.MessageType.GQL_BINARY_REQUEST:

@@ -404,6 +320,19 @@ connectionContext.initPromise.then(() => __awaiter(this, void 0, void 0, function* () {

if (file) {
yield this.sendSingleFile(connectionContext, opId, file, offset);
const index = connectionContext.filesOut[opId].findIndex(f => f === file);
delete connectionContext.filesOut[opId][index];
connectionContext.filesOutEvent.emit(opId.toString());
const reader = file.createReadStream(offset);
const writer = new BinarySender_1.BinarySender({
opId,
fileId: file.getId(),
socket: connectionContext.socket,
});
const finish = new Promise((resolve, reject) => {
reader.pipe(writer)
.on('finish', resolve)
.on('error', reject);
});
yield finish;
if (connectionContext.filesOut[opId]) {
const index = connectionContext.filesOut[opId].findIndex(f => f === file);
delete connectionContext.filesOut[opId][index];
connectionContext.filesOutEvent.emit(opId.toString());
}
}

@@ -419,34 +348,8 @@ }

default:
this.sendError(connectionContext, opId, { message: 'Invalid message type!' });
break;
}
};
}
buildMessage(id, type, payload) {
let serializedMessage = JSON.stringify(payload) || '';
const headerSize = 4 * 2;
const message = new DataView(new ArrayBuffer(headerSize + serializedMessage.length));
message.setUint32(0, id, true);
message.setUint32(4, type, true);
new Uint8Array(message.buffer).set(Buffer.from(serializedMessage), 8);
return message.buffer;
}
sendMessage(connectionContext, opId, type, payload) {
const message = this.buildMessage(opId, type, payload);
if (connectionContext.socket.readyState === WebSocket.OPEN) {
connectionContext.socket.send(message);
}
}
sendError(connectionContext, opId, errorPayload, overrideDefaultErrorType) {
const sanitizedOverrideDefaultErrorType = overrideDefaultErrorType || message_type_1.MessageType.GQL_ERROR;
if ([
message_type_1.MessageType.GQL_CONNECTION_ERROR,
message_type_1.MessageType.GQL_ERROR,
].indexOf(sanitizedOverrideDefaultErrorType) === -1) {
throw new Error('overrideDefaultErrorType should be one of the allowed error messages' +
' GQL_CONNECTION_ERROR or GQL_ERROR');
}
this.sendMessage(connectionContext, opId, sanitizedOverrideDefaultErrorType, errorPayload);
}
}
exports.SubscriptionServer = SubscriptionServer;
//# sourceMappingURL=server.js.map
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const graphql_1 = require("graphql");
const Binary_1 = require("./Binary");
const Binary_1 = require("../Binary");
const schema = `

@@ -6,0 +6,0 @@ scalar Binary

{
"name": "graphql-ws",
"version": "0.1.4",
"version": "0.1.5",
"description": "WebSocket transport for GraphQL",

@@ -13,4 +13,4 @@ "main": "dist/index.js",

"backo2": "^1.0.2",
"class-autobind": "^0.1.4",
"eventemitter3": "^2.0.3",
"fast-sha256": "^1.0.0",
"iterall": "^1.1.1",

@@ -20,3 +20,3 @@ "lodash.assign": "^4.2.0",

"lodash.isstring": "^4.0.1",
"rxjs": "^5.4.3",
"readable-stream": "^2.3.3",
"symbol-observable": "^1.0.4",

@@ -23,0 +23,0 @@ "ws": "^3.0.0"

@@ -21,1 +21,5 @@ interface Array<T> {

declare module 'backo2';
declare module 'readable-stream';
declare module 'class-autobind';

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc