Comparing version 0.8.1 to 0.9.0
@@ -36,12 +36,7 @@ var debug = require('debug')('rascal:Publication') | ||
this.forward = function(message, overrides, next) { | ||
var originalQueue = message.properties.headers.rascal.originalQueue | ||
var forwarded = _.get(message, format('properties.headers.rascal.%s.forwarded', originalQueue), 0) | ||
var publishConfig = _.defaultsDeep(overrides, config, { routingKey: message.fields.routingKey }) | ||
publishConfig.options = _.defaultsDeep(publishConfig.options, message.properties) | ||
_.set(publishConfig, format('options.headers.rascal.%s.forwarded', originalQueue), forwarded + 1) | ||
_.set(publishConfig, 'options.headers.rascal.originalExchange', message.fields.exchange) | ||
_.set(publishConfig, 'options.headers.rascal.originalRoutingKey', message.fields.routingKey) | ||
_.set(publishConfig, 'options.CC', _.chain([]).concat(publishConfig.options.CC, format('%s.%s', originalQueue, publishConfig.routingKey)).unique().value()) | ||
@@ -48,0 +43,0 @@ _publish(message.content, publishConfig, next) |
@@ -57,3 +57,3 @@ var debug = require('debug')('rascal:SubscriberError') | ||
var originalQueue = _.get(message, 'properties.headers.rascal.originalQueue') | ||
var republished = _.get(message, format('properties.headers.rascal.%s.republished', originalQueue), 0) | ||
var republished = _.get(message, format('properties.headers.rascal.recovery.%s.republished', originalQueue), 0) | ||
@@ -66,3 +66,3 @@ if (strategyConfig.attempts && strategyConfig.attempts <= republished) { | ||
var publishOptions = _.cloneDeep(message.properties) | ||
_.set(publishOptions, format('headers.rascal.%s.republished', originalQueue), republished + 1) | ||
_.set(publishOptions, format('headers.rascal.recovery.%s.republished', originalQueue), republished + 1) | ||
_.set(publishOptions, 'headers.rascal.originalExchange', message.fields.exchange) | ||
@@ -72,3 +72,3 @@ _.set(publishOptions, 'headers.rascal.originalRoutingKey', message.fields.routingKey) | ||
_.set(publishOptions, 'headers.rascal.error.code', err.code) | ||
if (strategyConfig.immediateNack) _.set(publishOptions, format('headers.rascal.%s.immediateNack', originalQueue), true) | ||
if (strategyConfig.immediateNack) _.set(publishOptions, format('headers.rascal.recovery.%s.immediateNack', originalQueue), true) | ||
@@ -94,3 +94,3 @@ vhost.getConfirmChannel(function(err, publisherChannel) { | ||
var originalQueue = _.get(message, 'properties.headers.rascal.originalQueue') | ||
var forwarded = _.get(message, format('properties.headers.rascal.%s.forwarded', originalQueue), 0) | ||
var forwarded = _.get(message, format('properties.headers.rascal.recovery.%s.forwarded', originalQueue), 0) | ||
@@ -106,4 +106,6 @@ if (strategyConfig.attempts && strategyConfig.attempts <= forwarded) { | ||
var forwardOptions = _.cloneDeep(strategyConfig.options) || {} | ||
_.set(forwardOptions, format('options.headers.rascal.recovery.%s.forwarded', originalQueue), forwarded + 1) | ||
_.set(forwardOptions, 'options.headers.rascal.error.message', _.trunc(err.message, 1024)) | ||
_.set(forwardOptions, 'options.headers.rascal.error.code', err.code) | ||
_.set(forwardOptions, 'options.CC', _.chain([]).concat(format('%s.%s', originalQueue, message.fields.routingKey)).unique().value()) | ||
@@ -110,0 +112,0 @@ broker.forward(strategyConfig.publication, message, forwardOptions, function(err, publication) { |
@@ -84,3 +84,3 @@ var debug = require('debug')('rascal:Subscription') | ||
function immediateNack(message) { | ||
return _.get(message, format('properties.headers.rascal.%s.immediateNack', message.properties.headers.rascal.originalQueue)) | ||
return _.get(message, format('properties.headers.rascal.recovery.%s.immediateNack', message.properties.headers.rascal.originalQueue)) | ||
} | ||
@@ -87,0 +87,0 @@ |
{ | ||
"name": "rascal", | ||
"version": "0.8.1", | ||
"version": "0.9.0", | ||
"description": "A friendly wrapper around amqplib with (mostly) safe defaults", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -312,3 +312,4 @@ var assert = require('assert') | ||
assert.equal(message.content.toString(), 'test message') | ||
assert.equal(message.properties.headers.rascal[broker.qualify('/', 'q1')].forwarded, 1) | ||
assert.ok(/\w+-\w+-\w+-\w+-\w+:q1/.test(message.properties.headers.rascal.originalQueue), format('%s failed to match expected pattern', message.properties.headers.rascal.originalQueue)) | ||
assert.equal(message.properties.headers.rascal.originalRoutingKey, 'rk1') | ||
assert.equal(message.properties.headers.rascal.originalExchange, namespace + ':e1') | ||
@@ -384,92 +385,2 @@ done() | ||
it('should CC forwarded messages to <queue>.<routing key>', function(done) { | ||
createBroker({ | ||
vhosts: { | ||
'/': { | ||
namespace: namespace, | ||
exchanges: { | ||
e1: { | ||
assert: true | ||
}, | ||
e2: { | ||
assert: true | ||
} | ||
}, | ||
queues: { | ||
q1: { | ||
assert: true | ||
}, | ||
q2: { | ||
assert: true | ||
} | ||
}, | ||
bindings: { | ||
b1: { | ||
source: 'e1', | ||
destination: 'q1' | ||
}, | ||
b2: { | ||
source: 'e2', | ||
destination: 'q2', | ||
bindingKey: 'q1.rk2', | ||
qualifyBindingKeys: true | ||
} | ||
} | ||
} | ||
}, | ||
publications: { | ||
p1: { | ||
exchange: 'e1', | ||
routingKey: 'rk1' | ||
}, | ||
p2: { | ||
exchange: 'e2', | ||
routingKey: 'rk2' | ||
} | ||
}, | ||
subscriptions: { | ||
s1: { | ||
vhost: '/', | ||
queue: 'q1' | ||
} | ||
} | ||
}, function(err, broker) { | ||
assert.ifError(err) | ||
var messageId | ||
broker.subscribe('s1', function(err, subscription) { | ||
assert.ifError(err) | ||
subscription.on('message', function(message, content, ackOrNack) { | ||
broker.forward('p2', message, function(err, publication) { | ||
publication.on('success', function() { | ||
ackOrNack() | ||
amqputils.getMessage('q2', namespace, function(err, message) { | ||
assert.ifError(err) | ||
assert.ok(message) | ||
assert.equal(message.fields.routingKey, 'rk2') | ||
assert.equal(message.properties.messageId, messageId) | ||
assert.equal(message.properties.contentType, 'text/plain') | ||
assert.equal(message.content.toString(), 'test message') | ||
assert.equal(message.properties.headers.rascal[broker.qualify('/', 'q1')].forwarded, 1) | ||
assert.equal(message.properties.headers.rascal.originalRoutingKey, 'rk1') | ||
assert.equal(message.properties.headers.rascal.originalExchange, namespace + ':e1') | ||
done() | ||
}) | ||
}) | ||
}) | ||
}) | ||
}) | ||
broker.publish('p1', 'test message', function(err, publication) { | ||
assert.ifError(err) | ||
publication.on('success', function(_messageId) { | ||
messageId = _messageId | ||
}) | ||
}) | ||
}) | ||
}) | ||
function createBroker(config, next) { | ||
@@ -476,0 +387,0 @@ config = _.defaultsDeep(config, testConfig) |
@@ -619,3 +619,3 @@ var assert = require('assert') | ||
if (messages[message.properties.messageId] < 10) return ackOrNack({ message: 'republish me', code: 'red' }, { strategy: 'republish' }) | ||
assert.equal(message.properties.headers.rascal[broker.qualify('/', 'q1')].republished, 9) | ||
assert.equal(message.properties.headers.rascal.recovery[broker.qualify('/', 'q1')].republished, 9) | ||
assert.equal(message.properties.headers.rascal.error.message, 'republish me') | ||
@@ -648,3 +648,3 @@ assert.equal(message.properties.headers.rascal.error.code, 'red') | ||
if (messages[message.properties.messageId] < 10) return ackOrNack(new Error(_.pad('x', 10000, 'x')), { strategy: 'republish' }) | ||
assert.equal(message.properties.headers.rascal[broker.qualify('/', 'q1')].republished, 9) | ||
assert.equal(message.properties.headers.rascal.recovery[broker.qualify('/', 'q1')].republished, 9) | ||
assert.equal(message.properties.headers.rascal.error.message.length, 1024) | ||
@@ -677,3 +677,3 @@ done() | ||
if (messages[message.properties.messageId] < 2) return ackOrNack(new Error('republish'), { strategy: 'republish' }) | ||
assert.equal(message.properties.headers.rascal[broker.qualify('/', 'q1')].republished, 1) | ||
assert.equal(message.properties.headers.rascal.recovery[broker.qualify('/', 'q1')].republished, 1) | ||
assert.equal(message.properties.headers.foo, 'bar') | ||
@@ -746,3 +746,3 @@ assert.equal(message.properties.messageId, messageId) | ||
it.only('should immediately nack republished messages when requested', function(done) { | ||
it('should immediately nack republished messages when requested', function(done) { | ||
createBroker({ | ||
@@ -848,3 +848,5 @@ vhosts: { | ||
ackOrNack() | ||
assert.equal(message.properties.headers.rascal[broker.qualify('/', 'q1')].forwarded, 1) | ||
assert.equal(message.properties.headers.rascal.recovery[broker.qualify('/', 'q1')].forwarded, 1) | ||
assert.equal(message.properties.headers.CC.length, 1) | ||
assert.equal(message.properties.headers.CC[0], broker.qualify('/', 'q1') + '.foo') | ||
assert.equal(message.properties.headers.rascal.error.message, 'forward me') | ||
@@ -881,3 +883,3 @@ assert.equal(message.properties.headers.rascal.error.code, 'red') | ||
ackOrNack() | ||
assert.equal(message.properties.headers.rascal[broker.qualify('/', 'q1')].forwarded, 1) | ||
assert.equal(message.properties.headers.rascal.recovery[broker.qualify('/', 'q1')].forwarded, 1) | ||
assert.equal(message.properties.headers.rascal.error.message.length, 1024) | ||
@@ -913,3 +915,3 @@ done() | ||
ackOrNack() | ||
assert.equal(message.properties.headers.rascal[broker.qualify('/', 'q1')].forwarded, 1) | ||
assert.equal(message.properties.headers.rascal.recovery[broker.qualify('/', 'q1')].forwarded, 1) | ||
done() | ||
@@ -951,3 +953,3 @@ }) | ||
ackOrNack() | ||
assert.equal(message.properties.headers.rascal[broker.qualify('/', 'q1')].forwarded, 1) | ||
assert.equal(message.properties.headers.rascal.recovery[broker.qualify('/', 'q1')].forwarded, 1) | ||
assert.equal(message.properties.headers.foo, 'bar') | ||
@@ -954,0 +956,0 @@ assert.equal(message.properties.messageId, messageId) |
242849
5244