Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

amqp.channel

Package Overview
Dependencies
Maintainers
2
Versions
6
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

amqp.channel - npm Package Compare versions

Comparing version 0.0.9 to 1.0.0

simplify.js

30

channel.js
var noop = function(){},
amqp = require('amqplib'),
Promise = require('bluebird');
Promise = require('bluebird'),
simplify = require('./simplify');

@@ -14,13 +15,19 @@ module.exports = function createChannel(url, assertions, log){

var user = amqp.auth.split(':')[0];
var close = connection.close.bind(connection);
var close = function(e){ connection.close(); return Promise.reject(e); };
log.info('Connected to %s as "%s"', amqp.host, user);
process.once('SIGINT', close);
process.once('SIGTERM', close);
return connection.createConfirmChannel().then(setupChannel, close);
return connection.createConfirmChannel().then(setupChannel).catch(close);
}
function setupChannel(channel) {
var setup = [];
var setup = [], channelIsBlocked = false;
for (var method in assertions) {
setup.push.apply(setup, assertions[method].map(applyToChannel(method)));
if (typeof channel[method] === 'function') {
setup.push.apply(setup, assertions[method].map(applyToChannel(method)));
} else {
return closeChannel(
new TypeError('Channel has no method "' + method + '"')
);
}
};

@@ -31,3 +38,8 @@

channel.on('unblocked', blocked(false));
channel.isBlocked = false;
if (!channel.hasOwnProperty('isBlocked')) {
Object.defineProperty(channel, 'isBlocked', {
get: function(){ return channelIsBlocked },
enumerable: true
});
}

@@ -45,3 +57,3 @@ return Promise.all(setup).then(returnChannel, closeChannel);

log.info('- Channel setup complete');
return channel;
return simplify(channel);
}

@@ -58,5 +70,5 @@

var level = isBlocked ? 'warn' : 'info';
return function changeState(){
return function changeBlockedState(){
log[level]('- Channel %s', state);
channel.isBlocked = isBlocked;
channelIsBlocked = isBlocked;
};

@@ -63,0 +75,0 @@ }

{
"name": "amqp.channel",
"version": "0.0.9",
"version": "1.0.0",
"description": "A simplified way to setup an AMQP connection/channel with amqplib",

@@ -5,0 +5,0 @@ "main": "channel.js",

@@ -8,5 +8,5 @@ # amqp.channel

## Compared to amqplib
## Simplified Configuration
amqplib syntax:
**amqplib syntax:**

@@ -23,9 +23,10 @@ ```javascript

channel.deleteExchange('alt.exchange', { ifEmpty: true }),
channel.assertQueue('queue', { durable: true }),
channel.checkQueue('queue'),
channel.bindQueue('queue', 'exchange', ''),
channel.unbindQueue('queue', 'exchange', ''),
channel.purgeQueue('queue'),
channel.deleteQueue('queue', { ifEmpty: true }),
channel.prefetch(1)
channel.assertQueue('first', { durable: true }),
channel.assertQueue('second'),
channel.checkQueue('first'),
channel.bindQueue('first', 'exchange', ''),
channel.unbindQueue('first', 'exchange', ''),
channel.purgeQueue('first'),
channel.deleteQueue('first', { ifEmpty: true }),
channel.deleteQueue('second')
]);

@@ -37,3 +38,3 @@ }).then(function(channel){

amqp.channel syntax:
**amqp.channel syntax:**

@@ -47,9 +48,8 @@ ```javascript

deleteExchange : [['alt.exchange', { ifEmpty: true }]],
assertQueue : [['queue', { durable: true }]],
checkQueue : [['queue']],
bindQueue : [['queue', 'exchange', '']],
unbindQueue : [['queue', 'exchange', '']],
purgeQueue : [['queue']],
deleteQueue : [['queue', { ifEmpty: true }]],
prefetch : [[1]]
assertQueue : [['first', { durable: true }], ['second']],
checkQueue : [['first']],
bindQueue : [['first', 'exchange', '']],
unbindQueue : [['first', 'exchange', '']],
purgeQueue : [['first']],
deleteQueue : [['first', { ifEmpty: true }], ['second']]
}).then(function(channel){

@@ -60,6 +60,67 @@ // Do stuff with the channel

## Example
## Simplified Usage
Say you wanted to listen to an exchange `'foo'` and send a different message to queue `'bar'` every time the message contained the word `'baz'`.
The `channel` object resolved by the returned `Promise` will behave differently from a normal `channel` object returned by the amqplib library in a few (hopefully convenient) ways:
1. The `consume`, `publish`, and `sendToQueue` channel methods have been changed to explicitly handle JSON.
2. The `publish` and `sendToQueue` methods have been "promisified" in a way that will still provide information to know whether or not the write buffer is full (and therefore, whether or not you should continue writing to it) by adding an additional `ok` boolean property to the promise.
3. A `channel` consumer callback will no longer receive `null` when that consumer had been cancelled by Rabbit MQ. Instead, the `channel` object will emit a `'cancelled'` event with all the arguments passed to the `channel.consume()` call for the consumer that was cancelled.
#### Examples of Modified Usage:
Automatic translation of JS object to JSON string to Buffer for sending/publishing:
```javascript
channel.sendToQueue('someQueue', { hello: 'world' });
channel.publish('someExchange', 'routingKey', { hello: 'world' });
```
Promisification of `sendToQueue` and `publish` methods:
```javascript
return channel.sendToQueue('someQueue', { hello: 'world' }).then(function(){
return channel.publish('someExchange', 'routingKey', { hello: 'world' });
});
```
Automatic translation of message Buffer to JSON string to JS object for consuming:
```javascript
channel.sendToQueue('someQueue', { hello: 'world' });
channel.consume('someQueue', function(parsedMessage, originalMessage){
console.log('hello', parsedMessage.hello); // => hello world
channel.ack(originalMessage);
});
```
Handling a consumer getting cancelled by Rabbit MQ:
```javascript
channel.on('cancelled', function(queue, callback, options){
// When the consumer below gets cancelled by Rabbit MQ
console.log(queue, callback.name, options); // 'someQueue', 'onMessage', { noAck: true }
});
channel.consume('someQueue', function onMessage(parsedMessage, originalMessage){
console.log(parsedMessage);
}, { noAck: true });
```
The `ok` property on the promises returned by the `sendToQueue` and `publish` methods:
```javascript
var sent = channel.sendToQueue('someQueue', { hello: 'world' });
if (sent.ok) {
// continue sending
} else {
// maybe pause sending until unblocked?
channel.once('drain', function(){
// continue sending
});
}
```
## Real World Example
Say you wanted to listen to the `'foo'` exchange and send a different message to the `'bar'` queue every time the message's `baz` property contained the word `'qux'`.
In your config.js:

@@ -78,2 +139,20 @@

cfg.channelMethodsToCall = {
assertQueue: // Channel method to invoke
[ // Array of channel method invocations
[ // channel.assertQueue( cfg.queue.toConsumeFrom )
cfg.queue.toConsumeFrom
],
[ // channel.assertQueue( cfg.queue.toSendTo, { durable: true } );
cfg.queue.toSendTo, { durable: true }
]
],
assertExchange: [
[ cfg.exchange, 'fanout' ] // channel.assetExchange(cfg.exchange, 'fanout')
],
bindQueue: [
[ cfg.queue.toConsumeFrom, cfg.exchange, '' ]
]
}
module.exports = cfg;

@@ -87,34 +166,51 @@ ```

var amqp = require('amqp.channel');
var channelMethodsToCall = {
// Channel method to invoke
assertQueue: [ // Array of channel method invocations
[ cfg.queue.toConsumeFrom ], // Arguments applied to the first invocation
[ cfg.queue.toSendTo, { durable: true }] // Arguments applied to the second
],
assertExchange: [
[ cfg.exchange, 'fanout' ] // channel.assetExchange(cfg.exchange, 'fanout')
],
bindQueue: [
[ cfg.queue.toConsumeFrom, cfg.exchange, '' ]
]
};
module.exports = amqp(cfg.amqpUrl, channelMethodsToCall).then(consumeQueue);
module.exports = amqp(cfg.amqpUrl, cfg.channelMethodsToCall)
.then(consumeAtMost(1))
.then(consumeFrom(cfg.queue.toConsumeFrom));
function consumeAtMost(maxMessages){
return function(channel){
// Only process `maxMessages` at a time and don't consume another
// message until we've either `ack` or `nack` the current one.
return channel.prefetch(maxMessage).then(function(){
return channel;
});
}
}
function consumeQueue(channel){
channel.consume(cfg.queue.toConsumeFrom, function onMessage(msg){
if (msg === null) {
channel.nack(msg);
console.warn('RabbitMQ cancelled consumer');
} else {
channel.ack(msg);
var message = msg.content.toString('utf8')
if (/baz/.test(message)) {
var msgToSend = new Buffer('qux');
function consumeFrom(queue){
return function(channel){
channel.consume(queue, function onMessage(parsed, msg){
if (/baz/.test(parsed.baz)) {
var msgToSend = { hello: 'world' };
var options = { persistent: true };
channel.sendToQueue(cfg.queue.toSendTo, msgToSend, options);
var sendMsg = channel.sendToQueue(cfg.queue.toSendTo, msgToSend, options);
sendMsg.catch(function(e){
console.error(e);
// Try to process message again?
// onMessage(parsed, msg);
});
if (sendMsg.ok) {
channel.ack(msg);
} else {
sendMsg.then(function(){
channel.ack(msg);
});
}
} else {
channel.ack(msg);
}
}
});
return channel;
});
channel.on('cancelled', function onConsumerCancelled(queue, cb, options){
console.warn('RabbitMQ cancelled your consumer for %s', queue);
// Try to setup the consumer again?
// channel.consume(queue, cb, options);
});
return channel;
}
}

@@ -121,0 +217,0 @@ ```

@@ -153,2 +153,7 @@ /*jslint nodejs: true, expr: true*/

});
after(function(){
connect.reset();
createConfirmChannel.reset();
});
});

@@ -159,5 +164,4 @@

var test = done.bind(null, null);
channel.assertExchange = channel.assertQueue =
sinon.stub().returns(Promise.reject(new Error('Failure')));
getChannel = stubChannel(amqpUrl, assertions, npmlog);
var methods = _.merge({ bogus : [['foo', 'bar']] }, assertions);
getChannel = stubChannel(amqpUrl, methods, npmlog);
getChannel.then(test, test);

@@ -170,3 +174,3 @@ });

.that.includes.something
.that.is.an.instanceOf(Error);
.that.is.an.instanceOf(TypeError);
})

@@ -180,5 +184,160 @@

it('should reject with an error', function(){
expect(getChannel).to.be.rejectedWith(Error);
expect(getChannel).to.be.rejectedWith(TypeError);
});
after(function(){
connect.reset();
createConfirmChannel.reset();
});
});
describe('Simplified', function(){
before(function(){
return getChannel = stubChannel(amqpUrl, null, null);
});
function serialize(thing){
return {
content: new Buffer(JSON.stringify(thing)),
fields: {},
properties: { contentType: 'application/json' }
};
}
function serializedMessage(msg){
return function validateSerializedMessage(buffer){
var parsed = JSON.parse(buffer.toString());
return expect(parsed).to.eql(msg) || true;
}
}
describe('#publish()', function(){
var promise = {
resolved: null,
rejected: null
};
var msg = { hello: 'world' };
var originalFn = sinon.stub();
originalFn.onFirstCall().returns(false).callsArgWith(4, new Error('test'));
originalFn.onSecondCall().returns(true).callsArgWith(4, null);
channel.publish = originalFn;
before(function(){
var ch = null
return getChannel.then(function(c){ ch = c;
return promise.rejected = ch.publish('exchange', 'routingKey', msg);
}).catch(function(){
return promise.resolved = ch.publish('exchange', 'routingKey', msg);
});
});
it('should call the original #publish method', function(){
expect(originalFn).to.have.been.calledTwice;
});
it('should serialize the passed message object into a Buffer', function(){
expect(originalFn).and.to.have.been.calledWithExactly(
'exchange',
'routingKey',
sinon.match(serializedMessage(msg)),
sinon.match({ contentType: 'application/json' }),
sinon.match.func
);
});
it('should return a promise with extra `ok` property', function(){
expect(promise.rejected).to.have.property('ok').that.is.false;
expect(promise.rejected).to.be.rejectedWith(Error);
expect(promise.resolved).to.have.property('ok').that.is.true;
expect(promise.resolved).to.be.resolved;
});
});
describe('#sendToQueue()', function(){
var promise = {
resolved: null,
rejected: null
};
var msg = { hello: 'world' };
var originalFn = sinon.stub();
originalFn.onFirstCall().returns(false).callsArgWith(3, new Error('test'));
originalFn.onSecondCall().returns(true).callsArgWith(3, null);
channel.sendToQueue = originalFn;
before(function(){
var ch = null
return getChannel.then(function(c){ ch = c;
return promise.rejected = ch.sendToQueue('queue', msg);
}).catch(function(){
return promise.resolved = ch.sendToQueue('queue', msg);
});
});
it('should call the original #publish method', function(){
expect(originalFn).to.have.been.calledTwice;
});
it('should serialize the passed message object into a Buffer', function(){
expect(originalFn).and.to.have.been.calledWithExactly(
'queue',
sinon.match(serializedMessage(msg)),
sinon.match({ contentType: 'application/json' }),
sinon.match.func
);
});
it('should return a promise with extra `ok` property', function(){
expect(promise.rejected).to.have.property('ok').that.is.false;
expect(promise.rejected).to.be.rejectedWith(Error);
expect(promise.resolved).to.have.property('ok').that.is.true;
expect(promise.resolved).to.be.resolved;
});
});
describe('#consume()', function(){
var receiveMessage = null;
var onCancelled = sinon.spy();
channel.consume = sinon.spy(function(queue, callback){
receiveMessage = callback;
});
channel.on('cancelled', onCancelled);
before(function(){
return getChannel;
});
// And this is how you get multiline test descriptions
it('should be modified so that the callback supplied in the second', test());
it('argument will itself be invoked with the parsed message object', test());
it('and the original message whenever a message is recieved and', test());
it('should emit a `cancelled` event on the channel with the parameters', test(true));
it('passed to channel.consume() when a consumer gets cancelled', test(true));
function test(cancelled){
var consumer = sinon.spy();
var msg = cancelled ? null : { hello: 'world', when: Date.now() };
var testMsg = cancelled ? null : serialize(msg);
return function(){
channel.consume('queue', consumer, { noAck: true });
receiveMessage(testMsg);
if (cancelled) {
expect(onCancelled).to.have.been.calledWithExactly(
'queue',
consumer,
{ noAck: true }
);
} else {
expect(consumer).to.have.been.calledWithExactly(
sinon.match(msg),
testMsg
);
}
}
}
});
after(function(){
connect.reset();
createConfirmChannel.reset();
});
});
});
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc