amqplib-easy
Advanced tools
Comparing version 4.2.2 to 4.3.0
@@ -104,7 +104,7 @@ 'use strict' | ||
.then(function () { | ||
if (options.topics && options.topics.length) { | ||
if ((options.topics && options.topics.length) || (options.exchangeType === 'x-delayed-message' && options.exchangeOptions.arguments['x-delayed-type'] === 'topic')) { | ||
return Promise.map(options.topics, function (topic) { | ||
return ch.bindQueue(options.queue, options.exchange, topic, options.arguments) | ||
}) | ||
} else if (options.exchangeType === 'fanout') { | ||
} else if (options.exchangeType === 'fanout' || (options.exchangeType === 'x-delayed-message' && options.exchangeOptions.arguments['x-delayed-type'] === 'fanout')) { | ||
return ch.bindQueue(options.queue, options.exchange, '', options.arguments) | ||
@@ -111,0 +111,0 @@ } |
{ | ||
"name": "amqplib-easy", | ||
"version": "4.2.2", | ||
"version": "4.3.0", | ||
"description": "Simplified API for interacting with AMQP", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -159,6 +159,6 @@ /*globals it:false*/ | ||
.then(function (channel) { | ||
channel.checkExchange('cat') | ||
return channel.checkExchange('cat') | ||
.then( | ||
function () { | ||
channel.deleteExchange('cat') | ||
return channel.deleteExchange('cat') | ||
}, | ||
@@ -264,1 +264,92 @@ function () { /* NBD it doesn't exist */ | ||
}) | ||
describe('x-delayed-message', function () { | ||
var plugin = false | ||
after(function () { | ||
if (!plugin) return | ||
return amqp.connect() | ||
.then(function (connection) { | ||
return BPromise.resolve(connection.createChannel()) | ||
.then(function (channel) { | ||
return BPromise.all( | ||
[ | ||
channel.checkExchange('cat') | ||
.then(function () { | ||
channel.deleteExchange('cat') | ||
}), | ||
channel.checkQueue('found_cats') | ||
.then(function () { | ||
channel.deleteQueue('found_cats') | ||
}), | ||
channel.checkQueue('found_cats.failure') | ||
.then(function () { | ||
channel.deleteQueue('found_cats.failure') | ||
}) | ||
]) | ||
.catch(function () { | ||
// the queue doesn't exist, so w/e | ||
return | ||
}) | ||
}) | ||
}) | ||
}) | ||
it('should installed rabbitmq delayed message plugin', function (done) { | ||
amqp.connect() | ||
.then(function (connection) { | ||
connection.on('error', function () {}) | ||
return BPromise.resolve(connection.createChannel()) | ||
.then(function (channel) { | ||
return channel.assertExchange('cat', 'x-delayed-message', {arguments: {'x-delayed-type': 'fanout'}}) | ||
}) | ||
.then(function () { | ||
return BPromise.resolve(connection.createChannel()) | ||
.then(function (channel) { | ||
plugin = true | ||
return channel.deleteExchange('cat') | ||
}) | ||
}) | ||
}) | ||
.catch(function () {}) | ||
.finally(function () { | ||
done() | ||
}) | ||
}) | ||
it('should publish delayed message of 3 sec via fanout', function (done) { | ||
if (!plugin) { | ||
this.skip() | ||
} | ||
this.timeout(6000) | ||
amqp | ||
.consume({ | ||
exchange: 'cat', | ||
exchangeType: 'x-delayed-message', | ||
exchangeOptions: {arguments: {'x-delayed-type': 'fanout'}}, | ||
queue: 'found_cats' | ||
}, function (cat) { | ||
var name = cat.json.name | ||
// There may be some delay, use 2.9 sec to test | ||
var time = cat.json.time + 2900 | ||
try { | ||
name.should.equal('Sally') | ||
time.should.be.below(new Date().getTime()) | ||
done() | ||
} catch (err) { | ||
done(err) | ||
} | ||
}) | ||
.then(function () { | ||
return amqp.publish( | ||
{exchange: 'cat', exchangeType: 'x-delayed-message'}, | ||
'found.tawny', | ||
{name: 'Sally', time: new Date().getTime()}, | ||
{headers: {'x-delay': 3000}} | ||
) | ||
}) | ||
}) | ||
}) |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
25595
556