Socket
Socket
Sign inDemoInstall

rascal

Package Overview
Dependencies
Maintainers
4
Versions
183
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

rascal - npm Package Compare versions

Comparing version 13.1.3 to 14.0.0

5

.prettierrc.json

@@ -1,1 +0,4 @@

{}
{
"singleQuote": true,
"printWidth": 300
}
# Change Log
## 14.0.0
- Rather than waiting an arbitrary time for channels to close when cancelling a subscription, Rascal now waits until any outstanding messages have been acknowledged. By default, Rascal will wait indefinitely, but this behaviour can be overriden by specifying a subscription.closeTimeout. If the timeout is exceeded following a direct call to `broker.unsubscribeAll` or `subscription.cancel` then an error will be yielded. If the timeout is exceeded following an indirect call to `subscription.cancel` (e.g. by `broker.shutdown`) then an error will be emitted but the operation will be allowed to continue.
- Messages which cannot be recovered by the republish or forward strategies are nacked resulting in message loss unless a dead letter is configured.
## 13.1.4
- Fixed potential for returned messages cause the forward error strategy to yield twice
- Report returned messages from republish error strategy
- Tweak prettier rules
## 13.1.3

@@ -4,0 +15,0 @@

8

examples/advanced/cluster.js

@@ -1,3 +0,3 @@

var Rascal = require("../..");
var cluster = require("cluster");
var Rascal = require('../..');
var cluster = require('cluster');

@@ -7,7 +7,7 @@ if (cluster.isMaster) {

cluster.fork();
cluster.on("exit", function () {
cluster.on('exit', function () {
cluster.fork();
});
} else {
require("./index");
require('./index');
}

@@ -5,3 +5,3 @@ module.exports = {

// Define the name of the vhost
"customer-vhost": {
'customer-vhost': {
// Creates the vhost if it doesn't exist (requires the RabbitMQ management plugin to be installed)

@@ -14,7 +14,7 @@ assert: true,

{
url: "amqp://does-not-exist-1b9935d9-5066-4b13-84dc-a8e2bb618154:5672/customer-vhost",
url: 'amqp://does-not-exist-1b9935d9-5066-4b13-84dc-a8e2bb618154:5672/customer-vhost',
},
{
user: "guest",
password: "guest",
user: 'guest',
password: 'guest',
port: 5672,

@@ -32,6 +32,6 @@ options: {

exchanges: [
"service", // Shared exchange for all services within this vhost
"delay", // To delay failed messages before a retry
"retry", // To retry failed messages up to maximum number of times
"dead_letters", // When retrying fails, messages end up here
'service', // Shared exchange for all services within this vhost
'delay', // To delay failed messages before a retry
'retry', // To retry failed messages up to maximum number of times
'dead_letters', // When retrying fails, messages end up here
],

@@ -43,8 +43,8 @@

// Create a queue for saving users
"registration_service:user:save": {
'registration_service:user:save': {
options: {
arguments: {
// Route nacked messages to a service specific dead letter queue
"x-dead-letter-exchange": "dead_letters",
"x-dead-letter-routing-key": "registration_service.dead_letter",
'x-dead-letter-exchange': 'dead_letters',
'x-dead-letter-routing-key': 'registration_service.dead_letter',
},

@@ -55,8 +55,8 @@ },

// Create a queue for deleting users
"registration_service:user:delete": {
'registration_service:user:delete': {
options: {
arguments: {
// Route nacked messages to a service specific dead letter queue
"x-dead-letter-exchange": "dead_letters",
"x-dead-letter-routing-key": "registration_service.dead_letter",
'x-dead-letter-exchange': 'dead_letters',
'x-dead-letter-routing-key': 'registration_service.dead_letter',
},

@@ -67,8 +67,8 @@ },

// Create a delay queue to hold failed messages for a short interval before retrying
"delay:1m": {
'delay:1m': {
options: {
arguments: {
// Configure messages to expire after 1 minute, then route them to the retry exchange
"x-message-ttl": 60000,
"x-dead-letter-exchange": "retry",
'x-message-ttl': 60000,
'x-dead-letter-exchange': 'retry',
},

@@ -79,3 +79,3 @@ },

// Queue for holding dead letters until they can be resolved
"dead_letters:registration_service": {},
'dead_letters:registration_service': {},
},

@@ -87,21 +87,16 @@

// Route create/update user messages to the save queue
"service[registration_webapp.user.created.#,registration_webapp.user.updated.#] -> registration_service:user:save":
{},
'service[registration_webapp.user.created.#,registration_webapp.user.updated.#] -> registration_service:user:save': {},
// Route delete user messages to the delete queue
"service[registration_webapp.user.deleted.#] -> registration_service:user:delete":
{},
'service[registration_webapp.user.deleted.#] -> registration_service:user:delete': {},
// Route delayed messages to the 1 minute delay queue
"delay[delay.1m] -> delay:1m": {},
'delay[delay.1m] -> delay:1m': {},
// Route retried messages back to their original queue using the CC routing keys set by Rascal
"retry[registration_service:user:save.#] -> registration_service:user:save":
{},
"retry[registration_service:user:delete.#] -> registration_service:user:delete":
{},
'retry[registration_service:user:save.#] -> registration_service:user:save': {},
'retry[registration_service:user:delete.#] -> registration_service:user:delete': {},
// Route dead letters the service specific dead letter queue
"dead_letters[registration_service.dead_letter] -> dead_letters:registration_service":
{},
'dead_letters[registration_service.dead_letter] -> dead_letters:registration_service': {},
},

@@ -112,7 +107,7 @@

save_user: {
queue: "registration_service:user:save",
handler: "saveUser.js",
queue: 'registration_service:user:save',
handler: 'saveUser.js',
redeliveries: {
limit: 5,
counter: "shared",
counter: 'shared',
},

@@ -122,7 +117,7 @@ },

delete_user: {
queue: "registration_service:user:delete",
handler: "deleteUser.js",
queue: 'registration_service:user:delete',
handler: 'deleteUser.js',
redeliveries: {
limit: 5,
counter: "shared",
counter: 'shared',
},

@@ -136,7 +131,7 @@ },

save_user_succeeded: {
exchange: "service",
exchange: 'service',
},
delete_user_succeeded: {
exchange: "service",
encryption: "well-known-v1",
exchange: 'service',
encryption: 'well-known-v1',
},

@@ -146,5 +141,5 @@

retry_in_1m: {
exchange: "delay",
exchange: 'delay',
options: {
CC: ["delay.1m"],
CC: ['delay.1m'],
},

@@ -156,7 +151,7 @@ },

user_event: {
exchange: "service",
exchange: 'service',
// Specifying an encryption profile in the publication will cause the message content to be encrypted
// The profile name and iv are added as headers, and used to automatically decrypt messages,
// providing the consumer configuration has a matching profile.
encryption: "well-known-v1",
encryption: 'well-known-v1',
},

@@ -182,9 +177,9 @@ },

{
strategy: "forward",
strategy: 'forward',
attempts: 10,
publication: "retry_in_1m",
publication: 'retry_in_1m',
xDeathFix: true, // See https://github.com/rabbitmq/rabbitmq-server/issues/161
},
{
strategy: "nack",
strategy: 'nack',
},

@@ -198,3 +193,3 @@ ],

{
strategy: "republish",
strategy: 'republish',
immediateNack: true,

@@ -209,3 +204,3 @@ },

size: 10,
type: "inMemoryCluster",
type: 'inMemoryCluster',
},

@@ -216,6 +211,6 @@ },

encryption: {
"well-known-v1": {
key: "f81db52a3b2c717fe65d9a3b7dd04d2a08793e1a28e3083db3ea08db56e7c315",
'well-known-v1': {
key: 'f81db52a3b2c717fe65d9a3b7dd04d2a08793e1a28e3083db3ea08db56e7c315',
ivLength: 16,
algorithm: "aes-256-cbc",
algorithm: 'aes-256-cbc',
},

@@ -222,0 +217,0 @@ },

@@ -1,3 +0,3 @@

var chalk = require("chalk");
var format = require("util").format;
var chalk = require('chalk');
var format = require('util').format;

@@ -8,20 +8,13 @@ module.exports = function (broker) {

setImmediate(function () {
console.log(chalk.cyan("Deleting user:"), user.username);
console.log(chalk.cyan('Deleting user:'), user.username);
if (user.crash) throw new Error("Crashing on user: " + user.username);
if (user.crash) throw new Error('Crashing on user: ' + user.username);
var routingKey = format(
"registration_service.user.deleted.%s",
user.username
);
broker.publish(
"delete_user_succeeded",
routingKey,
function (err, publication) {
if (err) return cb(err);
publication.on("success", () => cb).on("error", console.error);
}
);
var routingKey = format('registration_service.user.deleted.%s', user.username);
broker.publish('delete_user_succeeded', routingKey, function (err, publication) {
if (err) return cb(err);
publication.on('success', () => cb).on('error', console.error);
});
});
};
};

@@ -1,3 +0,3 @@

var chalk = require("chalk");
var format = require("util").format;
var chalk = require('chalk');
var format = require('util').format;

@@ -8,5 +8,5 @@ module.exports = function (broker) {

setImmediate(function () {
console.log(chalk.magenta("Saving user:"), user.username);
console.log(chalk.magenta('Saving user:'), user.username);
if (user.crash) throw new Error("Crashing on user: " + user.username);
if (user.crash) throw new Error('Crashing on user: ' + user.username);

@@ -17,3 +17,3 @@ // Simulate errors and success

case 5: {
err = new Error("Connection Timeout");
err = new Error('Connection Timeout');
err.recoverable = true;

@@ -23,3 +23,3 @@ return cb(err);

case 7: {
err = new Error("Duplicate Key Violation");
err = new Error('Duplicate Key Violation');
err.recoverable = false;

@@ -29,14 +29,7 @@ return cb(err);

default: {
var routingKey = format(
"registration_service.user.saved.%s",
user.username
);
broker.publish(
"save_user_succeeded",
routingKey,
function (err, publication) {
if (err) return cb(err);
publication.on("success", () => cb).on("error", console.error);
}
);
var routingKey = format('registration_service.user.saved.%s', user.username);
broker.publish('save_user_succeeded', routingKey, function (err, publication) {
if (err) return cb(err);
publication.on('success', () => cb).on('error', console.error);
});
}

@@ -43,0 +36,0 @@ }

@@ -1,115 +0,93 @@

var Rascal = require("../..");
var config = require("./config");
var _ = require("lodash");
var Chance = require("Chance");
var Rascal = require('../..');
var config = require('./config');
var _ = require('lodash');
var Chance = require('Chance');
var chance = new Chance();
var format = require("util").format;
var format = require('util').format;
Rascal.Broker.create(
Rascal.withDefaultConfig(config.rascal),
function (err, broker) {
if (err) bail(err);
Rascal.Broker.create(Rascal.withDefaultConfig(config.rascal), function (err, broker) {
if (err) bail(err);
broker.on("error", function (err) {
console.error(err.message);
});
broker.on('error', function (err) {
console.error(err.message);
});
_.each(
broker.config.subscriptions,
function (subscriptionConfig, subscriptionName) {
if (!subscriptionConfig.handler) return;
_.each(broker.config.subscriptions, function (subscriptionConfig, subscriptionName) {
if (!subscriptionConfig.handler) return;
var handler = require("./handlers/" + subscriptionConfig.handler)(
broker
);
var handler = require('./handlers/' + subscriptionConfig.handler)(broker);
broker.subscribe(subscriptionName, function (err, subscription) {
if (err) return bail(err);
subscription
.on("message", function (message, content, ackOrNack) {
handler(content, function (err) {
if (!err) return ackOrNack();
console.log(err);
ackOrNack(
err,
err.recoverable
? broker.config.recovery.deferred_retry
: broker.config.recovery.dead_letter
);
});
})
.on("invalid_content", function (err, message, ackOrNack) {
console.error("Invalid Content", err.message);
ackOrNack(err, broker.config.recovery.dead_letter);
})
.on("redeliveries_exceeded", function (err, message, ackOrNack) {
console.error("Redeliveries Exceeded", err.message);
ackOrNack(err, broker.config.recovery.dead_letter);
})
.on("cancel", function (err) {
console.warn(err.message);
})
.on("error", function (err) {
console.error(err.message);
});
broker.subscribe(subscriptionName, function (err, subscription) {
if (err) return bail(err);
subscription
.on('message', function (message, content, ackOrNack) {
handler(content, function (err) {
if (!err) return ackOrNack();
console.log(err);
ackOrNack(err, err.recoverable ? broker.config.recovery.deferred_retry : broker.config.recovery.dead_letter);
});
})
.on('invalid_content', function (err, message, ackOrNack) {
console.error('Invalid Content', err.message);
ackOrNack(err, broker.config.recovery.dead_letter);
})
.on('redeliveries_exceeded', function (err, message, ackOrNack) {
console.error('Redeliveries Exceeded', err.message);
ackOrNack(err, broker.config.recovery.dead_letter);
})
.on('cancel', function (err) {
console.warn(err.message);
})
.on('error', function (err) {
console.error(err.message);
});
}
);
});
});
// Simulate a web app handling user registrations
setInterval(function () {
var user = {
username: chance.first() + "_" + chance.last(),
crash: randomInt(10) === 10,
};
var events = { 1: "created", 2: "updated", 3: "deleted" };
var event = events[randomInt(3)];
var routingKey = format(
"registration_webapp.user.%s.%s",
event,
user.username
);
// Simulate a web app handling user registrations
setInterval(function () {
var user = {
username: chance.first() + '_' + chance.last(),
crash: randomInt(10) === 10,
};
var events = { 1: 'created', 2: 'updated', 3: 'deleted' };
var event = events[randomInt(3)];
var routingKey = format('registration_webapp.user.%s.%s', event, user.username);
broker.publish(
"user_event",
user,
routingKey,
function (err, publication) {
if (err) return console.log(err.message);
publication
.on("success", function () {
// confirmed
})
.on("error", function (err) {
console.error(err.message);
});
}
);
}, 1000);
broker.publish('user_event', user, routingKey, function (err, publication) {
if (err) return console.log(err.message);
publication
.on('success', function () {
// confirmed
})
.on('error', function (err) {
console.error(err.message);
});
});
}, 1000);
process
.on("SIGINT", function () {
broker.shutdown(function () {
process.exit();
});
})
.on("SIGTERM", () => {
broker.shutdown(function () {
process.exit();
});
})
.on("unhandledRejection", (reason, p) => {
console.error(reason, "Unhandled Rejection at Promise", p);
broker.shutdown(function () {
process.exit(-1);
});
})
.on("uncaughtException", (err) => {
console.error(err, "Uncaught Exception thrown");
broker.shutdown(function () {
process.exit(-2);
});
process
.on('SIGINT', function () {
broker.shutdown(function () {
process.exit();
});
}
);
})
.on('SIGTERM', () => {
broker.shutdown(function () {
process.exit();
});
})
.on('unhandledRejection', (reason, p) => {
console.error(reason, 'Unhandled Rejection at Promise', p);
broker.shutdown(function () {
process.exit(-1);
});
})
.on('uncaughtException', (err) => {
console.error(err, 'Uncaught Exception thrown');
broker.shutdown(function () {
process.exit(-2);
});
});
});

@@ -116,0 +94,0 @@ function randomInt(max) {

module.exports = {
vhosts: {
"/": {
'/': {
publicationChannelPools: {

@@ -19,9 +19,9 @@ confirmPool: {

},
exchanges: ["demo_ex"],
queues: ["demo_q"],
bindings: ["demo_ex[a.b.c] -> demo_q"],
exchanges: ['demo_ex'],
queues: ['demo_q'],
bindings: ['demo_ex[a.b.c] -> demo_q'],
publications: {
demo_pub: {
exchange: "demo_ex",
routingKey: "a.b.c",
exchange: 'demo_ex',
routingKey: 'a.b.c',
confirm: false,

@@ -35,3 +35,3 @@ options: {

demo_sub: {
queue: "demo_q",
queue: 'demo_q',
},

@@ -38,0 +38,0 @@ },

@@ -1,4 +0,4 @@

const Rascal = require("../..");
const config = require("./config");
const random = require("random-readable");
const Rascal = require('../..');
const config = require('./config');
const random = require('random-readable');

@@ -8,32 +8,26 @@ Rascal.Broker.create(Rascal.withDefaultConfig(config), (err, broker) => {

broker.on("error", console.error);
broker.on('error', console.error);
const stream = random
.createRandomStream()
.on("error", console.error)
.on("data", (data) => {
broker.publish("demo_pub", data, (err, publication) => {
.on('error', console.error)
.on('data', (data) => {
broker.publish('demo_pub', data, (err, publication) => {
if (err) throw err;
publication.on("error", console.error);
publication.on('error', console.error);
});
})
.on("end", () => {
console.log("end");
.on('end', () => {
console.log('end');
});
broker.on("busy", (details) => {
console.log(
Date.now(),
`Pausing vhost: ${details.vhost} (mode: ${details.mode}, queue: ${details.queue}, size: ${details.size}, borrowed: ${details.borrowed}, available: ${details.available})`
);
broker.on('busy', (details) => {
console.log(Date.now(), `Pausing vhost: ${details.vhost} (mode: ${details.mode}, queue: ${details.queue}, size: ${details.size}, borrowed: ${details.borrowed}, available: ${details.available})`);
stream.pause();
});
broker.on("ready", (details) => {
console.log(
Date.now(),
`Resuming vhost: ${details.vhost} (mode: ${details.mode}, queue: ${details.queue}, size: ${details.size}, borrowed: ${details.borrowed}, available: ${details.available})`
);
broker.on('ready', (details) => {
console.log(Date.now(), `Resuming vhost: ${details.vhost} (mode: ${details.mode}, queue: ${details.queue}, size: ${details.size}, borrowed: ${details.borrowed}, available: ${details.available})`);
stream.resume();
});
});
module.exports = {
vhosts: {
"/": {
'/': {
connection: {

@@ -10,7 +10,7 @@ heartbeat: 1,

},
exchanges: [""],
queues: ["demo_q"],
exchanges: [''],
queues: ['demo_q'],
publications: {
demo_pub: {
exchange: "",
exchange: '',
},

@@ -20,3 +20,3 @@ },

demo_sub: {
queue: "demo_q",
queue: 'demo_q',
},

@@ -23,0 +23,0 @@ },

@@ -1,3 +0,3 @@

var Rascal = require("../..");
var config = require("./config");
var Rascal = require('../..');
var config = require('./config');

@@ -8,24 +8,19 @@ Rascal.Broker.create(Rascal.withDefaultConfig(config), function (err, broker) {

broker
.subscribe("demo_sub", function (err, subscription) {
.subscribe('demo_sub', function (err, subscription) {
if (err) throw err;
subscription
.on("message", function (message, content, ackOrNack) {
.on('message', function (message, content, ackOrNack) {
console.log(content);
ackOrNack();
})
.on("error", console.error);
.on('error', console.error);
})
.on("error", console.error);
.on('error', console.error);
setInterval(function () {
broker.publish(
"demo_pub",
new Date().toISOString() + ": hello world",
"demo_q",
function (err, publication) {
if (err) throw err;
publication.on("error", console.error);
}
);
broker.publish('demo_pub', new Date().toISOString() + ': hello world', 'demo_q', function (err, publication) {
if (err) throw err;
publication.on('error', console.error);
});
}, 1000);
});
module.exports = {
vhosts: {
"/": {
exchanges: ["demo_ex"],
queues: ["demo_q"],
bindings: ["demo_ex[a.b.c] -> demo_q"],
'/': {
exchanges: ['demo_ex'],
queues: ['demo_q'],
bindings: ['demo_ex[a.b.c] -> demo_q'],
publications: {
demo_pub: {
exchange: "demo_ex",
routingKey: "a.b.c",
exchange: 'demo_ex',
routingKey: 'a.b.c',
},

@@ -15,3 +15,3 @@ },

demo_sub: {
queue: "demo_q",
queue: 'demo_q',
},

@@ -18,0 +18,0 @@ },

@@ -1,18 +0,15 @@

var assert = require("assert");
var Rascal = require("../..");
var config = require("./config.js");
var assert = require('assert');
var Rascal = require('../..');
var config = require('./config.js');
describe("Example rascal test", function () {
describe('Example rascal test', function () {
var broker;
before(function (done) {
config.vhosts["/"].publications.test_pub = { exchange: "demo_ex" };
Rascal.Broker.create(
Rascal.withTestConfig(config),
function (err, _broker) {
if (err) return done(err);
broker = _broker;
done();
}
);
config.vhosts['/'].publications.test_pub = { exchange: 'demo_ex' };
Rascal.Broker.create(Rascal.withTestConfig(config), function (err, _broker) {
if (err) return done(err);
broker = _broker;
done();
});
});

@@ -29,6 +26,6 @@

it("should demonstrate tests", function (done) {
broker.subscribe("demo_sub", function (err, subscription) {
it('should demonstrate tests', function (done) {
broker.subscribe('demo_sub', function (err, subscription) {
assert.ifError(err);
subscription.on("message", function (message, content, ackOrNack) {
subscription.on('message', function (message, content, ackOrNack) {
subscription.cancel();

@@ -40,11 +37,6 @@ ackOrNack();

broker.publish(
"test_pub",
"Hello Test",
"a.b.c",
function (err, publication) {
assert.ifError(err);
}
);
broker.publish('test_pub', 'Hello Test', 'a.b.c', function (err, publication) {
assert.ifError(err);
});
});
});
module.exports = {
vhosts: {
"/": {
'/': {
connection: {

@@ -10,9 +10,9 @@ heartbeat: 1,

},
exchanges: ["demo_ex"],
queues: ["demo_q"],
bindings: ["demo_ex[a.b.c] -> demo_q"],
exchanges: ['demo_ex'],
queues: ['demo_q'],
bindings: ['demo_ex[a.b.c] -> demo_q'],
publications: {
demo_pub: {
exchange: "demo_ex",
routingKey: "a.b.c",
exchange: 'demo_ex',
routingKey: 'a.b.c',
},

@@ -22,3 +22,3 @@ },

demo_sub: {
queue: "demo_q",
queue: 'demo_q',
},

@@ -25,0 +25,0 @@ },

@@ -1,19 +0,17 @@

var Rascal = require("../..");
var config = require("./config");
var Rascal = require('../..');
var config = require('./config');
(async function () {
try {
const broker = await Rascal.BrokerAsPromised.create(
Rascal.withDefaultConfig(config)
);
broker.on("error", console.error);
const broker = await Rascal.BrokerAsPromised.create(Rascal.withDefaultConfig(config));
broker.on('error', console.error);
try {
const subscription = await broker.subscribe("demo_sub");
const subscription = await broker.subscribe('demo_sub');
subscription
.on("message", function (message, content, ackOrNack) {
.on('message', function (message, content, ackOrNack) {
console.log(content);
ackOrNack();
})
.on("error", console.error);
.on('error', console.error);
} catch (err) {

@@ -25,7 +23,4 @@ console.error(err);

try {
const publication = await broker.publish(
"demo_pub",
new Date().toISOString() + ": hello world"
);
publication.on("error", console.error);
const publication = await broker.publish('demo_pub', new Date().toISOString() + ': hello world');
publication.on('error', console.error);
} catch (err) {

@@ -32,0 +27,0 @@ console.error(err);

module.exports = {
vhosts: {
"/": {
'/': {
publicationChannelPools: {

@@ -17,9 +17,9 @@ confirmPool: {

},
exchanges: ["demo_ex"],
queues: ["demo_q"],
bindings: ["demo_ex[a.b.c] -> demo_q"],
exchanges: ['demo_ex'],
queues: ['demo_q'],
bindings: ['demo_ex[a.b.c] -> demo_q'],
publications: {
demo_pub: {
exchange: "demo_ex",
routingKey: "a.b.c",
exchange: 'demo_ex',
routingKey: 'a.b.c',
options: {

@@ -32,3 +32,3 @@ persistent: false,

demo_sub: {
queue: "demo_q",
queue: 'demo_q',
},

@@ -35,0 +35,0 @@ },

@@ -1,3 +0,3 @@

var Rascal = require("../..");
var config = require("./config");
var Rascal = require('../..');
var config = require('./config');

@@ -7,23 +7,19 @@ Rascal.Broker.create(Rascal.withDefaultConfig(config), function (err, broker) {

broker.subscribe("demo_sub", function (err, subscription) {
broker.subscribe('demo_sub', function (err, subscription) {
if (err) throw err;
subscription.on("message", function (message, content, ackOrNack) {
subscription.on('message', function (message, content, ackOrNack) {
console.log(content);
ackOrNack();
});
subscription.on("error", console.error);
subscription.on("cancel", console.warn);
subscription.on('error', console.error);
subscription.on('cancel', console.warn);
});
broker.on("error", console.error);
broker.on('error', console.error);
setInterval(function () {
broker.publish(
"demo_pub",
new Date().toISOString() + ": hello world",
function (err, publication) {
if (err) throw err;
publication.on("error", console.error);
}
);
broker.publish('demo_pub', new Date().toISOString() + ': hello world', function (err, publication) {
if (err) throw err;
publication.on('error', console.error);
});
}, 1000);
});

@@ -1,6 +0,6 @@

const _ = require("lodash");
const defaultConfig = require("./lib/config/defaults");
const testConfig = require("./lib/config/tests");
const Broker = require("./lib/amqp/Broker");
const BrokerAsPromised = require("./lib/amqp/BrokerAsPromised");
const _ = require('lodash');
const defaultConfig = require('./lib/config/defaults');
const testConfig = require('./lib/config/tests');
const Broker = require('./lib/amqp/Broker');
const BrokerAsPromised = require('./lib/amqp/BrokerAsPromised');

@@ -21,4 +21,4 @@ module.exports = (function () {

},
counters: require("./lib/counters"),
counters: require('./lib/counters'),
};
})();

@@ -1,16 +0,16 @@

const debug = require("debug")("rascal:Broker");
const format = require("util").format;
const inherits = require("util").inherits;
const EventEmitter = require("events").EventEmitter;
const _ = require("lodash");
const async = require("async");
const tasks = require("./tasks");
const configure = require("../config/configure");
const validate = require("../config/validate");
const fqn = require("../config/fqn");
const debug = require('debug')('rascal:Broker');
const format = require('util').format;
const inherits = require('util').inherits;
const EventEmitter = require('events').EventEmitter;
const _ = require('lodash');
const async = require('async');
const tasks = require('./tasks');
const configure = require('../config/configure');
const validate = require('../config/validate');
const fqn = require('../config/fqn');
const preflight = async.compose(validate, configure);
const stub = require("../counters/stub");
const inMemory = require("../counters/inMemory");
const inMemoryCluster = require("../counters/inMemoryCluster").worker;
const setTimeoutUnref = require("../utils/setTimeoutUnref");
const stub = require('../counters/stub');
const inMemory = require('../counters/inMemory');
const inMemoryCluster = require('../counters/inMemoryCluster').worker;
const setTimeoutUnref = require('../utils/setTimeoutUnref');
const maxInterval = 2147483647;

@@ -36,14 +36,4 @@

let sessions = [];
const init = async.compose(
tasks.initShovels,
tasks.initSubscriptions,
tasks.initPublications,
tasks.initCounters,
tasks.initVhosts
);
const nukeVhost = async.compose(
tasks.deleteVhost,
tasks.shutdownVhost,
tasks.nukeVhost
);
const init = async.compose(tasks.initShovels, tasks.initSubscriptions, tasks.initPublications, tasks.initCounters, tasks.initVhosts);
const nukeVhost = async.compose(tasks.deleteVhost, tasks.shutdownVhost, tasks.nukeVhost);
const purgeVhost = tasks.purgeVhost;

@@ -63,3 +53,3 @@ const forewarnVhost = tasks.forewarnVhost;

this._init = function (next) {
debug("Initialising broker");
debug('Initialising broker');
vhosts = {};

@@ -78,29 +68,8 @@ publications = {};

this.connect = function (name, next) {
if (!vhosts[name])
return next(new Error(format("Unknown vhost: %s", name)));
if (!vhosts[name]) return next(new Error(format('Unknown vhost: %s', name)));
vhosts[name].connect(next);
};
this.nuke = function (next) {
debug("Nuking broker");
self.unsubscribeAll((err) => {
if (err) return next(err);
async.eachSeries(
_.values(vhosts),
(vhost, callback) => {
nukeVhost(config, { vhost }, callback);
},
(err) => {
if (err) return next(err);
vhosts = publications = subscriptions = {};
clearInterval(self.keepActive);
debug("Finished nuking broker");
next();
}
);
});
};
this.purge = function (next) {
debug("Purging all queues in all vhosts");
debug('Purging all queues in all vhosts');
async.eachSeries(

@@ -113,3 +82,3 @@ _.values(vhosts),

if (err) return next(err);
debug("Finished purging all queues in all vhosts");
debug('Finished purging all queues in all vhosts');
next();

@@ -121,3 +90,3 @@ }

this.shutdown = function (next) {
debug("Shutting down broker");
debug('Shutting down broker');
async.eachSeries(

@@ -131,3 +100,3 @@ _.values(vhosts),

self.unsubscribeAll((err) => {
if (err) return next(err);
if (err) self.emit('error', err);
async.eachSeries(

@@ -141,3 +110,3 @@ _.values(vhosts),

clearInterval(self.keepActive);
debug("Finished shutting down broker");
debug('Finished shutting down broker');
next();

@@ -152,5 +121,5 @@ }

this.bounce = function (next) {
debug("Bouncing broker");
debug('Bouncing broker');
self.unsubscribeAll((err) => {
if (err) return next(err);
if (err) self.emit('error', err);
async.eachSeries(

@@ -163,3 +132,3 @@ _.values(vhosts),

if (err) return next(err);
debug("Finished bouncing broker");
debug('Finished bouncing broker');
next();

@@ -171,9 +140,26 @@ }

this.nuke = function (next) {
debug('Nuking broker');
self.unsubscribeAll((err) => {
if (err) self.emit('error', err);
async.eachSeries(
_.values(vhosts),
(vhost, callback) => {
nukeVhost(config, { vhost }, callback);
},
(err) => {
if (err) return next(err);
vhosts = publications = subscriptions = {};
clearInterval(self.keepActive);
debug('Finished nuking broker');
next();
}
);
});
};
this.publish = function (name, message, overrides, next) {
if (arguments.length === 3)
return self.publish(name, message, {}, arguments[2]);
if (_.isString(overrides))
return self.publish(name, message, { routingKey: overrides }, next);
if (!publications[name])
return next(new Error(format("Unknown publication: %s", name)));
if (arguments.length === 3) return self.publish(name, message, {}, arguments[2]);
if (_.isString(overrides)) return self.publish(name, message, { routingKey: overrides }, next);
if (!publications[name]) return next(new Error(format('Unknown publication: %s', name)));
publications[name].publish(message, overrides, next);

@@ -183,8 +169,5 @@ };

this.forward = function (name, message, overrides, next) {
if (arguments.length === 3)
return self.forward(name, message, {}, arguments[2]);
if (_.isString(overrides))
return self.forward(name, message, { routingKey: overrides }, next);
if (!config.publications[name])
return next(new Error(format("Unknown publication: %s", name)));
if (arguments.length === 3) return self.forward(name, message, {}, arguments[2]);
if (_.isString(overrides)) return self.forward(name, message, { routingKey: overrides }, next);
if (!config.publications[name]) return next(new Error(format('Unknown publication: %s', name)));
publications[name].forward(message, overrides, next);

@@ -195,4 +178,3 @@ };

if (arguments.length === 2) return self.subscribe(name, {}, arguments[1]);
if (!subscriptions[name])
return next(new Error(format("Unknown subscription: %s", name)));
if (!subscriptions[name]) return next(new Error(format('Unknown subscription: %s', name)));
subscriptions[name].subscribe(overrides, (err, session) => {

@@ -210,6 +192,3 @@ if (err) return next(err);

}, arguments[0]);
const filteredSubscriptions = _.chain(config.subscriptions)
.values()
.filter(filter)
.value();
const filteredSubscriptions = _.chain(config.subscriptions).values().filter(filter).value();
async.mapSeries(

@@ -228,4 +207,3 @@ filteredSubscriptions,

this.unsubscribeAll = function (next) {
const timeout = getMaxDeferCloseChannelTimeout();
async.eachSeries(
async.each(
sessions.slice(),

@@ -236,7 +214,3 @@ (session, cb) => {

},
(err) => {
if (err) return next(err);
debug("Waiting %dms for all subscriber channels to close", timeout);
setTimeoutUnref(next, timeout);
}
next
);

@@ -266,8 +240,2 @@ };

};
function getMaxDeferCloseChannelTimeout() {
return sessions.reduce((value, session) => {
return session._maxDeferCloseChannel(value);
}, 0);
}
}

@@ -1,7 +0,7 @@

const inherits = require("util").inherits;
const EventEmitter = require("events").EventEmitter;
const forwardEvents = require("forward-emitter");
const _ = require("lodash");
const Broker = require("./Broker");
const SubscriberSessionAsPromised = require("./SubscriberSessionAsPromised");
const inherits = require('util').inherits;
const EventEmitter = require('events').EventEmitter;
const forwardEvents = require('forward-emitter');
const _ = require('lodash');
const Broker = require('./Broker');
const SubscriberSessionAsPromised = require('./SubscriberSessionAsPromised');

@@ -33,12 +33,3 @@ module.exports = {

function BrokerAsPromised(broker) {
const methods = [
"connect",
"nuke",
"purge",
"shutdown",
"bounce",
"publish",
"forward",
"unsubscribeAll",
];
const methods = ['connect', 'nuke', 'purge', 'shutdown', 'bounce', 'publish', 'forward', 'unsubscribeAll'];
const self = this;

@@ -45,0 +36,0 @@

@@ -1,8 +0,8 @@

const debug = require("debug")("rascal:Publication");
const format = require("util").format;
const _ = require("lodash");
const uuid = require("uuid").v4;
const crypto = require("crypto");
const PublicationSession = require("./PublicationSession");
const setTimeoutUnref = require("../utils/setTimeoutUnref");
const debug = require('debug')('rascal:Publication');
const format = require('util').format;
const _ = require('lodash');
const uuid = require('uuid').v4;
const crypto = require('crypto');
const PublicationSession = require('./PublicationSession');
const setTimeoutUnref = require('../utils/setTimeoutUnref');

@@ -18,52 +18,10 @@ module.exports = {

if (
Object.prototype.hasOwnProperty.call(config, "exchange") &&
config.confirm
)
return new Publication(
vhost,
borrowConfirmChannel,
returnConfirmChannel,
destroyConfirmChannel,
publishToConfirmExchange,
config
).init(next);
if (Object.prototype.hasOwnProperty.call(config, "exchange"))
return new Publication(
vhost,
borrowChannel,
returnChannel,
destroyChannel,
publishToExchange,
config
).init(next);
if (config.queue && config.confirm)
return new Publication(
vhost,
borrowConfirmChannel,
returnConfirmChannel,
destroyConfirmChannel,
sendToConfirmQueue,
config
).init(next);
if (config.queue)
return new Publication(
vhost,
borrowChannel,
returnChannel,
destroyChannel,
sendToQueue,
config
).init(next);
if (Object.prototype.hasOwnProperty.call(config, 'exchange') && config.confirm) return new Publication(vhost, borrowConfirmChannel, returnConfirmChannel, destroyConfirmChannel, publishToConfirmExchange, config).init(next);
if (Object.prototype.hasOwnProperty.call(config, 'exchange')) return new Publication(vhost, borrowChannel, returnChannel, destroyChannel, publishToExchange, config).init(next);
if (config.queue && config.confirm) return new Publication(vhost, borrowConfirmChannel, returnConfirmChannel, destroyConfirmChannel, sendToConfirmQueue, config).init(next);
if (config.queue) return new Publication(vhost, borrowChannel, returnChannel, destroyChannel, sendToQueue, config).init(next);
},
};
function Publication(
vhost,
borrowChannelFn,
returnChannelFn,
destroyChannelFn,
publishFn,
config
) {
function Publication(vhost, borrowChannelFn, returnChannelFn, destroyChannelFn, publishFn, config) {
const self = this;

@@ -74,3 +32,3 @@

this.init = function (next) {
debug("Initialising publication: %s", config.name);
debug('Initialising publication: %s', config.name);
next(null, self);

@@ -82,9 +40,6 @@ };

const content = getContent(payload);
publishConfig.options.contentType =
publishConfig.options.contentType || content.type;
publishConfig.options.contentType = publishConfig.options.contentType || content.type;
publishConfig.options.messageId = publishConfig.options.messageId || uuid();
publishConfig.encryption
? _publishEncrypted(content.buffer, publishConfig, next)
: _publish(content.buffer, publishConfig, next);
publishConfig.encryption ? _publishEncrypted(content.buffer, publishConfig, next) : _publish(content.buffer, publishConfig, next);
};

@@ -98,34 +53,8 @@

publishConfig.options = _.defaultsDeep(
publishConfig.options,
message.properties
);
publishConfig.options = _.defaultsDeep(publishConfig.options, message.properties);
_.set(
publishConfig,
"options.headers.rascal.restoreRoutingHeaders",
!!publishConfig.restoreRoutingHeaders
);
_.set(
publishConfig,
"options.headers.rascal.originalExchange",
message.fields.exchange
);
_.set(
publishConfig,
"options.headers.rascal.originalRoutingKey",
message.fields.routingKey
);
_.set(
publishConfig,
"options.CC",
_.chain([])
.concat(
publishConfig.options.CC,
format("%s.%s", originalQueue, publishConfig.routingKey)
)
.uniq()
.compact()
.value()
);
_.set(publishConfig, 'options.headers.rascal.restoreRoutingHeaders', !!publishConfig.restoreRoutingHeaders);
_.set(publishConfig, 'options.headers.rascal.originalExchange', message.fields.exchange);
_.set(publishConfig, 'options.headers.rascal.originalRoutingKey', message.fields.routingKey);
_.set(publishConfig, 'options.CC', _.chain([]).concat(publishConfig.options.CC, format('%s.%s', originalQueue, publishConfig.routingKey)).uniq().compact().value());

@@ -137,29 +66,12 @@ _publish(message.content, publishConfig, next);

const encryptionConfig = publishConfig.encryption;
encrypt(
encryptionConfig.algorithm,
encryptionConfig.key,
encryptionConfig.ivLength,
buffer,
(err, iv, encrypted) => {
if (err) return next(err);
debug(
"Message was encrypted using encryption profile: %s",
encryptionConfig.name
);
_.set(
publishConfig,
"options.headers.rascal.encryption.name",
encryptionConfig.name
);
_.set(publishConfig, "options.headers.rascal.encryption.iv", iv);
_.set(
publishConfig,
"options.headers.rascal.encryption.originalContentType",
publishConfig.options.contentType
);
_.set(publishConfig, "options.contentType", "application/octet-stream");
encrypt(encryptionConfig.algorithm, encryptionConfig.key, encryptionConfig.ivLength, buffer, (err, iv, encrypted) => {
if (err) return next(err);
debug('Message was encrypted using encryption profile: %s', encryptionConfig.name);
_.set(publishConfig, 'options.headers.rascal.encryption.name', encryptionConfig.name);
_.set(publishConfig, 'options.headers.rascal.encryption.iv', iv);
_.set(publishConfig, 'options.headers.rascal.encryption.originalContentType', publishConfig.options.contentType);
_.set(publishConfig, 'options.contentType', 'application/octet-stream');
_publish(encrypted, publishConfig, next);
}
);
_publish(encrypted, publishConfig, next);
});
}

@@ -172,3 +84,3 @@

try {
const key = Buffer.from(keyHex, "hex");
const key = Buffer.from(keyHex, 'hex');
const cipher = crypto.createCipheriv(algorithm, key, iv);

@@ -179,3 +91,3 @@ encrypted = Buffer.concat([cipher.update(unencrypted), cipher.final()]);

}
next(null, iv.toString("hex"), encrypted);
next(null, iv.toString('hex'), encrypted);
});

@@ -189,8 +101,6 @@ }

session._removePausedListener();
if (err) return session.emit("error", err, messageId);
if (err) return session.emit('error', err, messageId);
if (session.isAborted()) return abortPublish(channel, messageId);
const errorHandler = _.once(
handleChannelError.bind(null, channel, messageId, session, config)
);
const returnHandler = session.emit.bind(session, "return");
const errorHandler = _.once(handleChannelError.bind(null, channel, messageId, session, config));
const returnHandler = session.emit.bind(session, 'return');
addListeners(channel, errorHandler, returnHandler);

@@ -203,14 +113,12 @@ try {

destroyChannel(channel, errorHandler, returnHandler);
return session.emit("error", err, messageId);
return session.emit('error', err, messageId);
}
ok
? returnChannel(channel, errorHandler, returnHandler)
: deferReturnChannel(channel, errorHandler, returnHandler);
ok ? returnChannel(channel, errorHandler, returnHandler) : deferReturnChannel(channel, errorHandler, returnHandler);
session.emit("success", messageId);
session.emit('success', messageId);
});
} catch (err) {
returnChannel(channel, errorHandler, returnHandler);
return session.emit("error", err, messageId);
return session.emit('error', err, messageId);
}

@@ -223,3 +131,3 @@ });

function abortPublish(channel, messageId) {
debug("Publication of message: %s was aborted", messageId);
debug('Publication of message: %s was aborted', messageId);
returnChannelFn(channel);

@@ -234,3 +142,3 @@ }

function deferReturnChannel(channel, errorHandler, returnHandler) {
channel.once("drain", () => {
channel.once('drain', () => {
returnChannel(channel, errorHandler, returnHandler);

@@ -256,3 +164,3 @@ });

function textMessage(payload) {
return { buffer: Buffer.from(payload), type: "text/plain" };
return { buffer: Buffer.from(payload), type: 'text/plain' };
}

@@ -263,3 +171,3 @@

buffer: Buffer.from(JSON.stringify(payload)),
type: "application/json",
type: 'application/json',
};

@@ -270,30 +178,18 @@ }

function addListeners(channel, errorHandler, returnHandler) {
channel.on("error", errorHandler);
channel.on("return", returnHandler);
channel.connection.once("error", errorHandler);
channel.connection.once("close", errorHandler);
channel.on('error', errorHandler);
channel.on('return', returnHandler);
channel.connection.once('error', errorHandler);
channel.connection.once('close', errorHandler);
}
function removeListeners(channel, errorHandler, returnHandler) {
channel.removeListener("error", errorHandler);
channel.removeListener("return", returnHandler);
channel.connection.removeListener("error", errorHandler);
channel.connection.removeListener("close", errorHandler);
channel.removeListener('error', errorHandler);
channel.removeListener('return', returnHandler);
channel.connection.removeListener('error', errorHandler);
channel.connection.removeListener('close', errorHandler);
}
function publishToExchange(channel, content, config, next) {
debug(
"Publishing %d bytes to exchange: %s with routingKeys: %s",
content.length,
config.exchange,
_.compact(
[].concat(config.routingKey, config.options.CC, config.options.BCC)
).join(", ")
);
const ok = channel.publish(
config.destination,
config.routingKey,
content,
config.options
);
debug('Publishing %d bytes to exchange: %s with routingKeys: %s', content.length, config.exchange, _.compact([].concat(config.routingKey, config.options.CC, config.options.BCC)).join(', '));
const ok = channel.publish(config.destination, config.routingKey, content, config.options);
next(null, ok);

@@ -303,30 +199,15 @@ }

function publishToConfirmExchange(channel, content, config, next) {
debug(
"Publishing %d bytes to confirm exchange: %s with routingKeys: %s",
content.length,
config.exchange,
_.compact(
[].concat(config.routingKey, config.options.CC, config.options.BCC)
).join(", ")
);
debug('Publishing %d bytes to confirm exchange: %s with routingKeys: %s', content.length, config.exchange, _.compact([].concat(config.routingKey, config.options.CC, config.options.BCC)).join(', '));
const once = _.once(next);
const timeout = config.timeout
? setConfirmationTimeout(config.timeout, config.destination, once)
: null;
const timeout = config.timeout ? setConfirmationTimeout(config.timeout, config.destination, once) : null;
const ok = channel.publish(
config.destination,
config.routingKey,
content,
config.options,
(err) => {
clearTimeout(timeout);
once(err, ok);
}
);
const ok = channel.publish(config.destination, config.routingKey, content, config.options, (err) => {
clearTimeout(timeout);
once(err, ok);
});
}
function sendToQueue(channel, content, config, next) {
debug("Publishing %d bytes to queue: %s", content.length, config.queue);
debug('Publishing %d bytes to queue: %s', content.length, config.queue);
const ok = channel.sendToQueue(config.destination, content, config.options);

@@ -337,18 +218,11 @@ next(null, ok);

function sendToConfirmQueue(channel, content, config, next) {
debug("Publishing %d bytes to queue: %s", content.length, config.queue);
debug('Publishing %d bytes to queue: %s', content.length, config.queue);
const once = _.once(next);
const timeout = config.timeout
? setConfirmationTimeout(config.timeout, config.destination, once)
: null;
const timeout = config.timeout ? setConfirmationTimeout(config.timeout, config.destination, once) : null;
const ok = channel.sendToQueue(
config.destination,
content,
config.options,
(err) => {
clearTimeout(timeout);
next(err, ok);
}
);
const ok = channel.sendToQueue(config.destination, content, config.options, (err) => {
clearTimeout(timeout);
next(err, ok);
});
}

@@ -358,11 +232,3 @@

return setTimeoutUnref(() => {
next(
new Error(
format(
"Timedout after %dms waiting for broker to confirm publication to: %s",
timeout,
destination
)
)
);
next(new Error(format('Timedout after %dms waiting for broker to confirm publication to: %s', timeout, destination)));
}, timeout);

@@ -372,10 +238,4 @@ }

function handleChannelError(borked, messageId, emitter, config, err) {
debug(
"Channel error: %s during publication of message: %s to %s using channel: %s",
err.message,
messageId,
config.name,
borked._rascal_id
);
emitter.emit("error", err, messageId);
debug('Channel error: %s during publication of message: %s to %s using channel: %s', err.message, messageId, config.name, borked._rascal_id);
emitter.emit('error', err, messageId);
}

@@ -1,4 +0,4 @@

const debug = require("debug")("rascal:SubscriberSession");
const EventEmitter = require("events").EventEmitter;
const inherits = require("util").inherits;
const debug = require('debug')('rascal:SubscriberSession');
const EventEmitter = require('events').EventEmitter;
const inherits = require('util').inherits;

@@ -24,3 +24,3 @@ module.exports = PublicationSession;

this._removePausedListener = function () {
vhost.removeListener("paused", emitPaused);
vhost.removeListener('paused', emitPaused);
};

@@ -37,11 +37,11 @@

function emitPaused() {
self.emit("paused", messageId);
self.emit('paused', messageId);
}
vhost.on("paused", emitPaused);
vhost.on('paused', emitPaused);
self.on("newListener", (event) => {
if (event !== "paused") return;
self.on('newListener', (event) => {
if (event !== 'paused') return;
if (vhost.isPaused()) emitPaused();
});
}

@@ -1,22 +0,15 @@

const debug = require("debug")("rascal:SubscriberError");
const format = require("util").format;
const _ = require("lodash");
const async = require("async");
const setTimeoutUnref = require("../utils/setTimeoutUnref");
const debug = require('debug')('rascal:SubscriberError');
const format = require('util').format;
const _ = require('lodash');
const async = require('async');
const setTimeoutUnref = require('../utils/setTimeoutUnref');
module.exports = function SubscriptionRecovery(broker, vhost) {
this.handle = function (session, message, err, recoveryProcess, next) {
debug(
"Handling subscriber error for message: %s",
message.properties.messageId
);
debug('Handling subscriber error for message: %s', message.properties.messageId);
async.eachSeries(
[].concat(recoveryProcess || []).concat({ strategy: "fallback-nack" }),
[].concat(recoveryProcess || []).concat({ strategy: 'fallback-nack' }),
(recoveryConfig, cb) => {
debug(
"Attempting to recover message: %s using strategy: %s",
message.properties.messageId,
recoveryConfig.strategy
);
debug('Attempting to recover message: %s using strategy: %s', message.properties.messageId, recoveryConfig.strategy);

@@ -26,29 +19,15 @@ const once = _.once(cb);

setTimeoutUnref(() => {
getStrategy(recoveryConfig).execute(
session,
message,
err,
_.omit(recoveryConfig, "defer"),
(err, handled) => {
if (err) {
debug(
"Message: %s failed to be recovered using stragegy: %s",
message.properties.messageId,
recoveryConfig.strategy
);
setImmediate(() => next(err));
return once(false);
}
if (handled) {
debug(
"Message: %s was recovered using stragegy: %s",
message.properties.messageId,
recoveryConfig.strategy
);
setImmediate(next);
return once(false);
}
once();
getStrategy(recoveryConfig).execute(session, message, err, _.omit(recoveryConfig, 'defer'), (err, handled) => {
if (err) {
debug('Message: %s failed to be recovered using stragegy: %s', message.properties.messageId, recoveryConfig.strategy);
setImmediate(() => next(err));
return once(false);
}
);
if (handled) {
debug('Message: %s was recovered using stragegy: %s', message.properties.messageId, recoveryConfig.strategy);
setImmediate(next);
return once(false);
}
once();
});
}, recoveryConfig.defer);

@@ -63,3 +42,3 @@ },

{
name: "ack",
name: 'ack',
execute(session, message, err, strategyConfig, next) {

@@ -72,3 +51,3 @@ session._ack(message, (err) => {

{
name: "nack",
name: 'nack',
execute(session, message, err, strategyConfig, next) {

@@ -81,3 +60,3 @@ session._nack(message, { requeue: strategyConfig.requeue }, (err) => {

{
name: "fallback-nack",
name: 'fallback-nack',
execute(session, message, err, strategyConfig, next) {

@@ -90,102 +69,53 @@ session._nack(message, { requeue: strategyConfig.requeue }, (err) => {

{
name: "republish",
name: 'republish',
execute(session, message, err, strategyConfig, next) {
debug("Republishing message: %s", message.properties.messageId);
debug('Republishing message: %s', message.properties.messageId);
const originalQueue = _.get(
message,
"properties.headers.rascal.originalQueue"
);
const republished = _.get(
message,
[
"properties",
"headers",
"rascal",
"recovery",
originalQueue,
"republished",
],
0
);
const once = _.once(next);
const originalQueue = _.get(message, 'properties.headers.rascal.originalQueue');
const republished = _.get(message, ['properties', 'headers', 'rascal', 'recovery', originalQueue, 'republished'], 0);
if (
strategyConfig.attempts &&
strategyConfig.attempts <= republished
) {
debug(
"Skipping recovery - message: %s has already been republished %d times.",
message.properties.messageId,
republished
);
return next(null, false);
if (strategyConfig.attempts && strategyConfig.attempts <= republished) {
debug('Skipping recovery - message: %s has already been republished %d times.', message.properties.messageId, republished);
return once(null, false);
}
const publishOptions = _.cloneDeep(message.properties);
_.set(
publishOptions,
["headers", "rascal", "recovery", originalQueue, "republished"],
republished + 1
);
_.set(
publishOptions,
"headers.rascal.originalExchange",
message.fields.exchange
);
_.set(
publishOptions,
"headers.rascal.originalRoutingKey",
message.fields.routingKey
);
_.set(
publishOptions,
"headers.rascal.error.message",
_.truncate(err.message, { length: 1024 })
);
_.set(publishOptions, "headers.rascal.error.code", err.code);
_.set(
publishOptions,
"headers.rascal.restoreRoutingHeaders",
_.has(strategyConfig, "restoreRoutingHeaders")
? strategyConfig.restoreRoutingHeaders
: true
);
_.set(publishOptions, ['headers', 'rascal', 'recovery', originalQueue, 'republished'], republished + 1);
_.set(publishOptions, 'headers.rascal.originalExchange', message.fields.exchange);
_.set(publishOptions, 'headers.rascal.originalRoutingKey', message.fields.routingKey);
_.set(publishOptions, 'headers.rascal.error.message', _.truncate(err.message, { length: 1024 }));
_.set(publishOptions, 'headers.rascal.error.code', err.code);
_.set(publishOptions, 'headers.rascal.restoreRoutingHeaders', _.has(strategyConfig, 'restoreRoutingHeaders') ? strategyConfig.restoreRoutingHeaders : true);
if (strategyConfig.immediateNack)
_.set(
publishOptions,
["headers", "rascal", "recovery", originalQueue, "immediateNack"],
true
);
if (strategyConfig.immediateNack) _.set(publishOptions, ['headers', 'rascal', 'recovery', originalQueue, 'immediateNack'], true);
vhost.getConfirmChannel((err, publisherChannel) => {
if (err) return next(err);
if (!publisherChannel)
return next(
new Error(
"Unable to handle subscriber error by republishing. The VHost is shutting down"
)
);
const nackMessage = (err) => {
session._nack(message, (nackErr) => {
// nackError just means the channel was already closed meaning the original message would have been rolled back
once(err);
});
};
publisherChannel.on("error", next);
if (err) return nackMessage(err);
publisherChannel.publish(
undefined,
originalQueue,
message.content,
publishOptions,
(err) => {
if (err) return next(err); // Channel will already be closed, reclosing will trigger an error
publisherChannel.close();
debug(
"Message: %s was republished to queue: %s %d times",
message.properties.messageId,
originalQueue,
republished + 1
);
session._ack(message, (err) => {
next(err, true);
});
}
);
if (!publisherChannel) return nackMessage(new Error('Unable to handle subscriber error by republishing. The VHost is shutting down'));
publisherChannel.on('error', (err) => {
nackMessage(err);
});
publisherChannel.on('return', () => {
nackMessage(new Error(format('Message: %s was republished to queue: %s, but was returned', message.properties.messageId, originalQueue)));
});
publisherChannel.publish(undefined, originalQueue, message.content, publishOptions, (err) => {
if (err) return nackMessage(err); // Channel will already be closed, reclosing will trigger an error
publisherChannel.close();
debug('Message: %s was republished to queue: %s %d times', message.properties.messageId, originalQueue, republished + 1);
session._ack(message, (err) => {
once(err, true);
});
});
});

@@ -195,123 +125,67 @@ },

{
name: "forward",
name: 'forward',
execute(session, message, err, strategyConfig, next) {
debug("Forwarding message: %s", message.properties.messageId);
debug('Forwarding message: %s to publication: %s', message.properties.messageId, strategyConfig.publication);
const originalQueue = _.get(
message,
"properties.headers.rascal.originalQueue"
);
const forwarded = _.get(
message,
[
"properties",
"headers",
"rascal",
"recovery",
originalQueue,
"forwarded",
],
0
);
const once = _.once(next);
const originalQueue = _.get(message, 'properties.headers.rascal.originalQueue');
const forwarded = _.get(message, ['properties', 'headers', 'rascal', 'recovery', originalQueue, 'forwarded'], 0);
if (strategyConfig.attempts && strategyConfig.attempts <= forwarded) {
debug(
"Skipping recovery - message: %s has already been forwarded %d times.",
message.properties.messageId,
forwarded
);
return next(null, false);
debug('Skipping recovery - message: %s has already been forwarded %d times.', message.properties.messageId, forwarded);
return once(null, false);
}
// See https://github.com/rabbitmq/rabbitmq-server/issues/161
if (strategyConfig.xDeathFix)
delete message.properties.headers["x-death"];
if (strategyConfig.xDeathFix) delete message.properties.headers['x-death'];
const forwardOverrides = _.cloneDeep(strategyConfig.options) || {};
_.set(
forwardOverrides,
"restoreRoutingHeaders",
_.has(strategyConfig, "restoreRoutingHeaders")
? strategyConfig.restoreRoutingHeaders
: true
);
_.set(
forwardOverrides,
[
"options",
"headers",
"rascal",
"recovery",
originalQueue,
"forwarded",
],
forwarded + 1
);
_.set(
forwardOverrides,
"options.headers.rascal.error.message",
_.truncate(err.message, { length: 1024 })
);
_.set(
forwardOverrides,
"options.headers.rascal.error.code",
err.code
);
_.set(forwardOverrides, 'restoreRoutingHeaders', _.has(strategyConfig, 'restoreRoutingHeaders') ? strategyConfig.restoreRoutingHeaders : true);
_.set(forwardOverrides, ['options', 'headers', 'rascal', 'recovery', originalQueue, 'forwarded'], forwarded + 1);
_.set(forwardOverrides, 'options.headers.rascal.error.message', _.truncate(err.message, { length: 1024 }));
_.set(forwardOverrides, 'options.headers.rascal.error.code', err.code);
broker.forward(
strategyConfig.publication,
message,
forwardOverrides,
(err, publication) => {
if (err) return next(err);
publication.on("success", () => {
debug(
"Message: %s was forwarded to publication: %s %d times",
message.properties.messageId,
strategyConfig.publication,
forwarded + 1
);
session._ack(message, (err) => {
next(err, true);
});
const nackMessage = (err) => {
session._nack(message, (nackErr) => {
// nackError just means the channel was already closed meaning the original message would have been rolled back
once(err);
});
};
broker.forward(strategyConfig.publication, message, forwardOverrides, (err, publication) => {
if (err) return nackMessage(err);
publication.on('success', () => {
debug('Message: %s was forwarded to publication: %s %d times', message.properties.messageId, strategyConfig.publication, forwarded + 1);
session._ack(message, (ackErr) => {
once(ackErr, true);
});
publication.on("error", next);
publication.on("return", () => {
next(
new Error(
format(
"Message: %s was forwared to publication: %s, but was returned",
message.properties.messageId,
strategyConfig.publication
)
)
);
});
}
);
});
publication.on('error', (err) => {
nackMessage(err);
});
publication.on('return', () => {
publication.removeAllListeners('success');
nackMessage(new Error(format('Message: %s was forwarded to publication: %s, but was returned', message.properties.messageId, strategyConfig.publication)));
});
});
},
},
{
name: "unknown",
name: 'unknown',
execute(session, message, err, strategyConfig, next) {
next(
new Error(
format(
"Error recovering message: %s. No such strategy: %s.",
message.properties.messageId,
strategyConfig.strategy
)
)
);
session._nack(message, () => {
next(new Error(format('Error recovering message: %s. No such strategy: %s.', message.properties.messageId, strategyConfig.strategy)));
});
},
},
],
"name"
'name'
);
function getStrategy(recoveryConfig) {
return (
recoveryStrategies[recoveryConfig.strategy] || recoveryStrategies.unknown
);
return recoveryStrategies[recoveryConfig.strategy] || recoveryStrategies.unknown;
}
};

@@ -1,6 +0,7 @@

const debug = require("debug")("rascal:SubscriberSession");
const EventEmitter = require("events").EventEmitter;
const inherits = require("util").inherits;
const _ = require("lodash");
const setTimeoutUnref = require("../utils/setTimeoutUnref");
const debug = require('debug')('rascal:SubscriberSession');
const EventEmitter = require('events').EventEmitter;
const inherits = require('util').inherits;
const _ = require('lodash');
const async = require('async');
const setTimeoutUnref = require('../utils/setTimeoutUnref');

@@ -26,11 +27,7 @@ module.exports = SubscriberSession;

this._open = function (channel, consumerTag, next) {
if (cancelled) return next(new Error("Subscriber has been cancelled"));
debug(
"Opening subscriber session: %s on channel: %s",
consumerTag,
channel._rascal_id
);
channels[consumerTag] = { index: index++, channel, consumerTag };
channel.once("close", unref.bind(null, consumerTag));
channel.once("error", unref.bind(null, consumerTag));
if (cancelled) return next(new Error('Subscriber has been cancelled'));
debug('Opening subscriber session: %s on channel: %s', consumerTag, channel._rascal_id);
channels[consumerTag] = { index: index++, channel, consumerTag, unacknowledgedMessages: 0 };
channel.once('close', unref.bind(null, consumerTag));
channel.once('error', unref.bind(null, consumerTag));
next();

@@ -55,16 +52,18 @@ };

withCurrentChannel(
(channel, consumerTag) => {
debug(
"Cancelling subscriber session: %s on channel: %s",
consumerTag,
channel._rascal_id
);
(channel, consumerTag, entry) => {
entry.doomed = true;
debug('Cancelling subscriber session: %s on channel: %s', consumerTag, channel._rascal_id);
channel.cancel(consumerTag, (err) => {
if (err) return next(err);
doom(consumerTag);
next();
const waitOrTimeout = config.closeTimeout ? async.timeout(waitForUnacknowledgedMessages, config.closeTimeout) : waitForUnacknowledgedMessages;
waitOrTimeout(entry, null, (err) => {
channel.close(() => {
debug('Channel: %s was closed', entry.channel._rascal_id);
next(err);
});
});
});
},
() => {
debug("No current subscriber session");
debug('No current subscriber session');
next();

@@ -79,6 +78,2 @@ }

this._maxDeferCloseChannel = function (other) {
return Math.max(config.deferCloseChannel, other);
};
this._getRascalChannelId = function () {

@@ -92,2 +87,16 @@ let rascalChannelId = null;

this._incrementUnacknowledgeMessageCount = function (consumerTag) {
if (config.options.noAck) return;
withConsumerChannel(consumerTag, (channel, __, entry) => {
debug('Channel: %s has %s unacknowledged messages', channel._rascal_id, ++entry.unacknowledgedMessages);
});
};
this._decrementUnacknowledgeMessageCount = function (consumerTag) {
if (config.options.noAck) return;
withConsumerChannel(consumerTag, (channel, __, entry) => {
debug('Channel: %s has %s unacknowledged messages', channel._rascal_id, --entry.unacknowledgedMessages);
});
};
this._ack = function (message, next) {

@@ -97,8 +106,5 @@ withConsumerChannel(

(channel) => {
debug(
"Acknowledging message: %s on channel: %s",
message.properties.messageId,
channel._rascal_id
);
debug('Acknowledging message: %s on channel: %s', message.properties.messageId, channel._rascal_id);
channel.ack(message);
self._decrementUnacknowledgeMessageCount(message.fields.consumerTag);
setImmediate(next);

@@ -108,3 +114,3 @@ },

setImmediate(() => {
next(new Error("The channel has been closed. Unable to ack message"));
next(new Error('The channel has been closed. Unable to ack message'));
});

@@ -116,14 +122,9 @@ }

this._nack = function (message, options, next) {
if (arguments.length === 2)
return self._nack(arguments[0], {}, arguments[1]);
if (arguments.length === 2) return self._nack(arguments[0], {}, arguments[1]);
withConsumerChannel(
message.fields.consumerTag,
(channel) => {
debug(
"Not acknowledging message: %s with requeue: %s on channel: %s",
message.properties.messageId,
!!options.requeue,
channel._rascal_id
);
debug('Not acknowledging message: %s with requeue: %s on channel: %s', message.properties.messageId, !!options.requeue, channel._rascal_id);
channel.nack(message, false, !!options.requeue);
self._decrementUnacknowledgeMessageCount(message.fields.consumerTag);
setImmediate(next);

@@ -133,5 +134,3 @@ },

setImmediate(() => {
next(
new Error("The channel has been closed. Unable to nack message")
);
next(new Error('The channel has been closed. Unable to nack message'));
});

@@ -145,6 +144,4 @@ }

.values()
.filter((channel) => {
return !channel.doomed;
})
.sortBy("index")
.filter((entry) => !entry.doomed)
.sortBy('index')
.last()

@@ -164,3 +161,3 @@ .value();

withConsumerChannel(consumerTag, (channel) => {
debug("Removing channel: %s from session", channel._rascal_id);
debug('Removing channel: %s from session', channel._rascal_id);
delete channels[consumerTag];

@@ -170,30 +167,13 @@ });

function doom(consumerTag) {
withConsumerChannel(consumerTag, (channel, consumerTag, entry) => {
if (entry.doomed) return;
entry.doomed = true;
scheduleClose(entry);
});
function waitForUnacknowledgedMessages(entry, previousCount, next) {
const currentCount = entry.unacknowledgedMessages;
if (currentCount > 0) {
if (currentCount !== previousCount) {
debug('Waiting for %d unacknowledged messages from channel: %s', currentCount, entry.channel._rascal_id);
}
setTimeoutUnref(() => waitForUnacknowledgedMessages(entry, currentCount, next), 100);
return;
}
next();
}
/*
There may still be delivered messages that have yet to be ack or nacked
but no way of telling how many are outstanding since due to potentially
complicated recovery strategies, with timeouts etc.
Keeping channels around for a minute shouldn't hurt
*/
function scheduleClose(entry) {
debug(
"Deferring close channel: %s by %dms",
entry.channel._rascal_id,
config.deferCloseChannel
);
setTimeoutUnref(() => {
withConsumerChannel(entry.consumerTag, (channel) => {
channel.close(() => {
debug("Channel: %s was closed", channel._rascal_id);
});
});
}, config.deferCloseChannel);
}
}

@@ -1,4 +0,4 @@

const EventEmitter = require("events").EventEmitter;
const inherits = require("util").inherits;
const forwardEvents = require("forward-emitter");
const EventEmitter = require('events').EventEmitter;
const inherits = require('util').inherits;
const forwardEvents = require('forward-emitter');

@@ -5,0 +5,0 @@ module.exports = SubscriberSessionAsPromised;

@@ -1,11 +0,11 @@

const debug = require("debug")("rascal:Subscription");
const _ = require("lodash");
const safeParse = require("safe-json-parse/callback");
const SubscriberSession = require("./SubscriberSession");
const SubscriberError = require("./SubscriberError");
const format = require("util").format;
const backoff = require("../backoff");
const crypto = require("crypto");
const async = require("async");
const setTimeoutUnref = require("../utils/setTimeoutUnref");
const debug = require('debug')('rascal:Subscription');
const _ = require('lodash');
const safeParse = require('safe-json-parse/callback');
const SubscriberSession = require('./SubscriberSession');
const SubscriberError = require('./SubscriberError');
const format = require('util').format;
const backoff = require('../backoff');
const crypto = require('crypto');
const async = require('async');
const setTimeoutUnref = require('../utils/setTimeoutUnref');

@@ -29,3 +29,3 @@ module.exports = {

this.init = function (next) {
debug("Initialising subscription: %s", config.name);
debug('Initialising subscription: %s', config.name);
return next(null, self);

@@ -41,7 +41,7 @@ };

function subscribeLater(session, config) {
session.on("newListener", (event) => {
if (event !== "message") return;
session.on('newListener', (event) => {
if (event !== 'message') return;
subscribeNow(session, config, (err) => {
if (err) return session.emit("error", err);
session.emit("subscribed");
if (err) return session.emit('error', err);
session.emit('subscribed');
});

@@ -54,6 +54,6 @@ });

if (session.isCancelled()) {
debug("Subscription to queue: %s has been cancelled", config.queue);
debug('Subscription to queue: %s has been cancelled', config.queue);
return done();
}
debug("Subscribing to queue: %s", config.queue);
debug('Subscribing to queue: %s', config.queue);
vhost.getChannel((err, channel) => {

@@ -65,36 +65,17 @@ if (err) return done(err);

const removeErrorHandlers = attachErrorHandlers(
channel,
session,
config
);
const onMessage = _onMessage.bind(
null,
session,
config,
removeErrorHandlers
);
const removeErrorHandlers = attachErrorHandlers(channel, session, config);
const onMessage = _onMessage.bind(null, session, config, removeErrorHandlers);
channel.consume(
config.source,
onMessage,
config.options,
(err, response) => {
if (err) {
debug(
"Error subscribing to %s using channel: %s. %s",
config.source,
channel._rascal_id,
err.message
);
removeErrorHandlers();
return done(err);
}
session._open(channel, response.consumerTag, (err) => {
if (err) return done(err);
timer.reset();
done();
});
channel.consume(config.source, onMessage, config.options, (err, response) => {
if (err) {
debug('Error subscribing to %s using channel: %s. %s', config.source, channel._rascal_id, err.message);
removeErrorHandlers();
return done(err);
}
);
session._open(channel, response.consumerTag, (err) => {
if (err) return done(err);
timer.reset();
done();
});
});
});

@@ -105,10 +86,6 @@ }, next);

function _onMessage(session, config, removeErrorHandlers, message) {
if (!message)
return handleConsumerCancel(session, config, removeErrorHandlers);
if (!message) return handleConsumerCancel(session, config, removeErrorHandlers);
debug(
"Received message: %s from queue: %s",
message.properties.messageId,
config.queue
);
debug('Received message: %s from queue: %s', message.properties.messageId, config.queue);
session._incrementUnacknowledgeMessageCount(message.fields.consumerTag);

@@ -120,14 +97,6 @@ decorateWithRoutingHeaders(message);

if (err) return handleRedeliveriesError(err, session, message);
if (redeliveriesExceeded(message))
return handleRedeliveriesExceeded(session, message);
if (redeliveriesExceeded(message)) return handleRedeliveriesExceeded(session, message);
getContent(message, config, (err, content) => {
err
? handleContentError(session, message, err)
: session.emit(
"message",
message,
content,
getAckOrNack(session, message)
);
err ? handleContentError(session, message, err) : session.emit('message', message, content, getAckOrNack(session, message));
});

@@ -139,30 +108,10 @@ });

if (message.properties.headers.rascal.encryption) {
const encryptionConfig =
config.encryption[message.properties.headers.rascal.encryption.name];
if (!encryptionConfig)
return next(
new Error(
format(
"Unknown encryption profile: %s",
message.properties.headers.rascal.encryption.name
)
)
);
decrypt(
encryptionConfig.algorithm,
encryptionConfig.key,
message.properties.headers.rascal.encryption.iv,
message.content,
(err, unencrypted) => {
if (err) return next(err);
debug(
"Message was decrypted using encryption profile: %s",
message.properties.headers.rascal.encryption.name
);
const contentType =
config.contentType ||
message.properties.headers.rascal.encryption.originalContentType;
negotiateContent(contentType, unencrypted, next);
}
);
const encryptionConfig = config.encryption[message.properties.headers.rascal.encryption.name];
if (!encryptionConfig) return next(new Error(format('Unknown encryption profile: %s', message.properties.headers.rascal.encryption.name)));
decrypt(encryptionConfig.algorithm, encryptionConfig.key, message.properties.headers.rascal.encryption.iv, message.content, (err, unencrypted) => {
if (err) return next(err);
debug('Message was decrypted using encryption profile: %s', message.properties.headers.rascal.encryption.name);
const contentType = config.contentType || message.properties.headers.rascal.encryption.originalContentType;
negotiateContent(contentType, unencrypted, next);
});
} else {

@@ -175,5 +124,4 @@ const contentType = config.contentType || message.properties.contentType;

function negotiateContent(contentType, content, next) {
if (contentType === "text/plain") return next(null, content.toString());
if (contentType === "application/json")
return safeParse(content.toString(), next);
if (contentType === 'text/plain') return next(null, content.toString());
if (contentType === 'application/json') return safeParse(content.toString(), next);
return next(null, content);

@@ -185,4 +133,4 @@ }

try {
const key = Buffer.from(keyHex, "hex");
const iv = Buffer.from(ivHex, "hex");
const key = Buffer.from(keyHex, 'hex');
const iv = Buffer.from(ivHex, 'hex');
const cipher = crypto.createDecipheriv(algorithm, key, iv);

@@ -197,26 +145,6 @@ unencrypted = Buffer.concat([cipher.update(encrypted), cipher.final()]);

function handleContentError(session, message, err) {
debug(
"Error getting content for message %s: %s",
message.properties.messageId,
err.message
);
debug('Error getting content for message %s: %s', message.properties.messageId, err.message);
// Documentation wrongly specified 'invalid_content' instead of 'invalid_message' emitting both
if (
session.emit(
"invalid_content",
err,
message,
getAckOrNack(session, message)
)
)
return;
if (
session.emit(
"invalid_message",
err,
message,
getAckOrNack(session, message)
)
)
return;
if (session.emit('invalid_content', err, message, getAckOrNack(session, message))) return;
if (session.emit('invalid_message', err, message, getAckOrNack(session, message))) return;
nackAndError(session, message, err);

@@ -226,31 +154,9 @@ }

function redeliveriesExceeded(message) {
return (
message.properties.headers.rascal.redeliveries > config.redeliveries.limit
);
return message.properties.headers.rascal.redeliveries > config.redeliveries.limit;
}
function handleRedeliveriesError(err, session, message) {
debug(
"Error handling redeliveries of message %s: %s",
message.properties.messageId,
err.message
);
if (
session.emit(
"redeliveries_error",
err,
message,
getAckOrNack(session, message)
)
)
return;
if (
session.emit(
"redeliveries_exceeded",
err,
message,
getAckOrNack(session, message)
)
)
return;
debug('Error handling redeliveries of message %s: %s', message.properties.messageId, err.message);
if (session.emit('redeliveries_error', err, message, getAckOrNack(session, message))) return;
if (session.emit('redeliveries_exceeded', err, message, getAckOrNack(session, message))) return;
nackAndError(session, message, err);

@@ -260,28 +166,6 @@ }

function handleRedeliveriesExceeded(session, message) {
const err = new Error(
format(
"Message %s has exceeded %d redeliveries",
message.properties.messageId,
config.redeliveries.limit
)
);
const err = new Error(format('Message %s has exceeded %d redeliveries', message.properties.messageId, config.redeliveries.limit));
debug(err.message);
if (
session.emit(
"redeliveries_exceeded",
err,
message,
getAckOrNack(session, message)
)
)
return;
if (
session.emit(
"redeliveries_error",
err,
message,
getAckOrNack(session, message)
)
)
return;
if (session.emit('redeliveries_exceeded', err, message, getAckOrNack(session, message))) return;
if (session.emit('redeliveries_error', err, message, getAckOrNack(session, message))) return;
nackAndError(session, message, err);

@@ -294,3 +178,3 @@ }

// If the app shuts down before the IO has completed, the message will be rolled back
setTimeoutUnref(session.emit.bind(session, "error", err));
setTimeoutUnref(session.emit.bind(session, 'error', err));
});

@@ -306,8 +190,4 @@ }

if (!message.properties.headers.rascal.restoreRoutingHeaders) return;
if (message.properties.headers.rascal.originalRoutingKey)
message.fields.routingKey =
message.properties.headers.rascal.originalRoutingKey;
if (message.properties.headers.rascal.originalExchange)
message.fields.exchange =
message.properties.headers.rascal.originalExchange;
if (message.properties.headers.rascal.originalRoutingKey) message.fields.routingKey = message.properties.headers.rascal.originalRoutingKey;
if (message.properties.headers.rascal.originalExchange) message.fields.exchange = message.properties.headers.rascal.originalExchange;
}

@@ -318,10 +198,3 @@

const timeout = setTimeoutUnref(() => {
once(
new Error(
format(
"Redeliveries timed out after %dms",
config.redeliveries.timeout
)
)
);
once(new Error(format('Redeliveries timed out after %dms', config.redeliveries.timeout)));
}, config.redeliveries.timeout);

@@ -339,20 +212,7 @@ countRedeliveries(message, (err, redeliveries) => {

if (!message.properties.messageId) return next(null, 0);
counter.incrementAndGet(
config.name + "/" + message.properties.messageId,
next
);
counter.incrementAndGet(config.name + '/' + message.properties.messageId, next);
}
function immediateNack(message) {
if (
_.get(message, [
"properties",
"headers",
"rascal",
"recovery",
message.properties.headers.rascal.originalQueue,
"immediateNack",
])
)
return true;
if (_.get(message, ['properties', 'headers', 'rascal', 'recovery', message.properties.headers.rascal.originalQueue, 'immediateNack'])) return true;
return false;

@@ -362,6 +222,4 @@ }

function getAckOrNack(session, message) {
if (!broker.promises || config.promisifyAckOrNack === false)
return ackOrNack.bind(null, session, message);
if (config.promisifyAckOrNack)
return ackOrNackP.bind(null, session, message);
if (!broker.promises || config.promisifyAckOrNack === false) return ackOrNack.bind(null, session, message);
if (config.promisifyAckOrNack) return ackOrNackP.bind(null, session, message);
return ackOrNack.bind(null, session, message);

@@ -371,40 +229,13 @@ }

function ackOrNack(session, message, err, recovery, next) {
if (arguments.length === 2)
return ackOrNack(
session,
message,
undefined,
undefined,
emitOnError.bind(null, session)
);
if (arguments.length === 3 && _.isFunction(arguments[2]))
return ackOrNack(session, message, undefined, undefined, arguments[2]);
if (arguments.length === 3)
return ackOrNack(
session,
message,
err,
undefined,
emitOnError.bind(null, session)
);
if (arguments.length === 4 && _.isFunction(arguments[3]))
return ackOrNack(session, message, err, undefined, arguments[3]);
if (arguments.length === 4)
return ackOrNack(
session,
message,
err,
recovery,
emitOnError.bind(null, session)
);
err
? subscriberError.handle(session, message, err, recovery, next)
: session._ack(message, next);
if (arguments.length === 2) return ackOrNack(session, message, undefined, undefined, emitOnError.bind(null, session));
if (arguments.length === 3 && _.isFunction(arguments[2])) return ackOrNack(session, message, undefined, undefined, arguments[2]);
if (arguments.length === 3) return ackOrNack(session, message, err, undefined, emitOnError.bind(null, session));
if (arguments.length === 4 && _.isFunction(arguments[3])) return ackOrNack(session, message, err, undefined, arguments[3]);
if (arguments.length === 4) return ackOrNack(session, message, err, recovery, emitOnError.bind(null, session));
err ? subscriberError.handle(session, message, err, recovery, next) : session._ack(message, next);
}
function ackOrNackP(session, message, err, recovery) {
if (arguments.length === 2)
return ackOrNackP(session, message, undefined, undefined);
if (arguments.length === 3)
return ackOrNackP(session, message, err, undefined);
if (arguments.length === 2) return ackOrNackP(session, message, undefined, undefined);
if (arguments.length === 3) return ackOrNackP(session, message, err, undefined);

@@ -415,5 +246,3 @@ return new Promise((resolve, reject) => {

};
err
? subscriberError.handle(session, message, err, recovery, cb)
: session._ack(message, cb);
err ? subscriberError.handle(session, message, err, recovery, cb) : session._ack(message, cb);
});

@@ -423,3 +252,3 @@ }

function emitOnError(session, err) {
if (err) session.emit("error", err);
if (err) session.emit('error', err);
}

@@ -431,37 +260,20 @@

const removeErrorHandlers = _.once(() => {
channel.removeListener("error", errorHandler);
channel.on("error", (err) => {
debug(
"Suppressing error on cancelled session: %s to prevent connection errors. %s",
channel._rascal_id,
err.message
);
channel.removeListener('error', errorHandler);
channel.on('error', (err) => {
debug('Suppressing error on cancelled session: %s to prevent connection errors. %s', channel._rascal_id, err.message);
});
connection.removeListener("error", errorHandler);
connection.removeListener("close", errorHandler);
connection.removeListener('error', errorHandler);
connection.removeListener('close', errorHandler);
});
const errorHandler = _.once(
handleChannelError.bind(null, session, config, removeErrorHandlers, 0)
);
channel.on("error", errorHandler);
connection.once("error", errorHandler);
connection.once("close", errorHandler);
const errorHandler = _.once(handleChannelError.bind(null, session, config, removeErrorHandlers, 0));
channel.on('error', errorHandler);
connection.once('error', errorHandler);
connection.once('close', errorHandler);
return removeErrorHandlers;
}
function handleChannelError(
session,
config,
removeErrorHandlers,
attempts,
err
) {
debug(
"Handling channel error: %s from %s using channel: %s",
err.message,
config.name,
session._getRascalChannelId()
);
function handleChannelError(session, config, removeErrorHandlers, attempts, err) {
debug('Handling channel error: %s from %s using channel: %s', err.message, config.name, session._getRascalChannelId());
if (removeErrorHandlers) removeErrorHandlers();
session.emit("error", err);
session.emit('error', err);
config.retry &&

@@ -471,19 +283,4 @@ subscribeNow(session, config, (err) => {

const delay = timer.next();
debug(
"Will attempt resubscription(%d) to %s in %dms",
attempts + 1,
config.name,
delay
);
session._schedule(
handleChannelError.bind(
null,
session,
config,
null,
attempts + 1,
err
),
delay
);
debug('Will attempt resubscription(%d) to %s in %dms', attempts + 1, config.name, delay);
session._schedule(handleChannelError.bind(null, session, config, null, attempts + 1, err), delay);
});

@@ -493,14 +290,8 @@ }

function handleConsumerCancel(session, config, removeErrorHandlers) {
debug(
"Received consumer cancel from %s using channel: %s",
config.name,
session._getRascalChannelId()
);
debug('Received consumer cancel from %s using channel: %s', config.name, session._getRascalChannelId());
removeErrorHandlers();
session._close((err) => {
if (err) debug("Error cancelling subscription: %s", err.message);
const cancelErr = new Error(
format("Subscription: %s was cancelled by the broker", config.name)
);
session.emit("cancelled", cancelErr) || session.emit("error", cancelErr);
if (err) debug('Error cancelling subscription: %s', err.message);
const cancelErr = new Error(format('Subscription: %s was cancelled by the broker', config.name));
session.emit('cancelled', cancelErr) || session.emit('error', cancelErr);
config.retry &&

@@ -510,12 +301,4 @@ subscribeNow(session, config, (err) => {

const delay = timer.next();
debug(
"Will attempt resubscription(%d) to %s in %dms",
1,
config.name,
delay
);
session._schedule(
handleChannelError.bind(null, session, config, null, 1, err),
delay
);
debug('Will attempt resubscription(%d) to %s in %dms', 1, config.name, delay);
session._schedule(handleChannelError.bind(null, session, config, null, 1, err), delay);
});

@@ -522,0 +305,0 @@ });

@@ -1,5 +0,5 @@

const debug = require("debug")("rascal:tasks:applyBindings");
const format = require("util").format;
const _ = require("lodash");
const async = require("async");
const debug = require('debug')('rascal:tasks:applyBindings');
const format = require('util').format;
const _ = require('lodash');
const async = require('async');

@@ -25,24 +25,9 @@ module.exports = _.curry((config, ctx, next) => {

const destination = config.queues[binding.destination];
if (!destination)
return next(
new Error(format("Unknown destination: %s", binding.destination))
);
if (!destination) return next(new Error(format('Unknown destination: %s', binding.destination)));
const source = config.exchanges[binding.source];
if (!source)
return next(new Error(format("Unknown source: %s", binding.source)));
if (!source) return next(new Error(format('Unknown source: %s', binding.source)));
debug(
"Binding queue: %s to exchange: %s with binding key: %s",
destination.fullyQualifiedName,
source.fullyQualifiedName,
binding.bindingKey
);
channel.bindQueue(
destination.fullyQualifiedName,
source.fullyQualifiedName,
binding.bindingKey,
binding.options,
next
);
debug('Binding queue: %s to exchange: %s with binding key: %s', destination.fullyQualifiedName, source.fullyQualifiedName, binding.bindingKey);
channel.bindQueue(destination.fullyQualifiedName, source.fullyQualifiedName, binding.bindingKey, binding.options, next);
}

@@ -52,24 +37,9 @@

const destination = config.exchanges[binding.destination];
if (!destination)
return next(
new Error(format("Unknown destination: %s", binding.destination))
);
if (!destination) return next(new Error(format('Unknown destination: %s', binding.destination)));
const source = config.exchanges[binding.source];
if (!source)
return next(new Error(format("Unknown source: %s", binding.source)));
if (!source) return next(new Error(format('Unknown source: %s', binding.source)));
debug(
"Binding exchange: %s to exchange: %s with binding key: %s",
destination.fullyQualifiedName,
source.fullyQualifiedName,
binding.bindingKey
);
channel.bindExchange(
destination.fullyQualifiedName,
source.fullyQualifiedName,
binding.bindingKey,
binding.options,
next
);
debug('Binding exchange: %s to exchange: %s with binding key: %s', destination.fullyQualifiedName, source.fullyQualifiedName, binding.bindingKey);
channel.bindExchange(destination.fullyQualifiedName, source.fullyQualifiedName, binding.bindingKey, binding.options, next);
}

@@ -1,4 +0,4 @@

const debug = require("debug")("rascal:tasks:assertExchanges");
const _ = require("lodash");
const async = require("async");
const debug = require('debug')('rascal:tasks:assertExchanges');
const _ = require('lodash');
const async = require('async');

@@ -19,10 +19,5 @@ module.exports = _.curry((config, ctx, next) => {

if (!config.assert) return next();
if (config.fullyQualifiedName === "") return next();
debug("Asserting exchange: %s", config.fullyQualifiedName);
channel.assertExchange(
config.fullyQualifiedName,
config.type,
config.options,
next
);
if (config.fullyQualifiedName === '') return next();
debug('Asserting exchange: %s', config.fullyQualifiedName);
channel.assertExchange(config.fullyQualifiedName, config.type, config.options, next);
}

@@ -1,4 +0,4 @@

const debug = require("debug")("rascal:tasks:assertQueues");
const _ = require("lodash");
const async = require("async");
const debug = require('debug')('rascal:tasks:assertQueues');
const _ = require('lodash');
const async = require('async');

@@ -19,4 +19,4 @@ module.exports = _.curry((config, ctx, next) => {

if (!config.assert) return next();
debug("Asserting queue: %s", config.fullyQualifiedName);
debug('Asserting queue: %s', config.fullyQualifiedName);
channel.assertQueue(config.fullyQualifiedName, config.options, next);
}

@@ -1,5 +0,5 @@

const debug = require("debug")("rascal:tasks:assertVhost");
const _ = require("lodash");
const async = require("async");
const client = require("../../management/client");
const debug = require('debug')('rascal:tasks:assertVhost');
const _ = require('lodash');
const async = require('async');
const client = require('../../management/client');

@@ -6,0 +6,0 @@ module.exports = _.curry((config, ctx, next) => {

@@ -1,4 +0,4 @@

const debug = require("debug")("rascal:tasks:checkExchanges");
const _ = require("lodash");
const async = require("async");
const debug = require('debug')('rascal:tasks:checkExchanges');
const _ = require('lodash');
const async = require('async');

@@ -19,4 +19,4 @@ module.exports = _.curry((config, ctx, next) => {

if (!config.check) return next();
debug("Checking exchange: %s", config.fullyQualifiedName);
debug('Checking exchange: %s', config.fullyQualifiedName);
channel.checkExchange(config.fullyQualifiedName, next);
}

@@ -1,4 +0,4 @@

const debug = require("debug")("rascal:tasks:checkQueues");
const _ = require("lodash");
const async = require("async");
const debug = require('debug')('rascal:tasks:checkQueues');
const _ = require('lodash');
const async = require('async');

@@ -19,4 +19,4 @@ module.exports = _.curry((config, ctx, next) => {

if (!config.check) return next();
debug("Checking queue: %s", config.fullyQualifiedName);
debug('Checking queue: %s', config.fullyQualifiedName);
channel.checkQueue(config.fullyQualifiedName, next);
}

@@ -1,5 +0,5 @@

const debug = require("debug")("rascal:tasks:checkVhost");
const _ = require("lodash");
const async = require("async");
const client = require("../../management/client");
const debug = require('debug')('rascal:tasks:checkVhost');
const _ = require('lodash');
const async = require('async');
const client = require('../../management/client');

@@ -6,0 +6,0 @@ module.exports = _.curry((config, ctx, next) => {

@@ -1,6 +0,6 @@

const debug = require("debug")("rascal:tasks:closeChannel");
const _ = require("lodash");
const debug = require('debug')('rascal:tasks:closeChannel');
const _ = require('lodash');
module.exports = _.curry((config, ctx, next) => {
debug("Closing channel");
debug('Closing channel');

@@ -7,0 +7,0 @@ ctx.channel.close((err) => {

@@ -1,6 +0,6 @@

const debug = require("debug")("rascal:tasks:checkQueues");
const _ = require("lodash");
const debug = require('debug')('rascal:tasks:checkQueues');
const _ = require('lodash');
module.exports = _.curry((config, ctx, next) => {
debug("Closing connection: %s", ctx.connectionConfig.loggableUrl);
debug('Closing connection: %s', ctx.connectionConfig.loggableUrl);
if (!ctx.connection) return next(null, config, ctx);

@@ -7,0 +7,0 @@ ctx.connection.close((err) => {

@@ -1,6 +0,6 @@

const debug = require("debug")("rascal:tasks:createChannel");
const _ = require("lodash");
const debug = require('debug')('rascal:tasks:createChannel');
const _ = require('lodash');
module.exports = _.curry((config, ctx, next) => {
debug("Creating channel");
debug('Creating channel');

@@ -7,0 +7,0 @@ ctx.connection.createChannel((err, channel) => {

@@ -1,7 +0,7 @@

const debug = require("debug")("rascal:tasks:createConnection");
const _ = require("lodash");
const amqplib = require("amqplib/callback_api");
const async = require("async");
const format = require("util").format;
const uuid = require("uuid").v4;
const debug = require('debug')('rascal:tasks:createConnection');
const _ = require('lodash');
const amqplib = require('amqplib/callback_api');
const async = require('async');
const format = require('util').format;
const uuid = require('uuid').v4;

@@ -32,3 +32,3 @@ module.exports = _.curry((config, ctx, next) => {

function connect(connectionConfig, cb) {
debug("Connecting to broker using url: %s", connectionConfig.loggableUrl);
debug('Connecting to broker using url: %s', connectionConfig.loggableUrl);

@@ -39,54 +39,39 @@ // See https://github.com/guidesmiths/rascal/issues/17

amqplib.connect(
connectionConfig.url,
connectionConfig.socketOptions,
(err, connection) => {
invocations++;
amqplib.connect(connectionConfig.url, connectionConfig.socketOptions, (err, connection) => {
invocations++;
if (err) {
const betterMessage = format(
"Failed to connect to: %s. Original message was:",
connectionConfig.loggableUrl,
err.message
);
err.message = betterMessage;
return once(err);
}
if (err) {
const betterMessage = format('Failed to connect to: %s. Original message was:', connectionConfig.loggableUrl, err.message);
err.message = betterMessage;
return once(err);
}
connection._rascal_id = uuid();
debug("Obtained connection: %s", connection._rascal_id);
connection._rascal_id = uuid();
debug('Obtained connection: %s', connection._rascal_id);
/*
* If an error occurs during initialisation (e.g. if checkExchanges fails),
* and no error handler has been bound to the connection, then the error will bubble up
* to the UncaughtException handler, potentially crashing the node process.
*
* By adding an error handler now, we ensure that instead of being emitted as events
* errors will be passed via the callback chain, so they can still be handled by the caller
*
* This error handle is removed in the vhost after the initialiation has complete
*/
connection.on("error", (err) => {
debug(
"Received error: %s from %s",
err.message,
connectionConfig.loggableUrl
);
once(err);
});
/*
* If an error occurs during initialisation (e.g. if checkExchanges fails),
* and no error handler has been bound to the connection, then the error will bubble up
* to the UncaughtException handler, potentially crashing the node process.
*
* By adding an error handler now, we ensure that instead of being emitted as events
* errors will be passed via the callback chain, so they can still be handled by the caller
*
* This error handle is removed in the vhost after the initialiation has complete
*/
connection.on('error', (err) => {
debug('Received error: %s from %s', err.message, connectionConfig.loggableUrl);
once(err);
});
// See https://github.com/squaremo/amqp.node/issues/388
if (invocations > 1) {
debug(
"Closing superfluous connection: %s previously reported as errored",
connection._rascal_id
);
return connection.close();
}
// See https://github.com/squaremo/amqp.node/issues/388
if (invocations > 1) {
debug('Closing superfluous connection: %s previously reported as errored', connection._rascal_id);
return connection.close();
}
connection.setMaxListeners(0);
connection.setMaxListeners(0);
once(null, connection);
}
);
once(null, connection);
});
}

@@ -1,4 +0,4 @@

const debug = require("debug")("rascal:tasks:deleteExchanges");
const _ = require("lodash");
const async = require("async");
const debug = require('debug')('rascal:tasks:deleteExchanges');
const _ = require('lodash');
const async = require('async');

@@ -18,5 +18,5 @@ module.exports = _.curry((config, ctx, next) => {

function deleteExchange(channel, config, next) {
if (config.fullyQualifiedName === "") return next();
debug("Deleting exchange: %s", config.fullyQualifiedName);
if (config.fullyQualifiedName === '') return next();
debug('Deleting exchange: %s', config.fullyQualifiedName);
channel.deleteExchange(config.fullyQualifiedName, {}, next);
}

@@ -1,4 +0,4 @@

const debug = require("debug")("rascal:tasks:deleteQueues");
const _ = require("lodash");
const async = require("async");
const debug = require('debug')('rascal:tasks:deleteQueues');
const _ = require('lodash');
const async = require('async');

@@ -18,4 +18,4 @@ module.exports = _.curry((config, ctx, next) => {

function deleteQueue(channel, config, next) {
debug("Deleting queue: %s", config.fullyQualifiedName);
debug('Deleting queue: %s', config.fullyQualifiedName);
channel.deleteQueue(config.fullyQualifiedName, {}, next);
}

@@ -1,5 +0,5 @@

const debug = require("debug")("rascal:tasks:deleteVhost");
const _ = require("lodash");
const async = require("async");
const client = require("../../management/client");
const debug = require('debug')('rascal:tasks:deleteVhost');
const _ = require('lodash');
const async = require('async');
const client = require('../../management/client');

@@ -16,15 +16,10 @@ module.exports = _.curry((config, ctx, next) => {

const connectionConfig = candidates[ctx.vhost.connectionIndex];
client.deleteVhost(
vhostConfig.name,
connectionConfig.management,
(err) => {
if (err) {
ctx.vhost.connectionIndex =
(ctx.vhost.connectionIndex + 1) % candidates.length;
return cb(err);
}
ctx.connectionConfig = connectionConfig;
cb();
client.deleteVhost(vhostConfig.name, connectionConfig.management, (err) => {
if (err) {
ctx.vhost.connectionIndex = (ctx.vhost.connectionIndex + 1) % candidates.length;
return cb(err);
}
);
ctx.connectionConfig = connectionConfig;
cb();
});
},

@@ -31,0 +26,0 @@ (err) => {

@@ -1,25 +0,25 @@

exports.applyBindings = require("./applyBindings.js");
exports.assertExchanges = require("./assertExchanges.js");
exports.assertQueues = require("./assertQueues.js");
exports.assertVhost = require("./assertVhost.js");
exports.bounceVhost = require("./bounceVhost.js");
exports.checkExchanges = require("./checkExchanges.js");
exports.checkQueues = require("./checkQueues.js");
exports.checkVhost = require("./checkVhost.js");
exports.closeChannel = require("./closeChannel.js");
exports.closeConnection = require("./closeConnection.js");
exports.createChannel = require("./createChannel.js");
exports.createConnection = require("./createConnection.js");
exports.deleteExchanges = require("./deleteExchanges.js");
exports.deleteQueues = require("./deleteQueues.js");
exports.deleteVhost = require("./deleteVhost.js");
exports.initCounters = require("./initCounters.js");
exports.initPublications = require("./initPublications.js");
exports.initShovels = require("./initShovels.js");
exports.initSubscriptions = require("./initSubscriptions.js");
exports.initVhosts = require("./initVhosts.js");
exports.nukeVhost = require("./nukeVhost.js");
exports.purgeQueues = require("./purgeQueues.js");
exports.purgeVhost = require("./purgeVhost.js");
exports.forewarnVhost = require("./forewarnVhost.js");
exports.shutdownVhost = require("./shutdownVhost.js");
exports.applyBindings = require('./applyBindings.js');
exports.assertExchanges = require('./assertExchanges.js');
exports.assertQueues = require('./assertQueues.js');
exports.assertVhost = require('./assertVhost.js');
exports.bounceVhost = require('./bounceVhost.js');
exports.checkExchanges = require('./checkExchanges.js');
exports.checkQueues = require('./checkQueues.js');
exports.checkVhost = require('./checkVhost.js');
exports.closeChannel = require('./closeChannel.js');
exports.closeConnection = require('./closeConnection.js');
exports.createChannel = require('./createChannel.js');
exports.createConnection = require('./createConnection.js');
exports.deleteExchanges = require('./deleteExchanges.js');
exports.deleteQueues = require('./deleteQueues.js');
exports.deleteVhost = require('./deleteVhost.js');
exports.initCounters = require('./initCounters.js');
exports.initPublications = require('./initPublications.js');
exports.initShovels = require('./initShovels.js');
exports.initSubscriptions = require('./initSubscriptions.js');
exports.initVhosts = require('./initVhosts.js');
exports.nukeVhost = require('./nukeVhost.js');
exports.purgeQueues = require('./purgeQueues.js');
exports.purgeVhost = require('./purgeVhost.js');
exports.forewarnVhost = require('./forewarnVhost.js');
exports.shutdownVhost = require('./shutdownVhost.js');

@@ -1,5 +0,5 @@

const debug = require("debug")("rascal:tasks:initCounters");
const format = require("util").format;
const _ = require("lodash");
const async = require("async");
const debug = require('debug')('rascal:tasks:initCounters');
const format = require('util').format;
const _ = require('lodash');
const async = require('async');

@@ -23,5 +23,4 @@ module.exports = _.curry((config, ctx, next) => {

function initCounter(config, ctx, next) {
if (!ctx.components.counters[config.type])
return next(new Error(format("Unknown counter type: %s", config.type)));
if (!ctx.components.counters[config.type]) return next(new Error(format('Unknown counter type: %s', config.type)));
next(null, ctx.components.counters[config.type](config));
}

@@ -1,5 +0,5 @@

const debug = require("debug")("rascal:tasks:initPublication");
const _ = require("lodash");
const async = require("async");
const Publication = require("../Publication");
const debug = require('debug')('rascal:tasks:initPublication');
const _ = require('lodash');
const async = require('async');
const Publication = require('../Publication');

@@ -6,0 +6,0 @@ module.exports = _.curry((config, ctx, next) => {

@@ -1,4 +0,4 @@

const debug = require("debug")("rascal:tasks:initShovels");
const _ = require("lodash");
const async = require("async");
const debug = require('debug')('rascal:tasks:initShovels');
const _ = require('lodash');
const async = require('async');

@@ -18,3 +18,3 @@ module.exports = _.curry((config, ctx, next) => {

function initShovel(config, ctx, next) {
debug("Initialising shovel: %s", config.name);
debug('Initialising shovel: %s', config.name);

@@ -24,22 +24,17 @@ ctx.broker.subscribe(config.subscription, {}, (err, subscription) => {

subscription.on("message", (message, content, ackOrNack) => {
ctx.broker.forward(
config.publication,
message,
{},
(err, publication) => {
if (err) return next(err);
publication.on("success", () => {
ackOrNack();
});
}
);
subscription.on('message', (message, content, ackOrNack) => {
ctx.broker.forward(config.publication, message, {}, (err, publication) => {
if (err) return next(err);
publication.on('success', () => {
ackOrNack();
});
});
});
subscription.on("error", (err) => {
ctx.broker.emit("error", err);
subscription.on('error', (err) => {
ctx.broker.emit('error', err);
});
subscription.on("cancelled", (err) => {
ctx.broker.emit("cancelled", err) || ctx.broker.emit("error", err);
subscription.on('cancelled', (err) => {
ctx.broker.emit('cancelled', err) || ctx.broker.emit('error', err);
});

@@ -46,0 +41,0 @@

@@ -1,5 +0,5 @@

const debug = require("debug")("rascal:tasks:initSubscriptions");
const _ = require("lodash");
const async = require("async");
const Subscription = require("../Subscription");
const debug = require('debug')('rascal:tasks:initSubscriptions');
const _ = require('lodash');
const async = require('async');
const Subscription = require('../Subscription');

@@ -22,9 +22,3 @@ module.exports = _.curry((config, ctx, next) => {

function initSubscription(config, ctx, next) {
Subscription.create(
ctx.broker,
ctx.vhosts[config.vhost],
ctx.counters[config.redeliveries.counter],
config,
next
);
Subscription.create(ctx.broker, ctx.vhosts[config.vhost], ctx.counters[config.redeliveries.counter], config, next);
}

@@ -1,6 +0,6 @@

const debug = require("debug")("rascal:tasks:initVhosts");
const _ = require("lodash");
const async = require("async");
const forwardEvents = require("forward-emitter");
const Vhost = require("../Vhost");
const debug = require('debug')('rascal:tasks:initVhosts');
const _ = require('lodash');
const async = require('async');
const forwardEvents = require('forward-emitter');
const Vhost = require('../Vhost');

@@ -7,0 +7,0 @@ module.exports = _.curry((config, ctx, next) => {

@@ -1,4 +0,4 @@

const debug = require("debug")("rascal:tasks:purgeQueues");
const _ = require("lodash");
const async = require("async");
const debug = require('debug')('rascal:tasks:purgeQueues');
const _ = require('lodash');
const async = require('async');

@@ -19,4 +19,4 @@ module.exports = _.curry((config, ctx, next) => {

if (!config.purge && !ctx.purge) return next();
debug("Purging queue: %s", config.fullyQualifiedName);
debug('Purging queue: %s', config.fullyQualifiedName);
channel.purgeQueue(config.fullyQualifiedName, next);
}

@@ -1,12 +0,12 @@

const debug = require("debug")("rascal:Vhost");
const format = require("util").format;
const inherits = require("util").inherits;
const EventEmitter = require("events").EventEmitter;
const async = require("async");
const genericPool = require("generic-pool");
const tasks = require("./tasks");
const uuid = require("uuid").v4;
const _ = require("lodash");
const backoff = require("../backoff");
const setTimeoutUnref = require("../utils/setTimeoutUnref");
const debug = require('debug')('rascal:Vhost');
const format = require('util').format;
const inherits = require('util').inherits;
const EventEmitter = require('events').EventEmitter;
const async = require('async');
const genericPool = require('generic-pool');
const tasks = require('./tasks');
const uuid = require('uuid').v4;
const _ = require('lodash');
const backoff = require('../backoff');
const setTimeoutUnref = require('../utils/setTimeoutUnref');

@@ -29,31 +29,6 @@ module.exports = {

const init = async.compose(
tasks.closeChannel,
tasks.applyBindings,
tasks.purgeQueues,
tasks.checkQueues,
tasks.assertQueues,
tasks.checkExchanges,
tasks.assertExchanges,
tasks.createChannel,
tasks.createConnection,
tasks.checkVhost,
tasks.assertVhost
);
const init = async.compose(tasks.closeChannel, tasks.applyBindings, tasks.purgeQueues, tasks.checkQueues, tasks.assertQueues, tasks.checkExchanges, tasks.assertExchanges, tasks.createChannel, tasks.createConnection, tasks.checkVhost, tasks.assertVhost);
const connect = async.compose(tasks.createConnection);
const purge = async.compose(
tasks.closeConnection,
tasks.closeChannel,
tasks.purgeQueues,
tasks.createChannel,
tasks.createConnection
);
const nuke = async.compose(
tasks.closeConnection,
tasks.closeChannel,
tasks.deleteQueues,
tasks.deleteExchanges,
tasks.createChannel,
tasks.createConnection
);
const purge = async.compose(tasks.closeConnection, tasks.closeChannel, tasks.purgeQueues, tasks.createChannel, tasks.createConnection);
const nuke = async.compose(tasks.closeConnection, tasks.closeChannel, tasks.deleteQueues, tasks.deleteExchanges, tasks.createChannel, tasks.createConnection);
let timer = backoff({});

@@ -71,37 +46,29 @@ let paused = true;

if (shuttingDown) {
debug("Aborting initialisation. Vhost %s is shutting down.", self.name);
debug('Aborting initialisation. Vhost %s is shutting down.', self.name);
return next();
}
debug("Initialising vhost: %s", self.name);
debug('Initialising vhost: %s', self.name);
pauseChannelAllocation();
init(
config,
{ connectionIndex: self.connectionIndex },
(err, config, ctx) => {
if (err) return next(err);
init(config, { connectionIndex: self.connectionIndex }, (err, config, ctx) => {
if (err) return next(err);
connection = ctx.connection;
self.connectionIndex = ctx.connectionIndex;
connectionConfig = ctx.connectionConfig;
timer = backoff(ctx.connectionConfig.retry);
connection = ctx.connection;
self.connectionIndex = ctx.connectionIndex;
connectionConfig = ctx.connectionConfig;
timer = backoff(ctx.connectionConfig.retry);
attachErrorHandlers(config);
forwardRabbitMQConnectionEvents();
ensureChannelPools();
resumeChannelAllocation();
attachErrorHandlers(config);
forwardRabbitMQConnectionEvents();
ensureChannelPools();
resumeChannelAllocation();
debug(
"vhost: %s was initialised with connection: %s",
self.name,
connection._rascal_id
);
debug('vhost: %s was initialised with connection: %s', self.name, connection._rascal_id);
self.emit("connect");
self.emit("vhost_initialised", self.getConnectionDetails());
self.emit('connect');
self.emit('vhost_initialised', self.getConnectionDetails());
return next(null, self);
}
);
return next(null, self);
});
return self;

@@ -111,3 +78,3 @@ };

this.forewarn = function (next) {
debug("Forewarning vhost: %s about impending shutdown", self.name);
debug('Forewarning vhost: %s about impending shutdown', self.name);
pauseChannelAllocation();

@@ -120,3 +87,3 @@ shuttingDown = true;

this.shutdown = function (next) {
debug("Shutting down vhost: %s", self.name);
debug('Shutting down vhost: %s', self.name);
clearTimeout(reconnectTimeout);

@@ -131,3 +98,3 @@ pauseChannelAllocation();

this.nuke = function (next) {
debug("Nuking vhost: %s", self.name);
debug('Nuking vhost: %s', self.name);
pauseChannelAllocation();

@@ -138,3 +105,3 @@ drainChannelPools((err) => {

if (err) return next(err);
debug("Finished nuking vhost: %s", self.name);
debug('Finished nuking vhost: %s', self.name);
setImmediate(next);

@@ -146,39 +113,31 @@ });

this.purge = function (next) {
debug("Purging vhost: %s", self.name);
purge(
config,
{ purge: true, connectionIndex: self.connectionIndex },
(err) => {
if (err) return next(err);
debug("Finished purging vhost: %s", self.name);
setImmediate(next);
}
);
debug('Purging vhost: %s', self.name);
purge(config, { purge: true, connectionIndex: self.connectionIndex }, (err) => {
if (err) return next(err);
debug('Finished purging vhost: %s', self.name);
setImmediate(next);
});
};
this.bounce = function (next) {
async.series([self.disconnect, self.init], next);
async.series([self.disconnect, self.init], (err) => {
if (err) return next(err);
debug('Finished bouncing vhost: %s', self.name);
setImmediate(next);
});
};
this.connect = function (next) {
debug("Connecting to vhost: %s", self.name);
connect(
config,
{ connectionIndex: self.connectionIndex },
(err, config, ctx) => {
return next(err, ctx.connection);
}
);
debug('Connecting to vhost: %s', self.name);
connect(config, { connectionIndex: self.connectionIndex }, (err, config, ctx) => {
return next(err, ctx.connection);
});
};
this.disconnect = function (next) {
debug("Disconnecting from vhost: %s", self.name);
debug('Disconnecting from vhost: %s', self.name);
if (!connection) return next();
connection.removeAllListeners();
connection.on("error", (err) => {
debug(
"Error disconnecting from %s. Original error was: %s",
connectionConfig.loggableUrl,
err.message
);
connection.on('error', (err) => {
debug('Error disconnecting from %s. Original error was: %s', connectionConfig.loggableUrl, err.message);
});

@@ -193,6 +152,3 @@ connection.close((err) => {

channelCreator.push({ confirm: false }, next);
debug(
"Requested channel. Outstanding channel requests: %d",
channelCreator.length()
);
debug('Requested channel. Outstanding channel requests: %d', channelCreator.length());
};

@@ -202,18 +158,7 @@

channelCreator.push({ confirm: true }, next);
debug(
"Requested confirm channel. Outstanding channel requests: %d",
channelCreator.length()
);
debug('Requested confirm channel. Outstanding channel requests: %d', channelCreator.length());
};
this.borrowChannel = function (next) {
if (!regularChannelPool)
return next(
new Error(
format(
"Vhost: %s must be initialised before you can borrow a channel",
self.name
)
)
);
if (!regularChannelPool) return next(new Error(format('Vhost: %s must be initialised before you can borrow a channel', self.name)));
regularChannelPool.borrow(next);

@@ -233,11 +178,3 @@ };

this.borrowConfirmChannel = function (next) {
if (!confirmChannelPool)
return next(
new Error(
format(
"Vhost: %s must be initialised before you can borrow a confirm channel",
self.name
)
)
);
if (!confirmChannelPool) return next(new Error(format('Vhost: %s must be initialised before you can borrow a confirm channel', self.name)));
confirmChannelPool.borrow(next);

@@ -273,32 +210,17 @@ };

return new Promise((resolve, reject) => {
debug("Creating pooled %s channel for vhost: %s", mode, config.name);
debug('Creating pooled %s channel for vhost: %s', mode, config.name);
createChannelWhenInitialised(options.confirm, (err, channel) => {
if (err) return deferRejection(reject, err);
if (!channel)
return deferRejection(
reject,
new Error("Vhost is shutting down")
);
if (!channel) return deferRejection(reject, new Error('Vhost is shutting down'));
const destroyChannel = _.once(() => {
debug(
"Destroying %s channel: %s for vhost: %s due to error or close event",
mode,
channel._rascal_id,
config.name
);
debug('Destroying %s channel: %s for vhost: %s due to error or close event', mode, channel._rascal_id, config.name);
channel._rascal_closed = true;
if (pool.isBorrowedResource(channel)) {
pool.destroy(channel).catch((err) => {
debug(
"Error destroying %s channel: %s for vhost: %s. %s",
mode,
channel._rascal_id,
config.name,
err.message
);
debug('Error destroying %s channel: %s for vhost: %s. %s', mode, channel._rascal_id, config.name, err.message);
});
}
});
channel.on("error", destroyChannel);
channel.on("close", destroyChannel);
channel.on('error', destroyChannel);
channel.on('close', destroyChannel);
resolve(channel);

@@ -310,11 +232,6 @@ });

return new Promise((resolve, reject) => {
debug(
"Destroying %s channel: %s for vhost: %s",
mode,
channel._rascal_id,
config.name
);
debug('Destroying %s channel: %s for vhost: %s', mode, channel._rascal_id, config.name);
if (channel._rascal_closed) return resolve();
channel.removeAllListeners();
channel.on("error", reject);
channel.on('error', reject);
const closeChannelCb = (err) => {

@@ -330,13 +247,3 @@ if (err) return reject(err);

setTimeoutUnref(() => {
once(
new Error(
format(
"Timeout after %dms closing %s channel: %s for vhost: %s",
options.pool.destroyTimeoutMillis,
mode,
channel._rascal_id,
config.name
)
)
);
once(new Error(format('Timeout after %dms closing %s channel: %s for vhost: %s', options.pool.destroyTimeoutMillis, mode, channel._rascal_id, config.name)));
}, 1000);

@@ -348,7 +255,3 @@ channel.close(once);

return new Promise((resolve) => {
resolve(
!channel._rascal_closed &&
connection &&
connection.connection === channel.connection
);
resolve(!channel._rascal_closed && connection && connection.connection === channel.connection);
});

@@ -381,10 +284,10 @@ },

function borrow(next) {
debug("Requested %s channel. %o", mode, stats());
debug('Requested %s channel. %o', mode, stats());
if (poolQueue.length() >= options.pool.max) {
busy = true;
self.emit("busy", stats());
self.emit('busy', stats());
}
poolQueue.push(null, (err, channel) => {
if (err) return next(err);
debug("Borrowed %s channel: %s. %o", mode, channel._rascal_id, stats());
debug('Borrowed %s channel: %s. %o', mode, channel._rascal_id, stats());
next(null, channel);

@@ -395,12 +298,7 @@ });

function release(channel) {
debug("Releasing %s channel: %s. %o", mode, channel._rascal_id, stats());
debug('Releasing %s channel: %s. %o', mode, channel._rascal_id, stats());
pool
.release(channel)
.catch((err) => {
debug(
"Error releasing %s channel: %s. %s",
mode,
channel._rascal_id,
err.message
);
debug('Error releasing %s channel: %s. %s', mode, channel._rascal_id, err.message);
})

@@ -410,3 +308,3 @@ .then(() => {

busy = false;
self.emit("ready", stats());
self.emit('ready', stats());
});

@@ -416,12 +314,7 @@ }

function destroy(channel) {
debug("Destroying %s channel: %s. %o", mode, channel._rascal_id, stats());
debug('Destroying %s channel: %s. %o', mode, channel._rascal_id, stats());
pool
.destroy(channel)
.catch((err) => {
debug(
"Error destroying %s channel: %s. %s",
mode,
channel._rascal_id,
err.message
);
debug('Error destroying %s channel: %s. %s', mode, channel._rascal_id, err.message);
})

@@ -431,3 +324,3 @@ .then(() => {

busy = false;
self.emit("ready", stats());
self.emit('ready', stats());
});

@@ -437,3 +330,3 @@ }

function drain(next) {
debug("Draining %s channel pool. %o", mode, stats());
debug('Draining %s channel pool. %o', mode, stats());
pool

@@ -443,3 +336,3 @@ .drain()

return pool.clear().then(() => {
debug("Drained %s channel pool. %o", mode, stats());
debug('Drained %s channel pool. %o', mode, stats());
setImmediate(next);

@@ -449,3 +342,3 @@ });

.catch((err) => {
debug("Error draining %s channel pool. %s", mode, err.message);
debug('Error draining %s channel pool. %s', mode, err.message);
setImmediate(next);

@@ -455,9 +348,9 @@ });

debug("Creating %s channel pool %o", mode, options.pool);
debug('Creating %s channel pool %o', mode, options.pool);
pool = genericPool.createPool(factory, options.pool);
pool.on("factoryCreateError", (err) => {
debug("Create error emitted by %s channel pool: %s", mode, err.message);
pool.on('factoryCreateError', (err) => {
debug('Create error emitted by %s channel pool: %s', mode, err.message);
});
pool.on("factoryDestroyError", (err) => {
debug("Destroy error emitted by %s channel pool: %s", mode, err.message);
pool.on('factoryDestroyError', (err) => {
debug('Destroy error emitted by %s channel pool: %s', mode, err.message);
});

@@ -489,15 +382,12 @@

if (connection) return createChannel(confirm, next);
debug(
"Vhost: %s is not initialised. Deferring channel creation",
self.name
);
debug('Vhost: %s is not initialised. Deferring channel creation', self.name);
setTimeoutUnref(() => {
self.removeListener("vhost_initialised", onVhostInitialised);
next(new Error("Timedout acquiring channel"), 5000);
self.removeListener('vhost_initialised', onVhostInitialised);
next(new Error('Timedout acquiring channel'), 5000);
});
function onVhostInitialised() {
debug("Vhost: %s was initialised. Resuming channel creation", self.name);
debug('Vhost: %s was initialised. Resuming channel creation', self.name);
createChannel(confirm, next);
}
self.once("vhost_initialised", onVhostInitialised);
self.once('vhost_initialised', onVhostInitialised);
}

@@ -507,17 +397,6 @@

if (shuttingDown) {
debug(
"Ignoring create channel request. Vhost: %s is shutting down.",
self.name
);
debug('Ignoring create channel request. Vhost: %s is shutting down.', self.name);
return next();
}
if (!connection)
return next(
new Error(
format(
"Vhost: %s must be initialised before you can create a channel",
self.name
)
)
);
if (!connection) return next(new Error(format('Vhost: %s must be initialised before you can create a channel', self.name)));

@@ -529,10 +408,8 @@ // Same problem as https://github.com/guidesmiths/rascal/issues/17

connection.once("close", closeHandler);
connection.once("error", errorHandler);
confirm
? connection.createConfirmChannel(callback)
: connection.createChannel(callback);
connection.once('close', closeHandler);
connection.once('error', errorHandler);
confirm ? connection.createConfirmChannel(callback) : connection.createChannel(callback);
function closeHandler() {
once(new Error("Connection closed"));
once(new Error('Connection closed'));
}

@@ -546,11 +423,6 @@

invocations++;
connection && connection.removeListener("close", closeHandler);
connection && connection.removeListener("error", errorHandler);
connection && connection.removeListener('close', closeHandler);
connection && connection.removeListener('error', errorHandler);
if (err) {
debug(
"Error creating channel: %s from %s: %s",
channelId,
connectionConfig.loggableUrl,
err.message
);
debug('Error creating channel: %s from %s: %s', channelId, connectionConfig.loggableUrl, err.message);
return once(err);

@@ -561,15 +433,7 @@ }

channel.connection._rascal_id = connection._rascal_id;
debug(
"Created %s channel: %s from connection: %s",
getChannelMode(confirm),
channel._rascal_id,
connection._rascal_id
);
debug('Created %s channel: %s from connection: %s', getChannelMode(confirm), channel._rascal_id, connection._rascal_id);
// See https://github.com/squaremo/amqp.node/issues/388
if (invocations > 1) {
debug(
"Closing superfluous channel: %s previously reported as errored",
channel._rascal_id
);
debug('Closing superfluous channel: %s previously reported as errored', channel._rascal_id);
return channel.close();

@@ -583,3 +447,3 @@ }

function getChannelMode(confirm) {
return confirm ? "confirm" : "regular";
return confirm ? 'confirm' : 'regular';
}

@@ -592,3 +456,3 @@

paused = true;
self.emit("paused", { vhost: self.name });
self.emit('paused', { vhost: self.name });
}

@@ -601,11 +465,11 @@

paused = false;
self.emit("resumed", { vhost: self.name });
self.emit('resumed', { vhost: self.name });
}
function forwardRabbitMQConnectionEvents() {
connection.on("blocked", (reason) => {
self.emit("blocked", reason, self.getConnectionDetails());
connection.on('blocked', (reason) => {
self.emit('blocked', reason, self.getConnectionDetails());
});
connection.on("unblocked", () => {
self.emit("unblocked", self.getConnectionDetails());
connection.on('unblocked', () => {
self.emit('unblocked', self.getConnectionDetails());
});

@@ -644,21 +508,14 @@ }

function attachErrorHandlers(config) {
connection.removeAllListeners("error");
const errorHandler = _.once(
handleConnectionError.bind(null, connection, config)
);
connection.on("error", errorHandler);
connection.on("close", errorHandler);
connection.removeAllListeners('error');
const errorHandler = _.once(handleConnectionError.bind(null, connection, config));
connection.on('error', errorHandler);
connection.on('close', errorHandler);
}
function handleConnectionError(borked, config, err) {
debug(
"Handling connection error: %s initially from connection: %s, %s",
err.message,
borked._rascal_id,
connectionConfig.loggableUrl
);
debug('Handling connection error: %s initially from connection: %s, %s', err.message, borked._rascal_id, connectionConfig.loggableUrl);
pauseChannelAllocation();
connection = undefined;
self.emit("disconnect");
self.emit("error", err, self.getConnectionDetails());
self.emit('disconnect');
self.emit('error', err, self.getConnectionDetails());
connectionConfig.retry &&

@@ -668,9 +525,6 @@ self.init((err) => {

const delay = timer.next();
debug("Will attempt reconnection in in %dms", delay);
reconnectTimeout = setTimeoutUnref(
handleConnectionError.bind(null, borked, config, err),
delay
);
debug('Will attempt reconnection in in %dms', delay);
reconnectTimeout = setTimeoutUnref(handleConnectionError.bind(null, borked, config, err), delay);
});
}
}

@@ -1,8 +0,8 @@

const get = require("lodash").get;
const get = require('lodash').get;
module.exports = function (options) {
const min = get(options, "min", 1000);
const max = get(options, "max", Math.pow(min, 10));
const factor = get(options, "factor", 2);
const randomise = get(options, "randomise", true);
const min = get(options, 'min', 1000);
const max = get(options, 'max', Math.pow(min, 10));
const factor = get(options, 'factor', 2);
const randomise = get(options, 'randomise', true);
let lower = min;

@@ -13,5 +13,3 @@

const upper = lower * factor;
const value = randomise
? Math.floor(Math.random() * (upper - lower + 1) + lower)
: lower;
const value = randomise ? Math.floor(Math.random() * (upper - lower + 1) + lower) : lower;
const capped = Math.min(max, value);

@@ -18,0 +16,0 @@ lower = upper;

const strategies = {
exponential: require("./exponential"),
linear: require("./linear"),
exponential: require('./exponential'),
linear: require('./linear'),
};

@@ -5,0 +5,0 @@

@@ -1,6 +0,6 @@

const get = require("lodash").get;
const get = require('lodash').get;
module.exports = function (options) {
const min = get(options, "min", 1000);
const max = get(options, "max", min);
const min = get(options, 'min', 1000);
const max = get(options, 'max', min);

@@ -7,0 +7,0 @@ function next() {

@@ -28,10 +28,10 @@ module.exports = {

},
connectionStrategy: "random",
connectionStrategy: 'random',
connection: {
slashes: true,
protocol: "amqp",
hostname: "localhost",
user: "guest",
password: "guest",
port: "5672",
protocol: 'amqp',
hostname: 'localhost',
user: 'guest',
password: 'guest',
port: '5672',
options: {},

@@ -42,7 +42,7 @@ retry: {

factor: 2,
strategy: "exponential",
strategy: 'exponential',
},
management: {
slashes: true,
protocol: "http",
protocol: 'http',
port: 15672,

@@ -54,3 +54,3 @@ options: {},

assert: true,
type: "topic",
type: 'topic',
options: {},

@@ -65,4 +65,4 @@ },

bindings: {
destinationType: "queue",
bindingKey: "#",
destinationType: 'queue',
bindingKey: '#',
options: {},

@@ -72,3 +72,3 @@ },

publications: {
vhost: "/",
vhost: '/',
confirm: true,

@@ -82,3 +82,3 @@ timeout: 10000,

subscriptions: {
vhost: "/",
vhost: '/',
prefetch: 10,

@@ -89,3 +89,3 @@ retry: {

factor: 2,
strategy: "exponential",
strategy: 'exponential',
},

@@ -95,5 +95,4 @@ redeliveries: {

timeout: 1000,
counter: "stub",
counter: 'stub',
},
deferCloseChannel: 10000,
options: {},

@@ -100,0 +99,0 @@ },

@@ -1,10 +0,10 @@

const debug = require("debug")("rascal:config:configure");
const format = require("util").format;
const url = require("url");
const _ = require("lodash");
const uuid = require("uuid").v4;
const baseline = require("./baseline");
const fqn = require("./fqn");
const XRegExp = require("xregexp");
const freeze = require("deep-freeze");
const debug = require('debug')('rascal:config:configure');
const format = require('util').format;
const url = require('url');
const _ = require('lodash');
const uuid = require('uuid').v4;
const baseline = require('./baseline');
const fqn = require('./fqn');
const XRegExp = require('xregexp');
const freeze = require('deep-freeze');

@@ -41,3 +41,3 @@ module.exports = _.curry((rascalConfig, next) => {

function configureVhost(name, vhostConfig) {
debug("Configuring vhost: %s", name);
debug('Configuring vhost: %s', name);
rascalConfig.vhosts[name] = _.defaultsDeep(

@@ -49,9 +49,7 @@ vhostConfig,

connectionStrategy: rascalConfig.defaults.vhosts.connectionStrategy,
publicationChannelPools:
rascalConfig.defaults.vhosts.publicationChannelPools,
publicationChannelPools: rascalConfig.defaults.vhosts.publicationChannelPools,
},
{ defaults: rascalConfig.defaults.vhosts }
);
rascalConfig.vhosts[name].namespace =
vhostConfig.namespace === true ? uuid() : vhostConfig.namespace;
rascalConfig.vhosts[name].namespace = vhostConfig.namespace === true ? uuid() : vhostConfig.namespace;
configureConnections(vhostConfig, name);

@@ -73,3 +71,3 @@ }

});
vhostConfig.connections = _.sortBy(vhostConfig.connections, "index");
vhostConfig.connections = _.sortBy(vhostConfig.connections, 'index');
delete vhostConfig.connection;

@@ -80,15 +78,9 @@ }

_.defaultsDeep(connection, vhostConfig.defaults.connection);
connection.vhost =
connection.vhost !== undefined ? connection.vhost : vhostName;
connection.auth =
connection.auth || getAuth(connection.user, connection.password);
connection.pathname = connection.vhost === "/" ? "" : connection.vhost;
connection.vhost = connection.vhost !== undefined ? connection.vhost : vhostName;
connection.auth = connection.auth || getAuth(connection.user, connection.password);
connection.pathname = connection.vhost === '/' ? '' : connection.vhost;
connection.query = connection.options;
connection.url = connection.url || url.format(connection);
connection.loggableUrl = connection.url.replace(/:[^:]*?@/, ":***@");
connection.index = getConnectionIndex(
vhostConfig.connectionStrategy,
connection,
index
);
connection.loggableUrl = connection.url.replace(/:[^:]*?@/, ':***@');
connection.index = getConnectionIndex(vhostConfig.connectionStrategy, connection, index);
configureManagementConnection(vhostConfig, vhostName, connection);

@@ -99,20 +91,13 @@ }

_.defaultsDeep(connection.management, { hostname: connection.hostname });
connection.management.auth =
connection.management.auth ||
getAuth(connection.management.user, connection.management.password) ||
connection.auth;
connection.management.url =
connection.management.url || url.format(connection.management);
connection.management.loggableUrl = connection.management.url.replace(
/:[^:]*?@/,
":***@"
);
connection.management.auth = connection.management.auth || getAuth(connection.management.user, connection.management.password) || connection.auth;
connection.management.url = connection.management.url || url.format(connection.management);
connection.management.loggableUrl = connection.management.url.replace(/:[^:]*?@/, ':***@');
}
function getAuth(user, password) {
return user && password ? user + ":" + password : undefined;
return user && password ? user + ':' + password : undefined;
}
function getConnectionIndex(strategy, connection, index) {
if (strategy === "fixed") return index;
if (strategy === 'fixed') return index;
const host = url.parse(connection.url).host;

@@ -137,6 +122,3 @@ if (_.has(connectionIndexes, host)) return connectionIndexes[host];

_.each(vhost.exchanges, (exchange) => {
const name =
vhost.name === "/"
? format("/%s", exchange.name)
: format("%s/%s", vhost.name, exchange.name);
const name = vhost.name === '/' ? format('/%s', exchange.name) : format('%s/%s', vhost.name, exchange.name);
publications[name] = {

@@ -149,6 +131,3 @@ exchange: exchange.name,

_.each(vhost.queues, (queue) => {
const name =
vhost.name === "/"
? format("/%s", queue.name)
: format("%s/%s", vhost.name, queue.name);
const name = vhost.name === '/' ? format('/%s', queue.name) : format('%s/%s', vhost.name, queue.name);
publications[name] = {

@@ -165,42 +144,15 @@ queue: queue.name,

_.each(
_.defaults({}, publications, defaultPublications),
configurePublication
);
_.each(_.defaults({}, publications, defaultPublications), configurePublication);
}
function configurePublication(publicationConfig, name) {
debug("Configuring publication: %s", name);
if (
rascalConfig.publications[name] &&
rascalConfig.publications[name].vhost !== publicationConfig.vhost
)
throw new Error(format("Duplicate publication: %s", name));
rascalConfig.publications[name] = _.defaultsDeep(
publicationConfig,
{ name },
rascalConfig.defaults.publications
);
debug('Configuring publication: %s', name);
if (rascalConfig.publications[name] && rascalConfig.publications[name].vhost !== publicationConfig.vhost) throw new Error(format('Duplicate publication: %s', name));
rascalConfig.publications[name] = _.defaultsDeep(publicationConfig, { name }, rascalConfig.defaults.publications);
if (!rascalConfig.vhosts[publicationConfig.vhost]) return;
const destination = Object.prototype.hasOwnProperty.call(
publicationConfig,
"exchange"
)
? rascalConfig.vhosts[publicationConfig.vhost].exchanges[
publicationConfig.exchange
]
: rascalConfig.vhosts[publicationConfig.vhost].queues[
publicationConfig.queue
];
rascalConfig.publications[name].destination =
destination.fullyQualifiedName;
const destination = Object.prototype.hasOwnProperty.call(publicationConfig, 'exchange') ? rascalConfig.vhosts[publicationConfig.vhost].exchanges[publicationConfig.exchange] : rascalConfig.vhosts[publicationConfig.vhost].queues[publicationConfig.queue];
rascalConfig.publications[name].destination = destination.fullyQualifiedName;
if (
publicationConfig.encryption &&
_.isString(publicationConfig.encryption)
) {
rascalConfig.publications[name].encryption = _.defaultsDeep(
{ name: publicationConfig.encryption },
rascalConfig.encryption[publicationConfig.encryption]
);
if (publicationConfig.encryption && _.isString(publicationConfig.encryption)) {
rascalConfig.publications[name].encryption = _.defaultsDeep({ name: publicationConfig.encryption }, rascalConfig.encryption[publicationConfig.encryption]);
}

@@ -222,6 +174,3 @@ }

_.each(vhost.queues, (queue) => {
const name =
vhost.name === "/"
? format("/%s", queue.name)
: format("%s/%s", vhost.name, queue.name);
const name = vhost.name === '/' ? format('/%s', queue.name) : format('%s/%s', vhost.name, queue.name);
subscriptions[name] = {

@@ -237,28 +186,12 @@ queue: queue.name,

);
_.each(
_.defaults({}, subscriptions, defaultSubscriptions),
configureSubscription
);
_.each(_.defaults({}, subscriptions, defaultSubscriptions), configureSubscription);
}
function configureSubscription(subscriptionConfig, name) {
debug("Configuring subscription: %s", name);
if (
rascalConfig.subscriptions[name] &&
rascalConfig.subscriptions[name].vhost !== subscriptionConfig.vhost
)
throw new Error(format("Duplicate subscription: %s", name));
rascalConfig.subscriptions[name] = _.defaultsDeep(
subscriptionConfig,
{ name },
rascalConfig.defaults.subscriptions
);
debug('Configuring subscription: %s', name);
if (rascalConfig.subscriptions[name] && rascalConfig.subscriptions[name].vhost !== subscriptionConfig.vhost) throw new Error(format('Duplicate subscription: %s', name));
rascalConfig.subscriptions[name] = _.defaultsDeep(subscriptionConfig, { name }, rascalConfig.defaults.subscriptions);
if (!rascalConfig.vhosts[subscriptionConfig.vhost]) return;
subscriptionConfig.source =
rascalConfig.vhosts[subscriptionConfig.vhost].queues[
subscriptionConfig.queue
].fullyQualifiedName;
subscriptionConfig.encryption =
subscriptionConfig.encryption ||
_.defaultsDeep({}, rascalConfig.encryption);
subscriptionConfig.source = rascalConfig.vhosts[subscriptionConfig.vhost].queues[subscriptionConfig.queue].fullyQualifiedName;
subscriptionConfig.encryption = subscriptionConfig.encryption || _.defaultsDeep({}, rascalConfig.encryption);
}

@@ -272,16 +205,9 @@

function configureShovel(shovelConfig, name) {
debug("Configuring shovel: %s", name);
debug('Configuring shovel: %s', name);
const parsedConfig = parseShovelName(name);
rascalConfig.shovels[name] = _.defaultsDeep(
shovelConfig,
{ name },
parsedConfig,
rascalConfig.defaults.shovels
);
rascalConfig.shovels[name] = _.defaultsDeep(shovelConfig, { name }, parsedConfig, rascalConfig.defaults.shovels);
}
function parseShovelName(name) {
const pattern = XRegExp(
"(?<subscription>[\\w:]+)\\s*->\\s*(?<publication>[\\w:]+)"
);
const pattern = XRegExp('(?<subscription>[\\w:]+)\\s*->\\s*(?<publication>[\\w:]+)');
const match = XRegExp.exec(name, pattern);

@@ -303,27 +229,15 @@ return match

function configureCounter(counterConfig, name) {
debug("Configuring counter: %s", name);
debug('Configuring counter: %s', name);
const counterType = counterConfig.type || name;
const counterDefaultConfigPath =
"defaults.redeliveries.counters." + counterType;
const counterDefaultConfigPath = 'defaults.redeliveries.counters.' + counterType;
const counterDefaults = _.get(rascalConfig, counterDefaultConfigPath);
rascalConfig.redeliveries.counters[name] = _.defaultsDeep(
counterConfig,
{ name, type: name },
counterDefaults
);
rascalConfig.redeliveries.counters[name] = _.defaultsDeep(counterConfig, { name, type: name }, counterDefaults);
}
function configureExchanges(config) {
const defaultExchange = { "": {} };
config.exchanges = _.defaultsDeep(
ensureKeyedCollection(config.exchanges),
defaultExchange
);
const defaultExchange = { '': {} };
config.exchanges = _.defaultsDeep(ensureKeyedCollection(config.exchanges), defaultExchange);
_.each(config.exchanges, (exchangeConfig, name) => {
debug("Configuring exchange: %s", name);
config.exchanges[name] = _.defaultsDeep(
exchangeConfig,
{ name, fullyQualifiedName: fqn.qualify(name, config.namespace) },
config.defaults.exchanges
);
debug('Configuring exchange: %s', name);
config.exchanges[name] = _.defaultsDeep(exchangeConfig, { name, fullyQualifiedName: fqn.qualify(name, config.namespace) }, config.defaults.exchanges);
});

@@ -335,9 +249,5 @@ }

_.each(config.queues, (queueConfig, name) => {
debug("Configuring queue: %s", name);
queueConfig.replyTo =
queueConfig.replyTo === true ? uuid() : queueConfig.replyTo;
qualifyArguments(
config.namespace,
queueConfig.options && queueConfig.options.arguments
);
debug('Configuring queue: %s', name);
queueConfig.replyTo = queueConfig.replyTo === true ? uuid() : queueConfig.replyTo;
qualifyArguments(config.namespace, queueConfig.options && queueConfig.options.arguments);
config.queues[name] = _.defaultsDeep(

@@ -347,7 +257,3 @@ queueConfig,

name,
fullyQualifiedName: fqn.qualify(
name,
config.namespace,
queueConfig.replyTo
),
fullyQualifiedName: fqn.qualify(name, config.namespace, queueConfig.replyTo),
},

@@ -363,22 +269,12 @@ config.defaults.queues

_.each(config.bindings, (bindingConfig, name) => {
debug("Configuring binding: %s", name);
debug('Configuring binding: %s', name);
config.bindings[name] = _.defaultsDeep(
bindingConfig,
config.defaults.bindings
);
config.bindings[name] = _.defaultsDeep(bindingConfig, config.defaults.bindings);
if (bindingConfig.qualifyBindingKeys) {
config.bindings[name].bindingKey = fqn.qualify(
bindingConfig.bindingKey,
config.namespace
);
config.bindings[name].bindingKey = fqn.qualify(bindingConfig.bindingKey, config.namespace);
}
if (bindingConfig.destinationType === "queue") {
if (bindingConfig.destinationType === 'queue') {
const queue = config.queues[bindingConfig.destination];
config.bindings[name].bindingKey = fqn.prefix(
queue && queue.replyTo,
bindingConfig.bindingKey,
"."
);
config.bindings[name].bindingKey = fqn.prefix(queue && queue.replyTo, bindingConfig.bindingKey, '.');
}

@@ -389,5 +285,3 @@ });

function parseBindingName(name) {
const pattern = XRegExp(
"(?<source>[\\w:\\.\\-]+)\\s*(?:\\[\\s*(?<keys>.*)\\s*\\])?\\s*->\\s*(?<destination>[\\w:\\.\\-]+)"
);
const pattern = XRegExp('(?<source>[\\w:\\.\\-]+)\\s*(?:\\[\\s*(?<keys>.*)\\s*\\])?\\s*->\\s*(?<destination>[\\w:\\.\\-]+)');
const match = XRegExp.exec(name, pattern);

@@ -412,23 +306,9 @@ return match

const parsedConfig = parseBindingName(name);
const bindingKeys = _.chain([])
.concat(
bindingConfig.bindingKeys,
bindingConfig.bindingKey,
parsedConfig.bindingKeys
)
.compact()
.uniq()
.value();
const bindingKeys = _.chain([]).concat(bindingConfig.bindingKeys, bindingConfig.bindingKey, parsedConfig.bindingKeys).compact().uniq().value();
if (bindingKeys.length <= 1) {
result[name] = _({ bindingKey: bindingKeys[0] })
.defaults(bindingConfig, parsedConfig)
.omit("bindingKeys")
.value();
result[name] = _({ bindingKey: bindingKeys[0] }).defaults(bindingConfig, parsedConfig).omit('bindingKeys').value();
return result[name];
}
_.each(bindingKeys, (bindingKey) => {
result[format("%s:%s", name, bindingKey)] = _({ bindingKey })
.defaults(bindingConfig, parsedConfig)
.omit("bindingKeys")
.value();
result[format('%s:%s', name, bindingKey)] = _({ bindingKey }).defaults(bindingConfig, parsedConfig).omit('bindingKeys').value();
});

@@ -441,7 +321,4 @@ });

if (!args) return;
_.each(["x-dead-letter-exchange"], (name) => {
args[name] =
args[name] !== undefined
? fqn.qualify(args[name], namespace)
: args[name];
_.each(['x-dead-letter-exchange'], (name) => {
args[name] = args[name] !== undefined ? fqn.qualify(args[name], namespace) : args[name];
});

@@ -454,9 +331,7 @@ }

.map((item) => {
return _.isString(item)
? { name: item }
: _.defaults(item, { name: "unnamed-" + uuid() });
return _.isString(item) ? { name: item } : _.defaults(item, { name: 'unnamed-' + uuid() });
})
.keyBy("name")
.keyBy('name')
.value();
}
});

@@ -8,3 +8,3 @@ module.exports = {

function qualify(name, namespace, unique) {
if (name === "") return name;
if (name === '') return name;
name = prefix(namespace, name);

@@ -16,7 +16,7 @@ name = suffix(unique || undefined, name);

function prefix(prefix, name, separator) {
return prefix ? prefix + (separator || ":") + name : name;
return prefix ? prefix + (separator || ':') + name : name;
}
function suffix(suffix, name, separator) {
return suffix ? name + (separator || ":") + suffix : name;
return suffix ? name + (separator || ':') + suffix : name;
}

@@ -1,3 +0,3 @@

const _ = require("lodash").runInContext();
const defaultConfig = require("./defaults");
const _ = require('lodash').runInContext();
const defaultConfig = require('./defaults');

@@ -32,3 +32,3 @@ module.exports = _.defaultsDeep(

subscriptions: {
deferCloseChannel: 100,
closeTimeout: 500,
},

@@ -35,0 +35,0 @@ },

@@ -1,4 +0,4 @@

const debug = require("debug")("rascal:config:validate");
const format = require("util").format;
const _ = require("lodash");
const debug = require('debug')('rascal:config:validate');
const format = require('util').format;
const _ = require('lodash');

@@ -22,54 +22,6 @@ module.exports = _.curry((config, next) => {

function validateVhost(vhost, vhostName) {
validateAttributes("Vhost", vhost, vhostName, [
"defaults",
"namespace",
"name",
"publicationChannelPools",
"connection",
"connections",
"connectionStrategy",
"exchanges",
"queues",
"bindings",
"check",
"assert",
]);
validateAttributes('Vhost', vhost, vhostName, ['defaults', 'namespace', 'name', 'publicationChannelPools', 'connection', 'connections', 'connectionStrategy', 'exchanges', 'queues', 'bindings', 'check', 'assert']);
validateConnectionStrategy(vhost.connectionStrategy, vhostName);
validateConnectionAttributes(vhost.connection, vhostName, [
"slashes",
"protocol",
"hostname",
"user",
"password",
"port",
"vhost",
"options",
"retry",
"auth",
"pathname",
"query",
"url",
"loggableUrl",
"management",
]);
validateManagementConnectionAttributes(
_.get(vhost),
"connection.management",
vhostName,
[
"slashes",
"protocol",
"hostname",
"user",
"password",
"port",
"vhost",
"options",
"auth",
"pathname",
"query",
"url",
"loggableUrl",
]
);
validateConnectionAttributes(vhost.connection, vhostName, ['slashes', 'protocol', 'hostname', 'user', 'password', 'port', 'vhost', 'options', 'retry', 'auth', 'pathname', 'query', 'url', 'loggableUrl', 'management']);
validateManagementConnectionAttributes(_.get(vhost), 'connection.management', vhostName, ['slashes', 'protocol', 'hostname', 'user', 'password', 'port', 'vhost', 'options', 'auth', 'pathname', 'query', 'url', 'loggableUrl']);
validatePublicationChannelPools(vhost.publicationChannelPools, vhostName);

@@ -83,22 +35,7 @@ validateExchanges(vhost, vhostName, vhost.exchanges);

const invalid = _.chain(object).omit(valid).keys().value();
if (invalid.length)
throw new Error(
format(
"%s: %s refers to an unsupported attribute: %s",
type,
objectName,
invalid[0]
)
);
if (invalid.length) throw new Error(format('%s: %s refers to an unsupported attribute: %s', type, objectName, invalid[0]));
}
function validateConnectionStrategy(strategy, vhostName) {
if (![undefined, "random", "fixed"].includes(strategy))
throw new Error(
format(
"Vhost: %s refers to an unknown connection strategy: %s",
vhostName,
strategy
)
);
if (![undefined, 'random', 'fixed'].includes(strategy)) throw new Error(format('Vhost: %s refers to an unknown connection strategy: %s', vhostName, strategy));
}

@@ -108,62 +45,19 @@

const invalid = _.chain(connection).omit(valid).keys().value();
if (invalid.length)
throw new Error(
format(
"Vhost: %s connection refers to an unsupported attribute: %s",
vhostName,
invalid[0]
)
);
if (invalid.length) throw new Error(format('Vhost: %s connection refers to an unsupported attribute: %s', vhostName, invalid[0]));
}
function validateManagementConnectionAttributes(
connection,
vhostName,
valid
) {
function validateManagementConnectionAttributes(connection, vhostName, valid) {
if (!connection) return;
const invalid = _.chain(connection).omit(valid).keys().value();
if (invalid.length)
throw new Error(
format(
"Vhost: %s management connection refers to an unsupported attribute: %s",
vhostName,
invalid[0]
)
);
if (invalid.length) throw new Error(format('Vhost: %s management connection refers to an unsupported attribute: %s', vhostName, invalid[0]));
}
function validatePublicationChannelPools(publicationChannelPools, vhostName) {
const invalid = _.chain(publicationChannelPools)
.omit(["regularPool", "confirmPool"])
.keys()
.value();
if (invalid.length)
throw new Error(
format(
"Publication channel pool in vhost: %s refers to an unsupported attribute: %s",
vhostName,
invalid[0]
)
);
const invalid = _.chain(publicationChannelPools).omit(['regularPool', 'confirmPool']).keys().value();
if (invalid.length) throw new Error(format('Publication channel pool in vhost: %s refers to an unsupported attribute: %s', vhostName, invalid[0]));
}
function validateVhostChildAttributes(
vhostName,
type,
child,
childName,
valid
) {
function validateVhostChildAttributes(vhostName, type, child, childName, valid) {
const invalid = _.chain(child).omit(valid).keys().value();
if (invalid.length)
throw new Error(
format(
"%s: %s in vhost: %s refers to an unsupported attribute: %s",
type,
childName,
vhostName,
invalid[0]
)
);
if (invalid.length) throw new Error(format('%s: %s in vhost: %s refers to an unsupported attribute: %s', type, childName, vhostName, invalid[0]));
}

@@ -176,9 +70,3 @@

function validateExchange(vhost, vhostName, exchange, exchangeName) {
validateVhostChildAttributes(
vhostName,
"Exchange",
exchange,
exchangeName,
["fullyQualifiedName", "name", "assert", "check", "type", "options"]
);
validateVhostChildAttributes(vhostName, 'Exchange', exchange, exchangeName, ['fullyQualifiedName', 'name', 'assert', 'check', 'type', 'options']);
}

@@ -191,12 +79,3 @@

function validateQueue(vhost, vhostName, queue, queueName) {
validateVhostChildAttributes(vhostName, "Queue", queue, queueName, [
"fullyQualifiedName",
"name",
"assert",
"check",
"type",
"purge",
"replyTo",
"options",
]);
validateVhostChildAttributes(vhostName, 'Queue', queue, queueName, ['fullyQualifiedName', 'name', 'assert', 'check', 'type', 'purge', 'replyTo', 'options']);
}

@@ -209,94 +88,15 @@

function validateBinding(vhost, vhostName, binding, bindingName) {
validateVhostChildAttributes(vhostName, "Binding", binding, bindingName, [
"fullyQualifiedName",
"name",
"source",
"destination",
"destinationType",
"bindingKey",
"bindingKeys",
"qualifyBindingKeys",
"options",
]);
if (!binding.source)
throw new Error(
format(
"Binding: %s in vhost: %s is missing a source",
bindingName,
vhostName
)
);
if (!binding.destination)
throw new Error(
format(
"Binding: %s in vhost: %s is missing a destination",
bindingName,
vhostName
)
);
if (!binding.destinationType)
throw new Error(
format(
"Binding: %s in vhost: %s is missing a destination type",
bindingName,
vhostName
)
);
if (["queue", "exchange"].indexOf(binding.destinationType) < 0)
throw new Error(
format(
"Binding: %s in vhost: %s has an invalid destination type: %s",
bindingName,
vhostName,
binding.destinationType
)
);
validateVhostChildAttributes(vhostName, 'Binding', binding, bindingName, ['fullyQualifiedName', 'name', 'source', 'destination', 'destinationType', 'bindingKey', 'bindingKeys', 'qualifyBindingKeys', 'options']);
if (!binding.source) throw new Error(format('Binding: %s in vhost: %s is missing a source', bindingName, vhostName));
if (!binding.destination) throw new Error(format('Binding: %s in vhost: %s is missing a destination', bindingName, vhostName));
if (!binding.destinationType) throw new Error(format('Binding: %s in vhost: %s is missing a destination type', bindingName, vhostName));
if (['queue', 'exchange'].indexOf(binding.destinationType) < 0) throw new Error(format('Binding: %s in vhost: %s has an invalid destination type: %s', bindingName, vhostName, binding.destinationType));
if (!vhost.exchanges)
throw new Error(
format(
"Binding: %s in vhost: %s refers to an unknown exchange: %s",
bindingName,
vhostName,
binding.source
)
);
if (!vhost.exchanges[binding.source])
throw new Error(
format(
"Binding: %s in vhost: %s refers to an unknown exchange: %s",
bindingName,
vhostName,
binding.source
)
);
if (!vhost.exchanges) throw new Error(format('Binding: %s in vhost: %s refers to an unknown exchange: %s', bindingName, vhostName, binding.source));
if (!vhost.exchanges[binding.source]) throw new Error(format('Binding: %s in vhost: %s refers to an unknown exchange: %s', bindingName, vhostName, binding.source));
if (binding.destinationType === "queue") {
if (!vhost.queues)
throw new Error(
format(
"Binding: %s in vhost: %s refers to an unknown queue: %s",
bindingName,
vhostName,
binding.destination
)
);
if (!vhost.queues[binding.destination])
throw new Error(
format(
"Binding: %s in vhost: %s refers to an unknown queue: %s",
bindingName,
vhostName,
binding.destination
)
);
} else if (!vhost.exchanges[binding.destination])
throw new Error(
format(
"Binding: %s in vhost: %s refers to an unknown exchange: %s",
bindingName,
vhostName,
binding.destination
)
);
if (binding.destinationType === 'queue') {
if (!vhost.queues) throw new Error(format('Binding: %s in vhost: %s refers to an unknown queue: %s', bindingName, vhostName, binding.destination));
if (!vhost.queues[binding.destination]) throw new Error(format('Binding: %s in vhost: %s refers to an unknown queue: %s', bindingName, vhostName, binding.destination));
} else if (!vhost.exchanges[binding.destination]) throw new Error(format('Binding: %s in vhost: %s refers to an unknown exchange: %s', bindingName, vhostName, binding.destination));
}

@@ -309,99 +109,19 @@

function validatePublication(publication, publicationName) {
validateAttributes("Publication", publication, publicationName, [
"name",
"vhost",
"exchange",
"queue",
"routingKey",
"confirm",
"options",
"destination",
"autoCreated",
"deprecated",
"encryption",
"timeout",
]);
if (!publication.vhost)
throw new Error(
format("Publication: %s is missing a vhost", publicationName)
);
if (
!(
Object.prototype.hasOwnProperty.call(publication, "exchange") ||
publication.queue
)
)
throw new Error(
format(
"Publication: %s is missing an exchange or a queue",
publicationName
)
);
if (
Object.prototype.hasOwnProperty.call(publication, "exchange") &&
publication.queue
)
throw new Error(
format("Publication: %s has an exchange and a queue", publicationName)
);
validateAttributes('Publication', publication, publicationName, ['name', 'vhost', 'exchange', 'queue', 'routingKey', 'confirm', 'options', 'destination', 'autoCreated', 'deprecated', 'encryption', 'timeout']);
if (!publication.vhost) throw new Error(format('Publication: %s is missing a vhost', publicationName));
if (!(Object.prototype.hasOwnProperty.call(publication, 'exchange') || publication.queue)) throw new Error(format('Publication: %s is missing an exchange or a queue', publicationName));
if (Object.prototype.hasOwnProperty.call(publication, 'exchange') && publication.queue) throw new Error(format('Publication: %s has an exchange and a queue', publicationName));
if (!config.vhosts)
throw new Error(
format(
"Publication: %s refers to an unknown vhost: %s",
publicationName,
publication.vhost
)
);
if (!config.vhosts[publication.vhost])
throw new Error(
format(
"Publication: %s refers to an unknown vhost: %s",
publicationName,
publication.vhost
)
);
if (!config.vhosts) throw new Error(format('Publication: %s refers to an unknown vhost: %s', publicationName, publication.vhost));
if (!config.vhosts[publication.vhost]) throw new Error(format('Publication: %s refers to an unknown vhost: %s', publicationName, publication.vhost));
if (Object.prototype.hasOwnProperty.call(publication, "exchange")) {
if (!config.vhosts[publication.vhost].exchanges)
throw new Error(
format(
"Publication: %s refers to an unknown exchange: %s in vhost: %s",
publicationName,
publication.exchange,
publication.vhost
)
);
if (!config.vhosts[publication.vhost].exchanges[publication.exchange])
throw new Error(
format(
"Publication: %s refers to an unknown exchange: %s in vhost: %s",
publicationName,
publication.exchange,
publication.vhost
)
);
if (Object.prototype.hasOwnProperty.call(publication, 'exchange')) {
if (!config.vhosts[publication.vhost].exchanges) throw new Error(format('Publication: %s refers to an unknown exchange: %s in vhost: %s', publicationName, publication.exchange, publication.vhost));
if (!config.vhosts[publication.vhost].exchanges[publication.exchange]) throw new Error(format('Publication: %s refers to an unknown exchange: %s in vhost: %s', publicationName, publication.exchange, publication.vhost));
} else {
if (!config.vhosts[publication.vhost].queues)
throw new Error(
format(
"Publication: %s refers to an unknown queue: %s in vhost: %s",
publicationName,
publication.queue,
publication.vhost
)
);
if (!config.vhosts[publication.vhost].queues[publication.queue])
throw new Error(
format(
"Publication: %s refers to an unknown queue: %s in vhost: %s",
publicationName,
publication.queue,
publication.vhost
)
);
if (!config.vhosts[publication.vhost].queues) throw new Error(format('Publication: %s refers to an unknown queue: %s in vhost: %s', publicationName, publication.queue, publication.vhost));
if (!config.vhosts[publication.vhost].queues[publication.queue]) throw new Error(format('Publication: %s refers to an unknown queue: %s in vhost: %s', publicationName, publication.queue, publication.vhost));
}
if (publication.encryption)
validateEncryptionProfile(publication.encryption);
if (publication.encryption) validateEncryptionProfile(publication.encryption);
}

@@ -414,90 +134,37 @@

function validateSubscription(subscription, subscriptionName) {
validateAttributes("Subscription", subscription, subscriptionName, [
"name",
"vhost",
"queue",
"contentType",
"options",
"prefetch",
"retry",
"source",
"recovery",
"workflow",
"handler",
"workflows",
"handlers",
"redeliveries",
"autoCreated",
"deprecated",
"deferCloseChannel",
"encryption",
"promisifyAckOrNack",
validateAttributes('Subscription', subscription, subscriptionName, [
'name',
'vhost',
'queue',
'contentType',
'options',
'prefetch',
'retry',
'source',
'recovery',
'workflow',
'handler',
'workflows',
'handlers',
'redeliveries',
'autoCreated',
'deprecated',
'closeTimeout',
'encryption',
'promisifyAckOrNack',
]);
if (!subscription.vhost)
throw new Error(
format("Subscription: %s is missing a vhost", subscriptionName)
);
if (!subscription.queue)
throw new Error(
format("Subscription: %s is missing a queue", subscriptionName)
);
if (!subscription.vhost) throw new Error(format('Subscription: %s is missing a vhost', subscriptionName));
if (!subscription.queue) throw new Error(format('Subscription: %s is missing a queue', subscriptionName));
if (!config.vhosts)
throw new Error(
format(
"Subscription: %s refers to an unknown vhost: %s",
subscriptionName,
subscription.vhost
)
);
if (!config.vhosts[subscription.vhost])
throw new Error(
format(
"Subscription: %s refers to an unknown vhost: %s",
subscriptionName,
subscription.vhost
)
);
if (!config.vhosts) throw new Error(format('Subscription: %s refers to an unknown vhost: %s', subscriptionName, subscription.vhost));
if (!config.vhosts[subscription.vhost]) throw new Error(format('Subscription: %s refers to an unknown vhost: %s', subscriptionName, subscription.vhost));
if (!config.vhosts[subscription.vhost].queues)
throw new Error(
format(
"Subscription: %s refers to an unknown queue: %s in vhost: %s",
subscriptionName,
subscription.queue,
subscription.vhost
)
);
if (!config.vhosts[subscription.vhost].queues[subscription.queue])
throw new Error(
format(
"Subscription: %s refers to an unknown queue: %s in vhost: %s",
subscriptionName,
subscription.queue,
subscription.vhost
)
);
if (!config.vhosts[subscription.vhost].queues) throw new Error(format('Subscription: %s refers to an unknown queue: %s in vhost: %s', subscriptionName, subscription.queue, subscription.vhost));
if (!config.vhosts[subscription.vhost].queues[subscription.queue]) throw new Error(format('Subscription: %s refers to an unknown queue: %s in vhost: %s', subscriptionName, subscription.queue, subscription.vhost));
if (_.get(config, "config.redeliveries.counters"))
throw new Error(
format(
"Subscription: %s refers to an unknown counter: %s in vhost: %s",
subscriptionName,
subscription.redeliveries.counter,
subscription.vhost
)
);
if (!config.redeliveries.counters[subscription.redeliveries.counter])
throw new Error(
format(
"Subscription: %s refers to an unknown counter: %s in vhost: %s",
subscriptionName,
subscription.redeliveries.counter,
subscription.vhost
)
);
if (_.get(config, 'config.redeliveries.counters')) throw new Error(format('Subscription: %s refers to an unknown counter: %s in vhost: %s', subscriptionName, subscription.redeliveries.counter, subscription.vhost));
if (!config.redeliveries.counters[subscription.redeliveries.counter]) throw new Error(format('Subscription: %s refers to an unknown counter: %s in vhost: %s', subscriptionName, subscription.redeliveries.counter, subscription.vhost));
if (subscription.encryption)
validateEncryptionProfiles(subscription.encryption);
if (subscription.encryption) validateEncryptionProfiles(subscription.encryption);
}

@@ -510,20 +177,6 @@

function validateEncryptionProfile(encryption, encryptionName) {
validateAttributes("Encryption", encryption, encryptionName, [
"name",
"key",
"algorithm",
"ivLength",
]);
if (!encryption.key)
throw new Error(
format("Encryption profile: %s is missing a key", encryptionName)
);
if (!encryption.algorithm)
throw new Error(
format("Encryption profile: %s is missing an algorithm", encryptionName)
);
if (!encryption.ivLength)
throw new Error(
format("Encryption profile: %s is missing ivLength", encryptionName)
);
validateAttributes('Encryption', encryption, encryptionName, ['name', 'key', 'algorithm', 'ivLength']);
if (!encryption.key) throw new Error(format('Encryption profile: %s is missing a key', encryptionName));
if (!encryption.algorithm) throw new Error(format('Encryption profile: %s is missing an algorithm', encryptionName));
if (!encryption.ivLength) throw new Error(format('Encryption profile: %s is missing ivLength', encryptionName));
}

@@ -536,33 +189,9 @@

function validateShovel(shovel, shovelName) {
validateAttributes("Shovel", shovel, shovelName, [
"name",
"subscription",
"publication",
]);
if (!shovel.subscription)
throw new Error(
format("Shovel: %s is missing a subscription", shovelName)
);
if (!shovel.publication)
throw new Error(
format("Shovel: %s is missing a publication", shovelName)
);
validateAttributes('Shovel', shovel, shovelName, ['name', 'subscription', 'publication']);
if (!shovel.subscription) throw new Error(format('Shovel: %s is missing a subscription', shovelName));
if (!shovel.publication) throw new Error(format('Shovel: %s is missing a publication', shovelName));
if (!config.subscriptions[shovel.subscription])
throw new Error(
format(
"Shovel: %s refers to an unknown subscription: %s",
shovelName,
shovel.subscription
)
);
if (!config.publications[shovel.publication])
throw new Error(
format(
"Shovel: %s refers to an unknown publication: %s",
shovelName,
shovel.publication
)
);
if (!config.subscriptions[shovel.subscription]) throw new Error(format('Shovel: %s refers to an unknown subscription: %s', shovelName, shovel.subscription));
if (!config.publications[shovel.publication]) throw new Error(format('Shovel: %s refers to an unknown publication: %s', shovelName, shovel.publication));
}
});
module.exports = {
stub: require("./stub"),
inMemory: require("./inMemory"),
inMemoryCluster: require("./inMemoryCluster"),
stub: require('./stub'),
inMemory: require('./inMemory'),
inMemoryCluster: require('./inMemoryCluster'),
};

@@ -1,6 +0,6 @@

const _ = require("lodash");
const LRUCache = require("lru-cache");
const _ = require('lodash');
const LRUCache = require('lru-cache');
module.exports = function init(options) {
const size = _.get(options, "size") || 1000;
const size = _.get(options, 'size') || 1000;
const cache = new LRUCache({ max: size });

@@ -7,0 +7,0 @@

@@ -1,6 +0,6 @@

const cluster = require("cluster");
const inMemory = require("./inMemory");
const uuid = require("uuid").v4;
const Stashback = require("stashback");
const debug = "rascal:counters:inMemoryCluster";
const cluster = require('cluster');
const inMemory = require('./inMemory');
const uuid = require('uuid').v4;
const Stashback = require('stashback');
const debug = 'rascal:counters:inMemoryCluster';

@@ -13,10 +13,6 @@ module.exports = {

function handleMessage(worker, message) {
if (
message.sender !== "rascal-in-memory-cluster-counter" ||
message.cmd !== "incrementAndGet"
)
return;
if (message.sender !== 'rascal-in-memory-cluster-counter' || message.cmd !== 'incrementAndGet') return;
counter.incrementAndGet(message.key, (err, value) => {
worker.send({
sender: "rascal-in-memory-cluster-counter",
sender: 'rascal-in-memory-cluster-counter',
correlationId: message.correlationId,

@@ -29,12 +25,12 @@ value: err ? 1 : value,

cluster
.on("fork", (worker) => {
.on('fork', (worker) => {
workers[worker.id] = worker;
worker.on("message", (message) => {
worker.on('message', (message) => {
handleMessage(worker, message);
});
})
.on("disconnect", (worker) => {
.on('disconnect', (worker) => {
delete workers[worker.id];
})
.on("exit", (worker) => {
.on('exit', (worker) => {
delete workers[worker.id];

@@ -44,6 +40,3 @@ });

worker: function worker(options) {
if (!cluster.isWorker)
throw new Error(
"You cannot use Rascal's in memmory cluster counter outside of a cluster"
);
if (!cluster.isWorker) throw new Error("You cannot use Rascal's in memmory cluster counter outside of a cluster");
if (!options) return worker({});

@@ -53,4 +46,4 @@ const timeout = options.timeout || 100;

process.on("message", (message) => {
if (message.sender !== "rascal-in-memory-cluster-counter") return;
process.on('message', (message) => {
if (message.sender !== 'rascal-in-memory-cluster-counter') return;
stashback.unstash(message.correlationId, (err, cb) => {

@@ -72,5 +65,5 @@ err ? cb(null, 1) : cb(null, message.value);

process.send({
sender: "rascal-in-memory-cluster-counter",
sender: 'rascal-in-memory-cluster-counter',
correlationId,
cmd: "incrementAndGet",
cmd: 'incrementAndGet',
});

@@ -77,0 +70,0 @@ }

@@ -1,24 +0,12 @@

const debug = require("debug")("rascal:management:client");
const format = require("util").format;
const _ = require("lodash");
const agent = require("superagent");
const debug = require('debug')('rascal:management:client');
const format = require('util').format;
const _ = require('lodash');
const agent = require('superagent');
function assertVhost(name, config, next) {
debug("Asserting vhost: %s", name);
debug('Asserting vhost: %s', name);
const options = getVhostOptions(name, config);
request("put", options.url, options.timeout, (err) => {
request('put', options.url, options.timeout, (err) => {
if (!err) return next();
const message = err.status
? format(
"Failed to assert vhost: %s. %s returned status %d",
name,
config.loggableUrl,
err.status
)
: format(
"Failed to assert vhost: %s. %s errored with: %s",
name,
config.loggableUrl,
err.message
);
const message = err.status ? format('Failed to assert vhost: %s. %s returned status %d', name, config.loggableUrl, err.status) : format('Failed to assert vhost: %s. %s errored with: %s', name, config.loggableUrl, err.message);
return next(new Error(message));

@@ -29,19 +17,7 @@ });

function checkVhost(name, config, next) {
debug("Checking vhost: %s", name);
debug('Checking vhost: %s', name);
const options = getVhostOptions(name, config);
request("get", options.url, options.timeout, (err) => {
request('get', options.url, options.timeout, (err) => {
if (!err) return next();
const message = err.status
? format(
"Failed to check vhost: %s. %s returned status %d",
name,
config.loggableUrl,
err.status
)
: format(
"Failed to check vhost: %s. %s errored with: %s",
name,
config.loggableUrl,
err.message
);
const message = err.status ? format('Failed to check vhost: %s. %s returned status %d', name, config.loggableUrl, err.status) : format('Failed to check vhost: %s. %s errored with: %s', name, config.loggableUrl, err.message);
return next(new Error(message));

@@ -52,19 +28,7 @@ });

function deleteVhost(name, config, next) {
debug("Deleting vhost: %s", name);
debug('Deleting vhost: %s', name);
const options = getVhostOptions(name, config);
request("delete", options.url, options.timeout, (err) => {
request('delete', options.url, options.timeout, (err) => {
if (!err) return next();
const message = err.status
? format(
"Failed to delete vhost: %s. %s returned status %d",
name,
config.loggableUrl,
err.status
)
: format(
"Failed to delete vhost: %s. %s errored with: %s",
name,
config.loggableUrl,
err.message
);
const message = err.status ? format('Failed to delete vhost: %s. %s returned status %d', name, config.loggableUrl, err.status) : format('Failed to delete vhost: %s. %s errored with: %s', name, config.loggableUrl, err.message);
return next(new Error(message));

@@ -75,3 +39,3 @@ });

function getVhostOptions(name, config) {
const url = format("%s/%s/%s", config.url, "api/vhosts", name);
const url = format('%s/%s/%s', config.url, 'api/vhosts', name);
return _.defaultsDeep({ url }, config.options);

@@ -78,0 +42,0 @@ }

{
"name": "rascal",
"version": "13.1.3",
"version": "14.0.0",
"description": "A config driven wrapper for amqplib supporting multi-host connections, automatic error recovery, redelivery flood protection, transparent encryption / decryption, channel pooling and publication timeouts",

@@ -5,0 +5,0 @@ "main": "index.js",

@@ -10,5 +10,3 @@ # Rascal

[![Test Coverage](https://codeclimate.com/github/guidesmiths/rascal/badges/coverage.svg)](https://codeclimate.com/github/guidesmiths/rascal/coverage)
[![Code Style](https://img.shields.io/badge/code%20style-esnext-brightgreen.svg)](https://www.npmjs.com/package/eslint-config-esnext)
[![Dependency Status](https://david-dm.org/guidesmiths/rascal.svg)](https://david-dm.org/guidesmiths/rascal)
[![devDependencies Status](https://david-dm.org/guidesmiths/rascal/dev-status.svg)](https://david-dm.org/guidesmiths/rascal?type=dev)
[![Code Style](https://img.shields.io/badge/code%20style-prettier-brightgreen.svg)](https://github.com/prettier/prettier)
[![rascal](https://snyk.io/advisor/npm-package/rascal/badge.svg)](https://snyk.io/advisor/npm-package/rascal)

@@ -53,2 +51,6 @@ [![Discover zUnit](https://img.shields.io/badge/Discover-zUnit-brightgreen)](https://www.npmjs.com/package/zunit)

### Breaking Changes in Rascal@14
Rascal@15 waits for inflight messages to be acknowledged before closing subscriber channels. Prior to this version Rascal just waited an arbitary amount of time. If you application does not acknowledge a message for some reason (quite likely in tests) calling `subscription.cancel`, `broker.unsubscribeAll`, `broker.bounce`, `broker.shutdown` or `broker.nuke` will wait indefinitely. You can specify a `closeTimeout` in your subscription config, however if this is exceeded the `subscription.cancel` and `broker.unsubscribeAll` methods will yield an error, while the `broker.bounce`, `broker.shutdown` and `broker.nuke` methods will emit an error, but attempt to continue. In both cases the error will have a code of `ETIMEDOUT` and message stating `Callback function "waitForUnacknowledgedMessages" timed out`.
### Special Note

@@ -63,4 +65,4 @@

```js
const Broker = require("rascal").BrokerAsPromised;
const config = require("./config");
const Broker = require('rascal').BrokerAsPromised;
const config = require('./config');

@@ -70,19 +72,16 @@ (async () => {

const broker = await Broker.create(config);
broker.on("error", console.error);
broker.on('error', console.error);
// Publish a message
const publication = await broker.publish(
"demo_publication",
"Hello World!"
);
publication.on("error", console.error);
const publication = await broker.publish('demo_publication', 'Hello World!');
publication.on('error', console.error);
// Consume a message
const subscription = await broker.subscribe("demo_subscription");
const subscription = await broker.subscribe('demo_subscription');
subscription
.on("message", (message, content, ackOrNack) => {
.on('message', (message, content, ackOrNack) => {
console.log(content);
ackOrNack();
})
.on("error", console.error);
.on('error', console.error);
} catch (err) {

@@ -97,4 +96,4 @@ console.error(err);

```js
const Broker = require("rascal").Broker;
const config = require("./config");
const Broker = require('rascal').Broker;
const config = require('./config');

@@ -104,19 +103,19 @@ Broker.create(config, (err, broker) => {

broker.on("error", console.error);
broker.on('error', console.error);
// Publish a message
broker.publish("demo_publication", "Hello World!", (err, publication) => {
broker.publish('demo_publication', 'Hello World!', (err, publication) => {
if (err) throw err;
publication.on("error", console.error);
publication.on('error', console.error);
});
// Consume a message
broker.subscribe("demo_subscription", (err, subscription) => {
broker.subscribe('demo_subscription', (err, subscription) => {
if (err) throw err;
subscription
.on("message", (message, content, ackOrNack) => {
.on('message', (message, content, ackOrNack) => {
console.log(content);
ackOrNack();
})
.on("error", console.error);
.on('error', console.error);
});

@@ -130,8 +129,9 @@ });

There are two situations when Rascal will nack a message without requeue, leading to potential data loss.
There are three situations when Rascal will nack a message without requeue, leading to potential data loss.
1. When it is unable to parse the message content and the subscriber has no 'invalid_content' listener
1. When the subscriber's (optional) redelivery limit has been exceeded and the subscriber has neither a 'redeliveries_error' nor a 'redeliveries_exceeded' listener
1. When attempting to recover by [republishing](#republishing), [forwarding](#forwarding), but the recovery operation fails.
The reason Rascal nacks the message is because the alternative is to rollback and retry the message in an infinite tight loop. This can DDOS your application and cause problems for your infrastructure. Providing you have correctly configured dead letter queues and/or listen to the "invalid_content" and "redeliveries_exceeded" subscriber events, your messages should be safe.
The reason Rascal nacks the message is because the alternatives are to leave the message unacknowledged indefinitely, or to rollback and retry the message in an infinite tight loop. This can DDOS your application and cause problems for your infrastructure. Providing you have correctly configured dead letter queues and/or listen to the "invalid_content" and "redeliveries_exceeded" subscriber events, your messages should be safe.

@@ -145,4 +145,4 @@ ## Very Important Section About Event Handling

```js
broker.on("error", (err, { vhost, connectionUrl }) => {
console.error("Broker error", err, vhost, connectionUrl);
broker.on('error', (err, { vhost, connectionUrl }) => {
console.error('Broker error', err, vhost, connectionUrl);
});

@@ -156,9 +156,9 @@ ```

try {
const subscription = await broker.subscribe("s1");
const subscription = await broker.subscribe('s1');
subscription
.on("message", (message, content, ackOrNack) => {
.on('message', (message, content, ackOrNack) => {
// Do stuff with message
})
.on("error", (err) => {
console.error("Subscriber error", err);
.on('error', (err) => {
console.error('Subscriber error', err);
});

@@ -172,10 +172,10 @@ } catch (err) {

// Callbacks
broker.subscribe("s1", (err, subscription) => {
broker.subscribe('s1', (err, subscription) => {
if (err) throw new Error(`Rascal config error: ${err.message}`);
subscription
.on("message", (message, content, ackOrNack) => {
.on('message', (message, content, ackOrNack) => {
// Do stuff with message
})
.on("error", (err) => {
console.error("Subscriber error", err);
.on('error', (err) => {
console.error('Subscriber error', err);
});

@@ -190,5 +190,5 @@ });

try {
const publication = await broker.publish("p1", "some text");
publication.on("error", (err, messageId) => {
console.error("Publisher error", err, messageId);
const publication = await broker.publish('p1', 'some text');
publication.on('error', (err, messageId) => {
console.error('Publisher error', err, messageId);
});

@@ -202,6 +202,6 @@ } catch (err) {

// Callbacks
broker.publish("p1", "some text", (err, publication) => {
broker.publish('p1', 'some text', (err, publication) => {
if (err) throw new Error(`Rascal config error: ${err.message}`);
publication.on("error", (err, messageId) => {
console.error("Publisher error", err, messageId);
publication.on('error', (err, messageId) => {
console.error('Publisher error', err, messageId);
});

@@ -216,5 +216,5 @@ });

try {
const publication = await broker.forward("p1", message);
publication.on("error", (err, messageId) => {
console.error("Publisher error", err, messageId);
const publication = await broker.forward('p1', message);
publication.on('error', (err, messageId) => {
console.error('Publisher error', err, messageId);
});

@@ -228,6 +228,6 @@ } catch (err) {

// Callbacks
broker.forward("p1", message, (err, publication) => {
broker.forward('p1', message, (err, publication) => {
if (err) throw new Error(`Rascal config error: ${err.message}`);
publication.on("error", (err, messageId) => {
console.error("Publisher error", err, messageId);
publication.on('error', (err, messageId) => {
console.error('Publisher error', err, messageId);
});

@@ -244,6 +244,4 @@ });

```js
broker.on("vhost_initialised", ({ vhost, connectionUrl }) => {
console.log(
`Vhost: ${vhost} was initialised using connection: ${connectionUrl}`
);
broker.on('vhost_initialised', ({ vhost, connectionUrl }) => {
console.log(`Vhost: ${vhost} was initialised using connection: ${connectionUrl}`);
});

@@ -257,11 +255,7 @@ ```

```js
broker.on("blocked", (reason, { vhost, connectionUrl }) => {
console.log(
`Vhost: ${vhost} was blocked using connection: ${connectionUrl}. Reason: ${reason}`
);
broker.on('blocked', (reason, { vhost, connectionUrl }) => {
console.log(`Vhost: ${vhost} was blocked using connection: ${connectionUrl}. Reason: ${reason}`);
});
broker.on("unblocked", ({ vhost, connectionUrl }) => {
console.log(
`Vhost: ${vhost} was unblocked using connection: ${connectionUrl}.`
);
broker.on('unblocked', ({ vhost, connectionUrl }) => {
console.log(`Vhost: ${vhost} was unblocked using connection: ${connectionUrl}.`);
});

@@ -275,4 +269,4 @@ ```

```js
var rascal = require("rascal");
var definitions = require("./your-config.json");
var rascal = require('rascal');
var definitions = require('./your-config.json');
var config = rascal.withDefaultConfig(definitions);

@@ -284,4 +278,4 @@ ```

```js
var rascal = require("rascal");
var definitions = require("./your-test-config.json");
var rascal = require('rascal');
var definitions = require('./your-test-config.json');
var config = rascal.withTestConfig(definitions);

@@ -437,7 +431,3 @@ ```

"connectionStrategy": "random",
"connections": [
"amqp://guest:guest@broker1.example.com:5672/v1?heartbeat=10",
"amqp://guest:guest@broker2.example.com:5672/v1?heartbeat=10",
"amqp://guest:guest@broker3.example.com:5672/v1?heartbeat=10"
]
"connections": ["amqp://guest:guest@broker1.example.com:5672/v1?heartbeat=10", "amqp://guest:guest@broker2.example.com:5672/v1?heartbeat=10", "amqp://guest:guest@broker3.example.com:5672/v1?heartbeat=10"]
}

@@ -579,17 +569,11 @@ }

```js
broker.on(
"busy",
({ vhost, mode, queue, size, available, borrowed, min, max }) => {
if (vhost === "events") return eventStream.pause();
console.warn(`vhost ${vhost} is busy`);
}
);
broker.on('busy', ({ vhost, mode, queue, size, available, borrowed, min, max }) => {
if (vhost === 'events') return eventStream.pause();
console.warn(`vhost ${vhost} is busy`);
});
broker.on(
"ready",
({ vhost, mode, queue, size, available, borrowed, min, max }) => {
if (vhost === "events") return eventStream.resume();
console.info(`vhost ${vhost} is ready`);
}
);
broker.on('ready', ({ vhost, mode, queue, size, available, borrowed, min, max }) => {
if (vhost === 'events') return eventStream.resume();
console.info(`vhost ${vhost} is ready`);
});
```

@@ -845,3 +829,3 @@

```js
broker.publish("p1", "some message");
broker.publish('p1', 'some message');
```

@@ -886,7 +870,7 @@

```js
broker.publish("p1", "some message", callback);
broker.publish("p1", "some message", "some.routing.key", callback);
broker.publish("p1", "some message", {
routingKey: "some.routing.key",
options: { messageId: "foo", expiration: 5000 },
broker.publish('p1', 'some message', callback);
broker.publish('p1', 'some message', 'some.routing.key', callback);
broker.publish('p1', 'some message', {
routingKey: 'some.routing.key',
options: { messageId: 'foo', expiration: 5000 },
});

@@ -896,7 +880,7 @@ ```

```js
await broker.publish("p1", "some message");
await broker.publish("p1", "some message", "some.routing.key");
await broker.publish("p1", "some message", {
routingKey: "some.routing.key",
options: { messageId: "foo", expiration: 5000 },
await broker.publish('p1', 'some message');
await broker.publish('p1', 'some message', 'some.routing.key');
await broker.publish('p1', 'some message', {
routingKey: 'some.routing.key',
options: { messageId: 'foo', expiration: 5000 },
});

@@ -910,13 +894,13 @@ ```

```js
broker.publish("p1", "some message", (err, publication) => {
broker.publish('p1', 'some message', (err, publication) => {
if (err) throw err; // publication didn't exist
publication
.on("success", (messageId) => {
console.log("Message id was: ", messageId);
.on('success', (messageId) => {
console.log('Message id was: ', messageId);
})
.on("error", (err, messageId) => {
console.error("Error was: ", err.message);
.on('error', (err, messageId) => {
console.error('Error was: ', err.message);
})
.on("return", (message) => {
console.warn("Message was returned: ", message.properties.messageId);
.on('return', (message) => {
console.warn('Message was returned: ', message.properties.messageId);
});

@@ -928,12 +912,12 @@ });

try {
const publication = await broker.publish("p1", "some message");
const publication = await broker.publish('p1', 'some message');
publication
.on("success", (messageId) => {
console.log("Message id was: ", messageId);
.on('success', (messageId) => {
console.log('Message id was: ', messageId);
})
.on("error", (err, messageId) => {
console.error("Error was: ", err.message);
.on('error', (err, messageId) => {
console.error('Error was: ', err.message);
})
.on("return", (message) => {
console.warn("Message was returned: ", message.properties.messageId);
.on('return', (message) => {
console.warn('Message was returned: ', message.properties.messageId);
});

@@ -999,13 +983,13 @@ } catch (err) {

```js
broker.publish("p1", "some message", (err, publication) => {
broker.publish('p1', 'some message', (err, publication) => {
if (err) throw err; // publication didn't exist
publication
.on("success", (messageId) => {
console.log("Message id was: ", messageId);
.on('success', (messageId) => {
console.log('Message id was: ', messageId);
})
.on("error", (err, messageId) => {
console.error("Error was: ", err.message);
.on('error', (err, messageId) => {
console.error('Error was: ', err.message);
})
.on("paused", (messageId) => {
console.warn("Publication was paused. Aborting message: ", messageId);
.on('paused', (messageId) => {
console.warn('Publication was paused. Aborting message: ', messageId);
publication.abort();

@@ -1018,12 +1002,12 @@ });

try {
const publication = await broker.publish("p1", "some message");
const publication = await broker.publish('p1', 'some message');
publication
.on("success", (messageId) => {
console.log("Message id was: ", messageId);
.on('success', (messageId) => {
console.log('Message id was: ', messageId);
})
.on("error", (err, messageId) => {
console.error("Error was: ", err.message);
.on('error', (err, messageId) => {
console.error('Error was: ', err.message);
})
.on("paused", (messageId) => {
console.warn("Publication was paused. Aborting message: ", messageId);
.on('paused', (messageId) => {
console.warn('Publication was paused. Aborting message: ', messageId);
publication.abort();

@@ -1072,13 +1056,13 @@ });

```js
broker.forward("p1", message, overrides, (err, publication) => {
broker.forward('p1', message, overrides, (err, publication) => {
if (err) throw err; // publication didn't exist
publication
.on("success", (messageId) => {
console.log("Message id was: ", messageId);
.on('success', (messageId) => {
console.log('Message id was: ', messageId);
})
.on("error", (err, messageId) => {
console.error("Error was: ", err.message);
.on('error', (err, messageId) => {
console.error('Error was: ', err.message);
})
.on("return", (message) => {
console.warn("Message was returned: ", message.properties.messageId);
.on('return', (message) => {
console.warn('Message was returned: ', message.properties.messageId);
});

@@ -1090,12 +1074,12 @@ });

try {
const publication = await broker.forward("p1", message, overrides);
const publication = await broker.forward('p1', message, overrides);
publication
.on("success", (messageId) => {
console.log("Message id was: ", messageId);
.on('success', (messageId) => {
console.log('Message id was: ', messageId);
})
.on("error", (err, messageId) => {
console.error("Error was: ", err.message);
.on('error', (err, messageId) => {
console.error('Error was: ', err.message);
})
.on("return", (message) => {
console.warn("Message was returned: ", message.properties.messageId);
.on('return', (message) => {
console.warn('Message was returned: ', message.properties.messageId);
});

@@ -1127,10 +1111,10 @@ } catch (err) {

```js
broker.subscribe("s1", (err, subscription) => {
broker.subscribe('s1', (err, subscription) => {
if (err) throw err; // subscription didn't exist
subscription
.on("message", (message, content, ackOrNack) => {
.on('message', (message, content, ackOrNack) => {
// Do stuff with message
})
.on("error", (err) => {
console.error("Subscriber error", err);
.on('error', (err) => {
console.error('Subscriber error', err);
});

@@ -1142,9 +1126,9 @@ });

try {
const subscription = await broker.subscribe("s1");
const subscription = await broker.subscribe('s1');
subscription
.on("message", (message, content, ackOrNack) => {
.on('message', (message, content, ackOrNack) => {
// Do stuff with message
})
.on("error", (err) => {
console.error("Subscriber error", err);
.on('error', (err) => {
console.error('Subscriber error', err);
});

@@ -1177,3 +1161,3 @@ } catch (err) {

```js
broker.subscribe("s1", { prefetch: 10, retry: false }, callback);
broker.subscribe('s1', { prefetch: 10, retry: false }, callback);
```

@@ -1198,7 +1182,7 @@

subscription
.on("message", (message, content, ackOrNack) => {
.on('message', (message, content, ackOrNack) => {
// Do stuff with message
})
.on("error", (err) => {
console.error("Subscriber error", err);
.on('error', (err) => {
console.error('Subscriber error', err);
});

@@ -1214,7 +1198,7 @@ });

subscription
.on("message", (message, content, ackOrNack) => {
.on('message', (message, content, ackOrNack) => {
// Do stuff with message
})
.on("error", (err) => {
console.error("Subscriber error", err);
.on('error', (err) => {
console.error('Subscriber error', err);
});

@@ -1236,7 +1220,7 @@ });

subscription
.on("message", (message, content, ackOrNack) => {
.on('message', (message, content, ackOrNack) => {
// Do stuff with message
})
.on("error", (err) => {
console.error("Subscriber error", err);
.on('error', (err) => {
console.error('Subscriber error', err);
});

@@ -1268,13 +1252,13 @@ });

```js
broker.subscribe("s1", (err, subscription) => {
broker.subscribe('s1', (err, subscription) => {
if (err) throw err; // subscription didn't exist
subscription
.on("message", (message, content, ackOrNack) => {
.on('message', (message, content, ackOrNack) => {
// Do stuff with message
})
.on("error", (err) => {
console.error("Subscriber error", err);
.on('error', (err) => {
console.error('Subscriber error', err);
})
.on("invalid_content", (err, message, ackOrNack) => {
console.error("Invalid content", err);
.on('invalid_content', (err, message, ackOrNack) => {
console.error('Invalid content', err);
ackOrNack(err);

@@ -1287,12 +1271,12 @@ });

try {
const subscription = await broker.subscribe("s1");
const subscription = await broker.subscribe('s1');
subscription
.on("message", (message, content, ackOrNack) => {
.on('message', (message, content, ackOrNack) => {
// Do stuff with message
})
.on("error", (err) => {
console.error("Subscriber error", err);
.on('error', (err) => {
console.error('Subscriber error', err);
})
.on("invalid_content", (err, message, ackOrNack) => {
console.error("Invalid content", err);
.on('invalid_content', (err, message, ackOrNack) => {
console.error('Invalid content', err);
ackOrNack(err);

@@ -1370,13 +1354,13 @@ });

```js
broker.subscribe("s1", (err, subscription) => {
broker.subscribe('s1', (err, subscription) => {
if (err) throw err; // subscription didn't exist
subscription
.on("message", (message, content, ackOrNack) => {
.on('message', (message, content, ackOrNack) => {
// Do stuff with message
})
.on("error", (err) => {
console.error("Subscriber error", err);
.on('error', (err) => {
console.error('Subscriber error', err);
})
.on("redeliveries_exceeded", (err, message, ackOrNack) => {
console.error("Redeliveries exceeded", err);
.on('redeliveries_exceeded', (err, message, ackOrNack) => {
console.error('Redeliveries exceeded', err);
ackOrNack(err);

@@ -1389,12 +1373,12 @@ });

try {
const subscription = await broker.subscribe("s1");
const subscription = await broker.subscribe('s1');
subscription
.on("message", (message, content, ackOrNack) => {
.on('message', (message, content, ackOrNack) => {
// Do stuff with message
})
.on("error", (err) => {
console.error("Subscriber error", err);
.on('error', (err) => {
console.error('Subscriber error', err);
})
.on("redeliveries_exceeded", (err, message, ackOrNack) => {
console.error("Redeliveries exceeded", err);
.on('redeliveries_exceeded', (err, message, ackOrNack) => {
console.error('Redeliveries exceeded', err);
ackOrNack(err);

@@ -1434,3 +1418,3 @@ });

```js
ackOrNack(err, { strategy: "nack" });
ackOrNack(err, { strategy: 'nack' });
```

@@ -1443,3 +1427,3 @@

```js
ackOrNack(err, { strategy: "nack", defer: 1000, requeue: true });
ackOrNack(err, { strategy: 'nack', defer: 1000, requeue: true });
```

@@ -1454,3 +1438,3 @@

```js
ackOrNack(err, { strategy: "republish", defer: 1000 });
ackOrNack(err, { strategy: 'republish', defer: 1000 });
```

@@ -1463,6 +1447,3 @@

```js
ackOrNack(err, [
{ strategy: "republish", defer: 1000, attempts: 10 },
{ strategy: "nack" },
]);
ackOrNack(err, [{ strategy: 'republish', defer: 1000, attempts: 10 }, { strategy: 'nack' }]);
```

@@ -1487,3 +1468,3 @@

```js
ackOrNack(err, { strategy: "republish", immediateNack: true });
ackOrNack(err, { strategy: 'republish', immediateNack: true });
```

@@ -1498,3 +1479,3 @@

```js
ackOrNack(err, { strategy: "forward", publication: "some_exchange" });
ackOrNack(err, { strategy: 'forward', publication: 'some_exchange' });
```

@@ -1510,8 +1491,8 @@

{
strategy: "forward",
publication: "some_exchange",
strategy: 'forward',
publication: 'some_exchange',
defer: 1000,
attempts: 10,
},
{ strategy: "nack" },
{ strategy: 'nack' },
]);

@@ -1525,7 +1506,7 @@ ```

{
strategy: "forward",
publication: "some_exchange",
options: { routingKey: "custom.routing.key" },
strategy: 'forward',
publication: 'some_exchange',
options: { routingKey: 'custom.routing.key' },
},
{ strategy: "nack" },
{ strategy: 'nack' },
]);

@@ -1541,3 +1522,3 @@ ```

```js
ackOrNack(err, { strategy: "ack" });
ackOrNack(err, { strategy: 'ack' });
```

@@ -1552,3 +1533,3 @@

{
strategy: "republish",
strategy: 'republish',
defer: 1000,

@@ -1558,3 +1539,3 @@ attempts: 10,

{
strategy: "nack",
strategy: 'nack',
},

@@ -1565,3 +1546,3 @@ ]);

Far more sophisticated strategies are achievable...
![Retry BackOff Fail](https://user-images.githubusercontent.com/229672/49589770-2359d080-f962-11e8-957e-8d5368561afd.png "Retry BackOff Fail")
![Retry BackOff Fail](https://user-images.githubusercontent.com/229672/49589770-2359d080-f962-11e8-957e-8d5368561afd.png 'Retry BackOff Fail')

@@ -1591,9 +1572,9 @@ 1. Producer publishes a message with the routing key "a.b.c" to the "jobs" topic exchange

// Does not retry. This will cause an error to be emitted which unhandled will crash your process. See [Subscriber Events](#subscriber-events)
broker.subscribe("s1", { prefetch: 10, retry: false }, callback);
broker.subscribe('s1', { prefetch: 10, retry: false }, callback);
// Retries without delay.
broker.subscribe("s1", { prefetch: 10, retry: true }, callback);
broker.subscribe('s1', { prefetch: 10, retry: true }, callback);
// Retries after a one second interval.
broker.subscribe("s1", { prefetch: 10, retry: { delay: 1000 } }, callback);
broker.subscribe('s1', { prefetch: 10, retry: { delay: 1000 } }, callback);
```

@@ -1603,9 +1584,9 @@

// Does not retry. This will cause an error to be emitted which unhandled will crash your process. See [Subscriber Events](#subscriber-events)
await broker.subscribe("s1", { prefetch: 10, retry: false });
await broker.subscribe('s1', { prefetch: 10, retry: false });
// Retries without delay.
await broker.subscribe("s1", { prefetch: 10, retry: true });
await broker.subscribe('s1', { prefetch: 10, retry: true });
// Retries after a one second interval.
await broker.subscribe("s1", { prefetch: 10, retry: { delay: 1000 } });
await broker.subscribe('s1', { prefetch: 10, retry: { delay: 1000 } });
```

@@ -1666,3 +1647,3 @@

```js
broker.subscribe("s1", (err, subscription) => {
broker.subscribe('s1', (err, subscription) => {
if (err) throw err; // subscription didn't exist

@@ -1677,3 +1658,3 @@ subscription.cancel((err) => {

try {
const subscription = await broker.subscribe("s1");
const subscription = await broker.subscribe('s1');
await subscription.cancel();

@@ -1685,7 +1666,7 @@ } catch (err) {

Cancelling a subscribion will stop consuming messages, but leave the channel open for a short while so your application can still ack/nack messages. By default the channel is left open for 10 seconds, but can be overridden through the `deferCloseChannel` subscription property.
Cancelling a subscribion will stop consuming messages, but leave the channel open until any outstanding messages have been acknowledged, or the timeout specified by through the `closeTimeout` subscription property is exceeded.
## Shutdown
You can shutdown the broker by calling `await broker.shutdown()` or `broker.shutdown(cb)`. Shutting down the broker will cancel all subscriptions, then wait a short amount of time for inflight messages to be acknowledged (configurable via the `deferCloseChannel` subscription property), before closing channels and disconnecting.
You can shutdown the broker by calling `await broker.shutdown()` or `broker.shutdown(cb)`.

@@ -1764,3 +1745,3 @@ ## Bonus Features

```js
broker.connect("/", (err, connection) => {
broker.connect('/', (err, connection) => {
if (err) throw new Error(`Connection error: ${err.message}`);

@@ -1773,3 +1754,3 @@ // profit

try {
const connection = broker.connect("/");
const connection = broker.connect('/');
// profit

@@ -1776,0 +1757,0 @@ } catch (err) {

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc