Comparing version 0.8.0 to 0.8.1
@@ -115,3 +115,3 @@ var debug = require('debug')('rascal:Broker') | ||
this.getFullyQualifiedName = function(vhost, name) { | ||
this.getFullyQualifiedName = this.qualify = function(vhost, name) { | ||
return fqn.qualify(name, config.vhosts[vhost].namespace) | ||
@@ -118,0 +118,0 @@ } |
@@ -36,9 +36,12 @@ var debug = require('debug')('rascal:Publication') | ||
this.forward = function(message, overrides, next) { | ||
var originalQueue = message.properties.headers.rascal.originalQueue | ||
var forwarded = _.get(message, format('properties.headers.rascal.%s.forwarded', originalQueue), 0) | ||
var publishConfig = _.defaultsDeep(overrides, config, { routingKey: message.fields.routingKey }) | ||
publishConfig.options = _.defaultsDeep(publishConfig.options, message.properties, { CC: [], headers: { rascal: { forwarded: 0 } } }) | ||
publishConfig.options.headers.rascal.forwarded++ | ||
publishConfig.options.headers.rascal.originalExchange = message.fields.exchange | ||
publishConfig.options.headers.rascal.originalRoutingKey = message.fields.routingKey | ||
publishConfig.options = _.defaultsDeep(publishConfig.options, message.properties) | ||
publishConfig.options.CC = _.chain([]).concat(publishConfig.options.CC, format('%s.%s', message.properties.headers.rascal.originalQueue, publishConfig.routingKey)).unique().value() | ||
_.set(publishConfig, format('options.headers.rascal.%s.forwarded', originalQueue), forwarded + 1) | ||
_.set(publishConfig, 'options.headers.rascal.originalExchange', message.fields.exchange) | ||
_.set(publishConfig, 'options.headers.rascal.originalRoutingKey', message.fields.routingKey) | ||
_.set(publishConfig, 'options.CC', _.chain([]).concat(publishConfig.options.CC, format('%s.%s', originalQueue, publishConfig.routingKey)).unique().value()) | ||
@@ -45,0 +48,0 @@ _publish(message.content, publishConfig, next) |
@@ -23,3 +23,3 @@ var debug = require('debug')('rascal:SubscriberError') | ||
if (handled) { | ||
debug('Message %s: was recovered using stragegy: %s', message.properties.messageId, recoveryConfig.strategy) | ||
debug('Message: %s was recovered using stragegy: %s', message.properties.messageId, recoveryConfig.strategy) | ||
return next() | ||
@@ -57,3 +57,4 @@ } | ||
var republished = message.properties.headers.rascal && message.properties.headers.rascal.republished || 0 | ||
var originalQueue = _.get(message, 'properties.headers.rascal.originalQueue') | ||
var republished = _.get(message, format('properties.headers.rascal.%s.republished', originalQueue), 0) | ||
@@ -65,7 +66,9 @@ if (strategyConfig.attempts && strategyConfig.attempts <= republished) { | ||
var publishOptions = _.defaultsDeep(message.properties, { headers: { rascal: {} } }) | ||
publishOptions.headers.rascal.republished = republished + 1 | ||
publishOptions.headers.rascal.originalExchange = message.fields.exchange | ||
publishOptions.headers.rascal.originalRoutingKey = message.fields.routingKey | ||
publishOptions.headers.rascal.error = { message: _.trunc(err.message, 1024) } | ||
var publishOptions = _.cloneDeep(message.properties) | ||
_.set(publishOptions, format('headers.rascal.%s.republished', originalQueue), republished + 1) | ||
_.set(publishOptions, 'headers.rascal.originalExchange', message.fields.exchange) | ||
_.set(publishOptions, 'headers.rascal.originalRoutingKey', message.fields.routingKey) | ||
_.set(publishOptions, 'headers.rascal.error.message', _.trunc(err.message, 1024)) | ||
_.set(publishOptions, 'headers.rascal.error.code', err.code) | ||
if (strategyConfig.immediateNack) _.set(publishOptions, format('headers.rascal.%s.immediateNack', originalQueue), true) | ||
@@ -76,5 +79,5 @@ vhost.getConfirmChannel(function(err, publisherChannel) { | ||
publisherChannel.publish(undefined, message.properties.headers.rascal.originalQueue, message.content, publishOptions, function(err) { | ||
publisherChannel.publish(undefined, originalQueue, message.content, publishOptions, function(err) { | ||
if (err) return next(err) | ||
debug('Message: %s was republished to queue: %s %d times', message.properties.messageId, message.properties.headers.rascal.originalQueue, publishOptions.headers.rascal.republished) | ||
debug('Message: %s was republished to queue: %s %d times', message.properties.messageId, originalQueue, republished + 1) | ||
session._ack(message) | ||
@@ -91,3 +94,4 @@ next(null, true) | ||
var forwarded = message.properties.headers.rascal && message.properties.headers.rascal.forwarded || 0 | ||
var originalQueue = _.get(message, 'properties.headers.rascal.originalQueue') | ||
var forwarded = _.get(message, format('properties.headers.rascal.%s.forwarded', originalQueue), 0) | ||
@@ -102,4 +106,5 @@ if (strategyConfig.attempts && strategyConfig.attempts <= forwarded) { | ||
var forwardOptions = _.defaultsDeep(strategyConfig.options || {}, { options: { headers: { rascal: {}}}}) | ||
forwardOptions.options.headers.rascal.error = { message: _.trunc(err.message, 1024) } | ||
var forwardOptions = _.cloneDeep(strategyConfig.options) || {} | ||
_.set(forwardOptions, 'options.headers.rascal.error.message', _.trunc(err.message, 1024)) | ||
_.set(forwardOptions, 'options.headers.rascal.error.code', err.code) | ||
@@ -121,9 +126,2 @@ broker.forward(strategyConfig.publication, message, forwardOptions, function(err, publication) { | ||
{ | ||
name: 'fallback', | ||
execute: function(session, message, err, strategyConfig, next) { | ||
session._nack(message, {}) | ||
next(null, true) | ||
} | ||
}, | ||
{ | ||
name: 'unknown', | ||
@@ -130,0 +128,0 @@ execute: function(session, message, err, strategyConfig, next) { |
@@ -7,2 +7,3 @@ var debug = require('debug')('rascal:Subscription') | ||
var SubscriberError = require('./SubscriberError') | ||
var format = require('util').format | ||
@@ -42,5 +43,9 @@ | ||
debug('Received message: %s from queue: %s', message.properties.messageId, config.queue) | ||
message = decorate(message) | ||
if (immediateNack(message)) return ackOrNack(session, message, true) | ||
getContent(message, config, function(err, content) { | ||
err ? handleContentError(session, decorate(message), err) | ||
: session.emit('message', decorate(message), content, ackOrNack.bind(null, session, message)) | ||
err ? handleContentError(session, message, err) | ||
: session.emit('message', message, content, ackOrNack.bind(null, session, message)) | ||
}) | ||
@@ -80,2 +85,6 @@ }, config.options, function(err, response) { | ||
function immediateNack(message) { | ||
return _.get(message, format('properties.headers.rascal.%s.immediateNack', message.properties.headers.rascal.originalQueue)) | ||
} | ||
function ackOrNack(session, message, err, recovery, next) { | ||
@@ -87,3 +96,3 @@ if (arguments.length === 2) return ackOrNack(session, message, undefined, undefined, _.noop) | ||
if (arguments.length === 4) return ackOrNack(session, message, err, recovery, _.noop) | ||
if (err) return subscriberError.handle(session, message, err, recovery || { strategy: 'fallback' }, next) | ||
if (err) return subscriberError.handle(session, message, err, recovery || { strategy: 'nack' }, next) | ||
session._ack(message) | ||
@@ -90,0 +99,0 @@ next() |
{ | ||
"name": "rascal", | ||
"version": "0.8.0", | ||
"version": "0.8.1", | ||
"description": "A friendly wrapper around amqplib with (mostly) safe defaults", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -563,2 +563,4 @@ # Rascal | ||
Dead lettering is a good option for invalid messages but with one major flaw - because the message cannot be modified it cannot be annotated with the error details. This makes it difficult to do anything useful with messages once dead lettered. | ||
##### Republish | ||
@@ -577,2 +579,4 @@ ```javascript | ||
``` | ||
Rascal also annotates the message with detail of the error ```message.properties.headers.rascal.<queue>.error``` which can be useful if you eventually dead letter it. | ||
Before using republish please consider the following: | ||
@@ -588,2 +592,9 @@ | ||
##### Republish with immediate nack | ||
As mentioned previously, dead lettering invalid messages is a good strategy with one flaw - since there is no way to modify the message you cannot annotate it with failure details. A solution to this is to republish with attempts = 1 and then nacking it to a dead letter exchange. The problem with this approach is that invalid messages will always be processed twice. To workaround this set immediateNack to true in the recovery options. This will instruct Rascal to nack the message immediately instead of emitting the 'message' event. | ||
```js | ||
ackOrNack(err, { strategy: 'republish', immediateNack: true }) | ||
``` | ||
If you ever want to resend the message to the same queue you will have to remove the ```properties.headers.rascal.<queue>.immediateNack``` header first. | ||
##### Forward | ||
@@ -590,0 +601,0 @@ Instead of republishing the message to the same queue you can forward it to a Rascal publication. You should read the section entitled [Forwarding messages](Forwarding_messages) to understand the risks of this. |
@@ -190,3 +190,2 @@ var assert = require('assert') | ||
assert.ifError(err) | ||
console.log(config) | ||
assert.equal(config.vhosts.v1.exchanges.e1.assert, false) | ||
@@ -193,0 +192,0 @@ assert.equal(config.vhosts.v1.exchanges.e1.type, 'direct') |
@@ -312,3 +312,3 @@ var assert = require('assert') | ||
assert.equal(message.content.toString(), 'test message') | ||
assert.equal(message.properties.headers.rascal.forwarded, true) | ||
assert.equal(message.properties.headers.rascal[broker.qualify('/', 'q1')].forwarded, 1) | ||
assert.equal(message.properties.headers.rascal.originalExchange, namespace + ':e1') | ||
@@ -455,3 +455,3 @@ done() | ||
assert.equal(message.content.toString(), 'test message') | ||
assert.equal(message.properties.headers.rascal.forwarded, true) | ||
assert.equal(message.properties.headers.rascal[broker.qualify('/', 'q1')].forwarded, 1) | ||
assert.equal(message.properties.headers.rascal.originalRoutingKey, 'rk1') | ||
@@ -458,0 +458,0 @@ assert.equal(message.properties.headers.rascal.originalExchange, namespace + ':e1') |
@@ -618,5 +618,6 @@ var assert = require('assert') | ||
messages[message.properties.messageId] = messages[message.properties.messageId] ? messages[message.properties.messageId] + 1 : 1 | ||
if (messages[message.properties.messageId] < 10) return ackOrNack(new Error('republish me'), { strategy: 'republish' }) | ||
assert.equal(message.properties.headers.rascal.republished, 9) | ||
if (messages[message.properties.messageId] < 10) return ackOrNack({ message: 'republish me', code: 'red' }, { strategy: 'republish' }) | ||
assert.equal(message.properties.headers.rascal[broker.qualify('/', 'q1')].republished, 9) | ||
assert.equal(message.properties.headers.rascal.error.message, 'republish me') | ||
assert.equal(message.properties.headers.rascal.error.code, 'red') | ||
done() | ||
@@ -647,3 +648,3 @@ }) | ||
if (messages[message.properties.messageId] < 10) return ackOrNack(new Error(_.pad('x', 10000, 'x')), { strategy: 'republish' }) | ||
assert.equal(message.properties.headers.rascal.republished, 9) | ||
assert.equal(message.properties.headers.rascal[broker.qualify('/', 'q1')].republished, 9) | ||
assert.equal(message.properties.headers.rascal.error.message.length, 1024) | ||
@@ -676,3 +677,3 @@ done() | ||
if (messages[message.properties.messageId] < 2) return ackOrNack(new Error('republish'), { strategy: 'republish' }) | ||
assert.equal(message.properties.headers.rascal.republished, 1) | ||
assert.equal(message.properties.headers.rascal[broker.qualify('/', 'q1')].republished, 1) | ||
assert.equal(message.properties.headers.foo, 'bar') | ||
@@ -745,2 +746,80 @@ assert.equal(message.properties.messageId, messageId) | ||
it.only('should immediately nack republished messages when requested', function(done) { | ||
createBroker({ | ||
vhosts: { | ||
'/': { | ||
namespace: namespace, | ||
exchanges: { | ||
e1: { | ||
assert: true | ||
}, | ||
e2: { | ||
assert: true | ||
} | ||
}, | ||
queues: { | ||
q1: { | ||
assert: true, | ||
options: { | ||
arguments: { | ||
'x-dead-letter-exchange': 'e2' | ||
} | ||
} | ||
}, | ||
q2: { | ||
assert: true | ||
} | ||
}, | ||
bindings: { | ||
b1: { | ||
source: 'e1', | ||
destination: 'q1' | ||
}, | ||
b2: { | ||
source: 'e2', | ||
destination: 'q2' | ||
} | ||
} | ||
} | ||
}, | ||
publications: _.pick(publications, 'p1'), | ||
subscriptions: { | ||
s1: { | ||
vhost: '/', | ||
queue: 'q1' | ||
}, | ||
s2: { | ||
vhost: '/', | ||
queue: 'q2' | ||
} | ||
} | ||
}, function(err, broker) { | ||
assert.ifError(err) | ||
broker.publish('p1', 'test message', function(err) { | ||
assert.ifError(err) | ||
broker.subscribe('s1', function(err, subscription) { | ||
assert.ifError(err) | ||
var count = 0 | ||
subscription.on('message', function(message, content, ackOrNack) { | ||
assert.equal(++count, 1) | ||
assert.ok(message) | ||
ackOrNack(new Error('immediate nack'), { | ||
strategy: 'republish', | ||
immediateNack: true | ||
}) | ||
}) | ||
}) | ||
broker.subscribe('s2', function(err, subscription) { | ||
assert.ifError(err) | ||
subscription.on('message', function(message, content, ackOrNack) { | ||
assert.ok(message) | ||
done() | ||
}) | ||
}) | ||
}) | ||
}) | ||
}) | ||
it('should forward messages to publication when requested', function(done) { | ||
@@ -760,3 +839,3 @@ | ||
assert.ok(message) | ||
ackOrNack(new Error('forward me'), { strategy: 'forward', publication: 'p2' }) | ||
ackOrNack({ message: 'forward me', code: 'red' }, { strategy: 'forward', publication: 'p2' }) | ||
}) | ||
@@ -770,4 +849,5 @@ }) | ||
ackOrNack() | ||
assert.equal(message.properties.headers.rascal.forwarded, 1) | ||
assert.equal(message.properties.headers.rascal[broker.qualify('/', 'q1')].forwarded, 1) | ||
assert.equal(message.properties.headers.rascal.error.message, 'forward me') | ||
assert.equal(message.properties.headers.rascal.error.code, 'red') | ||
done() | ||
@@ -802,3 +882,3 @@ }) | ||
ackOrNack() | ||
assert.equal(message.properties.headers.rascal.forwarded, 1) | ||
assert.equal(message.properties.headers.rascal[broker.qualify('/', 'q1')].forwarded, 1) | ||
assert.equal(message.properties.headers.rascal.error.message.length, 1024) | ||
@@ -834,3 +914,3 @@ done() | ||
ackOrNack() | ||
assert.equal(message.properties.headers.rascal.forwarded, 1) | ||
assert.equal(message.properties.headers.rascal[broker.qualify('/', 'q1')].forwarded, 1) | ||
done() | ||
@@ -872,3 +952,3 @@ }) | ||
ackOrNack() | ||
assert.equal(message.properties.headers.rascal.forwarded, 1) | ||
assert.equal(message.properties.headers.rascal[broker.qualify('/', 'q1')].forwarded, 1) | ||
assert.equal(message.properties.headers.foo, 'bar') | ||
@@ -875,0 +955,0 @@ assert.equal(message.properties.messageId, messageId) |
245914
5327
820