Comparing version 1.8.1 to 1.9.0
109
index.js
@@ -7,2 +7,3 @@ 'use strict' | ||
const amqplib = require('amqplib/callback_api') | ||
const Pool = require('pool2') | ||
@@ -20,2 +21,3 @@ module.exports = function (opts) { | ||
if (!opts) opts = {} | ||
const self = this | ||
@@ -34,3 +36,2 @@ // Exposed items | ||
this._publish_channel = null | ||
this._work_channel = null | ||
this._exchange = null | ||
@@ -53,2 +54,25 @@ | ||
// Temp channels | ||
this._worker_pool = new Pool({ | ||
acquire: (callback) => { | ||
self.__connect(() => { | ||
self._connection.createChannel((err, channel) => { | ||
if (err) return callback(err) | ||
channel.on('error', () => {}) | ||
channel.on('close', () => {}) | ||
return callback(null, channel) | ||
}) | ||
}) | ||
}, | ||
dispose: (channel, callback) => { | ||
callback() | ||
}, | ||
min: 5, | ||
max: 10 | ||
}) | ||
return this | ||
@@ -540,54 +564,2 @@ } | ||
Remit.prototype.__use_work_channel = function __use_work_channel (callback) { | ||
const self = this | ||
if (!callback) { | ||
callback = function () {} | ||
} | ||
if (self._work_channel) { | ||
if (self._work_channel_callbacks.length) { | ||
self._work_channel_callbacks.push(callback) | ||
return | ||
} | ||
return callback() | ||
} | ||
const first = !self._work_channel_callbacks.length | ||
self._work_channel_callbacks.push(callback) | ||
if (!first) { | ||
return | ||
} | ||
self.__connect(() => { | ||
self._connection.createChannel((err, channel) => { | ||
channel.on('error', (err) => { | ||
self._work_channel = null | ||
self.__use_work_channel() | ||
}) | ||
channel.on('close', () => { | ||
self._work_channel = null | ||
self.__use_work_channel() | ||
}) | ||
self._work_channel = channel | ||
// Loop through and make everything happen! | ||
while (self._work_channel_callbacks.length > 0) { | ||
self._work_channel_callbacks[0]() | ||
self._work_channel_callbacks.shift() | ||
} | ||
}) | ||
}) | ||
} | ||
Remit.prototype.__assert_exchange = function __assert_exchange (callback) { | ||
@@ -631,4 +603,4 @@ const self = this | ||
// Let's try making this exchange! | ||
self.__use_work_channel(() => { | ||
self._work_channel.assertExchange(self._exchange_name, 'topic', { | ||
self._worker_pool.acquire((err, channel) => { | ||
channel.assertExchange(self._exchange_name, 'topic', { | ||
autoDelete: true | ||
@@ -638,4 +610,7 @@ }, (err, ok) => { | ||
console.error(err) | ||
self._worker_pool.remove(channel) | ||
} | ||
self._worker_pool.release(channel) | ||
// Everything went awesome so we'll let everything | ||
@@ -700,5 +675,7 @@ // know that the exchange is up. | ||
function check_and_republish() { | ||
self.__use_work_channel(() => { | ||
self._work_channel.checkQueue(message.properties.replyTo, (err, ok) => { | ||
self._worker_pool.acquire((err, channel) => { | ||
channel.checkQueue(message.properties.replyTo, (err, ok) => { | ||
if (err) { | ||
self._worker_pool.remove(channel) | ||
// If we got a proper queue error then the queue must | ||
@@ -714,2 +691,4 @@ // just not be around. | ||
} else { | ||
self._worker_pool.release(channel) | ||
self.__use_publish_channel(() => { | ||
@@ -742,5 +721,7 @@ self._publish_channel.publish('', message.properties.replyTo, message.content, message.properties) | ||
function check_and_publish () { | ||
self.__use_work_channel(() => { | ||
self._work_channel.checkQueue(message.properties.replyTo, (err, ok) => { | ||
self._worker_pool.acquire((err, channel) => { | ||
channel.checkQueue(message.properties.replyTo, (err, ok) => { | ||
if (err) { | ||
self._worker_pool.remove(channel) | ||
// If we got a proper queue error then the queue must | ||
@@ -756,2 +737,4 @@ // just not be around. | ||
} else { | ||
self._worker_pool.release(channel) | ||
self.__use_publish_channel(() => { | ||
@@ -783,5 +766,7 @@ self._publish_channel.publish('', message.properties.replyTo, res_data, options) | ||
function check_and_republish () { | ||
self.__use_work_channel(() => { | ||
self._work_channel.checkQueue(message.properties.replyTo, (err, ok) => { | ||
self._worker_pool.acquire((err, channel) => { | ||
channel.checkQueue(message.properties.replyTo, (err, ok) => { | ||
if (err) { | ||
self._worker_pool.remove(channel) | ||
// If we got a proper queue error then the queue must | ||
@@ -797,2 +782,4 @@ // just not be around. | ||
} else { | ||
self._worker_pool.release(channel) | ||
self.__use_publish_channel(() => { | ||
@@ -799,0 +786,0 @@ self._publish_channel.publish('', message.properties.replyTo, message.content, message.properties) |
{ | ||
"name": "remit", | ||
"version": "1.8.1", | ||
"version": "1.9.0", | ||
"description": "A small set of functionality used to create microservices that don't need to be aware of one-another's existence.", | ||
@@ -13,4 +13,5 @@ "main": "index.js", | ||
"dependencies": { | ||
"amqplib": "^0.4.2", | ||
"debug": "^2.6.0", | ||
"amqplib": "^0.5.1", | ||
"debug": "^2.6.8", | ||
"pool2": "^1.4.1", | ||
"stack-trace": "0.0.9", | ||
@@ -17,0 +18,0 @@ "uuid": "^3.0.1" |
46848
5
779
+ Addedpool2@^1.4.1
+ Addedamqplib@0.5.6(transitive)
+ Addedbitsyntax@0.1.0(transitive)
+ Addedbluebird@3.7.2(transitive)
+ Addedbuffer-more-ints@1.0.0(transitive)
+ Addeddouble-ended-queue@2.1.0-0(transitive)
+ Addedhashmap@2.4.0(transitive)
+ Addedpool2@1.4.1(transitive)
+ Addedquerystringify@2.2.0(transitive)
+ Addedrequires-port@1.0.0(transitive)
+ Addedsafe-buffer@5.1.2(transitive)
+ Addedsimple-backoff@1.1.0(transitive)
+ Addedurl-parse@1.4.7(transitive)
- Removedamqplib@0.4.2(transitive)
- Removedbitsyntax@0.0.4(transitive)
- Removedbuffer-more-ints@0.0.2(transitive)
- Removedwhen@3.6.4(transitive)
Updatedamqplib@^0.5.1
Updateddebug@^2.6.8