Comparing version 0.12.0 to 0.13.0
@@ -7,7 +7,10 @@ module.exports = { | ||
// Define the vhost connection parameters | ||
"connection": { | ||
// Define the vhost connection parameters. Specify multiple entries for cluster | ||
"connections": [ | ||
{ | ||
"user": "app_user", | ||
"password": "password" | ||
}, | ||
"password": "password", | ||
"port": 5672 | ||
} | ||
], | ||
@@ -23,3 +26,3 @@ // Define exchanges within the registration vhost | ||
// Define queues within the registration vhost | ||
// A good naming convension for queues is consumer:entity:action naming convension | ||
// A good naming convension for queues is consumer:entity:action | ||
"queues": { | ||
@@ -153,2 +156,2 @@ | ||
} | ||
} | ||
} |
@@ -11,4 +11,6 @@ var format = require('util').format | ||
var routingKey = format('registration_service.user.deleted.%s', user.username) | ||
broker.publish('delete_user_succeeded', routingKey, cb) | ||
broker.publish('delete_user_succeeded', routingKey, function(err, publication) { | ||
publication.on('success', cb).on('error', console.error) | ||
}) | ||
} | ||
} |
@@ -24,4 +24,5 @@ var format = require('util').format | ||
var routingKey = format('registration_service.user.saved.%s', user.username) | ||
broker.publish('save_user_succeeded', routingKey, cb) | ||
return | ||
broker.publish('save_user_succeeded', routingKey, function(err, publication) { | ||
publication.on('success', cb).on('error', console.error) | ||
}) | ||
} | ||
@@ -28,0 +29,0 @@ } |
@@ -10,3 +10,5 @@ var Rascal = require('../..') | ||
broker.on('error', bail) | ||
broker.on('error', function(err) { | ||
console.error(err.message) | ||
}) | ||
@@ -47,5 +49,8 @@ _.each(broker.config.subscriptions, function(subscriptionConfig, subscriptionName) { | ||
if (err) return console.log(err.message) | ||
publication.on('success', function() { | ||
// confirmed | ||
}) | ||
publication | ||
.on('success', function() { | ||
// confirmed | ||
}).on('error', function(err) { | ||
console.error(err.message) | ||
}) | ||
}) | ||
@@ -52,0 +57,0 @@ }, 1000) |
@@ -9,3 +9,3 @@ 'use strict' | ||
module.exports = _.curry(function(config, ctx, next) { | ||
debug(format('Closing connection: %s', config.connection.loggableUrl)) | ||
debug(format('Closing connection: %s', ctx.connectionConfig.loggableUrl)) | ||
if (!ctx.connection) return next(null, config, ctx) | ||
@@ -12,0 +12,0 @@ ctx.connection.close(function(err) { |
@@ -7,28 +7,46 @@ 'use strict' | ||
var amqplib = require('amqplib/callback_api') | ||
var async = require('async') | ||
module.exports = _.curry(function(config, ctx, next) { | ||
debug(format('Connecting to broker using url: %s', config.connection.loggableUrl)) | ||
var candidates = _.shuffle(config.connections) | ||
var attempt = 0 | ||
amqplib.connect(config.connection.url, function(err, connection) { | ||
if (err) return next(err, config, ctx) | ||
async.retry(candidates.length, function(cb) { | ||
var connectionConfig = candidates[attempt++] | ||
connect(connectionConfig, function(err, connection) { | ||
if (err) return cb(err) | ||
ctx.connection = connection | ||
ctx.connectionConfig = connectionConfig | ||
cb() | ||
}) | ||
}, function(err) { | ||
next(err, config, ctx) | ||
}) | ||
ctx.connection = connection | ||
function connect(connectionConfig, cb) { | ||
debug(format('Connecting to broker using url: %s', connectionConfig.loggableUrl)) | ||
/* | ||
* If an error occurs during initialisation (e.g. if checkExchanges fails), | ||
* and no error handler has been bound to the connection, then the error will bubble up | ||
* to the UncaughtException handler, potentially crashing the node process. | ||
* | ||
* By adding an error handler now, we ensure that instead of being emitted as events | ||
* errors will be passed via the callback chain, so they can still be handled by the caller | ||
* | ||
* This error handle is removed in the vhost after the initialiation has complete | ||
*/ | ||
connection.on('error', function(err) { | ||
debug(format('Received error: %s', err.message)) | ||
amqplib.connect(connectionConfig.url, function(err, connection) { | ||
if (err) return cb(err) | ||
/* | ||
* If an error occurs during initialisation (e.g. if checkExchanges fails), | ||
* and no error handler has been bound to the connection, then the error will bubble up | ||
* to the UncaughtException handler, potentially crashing the node process. | ||
* | ||
* By adding an error handler now, we ensure that instead of being emitted as events | ||
* errors will be passed via the callback chain, so they can still be handled by the caller | ||
* | ||
* This error handle is removed in the vhost after the initialiation has complete | ||
*/ | ||
connection.on('error', function(err) { | ||
debug(format('Received error: %s', err.message)) | ||
}) | ||
cb(null, connection) | ||
}) | ||
next(null, config, ctx) | ||
}) | ||
} | ||
}) |
@@ -21,2 +21,3 @@ var debug = require('debug')('rascal:Vhost') | ||
var connection | ||
var connectionConfig | ||
var channelAllocator = async.queue(createChannel, 1) | ||
@@ -43,2 +44,3 @@ | ||
connection = ctx.connection | ||
connectionConfig = ctx.connectionConfig | ||
channelAllocator.resume() | ||
@@ -84,3 +86,3 @@ return next(null, self) | ||
connection.on('error', function(err) { | ||
debug(format('Error disconnecting from %s. Original error was: %s', config.connection.loggableUrl, err.message)) | ||
debug(format('Error disconnecting from %s. Original error was: %s', connectionConfig.loggableUrl, err.message)) | ||
}) | ||
@@ -111,3 +113,3 @@ connection.close(next) | ||
function handleConnectionError(config, err) { | ||
debug(format('Handling connection error: %s from %s', err.message, config.connection.loggableUrl)) | ||
debug(format('Handling connection error: %s from %s', err.message, connectionConfig.loggableUrl)) | ||
self.emit('disconnect') | ||
@@ -117,4 +119,4 @@ channelAllocator.pause() | ||
self.emit('error', err) | ||
config.connection.retry && self.init(function(err) { | ||
if (err) return setTimeout(handleConnectionError.bind(null, config, err), config.connection.retry.delay) | ||
connectionConfig.retry && self.init(function(err) { | ||
if (err) return setTimeout(handleConnectionError.bind(null, config, err), connectionConfig.retry.delay) | ||
}) | ||
@@ -121,0 +123,0 @@ } |
@@ -41,15 +41,24 @@ 'use strict' | ||
rascalConfig.vhosts[name].namespace = vhostConfig.namespace === true ? uuid() : vhostConfig.namespace | ||
configureConnection(vhostConfig, name) | ||
configureConnections(vhostConfig, name) | ||
} | ||
function configureConnection(vhostConfig, name) { | ||
vhostConfig.connection = _.defaultsDeep(vhostConfig.connection || {}, vhostConfig.defaults.connection) | ||
vhostConfig.connection.vhost = vhostConfig.connection.vhost !== undefined ? vhostConfig.connection.vhost : name, | ||
vhostConfig.connection.auth = vhostConfig.connection.user + ':' + vhostConfig.connection.password | ||
vhostConfig.connection.pathname = vhostConfig.connection.vhost === '/' ? '' : vhostConfig.connection.vhost | ||
vhostConfig.connection.query = vhostConfig.connection.options | ||
vhostConfig.connection.url = vhostConfig.connection.url || url.format(vhostConfig.connection) | ||
vhostConfig.connection.loggableUrl = vhostConfig.connection.url.replace(/:[^:]*?@/, ':***@') | ||
function configureConnections(vhostConfig, vhostName) { | ||
vhostConfig.connections = _.chain([]).concat(vhostConfig.connections, vhostConfig.connection).compact().unique().value() | ||
if (vhostConfig.connections.length === 0) vhostConfig.connections.push({}) | ||
_.each(vhostConfig.connections, function(connection) { | ||
configureConnection(vhostConfig, vhostName, connection) | ||
}) | ||
delete vhostConfig.connection | ||
} | ||
function configureConnection(vhostConfig, vhostName, connection) { | ||
_.defaultsDeep(connection, vhostConfig.defaults.connection) | ||
connection.vhost = connection.vhost !== undefined ? connection.vhost : vhostName, | ||
connection.auth = connection.user + ':' + connection.password | ||
connection.pathname = connection.vhost === '/' ? '' : connection.vhost | ||
connection.query = connection.options | ||
connection.url = connection.url || url.format(connection) | ||
connection.loggableUrl = connection.url.replace(/:[^:]*?@/, ':***@') | ||
} | ||
function configureVhostPublications(vhostConfig) { | ||
@@ -56,0 +65,0 @@ _.each(vhostConfig.publications, function(publicationConfig, name) { |
@@ -12,3 +12,4 @@ module.exports = { | ||
options: { | ||
heartbeat: 5 | ||
heartbeat: 5, | ||
connection_timeout: 5 | ||
}, | ||
@@ -15,0 +16,0 @@ retry: { |
@@ -25,3 +25,3 @@ 'use strict' | ||
function validateVhost(vhost, vhostName) { | ||
validateAttributes('Vhost', vhost, vhostName, ['defaults', 'namespace', 'name', 'connection', 'exchanges', 'queues', 'bindings']) | ||
validateAttributes('Vhost', vhost, vhostName, ['defaults', 'namespace', 'name', 'connection', 'connections', 'exchanges', 'queues', 'bindings']) | ||
validateConnectionAttributes(vhost.connection, vhostName, ['slashes', 'protocol', 'hostname', 'user', 'password', 'port', 'vhost', 'options', 'retry', 'auth', 'pathname', 'query', 'url', 'loggableUrl']) | ||
@@ -28,0 +28,0 @@ validateExchanges(vhost, vhostName, vhost.exchanges) |
{ | ||
"name": "rascal", | ||
"version": "0.12.0", | ||
"version": "0.13.0", | ||
"description": "A friendly wrapper around amqplib with (mostly) safe defaults", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
151
README.md
@@ -104,5 +104,5 @@ # Rascal | ||
"vhosts": { | ||
"v1": { | ||
"namespace": true | ||
} | ||
"v1": { | ||
"namespace": true | ||
} | ||
} | ||
@@ -118,7 +118,7 @@ } | ||
"vhosts": { | ||
"v1": { | ||
"connection": { | ||
"url": "amqp://guest:guest@example.com:5672/v1?heartbeat=10" | ||
} | ||
"v1": { | ||
"connection": { | ||
"url": "amqp://guest:guest@example.com:5672/v1?heartbeat=10" | ||
} | ||
} | ||
} | ||
@@ -131,16 +131,16 @@ } | ||
"vhosts": { | ||
"v1": { | ||
"connection": { | ||
"slashes": true, | ||
"protocol": "amqp", | ||
"hostname": "localhost", | ||
"user": "guest", | ||
"password": "guest", | ||
"port": 5672, | ||
"vhost": "v1", | ||
"options": { | ||
"heartbeat": 5 | ||
} | ||
} | ||
"v1": { | ||
"connection": { | ||
"slashes": true, | ||
"protocol": "amqp", | ||
"hostname": "localhost", | ||
"user": "guest", | ||
"password": "guest", | ||
"port": 5672, | ||
"vhost": "v1", | ||
"options": { | ||
"heartbeat": 5 | ||
} | ||
} | ||
} | ||
} | ||
@@ -153,9 +153,9 @@ } | ||
"vhosts": { | ||
"v1": { | ||
"connection": { | ||
"hostname": "example.com", | ||
"user": "bob", | ||
"password": "secret" | ||
} | ||
"v1": { | ||
"connection": { | ||
"hostname": "example.com", | ||
"user": "bob", | ||
"password": "secret" | ||
} | ||
} | ||
} | ||
@@ -168,9 +168,9 @@ } | ||
"vhosts": { | ||
"v1": { | ||
"connection": { | ||
"retry": { | ||
"delay": 1000 | ||
} | ||
} | ||
"v1": { | ||
"connection": { | ||
"retry": { | ||
"delay": 1000 | ||
} | ||
} | ||
} | ||
} | ||
@@ -180,2 +180,23 @@ } | ||
#### Cluster Connections | ||
If you specify an array of connections instead of a single connection object Rascal will pick at Random | ||
```json | ||
{ | ||
"vhosts": { | ||
"v1": { | ||
"connection": [ | ||
{ | ||
"url": "amqp://guest:guest@example1.com:5672/v1?heartbeat=10" | ||
}, | ||
{ | ||
"url": "amqp://guest:guest@example2.com:5672/v1?heartbeat=10" | ||
}, | ||
{ | ||
"url": "amqp://guest:guest@example3.com:5672/v1?heartbeat=10" | ||
} | ||
] | ||
} | ||
} | ||
} | ||
#### Exchanges | ||
@@ -191,10 +212,10 @@ | ||
"vhosts": { | ||
"v1": { | ||
"exchanges": { | ||
"e1": { | ||
"assert": false, | ||
"check": true | ||
} | ||
} | ||
"v1": { | ||
"exchanges": { | ||
"e1": { | ||
"assert": false, | ||
"check": true | ||
} | ||
} | ||
} | ||
} | ||
@@ -212,12 +233,12 @@ } | ||
"vhosts": { | ||
"v1": { | ||
"exchanges": { | ||
"e1": { | ||
"type": "fanout", | ||
"options": { | ||
"durable": false | ||
} | ||
} | ||
"v1": { | ||
"exchanges": { | ||
"e1": { | ||
"type": "fanout", | ||
"options": { | ||
"durable": false | ||
} | ||
} | ||
} | ||
} | ||
} | ||
@@ -238,10 +259,10 @@ } | ||
"vhosts": { | ||
"v1": { | ||
"queues": { | ||
"q1": { | ||
"assert": false, | ||
"check": true | ||
} | ||
} | ||
"v1": { | ||
"queues": { | ||
"q1": { | ||
"assert": false, | ||
"check": true | ||
} | ||
} | ||
} | ||
} | ||
@@ -256,9 +277,9 @@ } | ||
"vhosts": { | ||
"v1": { | ||
"queues": { | ||
"q1": { | ||
"purge": true | ||
} | ||
} | ||
"v1": { | ||
"queues": { | ||
"q1": { | ||
"purge": true | ||
} | ||
} | ||
} | ||
} | ||
@@ -273,8 +294,8 @@ } | ||
"queues": { | ||
"q1": { | ||
"options": { | ||
"durable": false, | ||
"exclusive": true | ||
} | ||
"q1": { | ||
"options": { | ||
"durable": false, | ||
"exclusive": true | ||
} | ||
} | ||
} | ||
@@ -281,0 +302,0 @@ } |
@@ -33,3 +33,3 @@ var assert = require('assert') | ||
assert.ifError(err) | ||
assert.equal(config.vhosts.v1.connection.url, 'protocol://user:password@hostname:9000/vhost?heartbeat=10&channelMax=100') | ||
assert.equal(config.vhosts.v1.connections[0].url, 'protocol://user:password@hostname:9000/vhost?heartbeat=10&channelMax=100') | ||
}) | ||
@@ -60,3 +60,3 @@ }) | ||
assert.ifError(err) | ||
assert.equal(config.vhosts.v1.connection.url, 'foo') | ||
assert.equal(config.vhosts.v1.connections[0].url, 'foo') | ||
}) | ||
@@ -85,3 +85,3 @@ }) | ||
assert.ifError(err) | ||
assert.equal(config.vhosts.v1.connection.loggableUrl, 'protocol://user:***@hostname:9000/v1?heartbeat=10&channelMax=100') | ||
assert.equal(config.vhosts.v1.connections[0].loggableUrl, 'protocol://user:***@hostname:9000/v1?heartbeat=10&channelMax=100') | ||
}) | ||
@@ -110,3 +110,3 @@ }) | ||
assert.ifError(err) | ||
assert.equal(config.vhosts['/'].connection.loggableUrl, 'protocol://user:***@hostname:9000?heartbeat=10&channelMax=100') | ||
assert.equal(config.vhosts['/'].connections[0].loggableUrl, 'protocol://user:***@hostname:9000?heartbeat=10&channelMax=100') | ||
}) | ||
@@ -136,6 +136,37 @@ }) | ||
assert.ifError(err) | ||
assert.equal(config.vhosts.v1.connection.loggableUrl, 'protocol://user:***@hostname:9000/vhost?heartbeat=10&channelMax=100') | ||
assert.equal(config.vhosts.v1.connections[0].loggableUrl, 'protocol://user:***@hostname:9000/vhost?heartbeat=10&channelMax=100') | ||
}) | ||
}) | ||
it('should generate connections from an array', function() { | ||
configure({ | ||
vhosts: { | ||
v1: { | ||
connections: [ | ||
{ | ||
url: 'foo', | ||
}, | ||
{ | ||
slashes: true, | ||
protocol: 'protocol', | ||
hostname: 'hostname', | ||
port: 9000, | ||
vhost: 'vhost', | ||
user: 'user', | ||
password: 'password', | ||
options: { | ||
heartbeat: 10, | ||
channelMax: 100 | ||
} | ||
} | ||
] | ||
} | ||
} | ||
}, function(err, config) { | ||
assert.ifError(err) | ||
assert.equal(config.vhosts.v1.connections[0].url, 'foo') | ||
assert.equal(config.vhosts.v1.connections[1].url, 'protocol://user:password@hostname:9000/vhost?heartbeat=10&channelMax=100') | ||
}) | ||
}) | ||
it('should decorate the connection config with a loggable url (b)', function() { | ||
@@ -152,3 +183,3 @@ configure({ | ||
assert.ifError(err) | ||
assert.equal(config.vhosts.v1.connection.loggableUrl, 'protocol://user:***@hostname:9000/vhost?heartbeat=10&channelMax=100') | ||
assert.equal(config.vhosts.v1.connections[0].loggableUrl, 'protocol://user:***@hostname:9000/vhost?heartbeat=10&channelMax=100') | ||
}) | ||
@@ -155,0 +186,0 @@ }) |
@@ -37,3 +37,3 @@ var assert = require('assert') | ||
assert.ifError(err) | ||
assert.equal(config.vhosts.v1.connection.url, 'amqp://guest:guest@localhost:5672/v1?heartbeat=5') | ||
assert.equal(config.vhosts.v1.connections[0].url, 'amqp://guest:guest@localhost:5672/v1?heartbeat=5') | ||
}) | ||
@@ -73,3 +73,3 @@ }) | ||
assert.ifError(err) | ||
assert.equal(config.vhosts.v1.connection.url, 'amqp://foo:bar@localhost:5672?heartbeat=10') | ||
assert.equal(config.vhosts.v1.connections[0].url, 'amqp://foo:bar@localhost:5672?heartbeat=10') | ||
}) | ||
@@ -76,0 +76,0 @@ }) |
269113
5795
817