New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

rascal

Package Overview
Dependencies
Maintainers
3
Versions
184
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

rascal - npm Package Compare versions

Comparing version 0.10.3 to 0.11.0

5

lib/amqp/Broker.js

@@ -17,3 +17,3 @@ var debug = require('debug')('rascal:Broker')

create: function(config, next) {
preflight(config, function(err, config) {
preflight(_.cloneDeep(config), function(err, config) {
if (err) return next(err)

@@ -40,2 +40,4 @@ new Broker(config).init(next)

this.config = config
this.init = function(next) {

@@ -59,2 +61,3 @@ debug('Initialising broker')

if (err) return next(err)
vhosts = publications = subscriptions = {}
debug('Finished nuking broker')

@@ -61,0 +64,0 @@ next()

@@ -7,2 +7,3 @@ var debug = require('debug')('rascal:Subscription')

var format = require('util').format
var LRU = require('lru-cache')

@@ -18,2 +19,3 @@

var cache = LRU({ max: config.redeliveries.cache.size })
var subscriberError = new SubscriberError(broker, vhost)

@@ -79,2 +81,4 @@ var self = this

message.properties.headers.rascal.originalQueue = config.source
message.properties.headers.rascal.redeliveries = 0
if (message.fields.redelivered && message.properties.messageId) incrementRedeliveries(message)
if (message.properties.headers.rascal.originalRoutingKey) message.fields.routingKey = message.properties.headers.rascal.originalRoutingKey

@@ -85,2 +89,8 @@ if (message.properties.headers.rascal.originalExchange) message.fields.exchange = message.properties.headers.rascal.originalExchange

function incrementRedeliveries(message) {
var redeliveries = cache.get(message.properties.messageId) + 1 || 1
cache.set(message.properties.messageId, redeliveries)
message.properties.headers.rascal.redeliveries = redeliveries
}
function immediateNack(message) {

@@ -87,0 +97,0 @@ return _.get(message, format('properties.headers.rascal.recovery.%s.immediateNack', message.properties.headers.rascal.originalQueue))

17

lib/config/configure.js

@@ -15,9 +15,12 @@ 'use strict'

rascalConfig = _.defaultsDeep(rascalConfig, emptyConfig, { publications: {}, subscriptions: {} })
configureVhosts(rascalConfig.vhosts)
configurePublications(rascalConfig.publications)
configureSubscriptions(rascalConfig.subscriptions)
configureShovels(rascalConfig.shovels)
try {
configureVhosts(rascalConfig.vhosts)
configurePublications(rascalConfig.publications)
configureSubscriptions(rascalConfig.subscriptions)
configureShovels(rascalConfig.shovels)
return next(null, Object.freeze(rascalConfig))
} catch(err) {
return next(err)
}
return next(null, rascalConfig)
function configureVhosts(vhosts) {

@@ -65,2 +68,3 @@ _.each(rascalConfig.vhosts, function(vhostConfig, name) {

debug('Configuring publication: %s', name)
if (rascalConfig.publications[name] && rascalConfig.publications[name].vhost !== publicationConfig.vhost) throw new Error(format('Duplicate publication: %s', name))
rascalConfig.publications[name] = _.defaultsDeep(publicationConfig, { name: name }, rascalConfig.defaults.publications)

@@ -87,2 +91,3 @@ if (!rascalConfig.vhosts[publicationConfig.vhost]) return

debug('Configuring subscription: %s', name)
if (rascalConfig.subscriptions[name] && rascalConfig.subscriptions[name].vhost !== subscriptionConfig.vhost) throw new Error(format('Duplicate subscription: %s', name))
rascalConfig.subscriptions[name] = _.defaultsDeep(subscriptionConfig, { name: name }, rascalConfig.defaults.subscriptions)

@@ -89,0 +94,0 @@ if (!rascalConfig.vhosts[subscriptionConfig.vhost]) return

@@ -43,2 +43,7 @@ module.exports = {

delay: 1000
},
redeliveries: {
cache: {
size: 1000
}
}

@@ -45,0 +50,0 @@ }

@@ -112,3 +112,3 @@ 'use strict'

function validateSubscription(subscription, subscriptionName) {
validateAttributes('Subscription', subscription, subscriptionName, ['name', 'vhost', 'queue', 'contentType', 'options', 'prefetch', 'retry', 'source', 'recovery', 'workflow', 'handler', 'workflows', 'handlers'])
validateAttributes('Subscription', subscription, subscriptionName, ['name', 'vhost', 'queue', 'contentType', 'options', 'prefetch', 'retry', 'source', 'recovery', 'workflow', 'handler', 'workflows', 'handlers', 'redeliveries'])

@@ -115,0 +115,0 @@ if (!subscription.vhost) throw new Error(format('Subscription: %s is missing a vhost', subscriptionName))

{
"name": "rascal",
"version": "0.10.3",
"version": "0.11.0",
"description": "A friendly wrapper around amqplib with (mostly) safe defaults",

@@ -12,2 +12,3 @@ "main": "index.js",

"lodash": "^3.9.3",
"lru-cache": "^4.0.1",
"merge-defaults": "^0.2.1",

@@ -14,0 +15,0 @@ "node-uuid": "^1.4.3",

@@ -84,4 +84,2 @@ # Rascal

* Rascal has plenty of automated tests, but is by no means battle hardened (yet).
## VERY IMPORTANT SECTION ABOUT EVENT HANDLING

@@ -513,3 +511,3 @@ [amqplib](https://www.npmjs.com/package/amqplib) emits error events when a connection or channel encounters a problem. Rascal will listen for these and provided you use the default configuration will attempt automatic recovery (reconnection etc), however these events can indicate errors in your code, so it's also important to bring them to your attention. Rascal does this by re-emitting the error event, which means if you don't handle them, they will bubble up to the uncaught error handler and crash your application. There are four places where you should do this

It's also **very** important not to go async between getting the subscriptio and listening for the message or error events. If you do, you risk leaking messages and not handling errors.
It's also **very** important not to go async between getting the subscription and listening for the message or error events. If you do, you risk leaking messages and not handling errors.

@@ -550,2 +548,5 @@ Rascal supports text, buffers and anything it can JSON.parse, providing the contentType message property is set correctly. Text messages should be set to "text/plain" and JSON messages to "application/json". Other content types will be returned as a Buffer. If the publisher doesn't set the contentType or you want to override it you can do so in the subscriber configuration.

#### Redeliveries
If your node app crashes before acknowledging a message, the message will be rolled back. This can cause big problems if there's something in the messages which caused the crash in the first place. Unfortunately RabbitMQ doesn't limit the number of redeliveries per message or provide a per message redelivery count. For this reason Rascal keeps a small in-memory cache of message ids, and will update the ```message.properties.headers.rascal.redeliveries``` header with the number of hits. You should check this header before processing a message and nack the message if the redeliveries are too high (assuming you aren't using a requeue strategy).
#### Message Acknowledgement and Recovery Strategies

@@ -552,0 +553,0 @@ For messages which are not auto-acknowledged (the default) calling ```ackOrNack()``` with no arguments will acknowledge it. Calling ```ackOrNack(err, [options], [callback])``` will nack the message will trigger one of the Rascal's recovery strategies.

@@ -35,2 +35,12 @@ var assert = require('assert')

}
},
subscriptions: {
s1: {
queue: 'q1'
}
},
publications: {
p1: {
exchange: 'e1'
}
}

@@ -49,5 +59,4 @@ }

it('should provide fully qualified name', function(done) {
createBroker({
vhosts: vhosts
}, function(err, broker) {
var config = _.defaultsDeep({ vhosts: vhosts }, testConfig)
createBroker(config, function(err, broker) {
assert.ifError(err)

@@ -59,4 +68,25 @@ assert.equal(namespace + ':q1', broker.getFullyQualifiedName('/', 'q1'))

it('should not modify configuration', function(done) {
var config = _.defaultsDeep({ vhosts: vhosts }, testConfig)
var json = JSON.stringify(config, null, 2)
createBroker(config, function(err, broker) {
assert.ifError(err)
assert.equal(json, JSON.stringify(config, null, 2))
done()
})
})
it('should nuke', function(done) {
var config = _.defaultsDeep({ vhosts: vhosts }, testConfig)
var json = JSON.stringify(config, null, 2)
createBroker(config, function(err, broker) {
assert.ifError(err)
broker.nuke(function(err) {
assert.ifError(err)
done()
})
})
})
function createBroker(config, next) {
config = _.defaultsDeep(config, testConfig)
Broker.create(config, function(err, _broker) {

@@ -63,0 +93,0 @@ broker = _broker

@@ -955,2 +955,66 @@ var assert = require('assert')

it('should report duplicate subscriptions', function() {
configure({
vhosts: {
v1: {
queues: {
q1: {
}
},
subscriptions: {
s1: {
queue: 'q1'
}
}
},
v2: {
queues: {
q1: {
}
},
subscriptions: {
s1: {
queue: 'q1'
}
}
}
}
}, function(err, config) {
assert.equal(err.message, 'Duplicate subscription: s1')
})
})
it('should report duplicate publications', function() {
configure({
vhosts: {
v1: {
exchanges: {
e1: {
}
},
publications: {
p1: {
exchange: 'e1'
}
}
},
v2: {
exchanges: {
e1: {
}
},
publications: {
p1: {
exchange: 'e1'
}
}
}
}
}, function(err, config) {
assert.equal(err.message, 'Duplicate publication: p1')
})
})
it('should replace source with its fully qualified name', function() {

@@ -957,0 +1021,0 @@ configure({

@@ -601,2 +601,27 @@ var assert = require('assert')

it('should count redeliveries', function(done) {
createBroker({
vhosts: vhosts,
publications: publications,
subscriptions: subscriptions
}, function(err, broker) {
assert.ifError(err)
broker.publish('p1', 'test message', function(err) {
assert.ifError(err)
var errors = 0
broker.subscribe('s1', function(err, subscription) {
assert.ifError(err)
subscription.on('message', function(message, content, ackOrNack) {
if (message.properties.headers.rascal.redeliveries >= 10) return subscription.cancel(done)
throw new Error('oh no')
}).on('error', function(err) {
if (errors++ > 10) done(new Error('Redeliveries were not counted'))
})
})
})
})
})
it('should republish messages to queue when requested', function(done) {

@@ -603,0 +628,0 @@

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc