larvitamintercom
Advanced tools
Comparing version 0.3.6 to 0.3.7
455
index.js
@@ -29,3 +29,4 @@ 'use strict'; | ||
function Intercom(options) { | ||
const that = this; | ||
const tasks = [], | ||
that = this; | ||
@@ -66,4 +67,2 @@ let logPrefix = topLogPrefix + 'Intercom() - ', | ||
that.log.verbose(logPrefix + 'Initializing on loopback interface'); | ||
that.initializeListeners(); | ||
} else { | ||
@@ -74,317 +73,271 @@ that.loopback = false; | ||
function openSocket() { | ||
let connectionOptions = { | ||
'port': that.port, | ||
'host': that.host | ||
}; | ||
that.socket = net.connect({ | ||
'port': that.port, | ||
'host': that.host | ||
}); | ||
that.log.info(logPrefix + 'Initializing socket on ' + that.host + ':' + that.port); | ||
that.log.verbose(logPrefix + 'Initializing on ' + that.host + ':' + that.port); | ||
that.socket.on('error', function (err) { | ||
if (that.expectingClose !== false) { | ||
that.log.verbose(logPrefix + 'expected socket close, but also got socket error: ' + err.message); | ||
} else { | ||
that.log.error(logPrefix + 'socket error: ' + err.message); | ||
} | ||
}); | ||
that.socket = net.connect(connectionOptions); | ||
that.socket.setKeepAlive = true; | ||
that.socket.on('connect', function () { | ||
that.log.verbose(logPrefix + 'Socket connected to ' + that.host + ':' + that.port); | ||
onSocketConnect(function (err) { | ||
if (err) { | ||
that.log.error(logPrefix + ' Couldn\'t Initialize connection to rabbitmq'); | ||
} | ||
that.initializeListeners(); | ||
}); | ||
}); | ||
that.socket.on('error', function (err) { | ||
that.socket.on('close', function (err) { | ||
that.log.verbose(logPrefix + 'socket closed'); | ||
if (err) { | ||
if (that.expectingClose !== false) { | ||
that.log.verbose(logPrefix + 'expected socket close, but also got socket error: ' + err.message); | ||
} else { | ||
that.log.error(logPrefix + 'socket error: ' + err.message); | ||
} | ||
}); | ||
that.socket.on('close', function (err) { | ||
that.socket.destroy(); | ||
that.socket.unref(); | ||
if (that.expectingClose !== false) { | ||
that.log.verbose(logPrefix + 'socket closed with error, err: ' + err.message); | ||
} else { | ||
that.log.error(logPrefix + 'socket closed with error, err: ' + err.message); | ||
setTimeout(openSocket, 1000); | ||
} | ||
}); | ||
} | ||
}); | ||
that.socket.on('end', function () { | ||
that.log.info(logPrefix + 'socket connection ended by remote'); | ||
}); | ||
} | ||
that.socket.on('end', function () { | ||
that.log.info(logPrefix + 'socket connection ended by remote'); | ||
}); | ||
function onSocketConnect(cb) { | ||
const tasks = []; | ||
// Create handle by socket connect to rabbitmq | ||
tasks.push(function (cb) { | ||
bramqp.initialize(that.socket, 'rabbitmq/full/amqp0-9-1.stripped.extended', function (err, result) { | ||
if (err) { | ||
that.log.error(logPrefix + 'Error connecting to ' + that.host + ':' + that.port + ' err: ' + err.message); | ||
that.emit('error', err); | ||
} | ||
// Create handle by socket connect to rabbitmq | ||
tasks.push(function (cb) { | ||
bramqp.initialize(that.socket, 'rabbitmq/full/amqp0-9-1.stripped.extended', function (err, result) { | ||
if (err) { | ||
that.log.error(logPrefix + 'Error connecting to ' + that.host + ':' + that.port + ' err: ' + err.message); | ||
that.emit('error', err); | ||
} | ||
// log.silly(logPrefix + 'bramqp.initialize() ran on ' + that.host + ':' + that.port); | ||
// log.silly(logPrefix + 'bramqp.initialize() ran on ' + that.host + ':' + that.port); | ||
that.handle = result; | ||
that.handle = result; | ||
cb(err); | ||
}); | ||
cb(err); | ||
}); | ||
}); | ||
// Open AMQP communication | ||
tasks.push(function (cb) { | ||
const heartBeat = true, | ||
auth = parsedConStr.auth; | ||
// Open AMQP communication | ||
tasks.push(function (cb) { | ||
const heartBeat = true, | ||
auth = parsedConStr.auth; | ||
let username, | ||
password; | ||
let username, | ||
password; | ||
if (auth) { | ||
username = parsedConStr.auth.split(':')[0]; | ||
password = parsedConStr.auth.split(':')[1]; | ||
} | ||
if (auth) { | ||
username = parsedConStr.auth.split(':')[0]; | ||
password = parsedConStr.auth.split(':')[1]; | ||
} | ||
that.log.debug(logPrefix + 'openAMQPCommunication running on ' + that.host + ':' + that.port + ' with username: ' + username); | ||
that.log.debug(logPrefix + 'openAMQPCommunication running on ' + that.host + ':' + that.port + ' with username: ' + username); | ||
that.handle.openAMQPCommunication(username, password, heartBeat, function (err) { | ||
if (err) { | ||
that.log.error(logPrefix + 'Error opening AMQP communication: ' + err.message); | ||
that.emit('error', err); | ||
} | ||
that.handle.openAMQPCommunication(username, password, heartBeat, function (err) { | ||
if (err) { | ||
that.log.error(logPrefix + 'Error opening AMQP communication: ' + err.message); | ||
that.emit('error', err); | ||
} | ||
cb(err); | ||
}); | ||
cb(err); | ||
}); | ||
async.series(tasks, function (err) { | ||
if (err) return cb(err); | ||
cb(); | ||
}); | ||
}; | ||
openSocket(); | ||
}); | ||
} | ||
// Register listener for incoming messages | ||
tasks.push(function (cb) { | ||
that.handle.on(that.channelName + ':basic.deliver', function (channel, method, data) { | ||
const exchange = data.exchange, | ||
deliveryTag = data['delivery-tag'], | ||
consumerTag = data['consumer-tag']; | ||
Intercom.prototype.initializeListeners = function () { | ||
const tasks = [], | ||
that = this; | ||
// log.silly(logPrefix + 'Incoming message. exchange: "' + exchange + '", consumerTag: "' + consumerTag + '", deliveryTag: "' + deliveryTag + '"'); | ||
// Register listener for incoming messages | ||
tasks.push(function (cb) { | ||
that.handle.on(that.channelName + ':basic.deliver', function (channel, method, data) { | ||
const exchange = data.exchange, | ||
deliveryTag = data['delivery-tag'], | ||
consumerTag = data['consumer-tag']; | ||
that.handle.once('content', function (channel, className, properties, content) { | ||
let message; | ||
// log.silly(logPrefix + 'Incoming message. exchange: "' + exchange + '", consumerTag: "' + consumerTag + '", deliveryTag: "' + deliveryTag + '"'); | ||
// log.silly(logPrefix + 'Incoming message content. exchange: "' + exchange + '", consumerTag: "' + consumerTag + '", deliveryTag: "' + deliveryTag + '", content: "' + content.toString() + '"'); | ||
that.handle.once('content', function (channel, className, properties, content) { | ||
let message; | ||
try { | ||
message = JSON.parse(content.toString()); | ||
} catch (err) { | ||
that.log.warn(logPrefix + 'subscribe() - Could not parse incoming message. exchange: "' + exchange + '", consumerTag: "' + consumerTag + '", deliveryTag: "' + deliveryTag + '", content: "' + content.toString() + '"'); | ||
return; | ||
} | ||
// log.silly(logPrefix + 'Incoming message content. exchange: "' + exchange + '", consumerTag: "' + consumerTag + '", deliveryTag: "' + deliveryTag + '", content: "' + content.toString() + '"'); | ||
if (lUtils.formatUuid(message.uuid) === false) { | ||
that.log.warn(logPrefix + 'consume() - Message does not contain uuid. exchange: "' + exchange + '", consumerTag: "' + consumerTag + '", deliveryTag: "' + deliveryTag + '", content: "' + content.toString() + '"'); | ||
} | ||
try { | ||
message = JSON.parse(content.toString()); | ||
} catch (err) { | ||
that.log.warn(logPrefix + 'subscribe() - Could not parse incoming message. exchange: "' + exchange + '", consumerTag: "' + consumerTag + '", deliveryTag: "' + deliveryTag + '", content: "' + content.toString() + '"'); | ||
return; | ||
} | ||
if (lUtils.formatUuid(message.uuid) === false) { | ||
that.log.warn(logPrefix + 'consume() - Message does not contain uuid. exchange: "' + exchange + '", consumerTag: "' + consumerTag + '", deliveryTag: "' + deliveryTag + '", content: "' + content.toString() + '"'); | ||
} | ||
that.emit('incoming_msg_' + exchange, message, deliveryTag); | ||
}); | ||
that.emit('incoming_msg_' + exchange, message, deliveryTag); | ||
}); | ||
cb(); | ||
}); | ||
cb(); | ||
}); | ||
tasks.push(function (cb) { | ||
that.handle.on('error', function (err) { | ||
that.log.error(logPrefix + 'RabbitMQ connection error :' + err.message); | ||
}); | ||
cb(); | ||
// Register listener for close events | ||
tasks.push(function (cb) { | ||
that.handle.on('connection.close', function (channel, method, data) { | ||
if (that.expectingClose === false) { | ||
that.log.error(logPrefix + 'Unexpected connection.close! channel: "' + channel + '" data: "' + JSON.stringify(data) + '"'); | ||
} else { | ||
that.log.info(logPrefix + 'Expected connetion.close. channel: "' + channel + '" data: "' + JSON.stringify(data) + '"'); | ||
} | ||
}); | ||
cb(); | ||
}); | ||
// Register listener for close events | ||
tasks.push(function (cb) { | ||
that.handle.on('connection.close', function (channel, method, data) { | ||
if (that.expectingClose === false) { | ||
that.log.error(logPrefix + 'Unexpected connection.close! channel: "' + channel + '" data: "' + JSON.stringify(data) + '"'); | ||
} else { | ||
that.log.info(logPrefix + 'Expected connetion.close. channel: "' + channel + '" data: "' + JSON.stringify(data) + '"'); | ||
} | ||
}); | ||
cb(); | ||
}); | ||
// Log all handle events | ||
// Should be disabled in production code and only manually enabled while debugging due to it being expensive | ||
/** /tasks.push(function (cb) { | ||
const oldEmitter = that.handle.emit; | ||
// Log all handle events | ||
// Should be disabled in production code and only manually enabled while debugging due to it being expensive | ||
/** /tasks.push(function (cb) { | ||
const oldEmitter = that.handle.emit; | ||
that.handle.emit = function () { | ||
const emitArgs = arguments; | ||
that.handle.emit = function () { | ||
const emitArgs = arguments; | ||
that.log.silly(topLogPrefix + 'handle.on("' + arguments[0] + '"), all arguments: "' + JSON.stringify(arguments) + '"'); | ||
that.log.silly(topLogPrefix + 'handle.on("' + arguments[0] + '"), all arguments: "' + JSON.stringify(arguments) + '"'); | ||
oldEmitter.apply(that.handle, arguments); | ||
} | ||
oldEmitter.apply(that.handle, arguments); | ||
} | ||
cb(); | ||
});/**/ | ||
cb(); | ||
});/**/ | ||
// Construct generic handle comms | ||
tasks.push(function (cb) { | ||
const cmdStrsWithoutOk = ['basic.publish', 'content', 'closeAMQPCommunication', 'basic.nack', 'basic.ack', 'basic.qos']; | ||
// Construct generic handle comms | ||
tasks.push(function (cb) { | ||
const cmdStrsWithoutOk = ['basic.publish', 'content', 'closeAMQPCommunication', 'basic.nack', 'basic.ack', 'basic.qos']; | ||
that.handle.cmd = function cmd(cmdStr, params, cb) { | ||
if (typeof cb !== 'function') { | ||
cb = function () {}; | ||
} | ||
that.handle.cmd = function cmd(cmdStr, params, cb) { | ||
if (typeof cb !== 'function') { | ||
cb = function () {}; | ||
} | ||
that.cmdQueue.push({'cmdStr': cmdStr, 'params': params, 'cb': cb}); | ||
that.cmdQueue.push({'cmdStr': cmdStr, 'params': params, 'cb': cb}); | ||
// log.silly(logPrefix + 'handle.cmd() - cmdStr: "' + cmdStr + '" added to run queue. params: "' + JSON.stringify(params) + '"'); | ||
// log.silly(logPrefix + 'handle.cmd() - cmdStr: "' + cmdStr + '" added to run queue. params: "' + JSON.stringify(params) + '"'); | ||
if (that.cmdInProgress === true) { | ||
// log.silly(logPrefix + 'handle.cmd() - cmdStr: "' + cmdStr + '" cmdInProgress === true'); | ||
return; | ||
} | ||
if (that.cmdInProgress === true) { | ||
// log.silly(logPrefix + 'handle.cmd() - cmdStr: "' + cmdStr + '" cmdInProgress === true'); | ||
return; | ||
} | ||
// log.silly(logPrefix + 'handle.cmd() - cmdStr: "' + cmdStr + '" cmdInProgress !== true'); | ||
// log.silly(logPrefix + 'handle.cmd() - cmdStr: "' + cmdStr + '" cmdInProgress !== true'); | ||
that.cmdInProgress = true; | ||
that.cmdInProgress = true; | ||
function readFromQueue() { | ||
const mainParams = that.cmdQueue.shift(), | ||
cmdStr = mainParams.cmdStr, | ||
tasks = [], | ||
cb = mainParams.cb; | ||
function readFromQueue() { | ||
const mainParams = that.cmdQueue.shift(), | ||
cmdStr = mainParams.cmdStr, | ||
tasks = [], | ||
cb = mainParams.cb; | ||
let params = mainParams.params, | ||
channel, | ||
method, | ||
data; | ||
let params = mainParams.params, | ||
channel, | ||
method, | ||
data; | ||
if ( ! Array.isArray(params)) { | ||
params = []; | ||
} | ||
if ( ! Array.isArray(params)) { | ||
params = []; | ||
} | ||
// Register the callback | ||
tasks.push(function (cb) { | ||
const cmdGroupName = cmdStr.split('.')[0], | ||
cmdName = cmdStr.split('.')[1]; | ||
// Register the callback | ||
tasks.push(function (cb) { | ||
const cmdGroupName = cmdStr.split('.')[0], | ||
cmdName = cmdStr.split('.')[1]; | ||
let callCb = true, | ||
okTimeout; | ||
let callCb = true, | ||
okTimeout; | ||
function cmdCb(err) { | ||
if (err) { | ||
that.log.error(logPrefix + 'handle.cmd() - readFromQueue() - cmdStr: "' + cmdStr + '" failed, err: ' + err.message); | ||
callCb = false; | ||
return cb(err); | ||
} | ||
function cmdCb(err) { | ||
if (err) { | ||
that.log.error(logPrefix + 'handle.cmd() - readFromQueue() - cmdStr: "' + cmdStr + '" failed, err: ' + err.message); | ||
callCb = false; | ||
return cb(err); | ||
} | ||
// log.silly(logPrefix + 'handle.cmd() - readFromQueue() - cmdStr: "' + cmdStr + '" succeeded'); | ||
// log.silly(logPrefix + 'handle.cmd() - readFromQueue() - cmdStr: "' + cmdStr + '" succeeded'); | ||
if (cmdStrsWithoutOk.indexOf(cmdStr) !== - 1) { | ||
return cb(); | ||
} | ||
} | ||
if (that.loopback === true) { | ||
if (cmdStrsWithoutOk.indexOf(cmdStr) !== - 1) { | ||
return cb(); | ||
} | ||
} | ||
if (cmdStrsWithoutOk.indexOf(cmdStr) === - 1 && that.loopback === false) { | ||
okTimeout = setTimeout(function () { | ||
const err = new Error('no answer received from queue within 10s'); | ||
that.log.error(topLogPrefix + 'handle.cmd() - readFromQueue() - cmdStr: "' + cmdStr + '", ' + err.message); | ||
callCb = false; | ||
cb(err); | ||
}, 10000); | ||
if (that.loopback === true) { | ||
return cb(); | ||
} | ||
that.handle.once(that.channelName + ':' + cmdStr + '-ok', function (x, y, z) { | ||
// We want these in the outer scope, thats why the weird naming | ||
channel = x; | ||
method = y; | ||
data = z; | ||
if (cmdStrsWithoutOk.indexOf(cmdStr) === - 1 && that.loopback === false) { | ||
okTimeout = setTimeout(function () { | ||
const err = new Error('no answer received from queue within 10s'); | ||
that.log.error(topLogPrefix + 'handle.cmd() - readFromQueue() - cmdStr: "' + cmdStr + '", ' + err.message); | ||
callCb = false; | ||
cb(err); | ||
}, 10000); | ||
// log.silly(topLogPrefix + 'handle.cmd() - readFromQueue() - cmdStr: "' + cmdStr + '", answer received from queue'); | ||
if (callCb === false) { | ||
that.log.warn(topLogPrefix + 'handle.cmd() - readFromQueue() - cmdStr: "' + cmdStr + '", answer received but to late; timeout have already happened'); | ||
return; | ||
} | ||
clearTimeout(okTimeout); | ||
cb(); | ||
}); | ||
} | ||
that.handle.once(that.channelName + ':' + cmdStr + '-ok', function (x, y, z) { | ||
// We want these in the outer scope, thats why the weird naming | ||
channel = x; | ||
method = y; | ||
data = z; | ||
params.push(cmdCb); | ||
// log.silly(topLogPrefix + 'handle.cmd() - readFromQueue() - cmdStr: "' + cmdStr + '", answer received from queue'); | ||
if (callCb === false) { | ||
that.log.warn(topLogPrefix + 'handle.cmd() - readFromQueue() - cmdStr: "' + cmdStr + '", answer received but to late; timeout have already happened'); | ||
return; | ||
} | ||
clearTimeout(okTimeout); | ||
cb(); | ||
}); | ||
} | ||
if (cmdName) { | ||
that.handle[cmdGroupName][cmdName].apply(that.handle, params); | ||
} else { | ||
that.handle[cmdGroupName].apply(that.handle, params); | ||
} | ||
}); | ||
params.push(cmdCb); | ||
async.series(tasks, function (err) { | ||
cb(err, channel, method, data); | ||
if (cmdName) { | ||
that.handle[cmdGroupName][cmdName].apply(that.handle, params); | ||
} else { | ||
that.handle[cmdGroupName].apply(that.handle, params); | ||
} | ||
}); | ||
if (that.cmdQueue.length === 0) { | ||
// log.silly(topLogPrefix + 'handle.cmd() - readFromQueue() - cmdStr: "' + cmdStr + '" cmdQueue.length === 0'); | ||
that.cmdInProgress = false; | ||
} else { | ||
// log.silly(topLogPrefix + 'handle.cmd() - readFromQueue() - cmdStr: "' + cmdStr + '" readFromQueue() rerunning'); | ||
readFromQueue(); | ||
} | ||
}); | ||
} | ||
readFromQueue(); | ||
}; | ||
cb(); | ||
}); | ||
async.series(tasks, function (err) { | ||
cb(err, channel, method, data); | ||
// Set QoS to 10 | ||
tasks.push(function (cb) { | ||
const prefetchSize = 0, | ||
prefetchCount = 10, | ||
global = true; | ||
if (that.cmdQueue.length === 0) { | ||
// log.silly(topLogPrefix + 'handle.cmd() - readFromQueue() - cmdStr: "' + cmdStr + '" cmdQueue.length === 0'); | ||
that.cmdInProgress = false; | ||
} else { | ||
// log.silly(topLogPrefix + 'handle.cmd() - readFromQueue() - cmdStr: "' + cmdStr + '" readFromQueue() rerunning'); | ||
readFromQueue(); | ||
} | ||
}); | ||
} | ||
readFromQueue(); | ||
}; | ||
cb(); | ||
}); | ||
that.handle.cmd('basic.qos', [that.channelName, prefetchSize, prefetchCount, global], function (err) { | ||
that.log.verbose(logPrefix + 'basic.qos set to: "' + prefetchCount + '"'); | ||
cb(err); | ||
}); | ||
// Set QoS to 10 | ||
tasks.push(function (cb) { | ||
const prefetchSize = 0, | ||
prefetchCount = 10, | ||
global = true; | ||
that.handle.cmd('basic.qos', [that.channelName, prefetchSize, prefetchCount, global], function (err) { | ||
that.log.verbose(logPrefix + 'basic.qos set to: "' + prefetchCount + '"'); | ||
cb(err); | ||
}); | ||
}); | ||
async.series(tasks, function (err) { | ||
if ( ! err) { | ||
if (that.loopback === true) { | ||
that.log.verbose(logPrefix + 'Initialized on loopback interface'); | ||
} else { | ||
that.log.verbose(logPrefix + 'Initialized on ' + that.host + ':' + that.port); | ||
} | ||
that.queueReady = true; | ||
setImmediate(function () { | ||
that.emit('ready'); | ||
}); | ||
async.series(tasks, function (err) { | ||
if ( ! err) { | ||
if (that.loopback === true) { | ||
that.log.verbose(logPrefix + 'Initialized on loopback interface'); | ||
} else { | ||
that.log.verbose(logPrefix + 'Initialized on ' + that.host + ':' + that.port); | ||
} | ||
}); | ||
}; | ||
that.queueReady = true; | ||
setImmediate(function () { | ||
that.emit('ready'); | ||
}); | ||
} | ||
}); | ||
} | ||
@@ -391,0 +344,0 @@ |
{ | ||
"name": "larvitamintercom", | ||
"version": "0.3.6", | ||
"version": "0.3.7", | ||
"description": "", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -9,5 +9,2 @@ [![Build Status](https://travis-ci.org/larvit/larvitamintercom.svg?branch=master)](https://travis-ci.org/larvit/larvitamintercom) [![Dependencies](https://david-dm.org/larvit/larvitamintercom.svg)](https://david-dm.org/larvit/larvitamintercom.svg) | ||
### Connection | ||
When instantiating a new intercom it will try to connect instantly and on connection error or connection lost it will try to reconnect an infinite number of times every 1sec. | ||
### Send | ||
@@ -14,0 +11,0 @@ |
Sorry, the diff of this file is not supported yet
New author
Supply chain riskA new npm collaborator published a version of the package for the first time. New collaborators are usually benign additions to a project, but do indicate a change to the security surface area of a package.
Found 1 instance in 1 package
65466
13
1595
120
2