Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

rascal

Package Overview
Dependencies
Maintainers
4
Versions
184
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

rascal - npm Package Compare versions

Comparing version 9.4.0 to 10.0.0

3

CHANGELOG.md
# Change Log
## 10.0.0
- Using rascal to consume messages published with broker.forward no longer restores original routing headers by default, unless used in the context of a recovery strategy. See the broker.forward section of the readme for more information.
## 9.4.0

@@ -4,0 +7,0 @@ ### Added

5

lib/amqp/Publication.js

@@ -37,3 +37,3 @@ var debug = require('debug')('rascal:Publication');

this.publish = function(payload, overrides, next) {
var publishConfig = _.defaultsDeep(overrides, config);
var publishConfig = _.defaultsDeep({}, overrides, config);
var content = getContent(payload, publishConfig);

@@ -50,6 +50,7 @@ publishConfig.options.contentType = publishConfig.options.contentType || content.type;

var originalQueue = message.properties.headers.rascal.originalQueue;
var publishConfig = _.defaultsDeep(overrides, config, { routingKey: message.fields.routingKey });
var publishConfig = _.defaultsDeep({}, overrides, config, { routingKey: message.fields.routingKey });
publishConfig.options = _.defaultsDeep(publishConfig.options, message.properties);
_.set(publishConfig, 'options.headers.rascal.restoreRoutingHeaders', !!publishConfig.restoreRoutingHeaders);
_.set(publishConfig, 'options.headers.rascal.originalExchange', message.fields.exchange);

@@ -56,0 +57,0 @@ _.set(publishConfig, 'options.headers.rascal.originalRoutingKey', message.fields.routingKey);

@@ -84,2 +84,4 @@ var debug = require('debug')('rascal:SubscriberError');

_.set(publishOptions, 'headers.rascal.error.code', err.code);
_.set(publishOptions, 'headers.rascal.restoreRoutingHeaders', _.has(strategyConfig, 'restoreRoutingHeaders') ? strategyConfig.restoreRoutingHeaders : true);
if (strategyConfig.immediateNack) _.set(publishOptions, format('headers.rascal.recovery.%s.immediateNack', originalQueue), true);

@@ -120,8 +122,9 @@

var forwardOptions = _.cloneDeep(strategyConfig.options) || {};
_.set(forwardOptions, format('options.headers.rascal.recovery.%s.forwarded', originalQueue), forwarded + 1);
_.set(forwardOptions, 'options.headers.rascal.error.message', _.truncate(err.message, { length: 1024 }));
_.set(forwardOptions, 'options.headers.rascal.error.code', err.code);
var forwardOverrides = _.cloneDeep(strategyConfig.options) || {};
_.set(forwardOverrides, 'restoreRoutingHeaders', _.has(strategyConfig, 'restoreRoutingHeaders') ? strategyConfig.restoreRoutingHeaders : true);
_.set(forwardOverrides, format('options.headers.rascal.recovery.%s.forwarded', originalQueue), forwarded + 1);
_.set(forwardOverrides, 'options.headers.rascal.error.message', _.truncate(err.message, { length: 1024 }));
_.set(forwardOverrides, 'options.headers.rascal.error.code', err.code);
broker.forward(strategyConfig.publication, message, forwardOptions, function(err, publication) {
broker.forward(strategyConfig.publication, message, forwardOverrides, function(err, publication) {
if (err) return next(err);

@@ -128,0 +131,0 @@ publication.on('success', function() {

@@ -173,2 +173,4 @@ var debug = require('debug')('rascal:Subscription');

message.properties.headers.rascal.originalVhost = vhost.name;
if (!message.properties.headers.rascal.restoreRoutingHeaders) return;
if (message.properties.headers.rascal.originalRoutingKey) message.fields.routingKey = message.properties.headers.rascal.originalRoutingKey;

@@ -175,0 +177,0 @@ if (message.properties.headers.rascal.originalExchange) message.fields.exchange = message.properties.headers.rascal.originalExchange;

{
"name": "rascal",
"version": "9.4.0",
"version": "10.0.0",
"description": "A config driven wrapper for amqplib supporting multi-host connections, automatic error recovery, redelivery flood protection, transparent encryption / decryption, channel pooling and publication timeouts",

@@ -45,3 +45,3 @@ "main": "index.js",

"scripts": {
"test": "nyc --reporter text-summary mocha ./test --exit",
"test": "nyc --reporter text-summary mocha ./test",
"lint": "eslint .",

@@ -80,9 +80,9 @@ "docker": "docker run -d --name rascal-rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management",

"mocha": {
"bail": true,
"check-leaks": true,
"recursive": true,
"slow": 2000,
"timeout": 5000
"timeout": 5000,
"exit": true
},
"snyk": true
}

@@ -933,2 +933,3 @@ # Rascal

```
Prior to version 10.0.0, if you used Rascal to consume a forwarded message, the subscriber would automatically restore the original routing key and exchange to the message.fields before emitting it. This was added to support the delayed retry loop advanced recovery strategy, but should not have been applied to `broker.forward`. From version 10.0.0 this behaviour has been disabled for `broker.forward` but you can turn it back on by setting `restoreRoutingHeaders` to true in the overrides. You can also disable this behaviour in the `forward` and `republish` recovery strategies by setting `restoreRoutingHeaders` to false.

@@ -935,0 +936,0 @@ **Since there is no native, transactional support for forwarding in amqplib, you are at risk of receiving duplicate messages when using ```broker.foward```**

@@ -439,2 +439,3 @@ var assert = require('assert');

assert.ok(/\w+-\w+-\w+-\w+-\w+:q1/.test(message.properties.headers.rascal.originalQueue), format('%s failed to match expected pattern', message.properties.headers.rascal.originalQueue));
assert.equal(message.properties.headers.rascal.restoreRoutingHeaders, false);
assert.equal(message.properties.headers.rascal.originalRoutingKey, 'rk1');

@@ -458,2 +459,50 @@ assert.equal(message.properties.headers.rascal.originalExchange, namespace + ':e1');

it('should instruct subscriber to restore routing headers when requested', function(done) {
createBroker({
vhosts: vhosts,
publications: {
p1: {
exchange: 'e1',
routingKey: 'rk1',
},
p2: {
exchange: 'e2',
routingKey: 'rk2',
},
},
subscriptions: {
s1: {
vhost: '/',
queue: 'q1',
},
},
}, function(err, broker) {
assert.ifError(err);
broker.subscribe('s1', function(err, subscription) {
assert.ifError(err);
subscription.on('message', function(message, content, ackOrNack) {
broker.forward('p2', message, { restoreRoutingHeaders: true }, function(err, publication) {
assert.ifError(err);
publication.on('success', function() {
ackOrNack();
amqputils.getMessage('q2', namespace, function(err, message) {
assert.ifError(err);
assert.ok(message);
assert.equal(message.properties.headers.rascal.restoreRoutingHeaders, true);
done();
});
});
});
});
});
broker.publish('p1', 'test message', function(err, publication) {
assert.ifError(err);
});
});
});
it('should forward messages to publications maintaining the original routing key when not overriden', function(done) {

@@ -460,0 +509,0 @@ createBroker({

@@ -331,2 +331,3 @@ var assert = require('assert');

assert.ok(/\w+-\w+-\w+-\w+-\w+:q1/.test(message.properties.headers.rascal.originalQueue), format('%s failed to match expected pattern', message.properties.headers.rascal.originalQueue));
assert.equal(message.properties.headers.rascal.restoreRoutingHeaders, false);
assert.equal(message.properties.headers.rascal.originalRoutingKey, 'rk1');

@@ -333,0 +334,0 @@ assert.equal(message.properties.headers.rascal.originalExchange, namespace + ':e1');

@@ -1167,3 +1167,3 @@ var assert = require('assert');

it('should override routing key when forward messages', function(done) {
it('should override routing key when forwarding messages', function(done) {
createBroker({

@@ -1197,3 +1197,3 @@ vhosts: vhosts,

it('should maintain original fields, properties and headers when forwarded', function(done) {
it('should maintain original fields, properties and headers when forwarding messages', function(done) {
createBroker({

@@ -1238,2 +1238,42 @@ vhosts: vhosts,

it('should not maintain original routing headers when requested', function(done) {
createBroker({
vhosts: vhosts,
publications: publications,
subscriptions: subscriptions,
}, function(err, broker) {
assert.ifError(err);
var messageId;
broker.publish('p1', 'test message', { options: { headers: { foo: 'bar' } } }, function(err, publication) {
assert.ifError(err);
publication.on('success', function(_messageId) {
messageId = _messageId;
});
});
broker.subscribe('s1', function(err, subscription) {
assert.ifError(err);
subscription.on('message', function(message, content, ackOrNack) {
assert.ok(message);
ackOrNack(new Error('forward'), { strategy: 'forward', publication: 'p2', restoreRoutingHeaders: false });
});
});
broker.subscribe('s2', function(err, subscription) {
assert.ifError(err);
subscription.on('message', function(message, content, ackOrNack) {
assert.ok(message);
ackOrNack();
assert.equal(message.properties.headers.rascal.recovery[broker.qualify('/', 'q1')].forwarded, 1);
assert.equal(message.properties.headers.foo, 'bar');
assert.equal(message.properties.messageId, messageId);
assert.equal(message.fields.routingKey, 'bar');
done();
});
});
});
});
it('should cap forwards when requested', function(done) {

@@ -1240,0 +1280,0 @@ createBroker({

@@ -1051,3 +1051,3 @@ var assert = require('assert');

it('should override routing key when forward messages', function(done) {
it('should override routing key when forwarding messages', function(done) {
createBroker({

@@ -1078,3 +1078,3 @@ vhosts: vhosts,

it('should maintain original fields, properties and headers when forwarded', function(done) {
it('should maintain original fields, properties and headers when forwarding messages', function(done) {
createBroker({

@@ -1115,2 +1115,38 @@ vhosts: vhosts,

it('should not maintain original routing headers when requested', function(done) {
createBroker({
vhosts: vhosts,
publications: publications,
subscriptions: subscriptions,
}).then(function(broker) {
var messageId;
broker.publish('p1', 'test message', { options: { headers: { foo: 'bar' } } }).then(function(publication) {
publication.on('success', function(_messageId) {
messageId = _messageId;
});
});
broker.subscribe('s1').then(function(subscription) {
subscription.on('message', function(message, content, ackOrNack) {
assert.ok(message);
ackOrNack(new Error('forward'), { strategy: 'forward', publication: 'p2', restoreRoutingHeaders: false });
});
});
broker.subscribe('s2').then(function(subscription) {
subscription.on('message', function(message, content, ackOrNack) {
assert.ok(message);
ackOrNack();
assert.equal(message.properties.headers.rascal.recovery[broker.qualify('/', 'q1')].forwarded, 1);
assert.equal(message.properties.headers.foo, 'bar');
assert.equal(message.properties.messageId, messageId);
assert.equal(message.fields.routingKey, 'bar');
done();
});
});
});
});
it('should cap forwards when requested', function(done) {

@@ -1117,0 +1153,0 @@ createBroker({

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc