amqplib-easy
Advanced tools
Comparing version 3.4.0 to 4.0.0
@@ -1,2 +0,2 @@ | ||
- [`Create(amqpUrl)`](#createamqpurl---amqp) | ||
- [`Create(amqpUrl)`](#createamqpurl-socketoptions---amqp) | ||
- [`AMQP.consume(config, handler)`](#amqpconsumeconfig-handler---cancellationpromise) | ||
@@ -8,3 +8,3 @@ - [`AMQP.publish(config, key, message, [options])`](#amqppublishconfig-key-message-options---promise) | ||
### `Create(amqpUrl)` -> `AMQP` | ||
### `Create(amqpUrl, [socketOptions])` -> `AMQP` | ||
Create an `AMQP` which connects to `amqpUrl`. E.g., | ||
@@ -15,2 +15,5 @@ ```javascript | ||
[`socketOptions`](http://www.squaremobius.net/amqp.node/channel_api.html#connect) | ||
default to `maxChannels: 100`. | ||
### `AMQP.consume(config, handler)` -> `CancellationPromise` | ||
@@ -17,0 +20,0 @@ Asserts queue and exchange specified in [`config`](#config) and binds them |
81
index.js
@@ -8,3 +8,4 @@ 'use strict'; | ||
diehard = require('diehard'), | ||
connections = {}; | ||
connections = {}, | ||
sendChannels = {}; | ||
@@ -32,6 +33,9 @@ function cleanup(done) { | ||
module.exports = function (amqpUrl) { | ||
module.exports = function (amqpUrl, socketOptions) { | ||
function connect() { | ||
if (!connections[amqpUrl]) { | ||
connections[amqpUrl] = BPromise.resolve(amqp.connect(amqpUrl)); | ||
socketOptions = defaults({}, socketOptions || {}, { | ||
channelMax: 100 | ||
}); | ||
connections[amqpUrl] = BPromise.resolve(amqp.connect(amqpUrl, socketOptions)); | ||
} | ||
@@ -41,2 +45,19 @@ return connections[amqpUrl]; | ||
function sendChannel() { | ||
if (!sendChannels[amqpUrl]) { | ||
sendChannels[amqpUrl] = connect() | ||
.then(function (connection) { | ||
return connection.createConfirmChannel() | ||
.then(function (channel) { | ||
channel.on('error', function () { | ||
sendChannels[amqpUrl] = null; | ||
// clear out the channel since it's in a bad state | ||
}); | ||
return channel; | ||
}); | ||
}); | ||
} | ||
return sendChannels[amqpUrl]; | ||
} | ||
function createChannel() { | ||
@@ -48,9 +69,2 @@ return connect().then(function (connection) { | ||
function createChannelDisposer() { | ||
return createChannel() | ||
.disposer(function (channel) { | ||
return channel.close(); | ||
}); | ||
} | ||
function consume(queueConfig, handler) { | ||
@@ -142,5 +156,4 @@ var options = defaults({}, queueConfig || {}, { | ||
function publish(queueConfig, key, json, messageOptions) { | ||
return BPromise.using( | ||
createChannelDisposer(), | ||
function (ch) { | ||
return sendChannel() | ||
.then(function (ch) { | ||
if (queueConfig.exchange === null || queueConfig.exchange === undefined) { | ||
@@ -154,6 +167,16 @@ throw new Error('Client tries to publish to an exchange while exchange name is not undefined.'); | ||
.then(function () { | ||
return ch.publish(queueConfig.exchange, | ||
key, | ||
toBuffer(json), | ||
messageOptions || queueConfig.messageOptions || {persistent: true}); | ||
return new BPromise(function (resolve, reject) { | ||
ch.publish(queueConfig.exchange, | ||
key, | ||
toBuffer(json), | ||
messageOptions || queueConfig.messageOptions || {persistent: true}, | ||
function (err) { | ||
if (err) { | ||
reject(err); | ||
} else { | ||
resolve(); | ||
} | ||
} | ||
); | ||
}); | ||
}); | ||
@@ -165,12 +188,20 @@ } | ||
function sendToQueue(queueConfig, json, messageOptions) { | ||
return BPromise.using( | ||
createChannelDisposer(), | ||
function (ch) { | ||
return sendChannel() | ||
.then(function (ch) { | ||
return ch.assertQueue(queueConfig.queue, queueConfig.queueOptions || {durable: true}) | ||
.then(function () { | ||
return ch.sendToQueue( | ||
queueConfig.queue, | ||
toBuffer(json), | ||
messageOptions || queueConfig.messageOptions || {persistent: true} | ||
); | ||
return new BPromise(function (resolve, reject) { | ||
ch.sendToQueue( | ||
queueConfig.queue, | ||
toBuffer(json), | ||
messageOptions || queueConfig.messageOptions || {persistent: true}, | ||
function (err) { | ||
if (err) { | ||
reject(err); | ||
} else { | ||
resolve(); | ||
} | ||
} | ||
); | ||
}); | ||
}); | ||
@@ -177,0 +208,0 @@ } |
{ | ||
"name": "amqplib-easy", | ||
"version": "3.4.0", | ||
"version": "4.0.0", | ||
"description": "Simplified API for interacting with AMQP", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -34,3 +34,3 @@ /*globals it:false*/ | ||
describe('', function () { | ||
describe('consumer', function () { | ||
var cancel; | ||
@@ -129,2 +129,23 @@ | ||
}); | ||
it('should publish even if something causes the channel to die', function (done) { | ||
amqp.consume( | ||
{ | ||
exchange: 'cat', | ||
queue: 'found_cats', | ||
topics: [ 'found.*' ] | ||
}, | ||
function () { | ||
done(); | ||
} | ||
) | ||
.then(function (c) { | ||
cancel = c; | ||
return amqp.publish({ exchange: 'cat', exchangeType: 'direct' }, 'found.tawny', { name: 'Sally' }) | ||
.catch(function () { | ||
return amqp.publish({ exchange: 'cat', exchangeType: 'topic' }, 'found.tawny', { name: 'Sally' }); | ||
}); | ||
}) | ||
.catch(done); | ||
}); | ||
}); | ||
@@ -131,0 +152,0 @@ |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
21429
406
13