amqp-wrapper
Advanced tools
Comparing version 5.6.0-requeue.0 to 6.0.0-node10.0
65
amqp.js
@@ -14,5 +14,3 @@ 'use strict'; | ||
let connection; | ||
let channel; | ||
let consumerTag; | ||
var connection, channel; | ||
@@ -29,4 +27,8 @@ var prefetch = config.prefetch || 10; | ||
function createChannel (err, conn) { | ||
if (err) return d.reject(err); | ||
debug('createChannel()'); | ||
if (err) { | ||
return d.reject(err); | ||
} | ||
connection = conn; | ||
conn.createConfirmChannel(assertExchange); | ||
@@ -36,4 +38,8 @@ } | ||
function assertExchange (err, ch) { | ||
if (err) return d.reject(err); | ||
debug('assertExchange()', ch); | ||
if (err) { | ||
return d.reject(err); | ||
} | ||
channel = ch; | ||
channel.prefetch(prefetch); | ||
@@ -44,5 +50,11 @@ channel.assertExchange(config.exchange, 'topic', {}, assertQueues); | ||
function assertQueues (err) { | ||
if (err) return d.reject(err); | ||
if (!config.queue || !config.queue.name) return d.resolve(); | ||
queueSetup.setupForConsume(channel, config, d.resolver(cb)); | ||
debug('assertQueues()'); | ||
if (err) { | ||
return d.reject(err); | ||
} | ||
if (config.queue && config.queue.name) { | ||
queueSetup.setupForConsume(channel, config, d.resolver(cb)); | ||
} else { | ||
d.resolve(); | ||
} | ||
} | ||
@@ -52,10 +64,7 @@ return d.nodeify(cb); | ||
function requeueAll () { | ||
channel.cancel(consumerTag); | ||
channel.nackAll(true); | ||
} | ||
function close (cb) { | ||
if (!connection) return cb(); | ||
return connection.close(cb); | ||
if (connection) { | ||
return connection.close(cb); | ||
} | ||
cb(); | ||
} | ||
@@ -96,8 +105,12 @@ | ||
*/ | ||
function consume (handleMessage, options, cb) { | ||
const d = Deferred(); | ||
function consume (handleMessage, options) { | ||
debug('consume()'); | ||
function onMessage (message) { | ||
function callback (message) { | ||
function done (err, requeue) { | ||
if (err) return channel.nack(message, false, requeue || false); | ||
if (requeue === undefined) { | ||
requeue = false; | ||
} | ||
if (err) { | ||
return channel.nack(message, false, requeue); | ||
} | ||
channel.ack(message); | ||
@@ -111,3 +124,3 @@ } | ||
} catch (error) { | ||
console.error(error); | ||
console.log(error); | ||
// Do not requeue on exception - it means something unexpected | ||
@@ -119,10 +132,3 @@ // (and prob. non-transitory) happened. | ||
channel.consume(config.queue.name, onMessage, options, consumeCb); | ||
function consumeCb (err, ok) { | ||
if (err) return d.reject(err); | ||
consumerTag = ok.consumerTag; | ||
return d.resolve(); | ||
} | ||
return d.nodeify(cb); | ||
channel.consume(config.queue.name, callback, options); | ||
} | ||
@@ -134,4 +140,3 @@ | ||
publish, | ||
consume, | ||
requeueAll | ||
consume | ||
}; | ||
@@ -138,0 +143,0 @@ }; |
{ | ||
"name": "amqp-wrapper", | ||
"version": "5.6.0-requeue.0", | ||
"version": "6.0.0-node10.0", | ||
"description": "A wrapper around https://github.com/squaremo/amqp.node to make consuming and publishing dead easy.", | ||
"main": "amqp.js", | ||
"scripts": { | ||
"test": "semistandard && NODE_ENV=test mocha test --recursive", | ||
"jshint": "jshint -c .jshintrc --exclude-path .gitignore .", | ||
"codestyle": "jscs -p google lib/ test/", | ||
"test": "semistandard && NODE_ENV=test mocha test --recursive --exit", | ||
"coverage": "nyc -a -c -r html -r text -r lcov npm test" | ||
@@ -27,15 +25,12 @@ }, | ||
"dependencies": { | ||
"amqplib": "^0.4.0", | ||
"async": "^0.9.0", | ||
"amqplib": "^0.5.5", | ||
"async": "^3.1.0", | ||
"debug": "^2.6.8", | ||
"deferential": "^1.0.0", | ||
"json-stringify-safe": "^5.0.0", | ||
"q": "^1.4.1" | ||
"q": "^1.5.1" | ||
}, | ||
"devDependencies": { | ||
"expect.js": "^0.3.1", | ||
"jscs": "^1.6.1", | ||
"jshint": "^2.8.0", | ||
"lodash": "^2.4.1", | ||
"mocha": "^1.21.4", | ||
"mocha": "^6.2.0", | ||
"nyc": "^11.4.1", | ||
@@ -42,0 +37,0 @@ "sandboxed-module": "^0.3.0", |
@@ -10,3 +10,3 @@ 'use strict'; | ||
var amqp = AMQP(config); | ||
return amqp.connect().then(function () { | ||
amqp.connect().then(function () { | ||
done(); | ||
@@ -13,0 +13,0 @@ }, done); |
@@ -21,3 +21,3 @@ 'use strict'; | ||
it('should throw with no url', function (done) { | ||
expect(function () { AMQP({exchange: ''}); }).to | ||
expect(function () { AMQP({ exchange: '' }); }).to | ||
.throwError('amqp-wrapper: Invalid config'); | ||
@@ -27,3 +27,3 @@ done(); | ||
it('should throw with no exchange', function (done) { | ||
expect(function () { AMQP({url: ''}); }).to | ||
expect(function () { AMQP({ url: '' }); }).to | ||
.throwError('amqp-wrapper: Invalid config'); | ||
@@ -127,3 +127,3 @@ done(); | ||
} | ||
amqp.publish('myqueue', {woo: 'test'}, {}, done); | ||
amqp.publish('myqueue', { woo: 'test' }, {}, done); | ||
}); | ||
@@ -139,3 +139,3 @@ }); | ||
var amqpLibMock = require('./amqplibmock')({overrides: {ack: ack}}); | ||
var amqpLibMock = require('./amqplibmock')({ overrides: { ack: ack } }); | ||
@@ -169,3 +169,3 @@ var mockedAMQP = SandboxedModule.require('../amqp', { | ||
messageToDeliver: 'nonvalidjson', | ||
overrides: {nack: nack} | ||
overrides: { nack: nack } | ||
}); | ||
@@ -197,3 +197,3 @@ | ||
var amqpLibMock = require('./amqplibmock')({overrides: {nack: nack}}); | ||
var amqpLibMock = require('./amqplibmock')({ overrides: { nack: nack } }); | ||
@@ -218,24 +218,4 @@ var mockedAMQP = SandboxedModule.require('../amqp', { | ||
}); | ||
describe('#requeueAll', function () { | ||
it('returns messages to the queue', function (done) { | ||
var amqp = AMQP(config.good); | ||
amqp.connect(function (err) { | ||
if (err) { | ||
return done(err); | ||
} | ||
amqp.publish('myqueue', 'test', { hi: 'there' }, done); | ||
amqp.consume((msg) => { | ||
expect(msg.hi).to.equal('there'); | ||
}).then(() => { | ||
amqp.requeueAll(); | ||
amqp.consume((msg) => { | ||
expect(msg.hi).to.equal('there'); | ||
}); | ||
}); | ||
}); | ||
}); | ||
}); | ||
}); | ||
// vim: set et sw=2 colorcolumn=80: |
@@ -8,3 +8,3 @@ module.exports = { | ||
routingKey: 'myRoutingQueue', | ||
options: {deadLetterExchange: 'wow'} | ||
options: { deadLetterExchange: 'wow' } | ||
} | ||
@@ -26,3 +26,3 @@ }, | ||
routingKey: ['myRoutingKey', 'myRoutingKey2'], | ||
options: {deadLetterExchange: 'wow'} | ||
options: { deadLetterExchange: 'wow' } | ||
} | ||
@@ -29,0 +29,0 @@ } |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
6
20304
438
+ Addedamqplib@0.5.6(transitive)
+ Addedasync@3.2.6(transitive)
+ Addedbitsyntax@0.1.0(transitive)
+ Addedbluebird@3.7.2(transitive)
+ Addedbuffer-more-ints@1.0.0(transitive)
+ Addedquerystringify@2.2.0(transitive)
+ Addedrequires-port@1.0.0(transitive)
+ Addedsafe-buffer@5.1.2(transitive)
+ Addedurl-parse@1.4.7(transitive)
- Removedamqplib@0.4.2(transitive)
- Removedasync@0.9.2(transitive)
- Removedbitsyntax@0.0.4(transitive)
- Removedbuffer-more-ints@0.0.2(transitive)
- Removedwhen@3.6.4(transitive)
Updatedamqplib@^0.5.5
Updatedasync@^3.1.0
Updatedq@^1.5.1