amqplib-easy
Advanced tools
Comparing version 1.0.1 to 2.0.0
118
index.js
@@ -12,3 +12,3 @@ 'use strict'; | ||
if (!connections[amqpUrl]) { | ||
connections[amqpUrl] = amqp.connect(amqpUrl); | ||
connections[amqpUrl] = BPromise.resolve(amqp.connect(amqpUrl)); | ||
} | ||
@@ -38,10 +38,7 @@ return connections[amqpUrl]; | ||
return BPromise.resolve() | ||
.then(function () { | ||
return connect(); | ||
}) | ||
return connect() | ||
.then(function (conn) { | ||
return conn.createChannel(); | ||
}) | ||
.tap(function (ch) { | ||
.then(function (ch) { | ||
ch.prefetch(options.prefetch); | ||
@@ -56,18 +53,31 @@ return BPromise.all([ | ||
options.retry && options.retry.failQueue ? ch.assertQueue(options.retry.failQueue, options.queueOptions) : true | ||
]); | ||
}) | ||
.tap(function (ch) { | ||
if (options.topics && options.topics.length) { | ||
return BPromise.map(options.topics, function (topic) { | ||
return ch.bindQueue(options.queue, options.exchange, topic); | ||
}); | ||
} | ||
}) | ||
.then(function (ch) { | ||
if (options.retry) { | ||
return ch.consume(options.queue, retry({ | ||
channel: ch, | ||
consumerQueue: options.queue, | ||
failureQueue: options.retry.failQueue, | ||
handler: function (msg) { | ||
]) | ||
.then(function () { | ||
if (options.topics && options.topics.length) { | ||
return BPromise.map(options.topics, function (topic) { | ||
return ch.bindQueue(options.queue, options.exchange, topic); | ||
}); | ||
} | ||
}) | ||
.then(function () { | ||
if (options.retry) { | ||
return ch.consume(options.queue, retry({ | ||
channel: ch, | ||
consumerQueue: options.queue, | ||
failureQueue: options.retry.failQueue, | ||
handler: function (msg) { | ||
if (!msg) { return; } | ||
return BPromise.resolve() | ||
.then(function () { | ||
try { | ||
msg.json = JSON.parse(msg.content.toString()); | ||
} catch (err) { | ||
console.error('Error converting AMQP message content to JSON.', err); | ||
} | ||
return handler(msg, ch); | ||
}); | ||
} | ||
})); | ||
} else { | ||
return ch.consume(options.queue, function (msg) { | ||
if (!msg) { return; } | ||
@@ -82,26 +92,18 @@ return BPromise.resolve() | ||
return handler(msg, ch); | ||
}) | ||
.then(function () { | ||
return ch.ack(msg); | ||
}) | ||
.catch(function (err) { | ||
ch.nack(msg); | ||
throw err; | ||
}); | ||
} | ||
})); | ||
} else { | ||
return ch.consume(options.queue, function (msg) { | ||
if (!msg) { return; } | ||
return BPromise.resolve() | ||
.then(function () { | ||
try { | ||
msg.json = JSON.parse(msg.content.toString()); | ||
} catch (err) { | ||
console.error('Error converting AMQP message content to JSON.', err); | ||
} | ||
return handler(msg, ch); | ||
}) | ||
.then(function () { | ||
return ch.ack(msg); | ||
}) | ||
.catch(function (err) { | ||
ch.nack(msg); | ||
throw err; | ||
}); | ||
}); | ||
} | ||
}); | ||
} | ||
}) | ||
.then(function (consumerInfo) { | ||
return function () { | ||
return BPromise.resolve(ch.cancel(consumerInfo.consumerTag)); | ||
}; | ||
}); | ||
}); | ||
@@ -111,6 +113,3 @@ } | ||
function publish(queueConfig, key, json, messageOptions) { | ||
return BPromise.resolve() | ||
.then(function () { | ||
return connect(); | ||
}) | ||
return connect() | ||
.then(function (conn) { | ||
@@ -140,6 +139,3 @@ return conn.createChannel(); | ||
function sendToQueue(queueConfig, json, messageOptions) { | ||
return BPromise.resolve() | ||
.then(function () { | ||
return connect(); | ||
}) | ||
return connect() | ||
.then(function (conn) { | ||
@@ -149,10 +145,12 @@ // optional future =improvement - maybe we should keep the channel open? | ||
}) | ||
.tap(function (ch) { | ||
.then(function (ch) { | ||
return ch.assertQueue(queueConfig.queue, | ||
queueConfig.queueOptions || {durable: true}); | ||
}) | ||
.then(function (ch) { | ||
return ch.sendToQueue(queueConfig.queue, | ||
new Buffer(JSON.stringify(json)), | ||
messageOptions || queueConfig.messageOptions || {persistent: true}); | ||
queueConfig.queueOptions || {durable: true}) | ||
.then(function () { | ||
return ch.sendToQueue( | ||
queueConfig.queue, | ||
new Buffer(JSON.stringify(json)), | ||
messageOptions || queueConfig.messageOptions || {persistent: true} | ||
); | ||
}); | ||
}); | ||
@@ -159,0 +157,0 @@ } |
{ | ||
"name": "amqplib-easy", | ||
"version": "1.0.1", | ||
"version": "2.0.0", | ||
"description": "Simplified API for interacting with AMQP", | ||
@@ -8,7 +8,7 @@ "main": "index.js", | ||
"lint": "eslint --ignore-path .gitignore .", | ||
"test": "echo \"Error: no test specified\" && exit 1" | ||
"test": "npm run lint && mocha" | ||
}, | ||
"repository": { | ||
"type": "git", | ||
"url": "https://github.com/lanetix/node-amqplib-easy" | ||
"url": "https://github.com/lanetix/node-lanetix-amqp-easy" | ||
}, | ||
@@ -21,10 +21,12 @@ "keywords": [ | ||
"author": "Mike Atkins", | ||
"license": "ISC", | ||
"license": "MIT", | ||
"bugs": { | ||
"url": "https://github.com/lanetix/node-amqplib-easy/issues" | ||
"url": "https://github.com/lanetix/node-lanetix-amqp-easy/issues" | ||
}, | ||
"homepage": "https://github.com/lanetix/node-amqplib-easy", | ||
"homepage": "https://github.com/lanetix/node-lanetix-amqp-easy", | ||
"devDependencies": { | ||
"eslint": "^0.11.0", | ||
"eslint-plugin-nodeca": "^1.0.3" | ||
"eslint-plugin-nodeca": "^1.0.3", | ||
"mocha": "^2.1.0", | ||
"should": "^4.5.0" | ||
}, | ||
@@ -31,0 +33,0 @@ "dependencies": { |
@@ -1,1 +0,50 @@ | ||
# node-lanetix-amqp-easy | ||
amqplib-easy | ||
============ | ||
[![Build Status](https://travis-ci.org/lanetix/node-lanetix-amqp-easy.svg?branch=api-doc)](https://travis-ci.org/lanetix/node-lanetix-amqp-easy) | ||
[amqplib](https://github.com/squaremo/amqp.node) but easy! Let us manage your | ||
channels, connections, assertions and bindings for you, so you can just send | ||
messages. | ||
Installation | ||
------------ | ||
```javascript | ||
npm install --save amqplib-easy | ||
``` | ||
Usage | ||
----- | ||
```javascript | ||
var amqp = require('amqplib-easy')('amqp://foo:bar@amqp.lol'); | ||
amqp.consume( | ||
{ | ||
exchange: 'cat', | ||
queue: 'found_cats', | ||
topics: [ 'found.*' ] | ||
}, | ||
function (cat) { | ||
console.log('Found a cat named', cat.json.name); | ||
} | ||
); | ||
amqp.publish({ exchange: 'cat' }, 'found.tawny', { name: 'Sally' }); | ||
amqp.sendToQueue({ queue: 'found_cats' }, { name: 'Fred' }); | ||
``` | ||
yields | ||
``` | ||
Found a cat named Sally | ||
Found a cat named Fred | ||
``` | ||
###[API](API.md) | ||
###Logging | ||
All methods return a promise, so you can attach logging to them via the | ||
following. | ||
```javascript | ||
amqp.sendToQueue({ queue: 'found_cats' }, { name: 'Fred' }) | ||
.then(function () { console.log('sent to queue'); }) | ||
.catch(function (err) { console.error('sending failed', err); }); | ||
``` |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
No bug tracker
MaintenancePackage does not have a linked bug tracker in package.json.
Found 1 instance in 1 package
No website
QualityPackage does not have a website.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
No bug tracker
MaintenancePackage does not have a linked bug tracker in package.json.
Found 1 instance in 1 package
No tests
QualityPackage does not have any tests. This is a strong signal of a poorly maintained or low quality package.
Found 1 instance in 1 package
No website
QualityPackage does not have a website.
Found 1 instance in 1 package
16289
13
262
1
51
4