amqp.channel
Advanced tools
Comparing version 0.0.9 to 1.0.0
var noop = function(){}, | ||
amqp = require('amqplib'), | ||
Promise = require('bluebird'); | ||
Promise = require('bluebird'), | ||
simplify = require('./simplify'); | ||
@@ -14,13 +15,19 @@ module.exports = function createChannel(url, assertions, log){ | ||
var user = amqp.auth.split(':')[0]; | ||
var close = connection.close.bind(connection); | ||
var close = function(e){ connection.close(); return Promise.reject(e); }; | ||
log.info('Connected to %s as "%s"', amqp.host, user); | ||
process.once('SIGINT', close); | ||
process.once('SIGTERM', close); | ||
return connection.createConfirmChannel().then(setupChannel, close); | ||
return connection.createConfirmChannel().then(setupChannel).catch(close); | ||
} | ||
function setupChannel(channel) { | ||
var setup = []; | ||
var setup = [], channelIsBlocked = false; | ||
for (var method in assertions) { | ||
setup.push.apply(setup, assertions[method].map(applyToChannel(method))); | ||
if (typeof channel[method] === 'function') { | ||
setup.push.apply(setup, assertions[method].map(applyToChannel(method))); | ||
} else { | ||
return closeChannel( | ||
new TypeError('Channel has no method "' + method + '"') | ||
); | ||
} | ||
}; | ||
@@ -31,3 +38,8 @@ | ||
channel.on('unblocked', blocked(false)); | ||
channel.isBlocked = false; | ||
if (!channel.hasOwnProperty('isBlocked')) { | ||
Object.defineProperty(channel, 'isBlocked', { | ||
get: function(){ return channelIsBlocked }, | ||
enumerable: true | ||
}); | ||
} | ||
@@ -45,3 +57,3 @@ return Promise.all(setup).then(returnChannel, closeChannel); | ||
log.info('- Channel setup complete'); | ||
return channel; | ||
return simplify(channel); | ||
} | ||
@@ -58,5 +70,5 @@ | ||
var level = isBlocked ? 'warn' : 'info'; | ||
return function changeState(){ | ||
return function changeBlockedState(){ | ||
log[level]('- Channel %s', state); | ||
channel.isBlocked = isBlocked; | ||
channelIsBlocked = isBlocked; | ||
}; | ||
@@ -63,0 +75,0 @@ } |
{ | ||
"name": "amqp.channel", | ||
"version": "0.0.9", | ||
"version": "1.0.0", | ||
"description": "A simplified way to setup an AMQP connection/channel with amqplib", | ||
@@ -5,0 +5,0 @@ "main": "channel.js", |
190
README.md
@@ -8,5 +8,5 @@ # amqp.channel | ||
## Compared to amqplib | ||
## Simplified Configuration | ||
amqplib syntax: | ||
**amqplib syntax:** | ||
@@ -23,9 +23,10 @@ ```javascript | ||
channel.deleteExchange('alt.exchange', { ifEmpty: true }), | ||
channel.assertQueue('queue', { durable: true }), | ||
channel.checkQueue('queue'), | ||
channel.bindQueue('queue', 'exchange', ''), | ||
channel.unbindQueue('queue', 'exchange', ''), | ||
channel.purgeQueue('queue'), | ||
channel.deleteQueue('queue', { ifEmpty: true }), | ||
channel.prefetch(1) | ||
channel.assertQueue('first', { durable: true }), | ||
channel.assertQueue('second'), | ||
channel.checkQueue('first'), | ||
channel.bindQueue('first', 'exchange', ''), | ||
channel.unbindQueue('first', 'exchange', ''), | ||
channel.purgeQueue('first'), | ||
channel.deleteQueue('first', { ifEmpty: true }), | ||
channel.deleteQueue('second') | ||
]); | ||
@@ -37,3 +38,3 @@ }).then(function(channel){ | ||
amqp.channel syntax: | ||
**amqp.channel syntax:** | ||
@@ -47,9 +48,8 @@ ```javascript | ||
deleteExchange : [['alt.exchange', { ifEmpty: true }]], | ||
assertQueue : [['queue', { durable: true }]], | ||
checkQueue : [['queue']], | ||
bindQueue : [['queue', 'exchange', '']], | ||
unbindQueue : [['queue', 'exchange', '']], | ||
purgeQueue : [['queue']], | ||
deleteQueue : [['queue', { ifEmpty: true }]], | ||
prefetch : [[1]] | ||
assertQueue : [['first', { durable: true }], ['second']], | ||
checkQueue : [['first']], | ||
bindQueue : [['first', 'exchange', '']], | ||
unbindQueue : [['first', 'exchange', '']], | ||
purgeQueue : [['first']], | ||
deleteQueue : [['first', { ifEmpty: true }], ['second']] | ||
}).then(function(channel){ | ||
@@ -60,6 +60,67 @@ // Do stuff with the channel | ||
## Example | ||
## Simplified Usage | ||
Say you wanted to listen to an exchange `'foo'` and send a different message to queue `'bar'` every time the message contained the word `'baz'`. | ||
The `channel` object resolved by the returned `Promise` will behave differently from a normal `channel` object returned by the amqplib library in a few (hopefully convenient) ways: | ||
1. The `consume`, `publish`, and `sendToQueue` channel methods have been changed to explicitly handle JSON. | ||
2. The `publish` and `sendToQueue` methods have been "promisified" in a way that will still provide information to know whether or not the write buffer is full (and therefore, whether or not you should continue writing to it) by adding an additional `ok` boolean property to the promise. | ||
3. A `channel` consumer callback will no longer receive `null` when that consumer had been cancelled by Rabbit MQ. Instead, the `channel` object will emit a `'cancelled'` event with all the arguments passed to the `channel.consume()` call for the consumer that was cancelled. | ||
#### Examples of Modified Usage: | ||
Automatic translation of JS object to JSON string to Buffer for sending/publishing: | ||
```javascript | ||
channel.sendToQueue('someQueue', { hello: 'world' }); | ||
channel.publish('someExchange', 'routingKey', { hello: 'world' }); | ||
``` | ||
Promisification of `sendToQueue` and `publish` methods: | ||
```javascript | ||
return channel.sendToQueue('someQueue', { hello: 'world' }).then(function(){ | ||
return channel.publish('someExchange', 'routingKey', { hello: 'world' }); | ||
}); | ||
``` | ||
Automatic translation of message Buffer to JSON string to JS object for consuming: | ||
```javascript | ||
channel.sendToQueue('someQueue', { hello: 'world' }); | ||
channel.consume('someQueue', function(parsedMessage, originalMessage){ | ||
console.log('hello', parsedMessage.hello); // => hello world | ||
channel.ack(originalMessage); | ||
}); | ||
``` | ||
Handling a consumer getting cancelled by Rabbit MQ: | ||
```javascript | ||
channel.on('cancelled', function(queue, callback, options){ | ||
// When the consumer below gets cancelled by Rabbit MQ | ||
console.log(queue, callback.name, options); // 'someQueue', 'onMessage', { noAck: true } | ||
}); | ||
channel.consume('someQueue', function onMessage(parsedMessage, originalMessage){ | ||
console.log(parsedMessage); | ||
}, { noAck: true }); | ||
``` | ||
The `ok` property on the promises returned by the `sendToQueue` and `publish` methods: | ||
```javascript | ||
var sent = channel.sendToQueue('someQueue', { hello: 'world' }); | ||
if (sent.ok) { | ||
// continue sending | ||
} else { | ||
// maybe pause sending until unblocked? | ||
channel.once('drain', function(){ | ||
// continue sending | ||
}); | ||
} | ||
``` | ||
## Real World Example | ||
Say you wanted to listen to the `'foo'` exchange and send a different message to the `'bar'` queue every time the message's `baz` property contained the word `'qux'`. | ||
In your config.js: | ||
@@ -78,2 +139,20 @@ | ||
cfg.channelMethodsToCall = { | ||
assertQueue: // Channel method to invoke | ||
[ // Array of channel method invocations | ||
[ // channel.assertQueue( cfg.queue.toConsumeFrom ) | ||
cfg.queue.toConsumeFrom | ||
], | ||
[ // channel.assertQueue( cfg.queue.toSendTo, { durable: true } ); | ||
cfg.queue.toSendTo, { durable: true } | ||
] | ||
], | ||
assertExchange: [ | ||
[ cfg.exchange, 'fanout' ] // channel.assetExchange(cfg.exchange, 'fanout') | ||
], | ||
bindQueue: [ | ||
[ cfg.queue.toConsumeFrom, cfg.exchange, '' ] | ||
] | ||
} | ||
module.exports = cfg; | ||
@@ -87,34 +166,51 @@ ``` | ||
var amqp = require('amqp.channel'); | ||
var channelMethodsToCall = { | ||
// Channel method to invoke | ||
assertQueue: [ // Array of channel method invocations | ||
[ cfg.queue.toConsumeFrom ], // Arguments applied to the first invocation | ||
[ cfg.queue.toSendTo, { durable: true }] // Arguments applied to the second | ||
], | ||
assertExchange: [ | ||
[ cfg.exchange, 'fanout' ] // channel.assetExchange(cfg.exchange, 'fanout') | ||
], | ||
bindQueue: [ | ||
[ cfg.queue.toConsumeFrom, cfg.exchange, '' ] | ||
] | ||
}; | ||
module.exports = amqp(cfg.amqpUrl, channelMethodsToCall).then(consumeQueue); | ||
module.exports = amqp(cfg.amqpUrl, cfg.channelMethodsToCall) | ||
.then(consumeAtMost(1)) | ||
.then(consumeFrom(cfg.queue.toConsumeFrom)); | ||
function consumeAtMost(maxMessages){ | ||
return function(channel){ | ||
// Only process `maxMessages` at a time and don't consume another | ||
// message until we've either `ack` or `nack` the current one. | ||
return channel.prefetch(maxMessage).then(function(){ | ||
return channel; | ||
}); | ||
} | ||
} | ||
function consumeQueue(channel){ | ||
channel.consume(cfg.queue.toConsumeFrom, function onMessage(msg){ | ||
if (msg === null) { | ||
channel.nack(msg); | ||
console.warn('RabbitMQ cancelled consumer'); | ||
} else { | ||
channel.ack(msg); | ||
var message = msg.content.toString('utf8') | ||
if (/baz/.test(message)) { | ||
var msgToSend = new Buffer('qux'); | ||
function consumeFrom(queue){ | ||
return function(channel){ | ||
channel.consume(queue, function onMessage(parsed, msg){ | ||
if (/baz/.test(parsed.baz)) { | ||
var msgToSend = { hello: 'world' }; | ||
var options = { persistent: true }; | ||
channel.sendToQueue(cfg.queue.toSendTo, msgToSend, options); | ||
var sendMsg = channel.sendToQueue(cfg.queue.toSendTo, msgToSend, options); | ||
sendMsg.catch(function(e){ | ||
console.error(e); | ||
// Try to process message again? | ||
// onMessage(parsed, msg); | ||
}); | ||
if (sendMsg.ok) { | ||
channel.ack(msg); | ||
} else { | ||
sendMsg.then(function(){ | ||
channel.ack(msg); | ||
}); | ||
} | ||
} else { | ||
channel.ack(msg); | ||
} | ||
} | ||
}); | ||
return channel; | ||
}); | ||
channel.on('cancelled', function onConsumerCancelled(queue, cb, options){ | ||
console.warn('RabbitMQ cancelled your consumer for %s', queue); | ||
// Try to setup the consumer again? | ||
// channel.consume(queue, cb, options); | ||
}); | ||
return channel; | ||
} | ||
} | ||
@@ -121,0 +217,0 @@ ``` |
@@ -153,2 +153,7 @@ /*jslint nodejs: true, expr: true*/ | ||
}); | ||
after(function(){ | ||
connect.reset(); | ||
createConfirmChannel.reset(); | ||
}); | ||
}); | ||
@@ -159,5 +164,4 @@ | ||
var test = done.bind(null, null); | ||
channel.assertExchange = channel.assertQueue = | ||
sinon.stub().returns(Promise.reject(new Error('Failure'))); | ||
getChannel = stubChannel(amqpUrl, assertions, npmlog); | ||
var methods = _.merge({ bogus : [['foo', 'bar']] }, assertions); | ||
getChannel = stubChannel(amqpUrl, methods, npmlog); | ||
getChannel.then(test, test); | ||
@@ -170,3 +174,3 @@ }); | ||
.that.includes.something | ||
.that.is.an.instanceOf(Error); | ||
.that.is.an.instanceOf(TypeError); | ||
}) | ||
@@ -180,5 +184,160 @@ | ||
it('should reject with an error', function(){ | ||
expect(getChannel).to.be.rejectedWith(Error); | ||
expect(getChannel).to.be.rejectedWith(TypeError); | ||
}); | ||
after(function(){ | ||
connect.reset(); | ||
createConfirmChannel.reset(); | ||
}); | ||
}); | ||
describe('Simplified', function(){ | ||
before(function(){ | ||
return getChannel = stubChannel(amqpUrl, null, null); | ||
}); | ||
function serialize(thing){ | ||
return { | ||
content: new Buffer(JSON.stringify(thing)), | ||
fields: {}, | ||
properties: { contentType: 'application/json' } | ||
}; | ||
} | ||
function serializedMessage(msg){ | ||
return function validateSerializedMessage(buffer){ | ||
var parsed = JSON.parse(buffer.toString()); | ||
return expect(parsed).to.eql(msg) || true; | ||
} | ||
} | ||
describe('#publish()', function(){ | ||
var promise = { | ||
resolved: null, | ||
rejected: null | ||
}; | ||
var msg = { hello: 'world' }; | ||
var originalFn = sinon.stub(); | ||
originalFn.onFirstCall().returns(false).callsArgWith(4, new Error('test')); | ||
originalFn.onSecondCall().returns(true).callsArgWith(4, null); | ||
channel.publish = originalFn; | ||
before(function(){ | ||
var ch = null | ||
return getChannel.then(function(c){ ch = c; | ||
return promise.rejected = ch.publish('exchange', 'routingKey', msg); | ||
}).catch(function(){ | ||
return promise.resolved = ch.publish('exchange', 'routingKey', msg); | ||
}); | ||
}); | ||
it('should call the original #publish method', function(){ | ||
expect(originalFn).to.have.been.calledTwice; | ||
}); | ||
it('should serialize the passed message object into a Buffer', function(){ | ||
expect(originalFn).and.to.have.been.calledWithExactly( | ||
'exchange', | ||
'routingKey', | ||
sinon.match(serializedMessage(msg)), | ||
sinon.match({ contentType: 'application/json' }), | ||
sinon.match.func | ||
); | ||
}); | ||
it('should return a promise with extra `ok` property', function(){ | ||
expect(promise.rejected).to.have.property('ok').that.is.false; | ||
expect(promise.rejected).to.be.rejectedWith(Error); | ||
expect(promise.resolved).to.have.property('ok').that.is.true; | ||
expect(promise.resolved).to.be.resolved; | ||
}); | ||
}); | ||
describe('#sendToQueue()', function(){ | ||
var promise = { | ||
resolved: null, | ||
rejected: null | ||
}; | ||
var msg = { hello: 'world' }; | ||
var originalFn = sinon.stub(); | ||
originalFn.onFirstCall().returns(false).callsArgWith(3, new Error('test')); | ||
originalFn.onSecondCall().returns(true).callsArgWith(3, null); | ||
channel.sendToQueue = originalFn; | ||
before(function(){ | ||
var ch = null | ||
return getChannel.then(function(c){ ch = c; | ||
return promise.rejected = ch.sendToQueue('queue', msg); | ||
}).catch(function(){ | ||
return promise.resolved = ch.sendToQueue('queue', msg); | ||
}); | ||
}); | ||
it('should call the original #publish method', function(){ | ||
expect(originalFn).to.have.been.calledTwice; | ||
}); | ||
it('should serialize the passed message object into a Buffer', function(){ | ||
expect(originalFn).and.to.have.been.calledWithExactly( | ||
'queue', | ||
sinon.match(serializedMessage(msg)), | ||
sinon.match({ contentType: 'application/json' }), | ||
sinon.match.func | ||
); | ||
}); | ||
it('should return a promise with extra `ok` property', function(){ | ||
expect(promise.rejected).to.have.property('ok').that.is.false; | ||
expect(promise.rejected).to.be.rejectedWith(Error); | ||
expect(promise.resolved).to.have.property('ok').that.is.true; | ||
expect(promise.resolved).to.be.resolved; | ||
}); | ||
}); | ||
describe('#consume()', function(){ | ||
var receiveMessage = null; | ||
var onCancelled = sinon.spy(); | ||
channel.consume = sinon.spy(function(queue, callback){ | ||
receiveMessage = callback; | ||
}); | ||
channel.on('cancelled', onCancelled); | ||
before(function(){ | ||
return getChannel; | ||
}); | ||
// And this is how you get multiline test descriptions | ||
it('should be modified so that the callback supplied in the second', test()); | ||
it('argument will itself be invoked with the parsed message object', test()); | ||
it('and the original message whenever a message is recieved and', test()); | ||
it('should emit a `cancelled` event on the channel with the parameters', test(true)); | ||
it('passed to channel.consume() when a consumer gets cancelled', test(true)); | ||
function test(cancelled){ | ||
var consumer = sinon.spy(); | ||
var msg = cancelled ? null : { hello: 'world', when: Date.now() }; | ||
var testMsg = cancelled ? null : serialize(msg); | ||
return function(){ | ||
channel.consume('queue', consumer, { noAck: true }); | ||
receiveMessage(testMsg); | ||
if (cancelled) { | ||
expect(onCancelled).to.have.been.calledWithExactly( | ||
'queue', | ||
consumer, | ||
{ noAck: true } | ||
); | ||
} else { | ||
expect(consumer).to.have.been.calledWithExactly( | ||
sinon.match(msg), | ||
testMsg | ||
); | ||
} | ||
} | ||
} | ||
}); | ||
after(function(){ | ||
connect.reset(); | ||
createConfirmChannel.reset(); | ||
}); | ||
}); | ||
}); |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
No v1
QualityPackage is not semver >=1. This means it is not stable and does not support ^ ranges.
Found 1 instance in 1 package
28921
8
467
0
216