larvitamintercom
Advanced tools
Comparing version 0.1.18 to 0.2.0
474
index.js
'use strict'; | ||
const EventEmitter = require('events').EventEmitter, | ||
topLogPrefix = 'larvitamintercom: index.js - ', | ||
uuidLib = require('uuid'), | ||
@@ -20,6 +21,9 @@ bramqp = require('bramqp'), | ||
* Events | ||
* .on('error', function(err)) - something serious happened! | ||
* .on('error', function (err)) - something serious happened! | ||
* | ||
* @param str conStr - AMQP connection string OR "loopback interface" to only work in loopback mode | ||
*/ | ||
function Intercom(conStr) { | ||
const parsedConStr = url.parse(conStr), | ||
logPrefix = topLogPrefix + 'Intercom() - ', | ||
tasks = [], | ||
@@ -32,75 +36,85 @@ that = this; | ||
that.declaredExchanges = []; | ||
that.host = parsedConStr.hostname; | ||
that.port = parsedConStr.port || 5672; | ||
that.expectingClose = false; | ||
that.queueReady = false; | ||
that.sendInProgress = false; | ||
that.sendQueue = []; | ||
that.expectingClose = false; | ||
that.socket = net.connect({ | ||
'port': that.port, | ||
'host': that.host | ||
}); | ||
if (conStr === 'loopback interface') { | ||
that.loopback = true; | ||
that.loopbackConQueue = {}; | ||
that.handle = new EventEmitter; | ||
log.verbose('larvitamintercom: Intercom() - Initializing on ' + that.host + ':' + that.port); | ||
log.verbose(logPrefix + 'Initializing on loopback interface'); | ||
} else { | ||
that.loopback = false; | ||
that.host = parsedConStr.hostname; | ||
that.port = parsedConStr.port || 5672; | ||
that.socket.on('error', function(err) { | ||
log.error('larvitamintercom: Intercom() - socket error: ' + err.message); | ||
}); | ||
that.socket = net.connect({ | ||
'port': that.port, | ||
'host': that.host | ||
}); | ||
that.socket.on('close', function(hadError) { | ||
log.info('larvitamintercom: Intercom() - socket closed'); | ||
if (hadError) { | ||
log.error('larvitamintercom: Intercom() - socket closed with error'); | ||
} | ||
}); | ||
log.verbose(logPrefix + 'Initializing on ' + that.host + ':' + that.port); | ||
that.socket.on('end', function() { | ||
log.info('larvitamintercom: Intercom() - socket connection ended by remote'); | ||
}); | ||
that.socket.on('error', function (err) { | ||
log.error(logPrefix + 'socket error: ' + err.message); | ||
}); | ||
// Create handle by socket connect to rabbitmq | ||
tasks.push(function(cb) { | ||
bramqp.initialize(that.socket, 'rabbitmq/full/amqp0-9-1.stripped.extended', function(err, result) { | ||
if (err) { | ||
log.error('larvitamintercom: Intercom() - Error connecting to ' + that.host + ':' + that.port + ' err: ' + err.message); | ||
that.emit('error', err); | ||
that.socket.on('close', function (hadError) { | ||
log.info(logPrefix + 'socket closed'); | ||
if (hadError) { | ||
log.error(logPrefix + 'socket closed with error'); | ||
} | ||
}); | ||
log.debug('larvitamintercom: Intercom() - bramqp.initialize() ran on ' + that.host + ':' + that.port); | ||
that.socket.on('end', function () { | ||
log.info(logPrefix + 'socket connection ended by remote'); | ||
}); | ||
that.handle = result; | ||
// Create handle by socket connect to rabbitmq | ||
tasks.push(function (cb) { | ||
bramqp.initialize(that.socket, 'rabbitmq/full/amqp0-9-1.stripped.extended', function (err, result) { | ||
if (err) { | ||
log.error(logPrefix + 'Error connecting to ' + that.host + ':' + that.port + ' err: ' + err.message); | ||
that.emit('error', err); | ||
} | ||
cb(err); | ||
}); | ||
}); | ||
log.debug(logPrefix + 'bramqp.initialize() ran on ' + that.host + ':' + that.port); | ||
// Open AMQP communication | ||
tasks.push(function(cb) { | ||
const heartBeat = true, | ||
auth = parsedConStr.auth; | ||
that.handle = result; | ||
let username, | ||
password; | ||
cb(err); | ||
}); | ||
}); | ||
if (auth) { | ||
username = parsedConStr.auth.split(':')[0]; | ||
password = parsedConStr.auth.split(':')[1]; | ||
} | ||
// Open AMQP communication | ||
tasks.push(function (cb) { | ||
const heartBeat = true, | ||
auth = parsedConStr.auth; | ||
log.debug('larvitamintercom: Intercom() - openAMQPCommunication running on ' + that.host + ':' + that.port + ' with username: ' + username); | ||
let username, | ||
password; | ||
that.handle.openAMQPCommunication(username, password, heartBeat, function(err) { | ||
if (err) { | ||
log.error('larvitamintercom: Intercom() - Error opening AMQP communication: ' + err.message); | ||
that.emit('error', err); | ||
if (auth) { | ||
username = parsedConStr.auth.split(':')[0]; | ||
password = parsedConStr.auth.split(':')[1]; | ||
} | ||
cb(err); | ||
log.debug(logPrefix + 'openAMQPCommunication running on ' + that.host + ':' + that.port + ' with username: ' + username); | ||
that.handle.openAMQPCommunication(username, password, heartBeat, function (err) { | ||
if (err) { | ||
log.error(logPrefix + 'Error opening AMQP communication: ' + err.message); | ||
that.emit('error', err); | ||
} | ||
cb(err); | ||
}); | ||
}); | ||
}); | ||
} | ||
// Register listener for incoming messages | ||
tasks.push(function(cb) { | ||
that.handle.on(that.channelName + ':basic.deliver', function(channel, method, data) { | ||
tasks.push(function (cb) { | ||
that.handle.on(that.channelName + ':basic.deliver', function (channel, method, data) { | ||
const exchange = data.exchange, | ||
@@ -110,19 +124,18 @@ deliveryTag = data['delivery-tag'], | ||
log.debug('larvitamintercom: Intercom() - Incoming message. exchange: "' + exchange + '", consumerTag: "' + consumerTag + '", deliveryTag: "' + deliveryTag + '"'); | ||
log.debug(logPrefix + 'Incoming message. exchange: "' + exchange + '", consumerTag: "' + consumerTag + '", deliveryTag: "' + deliveryTag + '"'); | ||
that.handle.once('content', function(channel, className, properties, content) { | ||
that.handle.once('content', function (channel, className, properties, content) { | ||
let message; | ||
log.debug('larvitamintercom: Intercom() - Incoming message content. exchange: "' + exchange + '", consumerTag: "' + consumerTag + '", deliveryTag: "' + deliveryTag + '", content: "' + content.toString() + '"'); | ||
log.debug(logPrefix + 'Incoming message content. exchange: "' + exchange + '", consumerTag: "' + consumerTag + '", deliveryTag: "' + deliveryTag + '", content: "' + content.toString() + '"'); | ||
try { | ||
message = JSON.parse(content.toString()); | ||
} catch(err) { | ||
log.warn('larvitamintercom: subscribe() - Could not parse incoming message. exchange: "' + exchange + '", consumerTag: "' + consumerTag + '", deliveryTag: "' + deliveryTag + '", content: "' + content.toString() + '"'); | ||
cb(err); | ||
return; | ||
} catch (err) { | ||
log.warn(logPrefix + 'subscribe() - Could not parse incoming message. exchange: "' + exchange + '", consumerTag: "' + consumerTag + '", deliveryTag: "' + deliveryTag + '", content: "' + content.toString() + '"'); | ||
return cb(err); | ||
} | ||
if (lUtils.formatUuid(message.uuid) === false) { | ||
log.warn('larvitamintercom: consume() - Message does not contain uuid. exchange: "' + exchange + '", consumerTag: "' + consumerTag + '", deliveryTag: "' + deliveryTag + '", content: "' + content.toString() + '"'); | ||
log.warn(logPrefix + 'consume() - Message does not contain uuid. exchange: "' + exchange + '", consumerTag: "' + consumerTag + '", deliveryTag: "' + deliveryTag + '", content: "' + content.toString() + '"'); | ||
} | ||
@@ -137,8 +150,8 @@ | ||
// Register listener for close events | ||
tasks.push(function(cb) { | ||
that.handle.on('connection.close', function(channel, method, data) { | ||
tasks.push(function (cb) { | ||
that.handle.on('connection.close', function (channel, method, data) { | ||
if (that.expectingClose === false) { | ||
log.error('larvitamintercom: Intercom() - Unexpected connection.close! channel: "' + channel + '" data: "' + JSON.stringify(data) + '"'); | ||
log.error(logPrefix + 'Unexpected connection.close! channel: "' + channel + '" data: "' + JSON.stringify(data) + '"'); | ||
} else { | ||
log.info('larvitamintercom: Intercom() - Expected connetion.close. channel: "' + channel + '" data: "' + JSON.stringify(data) + '"'); | ||
log.info(logPrefix + 'Expected connetion.close. channel: "' + channel + '" data: "' + JSON.stringify(data) + '"'); | ||
} | ||
@@ -151,9 +164,9 @@ }); | ||
// Should be disabled in production code and only manually enabled while debugging due to it being expensive | ||
/** /tasks.push(function(cb) { | ||
/** /tasks.push(function (cb) { | ||
const oldEmitter = that.handle.emit; | ||
that.handle.emit = function() { | ||
that.handle.emit = function () { | ||
const emitArgs = arguments; | ||
log.silly('larvitamintercom: handle.on("' + arguments[0] + '"), all arguments: "' + JSON.stringify(arguments) + '"'); | ||
log.silly(topLogPrefix + 'handle.on("' + arguments[0] + '"), all arguments: "' + JSON.stringify(arguments) + '"'); | ||
@@ -167,3 +180,3 @@ oldEmitter.apply(that.handle, arguments); | ||
// Construct generic handle comms | ||
tasks.push(function(cb) { | ||
tasks.push(function (cb) { | ||
const cmdStrsWithoutOk = ['basic.publish', 'content', 'closeAMQPCommunication', 'basic.nack', 'basic.ack']; | ||
@@ -173,3 +186,3 @@ | ||
if (typeof cb !== 'function') { | ||
cb = function() {}; | ||
cb = function () {}; | ||
} | ||
@@ -179,10 +192,10 @@ | ||
log.debug('larvitamintercom: handle.cmd() - cmdStr: "' + cmdStr + '" added to run queue. params: "' + JSON.stringify(params) + '"'); | ||
log.debug(logPrefix + 'handle.cmd() - cmdStr: "' + cmdStr + '" added to run queue. params: "' + JSON.stringify(params) + '"'); | ||
if (that.cmdInProgress === true) { | ||
log.silly('larvitamintercom: handle.cmd() - cmdStr: "' + cmdStr + '" cmdInProgress === true'); | ||
log.silly(logPrefix + 'handle.cmd() - cmdStr: "' + cmdStr + '" cmdInProgress === true'); | ||
return; | ||
} | ||
log.silly('larvitamintercom: handle.cmd() - cmdStr: "' + cmdStr + '" cmdInProgress !== true'); | ||
log.silly(logPrefix + 'handle.cmd() - cmdStr: "' + cmdStr + '" cmdInProgress !== true'); | ||
@@ -194,4 +207,4 @@ that.cmdInProgress = true; | ||
cmdStr = mainParams.cmdStr, | ||
cb = mainParams.cb, | ||
tasks = []; | ||
tasks = [], | ||
cb = mainParams.cb; | ||
@@ -208,3 +221,3 @@ let params = mainParams.params, | ||
// Register the callback | ||
tasks.push(function(cb) { | ||
tasks.push(function (cb) { | ||
const cmdGroupName = cmdStr.split('.')[0], | ||
@@ -216,6 +229,24 @@ cmdName = cmdStr.split('.')[1]; | ||
if (cmdStrsWithoutOk.indexOf(cmdStr) === - 1) { | ||
okTimeout = setTimeout(function() { | ||
function cmdCb(err) { | ||
if (err) { | ||
log.error(logPrefix + 'handle.cmd() - readFromQueue() - cmdStr: "' + cmdStr + '" failed, err: ' + err.message); | ||
callCb = false; | ||
return cb(err); | ||
} | ||
log.debug(logPrefix + 'handle.cmd() - readFromQueue() - cmdStr: "' + cmdStr + '" succeeded'); | ||
if (cmdStrsWithoutOk.indexOf(cmdStr) !== - 1) { | ||
cb(); | ||
} | ||
} | ||
if (that.loopback === true) { | ||
return cb(); | ||
} | ||
if (cmdStrsWithoutOk.indexOf(cmdStr) === - 1 && that.loopback === false) { | ||
okTimeout = setTimeout(function () { | ||
const err = new Error('no answer received from queue within 500ms'); | ||
log.error('larvitamintercom: handle.cmd() - readFromQueue() - cmdStr: "' + cmdStr + '", ' + err.message); | ||
log.error(topLogPrefix + 'handle.cmd() - readFromQueue() - cmdStr: "' + cmdStr + '", ' + err.message); | ||
callCb = false; | ||
@@ -225,3 +256,3 @@ cb(err); | ||
that.handle.once(that.channelName + ':' + cmdStr + '-ok', function(x, y, z) { | ||
that.handle.once(that.channelName + ':' + cmdStr + '-ok', function (x, y, z) { | ||
// We want these in the outer scope, thats why the weird naming | ||
@@ -232,5 +263,5 @@ channel = x; | ||
log.debug('larvitamintercom: handle.cmd() - readFromQueue() - cmdStr: "' + cmdStr + '", answer received from queue'); | ||
log.debug(topLogPrefix + 'handle.cmd() - readFromQueue() - cmdStr: "' + cmdStr + '", answer received from queue'); | ||
if (callCb === false) { | ||
log.warn('larvitamintercom: handle.cmd() - readFromQueue() - cmdStr: "' + cmdStr + '", answer received but to late; timeout have already happened'); | ||
log.warn(topLogPrefix + 'handle.cmd() - readFromQueue() - cmdStr: "' + cmdStr + '", answer received but to late; timeout have already happened'); | ||
return; | ||
@@ -243,17 +274,4 @@ } | ||
params.push(function(err) { | ||
if (err) { | ||
log.error('larvitamintercom: handle.cmd() - readFromQueue() - cmdStr: "' + cmdStr + '" failed, err: ' + err.message); | ||
callCb = false; | ||
cb(err); | ||
return; | ||
} | ||
params.push(cmdCb); | ||
log.debug('larvitamintercom: handle.cmd() - readFromQueue() - cmdStr: "' + cmdStr + '" succeeded'); | ||
if (cmdStrsWithoutOk.indexOf(cmdStr) !== - 1) { | ||
cb(); | ||
} | ||
}); | ||
if (cmdName) { | ||
@@ -266,10 +284,10 @@ that.handle[cmdGroupName][cmdName].apply(that.handle, params); | ||
async.series(tasks, function(err) { | ||
async.series(tasks, function (err) { | ||
cb(err, channel, method, data); | ||
if (that.cmdQueue.length === 0) { | ||
log.silly('larvitamintercom: handle.cmd() - readFromQueue() - cmdStr: "' + cmdStr + '" cmdQueue.length === 0'); | ||
log.silly(topLogPrefix + 'handle.cmd() - readFromQueue() - cmdStr: "' + cmdStr + '" cmdQueue.length === 0'); | ||
that.cmdInProgress = false; | ||
} else { | ||
log.silly('larvitamintercom: handle.cmd() - readFromQueue() - cmdStr: "' + cmdStr + '" readFromQueue() rerunning'); | ||
log.silly(topLogPrefix + 'handle.cmd() - readFromQueue() - cmdStr: "' + cmdStr + '" readFromQueue() rerunning'); | ||
readFromQueue(); | ||
@@ -284,7 +302,13 @@ } | ||
async.series(tasks, function(err) { | ||
async.series(tasks, function (err) { | ||
if ( ! err) { | ||
log.debug('larvitamintercom: Intercom() - Initialized on ' + that.host + ':' + that.port); | ||
if (that.loopback === true) { | ||
log.debug(logPrefix + 'Initialized on loopback interface'); | ||
} else { | ||
log.debug(logPrefix + 'Initialized on ' + that.host + ':' + that.port); | ||
} | ||
that.queueReady = true; | ||
that.emit('ready'); | ||
setImmediate(function () { | ||
that.emit('ready'); | ||
}); | ||
} | ||
@@ -297,3 +321,3 @@ }); | ||
Intercom.prototype.bindQueue = function(queueName, exchange, cb) { | ||
Intercom.prototype.bindQueue = function (queueName, exchange, cb) { | ||
const noWait = false, // "If set, the server will not respond to the method. The client | ||
@@ -306,13 +330,15 @@ // should not wait for a reply method. If the server could not complete | ||
log.verbose('larvitamintercom: bindQueue() - Binding queue "' + queueName + '" to exchange "' + exchange + '"'); | ||
log.verbose(topLogPrefix + 'bindQueue() - Binding queue "' + queueName + '" to exchange "' + exchange + '"'); | ||
that.ready(function(err) { | ||
if (err) { cb(err); return; } | ||
if (that.loopback === true) return cb(); | ||
that.handle.cmd('queue.bind', [that.channelName, queueName, exchange, 'ignored-routing-key', noWait, args], function(err) { | ||
that.ready(function (err) { | ||
if (err) return cb(err); | ||
that.handle.cmd('queue.bind', [that.channelName, queueName, exchange, 'ignored-routing-key', noWait, args], function (err) { | ||
if (err) { | ||
log.error('larvitamintercom: bindQueue() - Could not bind queue: "' + queueName + '" to exchange: "' + exchange + '", err: ' + err.message); | ||
log.error(topLogPrefix + 'bindQueue() - Could not bind queue: "' + queueName + '" to exchange: "' + exchange + '", err: ' + err.message); | ||
} | ||
log.debug('larvitamintercom: bindQueue() - Bound queue "' + queueName + '" to exchange "' + exchange + '"'); | ||
log.debug(topLogPrefix + 'bindQueue() - Bound queue "' + queueName + '" to exchange "' + exchange + '"'); | ||
@@ -325,25 +351,29 @@ cb(err); | ||
// Close the RabbitMQ connection | ||
Intercom.prototype.close = function(cb) { | ||
Intercom.prototype.close = function (cb) { | ||
const that = this; | ||
if (typeof cb !== 'function') { | ||
cb = function() {}; | ||
cb = function () {}; | ||
} | ||
log.verbose('larvitamintercom: close() - on ' + that.host + ':' + that.port); | ||
if (that.loopback === true) { | ||
log.verbose(topLogPrefix + 'close() - on loopback interface'); | ||
return cb(); | ||
} else { | ||
log.verbose(topLogPrefix + 'close() - on ' + that.host + ':' + that.port); | ||
} | ||
that.expectingClose = true; | ||
that.ready(function(err) { | ||
if (err) { cb(err); return; } | ||
that.ready(function (err) { | ||
if (err) return cb(err); | ||
that.handle.cmd('closeAMQPCommunication', function(err) { | ||
that.handle.cmd('closeAMQPCommunication', function (err) { | ||
if (err) { | ||
log.warn('larvitamintercom: close() - Could not closeAMQPCommunication: ' + err.message); | ||
cb(err); | ||
return; | ||
log.warn(topLogPrefix + 'close() - Could not closeAMQPCommunication: ' + err.message); | ||
return cb(err); | ||
} | ||
setImmediate(function() { | ||
log.debug('larvitamintercom: close() - closed ' + that.host + ':' + that.port); | ||
setImmediate(function () { | ||
log.debug(topLogPrefix + 'close() - closed ' + that.host + ':' + that.port); | ||
cb(); | ||
@@ -355,3 +385,5 @@ }); | ||
Intercom.prototype.consume = function(options, msgCb, cb) { | ||
Intercom.prototype.consume = function (options, msgCb, cb) { | ||
const that = this; | ||
if (typeof options === 'function') { | ||
@@ -373,8 +405,24 @@ cb = msgCb; | ||
log.verbose('larvitamintercom: consume() - Starting on exchange "' + options.exchange + '"'); | ||
if (that.loopback === true) { | ||
if (Array.isArray(that.loopbackConQueue[options.exchange])) { | ||
setTimeout(function () { | ||
for (let i = 0; that.loopbackConQueue[options.exchange][i] !== undefined; i ++) { | ||
const queueItem = that.loopbackConQueue[options.exchange][i]; | ||
queueItem.options.ignoreConQueue = true; | ||
that.send(queueItem.orgMsg, queueItem.options); | ||
} | ||
that.loopbackConQueue[options.exchange] = 'connected'; | ||
}, 10); | ||
} | ||
} | ||
log.verbose(topLogPrefix + 'consume() - Starting on exchange "' + options.exchange + '"'); | ||
this.genericConsume(options, msgCb, cb); | ||
}; | ||
Intercom.prototype.declareExchange = function(exchangeName, cb) { | ||
Intercom.prototype.declareExchange = function (exchangeName, cb) { | ||
const exchangeType = 'fanout', | ||
@@ -391,23 +439,23 @@ autoDelete = false, // https://www.rabbitmq.com/amqp-0-9-1-reference.html#exchange.declare.auto-delete | ||
log.debug('larvitamintercom: declareExchange() - exchangeName: "' + exchangeName + '"'); | ||
log.debug(topLogPrefix + 'declareExchange() - exchangeName: "' + exchangeName + '"'); | ||
that.ready(function(err) { | ||
if (err) { cb(err); return; } | ||
if (that.loopback === true) return cb(); | ||
that.ready(function (err) { | ||
if (err) return cb(err); | ||
if (that.declaredExchanges.indexOf(exchangeName) !== - 1) { | ||
log.debug('larvitamintercom: declareExchange() - Already declared. exchangeName: "' + exchangeName + '"'); | ||
cb(); | ||
return; | ||
log.debug(topLogPrefix + 'declareExchange() - Already declared. exchangeName: "' + exchangeName + '"'); | ||
return cb(); | ||
} | ||
log.verbose('larvitamintercom: declareExchange() - Declaring exchangeName: "' + exchangeName + '"'); | ||
log.verbose(topLogPrefix + 'declareExchange() - Declaring exchangeName: "' + exchangeName + '"'); | ||
that.handle.cmd('exchange.declare', [that.channelName, exchangeName, exchangeType, passive, durable, autoDelete, internal, noWait, args], function(err) { | ||
that.handle.cmd('exchange.declare', [that.channelName, exchangeName, exchangeType, passive, durable, autoDelete, internal, noWait, args], function (err) { | ||
if (err) { | ||
log.warn('larvitamintercom: declareExchange() - Could not declare exchange "' + exchangeName + '", err: ' + err.message); | ||
cb(err); | ||
return; | ||
log.warn(topLogPrefix + 'declareExchange() - Could not declare exchange "' + exchangeName + '", err: ' + err.message); | ||
return cb(err); | ||
} | ||
log.debug('larvitamintercom: declareExchange() - Declared! exchangeName: "' + exchangeName + '"'); | ||
log.debug(topLogPrefix + 'declareExchange() - Declared! exchangeName: "' + exchangeName + '"'); | ||
@@ -429,3 +477,3 @@ that.declaredExchanges.push(exchangeName); | ||
*/ | ||
Intercom.prototype.declareQueue = function(options, cb) { | ||
Intercom.prototype.declareQueue = function (options, cb) { | ||
const autoDelete = false, // https://www.rabbitmq.com/amqp-0-9-1-reference.html#queue.declare.auto-delete | ||
@@ -443,18 +491,19 @@ passive = false, // https://www.rabbitmq.com/amqp-0-9-1-reference.html#queue.declare.passive | ||
log.verbose('larvitamintercom: declareQueue() - Declaring queueName: "' + options.queueName + '" exclusive: ' + options.exclusive.toString()); | ||
log.verbose(topLogPrefix + 'declareQueue() - Declaring queueName: "' + options.queueName + '" exclusive: ' + options.exclusive.toString()); | ||
that.ready(function(err) { | ||
if (err) { cb(err); return; } | ||
if (that.loopback === true) return cb(); | ||
that.handle.cmd('queue.declare', [that.channelName, options.queueName, passive, durable, options.exclusive, autoDelete, noWait, args], function(err, channel, method, data) { | ||
that.ready(function (err) { | ||
if (err) return cb(err); | ||
that.handle.cmd('queue.declare', [that.channelName, options.queueName, passive, durable, options.exclusive, autoDelete, noWait, args], function (err, channel, method, data) { | ||
let queueName; | ||
if (err) { | ||
log.error('larvitamintercom: declareQueue() - Could not declare queue, name: "' + options.queueName + '" err: ' + err.message); | ||
cb(err); | ||
return; | ||
log.error(topLogPrefix + 'declareQueue() - Could not declare queue, name: "' + options.queueName + '" err: ' + err.message); | ||
return cb(err); | ||
} | ||
queueName = data.queue; | ||
log.debug('larvitamintercom: declareQueue() - Declared! queueName: "' + queueName + '" exclusive: ' + options.exclusive.toString()); | ||
log.debug(topLogPrefix + 'declareQueue() - Declared! queueName: "' + queueName + '" exclusive: ' + options.exclusive.toString()); | ||
cb(err, queueName); | ||
@@ -466,3 +515,3 @@ }); | ||
/* Not working! | ||
Intercom.prototype.deleteQueue = function(queueName, cb) { | ||
Intercom.prototype.deleteQueue = function (queueName, cb) { | ||
const ifUnused = false, // If set, the server will only delete the queue if it | ||
@@ -480,8 +529,8 @@ // has no consumers. If the queue has consumers the | ||
if (typeof cb !== 'function') { | ||
cb = function() {}; | ||
cb = function () {}; | ||
} | ||
that.handle.queue.delete(that.channelName, queueName, ifUnused, ifEmpty, noWait); | ||
that.handle.once(that.channelName + ':queue.delete-ok', function(channel, method, data) { | ||
log.verobse('larvitamintercom: deleteQueue() - queue "' + queueName + '", containing "' + data['message-count'] + '" deleted.'); | ||
that.handle.once(that.channelName + ':queue.delete-ok', function (channel, method, data) { | ||
log.verobse(topLogPrefix + 'deleteQueue() - queue "' + queueName + '", containing "' + data['message-count'] + '" deleted.'); | ||
cb(); | ||
@@ -491,4 +540,5 @@ }); | ||
Intercom.prototype.genericConsume = function(options, msgCb, cb) { | ||
Intercom.prototype.genericConsume = function (options, msgCb, cb) { | ||
const returnObj = {}, | ||
logPrefix = topLogPrefix + 'Intercom.prototype.genericConsume() - ', | ||
tasks = [], | ||
@@ -500,3 +550,3 @@ that = this; | ||
if (cb === undefined) { | ||
cb = function() {}; | ||
cb = function () {}; | ||
} | ||
@@ -517,3 +567,3 @@ | ||
if (typeof cb !== 'function') { | ||
cb = function() {}; | ||
cb = function () {}; | ||
} | ||
@@ -523,3 +573,3 @@ | ||
const err = new Error('No consumer tag is defined, consume have probably not been started yet.'); | ||
log.warn('larvitamintercom: genericConsume() - cancel() - ' + err.message); | ||
log.warn(topLogPrefix + 'genericConsume() - cancel() - ' + err.message); | ||
cb(err); | ||
@@ -529,7 +579,7 @@ return; | ||
that.handle.basic.cancel(returnObj.data['consumer-tag'], function(err) { | ||
that.handle.basic.cancel(returnObj.data['consumer-tag'], function (err) { | ||
if (err) { | ||
log.warn('larvitamintercom: genericConsume() - cancel() - Could not canceled consuming. consumer-tag: "' + returnObj.data['consumer-tag'] + '", err: ' + err.message); | ||
log.warn(topLogPrefix + 'genericConsume() - cancel() - Could not canceled consuming. consumer-tag: "' + returnObj.data['consumer-tag'] + '", err: ' + err.message); | ||
} else { | ||
log.verbose('larvitamintercom: genericConsume() - cancel() - Canceled consuming. consumer-tag: "' + returnObj.data['consumer-tag'] + '"'); | ||
log.verbose(topLogPrefix + 'genericConsume() - cancel() - Canceled consuming. consumer-tag: "' + returnObj.data['consumer-tag'] + '"'); | ||
} | ||
@@ -540,7 +590,7 @@ | ||
// We could not get this to work :( // Lilleman and gagge 2016-12-27 | ||
//that.handle.once(that.channelName + ':basic.cancel-ok', function(channel, method, data) { | ||
// log.verbose('larvitamintercom: consume() - cancel() - Canceled consuming.'); | ||
// log.debug('larvitamintercom: consume() - cancel() - Canceled consuming. channel: ' + JSON.stringify(channel)); | ||
// log.debug('larvitamintercom: consume() - cancel() - Canceled consuming. method: ' + JSON.stringify(method)); | ||
// log.debug('larvitamintercom: consume() - cancel() - Canceled consuming. data: ' + JSON.stringify(data)); | ||
//that.handle.once(that.channelName + ':basic.cancel-ok', function (channel, method, data) { | ||
// log.verbose(topLogPrefix + 'consume() - cancel() - Canceled consuming.'); | ||
// log.debug(topLogPrefix + 'consume() - cancel() - Canceled consuming. channel: ' + JSON.stringify(channel)); | ||
// log.debug(topLogPrefix + 'consume() - cancel() - Canceled consuming. method: ' + JSON.stringify(method)); | ||
// log.debug(topLogPrefix + 'consume() - cancel() - Canceled consuming. data: ' + JSON.stringify(data)); | ||
// cb(); | ||
@@ -551,3 +601,3 @@ //}); | ||
// Declare exchange | ||
tasks.push(function(cb) { | ||
tasks.push(function (cb) { | ||
that.declareExchange(options.exchange, cb); | ||
@@ -557,7 +607,7 @@ }); | ||
// Declare queue | ||
tasks.push(function(cb) { | ||
tasks.push(function (cb) { | ||
if (options.type === 'consume') { | ||
that.declareQueue({'queueName': queueName}, cb); | ||
} else if (options.type === 'subscribe') { | ||
that.declareQueue({'exclusive': true}, function(err, result) { | ||
that.declareQueue({'exclusive': true}, function (err, result) { | ||
queueName = result; | ||
@@ -567,3 +617,3 @@ cb(err); | ||
} else { | ||
log.error('larvitamintercom: genericConsume() - Options.type must be "consume" or "subscribe", but is: "' + options.type + '"'); | ||
log.error(logPrefix + 'Options.type must be "consume" or "subscribe", but is: "' + options.type + '"'); | ||
} | ||
@@ -573,3 +623,3 @@ }); | ||
// Bind queue | ||
tasks.push(function(cb) { | ||
tasks.push(function (cb) { | ||
that.bindQueue(queueName, options.exchange, cb); | ||
@@ -579,3 +629,3 @@ }); | ||
// Start consuming | ||
tasks.push(function(cb) { | ||
tasks.push(function (cb) { | ||
const consumerTag = null, // https://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.consume.consumer-tag | ||
@@ -595,6 +645,9 @@ noLocal = false, // "If the no-local field is set the server will not send messages to the connection | ||
that.handle.cmd('basic.consume', [that.channelName, queueName, consumerTag, noLocal, noAck, exclusive, noWait, args], function(err, channel, method, data) { | ||
// No need to send a command on the queue for the loopback, handle this directly in the send function | ||
if (that.loopback === true) return cb(); | ||
that.handle.cmd('basic.consume', [that.channelName, queueName, consumerTag, noLocal, noAck, exclusive, noWait, args], function (err, channel, method, data) { | ||
let consumerTag; | ||
if (err) { cb(err); return; } | ||
if (err) return cb(err); | ||
@@ -608,6 +661,6 @@ returnObj.channel = channel; | ||
} else { | ||
log.warn('larvitamintercom: genericConsume() - No consumerTag obtained for queue: "' + queueName + '"'); | ||
log.warn(logPrefix + 'No consumerTag obtained for queue: "' + queueName + '"'); | ||
} | ||
log.verbose('larvitamintercom: genericConsume() - Started consuming on queue: "' + queueName + '" with consumer tag: "' + consumerTag + '"'); | ||
log.verbose(logPrefix + 'Started consuming on queue: "' + queueName + '" with consumer tag: "' + consumerTag + '"'); | ||
cb(); | ||
@@ -618,3 +671,3 @@ }); | ||
// Register msgCb | ||
tasks.push(function(cb) { | ||
tasks.push(function (cb) { | ||
const eventName = 'incoming_msg_' + options.exchange; | ||
@@ -624,14 +677,13 @@ | ||
const err = new Error('Only one subscribe or consume is allowed for each exchange. exchange: "' + options.exchange + '"'); | ||
log.warn('larvitamintercom: genericConsume() - ' + err.message); | ||
cb(err); | ||
return; | ||
log.warn(topLogPrefix + 'genericConsume() - ' + err.message); | ||
return cb(err); | ||
} | ||
that.on(eventName, function(message, deliveryTag) { | ||
msgCb(message, function(err) { | ||
that.on(eventName, function (message, deliveryTag) { | ||
msgCb(message, function (err) { | ||
if (err) { | ||
log.warn('larvitamintercom: genericConsume() - nack on deliveryTag: "' + deliveryTag + '" err: ' + err.message); | ||
log.warn(logPrefix + 'nack on deliveryTag: "' + deliveryTag + '" err: ' + err.message); | ||
that.handle.cmd('basic.nack', [that.channelName, deliveryTag]); | ||
} else { | ||
log.debug('larvitamintercom: genericConsume() - ack on deliveryTag: "' + deliveryTag + '"'); | ||
log.debug(logPrefix + 'ack on deliveryTag: "' + deliveryTag + '"'); | ||
that.handle.cmd('basic.nack', [that.channelName, deliveryTag]); | ||
@@ -645,4 +697,4 @@ } | ||
async.series(tasks, function(err) { | ||
if (err) { cb(err); return; } | ||
async.series(tasks, function (err) { | ||
if (err) return cb(err); | ||
@@ -653,7 +705,4 @@ cb(err, returnObj); | ||
Intercom.prototype.ready = function(cb) { | ||
if (this.queueReady === true) { | ||
cb(); | ||
return; | ||
} | ||
Intercom.prototype.ready = function (cb) { | ||
if (this.queueReady === true) return cb(); | ||
@@ -674,3 +723,3 @@ this.on('ready', cb); | ||
*/ | ||
Intercom.prototype.send = function(orgMsg, options, cb) { | ||
Intercom.prototype.send = function (orgMsg, options, cb) { | ||
const message = require('util')._extend({}, orgMsg), | ||
@@ -690,3 +739,3 @@ that = this, | ||
if (cb === undefined) { | ||
cb = function() {}; | ||
cb = function () {}; | ||
} | ||
@@ -704,13 +753,30 @@ | ||
stringifiedMsg = JSON.stringify(message); | ||
} catch(err) { | ||
log.warn('larvitamintercom: send() - Could not stringify message. Message attached to next log call.'); | ||
log.warn('larvitamintercom: send() - Unstringifiable message attached:', message); | ||
cb(err); | ||
return; | ||
} catch (err) { | ||
log.warn(topLogPrefix + 'send() - Could not stringify message. Message attached to next log call.'); | ||
log.warn(topLogPrefix + 'send() - Unstringifiable message attached:', message); | ||
return cb(err); | ||
} | ||
log.debug('larvitamintercom: send() - readFromQueue() - Sending to exchange: "' + options.exchange + '", uuid: "' + message.uuid + '", message: "' + stringifiedMsg + '"'); | ||
log.debug(topLogPrefix + 'send() - readFromQueue() - Sending to exchange: "' + options.exchange + '", uuid: "' + message.uuid + '", message: "' + stringifiedMsg + '"'); | ||
if (that.loopback === true) { | ||
if ( | ||
options.forceConsumeQueue === true | ||
&& that.loopbackConQueue[options.exchange] !== 'connected' | ||
&& options.ignoreConQueue !== true | ||
) { | ||
if (that.loopbackConQueue[options.exchange] === undefined) { | ||
that.loopbackConQueue[options.exchange] = []; | ||
} | ||
that.loopbackConQueue[options.exchange].push({'orgMsg': orgMsg, 'options': options}); | ||
return cb(); | ||
} | ||
that.emit('incoming_msg_' + options.exchange, message, uuidLib.v4()); | ||
return cb(); | ||
} | ||
// Declare exchange | ||
tasks.push(function(cb) { | ||
tasks.push(function (cb) { | ||
that.declareExchange(options.exchange, cb); | ||
@@ -723,3 +789,3 @@ }); | ||
// Declare queue | ||
tasks.push(function(cb) { | ||
tasks.push(function (cb) { | ||
that.declareQueue({'queueName': queueName}, cb); | ||
@@ -729,3 +795,3 @@ }); | ||
// Bind queue | ||
tasks.push(function(cb) { | ||
tasks.push(function (cb) { | ||
that.bindQueue(queueName, options.exchange, cb); | ||
@@ -735,3 +801,3 @@ }); | ||
tasks.push(function(cb) { | ||
tasks.push(function (cb) { | ||
const properties = {'content-type': 'application/json'}, | ||
@@ -742,5 +808,5 @@ className = 'basic', | ||
that.handle.cmd('basic.publish', [that.channelName, options.exchange, 'ignored-routing-key', mandatory, immediate], function(err) { | ||
that.handle.cmd('basic.publish', [that.channelName, options.exchange, 'ignored-routing-key', mandatory, immediate], function (err) { | ||
if (err) { | ||
log.warn('larvitamintercom: send() - readFromQueue() - Could not publish to exchange: "' + options.exchange + '". err: ' + err.message + ', uuid: "' + message.uuid + ', message: "' + stringifiedMsg + '"'); | ||
log.warn(topLogPrefix + 'send() - readFromQueue() - Could not publish to exchange: "' + options.exchange + '". err: ' + err.message + ', uuid: "' + message.uuid + ', message: "' + stringifiedMsg + '"'); | ||
if ( ! cbErr) { | ||
@@ -753,3 +819,3 @@ cbErr = err; | ||
log.debug('larvitamintercom: send() - readFromQueue() - Published (no content sent) to exchange: "' + options.exchange + '", uuid: "' + message.uuid + ', message: "' + stringifiedMsg + '"'); | ||
log.debug(topLogPrefix + 'send() - readFromQueue() - Published (no content sent) to exchange: "' + options.exchange + '", uuid: "' + message.uuid + ', message: "' + stringifiedMsg + '"'); | ||
@@ -762,5 +828,5 @@ cbsRan ++; | ||
that.handle.cmd('content', [that.channelName, className, properties, stringifiedMsg], function(err) { | ||
that.handle.cmd('content', [that.channelName, className, properties, stringifiedMsg], function (err) { | ||
if (err) { | ||
log.warn('larvitamintercom: send() - readFromQueue() - Could not send publish content to exchange: "' + options.exchange + '". err: ' + err.message + ', uuid: "' + message.uuid + ', message: "' + stringifiedMsg + '"'); | ||
log.warn(topLogPrefix + 'send() - readFromQueue() - Could not send publish content to exchange: "' + options.exchange + '". err: ' + err.message + ', uuid: "' + message.uuid + ', message: "' + stringifiedMsg + '"'); | ||
if ( ! cbErr) { | ||
@@ -773,3 +839,3 @@ cbErr = err; | ||
log.debug('larvitamintercom: send() - readFromQueue() - Content sent to exchange: "' + options.exchange + '", uuid: "' + message.uuid + ', message: "' + stringifiedMsg + '"'); | ||
log.debug(topLogPrefix + 'send() - readFromQueue() - Content sent to exchange: "' + options.exchange + '", uuid: "' + message.uuid + ', message: "' + stringifiedMsg + '"'); | ||
@@ -783,3 +849,3 @@ cbsRan ++; | ||
async.series(tasks, function(err) { | ||
async.series(tasks, function (err) { | ||
cb(err, message.uuid); | ||
@@ -789,3 +855,3 @@ }); | ||
Intercom.prototype.subscribe = function(options, msgCb, cb) { | ||
Intercom.prototype.subscribe = function (options, msgCb, cb) { | ||
if (typeof options === 'function') { | ||
@@ -803,3 +869,3 @@ cb = msgCb; | ||
log.verbose('larvitamintercom: subscribe() - Starting on exchange "' + options.exchange + '"'); | ||
log.verbose(topLogPrefix + 'subscribe() - Starting on exchange "' + options.exchange + '"'); | ||
@@ -806,0 +872,0 @@ this.genericConsume(options, msgCb, cb); |
{ | ||
"name": "larvitamintercom", | ||
"version": "0.1.18", | ||
"version": "0.2.0", | ||
"description": "", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -21,3 +21,3 @@ [![Build Status](https://travis-ci.org/larvit/larvitamintercom.svg?branch=master)](https://travis-ci.org/larvit/larvitamintercom) [![Dependencies](https://david-dm.org/larvit/larvitamintercom.svg)](https://david-dm.org/larvit/larvitamintercom.svg) | ||
intercom.send(message, options, function(err, msgUuid) { | ||
intercom.send(message, options, function (err, msgUuid) { | ||
// called when message is accepted by queue handler | ||
@@ -60,3 +60,3 @@ // msgUuid will be a unique UUID for this specific message | ||
intercom.consume(options, function(message, ack, deliveryTag) { | ||
intercom.consume(options, function (message, ack, deliveryTag) { | ||
// message being the object sent with intercom.send() | ||
@@ -70,3 +70,3 @@ | ||
ack(new Error('Something was wrong with the message')); | ||
}, function(err) { | ||
}, function (err) { | ||
// Callback from established consume connection | ||
@@ -93,3 +93,3 @@ }); | ||
intercom.subscribe(options, function(message, ack, deliveryTag) { | ||
intercom.subscribe(options, function (message, ack, deliveryTag) { | ||
// message subscribe the object sent with intercom.send() | ||
@@ -103,3 +103,3 @@ | ||
ack(new Error('Something was wrong with the message')); | ||
}, function(err, subscribeInstance) { | ||
}, function (err, subscribeInstance) { | ||
// Callback from established subscribe connection | ||
@@ -106,0 +106,0 @@ }); |
@@ -11,3 +11,4 @@ 'use strict'; | ||
let confFile; | ||
let confFile, | ||
conStr; | ||
@@ -24,3 +25,3 @@ // Set up winston | ||
before(function(done) { | ||
before(function (done) { | ||
this.timeout(20000); | ||
@@ -32,3 +33,3 @@ | ||
for (let i = 0; i < 20; i ++) { | ||
tasks.push(function(cb) { | ||
tasks.push(function (cb) { | ||
const intercom = new Intercom(config); | ||
@@ -45,12 +46,18 @@ | ||
if (process.env.CONFFILE === undefined) { | ||
confFile = __dirname + '/../config/amqp_test.json'; | ||
if (process.env.CONSTR === undefined) { | ||
confFile = __dirname + '/../config/amqp_test.json'; | ||
} else { | ||
confFile = process.env.CONFFILE; | ||
conStr = process.env.CONSTR; | ||
} | ||
if (conStr !== undefined) { | ||
log.verbose('Autobahn using environment CONSTR'); | ||
instantiateIntercoms(conStr); | ||
return; | ||
} | ||
log.verbose('Autobahn config file: "' + confFile + '"'); | ||
// First look for absolute path | ||
fs.stat(confFile, function(err) { | ||
fs.stat(confFile, function (err) { | ||
if (err) { | ||
@@ -60,6 +67,6 @@ | ||
confFile = __dirname + '/../config/' + confFile; | ||
fs.stat(confFile, function(err) { | ||
fs.stat(confFile, function (err) { | ||
if (err) throw err; | ||
log.verbose('Autobahn config: ' + JSON.stringify(require(confFile))); | ||
instantiateIntercoms(require(confFile).default); | ||
instantiateIntercoms(require(confFile)); | ||
}); | ||
@@ -71,7 +78,7 @@ | ||
log.verbose('Autobahn config: ' + JSON.stringify(require(confFile))); | ||
instantiateIntercoms(require(confFile).default); | ||
instantiateIntercoms(require(confFile)); | ||
}); | ||
}); | ||
/*after(function(done) { | ||
/*after(function (done) { | ||
const tasks = []; | ||
@@ -83,5 +90,5 @@ | ||
const intercom = intercoms[i]; | ||
tasks.push(function(cb) { | ||
tasks.push(function (cb) { | ||
console.log('Closing intercom ' + i); | ||
intercom.close(function(err) { | ||
intercom.close(function (err) { | ||
if (err) throw err; | ||
@@ -98,353 +105,571 @@ | ||
describe('Send and receive', function() { | ||
it('check so the first intercom is up', function(done) { | ||
const intercom = intercoms[0]; | ||
describe('Send and receive', function () { | ||
assert.notDeepEqual(intercom.handle, undefined); | ||
assert.notDeepEqual(intercom.handle.channel, undefined); | ||
done(); | ||
}); | ||
describe('network connection', function () { | ||
it('check so the first intercom is up', function (done) { | ||
const intercom = intercoms[0]; | ||
it('send and receive a message to the default exchange', function(done) { | ||
const orgMsg = {'foo': 'bar'}, | ||
tasks = []; | ||
assert.notDeepEqual(intercom.handle, undefined); | ||
assert.notDeepEqual(intercom.handle.channel, undefined); | ||
done(); | ||
}); | ||
this.timeout(2500); | ||
this.slow(2100); // > 1050 is shown in yellow, 1000ms is setTimeout() | ||
it('send and receive a message to the default exchange', function (done) { | ||
const orgMsg = {'foo': 'bar'}, | ||
tasks = []; | ||
let subscribed = 0, | ||
consumed = 0; | ||
this.timeout(2500); | ||
this.slow(2100); // > 1050 is shown in yellow, 1000ms is setTimeout() | ||
function consume(intercom, cb) { | ||
intercom.consume(function(msg, ack, deliveryTag) { | ||
assert.notDeepEqual(lUtils.formatUuid(msg.uuid), false, 'msg.uuid must be a valid uuid'); | ||
delete msg.uuid; | ||
assert.deepEqual(JSON.stringify(orgMsg), JSON.stringify(msg)); | ||
assert(deliveryTag, 'deliveryTag should be non-empty'); | ||
consumed ++; | ||
ack(); | ||
}, cb); | ||
} | ||
let subscribed = 0, | ||
consumed = 0; | ||
function subscribe(intercom, cb) { | ||
intercom.subscribe(function(msg, ack, deliveryTag) { | ||
assert.notDeepEqual(lUtils.formatUuid(msg.uuid), false, 'msg.uuid must be a valid uuid'); | ||
delete msg.uuid; | ||
assert.deepEqual(JSON.stringify(orgMsg), JSON.stringify(msg)); | ||
assert(deliveryTag, 'deliveryTag should be non-empty'); | ||
subscribed ++; | ||
ack(); | ||
}, cb); | ||
} | ||
function consume(intercom, cb) { | ||
intercom.consume(function (msg, ack, deliveryTag) { | ||
assert.notDeepEqual(lUtils.formatUuid(msg.uuid), false, 'msg.uuid must be a valid uuid'); | ||
delete msg.uuid; | ||
assert.deepStrictEqual(JSON.stringify(orgMsg), JSON.stringify(msg)); | ||
assert(deliveryTag, 'deliveryTag should be non-empty'); | ||
consumed ++; | ||
ack(); | ||
}, cb); | ||
} | ||
tasks.push(function(cb) { consume(intercoms[0], cb); }); | ||
tasks.push(function(cb) { consume(intercoms[1], cb); }); | ||
tasks.push(function(cb) { subscribe(intercoms[2], cb); }); | ||
tasks.push(function(cb) { subscribe(intercoms[3], cb); }); | ||
function subscribe(intercom, cb) { | ||
intercom.subscribe(function (msg, ack, deliveryTag) { | ||
assert.notDeepEqual(lUtils.formatUuid(msg.uuid), false, 'msg.uuid must be a valid uuid'); | ||
delete msg.uuid; | ||
assert.deepStrictEqual(JSON.stringify(orgMsg), JSON.stringify(msg)); | ||
assert(deliveryTag, 'deliveryTag should be non-empty'); | ||
subscribed ++; | ||
ack(); | ||
}, cb); | ||
} | ||
async.parallel(tasks, function(err) { | ||
if (err) throw err; | ||
tasks.push(function (cb) { consume(intercoms[0], cb); }); | ||
tasks.push(function (cb) { consume(intercoms[1], cb); }); | ||
tasks.push(function (cb) { subscribe(intercoms[2], cb); }); | ||
tasks.push(function (cb) { subscribe(intercoms[3], cb); }); | ||
intercoms[4].send(orgMsg, function(err) { | ||
async.parallel(tasks, function (err) { | ||
if (err) throw err; | ||
// Wait for a while to make sure consume() is not ran multiple times. | ||
// This is not pretty, but I can not think of a better way | ||
setTimeout(function() { | ||
assert.deepEqual(consumed, 1); | ||
assert.deepEqual(subscribed, 2); | ||
done(); | ||
}, 1000); | ||
intercoms[4].send(orgMsg, function (err) { | ||
if (err) throw err; | ||
// Wait for a while to make sure consume() is not ran multiple times. | ||
// This is not pretty, but I can not think of a better way | ||
setTimeout(function () { | ||
assert.deepStrictEqual(consumed, 1); | ||
assert.deepStrictEqual(subscribed, 2); | ||
done(); | ||
}, 1000); | ||
}); | ||
}); | ||
}); | ||
}); | ||
it('send and receive a message to a custom exchange', function(done) { | ||
const exchange = 'customeOne', | ||
orgMsg = {'foo': 'bard'}, | ||
tasks = []; | ||
it('send and receive a message to a custom exchange', function (done) { | ||
const exchange = 'customeOne', | ||
orgMsg = {'foo': 'bard'}, | ||
tasks = []; | ||
this.timeout(2500); | ||
this.slow(2100); // > 1050 is shown in yellow, 1000ms is setTimeout() | ||
this.timeout(2500); | ||
this.slow(2100); // > 1050 is shown in yellow, 1000ms is setTimeout() | ||
let subscribed = 0, | ||
consumed = 0; | ||
let subscribed = 0, | ||
consumed = 0; | ||
function consume(intercom, cb) { | ||
intercom.consume({'exchange': exchange}, function(msg, ack, deliveryTag) { | ||
assert.notDeepEqual(lUtils.formatUuid(msg.uuid), false); | ||
delete msg.uuid; | ||
assert.deepEqual(JSON.stringify(orgMsg), JSON.stringify(msg)); | ||
assert(deliveryTag, 'deliveryTag shoult be non-empty'); | ||
consumed ++; | ||
ack(); | ||
}, cb); | ||
} | ||
function consume(intercom, cb) { | ||
intercom.consume({'exchange': exchange}, function (msg, ack, deliveryTag) { | ||
assert.notDeepEqual(lUtils.formatUuid(msg.uuid), false); | ||
delete msg.uuid; | ||
assert.deepStrictEqual(JSON.stringify(orgMsg), JSON.stringify(msg)); | ||
assert(deliveryTag, 'deliveryTag shoult be non-empty'); | ||
consumed ++; | ||
ack(); | ||
}, cb); | ||
} | ||
function subscribe(intercom, cb) { | ||
intercom.subscribe({'exchange': exchange}, function(msg, ack, deliveryTag) { | ||
assert.notDeepEqual(lUtils.formatUuid(msg.uuid), false); | ||
delete msg.uuid; | ||
assert.deepEqual(JSON.stringify(orgMsg), JSON.stringify(msg)); | ||
assert(deliveryTag, 'deliveryTag shoult be non-empty'); | ||
subscribed ++; | ||
ack(); | ||
}, cb); | ||
} | ||
function subscribe(intercom, cb) { | ||
intercom.subscribe({'exchange': exchange}, function (msg, ack, deliveryTag) { | ||
assert.notDeepEqual(lUtils.formatUuid(msg.uuid), false); | ||
delete msg.uuid; | ||
assert.deepStrictEqual(JSON.stringify(orgMsg), JSON.stringify(msg)); | ||
assert(deliveryTag, 'deliveryTag shoult be non-empty'); | ||
subscribed ++; | ||
ack(); | ||
}, cb); | ||
} | ||
tasks.push(function(cb) { consume(intercoms[5], cb); }); | ||
tasks.push(function(cb) { consume(intercoms[6], cb); }); | ||
tasks.push(function(cb) { subscribe(intercoms[7], cb); }); | ||
tasks.push(function(cb) { subscribe(intercoms[8], cb); }); | ||
tasks.push(function (cb) { consume(intercoms[5], cb); }); | ||
tasks.push(function (cb) { consume(intercoms[6], cb); }); | ||
tasks.push(function (cb) { subscribe(intercoms[7], cb); }); | ||
tasks.push(function (cb) { subscribe(intercoms[8], cb); }); | ||
async.parallel(tasks, function(err) { | ||
if (err) throw err; | ||
intercoms[9].send(orgMsg, {'exchange': exchange}, function(err) { | ||
async.parallel(tasks, function (err) { | ||
if (err) throw err; | ||
// Wait for a while to make sure consume() is not ran multiple times. | ||
// This is not pretty, but I can not think of a better way | ||
setTimeout(function() { | ||
assert.deepEqual(consumed, 1); | ||
assert.deepEqual(subscribed, 2); | ||
done(); | ||
}, 1000); | ||
intercoms[9].send(orgMsg, {'exchange': exchange}, function (err) { | ||
if (err) throw err; | ||
// Wait for a while to make sure consume() is not ran multiple times. | ||
// This is not pretty, but I can not think of a better way | ||
setTimeout(function () { | ||
assert.deepStrictEqual(consumed, 1); | ||
assert.deepStrictEqual(subscribed, 2); | ||
done(); | ||
}, 1000); | ||
}); | ||
}); | ||
}); | ||
}); | ||
it('send and receive on the same Intercom', function(done) { | ||
const intercom = {'subscribe': intercoms[10], 'consume': intercoms[11]}, | ||
tasks = []; | ||
it('send and receive on the same Intercom', function (done) { | ||
const intercom = {'subscribe': intercoms[10], 'consume': intercoms[11]}, | ||
tasks = []; | ||
for (const method of Object.keys(intercom)) { | ||
tasks.push(function(cb) { | ||
const exchange = 'sameInstance' + method, | ||
orgMsg = {'bi': 'bu'}; | ||
for (const method of Object.keys(intercom)) { | ||
tasks.push(function (cb) { | ||
const exchange = 'sameInstance' + method, | ||
orgMsg = {'bi': 'bu'}; | ||
intercom[method][method]({'exchange': exchange}, function(msg, ack, deliveryTag) { | ||
assert.notDeepEqual(lUtils.formatUuid(msg.uuid), false); | ||
delete msg.uuid; | ||
assert.deepEqual(JSON.stringify(orgMsg), JSON.stringify(msg)); | ||
assert(deliveryTag, 'deliveryTag shoult be non-empty'); | ||
ack(); | ||
cb(); | ||
}, function(err) { | ||
if (err) throw err; | ||
intercom[method][method]({'exchange': exchange}, function (msg, ack, deliveryTag) { | ||
assert.notDeepEqual(lUtils.formatUuid(msg.uuid), false); | ||
delete msg.uuid; | ||
assert.deepStrictEqual(JSON.stringify(orgMsg), JSON.stringify(msg)); | ||
assert(deliveryTag, 'deliveryTag shoult be non-empty'); | ||
ack(); | ||
cb(); | ||
}, function (err) { | ||
if (err) throw err; | ||
intercom[method].send(orgMsg, {'exchange': exchange}, function(err) { | ||
if (err) throw err; | ||
intercom[method].send(orgMsg, {'exchange': exchange}, function (err) { | ||
if (err) throw err; | ||
}); | ||
}); | ||
}); | ||
}); | ||
} | ||
} | ||
async.parallel(tasks, done); | ||
}); | ||
async.parallel(tasks, done); | ||
}); | ||
it('send and receive multiple messages on different Intercoms', function(done) { | ||
const tasks = [], | ||
intercom = { | ||
'subscribe': { | ||
'intercom1': intercoms[12], 'intercom2': intercoms[13] | ||
}, | ||
'consume': { | ||
'intercom1': intercoms[14], 'intercom2': intercoms[15] | ||
} | ||
}; | ||
it('send and receive multiple messages on different Intercoms', function (done) { | ||
const tasks = [], | ||
intercom = { | ||
'subscribe': { | ||
'intercom1': intercoms[12], 'intercom2': intercoms[13] | ||
}, | ||
'consume': { | ||
'intercom1': intercoms[14], 'intercom2': intercoms[15] | ||
} | ||
}; | ||
for (const method of Object.keys(intercom)) { | ||
tasks.push(function(cb) { | ||
const intercom1 = intercom[method].intercom1, | ||
intercom2 = intercom[method].intercom2, | ||
exchange = 'anotherInstance' + method, | ||
orgMsg1 = {'ba': 'bo'}, | ||
orgMsg2 = {'waff': 'woff'}; | ||
for (const method of Object.keys(intercom)) { | ||
tasks.push(function (cb) { | ||
const intercom1 = intercom[method].intercom1, | ||
intercom2 = intercom[method].intercom2, | ||
exchange = 'anotherInstance' + method, | ||
orgMsg1 = {'ba': 'bo'}, | ||
orgMsg2 = {'waff': 'woff'}; | ||
let msg1Received = 0, | ||
msg2Received = 0; | ||
let msg1Received = 0, | ||
msg2Received = 0; | ||
intercom1[method]({'exchange': exchange}, function(msg, ack) { | ||
if (JSON.stringify(msg.ba) === JSON.stringify(orgMsg1.ba)) { | ||
msg1Received ++; | ||
ack(); | ||
} else if (JSON.stringify(msg.waff) === JSON.stringify(orgMsg2.waff)) { | ||
msg2Received ++; | ||
ack(); | ||
} | ||
intercom1[method]({'exchange': exchange}, function (msg, ack) { | ||
if (JSON.stringify(msg.ba) === JSON.stringify(orgMsg1.ba)) { | ||
msg1Received ++; | ||
ack(); | ||
} else if (JSON.stringify(msg.waff) === JSON.stringify(orgMsg2.waff)) { | ||
msg2Received ++; | ||
ack(); | ||
} | ||
if (msg1Received === 1 && msg2Received === 1) { | ||
cb(); | ||
} | ||
}, function(err) { | ||
if (err) throw err; | ||
if (msg1Received === 1 && msg2Received === 1) { | ||
cb(); | ||
} | ||
}, function (err) { | ||
if (err) throw err; | ||
intercom1.send(orgMsg1, {'exchange': exchange}, function(err) { | ||
if (err) throw err; | ||
intercom1.send(orgMsg1, {'exchange': exchange}, function (err) { | ||
if (err) throw err; | ||
}); | ||
intercom2.send(orgMsg2, {'exchange': exchange}, function (err) { | ||
if (err) throw err; | ||
}); | ||
}); | ||
}); | ||
} | ||
intercom2.send(orgMsg2, {'exchange': exchange}, function(err) { | ||
async.parallel(tasks, done); | ||
}); | ||
it('send and receive multiple messages on the same Intercom', function (done) { | ||
const intercom = {'subscribe': intercoms[16], 'consume': intercoms[17]}, | ||
tasks = []; | ||
for (const method of Object.keys(intercom)) { | ||
tasks.push(function (cb) { | ||
const exchange = 'yetAnotherInstance' + method, | ||
orgMsg1 = {'bar': 'bor'}, | ||
orgMsg2 = {'waffer': 'woffer'}; | ||
let msg1Received = 0, | ||
msg2Received = 0; | ||
intercom[method][method]({'exchange': exchange}, function (msg, ack) { | ||
if (JSON.stringify(msg.bar) === JSON.stringify(orgMsg1.bar)) { | ||
msg1Received ++; | ||
ack(); | ||
} else if (JSON.stringify(msg.waffer) === JSON.stringify(orgMsg2.waffer)) { | ||
msg2Received ++; | ||
ack(); | ||
} | ||
if (msg1Received === 10 && msg2Received === 10) { | ||
cb(); | ||
} | ||
}, function (err) { | ||
if (err) throw err; | ||
for (let i = 0; i !== 10; i ++) { | ||
intercom[method].send(orgMsg1, {'exchange': exchange}, function (err) { | ||
if (err) throw err; | ||
}); | ||
intercom[method].send(orgMsg2, {'exchange': exchange}, function (err) { | ||
if (err) throw err; | ||
}); | ||
} | ||
}); | ||
}); | ||
} | ||
async.parallel(tasks, done); | ||
}); | ||
it('send and receive messages on different exchanges', function (done) { | ||
const exchange1 = 'differentExes1', | ||
exchange2 = 'differentExes2', | ||
orgMsg1 = {'bar': 'bor'}, | ||
orgMsg2 = {'waffer': 'woffer'}, | ||
tasks = []; | ||
let receivedMsg1 = 0, | ||
receivedMsg2 = 0; | ||
tasks.push(function (cb) { | ||
intercoms[0].subscribe({'exchange': exchange1}, function (msg, ack) { | ||
assert.deepStrictEqual(msg.bar, orgMsg1.bar); | ||
receivedMsg1 ++; | ||
ack(); | ||
}, cb); | ||
}); | ||
} | ||
async.parallel(tasks, done); | ||
}); | ||
tasks.push(function (cb) { | ||
intercoms[0].subscribe({'exchange': exchange2}, function (msg, ack) { | ||
assert.deepStrictEqual(msg.waffer, orgMsg2.waffer); | ||
receivedMsg2 ++; | ||
ack(); | ||
}, cb); | ||
}); | ||
it('send and receive multiple messages on the same Intercom', function(done) { | ||
const intercom = {'subscribe': intercoms[16], 'consume': intercoms[17]}, | ||
tasks = []; | ||
tasks.push(function (cb) { | ||
intercoms[0].send(orgMsg1, {'exchange': exchange1}, cb); | ||
}); | ||
for (const method of Object.keys(intercom)) { | ||
tasks.push(function(cb) { | ||
const exchange = 'yetAnotherInstance' + method, | ||
orgMsg1 = {'bar': 'bor'}, | ||
orgMsg2 = {'waffer': 'woffer'}; | ||
tasks.push(function (cb) { | ||
intercoms[0].send(orgMsg2, {'exchange': exchange2}, cb); | ||
}); | ||
let msg1Received = 0, | ||
msg2Received = 0; | ||
async.series(tasks, function (err) { | ||
let interval; | ||
intercom[method][method]({'exchange': exchange}, function(msg, ack) { | ||
if (JSON.stringify(msg.bar) === JSON.stringify(orgMsg1.bar)) { | ||
msg1Received ++; | ||
ack(); | ||
} else if (JSON.stringify(msg.waffer) === JSON.stringify(orgMsg2.waffer)) { | ||
msg2Received ++; | ||
ack(); | ||
} | ||
if (err) throw err; | ||
if (msg1Received === 10 && msg2Received === 10) { | ||
cb(); | ||
interval = setInterval(function () { | ||
if (receivedMsg1 === 1 && receivedMsg2 === 1) { | ||
clearInterval(interval); | ||
done(); | ||
} | ||
}, function(err) { | ||
if (err) throw err; | ||
}, 10); | ||
}); | ||
}); | ||
for (let i = 0; i !== 10; i ++) { | ||
intercom[method].send(orgMsg1, {'exchange': exchange}, function(err) { | ||
if (err) throw err; | ||
}); | ||
it('send before consumer is up and still receive', function (done) { | ||
const exchange = 'dkfia893M', // Random exchange to not collide with another test | ||
orgMsg = {'foo': 'bar'}; | ||
intercom[method].send(orgMsg2, {'exchange': exchange}, function(err) { | ||
if (err) throw err; | ||
}); | ||
} | ||
}); | ||
this.timeout(2000); | ||
this.slow(700); | ||
intercoms[0].send(orgMsg, {'exchange': exchange, 'forceConsumeQueue': true}, function (err) { | ||
if (err) throw err; | ||
setTimeout(function () { | ||
intercoms[0].consume({'exchange': exchange}, function (msg, ack, deliveryTag) { | ||
assert.notDeepEqual(lUtils.formatUuid(msg.uuid), false); | ||
delete msg.uuid; | ||
assert.deepStrictEqual(JSON.stringify(orgMsg), JSON.stringify(msg)); | ||
assert(deliveryTag, 'deliveryTag shoult be non-empty'); | ||
ack(); | ||
done(); | ||
}, function (err) { | ||
if (err) throw err; | ||
}); | ||
}, 200); | ||
}); | ||
} | ||
}); | ||
async.parallel(tasks, done); | ||
}); | ||
it('send and declare exchanges at the same time', function (done) { | ||
const intercom = intercoms[0], | ||
exchange = 'breakCmdChain', | ||
orgMsg1 = {'blippel': 'bloppel'}, | ||
orgMsg2 = {'maffab': 'berk'}; | ||
it('send and receive messages on different exchanges', function(done) { | ||
const exchange1 = 'differentExes1', | ||
exchange2 = 'differentExes2', | ||
orgMsg1 = {'bar': 'bor'}, | ||
orgMsg2 = {'waffer': 'woffer'}, | ||
tasks = []; | ||
let msg1Received = 0, | ||
msg2Received = 0; | ||
let receivedMsg1 = 0, | ||
receivedMsg2 = 0; | ||
intercom.subscribe({'exchange': exchange}, function (msg, ack) { | ||
if (JSON.stringify(msg.blippel) === JSON.stringify(orgMsg1.blippel)) { | ||
msg1Received ++; | ||
ack(); | ||
} else if (JSON.stringify(msg.maffab) === JSON.stringify(orgMsg2.maffab)) { | ||
msg2Received ++; | ||
ack(); | ||
} | ||
tasks.push(function(cb) { | ||
intercoms[0].subscribe({'exchange': exchange1}, function(msg, ack) { | ||
assert.deepEqual(msg.bar, orgMsg1.bar); | ||
receivedMsg1 ++; | ||
ack(); | ||
}, cb); | ||
if (msg1Received === 10 && msg2Received === 10) { | ||
done(); | ||
} | ||
}, function (err) { | ||
if (err) throw err; | ||
for (let i = 0; i !== 10; i ++) { | ||
intercom.send(orgMsg1, {'exchange': exchange}, function (err) { | ||
if (err) throw err; | ||
}); | ||
// Lets provoke! | ||
intercom.declareExchange(exchange + '_foobar', function (err) { | ||
if (err) throw err; | ||
}); | ||
intercom.send(orgMsg2, {'exchange': exchange}, function (err) { | ||
if (err) throw err; | ||
}); | ||
} | ||
}); | ||
}); | ||
}); | ||
tasks.push(function(cb) { | ||
intercoms[0].subscribe({'exchange': exchange2}, function(msg, ack) { | ||
assert.deepEqual(msg.waffer, orgMsg2.waffer); | ||
receivedMsg2 ++; | ||
describe('loopback interface', function () { | ||
it('send and receive a message to the default exchange via consume', function (done) { | ||
const intercom = new Intercom('loopback interface'), | ||
orgMsg = {'fooconsume': 'barconsume'}; | ||
intercom.consume(function (msg, ack, deliveryTag) { | ||
assert.notStrictEqual(lUtils.formatUuid(msg.uuid), false, 'msg.uuid must be a valid uuid'); | ||
delete msg.uuid; | ||
assert.deepStrictEqual(JSON.stringify(orgMsg), JSON.stringify(msg)); | ||
assert(deliveryTag, 'deliveryTag should be non-empty'); | ||
ack(); | ||
}, cb); | ||
done(); | ||
}, function (err) { | ||
if (err) throw err; | ||
}); | ||
intercom.send(orgMsg, function (err) { | ||
if (err) throw err; | ||
}); | ||
}); | ||
tasks.push(function(cb) { | ||
intercoms[0].send(orgMsg1, {'exchange': exchange1}, cb); | ||
it('send and receive a message to the default exchange via subscribe', function (done) { | ||
const intercom = new Intercom('loopback interface'), | ||
orgMsg = {'foosubscribe': 'barsubscribe'}; | ||
intercom.subscribe(function (msg, ack, deliveryTag) { | ||
assert.notDeepEqual(lUtils.formatUuid(msg.uuid), false, 'msg.uuid must be a valid uuid'); | ||
delete msg.uuid; | ||
assert.deepStrictEqual(JSON.stringify(orgMsg), JSON.stringify(msg)); | ||
assert(deliveryTag, 'deliveryTag should be non-empty'); | ||
ack(); | ||
done(); | ||
}, function (err) { | ||
if (err) throw err; | ||
}); | ||
intercom.send(orgMsg, function (err) { | ||
if (err) throw err; | ||
}); | ||
}); | ||
tasks.push(function(cb) { | ||
intercoms[0].send(orgMsg2, {'exchange': exchange2}, cb); | ||
it('send and receive a message to a custom exchange via consume', function (done) { | ||
const intercom = new Intercom('loopback interface'), | ||
exchange = 'loopbackExchCon', | ||
orgMsg = {'fooconcusExch': 'barconcusExch'}; | ||
intercom.consume({'exchange': exchange + '_nope'}, function () { | ||
throw new Error('should not receive any messages here'); | ||
}, function (err) { | ||
if (err) throw err; | ||
}); | ||
intercom.consume({'exchange': exchange}, function (msg, ack, deliveryTag) { | ||
assert.notStrictEqual(lUtils.formatUuid(msg.uuid), false, 'msg.uuid must be a valid uuid'); | ||
delete msg.uuid; | ||
assert.deepStrictEqual(JSON.stringify(orgMsg), JSON.stringify(msg)); | ||
assert(deliveryTag, 'deliveryTag should be non-empty'); | ||
ack(); | ||
setTimeout(done, 20); | ||
}, function (err) { | ||
if (err) throw err; | ||
}); | ||
intercom.send(orgMsg, {'exchange': exchange}, function (err) { | ||
if (err) throw err; | ||
}); | ||
}); | ||
async.series(tasks, function(err) { | ||
let interval; | ||
it('send and receive a message to a custom exchange via subscribe', function (done) { | ||
const intercom = new Intercom('loopback interface'), | ||
exchange = 'loopbackExchSub', | ||
orgMsg = {'foosubExch': 'barsubExch'}; | ||
if (err) throw err; | ||
intercom.subscribe({'exchange': exchange + '_nope'}, function () { | ||
throw new Error('should not receive any messages here'); | ||
}, function (err) { | ||
if (err) throw err; | ||
}); | ||
interval = setInterval(function() { | ||
if (receivedMsg1 === 1 && receivedMsg2 === 1) { | ||
clearInterval(interval); | ||
done(); | ||
} | ||
}, 10); | ||
intercom.subscribe({'exchange': exchange}, function (msg, ack, deliveryTag) { | ||
assert.notDeepEqual(lUtils.formatUuid(msg.uuid), false, 'msg.uuid must be a valid uuid'); | ||
delete msg.uuid; | ||
assert.deepStrictEqual(JSON.stringify(orgMsg), JSON.stringify(msg)); | ||
assert(deliveryTag, 'deliveryTag should be non-empty'); | ||
ack(); | ||
setTimeout(done, 20); | ||
}, function (err) { | ||
if (err) throw err; | ||
}); | ||
intercom.send(orgMsg, {'exchange': exchange}, function (err) { | ||
if (err) throw err; | ||
}); | ||
}); | ||
}); | ||
it('send before consumer is up and still receive', function(done) { | ||
const exchange = 'dkfia893M', // Random exchange to not collide with another test | ||
orgMsg = {'foo': 'bar'}; | ||
it('send and receive messages on different exchanges', function (done) { | ||
const exchange1 = 'differentExes1', | ||
exchange2 = 'differentExes2', | ||
intercom = new Intercom('loopback interface'), | ||
orgMsg1 = {'bar': 'bor'}, | ||
orgMsg2 = {'waffer': 'woffer'}, | ||
tasks = []; | ||
this.timeout(2000); | ||
this.slow(700); | ||
let receivedMsg1 = 0, | ||
receivedMsg2 = 0; | ||
intercoms[0].send(orgMsg, {'exchange': exchange, 'forceConsumeQueue': true}, function(err) { | ||
if (err) throw err; | ||
tasks.push(function (cb) { | ||
intercom.subscribe({'exchange': exchange1}, function (msg, ack) { | ||
assert.deepStrictEqual(msg.bar, orgMsg1.bar); | ||
receivedMsg1 ++; | ||
ack(); | ||
}, cb); | ||
}); | ||
setTimeout(function() { | ||
intercoms[0].consume({'exchange': exchange}, function(msg, ack, deliveryTag) { | ||
assert.notDeepEqual(lUtils.formatUuid(msg.uuid), false); | ||
delete msg.uuid; | ||
assert.deepEqual(JSON.stringify(orgMsg), JSON.stringify(msg)); | ||
assert(deliveryTag, 'deliveryTag shoult be non-empty'); | ||
tasks.push(function (cb) { | ||
intercom.subscribe({'exchange': exchange2}, function (msg, ack) { | ||
assert.deepStrictEqual(msg.waffer, orgMsg2.waffer); | ||
receivedMsg2 ++; | ||
ack(); | ||
done(); | ||
}, function(err) { | ||
if (err) throw err; | ||
}, cb); | ||
}); | ||
}); | ||
}, 200); | ||
tasks.push(function (cb) { | ||
intercom.send(orgMsg1, {'exchange': exchange1}, cb); | ||
}); | ||
tasks.push(function (cb) { | ||
intercom.send(orgMsg2, {'exchange': exchange2}, cb); | ||
}); | ||
async.series(tasks, function (err) { | ||
let interval; | ||
if (err) throw err; | ||
interval = setInterval(function () { | ||
if (receivedMsg1 === 1 && receivedMsg2 === 1) { | ||
clearInterval(interval); | ||
done(); | ||
} | ||
}, 10); | ||
}); | ||
}); | ||
}); | ||
it('send and declare exchanges at the same time', function(done) { | ||
const intercom = intercoms[0], | ||
exchange = 'breakCmdChain', | ||
orgMsg1 = {'blippel': 'bloppel'}, | ||
orgMsg2 = {'maffab': 'berk'}; | ||
it('send 1000 messages on subscription', function (done) { | ||
const intercom = new Intercom('loopback interface'); | ||
let msg1Received = 0, | ||
msg2Received = 0; | ||
let receivedMsgs = 0, | ||
expectedSum = 0, | ||
actualSum = 0; | ||
intercom.subscribe({'exchange': exchange}, function(msg, ack) { | ||
if (JSON.stringify(msg.blippel) === JSON.stringify(orgMsg1.blippel)) { | ||
msg1Received ++; | ||
intercom.subscribe(function (msg, ack, deliveryTag) { | ||
assert.notDeepEqual(lUtils.formatUuid(msg.uuid), false, 'msg.uuid must be a valid uuid'); | ||
actualSum += msg.thisNr; | ||
receivedMsgs ++; | ||
assert(deliveryTag, 'deliveryTag should be non-empty'); | ||
ack(); | ||
} else if (JSON.stringify(msg.maffab) === JSON.stringify(orgMsg2.maffab)) { | ||
msg2Received ++; | ||
ack(); | ||
} | ||
if (msg1Received === 10 && msg2Received === 10) { | ||
done(); | ||
} | ||
}, function(err) { | ||
if (err) throw err; | ||
if (receivedMsgs === 1000) { | ||
assert.strictEqual(expectedSum, actualSum); | ||
done(); | ||
} | ||
}, function (err) { | ||
if (err) throw err; | ||
}); | ||
for (let i = 0; i !== 10; i ++) { | ||
intercom.send(orgMsg1, {'exchange': exchange}, function(err) { | ||
if (err) throw err; | ||
}); | ||
for (let i = 0; i !== 1000; i ++) { | ||
const thisNr = Math.round(Math.random()); | ||
// Lets provoke! | ||
intercom.declareExchange(exchange + '_foobar', function(err) { | ||
expectedSum += thisNr; | ||
intercom.send({'thisNr': thisNr}, function (err) { | ||
if (err) throw err; | ||
}); | ||
} | ||
}); | ||
intercom.send(orgMsg2, {'exchange': exchange}, function(err) { | ||
it('send 1000 messages on consume', function (done) { | ||
const intercom = new Intercom('loopback interface'); | ||
let receivedMsgs = 0, | ||
expectedSum = 0, | ||
actualSum = 0; | ||
intercom.consume(function (msg, ack, deliveryTag) { | ||
assert.notDeepEqual(lUtils.formatUuid(msg.uuid), false, 'msg.uuid must be a valid uuid'); | ||
actualSum += msg.thisNr; | ||
receivedMsgs ++; | ||
assert(deliveryTag, 'deliveryTag should be non-empty'); | ||
ack(); | ||
if (receivedMsgs === 1000) { | ||
assert.strictEqual(expectedSum, actualSum); | ||
done(); | ||
} | ||
}, function (err) { | ||
if (err) throw err; | ||
}); | ||
for (let i = 0; i !== 1000; i ++) { | ||
const thisNr = Math.round(Math.random()); | ||
expectedSum += thisNr; | ||
intercom.send({'thisNr': thisNr}, function (err) { | ||
if (err) throw err; | ||
@@ -454,6 +679,32 @@ }); | ||
}); | ||
it('send before consumer is up and still receive', function (done) { | ||
const intercom = new Intercom('loopback interface'), | ||
exchange = 'dkfia893M', // Random exchange to not collide with another test | ||
orgMsg = {'foo': 'bar'}; | ||
this.timeout(2000); | ||
this.slow(700); | ||
intercom.send(orgMsg, {'exchange': exchange, 'forceConsumeQueue': true}, function (err) { | ||
if (err) throw err; | ||
setTimeout(function () { | ||
intercom.consume({'exchange': exchange}, function (msg, ack, deliveryTag) { | ||
assert.notDeepEqual(lUtils.formatUuid(msg.uuid), false); | ||
delete msg.uuid; | ||
assert.deepStrictEqual(JSON.stringify(orgMsg), JSON.stringify(msg)); | ||
assert(deliveryTag, 'deliveryTag shoult be non-empty'); | ||
ack(); | ||
done(); | ||
}, function (err) { | ||
if (err) throw err; | ||
}); | ||
}, 200); | ||
}); | ||
}); | ||
}); | ||
/* Disabled until .cancel() is fixed | ||
it('should not receive after the consumation is cancelled', function(done) { | ||
it('should not receive after the consumation is cancelled', function (done) { | ||
const consumeIntercom = intercoms[14], | ||
@@ -476,7 +727,7 @@ sendIntercom = intercoms[15], | ||
if (receivedNoMsgs === 1) { | ||
consumeInstance.cancel(function(err) { | ||
consumeInstance.cancel(function (err) { | ||
if (err) throw err; | ||
// The callback is sadly not trustworthy. Instead wait a bit and try again | ||
setTimeout(function() { | ||
setTimeout(function () { | ||
sendAgain(); | ||
@@ -493,11 +744,11 @@ }, 200); | ||
consumeIntercom.consume({'exchange': exchangeNo}, handleNoMsg, function(err, result) { | ||
consumeIntercom.consume({'exchange': exchangeNo}, handleNoMsg, function (err, result) { | ||
consumeInstance = result; | ||
consumeIntercom.consume({'exchange': exchangeYes}, handleYesMsg, function(err) { | ||
consumeIntercom.consume({'exchange': exchangeYes}, handleYesMsg, function (err) { | ||
if (err) throw err; | ||
sendIntercom.send({'foo': 'bar1'}, {'exchange': exchangeNo}, function(err) { | ||
sendIntercom.send({'foo': 'bar1'}, {'exchange': exchangeNo}, function (err) { | ||
if (err) throw err; | ||
}); | ||
sendIntercom.send({'foo': 'bar1'}, {'exchange': exchangeYes}, function(err) { | ||
sendIntercom.send({'foo': 'bar1'}, {'exchange': exchangeYes}, function (err) { | ||
if (err) throw err; | ||
@@ -510,9 +761,9 @@ }); | ||
function sendAgain() { | ||
sendIntercom.send({'foo': 'bar2'}, {'exchange': exchangeNo}, function(err) { | ||
sendIntercom.send({'foo': 'bar2'}, {'exchange': exchangeNo}, function (err) { | ||
if (err) throw err; | ||
// Wait a while, and then make sure we have not gotten a second message | ||
setTimeout(function() { | ||
assert.deepEqual(receivedNoMsgs, 1); | ||
assert.deepEqual(receivedYesMsgs, 2); | ||
setTimeout(function () { | ||
assert.deepStrictEqual(receivedNoMsgs, 1); | ||
assert.deepStrictEqual(receivedYesMsgs, 2); | ||
done(); | ||
@@ -522,3 +773,3 @@ }, 200); | ||
sendIntercom.send({'foo': 'bar2'}, {'exchange': exchangeYes}, function(err) { | ||
sendIntercom.send({'foo': 'bar2'}, {'exchange': exchangeYes}, function (err) { | ||
if (err) throw err; | ||
@@ -529,3 +780,3 @@ }); | ||
it('should not receive after the subscription is cancelled', function(done) { | ||
it('should not receive after the subscription is cancelled', function (done) { | ||
const subscribeIntercom = intercoms[16], | ||
@@ -548,7 +799,7 @@ sendIntercom = intercoms[17], | ||
if (receivedNoMsgs === 1) { | ||
subscribeInstance.cancel(function(err) { | ||
subscribeInstance.cancel(function (err) { | ||
if (err) throw err; | ||
// The callback is sadly not trustworthy. Instead wait a bit and try again | ||
setTimeout(function() { | ||
setTimeout(function () { | ||
sendAgain(); | ||
@@ -565,8 +816,8 @@ }, 200); | ||
subscribeIntercom.subscribe({'exchange': exchangeNo}, handleNoMsg, function(err, result) { | ||
subscribeIntercom.subscribe({'exchange': exchangeNo}, handleNoMsg, function (err, result) { | ||
subscribeInstance = result; | ||
sendIntercom.send({'foo': 'bar1'}, {'exchange': exchangeNo}, function(err) { | ||
sendIntercom.send({'foo': 'bar1'}, {'exchange': exchangeNo}, function (err) { | ||
if (err) throw err; | ||
}); | ||
sendIntercom.send({'foo': 'bar1'}, {'exchange': exchangeYes}, function(err) { | ||
sendIntercom.send({'foo': 'bar1'}, {'exchange': exchangeYes}, function (err) { | ||
if (err) throw err; | ||
@@ -576,3 +827,3 @@ }); | ||
subscribeIntercom.subscribe({'exchange': exchangeYes}, handleYesMsg, function(err) { | ||
subscribeIntercom.subscribe({'exchange': exchangeYes}, handleYesMsg, function (err) { | ||
if (err) throw err; | ||
@@ -582,9 +833,9 @@ }); | ||
function sendAgain() { | ||
sendIntercom.send({'foo': 'bar2'}, {'exchange': exchangeNo}, function(err) { | ||
sendIntercom.send({'foo': 'bar2'}, {'exchange': exchangeNo}, function (err) { | ||
if (err) throw err; | ||
// Wait a while, and then make sure we have not gotten a second message | ||
setTimeout(function() { | ||
assert.deepEqual(receivedNoMsgs, 1); | ||
assert.deepEqual(receivedYesMsgs, 2); | ||
setTimeout(function () { | ||
assert.deepStrictEqual(receivedNoMsgs, 1); | ||
assert.deepStrictEqual(receivedYesMsgs, 2); | ||
done(); | ||
@@ -594,3 +845,3 @@ }, 200); | ||
sendIntercom.send({'foo': 'bar2'}, {'exchange': exchangeYes}, function(err) { | ||
sendIntercom.send({'foo': 'bar2'}, {'exchange': exchangeYes}, function (err) { | ||
if (err) throw err; | ||
@@ -597,0 +848,0 @@ }); |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Environment variable access
Supply chain riskPackage accesses environment variables, which may be a sign of credential stuffing or data theft.
Found 1 instance in 1 package
Environment variable access
Supply chain riskPackage accesses environment variables, which may be a sign of credential stuffing or data theft.
Found 1 instance in 1 package
56016
1348
10