amqplib-easy
Advanced tools
Comparing version 2.0.0 to 2.1.0
119
index.js
@@ -43,25 +43,38 @@ 'use strict'; | ||
ch.prefetch(options.prefetch); | ||
return BPromise.all([ | ||
options.exchange ? | ||
ch.assertExchange(options.exchange, | ||
options.exchangeType, | ||
options.exchangeOptions) | ||
: BPromise.resolve(), | ||
ch.assertQueue(options.queue, options.queueOptions), | ||
options.retry && options.retry.failQueue ? ch.assertQueue(options.retry.failQueue, options.queueOptions) : true | ||
]) | ||
.then(function () { | ||
if (options.topics && options.topics.length) { | ||
return BPromise.map(options.topics, function (topic) { | ||
return ch.bindQueue(options.queue, options.exchange, topic); | ||
}); | ||
} | ||
}) | ||
.then(function () { | ||
if (options.retry) { | ||
return ch.consume(options.queue, retry({ | ||
channel: ch, | ||
consumerQueue: options.queue, | ||
failureQueue: options.retry.failQueue, | ||
handler: function (msg) { | ||
return BPromise.resolve() | ||
.then(function () { | ||
return BPromise.all([ | ||
options.exchange ? ch.assertExchange(options.exchange, options.exchangeType, options.exchangeOptions) : BPromise.resolve(), | ||
ch.assertQueue(options.queue, options.queueOptions), | ||
options.retry && options.retry.failQueue ? ch.assertQueue(options.retry.failQueue, options.queueOptions) : BPromise.resolve() | ||
]); | ||
}) | ||
.then(function () { | ||
if (options.topics && options.topics.length) { | ||
return BPromise.map(options.topics, function (topic) { | ||
return ch.bindQueue(options.queue, options.exchange, topic); | ||
}); | ||
} | ||
}) | ||
.then(function () { | ||
if (options.retry) { | ||
return ch.consume(options.queue, retry({ | ||
channel: ch, | ||
consumerQueue: options.queue, | ||
failureQueue: options.retry.failQueue, | ||
handler: function (msg) { | ||
if (!msg) { return; } | ||
return BPromise.resolve() | ||
.then(function () { | ||
try { | ||
msg.json = JSON.parse(msg.content.toString()); | ||
} catch (err) { | ||
console.error('Error converting AMQP message content to JSON.', err); | ||
} | ||
return handler(msg, ch); | ||
}); | ||
} | ||
})); | ||
} else { | ||
return ch.consume(options.queue, function (msg) { | ||
if (!msg) { return; } | ||
@@ -76,32 +89,18 @@ return BPromise.resolve() | ||
return handler(msg, ch); | ||
}) | ||
.then(function () { | ||
return ch.ack(msg); | ||
}) | ||
.catch(function (err) { | ||
ch.nack(msg); | ||
throw err; | ||
}); | ||
} | ||
})); | ||
} else { | ||
return ch.consume(options.queue, function (msg) { | ||
if (!msg) { return; } | ||
return BPromise.resolve() | ||
.then(function () { | ||
try { | ||
msg.json = JSON.parse(msg.content.toString()); | ||
} catch (err) { | ||
console.error('Error converting AMQP message content to JSON.', err); | ||
} | ||
return handler(msg, ch); | ||
}) | ||
.then(function () { | ||
return ch.ack(msg); | ||
}) | ||
.catch(function (err) { | ||
ch.nack(msg); | ||
throw err; | ||
}); | ||
}); | ||
} | ||
}) | ||
.then(function (consumerInfo) { | ||
return function () { | ||
return BPromise.resolve(ch.cancel(consumerInfo.consumerTag)); | ||
}; | ||
}); | ||
}); | ||
} | ||
}) | ||
.then(function (consumerInfo) { | ||
return function () { | ||
return BPromise.resolve(ch.cancel(consumerInfo.consumerTag)); | ||
}; | ||
}); | ||
}); | ||
@@ -120,7 +119,4 @@ } | ||
return BPromise.all([ | ||
ch.assertExchange(queueConfig.exchange, | ||
queueConfig.exchangeType || 'topic', | ||
queueConfig.exchangeOptions || {durable: true}), | ||
ch.assertQueue(queueConfig.queue, | ||
queueConfig.queueOptions || {durable: true}) | ||
ch.assertExchange(queueConfig.exchange, queueConfig.exchangeType || 'topic', queueConfig.exchangeOptions || {durable: true}), | ||
queueConfig.queue ? ch.assertQueue(queueConfig.queue, queueConfig.queueOptions || {durable: true}) : BPromise.resolve() | ||
]); | ||
@@ -130,5 +126,5 @@ }) | ||
return ch.publish(queueConfig.exchange, | ||
key, | ||
new Buffer(JSON.stringify(json)), | ||
messageOptions || queueConfig.messageOptions || {persistent: true}); | ||
key, | ||
new Buffer(JSON.stringify(json)), | ||
messageOptions || queueConfig.messageOptions || {persistent: true}); | ||
}); | ||
@@ -144,4 +140,3 @@ } | ||
.then(function (ch) { | ||
return ch.assertQueue(queueConfig.queue, | ||
queueConfig.queueOptions || {durable: true}) | ||
return ch.assertQueue(queueConfig.queue, queueConfig.queueOptions || {durable: true}) | ||
.then(function () { | ||
@@ -148,0 +143,0 @@ return ch.sendToQueue( |
{ | ||
"name": "amqplib-easy", | ||
"version": "2.0.0", | ||
"version": "2.1.0", | ||
"description": "Simplified API for interacting with AMQP", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
15849
11
238