New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

sonic-js

Package Overview
Dependencies
Maintainers
1
Versions
11
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

sonic-js - npm Package Compare versions

Comparing version 0.7.1 to 0.8.0

.eslintrc.js

77

examples/example.js
/* eslint no-console: [0]*/
// var Client = require('sonic-js').Client;
var Client = require('../src/lib.js').Client;
var assert = require('assert');
var host = process.env.SONIC_HOST || 'wss://0.0.0.0:443';
var API_KEY = '1234';
var USER = 'serrallonga';
const Client = require('../src/lib.js').Client;
const assert = require('assert');
var client = new Client(host + '/v1/query');
const host = process.env.SONIC_HOST || 'wss://0.0.0.0:443';
const API_KEY = '1234';
const USER = 'serrallonga';
var query = {
const opt = {
maxPoolSize: 5, /* max connection pool size */
minPoolSize: 1, /* min connection pool size */
maxTries: 1, /* max create connection attempts before bubbling up error */
autostart: true, /* should the Client start creating connections once the constructor is called */
validate: true, /* should the Client validate that connections are responsive before using them */
validateTimeout: 2000, /* new connection validation timeout */
acquireTimeout: 3000, /* new connection total acquisition timeout */
};
const client = new Client(`${host}/v1/query`, opt);
const query = {
query: '10',

@@ -21,3 +32,3 @@ config: {

var query2 = {
const query2 = {
query: '5',

@@ -27,3 +38,3 @@ config: 'secured_test'

var done = 0;
let done = 0;

@@ -33,28 +44,29 @@

var stream = client.stream(query);
const stream = client.stream(query);
stream.on('data', function(data) {
stream.on('data', (data) => {
console.log(data);
});
stream.on('progress', function(p) {
stream.on('progress', (p) => {
done += p.progress;
console.log('running.. ' + done + '/' + p.total + ' ' + p.units);
console.log(`running.. ${done}/${p.total} ${p.units}`);
});
stream.on('metadata', function(meta) {
console.log('metadata: ' + JSON.stringify(meta));
stream.on('metadata', (meta) => {
console.log(`metadata: ${JSON.stringify(meta)}`);
});
stream.on('done', function() {
stream.on('done', () => {
console.log('stream is done!');
});
stream.on('error', function(err) {
console.log('stream error: ' + err);
stream.on('error', (err) => {
console.log(`stream error: ${err}`);
});
/* UNAUTHENTICATED Client.prototype.run */
client.run(query, function(err, res) {
client.run(query, (err, res) => {
if (err) {

@@ -65,3 +77,3 @@ console.log(err);

res.forEach(function(e) {
res.forEach((e) => {
console.log(e);

@@ -71,12 +83,9 @@ });

console.log('exec is done!');
});
/* AUTHENTICATED Client.prototype.run */
// `secured_test` source can be accessed without
// an auth token that grants
// authorization equal or higher than 3.
client.run(query2, function(err) {
assert.throws(function () {
client.run(query2, (err) => {
assert.throws(() => {
if (err) {

@@ -88,4 +97,7 @@ throw err;

/* AUTHENTICATED Client.prototype.run */
// first we need to authenticate
client.authenticate(USER, API_KEY, function(err, token) {
client.authenticate(USER, API_KEY, (err, token) => {
if (err) {

@@ -97,8 +109,8 @@ throw err;

client.run(query2, function(err, res) {
if (err) {
throw err;
client.run(query2, (qerr, res) => {
if (qerr) {
throw qerr;
}
res.forEach(function(e) {
res.forEach((e) => {
console.log(e);

@@ -110,5 +122,6 @@ });

// close ws
client.close();
client.close()
.then(() => console.log('released all resources'))
.catch(error => console.error(error));
});
});
{
"name": "sonic-js",
"version": "0.7.1",
"version": "0.8.0",
"description": "ws client library for the Sonic protocol",

@@ -10,7 +10,6 @@ "main": "src/lib.js",

"scripts": {
"test": "node_modules/mocha/bin/mocha -r chai spec/close.js spec/ws.js"
"test": "node_modules/mocha/bin/mocha -r chai --timeout 10000 spec/close.js spec/ws.js"
},
"dependencies": {
"bufferutil": "^1.2.1",
"utf-8-validate": "^1.2.1",
"generic-pool": "git://github.com/Aleksandras-Novikovas/node-pool.git#59889c4b0adb5dea839a951fb351ff5ee6045768",
"ws": "^1.1.0"

@@ -20,4 +19,13 @@ },

"chai": "^3.5.0",
"eslint": "^3.19.0",
"eslint-config-airbnb": "^15.0.1",
"eslint-plugin-import": "^2.3.0",
"eslint-plugin-jsx-a11y": "^5.0.3",
"eslint-plugin-react": "^7.0.1",
"mocha": "^2.5.3"
},
"optionalDependencies": {
"bufferutil": "^3.0.0",
"utf-8-validate": "^3.0.1"
}
}
/* eslint-env node, mocha */
const Client = require('../src/lib.js').Client;
const assert = require('chai').assert;
var Client = require('../src/lib.js').Client;
var assert = require('chai').assert;
var sonicEndpoint = (process.env.SONIC_HOST || 'wss://0.0.0.0:443') + '/v1/query';
const sonicEndpoint = `${process.env.SONIC_HOST || 'wss://0.0.0.0:443'}/v1/query`;
describe('Client#close', function() {
it('should cancel all ongoing queries', function(done) {
var client = new Client(sonicEndpoint);
var q = {
describe('Client#close', () => {
it('should cancel all ongoing queries', () => {
const client = new Client(sonicEndpoint, { maxPoolSize: 5, minPoolSize: 5, });
const q = {
query: '100',
'progress-delay': 1000,
config: { class: 'SyntheticSource' }
};
var queries = [q, q, q, q, q, q];
const queries = [q, q, q];
queries.forEach(function(query) {
client.run(query, function(err, d) {
if (err || d.length !== 0) {
done(err || new Error('data not empty!'));
return;
}
if (queries.length === 1) {
done();
assert.equal(client.ws.length, 0);
return;
}
const progress = Promise.all(queries.map(client.run2.bind(client)));
let closeProm;
queries.splice(queries.indexOf(query), 1);
const timer = setInterval(() => {
// wait until all queries are running
if (Object.keys(client.running).length === queries.length) {
closeProm = client.close();
clearInterval(timer);
}
}, 100);
});
});
assert.equal(client.ws.length, 6);
client.close();
return progress.then((data) => {
assert.equal(data.reduce((a, d) => a + d.length, 0), 0);
}).then(() => closeProm);
});
});
describe('stream#cancel', function() {
it('should cancel query', function(done) {
var client = new Client(sonicEndpoint);
var query = {
it('should not allow more queries to be submitted', () => {
const client = new Client(sonicEndpoint, { maxPoolSize: 5, minPoolSize: 5, });
const q = {
query: '100',
'progress-delay': 1000,
config: { class: 'SyntheticSource' }
};
var cancelCb = false;
var emitter = client.stream(query);
assert.equal(client.ws.length, 1);
const queries = [q, q, q];
emitter.on('done', function() {
assert.equal(client.ws.length, 0);
assert(cancelCb, 'cancelCb was not called!');
done();
});
emitter.on('error', function(err) {
done(err);
});
emitter.cancel(function() {
cancelCb = true;
});
const closeProm = client.close();
return Promise.all(queries.map(client.run2.bind(client))).then((data) => {
throw new Error(`not bubble up error: ${JSON.stringify(data)}`);
}).catch((err) => {
assert(err.toString().indexOf('closing') >= 0, err.toString());
assert.equal(Object.keys(client.running).length, 0);
}).then(() => closeProm);
});
});
describe('run#cancel', function() {
it('should cancel query', function(done) {
var client = new Client(sonicEndpoint);
var query = {
describe('stream#cancel', () => {
it('should cancel query', (done) => {
const client = new Client(sonicEndpoint);
const query = {
query: '100',
config: { class: 'SyntheticSource' }
};
var cancelCb = false;
var closeable = client.run(query, function(err, d) {
if (err || d.length !== 0) {
done(err || new Error('data not empty!'));
} else {
assert.equal(client.ws.length, 0);
assert(cancelCb, 'cancelCb was not called!');
done();
}
let cancelCb = false;
const emitter = client.stream(query);
emitter.on('done', () => {
assert.equal(Object.keys(client.running).length, 0);
assert(cancelCb, 'cancelCb was not called!');
done();
});
assert.equal(client.ws.length, 1);
closeable.cancel(function() {
emitter.on('error', (err) => {
done(err);
});
emitter.cancel(() => {
cancelCb = true;

@@ -85,0 +75,0 @@ });

@@ -1,5 +0,5 @@

var assert = require('chai').assert;
const assert = require('chai').assert;
module.exports.testHappyPathSingle = function(client, query, n, done) {
client.run(query, function(err, data) {
module.exports.testHappyPathSingle = (client, query, n, done) => {
client.run(query, (err, data) => {
if (err) {

@@ -10,3 +10,3 @@ done(err);

if (data.length !== n) {
done(new Error('data was not ' + n));
done(new Error(`data was not ${n}`));
return;

@@ -18,7 +18,25 @@ }

module.exports.testHappyPath = function(client, query, n, done) {
var _done = 0;
var stream, traceId;
module.exports.testHappyPath = (client, query, n, _done) => {
let count = 0;
let ran, streamed, doned = false;
let traceId;
if (!_done) {
throw new Error('needs _done to hook with test driver');
}
client.run(query, function(err, data) {
const debugTimer = setTimeout(() => {
console.log(`count=${count}; run=${ran}; stream=${streamed}; doned=${doned}`);
}, 3000);
const done = (e) => {
// guard against multiple calls to done() with err
if (!doned) {
doned = true;
clearTimeout(debugTimer);
_done(e);
}
};
client.run(query, (err, data) => {
ran = true;
if (err) {

@@ -28,17 +46,17 @@ done(err);

}
assert(data.length === n, 'data was not' + n);
if (_done === 1) {
assert(data.length === n, `data was not${n}`);
if (count === 1) {
done();
} else {
_done += 1;
count += 1;
}
});
stream = client.stream(query);
const stream = client.stream(query);
stream.on('started', function(id) {
stream.on('started', (id) => {
traceId = id;
});
stream.on('error', function(err) {
stream.on('error', (err) => {
if (err) {

@@ -51,10 +69,11 @@ done(err);

stream.on('done', function(err) {
stream.on('done', (err) => {
streamed = true;
assert(typeof traceId !== 'undefined', 'traceId is undefined in stream done callback on `testHappyPath` test');
if (err) {
done(err);
} else if (_done === 1) {
} else if (count === 1) {
done();
} else {
_done += 1;
count += 1;
}

@@ -64,14 +83,30 @@ });

module.exports.expectError = function(client, query, done) {
var _done = 0;
var stream;
module.exports.expectError = (client, query, _done) => {
let count = 0;
let ran, streamed, doned = false;
client.run(query, function(err) {
if (!_done) {
throw new Error('needs _done to hook with test driver');
}
const debugTimer = setTimeout(() => {
console.log(`count=${count}; run=${ran}; stream=${streamed}; doned=${doned}`);
}, 2000);
const done = (e) => {
// guard against multiple calls to done() with err
if (!doned) {
doned = true;
clearTimeout(debugTimer);
_done(e);
}
};
client.run(query, (err) => {
ran = true;
if (err) {
if (done) {
if (_done === 1) {
done();
} else {
_done += 1;
}
if (count === 1) {
done();
} else {
count += 1;
}

@@ -83,16 +118,15 @@ } else {

stream = client.stream(query);
const stream = client.stream(query);
stream.on('done', function() {
stream.on('done', () => {
done(new Error('stream emitted `done` but `error` expected'));
});
stream.on('error', function(err) {
stream.on('error', (err) => {
if (err) {
if (done) {
if (_done === 1) {
done();
} else {
_done += 1;
}
streamed = true;
if (count === 1) {
done();
} else {
count += 1;
}

@@ -105,5 +139,5 @@ } else {

module.exports.doAuthenticate = function(client, done, apiKeyMaybe) {
var apiKey = apiKeyMaybe || '1234';
client.authenticate('spec_tests', apiKey, function(err, token) {
module.exports.doAuthenticate = (client, done, apiKeyMaybe) => {
const apiKey = apiKeyMaybe || '1234';
client.authenticate('spec_tests', apiKey, (err, token) => {
if (err) {

@@ -121,2 +155,1 @@ done(new Error('failed to authenticate'));

};
/* eslint-env node, mocha */
const Client = require('../src/lib.js').Client;
const process = require('process');
const util = require('./util');
var Client = require('../src/lib.js').Client;
var assert = require('chai').assert;
var process = require('process');
var sonicEndpoint = (process.env.SONIC_HOST || 'wss://0.0.0.0:443') + '/v1/query';
var util = require('./util');
var token;
const sonicEndpoint = `${process.env.SONIC_HOST || 'wss://0.0.0.0:443'}/v1/query`;
let token;
function runSpecTests(client, id) {
it(id + ' - should be able to run a simple query and stream the data back from the server', function(done) {
var query = {
it(`${id} - should be able to run a simple query and stream the data back from the server`, (done) => {
const query = {
query: '5',

@@ -25,6 +25,7 @@ auth: token,

it(id + ' - should return an error if source class is unknown', function(done) {
var query = {
it(`${id} - should return an error if source class is unknown`, (done) => {
const query = {
query: '1',
auth: token,
trace_id: 'ballz0',
config: {

@@ -38,6 +39,7 @@ class: 'UnknownClass'

it(id + ' - should return an error if query or config is null', function(done) {
var query = {
it(`${id} - should return an error if query is null`, (done) => {
const query = {
query: null,
auth: token,
trace_id: 'ballz1',
config: {

@@ -51,5 +53,6 @@ class: 'SyntheticSource'

it(id + ' - should return an error if config is null', function(done) {
var query = {
it(`${id} - should return an error if config is null`, (done) => {
const query = {
query: '1',
trace_id: 'ballz2',
auth: token,

@@ -62,4 +65,4 @@ config: null

it(id + ' - should return an error if source publisher completes stream with exception', function(done) {
var query = {
it(`${id} - should return an error if source publisher completes stream with exception`, (done) => {
const query = {
// signals source to throw expected exception

@@ -76,4 +79,4 @@ query: '28',

it(id + ' - should return an error if source throws an exception and terminates', function(done) {
var query = {
it(`${id} - should return an error if source throws an exception and terminates`, (done) => {
const query = {
// signals source to throw unexpected exception

@@ -90,6 +93,6 @@ query: '-1',

it(id + ' - should stream a big payload correctly', function(done) {
this.timeout(4000);
var q = "";
var i = 0;
it(`${id} - should stream a big payload correctly`, function (done) {
this.timeout(6000);
let q = '';
let i = 0;
while (i < 10000) {

@@ -100,3 +103,3 @@ q += 'aweqefekwljflwekfjkelwfjlwekjfeklwjflwekjfeklwjfeklfejklfjewlkfejwklw';

var query = {
const query = {
query: q,

@@ -115,10 +118,9 @@ auth: token,

describe('Sonic ws', function() {
describe('Sonic ws', () => {
const client = new Client(sonicEndpoint);
var client = new Client(sonicEndpoint);
runSpecTests(client, 'unauthenticated');
it('should return an error if source requires authentication and user is unauthenticated', function(done) {
var query = {
it('should return an error if source requires authentication and user is unauthenticated', (done) => {
const query = {
query: '1',

@@ -137,7 +139,6 @@ // tipically set server side, but also

describe('Sonic ws auth', function() {
it('should throw an error if api key is invalid', function(done) {
client.authenticate('spec_tests', 'mariano', function(err) {
if(err) {
describe('Sonic ws auth', () => {
it('should throw an error if api key is invalid', (done) => {
client.authenticate('spec_tests', 'mariano', (err) => {
if (err) {
done();

@@ -150,4 +151,4 @@ } else {

it('should authenticate user', function(done) {
util.doAuthenticate(client, function(err, token) {
it('should authenticate user', (done) => {
util.doAuthenticate(client, (err, token) => {
if (err) {

@@ -162,9 +163,9 @@ done(err);

runSpecTests(new Client(sonicEndpoint, { maxPoolSize: 1, minPoolSize: 1, maxTries: 1 }), 'single connection');
describe('Sonic ws with authentication', function() {
describe('Sonic ws with authentication', () => {
const authenticated = new Client(sonicEndpoint);
var authenticated = new Client(sonicEndpoint);
before(function(done) {
util.doAuthenticate(authenticated, function(err, t) {
before((done) => {
util.doAuthenticate(authenticated, (err, t) => {
if (err) {

@@ -179,3 +180,3 @@ done(err);

after(function(done) {
after((done) => {
authenticated.close();

@@ -188,4 +189,4 @@ done();

it('should allow an authenticated and authorized user to run a query on a secure source', function(done) {
var query = {
it('should allow an authenticated and authorized user to run a query on a secure source', (done) => {
const query = {
query: '5',

@@ -202,4 +203,4 @@ auth: token,

it('should return error if an authenticated user but unauthorized user tries to run a query on a secured source', function(done) {
var query = {
it('should return error if an authenticated user but unauthorized user tries to run a query on a secured source', (done) => {
const query = {
query: '5',

@@ -216,5 +217,4 @@ auth: token,

it('should return error if an authenticated and authorized user from not a whitelisted IP tries to run a query on a secured source', function(done) {
util.doAuthenticate(authenticated, function(err, token) {
it('should return error if an authenticated and authorized user from not a whitelisted IP tries to run a query on a secured source', (done) => {
util.doAuthenticate(authenticated, (err, token) => {
if (err) {

@@ -225,3 +225,3 @@ done(err);

var query = {
const query = {
query: '5',

@@ -228,0 +228,0 @@ auth: token,

@@ -1,10 +0,9 @@

'use strict';
const BrowserWebSocket = global.MozWebSocket || global.WebSocket;
const WebSocket = BrowserWebSocket || require('ws'); // eslint-disable-line global-require
const EventEmitter = require('events');
const utils = require('./util');
const pool = require('generic-pool');
var BrowserWebSocket = global.MozWebSocket || global.WebSocket;
var WebSocket = BrowserWebSocket || require('ws');
var EventEmitter = require('events');
var util = require('util');
var utils = require('./util');
var SonicMessage = utils.SonicMessage;
var noop = function() {};
const SonicMessage = utils.SonicMessage;
const noop = () => {};

@@ -14,104 +13,199 @@ // this is an ugly hack to prevent browseryfied `ws` module to throw errors at runtime

if (BrowserWebSocket) {
WebSocket.prototype.on = function(event, callback) {
this['on' + event] = callback;
WebSocket.prototype.on = function on(event, callback) {
this[`on${event}`] = callback;
};
}
function cancel(ws, _cb) {
var cb = typeof _cb === 'function' ? _cb : noop;
function doSend() {
if (BrowserWebSocket) {
try {
ws.send(SonicMessage.CANCEL);
cb();
} catch (e) {
cb(e);
}
} else {
ws.send(SonicMessage.CANCEL, cb);
}
}
if (ws.readyState === WebSocket.OPEN) {
doSend();
} else {
ws.on('open', doSend);
}
}
class SonicEmitter extends EventEmitter {}
function Client(sonicAddress) {
this.url = sonicAddress;
this.ws = [];
function getCloseError(ev) {
return new Error(`WebSocket close: code=${ev.code}; reason=${ev.reason}`);
}
function SonicEmitter() {
EventEmitter.call(this);
}
const states = {
INITIALIZED: 1,
CLOSING: 2,
CLOSED: 3,
};
util.inherits(SonicEmitter, EventEmitter);
class Client {
constructor(sonicAddress,
{ maxPoolSize, minPoolSize, debug, autostart, validate,
validateTimeout, maxTries, acquireTimeout } = {}) {
this.url = sonicAddress;
this.running = {};
this.nextId = 1;
this.state = states.INITIALIZED;
this.debug = debug;
this.validateTimeout = validateTimeout || 2000;
this.maxPoolSize = maxPoolSize || 5;
this.minPoolSize = minPoolSize || 1;
this.maxTries = maxTries || 1;
this.validate = typeof validate === 'undefined' ? true : validate;
this.autostart = typeof autostart === 'undefined' ? true : autostart;
this.acquireTimeout = acquireTimeout || 3000;
this._initializePool();
}
Client.prototype.send = function(doneCb, outputCb, progressCb, metadataCb, startedCb) {
var output = outputCb || (function() {});
var progress = progressCb || (function() {});
var metadata = metadataCb || (function() {});
var isDone = false;
var isError = false;
var self = this;
_initializePool() {
const client = this;
const poolOpts = {
max: this.maxPoolSize, // maximum size of the pool
min: this.minPoolSize, // minimum size of the pool
maxTries: this.maxTries,
autostart: this.autostart,
testOnBorrow: this.validate,
};
return function(message, ws) {
const WsFactory = {
create() {
return new Promise((resolve, reject) => {
const ws = new WebSocket(client.url);
let onOpen, onClose;
ws.send(message);
const onError = (err) => {
ws.removeListener('open', onOpen);
ws.removeListener('close', onClose);
reject(err);
};
function done(err, id) {
var idx;
onOpen = () => {
ws.removeListener('error', onError);
ws.removeListener('close', onClose);
resolve(ws);
};
ws.close(1000, 'completed');
onClose = (ev) => {
ws.removeListener('error', onError);
ws.removeListener('open', onOpen);
reject(getCloseError(ev));
};
if ((idx = self.ws.indexOf(ws)) < 0) {
throw new Error('ws not found');
}
ws.on('open', onOpen);
ws.on('close', onClose);
ws.on('error', onError);
});
},
self.ws.splice(idx, 1);
validate(ws) {
return new Promise((resolve) => {
let onPong;
doneCb(err, id);
}
const timer = setTimeout(() => {
resolve(false);
ws.removeListener('pong', onPong);
}, this.validateTimeout);
function closedUnexp() {
done(new Error('connection closed unexpectedly'));
onPong = () => {
clearTimeout(timer);
resolve(true);
};
ws.on('pong', onPong);
try {
ws.ping();
} catch (e) {
resolve(false);
}
});
},
destroy(ws) {
return new Promise((resolve) => {
ws.on('close', () => {
resolve();
});
ws.close(1000, 'pool#destroy');
});
},
};
this.pool = pool.createPool(WsFactory, poolOpts);
}
_cancel(id, _cb) {
const ws = this.running[id];
const cb = typeof _cb === 'function' ? _cb : noop;
if (this.debug) console.log(`cancelling: WebSocket(${id})`);
/* a 'D' message is expected when a cancel is send,
* therefore resource cleanup should be handled by done handler
*/
if (!ws) {
if (this.debug) console.error(new Error(`cancel: WebSocket(${id}) is not running any queries`));
cb();
return;
}
ws.on('close', function(ev) {
// browser
if (BrowserWebSocket) {
if (isError) {
done(new Error('WebSocket close code: ' + ev.code + '; reason: ' + ev.reason));
} else if (ev.code !== 1000 && !isDone) {
closedUnexp();
}
const doSend = () => {
if (!BrowserWebSocket) {
ws.send(SonicMessage.CANCEL, () => {
if (this.debug) console.log(`cancelled: WebSocket(${id})`);
cb();
});
return;
}
// ws
} else if (!isDone && ev !== 1000) {
closedUnexp();
try {
ws.send(SonicMessage.CANCEL);
if (this.debug) console.log(`cancelled: WebSocket(${id})`);
cb();
} catch (e) {
if (this.debug) console.error(e);
cb(e);
}
});
};
ws.on('error', function(ev) {
// ev is defined with `ws`, but not with the
// browser's WebSocket API
if (BrowserWebSocket) {
isError = true;
if (ws.readyState === WebSocket.CONNECTING) {
ws.on('open', doSend);
} if (ws.readyState === WebSocket.OPEN) {
doSend();
} else {
// connection is CLOSING/CLOSED
cb();
}
}
/* ws client agnostic send method */
_wsSend(message, doneCb, outputCb, progressCb, metadataCb, startedCb, ws) {
const output = outputCb || noop;
const progress = progressCb || noop;
const metadata = metadataCb || noop;
let onError, onClose, onMessage;
const done = (err) => {
ws.removeListener('close', onClose);
ws.removeListener('error', onError);
ws.removeListener('message', onMessage);
doneCb(err);
};
onClose = (ev) => {
if (this.debug) console.log(`WebSocket closed: code=${ev.code}; reason=${ev.reason};`);
if (!ws.sonicError) {
done(getCloseError(ev));
} else {
isDone = true;
done(ev);
done(ws.sonicError);
}
});
};
ws.on('message', function(message) {
var msg = BrowserWebSocket ? JSON.parse(message.data) : JSON.parse(message.toString('utf-8'));
function checkMsg() {
onError = (err) => {
if (this.debug) console.error(err);
// err is defined with `ws`, but not with the
// browser's WebSocket API so we need to get the errors from the close event
ws.sonicError = err; // eslint-disable-line no-param-reassign
};
onMessage = (_message) => {
const msg = BrowserWebSocket ? JSON.parse(_message.data) : JSON.parse(_message.toString('utf-8'));
const completeStream = () => {
if (this.debug) console.log(`completed complete/ack sequence: trace_id=${msg.p.trace_id}; error=${msg.v};`);
if (msg.v) {
done(new Error('Query with trace_id `' + msg.p.trace_id + '` failed: ' + msg.v));
done(new Error(`query with trace_id \`${msg.p.trace_id}\` failed: ${msg.v}`));
} else {
done(null);
}
}
};

@@ -124,8 +218,7 @@ switch (msg.e) {

case 'D':
isDone = true;
if (BrowserWebSocket) {
ws.send(SonicMessage.ACK);
checkMsg();
completeStream();
} else {
ws.send(SonicMessage.ACK, checkMsg);
ws.send(SonicMessage.ACK, completeStream);
}

@@ -135,5 +228,3 @@ break;

case 'T':
metadata(msg.p.map(function(elem) {
return [elem[0], typeof elem[1]];
}));
metadata(msg.p.map(elem => [elem[0], typeof elem[1]]));
break;

@@ -152,119 +243,207 @@

default:
// ignore to improve forwards compatibility
if (this.debug) console.log(`unsupported message received: ${JSON.stringify(msg)}`);
break;
}
});
};
};
};
Client.prototype.exec = function(message, doneCb, outputCb, progressCb, metadataCb, startedCb) {
ws.on('close', onClose);
ws.on('error', onError);
ws.on('message', onMessage);
ws.send(message);
if (this.debug) console.log(`sending message: ${JSON.stringify(message)}`);
}
var ws = new WebSocket(this.url);
var doExec = this.send(doneCb, outputCb, progressCb, metadataCb, startedCb);
/* pool-aware send method */
_send(message, doneCb, outputCb, progressCb, metadataCb, startedCb) {
switch (this.state) {
case states.CLOSED:
doneCb(new Error('client is closed and cannot accept more work'));
return;
case states.CLOSING:
doneCb(new Error('client is closing and cannot accept more work'));
return;
default:
}
ws.on('open', function() {
doExec(JSON.stringify(message), ws);
});
// identify send for cancel hooks
const id = this.nextId++;
let doned = false;
this.ws.push(ws);
let timer;
if (this.debug) {
timer = setTimeout(() =>
console.log(`its taking more than (${this.acquireTimeout}) to acquire resource for ticket=${id}`),
this.acquireTimeout);
}
return ws;
};
// acquire connection
this.pool.acquire().then((ws) => {
this.running[id] = ws;
Client.prototype.stream = function(query) {
var emitter = new SonicEmitter();
var queryMsg = utils.toMsg(query);
var ws;
if (this.debug) {
if (!ws.sonicId) {
/* first ticket for this resource is the resource ID */
ws.sonicId = id; // eslint-disable-line no-param-reassign
}
console.log(`acquired resource=${ws.sonicId} for ticket=${id}`);
clearTimeout(timer);
}
function done(err) {
if (err) {
emitter.emit('error', err);
return;
}
// doneCb override to release connection back to pool
const doDoneCb = (err) => {
if (this.debug) console.log(`done with ticket=${id}; resource=${ws.sonicId}`);
if (!doned) {
doned = true;
delete this.running[id];
emitter.emit('done');
}
if (ws.sonicDestroy) {
if (this.debug) console.log(`destroying resource=${ws.sonicId}; ticket=${id}`);
this.pool.destroy(ws);
} else {
if (this.debug) console.log(`releasing resource=${ws.sonicId}; ticket=${id}`);
this.pool.release(ws);
}
doneCb(err);
}
};
function output(elems) {
emitter.emit('data', elems);
}
try {
this._wsSend(JSON.stringify(message), doDoneCb, outputCb, progressCb, metadataCb, startedCb, ws);
} catch (e) {
if (this.debug) console.error(e);
this.pool.release(ws);
doDoneCb(e);
}
}).catch(doneCb);
function metadata(meta) {
emitter.emit('metadata', meta);
return id; // eslint-disable-line consistent-return
}
function progress(prog) {
emitter.emit('progress', prog);
}
stream(query) {
const emitter = new SonicEmitter();
const queryMsg = utils.toMsg(query);
function started(traceId) {
emitter.emit('started', traceId);
}
function done(err) {
if (err) {
emitter.emit('error', err);
return;
}
ws = this.exec(queryMsg, done, output, progress, metadata, started);
emitter.emit('done');
}
emitter.cancel = function(cb) {
cancel(ws, cb);
};
function output(elems) {
emitter.emit('data', elems);
}
return emitter;
};
function metadata(meta) {
emitter.emit('metadata', meta);
}
Client.prototype.run = function(query, doneCb) {
function progress(prog) {
emitter.emit('progress', prog);
}
var data = [];
var queryMsg = utils.toMsg(query);
var ws;
function started(traceId) {
emitter.emit('started', traceId);
}
function done(err) {
if (err) {
doneCb(err, null);
} else {
doneCb(null, data);
const id = this._send(queryMsg, done, output, progress, metadata, started);
emitter.cancel = (cb) => {
this._cancel(id, cb);
};
return emitter;
}
/* TODO: deprecate in favor of run2 */
run(query, doneCb) {
const data = [];
const queryMsg = utils.toMsg(query);
function done(err) {
if (err) {
doneCb(err, null);
} else {
doneCb(null, data);
}
}
function output(elems) {
data.push(elems);
}
this._send(queryMsg, done, output);
}
function output(elems) {
data.push(elems);
run2(query) {
return new Promise((resolve, reject) => {
this.run(query, (err, data) => {
if (err) {
reject(err);
return;
}
resolve(data);
});
});
}
ws = this.exec(queryMsg, done, output);
authenticate(user, apiKey, doneCb, traceId) {
let token;
const authMsg = {
e: 'H',
p: {
user,
trace_id: traceId,
},
v: apiKey,
};
return {
cancel: function(cb) {
cancel(ws, cb);
function done(err) {
if (err) {
doneCb(err);
} else {
doneCb(null, token);
}
}
};
};
Client.prototype.authenticate = function(user, apiKey, doneCb, traceId) {
var token;
var authMsg = {
e: 'H',
p: {
user: user,
trace_id: traceId
},
v: apiKey
};
function output(elems) {
token = elems[0];
}
function done(err) {
if (err) {
doneCb(err, null);
} else {
doneCb(null, token);
this._send(authMsg, done, output);
}
_close() {
if (this.state !== states.CLOSING) {
return Promise.resolve();
}
return this.pool.drain().then(() => this.pool.clear());
}
function output(elems) {
token = elems[0];
cancel() {
const ids = Object.keys(this.running);
return Promise.all(ids.map(id => new Promise((resolve, reject) => {
this._cancel(id, (err) => {
if (err) {
reject(err);
return;
}
resolve();
});
})));
}
this.exec(authMsg, done, output);
};
close() {
this.state = states.CLOSING;
return this.cancel()
.then(() => this._close())
.then(() => {
this.state = states.CLOSED;
});
}
}
Client.prototype.close = function() {
this.ws.forEach(cancel);
};
module.exports.Client = Client;

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc