Comparing version 0.6.6 to 0.7.0
@@ -36,3 +36,3 @@ var debug = require('debug')('rascal:Publication') | ||
this.forward = function(message, overrides, next) { | ||
var publishConfig = _.defaultsDeep(overrides, config) | ||
var publishConfig = _.defaultsDeep(overrides, config, { routingKey: message.fields.routingKey }) | ||
publishConfig.options = _.defaultsDeep(publishConfig.options, message.properties, { headers: { rascal: { forwarded: 0 } } }) | ||
@@ -39,0 +39,0 @@ publishConfig.options.headers.rascal.forwarded++ |
@@ -91,2 +91,6 @@ 'use strict' | ||
config.bindings[name] = _.defaultsDeep(bindingConfig, config.defaults.bindings) | ||
if (bindingConfig.qualifyBindingKeys) { | ||
config.bindings[name].bindingKey = fqn.qualify(bindingConfig.bindingKey, config.namespace) | ||
} | ||
if (bindingConfig.destinationType === 'queue') { | ||
@@ -93,0 +97,0 @@ config.bindings[name].bindingKey = fqn.prefix(config.queues[bindingConfig.destination].replyTo, bindingConfig.bindingKey, '.') |
@@ -32,3 +32,2 @@ module.exports = { | ||
vhost: '/', | ||
routingKey: '', | ||
confirm: true, | ||
@@ -35,0 +34,0 @@ options: { |
@@ -67,3 +67,3 @@ 'use strict' | ||
function validateBinding(vhost, vhostName, binding, bindingName) { | ||
validateVhostChildAttributes(vhostName, 'Binding', binding, bindingName, ['fullyQualifiedName', 'name', 'source', 'destination', 'destinationType', 'bindingKey', 'bindingKeys', 'options']) | ||
validateVhostChildAttributes(vhostName, 'Binding', binding, bindingName, ['fullyQualifiedName', 'name', 'source', 'destination', 'destinationType', 'bindingKey', 'bindingKeys', 'qualifyBindingKeys', 'options']) | ||
if (!binding.source) throw new Error(format('Binding: %s in vhost: %s is missing a source', bindingName, vhostName)) | ||
@@ -70,0 +70,0 @@ if (!binding.destination) throw new Error(format('Binding: %s in vhost: %s is missing a destination', bindingName, vhostName)) |
{ | ||
"name": "rascal", | ||
"version": "0.6.6", | ||
"version": "0.7.0", | ||
"description": "A friendly wrapper around amqplib with (mostly) safe defaults", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -515,6 +515,6 @@ # Rascal | ||
#### Forwarding messages | ||
Sometimes you want to forward a message to a publication. This may be part of a shovel program for transferming messages between vhosts, or because you want to ensure a sequence in some workflow, but do not need to modify the original message. Rascal supports this via ```broker.forward```. The syntax is similar to broker.publish apart from you pass in the original message you want to be forwarded instead of the message payload. | ||
Sometimes you want to forward a message to a publication. This may be part of a shovel program for transferming messages between vhosts, or because you want to ensure a sequence in some workflow, but do not need to modify the original message. Rascal supports this via ```broker.forward```. The syntax is similar to ```broker.publish``` except from you pass in the original message you want to be forwarded instead of the message payload. If the publication or overrides don't specify a routing key, the original forwarding key will be maintained. | ||
```javascript | ||
broker.forward("p1", message, function(err, publication) { | ||
broker.forward("p1", message, overrides, function(err, publication) { | ||
publication.on("success", function(messageId) { | ||
@@ -521,0 +521,0 @@ console.log("Message id was", messageId) |
@@ -443,2 +443,31 @@ var assert = require('assert') | ||
it('should qualify bindings keys when specified', function() { | ||
configure({ | ||
vhosts: { | ||
v1: { | ||
namespace: 'ns1', | ||
queues: { | ||
q1: { | ||
} | ||
}, | ||
bindings: { | ||
b1: { | ||
source: 'e1', | ||
destination: 'q1', | ||
bindingKey: 'q1', | ||
qualifyBindingKeys: true | ||
}, | ||
'e1[q1] -> q1': { | ||
qualifyBindingKeys: true | ||
} | ||
} | ||
} | ||
} | ||
}, function(err, config) { | ||
assert.ifError(err) | ||
assert.equal(config.vhosts.v1.bindings.b1.bindingKey, 'ns1:q1') | ||
assert.equal(config.vhosts.v1.bindings['e1[q1] -> q1'].bindingKey, 'ns1:q1') | ||
}) | ||
}) | ||
it('should inflate bindings with empty structure', function() { | ||
@@ -445,0 +474,0 @@ configure({ |
@@ -278,6 +278,8 @@ var assert = require('assert') | ||
p1: { | ||
exchange: 'e1' | ||
exchange: 'e1', | ||
routingKey: 'rk1' | ||
}, | ||
p2: { | ||
exchange: 'e2' | ||
exchange: 'e2', | ||
routingKey: 'rk1' | ||
} | ||
@@ -307,2 +309,3 @@ }, | ||
assert.ok(message) | ||
assert.equal(message.fields.routingKey, 'rk1') | ||
assert.equal(message.properties.messageId, messageId) | ||
@@ -329,2 +332,54 @@ assert.equal(message.properties.contentType, 'text/plain') | ||
it('should forward messages to publications maintaining the original routing key when not specified', function(done) { | ||
createBroker({ | ||
vhosts: vhosts, | ||
publications: { | ||
p1: { | ||
exchange: 'e1' | ||
}, | ||
p2: { | ||
exchange: 'e2' | ||
} | ||
}, | ||
subscriptions: { | ||
s1: { | ||
vhost: '/', | ||
queue: 'q1' | ||
} | ||
} | ||
}, function(err, broker) { | ||
assert.ifError(err) | ||
var messageId | ||
broker.subscribe('s1', function(err, subscription) { | ||
assert.ifError(err) | ||
subscription.on('message', function(message, content, ackOrNack) { | ||
broker.forward('p2', message, function(err, publication) { | ||
publication.on('success', function() { | ||
ackOrNack() | ||
amqputils.getMessage('q2', namespace, function(err, message) { | ||
assert.ifError(err) | ||
assert.ok(message) | ||
assert.equal(message.fields.routingKey, 'original-routing-key') | ||
done() | ||
}) | ||
}) | ||
}) | ||
}) | ||
}) | ||
broker.publish('p1', 'test message', 'original-routing-key', function(err, publication) { | ||
assert.ifError(err) | ||
publication.on('success', function(_messageId) { | ||
messageId = _messageId | ||
}) | ||
}) | ||
}) | ||
}) | ||
function createBroker(config, next) { | ||
@@ -331,0 +386,0 @@ config = _.defaultsDeep(config, testConfig) |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
208880
4543