grexxconnect-ess
Advanced tools
Comparing version 1.1.2 to 1.2.1
@@ -94,2 +94,6 @@ /*! Copyright (C) Grexx - All Rights Reserved | ||
disconnect(code, message) { | ||
this.instance.close(code, message); | ||
} | ||
reconnect( code, msg ) { | ||
@@ -96,0 +100,0 @@ this.log('connection failed'.red, `retry in ${this.autoReconnectInterval}ms`, code.toString().red, (msg || '').yellow); |
130
index.js
/* | ||
* Copyright (C) 2014-2021 Grexx - All Rights Reserved | ||
* Copyright (C) 2014-2022 Grexx - All Rights Reserved | ||
* Unauthorized copying of this file, via any medium is strictly prohibited | ||
@@ -11,4 +11,6 @@ * Proprietary and confidential | ||
const TIMEOUT_LONG = 30 * 1000; //== 30 seconds | ||
class EssConnect extends EventEmitter { | ||
constructor( settings ) { | ||
constructor(settings) { | ||
super(); | ||
@@ -25,4 +27,33 @@ | ||
const send = ( type, content ) => { | ||
return new Promise(( resolve, reject ) => { | ||
const _waitQueue = {}, | ||
_unfinishedResponses = {}; | ||
const send = (type, content, timeout = TIMEOUT_LONG) => { | ||
return new Promise((resolve, reject) => { | ||
const essCorrelationId = content?.['ess-correlation-id']; | ||
if (!essCorrelationId || !this._ackFunctional) { | ||
try { | ||
ws.send(JSON.stringify({ | ||
type: type, | ||
content: content, | ||
timestamp: Date.now() | ||
})); | ||
return resolve(); | ||
} catch (error) { | ||
return reject(error); | ||
} | ||
} | ||
const exceededTimeout = setTimeout(async () => { | ||
ws.debug(`timeout of ${timeout}ms exceeded for ack response from server`); | ||
return reject('Timeout exceeded'); | ||
}, timeout); | ||
_waitQueue[essCorrelationId] = () => { | ||
clearTimeout(exceededTimeout); | ||
ws.debug(`ack response received from server`); | ||
return resolve('Ack response received from server'); | ||
}; | ||
ws.send(JSON.stringify({ | ||
@@ -32,7 +63,5 @@ type: type, | ||
timestamp: Date.now() | ||
}), error => { | ||
if ( error ) { | ||
reject(error); | ||
} else { | ||
resolve(); | ||
}), (error) => { | ||
if (error) { | ||
return reject(error); | ||
} | ||
@@ -43,10 +72,49 @@ }); | ||
ws.on('close', ( code, msg ) => this.emit('disconnected', code, msg)); | ||
ws.on('message', _message => { | ||
switch ( _message.type ) { | ||
const sendAck = async (connectCorrelationId, message = '', type = 'ack') => { | ||
this.ws.debug(`acknowledging message from server for message header ID: ${connectCorrelationId}`.gray); | ||
await send(type, {'connect-correlation-id': connectCorrelationId, message}); | ||
this.emit(type); | ||
}; | ||
const completeUnfinishedResponses = () => { | ||
this.ws.debug(`checking for any incomplete message responses: ${Object.keys(_unfinishedResponses).length.toString().blue} unfinished responses`); | ||
for (const key in _unfinishedResponses) { | ||
const {type, content} = _unfinishedResponses[key]; | ||
send(type, content).then(() => { | ||
delete _unfinishedResponses[key]; | ||
}).catch(() => handleTimeout(type, content)); | ||
} | ||
}; | ||
const handleTimeout = (type, content) => { | ||
const id = content['ess-correlation-id']; | ||
_unfinishedResponses[id] = {type, content}; | ||
ws.disconnect(1012, "Ack response from server timed out"); | ||
}; | ||
ws.on('close', (code, msg) => this.emit('disconnected', code, msg)); | ||
ws.on('message', async (_message) => { | ||
const connectCorrelationId = _message['connect-correlation-id'], | ||
essCorrelationId = _message['ess-correlation-id']; | ||
switch (_message.type) { | ||
case 'ping': | ||
this.ws.debug('responding to connection-ping'); | ||
send('pong'); | ||
this.ws.debug('responding to connection-ping'.gray); | ||
this.emit('ping'); | ||
await send('pong'); | ||
break; | ||
case 'ackFunctional': | ||
this.ws.debug('responding to functionality check'.green, 'acknowledgements'.blue); | ||
this._ackFunctional = true; | ||
await sendAck(connectCorrelationId, null, 'ackFunctional'); | ||
completeUnfinishedResponses(); | ||
break; | ||
case 'ack': | ||
this.ws.debug(`received ack from GrexxConnect for message with header ID: ${essCorrelationId}`); | ||
if (typeof (essCorrelationId) != 'undefined' && typeof (_waitQueue[essCorrelationId]) == 'function') { | ||
const execFunc = _waitQueue[essCorrelationId]; | ||
execFunc(); | ||
delete _waitQueue[essCorrelationId]; | ||
} | ||
break; | ||
case 'authorized': | ||
@@ -63,2 +131,3 @@ this.log('connected and authorized for'.green, _message.namespace.blue); | ||
case'message': | ||
await sendAck(connectCorrelationId, "message received and being processed"); | ||
this.emit('message', { | ||
@@ -68,11 +137,18 @@ content: _message.content, | ||
resolve: outputData => { | ||
return send('response', { | ||
toMessage: _message, | ||
outputData: outputData | ||
const type = 'response', | ||
content = { | ||
'ess-correlation-id': connectCorrelationId, | ||
toMessage: _message, | ||
outputData: outputData | ||
}; | ||
return send(type, content).catch(() => { | ||
handleTimeout(type, content); | ||
}); | ||
}, | ||
reject: ( publicMessage, debugMessage, additionalContext ) => { | ||
reject: (publicMessage, debugMessage, additionalContext) => { | ||
return send('reject', { | ||
toMessage: _message, | ||
errorContext: Object.assign({ | ||
'ess-correlation-id': connectCorrelationId, | ||
toMessage: _message, | ||
errorContext: Object.assign({ | ||
publicMessage, | ||
@@ -108,17 +184,17 @@ debugMessage | ||
*/ | ||
setMonitoringResponse( statusFunction ) { | ||
setMonitoringResponse(statusFunction) { | ||
this._statusFunction = statusFunction; | ||
} | ||
log( ...args ) { | ||
log(...args) { | ||
console.info(' [ESS][MAIN] '.gray, ...args); | ||
} | ||
handleSystemMessage( message, send ) { | ||
if ( message.content.action === 'ping' ) { | ||
this.ws.debug('responding to system-ping'); | ||
handleSystemMessage(message, send) { | ||
if (message.content.action === 'ping') { | ||
this.ws.debug('responding to system-ping'.gray); | ||
send('system', { | ||
action: 'pong', | ||
client_data: typeof this._statusFunction === 'function' ? | ||
this._statusFunction() : { status: 'ok' }, | ||
this._statusFunction() : {status: 'ok'}, | ||
server_data: message.content.id | ||
@@ -129,3 +205,3 @@ }); | ||
parseMessage( message = [] ) { | ||
parseMessage(message = []) { | ||
const result = {}; | ||
@@ -132,0 +208,0 @@ |
@@ -5,3 +5,4 @@ { | ||
"description" : "Grexx Connect - External System Service Helper", | ||
"version" : "1.1.2", | ||
"version" : "1.2.1", | ||
"types" : "./schema/index.d.ts", | ||
"contributors" : [ | ||
@@ -8,0 +9,0 @@ { |
New author
Supply chain riskA new npm collaborator published a version of the package for the first time. New collaborators are usually benign additions to a project, but do indicate a change to the security surface area of a package.
Found 1 instance in 1 package
17322
6
342
1