Comparing version 0.8.1 to 0.8.2
module.exports = { | ||
env: { | ||
node: true, | ||
}, | ||
rules: { | ||
'comma-dangle': 0, | ||
'no-plusplus': 0, | ||
'no-cond-assign': 0, | ||
'no-continue': 0, | ||
'no-console': 0, | ||
'one-var': 0, | ||
'one-var-declaration-per-line': 0, | ||
'no-underscore-dangle': 0, | ||
}, | ||
extends: 'airbnb' | ||
'env': { | ||
'browser': true, | ||
'node': true, | ||
'es6': true | ||
}, | ||
'extends': 'eslint:recommended', | ||
'parserOptions': { | ||
'sourceType': 'module' | ||
}, | ||
'rules': { | ||
'indent': [ | ||
'error', | ||
'tab' | ||
], | ||
'linebreak-style': [ | ||
'error', | ||
'unix' | ||
], | ||
'quotes': [ | ||
'error', | ||
'single' | ||
], | ||
'semi': [ | ||
'error', | ||
'always' | ||
] | ||
} | ||
}; |
@@ -7,3 +7,3 @@ /* eslint no-console: [0]*/ | ||
const host = process.env.SONIC_HOST || 'wss://0.0.0.0:443'; | ||
const host = process.env.SONIC_HOST || 'ws://0.0.0.0:9111'; | ||
const API_KEY = '1234'; | ||
@@ -13,9 +13,5 @@ const USER = 'serrallonga'; | ||
const opt = { | ||
maxPoolSize: 5, /* max connection pool size */ | ||
minPoolSize: 1, /* min connection pool size */ | ||
maxTries: 1, /* max create connection attempts before bubbling up error */ | ||
autostart: true, /* should the Client start creating connections once the constructor is called */ | ||
validate: true, /* should the Client validate that connections are responsive before using them */ | ||
validateTimeout: 2000, /* new connection validation timeout */ | ||
acquireTimeout: 3000, /* new connection total acquisition timeout */ | ||
maxPoolSize: 5, /* max connection pool size */ | ||
minPoolSize: 1, /* min connection pool size */ | ||
acquireTimeout: 3000, /* new connection total acquisition timeout */ | ||
}; | ||
@@ -26,13 +22,13 @@ | ||
const query = { | ||
query: '10', | ||
config: { | ||
class: 'SyntheticSource', | ||
seed: 1000, | ||
'progress-delay': 10 | ||
} | ||
query: '10', | ||
config: { | ||
class: 'SyntheticSource', | ||
seed: 1000, | ||
'progress-delay': 10 | ||
} | ||
}; | ||
const query2 = { | ||
query: '5', | ||
config: 'secured_test' | ||
query: '5', | ||
config: 'secured_test' | ||
}; | ||
@@ -48,20 +44,20 @@ | ||
stream.on('data', (data) => { | ||
console.log(data); | ||
console.log(data); | ||
}); | ||
stream.on('progress', (p) => { | ||
done += p.progress; | ||
console.log(`running.. ${done}/${p.total} ${p.units}`); | ||
done += p.progress; | ||
console.log(`running.. ${done}/${p.total} ${p.units}`); | ||
}); | ||
stream.on('metadata', (meta) => { | ||
console.log(`metadata: ${JSON.stringify(meta)}`); | ||
console.log(`metadata: ${JSON.stringify(meta)}`); | ||
}); | ||
stream.on('done', () => { | ||
console.log('stream is done!'); | ||
console.log('stream is done!'); | ||
}); | ||
stream.on('error', (err) => { | ||
console.log(`stream error: ${err}`); | ||
console.log(`stream error: ${err}`); | ||
}); | ||
@@ -73,12 +69,12 @@ | ||
client.run(query, (err, res) => { | ||
if (err) { | ||
console.log(err); | ||
return; | ||
} | ||
if (err) { | ||
console.log(err); | ||
return; | ||
} | ||
res.forEach((e) => { | ||
console.log(e); | ||
}); | ||
res.forEach((e) => { | ||
console.log(e); | ||
}); | ||
console.log('exec is done!'); | ||
console.log('exec is done!'); | ||
}); | ||
@@ -90,7 +86,7 @@ | ||
client.run(query2, (err) => { | ||
assert.throws(() => { | ||
if (err) { | ||
throw err; | ||
} | ||
}); | ||
assert.throws(() => { | ||
if (err) { | ||
throw err; | ||
} | ||
}); | ||
}); | ||
@@ -103,24 +99,24 @@ | ||
client.authenticate(USER, API_KEY, (err, token) => { | ||
if (err) { | ||
throw err; | ||
} | ||
if (err) { | ||
throw err; | ||
} | ||
query2.auth = token; | ||
query2.auth = token; | ||
client.run(query2, (qerr, res) => { | ||
if (qerr) { | ||
throw qerr; | ||
} | ||
client.run(query2, (qerr, res) => { | ||
if (qerr) { | ||
throw qerr; | ||
} | ||
res.forEach((e) => { | ||
console.log(e); | ||
}); | ||
res.forEach((e) => { | ||
console.log(e); | ||
}); | ||
console.log('secured exec is done!'); | ||
console.log('secured exec is done!'); | ||
// close ws | ||
client.close() | ||
client.close() | ||
.then(() => console.log('released all resources')) | ||
.catch(error => console.error(error)); | ||
}); | ||
}); | ||
}); |
{ | ||
"name": "sonic-js", | ||
"version": "0.8.1", | ||
"version": "0.8.2", | ||
"description": "ws client library for the Sonic protocol", | ||
@@ -13,3 +13,3 @@ "main": "src/lib.js", | ||
"dependencies": { | ||
"generic-pool": "git://github.com/Aleksandras-Novikovas/node-pool.git#59889c4b0adb5dea839a951fb351ff5ee6045768", | ||
"generic-pool": "^2.5.4", | ||
"ws": "^1.1.0" | ||
@@ -16,0 +16,0 @@ }, |
@@ -18,3 +18,3 @@ # Sonic-js [![Build Status](https://travis-ci.org/xarxa6/sonic-js.svg)](https://travis-ci.org/xarxa6/sonic-js) [![npm version](https://badge.fury.io/js/sonic-js.svg)](https://badge.fury.io/js/sonic-js) | ||
var client = new Client('wss://0.0.0.0:443'); | ||
var client = new Client('ws://0.0.0.0:9111'); | ||
@@ -21,0 +21,0 @@ var query = { |
@@ -5,74 +5,74 @@ /* eslint-env node, mocha */ | ||
const sonicEndpoint = `${process.env.SONIC_HOST || 'wss://0.0.0.0:443'}/v1/query`; | ||
const sonicEndpoint = `${process.env.SONIC_HOST || 'ws://0.0.0.0:9111'}/v1/query`; | ||
describe('Client#close', () => { | ||
it('should cancel all ongoing queries', () => { | ||
const client = new Client(sonicEndpoint, { maxPoolSize: 5, minPoolSize: 5, }); | ||
const q = { | ||
query: '100', | ||
'progress-delay': 1000, | ||
config: { class: 'SyntheticSource' } | ||
}; | ||
it('should cancel all ongoing queries', () => { | ||
const client = new Client(sonicEndpoint, { maxPoolSize: 5, minPoolSize: 5, }); | ||
const q = { | ||
query: '100', | ||
'progress-delay': 1000, | ||
config: { class: 'SyntheticSource' } | ||
}; | ||
const queries = [q, q, q]; | ||
const queries = [q, q, q]; | ||
const progress = Promise.all(queries.map(client.run2.bind(client))); | ||
let closeProm; | ||
const progress = Promise.all(queries.map(client.run2.bind(client))); | ||
let closeProm; | ||
const timer = setInterval(() => { | ||
const timer = setInterval(() => { | ||
// wait until all queries are running | ||
if (Object.keys(client.running).length === queries.length) { | ||
closeProm = client.close(); | ||
clearInterval(timer); | ||
} | ||
}, 100); | ||
if (Object.keys(client.running).length === queries.length) { | ||
closeProm = client.close(); | ||
clearInterval(timer); | ||
} | ||
}, 100); | ||
return progress.then((data) => { | ||
assert.equal(data.reduce((a, d) => a + d.length, 0), 0); | ||
}).then(() => closeProm); | ||
}); | ||
return progress.then((data) => { | ||
assert.equal(data.reduce((a, d) => a + d.length, 0), 0); | ||
}).then(() => closeProm); | ||
}); | ||
it('should not allow more queries to be submitted', () => { | ||
const client = new Client(sonicEndpoint, { maxPoolSize: 5, minPoolSize: 5, }); | ||
const q = { | ||
query: '100', | ||
'progress-delay': 1000, | ||
config: { class: 'SyntheticSource' } | ||
}; | ||
it('should not allow more queries to be submitted', () => { | ||
const client = new Client(sonicEndpoint, { maxPoolSize: 5, minPoolSize: 5, }); | ||
const q = { | ||
query: '100', | ||
'progress-delay': 1000, | ||
config: { class: 'SyntheticSource' } | ||
}; | ||
const queries = [q, q, q]; | ||
const queries = [q, q, q]; | ||
const closeProm = client.close(); | ||
const closeProm = client.close(); | ||
return Promise.all(queries.map(client.run2.bind(client))).then((data) => { | ||
throw new Error(`not bubble up error: ${JSON.stringify(data)}`); | ||
}).catch((err) => { | ||
assert(err.toString().indexOf('closing') >= 0, err.toString()); | ||
assert.equal(Object.keys(client.running).length, 0); | ||
}).then(() => closeProm); | ||
}); | ||
return Promise.all(queries.map(client.run2.bind(client))).then((data) => { | ||
throw new Error(`not bubble up error: ${JSON.stringify(data)}`); | ||
}).catch((err) => { | ||
assert(err.toString().indexOf('closing') >= 0, err.toString()); | ||
assert.equal(Object.keys(client.running).length, 0); | ||
}).then(() => closeProm); | ||
}); | ||
}); | ||
describe('stream#cancel', () => { | ||
it('should cancel query', (done) => { | ||
const client = new Client(sonicEndpoint); | ||
const query = { | ||
query: '100', | ||
config: { class: 'SyntheticSource' } | ||
}; | ||
let cancelCb = false; | ||
const emitter = client.stream(query); | ||
it('should cancel query', (done) => { | ||
const client = new Client(sonicEndpoint); | ||
const query = { | ||
query: '100', | ||
config: { class: 'SyntheticSource' } | ||
}; | ||
let cancelCb = false; | ||
const emitter = client.stream(query); | ||
emitter.on('done', () => { | ||
assert.equal(Object.keys(client.running).length, 0); | ||
assert(cancelCb, 'cancelCb was not called!'); | ||
done(); | ||
}); | ||
emitter.on('error', (err) => { | ||
done(err); | ||
}); | ||
emitter.cancel(() => { | ||
cancelCb = true; | ||
}); | ||
}); | ||
emitter.on('done', () => { | ||
assert.equal(Object.keys(client.running).length, 0); | ||
assert(cancelCb, 'cancelCb was not called!'); | ||
done(); | ||
}); | ||
emitter.on('error', (err) => { | ||
done(err); | ||
}); | ||
emitter.cancel(() => { | ||
cancelCb = true; | ||
}); | ||
}); | ||
}); |
234
spec/util.js
const assert = require('chai').assert; | ||
module.exports.testHappyPathSingle = (client, query, n, done) => { | ||
client.run(query, (err, data) => { | ||
if (err) { | ||
done(err); | ||
return; | ||
} | ||
if (data.length !== n) { | ||
done(new Error(`data was not ${n}`)); | ||
return; | ||
} | ||
done(); | ||
}); | ||
client.run(query, (err, data) => { | ||
if (err) { | ||
done(err); | ||
return; | ||
} | ||
if (data.length !== n) { | ||
done(new Error(`data was not ${n}`)); | ||
return; | ||
} | ||
done(); | ||
}); | ||
}; | ||
module.exports.testHappyPath = (client, query, n, _done) => { | ||
let count = 0; | ||
let ran, streamed, doned = false; | ||
let traceId; | ||
if (!_done) { | ||
throw new Error('needs _done to hook with test driver'); | ||
} | ||
let count = 0; | ||
let ran, streamed, doned = false; | ||
let traceId; | ||
if (!_done) { | ||
throw new Error('needs _done to hook with test driver'); | ||
} | ||
const debugTimer = setTimeout(() => { | ||
console.log(`count=${count}; run=${ran}; stream=${streamed}; doned=${doned}`); | ||
}, 3000); | ||
const debugTimer = setTimeout(() => { | ||
console.log(`count=${count}; run=${ran}; stream=${streamed}; doned=${doned}`); | ||
}, 3000); | ||
const done = (e) => { | ||
const done = (e) => { | ||
// guard against multiple calls to done() with err | ||
if (!doned) { | ||
doned = true; | ||
clearTimeout(debugTimer); | ||
_done(e); | ||
} | ||
}; | ||
if (!doned) { | ||
doned = true; | ||
clearTimeout(debugTimer); | ||
_done(e); | ||
} | ||
}; | ||
client.run(query, (err, data) => { | ||
ran = true; | ||
if (err) { | ||
done(err); | ||
return; | ||
} | ||
assert(data.length === n, `data was not${n}`); | ||
if (count === 1) { | ||
done(); | ||
} else { | ||
count += 1; | ||
} | ||
}); | ||
client.run(query, (err, data) => { | ||
ran = true; | ||
if (err) { | ||
done(err); | ||
return; | ||
} | ||
assert(data.length === n, `data was not${n}`); | ||
if (count === 1) { | ||
done(); | ||
} else { | ||
count += 1; | ||
} | ||
}); | ||
const stream = client.stream(query); | ||
const stream = client.stream(query); | ||
stream.on('started', (id) => { | ||
traceId = id; | ||
}); | ||
stream.on('started', (id) => { | ||
traceId = id; | ||
}); | ||
stream.on('error', (err) => { | ||
if (err) { | ||
done(err); | ||
} else { | ||
done(new Error('error emitted but no error returned!')); | ||
} | ||
}); | ||
stream.on('error', (err) => { | ||
if (err) { | ||
done(err); | ||
} else { | ||
done(new Error('error emitted but no error returned!')); | ||
} | ||
}); | ||
stream.on('done', (err) => { | ||
streamed = true; | ||
assert(typeof traceId !== 'undefined', 'traceId is undefined in stream done callback on `testHappyPath` test'); | ||
if (err) { | ||
done(err); | ||
} else if (count === 1) { | ||
done(); | ||
} else { | ||
count += 1; | ||
} | ||
}); | ||
stream.on('done', (err) => { | ||
streamed = true; | ||
assert(typeof traceId !== 'undefined', 'traceId is undefined in stream done callback on `testHappyPath` test'); | ||
if (err) { | ||
done(err); | ||
} else if (count === 1) { | ||
done(); | ||
} else { | ||
count += 1; | ||
} | ||
}); | ||
}; | ||
module.exports.expectError = (client, query, _done) => { | ||
let count = 0; | ||
let ran, streamed, doned = false; | ||
let count = 0; | ||
let ran, streamed, doned = false; | ||
if (!_done) { | ||
throw new Error('needs _done to hook with test driver'); | ||
} | ||
if (!_done) { | ||
throw new Error('needs _done to hook with test driver'); | ||
} | ||
const debugTimer = setTimeout(() => { | ||
console.log(`count=${count}; run=${ran}; stream=${streamed}; doned=${doned}`); | ||
}, 2000); | ||
const debugTimer = setTimeout(() => { | ||
console.log(`count=${count}; run=${ran}; stream=${streamed}; doned=${doned}`); | ||
}, 2000); | ||
const done = (e) => { | ||
const done = (e) => { | ||
// guard against multiple calls to done() with err | ||
if (!doned) { | ||
doned = true; | ||
clearTimeout(debugTimer); | ||
_done(e); | ||
} | ||
}; | ||
if (!doned) { | ||
doned = true; | ||
clearTimeout(debugTimer); | ||
_done(e); | ||
} | ||
}; | ||
client.run(query, (err) => { | ||
ran = true; | ||
if (err) { | ||
if (count === 1) { | ||
done(); | ||
} else { | ||
count += 1; | ||
} | ||
} else { | ||
done(new Error('expected error but no error returned')); | ||
} | ||
}); | ||
client.run(query, (err) => { | ||
ran = true; | ||
if (err) { | ||
if (count === 1) { | ||
done(); | ||
} else { | ||
count += 1; | ||
} | ||
} else { | ||
done(new Error('expected error but no error returned')); | ||
} | ||
}); | ||
const stream = client.stream(query); | ||
const stream = client.stream(query); | ||
stream.on('done', () => { | ||
done(new Error('stream emitted `done` but `error` expected')); | ||
}); | ||
stream.on('done', () => { | ||
done(new Error('stream emitted `done` but `error` expected')); | ||
}); | ||
stream.on('error', (err) => { | ||
if (err) { | ||
streamed = true; | ||
if (count === 1) { | ||
done(); | ||
} else { | ||
count += 1; | ||
} | ||
} else { | ||
done(new Error('expected error but no error returned')); | ||
} | ||
}); | ||
stream.on('error', (err) => { | ||
if (err) { | ||
streamed = true; | ||
if (count === 1) { | ||
done(); | ||
} else { | ||
count += 1; | ||
} | ||
} else { | ||
done(new Error('expected error but no error returned')); | ||
} | ||
}); | ||
}; | ||
module.exports.doAuthenticate = (client, done, apiKeyMaybe) => { | ||
const apiKey = apiKeyMaybe || '1234'; | ||
client.authenticate('spec_tests', apiKey, (err, token) => { | ||
if (err) { | ||
done(new Error('failed to authenticate')); | ||
return; | ||
} | ||
const apiKey = apiKeyMaybe || '1234'; | ||
client.authenticate('spec_tests', apiKey, (err, token) => { | ||
if (err) { | ||
done(new Error('failed to authenticate')); | ||
return; | ||
} | ||
if (token) { | ||
done(null, token); | ||
} else { | ||
done(new Error('protocol error: no token received from server')); | ||
} | ||
}); | ||
if (token) { | ||
done(null, token); | ||
} else { | ||
done(new Error('protocol error: no token received from server')); | ||
} | ||
}); | ||
}; |
346
spec/ws.js
@@ -6,3 +6,3 @@ /* eslint-env node, mocha */ | ||
const sonicEndpoint = `${process.env.SONIC_HOST || 'wss://0.0.0.0:443'}/v1/query`; | ||
const sonicEndpoint = `${process.env.SONIC_HOST || 'ws://0.0.0.0:9111'}/v1/query`; | ||
@@ -12,213 +12,213 @@ let token; | ||
function runSpecTests(client, id) { | ||
it(`${id} - should be able to run a simple query and stream the data back from the server`, (done) => { | ||
const query = { | ||
query: '5', | ||
auth: token, | ||
config: { | ||
class: 'SyntheticSource', | ||
seed: 1000, | ||
'progress-delay': 10 | ||
} | ||
}; | ||
it(`${id} - should be able to run a simple query and stream the data back from the server`, (done) => { | ||
const query = { | ||
query: '5', | ||
auth: token, | ||
config: { | ||
class: 'SyntheticSource', | ||
seed: 1000, | ||
'progress-delay': 10 | ||
} | ||
}; | ||
util.testHappyPath(client, query, 5, done); | ||
}); | ||
util.testHappyPath(client, query, 5, done); | ||
}); | ||
it(`${id} - should return an error if source class is unknown`, (done) => { | ||
const query = { | ||
query: '1', | ||
auth: token, | ||
trace_id: 'ballz0', | ||
config: { | ||
class: 'UnknownClass' | ||
} | ||
}; | ||
it(`${id} - should return an error if source class is unknown`, (done) => { | ||
const query = { | ||
query: '1', | ||
auth: token, | ||
trace_id: 'ballz0', | ||
config: { | ||
class: 'UnknownClass' | ||
} | ||
}; | ||
util.expectError(client, query, done); | ||
}); | ||
util.expectError(client, query, done); | ||
}); | ||
it(`${id} - should return an error if query is null`, (done) => { | ||
const query = { | ||
query: null, | ||
auth: token, | ||
trace_id: 'ballz1', | ||
config: { | ||
class: 'SyntheticSource' | ||
} | ||
}; | ||
it(`${id} - should return an error if query is null`, (done) => { | ||
const query = { | ||
query: null, | ||
auth: token, | ||
trace_id: 'ballz1', | ||
config: { | ||
class: 'SyntheticSource' | ||
} | ||
}; | ||
util.expectError(client, query, done); | ||
}); | ||
util.expectError(client, query, done); | ||
}); | ||
it(`${id} - should return an error if config is null`, (done) => { | ||
const query = { | ||
query: '1', | ||
trace_id: 'ballz2', | ||
auth: token, | ||
config: null | ||
}; | ||
it(`${id} - should return an error if config is null`, (done) => { | ||
const query = { | ||
query: '1', | ||
trace_id: 'ballz2', | ||
auth: token, | ||
config: null | ||
}; | ||
util.expectError(client, query, done); | ||
}); | ||
util.expectError(client, query, done); | ||
}); | ||
it(`${id} - should return an error if source publisher completes stream with exception`, (done) => { | ||
const query = { | ||
it(`${id} - should return an error if source publisher completes stream with exception`, (done) => { | ||
const query = { | ||
// signals source to throw expected exception | ||
query: '28', | ||
auth: token, | ||
config: { | ||
class: 'SyntheticSource' | ||
} | ||
}; | ||
query: '28', | ||
auth: token, | ||
config: { | ||
class: 'SyntheticSource' | ||
} | ||
}; | ||
util.expectError(client, query, done); | ||
}); | ||
util.expectError(client, query, done); | ||
}); | ||
it(`${id} - should return an error if source throws an exception and terminates`, (done) => { | ||
const query = { | ||
it(`${id} - should return an error if source throws an exception and terminates`, (done) => { | ||
const query = { | ||
// signals source to throw unexpected exception | ||
query: '-1', | ||
auth: token, | ||
config: { | ||
class: 'SyntheticSource' | ||
} | ||
}; | ||
query: '-1', | ||
auth: token, | ||
config: { | ||
class: 'SyntheticSource' | ||
} | ||
}; | ||
util.expectError(client, query, done); | ||
}); | ||
util.expectError(client, query, done); | ||
}); | ||
it(`${id} - should stream a big payload correctly`, function (done) { | ||
this.timeout(6000); | ||
let q = ''; | ||
let i = 0; | ||
while (i < 10000) { | ||
q += 'aweqefekwljflwekfjkelwfjlwekjfeklwjflwekjfeklwjfeklfejklfjewlkfejwklw'; | ||
i += 1; | ||
} | ||
it(`${id} - should stream a big payload correctly`, function (done) { | ||
this.timeout(6000); | ||
let q = ''; | ||
let i = 0; | ||
while (i < 10000) { | ||
q += 'aweqefekwljflwekfjkelwfjlwekjfeklwjflwekjfeklwjfeklfejklfjewlkfejwklw'; | ||
i += 1; | ||
} | ||
const query = { | ||
query: q, | ||
auth: token, | ||
config: { | ||
class: 'SyntheticSource', | ||
'progress-delay': 0, | ||
size: 5 | ||
} | ||
}; | ||
const query = { | ||
query: q, | ||
auth: token, | ||
config: { | ||
class: 'SyntheticSource', | ||
'progress-delay': 0, | ||
size: 5 | ||
} | ||
}; | ||
util.testHappyPath(client, query, 5, done); | ||
}); | ||
util.testHappyPath(client, query, 5, done); | ||
}); | ||
} | ||
describe('Sonic ws', () => { | ||
const client = new Client(sonicEndpoint); | ||
const client = new Client(sonicEndpoint); | ||
runSpecTests(client, 'unauthenticated'); | ||
runSpecTests(client, 'unauthenticated'); | ||
it('should return an error if source requires authentication and user is unauthenticated', (done) => { | ||
const query = { | ||
query: '1', | ||
it('should return an error if source requires authentication and user is unauthenticated', (done) => { | ||
const query = { | ||
query: '1', | ||
// tipically set server side, but also | ||
// valid to be passed client side | ||
config: { | ||
class: 'SyntheticSource', | ||
security: 2 | ||
} | ||
}; | ||
config: { | ||
class: 'SyntheticSource', | ||
security: 2 | ||
} | ||
}; | ||
util.expectError(client, query, done); | ||
}); | ||
util.expectError(client, query, done); | ||
}); | ||
describe('Sonic ws auth', () => { | ||
it('should throw an error if api key is invalid', (done) => { | ||
client.authenticate('spec_tests', 'mariano', (err) => { | ||
if (err) { | ||
done(); | ||
} else { | ||
done(new Error('did not return error on invalid api key')); | ||
} | ||
}); | ||
}); | ||
describe('Sonic ws auth', () => { | ||
it('should throw an error if api key is invalid', (done) => { | ||
client.authenticate('spec_tests', 'mariano', (err) => { | ||
if (err) { | ||
done(); | ||
} else { | ||
done(new Error('did not return error on invalid api key')); | ||
} | ||
}); | ||
}); | ||
it('should authenticate user', (done) => { | ||
util.doAuthenticate(client, (err, token) => { | ||
if (err) { | ||
done(err); | ||
return; | ||
} | ||
done(); | ||
}); | ||
}); | ||
}); | ||
it('should authenticate user', (done) => { | ||
util.doAuthenticate(client, (err, token) => { | ||
if (err) { | ||
done(err); | ||
return; | ||
} | ||
done(); | ||
}); | ||
}); | ||
}); | ||
runSpecTests(new Client(sonicEndpoint, { maxPoolSize: 1, minPoolSize: 1, maxTries: 1 }), 'single connection'); | ||
runSpecTests(new Client(sonicEndpoint, { maxPoolSize: 1, minPoolSize: 1, }), 'single connection'); | ||
describe('Sonic ws with authentication', () => { | ||
const authenticated = new Client(sonicEndpoint); | ||
describe('Sonic ws with authentication', () => { | ||
const authenticated = new Client(sonicEndpoint); | ||
before((done) => { | ||
util.doAuthenticate(authenticated, (err, t) => { | ||
if (err) { | ||
done(err); | ||
return; | ||
} | ||
token = t; | ||
done(); | ||
}); | ||
}); | ||
before((done) => { | ||
util.doAuthenticate(authenticated, (err, t) => { | ||
if (err) { | ||
done(err); | ||
return; | ||
} | ||
token = t; | ||
done(); | ||
}); | ||
}); | ||
after((done) => { | ||
authenticated.close(); | ||
done(); | ||
}); | ||
after((done) => { | ||
authenticated.close(); | ||
done(); | ||
}); | ||
// client is authenticated | ||
runSpecTests(authenticated, 'authenticated'); | ||
runSpecTests(authenticated, 'authenticated'); | ||
it('should allow an authenticated and authorized user to run a query on a secure source', (done) => { | ||
const query = { | ||
query: '5', | ||
auth: token, | ||
config: { | ||
class: 'SyntheticSource', | ||
security: 1 | ||
} | ||
}; | ||
it('should allow an authenticated and authorized user to run a query on a secure source', (done) => { | ||
const query = { | ||
query: '5', | ||
auth: token, | ||
config: { | ||
class: 'SyntheticSource', | ||
security: 1 | ||
} | ||
}; | ||
util.testHappyPath(authenticated, query, 5, done); | ||
}); | ||
util.testHappyPath(authenticated, query, 5, done); | ||
}); | ||
it('should return error if an authenticated user but unauthorized user tries to run a query on a secured source', (done) => { | ||
const query = { | ||
query: '5', | ||
auth: token, | ||
config: { | ||
class: 'SyntheticSource', | ||
security: 2000 | ||
} | ||
}; | ||
it('should return error if an authenticated user but unauthorized user tries to run a query on a secured source', (done) => { | ||
const query = { | ||
query: '5', | ||
auth: token, | ||
config: { | ||
class: 'SyntheticSource', | ||
security: 2000 | ||
} | ||
}; | ||
util.expectError(authenticated, query, done); | ||
}); | ||
util.expectError(authenticated, query, done); | ||
}); | ||
it('should return error if an authenticated and authorized user from not a whitelisted IP tries to run a query on a secured source', (done) => { | ||
util.doAuthenticate(authenticated, (err, token) => { | ||
if (err) { | ||
done(err); | ||
return; | ||
} | ||
it('should return error if an authenticated and authorized user from not a whitelisted IP tries to run a query on a secured source', (done) => { | ||
util.doAuthenticate(authenticated, (err, token) => { | ||
if (err) { | ||
done(err); | ||
return; | ||
} | ||
const query = { | ||
query: '5', | ||
auth: token, | ||
config: { | ||
class: 'SyntheticSource', | ||
security: 1 | ||
} | ||
}; | ||
util.expectError(authenticated, query, done); | ||
}, 'only_from_ip'); // check server's reference.conf | ||
}); | ||
}); | ||
const query = { | ||
query: '5', | ||
auth: token, | ||
config: { | ||
class: 'SyntheticSource', | ||
security: 1 | ||
} | ||
}; | ||
util.expectError(authenticated, query, done); | ||
}, 'only_from_ip'); // check server's reference.conf | ||
}); | ||
}); | ||
}); |
684
src/lib.js
const BrowserWebSocket = global.MozWebSocket || global.WebSocket; | ||
const WebSocket = BrowserWebSocket || require('ws'); // eslint-disable-line global-require | ||
const EventEmitter = require('events'); | ||
const WsFactory = require('./WsFactory'); | ||
const utils = require('./util'); | ||
const pool = require('generic-pool'); | ||
const Pool = require('generic-pool').Pool; | ||
@@ -13,439 +15,375 @@ const SonicMessage = utils.SonicMessage; | ||
if (BrowserWebSocket) { | ||
WebSocket.prototype.on = function on(event, callback) { | ||
this.addEventListener(event, callback); | ||
}; | ||
WebSocket.prototype.on = function on(event, callback) { | ||
this.addEventListener(event, callback); | ||
}; | ||
WebSocket.prototype.removeListener = function removeListener(event, callback) { | ||
this.removeEventListener(event, callback); | ||
}; | ||
WebSocket.prototype.removeListener = function removeListener(event, callback) { | ||
this.removeEventListener(event, callback); | ||
}; | ||
} | ||
class SonicEmitter extends EventEmitter {} | ||
function getCloseError(ev) { | ||
return new Error(`WebSocket close: code=${ev.code}; reason=${ev.reason}`); | ||
} | ||
const states = { | ||
INITIALIZED: 1, | ||
CLOSING: 2, | ||
CLOSED: 3, | ||
INITIALIZED: 1, | ||
CLOSING: 2, | ||
CLOSED: 3, | ||
}; | ||
class Client { | ||
constructor(sonicAddress, | ||
{ maxPoolSize, minPoolSize, debug, autostart, validate, | ||
validateTimeout, maxTries, acquireTimeout } = {}) { | ||
this.url = sonicAddress; | ||
this.running = {}; | ||
this.nextId = 1; | ||
this.state = states.INITIALIZED; | ||
this.debug = debug; | ||
this.validateTimeout = validateTimeout || 2000; | ||
this.maxPoolSize = maxPoolSize || 5; | ||
this.minPoolSize = minPoolSize || 1; | ||
this.maxTries = maxTries || 1; | ||
/* browsers WebSocket API don't support sending ping/pong */ | ||
this.validate = (!BrowserWebSocket && typeof validate !== 'undefined') ? validate : false; | ||
this.autostart = typeof autostart === 'undefined' ? true : autostart; | ||
this.acquireTimeout = acquireTimeout || 3000; | ||
this._initializePool(); | ||
} | ||
constructor(sonicAddress, { maxPoolSize, minPoolSize, debug, acquireTimeout } = {}) { | ||
this.url = sonicAddress; | ||
this.running = {}; | ||
this.nextId = 1; | ||
this.state = states.INITIALIZED; | ||
this.debug = debug; | ||
this.maxPoolSize = maxPoolSize || 5; | ||
this.minPoolSize = minPoolSize || 1; | ||
this.acquireTimeout = acquireTimeout || 3000; | ||
this._initializePool(); | ||
} | ||
_initializePool() { | ||
const client = this; | ||
const poolOpts = { | ||
max: this.maxPoolSize, // maximum size of the pool | ||
min: this.minPoolSize, // minimum size of the pool | ||
maxTries: this.maxTries, | ||
autostart: this.autostart, | ||
testOnBorrow: this.validate, | ||
}; | ||
_log(log) { | ||
if (typeof this.debug === 'function') { | ||
this.debug(log); | ||
} else if (this.debug) { | ||
console.log(log); // eslint-disable-line no-console | ||
} | ||
} | ||
const WsFactory = { | ||
create() { | ||
return new Promise((resolve, reject) => { | ||
const ws = new WebSocket(client.url); | ||
let onOpen, onClose; | ||
_initializePool() { | ||
const factory = new WsFactory(WebSocket, this.url); | ||
const poolOpts = { | ||
name: 'sonic', | ||
create: factory.create.bind(factory), | ||
validate: factory.validate.bind(factory), | ||
destroy: factory.destroy.bind(factory), | ||
max: this.maxPoolSize, // maximum size of the pool | ||
min: this.minPoolSize, // minimum size of the pool | ||
log: this.debug, | ||
}; | ||
const onError = (err) => { | ||
ws.removeListener('open', onOpen); | ||
ws.removeListener('close', onClose); | ||
reject(err); | ||
}; | ||
onOpen = () => { | ||
ws.removeListener('error', onError); | ||
ws.removeListener('close', onClose); | ||
resolve(ws); | ||
}; | ||
this.pool = new Pool(poolOpts); | ||
} | ||
onClose = (ev) => { | ||
ws.removeListener('error', onError); | ||
ws.removeListener('open', onOpen); | ||
reject(getCloseError(ev)); | ||
}; | ||
_cancel(id, _cb) { | ||
const ws = this.running[id]; | ||
const cb = typeof _cb === 'function' ? _cb : noop; | ||
ws.on('open', onOpen); | ||
ws.on('close', onClose); | ||
ws.on('error', onError); | ||
}); | ||
}, | ||
this._log(`cancelling: WebSocket(${id})`); | ||
validate(ws) { | ||
return new Promise((resolve) => { | ||
let onPong; | ||
/* a 'D' message is expected when a cancel is send, | ||
* therefore resource cleanup should be handled by done handler */ | ||
if (!ws) { | ||
this._log(new Error(`cancel: WebSocket(${id}) is not running any queries`)); | ||
cb(); | ||
return; | ||
} | ||
const timer = setTimeout(() => { | ||
resolve(false); | ||
ws.removeListener('pong', onPong); | ||
}, this.validateTimeout); | ||
const doSend = () => { | ||
if (!BrowserWebSocket) { | ||
ws.send(SonicMessage.CANCEL, () => { | ||
this._log(`cancelled: WebSocket(${id})`); | ||
cb(); | ||
}); | ||
return; | ||
} | ||
onPong = () => { | ||
clearTimeout(timer); | ||
resolve(true); | ||
}; | ||
try { | ||
ws.send(SonicMessage.CANCEL); | ||
this._log(`cancelled: WebSocket(${id})`); | ||
cb(); | ||
} catch (e) { | ||
this._log(e); | ||
cb(e); | ||
} | ||
}; | ||
ws.on('pong', onPong); | ||
if (ws.readyState === WebSocket.CONNECTING) { | ||
ws.on('open', doSend); | ||
} if (ws.readyState === WebSocket.OPEN) { | ||
doSend(); | ||
} else { | ||
// connection is CLOSING/CLOSED | ||
cb(); | ||
} | ||
} | ||
try { | ||
ws.ping(); | ||
} catch (e) { | ||
resolve(false); | ||
} | ||
}); | ||
}, | ||
/* ws client agnostic send method */ | ||
_wsSend(message, doneCb, outputCb, progressCb, metadataCb, startedCb, ws) { | ||
const output = outputCb || noop; | ||
const progress = progressCb || noop; | ||
const metadata = metadataCb || noop; | ||
let onError, onClose, onMessage; | ||
destroy(ws) { | ||
return new Promise((resolve) => { | ||
ws.on('close', () => { | ||
resolve(); | ||
}); | ||
ws.close(1000, 'pool#destroy'); | ||
}); | ||
}, | ||
}; | ||
const done = (err) => { | ||
ws.removeListener('close', onClose); | ||
ws.removeListener('error', onError); | ||
ws.removeListener('message', onMessage); | ||
doneCb(err); | ||
}; | ||
this.pool = pool.createPool(WsFactory, poolOpts); | ||
} | ||
onClose = (ev) => { | ||
this._log(`WebSocket closed: code=${ev.code}; reason=${ev.reason};`); | ||
if (!ws.sonicError) { | ||
done(utils.getCloseError(ev)); | ||
} else { | ||
done(ws.sonicError); | ||
} | ||
}; | ||
_cancel(id, _cb) { | ||
const ws = this.running[id]; | ||
const cb = typeof _cb === 'function' ? _cb : noop; | ||
onError = (err) => { | ||
this._log(err); | ||
// err is defined with `ws`, but not with the | ||
// browser's WebSocket API so we need to get the errors from the close event | ||
ws.sonicError = err; // eslint-disable-line no-param-reassign | ||
}; | ||
if (this.debug) console.log(`cancelling: WebSocket(${id})`); | ||
onMessage = (_message) => { | ||
const msg = BrowserWebSocket ? JSON.parse(_message.data) : JSON.parse(_message.toString('utf-8')); | ||
const completeStream = () => { | ||
this._log(`completed complete/ack sequence: trace_id=${msg.p.trace_id}; error=${msg.v};`); | ||
if (msg.v) { | ||
done(new Error(`query with trace_id \`${msg.p.trace_id}\` failed: ${msg.v}`)); | ||
} else { | ||
done(null); | ||
} | ||
}; | ||
/* a 'D' message is expected when a cancel is send, | ||
* therefore resource cleanup should be handled by done handler | ||
*/ | ||
if (!ws) { | ||
if (this.debug) console.error(new Error(`cancel: WebSocket(${id}) is not running any queries`)); | ||
cb(); | ||
return; | ||
} | ||
switch (msg.e) { | ||
case 'P': | ||
progress(utils.toProgress(msg.p)); | ||
break; | ||
const doSend = () => { | ||
if (!BrowserWebSocket) { | ||
ws.send(SonicMessage.CANCEL, () => { | ||
if (this.debug) console.log(`cancelled: WebSocket(${id})`); | ||
cb(); | ||
}); | ||
return; | ||
} | ||
case 'D': | ||
if (BrowserWebSocket) { | ||
ws.send(SonicMessage.ACK); | ||
completeStream(); | ||
} else { | ||
ws.send(SonicMessage.ACK, completeStream); | ||
} | ||
break; | ||
try { | ||
ws.send(SonicMessage.CANCEL); | ||
if (this.debug) console.log(`cancelled: WebSocket(${id})`); | ||
cb(); | ||
} catch (e) { | ||
if (this.debug) console.error(e); | ||
cb(e); | ||
} | ||
}; | ||
case 'T': | ||
metadata(msg.p.map(elem => [elem[0], typeof elem[1]])); | ||
break; | ||
if (ws.readyState === WebSocket.CONNECTING) { | ||
ws.on('open', doSend); | ||
} if (ws.readyState === WebSocket.OPEN) { | ||
doSend(); | ||
} else { | ||
// connection is CLOSING/CLOSED | ||
cb(); | ||
} | ||
} | ||
case 'S': | ||
if (typeof startedCb !== 'undefined') { | ||
startedCb(msg.v); | ||
} | ||
break; | ||
/* ws client agnostic send method */ | ||
_wsSend(message, doneCb, outputCb, progressCb, metadataCb, startedCb, ws) { | ||
const output = outputCb || noop; | ||
const progress = progressCb || noop; | ||
const metadata = metadataCb || noop; | ||
let onError, onClose, onMessage; | ||
case 'O': | ||
output(msg.p); | ||
break; | ||
const done = (err) => { | ||
ws.removeListener('close', onClose); | ||
ws.removeListener('error', onError); | ||
ws.removeListener('message', onMessage); | ||
doneCb(err); | ||
}; | ||
default: | ||
this._log(`unsupported message received: ${JSON.stringify(msg)}`); | ||
break; | ||
} | ||
}; | ||
onClose = (ev) => { | ||
if (this.debug) console.log(`WebSocket closed: code=${ev.code}; reason=${ev.reason};`); | ||
if (!ws.sonicError) { | ||
done(getCloseError(ev)); | ||
} else { | ||
done(ws.sonicError); | ||
} | ||
}; | ||
ws.on('close', onClose); | ||
ws.on('error', onError); | ||
ws.on('message', onMessage); | ||
ws.send(message); | ||
this._log(`sending message: ${JSON.stringify(message)}`); | ||
} | ||
onError = (err) => { | ||
if (this.debug) console.error(err); | ||
// err is defined with `ws`, but not with the | ||
// browser's WebSocket API so we need to get the errors from the close event | ||
ws.sonicError = err; // eslint-disable-line no-param-reassign | ||
}; | ||
/* pool-aware send method */ | ||
_send(message, doneCb, outputCb, progressCb, metadataCb, startedCb) { | ||
switch (this.state) { | ||
case states.CLOSED: | ||
doneCb(new Error('client is closed and cannot accept more work')); | ||
return; | ||
case states.CLOSING: | ||
doneCb(new Error('client is closing and cannot accept more work')); | ||
return; | ||
default: | ||
} | ||
onMessage = (_message) => { | ||
const msg = BrowserWebSocket ? JSON.parse(_message.data) : JSON.parse(_message.toString('utf-8')); | ||
const completeStream = () => { | ||
if (this.debug) console.log(`completed complete/ack sequence: trace_id=${msg.p.trace_id}; error=${msg.v};`); | ||
if (msg.v) { | ||
done(new Error(`query with trace_id \`${msg.p.trace_id}\` failed: ${msg.v}`)); | ||
} else { | ||
done(null); | ||
} | ||
}; | ||
// identify send for cancel hooks | ||
const id = this.nextId++; | ||
let doned = false; | ||
switch (msg.e) { | ||
case 'P': | ||
progress(utils.toProgress(msg.p)); | ||
break; | ||
const timer = setTimeout(() => { | ||
const err = new Error(`its taking more than (${this.acquireTimeout}) to acquire resource for ticket=${id}`); | ||
this._log(err); | ||
doneCb(err); | ||
}, this.acquireTimeout); | ||
case 'D': | ||
if (BrowserWebSocket) { | ||
ws.send(SonicMessage.ACK); | ||
completeStream(); | ||
} else { | ||
ws.send(SonicMessage.ACK, completeStream); | ||
} | ||
break; | ||
// acquire connection | ||
this.pool.acquire((err, ws) => { | ||
clearTimeout(timer); | ||
case 'T': | ||
metadata(msg.p.map(elem => [elem[0], typeof elem[1]])); | ||
break; | ||
if (err) { | ||
doneCb(err); | ||
return; | ||
} | ||
case 'S': | ||
if (typeof startedCb !== 'undefined') { | ||
startedCb(msg.v); | ||
} | ||
break; | ||
this.running[id] = ws; | ||
case 'O': | ||
output(msg.p); | ||
break; | ||
if (this.debug) { | ||
if (!ws.sonicId) { | ||
/* first ticket for this resource is the resource ID */ | ||
ws.sonicId = id; // eslint-disable-line no-param-reassign | ||
} | ||
console.log(`acquired resource=${ws.sonicId} for ticket=${id}`); // eslint-disable-line no-console | ||
} | ||
default: | ||
if (this.debug) console.log(`unsupported message received: ${JSON.stringify(msg)}`); | ||
break; | ||
} | ||
}; | ||
// doneCb override to release connection back to pool | ||
const doDoneCb = (err) => { | ||
this._log(`done with ticket=${id}; resource=${ws.sonicId}`); | ||
if (!doned) { | ||
doned = true; | ||
delete this.running[id]; | ||
ws.on('close', onClose); | ||
ws.on('error', onError); | ||
ws.on('message', onMessage); | ||
ws.send(message); | ||
if (this.debug) console.log(`sending message: ${JSON.stringify(message)}`); | ||
} | ||
if (err) { | ||
this._log(`destroying resource=${ws.sonicId}; ticket=${id}`); | ||
/* pool.destroy produces unexpected results; this forces resourced to be invalid */ | ||
ws.close(1011, 'pool#destroy'); | ||
} else { | ||
this._log(`releasing resource=${ws.sonicId}; ticket=${id}`); | ||
} | ||
this.pool.release(ws); | ||
doneCb(err); | ||
} | ||
}; | ||
/* pool-aware send method */ | ||
_send(message, doneCb, outputCb, progressCb, metadataCb, startedCb) { | ||
switch (this.state) { | ||
case states.CLOSED: | ||
doneCb(new Error('client is closed and cannot accept more work')); | ||
return; | ||
case states.CLOSING: | ||
doneCb(new Error('client is closing and cannot accept more work')); | ||
return; | ||
default: | ||
} | ||
try { | ||
this._wsSend(JSON.stringify(message), doDoneCb, outputCb, progressCb, metadataCb, startedCb, ws); | ||
} catch (e) { | ||
this._log(e); | ||
doDoneCb(e); | ||
} | ||
}); | ||
// identify send for cancel hooks | ||
const id = this.nextId++; | ||
let doned = false; | ||
return id; // eslint-disable-line consistent-return | ||
} | ||
let timer; | ||
if (this.debug) { | ||
timer = setTimeout(() => | ||
console.log(`its taking more than (${this.acquireTimeout}) to acquire resource for ticket=${id}`), | ||
this.acquireTimeout); | ||
} | ||
stream(query) { | ||
const emitter = new EventEmitter(); | ||
const queryMsg = utils.toMsg(query); | ||
// acquire connection | ||
this.pool.acquire().then((ws) => { | ||
this.running[id] = ws; | ||
function done(err) { | ||
if (err) { | ||
emitter.emit('error', err); | ||
return; | ||
} | ||
if (this.debug) { | ||
if (!ws.sonicId) { | ||
/* first ticket for this resource is the resource ID */ | ||
ws.sonicId = id; // eslint-disable-line no-param-reassign | ||
} | ||
console.log(`acquired resource=${ws.sonicId} for ticket=${id}`); | ||
clearTimeout(timer); | ||
} | ||
emitter.emit('done'); | ||
} | ||
// doneCb override to release connection back to pool | ||
const doDoneCb = (err) => { | ||
if (this.debug) console.log(`done with ticket=${id}; resource=${ws.sonicId}`); | ||
if (!doned) { | ||
doned = true; | ||
delete this.running[id]; | ||
function output(elems) { | ||
emitter.emit('data', elems); | ||
} | ||
if (ws.sonicDestroy) { | ||
if (this.debug) console.log(`destroying resource=${ws.sonicId}; ticket=${id}`); | ||
this.pool.destroy(ws); | ||
} else { | ||
if (this.debug) console.log(`releasing resource=${ws.sonicId}; ticket=${id}`); | ||
this.pool.release(ws); | ||
} | ||
doneCb(err); | ||
} | ||
}; | ||
function metadata(meta) { | ||
emitter.emit('metadata', meta); | ||
} | ||
try { | ||
this._wsSend(JSON.stringify(message), doDoneCb, outputCb, progressCb, metadataCb, startedCb, ws); | ||
} catch (e) { | ||
if (this.debug) console.error(e); | ||
this.pool.release(ws); | ||
doDoneCb(e); | ||
} | ||
}).catch(doneCb); | ||
function progress(prog) { | ||
emitter.emit('progress', prog); | ||
} | ||
return id; // eslint-disable-line consistent-return | ||
} | ||
function started(traceId) { | ||
emitter.emit('started', traceId); | ||
} | ||
stream(query) { | ||
const emitter = new SonicEmitter(); | ||
const queryMsg = utils.toMsg(query); | ||
const id = this._send(queryMsg, done, output, progress, metadata, started); | ||
function done(err) { | ||
if (err) { | ||
emitter.emit('error', err); | ||
return; | ||
} | ||
emitter.cancel = (cb) => { | ||
this._cancel(id, cb); | ||
}; | ||
emitter.emit('done'); | ||
} | ||
return emitter; | ||
} | ||
function output(elems) { | ||
emitter.emit('data', elems); | ||
} | ||
/* TODO: deprecate in favor of run2 */ | ||
run(query, doneCb) { | ||
const data = []; | ||
const queryMsg = utils.toMsg(query); | ||
function metadata(meta) { | ||
emitter.emit('metadata', meta); | ||
} | ||
function done(err) { | ||
if (err) { | ||
doneCb(err, null); | ||
} else { | ||
doneCb(null, data); | ||
} | ||
} | ||
function progress(prog) { | ||
emitter.emit('progress', prog); | ||
} | ||
function output(elems) { | ||
data.push(elems); | ||
} | ||
function started(traceId) { | ||
emitter.emit('started', traceId); | ||
} | ||
this._send(queryMsg, done, output); | ||
} | ||
const id = this._send(queryMsg, done, output, progress, metadata, started); | ||
run2(query) { | ||
return new Promise((resolve, reject) => { | ||
this.run(query, (err, data) => { | ||
if (err) { | ||
reject(err); | ||
return; | ||
} | ||
resolve(data); | ||
}); | ||
}); | ||
} | ||
emitter.cancel = (cb) => { | ||
this._cancel(id, cb); | ||
}; | ||
authenticate(user, apiKey, doneCb, traceId) { | ||
let token; | ||
const authMsg = { | ||
e: 'H', | ||
p: { | ||
user, | ||
trace_id: traceId, | ||
}, | ||
v: apiKey, | ||
}; | ||
return emitter; | ||
} | ||
function done(err) { | ||
if (err) { | ||
doneCb(err); | ||
} else { | ||
doneCb(null, token); | ||
} | ||
} | ||
/* TODO: deprecate in favor of run2 */ | ||
run(query, doneCb) { | ||
const data = []; | ||
const queryMsg = utils.toMsg(query); | ||
function output(elems) { | ||
token = elems[0]; | ||
} | ||
function done(err) { | ||
if (err) { | ||
doneCb(err, null); | ||
} else { | ||
doneCb(null, data); | ||
} | ||
} | ||
this._send(authMsg, done, output); | ||
} | ||
function output(elems) { | ||
data.push(elems); | ||
} | ||
_close() { | ||
if (this.state !== states.CLOSING) { | ||
return Promise.resolve(); | ||
} | ||
return this.pool.drain(() => this.pool.destroyAllNow()); | ||
} | ||
this._send(queryMsg, done, output); | ||
} | ||
cancel() { | ||
const ids = Object.keys(this.running); | ||
run2(query) { | ||
return new Promise((resolve, reject) => { | ||
this.run(query, (err, data) => { | ||
if (err) { | ||
reject(err); | ||
return; | ||
} | ||
resolve(data); | ||
}); | ||
}); | ||
} | ||
return Promise.all(ids.map(id => new Promise((resolve, reject) => { | ||
this._cancel(id, (err) => { | ||
if (err) { | ||
reject(err); | ||
return; | ||
} | ||
resolve(); | ||
}); | ||
}))); | ||
} | ||
authenticate(user, apiKey, doneCb, traceId) { | ||
let token; | ||
const authMsg = { | ||
e: 'H', | ||
p: { | ||
user, | ||
trace_id: traceId, | ||
}, | ||
v: apiKey, | ||
}; | ||
function done(err) { | ||
if (err) { | ||
doneCb(err); | ||
} else { | ||
doneCb(null, token); | ||
} | ||
} | ||
function output(elems) { | ||
token = elems[0]; | ||
} | ||
this._send(authMsg, done, output); | ||
} | ||
_close() { | ||
if (this.state !== states.CLOSING) { | ||
return Promise.resolve(); | ||
} | ||
return this.pool.drain().then(() => this.pool.clear()); | ||
} | ||
cancel() { | ||
const ids = Object.keys(this.running); | ||
return Promise.all(ids.map(id => new Promise((resolve, reject) => { | ||
this._cancel(id, (err) => { | ||
if (err) { | ||
reject(err); | ||
return; | ||
} | ||
resolve(); | ||
}); | ||
}))); | ||
} | ||
close() { | ||
this.state = states.CLOSING; | ||
return this.cancel() | ||
.then(() => this._close()) | ||
.then(() => { | ||
this.state = states.CLOSED; | ||
}); | ||
} | ||
close() { | ||
this.state = states.CLOSING; | ||
return this.cancel() | ||
.then(() => this._close()) | ||
.then(() => this.state = states.CLOSED); | ||
} | ||
} | ||
module.exports.Client = Client; |
@@ -1,62 +0,65 @@ | ||
'use strict'; | ||
function toMsg(query) { | ||
var traceId = query.trace_id || query.traceId; | ||
var token = query.token || query.auth; | ||
return { | ||
e: 'Q', | ||
v: query.query, | ||
p: { | ||
auth: token, | ||
trace_id: traceId, | ||
config: query.config | ||
} | ||
}; | ||
var traceId = query.trace_id || query.traceId; | ||
var token = query.token || query.auth; | ||
return { | ||
e: 'Q', | ||
v: query.query, | ||
p: { | ||
auth: token, | ||
trace_id: traceId, | ||
config: query.config | ||
} | ||
}; | ||
} | ||
function toProgress(payload) { | ||
var status; | ||
var status; | ||
switch (payload.s) { | ||
case 0: | ||
status = 'Queued'; | ||
break; | ||
switch (payload.s) { | ||
case 0: | ||
status = 'Queued'; | ||
break; | ||
case 1: | ||
status = 'Started'; | ||
break; | ||
case 1: | ||
status = 'Started'; | ||
break; | ||
case 2: | ||
status = 'Running'; | ||
break; | ||
case 2: | ||
status = 'Running'; | ||
break; | ||
case 3: | ||
status = 'Waiting'; | ||
break; | ||
case 3: | ||
status = 'Waiting'; | ||
break; | ||
case 4: | ||
status = 'Finished'; | ||
break; | ||
case 4: | ||
status = 'Finished'; | ||
break; | ||
default: | ||
status = 'Unknown'; | ||
break; | ||
} | ||
default: | ||
status = 'Unknown'; | ||
break; | ||
} | ||
return { | ||
status: status, | ||
statusCode: payload.s, | ||
progress: payload.p, | ||
total: payload.t, | ||
units: payload.u | ||
}; | ||
return { | ||
status: status, | ||
statusCode: payload.s, | ||
progress: payload.p, | ||
total: payload.t, | ||
units: payload.u | ||
}; | ||
} | ||
function getCloseError(ev) { | ||
return new Error(`WebSocket close: code=${ev.code}; reason=${ev.reason}`); | ||
} | ||
module.exports = { | ||
toMsg: toMsg, | ||
toProgress: toProgress, | ||
SonicMessage: { | ||
ACK: JSON.stringify({ e: 'A' }), | ||
CANCEL: JSON.stringify({ e: 'C' }) | ||
} | ||
getCloseError, | ||
toMsg, | ||
toProgress, | ||
SonicMessage: { | ||
ACK: JSON.stringify({ e: 'A' }), | ||
CANCEL: JSON.stringify({ e: 'C' }) | ||
} | ||
}; |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Git dependency
Supply chain riskContains a dependency which resolves to a remote git URL. Dependencies fetched from git URLs are not immutable and can be used to inject untrusted code or reduce the likelihood of a reproducible install.
Found 1 instance in 1 package
0
27696
15
913
+ Addedgeneric-pool@2.5.4(transitive)
Updatedgeneric-pool@^2.5.4