Comparing version 0.11.2 to 0.12.0
@@ -46,2 +46,3 @@ var debug = require('debug')('rascal:Subscription') | ||
if (immediateNack(message)) return ackOrNack(session, message, true) | ||
if (redeliveriesExceeded(message)) return handleRedeliveriesExceeded(session, message) | ||
@@ -69,2 +70,16 @@ getContent(message, config, function(err, content) { | ||
if (session.emit('invalid_message', err, message, ackOrNack.bind(null, session, message))) return | ||
nackAndError(session, message, err) | ||
} | ||
function redeliveriesExceeded(message) { | ||
return message.properties.headers.rascal.redeliveries > config.redeliveries.limit | ||
} | ||
function handleRedeliveriesExceeded(session, message) { | ||
var err = new Error(format('Message %s has exceeded %d redeliveries', message.properties.messageId, config.redeliveries.limit)) | ||
if (session.emit('redeliveries_exceeded', err, message, ackOrNack.bind(null, session, message))) return | ||
nackAndError(session, message, err) | ||
} | ||
function nackAndError(session, message, err) { | ||
ackOrNack(session, message, err, function() { | ||
@@ -95,3 +110,5 @@ // Using setTimeout rather than process.nextTick as the latter fires before any IO. | ||
function immediateNack(message) { | ||
return _.get(message, format('properties.headers.rascal.recovery.%s.immediateNack', message.properties.headers.rascal.originalQueue)) | ||
if (_.get(message, format('properties.headers.rascal.recovery.%s.immediateNack', message.properties.headers.rascal.originalQueue))) return true | ||
if (_.get(message, format('properties.headers.rascal.recovery.%s.immediateNack', message.properties.headers.rascal.originalQueue))) return true | ||
return false | ||
} | ||
@@ -98,0 +115,0 @@ |
@@ -47,3 +47,4 @@ module.exports = { | ||
size: 1000 | ||
} | ||
}, | ||
limit: 1000 | ||
} | ||
@@ -50,0 +51,0 @@ } |
{ | ||
"name": "rascal", | ||
"version": "0.11.2", | ||
"version": "0.12.0", | ||
"description": "A friendly wrapper around amqplib with (mostly) safe defaults", | ||
@@ -20,2 +20,4 @@ "main": "index.js", | ||
"devDependencies": { | ||
"chalk": "^1.1.3", | ||
"chance": "^1.0.1", | ||
"eslint": "^0.23.0", | ||
@@ -22,0 +24,0 @@ "eslint-config-imperative": "0.0.6", |
102
README.md
@@ -8,67 +8,8 @@ # Rascal | ||
We also added a small in memory LRU cache that can be used to detect redeliveries. See the Redeliveries section for more details | ||
We also added a small in memory LRU cache that can be used to detect redeliveries and configure Rascal to automatically nack messages that have been redelivered more than 1000 times. See the Redeliveries section for more details. | ||
## tl;dr | ||
```javascript | ||
var rascal = require('rascal') | ||
var definitions = require('./definitions.json') | ||
See the [examples](https://github.com/guidesmiths/rascal/tree/master/examples) | ||
var config = rascal.withDefaultConfig(definitions) | ||
rascal.createBroker(config, function(err, broker) { | ||
if (err) bail(err) | ||
broker.subscribe('s1', function(err, subscription) { | ||
if (err) bail(err) | ||
subscription.on('message', function(message, content, ackOrNack) { | ||
console.log(content) | ||
ackOrNack() | ||
}).on('error', bail) | ||
}) | ||
setInterval(function() { | ||
broker.publish('p1', 'This is a test message', function(err, publication) { | ||
if (err) bail(err) | ||
}) | ||
}, 100).unref() | ||
}) | ||
function bail(err) { | ||
console.error(err) | ||
process.exit(1) | ||
} | ||
``` | ||
definitions.json | ||
```json | ||
{ | ||
"vhosts": { | ||
"/": { | ||
"exchanges": { | ||
"e1": {} | ||
}, | ||
"queues": { | ||
"q1": {} | ||
}, | ||
"bindings": { | ||
"b1": { | ||
"source": "e1", | ||
"destination": "q1" | ||
} | ||
} | ||
} | ||
}, | ||
"publications": { | ||
"p1": { | ||
"exchange": "e1" | ||
} | ||
}, | ||
"subscriptions": { | ||
"s1": { | ||
"queue": "q1" | ||
} | ||
} | ||
} | ||
``` | ||
## About | ||
@@ -90,2 +31,8 @@ Rascal is a wrapper for the excellent [amqplib](https://www.npmjs.com/package/amqplib). One of the best things about amqplib is that it doesn't make assumptions about how you use it. Another is that it doesn't attempt to abstract away [AMQP Concepts](https://www.rabbitmq.com/tutorials/amqp-concepts.html). As a result the library offers a great deal of control and flexibility, but the onus is on you adopt appropriate patterns and configuration. You need to be aware that: | ||
* There are two situations when Rascal will drop a message, leading to potential data loss. | ||
1. When it is unable to parse the message content and the subscriber has no invalid_content listener | ||
2. When the subscribers redelivery limit has been exceeded and the subscriber has no redeliveries_exceeded listener | ||
The reason Rascal drops the message is because the alternative is to rollback and retry the message in an infinite tight loop. This can DDOS your application and cause problems for your infrastructure. Providing you have correctly configured dead letter queues and/or listen to the "invalid_content" and "redeliveries_exceeded" subscriber events Rascal your messages should be safe. | ||
## VERY IMPORTANT SECTION ABOUT EVENT HANDLING | ||
@@ -439,3 +386,3 @@ [amqplib](https://www.npmjs.com/package/amqplib) emits error events when a connection or channel encounters a problem. Rascal will listen for these and provided you use the default configuration will attempt automatic recovery (reconnection etc), however these events can indicate errors in your code, so it's also important to bring them to your attention. Rascal does this by re-emitting the error event, which means if you don't handle them, they will bubble up to the uncaught error handler and crash your application. There are four places where you should do this | ||
Rascal supports text, buffers and anything it can JSON.stringify. When publish a message Rascal sets the contentType message property to "text/plain", "application/json" (it uses this when reading the message too). The ```broker.publish``` method is overloaded to accept a runtime routing key or options. | ||
Rascal supports text, buffers and anything it can JSON.stringify. The ```broker.publish``` method is overloaded to accept a runtime routing key or options. | ||
@@ -448,3 +395,3 @@ ```javascript | ||
``` | ||
The callback parameters are err (indicating the publication could not be found) and publication. Listen to the publication's "success" event to obtain the Rascal generated message id and the "error" event to handle errors. If you specify the mandatory option (or use Rascal's defaults) you can also listen for returned messages (i.e. messages that were not delivered to any queues) | ||
The callback parameters are err (indicating the publication could not be found) and publication. Listen to the publication's "success" event to obtain the Rascal generated message id and the "error" event to handle errors. If you specify the "mandatory" option (or use Rascal's defaults) you can also listen for returned messages (i.e. messages that were not delivered to any queues) | ||
```javascript | ||
@@ -462,3 +409,3 @@ broker.publish("p1", "some message", function(err, publication) { | ||
On publish option you should be aware of is the "persistent". Unless persistent is true, your messages will be discarded when you restart Rabbit. Despite having an impact on performance Rascal sets this in it's default configuration. | ||
One publish option you should be aware of is the "persistent". Unless persistent is true, your messages will be discarded when you restart Rabbit. Despite having an impact on performance Rascal sets this in it's default configuration. | ||
@@ -554,7 +501,23 @@ Refer to the [amqplib](http://www.squaremobius.net/amqp.node/doc/channel_api.html) documentation for further exchange options. | ||
``` | ||
If the message has not been auto acknowledged you should ackOrNack it. If you do not listen for the invalid_content event rascal will nack the message (without requeue) and emit an error event instead. | ||
If the message has not been auto-acknowledged you should ackOrNack it. **If you do not listen for the invalid_content event rascal will nack the message (without requeue) and emit an error event instead, leading to message loss if you have not configured a dead letter exchange/queue**. | ||
#### Redeliveries | ||
If your node app crashes before acknowledging a message, the message will be rolled back. This can cause big problems if there's something in the messages which caused the crash in the first place. Unfortunately RabbitMQ doesn't limit the number of redeliveries per message or provide a per message redelivery count. For this reason Rascal keeps a small in-memory cache of message ids, and will update the ```message.properties.headers.rascal.redeliveries``` header with the number of hits. You should check this header before processing a message and nack the message if the redeliveries are too high (assuming you aren't using a requeue strategy). | ||
If your node app crashes before acknowledging a message, the message will be rolled back. This will cause a tight infinite loop if there was something wrong with the content of message which caused the crash. Unfortunately RabbitMQ doesn't allow you to limit the number of redeliveries per message or provide a redelivery count. For this reason Rascal keeps a small in-memory cache of message ids, and will update the ```message.properties.headers.rascal.redeliveries``` header with the number of hits. | ||
The subscriber emits a "redeliveries_exceeded" event whenever the subscriber redeliveries limit is exceeded. | ||
```javascript | ||
broker.subscribe('s1', function(err, subscription) { | ||
subscription.on('message', function(message, content, ackOrNack) { | ||
// Do stuff with message | ||
}).on('error', function(err) { | ||
console.error('Subscriber error', err) | ||
}).on('redeliveries_exceeded', function(err, message, ackOrNack)) { | ||
console.error('Redeliveries Exceeded', err) | ||
ackOrNack(err) | ||
}) | ||
}) | ||
``` | ||
If the message has not been auto-acknowdelged you should ackOrNack it. **If you do not listen for the invalid_content event rascal will nack the message (without requeue) and emit an error event instead, leading to message loss if you have not configured a dead letter exchange/queue**. | ||
#### Message Acknowledgement and Recovery Strategies | ||
@@ -707,3 +670,10 @@ For messages which are not auto-acknowledged (the default) calling ```ackOrNack()``` with no arguments will acknowledge it. Calling ```ackOrNack(err, [options], [callback])``` will nack the message will trigger one of the Rascal's recovery strategies. | ||
"delay": 1000 | ||
}, | ||
"redeliveries": { | ||
"cache": { | ||
"size": 1000 | ||
}, | ||
"limit": 1000 | ||
} | ||
} | ||
} | ||
@@ -710,0 +680,0 @@ } |
@@ -201,3 +201,3 @@ var assert = require('assert') | ||
it('should consume to invalid messages messages when no listener is bound', function(done) { | ||
it('should consume to invalid messages when no listener is bound', function(done) { | ||
createBroker({ | ||
@@ -627,2 +627,152 @@ vhosts: vhosts, | ||
it('should notify when redeliveries exceeds', function(done) { | ||
createBroker({ | ||
vhosts: vhosts, | ||
publications: publications, | ||
subscriptions: { | ||
s1: { | ||
vhost: '/', | ||
queue: 'q1', | ||
redeliveries: { | ||
limit: 5 | ||
} | ||
} | ||
} | ||
}, function(err, broker) { | ||
assert.ifError(err) | ||
broker.publish('p1', 'test message', function(err) { | ||
assert.ifError(err) | ||
var errors = 0 | ||
broker.subscribe('s1', function(err, subscription) { | ||
assert.ifError(err) | ||
subscription.on('message', function(message, content, ackOrNack) { | ||
if (message.properties.headers.rascal.redeliveries >= 10) return subscription.cancel(done) | ||
throw new Error('oh no') | ||
}).on('redeliveries_exceeded', function(err, message, ackOrNack) { | ||
broker.shutdown(function(err) { | ||
assert.ifError(err) | ||
amqputils.assertMessage('q1', namespace, 'test message', done) | ||
}) | ||
}).on('error', function(err) { | ||
if (errors++ > 5) done(new Error('Redeliveries were exceeded')) | ||
}) | ||
}) | ||
}) | ||
}) | ||
}) | ||
it('should not notify when redeliveries limit is 0', function(done) { | ||
createBroker({ | ||
vhosts: vhosts, | ||
publications: publications, | ||
subscriptions: { | ||
s1: { | ||
vhost: '/', | ||
queue: 'q1', | ||
redeliveries: { | ||
limit: 0 | ||
} | ||
} | ||
} | ||
}, function(err, broker) { | ||
assert.ifError(err) | ||
broker.publish('p1', 'test message', function(err) { | ||
assert.ifError(err) | ||
var errors = 0 | ||
broker.subscribe('s1', function(err, subscription) { | ||
assert.ifError(err) | ||
subscription.on('message', function(message, content, ackOrNack) { | ||
if (message.properties.headers.rascal.redeliveries >= 10) return subscription.cancel(done) | ||
throw new Error('oh no') | ||
}).on('redeliveries_exceeded', function(err, message, ackOrNack) { | ||
assert(false, 'Redeliveries were exceeded') | ||
}).on('error', function(err) { | ||
if (errors++ > 5) done() | ||
}) | ||
}) | ||
}) | ||
}) | ||
}) | ||
it('should consume to poison messages when no listener is bound', function(done) { | ||
createBroker({ | ||
vhosts: vhosts, | ||
publications: publications, | ||
subscriptions: { | ||
s1: { | ||
vhost: '/', | ||
queue: 'q1', | ||
redeliveries: { | ||
limit: 5 | ||
} | ||
} | ||
} | ||
}, function(err, broker) { | ||
assert.ifError(err) | ||
broker.publish('p1', 'test message', function(err) { | ||
assert.ifError(err) | ||
broker.subscribe('s1', function(err, subscription) { | ||
assert.ifError(err) | ||
subscription.on('message', function(message, content, ackOrNack) { | ||
throw new Error('oh no') | ||
}).on('error', function(err) { | ||
if (!/Message .* has exceeded 5 redeliveries/.test(err.message)) return | ||
broker.shutdown(function(err) { | ||
assert.ifError(err) | ||
amqputils.assertMessageAbsent('q1', namespace, done) | ||
}) | ||
}) | ||
}) | ||
}) | ||
}) | ||
}) | ||
it('should consume a poision message when a listener acks it', function(done) { | ||
createBroker({ | ||
vhosts: vhosts, | ||
publications: publications, | ||
subscriptions: { | ||
s1: { | ||
vhost: '/', | ||
queue: 'q1', | ||
redeliveries: { | ||
limit: 5 | ||
} | ||
} | ||
} | ||
}, function(err, broker) { | ||
assert.ifError(err) | ||
assert.ifError(err) | ||
broker.publish('p1', 'test message', function(err) { | ||
assert.ifError(err) | ||
broker.subscribe('s1', function(err, subscription) { | ||
assert.ifError(err) | ||
var errors = 0 | ||
subscription.on('message', function(message, content, ackOrNack) { | ||
throw new Error('oh no') | ||
}).on('redeliveries_exceeded', function(err, message, ackOrNack) { | ||
ackOrNack(function() { | ||
setTimeout(function() { | ||
broker.shutdown(function(err) { | ||
assert.ifError(err) | ||
amqputils.assertMessageAbsent('q1', namespace, done) | ||
}) | ||
}) | ||
}) | ||
}).on('error', function(err) { | ||
if (errors++ > 5) done(new Error('Redeliveries were exceeded')) | ||
}) | ||
}) | ||
}) | ||
}) | ||
}) | ||
it('should republish messages to queue when requested', function(done) { | ||
@@ -629,0 +779,0 @@ |
Dynamic require
Supply chain riskDynamic require can indicate the package is performing dangerous or unsafe dynamic code execution.
Found 1 instance in 1 package
266188
52
5727
6
796
4