cloudvision-connector
Advanced tools
Comparing version 4.5.3 to 4.5.4
@@ -6,2 +6,13 @@ # Change Log | ||
## [4.5.4](https://github.com/aristanetworks/cloudvision/compare/v4.5.3...v4.5.4) (2020-08-10) | ||
### Bug Fixes | ||
* **cloudvision-connector:** Remove deduping, other unused functionality ([#149](https://github.com/aristanetworks/cloudvision/issues/149)) ([b370b45](https://github.com/aristanetworks/cloudvision/commit/b370b45a7b79c8fdb7390fb5114d7edabbbb4dac)) | ||
## [4.5.3](https://github.com/aristanetworks/cloudvision/compare/v4.5.2...v4.5.3) (2020-07-29) | ||
@@ -8,0 +19,0 @@ |
(function (global, factory) { | ||
typeof exports === 'object' && typeof module !== 'undefined' ? factory(exports, require('a-msgpack'), require('base64-js'), require('imurmurhash')) : | ||
typeof define === 'function' && define.amd ? define(['exports', 'a-msgpack', 'base64-js', 'imurmurhash'], factory) : | ||
(global = typeof globalThis !== 'undefined' ? globalThis : global || self, factory(global.CloudvisionConnector = {}, global.msgpack, global['base64-js'], global.MurmurHash3)); | ||
}(this, (function (exports, aMsgpack, base64Js, MurmurHash3) { 'use strict'; | ||
typeof exports === 'object' && typeof module !== 'undefined' ? factory(exports, require('a-msgpack'), require('uuid'), require('base64-js')) : | ||
typeof define === 'function' && define.amd ? define(['exports', 'a-msgpack', 'uuid', 'base64-js'], factory) : | ||
(global = typeof globalThis !== 'undefined' ? globalThis : global || self, factory(global.CloudvisionConnector = {}, global.msgpack, global.uuid, global['base64-js'])); | ||
}(this, (function (exports, aMsgpack, uuid, base64Js) { 'use strict'; | ||
MurmurHash3 = MurmurHash3 && Object.prototype.hasOwnProperty.call(MurmurHash3, 'default') ? MurmurHash3['default'] : MurmurHash3; | ||
// Copyright (c) 2018, Arista Networks, Inc. | ||
@@ -389,6 +387,2 @@ const msgpack = { | ||
const ACTIVE_CODE = 3001; | ||
/** | ||
* Status code for Paused (when the incoming results for a request have been paused). | ||
*/ | ||
const PAUSED_CODE = 1002; | ||
const GET_REQUEST_COMPLETED = 'GetRequest'; | ||
@@ -400,5 +394,3 @@ const CLOSE = 'close'; | ||
const GET_AND_SUBSCRIBE = 'getAndSubscribe'; | ||
const PAUSE = 'pause'; | ||
const PUBLISH = 'publish'; | ||
const RESUME = 'resume'; | ||
const SUBSCRIBE = 'subscribe'; | ||
@@ -686,37 +678,2 @@ const SEARCH = 'alpha/search'; | ||
/** | ||
* Recursively hashes an object, given an object and a hashState. | ||
*/ | ||
function hashObjectHelper(object, hashState) { | ||
const objKeys = Object.keys(object); | ||
for (let i = 0; i < objKeys.length; i += 1) { | ||
const key = objKeys[i]; | ||
const value = object[key]; | ||
if (value && typeof value === 'object') { | ||
hashState.hash(key.toString()).hash(hashObjectHelper(value, hashState)); | ||
} | ||
else { | ||
hashState.hash(key + '' + value); | ||
} | ||
} | ||
return hashState.result().toString(); | ||
} | ||
/** | ||
* Creates a unique hash given an object. | ||
*/ | ||
function hashObject(object) { | ||
const hashState = MurmurHash3(); | ||
return hashObjectHelper(object, hashState); | ||
} | ||
/** | ||
* Generates token based on the [[WsCommand]] and params ([[CloudVisionParams]], | ||
* [[CloudVisionPublishRequest]], [[ServiceRequest]]) of a request. This is | ||
* used to map requests to responses when dispatching response callbacks. | ||
*/ | ||
function makeToken(command, params) { | ||
return hashObject({ | ||
command, | ||
params, | ||
}); | ||
} | ||
/** | ||
* Creates a notification callback that properly formats the result for the | ||
@@ -876,3 +833,2 @@ * passed callback. | ||
debugMode: false, | ||
pauseStreams: false, | ||
}, websocketClass = WebSocket, parser = Parser) { | ||
@@ -883,6 +839,2 @@ this.connectorOptions = options; | ||
this.events = new Emitter(); | ||
this.closingStreams = new Map(); | ||
this.waitingStreams = new Map(); | ||
this.activeStreams = new Set(); | ||
this.activeRequests = new Set(); | ||
this.WebSocket = websocketClass; | ||
@@ -901,13 +853,2 @@ this.Parser = parser; | ||
} | ||
get streams() { | ||
return this.activeStreams; | ||
} | ||
get streamInClosingState() { | ||
return this.closingStreams; | ||
} | ||
addWaitingStream(waitingOnToken, token, command, params, requestContext) { | ||
const streamsWaiting = this.waitingStreams.get(waitingOnToken) || []; | ||
streamsWaiting.push([token, command, requestContext, params]); | ||
this.waitingStreams.set(waitingOnToken, streamsWaiting); | ||
} | ||
/** | ||
@@ -928,23 +869,14 @@ * PRIVATE METHOD | ||
} | ||
const token = makeToken(command, params); | ||
const token = uuid.v4(); | ||
const requestContext = { command, encodedParams: toBinaryKey(params), token }; | ||
if (!this.activeRequests.has(token)) { | ||
// only execute request if not already getting data | ||
this.activeRequests.add(token); | ||
const callbackWithUnbind = this.makeCallbackWithUnbind(token, callback); | ||
this.events.bind(token, requestContext, callbackWithUnbind); | ||
this.sendMessageOrError(token, command, params, requestContext); | ||
} | ||
else { | ||
const queuedCallbackWithUnbind = this.makeQueuedCallbackWithUnbind(token, callback); | ||
this.events.bind(token, requestContext, queuedCallbackWithUnbind); | ||
} | ||
const callbackWithUnbind = this.makeCallbackWithUnbind(token, callback); | ||
this.events.bind(token, requestContext, callbackWithUnbind); | ||
this.sendMessageOrError(token, command, params, requestContext); | ||
return token; | ||
} | ||
/** | ||
* Cleans up all event emitters and active streams. | ||
* Cleans up all event emitters. | ||
* All bound callbacks will be unbound. | ||
*/ | ||
cleanUpConnections() { | ||
this.activeStreams.clear(); | ||
this.events.close(); | ||
@@ -967,3 +899,3 @@ } | ||
closeCommand(streams, closeParams, callback) { | ||
const closeToken = makeToken(CLOSE, closeParams); | ||
const closeToken = uuid.v4(); | ||
const requestContext = { | ||
@@ -974,3 +906,4 @@ command: CLOSE, | ||
}; | ||
this.setStreamClosingState(streams, closeToken, requestContext, callback); | ||
const callbackWithUnbind = this.makeCallbackWithUnbind(closeToken, callback); | ||
this.events.bind(closeToken, requestContext, callbackWithUnbind); | ||
try { | ||
@@ -981,3 +914,2 @@ this.sendMessage(closeToken, CLOSE, requestContext, closeParams); | ||
log(ERROR, err); | ||
this.removeStreamClosingState(streams, closeToken); | ||
callback(err, undefined, undefined, closeToken); | ||
@@ -1021,13 +953,2 @@ } | ||
/** | ||
* Enables any session wide options by negotiating with the server | ||
*/ | ||
enableOptions(callback) { | ||
if (this.connectorOptions.pauseStreams) { | ||
this.pause({ pauseStreams: true }, callback); | ||
} | ||
else { | ||
callback(null); | ||
} | ||
} | ||
/** | ||
* Sends the command along with the params to the API, if the WebSocket is | ||
@@ -1043,3 +964,2 @@ * connected. The response is received via the provided callback function. | ||
// Unbind callback when any error message is received | ||
this.activeRequests.delete(token); | ||
this.events.unbind(token, callbackWithUnbind); | ||
@@ -1055,12 +975,2 @@ this.instrumentation.callInfo(requestContext.command, requestContext, { | ||
} | ||
makeQueuedCallbackWithUnbind(token, callback) { | ||
const callbackWithUnbind = (requestContext, err, result, status) => { | ||
if (err) { | ||
// Unbind callback and invoke callback only when any error message is received | ||
this.events.unbind(token, callbackWithUnbind); | ||
callback(err, result, status, token, requestContext); | ||
} | ||
}; | ||
return callbackWithUnbind; | ||
} | ||
/** | ||
@@ -1074,30 +984,2 @@ * Writes data in params to the CloudVision API. It receives one message via | ||
/** | ||
* Cleans up the steam closing state set in `setStreamClosingState`, as well | ||
* as re-subscribing to any streams (with the same token as the stream that | ||
* was just closed) opened up during the closing of the current stream. | ||
*/ | ||
removeStreamClosingState(streams, closeToken) { | ||
if (Array.isArray(streams)) { | ||
const streamsLen = streams.length; | ||
for (let i = 0; i < streamsLen; i += 1) { | ||
const { token } = streams[i]; | ||
this.closingStreams.delete(token); | ||
const streamsArgs = this.waitingStreams.get(closeToken); | ||
if (streamsArgs) { | ||
const streamsArgsLen = streamsArgs.length; | ||
for (let j = 0; j < streamsArgsLen; j += 1) { | ||
this.reSubscribeStream(streamsArgs[j], closeToken); | ||
} | ||
} | ||
} | ||
} | ||
else { | ||
this.closingStreams.delete(streams.token); | ||
const streamArgs = this.waitingStreams.get(closeToken); | ||
if (streamArgs) { | ||
this.reSubscribeStream(streamArgs[0], closeToken); | ||
} | ||
} | ||
} | ||
/** | ||
* Send a service request command to the CloudVision API | ||
@@ -1109,18 +991,2 @@ */ | ||
/** | ||
* Re initiates a subscribe if there has been a `subscribe` call for the | ||
* stream while the stream was closing. | ||
*/ | ||
reSubscribeStream(streamArgs, closeToken) { | ||
this.events.unbindAll(closeToken); | ||
this.waitingStreams.delete(closeToken); | ||
this.sendMessage(...streamArgs); | ||
} | ||
/** | ||
* PRIVATE METHOD | ||
* Requests the server to resume one of the currently paused streams. | ||
*/ | ||
resume(params, callback) { | ||
return this.callCommand(RESUME, params, callback); | ||
} | ||
/** | ||
* Run connects connector to the specified url | ||
@@ -1148,11 +1014,4 @@ */ | ||
if (!this.isRunning) { | ||
// Enable any options if they have been configured | ||
this.isRunning = true; | ||
this.enableOptions((err, _res, status, token) => { | ||
// We're good to go | ||
if (status && status.code !== EOF_CODE) { | ||
log(ERROR, err, status, token); | ||
} | ||
this.connectionEvents.emit('connection', Wrpc.CONNECTED, event); | ||
}); | ||
this.connectionEvents.emit('connection', Wrpc.CONNECTED, event); | ||
} | ||
@@ -1195,12 +1054,2 @@ }; | ||
validateResponse(msg.result, status, token, this.connectorOptions.batchResults); | ||
// Automatically resume streams that have been paused by the server | ||
if (status && status.code === PAUSED_CODE) { | ||
const cb = (err, _result, resumeStatus) => { | ||
if (err) { | ||
log(ERROR, err, resumeStatus); | ||
} | ||
}; | ||
this.resume({ token }, cb); | ||
return; | ||
} | ||
this.events.emit(token, null, msg.result, status); | ||
@@ -1251,26 +1100,2 @@ if (((_a = msg.result) === null || _a === void 0 ? void 0 : _a.metadata) && | ||
/** | ||
* Sets all the parameters for one or more streams when they are closed. | ||
* When the `close` request has been successfully processed, | ||
* `removeStreamClosingState` is called to clean up any closing state. | ||
*/ | ||
setStreamClosingState(streams, closeToken, requestContext, callback) { | ||
const closeCallback = (closeRequestContext, err, result, status) => { | ||
this.removeStreamClosingState(streams, closeToken); | ||
callback(err, result, status, closeToken, closeRequestContext); | ||
}; | ||
if (Array.isArray(streams)) { | ||
const streamsLen = streams.length; | ||
for (let i = 0; i < streamsLen; i += 1) { | ||
const { token } = streams[i]; | ||
const oldToken = this.closingStreams.get(token); | ||
this.waitingStreams.delete(oldToken || ''); | ||
this.closingStreams.set(token, closeToken); | ||
} | ||
} | ||
else { | ||
this.closingStreams.set(streams.token, closeToken); | ||
} | ||
this.events.bind(closeToken, requestContext, closeCallback); | ||
} | ||
/** | ||
* Sends the command along with the params to the API, which creates a | ||
@@ -1288,3 +1113,3 @@ * subscriptions on the server. It receives a stream of messages via the | ||
} | ||
const token = makeToken(command, params); | ||
const token = uuid.v4(); | ||
const callerRequestContext = { command, encodedParams: toBinaryKey(params), token }; | ||
@@ -1302,3 +1127,2 @@ const callbackWithUnbind = (requestContext, err, result, status) => { | ||
if (status && status.code === ACTIVE_CODE) { | ||
this.activeStreams.add(token); | ||
this.instrumentation.callInfo(callerRequestContext.command, callerRequestContext, { | ||
@@ -1310,34 +1134,6 @@ message: 'stream active', | ||
}; | ||
const numCallbacks = this.events.bind(token, callerRequestContext, callbackWithUnbind); | ||
if (numCallbacks === 1) { | ||
// Only execute the request on the first callback. | ||
// If there are open request to the same data, just attach the callback | ||
const closingStreamToken = this.closingStreams.get(token); | ||
if (closingStreamToken) { | ||
// The stream is still closing, so wait for it to close before re requesting | ||
this.addWaitingStream(closingStreamToken, token, command, params, callerRequestContext); | ||
} | ||
else { | ||
this.sendMessageOrError(token, command, params, callerRequestContext); | ||
} | ||
} | ||
else if (this.activeStreams.has(token)) { | ||
// The stream is already open immediately call get | ||
callback(null, undefined, { code: ACTIVE_CODE }, token, callerRequestContext); | ||
} | ||
this.events.bind(token, callerRequestContext, callbackWithUnbind); | ||
this.sendMessageOrError(token, command, params, callerRequestContext); | ||
return { token, callback: callbackWithUnbind }; | ||
} | ||
/** | ||
* Configures pausing of results on the server. When the pause feature is enabled, | ||
* the server pauses the results returned to the client every x number of bytes. | ||
* | ||
* When pausing is enabled, only the new streams created after that will be paused | ||
* (i.e. existing streams are not affected). To continue receiving results, a | ||
* wrpc client has to send a 'resume' command with the desired stream token to | ||
* unpause. Currently, the cloudvision-connector automatically resumes any streams | ||
* paused by the server. | ||
*/ | ||
pause(params, callback) { | ||
return this.callCommand(PAUSE, params, callback); | ||
} | ||
} | ||
@@ -1459,8 +1255,2 @@ Wrpc.CONNECTED = CONNECTED; | ||
/** | ||
* Generates the unique token that the request can be referenced by. | ||
*/ | ||
getCommandToken(command, params) { | ||
return makeToken(command, params); | ||
} | ||
/** | ||
* Returns all notifications that match the given query and options. | ||
@@ -1601,3 +1391,2 @@ * | ||
Connector.ID = ID; | ||
Connector.PAUSE = PAUSE; | ||
Connector.PUBLISH = PUBLISH; | ||
@@ -1692,3 +1481,2 @@ Connector.SEARCH_SUBSCRIBE = SEARCH_SUBSCRIBE; | ||
exports.fromBinaryKey = fromBinaryKey; | ||
exports.hashObject = hashObject; | ||
exports.sanitizeOptions = sanitizeOptions; | ||
@@ -1695,0 +1483,0 @@ exports.toBinaryKey = toBinaryKey; |
@@ -1,1 +0,1 @@ | ||
!function(e,t){"object"==typeof exports&&"undefined"!=typeof module?t(exports,require("a-msgpack"),require("base64-js"),require("imurmurhash")):"function"==typeof define&&define.amd?define(["exports","a-msgpack","base64-js","imurmurhash"],t):t((e="undefined"!=typeof globalThis?globalThis:e||self).CloudvisionConnector={},e.msgpack,e["base64-js"],e.MurmurHash3)}(this,(function(e,t,n,s){"use strict";s=s&&Object.prototype.hasOwnProperty.call(s,"default")?s.default:s;const i=e=>t.encode(e,{extensionCodec:t.Codec}),r=e=>t.decode(e,{extensionCodec:t.Codec,useJSBI:!0});function o(e){const t=[];for(let s=0;s<e.length;s+=1)t.push(n.fromByteArray(i(e[s])));return t}function a(e){const t={path_elements:[],timestamp:0};t.path_elements=function(e){const t=[];for(let s=0;s<e.length;s+=1)t.push(r(n.toByteArray(e[s])));return t}(e.path_elements||[]);let s="";return e.timestamp.nanos&&(s+=e.timestamp.nanos),s=s.padStart(9,"0"),t.timestamp=parseInt((""+e.timestamp.seconds+s).slice(0,13),10),e.updates&&(t.updates=function(e){const t={};for(let s=0;s<e.length;s+=1){const i=e[s].key,o=e[s].value,a=r(n.toByteArray(i)),c=r(n.toByteArray(o));t[i]={key:a,value:c}}return t}(e.updates)),e.deletes&&(t.deletes=function(e){const t={};for(let s=0;s<e.length;s+=1){const i=e[s];t[i]={key:r(n.toByteArray(i))}}return t}(e.deletes)),e.delete_all&&(t.deletes={}),t}function c(e,t){const n=a(t);e.notifications[JSON.stringify(n.path_elements)]?e.notifications[JSON.stringify(n.path_elements)].push(n):e.notifications[JSON.stringify(n.path_elements)]=[n]}function u(e,t){if(e.timestamp.seconds<t.timestamp.seconds)return-1;if(e.timestamp.seconds>t.timestamp.seconds)return 1;if(e.timestamp.nanos&&t.timestamp.nanos){if(e.timestamp.nanos<t.timestamp.nanos)return-1;if(e.timestamp.nanos>t.timestamp.nanos)return 1}return e.timestamp.nanos?t.timestamp.nanos?0:1:-1}function l(e){const t={timestamp:e.timestamp};return e.path_elements&&e.path_elements.length&&(t.path_elements=o(e.path_elements)),e.updates&&(t.updates=function(e){const t=[];for(let s=0;s<e.length;s+=1){const r=e[s].key,o=e[s].value;t.push({key:n.fromByteArray(i(r)),value:n.fromByteArray(i(o))})}return t}(e.updates)),e.deletes&&(Object.keys(e.deletes).length?t.deletes=function(e){const t=[];for(let s=0;s<e.length;s+=1){const r=e[s];t.push(n.fromByteArray(i(r)))}return t}(e.deletes):t.delete_all=!0),t}class h{static parse(e,t){const n=JSON.parse(e),s=n.result;if(s&&!s.datasets&&s.notifications){const e=function(e,t){if(e.notifications.sort(u),t){const t={dataset:e.dataset,metadata:e.metadata||{},notifications:{}};for(let n=0;n<e.notifications.length;n+=1){c(t,e.notifications[n])}return t}const n={dataset:e.dataset,metadata:e.metadata||{},notifications:[]};for(let t=0;t<e.notifications.length;t+=1){const s=a(e.notifications[t]);n.notifications.push(s)}return n}(s,t);return{error:n.error,result:e,status:n.status,token:n.token}}return{error:n.error,result:n.result,status:n.status,token:n.token}}static stringify(e){const t=Object.assign({},e.params);return h.isQuery(t)&&(t.query=function(e){const t=[];for(let s=0;s<e.length;s+=1){const r={dataset:e[s].dataset,paths:[]},a=e[s];for(let e=0;e<a.paths.length;e+=1){const t=a.paths[e],s={path_elements:[]},c=[];if(t.keys&&Array.isArray(t.keys)){for(let e=0;e<t.keys.length;e+=1)c.push(n.fromByteArray(i(t.keys[e])));s.keys=c}s.path_elements=o(t.path_elements),r.paths.push(s)}t.push(r)}return t}(t.query)),h.isPublish(t)&&(t.batch=function(e){const t=[],n={dataset:e.dataset,notifications:t};if(Array.isArray(e.notifications))for(let n=0;n<e.notifications.length;n+=1)t.push(l(e.notifications[n]));else{const n=Object.keys(e.notifications);for(let s=0;s<n.length;s+=1){const i=n[s];for(let n=0;n<e.notifications[i].length;n+=1)t.push(l(e.notifications[i][n]))}}return n}(t.batch)),JSON.stringify({token:e.token,command:e.command,params:t})}static isQuery(e){const t=e;return void 0!==t.query&&Array.isArray(t.query)}static isPublish(e){const t=e;return void 0!==t.sync&&void 0!==t.batch}}const d="ERROR",m=new Set(["ANY","IP","MAC"]),f="cloudvision-connector",p={command:"NO_COMMAND",token:"NO_TOKEN",encodedParams:"NO_PARAMS"};class g{constructor(){this.events=new Map,this.requestContext=new Map}getEventsMap(){return this.events}getRequestContextMap(){return this.requestContext}has(e){return this.events.has(e)}bind(e,t,n){let s=this.events.get(e);return s?s.push(n):(s=[n],this.events.set(e,s),this.requestContext.set(e,t)),s.length}unbind(e,t){const n=this.events.get(e);if(!n)return null;const s=n.indexOf(t);-1!==s&&(n.splice(s,1),this.events.set(e,n));const i=n.length;return 0===i&&(this.events.delete(e),this.requestContext.delete(e)),i}unbindAll(e){this.events.delete(e),this.requestContext.delete(e)}emit(e,...t){const n=this.events.get(e);if(n)for(let s=n.length-1;s>=0;s-=1){const i=this.requestContext.get(e)||p;n[s](i,...t)}}close(){this.events.clear(),this.requestContext.clear()}}function y(){}class b{constructor(e){this.enableInstrumentation=!1,e&&(this.enableInstrumentation=!0),this.instrumentedCommands=e?e.commands:[],this.startFunction=e?e.start:y,this.infoFunction=e?e.info:y,this.endFunction=e?e.end:y}callStart(e,t){this.enableInstrumentation&&this.instrumentedCommands.includes(e)&&this.startFunction(t)}callInfo(e,t,n){this.enableInstrumentation&&this.instrumentedCommands.includes(e)&&this.infoFunction(t,n)}callEnd(e,t){this.enableInstrumentation&&this.instrumentedCommands.includes(e)&&this.endFunction(t)}}function S(e,t,n,s){const i=t?`${e}: ${t}`:e+": No message provided",r=n?`Status Code: ${n.code}, Status Message: ${n.message}`:void 0,o=s?"Request Token: "+s:void 0;switch(console.groupCollapsed("[[ CloudVision Connector ]]"),e){case d:o&&console.error(o),console.error(i),r&&console.error(r);break;case"WARN":o&&console.warn(o),console.warn(i),r&&console.warn(r);break;default:o&&console.log(o),console.log(i),r&&console.log(r)}console.groupEnd()}function v(e){return"number"==typeof e&&e>0}function C(e){return e/1e15>1}function E(e){const{start:t,end:n,versions:s}=e;let i,r,o;return v(t)&&(i=Math.floor(t)*(C(t)?1:1e6)),v(n)&&(r=Math.floor(n)*(C(n)?1:1e6)),v(s)&&(o=Math.floor(s)),{start:i,end:r,versions:o}}function k(e,t){const{start:n,end:s,versions:i}=e;return n&&s&&n>=s?(t(function(e,t,n){return`invalid params: start: ${e}, end: ${t}, versions: ${n||"undefined"}`}(n,s,i)),!1):!n||!i||(t("Defining start and versions is invalid"),!1)}function O(e,t,n=!1){return Array.isArray(e)?!!(n||e&&e.length)||(t("`query` param cannot be empty"),!1):(t("`query` param must be an array"),!1)}function A(e){return function e(t,n){const s=Object.keys(t);for(let i=0;i<s.length;i+=1){const r=s[i],o=t[r];o&&"object"==typeof o?n.hash(r.toString()).hash(e(o,n)):n.hash(r+""+o)}return n.result().toString()}(e,s())}function T(e,t){return A({command:e,params:t})}function N(e,t={}){return(n,s,i,r,o)=>{if(i&&1001===i.code)return void e(null,void 0,i,r,o);if(n)return e(`Error: ${n}\nOptions: ${JSON.stringify(t)}`,void 0,i,r,o),void e(null,void 0,{code:1001},r,o);const a=s;if(a&&a.datasets)return void e(null,a,i,r,o);const c=s;(c&&c.dataset||c)&&e(null,c,i,r,o)}}function _(e,t){const n={};if(Array.isArray(e)){const s=e.length;for(let i=0;i<s;i+=1){const{token:s,callback:r}=e[i];0===t.unbind(s,r)&&(n[s]=!0)}}else{const{token:s,callback:i}=e;0===t.unbind(s,i)&&(n[s]=!0)}return Object.keys(n).length?n:null}function D(e){const t=e.searchType;let n="ANY";return t&&m.has(t)&&(n=t),{search:e.search||"",searchType:n}}function q(e){return n.fromByteArray(t.encode(e,{extensionCodec:t.Codec}))}class w{constructor(e={batchResults:!0,debugMode:!1,pauseStreams:!1},t=WebSocket,n=h){this.connectorOptions=e,this.isRunning=!1,this.connectionEvents=new g,this.events=new g,this.closingStreams=new Map,this.waitingStreams=new Map,this.activeStreams=new Set,this.activeRequests=new Set,this.WebSocket=t,this.Parser=n,this.instrumentation=new b(e.instrumentationConfig)}get websocket(){return this.ws}get connectionEmitter(){return this.connectionEvents}get eventsEmitter(){return this.events}get streams(){return this.activeStreams}get streamInClosingState(){return this.closingStreams}addWaitingStream(e,t,n,s,i){const r=this.waitingStreams.get(e)||[];r.push([t,n,i,s]),this.waitingStreams.set(e,r)}callCommand(e,t,n){if(!this.isRunning)return n("Connection is down"),null;const s=T(e,t),i={command:e,encodedParams:q(t),token:s};if(this.activeRequests.has(s)){const e=this.makeQueuedCallbackWithUnbind(s,n);this.events.bind(s,i,e)}else{this.activeRequests.add(s);const r=this.makeCallbackWithUnbind(s,n);this.events.bind(s,i,r),this.sendMessageOrError(s,e,t,i)}return s}cleanUpConnections(){this.activeStreams.clear(),this.events.close()}close(){this.closeWs(new CloseEvent("close")),this.connectionEvents.close(),this.ws.close()}closeWs(e){this.isRunning=!1,this.cleanUpConnections(),this.connectionEvents.emit("connection",w.DISCONNECTED,e)}closeCommand(e,t,n){const s=T("close",t),i={command:"close",encodedParams:q(e),token:s};this.setStreamClosingState(e,s,i,n);try{this.sendMessage(s,"close",i,t)}catch(t){S(d,t),this.removeStreamClosingState(e,s),n(t,void 0,void 0,s)}return s}closeStreams(e,t){const n=_(e,this.events);return n?this.closeCommand(e,n,t):null}closeStream(e,t){const n=_(e,this.events);return n?this.closeCommand(e,n,t):null}connection(e){const t=(t,n,s)=>{e(n,s)};return this.connectionEvents.bind("connection",{command:"connection",encodedParams:"",token:""},t),()=>{this.connectionEvents.unbind("connection",t)}}enableOptions(e){this.connectorOptions.pauseStreams?this.pause({pauseStreams:!0},e):e(null)}get(e,t,n){return this.callCommand(e,t,n)}makeCallbackWithUnbind(e,t){const n=(s,i,r,o)=>{i&&(this.activeRequests.delete(e),this.events.unbind(e,n),this.instrumentation.callInfo(s.command,s,{error:i})),this.instrumentation.callEnd(s.command,s),t(i,r,o,e,s)};return n}makeQueuedCallbackWithUnbind(e,t){const n=(s,i,r,o)=>{i&&(this.events.unbind(e,n),t(i,r,o,e,s))};return n}publish(e,t){return this.callCommand("publish",e,t)}removeStreamClosingState(e,t){if(Array.isArray(e)){const n=e.length;for(let s=0;s<n;s+=1){const{token:n}=e[s];this.closingStreams.delete(n);const i=this.waitingStreams.get(t);if(i){const e=i.length;for(let n=0;n<e;n+=1)this.reSubscribeStream(i[n],t)}}}else{this.closingStreams.delete(e.token);const n=this.waitingStreams.get(t);n&&this.reSubscribeStream(n[0],t)}}requestService(e,t){return this.callCommand("serviceRequest",e,t)}reSubscribeStream(e,t){this.events.unbindAll(t),this.waitingStreams.delete(t),this.sendMessage(...e)}resume(e,t){return this.callCommand("resume",e,t)}run(e){this.runWithWs(new this.WebSocket(e))}runWithWs(e){this.ws=e,this.ws.onopen=e=>{this.isRunning||(this.isRunning=!0,this.enableOptions((t,n,s,i)=>{s&&1001!==s.code&&S(d,t,s,i),this.connectionEvents.emit("connection",w.CONNECTED,e)}))},this.ws.onclose=e=>{this.closeWs(e)},this.ws.onmessage=e=>{var t;if("string"!=typeof e.data)return;let n;try{n=this.Parser.parse(e.data,this.connectorOptions.batchResults),this.connectorOptions.debugMode&&self.postMessage({response:n,source:f,timestamp:Date.now()},"*")}catch(e){return void S(d,e)}if(!n||!n.token)return void S(d,"No message body or message token");const{error:s,status:i,token:r}=n;if(s)this.events.emit(r,s,n.result,i);else if(function(e,t,n,s){if(t&&Object.keys(t).length)return;const i=e;!i.dataset||i.dataset.name?!i.dataset||i.dataset.type?!i.dataset||i.notifications?s||!i.dataset||Array.isArray(i.notifications)||S(d,"Key 'notifications' is not an array",void 0,n):S(d,"No key 'notifications' found in response",void 0,n):S(d,"No key 'type' found in dataset",void 0,n):S(d,"No key 'name' found in dataset",void 0,n)}(n.result,i,r,this.connectorOptions.batchResults),i&&1002===i.code){const e=(e,t,n)=>{e&&S(d,e,n)};this.resume({token:r},e)}else this.events.emit(r,null,n.result,i),(null===(t=n.result)||void 0===t?void 0:t.metadata)&&"EOF"===n.result.metadata.GetRequest&&this.events.emit(r,null,null,{message:"GetRequest",code:1001})}}search(e,t){return this.callCommand("alpha/search",e,t)}sendMessage(e,t,n,s){this.connectorOptions.debugMode&&self.postMessage({request:{token:e,command:t,params:s},source:f,timestamp:Date.now()},"*"),this.instrumentation.callStart(t,n),this.ws.send(this.Parser.stringify({token:e,command:t,params:s}))}sendMessageOrError(e,t,n,s){try{this.sendMessage(e,t,s,n)}catch(t){S(d,t),this.events.emit(e,t,void 0,void 0)}}setStreamClosingState(e,t,n,s){if(Array.isArray(e)){const n=e.length;for(let s=0;s<n;s+=1){const{token:n}=e[s],i=this.closingStreams.get(n);this.waitingStreams.delete(i||""),this.closingStreams.set(n,t)}}else this.closingStreams.set(e.token,t);this.events.bind(t,n,(n,i,r,o)=>{this.removeStreamClosingState(e,t),s(i,r,o,t,n)})}stream(e,t,n){if(!this.isRunning)return n("Connection is down"),null;const s=T(e,t),i={command:e,encodedParams:q(t),token:s},r=(e,t,o,a)=>{t&&(this.events.unbind(s,r),this.instrumentation.callInfo(i.command,i,{error:t}),this.instrumentation.callEnd(e.command,e)),a&&3001===a.code&&(this.activeStreams.add(s),this.instrumentation.callInfo(i.command,i,{message:"stream active"})),n(t,o,a,s,e)};if(1===this.events.bind(s,i,r)){const n=this.closingStreams.get(s);n?this.addWaitingStream(n,s,e,t,i):this.sendMessageOrError(s,e,t,i)}else this.activeStreams.has(s)&&n(null,void 0,{code:3001},s,i);return{token:s,callback:r}}pause(e,t){return this.callCommand("pause",e,t)}}w.CONNECTED="connected",w.DISCONNECTED="disconnected";class P extends w{closeSubscriptions(e,t){return this.closeStreams(e,N(t))}getAndSubscribe(e,t,n){if(!O(e,t))return null;if(!k(n,t))return null;const s=E(n),i={query:e,start:s.start,end:s.end,versions:s.versions};return this.stream("getAndSubscribe",i,N(t))}getApps(e){const t={types:["app"]};return this.get("getDatasets",t,N(e))}getDatasets(e){const t={types:["app","device"]};return this.get("getDatasets",t,N(e))}getDevices(e){const t=["device"];return this.get("getDatasets",{types:t},N(e))}getCommandToken(e,t){return T(e,t)}getWithOptions(e,t,n){if("DEVICES_DATASET_ID"===e)return this.getDatasets(t),null;if(!O(e,t))return null;if(!k(n,t))return null;const{start:s,end:i,versions:r}=E(n),o={query:e,start:s,end:i,versions:r};return this.get("get",o,N(t,n))}runService(e,t){this.requestService(e,N(t))}runStreamingService(e,t){return e?this.stream("serviceRequest",e,N(t)):(t("`request` param cannot be empty"),null)}searchWithOptions(e,t,n){if(!O(e,t,!0))return null;if(!k(n,t))return null;const{start:s,end:i}=E(n),r=D(n),o={query:e,start:s,end:i,search:r.search,searchType:r.searchType};return this.search(o,N(t,n))}searchSubscribe(e,t,n={search:""}){if(!O(e,t,!0))return null;const s=D(n);return this.stream("alpha/searchSubscribe",{query:e,search:s.search,searchType:s.searchType},N(t))}subscribe(e,t){return O(e,t)?this.stream("subscribe",{query:e},N(t)):null}writeSync(e,t){this.publish({sync:!0,batch:{dataset:e.dataset,notifications:e.notifications}},function(e){return(t,n,s)=>{s&&1001===s.code?e(!0):t&&e(!1,`Error: ${t}\n`)}}(t))}}P.CLOSE="close",P.DEVICES_DATASET_ID="DEVICES_DATASET_ID",P.GET_DATASETS="getDatasets",P.GET="get",P.GET_AND_SUBSCRIBE="getAndSubscribe",P.ID=f,P.PAUSE="pause",P.PUBLISH="publish",P.SEARCH_SUBSCRIBE="alpha/searchSubscribe",P.SEARCH="alpha/search",P.SERVICE_REQUEST="serviceRequest",P.SUBSCRIBE="subscribe",Object.defineProperty(e,"Bool",{enumerable:!0,get:function(){return t.Bool}}),Object.defineProperty(e,"Float32",{enumerable:!0,get:function(){return t.Float32}}),Object.defineProperty(e,"Float64",{enumerable:!0,get:function(){return t.Float64}}),Object.defineProperty(e,"Int",{enumerable:!0,get:function(){return t.Int}}),Object.defineProperty(e,"NeatTypeSerializer",{enumerable:!0,get:function(){return t.NeatTypeSerializer}}),Object.defineProperty(e,"NeatTypes",{enumerable:!0,get:function(){return t.NeatTypes}}),Object.defineProperty(e,"Nil",{enumerable:!0,get:function(){return t.Nil}}),Object.defineProperty(e,"Pointer",{enumerable:!0,get:function(){return t.Pointer}}),Object.defineProperty(e,"Str",{enumerable:!0,get:function(){return t.Str}}),Object.defineProperty(e,"createBaseType",{enumerable:!0,get:function(){return t.createBaseType}}),Object.defineProperty(e,"isJsbi",{enumerable:!0,get:function(){return t.isJsbi}}),Object.defineProperty(e,"isNeatType",{enumerable:!0,get:function(){return t.isNeatType}}),e.ACTIVE_CODE=3001,e.APP_DATASET_TYPE="app",e.CONNECTED="connected",e.DEVICE_DATASET_TYPE="device",e.DISCONNECTED="disconnected",e.EOF="EOF",e.EOF_CODE=1001,e.GET_REQUEST_COMPLETED="GetRequest",e.Parser=h,e.default=P,e.fromBinaryKey=function(e,s=!0){return t.decode(n.toByteArray(e),{extensionCodec:t.Codec,useJSBI:s})},e.hashObject=A,e.sanitizeOptions=E,e.toBinaryKey=q,Object.defineProperty(e,"__esModule",{value:!0})})); | ||
!function(e,t){"object"==typeof exports&&"undefined"!=typeof module?t(exports,require("a-msgpack"),require("uuid"),require("base64-js")):"function"==typeof define&&define.amd?define(["exports","a-msgpack","uuid","base64-js"],t):t((e="undefined"!=typeof globalThis?globalThis:e||self).CloudvisionConnector={},e.msgpack,e.uuid,e["base64-js"])}(this,(function(e,t,n,s){"use strict";const r=e=>t.encode(e,{extensionCodec:t.Codec}),o=e=>t.decode(e,{extensionCodec:t.Codec,useJSBI:!0});function i(e){const t=[];for(let n=0;n<e.length;n+=1)t.push(s.fromByteArray(r(e[n])));return t}function a(e){const t={path_elements:[],timestamp:0};t.path_elements=function(e){const t=[];for(let n=0;n<e.length;n+=1)t.push(o(s.toByteArray(e[n])));return t}(e.path_elements||[]);let n="";return e.timestamp.nanos&&(n+=e.timestamp.nanos),n=n.padStart(9,"0"),t.timestamp=parseInt((""+e.timestamp.seconds+n).slice(0,13),10),e.updates&&(t.updates=function(e){const t={};for(let n=0;n<e.length;n+=1){const r=e[n].key,i=e[n].value,a=o(s.toByteArray(r)),c=o(s.toByteArray(i));t[r]={key:a,value:c}}return t}(e.updates)),e.deletes&&(t.deletes=function(e){const t={};for(let n=0;n<e.length;n+=1){const r=e[n];t[r]={key:o(s.toByteArray(r))}}return t}(e.deletes)),e.delete_all&&(t.deletes={}),t}function c(e,t){const n=a(t);e.notifications[JSON.stringify(n.path_elements)]?e.notifications[JSON.stringify(n.path_elements)].push(n):e.notifications[JSON.stringify(n.path_elements)]=[n]}function u(e,t){if(e.timestamp.seconds<t.timestamp.seconds)return-1;if(e.timestamp.seconds>t.timestamp.seconds)return 1;if(e.timestamp.nanos&&t.timestamp.nanos){if(e.timestamp.nanos<t.timestamp.nanos)return-1;if(e.timestamp.nanos>t.timestamp.nanos)return 1}return e.timestamp.nanos?t.timestamp.nanos?0:1:-1}function l(e){const t={timestamp:e.timestamp};return e.path_elements&&e.path_elements.length&&(t.path_elements=i(e.path_elements)),e.updates&&(t.updates=function(e){const t=[];for(let n=0;n<e.length;n+=1){const o=e[n].key,i=e[n].value;t.push({key:s.fromByteArray(r(o)),value:s.fromByteArray(r(i))})}return t}(e.updates)),e.deletes&&(Object.keys(e.deletes).length?t.deletes=function(e){const t=[];for(let n=0;n<e.length;n+=1){const o=e[n];t.push(s.fromByteArray(r(o)))}return t}(e.deletes):t.delete_all=!0),t}class d{static parse(e,t){const n=JSON.parse(e),s=n.result;if(s&&!s.datasets&&s.notifications){const e=function(e,t){if(e.notifications.sort(u),t){const t={dataset:e.dataset,metadata:e.metadata||{},notifications:{}};for(let n=0;n<e.notifications.length;n+=1){c(t,e.notifications[n])}return t}const n={dataset:e.dataset,metadata:e.metadata||{},notifications:[]};for(let t=0;t<e.notifications.length;t+=1){const s=a(e.notifications[t]);n.notifications.push(s)}return n}(s,t);return{error:n.error,result:e,status:n.status,token:n.token}}return{error:n.error,result:n.result,status:n.status,token:n.token}}static stringify(e){const t=Object.assign({},e.params);return d.isQuery(t)&&(t.query=function(e){const t=[];for(let n=0;n<e.length;n+=1){const o={dataset:e[n].dataset,paths:[]},a=e[n];for(let e=0;e<a.paths.length;e+=1){const t=a.paths[e],n={path_elements:[]},c=[];if(t.keys&&Array.isArray(t.keys)){for(let e=0;e<t.keys.length;e+=1)c.push(s.fromByteArray(r(t.keys[e])));n.keys=c}n.path_elements=i(t.path_elements),o.paths.push(n)}t.push(o)}return t}(t.query)),d.isPublish(t)&&(t.batch=function(e){const t=[],n={dataset:e.dataset,notifications:t};if(Array.isArray(e.notifications))for(let n=0;n<e.notifications.length;n+=1)t.push(l(e.notifications[n]));else{const n=Object.keys(e.notifications);for(let s=0;s<n.length;s+=1){const r=n[s];for(let n=0;n<e.notifications[r].length;n+=1)t.push(l(e.notifications[r][n]))}}return n}(t.batch)),JSON.stringify({token:e.token,command:e.command,params:t})}static isQuery(e){const t=e;return void 0!==t.query&&Array.isArray(t.query)}static isPublish(e){const t=e;return void 0!==t.sync&&void 0!==t.batch}}const h="ERROR",m=new Set(["ANY","IP","MAC"]),f="cloudvision-connector",p={command:"NO_COMMAND",token:"NO_TOKEN",encodedParams:"NO_PARAMS"};class y{constructor(){this.events=new Map,this.requestContext=new Map}getEventsMap(){return this.events}getRequestContextMap(){return this.requestContext}has(e){return this.events.has(e)}bind(e,t,n){let s=this.events.get(e);return s?s.push(n):(s=[n],this.events.set(e,s),this.requestContext.set(e,t)),s.length}unbind(e,t){const n=this.events.get(e);if(!n)return null;const s=n.indexOf(t);-1!==s&&(n.splice(s,1),this.events.set(e,n));const r=n.length;return 0===r&&(this.events.delete(e),this.requestContext.delete(e)),r}unbindAll(e){this.events.delete(e),this.requestContext.delete(e)}emit(e,...t){const n=this.events.get(e);if(n)for(let s=n.length-1;s>=0;s-=1){const r=this.requestContext.get(e)||p;n[s](r,...t)}}close(){this.events.clear(),this.requestContext.clear()}}function b(){}class g{constructor(e){this.enableInstrumentation=!1,e&&(this.enableInstrumentation=!0),this.instrumentedCommands=e?e.commands:[],this.startFunction=e?e.start:b,this.infoFunction=e?e.info:b,this.endFunction=e?e.end:b}callStart(e,t){this.enableInstrumentation&&this.instrumentedCommands.includes(e)&&this.startFunction(t)}callInfo(e,t,n){this.enableInstrumentation&&this.instrumentedCommands.includes(e)&&this.infoFunction(t,n)}callEnd(e,t){this.enableInstrumentation&&this.instrumentedCommands.includes(e)&&this.endFunction(t)}}function v(e,t,n,s){const r=t?`${e}: ${t}`:e+": No message provided",o=n?`Status Code: ${n.code}, Status Message: ${n.message}`:void 0,i=s?"Request Token: "+s:void 0;switch(console.groupCollapsed("[[ CloudVision Connector ]]"),e){case h:i&&console.error(i),console.error(r),o&&console.error(o);break;case"WARN":i&&console.warn(i),console.warn(r),o&&console.warn(o);break;default:i&&console.log(i),console.log(r),o&&console.log(o)}console.groupEnd()}function E(e){return"number"==typeof e&&e>0}function C(e){return e/1e15>1}function S(e){const{start:t,end:n,versions:s}=e;let r,o,i;return E(t)&&(r=Math.floor(t)*(C(t)?1:1e6)),E(n)&&(o=Math.floor(n)*(C(n)?1:1e6)),E(s)&&(i=Math.floor(s)),{start:r,end:o,versions:i}}function O(e,t){const{start:n,end:s,versions:r}=e;return n&&s&&n>=s?(t(function(e,t,n){return`invalid params: start: ${e}, end: ${t}, versions: ${n||"undefined"}`}(n,s,r)),!1):!n||!r||(t("Defining start and versions is invalid"),!1)}function k(e,t,n=!1){return Array.isArray(e)?!!(n||e&&e.length)||(t("`query` param cannot be empty"),!1):(t("`query` param must be an array"),!1)}function A(e,t={}){return(n,s,r,o,i)=>{if(r&&1001===r.code)return void e(null,void 0,r,o,i);if(n)return e(`Error: ${n}\nOptions: ${JSON.stringify(t)}`,void 0,r,o,i),void e(null,void 0,{code:1001},o,i);const a=s;if(a&&a.datasets)return void e(null,a,r,o,i);const c=s;(c&&c.dataset||c)&&e(null,c,r,o,i)}}function T(e,t){const n={};if(Array.isArray(e)){const s=e.length;for(let r=0;r<s;r+=1){const{token:s,callback:o}=e[r];0===t.unbind(s,o)&&(n[s]=!0)}}else{const{token:s,callback:r}=e;0===t.unbind(s,r)&&(n[s]=!0)}return Object.keys(n).length?n:null}function N(e){const t=e.searchType;let n="ANY";return t&&m.has(t)&&(n=t),{search:e.search||"",searchType:n}}function _(e){return s.fromByteArray(t.encode(e,{extensionCodec:t.Codec}))}class D{constructor(e={batchResults:!0,debugMode:!1},t=WebSocket,n=d){this.connectorOptions=e,this.isRunning=!1,this.connectionEvents=new y,this.events=new y,this.WebSocket=t,this.Parser=n,this.instrumentation=new g(e.instrumentationConfig)}get websocket(){return this.ws}get connectionEmitter(){return this.connectionEvents}get eventsEmitter(){return this.events}callCommand(e,t,s){if(!this.isRunning)return s("Connection is down"),null;const r=n.v4(),o={command:e,encodedParams:_(t),token:r},i=this.makeCallbackWithUnbind(r,s);return this.events.bind(r,o,i),this.sendMessageOrError(r,e,t,o),r}cleanUpConnections(){this.events.close()}close(){this.closeWs(new CloseEvent("close")),this.connectionEvents.close(),this.ws.close()}closeWs(e){this.isRunning=!1,this.cleanUpConnections(),this.connectionEvents.emit("connection",D.DISCONNECTED,e)}closeCommand(e,t,s){const r=n.v4(),o={command:"close",encodedParams:_(e),token:r},i=this.makeCallbackWithUnbind(r,s);this.events.bind(r,o,i);try{this.sendMessage(r,"close",o,t)}catch(e){v(h,e),s(e,void 0,void 0,r)}return r}closeStreams(e,t){const n=T(e,this.events);return n?this.closeCommand(e,n,t):null}closeStream(e,t){const n=T(e,this.events);return n?this.closeCommand(e,n,t):null}connection(e){const t=(t,n,s)=>{e(n,s)};return this.connectionEvents.bind("connection",{command:"connection",encodedParams:"",token:""},t),()=>{this.connectionEvents.unbind("connection",t)}}get(e,t,n){return this.callCommand(e,t,n)}makeCallbackWithUnbind(e,t){const n=(s,r,o,i)=>{r&&(this.events.unbind(e,n),this.instrumentation.callInfo(s.command,s,{error:r})),this.instrumentation.callEnd(s.command,s),t(r,o,i,e,s)};return n}publish(e,t){return this.callCommand("publish",e,t)}requestService(e,t){return this.callCommand("serviceRequest",e,t)}run(e){this.runWithWs(new this.WebSocket(e))}runWithWs(e){this.ws=e,this.ws.onopen=e=>{this.isRunning||(this.isRunning=!0,this.connectionEvents.emit("connection",D.CONNECTED,e))},this.ws.onclose=e=>{this.closeWs(e)},this.ws.onmessage=e=>{var t;if("string"!=typeof e.data)return;let n;try{n=this.Parser.parse(e.data,this.connectorOptions.batchResults),this.connectorOptions.debugMode&&self.postMessage({response:n,source:f,timestamp:Date.now()},"*")}catch(e){return void v(h,e)}if(!n||!n.token)return void v(h,"No message body or message token");const{error:s,status:r,token:o}=n;s?this.events.emit(o,s,n.result,r):(!function(e,t,n,s){if(t&&Object.keys(t).length)return;const r=e;!r.dataset||r.dataset.name?!r.dataset||r.dataset.type?!r.dataset||r.notifications?s||!r.dataset||Array.isArray(r.notifications)||v(h,"Key 'notifications' is not an array",void 0,n):v(h,"No key 'notifications' found in response",void 0,n):v(h,"No key 'type' found in dataset",void 0,n):v(h,"No key 'name' found in dataset",void 0,n)}(n.result,r,o,this.connectorOptions.batchResults),this.events.emit(o,null,n.result,r),(null===(t=n.result)||void 0===t?void 0:t.metadata)&&"EOF"===n.result.metadata.GetRequest&&this.events.emit(o,null,null,{message:"GetRequest",code:1001}))}}search(e,t){return this.callCommand("alpha/search",e,t)}sendMessage(e,t,n,s){this.connectorOptions.debugMode&&self.postMessage({request:{token:e,command:t,params:s},source:f,timestamp:Date.now()},"*"),this.instrumentation.callStart(t,n),this.ws.send(this.Parser.stringify({token:e,command:t,params:s}))}sendMessageOrError(e,t,n,s){try{this.sendMessage(e,t,s,n)}catch(t){v(h,t),this.events.emit(e,t,void 0,void 0)}}stream(e,t,s){if(!this.isRunning)return s("Connection is down"),null;const r=n.v4(),o={command:e,encodedParams:_(t),token:r},i=(e,t,n,a)=>{t&&(this.events.unbind(r,i),this.instrumentation.callInfo(o.command,o,{error:t}),this.instrumentation.callEnd(e.command,e)),a&&3001===a.code&&this.instrumentation.callInfo(o.command,o,{message:"stream active"}),s(t,n,a,r,e)};return this.events.bind(r,o,i),this.sendMessageOrError(r,e,t,o),{token:r,callback:i}}}D.CONNECTED="connected",D.DISCONNECTED="disconnected";class P extends D{closeSubscriptions(e,t){return this.closeStreams(e,A(t))}getAndSubscribe(e,t,n){if(!k(e,t))return null;if(!O(n,t))return null;const s=S(n),r={query:e,start:s.start,end:s.end,versions:s.versions};return this.stream("getAndSubscribe",r,A(t))}getApps(e){const t={types:["app"]};return this.get("getDatasets",t,A(e))}getDatasets(e){const t={types:["app","device"]};return this.get("getDatasets",t,A(e))}getDevices(e){const t=["device"];return this.get("getDatasets",{types:t},A(e))}getWithOptions(e,t,n){if("DEVICES_DATASET_ID"===e)return this.getDatasets(t),null;if(!k(e,t))return null;if(!O(n,t))return null;const{start:s,end:r,versions:o}=S(n),i={query:e,start:s,end:r,versions:o};return this.get("get",i,A(t,n))}runService(e,t){this.requestService(e,A(t))}runStreamingService(e,t){return e?this.stream("serviceRequest",e,A(t)):(t("`request` param cannot be empty"),null)}searchWithOptions(e,t,n){if(!k(e,t,!0))return null;if(!O(n,t))return null;const{start:s,end:r}=S(n),o=N(n),i={query:e,start:s,end:r,search:o.search,searchType:o.searchType};return this.search(i,A(t,n))}searchSubscribe(e,t,n={search:""}){if(!k(e,t,!0))return null;const s=N(n);return this.stream("alpha/searchSubscribe",{query:e,search:s.search,searchType:s.searchType},A(t))}subscribe(e,t){return k(e,t)?this.stream("subscribe",{query:e},A(t)):null}writeSync(e,t){this.publish({sync:!0,batch:{dataset:e.dataset,notifications:e.notifications}},function(e){return(t,n,s)=>{s&&1001===s.code?e(!0):t&&e(!1,`Error: ${t}\n`)}}(t))}}P.CLOSE="close",P.DEVICES_DATASET_ID="DEVICES_DATASET_ID",P.GET_DATASETS="getDatasets",P.GET="get",P.GET_AND_SUBSCRIBE="getAndSubscribe",P.ID=f,P.PUBLISH="publish",P.SEARCH_SUBSCRIBE="alpha/searchSubscribe",P.SEARCH="alpha/search",P.SERVICE_REQUEST="serviceRequest",P.SUBSCRIBE="subscribe",Object.defineProperty(e,"Bool",{enumerable:!0,get:function(){return t.Bool}}),Object.defineProperty(e,"Float32",{enumerable:!0,get:function(){return t.Float32}}),Object.defineProperty(e,"Float64",{enumerable:!0,get:function(){return t.Float64}}),Object.defineProperty(e,"Int",{enumerable:!0,get:function(){return t.Int}}),Object.defineProperty(e,"NeatTypeSerializer",{enumerable:!0,get:function(){return t.NeatTypeSerializer}}),Object.defineProperty(e,"NeatTypes",{enumerable:!0,get:function(){return t.NeatTypes}}),Object.defineProperty(e,"Nil",{enumerable:!0,get:function(){return t.Nil}}),Object.defineProperty(e,"Pointer",{enumerable:!0,get:function(){return t.Pointer}}),Object.defineProperty(e,"Str",{enumerable:!0,get:function(){return t.Str}}),Object.defineProperty(e,"createBaseType",{enumerable:!0,get:function(){return t.createBaseType}}),Object.defineProperty(e,"isJsbi",{enumerable:!0,get:function(){return t.isJsbi}}),Object.defineProperty(e,"isNeatType",{enumerable:!0,get:function(){return t.isNeatType}}),e.ACTIVE_CODE=3001,e.APP_DATASET_TYPE="app",e.CONNECTED="connected",e.DEVICE_DATASET_TYPE="device",e.DISCONNECTED="disconnected",e.EOF="EOF",e.EOF_CODE=1001,e.GET_REQUEST_COMPLETED="GetRequest",e.Parser=d,e.default=P,e.fromBinaryKey=function(e,n=!0){return t.decode(s.toByteArray(e),{extensionCodec:t.Codec,useJSBI:n})},e.sanitizeOptions=S,e.toBinaryKey=_,Object.defineProperty(e,"__esModule",{value:!0})})); |
import { encode, Codec, decode } from 'a-msgpack'; | ||
export { Bool, Float32, Float64, Int, NeatTypeSerializer, NeatTypes, Nil, Pointer, Str, createBaseType, isJsbi, isNeatType } from 'a-msgpack'; | ||
import { v4 } from 'uuid'; | ||
import { fromByteArray, toByteArray } from 'base64-js'; | ||
import MurmurHash3 from 'imurmurhash'; | ||
@@ -386,6 +386,2 @@ // Copyright (c) 2018, Arista Networks, Inc. | ||
const ACTIVE_CODE = 3001; | ||
/** | ||
* Status code for Paused (when the incoming results for a request have been paused). | ||
*/ | ||
const PAUSED_CODE = 1002; | ||
const GET_REQUEST_COMPLETED = 'GetRequest'; | ||
@@ -397,5 +393,3 @@ const CLOSE = 'close'; | ||
const GET_AND_SUBSCRIBE = 'getAndSubscribe'; | ||
const PAUSE = 'pause'; | ||
const PUBLISH = 'publish'; | ||
const RESUME = 'resume'; | ||
const SUBSCRIBE = 'subscribe'; | ||
@@ -683,37 +677,2 @@ const SEARCH = 'alpha/search'; | ||
/** | ||
* Recursively hashes an object, given an object and a hashState. | ||
*/ | ||
function hashObjectHelper(object, hashState) { | ||
const objKeys = Object.keys(object); | ||
for (let i = 0; i < objKeys.length; i += 1) { | ||
const key = objKeys[i]; | ||
const value = object[key]; | ||
if (value && typeof value === 'object') { | ||
hashState.hash(key.toString()).hash(hashObjectHelper(value, hashState)); | ||
} | ||
else { | ||
hashState.hash(key + '' + value); | ||
} | ||
} | ||
return hashState.result().toString(); | ||
} | ||
/** | ||
* Creates a unique hash given an object. | ||
*/ | ||
function hashObject(object) { | ||
const hashState = MurmurHash3(); | ||
return hashObjectHelper(object, hashState); | ||
} | ||
/** | ||
* Generates token based on the [[WsCommand]] and params ([[CloudVisionParams]], | ||
* [[CloudVisionPublishRequest]], [[ServiceRequest]]) of a request. This is | ||
* used to map requests to responses when dispatching response callbacks. | ||
*/ | ||
function makeToken(command, params) { | ||
return hashObject({ | ||
command, | ||
params, | ||
}); | ||
} | ||
/** | ||
* Creates a notification callback that properly formats the result for the | ||
@@ -873,3 +832,2 @@ * passed callback. | ||
debugMode: false, | ||
pauseStreams: false, | ||
}, websocketClass = WebSocket, parser = Parser) { | ||
@@ -880,6 +838,2 @@ this.connectorOptions = options; | ||
this.events = new Emitter(); | ||
this.closingStreams = new Map(); | ||
this.waitingStreams = new Map(); | ||
this.activeStreams = new Set(); | ||
this.activeRequests = new Set(); | ||
this.WebSocket = websocketClass; | ||
@@ -898,13 +852,2 @@ this.Parser = parser; | ||
} | ||
get streams() { | ||
return this.activeStreams; | ||
} | ||
get streamInClosingState() { | ||
return this.closingStreams; | ||
} | ||
addWaitingStream(waitingOnToken, token, command, params, requestContext) { | ||
const streamsWaiting = this.waitingStreams.get(waitingOnToken) || []; | ||
streamsWaiting.push([token, command, requestContext, params]); | ||
this.waitingStreams.set(waitingOnToken, streamsWaiting); | ||
} | ||
/** | ||
@@ -925,23 +868,14 @@ * PRIVATE METHOD | ||
} | ||
const token = makeToken(command, params); | ||
const token = v4(); | ||
const requestContext = { command, encodedParams: toBinaryKey(params), token }; | ||
if (!this.activeRequests.has(token)) { | ||
// only execute request if not already getting data | ||
this.activeRequests.add(token); | ||
const callbackWithUnbind = this.makeCallbackWithUnbind(token, callback); | ||
this.events.bind(token, requestContext, callbackWithUnbind); | ||
this.sendMessageOrError(token, command, params, requestContext); | ||
} | ||
else { | ||
const queuedCallbackWithUnbind = this.makeQueuedCallbackWithUnbind(token, callback); | ||
this.events.bind(token, requestContext, queuedCallbackWithUnbind); | ||
} | ||
const callbackWithUnbind = this.makeCallbackWithUnbind(token, callback); | ||
this.events.bind(token, requestContext, callbackWithUnbind); | ||
this.sendMessageOrError(token, command, params, requestContext); | ||
return token; | ||
} | ||
/** | ||
* Cleans up all event emitters and active streams. | ||
* Cleans up all event emitters. | ||
* All bound callbacks will be unbound. | ||
*/ | ||
cleanUpConnections() { | ||
this.activeStreams.clear(); | ||
this.events.close(); | ||
@@ -964,3 +898,3 @@ } | ||
closeCommand(streams, closeParams, callback) { | ||
const closeToken = makeToken(CLOSE, closeParams); | ||
const closeToken = v4(); | ||
const requestContext = { | ||
@@ -971,3 +905,4 @@ command: CLOSE, | ||
}; | ||
this.setStreamClosingState(streams, closeToken, requestContext, callback); | ||
const callbackWithUnbind = this.makeCallbackWithUnbind(closeToken, callback); | ||
this.events.bind(closeToken, requestContext, callbackWithUnbind); | ||
try { | ||
@@ -978,3 +913,2 @@ this.sendMessage(closeToken, CLOSE, requestContext, closeParams); | ||
log(ERROR, err); | ||
this.removeStreamClosingState(streams, closeToken); | ||
callback(err, undefined, undefined, closeToken); | ||
@@ -1018,13 +952,2 @@ } | ||
/** | ||
* Enables any session wide options by negotiating with the server | ||
*/ | ||
enableOptions(callback) { | ||
if (this.connectorOptions.pauseStreams) { | ||
this.pause({ pauseStreams: true }, callback); | ||
} | ||
else { | ||
callback(null); | ||
} | ||
} | ||
/** | ||
* Sends the command along with the params to the API, if the WebSocket is | ||
@@ -1040,3 +963,2 @@ * connected. The response is received via the provided callback function. | ||
// Unbind callback when any error message is received | ||
this.activeRequests.delete(token); | ||
this.events.unbind(token, callbackWithUnbind); | ||
@@ -1052,12 +974,2 @@ this.instrumentation.callInfo(requestContext.command, requestContext, { | ||
} | ||
makeQueuedCallbackWithUnbind(token, callback) { | ||
const callbackWithUnbind = (requestContext, err, result, status) => { | ||
if (err) { | ||
// Unbind callback and invoke callback only when any error message is received | ||
this.events.unbind(token, callbackWithUnbind); | ||
callback(err, result, status, token, requestContext); | ||
} | ||
}; | ||
return callbackWithUnbind; | ||
} | ||
/** | ||
@@ -1071,30 +983,2 @@ * Writes data in params to the CloudVision API. It receives one message via | ||
/** | ||
* Cleans up the steam closing state set in `setStreamClosingState`, as well | ||
* as re-subscribing to any streams (with the same token as the stream that | ||
* was just closed) opened up during the closing of the current stream. | ||
*/ | ||
removeStreamClosingState(streams, closeToken) { | ||
if (Array.isArray(streams)) { | ||
const streamsLen = streams.length; | ||
for (let i = 0; i < streamsLen; i += 1) { | ||
const { token } = streams[i]; | ||
this.closingStreams.delete(token); | ||
const streamsArgs = this.waitingStreams.get(closeToken); | ||
if (streamsArgs) { | ||
const streamsArgsLen = streamsArgs.length; | ||
for (let j = 0; j < streamsArgsLen; j += 1) { | ||
this.reSubscribeStream(streamsArgs[j], closeToken); | ||
} | ||
} | ||
} | ||
} | ||
else { | ||
this.closingStreams.delete(streams.token); | ||
const streamArgs = this.waitingStreams.get(closeToken); | ||
if (streamArgs) { | ||
this.reSubscribeStream(streamArgs[0], closeToken); | ||
} | ||
} | ||
} | ||
/** | ||
* Send a service request command to the CloudVision API | ||
@@ -1106,18 +990,2 @@ */ | ||
/** | ||
* Re initiates a subscribe if there has been a `subscribe` call for the | ||
* stream while the stream was closing. | ||
*/ | ||
reSubscribeStream(streamArgs, closeToken) { | ||
this.events.unbindAll(closeToken); | ||
this.waitingStreams.delete(closeToken); | ||
this.sendMessage(...streamArgs); | ||
} | ||
/** | ||
* PRIVATE METHOD | ||
* Requests the server to resume one of the currently paused streams. | ||
*/ | ||
resume(params, callback) { | ||
return this.callCommand(RESUME, params, callback); | ||
} | ||
/** | ||
* Run connects connector to the specified url | ||
@@ -1145,11 +1013,4 @@ */ | ||
if (!this.isRunning) { | ||
// Enable any options if they have been configured | ||
this.isRunning = true; | ||
this.enableOptions((err, _res, status, token) => { | ||
// We're good to go | ||
if (status && status.code !== EOF_CODE) { | ||
log(ERROR, err, status, token); | ||
} | ||
this.connectionEvents.emit('connection', Wrpc.CONNECTED, event); | ||
}); | ||
this.connectionEvents.emit('connection', Wrpc.CONNECTED, event); | ||
} | ||
@@ -1192,12 +1053,2 @@ }; | ||
validateResponse(msg.result, status, token, this.connectorOptions.batchResults); | ||
// Automatically resume streams that have been paused by the server | ||
if (status && status.code === PAUSED_CODE) { | ||
const cb = (err, _result, resumeStatus) => { | ||
if (err) { | ||
log(ERROR, err, resumeStatus); | ||
} | ||
}; | ||
this.resume({ token }, cb); | ||
return; | ||
} | ||
this.events.emit(token, null, msg.result, status); | ||
@@ -1248,26 +1099,2 @@ if (((_a = msg.result) === null || _a === void 0 ? void 0 : _a.metadata) && | ||
/** | ||
* Sets all the parameters for one or more streams when they are closed. | ||
* When the `close` request has been successfully processed, | ||
* `removeStreamClosingState` is called to clean up any closing state. | ||
*/ | ||
setStreamClosingState(streams, closeToken, requestContext, callback) { | ||
const closeCallback = (closeRequestContext, err, result, status) => { | ||
this.removeStreamClosingState(streams, closeToken); | ||
callback(err, result, status, closeToken, closeRequestContext); | ||
}; | ||
if (Array.isArray(streams)) { | ||
const streamsLen = streams.length; | ||
for (let i = 0; i < streamsLen; i += 1) { | ||
const { token } = streams[i]; | ||
const oldToken = this.closingStreams.get(token); | ||
this.waitingStreams.delete(oldToken || ''); | ||
this.closingStreams.set(token, closeToken); | ||
} | ||
} | ||
else { | ||
this.closingStreams.set(streams.token, closeToken); | ||
} | ||
this.events.bind(closeToken, requestContext, closeCallback); | ||
} | ||
/** | ||
* Sends the command along with the params to the API, which creates a | ||
@@ -1285,3 +1112,3 @@ * subscriptions on the server. It receives a stream of messages via the | ||
} | ||
const token = makeToken(command, params); | ||
const token = v4(); | ||
const callerRequestContext = { command, encodedParams: toBinaryKey(params), token }; | ||
@@ -1299,3 +1126,2 @@ const callbackWithUnbind = (requestContext, err, result, status) => { | ||
if (status && status.code === ACTIVE_CODE) { | ||
this.activeStreams.add(token); | ||
this.instrumentation.callInfo(callerRequestContext.command, callerRequestContext, { | ||
@@ -1307,34 +1133,6 @@ message: 'stream active', | ||
}; | ||
const numCallbacks = this.events.bind(token, callerRequestContext, callbackWithUnbind); | ||
if (numCallbacks === 1) { | ||
// Only execute the request on the first callback. | ||
// If there are open request to the same data, just attach the callback | ||
const closingStreamToken = this.closingStreams.get(token); | ||
if (closingStreamToken) { | ||
// The stream is still closing, so wait for it to close before re requesting | ||
this.addWaitingStream(closingStreamToken, token, command, params, callerRequestContext); | ||
} | ||
else { | ||
this.sendMessageOrError(token, command, params, callerRequestContext); | ||
} | ||
} | ||
else if (this.activeStreams.has(token)) { | ||
// The stream is already open immediately call get | ||
callback(null, undefined, { code: ACTIVE_CODE }, token, callerRequestContext); | ||
} | ||
this.events.bind(token, callerRequestContext, callbackWithUnbind); | ||
this.sendMessageOrError(token, command, params, callerRequestContext); | ||
return { token, callback: callbackWithUnbind }; | ||
} | ||
/** | ||
* Configures pausing of results on the server. When the pause feature is enabled, | ||
* the server pauses the results returned to the client every x number of bytes. | ||
* | ||
* When pausing is enabled, only the new streams created after that will be paused | ||
* (i.e. existing streams are not affected). To continue receiving results, a | ||
* wrpc client has to send a 'resume' command with the desired stream token to | ||
* unpause. Currently, the cloudvision-connector automatically resumes any streams | ||
* paused by the server. | ||
*/ | ||
pause(params, callback) { | ||
return this.callCommand(PAUSE, params, callback); | ||
} | ||
} | ||
@@ -1456,8 +1254,2 @@ Wrpc.CONNECTED = CONNECTED; | ||
/** | ||
* Generates the unique token that the request can be referenced by. | ||
*/ | ||
getCommandToken(command, params) { | ||
return makeToken(command, params); | ||
} | ||
/** | ||
* Returns all notifications that match the given query and options. | ||
@@ -1598,3 +1390,2 @@ * | ||
Connector.ID = ID; | ||
Connector.PAUSE = PAUSE; | ||
Connector.PUBLISH = PUBLISH; | ||
@@ -1607,2 +1398,2 @@ Connector.SEARCH_SUBSCRIBE = SEARCH_SUBSCRIBE; | ||
export default Connector; | ||
export { ACTIVE_CODE, APP_DATASET_TYPE, CONNECTED, DEVICE_DATASET_TYPE, DISCONNECTED, EOF, EOF_CODE, GET_REQUEST_COMPLETED, Parser, fromBinaryKey, hashObject, sanitizeOptions, toBinaryKey }; | ||
export { ACTIVE_CODE, APP_DATASET_TYPE, CONNECTED, DEVICE_DATASET_TYPE, DISCONNECTED, EOF, EOF_CODE, GET_REQUEST_COMPLETED, Parser, fromBinaryKey, sanitizeOptions, toBinaryKey }; |
@@ -5,7 +5,5 @@ 'use strict'; | ||
function _interopDefault (ex) { return (ex && (typeof ex === 'object') && 'default' in ex) ? ex['default'] : ex; } | ||
var aMsgpack = require('a-msgpack'); | ||
var uuid = require('uuid'); | ||
var base64Js = require('base64-js'); | ||
var MurmurHash3 = _interopDefault(require('imurmurhash')); | ||
@@ -392,6 +390,2 @@ // Copyright (c) 2018, Arista Networks, Inc. | ||
const ACTIVE_CODE = 3001; | ||
/** | ||
* Status code for Paused (when the incoming results for a request have been paused). | ||
*/ | ||
const PAUSED_CODE = 1002; | ||
const GET_REQUEST_COMPLETED = 'GetRequest'; | ||
@@ -403,5 +397,3 @@ const CLOSE = 'close'; | ||
const GET_AND_SUBSCRIBE = 'getAndSubscribe'; | ||
const PAUSE = 'pause'; | ||
const PUBLISH = 'publish'; | ||
const RESUME = 'resume'; | ||
const SUBSCRIBE = 'subscribe'; | ||
@@ -689,37 +681,2 @@ const SEARCH = 'alpha/search'; | ||
/** | ||
* Recursively hashes an object, given an object and a hashState. | ||
*/ | ||
function hashObjectHelper(object, hashState) { | ||
const objKeys = Object.keys(object); | ||
for (let i = 0; i < objKeys.length; i += 1) { | ||
const key = objKeys[i]; | ||
const value = object[key]; | ||
if (value && typeof value === 'object') { | ||
hashState.hash(key.toString()).hash(hashObjectHelper(value, hashState)); | ||
} | ||
else { | ||
hashState.hash(key + '' + value); | ||
} | ||
} | ||
return hashState.result().toString(); | ||
} | ||
/** | ||
* Creates a unique hash given an object. | ||
*/ | ||
function hashObject(object) { | ||
const hashState = MurmurHash3(); | ||
return hashObjectHelper(object, hashState); | ||
} | ||
/** | ||
* Generates token based on the [[WsCommand]] and params ([[CloudVisionParams]], | ||
* [[CloudVisionPublishRequest]], [[ServiceRequest]]) of a request. This is | ||
* used to map requests to responses when dispatching response callbacks. | ||
*/ | ||
function makeToken(command, params) { | ||
return hashObject({ | ||
command, | ||
params, | ||
}); | ||
} | ||
/** | ||
* Creates a notification callback that properly formats the result for the | ||
@@ -879,3 +836,2 @@ * passed callback. | ||
debugMode: false, | ||
pauseStreams: false, | ||
}, websocketClass = WebSocket, parser = Parser) { | ||
@@ -886,6 +842,2 @@ this.connectorOptions = options; | ||
this.events = new Emitter(); | ||
this.closingStreams = new Map(); | ||
this.waitingStreams = new Map(); | ||
this.activeStreams = new Set(); | ||
this.activeRequests = new Set(); | ||
this.WebSocket = websocketClass; | ||
@@ -904,13 +856,2 @@ this.Parser = parser; | ||
} | ||
get streams() { | ||
return this.activeStreams; | ||
} | ||
get streamInClosingState() { | ||
return this.closingStreams; | ||
} | ||
addWaitingStream(waitingOnToken, token, command, params, requestContext) { | ||
const streamsWaiting = this.waitingStreams.get(waitingOnToken) || []; | ||
streamsWaiting.push([token, command, requestContext, params]); | ||
this.waitingStreams.set(waitingOnToken, streamsWaiting); | ||
} | ||
/** | ||
@@ -931,23 +872,14 @@ * PRIVATE METHOD | ||
} | ||
const token = makeToken(command, params); | ||
const token = uuid.v4(); | ||
const requestContext = { command, encodedParams: toBinaryKey(params), token }; | ||
if (!this.activeRequests.has(token)) { | ||
// only execute request if not already getting data | ||
this.activeRequests.add(token); | ||
const callbackWithUnbind = this.makeCallbackWithUnbind(token, callback); | ||
this.events.bind(token, requestContext, callbackWithUnbind); | ||
this.sendMessageOrError(token, command, params, requestContext); | ||
} | ||
else { | ||
const queuedCallbackWithUnbind = this.makeQueuedCallbackWithUnbind(token, callback); | ||
this.events.bind(token, requestContext, queuedCallbackWithUnbind); | ||
} | ||
const callbackWithUnbind = this.makeCallbackWithUnbind(token, callback); | ||
this.events.bind(token, requestContext, callbackWithUnbind); | ||
this.sendMessageOrError(token, command, params, requestContext); | ||
return token; | ||
} | ||
/** | ||
* Cleans up all event emitters and active streams. | ||
* Cleans up all event emitters. | ||
* All bound callbacks will be unbound. | ||
*/ | ||
cleanUpConnections() { | ||
this.activeStreams.clear(); | ||
this.events.close(); | ||
@@ -970,3 +902,3 @@ } | ||
closeCommand(streams, closeParams, callback) { | ||
const closeToken = makeToken(CLOSE, closeParams); | ||
const closeToken = uuid.v4(); | ||
const requestContext = { | ||
@@ -977,3 +909,4 @@ command: CLOSE, | ||
}; | ||
this.setStreamClosingState(streams, closeToken, requestContext, callback); | ||
const callbackWithUnbind = this.makeCallbackWithUnbind(closeToken, callback); | ||
this.events.bind(closeToken, requestContext, callbackWithUnbind); | ||
try { | ||
@@ -984,3 +917,2 @@ this.sendMessage(closeToken, CLOSE, requestContext, closeParams); | ||
log(ERROR, err); | ||
this.removeStreamClosingState(streams, closeToken); | ||
callback(err, undefined, undefined, closeToken); | ||
@@ -1024,13 +956,2 @@ } | ||
/** | ||
* Enables any session wide options by negotiating with the server | ||
*/ | ||
enableOptions(callback) { | ||
if (this.connectorOptions.pauseStreams) { | ||
this.pause({ pauseStreams: true }, callback); | ||
} | ||
else { | ||
callback(null); | ||
} | ||
} | ||
/** | ||
* Sends the command along with the params to the API, if the WebSocket is | ||
@@ -1046,3 +967,2 @@ * connected. The response is received via the provided callback function. | ||
// Unbind callback when any error message is received | ||
this.activeRequests.delete(token); | ||
this.events.unbind(token, callbackWithUnbind); | ||
@@ -1058,12 +978,2 @@ this.instrumentation.callInfo(requestContext.command, requestContext, { | ||
} | ||
makeQueuedCallbackWithUnbind(token, callback) { | ||
const callbackWithUnbind = (requestContext, err, result, status) => { | ||
if (err) { | ||
// Unbind callback and invoke callback only when any error message is received | ||
this.events.unbind(token, callbackWithUnbind); | ||
callback(err, result, status, token, requestContext); | ||
} | ||
}; | ||
return callbackWithUnbind; | ||
} | ||
/** | ||
@@ -1077,30 +987,2 @@ * Writes data in params to the CloudVision API. It receives one message via | ||
/** | ||
* Cleans up the steam closing state set in `setStreamClosingState`, as well | ||
* as re-subscribing to any streams (with the same token as the stream that | ||
* was just closed) opened up during the closing of the current stream. | ||
*/ | ||
removeStreamClosingState(streams, closeToken) { | ||
if (Array.isArray(streams)) { | ||
const streamsLen = streams.length; | ||
for (let i = 0; i < streamsLen; i += 1) { | ||
const { token } = streams[i]; | ||
this.closingStreams.delete(token); | ||
const streamsArgs = this.waitingStreams.get(closeToken); | ||
if (streamsArgs) { | ||
const streamsArgsLen = streamsArgs.length; | ||
for (let j = 0; j < streamsArgsLen; j += 1) { | ||
this.reSubscribeStream(streamsArgs[j], closeToken); | ||
} | ||
} | ||
} | ||
} | ||
else { | ||
this.closingStreams.delete(streams.token); | ||
const streamArgs = this.waitingStreams.get(closeToken); | ||
if (streamArgs) { | ||
this.reSubscribeStream(streamArgs[0], closeToken); | ||
} | ||
} | ||
} | ||
/** | ||
* Send a service request command to the CloudVision API | ||
@@ -1112,18 +994,2 @@ */ | ||
/** | ||
* Re initiates a subscribe if there has been a `subscribe` call for the | ||
* stream while the stream was closing. | ||
*/ | ||
reSubscribeStream(streamArgs, closeToken) { | ||
this.events.unbindAll(closeToken); | ||
this.waitingStreams.delete(closeToken); | ||
this.sendMessage(...streamArgs); | ||
} | ||
/** | ||
* PRIVATE METHOD | ||
* Requests the server to resume one of the currently paused streams. | ||
*/ | ||
resume(params, callback) { | ||
return this.callCommand(RESUME, params, callback); | ||
} | ||
/** | ||
* Run connects connector to the specified url | ||
@@ -1151,11 +1017,4 @@ */ | ||
if (!this.isRunning) { | ||
// Enable any options if they have been configured | ||
this.isRunning = true; | ||
this.enableOptions((err, _res, status, token) => { | ||
// We're good to go | ||
if (status && status.code !== EOF_CODE) { | ||
log(ERROR, err, status, token); | ||
} | ||
this.connectionEvents.emit('connection', Wrpc.CONNECTED, event); | ||
}); | ||
this.connectionEvents.emit('connection', Wrpc.CONNECTED, event); | ||
} | ||
@@ -1198,12 +1057,2 @@ }; | ||
validateResponse(msg.result, status, token, this.connectorOptions.batchResults); | ||
// Automatically resume streams that have been paused by the server | ||
if (status && status.code === PAUSED_CODE) { | ||
const cb = (err, _result, resumeStatus) => { | ||
if (err) { | ||
log(ERROR, err, resumeStatus); | ||
} | ||
}; | ||
this.resume({ token }, cb); | ||
return; | ||
} | ||
this.events.emit(token, null, msg.result, status); | ||
@@ -1254,26 +1103,2 @@ if (((_a = msg.result) === null || _a === void 0 ? void 0 : _a.metadata) && | ||
/** | ||
* Sets all the parameters for one or more streams when they are closed. | ||
* When the `close` request has been successfully processed, | ||
* `removeStreamClosingState` is called to clean up any closing state. | ||
*/ | ||
setStreamClosingState(streams, closeToken, requestContext, callback) { | ||
const closeCallback = (closeRequestContext, err, result, status) => { | ||
this.removeStreamClosingState(streams, closeToken); | ||
callback(err, result, status, closeToken, closeRequestContext); | ||
}; | ||
if (Array.isArray(streams)) { | ||
const streamsLen = streams.length; | ||
for (let i = 0; i < streamsLen; i += 1) { | ||
const { token } = streams[i]; | ||
const oldToken = this.closingStreams.get(token); | ||
this.waitingStreams.delete(oldToken || ''); | ||
this.closingStreams.set(token, closeToken); | ||
} | ||
} | ||
else { | ||
this.closingStreams.set(streams.token, closeToken); | ||
} | ||
this.events.bind(closeToken, requestContext, closeCallback); | ||
} | ||
/** | ||
* Sends the command along with the params to the API, which creates a | ||
@@ -1291,3 +1116,3 @@ * subscriptions on the server. It receives a stream of messages via the | ||
} | ||
const token = makeToken(command, params); | ||
const token = uuid.v4(); | ||
const callerRequestContext = { command, encodedParams: toBinaryKey(params), token }; | ||
@@ -1305,3 +1130,2 @@ const callbackWithUnbind = (requestContext, err, result, status) => { | ||
if (status && status.code === ACTIVE_CODE) { | ||
this.activeStreams.add(token); | ||
this.instrumentation.callInfo(callerRequestContext.command, callerRequestContext, { | ||
@@ -1313,34 +1137,6 @@ message: 'stream active', | ||
}; | ||
const numCallbacks = this.events.bind(token, callerRequestContext, callbackWithUnbind); | ||
if (numCallbacks === 1) { | ||
// Only execute the request on the first callback. | ||
// If there are open request to the same data, just attach the callback | ||
const closingStreamToken = this.closingStreams.get(token); | ||
if (closingStreamToken) { | ||
// The stream is still closing, so wait for it to close before re requesting | ||
this.addWaitingStream(closingStreamToken, token, command, params, callerRequestContext); | ||
} | ||
else { | ||
this.sendMessageOrError(token, command, params, callerRequestContext); | ||
} | ||
} | ||
else if (this.activeStreams.has(token)) { | ||
// The stream is already open immediately call get | ||
callback(null, undefined, { code: ACTIVE_CODE }, token, callerRequestContext); | ||
} | ||
this.events.bind(token, callerRequestContext, callbackWithUnbind); | ||
this.sendMessageOrError(token, command, params, callerRequestContext); | ||
return { token, callback: callbackWithUnbind }; | ||
} | ||
/** | ||
* Configures pausing of results on the server. When the pause feature is enabled, | ||
* the server pauses the results returned to the client every x number of bytes. | ||
* | ||
* When pausing is enabled, only the new streams created after that will be paused | ||
* (i.e. existing streams are not affected). To continue receiving results, a | ||
* wrpc client has to send a 'resume' command with the desired stream token to | ||
* unpause. Currently, the cloudvision-connector automatically resumes any streams | ||
* paused by the server. | ||
*/ | ||
pause(params, callback) { | ||
return this.callCommand(PAUSE, params, callback); | ||
} | ||
} | ||
@@ -1462,8 +1258,2 @@ Wrpc.CONNECTED = CONNECTED; | ||
/** | ||
* Generates the unique token that the request can be referenced by. | ||
*/ | ||
getCommandToken(command, params) { | ||
return makeToken(command, params); | ||
} | ||
/** | ||
* Returns all notifications that match the given query and options. | ||
@@ -1604,3 +1394,2 @@ * | ||
Connector.ID = ID; | ||
Connector.PAUSE = PAUSE; | ||
Connector.PUBLISH = PUBLISH; | ||
@@ -1695,4 +1484,3 @@ Connector.SEARCH_SUBSCRIBE = SEARCH_SUBSCRIBE; | ||
exports.fromBinaryKey = fromBinaryKey; | ||
exports.hashObject = hashObject; | ||
exports.sanitizeOptions = sanitizeOptions; | ||
exports.toBinaryKey = toBinaryKey; |
{ | ||
"name": "cloudvision-connector", | ||
"version": "4.5.3", | ||
"version": "4.5.4", | ||
"description": "A module to communicate with the CloudVision API server", | ||
@@ -53,6 +53,6 @@ "author": "extensions@arista.com", | ||
"dependencies": { | ||
"a-msgpack": "4.5.3", | ||
"a-msgpack": "4.5.4", | ||
"base64-js": "1.3.1", | ||
"imurmurhash": "0.1.4", | ||
"jsbi": "3.1.2" | ||
"jsbi": "3.1.2", | ||
"uuid": "8.3.0" | ||
}, | ||
@@ -69,2 +69,3 @@ "devDependencies": { | ||
"@types/js-yaml": "3.12.4", | ||
"@types/uuid": "8.0.1", | ||
"cross-env": "7.0.2", | ||
@@ -77,3 +78,3 @@ "eslint": "6.8.0", | ||
"eslint-plugin-import": "2.22.0", | ||
"jest": "26.0.1", | ||
"jest": "26.2.2", | ||
"js-yaml": "3.14.0", | ||
@@ -83,7 +84,7 @@ "mock-socket": "9.0.3", | ||
"rimraf": "3.0.2", | ||
"rollup": "2.23.0", | ||
"rollup": "2.23.1", | ||
"rollup-plugin-terser": "5.3.0", | ||
"ts-jest": "26.1.4", | ||
"tslib": "2.0.0", | ||
"typedoc": "0.17.8", | ||
"tslib": "2.0.1", | ||
"typedoc": "0.18.0", | ||
"typedoc-neo-theme": "1.0.9", | ||
@@ -93,3 +94,3 @@ "typedoc-plugin-markdown": "2.2.17", | ||
}, | ||
"gitHead": "81855b9a1e7289dd3e6760cc576bd70af5199ef2" | ||
"gitHead": "b83bd7e4e375363ba8e3cff8f5255b08139f79aa" | ||
} |
import { | ||
AppParams, | ||
CloudVisionParams, | ||
DatasetType, | ||
@@ -15,3 +14,2 @@ NotifCallback, | ||
SubscriptionIdentifier, | ||
WsCommand, | ||
} from '../types'; | ||
@@ -29,3 +27,2 @@ | ||
ID, | ||
PAUSE, | ||
PUBLISH, | ||
@@ -40,3 +37,2 @@ SEARCH, | ||
makePublishCallback, | ||
makeToken, | ||
sanitizeOptions, | ||
@@ -67,4 +63,2 @@ sanitizeSearchOptions, | ||
public static PAUSE: typeof PAUSE = PAUSE; | ||
public static PUBLISH: typeof PUBLISH = PUBLISH; | ||
@@ -201,9 +195,2 @@ | ||
/** | ||
* Generates the unique token that the request can be referenced by. | ||
*/ | ||
public getCommandToken(command: WsCommand, params: CloudVisionParams): string { | ||
return makeToken(command, params); | ||
} | ||
/** | ||
* Returns all notifications that match the given query and options. | ||
@@ -210,0 +197,0 @@ * |
@@ -37,7 +37,2 @@ // Copyright (c) 2018, Arista Networks, Inc. | ||
/** | ||
* Status code for Paused (when the incoming results for a request have been paused). | ||
*/ | ||
export const PAUSED_CODE = 1002; | ||
/** | ||
* Error status sent when a request has finished streaming data. | ||
@@ -54,5 +49,3 @@ */ | ||
export const GET_AND_SUBSCRIBE = 'getAndSubscribe'; | ||
export const PAUSE = 'pause'; | ||
export const PUBLISH = 'publish'; | ||
export const RESUME = 'resume'; | ||
export const SUBSCRIBE = 'subscribe'; | ||
@@ -59,0 +52,0 @@ export const SEARCH = 'alpha/search'; |
@@ -45,2 +45,2 @@ // Copyright (c) 2018, Arista Networks, Inc. | ||
} from './constants'; | ||
export { fromBinaryKey, hashObject, toBinaryKey, sanitizeOptions } from './utils'; | ||
export { fromBinaryKey, toBinaryKey, sanitizeOptions } from './utils'; |
@@ -20,3 +20,2 @@ // Copyright (c) 2018, Arista Networks, Inc. | ||
import { fromByteArray, toByteArray } from 'base64-js'; | ||
import MurmurHash3 from 'imurmurhash'; | ||
@@ -29,4 +28,2 @@ import { | ||
CloudVisionNotifs, | ||
CloudVisionParams, | ||
CloudVisionPublishRequest, | ||
CloudVisionResult, | ||
@@ -42,5 +39,3 @@ CloudVisionServiceResult, | ||
SearchType, | ||
ServiceRequest, | ||
SubscriptionIdentifier, | ||
WsCommand, | ||
} from '../types'; | ||
@@ -64,11 +59,2 @@ | ||
/** | ||
* Definition for an instance of MurmurHash3, rather than the module. | ||
*/ | ||
interface MurmurHash { | ||
hash(value: string): MurmurHash; | ||
reset(seed?: number): MurmurHash; | ||
result(): number; | ||
} | ||
/** | ||
* Checks if an argument is valid (a number larger than 0). | ||
@@ -165,43 +151,2 @@ */ | ||
/** | ||
* Recursively hashes an object, given an object and a hashState. | ||
*/ | ||
function hashObjectHelper<T extends {}>(object: T, hashState: MurmurHash): string { | ||
const objKeys = Object.keys(object) as (keyof T)[]; | ||
for (let i = 0; i < objKeys.length; i += 1) { | ||
const key = objKeys[i]; | ||
const value = object[key]; | ||
if (value && typeof value === 'object') { | ||
hashState.hash(key.toString()).hash(hashObjectHelper(value, hashState)); | ||
} else { | ||
hashState.hash(key + '' + value); | ||
} | ||
} | ||
return hashState.result().toString(); | ||
} | ||
/** | ||
* Creates a unique hash given an object. | ||
*/ | ||
export function hashObject(object: {}): string { | ||
const hashState = MurmurHash3(); | ||
return hashObjectHelper(object, hashState); | ||
} | ||
/** | ||
* Generates token based on the [[WsCommand]] and params ([[CloudVisionParams]], | ||
* [[CloudVisionPublishRequest]], [[ServiceRequest]]) of a request. This is | ||
* used to map requests to responses when dispatching response callbacks. | ||
*/ | ||
export function makeToken( | ||
command: WsCommand, | ||
params: CloudVisionParams | CloudVisionPublishRequest | ServiceRequest, | ||
): string { | ||
return hashObject({ | ||
command, | ||
params, | ||
}); | ||
} | ||
/** | ||
* Creates a notification callback that properly formats the result for the | ||
@@ -208,0 +153,0 @@ * passed callback. |
247
src/Wrpc.ts
@@ -18,2 +18,4 @@ // Copyright (c) 2018, Arista Networks, Inc. | ||
import { v4 as uuidv4 } from 'uuid'; | ||
import { | ||
@@ -27,3 +29,2 @@ CloseParams, | ||
CloudVisionResult, | ||
CloudVisionServiceResult, | ||
CloudVisionStatus, | ||
@@ -34,5 +35,3 @@ ConnectionCallback, | ||
NotifCallback, | ||
PauseParams, | ||
RequestContext, | ||
ResumeParams, | ||
ServiceRequest, | ||
@@ -55,6 +54,3 @@ StreamCommand, | ||
ID, | ||
PAUSE, | ||
PAUSED_CODE, | ||
PUBLISH, | ||
RESUME, | ||
SEARCH, | ||
@@ -66,12 +62,6 @@ SERVICE_REQUEST, | ||
import { log } from './logger'; | ||
import { createCloseParams, makeToken, toBinaryKey, validateResponse } from './utils'; | ||
import { createCloseParams, toBinaryKey, validateResponse } from './utils'; | ||
type StreamArgs = [string, WsCommand, RequestContext, CloudVisionParams | ServiceRequest]; | ||
/** | ||
* Options passed to the constructor, that configure some global options | ||
* | ||
* `pauseStreams`: Negotiates the enabling of pause/resume with the server. | ||
* Pause/resume is a way to add backpressure to the server, so that it stops | ||
* sending notifications when the client cannot keep up. | ||
*/ | ||
@@ -82,3 +72,2 @@ interface ConnectorOptions { | ||
instrumentationConfig?: InstrumentationConfig; | ||
pauseStreams: boolean; | ||
} | ||
@@ -95,4 +84,2 @@ | ||
private closingStreams: Map<string, string>; | ||
private connectionEvents: Emitter; | ||
@@ -110,8 +97,2 @@ | ||
private waitingStreams: Map<string, StreamArgs[]>; | ||
private activeStreams: Set<string>; | ||
private activeRequests: Set<string>; | ||
private ws!: WebSocket; | ||
@@ -125,3 +106,2 @@ | ||
debugMode: false, | ||
pauseStreams: false, | ||
}, | ||
@@ -135,6 +115,2 @@ websocketClass = WebSocket, | ||
this.events = new Emitter(); | ||
this.closingStreams = new Map(); | ||
this.waitingStreams = new Map(); | ||
this.activeStreams = new Set(); | ||
this.activeRequests = new Set(); | ||
this.WebSocket = websocketClass; | ||
@@ -157,22 +133,2 @@ this.Parser = parser; | ||
public get streams(): Set<string> { | ||
return this.activeStreams; | ||
} | ||
public get streamInClosingState(): Map<string, string> { | ||
return this.closingStreams; | ||
} | ||
private addWaitingStream( | ||
waitingOnToken: string, | ||
token: string, | ||
command: StreamCommand, | ||
params: CloudVisionParams | ServiceRequest, | ||
requestContext: RequestContext, | ||
): void { | ||
const streamsWaiting = this.waitingStreams.get(waitingOnToken) || []; | ||
streamsWaiting.push([token, command, requestContext, params]); | ||
this.waitingStreams.set(waitingOnToken, streamsWaiting); | ||
} | ||
/** | ||
@@ -197,14 +153,7 @@ * PRIVATE METHOD | ||
} | ||
const token: string = makeToken(command, params); | ||
const token: string = uuidv4(); | ||
const requestContext = { command, encodedParams: toBinaryKey(params), token }; | ||
if (!this.activeRequests.has(token)) { | ||
// only execute request if not already getting data | ||
this.activeRequests.add(token); | ||
const callbackWithUnbind = this.makeCallbackWithUnbind(token, callback); | ||
this.events.bind(token, requestContext, callbackWithUnbind); | ||
this.sendMessageOrError(token, command, params, requestContext); | ||
} else { | ||
const queuedCallbackWithUnbind = this.makeQueuedCallbackWithUnbind(token, callback); | ||
this.events.bind(token, requestContext, queuedCallbackWithUnbind); | ||
} | ||
const callbackWithUnbind = this.makeCallbackWithUnbind(token, callback); | ||
this.events.bind(token, requestContext, callbackWithUnbind); | ||
this.sendMessageOrError(token, command, params, requestContext); | ||
@@ -215,7 +164,6 @@ return token; | ||
/** | ||
* Cleans up all event emitters and active streams. | ||
* Cleans up all event emitters. | ||
* All bound callbacks will be unbound. | ||
*/ | ||
public cleanUpConnections(): void { | ||
this.activeStreams.clear(); | ||
this.events.close(); | ||
@@ -245,3 +193,3 @@ } | ||
): string { | ||
const closeToken: string = makeToken(CLOSE, closeParams); | ||
const closeToken: string = uuidv4(); | ||
const requestContext: RequestContext = { | ||
@@ -253,3 +201,5 @@ command: CLOSE, | ||
this.setStreamClosingState(streams, closeToken, requestContext, callback); | ||
const callbackWithUnbind = this.makeCallbackWithUnbind(closeToken, callback); | ||
this.events.bind(closeToken, requestContext, callbackWithUnbind); | ||
try { | ||
@@ -259,3 +209,2 @@ this.sendMessage(closeToken, CLOSE, requestContext, closeParams); | ||
log(ERROR, err); | ||
this.removeStreamClosingState(streams, closeToken); | ||
callback(err, undefined, undefined, closeToken); | ||
@@ -312,13 +261,2 @@ } | ||
/** | ||
* Enables any session wide options by negotiating with the server | ||
*/ | ||
private enableOptions(callback: NotifCallback): void { | ||
if (this.connectorOptions.pauseStreams) { | ||
this.pause({ pauseStreams: true }, callback); | ||
} else { | ||
callback(null); | ||
} | ||
} | ||
/** | ||
* Sends the command along with the params to the API, if the WebSocket is | ||
@@ -344,3 +282,2 @@ * connected. The response is received via the provided callback function. | ||
// Unbind callback when any error message is received | ||
this.activeRequests.delete(token); | ||
this.events.unbind(token, callbackWithUnbind); | ||
@@ -358,19 +295,2 @@ this.instrumentation.callInfo(requestContext.command, requestContext, { | ||
private makeQueuedCallbackWithUnbind(token: string, callback: NotifCallback): EventCallback { | ||
const callbackWithUnbind = ( | ||
requestContext: RequestContext, | ||
err: string | null, | ||
result?: CloudVisionBatchedResult | CloudVisionResult, | ||
status?: CloudVisionStatus, | ||
): void => { | ||
if (err) { | ||
// Unbind callback and invoke callback only when any error message is received | ||
this.events.unbind(token, callbackWithUnbind); | ||
callback(err, result, status, token, requestContext); | ||
} | ||
}; | ||
return callbackWithUnbind; | ||
} | ||
/** | ||
@@ -385,33 +305,2 @@ * Writes data in params to the CloudVision API. It receives one message via | ||
/** | ||
* Cleans up the steam closing state set in `setStreamClosingState`, as well | ||
* as re-subscribing to any streams (with the same token as the stream that | ||
* was just closed) opened up during the closing of the current stream. | ||
*/ | ||
private removeStreamClosingState( | ||
streams: SubscriptionIdentifier[] | SubscriptionIdentifier, | ||
closeToken: string, | ||
): void { | ||
if (Array.isArray(streams)) { | ||
const streamsLen = streams.length; | ||
for (let i = 0; i < streamsLen; i += 1) { | ||
const { token } = streams[i]; | ||
this.closingStreams.delete(token); | ||
const streamsArgs = this.waitingStreams.get(closeToken); | ||
if (streamsArgs) { | ||
const streamsArgsLen = streamsArgs.length; | ||
for (let j = 0; j < streamsArgsLen; j += 1) { | ||
this.reSubscribeStream(streamsArgs[j], closeToken); | ||
} | ||
} | ||
} | ||
} else { | ||
this.closingStreams.delete(streams.token); | ||
const streamArgs = this.waitingStreams.get(closeToken); | ||
if (streamArgs) { | ||
this.reSubscribeStream(streamArgs[0], closeToken); | ||
} | ||
} | ||
} | ||
/** | ||
* Send a service request command to the CloudVision API | ||
@@ -424,20 +313,2 @@ */ | ||
/** | ||
* Re initiates a subscribe if there has been a `subscribe` call for the | ||
* stream while the stream was closing. | ||
*/ | ||
private reSubscribeStream(streamArgs: StreamArgs, closeToken: string): void { | ||
this.events.unbindAll(closeToken); | ||
this.waitingStreams.delete(closeToken); | ||
this.sendMessage(...streamArgs); | ||
} | ||
/** | ||
* PRIVATE METHOD | ||
* Requests the server to resume one of the currently paused streams. | ||
*/ | ||
private resume(params: ResumeParams, callback: NotifCallback): string | null { | ||
return this.callCommand(RESUME, params, callback); | ||
} | ||
/** | ||
* Run connects connector to the specified url | ||
@@ -466,11 +337,4 @@ */ | ||
if (!this.isRunning) { | ||
// Enable any options if they have been configured | ||
this.isRunning = true; | ||
this.enableOptions((err, _res, status, token): void => { | ||
// We're good to go | ||
if (status && status.code !== EOF_CODE) { | ||
log(ERROR, err, status, token); | ||
} | ||
this.connectionEvents.emit('connection', Wrpc.CONNECTED, event); | ||
}); | ||
this.connectionEvents.emit('connection', Wrpc.CONNECTED, event); | ||
} | ||
@@ -522,17 +386,2 @@ }; | ||
// Automatically resume streams that have been paused by the server | ||
if (status && status.code === PAUSED_CODE) { | ||
const cb = ( | ||
err: string | null, | ||
_result?: CloudVisionBatchedResult | CloudVisionResult | CloudVisionServiceResult, | ||
resumeStatus?: CloudVisionStatus, | ||
): void => { | ||
if (err) { | ||
log(ERROR, err, resumeStatus); | ||
} | ||
}; | ||
this.resume({ token }, cb); | ||
return; | ||
} | ||
this.events.emit(token, null, msg.result, status); | ||
@@ -603,38 +452,2 @@ if ( | ||
/** | ||
* Sets all the parameters for one or more streams when they are closed. | ||
* When the `close` request has been successfully processed, | ||
* `removeStreamClosingState` is called to clean up any closing state. | ||
*/ | ||
private setStreamClosingState( | ||
streams: SubscriptionIdentifier[] | SubscriptionIdentifier, | ||
closeToken: string, | ||
requestContext: RequestContext, | ||
callback: NotifCallback, | ||
): void { | ||
const closeCallback = ( | ||
closeRequestContext: RequestContext, | ||
err: string | null, | ||
result?: CloudVisionBatchedResult | CloudVisionResult, | ||
status?: CloudVisionStatus, | ||
): void => { | ||
this.removeStreamClosingState(streams, closeToken); | ||
callback(err, result, status, closeToken, closeRequestContext); | ||
}; | ||
if (Array.isArray(streams)) { | ||
const streamsLen = streams.length; | ||
for (let i = 0; i < streamsLen; i += 1) { | ||
const { token } = streams[i]; | ||
const oldToken = this.closingStreams.get(token); | ||
this.waitingStreams.delete(oldToken || ''); | ||
this.closingStreams.set(token, closeToken); | ||
} | ||
} else { | ||
this.closingStreams.set(streams.token, closeToken); | ||
} | ||
this.events.bind(closeToken, requestContext, closeCallback); | ||
} | ||
/** | ||
* Sends the command along with the params to the API, which creates a | ||
@@ -656,3 +469,3 @@ * subscriptions on the server. It receives a stream of messages via the | ||
} | ||
const token = makeToken(command, params); | ||
const token = uuidv4(); | ||
const callerRequestContext = { command, encodedParams: toBinaryKey(params), token }; | ||
@@ -675,3 +488,2 @@ const callbackWithUnbind: EventCallback = ( | ||
if (status && status.code === ACTIVE_CODE) { | ||
this.activeStreams.add(token); | ||
this.instrumentation.callInfo(callerRequestContext.command, callerRequestContext, { | ||
@@ -683,34 +495,7 @@ message: 'stream active', | ||
}; | ||
const numCallbacks = this.events.bind(token, callerRequestContext, callbackWithUnbind); | ||
if (numCallbacks === 1) { | ||
// Only execute the request on the first callback. | ||
// If there are open request to the same data, just attach the callback | ||
const closingStreamToken = this.closingStreams.get(token); | ||
if (closingStreamToken) { | ||
// The stream is still closing, so wait for it to close before re requesting | ||
this.addWaitingStream(closingStreamToken, token, command, params, callerRequestContext); | ||
} else { | ||
this.sendMessageOrError(token, command, params, callerRequestContext); | ||
} | ||
} else if (this.activeStreams.has(token)) { | ||
// The stream is already open immediately call get | ||
callback(null, undefined, { code: ACTIVE_CODE }, token, callerRequestContext); | ||
} | ||
this.events.bind(token, callerRequestContext, callbackWithUnbind); | ||
this.sendMessageOrError(token, command, params, callerRequestContext); | ||
return { token, callback: callbackWithUnbind }; | ||
} | ||
/** | ||
* Configures pausing of results on the server. When the pause feature is enabled, | ||
* the server pauses the results returned to the client every x number of bytes. | ||
* | ||
* When pausing is enabled, only the new streams created after that will be paused | ||
* (i.e. existing streams are not affected). To continue receiving results, a | ||
* wrpc client has to send a 'resume' command with the desired stream token to | ||
* unpause. Currently, the cloudvision-connector automatically resumes any streams | ||
* paused by the server. | ||
*/ | ||
private pause(params: PauseParams, callback: NotifCallback): string | null { | ||
return this.callCommand(PAUSE, params, callback); | ||
} | ||
} |
@@ -1,4 +0,4 @@ | ||
import { CloudVisionParams, NotifCallback, Options, PublishCallback, PublishRequest, Query, SearchOptions, ServiceRequest, SubscriptionIdentifier, WsCommand } from '../types'; | ||
import { NotifCallback, Options, PublishCallback, PublishRequest, Query, SearchOptions, ServiceRequest, SubscriptionIdentifier } from '../types'; | ||
import Wrpc from './Wrpc'; | ||
import { CLOSE, DEVICES_DATASET_ID, GET, GET_AND_SUBSCRIBE, GET_DATASETS, ID, PAUSE, PUBLISH, SEARCH, SEARCH_SUBSCRIBE, SERVICE_REQUEST, SUBSCRIBE } from './constants'; | ||
import { CLOSE, DEVICES_DATASET_ID, GET, GET_AND_SUBSCRIBE, GET_DATASETS, ID, PUBLISH, SEARCH, SEARCH_SUBSCRIBE, SERVICE_REQUEST, SUBSCRIBE } from './constants'; | ||
/** | ||
@@ -17,3 +17,2 @@ * The Connector uses WebSocket to enable transmission of data over an | ||
static ID: typeof ID; | ||
static PAUSE: typeof PAUSE; | ||
static PUBLISH: typeof PUBLISH; | ||
@@ -101,6 +100,2 @@ static SEARCH_SUBSCRIBE: typeof SEARCH_SUBSCRIBE; | ||
/** | ||
* Generates the unique token that the request can be referenced by. | ||
*/ | ||
getCommandToken(command: WsCommand, params: CloudVisionParams): string; | ||
/** | ||
* Returns all notifications that match the given query and options. | ||
@@ -107,0 +102,0 @@ * |
@@ -15,6 +15,2 @@ import { RequestContext, SearchType } from '../types'; | ||
/** | ||
* Status code for Paused (when the incoming results for a request have been paused). | ||
*/ | ||
export declare const PAUSED_CODE = 1002; | ||
/** | ||
* Error status sent when a request has finished streaming data. | ||
@@ -29,5 +25,3 @@ */ | ||
export declare const GET_AND_SUBSCRIBE = "getAndSubscribe"; | ||
export declare const PAUSE = "pause"; | ||
export declare const PUBLISH = "publish"; | ||
export declare const RESUME = "resume"; | ||
export declare const SUBSCRIBE = "subscribe"; | ||
@@ -34,0 +28,0 @@ export declare const SEARCH = "alpha/search"; |
@@ -5,2 +5,2 @@ export { createBaseType, isJsbi, isNeatType, Bool, Float32, Float64, Int, NeatTypes, NeatTypeSerializer, Nil, Pointer, Str, } from 'a-msgpack'; | ||
export { ACTIVE_CODE, APP_DATASET_TYPE, CONNECTED, DEVICE_DATASET_TYPE, DISCONNECTED, EOF, EOF_CODE, GET_REQUEST_COMPLETED, } from './constants'; | ||
export { fromBinaryKey, hashObject, toBinaryKey, sanitizeOptions } from './utils'; | ||
export { fromBinaryKey, toBinaryKey, sanitizeOptions } from './utils'; |
@@ -1,2 +0,2 @@ | ||
import { CloseParams, CloudVisionBatchedResult, CloudVisionParams, CloudVisionPublishRequest, CloudVisionResult, CloudVisionServiceResult, CloudVisionStatus, NotifCallback, Options, PublishCallback, Query, SearchOptions, SearchType, ServiceRequest, SubscriptionIdentifier, WsCommand } from '../types'; | ||
import { CloseParams, CloudVisionBatchedResult, CloudVisionResult, CloudVisionServiceResult, CloudVisionStatus, NotifCallback, Options, PublishCallback, Query, SearchOptions, SearchType, SubscriptionIdentifier } from '../types'; | ||
import Emitter from './emitter'; | ||
@@ -45,12 +45,2 @@ interface ExplicitSearchOptions extends SearchOptions { | ||
/** | ||
* Creates a unique hash given an object. | ||
*/ | ||
export declare function hashObject(object: {}): string; | ||
/** | ||
* Generates token based on the [[WsCommand]] and params ([[CloudVisionParams]], | ||
* [[CloudVisionPublishRequest]], [[ServiceRequest]]) of a request. This is | ||
* used to map requests to responses when dispatching response callbacks. | ||
*/ | ||
export declare function makeToken(command: WsCommand, params: CloudVisionParams | CloudVisionPublishRequest | ServiceRequest): string; | ||
/** | ||
* Creates a notification callback that properly formats the result for the | ||
@@ -57,0 +47,0 @@ * passed callback. |
@@ -8,6 +8,2 @@ import { CloudVisionParams, CloudVisionPublishRequest, ConnectionCallback, GetCommand, NotifCallback, ServiceRequest, StreamCommand, SubscriptionIdentifier } from '../types'; | ||
* Options passed to the constructor, that configure some global options | ||
* | ||
* `pauseStreams`: Negotiates the enabling of pause/resume with the server. | ||
* Pause/resume is a way to add backpressure to the server, so that it stops | ||
* sending notifications when the client cannot keep up. | ||
*/ | ||
@@ -18,3 +14,2 @@ interface ConnectorOptions { | ||
instrumentationConfig?: InstrumentationConfig; | ||
pauseStreams: boolean; | ||
} | ||
@@ -28,3 +23,2 @@ /** | ||
static DISCONNECTED: typeof DISCONNECTED; | ||
private closingStreams; | ||
private connectionEvents; | ||
@@ -36,5 +30,2 @@ private connectorOptions; | ||
private instrumentation; | ||
private waitingStreams; | ||
private activeStreams; | ||
private activeRequests; | ||
private ws; | ||
@@ -53,5 +44,2 @@ private WebSocket; | ||
get eventsEmitter(): Emitter; | ||
get streams(): Set<string>; | ||
get streamInClosingState(): Map<string, string>; | ||
private addWaitingStream; | ||
/** | ||
@@ -69,3 +57,3 @@ * PRIVATE METHOD | ||
/** | ||
* Cleans up all event emitters and active streams. | ||
* Cleans up all event emitters. | ||
* All bound callbacks will be unbound. | ||
@@ -95,6 +83,2 @@ */ | ||
/** | ||
* Enables any session wide options by negotiating with the server | ||
*/ | ||
private enableOptions; | ||
/** | ||
* Sends the command along with the params to the API, if the WebSocket is | ||
@@ -105,3 +89,2 @@ * connected. The response is received via the provided callback function. | ||
private makeCallbackWithUnbind; | ||
private makeQueuedCallbackWithUnbind; | ||
/** | ||
@@ -113,8 +96,2 @@ * Writes data in params to the CloudVision API. It receives one message via | ||
/** | ||
* Cleans up the steam closing state set in `setStreamClosingState`, as well | ||
* as re-subscribing to any streams (with the same token as the stream that | ||
* was just closed) opened up during the closing of the current stream. | ||
*/ | ||
private removeStreamClosingState; | ||
/** | ||
* Send a service request command to the CloudVision API | ||
@@ -124,12 +101,2 @@ */ | ||
/** | ||
* Re initiates a subscribe if there has been a `subscribe` call for the | ||
* stream while the stream was closing. | ||
*/ | ||
private reSubscribeStream; | ||
/** | ||
* PRIVATE METHOD | ||
* Requests the server to resume one of the currently paused streams. | ||
*/ | ||
private resume; | ||
/** | ||
* Run connects connector to the specified url | ||
@@ -160,8 +127,2 @@ */ | ||
/** | ||
* Sets all the parameters for one or more streams when they are closed. | ||
* When the `close` request has been successfully processed, | ||
* `removeStreamClosingState` is called to clean up any closing state. | ||
*/ | ||
private setStreamClosingState; | ||
/** | ||
* Sends the command along with the params to the API, which creates a | ||
@@ -175,14 +136,3 @@ * subscriptions on the server. It receives a stream of messages via the | ||
stream(command: StreamCommand, params: CloudVisionParams | ServiceRequest, callback: NotifCallback): SubscriptionIdentifier | null; | ||
/** | ||
* Configures pausing of results on the server. When the pause feature is enabled, | ||
* the server pauses the results returned to the client every x number of bytes. | ||
* | ||
* When pausing is enabled, only the new streams created after that will be paused | ||
* (i.e. existing streams are not affected). To continue receiving results, a | ||
* wrpc client has to send a 'resume' command with the desired stream token to | ||
* unpause. Currently, the cloudvision-connector automatically resumes any streams | ||
* paused by the server. | ||
*/ | ||
private pause; | ||
} | ||
export {}; |
import { NeatType, PathElements } from 'a-msgpack'; | ||
import { APP_DATASET_TYPE, CLOSE, DEVICE_DATASET_TYPE, GET, GET_AND_SUBSCRIBE, GET_DATASETS, PAUSE, PUBLISH, RESUME, SEARCH, SEARCH_SUBSCRIBE, SEARCH_TYPE_ANY, SEARCH_TYPE_IP, SEARCH_TYPE_MAC, SERVICE_REQUEST, SUBSCRIBE } from '../src/constants'; | ||
import { APP_DATASET_TYPE, CLOSE, DEVICE_DATASET_TYPE, GET, GET_AND_SUBSCRIBE, GET_DATASETS, PUBLISH, SEARCH, SEARCH_SUBSCRIBE, SEARCH_TYPE_ANY, SEARCH_TYPE_IP, SEARCH_TYPE_MAC, SERVICE_REQUEST, SUBSCRIBE } from '../src/constants'; | ||
export declare type GetCommand = typeof GET | typeof GET_DATASETS | typeof SEARCH; | ||
@@ -9,3 +9,3 @@ /** @deprecated: Use `GetCommand`. */ | ||
export declare type StreamCommands = StreamCommand; | ||
export declare type WsCommand = GetCommand | StreamCommand | typeof CLOSE | typeof PAUSE | typeof PUBLISH | typeof RESUME; | ||
export declare type WsCommand = GetCommand | StreamCommand | typeof CLOSE | typeof PUBLISH; | ||
export declare type DatasetType = typeof APP_DATASET_TYPE | typeof DEVICE_DATASET_TYPE; | ||
@@ -97,8 +97,2 @@ /** @deprecated: Use `DatasetType`. */ | ||
} | ||
export interface PauseParams { | ||
pauseStreams: boolean; | ||
} | ||
export interface ResumeParams { | ||
token: string; | ||
} | ||
export declare type CloudVisionParams = AppParams | CloseParams | PauseParams | QueryParams | ResumeParams | SearchParams; | ||
export declare type CloudVisionParams = AppParams | CloseParams | QueryParams | SearchParams; |
310943
30
6941
+ Addeduuid@8.3.0
+ Addeda-msgpack@4.5.4(transitive)
+ Addeduuid@8.3.0(transitive)
- Removedimurmurhash@0.1.4
- Removeda-msgpack@4.5.3(transitive)
- Removedimurmurhash@0.1.4(transitive)
Updateda-msgpack@4.5.4