New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

rascal

Package Overview
Dependencies
Maintainers
3
Versions
184
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

rascal - npm Package Compare versions

Comparing version 0.11.2 to 0.12.0

examples/advanced/config.js

19

lib/amqp/Subscription.js

@@ -46,2 +46,3 @@ var debug = require('debug')('rascal:Subscription')

if (immediateNack(message)) return ackOrNack(session, message, true)
if (redeliveriesExceeded(message)) return handleRedeliveriesExceeded(session, message)

@@ -69,2 +70,16 @@ getContent(message, config, function(err, content) {

if (session.emit('invalid_message', err, message, ackOrNack.bind(null, session, message))) return
nackAndError(session, message, err)
}
function redeliveriesExceeded(message) {
return message.properties.headers.rascal.redeliveries > config.redeliveries.limit
}
function handleRedeliveriesExceeded(session, message) {
var err = new Error(format('Message %s has exceeded %d redeliveries', message.properties.messageId, config.redeliveries.limit))
if (session.emit('redeliveries_exceeded', err, message, ackOrNack.bind(null, session, message))) return
nackAndError(session, message, err)
}
function nackAndError(session, message, err) {
ackOrNack(session, message, err, function() {

@@ -95,3 +110,5 @@ // Using setTimeout rather than process.nextTick as the latter fires before any IO.

function immediateNack(message) {
return _.get(message, format('properties.headers.rascal.recovery.%s.immediateNack', message.properties.headers.rascal.originalQueue))
if (_.get(message, format('properties.headers.rascal.recovery.%s.immediateNack', message.properties.headers.rascal.originalQueue))) return true
if (_.get(message, format('properties.headers.rascal.recovery.%s.immediateNack', message.properties.headers.rascal.originalQueue))) return true
return false
}

@@ -98,0 +115,0 @@

3

lib/config/defaults.js

@@ -47,3 +47,4 @@ module.exports = {

size: 1000
}
},
limit: 1000
}

@@ -50,0 +51,0 @@ }

{
"name": "rascal",
"version": "0.11.2",
"version": "0.12.0",
"description": "A friendly wrapper around amqplib with (mostly) safe defaults",

@@ -20,2 +20,4 @@ "main": "index.js",

"devDependencies": {
"chalk": "^1.1.3",
"chance": "^1.0.1",
"eslint": "^0.23.0",

@@ -22,0 +24,0 @@ "eslint-config-imperative": "0.0.6",

@@ -8,67 +8,8 @@ # Rascal

We also added a small in memory LRU cache that can be used to detect redeliveries. See the Redeliveries section for more details
We also added a small in memory LRU cache that can be used to detect redeliveries and configure Rascal to automatically nack messages that have been redelivered more than 1000 times. See the Redeliveries section for more details.
## tl;dr
```javascript
var rascal = require('rascal')
var definitions = require('./definitions.json')
See the [examples](https://github.com/guidesmiths/rascal/tree/master/examples)
var config = rascal.withDefaultConfig(definitions)
rascal.createBroker(config, function(err, broker) {
if (err) bail(err)
broker.subscribe('s1', function(err, subscription) {
if (err) bail(err)
subscription.on('message', function(message, content, ackOrNack) {
console.log(content)
ackOrNack()
}).on('error', bail)
})
setInterval(function() {
broker.publish('p1', 'This is a test message', function(err, publication) {
if (err) bail(err)
})
}, 100).unref()
})
function bail(err) {
console.error(err)
process.exit(1)
}
```
definitions.json
```json
{
"vhosts": {
"/": {
"exchanges": {
"e1": {}
},
"queues": {
"q1": {}
},
"bindings": {
"b1": {
"source": "e1",
"destination": "q1"
}
}
}
},
"publications": {
"p1": {
"exchange": "e1"
}
},
"subscriptions": {
"s1": {
"queue": "q1"
}
}
}
```
## About

@@ -90,2 +31,8 @@ Rascal is a wrapper for the excellent [amqplib](https://www.npmjs.com/package/amqplib). One of the best things about amqplib is that it doesn't make assumptions about how you use it. Another is that it doesn't attempt to abstract away [AMQP Concepts](https://www.rabbitmq.com/tutorials/amqp-concepts.html). As a result the library offers a great deal of control and flexibility, but the onus is on you adopt appropriate patterns and configuration. You need to be aware that:

* There are two situations when Rascal will drop a message, leading to potential data loss.
1. When it is unable to parse the message content and the subscriber has no invalid_content listener
2. When the subscribers redelivery limit has been exceeded and the subscriber has no redeliveries_exceeded listener
The reason Rascal drops the message is because the alternative is to rollback and retry the message in an infinite tight loop. This can DDOS your application and cause problems for your infrastructure. Providing you have correctly configured dead letter queues and/or listen to the "invalid_content" and "redeliveries_exceeded" subscriber events Rascal your messages should be safe.
## VERY IMPORTANT SECTION ABOUT EVENT HANDLING

@@ -439,3 +386,3 @@ [amqplib](https://www.npmjs.com/package/amqplib) emits error events when a connection or channel encounters a problem. Rascal will listen for these and provided you use the default configuration will attempt automatic recovery (reconnection etc), however these events can indicate errors in your code, so it's also important to bring them to your attention. Rascal does this by re-emitting the error event, which means if you don't handle them, they will bubble up to the uncaught error handler and crash your application. There are four places where you should do this

Rascal supports text, buffers and anything it can JSON.stringify. When publish a message Rascal sets the contentType message property to "text/plain", "application/json" (it uses this when reading the message too). The ```broker.publish``` method is overloaded to accept a runtime routing key or options.
Rascal supports text, buffers and anything it can JSON.stringify. The ```broker.publish``` method is overloaded to accept a runtime routing key or options.

@@ -448,3 +395,3 @@ ```javascript

```
The callback parameters are err (indicating the publication could not be found) and publication. Listen to the publication's "success" event to obtain the Rascal generated message id and the "error" event to handle errors. If you specify the mandatory option (or use Rascal's defaults) you can also listen for returned messages (i.e. messages that were not delivered to any queues)
The callback parameters are err (indicating the publication could not be found) and publication. Listen to the publication's "success" event to obtain the Rascal generated message id and the "error" event to handle errors. If you specify the "mandatory" option (or use Rascal's defaults) you can also listen for returned messages (i.e. messages that were not delivered to any queues)
```javascript

@@ -462,3 +409,3 @@ broker.publish("p1", "some message", function(err, publication) {

On publish option you should be aware of is the "persistent". Unless persistent is true, your messages will be discarded when you restart Rabbit. Despite having an impact on performance Rascal sets this in it's default configuration.
One publish option you should be aware of is the "persistent". Unless persistent is true, your messages will be discarded when you restart Rabbit. Despite having an impact on performance Rascal sets this in it's default configuration.

@@ -554,7 +501,23 @@ Refer to the [amqplib](http://www.squaremobius.net/amqp.node/doc/channel_api.html) documentation for further exchange options.

```
If the message has not been auto acknowledged you should ackOrNack it. If you do not listen for the invalid_content event rascal will nack the message (without requeue) and emit an error event instead.
If the message has not been auto-acknowledged you should ackOrNack it. **If you do not listen for the invalid_content event rascal will nack the message (without requeue) and emit an error event instead, leading to message loss if you have not configured a dead letter exchange/queue**.
#### Redeliveries
If your node app crashes before acknowledging a message, the message will be rolled back. This can cause big problems if there's something in the messages which caused the crash in the first place. Unfortunately RabbitMQ doesn't limit the number of redeliveries per message or provide a per message redelivery count. For this reason Rascal keeps a small in-memory cache of message ids, and will update the ```message.properties.headers.rascal.redeliveries``` header with the number of hits. You should check this header before processing a message and nack the message if the redeliveries are too high (assuming you aren't using a requeue strategy).
If your node app crashes before acknowledging a message, the message will be rolled back. This will cause a tight infinite loop if there was something wrong with the content of message which caused the crash. Unfortunately RabbitMQ doesn't allow you to limit the number of redeliveries per message or provide a redelivery count. For this reason Rascal keeps a small in-memory cache of message ids, and will update the ```message.properties.headers.rascal.redeliveries``` header with the number of hits.
The subscriber emits a "redeliveries_exceeded" event whenever the subscriber redeliveries limit is exceeded.
```javascript
broker.subscribe('s1', function(err, subscription) {
subscription.on('message', function(message, content, ackOrNack) {
// Do stuff with message
}).on('error', function(err) {
console.error('Subscriber error', err)
}).on('redeliveries_exceeded', function(err, message, ackOrNack)) {
console.error('Redeliveries Exceeded', err)
ackOrNack(err)
})
})
```
If the message has not been auto-acknowdelged you should ackOrNack it. **If you do not listen for the invalid_content event rascal will nack the message (without requeue) and emit an error event instead, leading to message loss if you have not configured a dead letter exchange/queue**.
#### Message Acknowledgement and Recovery Strategies

@@ -707,3 +670,10 @@ For messages which are not auto-acknowledged (the default) calling ```ackOrNack()``` with no arguments will acknowledge it. Calling ```ackOrNack(err, [options], [callback])``` will nack the message will trigger one of the Rascal's recovery strategies.

"delay": 1000
},
"redeliveries": {
"cache": {
"size": 1000
},
"limit": 1000
}
}
}

@@ -710,0 +680,0 @@ }

@@ -201,3 +201,3 @@ var assert = require('assert')

it('should consume to invalid messages messages when no listener is bound', function(done) {
it('should consume to invalid messages when no listener is bound', function(done) {
createBroker({

@@ -627,2 +627,152 @@ vhosts: vhosts,

it('should notify when redeliveries exceeds', function(done) {
createBroker({
vhosts: vhosts,
publications: publications,
subscriptions: {
s1: {
vhost: '/',
queue: 'q1',
redeliveries: {
limit: 5
}
}
}
}, function(err, broker) {
assert.ifError(err)
broker.publish('p1', 'test message', function(err) {
assert.ifError(err)
var errors = 0
broker.subscribe('s1', function(err, subscription) {
assert.ifError(err)
subscription.on('message', function(message, content, ackOrNack) {
if (message.properties.headers.rascal.redeliveries >= 10) return subscription.cancel(done)
throw new Error('oh no')
}).on('redeliveries_exceeded', function(err, message, ackOrNack) {
broker.shutdown(function(err) {
assert.ifError(err)
amqputils.assertMessage('q1', namespace, 'test message', done)
})
}).on('error', function(err) {
if (errors++ > 5) done(new Error('Redeliveries were exceeded'))
})
})
})
})
})
it('should not notify when redeliveries limit is 0', function(done) {
createBroker({
vhosts: vhosts,
publications: publications,
subscriptions: {
s1: {
vhost: '/',
queue: 'q1',
redeliveries: {
limit: 0
}
}
}
}, function(err, broker) {
assert.ifError(err)
broker.publish('p1', 'test message', function(err) {
assert.ifError(err)
var errors = 0
broker.subscribe('s1', function(err, subscription) {
assert.ifError(err)
subscription.on('message', function(message, content, ackOrNack) {
if (message.properties.headers.rascal.redeliveries >= 10) return subscription.cancel(done)
throw new Error('oh no')
}).on('redeliveries_exceeded', function(err, message, ackOrNack) {
assert(false, 'Redeliveries were exceeded')
}).on('error', function(err) {
if (errors++ > 5) done()
})
})
})
})
})
it('should consume to poison messages when no listener is bound', function(done) {
createBroker({
vhosts: vhosts,
publications: publications,
subscriptions: {
s1: {
vhost: '/',
queue: 'q1',
redeliveries: {
limit: 5
}
}
}
}, function(err, broker) {
assert.ifError(err)
broker.publish('p1', 'test message', function(err) {
assert.ifError(err)
broker.subscribe('s1', function(err, subscription) {
assert.ifError(err)
subscription.on('message', function(message, content, ackOrNack) {
throw new Error('oh no')
}).on('error', function(err) {
if (!/Message .* has exceeded 5 redeliveries/.test(err.message)) return
broker.shutdown(function(err) {
assert.ifError(err)
amqputils.assertMessageAbsent('q1', namespace, done)
})
})
})
})
})
})
it('should consume a poision message when a listener acks it', function(done) {
createBroker({
vhosts: vhosts,
publications: publications,
subscriptions: {
s1: {
vhost: '/',
queue: 'q1',
redeliveries: {
limit: 5
}
}
}
}, function(err, broker) {
assert.ifError(err)
assert.ifError(err)
broker.publish('p1', 'test message', function(err) {
assert.ifError(err)
broker.subscribe('s1', function(err, subscription) {
assert.ifError(err)
var errors = 0
subscription.on('message', function(message, content, ackOrNack) {
throw new Error('oh no')
}).on('redeliveries_exceeded', function(err, message, ackOrNack) {
ackOrNack(function() {
setTimeout(function() {
broker.shutdown(function(err) {
assert.ifError(err)
amqputils.assertMessageAbsent('q1', namespace, done)
})
})
})
}).on('error', function(err) {
if (errors++ > 5) done(new Error('Redeliveries were exceeded'))
})
})
})
})
})
it('should republish messages to queue when requested', function(done) {

@@ -629,0 +779,0 @@

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc