Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

rascal

Package Overview
Dependencies
Maintainers
4
Versions
183
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

rascal - npm Package Compare versions

Comparing version 16.0.0 to 16.1.0

lib/amqp/tasks/closeChannels.js

4

CHANGELOG.md
# Change Log
## 16.1.0
- Added concurrency option for managing RabbitMQ topology. Rascal will create and use upto this number of channels when asserting/checking/deleting/purging queues, exchanges and bindings.
## 16.0.0

@@ -4,0 +8,0 @@

8

lib/amqp/tasks/applyBindings.js

@@ -12,6 +12,8 @@ const debug = require('debug')('rascal:tasks:applyBindings');

async.eachSeries(
async.eachOfLimit(
_.values(config.bindings),
(binding, callback) => {
bind[binding.destinationType](config, ctx.channel, binding, callback);
config.concurrency,
(binding, index, cb) => {
const channel = ctx.channels[index % config.concurrency];
bind[binding.destinationType](config, channel, binding, cb);
},

@@ -18,0 +20,0 @@ (err) => {

@@ -6,6 +6,8 @@ const debug = require('debug')('rascal:tasks:assertExchanges');

module.exports = _.curry((config, ctx, next) => {
async.eachSeries(
async.eachOfLimit(
_.keys(config.exchanges),
(name, callback) => {
assertExchange(ctx.channel, config.exchanges[name], callback);
config.concurrency,
(name, index, cb) => {
const channel = ctx.channels[index % config.concurrency];
assertExchange(channel, config.exchanges[name], cb);
},

@@ -22,3 +24,7 @@ (err) => {

debug('Asserting exchange: %s', config.fullyQualifiedName);
channel.assertExchange(config.fullyQualifiedName, config.type, config.options, next);
channel.assertExchange(config.fullyQualifiedName, config.type, config.options, (err) => {
if (err) return next(err);
debug('Asserted exchange: %s', config.fullyQualifiedName);
next();
});
}

@@ -6,6 +6,8 @@ const debug = require('debug')('rascal:tasks:assertQueues');

module.exports = _.curry((config, ctx, next) => {
async.eachSeries(
async.eachOfLimit(
_.keys(config.queues),
(name, callback) => {
assertQueue(ctx.channel, config.queues[name], callback);
config.concurrency,
(name, index, cb) => {
const channel = ctx.channels[index % config.concurrency];
assertQueue(channel, config.queues[name], cb);
},

@@ -12,0 +14,0 @@ (err) => {

@@ -6,6 +6,8 @@ const debug = require('debug')('rascal:tasks:checkExchanges');

module.exports = _.curry((config, ctx, next) => {
async.eachSeries(
async.eachOfLimit(
_.keys(config.exchanges),
(name, callback) => {
checkExchange(ctx.channel, config.exchanges[name], callback);
config.concurrency,
(name, index, cb) => {
const channel = ctx.channels[index % config.concurrency];
checkExchange(channel, config.exchanges[name], cb);
},

@@ -12,0 +14,0 @@ (err) => {

@@ -6,6 +6,8 @@ const debug = require('debug')('rascal:tasks:checkQueues');

module.exports = _.curry((config, ctx, next) => {
async.eachSeries(
async.eachOfLimit(
_.keys(config.queues),
(name, callback) => {
checkQueue(ctx.channel, config.queues[name], callback);
config.concurrency,
(name, index, cb) => {
const channel = ctx.channels[index % config.concurrency];
checkQueue(channel, config.queues[name], cb);
},

@@ -12,0 +14,0 @@ (err) => {

@@ -6,6 +6,8 @@ const debug = require('debug')('rascal:tasks:deleteExchanges');

module.exports = _.curry((config, ctx, next) => {
async.eachSeries(
async.eachOfLimit(
_.keys(config.exchanges),
(name, callback) => {
deleteExchange(ctx.channel, config.exchanges[name], callback);
config.concurrency,
(name, index, cb) => {
const channel = ctx.channels[index % config.concurrency];
deleteExchange(channel, config.exchanges[name], cb);
},

@@ -12,0 +14,0 @@ (err) => {

@@ -6,6 +6,8 @@ const debug = require('debug')('rascal:tasks:deleteQueues');

module.exports = _.curry((config, ctx, next) => {
async.eachSeries(
async.eachOfLimit(
_.keys(config.queues),
(name, callback) => {
deleteQueue(ctx.channel, config.queues[name], callback);
config.concurrency,
(name, index, cb) => {
const channel = ctx.channels[index % config.concurrency];
deleteQueue(channel, config.queues[name], cb);
},

@@ -12,0 +14,0 @@ (err) => {

@@ -9,5 +9,5 @@ exports.applyBindings = require('./applyBindings');

exports.checkVhost = require('./checkVhost');
exports.closeChannel = require('./closeChannel');
exports.closeChannels = require('./closeChannels');
exports.closeConnection = require('./closeConnection');
exports.createChannel = require('./createChannel');
exports.createChannels = require('./createChannels');
exports.createConnection = require('./createConnection');

@@ -14,0 +14,0 @@ exports.deleteExchanges = require('./deleteExchanges');

@@ -6,6 +6,8 @@ const debug = require('debug')('rascal:tasks:purgeQueues');

module.exports = _.curry((config, ctx, next) => {
async.eachSeries(
async.eachOfLimit(
_.keys(config.queues),
(name, callback) => {
purgeQueue(ctx.channel, config.queues[name], ctx, callback);
config.concurrency,
(name, index, cb) => {
const channel = ctx.channels[index % config.concurrency];
purgeQueue(channel, config.queues[name], ctx, cb);
},

@@ -12,0 +14,0 @@ (err) => {

@@ -29,6 +29,6 @@ const debug = require('debug')('rascal:Vhost');

const init = async.compose(tasks.closeChannel, tasks.applyBindings, tasks.purgeQueues, tasks.checkQueues, tasks.assertQueues, tasks.checkExchanges, tasks.assertExchanges, tasks.createChannel, tasks.createConnection, tasks.checkVhost, tasks.assertVhost);
const init = async.compose(tasks.closeChannels, tasks.applyBindings, tasks.purgeQueues, tasks.checkQueues, tasks.assertQueues, tasks.checkExchanges, tasks.assertExchanges, tasks.createChannels, tasks.createConnection, tasks.checkVhost, tasks.assertVhost);
const connect = async.compose(tasks.createConnection);
const purge = async.compose(tasks.closeConnection, tasks.closeChannel, tasks.purgeQueues, tasks.createChannel, tasks.createConnection);
const nuke = async.compose(tasks.closeConnection, tasks.closeChannel, tasks.deleteQueues, tasks.deleteExchanges, tasks.createChannel, tasks.createConnection);
const purge = async.compose(tasks.closeConnection, tasks.closeChannels, tasks.purgeQueues, tasks.createChannels, tasks.createConnection);
const nuke = async.compose(tasks.closeConnection, tasks.closeChannels, tasks.deleteQueues, tasks.deleteExchanges, tasks.createChannels, tasks.createConnection);
let timer = backoff({});

@@ -35,0 +35,0 @@ let paused = true;

module.exports = {
defaults: {
vhosts: {
concurrency: 1,
publicationChannelPools: {

@@ -5,0 +6,0 @@ regularPool: {

@@ -46,2 +46,3 @@ const debug = require('debug')('rascal:config:configure');

namespace: rascalConfig.defaults.vhosts.namespace,
concurrency: rascalConfig.defaults.vhosts.concurrency,
connectionStrategy: rascalConfig.defaults.vhosts.connectionStrategy,

@@ -48,0 +49,0 @@ publicationChannelPools: rascalConfig.defaults.vhosts.publicationChannelPools,

@@ -60,2 +60,6 @@ {

},
"concurrency": {
"type": "integer",
"minimum": 1
},
"exchanges": {

@@ -62,0 +66,0 @@ "oneOf": [

@@ -23,3 +23,3 @@ const debug = require('debug')('rascal:config:validate');

function validateVhost(vhost, vhostName) {
validateAttributes('Vhost', vhost, vhostName, ['defaults', 'namespace', 'name', 'publicationChannelPools', 'connection', 'connections', 'connectionStrategy', 'exchanges', 'queues', 'bindings', 'check', 'assert']);
validateAttributes('Vhost', vhost, vhostName, ['defaults', 'namespace', 'name', 'concurrency', 'publicationChannelPools', 'connection', 'connections', 'connectionStrategy', 'exchanges', 'queues', 'bindings', 'check', 'assert']);
validateConnectionStrategy(vhost.connectionStrategy, vhostName);

@@ -26,0 +26,0 @@ validateConnectionAttributes(vhost.connection, vhostName, ['slashes', 'protocol', 'hostname', 'user', 'password', 'port', 'vhost', 'options', 'retry', 'auth', 'pathname', 'query', 'url', 'loggableUrl', 'management']);

{
"name": "rascal",
"version": "16.0.0",
"version": "16.1.0",
"description": "A config driven wrapper for amqplib supporting multi-host connections, automatic error recovery, redelivery flood protection, transparent encryption / decryption, channel pooling and publication timeouts",

@@ -5,0 +5,0 @@ "main": "index.js",

Sorry, the diff of this file is too big to display

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc