New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

sonic-js

Package Overview
Dependencies
Maintainers
1
Versions
11
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

sonic-js - npm Package Compare versions

Comparing version 0.8.1 to 0.8.2

src/WsFactory.js

41

.eslintrc.js
module.exports = {
env: {
node: true,
},
rules: {
'comma-dangle': 0,
'no-plusplus': 0,
'no-cond-assign': 0,
'no-continue': 0,
'no-console': 0,
'one-var': 0,
'one-var-declaration-per-line': 0,
'no-underscore-dangle': 0,
},
extends: 'airbnb'
'env': {
'browser': true,
'node': true,
'es6': true
},
'extends': 'eslint:recommended',
'parserOptions': {
'sourceType': 'module'
},
'rules': {
'indent': [
'error',
'tab'
],
'linebreak-style': [
'error',
'unix'
],
'quotes': [
'error',
'single'
],
'semi': [
'error',
'always'
]
}
};

@@ -7,3 +7,3 @@ /* eslint no-console: [0]*/

const host = process.env.SONIC_HOST || 'wss://0.0.0.0:443';
const host = process.env.SONIC_HOST || 'ws://0.0.0.0:9111';
const API_KEY = '1234';

@@ -13,9 +13,5 @@ const USER = 'serrallonga';

const opt = {
maxPoolSize: 5, /* max connection pool size */
minPoolSize: 1, /* min connection pool size */
maxTries: 1, /* max create connection attempts before bubbling up error */
autostart: true, /* should the Client start creating connections once the constructor is called */
validate: true, /* should the Client validate that connections are responsive before using them */
validateTimeout: 2000, /* new connection validation timeout */
acquireTimeout: 3000, /* new connection total acquisition timeout */
maxPoolSize: 5, /* max connection pool size */
minPoolSize: 1, /* min connection pool size */
acquireTimeout: 3000, /* new connection total acquisition timeout */
};

@@ -26,13 +22,13 @@

const query = {
query: '10',
config: {
class: 'SyntheticSource',
seed: 1000,
'progress-delay': 10
}
query: '10',
config: {
class: 'SyntheticSource',
seed: 1000,
'progress-delay': 10
}
};
const query2 = {
query: '5',
config: 'secured_test'
query: '5',
config: 'secured_test'
};

@@ -48,20 +44,20 @@

stream.on('data', (data) => {
console.log(data);
console.log(data);
});
stream.on('progress', (p) => {
done += p.progress;
console.log(`running.. ${done}/${p.total} ${p.units}`);
done += p.progress;
console.log(`running.. ${done}/${p.total} ${p.units}`);
});
stream.on('metadata', (meta) => {
console.log(`metadata: ${JSON.stringify(meta)}`);
console.log(`metadata: ${JSON.stringify(meta)}`);
});
stream.on('done', () => {
console.log('stream is done!');
console.log('stream is done!');
});
stream.on('error', (err) => {
console.log(`stream error: ${err}`);
console.log(`stream error: ${err}`);
});

@@ -73,12 +69,12 @@

client.run(query, (err, res) => {
if (err) {
console.log(err);
return;
}
if (err) {
console.log(err);
return;
}
res.forEach((e) => {
console.log(e);
});
res.forEach((e) => {
console.log(e);
});
console.log('exec is done!');
console.log('exec is done!');
});

@@ -90,7 +86,7 @@

client.run(query2, (err) => {
assert.throws(() => {
if (err) {
throw err;
}
});
assert.throws(() => {
if (err) {
throw err;
}
});
});

@@ -103,24 +99,24 @@

client.authenticate(USER, API_KEY, (err, token) => {
if (err) {
throw err;
}
if (err) {
throw err;
}
query2.auth = token;
query2.auth = token;
client.run(query2, (qerr, res) => {
if (qerr) {
throw qerr;
}
client.run(query2, (qerr, res) => {
if (qerr) {
throw qerr;
}
res.forEach((e) => {
console.log(e);
});
res.forEach((e) => {
console.log(e);
});
console.log('secured exec is done!');
console.log('secured exec is done!');
// close ws
client.close()
client.close()
.then(() => console.log('released all resources'))
.catch(error => console.error(error));
});
});
});
{
"name": "sonic-js",
"version": "0.8.1",
"version": "0.8.2",
"description": "ws client library for the Sonic protocol",

@@ -13,3 +13,3 @@ "main": "src/lib.js",

"dependencies": {
"generic-pool": "git://github.com/Aleksandras-Novikovas/node-pool.git#59889c4b0adb5dea839a951fb351ff5ee6045768",
"generic-pool": "^2.5.4",
"ws": "^1.1.0"

@@ -16,0 +16,0 @@ },

@@ -18,3 +18,3 @@ # Sonic-js [![Build Status](https://travis-ci.org/xarxa6/sonic-js.svg)](https://travis-ci.org/xarxa6/sonic-js) [![npm version](https://badge.fury.io/js/sonic-js.svg)](https://badge.fury.io/js/sonic-js)

var client = new Client('wss://0.0.0.0:443');
var client = new Client('ws://0.0.0.0:9111');

@@ -21,0 +21,0 @@ var query = {

@@ -5,74 +5,74 @@ /* eslint-env node, mocha */

const sonicEndpoint = `${process.env.SONIC_HOST || 'wss://0.0.0.0:443'}/v1/query`;
const sonicEndpoint = `${process.env.SONIC_HOST || 'ws://0.0.0.0:9111'}/v1/query`;
describe('Client#close', () => {
it('should cancel all ongoing queries', () => {
const client = new Client(sonicEndpoint, { maxPoolSize: 5, minPoolSize: 5, });
const q = {
query: '100',
'progress-delay': 1000,
config: { class: 'SyntheticSource' }
};
it('should cancel all ongoing queries', () => {
const client = new Client(sonicEndpoint, { maxPoolSize: 5, minPoolSize: 5, });
const q = {
query: '100',
'progress-delay': 1000,
config: { class: 'SyntheticSource' }
};
const queries = [q, q, q];
const queries = [q, q, q];
const progress = Promise.all(queries.map(client.run2.bind(client)));
let closeProm;
const progress = Promise.all(queries.map(client.run2.bind(client)));
let closeProm;
const timer = setInterval(() => {
const timer = setInterval(() => {
// wait until all queries are running
if (Object.keys(client.running).length === queries.length) {
closeProm = client.close();
clearInterval(timer);
}
}, 100);
if (Object.keys(client.running).length === queries.length) {
closeProm = client.close();
clearInterval(timer);
}
}, 100);
return progress.then((data) => {
assert.equal(data.reduce((a, d) => a + d.length, 0), 0);
}).then(() => closeProm);
});
return progress.then((data) => {
assert.equal(data.reduce((a, d) => a + d.length, 0), 0);
}).then(() => closeProm);
});
it('should not allow more queries to be submitted', () => {
const client = new Client(sonicEndpoint, { maxPoolSize: 5, minPoolSize: 5, });
const q = {
query: '100',
'progress-delay': 1000,
config: { class: 'SyntheticSource' }
};
it('should not allow more queries to be submitted', () => {
const client = new Client(sonicEndpoint, { maxPoolSize: 5, minPoolSize: 5, });
const q = {
query: '100',
'progress-delay': 1000,
config: { class: 'SyntheticSource' }
};
const queries = [q, q, q];
const queries = [q, q, q];
const closeProm = client.close();
const closeProm = client.close();
return Promise.all(queries.map(client.run2.bind(client))).then((data) => {
throw new Error(`not bubble up error: ${JSON.stringify(data)}`);
}).catch((err) => {
assert(err.toString().indexOf('closing') >= 0, err.toString());
assert.equal(Object.keys(client.running).length, 0);
}).then(() => closeProm);
});
return Promise.all(queries.map(client.run2.bind(client))).then((data) => {
throw new Error(`not bubble up error: ${JSON.stringify(data)}`);
}).catch((err) => {
assert(err.toString().indexOf('closing') >= 0, err.toString());
assert.equal(Object.keys(client.running).length, 0);
}).then(() => closeProm);
});
});
describe('stream#cancel', () => {
it('should cancel query', (done) => {
const client = new Client(sonicEndpoint);
const query = {
query: '100',
config: { class: 'SyntheticSource' }
};
let cancelCb = false;
const emitter = client.stream(query);
it('should cancel query', (done) => {
const client = new Client(sonicEndpoint);
const query = {
query: '100',
config: { class: 'SyntheticSource' }
};
let cancelCb = false;
const emitter = client.stream(query);
emitter.on('done', () => {
assert.equal(Object.keys(client.running).length, 0);
assert(cancelCb, 'cancelCb was not called!');
done();
});
emitter.on('error', (err) => {
done(err);
});
emitter.cancel(() => {
cancelCb = true;
});
});
emitter.on('done', () => {
assert.equal(Object.keys(client.running).length, 0);
assert(cancelCb, 'cancelCb was not called!');
done();
});
emitter.on('error', (err) => {
done(err);
});
emitter.cancel(() => {
cancelCb = true;
});
});
});
const assert = require('chai').assert;
module.exports.testHappyPathSingle = (client, query, n, done) => {
client.run(query, (err, data) => {
if (err) {
done(err);
return;
}
if (data.length !== n) {
done(new Error(`data was not ${n}`));
return;
}
done();
});
client.run(query, (err, data) => {
if (err) {
done(err);
return;
}
if (data.length !== n) {
done(new Error(`data was not ${n}`));
return;
}
done();
});
};
module.exports.testHappyPath = (client, query, n, _done) => {
let count = 0;
let ran, streamed, doned = false;
let traceId;
if (!_done) {
throw new Error('needs _done to hook with test driver');
}
let count = 0;
let ran, streamed, doned = false;
let traceId;
if (!_done) {
throw new Error('needs _done to hook with test driver');
}
const debugTimer = setTimeout(() => {
console.log(`count=${count}; run=${ran}; stream=${streamed}; doned=${doned}`);
}, 3000);
const debugTimer = setTimeout(() => {
console.log(`count=${count}; run=${ran}; stream=${streamed}; doned=${doned}`);
}, 3000);
const done = (e) => {
const done = (e) => {
// guard against multiple calls to done() with err
if (!doned) {
doned = true;
clearTimeout(debugTimer);
_done(e);
}
};
if (!doned) {
doned = true;
clearTimeout(debugTimer);
_done(e);
}
};
client.run(query, (err, data) => {
ran = true;
if (err) {
done(err);
return;
}
assert(data.length === n, `data was not${n}`);
if (count === 1) {
done();
} else {
count += 1;
}
});
client.run(query, (err, data) => {
ran = true;
if (err) {
done(err);
return;
}
assert(data.length === n, `data was not${n}`);
if (count === 1) {
done();
} else {
count += 1;
}
});
const stream = client.stream(query);
const stream = client.stream(query);
stream.on('started', (id) => {
traceId = id;
});
stream.on('started', (id) => {
traceId = id;
});
stream.on('error', (err) => {
if (err) {
done(err);
} else {
done(new Error('error emitted but no error returned!'));
}
});
stream.on('error', (err) => {
if (err) {
done(err);
} else {
done(new Error('error emitted but no error returned!'));
}
});
stream.on('done', (err) => {
streamed = true;
assert(typeof traceId !== 'undefined', 'traceId is undefined in stream done callback on `testHappyPath` test');
if (err) {
done(err);
} else if (count === 1) {
done();
} else {
count += 1;
}
});
stream.on('done', (err) => {
streamed = true;
assert(typeof traceId !== 'undefined', 'traceId is undefined in stream done callback on `testHappyPath` test');
if (err) {
done(err);
} else if (count === 1) {
done();
} else {
count += 1;
}
});
};
module.exports.expectError = (client, query, _done) => {
let count = 0;
let ran, streamed, doned = false;
let count = 0;
let ran, streamed, doned = false;
if (!_done) {
throw new Error('needs _done to hook with test driver');
}
if (!_done) {
throw new Error('needs _done to hook with test driver');
}
const debugTimer = setTimeout(() => {
console.log(`count=${count}; run=${ran}; stream=${streamed}; doned=${doned}`);
}, 2000);
const debugTimer = setTimeout(() => {
console.log(`count=${count}; run=${ran}; stream=${streamed}; doned=${doned}`);
}, 2000);
const done = (e) => {
const done = (e) => {
// guard against multiple calls to done() with err
if (!doned) {
doned = true;
clearTimeout(debugTimer);
_done(e);
}
};
if (!doned) {
doned = true;
clearTimeout(debugTimer);
_done(e);
}
};
client.run(query, (err) => {
ran = true;
if (err) {
if (count === 1) {
done();
} else {
count += 1;
}
} else {
done(new Error('expected error but no error returned'));
}
});
client.run(query, (err) => {
ran = true;
if (err) {
if (count === 1) {
done();
} else {
count += 1;
}
} else {
done(new Error('expected error but no error returned'));
}
});
const stream = client.stream(query);
const stream = client.stream(query);
stream.on('done', () => {
done(new Error('stream emitted `done` but `error` expected'));
});
stream.on('done', () => {
done(new Error('stream emitted `done` but `error` expected'));
});
stream.on('error', (err) => {
if (err) {
streamed = true;
if (count === 1) {
done();
} else {
count += 1;
}
} else {
done(new Error('expected error but no error returned'));
}
});
stream.on('error', (err) => {
if (err) {
streamed = true;
if (count === 1) {
done();
} else {
count += 1;
}
} else {
done(new Error('expected error but no error returned'));
}
});
};
module.exports.doAuthenticate = (client, done, apiKeyMaybe) => {
const apiKey = apiKeyMaybe || '1234';
client.authenticate('spec_tests', apiKey, (err, token) => {
if (err) {
done(new Error('failed to authenticate'));
return;
}
const apiKey = apiKeyMaybe || '1234';
client.authenticate('spec_tests', apiKey, (err, token) => {
if (err) {
done(new Error('failed to authenticate'));
return;
}
if (token) {
done(null, token);
} else {
done(new Error('protocol error: no token received from server'));
}
});
if (token) {
done(null, token);
} else {
done(new Error('protocol error: no token received from server'));
}
});
};

@@ -6,3 +6,3 @@ /* eslint-env node, mocha */

const sonicEndpoint = `${process.env.SONIC_HOST || 'wss://0.0.0.0:443'}/v1/query`;
const sonicEndpoint = `${process.env.SONIC_HOST || 'ws://0.0.0.0:9111'}/v1/query`;

@@ -12,213 +12,213 @@ let token;

function runSpecTests(client, id) {
it(`${id} - should be able to run a simple query and stream the data back from the server`, (done) => {
const query = {
query: '5',
auth: token,
config: {
class: 'SyntheticSource',
seed: 1000,
'progress-delay': 10
}
};
it(`${id} - should be able to run a simple query and stream the data back from the server`, (done) => {
const query = {
query: '5',
auth: token,
config: {
class: 'SyntheticSource',
seed: 1000,
'progress-delay': 10
}
};
util.testHappyPath(client, query, 5, done);
});
util.testHappyPath(client, query, 5, done);
});
it(`${id} - should return an error if source class is unknown`, (done) => {
const query = {
query: '1',
auth: token,
trace_id: 'ballz0',
config: {
class: 'UnknownClass'
}
};
it(`${id} - should return an error if source class is unknown`, (done) => {
const query = {
query: '1',
auth: token,
trace_id: 'ballz0',
config: {
class: 'UnknownClass'
}
};
util.expectError(client, query, done);
});
util.expectError(client, query, done);
});
it(`${id} - should return an error if query is null`, (done) => {
const query = {
query: null,
auth: token,
trace_id: 'ballz1',
config: {
class: 'SyntheticSource'
}
};
it(`${id} - should return an error if query is null`, (done) => {
const query = {
query: null,
auth: token,
trace_id: 'ballz1',
config: {
class: 'SyntheticSource'
}
};
util.expectError(client, query, done);
});
util.expectError(client, query, done);
});
it(`${id} - should return an error if config is null`, (done) => {
const query = {
query: '1',
trace_id: 'ballz2',
auth: token,
config: null
};
it(`${id} - should return an error if config is null`, (done) => {
const query = {
query: '1',
trace_id: 'ballz2',
auth: token,
config: null
};
util.expectError(client, query, done);
});
util.expectError(client, query, done);
});
it(`${id} - should return an error if source publisher completes stream with exception`, (done) => {
const query = {
it(`${id} - should return an error if source publisher completes stream with exception`, (done) => {
const query = {
// signals source to throw expected exception
query: '28',
auth: token,
config: {
class: 'SyntheticSource'
}
};
query: '28',
auth: token,
config: {
class: 'SyntheticSource'
}
};
util.expectError(client, query, done);
});
util.expectError(client, query, done);
});
it(`${id} - should return an error if source throws an exception and terminates`, (done) => {
const query = {
it(`${id} - should return an error if source throws an exception and terminates`, (done) => {
const query = {
// signals source to throw unexpected exception
query: '-1',
auth: token,
config: {
class: 'SyntheticSource'
}
};
query: '-1',
auth: token,
config: {
class: 'SyntheticSource'
}
};
util.expectError(client, query, done);
});
util.expectError(client, query, done);
});
it(`${id} - should stream a big payload correctly`, function (done) {
this.timeout(6000);
let q = '';
let i = 0;
while (i < 10000) {
q += 'aweqefekwljflwekfjkelwfjlwekjfeklwjflwekjfeklwjfeklfejklfjewlkfejwklw';
i += 1;
}
it(`${id} - should stream a big payload correctly`, function (done) {
this.timeout(6000);
let q = '';
let i = 0;
while (i < 10000) {
q += 'aweqefekwljflwekfjkelwfjlwekjfeklwjflwekjfeklwjfeklfejklfjewlkfejwklw';
i += 1;
}
const query = {
query: q,
auth: token,
config: {
class: 'SyntheticSource',
'progress-delay': 0,
size: 5
}
};
const query = {
query: q,
auth: token,
config: {
class: 'SyntheticSource',
'progress-delay': 0,
size: 5
}
};
util.testHappyPath(client, query, 5, done);
});
util.testHappyPath(client, query, 5, done);
});
}
describe('Sonic ws', () => {
const client = new Client(sonicEndpoint);
const client = new Client(sonicEndpoint);
runSpecTests(client, 'unauthenticated');
runSpecTests(client, 'unauthenticated');
it('should return an error if source requires authentication and user is unauthenticated', (done) => {
const query = {
query: '1',
it('should return an error if source requires authentication and user is unauthenticated', (done) => {
const query = {
query: '1',
// tipically set server side, but also
// valid to be passed client side
config: {
class: 'SyntheticSource',
security: 2
}
};
config: {
class: 'SyntheticSource',
security: 2
}
};
util.expectError(client, query, done);
});
util.expectError(client, query, done);
});
describe('Sonic ws auth', () => {
it('should throw an error if api key is invalid', (done) => {
client.authenticate('spec_tests', 'mariano', (err) => {
if (err) {
done();
} else {
done(new Error('did not return error on invalid api key'));
}
});
});
describe('Sonic ws auth', () => {
it('should throw an error if api key is invalid', (done) => {
client.authenticate('spec_tests', 'mariano', (err) => {
if (err) {
done();
} else {
done(new Error('did not return error on invalid api key'));
}
});
});
it('should authenticate user', (done) => {
util.doAuthenticate(client, (err, token) => {
if (err) {
done(err);
return;
}
done();
});
});
});
it('should authenticate user', (done) => {
util.doAuthenticate(client, (err, token) => {
if (err) {
done(err);
return;
}
done();
});
});
});
runSpecTests(new Client(sonicEndpoint, { maxPoolSize: 1, minPoolSize: 1, maxTries: 1 }), 'single connection');
runSpecTests(new Client(sonicEndpoint, { maxPoolSize: 1, minPoolSize: 1, }), 'single connection');
describe('Sonic ws with authentication', () => {
const authenticated = new Client(sonicEndpoint);
describe('Sonic ws with authentication', () => {
const authenticated = new Client(sonicEndpoint);
before((done) => {
util.doAuthenticate(authenticated, (err, t) => {
if (err) {
done(err);
return;
}
token = t;
done();
});
});
before((done) => {
util.doAuthenticate(authenticated, (err, t) => {
if (err) {
done(err);
return;
}
token = t;
done();
});
});
after((done) => {
authenticated.close();
done();
});
after((done) => {
authenticated.close();
done();
});
// client is authenticated
runSpecTests(authenticated, 'authenticated');
runSpecTests(authenticated, 'authenticated');
it('should allow an authenticated and authorized user to run a query on a secure source', (done) => {
const query = {
query: '5',
auth: token,
config: {
class: 'SyntheticSource',
security: 1
}
};
it('should allow an authenticated and authorized user to run a query on a secure source', (done) => {
const query = {
query: '5',
auth: token,
config: {
class: 'SyntheticSource',
security: 1
}
};
util.testHappyPath(authenticated, query, 5, done);
});
util.testHappyPath(authenticated, query, 5, done);
});
it('should return error if an authenticated user but unauthorized user tries to run a query on a secured source', (done) => {
const query = {
query: '5',
auth: token,
config: {
class: 'SyntheticSource',
security: 2000
}
};
it('should return error if an authenticated user but unauthorized user tries to run a query on a secured source', (done) => {
const query = {
query: '5',
auth: token,
config: {
class: 'SyntheticSource',
security: 2000
}
};
util.expectError(authenticated, query, done);
});
util.expectError(authenticated, query, done);
});
it('should return error if an authenticated and authorized user from not a whitelisted IP tries to run a query on a secured source', (done) => {
util.doAuthenticate(authenticated, (err, token) => {
if (err) {
done(err);
return;
}
it('should return error if an authenticated and authorized user from not a whitelisted IP tries to run a query on a secured source', (done) => {
util.doAuthenticate(authenticated, (err, token) => {
if (err) {
done(err);
return;
}
const query = {
query: '5',
auth: token,
config: {
class: 'SyntheticSource',
security: 1
}
};
util.expectError(authenticated, query, done);
}, 'only_from_ip'); // check server's reference.conf
});
});
const query = {
query: '5',
auth: token,
config: {
class: 'SyntheticSource',
security: 1
}
};
util.expectError(authenticated, query, done);
}, 'only_from_ip'); // check server's reference.conf
});
});
});
const BrowserWebSocket = global.MozWebSocket || global.WebSocket;
const WebSocket = BrowserWebSocket || require('ws'); // eslint-disable-line global-require
const EventEmitter = require('events');
const WsFactory = require('./WsFactory');
const utils = require('./util');
const pool = require('generic-pool');
const Pool = require('generic-pool').Pool;

@@ -13,439 +15,375 @@ const SonicMessage = utils.SonicMessage;

if (BrowserWebSocket) {
WebSocket.prototype.on = function on(event, callback) {
this.addEventListener(event, callback);
};
WebSocket.prototype.on = function on(event, callback) {
this.addEventListener(event, callback);
};
WebSocket.prototype.removeListener = function removeListener(event, callback) {
this.removeEventListener(event, callback);
};
WebSocket.prototype.removeListener = function removeListener(event, callback) {
this.removeEventListener(event, callback);
};
}
class SonicEmitter extends EventEmitter {}
function getCloseError(ev) {
return new Error(`WebSocket close: code=${ev.code}; reason=${ev.reason}`);
}
const states = {
INITIALIZED: 1,
CLOSING: 2,
CLOSED: 3,
INITIALIZED: 1,
CLOSING: 2,
CLOSED: 3,
};
class Client {
constructor(sonicAddress,
{ maxPoolSize, minPoolSize, debug, autostart, validate,
validateTimeout, maxTries, acquireTimeout } = {}) {
this.url = sonicAddress;
this.running = {};
this.nextId = 1;
this.state = states.INITIALIZED;
this.debug = debug;
this.validateTimeout = validateTimeout || 2000;
this.maxPoolSize = maxPoolSize || 5;
this.minPoolSize = minPoolSize || 1;
this.maxTries = maxTries || 1;
/* browsers WebSocket API don't support sending ping/pong */
this.validate = (!BrowserWebSocket && typeof validate !== 'undefined') ? validate : false;
this.autostart = typeof autostart === 'undefined' ? true : autostart;
this.acquireTimeout = acquireTimeout || 3000;
this._initializePool();
}
constructor(sonicAddress, { maxPoolSize, minPoolSize, debug, acquireTimeout } = {}) {
this.url = sonicAddress;
this.running = {};
this.nextId = 1;
this.state = states.INITIALIZED;
this.debug = debug;
this.maxPoolSize = maxPoolSize || 5;
this.minPoolSize = minPoolSize || 1;
this.acquireTimeout = acquireTimeout || 3000;
this._initializePool();
}
_initializePool() {
const client = this;
const poolOpts = {
max: this.maxPoolSize, // maximum size of the pool
min: this.minPoolSize, // minimum size of the pool
maxTries: this.maxTries,
autostart: this.autostart,
testOnBorrow: this.validate,
};
_log(log) {
if (typeof this.debug === 'function') {
this.debug(log);
} else if (this.debug) {
console.log(log); // eslint-disable-line no-console
}
}
const WsFactory = {
create() {
return new Promise((resolve, reject) => {
const ws = new WebSocket(client.url);
let onOpen, onClose;
_initializePool() {
const factory = new WsFactory(WebSocket, this.url);
const poolOpts = {
name: 'sonic',
create: factory.create.bind(factory),
validate: factory.validate.bind(factory),
destroy: factory.destroy.bind(factory),
max: this.maxPoolSize, // maximum size of the pool
min: this.minPoolSize, // minimum size of the pool
log: this.debug,
};
const onError = (err) => {
ws.removeListener('open', onOpen);
ws.removeListener('close', onClose);
reject(err);
};
onOpen = () => {
ws.removeListener('error', onError);
ws.removeListener('close', onClose);
resolve(ws);
};
this.pool = new Pool(poolOpts);
}
onClose = (ev) => {
ws.removeListener('error', onError);
ws.removeListener('open', onOpen);
reject(getCloseError(ev));
};
_cancel(id, _cb) {
const ws = this.running[id];
const cb = typeof _cb === 'function' ? _cb : noop;
ws.on('open', onOpen);
ws.on('close', onClose);
ws.on('error', onError);
});
},
this._log(`cancelling: WebSocket(${id})`);
validate(ws) {
return new Promise((resolve) => {
let onPong;
/* a 'D' message is expected when a cancel is send,
* therefore resource cleanup should be handled by done handler */
if (!ws) {
this._log(new Error(`cancel: WebSocket(${id}) is not running any queries`));
cb();
return;
}
const timer = setTimeout(() => {
resolve(false);
ws.removeListener('pong', onPong);
}, this.validateTimeout);
const doSend = () => {
if (!BrowserWebSocket) {
ws.send(SonicMessage.CANCEL, () => {
this._log(`cancelled: WebSocket(${id})`);
cb();
});
return;
}
onPong = () => {
clearTimeout(timer);
resolve(true);
};
try {
ws.send(SonicMessage.CANCEL);
this._log(`cancelled: WebSocket(${id})`);
cb();
} catch (e) {
this._log(e);
cb(e);
}
};
ws.on('pong', onPong);
if (ws.readyState === WebSocket.CONNECTING) {
ws.on('open', doSend);
} if (ws.readyState === WebSocket.OPEN) {
doSend();
} else {
// connection is CLOSING/CLOSED
cb();
}
}
try {
ws.ping();
} catch (e) {
resolve(false);
}
});
},
/* ws client agnostic send method */
_wsSend(message, doneCb, outputCb, progressCb, metadataCb, startedCb, ws) {
const output = outputCb || noop;
const progress = progressCb || noop;
const metadata = metadataCb || noop;
let onError, onClose, onMessage;
destroy(ws) {
return new Promise((resolve) => {
ws.on('close', () => {
resolve();
});
ws.close(1000, 'pool#destroy');
});
},
};
const done = (err) => {
ws.removeListener('close', onClose);
ws.removeListener('error', onError);
ws.removeListener('message', onMessage);
doneCb(err);
};
this.pool = pool.createPool(WsFactory, poolOpts);
}
onClose = (ev) => {
this._log(`WebSocket closed: code=${ev.code}; reason=${ev.reason};`);
if (!ws.sonicError) {
done(utils.getCloseError(ev));
} else {
done(ws.sonicError);
}
};
_cancel(id, _cb) {
const ws = this.running[id];
const cb = typeof _cb === 'function' ? _cb : noop;
onError = (err) => {
this._log(err);
// err is defined with `ws`, but not with the
// browser's WebSocket API so we need to get the errors from the close event
ws.sonicError = err; // eslint-disable-line no-param-reassign
};
if (this.debug) console.log(`cancelling: WebSocket(${id})`);
onMessage = (_message) => {
const msg = BrowserWebSocket ? JSON.parse(_message.data) : JSON.parse(_message.toString('utf-8'));
const completeStream = () => {
this._log(`completed complete/ack sequence: trace_id=${msg.p.trace_id}; error=${msg.v};`);
if (msg.v) {
done(new Error(`query with trace_id \`${msg.p.trace_id}\` failed: ${msg.v}`));
} else {
done(null);
}
};
/* a 'D' message is expected when a cancel is send,
* therefore resource cleanup should be handled by done handler
*/
if (!ws) {
if (this.debug) console.error(new Error(`cancel: WebSocket(${id}) is not running any queries`));
cb();
return;
}
switch (msg.e) {
case 'P':
progress(utils.toProgress(msg.p));
break;
const doSend = () => {
if (!BrowserWebSocket) {
ws.send(SonicMessage.CANCEL, () => {
if (this.debug) console.log(`cancelled: WebSocket(${id})`);
cb();
});
return;
}
case 'D':
if (BrowserWebSocket) {
ws.send(SonicMessage.ACK);
completeStream();
} else {
ws.send(SonicMessage.ACK, completeStream);
}
break;
try {
ws.send(SonicMessage.CANCEL);
if (this.debug) console.log(`cancelled: WebSocket(${id})`);
cb();
} catch (e) {
if (this.debug) console.error(e);
cb(e);
}
};
case 'T':
metadata(msg.p.map(elem => [elem[0], typeof elem[1]]));
break;
if (ws.readyState === WebSocket.CONNECTING) {
ws.on('open', doSend);
} if (ws.readyState === WebSocket.OPEN) {
doSend();
} else {
// connection is CLOSING/CLOSED
cb();
}
}
case 'S':
if (typeof startedCb !== 'undefined') {
startedCb(msg.v);
}
break;
/* ws client agnostic send method */
_wsSend(message, doneCb, outputCb, progressCb, metadataCb, startedCb, ws) {
const output = outputCb || noop;
const progress = progressCb || noop;
const metadata = metadataCb || noop;
let onError, onClose, onMessage;
case 'O':
output(msg.p);
break;
const done = (err) => {
ws.removeListener('close', onClose);
ws.removeListener('error', onError);
ws.removeListener('message', onMessage);
doneCb(err);
};
default:
this._log(`unsupported message received: ${JSON.stringify(msg)}`);
break;
}
};
onClose = (ev) => {
if (this.debug) console.log(`WebSocket closed: code=${ev.code}; reason=${ev.reason};`);
if (!ws.sonicError) {
done(getCloseError(ev));
} else {
done(ws.sonicError);
}
};
ws.on('close', onClose);
ws.on('error', onError);
ws.on('message', onMessage);
ws.send(message);
this._log(`sending message: ${JSON.stringify(message)}`);
}
onError = (err) => {
if (this.debug) console.error(err);
// err is defined with `ws`, but not with the
// browser's WebSocket API so we need to get the errors from the close event
ws.sonicError = err; // eslint-disable-line no-param-reassign
};
/* pool-aware send method */
_send(message, doneCb, outputCb, progressCb, metadataCb, startedCb) {
switch (this.state) {
case states.CLOSED:
doneCb(new Error('client is closed and cannot accept more work'));
return;
case states.CLOSING:
doneCb(new Error('client is closing and cannot accept more work'));
return;
default:
}
onMessage = (_message) => {
const msg = BrowserWebSocket ? JSON.parse(_message.data) : JSON.parse(_message.toString('utf-8'));
const completeStream = () => {
if (this.debug) console.log(`completed complete/ack sequence: trace_id=${msg.p.trace_id}; error=${msg.v};`);
if (msg.v) {
done(new Error(`query with trace_id \`${msg.p.trace_id}\` failed: ${msg.v}`));
} else {
done(null);
}
};
// identify send for cancel hooks
const id = this.nextId++;
let doned = false;
switch (msg.e) {
case 'P':
progress(utils.toProgress(msg.p));
break;
const timer = setTimeout(() => {
const err = new Error(`its taking more than (${this.acquireTimeout}) to acquire resource for ticket=${id}`);
this._log(err);
doneCb(err);
}, this.acquireTimeout);
case 'D':
if (BrowserWebSocket) {
ws.send(SonicMessage.ACK);
completeStream();
} else {
ws.send(SonicMessage.ACK, completeStream);
}
break;
// acquire connection
this.pool.acquire((err, ws) => {
clearTimeout(timer);
case 'T':
metadata(msg.p.map(elem => [elem[0], typeof elem[1]]));
break;
if (err) {
doneCb(err);
return;
}
case 'S':
if (typeof startedCb !== 'undefined') {
startedCb(msg.v);
}
break;
this.running[id] = ws;
case 'O':
output(msg.p);
break;
if (this.debug) {
if (!ws.sonicId) {
/* first ticket for this resource is the resource ID */
ws.sonicId = id; // eslint-disable-line no-param-reassign
}
console.log(`acquired resource=${ws.sonicId} for ticket=${id}`); // eslint-disable-line no-console
}
default:
if (this.debug) console.log(`unsupported message received: ${JSON.stringify(msg)}`);
break;
}
};
// doneCb override to release connection back to pool
const doDoneCb = (err) => {
this._log(`done with ticket=${id}; resource=${ws.sonicId}`);
if (!doned) {
doned = true;
delete this.running[id];
ws.on('close', onClose);
ws.on('error', onError);
ws.on('message', onMessage);
ws.send(message);
if (this.debug) console.log(`sending message: ${JSON.stringify(message)}`);
}
if (err) {
this._log(`destroying resource=${ws.sonicId}; ticket=${id}`);
/* pool.destroy produces unexpected results; this forces resourced to be invalid */
ws.close(1011, 'pool#destroy');
} else {
this._log(`releasing resource=${ws.sonicId}; ticket=${id}`);
}
this.pool.release(ws);
doneCb(err);
}
};
/* pool-aware send method */
_send(message, doneCb, outputCb, progressCb, metadataCb, startedCb) {
switch (this.state) {
case states.CLOSED:
doneCb(new Error('client is closed and cannot accept more work'));
return;
case states.CLOSING:
doneCb(new Error('client is closing and cannot accept more work'));
return;
default:
}
try {
this._wsSend(JSON.stringify(message), doDoneCb, outputCb, progressCb, metadataCb, startedCb, ws);
} catch (e) {
this._log(e);
doDoneCb(e);
}
});
// identify send for cancel hooks
const id = this.nextId++;
let doned = false;
return id; // eslint-disable-line consistent-return
}
let timer;
if (this.debug) {
timer = setTimeout(() =>
console.log(`its taking more than (${this.acquireTimeout}) to acquire resource for ticket=${id}`),
this.acquireTimeout);
}
stream(query) {
const emitter = new EventEmitter();
const queryMsg = utils.toMsg(query);
// acquire connection
this.pool.acquire().then((ws) => {
this.running[id] = ws;
function done(err) {
if (err) {
emitter.emit('error', err);
return;
}
if (this.debug) {
if (!ws.sonicId) {
/* first ticket for this resource is the resource ID */
ws.sonicId = id; // eslint-disable-line no-param-reassign
}
console.log(`acquired resource=${ws.sonicId} for ticket=${id}`);
clearTimeout(timer);
}
emitter.emit('done');
}
// doneCb override to release connection back to pool
const doDoneCb = (err) => {
if (this.debug) console.log(`done with ticket=${id}; resource=${ws.sonicId}`);
if (!doned) {
doned = true;
delete this.running[id];
function output(elems) {
emitter.emit('data', elems);
}
if (ws.sonicDestroy) {
if (this.debug) console.log(`destroying resource=${ws.sonicId}; ticket=${id}`);
this.pool.destroy(ws);
} else {
if (this.debug) console.log(`releasing resource=${ws.sonicId}; ticket=${id}`);
this.pool.release(ws);
}
doneCb(err);
}
};
function metadata(meta) {
emitter.emit('metadata', meta);
}
try {
this._wsSend(JSON.stringify(message), doDoneCb, outputCb, progressCb, metadataCb, startedCb, ws);
} catch (e) {
if (this.debug) console.error(e);
this.pool.release(ws);
doDoneCb(e);
}
}).catch(doneCb);
function progress(prog) {
emitter.emit('progress', prog);
}
return id; // eslint-disable-line consistent-return
}
function started(traceId) {
emitter.emit('started', traceId);
}
stream(query) {
const emitter = new SonicEmitter();
const queryMsg = utils.toMsg(query);
const id = this._send(queryMsg, done, output, progress, metadata, started);
function done(err) {
if (err) {
emitter.emit('error', err);
return;
}
emitter.cancel = (cb) => {
this._cancel(id, cb);
};
emitter.emit('done');
}
return emitter;
}
function output(elems) {
emitter.emit('data', elems);
}
/* TODO: deprecate in favor of run2 */
run(query, doneCb) {
const data = [];
const queryMsg = utils.toMsg(query);
function metadata(meta) {
emitter.emit('metadata', meta);
}
function done(err) {
if (err) {
doneCb(err, null);
} else {
doneCb(null, data);
}
}
function progress(prog) {
emitter.emit('progress', prog);
}
function output(elems) {
data.push(elems);
}
function started(traceId) {
emitter.emit('started', traceId);
}
this._send(queryMsg, done, output);
}
const id = this._send(queryMsg, done, output, progress, metadata, started);
run2(query) {
return new Promise((resolve, reject) => {
this.run(query, (err, data) => {
if (err) {
reject(err);
return;
}
resolve(data);
});
});
}
emitter.cancel = (cb) => {
this._cancel(id, cb);
};
authenticate(user, apiKey, doneCb, traceId) {
let token;
const authMsg = {
e: 'H',
p: {
user,
trace_id: traceId,
},
v: apiKey,
};
return emitter;
}
function done(err) {
if (err) {
doneCb(err);
} else {
doneCb(null, token);
}
}
/* TODO: deprecate in favor of run2 */
run(query, doneCb) {
const data = [];
const queryMsg = utils.toMsg(query);
function output(elems) {
token = elems[0];
}
function done(err) {
if (err) {
doneCb(err, null);
} else {
doneCb(null, data);
}
}
this._send(authMsg, done, output);
}
function output(elems) {
data.push(elems);
}
_close() {
if (this.state !== states.CLOSING) {
return Promise.resolve();
}
return this.pool.drain(() => this.pool.destroyAllNow());
}
this._send(queryMsg, done, output);
}
cancel() {
const ids = Object.keys(this.running);
run2(query) {
return new Promise((resolve, reject) => {
this.run(query, (err, data) => {
if (err) {
reject(err);
return;
}
resolve(data);
});
});
}
return Promise.all(ids.map(id => new Promise((resolve, reject) => {
this._cancel(id, (err) => {
if (err) {
reject(err);
return;
}
resolve();
});
})));
}
authenticate(user, apiKey, doneCb, traceId) {
let token;
const authMsg = {
e: 'H',
p: {
user,
trace_id: traceId,
},
v: apiKey,
};
function done(err) {
if (err) {
doneCb(err);
} else {
doneCb(null, token);
}
}
function output(elems) {
token = elems[0];
}
this._send(authMsg, done, output);
}
_close() {
if (this.state !== states.CLOSING) {
return Promise.resolve();
}
return this.pool.drain().then(() => this.pool.clear());
}
cancel() {
const ids = Object.keys(this.running);
return Promise.all(ids.map(id => new Promise((resolve, reject) => {
this._cancel(id, (err) => {
if (err) {
reject(err);
return;
}
resolve();
});
})));
}
close() {
this.state = states.CLOSING;
return this.cancel()
.then(() => this._close())
.then(() => {
this.state = states.CLOSED;
});
}
close() {
this.state = states.CLOSING;
return this.cancel()
.then(() => this._close())
.then(() => this.state = states.CLOSED);
}
}
module.exports.Client = Client;

@@ -1,62 +0,65 @@

'use strict';
function toMsg(query) {
var traceId = query.trace_id || query.traceId;
var token = query.token || query.auth;
return {
e: 'Q',
v: query.query,
p: {
auth: token,
trace_id: traceId,
config: query.config
}
};
var traceId = query.trace_id || query.traceId;
var token = query.token || query.auth;
return {
e: 'Q',
v: query.query,
p: {
auth: token,
trace_id: traceId,
config: query.config
}
};
}
function toProgress(payload) {
var status;
var status;
switch (payload.s) {
case 0:
status = 'Queued';
break;
switch (payload.s) {
case 0:
status = 'Queued';
break;
case 1:
status = 'Started';
break;
case 1:
status = 'Started';
break;
case 2:
status = 'Running';
break;
case 2:
status = 'Running';
break;
case 3:
status = 'Waiting';
break;
case 3:
status = 'Waiting';
break;
case 4:
status = 'Finished';
break;
case 4:
status = 'Finished';
break;
default:
status = 'Unknown';
break;
}
default:
status = 'Unknown';
break;
}
return {
status: status,
statusCode: payload.s,
progress: payload.p,
total: payload.t,
units: payload.u
};
return {
status: status,
statusCode: payload.s,
progress: payload.p,
total: payload.t,
units: payload.u
};
}
function getCloseError(ev) {
return new Error(`WebSocket close: code=${ev.code}; reason=${ev.reason}`);
}
module.exports = {
toMsg: toMsg,
toProgress: toProgress,
SonicMessage: {
ACK: JSON.stringify({ e: 'A' }),
CANCEL: JSON.stringify({ e: 'C' })
}
getCloseError,
toMsg,
toProgress,
SonicMessage: {
ACK: JSON.stringify({ e: 'A' }),
CANCEL: JSON.stringify({ e: 'C' })
}
};

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc