Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More

larvitamintercom

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

larvitamintercom - npm Package Compare versions

Comparing version 0.1.18 to 0.2.0

'use strict';
const EventEmitter = require('events').EventEmitter,
topLogPrefix = 'larvitamintercom: index.js - ',
uuidLib = require('uuid'),

@@ -20,6 +21,9 @@ bramqp = require('bramqp'),

* Events
* .on('error', function(err)) - something serious happened!
* .on('error', function (err)) - something serious happened!
*
* @param str conStr - AMQP connection string OR "loopback interface" to only work in loopback mode
*/
function Intercom(conStr) {
const parsedConStr = url.parse(conStr),
logPrefix = topLogPrefix + 'Intercom() - ',
tasks = [],

@@ -32,75 +36,85 @@ that = this;

that.declaredExchanges = [];
that.host = parsedConStr.hostname;
that.port = parsedConStr.port || 5672;
that.expectingClose = false;
that.queueReady = false;
that.sendInProgress = false;
that.sendQueue = [];
that.expectingClose = false;
that.socket = net.connect({
'port': that.port,
'host': that.host
});
if (conStr === 'loopback interface') {
that.loopback = true;
that.loopbackConQueue = {};
that.handle = new EventEmitter;
log.verbose('larvitamintercom: Intercom() - Initializing on ' + that.host + ':' + that.port);
log.verbose(logPrefix + 'Initializing on loopback interface');
} else {
that.loopback = false;
that.host = parsedConStr.hostname;
that.port = parsedConStr.port || 5672;
that.socket.on('error', function(err) {
log.error('larvitamintercom: Intercom() - socket error: ' + err.message);
});
that.socket = net.connect({
'port': that.port,
'host': that.host
});
that.socket.on('close', function(hadError) {
log.info('larvitamintercom: Intercom() - socket closed');
if (hadError) {
log.error('larvitamintercom: Intercom() - socket closed with error');
}
});
log.verbose(logPrefix + 'Initializing on ' + that.host + ':' + that.port);
that.socket.on('end', function() {
log.info('larvitamintercom: Intercom() - socket connection ended by remote');
});
that.socket.on('error', function (err) {
log.error(logPrefix + 'socket error: ' + err.message);
});
// Create handle by socket connect to rabbitmq
tasks.push(function(cb) {
bramqp.initialize(that.socket, 'rabbitmq/full/amqp0-9-1.stripped.extended', function(err, result) {
if (err) {
log.error('larvitamintercom: Intercom() - Error connecting to ' + that.host + ':' + that.port + ' err: ' + err.message);
that.emit('error', err);
that.socket.on('close', function (hadError) {
log.info(logPrefix + 'socket closed');
if (hadError) {
log.error(logPrefix + 'socket closed with error');
}
});
log.debug('larvitamintercom: Intercom() - bramqp.initialize() ran on ' + that.host + ':' + that.port);
that.socket.on('end', function () {
log.info(logPrefix + 'socket connection ended by remote');
});
that.handle = result;
// Create handle by socket connect to rabbitmq
tasks.push(function (cb) {
bramqp.initialize(that.socket, 'rabbitmq/full/amqp0-9-1.stripped.extended', function (err, result) {
if (err) {
log.error(logPrefix + 'Error connecting to ' + that.host + ':' + that.port + ' err: ' + err.message);
that.emit('error', err);
}
cb(err);
});
});
log.debug(logPrefix + 'bramqp.initialize() ran on ' + that.host + ':' + that.port);
// Open AMQP communication
tasks.push(function(cb) {
const heartBeat = true,
auth = parsedConStr.auth;
that.handle = result;
let username,
password;
cb(err);
});
});
if (auth) {
username = parsedConStr.auth.split(':')[0];
password = parsedConStr.auth.split(':')[1];
}
// Open AMQP communication
tasks.push(function (cb) {
const heartBeat = true,
auth = parsedConStr.auth;
log.debug('larvitamintercom: Intercom() - openAMQPCommunication running on ' + that.host + ':' + that.port + ' with username: ' + username);
let username,
password;
that.handle.openAMQPCommunication(username, password, heartBeat, function(err) {
if (err) {
log.error('larvitamintercom: Intercom() - Error opening AMQP communication: ' + err.message);
that.emit('error', err);
if (auth) {
username = parsedConStr.auth.split(':')[0];
password = parsedConStr.auth.split(':')[1];
}
cb(err);
log.debug(logPrefix + 'openAMQPCommunication running on ' + that.host + ':' + that.port + ' with username: ' + username);
that.handle.openAMQPCommunication(username, password, heartBeat, function (err) {
if (err) {
log.error(logPrefix + 'Error opening AMQP communication: ' + err.message);
that.emit('error', err);
}
cb(err);
});
});
});
}
// Register listener for incoming messages
tasks.push(function(cb) {
that.handle.on(that.channelName + ':basic.deliver', function(channel, method, data) {
tasks.push(function (cb) {
that.handle.on(that.channelName + ':basic.deliver', function (channel, method, data) {
const exchange = data.exchange,

@@ -110,19 +124,18 @@ deliveryTag = data['delivery-tag'],

log.debug('larvitamintercom: Intercom() - Incoming message. exchange: "' + exchange + '", consumerTag: "' + consumerTag + '", deliveryTag: "' + deliveryTag + '"');
log.debug(logPrefix + 'Incoming message. exchange: "' + exchange + '", consumerTag: "' + consumerTag + '", deliveryTag: "' + deliveryTag + '"');
that.handle.once('content', function(channel, className, properties, content) {
that.handle.once('content', function (channel, className, properties, content) {
let message;
log.debug('larvitamintercom: Intercom() - Incoming message content. exchange: "' + exchange + '", consumerTag: "' + consumerTag + '", deliveryTag: "' + deliveryTag + '", content: "' + content.toString() + '"');
log.debug(logPrefix + 'Incoming message content. exchange: "' + exchange + '", consumerTag: "' + consumerTag + '", deliveryTag: "' + deliveryTag + '", content: "' + content.toString() + '"');
try {
message = JSON.parse(content.toString());
} catch(err) {
log.warn('larvitamintercom: subscribe() - Could not parse incoming message. exchange: "' + exchange + '", consumerTag: "' + consumerTag + '", deliveryTag: "' + deliveryTag + '", content: "' + content.toString() + '"');
cb(err);
return;
} catch (err) {
log.warn(logPrefix + 'subscribe() - Could not parse incoming message. exchange: "' + exchange + '", consumerTag: "' + consumerTag + '", deliveryTag: "' + deliveryTag + '", content: "' + content.toString() + '"');
return cb(err);
}
if (lUtils.formatUuid(message.uuid) === false) {
log.warn('larvitamintercom: consume() - Message does not contain uuid. exchange: "' + exchange + '", consumerTag: "' + consumerTag + '", deliveryTag: "' + deliveryTag + '", content: "' + content.toString() + '"');
log.warn(logPrefix + 'consume() - Message does not contain uuid. exchange: "' + exchange + '", consumerTag: "' + consumerTag + '", deliveryTag: "' + deliveryTag + '", content: "' + content.toString() + '"');
}

@@ -137,8 +150,8 @@

// Register listener for close events
tasks.push(function(cb) {
that.handle.on('connection.close', function(channel, method, data) {
tasks.push(function (cb) {
that.handle.on('connection.close', function (channel, method, data) {
if (that.expectingClose === false) {
log.error('larvitamintercom: Intercom() - Unexpected connection.close! channel: "' + channel + '" data: "' + JSON.stringify(data) + '"');
log.error(logPrefix + 'Unexpected connection.close! channel: "' + channel + '" data: "' + JSON.stringify(data) + '"');
} else {
log.info('larvitamintercom: Intercom() - Expected connetion.close. channel: "' + channel + '" data: "' + JSON.stringify(data) + '"');
log.info(logPrefix + 'Expected connetion.close. channel: "' + channel + '" data: "' + JSON.stringify(data) + '"');
}

@@ -151,9 +164,9 @@ });

// Should be disabled in production code and only manually enabled while debugging due to it being expensive
/** /tasks.push(function(cb) {
/** /tasks.push(function (cb) {
const oldEmitter = that.handle.emit;
that.handle.emit = function() {
that.handle.emit = function () {
const emitArgs = arguments;
log.silly('larvitamintercom: handle.on("' + arguments[0] + '"), all arguments: "' + JSON.stringify(arguments) + '"');
log.silly(topLogPrefix + 'handle.on("' + arguments[0] + '"), all arguments: "' + JSON.stringify(arguments) + '"');

@@ -167,3 +180,3 @@ oldEmitter.apply(that.handle, arguments);

// Construct generic handle comms
tasks.push(function(cb) {
tasks.push(function (cb) {
const cmdStrsWithoutOk = ['basic.publish', 'content', 'closeAMQPCommunication', 'basic.nack', 'basic.ack'];

@@ -173,3 +186,3 @@

if (typeof cb !== 'function') {
cb = function() {};
cb = function () {};
}

@@ -179,10 +192,10 @@

log.debug('larvitamintercom: handle.cmd() - cmdStr: "' + cmdStr + '" added to run queue. params: "' + JSON.stringify(params) + '"');
log.debug(logPrefix + 'handle.cmd() - cmdStr: "' + cmdStr + '" added to run queue. params: "' + JSON.stringify(params) + '"');
if (that.cmdInProgress === true) {
log.silly('larvitamintercom: handle.cmd() - cmdStr: "' + cmdStr + '" cmdInProgress === true');
log.silly(logPrefix + 'handle.cmd() - cmdStr: "' + cmdStr + '" cmdInProgress === true');
return;
}
log.silly('larvitamintercom: handle.cmd() - cmdStr: "' + cmdStr + '" cmdInProgress !== true');
log.silly(logPrefix + 'handle.cmd() - cmdStr: "' + cmdStr + '" cmdInProgress !== true');

@@ -194,4 +207,4 @@ that.cmdInProgress = true;

cmdStr = mainParams.cmdStr,
cb = mainParams.cb,
tasks = [];
tasks = [],
cb = mainParams.cb;

@@ -208,3 +221,3 @@ let params = mainParams.params,

// Register the callback
tasks.push(function(cb) {
tasks.push(function (cb) {
const cmdGroupName = cmdStr.split('.')[0],

@@ -216,6 +229,24 @@ cmdName = cmdStr.split('.')[1];

if (cmdStrsWithoutOk.indexOf(cmdStr) === - 1) {
okTimeout = setTimeout(function() {
function cmdCb(err) {
if (err) {
log.error(logPrefix + 'handle.cmd() - readFromQueue() - cmdStr: "' + cmdStr + '" failed, err: ' + err.message);
callCb = false;
return cb(err);
}
log.debug(logPrefix + 'handle.cmd() - readFromQueue() - cmdStr: "' + cmdStr + '" succeeded');
if (cmdStrsWithoutOk.indexOf(cmdStr) !== - 1) {
cb();
}
}
if (that.loopback === true) {
return cb();
}
if (cmdStrsWithoutOk.indexOf(cmdStr) === - 1 && that.loopback === false) {
okTimeout = setTimeout(function () {
const err = new Error('no answer received from queue within 500ms');
log.error('larvitamintercom: handle.cmd() - readFromQueue() - cmdStr: "' + cmdStr + '", ' + err.message);
log.error(topLogPrefix + 'handle.cmd() - readFromQueue() - cmdStr: "' + cmdStr + '", ' + err.message);
callCb = false;

@@ -225,3 +256,3 @@ cb(err);

that.handle.once(that.channelName + ':' + cmdStr + '-ok', function(x, y, z) {
that.handle.once(that.channelName + ':' + cmdStr + '-ok', function (x, y, z) {
// We want these in the outer scope, thats why the weird naming

@@ -232,5 +263,5 @@ channel = x;

log.debug('larvitamintercom: handle.cmd() - readFromQueue() - cmdStr: "' + cmdStr + '", answer received from queue');
log.debug(topLogPrefix + 'handle.cmd() - readFromQueue() - cmdStr: "' + cmdStr + '", answer received from queue');
if (callCb === false) {
log.warn('larvitamintercom: handle.cmd() - readFromQueue() - cmdStr: "' + cmdStr + '", answer received but to late; timeout have already happened');
log.warn(topLogPrefix + 'handle.cmd() - readFromQueue() - cmdStr: "' + cmdStr + '", answer received but to late; timeout have already happened');
return;

@@ -243,17 +274,4 @@ }

params.push(function(err) {
if (err) {
log.error('larvitamintercom: handle.cmd() - readFromQueue() - cmdStr: "' + cmdStr + '" failed, err: ' + err.message);
callCb = false;
cb(err);
return;
}
params.push(cmdCb);
log.debug('larvitamintercom: handle.cmd() - readFromQueue() - cmdStr: "' + cmdStr + '" succeeded');
if (cmdStrsWithoutOk.indexOf(cmdStr) !== - 1) {
cb();
}
});
if (cmdName) {

@@ -266,10 +284,10 @@ that.handle[cmdGroupName][cmdName].apply(that.handle, params);

async.series(tasks, function(err) {
async.series(tasks, function (err) {
cb(err, channel, method, data);
if (that.cmdQueue.length === 0) {
log.silly('larvitamintercom: handle.cmd() - readFromQueue() - cmdStr: "' + cmdStr + '" cmdQueue.length === 0');
log.silly(topLogPrefix + 'handle.cmd() - readFromQueue() - cmdStr: "' + cmdStr + '" cmdQueue.length === 0');
that.cmdInProgress = false;
} else {
log.silly('larvitamintercom: handle.cmd() - readFromQueue() - cmdStr: "' + cmdStr + '" readFromQueue() rerunning');
log.silly(topLogPrefix + 'handle.cmd() - readFromQueue() - cmdStr: "' + cmdStr + '" readFromQueue() rerunning');
readFromQueue();

@@ -284,7 +302,13 @@ }

async.series(tasks, function(err) {
async.series(tasks, function (err) {
if ( ! err) {
log.debug('larvitamintercom: Intercom() - Initialized on ' + that.host + ':' + that.port);
if (that.loopback === true) {
log.debug(logPrefix + 'Initialized on loopback interface');
} else {
log.debug(logPrefix + 'Initialized on ' + that.host + ':' + that.port);
}
that.queueReady = true;
that.emit('ready');
setImmediate(function () {
that.emit('ready');
});
}

@@ -297,3 +321,3 @@ });

Intercom.prototype.bindQueue = function(queueName, exchange, cb) {
Intercom.prototype.bindQueue = function (queueName, exchange, cb) {
const noWait = false, // "If set, the server will not respond to the method. The client

@@ -306,13 +330,15 @@ // should not wait for a reply method. If the server could not complete

log.verbose('larvitamintercom: bindQueue() - Binding queue "' + queueName + '" to exchange "' + exchange + '"');
log.verbose(topLogPrefix + 'bindQueue() - Binding queue "' + queueName + '" to exchange "' + exchange + '"');
that.ready(function(err) {
if (err) { cb(err); return; }
if (that.loopback === true) return cb();
that.handle.cmd('queue.bind', [that.channelName, queueName, exchange, 'ignored-routing-key', noWait, args], function(err) {
that.ready(function (err) {
if (err) return cb(err);
that.handle.cmd('queue.bind', [that.channelName, queueName, exchange, 'ignored-routing-key', noWait, args], function (err) {
if (err) {
log.error('larvitamintercom: bindQueue() - Could not bind queue: "' + queueName + '" to exchange: "' + exchange + '", err: ' + err.message);
log.error(topLogPrefix + 'bindQueue() - Could not bind queue: "' + queueName + '" to exchange: "' + exchange + '", err: ' + err.message);
}
log.debug('larvitamintercom: bindQueue() - Bound queue "' + queueName + '" to exchange "' + exchange + '"');
log.debug(topLogPrefix + 'bindQueue() - Bound queue "' + queueName + '" to exchange "' + exchange + '"');

@@ -325,25 +351,29 @@ cb(err);

// Close the RabbitMQ connection
Intercom.prototype.close = function(cb) {
Intercom.prototype.close = function (cb) {
const that = this;
if (typeof cb !== 'function') {
cb = function() {};
cb = function () {};
}
log.verbose('larvitamintercom: close() - on ' + that.host + ':' + that.port);
if (that.loopback === true) {
log.verbose(topLogPrefix + 'close() - on loopback interface');
return cb();
} else {
log.verbose(topLogPrefix + 'close() - on ' + that.host + ':' + that.port);
}
that.expectingClose = true;
that.ready(function(err) {
if (err) { cb(err); return; }
that.ready(function (err) {
if (err) return cb(err);
that.handle.cmd('closeAMQPCommunication', function(err) {
that.handle.cmd('closeAMQPCommunication', function (err) {
if (err) {
log.warn('larvitamintercom: close() - Could not closeAMQPCommunication: ' + err.message);
cb(err);
return;
log.warn(topLogPrefix + 'close() - Could not closeAMQPCommunication: ' + err.message);
return cb(err);
}
setImmediate(function() {
log.debug('larvitamintercom: close() - closed ' + that.host + ':' + that.port);
setImmediate(function () {
log.debug(topLogPrefix + 'close() - closed ' + that.host + ':' + that.port);
cb();

@@ -355,3 +385,5 @@ });

Intercom.prototype.consume = function(options, msgCb, cb) {
Intercom.prototype.consume = function (options, msgCb, cb) {
const that = this;
if (typeof options === 'function') {

@@ -373,8 +405,24 @@ cb = msgCb;

log.verbose('larvitamintercom: consume() - Starting on exchange "' + options.exchange + '"');
if (that.loopback === true) {
if (Array.isArray(that.loopbackConQueue[options.exchange])) {
setTimeout(function () {
for (let i = 0; that.loopbackConQueue[options.exchange][i] !== undefined; i ++) {
const queueItem = that.loopbackConQueue[options.exchange][i];
queueItem.options.ignoreConQueue = true;
that.send(queueItem.orgMsg, queueItem.options);
}
that.loopbackConQueue[options.exchange] = 'connected';
}, 10);
}
}
log.verbose(topLogPrefix + 'consume() - Starting on exchange "' + options.exchange + '"');
this.genericConsume(options, msgCb, cb);
};
Intercom.prototype.declareExchange = function(exchangeName, cb) {
Intercom.prototype.declareExchange = function (exchangeName, cb) {
const exchangeType = 'fanout',

@@ -391,23 +439,23 @@ autoDelete = false, // https://www.rabbitmq.com/amqp-0-9-1-reference.html#exchange.declare.auto-delete

log.debug('larvitamintercom: declareExchange() - exchangeName: "' + exchangeName + '"');
log.debug(topLogPrefix + 'declareExchange() - exchangeName: "' + exchangeName + '"');
that.ready(function(err) {
if (err) { cb(err); return; }
if (that.loopback === true) return cb();
that.ready(function (err) {
if (err) return cb(err);
if (that.declaredExchanges.indexOf(exchangeName) !== - 1) {
log.debug('larvitamintercom: declareExchange() - Already declared. exchangeName: "' + exchangeName + '"');
cb();
return;
log.debug(topLogPrefix + 'declareExchange() - Already declared. exchangeName: "' + exchangeName + '"');
return cb();
}
log.verbose('larvitamintercom: declareExchange() - Declaring exchangeName: "' + exchangeName + '"');
log.verbose(topLogPrefix + 'declareExchange() - Declaring exchangeName: "' + exchangeName + '"');
that.handle.cmd('exchange.declare', [that.channelName, exchangeName, exchangeType, passive, durable, autoDelete, internal, noWait, args], function(err) {
that.handle.cmd('exchange.declare', [that.channelName, exchangeName, exchangeType, passive, durable, autoDelete, internal, noWait, args], function (err) {
if (err) {
log.warn('larvitamintercom: declareExchange() - Could not declare exchange "' + exchangeName + '", err: ' + err.message);
cb(err);
return;
log.warn(topLogPrefix + 'declareExchange() - Could not declare exchange "' + exchangeName + '", err: ' + err.message);
return cb(err);
}
log.debug('larvitamintercom: declareExchange() - Declared! exchangeName: "' + exchangeName + '"');
log.debug(topLogPrefix + 'declareExchange() - Declared! exchangeName: "' + exchangeName + '"');

@@ -429,3 +477,3 @@ that.declaredExchanges.push(exchangeName);

*/
Intercom.prototype.declareQueue = function(options, cb) {
Intercom.prototype.declareQueue = function (options, cb) {
const autoDelete = false, // https://www.rabbitmq.com/amqp-0-9-1-reference.html#queue.declare.auto-delete

@@ -443,18 +491,19 @@ passive = false, // https://www.rabbitmq.com/amqp-0-9-1-reference.html#queue.declare.passive

log.verbose('larvitamintercom: declareQueue() - Declaring queueName: "' + options.queueName + '" exclusive: ' + options.exclusive.toString());
log.verbose(topLogPrefix + 'declareQueue() - Declaring queueName: "' + options.queueName + '" exclusive: ' + options.exclusive.toString());
that.ready(function(err) {
if (err) { cb(err); return; }
if (that.loopback === true) return cb();
that.handle.cmd('queue.declare', [that.channelName, options.queueName, passive, durable, options.exclusive, autoDelete, noWait, args], function(err, channel, method, data) {
that.ready(function (err) {
if (err) return cb(err);
that.handle.cmd('queue.declare', [that.channelName, options.queueName, passive, durable, options.exclusive, autoDelete, noWait, args], function (err, channel, method, data) {
let queueName;
if (err) {
log.error('larvitamintercom: declareQueue() - Could not declare queue, name: "' + options.queueName + '" err: ' + err.message);
cb(err);
return;
log.error(topLogPrefix + 'declareQueue() - Could not declare queue, name: "' + options.queueName + '" err: ' + err.message);
return cb(err);
}
queueName = data.queue;
log.debug('larvitamintercom: declareQueue() - Declared! queueName: "' + queueName + '" exclusive: ' + options.exclusive.toString());
log.debug(topLogPrefix + 'declareQueue() - Declared! queueName: "' + queueName + '" exclusive: ' + options.exclusive.toString());
cb(err, queueName);

@@ -466,3 +515,3 @@ });

/* Not working!
Intercom.prototype.deleteQueue = function(queueName, cb) {
Intercom.prototype.deleteQueue = function (queueName, cb) {
const ifUnused = false, // If set, the server will only delete the queue if it

@@ -480,8 +529,8 @@ // has no consumers. If the queue has consumers the

if (typeof cb !== 'function') {
cb = function() {};
cb = function () {};
}
that.handle.queue.delete(that.channelName, queueName, ifUnused, ifEmpty, noWait);
that.handle.once(that.channelName + ':queue.delete-ok', function(channel, method, data) {
log.verobse('larvitamintercom: deleteQueue() - queue "' + queueName + '", containing "' + data['message-count'] + '" deleted.');
that.handle.once(that.channelName + ':queue.delete-ok', function (channel, method, data) {
log.verobse(topLogPrefix + 'deleteQueue() - queue "' + queueName + '", containing "' + data['message-count'] + '" deleted.');
cb();

@@ -491,4 +540,5 @@ });

Intercom.prototype.genericConsume = function(options, msgCb, cb) {
Intercom.prototype.genericConsume = function (options, msgCb, cb) {
const returnObj = {},
logPrefix = topLogPrefix + 'Intercom.prototype.genericConsume() - ',
tasks = [],

@@ -500,3 +550,3 @@ that = this;

if (cb === undefined) {
cb = function() {};
cb = function () {};
}

@@ -517,3 +567,3 @@

if (typeof cb !== 'function') {
cb = function() {};
cb = function () {};
}

@@ -523,3 +573,3 @@

const err = new Error('No consumer tag is defined, consume have probably not been started yet.');
log.warn('larvitamintercom: genericConsume() - cancel() - ' + err.message);
log.warn(topLogPrefix + 'genericConsume() - cancel() - ' + err.message);
cb(err);

@@ -529,7 +579,7 @@ return;

that.handle.basic.cancel(returnObj.data['consumer-tag'], function(err) {
that.handle.basic.cancel(returnObj.data['consumer-tag'], function (err) {
if (err) {
log.warn('larvitamintercom: genericConsume() - cancel() - Could not canceled consuming. consumer-tag: "' + returnObj.data['consumer-tag'] + '", err: ' + err.message);
log.warn(topLogPrefix + 'genericConsume() - cancel() - Could not canceled consuming. consumer-tag: "' + returnObj.data['consumer-tag'] + '", err: ' + err.message);
} else {
log.verbose('larvitamintercom: genericConsume() - cancel() - Canceled consuming. consumer-tag: "' + returnObj.data['consumer-tag'] + '"');
log.verbose(topLogPrefix + 'genericConsume() - cancel() - Canceled consuming. consumer-tag: "' + returnObj.data['consumer-tag'] + '"');
}

@@ -540,7 +590,7 @@

// We could not get this to work :( // Lilleman and gagge 2016-12-27
//that.handle.once(that.channelName + ':basic.cancel-ok', function(channel, method, data) {
// log.verbose('larvitamintercom: consume() - cancel() - Canceled consuming.');
// log.debug('larvitamintercom: consume() - cancel() - Canceled consuming. channel: ' + JSON.stringify(channel));
// log.debug('larvitamintercom: consume() - cancel() - Canceled consuming. method: ' + JSON.stringify(method));
// log.debug('larvitamintercom: consume() - cancel() - Canceled consuming. data: ' + JSON.stringify(data));
//that.handle.once(that.channelName + ':basic.cancel-ok', function (channel, method, data) {
// log.verbose(topLogPrefix + 'consume() - cancel() - Canceled consuming.');
// log.debug(topLogPrefix + 'consume() - cancel() - Canceled consuming. channel: ' + JSON.stringify(channel));
// log.debug(topLogPrefix + 'consume() - cancel() - Canceled consuming. method: ' + JSON.stringify(method));
// log.debug(topLogPrefix + 'consume() - cancel() - Canceled consuming. data: ' + JSON.stringify(data));
// cb();

@@ -551,3 +601,3 @@ //});

// Declare exchange
tasks.push(function(cb) {
tasks.push(function (cb) {
that.declareExchange(options.exchange, cb);

@@ -557,7 +607,7 @@ });

// Declare queue
tasks.push(function(cb) {
tasks.push(function (cb) {
if (options.type === 'consume') {
that.declareQueue({'queueName': queueName}, cb);
} else if (options.type === 'subscribe') {
that.declareQueue({'exclusive': true}, function(err, result) {
that.declareQueue({'exclusive': true}, function (err, result) {
queueName = result;

@@ -567,3 +617,3 @@ cb(err);

} else {
log.error('larvitamintercom: genericConsume() - Options.type must be "consume" or "subscribe", but is: "' + options.type + '"');
log.error(logPrefix + 'Options.type must be "consume" or "subscribe", but is: "' + options.type + '"');
}

@@ -573,3 +623,3 @@ });

// Bind queue
tasks.push(function(cb) {
tasks.push(function (cb) {
that.bindQueue(queueName, options.exchange, cb);

@@ -579,3 +629,3 @@ });

// Start consuming
tasks.push(function(cb) {
tasks.push(function (cb) {
const consumerTag = null, // https://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.consume.consumer-tag

@@ -595,6 +645,9 @@ noLocal = false, // "If the no-local field is set the server will not send messages to the connection

that.handle.cmd('basic.consume', [that.channelName, queueName, consumerTag, noLocal, noAck, exclusive, noWait, args], function(err, channel, method, data) {
// No need to send a command on the queue for the loopback, handle this directly in the send function
if (that.loopback === true) return cb();
that.handle.cmd('basic.consume', [that.channelName, queueName, consumerTag, noLocal, noAck, exclusive, noWait, args], function (err, channel, method, data) {
let consumerTag;
if (err) { cb(err); return; }
if (err) return cb(err);

@@ -608,6 +661,6 @@ returnObj.channel = channel;

} else {
log.warn('larvitamintercom: genericConsume() - No consumerTag obtained for queue: "' + queueName + '"');
log.warn(logPrefix + 'No consumerTag obtained for queue: "' + queueName + '"');
}
log.verbose('larvitamintercom: genericConsume() - Started consuming on queue: "' + queueName + '" with consumer tag: "' + consumerTag + '"');
log.verbose(logPrefix + 'Started consuming on queue: "' + queueName + '" with consumer tag: "' + consumerTag + '"');
cb();

@@ -618,3 +671,3 @@ });

// Register msgCb
tasks.push(function(cb) {
tasks.push(function (cb) {
const eventName = 'incoming_msg_' + options.exchange;

@@ -624,14 +677,13 @@

const err = new Error('Only one subscribe or consume is allowed for each exchange. exchange: "' + options.exchange + '"');
log.warn('larvitamintercom: genericConsume() - ' + err.message);
cb(err);
return;
log.warn(topLogPrefix + 'genericConsume() - ' + err.message);
return cb(err);
}
that.on(eventName, function(message, deliveryTag) {
msgCb(message, function(err) {
that.on(eventName, function (message, deliveryTag) {
msgCb(message, function (err) {
if (err) {
log.warn('larvitamintercom: genericConsume() - nack on deliveryTag: "' + deliveryTag + '" err: ' + err.message);
log.warn(logPrefix + 'nack on deliveryTag: "' + deliveryTag + '" err: ' + err.message);
that.handle.cmd('basic.nack', [that.channelName, deliveryTag]);
} else {
log.debug('larvitamintercom: genericConsume() - ack on deliveryTag: "' + deliveryTag + '"');
log.debug(logPrefix + 'ack on deliveryTag: "' + deliveryTag + '"');
that.handle.cmd('basic.nack', [that.channelName, deliveryTag]);

@@ -645,4 +697,4 @@ }

async.series(tasks, function(err) {
if (err) { cb(err); return; }
async.series(tasks, function (err) {
if (err) return cb(err);

@@ -653,7 +705,4 @@ cb(err, returnObj);

Intercom.prototype.ready = function(cb) {
if (this.queueReady === true) {
cb();
return;
}
Intercom.prototype.ready = function (cb) {
if (this.queueReady === true) return cb();

@@ -674,3 +723,3 @@ this.on('ready', cb);

*/
Intercom.prototype.send = function(orgMsg, options, cb) {
Intercom.prototype.send = function (orgMsg, options, cb) {
const message = require('util')._extend({}, orgMsg),

@@ -690,3 +739,3 @@ that = this,

if (cb === undefined) {
cb = function() {};
cb = function () {};
}

@@ -704,13 +753,30 @@

stringifiedMsg = JSON.stringify(message);
} catch(err) {
log.warn('larvitamintercom: send() - Could not stringify message. Message attached to next log call.');
log.warn('larvitamintercom: send() - Unstringifiable message attached:', message);
cb(err);
return;
} catch (err) {
log.warn(topLogPrefix + 'send() - Could not stringify message. Message attached to next log call.');
log.warn(topLogPrefix + 'send() - Unstringifiable message attached:', message);
return cb(err);
}
log.debug('larvitamintercom: send() - readFromQueue() - Sending to exchange: "' + options.exchange + '", uuid: "' + message.uuid + '", message: "' + stringifiedMsg + '"');
log.debug(topLogPrefix + 'send() - readFromQueue() - Sending to exchange: "' + options.exchange + '", uuid: "' + message.uuid + '", message: "' + stringifiedMsg + '"');
if (that.loopback === true) {
if (
options.forceConsumeQueue === true
&& that.loopbackConQueue[options.exchange] !== 'connected'
&& options.ignoreConQueue !== true
) {
if (that.loopbackConQueue[options.exchange] === undefined) {
that.loopbackConQueue[options.exchange] = [];
}
that.loopbackConQueue[options.exchange].push({'orgMsg': orgMsg, 'options': options});
return cb();
}
that.emit('incoming_msg_' + options.exchange, message, uuidLib.v4());
return cb();
}
// Declare exchange
tasks.push(function(cb) {
tasks.push(function (cb) {
that.declareExchange(options.exchange, cb);

@@ -723,3 +789,3 @@ });

// Declare queue
tasks.push(function(cb) {
tasks.push(function (cb) {
that.declareQueue({'queueName': queueName}, cb);

@@ -729,3 +795,3 @@ });

// Bind queue
tasks.push(function(cb) {
tasks.push(function (cb) {
that.bindQueue(queueName, options.exchange, cb);

@@ -735,3 +801,3 @@ });

tasks.push(function(cb) {
tasks.push(function (cb) {
const properties = {'content-type': 'application/json'},

@@ -742,5 +808,5 @@ className = 'basic',

that.handle.cmd('basic.publish', [that.channelName, options.exchange, 'ignored-routing-key', mandatory, immediate], function(err) {
that.handle.cmd('basic.publish', [that.channelName, options.exchange, 'ignored-routing-key', mandatory, immediate], function (err) {
if (err) {
log.warn('larvitamintercom: send() - readFromQueue() - Could not publish to exchange: "' + options.exchange + '". err: ' + err.message + ', uuid: "' + message.uuid + ', message: "' + stringifiedMsg + '"');
log.warn(topLogPrefix + 'send() - readFromQueue() - Could not publish to exchange: "' + options.exchange + '". err: ' + err.message + ', uuid: "' + message.uuid + ', message: "' + stringifiedMsg + '"');
if ( ! cbErr) {

@@ -753,3 +819,3 @@ cbErr = err;

log.debug('larvitamintercom: send() - readFromQueue() - Published (no content sent) to exchange: "' + options.exchange + '", uuid: "' + message.uuid + ', message: "' + stringifiedMsg + '"');
log.debug(topLogPrefix + 'send() - readFromQueue() - Published (no content sent) to exchange: "' + options.exchange + '", uuid: "' + message.uuid + ', message: "' + stringifiedMsg + '"');

@@ -762,5 +828,5 @@ cbsRan ++;

that.handle.cmd('content', [that.channelName, className, properties, stringifiedMsg], function(err) {
that.handle.cmd('content', [that.channelName, className, properties, stringifiedMsg], function (err) {
if (err) {
log.warn('larvitamintercom: send() - readFromQueue() - Could not send publish content to exchange: "' + options.exchange + '". err: ' + err.message + ', uuid: "' + message.uuid + ', message: "' + stringifiedMsg + '"');
log.warn(topLogPrefix + 'send() - readFromQueue() - Could not send publish content to exchange: "' + options.exchange + '". err: ' + err.message + ', uuid: "' + message.uuid + ', message: "' + stringifiedMsg + '"');
if ( ! cbErr) {

@@ -773,3 +839,3 @@ cbErr = err;

log.debug('larvitamintercom: send() - readFromQueue() - Content sent to exchange: "' + options.exchange + '", uuid: "' + message.uuid + ', message: "' + stringifiedMsg + '"');
log.debug(topLogPrefix + 'send() - readFromQueue() - Content sent to exchange: "' + options.exchange + '", uuid: "' + message.uuid + ', message: "' + stringifiedMsg + '"');

@@ -783,3 +849,3 @@ cbsRan ++;

async.series(tasks, function(err) {
async.series(tasks, function (err) {
cb(err, message.uuid);

@@ -789,3 +855,3 @@ });

Intercom.prototype.subscribe = function(options, msgCb, cb) {
Intercom.prototype.subscribe = function (options, msgCb, cb) {
if (typeof options === 'function') {

@@ -803,3 +869,3 @@ cb = msgCb;

log.verbose('larvitamintercom: subscribe() - Starting on exchange "' + options.exchange + '"');
log.verbose(topLogPrefix + 'subscribe() - Starting on exchange "' + options.exchange + '"');

@@ -806,0 +872,0 @@ this.genericConsume(options, msgCb, cb);

{
"name": "larvitamintercom",
"version": "0.1.18",
"version": "0.2.0",
"description": "",

@@ -5,0 +5,0 @@ "main": "index.js",

@@ -21,3 +21,3 @@ [![Build Status](https://travis-ci.org/larvit/larvitamintercom.svg?branch=master)](https://travis-ci.org/larvit/larvitamintercom) [![Dependencies](https://david-dm.org/larvit/larvitamintercom.svg)](https://david-dm.org/larvit/larvitamintercom.svg)

intercom.send(message, options, function(err, msgUuid) {
intercom.send(message, options, function (err, msgUuid) {
// called when message is accepted by queue handler

@@ -60,3 +60,3 @@ // msgUuid will be a unique UUID for this specific message

intercom.consume(options, function(message, ack, deliveryTag) {
intercom.consume(options, function (message, ack, deliveryTag) {
// message being the object sent with intercom.send()

@@ -70,3 +70,3 @@

ack(new Error('Something was wrong with the message'));
}, function(err) {
}, function (err) {
// Callback from established consume connection

@@ -93,3 +93,3 @@ });

intercom.subscribe(options, function(message, ack, deliveryTag) {
intercom.subscribe(options, function (message, ack, deliveryTag) {
// message subscribe the object sent with intercom.send()

@@ -103,3 +103,3 @@

ack(new Error('Something was wrong with the message'));
}, function(err, subscribeInstance) {
}, function (err, subscribeInstance) {
// Callback from established subscribe connection

@@ -106,0 +106,0 @@ });

@@ -11,3 +11,4 @@ 'use strict';

let confFile;
let confFile,
conStr;

@@ -24,3 +25,3 @@ // Set up winston

before(function(done) {
before(function (done) {
this.timeout(20000);

@@ -32,3 +33,3 @@

for (let i = 0; i < 20; i ++) {
tasks.push(function(cb) {
tasks.push(function (cb) {
const intercom = new Intercom(config);

@@ -45,12 +46,18 @@

if (process.env.CONFFILE === undefined) {
confFile = __dirname + '/../config/amqp_test.json';
if (process.env.CONSTR === undefined) {
confFile = __dirname + '/../config/amqp_test.json';
} else {
confFile = process.env.CONFFILE;
conStr = process.env.CONSTR;
}
if (conStr !== undefined) {
log.verbose('Autobahn using environment CONSTR');
instantiateIntercoms(conStr);
return;
}
log.verbose('Autobahn config file: "' + confFile + '"');
// First look for absolute path
fs.stat(confFile, function(err) {
fs.stat(confFile, function (err) {
if (err) {

@@ -60,6 +67,6 @@

confFile = __dirname + '/../config/' + confFile;
fs.stat(confFile, function(err) {
fs.stat(confFile, function (err) {
if (err) throw err;
log.verbose('Autobahn config: ' + JSON.stringify(require(confFile)));
instantiateIntercoms(require(confFile).default);
instantiateIntercoms(require(confFile));
});

@@ -71,7 +78,7 @@

log.verbose('Autobahn config: ' + JSON.stringify(require(confFile)));
instantiateIntercoms(require(confFile).default);
instantiateIntercoms(require(confFile));
});
});
/*after(function(done) {
/*after(function (done) {
const tasks = [];

@@ -83,5 +90,5 @@

const intercom = intercoms[i];
tasks.push(function(cb) {
tasks.push(function (cb) {
console.log('Closing intercom ' + i);
intercom.close(function(err) {
intercom.close(function (err) {
if (err) throw err;

@@ -98,353 +105,571 @@

describe('Send and receive', function() {
it('check so the first intercom is up', function(done) {
const intercom = intercoms[0];
describe('Send and receive', function () {
assert.notDeepEqual(intercom.handle, undefined);
assert.notDeepEqual(intercom.handle.channel, undefined);
done();
});
describe('network connection', function () {
it('check so the first intercom is up', function (done) {
const intercom = intercoms[0];
it('send and receive a message to the default exchange', function(done) {
const orgMsg = {'foo': 'bar'},
tasks = [];
assert.notDeepEqual(intercom.handle, undefined);
assert.notDeepEqual(intercom.handle.channel, undefined);
done();
});
this.timeout(2500);
this.slow(2100); // > 1050 is shown in yellow, 1000ms is setTimeout()
it('send and receive a message to the default exchange', function (done) {
const orgMsg = {'foo': 'bar'},
tasks = [];
let subscribed = 0,
consumed = 0;
this.timeout(2500);
this.slow(2100); // > 1050 is shown in yellow, 1000ms is setTimeout()
function consume(intercom, cb) {
intercom.consume(function(msg, ack, deliveryTag) {
assert.notDeepEqual(lUtils.formatUuid(msg.uuid), false, 'msg.uuid must be a valid uuid');
delete msg.uuid;
assert.deepEqual(JSON.stringify(orgMsg), JSON.stringify(msg));
assert(deliveryTag, 'deliveryTag should be non-empty');
consumed ++;
ack();
}, cb);
}
let subscribed = 0,
consumed = 0;
function subscribe(intercom, cb) {
intercom.subscribe(function(msg, ack, deliveryTag) {
assert.notDeepEqual(lUtils.formatUuid(msg.uuid), false, 'msg.uuid must be a valid uuid');
delete msg.uuid;
assert.deepEqual(JSON.stringify(orgMsg), JSON.stringify(msg));
assert(deliveryTag, 'deliveryTag should be non-empty');
subscribed ++;
ack();
}, cb);
}
function consume(intercom, cb) {
intercom.consume(function (msg, ack, deliveryTag) {
assert.notDeepEqual(lUtils.formatUuid(msg.uuid), false, 'msg.uuid must be a valid uuid');
delete msg.uuid;
assert.deepStrictEqual(JSON.stringify(orgMsg), JSON.stringify(msg));
assert(deliveryTag, 'deliveryTag should be non-empty');
consumed ++;
ack();
}, cb);
}
tasks.push(function(cb) { consume(intercoms[0], cb); });
tasks.push(function(cb) { consume(intercoms[1], cb); });
tasks.push(function(cb) { subscribe(intercoms[2], cb); });
tasks.push(function(cb) { subscribe(intercoms[3], cb); });
function subscribe(intercom, cb) {
intercom.subscribe(function (msg, ack, deliveryTag) {
assert.notDeepEqual(lUtils.formatUuid(msg.uuid), false, 'msg.uuid must be a valid uuid');
delete msg.uuid;
assert.deepStrictEqual(JSON.stringify(orgMsg), JSON.stringify(msg));
assert(deliveryTag, 'deliveryTag should be non-empty');
subscribed ++;
ack();
}, cb);
}
async.parallel(tasks, function(err) {
if (err) throw err;
tasks.push(function (cb) { consume(intercoms[0], cb); });
tasks.push(function (cb) { consume(intercoms[1], cb); });
tasks.push(function (cb) { subscribe(intercoms[2], cb); });
tasks.push(function (cb) { subscribe(intercoms[3], cb); });
intercoms[4].send(orgMsg, function(err) {
async.parallel(tasks, function (err) {
if (err) throw err;
// Wait for a while to make sure consume() is not ran multiple times.
// This is not pretty, but I can not think of a better way
setTimeout(function() {
assert.deepEqual(consumed, 1);
assert.deepEqual(subscribed, 2);
done();
}, 1000);
intercoms[4].send(orgMsg, function (err) {
if (err) throw err;
// Wait for a while to make sure consume() is not ran multiple times.
// This is not pretty, but I can not think of a better way
setTimeout(function () {
assert.deepStrictEqual(consumed, 1);
assert.deepStrictEqual(subscribed, 2);
done();
}, 1000);
});
});
});
});
it('send and receive a message to a custom exchange', function(done) {
const exchange = 'customeOne',
orgMsg = {'foo': 'bard'},
tasks = [];
it('send and receive a message to a custom exchange', function (done) {
const exchange = 'customeOne',
orgMsg = {'foo': 'bard'},
tasks = [];
this.timeout(2500);
this.slow(2100); // > 1050 is shown in yellow, 1000ms is setTimeout()
this.timeout(2500);
this.slow(2100); // > 1050 is shown in yellow, 1000ms is setTimeout()
let subscribed = 0,
consumed = 0;
let subscribed = 0,
consumed = 0;
function consume(intercom, cb) {
intercom.consume({'exchange': exchange}, function(msg, ack, deliveryTag) {
assert.notDeepEqual(lUtils.formatUuid(msg.uuid), false);
delete msg.uuid;
assert.deepEqual(JSON.stringify(orgMsg), JSON.stringify(msg));
assert(deliveryTag, 'deliveryTag shoult be non-empty');
consumed ++;
ack();
}, cb);
}
function consume(intercom, cb) {
intercom.consume({'exchange': exchange}, function (msg, ack, deliveryTag) {
assert.notDeepEqual(lUtils.formatUuid(msg.uuid), false);
delete msg.uuid;
assert.deepStrictEqual(JSON.stringify(orgMsg), JSON.stringify(msg));
assert(deliveryTag, 'deliveryTag shoult be non-empty');
consumed ++;
ack();
}, cb);
}
function subscribe(intercom, cb) {
intercom.subscribe({'exchange': exchange}, function(msg, ack, deliveryTag) {
assert.notDeepEqual(lUtils.formatUuid(msg.uuid), false);
delete msg.uuid;
assert.deepEqual(JSON.stringify(orgMsg), JSON.stringify(msg));
assert(deliveryTag, 'deliveryTag shoult be non-empty');
subscribed ++;
ack();
}, cb);
}
function subscribe(intercom, cb) {
intercom.subscribe({'exchange': exchange}, function (msg, ack, deliveryTag) {
assert.notDeepEqual(lUtils.formatUuid(msg.uuid), false);
delete msg.uuid;
assert.deepStrictEqual(JSON.stringify(orgMsg), JSON.stringify(msg));
assert(deliveryTag, 'deliveryTag shoult be non-empty');
subscribed ++;
ack();
}, cb);
}
tasks.push(function(cb) { consume(intercoms[5], cb); });
tasks.push(function(cb) { consume(intercoms[6], cb); });
tasks.push(function(cb) { subscribe(intercoms[7], cb); });
tasks.push(function(cb) { subscribe(intercoms[8], cb); });
tasks.push(function (cb) { consume(intercoms[5], cb); });
tasks.push(function (cb) { consume(intercoms[6], cb); });
tasks.push(function (cb) { subscribe(intercoms[7], cb); });
tasks.push(function (cb) { subscribe(intercoms[8], cb); });
async.parallel(tasks, function(err) {
if (err) throw err;
intercoms[9].send(orgMsg, {'exchange': exchange}, function(err) {
async.parallel(tasks, function (err) {
if (err) throw err;
// Wait for a while to make sure consume() is not ran multiple times.
// This is not pretty, but I can not think of a better way
setTimeout(function() {
assert.deepEqual(consumed, 1);
assert.deepEqual(subscribed, 2);
done();
}, 1000);
intercoms[9].send(orgMsg, {'exchange': exchange}, function (err) {
if (err) throw err;
// Wait for a while to make sure consume() is not ran multiple times.
// This is not pretty, but I can not think of a better way
setTimeout(function () {
assert.deepStrictEqual(consumed, 1);
assert.deepStrictEqual(subscribed, 2);
done();
}, 1000);
});
});
});
});
it('send and receive on the same Intercom', function(done) {
const intercom = {'subscribe': intercoms[10], 'consume': intercoms[11]},
tasks = [];
it('send and receive on the same Intercom', function (done) {
const intercom = {'subscribe': intercoms[10], 'consume': intercoms[11]},
tasks = [];
for (const method of Object.keys(intercom)) {
tasks.push(function(cb) {
const exchange = 'sameInstance' + method,
orgMsg = {'bi': 'bu'};
for (const method of Object.keys(intercom)) {
tasks.push(function (cb) {
const exchange = 'sameInstance' + method,
orgMsg = {'bi': 'bu'};
intercom[method][method]({'exchange': exchange}, function(msg, ack, deliveryTag) {
assert.notDeepEqual(lUtils.formatUuid(msg.uuid), false);
delete msg.uuid;
assert.deepEqual(JSON.stringify(orgMsg), JSON.stringify(msg));
assert(deliveryTag, 'deliveryTag shoult be non-empty');
ack();
cb();
}, function(err) {
if (err) throw err;
intercom[method][method]({'exchange': exchange}, function (msg, ack, deliveryTag) {
assert.notDeepEqual(lUtils.formatUuid(msg.uuid), false);
delete msg.uuid;
assert.deepStrictEqual(JSON.stringify(orgMsg), JSON.stringify(msg));
assert(deliveryTag, 'deliveryTag shoult be non-empty');
ack();
cb();
}, function (err) {
if (err) throw err;
intercom[method].send(orgMsg, {'exchange': exchange}, function(err) {
if (err) throw err;
intercom[method].send(orgMsg, {'exchange': exchange}, function (err) {
if (err) throw err;
});
});
});
});
}
}
async.parallel(tasks, done);
});
async.parallel(tasks, done);
});
it('send and receive multiple messages on different Intercoms', function(done) {
const tasks = [],
intercom = {
'subscribe': {
'intercom1': intercoms[12], 'intercom2': intercoms[13]
},
'consume': {
'intercom1': intercoms[14], 'intercom2': intercoms[15]
}
};
it('send and receive multiple messages on different Intercoms', function (done) {
const tasks = [],
intercom = {
'subscribe': {
'intercom1': intercoms[12], 'intercom2': intercoms[13]
},
'consume': {
'intercom1': intercoms[14], 'intercom2': intercoms[15]
}
};
for (const method of Object.keys(intercom)) {
tasks.push(function(cb) {
const intercom1 = intercom[method].intercom1,
intercom2 = intercom[method].intercom2,
exchange = 'anotherInstance' + method,
orgMsg1 = {'ba': 'bo'},
orgMsg2 = {'waff': 'woff'};
for (const method of Object.keys(intercom)) {
tasks.push(function (cb) {
const intercom1 = intercom[method].intercom1,
intercom2 = intercom[method].intercom2,
exchange = 'anotherInstance' + method,
orgMsg1 = {'ba': 'bo'},
orgMsg2 = {'waff': 'woff'};
let msg1Received = 0,
msg2Received = 0;
let msg1Received = 0,
msg2Received = 0;
intercom1[method]({'exchange': exchange}, function(msg, ack) {
if (JSON.stringify(msg.ba) === JSON.stringify(orgMsg1.ba)) {
msg1Received ++;
ack();
} else if (JSON.stringify(msg.waff) === JSON.stringify(orgMsg2.waff)) {
msg2Received ++;
ack();
}
intercom1[method]({'exchange': exchange}, function (msg, ack) {
if (JSON.stringify(msg.ba) === JSON.stringify(orgMsg1.ba)) {
msg1Received ++;
ack();
} else if (JSON.stringify(msg.waff) === JSON.stringify(orgMsg2.waff)) {
msg2Received ++;
ack();
}
if (msg1Received === 1 && msg2Received === 1) {
cb();
}
}, function(err) {
if (err) throw err;
if (msg1Received === 1 && msg2Received === 1) {
cb();
}
}, function (err) {
if (err) throw err;
intercom1.send(orgMsg1, {'exchange': exchange}, function(err) {
if (err) throw err;
intercom1.send(orgMsg1, {'exchange': exchange}, function (err) {
if (err) throw err;
});
intercom2.send(orgMsg2, {'exchange': exchange}, function (err) {
if (err) throw err;
});
});
});
}
intercom2.send(orgMsg2, {'exchange': exchange}, function(err) {
async.parallel(tasks, done);
});
it('send and receive multiple messages on the same Intercom', function (done) {
const intercom = {'subscribe': intercoms[16], 'consume': intercoms[17]},
tasks = [];
for (const method of Object.keys(intercom)) {
tasks.push(function (cb) {
const exchange = 'yetAnotherInstance' + method,
orgMsg1 = {'bar': 'bor'},
orgMsg2 = {'waffer': 'woffer'};
let msg1Received = 0,
msg2Received = 0;
intercom[method][method]({'exchange': exchange}, function (msg, ack) {
if (JSON.stringify(msg.bar) === JSON.stringify(orgMsg1.bar)) {
msg1Received ++;
ack();
} else if (JSON.stringify(msg.waffer) === JSON.stringify(orgMsg2.waffer)) {
msg2Received ++;
ack();
}
if (msg1Received === 10 && msg2Received === 10) {
cb();
}
}, function (err) {
if (err) throw err;
for (let i = 0; i !== 10; i ++) {
intercom[method].send(orgMsg1, {'exchange': exchange}, function (err) {
if (err) throw err;
});
intercom[method].send(orgMsg2, {'exchange': exchange}, function (err) {
if (err) throw err;
});
}
});
});
}
async.parallel(tasks, done);
});
it('send and receive messages on different exchanges', function (done) {
const exchange1 = 'differentExes1',
exchange2 = 'differentExes2',
orgMsg1 = {'bar': 'bor'},
orgMsg2 = {'waffer': 'woffer'},
tasks = [];
let receivedMsg1 = 0,
receivedMsg2 = 0;
tasks.push(function (cb) {
intercoms[0].subscribe({'exchange': exchange1}, function (msg, ack) {
assert.deepStrictEqual(msg.bar, orgMsg1.bar);
receivedMsg1 ++;
ack();
}, cb);
});
}
async.parallel(tasks, done);
});
tasks.push(function (cb) {
intercoms[0].subscribe({'exchange': exchange2}, function (msg, ack) {
assert.deepStrictEqual(msg.waffer, orgMsg2.waffer);
receivedMsg2 ++;
ack();
}, cb);
});
it('send and receive multiple messages on the same Intercom', function(done) {
const intercom = {'subscribe': intercoms[16], 'consume': intercoms[17]},
tasks = [];
tasks.push(function (cb) {
intercoms[0].send(orgMsg1, {'exchange': exchange1}, cb);
});
for (const method of Object.keys(intercom)) {
tasks.push(function(cb) {
const exchange = 'yetAnotherInstance' + method,
orgMsg1 = {'bar': 'bor'},
orgMsg2 = {'waffer': 'woffer'};
tasks.push(function (cb) {
intercoms[0].send(orgMsg2, {'exchange': exchange2}, cb);
});
let msg1Received = 0,
msg2Received = 0;
async.series(tasks, function (err) {
let interval;
intercom[method][method]({'exchange': exchange}, function(msg, ack) {
if (JSON.stringify(msg.bar) === JSON.stringify(orgMsg1.bar)) {
msg1Received ++;
ack();
} else if (JSON.stringify(msg.waffer) === JSON.stringify(orgMsg2.waffer)) {
msg2Received ++;
ack();
}
if (err) throw err;
if (msg1Received === 10 && msg2Received === 10) {
cb();
interval = setInterval(function () {
if (receivedMsg1 === 1 && receivedMsg2 === 1) {
clearInterval(interval);
done();
}
}, function(err) {
if (err) throw err;
}, 10);
});
});
for (let i = 0; i !== 10; i ++) {
intercom[method].send(orgMsg1, {'exchange': exchange}, function(err) {
if (err) throw err;
});
it('send before consumer is up and still receive', function (done) {
const exchange = 'dkfia893M', // Random exchange to not collide with another test
orgMsg = {'foo': 'bar'};
intercom[method].send(orgMsg2, {'exchange': exchange}, function(err) {
if (err) throw err;
});
}
});
this.timeout(2000);
this.slow(700);
intercoms[0].send(orgMsg, {'exchange': exchange, 'forceConsumeQueue': true}, function (err) {
if (err) throw err;
setTimeout(function () {
intercoms[0].consume({'exchange': exchange}, function (msg, ack, deliveryTag) {
assert.notDeepEqual(lUtils.formatUuid(msg.uuid), false);
delete msg.uuid;
assert.deepStrictEqual(JSON.stringify(orgMsg), JSON.stringify(msg));
assert(deliveryTag, 'deliveryTag shoult be non-empty');
ack();
done();
}, function (err) {
if (err) throw err;
});
}, 200);
});
}
});
async.parallel(tasks, done);
});
it('send and declare exchanges at the same time', function (done) {
const intercom = intercoms[0],
exchange = 'breakCmdChain',
orgMsg1 = {'blippel': 'bloppel'},
orgMsg2 = {'maffab': 'berk'};
it('send and receive messages on different exchanges', function(done) {
const exchange1 = 'differentExes1',
exchange2 = 'differentExes2',
orgMsg1 = {'bar': 'bor'},
orgMsg2 = {'waffer': 'woffer'},
tasks = [];
let msg1Received = 0,
msg2Received = 0;
let receivedMsg1 = 0,
receivedMsg2 = 0;
intercom.subscribe({'exchange': exchange}, function (msg, ack) {
if (JSON.stringify(msg.blippel) === JSON.stringify(orgMsg1.blippel)) {
msg1Received ++;
ack();
} else if (JSON.stringify(msg.maffab) === JSON.stringify(orgMsg2.maffab)) {
msg2Received ++;
ack();
}
tasks.push(function(cb) {
intercoms[0].subscribe({'exchange': exchange1}, function(msg, ack) {
assert.deepEqual(msg.bar, orgMsg1.bar);
receivedMsg1 ++;
ack();
}, cb);
if (msg1Received === 10 && msg2Received === 10) {
done();
}
}, function (err) {
if (err) throw err;
for (let i = 0; i !== 10; i ++) {
intercom.send(orgMsg1, {'exchange': exchange}, function (err) {
if (err) throw err;
});
// Lets provoke!
intercom.declareExchange(exchange + '_foobar', function (err) {
if (err) throw err;
});
intercom.send(orgMsg2, {'exchange': exchange}, function (err) {
if (err) throw err;
});
}
});
});
});
tasks.push(function(cb) {
intercoms[0].subscribe({'exchange': exchange2}, function(msg, ack) {
assert.deepEqual(msg.waffer, orgMsg2.waffer);
receivedMsg2 ++;
describe('loopback interface', function () {
it('send and receive a message to the default exchange via consume', function (done) {
const intercom = new Intercom('loopback interface'),
orgMsg = {'fooconsume': 'barconsume'};
intercom.consume(function (msg, ack, deliveryTag) {
assert.notStrictEqual(lUtils.formatUuid(msg.uuid), false, 'msg.uuid must be a valid uuid');
delete msg.uuid;
assert.deepStrictEqual(JSON.stringify(orgMsg), JSON.stringify(msg));
assert(deliveryTag, 'deliveryTag should be non-empty');
ack();
}, cb);
done();
}, function (err) {
if (err) throw err;
});
intercom.send(orgMsg, function (err) {
if (err) throw err;
});
});
tasks.push(function(cb) {
intercoms[0].send(orgMsg1, {'exchange': exchange1}, cb);
it('send and receive a message to the default exchange via subscribe', function (done) {
const intercom = new Intercom('loopback interface'),
orgMsg = {'foosubscribe': 'barsubscribe'};
intercom.subscribe(function (msg, ack, deliveryTag) {
assert.notDeepEqual(lUtils.formatUuid(msg.uuid), false, 'msg.uuid must be a valid uuid');
delete msg.uuid;
assert.deepStrictEqual(JSON.stringify(orgMsg), JSON.stringify(msg));
assert(deliveryTag, 'deliveryTag should be non-empty');
ack();
done();
}, function (err) {
if (err) throw err;
});
intercom.send(orgMsg, function (err) {
if (err) throw err;
});
});
tasks.push(function(cb) {
intercoms[0].send(orgMsg2, {'exchange': exchange2}, cb);
it('send and receive a message to a custom exchange via consume', function (done) {
const intercom = new Intercom('loopback interface'),
exchange = 'loopbackExchCon',
orgMsg = {'fooconcusExch': 'barconcusExch'};
intercom.consume({'exchange': exchange + '_nope'}, function () {
throw new Error('should not receive any messages here');
}, function (err) {
if (err) throw err;
});
intercom.consume({'exchange': exchange}, function (msg, ack, deliveryTag) {
assert.notStrictEqual(lUtils.formatUuid(msg.uuid), false, 'msg.uuid must be a valid uuid');
delete msg.uuid;
assert.deepStrictEqual(JSON.stringify(orgMsg), JSON.stringify(msg));
assert(deliveryTag, 'deliveryTag should be non-empty');
ack();
setTimeout(done, 20);
}, function (err) {
if (err) throw err;
});
intercom.send(orgMsg, {'exchange': exchange}, function (err) {
if (err) throw err;
});
});
async.series(tasks, function(err) {
let interval;
it('send and receive a message to a custom exchange via subscribe', function (done) {
const intercom = new Intercom('loopback interface'),
exchange = 'loopbackExchSub',
orgMsg = {'foosubExch': 'barsubExch'};
if (err) throw err;
intercom.subscribe({'exchange': exchange + '_nope'}, function () {
throw new Error('should not receive any messages here');
}, function (err) {
if (err) throw err;
});
interval = setInterval(function() {
if (receivedMsg1 === 1 && receivedMsg2 === 1) {
clearInterval(interval);
done();
}
}, 10);
intercom.subscribe({'exchange': exchange}, function (msg, ack, deliveryTag) {
assert.notDeepEqual(lUtils.formatUuid(msg.uuid), false, 'msg.uuid must be a valid uuid');
delete msg.uuid;
assert.deepStrictEqual(JSON.stringify(orgMsg), JSON.stringify(msg));
assert(deliveryTag, 'deliveryTag should be non-empty');
ack();
setTimeout(done, 20);
}, function (err) {
if (err) throw err;
});
intercom.send(orgMsg, {'exchange': exchange}, function (err) {
if (err) throw err;
});
});
});
it('send before consumer is up and still receive', function(done) {
const exchange = 'dkfia893M', // Random exchange to not collide with another test
orgMsg = {'foo': 'bar'};
it('send and receive messages on different exchanges', function (done) {
const exchange1 = 'differentExes1',
exchange2 = 'differentExes2',
intercom = new Intercom('loopback interface'),
orgMsg1 = {'bar': 'bor'},
orgMsg2 = {'waffer': 'woffer'},
tasks = [];
this.timeout(2000);
this.slow(700);
let receivedMsg1 = 0,
receivedMsg2 = 0;
intercoms[0].send(orgMsg, {'exchange': exchange, 'forceConsumeQueue': true}, function(err) {
if (err) throw err;
tasks.push(function (cb) {
intercom.subscribe({'exchange': exchange1}, function (msg, ack) {
assert.deepStrictEqual(msg.bar, orgMsg1.bar);
receivedMsg1 ++;
ack();
}, cb);
});
setTimeout(function() {
intercoms[0].consume({'exchange': exchange}, function(msg, ack, deliveryTag) {
assert.notDeepEqual(lUtils.formatUuid(msg.uuid), false);
delete msg.uuid;
assert.deepEqual(JSON.stringify(orgMsg), JSON.stringify(msg));
assert(deliveryTag, 'deliveryTag shoult be non-empty');
tasks.push(function (cb) {
intercom.subscribe({'exchange': exchange2}, function (msg, ack) {
assert.deepStrictEqual(msg.waffer, orgMsg2.waffer);
receivedMsg2 ++;
ack();
done();
}, function(err) {
if (err) throw err;
}, cb);
});
});
}, 200);
tasks.push(function (cb) {
intercom.send(orgMsg1, {'exchange': exchange1}, cb);
});
tasks.push(function (cb) {
intercom.send(orgMsg2, {'exchange': exchange2}, cb);
});
async.series(tasks, function (err) {
let interval;
if (err) throw err;
interval = setInterval(function () {
if (receivedMsg1 === 1 && receivedMsg2 === 1) {
clearInterval(interval);
done();
}
}, 10);
});
});
});
it('send and declare exchanges at the same time', function(done) {
const intercom = intercoms[0],
exchange = 'breakCmdChain',
orgMsg1 = {'blippel': 'bloppel'},
orgMsg2 = {'maffab': 'berk'};
it('send 1000 messages on subscription', function (done) {
const intercom = new Intercom('loopback interface');
let msg1Received = 0,
msg2Received = 0;
let receivedMsgs = 0,
expectedSum = 0,
actualSum = 0;
intercom.subscribe({'exchange': exchange}, function(msg, ack) {
if (JSON.stringify(msg.blippel) === JSON.stringify(orgMsg1.blippel)) {
msg1Received ++;
intercom.subscribe(function (msg, ack, deliveryTag) {
assert.notDeepEqual(lUtils.formatUuid(msg.uuid), false, 'msg.uuid must be a valid uuid');
actualSum += msg.thisNr;
receivedMsgs ++;
assert(deliveryTag, 'deliveryTag should be non-empty');
ack();
} else if (JSON.stringify(msg.maffab) === JSON.stringify(orgMsg2.maffab)) {
msg2Received ++;
ack();
}
if (msg1Received === 10 && msg2Received === 10) {
done();
}
}, function(err) {
if (err) throw err;
if (receivedMsgs === 1000) {
assert.strictEqual(expectedSum, actualSum);
done();
}
}, function (err) {
if (err) throw err;
});
for (let i = 0; i !== 10; i ++) {
intercom.send(orgMsg1, {'exchange': exchange}, function(err) {
if (err) throw err;
});
for (let i = 0; i !== 1000; i ++) {
const thisNr = Math.round(Math.random());
// Lets provoke!
intercom.declareExchange(exchange + '_foobar', function(err) {
expectedSum += thisNr;
intercom.send({'thisNr': thisNr}, function (err) {
if (err) throw err;
});
}
});
intercom.send(orgMsg2, {'exchange': exchange}, function(err) {
it('send 1000 messages on consume', function (done) {
const intercom = new Intercom('loopback interface');
let receivedMsgs = 0,
expectedSum = 0,
actualSum = 0;
intercom.consume(function (msg, ack, deliveryTag) {
assert.notDeepEqual(lUtils.formatUuid(msg.uuid), false, 'msg.uuid must be a valid uuid');
actualSum += msg.thisNr;
receivedMsgs ++;
assert(deliveryTag, 'deliveryTag should be non-empty');
ack();
if (receivedMsgs === 1000) {
assert.strictEqual(expectedSum, actualSum);
done();
}
}, function (err) {
if (err) throw err;
});
for (let i = 0; i !== 1000; i ++) {
const thisNr = Math.round(Math.random());
expectedSum += thisNr;
intercom.send({'thisNr': thisNr}, function (err) {
if (err) throw err;

@@ -454,6 +679,32 @@ });

});
it('send before consumer is up and still receive', function (done) {
const intercom = new Intercom('loopback interface'),
exchange = 'dkfia893M', // Random exchange to not collide with another test
orgMsg = {'foo': 'bar'};
this.timeout(2000);
this.slow(700);
intercom.send(orgMsg, {'exchange': exchange, 'forceConsumeQueue': true}, function (err) {
if (err) throw err;
setTimeout(function () {
intercom.consume({'exchange': exchange}, function (msg, ack, deliveryTag) {
assert.notDeepEqual(lUtils.formatUuid(msg.uuid), false);
delete msg.uuid;
assert.deepStrictEqual(JSON.stringify(orgMsg), JSON.stringify(msg));
assert(deliveryTag, 'deliveryTag shoult be non-empty');
ack();
done();
}, function (err) {
if (err) throw err;
});
}, 200);
});
});
});
/* Disabled until .cancel() is fixed
it('should not receive after the consumation is cancelled', function(done) {
it('should not receive after the consumation is cancelled', function (done) {
const consumeIntercom = intercoms[14],

@@ -476,7 +727,7 @@ sendIntercom = intercoms[15],

if (receivedNoMsgs === 1) {
consumeInstance.cancel(function(err) {
consumeInstance.cancel(function (err) {
if (err) throw err;
// The callback is sadly not trustworthy. Instead wait a bit and try again
setTimeout(function() {
setTimeout(function () {
sendAgain();

@@ -493,11 +744,11 @@ }, 200);

consumeIntercom.consume({'exchange': exchangeNo}, handleNoMsg, function(err, result) {
consumeIntercom.consume({'exchange': exchangeNo}, handleNoMsg, function (err, result) {
consumeInstance = result;
consumeIntercom.consume({'exchange': exchangeYes}, handleYesMsg, function(err) {
consumeIntercom.consume({'exchange': exchangeYes}, handleYesMsg, function (err) {
if (err) throw err;
sendIntercom.send({'foo': 'bar1'}, {'exchange': exchangeNo}, function(err) {
sendIntercom.send({'foo': 'bar1'}, {'exchange': exchangeNo}, function (err) {
if (err) throw err;
});
sendIntercom.send({'foo': 'bar1'}, {'exchange': exchangeYes}, function(err) {
sendIntercom.send({'foo': 'bar1'}, {'exchange': exchangeYes}, function (err) {
if (err) throw err;

@@ -510,9 +761,9 @@ });

function sendAgain() {
sendIntercom.send({'foo': 'bar2'}, {'exchange': exchangeNo}, function(err) {
sendIntercom.send({'foo': 'bar2'}, {'exchange': exchangeNo}, function (err) {
if (err) throw err;
// Wait a while, and then make sure we have not gotten a second message
setTimeout(function() {
assert.deepEqual(receivedNoMsgs, 1);
assert.deepEqual(receivedYesMsgs, 2);
setTimeout(function () {
assert.deepStrictEqual(receivedNoMsgs, 1);
assert.deepStrictEqual(receivedYesMsgs, 2);
done();

@@ -522,3 +773,3 @@ }, 200);

sendIntercom.send({'foo': 'bar2'}, {'exchange': exchangeYes}, function(err) {
sendIntercom.send({'foo': 'bar2'}, {'exchange': exchangeYes}, function (err) {
if (err) throw err;

@@ -529,3 +780,3 @@ });

it('should not receive after the subscription is cancelled', function(done) {
it('should not receive after the subscription is cancelled', function (done) {
const subscribeIntercom = intercoms[16],

@@ -548,7 +799,7 @@ sendIntercom = intercoms[17],

if (receivedNoMsgs === 1) {
subscribeInstance.cancel(function(err) {
subscribeInstance.cancel(function (err) {
if (err) throw err;
// The callback is sadly not trustworthy. Instead wait a bit and try again
setTimeout(function() {
setTimeout(function () {
sendAgain();

@@ -565,8 +816,8 @@ }, 200);

subscribeIntercom.subscribe({'exchange': exchangeNo}, handleNoMsg, function(err, result) {
subscribeIntercom.subscribe({'exchange': exchangeNo}, handleNoMsg, function (err, result) {
subscribeInstance = result;
sendIntercom.send({'foo': 'bar1'}, {'exchange': exchangeNo}, function(err) {
sendIntercom.send({'foo': 'bar1'}, {'exchange': exchangeNo}, function (err) {
if (err) throw err;
});
sendIntercom.send({'foo': 'bar1'}, {'exchange': exchangeYes}, function(err) {
sendIntercom.send({'foo': 'bar1'}, {'exchange': exchangeYes}, function (err) {
if (err) throw err;

@@ -576,3 +827,3 @@ });

subscribeIntercom.subscribe({'exchange': exchangeYes}, handleYesMsg, function(err) {
subscribeIntercom.subscribe({'exchange': exchangeYes}, handleYesMsg, function (err) {
if (err) throw err;

@@ -582,9 +833,9 @@ });

function sendAgain() {
sendIntercom.send({'foo': 'bar2'}, {'exchange': exchangeNo}, function(err) {
sendIntercom.send({'foo': 'bar2'}, {'exchange': exchangeNo}, function (err) {
if (err) throw err;
// Wait a while, and then make sure we have not gotten a second message
setTimeout(function() {
assert.deepEqual(receivedNoMsgs, 1);
assert.deepEqual(receivedYesMsgs, 2);
setTimeout(function () {
assert.deepStrictEqual(receivedNoMsgs, 1);
assert.deepStrictEqual(receivedYesMsgs, 2);
done();

@@ -594,3 +845,3 @@ }, 200);

sendIntercom.send({'foo': 'bar2'}, {'exchange': exchangeYes}, function(err) {
sendIntercom.send({'foo': 'bar2'}, {'exchange': exchangeYes}, function (err) {
if (err) throw err;

@@ -597,0 +848,0 @@ });

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet