amqp-wrapper
Advanced tools
Comparing version 2.0.0 to 2.0.1
58
amqp.js
@@ -12,2 +12,33 @@ var amqp = require('amqplib'), | ||
/** | ||
* For publishing, we assert the queue is there and bind it to the routing | ||
* key we are going to use. | ||
*/ | ||
var setupForPublish = function(channel, queueParams) { | ||
var setupPublishes = queueParams.publish.map(function(queue) { | ||
return channel.assertQueue(queue.name, queue.options) | ||
.then(function() { | ||
return channel.bindQueue(queue.name, exchange, queue.routingKey); | ||
}); | ||
}); | ||
return Q.all(setupPublishes); | ||
}; | ||
// For consuming, we only assert the queue is there. | ||
var setupForConsume = function(channel, queueParams) { | ||
channel.prefetch(PREFETCH); | ||
return channel.assertQueue(queueParams.consume.name, | ||
queueParams.consume.options); | ||
}; | ||
if (process.env.NODE_ENV === 'test') { | ||
exports.replaceSetupFuncs = function(consume, publish) { | ||
setupForPublish = publish; | ||
setupForConsume = consume; | ||
}; | ||
exports.getSetupFuncs = function() { | ||
return {publish: setupForPublish, consume: setupForConsume}; | ||
}; | ||
} | ||
/** | ||
* Passes the AMQP channel created to the callback. | ||
@@ -29,29 +60,8 @@ */ | ||
var assert_exchange = ch.assertExchange(exchange, 'topic'); | ||
// For publishing, we assert the queue is there and bind it to the routing | ||
// key we are going to use. | ||
function setupForPublish() { | ||
var setupPublishes = queueParams.publish.map(function(queue) { | ||
return ch.assertQueue(queueParams.publish.name, | ||
queueParams.publish.options) | ||
.then(function() { | ||
return ch.bindQueue(queueParams.publish.name, exchange, | ||
queueParams.publish.routingKey); | ||
}); | ||
}); | ||
return Q.all(setupPublishes); | ||
} | ||
// For consuming, we only assert the queue is there. | ||
function setupForConsume() { | ||
ch.prefetch(PREFETCH); | ||
return ch.assertQueue(queueParams.consume.name, | ||
queueParams.consume.options); | ||
} | ||
var todo = assert_exchange; | ||
if (queueParams.publish && queueParams.publish.name && | ||
queueParams.publish.routingKey) { | ||
todo = todo.then(setupForPublish); | ||
if (queueParams.publish && queueParams.publish instanceof Array) { | ||
todo = todo.then(setupForPublish(ch, queueParams)); | ||
} | ||
if (queueParams.consume && queueParams.consume.name) { | ||
todo = todo.then(setupForConsume); | ||
todo = todo.then(setupForConsume(ch, queueParams)); | ||
} | ||
@@ -58,0 +68,0 @@ return todo; |
{ | ||
"name": "amqp-wrapper", | ||
"version": "2.0.0", | ||
"version": "2.0.1", | ||
"description": "A wrapper around https://github.com/squaremo/amqp.node to make consuming and publishing dead easy.", | ||
@@ -15,3 +15,4 @@ "main": "amqp.js", | ||
"amqp", | ||
"wrapper" | ||
"wrapper", | ||
"rabbitmq" | ||
], | ||
@@ -24,3 +25,3 @@ "author": "Timothy Leslie Allen <allen.timothy.email@gmail.com> (https://github.com/timlesallen)", | ||
"dependencies": { | ||
"amqplib": "~0.1.1", | ||
"amqplib": "~0.1.3", | ||
"q": "~0.9.7", | ||
@@ -27,0 +28,0 @@ "lodash": "~2.4.1" |
@@ -1,3 +0,2 @@ | ||
var AMQP = require('../amqp'), | ||
SandboxedModule = require('sandboxed-module'), | ||
var SandboxedModule = require('sandboxed-module'), | ||
expect = require('chai').expect, | ||
@@ -8,2 +7,6 @@ sinon = require('sinon'), | ||
describe('AMQP', function() { | ||
var AMQP; | ||
beforeEach(function() { | ||
AMQP = require('../amqp'); | ||
}); | ||
describe('#connect', function() { | ||
@@ -26,2 +29,21 @@ it('should call the callback successfully', function(done) { | ||
}); | ||
it('should setup for publishing and consuming', function(done) { | ||
AMQP.replaceSetupFuncs(sinon.spy(), sinon.spy()); | ||
AMQP.connect('amqp://guest:guest@localhost', 'mytestexchange', { | ||
consume: {name: 'myconsumequeue2'}, | ||
publish: [ | ||
{ | ||
name: 'mypublishqueue2', | ||
routingKey: 'mypublishqueue2' | ||
} | ||
] | ||
}, function(err, res) { | ||
if (err) return done(err); | ||
expect(AMQP.getSetupFuncs().consume.calledOnce, 'setupForConsume()').to. | ||
equal(true); | ||
expect(AMQP.getSetupFuncs().publish.calledOnce, 'setupForPublish()').to. | ||
equal(true); | ||
done(); | ||
}); | ||
}); | ||
}); | ||
@@ -48,3 +70,3 @@ describe('#publishToQueue', function() { | ||
it('should call the callback successfully', function(done) { | ||
AMQP.connect('amqp://guest:guest@localhost', 'mytestexchange',{}, | ||
AMQP.connect('amqp://guest:guest@localhost', 'mytestexchange', {}, | ||
function(err, res) { | ||
@@ -51,0 +73,0 @@ if (err) return done(err); |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
13367
300
2
Updatedamqplib@~0.1.3