larvitamintercom
Advanced tools
Comparing version 0.1.0 to 0.1.1
141
index.js
@@ -30,2 +30,5 @@ 'use strict'; | ||
that.port = parsedConStr.port || 5672; | ||
that.declaredExchanges = []; | ||
that.sendQueue = []; | ||
that.sendInProgress = false; | ||
@@ -246,2 +249,9 @@ that.socket = net.connect({ | ||
if (that.declaredExchanges.indexOf(exchangeName) !== - 1) { | ||
log.debug('larvitamintercom: declareExchange() - Exchange already declared! exchangeName: "' + exchangeName + '"'); | ||
cb(); | ||
return; | ||
} | ||
that.handle.exchange.declare( | ||
@@ -264,2 +274,3 @@ that.channelName, | ||
that.declaredExchanges.push(exchangeName); | ||
cb(err); | ||
@@ -324,8 +335,4 @@ } | ||
Intercom.prototype.send = function(orgMsg, options, cb) { | ||
const message = require('util')._extend({}, orgMsg), | ||
tasks = [], | ||
that = this; | ||
const that = this; | ||
let stringifiedMsg; | ||
if (typeof options === 'function') { | ||
@@ -342,66 +349,92 @@ cb = options; | ||
try { | ||
if (message.uuid === undefined) { | ||
message.uuid = uuidLib.v4(); | ||
} | ||
that.sendQueue.push({'orgMsg': orgMsg, 'options': options, 'cb': cb}); | ||
stringifiedMsg = JSON.stringify(message); | ||
} catch(err) { | ||
log.warn('larvitamintercom: send() - Could not stringify message. Message attached to next log call.'); | ||
log.warn('larvitamintercom: send() - Unstringifiable message attached:', message); | ||
cb(err); | ||
if (that.sendInProgress === true) { | ||
return; | ||
} | ||
log.debug('larvitamintercom: send() - Sending to exchange: "' + options.exchange + '", uuid: "' + message.uuid + ', message: "' + stringifiedMsg + '"'); | ||
that.sendInProgress = true; | ||
// Declare exchange | ||
tasks.push(function(cb) { | ||
that.declareExchange(options.exchange, cb); | ||
}); | ||
function readFromQueue() { | ||
const params = that.sendQueue.shift(), | ||
orgMsg = params.orgMsg, | ||
options = params.options, | ||
cb = params.cb, | ||
message = require('util')._extend({}, orgMsg), | ||
tasks = []; | ||
// Publish | ||
tasks.push(function(cb) { | ||
const mandatory = true, | ||
immediate = false; | ||
let stringifiedMsg; | ||
that.handle.basic.publish( | ||
that.channelName, | ||
options.exchange, | ||
'ignored-routing-key', | ||
mandatory, | ||
immediate, | ||
function(err) { | ||
try { | ||
if (message.uuid === undefined) { | ||
message.uuid = uuidLib.v4(); | ||
} | ||
stringifiedMsg = JSON.stringify(message); | ||
} catch(err) { | ||
log.warn('larvitamintercom: send() - Could not stringify message. Message attached to next log call.'); | ||
log.warn('larvitamintercom: send() - Unstringifiable message attached:', message); | ||
cb(err); | ||
return; | ||
} | ||
log.debug('larvitamintercom: send() - readFromQueue() - Sending to exchange: "' + options.exchange + '", uuid: "' + message.uuid + ', message: "' + stringifiedMsg + '"'); | ||
// Declare exchange | ||
tasks.push(function(cb) { | ||
that.declareExchange(options.exchange, cb); | ||
}); | ||
// Publish | ||
tasks.push(function(cb) { | ||
const mandatory = true, | ||
immediate = false; | ||
that.handle.basic.publish( | ||
that.channelName, | ||
options.exchange, | ||
'ignored-routing-key', | ||
mandatory, | ||
immediate, | ||
function(err) { | ||
if (err) { | ||
log.warn('larvitamintercom: send() - readFromQueue() - Could not publish to exchange: "' + options.exchange + '". err: ' + err.message + ', uuid: "' + message.uuid + ', message: "' + stringifiedMsg + '"'); | ||
cb(err); | ||
return; | ||
} | ||
log.debug('larvitamintercom: send() - readFromQueue() - Published (no content sent) to exchange: "' + options.exchange + '", uuid: "' + message.uuid + ', message: "' + stringifiedMsg + '"'); | ||
cb(); | ||
} | ||
); | ||
}); | ||
// Send content | ||
tasks.push(function(cb) { | ||
const properties = {'content-type': 'application/json'}, | ||
className = 'basic'; | ||
that.handle.content(that.channelName, className, properties, stringifiedMsg, function(err) { | ||
if (err) { | ||
log.warn('larvitamintercom: send() - Could not publish to exchange: "' + options.exchange + '". err: ' + err.message + ', uuid: "' + message.uuid + ', message: "' + stringifiedMsg + '"'); | ||
cb(err); | ||
return; | ||
log.warn('larvitamintercom: send() - readFromQueue() - Could not send publish content to exchange: "' + options.exchange + '". err: ' + err.message + ', uuid: "' + message.uuid + ', message: "' + stringifiedMsg + '"'); | ||
} | ||
log.debug('larvitamintercom: send() - Published (no content sent) to exchange: "' + options.exchange + '", uuid: "' + message.uuid + ', message: "' + stringifiedMsg + '"'); | ||
log.debug('larvitamintercom: send() - readFromQueue() - Content sent to exchange: "' + options.exchange + '", uuid: "' + message.uuid + ', message: "' + stringifiedMsg + '"'); | ||
cb(); | ||
} | ||
); | ||
}); | ||
cb(err); | ||
}); | ||
}); | ||
// Send content | ||
tasks.push(function(cb) { | ||
const properties = {'content-type': 'application/json'}, | ||
className = 'basic'; | ||
async.series(tasks, function(err) { | ||
cb(err, message.uuid); | ||
that.handle.content(that.channelName, className, properties, stringifiedMsg, function(err) { | ||
if (err) { | ||
log.warn('larvitamintercom: send() - Could not send publish content to exchange: "' + options.exchange + '". err: ' + err.message + ', uuid: "' + message.uuid + ', message: "' + stringifiedMsg + '"'); | ||
if (that.sendQueue.length === 0) { | ||
that.sendInProgress = false; | ||
} else { | ||
readFromQueue(); | ||
} | ||
log.debug('larvitamintercom: send() - Content sent to exchange: "' + options.exchange + '", uuid: "' + message.uuid + ', message: "' + stringifiedMsg + '"'); | ||
cb(err); | ||
}); | ||
}); | ||
async.series(tasks, function(err) { | ||
cb(err, message.uuid); | ||
}); | ||
} | ||
readFromQueue(); | ||
}; | ||
@@ -408,0 +441,0 @@ |
{ | ||
"name": "larvitamintercom", | ||
"version": "0.1.0", | ||
"version": "0.1.1", | ||
"description": "", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -29,3 +29,3 @@ 'use strict'; | ||
for (let i = 0; i < 11; i ++) { | ||
for (let i = 0; i < 14; i ++) { | ||
tasks.push(function(cb) { | ||
@@ -74,3 +74,3 @@ const intercom = new Intercom(config); | ||
this.timeout(20000); | ||
this.timeout(10000); | ||
@@ -88,3 +88,3 @@ for (let i = 0; intercoms[i] !== undefined; i ++) { | ||
describe('Send and receive', function() { | ||
/**/ | ||
it('check so the first intercom is up', function(done) { | ||
@@ -228,2 +228,72 @@ const intercom = intercoms[0]; | ||
}); | ||
it('send and receive multiple messages on different Intercoms', function(done) { | ||
const exchangeName = 'anotherInstance', | ||
orgMsg1 = {'ba': 'bo'}, | ||
orgMsg2 = {'waff': 'woff'}; | ||
let msg1Received = 0, | ||
msg2Received = 0; | ||
intercoms[11].subscribe({'exchange': exchangeName}, function(msg, ack) { | ||
if (JSON.stringify(msg.ba) === JSON.stringify(orgMsg1.ba)) { | ||
msg1Received ++; | ||
ack(); | ||
} else if (JSON.stringify(msg.waff) === JSON.stringify(orgMsg2.waff)) { | ||
msg2Received ++; | ||
ack(); | ||
} | ||
if (msg1Received === 1 && msg2Received === 1) { | ||
done(); | ||
} | ||
}, function(err) { | ||
if (err) throw err; | ||
intercoms[11].send(orgMsg1, {'exchange': exchangeName}, function(err) { | ||
if (err) throw err; | ||
}); | ||
intercoms[12].send(orgMsg2, {'exchange': exchangeName}, function(err) { | ||
if (err) throw err; | ||
}); | ||
}); | ||
}); | ||
it('send and receive multiple messages on the same Intercom', function(done) { | ||
const exchangeName = 'yetAnotherInstance', | ||
orgMsg1 = {'bar': 'bor'}, | ||
orgMsg2 = {'waffer': 'woffer'}; | ||
let msg1Received = 0, | ||
msg2Received = 0; | ||
intercoms[13].subscribe({'exchange': exchangeName}, function(msg, ack) { | ||
if (JSON.stringify(msg.bar) === JSON.stringify(orgMsg1.bar)) { | ||
msg1Received ++; | ||
ack(); | ||
} else if (JSON.stringify(msg.waffer) === JSON.stringify(orgMsg2.waffer)) { | ||
msg2Received ++; | ||
ack(); | ||
} | ||
if (msg1Received === 10 && msg2Received === 10) { | ||
done(); | ||
} | ||
}, function(err) { | ||
if (err) throw err; | ||
for (let i = 0; i !== 10; i ++) { | ||
intercoms[13].send(orgMsg1, {'exchange': exchangeName}, function(err) { | ||
if (err) throw err; | ||
}); | ||
intercoms[13].send(orgMsg2, {'exchange': exchangeName}, function(err) { | ||
if (err) throw err; | ||
}); | ||
} | ||
}); | ||
}); | ||
}); |
32041
691