larvitamintercom
Advanced tools
Comparing version 0.1.5 to 0.1.6
77
index.js
@@ -188,3 +188,4 @@ 'use strict'; | ||
Intercom.prototype.consume = function(options, msgCb, cb) { | ||
const tasks = [], | ||
const returnObj = {}, | ||
tasks = [], | ||
that = this; | ||
@@ -216,2 +217,29 @@ | ||
returnObj.cancel = function cancel(cb) { | ||
if (returnObj.data === undefined || returnObj.data['consumer-tag'] === undefined) { | ||
const err = new Error('No consumer tag is defined, consume have probably not been started yet.'); | ||
log.warn('larvitamintercom: consume() - ' + err.message); | ||
cb(err); | ||
return; | ||
} | ||
that.handle.basic.cancel(returnObj.data['consumer-tag'], function(err) { | ||
if (err) { | ||
log.warn('larvitamintercom: consume() - end() - Could not canceled consuming. consumer-tag: "' + returnObj.data['consumer-tag'] + '", err: ' + err.message); | ||
} else { | ||
log.verbose('larvitamintercom: consume() - cancel() - Canceled consuming. consumer-tag: "' + returnObj.data['consumer-tag'] + '"'); | ||
} | ||
cb(err); | ||
}); | ||
/* We could not get this to work :( // Lilleman and gagge 2016-12-27 | ||
that.handle.once(that.channelName + ':basic.cancel-ok', function(channel, method, data) { | ||
log.verbose('larvitamintercom: consume() - cancel() - Canceled consuming.'); | ||
log.debug('larvitamintercom: consume() - cancel() - Canceled consuming. channel: ' + JSON.stringify(channel)); | ||
log.debug('larvitamintercom: consume() - cancel() - Canceled consuming. method: ' + JSON.stringify(method)); | ||
log.debug('larvitamintercom: consume() - cancel() - Canceled consuming. data: ' + JSON.stringify(data)); | ||
cb(); | ||
});*/ | ||
}; | ||
// Declare exchange | ||
@@ -250,2 +278,5 @@ tasks.push(function(cb) { | ||
that.handle.once(that.channelName + ':basic.consume-ok', function(channel, method, data) { | ||
returnObj.channel = channel; | ||
returnObj.method = method; | ||
returnObj.data = data; | ||
log.verbose('larvitamintercom: consume() - Started consuming with consumer tag: "' + data['consumer-tag'] + '"'); | ||
@@ -282,3 +313,7 @@ cb(); | ||
async.series(tasks, cb); | ||
async.series(tasks, function(err) { | ||
if (err) { cb(err); return; } | ||
cb(err, returnObj); | ||
}); | ||
}; | ||
@@ -531,3 +566,4 @@ | ||
Intercom.prototype.subscribe = function(options, msgCb, cb) { | ||
const tasks = [], | ||
const returnObj = {}, | ||
tasks = [], | ||
that = this; | ||
@@ -553,2 +589,30 @@ | ||
returnObj.cancel = function cancel(cb) { | ||
if (returnObj.data === undefined || returnObj.data['consumer-tag'] === undefined) { | ||
const err = new Error('No consumer tag is defined, consume have probably not been started yet.'); | ||
log.warn('larvitamintercom: subscribe() - ' + err.message); | ||
cb(err); | ||
return; | ||
} | ||
that.handle.basic.cancel(returnObj.data['consumer-tag'], function(err) { | ||
if (err) { | ||
log.warn('larvitamintercom: subscribe() - end() - Could not canceled consuming. consumer-tag: "' + returnObj.data['consumer-tag'] + '", err: ' + err.message); | ||
} else { | ||
log.verbose('larvitamintercom: subscribe() - cancel() - Canceled consuming. consumer-tag: "' + returnObj.data['consumer-tag'] + '"'); | ||
} | ||
cb(err); | ||
}); | ||
/* We could not get this to work :( // Lilleman and gagge 2016-12-27 | ||
that.handle.once(that.channelName + ':basic.cancel-ok', function(channel, method, data) { | ||
log.verbose('larvitamintercom: subscribe() - cancel() - Canceled consuming.'); | ||
log.debug('larvitamintercom: subscribe() - cancel() - Canceled consuming. channel: ' + JSON.stringify(channel)); | ||
log.debug('larvitamintercom: subscribe() - cancel() - Canceled consuming. method: ' + JSON.stringify(method)); | ||
log.debug('larvitamintercom: subscribe() - cancel() - Canceled consuming. data: ' + JSON.stringify(data)); | ||
cb(); | ||
});*/ | ||
}; | ||
// Declare exchange | ||
@@ -592,2 +656,5 @@ tasks.push(function(cb) { | ||
that.handle.once(that.channelName + ':basic.consume-ok', function(channel, method, data) { | ||
returnObj.channel = channel; | ||
returnObj.method = method; | ||
returnObj.data = data; | ||
log.verbose('larvitamintercom: subscribe() - Started consuming with consumer tag: "' + data['consumer-tag'] + '", queueName: "' + queueName + '"'); | ||
@@ -624,5 +691,7 @@ cb(); | ||
async.series(tasks, cb); | ||
async.series(tasks, function(err) { | ||
cb(err, returnObj); | ||
}); | ||
}; | ||
exports = module.exports = Intercom; |
{ | ||
"name": "larvitamintercom", | ||
"version": "0.1.5", | ||
"version": "0.1.6", | ||
"description": "", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -57,3 +57,3 @@ [![Build Status](https://travis-ci.org/larvit/larvitamintercom.svg?branch=master)](https://travis-ci.org/larvit/larvitamintercom) [![Dependencies](https://david-dm.org/larvit/larvitamintercom.svg)](https://david-dm.org/larvit/larvitamintercom.svg) | ||
let options = {'exchange': 'foo'}; // Will default to "default" if options is omitted | ||
let options = {'exchange': 'foo'}; // Will default to "default" if options is omitted | ||
@@ -69,6 +69,13 @@ intercom.consume(options, function(message, ack, deliveryTag) { | ||
ack(new Error('Something was wrong with the message')); | ||
}, function(err) { | ||
}, function(err, consumeInstance) { | ||
// Callback from established consume connection | ||
// Stop consuming when your application is not interested any more | ||
setTimeout(function() { | ||
consumeInstance.cancel(function(err) { | ||
if (err) throw err; | ||
// IMPORTANT!!! This callback is syncronous and does NOT guarantee no more messages comes on the consume() | ||
}); | ||
}, 3600000); | ||
}); | ||
``` | ||
@@ -102,4 +109,12 @@ | ||
ack(new Error('Something was wrong with the message')); | ||
}, function(err) { | ||
}, function(err, subscribeInstance) { | ||
// Callback from established subscribe connection | ||
// Stop consuming when your application is not interested any more | ||
setTimeout(function() { | ||
subscribeInstance.cancel(function(err) { | ||
if (err) throw err; | ||
// IMPORTANT!!! This callback is syncronous and does NOT guarantee no more messages comes on the subscribe() | ||
}); | ||
}, 3600000); | ||
}); | ||
@@ -106,0 +121,0 @@ ``` |
@@ -29,3 +29,3 @@ 'use strict'; | ||
for (let i = 0; i < 14; i ++) { | ||
for (let i = 0; i < 18; i ++) { | ||
tasks.push(function(cb) { | ||
@@ -71,3 +71,3 @@ const intercom = new Intercom(config); | ||
after(function(done) { | ||
/*after(function(done) { | ||
const tasks = []; | ||
@@ -80,3 +80,9 @@ | ||
tasks.push(function(cb) { | ||
intercom.close(cb); | ||
console.log('Closing intercom ' + i); | ||
intercom.close(function(err) { | ||
if (err) throw err; | ||
console.log('Closed intercom: ' + i); | ||
cb(err); | ||
}); | ||
}); | ||
@@ -86,3 +92,3 @@ } | ||
async.parallel(tasks, done); | ||
}); | ||
});*/ | ||
@@ -347,4 +353,4 @@ describe('Send and receive', function() { | ||
this.timeout(500); | ||
this.slow(420); // > 1050 is shown in yellow, 1000ms is setTimeout() | ||
this.timeout(2000); | ||
this.slow(700); | ||
@@ -369,2 +375,96 @@ intercoms[0].send(orgMsg, {'exchange': exchange, 'forceConsumeQueue': true}, function(err) { | ||
}); | ||
it('should not receive after the consumation is cancelled', function(done) { | ||
const consumeIntercom = intercoms[14], | ||
sendIntercom = intercoms[15], | ||
exchangeName = 'notReceiveAfterConsumeCancel'; | ||
let receivedMessages = 0, | ||
consumeInstance; | ||
this.slow(1000); | ||
// Handle a message from queue | ||
function handleMsg(message, ack) { | ||
ack(); | ||
receivedMessages ++; | ||
if (receivedMessages === 1) { | ||
consumeInstance.cancel(function(err) { | ||
if (err) throw err; | ||
// The callback is sadly not trustworthy. Instead wait a bit and try again | ||
setTimeout(function() { | ||
sendAgain(); | ||
}, 200); | ||
}); | ||
} | ||
} | ||
consumeIntercom.consume({'exchange': exchangeName}, handleMsg, function(err, result) { | ||
consumeInstance = result; | ||
sendIntercom.send({'foo': 'bar1'}, {'exchange': exchangeName}, function(err) { | ||
if (err) throw err; | ||
}); | ||
}); | ||
function sendAgain() { | ||
sendIntercom.send({'foo': 'bar2'}, {'exchange': exchangeName}, function(err) { | ||
if (err) throw err; | ||
// Wait a while, and then make sure we have not gotten a second message | ||
setTimeout(function() { | ||
assert.deepEqual(receivedMessages, 1); | ||
done(); | ||
}, 200); | ||
}); | ||
} | ||
}); | ||
it('should not receive after the subscription is cancelled', function(done) { | ||
const subscribeIntercom = intercoms[16], | ||
sendIntercom = intercoms[17], | ||
exchangeName = 'notReceiveAfterSubscribeCancel'; | ||
let receivedMessages = 0, | ||
subscribeInstance; | ||
this.slow(1000); | ||
// Handle a message from queue | ||
function handleMsg(message, ack) { | ||
ack(); | ||
receivedMessages ++; | ||
if (receivedMessages === 1) { | ||
subscribeInstance.cancel(function(err) { | ||
if (err) throw err; | ||
// The callback is sadly not trustworthy. Instead wait a bit and try again | ||
setTimeout(function() { | ||
sendAgain(); | ||
}, 200); | ||
}); | ||
} | ||
} | ||
subscribeIntercom.subscribe({'exchange': exchangeName}, handleMsg, function(err, result) { | ||
subscribeInstance = result; | ||
sendIntercom.send({'foo': 'bar1'}, {'exchange': exchangeName}, function(err) { | ||
if (err) throw err; | ||
}); | ||
}); | ||
function sendAgain() { | ||
sendIntercom.send({'foo': 'bar2'}, {'exchange': exchangeName}, function(err) { | ||
if (err) throw err; | ||
// Wait a while, and then make sure we have not gotten a second message | ||
setTimeout(function() { | ||
assert.deepEqual(receivedMessages, 1); | ||
done(); | ||
}, 200); | ||
}); | ||
} | ||
}); | ||
}); |
42677
952
127