Comparing version 16.0.0 to 16.1.0
# Change Log | ||
## 16.1.0 | ||
- Added concurrency option for managing RabbitMQ topology. Rascal will create and use upto this number of channels when asserting/checking/deleting/purging queues, exchanges and bindings. | ||
## 16.0.0 | ||
@@ -4,0 +8,0 @@ |
@@ -12,6 +12,8 @@ const debug = require('debug')('rascal:tasks:applyBindings'); | ||
async.eachSeries( | ||
async.eachOfLimit( | ||
_.values(config.bindings), | ||
(binding, callback) => { | ||
bind[binding.destinationType](config, ctx.channel, binding, callback); | ||
config.concurrency, | ||
(binding, index, cb) => { | ||
const channel = ctx.channels[index % config.concurrency]; | ||
bind[binding.destinationType](config, channel, binding, cb); | ||
}, | ||
@@ -18,0 +20,0 @@ (err) => { |
@@ -6,6 +6,8 @@ const debug = require('debug')('rascal:tasks:assertExchanges'); | ||
module.exports = _.curry((config, ctx, next) => { | ||
async.eachSeries( | ||
async.eachOfLimit( | ||
_.keys(config.exchanges), | ||
(name, callback) => { | ||
assertExchange(ctx.channel, config.exchanges[name], callback); | ||
config.concurrency, | ||
(name, index, cb) => { | ||
const channel = ctx.channels[index % config.concurrency]; | ||
assertExchange(channel, config.exchanges[name], cb); | ||
}, | ||
@@ -22,3 +24,7 @@ (err) => { | ||
debug('Asserting exchange: %s', config.fullyQualifiedName); | ||
channel.assertExchange(config.fullyQualifiedName, config.type, config.options, next); | ||
channel.assertExchange(config.fullyQualifiedName, config.type, config.options, (err) => { | ||
if (err) return next(err); | ||
debug('Asserted exchange: %s', config.fullyQualifiedName); | ||
next(); | ||
}); | ||
} |
@@ -6,6 +6,8 @@ const debug = require('debug')('rascal:tasks:assertQueues'); | ||
module.exports = _.curry((config, ctx, next) => { | ||
async.eachSeries( | ||
async.eachOfLimit( | ||
_.keys(config.queues), | ||
(name, callback) => { | ||
assertQueue(ctx.channel, config.queues[name], callback); | ||
config.concurrency, | ||
(name, index, cb) => { | ||
const channel = ctx.channels[index % config.concurrency]; | ||
assertQueue(channel, config.queues[name], cb); | ||
}, | ||
@@ -12,0 +14,0 @@ (err) => { |
@@ -6,6 +6,8 @@ const debug = require('debug')('rascal:tasks:checkExchanges'); | ||
module.exports = _.curry((config, ctx, next) => { | ||
async.eachSeries( | ||
async.eachOfLimit( | ||
_.keys(config.exchanges), | ||
(name, callback) => { | ||
checkExchange(ctx.channel, config.exchanges[name], callback); | ||
config.concurrency, | ||
(name, index, cb) => { | ||
const channel = ctx.channels[index % config.concurrency]; | ||
checkExchange(channel, config.exchanges[name], cb); | ||
}, | ||
@@ -12,0 +14,0 @@ (err) => { |
@@ -6,6 +6,8 @@ const debug = require('debug')('rascal:tasks:checkQueues'); | ||
module.exports = _.curry((config, ctx, next) => { | ||
async.eachSeries( | ||
async.eachOfLimit( | ||
_.keys(config.queues), | ||
(name, callback) => { | ||
checkQueue(ctx.channel, config.queues[name], callback); | ||
config.concurrency, | ||
(name, index, cb) => { | ||
const channel = ctx.channels[index % config.concurrency]; | ||
checkQueue(channel, config.queues[name], cb); | ||
}, | ||
@@ -12,0 +14,0 @@ (err) => { |
@@ -6,6 +6,8 @@ const debug = require('debug')('rascal:tasks:deleteExchanges'); | ||
module.exports = _.curry((config, ctx, next) => { | ||
async.eachSeries( | ||
async.eachOfLimit( | ||
_.keys(config.exchanges), | ||
(name, callback) => { | ||
deleteExchange(ctx.channel, config.exchanges[name], callback); | ||
config.concurrency, | ||
(name, index, cb) => { | ||
const channel = ctx.channels[index % config.concurrency]; | ||
deleteExchange(channel, config.exchanges[name], cb); | ||
}, | ||
@@ -12,0 +14,0 @@ (err) => { |
@@ -6,6 +6,8 @@ const debug = require('debug')('rascal:tasks:deleteQueues'); | ||
module.exports = _.curry((config, ctx, next) => { | ||
async.eachSeries( | ||
async.eachOfLimit( | ||
_.keys(config.queues), | ||
(name, callback) => { | ||
deleteQueue(ctx.channel, config.queues[name], callback); | ||
config.concurrency, | ||
(name, index, cb) => { | ||
const channel = ctx.channels[index % config.concurrency]; | ||
deleteQueue(channel, config.queues[name], cb); | ||
}, | ||
@@ -12,0 +14,0 @@ (err) => { |
@@ -9,5 +9,5 @@ exports.applyBindings = require('./applyBindings'); | ||
exports.checkVhost = require('./checkVhost'); | ||
exports.closeChannel = require('./closeChannel'); | ||
exports.closeChannels = require('./closeChannels'); | ||
exports.closeConnection = require('./closeConnection'); | ||
exports.createChannel = require('./createChannel'); | ||
exports.createChannels = require('./createChannels'); | ||
exports.createConnection = require('./createConnection'); | ||
@@ -14,0 +14,0 @@ exports.deleteExchanges = require('./deleteExchanges'); |
@@ -6,6 +6,8 @@ const debug = require('debug')('rascal:tasks:purgeQueues'); | ||
module.exports = _.curry((config, ctx, next) => { | ||
async.eachSeries( | ||
async.eachOfLimit( | ||
_.keys(config.queues), | ||
(name, callback) => { | ||
purgeQueue(ctx.channel, config.queues[name], ctx, callback); | ||
config.concurrency, | ||
(name, index, cb) => { | ||
const channel = ctx.channels[index % config.concurrency]; | ||
purgeQueue(channel, config.queues[name], ctx, cb); | ||
}, | ||
@@ -12,0 +14,0 @@ (err) => { |
@@ -29,6 +29,6 @@ const debug = require('debug')('rascal:Vhost'); | ||
const init = async.compose(tasks.closeChannel, tasks.applyBindings, tasks.purgeQueues, tasks.checkQueues, tasks.assertQueues, tasks.checkExchanges, tasks.assertExchanges, tasks.createChannel, tasks.createConnection, tasks.checkVhost, tasks.assertVhost); | ||
const init = async.compose(tasks.closeChannels, tasks.applyBindings, tasks.purgeQueues, tasks.checkQueues, tasks.assertQueues, tasks.checkExchanges, tasks.assertExchanges, tasks.createChannels, tasks.createConnection, tasks.checkVhost, tasks.assertVhost); | ||
const connect = async.compose(tasks.createConnection); | ||
const purge = async.compose(tasks.closeConnection, tasks.closeChannel, tasks.purgeQueues, tasks.createChannel, tasks.createConnection); | ||
const nuke = async.compose(tasks.closeConnection, tasks.closeChannel, tasks.deleteQueues, tasks.deleteExchanges, tasks.createChannel, tasks.createConnection); | ||
const purge = async.compose(tasks.closeConnection, tasks.closeChannels, tasks.purgeQueues, tasks.createChannels, tasks.createConnection); | ||
const nuke = async.compose(tasks.closeConnection, tasks.closeChannels, tasks.deleteQueues, tasks.deleteExchanges, tasks.createChannels, tasks.createConnection); | ||
let timer = backoff({}); | ||
@@ -35,0 +35,0 @@ let paused = true; |
module.exports = { | ||
defaults: { | ||
vhosts: { | ||
concurrency: 1, | ||
publicationChannelPools: { | ||
@@ -5,0 +6,0 @@ regularPool: { |
@@ -46,2 +46,3 @@ const debug = require('debug')('rascal:config:configure'); | ||
namespace: rascalConfig.defaults.vhosts.namespace, | ||
concurrency: rascalConfig.defaults.vhosts.concurrency, | ||
connectionStrategy: rascalConfig.defaults.vhosts.connectionStrategy, | ||
@@ -48,0 +49,0 @@ publicationChannelPools: rascalConfig.defaults.vhosts.publicationChannelPools, |
@@ -60,2 +60,6 @@ { | ||
}, | ||
"concurrency": { | ||
"type": "integer", | ||
"minimum": 1 | ||
}, | ||
"exchanges": { | ||
@@ -62,0 +66,0 @@ "oneOf": [ |
@@ -23,3 +23,3 @@ const debug = require('debug')('rascal:config:validate'); | ||
function validateVhost(vhost, vhostName) { | ||
validateAttributes('Vhost', vhost, vhostName, ['defaults', 'namespace', 'name', 'publicationChannelPools', 'connection', 'connections', 'connectionStrategy', 'exchanges', 'queues', 'bindings', 'check', 'assert']); | ||
validateAttributes('Vhost', vhost, vhostName, ['defaults', 'namespace', 'name', 'concurrency', 'publicationChannelPools', 'connection', 'connections', 'connectionStrategy', 'exchanges', 'queues', 'bindings', 'check', 'assert']); | ||
validateConnectionStrategy(vhost.connectionStrategy, vhostName); | ||
@@ -26,0 +26,0 @@ validateConnectionAttributes(vhost.connection, vhostName, ['slashes', 'protocol', 'hostname', 'user', 'password', 'port', 'vhost', 'options', 'retry', 'auth', 'pathname', 'query', 'url', 'loggableUrl', 'management']); |
{ | ||
"name": "rascal", | ||
"version": "16.0.0", | ||
"version": "16.1.0", | ||
"description": "A config driven wrapper for amqplib supporting multi-host connections, automatic error recovery, redelivery flood protection, transparent encryption / decryption, channel pooling and publication timeouts", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
Sorry, the diff of this file is too big to display
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
239581
4290
1827