Comparing version 0.7.1 to 0.8.0
/* eslint no-console: [0]*/ | ||
// var Client = require('sonic-js').Client; | ||
var Client = require('../src/lib.js').Client; | ||
var assert = require('assert'); | ||
var host = process.env.SONIC_HOST || 'wss://0.0.0.0:443'; | ||
var API_KEY = '1234'; | ||
var USER = 'serrallonga'; | ||
const Client = require('../src/lib.js').Client; | ||
const assert = require('assert'); | ||
var client = new Client(host + '/v1/query'); | ||
const host = process.env.SONIC_HOST || 'wss://0.0.0.0:443'; | ||
const API_KEY = '1234'; | ||
const USER = 'serrallonga'; | ||
var query = { | ||
const opt = { | ||
maxPoolSize: 5, /* max connection pool size */ | ||
minPoolSize: 1, /* min connection pool size */ | ||
maxTries: 1, /* max create connection attempts before bubbling up error */ | ||
autostart: true, /* should the Client start creating connections once the constructor is called */ | ||
validate: true, /* should the Client validate that connections are responsive before using them */ | ||
validateTimeout: 2000, /* new connection validation timeout */ | ||
acquireTimeout: 3000, /* new connection total acquisition timeout */ | ||
}; | ||
const client = new Client(`${host}/v1/query`, opt); | ||
const query = { | ||
query: '10', | ||
@@ -21,3 +32,3 @@ config: { | ||
var query2 = { | ||
const query2 = { | ||
query: '5', | ||
@@ -27,3 +38,3 @@ config: 'secured_test' | ||
var done = 0; | ||
let done = 0; | ||
@@ -33,28 +44,29 @@ | ||
var stream = client.stream(query); | ||
const stream = client.stream(query); | ||
stream.on('data', function(data) { | ||
stream.on('data', (data) => { | ||
console.log(data); | ||
}); | ||
stream.on('progress', function(p) { | ||
stream.on('progress', (p) => { | ||
done += p.progress; | ||
console.log('running.. ' + done + '/' + p.total + ' ' + p.units); | ||
console.log(`running.. ${done}/${p.total} ${p.units}`); | ||
}); | ||
stream.on('metadata', function(meta) { | ||
console.log('metadata: ' + JSON.stringify(meta)); | ||
stream.on('metadata', (meta) => { | ||
console.log(`metadata: ${JSON.stringify(meta)}`); | ||
}); | ||
stream.on('done', function() { | ||
stream.on('done', () => { | ||
console.log('stream is done!'); | ||
}); | ||
stream.on('error', function(err) { | ||
console.log('stream error: ' + err); | ||
stream.on('error', (err) => { | ||
console.log(`stream error: ${err}`); | ||
}); | ||
/* UNAUTHENTICATED Client.prototype.run */ | ||
client.run(query, function(err, res) { | ||
client.run(query, (err, res) => { | ||
if (err) { | ||
@@ -65,3 +77,3 @@ console.log(err); | ||
res.forEach(function(e) { | ||
res.forEach((e) => { | ||
console.log(e); | ||
@@ -71,12 +83,9 @@ }); | ||
console.log('exec is done!'); | ||
}); | ||
/* AUTHENTICATED Client.prototype.run */ | ||
// `secured_test` source can be accessed without | ||
// an auth token that grants | ||
// authorization equal or higher than 3. | ||
client.run(query2, function(err) { | ||
assert.throws(function () { | ||
client.run(query2, (err) => { | ||
assert.throws(() => { | ||
if (err) { | ||
@@ -88,4 +97,7 @@ throw err; | ||
/* AUTHENTICATED Client.prototype.run */ | ||
// first we need to authenticate | ||
client.authenticate(USER, API_KEY, function(err, token) { | ||
client.authenticate(USER, API_KEY, (err, token) => { | ||
if (err) { | ||
@@ -97,8 +109,8 @@ throw err; | ||
client.run(query2, function(err, res) { | ||
if (err) { | ||
throw err; | ||
client.run(query2, (qerr, res) => { | ||
if (qerr) { | ||
throw qerr; | ||
} | ||
res.forEach(function(e) { | ||
res.forEach((e) => { | ||
console.log(e); | ||
@@ -110,5 +122,6 @@ }); | ||
// close ws | ||
client.close(); | ||
client.close() | ||
.then(() => console.log('released all resources')) | ||
.catch(error => console.error(error)); | ||
}); | ||
}); |
{ | ||
"name": "sonic-js", | ||
"version": "0.7.1", | ||
"version": "0.8.0", | ||
"description": "ws client library for the Sonic protocol", | ||
@@ -10,7 +10,6 @@ "main": "src/lib.js", | ||
"scripts": { | ||
"test": "node_modules/mocha/bin/mocha -r chai spec/close.js spec/ws.js" | ||
"test": "node_modules/mocha/bin/mocha -r chai --timeout 10000 spec/close.js spec/ws.js" | ||
}, | ||
"dependencies": { | ||
"bufferutil": "^1.2.1", | ||
"utf-8-validate": "^1.2.1", | ||
"generic-pool": "git://github.com/Aleksandras-Novikovas/node-pool.git#59889c4b0adb5dea839a951fb351ff5ee6045768", | ||
"ws": "^1.1.0" | ||
@@ -20,4 +19,13 @@ }, | ||
"chai": "^3.5.0", | ||
"eslint": "^3.19.0", | ||
"eslint-config-airbnb": "^15.0.1", | ||
"eslint-plugin-import": "^2.3.0", | ||
"eslint-plugin-jsx-a11y": "^5.0.3", | ||
"eslint-plugin-react": "^7.0.1", | ||
"mocha": "^2.5.3" | ||
}, | ||
"optionalDependencies": { | ||
"bufferutil": "^3.0.0", | ||
"utf-8-validate": "^3.0.1" | ||
} | ||
} |
/* eslint-env node, mocha */ | ||
const Client = require('../src/lib.js').Client; | ||
const assert = require('chai').assert; | ||
var Client = require('../src/lib.js').Client; | ||
var assert = require('chai').assert; | ||
var sonicEndpoint = (process.env.SONIC_HOST || 'wss://0.0.0.0:443') + '/v1/query'; | ||
const sonicEndpoint = `${process.env.SONIC_HOST || 'wss://0.0.0.0:443'}/v1/query`; | ||
describe('Client#close', function() { | ||
it('should cancel all ongoing queries', function(done) { | ||
var client = new Client(sonicEndpoint); | ||
var q = { | ||
describe('Client#close', () => { | ||
it('should cancel all ongoing queries', () => { | ||
const client = new Client(sonicEndpoint, { maxPoolSize: 5, minPoolSize: 5, }); | ||
const q = { | ||
query: '100', | ||
'progress-delay': 1000, | ||
config: { class: 'SyntheticSource' } | ||
}; | ||
var queries = [q, q, q, q, q, q]; | ||
const queries = [q, q, q]; | ||
queries.forEach(function(query) { | ||
client.run(query, function(err, d) { | ||
if (err || d.length !== 0) { | ||
done(err || new Error('data not empty!')); | ||
return; | ||
} | ||
if (queries.length === 1) { | ||
done(); | ||
assert.equal(client.ws.length, 0); | ||
return; | ||
} | ||
const progress = Promise.all(queries.map(client.run2.bind(client))); | ||
let closeProm; | ||
queries.splice(queries.indexOf(query), 1); | ||
const timer = setInterval(() => { | ||
// wait until all queries are running | ||
if (Object.keys(client.running).length === queries.length) { | ||
closeProm = client.close(); | ||
clearInterval(timer); | ||
} | ||
}, 100); | ||
}); | ||
}); | ||
assert.equal(client.ws.length, 6); | ||
client.close(); | ||
return progress.then((data) => { | ||
assert.equal(data.reduce((a, d) => a + d.length, 0), 0); | ||
}).then(() => closeProm); | ||
}); | ||
}); | ||
describe('stream#cancel', function() { | ||
it('should cancel query', function(done) { | ||
var client = new Client(sonicEndpoint); | ||
var query = { | ||
it('should not allow more queries to be submitted', () => { | ||
const client = new Client(sonicEndpoint, { maxPoolSize: 5, minPoolSize: 5, }); | ||
const q = { | ||
query: '100', | ||
'progress-delay': 1000, | ||
config: { class: 'SyntheticSource' } | ||
}; | ||
var cancelCb = false; | ||
var emitter = client.stream(query); | ||
assert.equal(client.ws.length, 1); | ||
const queries = [q, q, q]; | ||
emitter.on('done', function() { | ||
assert.equal(client.ws.length, 0); | ||
assert(cancelCb, 'cancelCb was not called!'); | ||
done(); | ||
}); | ||
emitter.on('error', function(err) { | ||
done(err); | ||
}); | ||
emitter.cancel(function() { | ||
cancelCb = true; | ||
}); | ||
const closeProm = client.close(); | ||
return Promise.all(queries.map(client.run2.bind(client))).then((data) => { | ||
throw new Error(`not bubble up error: ${JSON.stringify(data)}`); | ||
}).catch((err) => { | ||
assert(err.toString().indexOf('closing') >= 0, err.toString()); | ||
assert.equal(Object.keys(client.running).length, 0); | ||
}).then(() => closeProm); | ||
}); | ||
}); | ||
describe('run#cancel', function() { | ||
it('should cancel query', function(done) { | ||
var client = new Client(sonicEndpoint); | ||
var query = { | ||
describe('stream#cancel', () => { | ||
it('should cancel query', (done) => { | ||
const client = new Client(sonicEndpoint); | ||
const query = { | ||
query: '100', | ||
config: { class: 'SyntheticSource' } | ||
}; | ||
var cancelCb = false; | ||
var closeable = client.run(query, function(err, d) { | ||
if (err || d.length !== 0) { | ||
done(err || new Error('data not empty!')); | ||
} else { | ||
assert.equal(client.ws.length, 0); | ||
assert(cancelCb, 'cancelCb was not called!'); | ||
done(); | ||
} | ||
let cancelCb = false; | ||
const emitter = client.stream(query); | ||
emitter.on('done', () => { | ||
assert.equal(Object.keys(client.running).length, 0); | ||
assert(cancelCb, 'cancelCb was not called!'); | ||
done(); | ||
}); | ||
assert.equal(client.ws.length, 1); | ||
closeable.cancel(function() { | ||
emitter.on('error', (err) => { | ||
done(err); | ||
}); | ||
emitter.cancel(() => { | ||
cancelCb = true; | ||
@@ -85,0 +75,0 @@ }); |
113
spec/util.js
@@ -1,5 +0,5 @@ | ||
var assert = require('chai').assert; | ||
const assert = require('chai').assert; | ||
module.exports.testHappyPathSingle = function(client, query, n, done) { | ||
client.run(query, function(err, data) { | ||
module.exports.testHappyPathSingle = (client, query, n, done) => { | ||
client.run(query, (err, data) => { | ||
if (err) { | ||
@@ -10,3 +10,3 @@ done(err); | ||
if (data.length !== n) { | ||
done(new Error('data was not ' + n)); | ||
done(new Error(`data was not ${n}`)); | ||
return; | ||
@@ -18,7 +18,25 @@ } | ||
module.exports.testHappyPath = function(client, query, n, done) { | ||
var _done = 0; | ||
var stream, traceId; | ||
module.exports.testHappyPath = (client, query, n, _done) => { | ||
let count = 0; | ||
let ran, streamed, doned = false; | ||
let traceId; | ||
if (!_done) { | ||
throw new Error('needs _done to hook with test driver'); | ||
} | ||
client.run(query, function(err, data) { | ||
const debugTimer = setTimeout(() => { | ||
console.log(`count=${count}; run=${ran}; stream=${streamed}; doned=${doned}`); | ||
}, 3000); | ||
const done = (e) => { | ||
// guard against multiple calls to done() with err | ||
if (!doned) { | ||
doned = true; | ||
clearTimeout(debugTimer); | ||
_done(e); | ||
} | ||
}; | ||
client.run(query, (err, data) => { | ||
ran = true; | ||
if (err) { | ||
@@ -28,17 +46,17 @@ done(err); | ||
} | ||
assert(data.length === n, 'data was not' + n); | ||
if (_done === 1) { | ||
assert(data.length === n, `data was not${n}`); | ||
if (count === 1) { | ||
done(); | ||
} else { | ||
_done += 1; | ||
count += 1; | ||
} | ||
}); | ||
stream = client.stream(query); | ||
const stream = client.stream(query); | ||
stream.on('started', function(id) { | ||
stream.on('started', (id) => { | ||
traceId = id; | ||
}); | ||
stream.on('error', function(err) { | ||
stream.on('error', (err) => { | ||
if (err) { | ||
@@ -51,10 +69,11 @@ done(err); | ||
stream.on('done', function(err) { | ||
stream.on('done', (err) => { | ||
streamed = true; | ||
assert(typeof traceId !== 'undefined', 'traceId is undefined in stream done callback on `testHappyPath` test'); | ||
if (err) { | ||
done(err); | ||
} else if (_done === 1) { | ||
} else if (count === 1) { | ||
done(); | ||
} else { | ||
_done += 1; | ||
count += 1; | ||
} | ||
@@ -64,14 +83,30 @@ }); | ||
module.exports.expectError = function(client, query, done) { | ||
var _done = 0; | ||
var stream; | ||
module.exports.expectError = (client, query, _done) => { | ||
let count = 0; | ||
let ran, streamed, doned = false; | ||
client.run(query, function(err) { | ||
if (!_done) { | ||
throw new Error('needs _done to hook with test driver'); | ||
} | ||
const debugTimer = setTimeout(() => { | ||
console.log(`count=${count}; run=${ran}; stream=${streamed}; doned=${doned}`); | ||
}, 2000); | ||
const done = (e) => { | ||
// guard against multiple calls to done() with err | ||
if (!doned) { | ||
doned = true; | ||
clearTimeout(debugTimer); | ||
_done(e); | ||
} | ||
}; | ||
client.run(query, (err) => { | ||
ran = true; | ||
if (err) { | ||
if (done) { | ||
if (_done === 1) { | ||
done(); | ||
} else { | ||
_done += 1; | ||
} | ||
if (count === 1) { | ||
done(); | ||
} else { | ||
count += 1; | ||
} | ||
@@ -83,16 +118,15 @@ } else { | ||
stream = client.stream(query); | ||
const stream = client.stream(query); | ||
stream.on('done', function() { | ||
stream.on('done', () => { | ||
done(new Error('stream emitted `done` but `error` expected')); | ||
}); | ||
stream.on('error', function(err) { | ||
stream.on('error', (err) => { | ||
if (err) { | ||
if (done) { | ||
if (_done === 1) { | ||
done(); | ||
} else { | ||
_done += 1; | ||
} | ||
streamed = true; | ||
if (count === 1) { | ||
done(); | ||
} else { | ||
count += 1; | ||
} | ||
@@ -105,5 +139,5 @@ } else { | ||
module.exports.doAuthenticate = function(client, done, apiKeyMaybe) { | ||
var apiKey = apiKeyMaybe || '1234'; | ||
client.authenticate('spec_tests', apiKey, function(err, token) { | ||
module.exports.doAuthenticate = (client, done, apiKeyMaybe) => { | ||
const apiKey = apiKeyMaybe || '1234'; | ||
client.authenticate('spec_tests', apiKey, (err, token) => { | ||
if (err) { | ||
@@ -121,2 +155,1 @@ done(new Error('failed to authenticate')); | ||
}; | ||
/* eslint-env node, mocha */ | ||
const Client = require('../src/lib.js').Client; | ||
const process = require('process'); | ||
const util = require('./util'); | ||
var Client = require('../src/lib.js').Client; | ||
var assert = require('chai').assert; | ||
var process = require('process'); | ||
var sonicEndpoint = (process.env.SONIC_HOST || 'wss://0.0.0.0:443') + '/v1/query'; | ||
var util = require('./util'); | ||
var token; | ||
const sonicEndpoint = `${process.env.SONIC_HOST || 'wss://0.0.0.0:443'}/v1/query`; | ||
let token; | ||
function runSpecTests(client, id) { | ||
it(id + ' - should be able to run a simple query and stream the data back from the server', function(done) { | ||
var query = { | ||
it(`${id} - should be able to run a simple query and stream the data back from the server`, (done) => { | ||
const query = { | ||
query: '5', | ||
@@ -25,6 +25,7 @@ auth: token, | ||
it(id + ' - should return an error if source class is unknown', function(done) { | ||
var query = { | ||
it(`${id} - should return an error if source class is unknown`, (done) => { | ||
const query = { | ||
query: '1', | ||
auth: token, | ||
trace_id: 'ballz0', | ||
config: { | ||
@@ -38,6 +39,7 @@ class: 'UnknownClass' | ||
it(id + ' - should return an error if query or config is null', function(done) { | ||
var query = { | ||
it(`${id} - should return an error if query is null`, (done) => { | ||
const query = { | ||
query: null, | ||
auth: token, | ||
trace_id: 'ballz1', | ||
config: { | ||
@@ -51,5 +53,6 @@ class: 'SyntheticSource' | ||
it(id + ' - should return an error if config is null', function(done) { | ||
var query = { | ||
it(`${id} - should return an error if config is null`, (done) => { | ||
const query = { | ||
query: '1', | ||
trace_id: 'ballz2', | ||
auth: token, | ||
@@ -62,4 +65,4 @@ config: null | ||
it(id + ' - should return an error if source publisher completes stream with exception', function(done) { | ||
var query = { | ||
it(`${id} - should return an error if source publisher completes stream with exception`, (done) => { | ||
const query = { | ||
// signals source to throw expected exception | ||
@@ -76,4 +79,4 @@ query: '28', | ||
it(id + ' - should return an error if source throws an exception and terminates', function(done) { | ||
var query = { | ||
it(`${id} - should return an error if source throws an exception and terminates`, (done) => { | ||
const query = { | ||
// signals source to throw unexpected exception | ||
@@ -90,6 +93,6 @@ query: '-1', | ||
it(id + ' - should stream a big payload correctly', function(done) { | ||
this.timeout(4000); | ||
var q = ""; | ||
var i = 0; | ||
it(`${id} - should stream a big payload correctly`, function (done) { | ||
this.timeout(6000); | ||
let q = ''; | ||
let i = 0; | ||
while (i < 10000) { | ||
@@ -100,3 +103,3 @@ q += 'aweqefekwljflwekfjkelwfjlwekjfeklwjflwekjfeklwjfeklfejklfjewlkfejwklw'; | ||
var query = { | ||
const query = { | ||
query: q, | ||
@@ -115,10 +118,9 @@ auth: token, | ||
describe('Sonic ws', function() { | ||
describe('Sonic ws', () => { | ||
const client = new Client(sonicEndpoint); | ||
var client = new Client(sonicEndpoint); | ||
runSpecTests(client, 'unauthenticated'); | ||
it('should return an error if source requires authentication and user is unauthenticated', function(done) { | ||
var query = { | ||
it('should return an error if source requires authentication and user is unauthenticated', (done) => { | ||
const query = { | ||
query: '1', | ||
@@ -137,7 +139,6 @@ // tipically set server side, but also | ||
describe('Sonic ws auth', function() { | ||
it('should throw an error if api key is invalid', function(done) { | ||
client.authenticate('spec_tests', 'mariano', function(err) { | ||
if(err) { | ||
describe('Sonic ws auth', () => { | ||
it('should throw an error if api key is invalid', (done) => { | ||
client.authenticate('spec_tests', 'mariano', (err) => { | ||
if (err) { | ||
done(); | ||
@@ -150,4 +151,4 @@ } else { | ||
it('should authenticate user', function(done) { | ||
util.doAuthenticate(client, function(err, token) { | ||
it('should authenticate user', (done) => { | ||
util.doAuthenticate(client, (err, token) => { | ||
if (err) { | ||
@@ -162,9 +163,9 @@ done(err); | ||
runSpecTests(new Client(sonicEndpoint, { maxPoolSize: 1, minPoolSize: 1, maxTries: 1 }), 'single connection'); | ||
describe('Sonic ws with authentication', function() { | ||
describe('Sonic ws with authentication', () => { | ||
const authenticated = new Client(sonicEndpoint); | ||
var authenticated = new Client(sonicEndpoint); | ||
before(function(done) { | ||
util.doAuthenticate(authenticated, function(err, t) { | ||
before((done) => { | ||
util.doAuthenticate(authenticated, (err, t) => { | ||
if (err) { | ||
@@ -179,3 +180,3 @@ done(err); | ||
after(function(done) { | ||
after((done) => { | ||
authenticated.close(); | ||
@@ -188,4 +189,4 @@ done(); | ||
it('should allow an authenticated and authorized user to run a query on a secure source', function(done) { | ||
var query = { | ||
it('should allow an authenticated and authorized user to run a query on a secure source', (done) => { | ||
const query = { | ||
query: '5', | ||
@@ -202,4 +203,4 @@ auth: token, | ||
it('should return error if an authenticated user but unauthorized user tries to run a query on a secured source', function(done) { | ||
var query = { | ||
it('should return error if an authenticated user but unauthorized user tries to run a query on a secured source', (done) => { | ||
const query = { | ||
query: '5', | ||
@@ -216,5 +217,4 @@ auth: token, | ||
it('should return error if an authenticated and authorized user from not a whitelisted IP tries to run a query on a secured source', function(done) { | ||
util.doAuthenticate(authenticated, function(err, token) { | ||
it('should return error if an authenticated and authorized user from not a whitelisted IP tries to run a query on a secured source', (done) => { | ||
util.doAuthenticate(authenticated, (err, token) => { | ||
if (err) { | ||
@@ -225,3 +225,3 @@ done(err); | ||
var query = { | ||
const query = { | ||
query: '5', | ||
@@ -228,0 +228,0 @@ auth: token, |
517
src/lib.js
@@ -1,10 +0,9 @@ | ||
'use strict'; | ||
const BrowserWebSocket = global.MozWebSocket || global.WebSocket; | ||
const WebSocket = BrowserWebSocket || require('ws'); // eslint-disable-line global-require | ||
const EventEmitter = require('events'); | ||
const utils = require('./util'); | ||
const pool = require('generic-pool'); | ||
var BrowserWebSocket = global.MozWebSocket || global.WebSocket; | ||
var WebSocket = BrowserWebSocket || require('ws'); | ||
var EventEmitter = require('events'); | ||
var util = require('util'); | ||
var utils = require('./util'); | ||
var SonicMessage = utils.SonicMessage; | ||
var noop = function() {}; | ||
const SonicMessage = utils.SonicMessage; | ||
const noop = () => {}; | ||
@@ -14,104 +13,199 @@ // this is an ugly hack to prevent browseryfied `ws` module to throw errors at runtime | ||
if (BrowserWebSocket) { | ||
WebSocket.prototype.on = function(event, callback) { | ||
this['on' + event] = callback; | ||
WebSocket.prototype.on = function on(event, callback) { | ||
this[`on${event}`] = callback; | ||
}; | ||
} | ||
function cancel(ws, _cb) { | ||
var cb = typeof _cb === 'function' ? _cb : noop; | ||
function doSend() { | ||
if (BrowserWebSocket) { | ||
try { | ||
ws.send(SonicMessage.CANCEL); | ||
cb(); | ||
} catch (e) { | ||
cb(e); | ||
} | ||
} else { | ||
ws.send(SonicMessage.CANCEL, cb); | ||
} | ||
} | ||
if (ws.readyState === WebSocket.OPEN) { | ||
doSend(); | ||
} else { | ||
ws.on('open', doSend); | ||
} | ||
} | ||
class SonicEmitter extends EventEmitter {} | ||
function Client(sonicAddress) { | ||
this.url = sonicAddress; | ||
this.ws = []; | ||
function getCloseError(ev) { | ||
return new Error(`WebSocket close: code=${ev.code}; reason=${ev.reason}`); | ||
} | ||
function SonicEmitter() { | ||
EventEmitter.call(this); | ||
} | ||
const states = { | ||
INITIALIZED: 1, | ||
CLOSING: 2, | ||
CLOSED: 3, | ||
}; | ||
util.inherits(SonicEmitter, EventEmitter); | ||
class Client { | ||
constructor(sonicAddress, | ||
{ maxPoolSize, minPoolSize, debug, autostart, validate, | ||
validateTimeout, maxTries, acquireTimeout } = {}) { | ||
this.url = sonicAddress; | ||
this.running = {}; | ||
this.nextId = 1; | ||
this.state = states.INITIALIZED; | ||
this.debug = debug; | ||
this.validateTimeout = validateTimeout || 2000; | ||
this.maxPoolSize = maxPoolSize || 5; | ||
this.minPoolSize = minPoolSize || 1; | ||
this.maxTries = maxTries || 1; | ||
this.validate = typeof validate === 'undefined' ? true : validate; | ||
this.autostart = typeof autostart === 'undefined' ? true : autostart; | ||
this.acquireTimeout = acquireTimeout || 3000; | ||
this._initializePool(); | ||
} | ||
Client.prototype.send = function(doneCb, outputCb, progressCb, metadataCb, startedCb) { | ||
var output = outputCb || (function() {}); | ||
var progress = progressCb || (function() {}); | ||
var metadata = metadataCb || (function() {}); | ||
var isDone = false; | ||
var isError = false; | ||
var self = this; | ||
_initializePool() { | ||
const client = this; | ||
const poolOpts = { | ||
max: this.maxPoolSize, // maximum size of the pool | ||
min: this.minPoolSize, // minimum size of the pool | ||
maxTries: this.maxTries, | ||
autostart: this.autostart, | ||
testOnBorrow: this.validate, | ||
}; | ||
return function(message, ws) { | ||
const WsFactory = { | ||
create() { | ||
return new Promise((resolve, reject) => { | ||
const ws = new WebSocket(client.url); | ||
let onOpen, onClose; | ||
ws.send(message); | ||
const onError = (err) => { | ||
ws.removeListener('open', onOpen); | ||
ws.removeListener('close', onClose); | ||
reject(err); | ||
}; | ||
function done(err, id) { | ||
var idx; | ||
onOpen = () => { | ||
ws.removeListener('error', onError); | ||
ws.removeListener('close', onClose); | ||
resolve(ws); | ||
}; | ||
ws.close(1000, 'completed'); | ||
onClose = (ev) => { | ||
ws.removeListener('error', onError); | ||
ws.removeListener('open', onOpen); | ||
reject(getCloseError(ev)); | ||
}; | ||
if ((idx = self.ws.indexOf(ws)) < 0) { | ||
throw new Error('ws not found'); | ||
} | ||
ws.on('open', onOpen); | ||
ws.on('close', onClose); | ||
ws.on('error', onError); | ||
}); | ||
}, | ||
self.ws.splice(idx, 1); | ||
validate(ws) { | ||
return new Promise((resolve) => { | ||
let onPong; | ||
doneCb(err, id); | ||
} | ||
const timer = setTimeout(() => { | ||
resolve(false); | ||
ws.removeListener('pong', onPong); | ||
}, this.validateTimeout); | ||
function closedUnexp() { | ||
done(new Error('connection closed unexpectedly')); | ||
onPong = () => { | ||
clearTimeout(timer); | ||
resolve(true); | ||
}; | ||
ws.on('pong', onPong); | ||
try { | ||
ws.ping(); | ||
} catch (e) { | ||
resolve(false); | ||
} | ||
}); | ||
}, | ||
destroy(ws) { | ||
return new Promise((resolve) => { | ||
ws.on('close', () => { | ||
resolve(); | ||
}); | ||
ws.close(1000, 'pool#destroy'); | ||
}); | ||
}, | ||
}; | ||
this.pool = pool.createPool(WsFactory, poolOpts); | ||
} | ||
_cancel(id, _cb) { | ||
const ws = this.running[id]; | ||
const cb = typeof _cb === 'function' ? _cb : noop; | ||
if (this.debug) console.log(`cancelling: WebSocket(${id})`); | ||
/* a 'D' message is expected when a cancel is send, | ||
* therefore resource cleanup should be handled by done handler | ||
*/ | ||
if (!ws) { | ||
if (this.debug) console.error(new Error(`cancel: WebSocket(${id}) is not running any queries`)); | ||
cb(); | ||
return; | ||
} | ||
ws.on('close', function(ev) { | ||
// browser | ||
if (BrowserWebSocket) { | ||
if (isError) { | ||
done(new Error('WebSocket close code: ' + ev.code + '; reason: ' + ev.reason)); | ||
} else if (ev.code !== 1000 && !isDone) { | ||
closedUnexp(); | ||
} | ||
const doSend = () => { | ||
if (!BrowserWebSocket) { | ||
ws.send(SonicMessage.CANCEL, () => { | ||
if (this.debug) console.log(`cancelled: WebSocket(${id})`); | ||
cb(); | ||
}); | ||
return; | ||
} | ||
// ws | ||
} else if (!isDone && ev !== 1000) { | ||
closedUnexp(); | ||
try { | ||
ws.send(SonicMessage.CANCEL); | ||
if (this.debug) console.log(`cancelled: WebSocket(${id})`); | ||
cb(); | ||
} catch (e) { | ||
if (this.debug) console.error(e); | ||
cb(e); | ||
} | ||
}); | ||
}; | ||
ws.on('error', function(ev) { | ||
// ev is defined with `ws`, but not with the | ||
// browser's WebSocket API | ||
if (BrowserWebSocket) { | ||
isError = true; | ||
if (ws.readyState === WebSocket.CONNECTING) { | ||
ws.on('open', doSend); | ||
} if (ws.readyState === WebSocket.OPEN) { | ||
doSend(); | ||
} else { | ||
// connection is CLOSING/CLOSED | ||
cb(); | ||
} | ||
} | ||
/* ws client agnostic send method */ | ||
_wsSend(message, doneCb, outputCb, progressCb, metadataCb, startedCb, ws) { | ||
const output = outputCb || noop; | ||
const progress = progressCb || noop; | ||
const metadata = metadataCb || noop; | ||
let onError, onClose, onMessage; | ||
const done = (err) => { | ||
ws.removeListener('close', onClose); | ||
ws.removeListener('error', onError); | ||
ws.removeListener('message', onMessage); | ||
doneCb(err); | ||
}; | ||
onClose = (ev) => { | ||
if (this.debug) console.log(`WebSocket closed: code=${ev.code}; reason=${ev.reason};`); | ||
if (!ws.sonicError) { | ||
done(getCloseError(ev)); | ||
} else { | ||
isDone = true; | ||
done(ev); | ||
done(ws.sonicError); | ||
} | ||
}); | ||
}; | ||
ws.on('message', function(message) { | ||
var msg = BrowserWebSocket ? JSON.parse(message.data) : JSON.parse(message.toString('utf-8')); | ||
function checkMsg() { | ||
onError = (err) => { | ||
if (this.debug) console.error(err); | ||
// err is defined with `ws`, but not with the | ||
// browser's WebSocket API so we need to get the errors from the close event | ||
ws.sonicError = err; // eslint-disable-line no-param-reassign | ||
}; | ||
onMessage = (_message) => { | ||
const msg = BrowserWebSocket ? JSON.parse(_message.data) : JSON.parse(_message.toString('utf-8')); | ||
const completeStream = () => { | ||
if (this.debug) console.log(`completed complete/ack sequence: trace_id=${msg.p.trace_id}; error=${msg.v};`); | ||
if (msg.v) { | ||
done(new Error('Query with trace_id `' + msg.p.trace_id + '` failed: ' + msg.v)); | ||
done(new Error(`query with trace_id \`${msg.p.trace_id}\` failed: ${msg.v}`)); | ||
} else { | ||
done(null); | ||
} | ||
} | ||
}; | ||
@@ -124,8 +218,7 @@ switch (msg.e) { | ||
case 'D': | ||
isDone = true; | ||
if (BrowserWebSocket) { | ||
ws.send(SonicMessage.ACK); | ||
checkMsg(); | ||
completeStream(); | ||
} else { | ||
ws.send(SonicMessage.ACK, checkMsg); | ||
ws.send(SonicMessage.ACK, completeStream); | ||
} | ||
@@ -135,5 +228,3 @@ break; | ||
case 'T': | ||
metadata(msg.p.map(function(elem) { | ||
return [elem[0], typeof elem[1]]; | ||
})); | ||
metadata(msg.p.map(elem => [elem[0], typeof elem[1]])); | ||
break; | ||
@@ -152,119 +243,207 @@ | ||
default: | ||
// ignore to improve forwards compatibility | ||
if (this.debug) console.log(`unsupported message received: ${JSON.stringify(msg)}`); | ||
break; | ||
} | ||
}); | ||
}; | ||
}; | ||
}; | ||
Client.prototype.exec = function(message, doneCb, outputCb, progressCb, metadataCb, startedCb) { | ||
ws.on('close', onClose); | ||
ws.on('error', onError); | ||
ws.on('message', onMessage); | ||
ws.send(message); | ||
if (this.debug) console.log(`sending message: ${JSON.stringify(message)}`); | ||
} | ||
var ws = new WebSocket(this.url); | ||
var doExec = this.send(doneCb, outputCb, progressCb, metadataCb, startedCb); | ||
/* pool-aware send method */ | ||
_send(message, doneCb, outputCb, progressCb, metadataCb, startedCb) { | ||
switch (this.state) { | ||
case states.CLOSED: | ||
doneCb(new Error('client is closed and cannot accept more work')); | ||
return; | ||
case states.CLOSING: | ||
doneCb(new Error('client is closing and cannot accept more work')); | ||
return; | ||
default: | ||
} | ||
ws.on('open', function() { | ||
doExec(JSON.stringify(message), ws); | ||
}); | ||
// identify send for cancel hooks | ||
const id = this.nextId++; | ||
let doned = false; | ||
this.ws.push(ws); | ||
let timer; | ||
if (this.debug) { | ||
timer = setTimeout(() => | ||
console.log(`its taking more than (${this.acquireTimeout}) to acquire resource for ticket=${id}`), | ||
this.acquireTimeout); | ||
} | ||
return ws; | ||
}; | ||
// acquire connection | ||
this.pool.acquire().then((ws) => { | ||
this.running[id] = ws; | ||
Client.prototype.stream = function(query) { | ||
var emitter = new SonicEmitter(); | ||
var queryMsg = utils.toMsg(query); | ||
var ws; | ||
if (this.debug) { | ||
if (!ws.sonicId) { | ||
/* first ticket for this resource is the resource ID */ | ||
ws.sonicId = id; // eslint-disable-line no-param-reassign | ||
} | ||
console.log(`acquired resource=${ws.sonicId} for ticket=${id}`); | ||
clearTimeout(timer); | ||
} | ||
function done(err) { | ||
if (err) { | ||
emitter.emit('error', err); | ||
return; | ||
} | ||
// doneCb override to release connection back to pool | ||
const doDoneCb = (err) => { | ||
if (this.debug) console.log(`done with ticket=${id}; resource=${ws.sonicId}`); | ||
if (!doned) { | ||
doned = true; | ||
delete this.running[id]; | ||
emitter.emit('done'); | ||
} | ||
if (ws.sonicDestroy) { | ||
if (this.debug) console.log(`destroying resource=${ws.sonicId}; ticket=${id}`); | ||
this.pool.destroy(ws); | ||
} else { | ||
if (this.debug) console.log(`releasing resource=${ws.sonicId}; ticket=${id}`); | ||
this.pool.release(ws); | ||
} | ||
doneCb(err); | ||
} | ||
}; | ||
function output(elems) { | ||
emitter.emit('data', elems); | ||
} | ||
try { | ||
this._wsSend(JSON.stringify(message), doDoneCb, outputCb, progressCb, metadataCb, startedCb, ws); | ||
} catch (e) { | ||
if (this.debug) console.error(e); | ||
this.pool.release(ws); | ||
doDoneCb(e); | ||
} | ||
}).catch(doneCb); | ||
function metadata(meta) { | ||
emitter.emit('metadata', meta); | ||
return id; // eslint-disable-line consistent-return | ||
} | ||
function progress(prog) { | ||
emitter.emit('progress', prog); | ||
} | ||
stream(query) { | ||
const emitter = new SonicEmitter(); | ||
const queryMsg = utils.toMsg(query); | ||
function started(traceId) { | ||
emitter.emit('started', traceId); | ||
} | ||
function done(err) { | ||
if (err) { | ||
emitter.emit('error', err); | ||
return; | ||
} | ||
ws = this.exec(queryMsg, done, output, progress, metadata, started); | ||
emitter.emit('done'); | ||
} | ||
emitter.cancel = function(cb) { | ||
cancel(ws, cb); | ||
}; | ||
function output(elems) { | ||
emitter.emit('data', elems); | ||
} | ||
return emitter; | ||
}; | ||
function metadata(meta) { | ||
emitter.emit('metadata', meta); | ||
} | ||
Client.prototype.run = function(query, doneCb) { | ||
function progress(prog) { | ||
emitter.emit('progress', prog); | ||
} | ||
var data = []; | ||
var queryMsg = utils.toMsg(query); | ||
var ws; | ||
function started(traceId) { | ||
emitter.emit('started', traceId); | ||
} | ||
function done(err) { | ||
if (err) { | ||
doneCb(err, null); | ||
} else { | ||
doneCb(null, data); | ||
const id = this._send(queryMsg, done, output, progress, metadata, started); | ||
emitter.cancel = (cb) => { | ||
this._cancel(id, cb); | ||
}; | ||
return emitter; | ||
} | ||
/* TODO: deprecate in favor of run2 */ | ||
run(query, doneCb) { | ||
const data = []; | ||
const queryMsg = utils.toMsg(query); | ||
function done(err) { | ||
if (err) { | ||
doneCb(err, null); | ||
} else { | ||
doneCb(null, data); | ||
} | ||
} | ||
function output(elems) { | ||
data.push(elems); | ||
} | ||
this._send(queryMsg, done, output); | ||
} | ||
function output(elems) { | ||
data.push(elems); | ||
run2(query) { | ||
return new Promise((resolve, reject) => { | ||
this.run(query, (err, data) => { | ||
if (err) { | ||
reject(err); | ||
return; | ||
} | ||
resolve(data); | ||
}); | ||
}); | ||
} | ||
ws = this.exec(queryMsg, done, output); | ||
authenticate(user, apiKey, doneCb, traceId) { | ||
let token; | ||
const authMsg = { | ||
e: 'H', | ||
p: { | ||
user, | ||
trace_id: traceId, | ||
}, | ||
v: apiKey, | ||
}; | ||
return { | ||
cancel: function(cb) { | ||
cancel(ws, cb); | ||
function done(err) { | ||
if (err) { | ||
doneCb(err); | ||
} else { | ||
doneCb(null, token); | ||
} | ||
} | ||
}; | ||
}; | ||
Client.prototype.authenticate = function(user, apiKey, doneCb, traceId) { | ||
var token; | ||
var authMsg = { | ||
e: 'H', | ||
p: { | ||
user: user, | ||
trace_id: traceId | ||
}, | ||
v: apiKey | ||
}; | ||
function output(elems) { | ||
token = elems[0]; | ||
} | ||
function done(err) { | ||
if (err) { | ||
doneCb(err, null); | ||
} else { | ||
doneCb(null, token); | ||
this._send(authMsg, done, output); | ||
} | ||
_close() { | ||
if (this.state !== states.CLOSING) { | ||
return Promise.resolve(); | ||
} | ||
return this.pool.drain().then(() => this.pool.clear()); | ||
} | ||
function output(elems) { | ||
token = elems[0]; | ||
cancel() { | ||
const ids = Object.keys(this.running); | ||
return Promise.all(ids.map(id => new Promise((resolve, reject) => { | ||
this._cancel(id, (err) => { | ||
if (err) { | ||
reject(err); | ||
return; | ||
} | ||
resolve(); | ||
}); | ||
}))); | ||
} | ||
this.exec(authMsg, done, output); | ||
}; | ||
close() { | ||
this.state = states.CLOSING; | ||
return this.cancel() | ||
.then(() => this._close()) | ||
.then(() => { | ||
this.state = states.CLOSED; | ||
}); | ||
} | ||
} | ||
Client.prototype.close = function() { | ||
this.ws.forEach(cancel); | ||
}; | ||
module.exports.Client = Client; |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Git dependency
Supply chain riskContains a dependency which resolves to a remote git URL. Dependencies fetched from git URLs are not immutable and can be used to inject untrusted code or reduce the likelihood of a reproducible install.
Found 1 instance in 1 package
913
33702
4
7
1
+ Addedansi-regex@2.1.1(transitive)
+ Addedaproba@1.2.0(transitive)
+ Addedare-we-there-yet@1.1.7(transitive)
+ Addedbindings@1.3.1(transitive)
+ Addedbl@1.2.3(transitive)
+ Addedbuffer-alloc@1.2.0(transitive)
+ Addedbuffer-alloc-unsafe@1.1.0(transitive)
+ Addedbuffer-fill@1.0.0(transitive)
+ Addedbufferutil@3.0.5(transitive)
+ Addedchownr@1.1.4(transitive)
+ Addedcode-point-at@1.1.0(transitive)
+ Addedconsole-control-strings@1.1.0(transitive)
+ Addedcore-util-is@1.0.3(transitive)
+ Addeddecompress-response@3.3.0(transitive)
+ Addeddeep-extend@0.6.0(transitive)
+ Addeddelegates@1.0.0(transitive)
+ Addeddetect-libc@1.0.3(transitive)
+ Addedend-of-stream@1.4.4(transitive)
+ Addedexpand-template@1.1.1(transitive)
+ Addedfs-constants@1.0.0(transitive)
+ Addedgauge@2.7.4(transitive)
+ Addedgithub-from-package@0.0.0(transitive)
+ Addedhas-unicode@2.0.1(transitive)
+ Addedinherits@2.0.4(transitive)
+ Addedini@1.3.8(transitive)
+ Addedis-fullwidth-code-point@1.0.0(transitive)
+ Addedisarray@1.0.0(transitive)
+ Addedmimic-response@1.0.1(transitive)
+ Addedminimist@1.2.8(transitive)
+ Addedmkdirp@0.5.6(transitive)
+ Addednan@2.10.02.7.0(transitive)
+ Addednode-abi@2.30.1(transitive)
+ Addednoop-logger@0.1.1(transitive)
+ Addednpmlog@4.1.2(transitive)
+ Addednumber-is-nan@1.0.1(transitive)
+ Addedobject-assign@4.1.1(transitive)
+ Addedonce@1.4.0(transitive)
+ Addedos-homedir@1.0.2(transitive)
+ Addedprebuild-install@2.3.04.0.0(transitive)
+ Addedprocess-nextick-args@2.0.1(transitive)
+ Addedpump@1.0.32.0.1(transitive)
+ Addedrc@1.2.8(transitive)
+ Addedreadable-stream@2.3.8(transitive)
+ Addedsafe-buffer@5.1.2(transitive)
+ Addedsemver@5.7.2(transitive)
+ Addedset-blocking@2.0.0(transitive)
+ Addedsignal-exit@3.0.7(transitive)
+ Addedsimple-concat@1.0.1(transitive)
+ Addedsimple-get@1.4.32.8.2(transitive)
+ Addedstring-width@1.0.2(transitive)
+ Addedstring_decoder@1.1.1(transitive)
+ Addedstrip-ansi@3.0.1(transitive)
+ Addedstrip-json-comments@2.0.1(transitive)
+ Addedtar-fs@1.16.4(transitive)
+ Addedtar-stream@1.6.2(transitive)
+ Addedto-buffer@1.1.1(transitive)
+ Addedtunnel-agent@0.6.0(transitive)
+ Addedunzip-response@1.0.2(transitive)
+ Addedutf-8-validate@3.0.4(transitive)
+ Addedutil-deprecate@1.0.2(transitive)
+ Addedwhich-pm-runs@1.1.0(transitive)
+ Addedwide-align@1.1.5(transitive)
+ Addedwrappy@1.0.2(transitive)
+ Addedxtend@4.0.14.0.2(transitive)
- Removedbufferutil@^1.2.1
- Removedutf-8-validate@^1.2.1
- Removedbindings@1.2.1(transitive)
- Removedbufferutil@1.3.0(transitive)
- Removednan@2.4.0(transitive)
- Removedutf-8-validate@1.2.2(transitive)