node-resque
Advanced tools
Comparing version 1.1.3 to 1.2.0
// If the job fails, sleep, and re-enqueue it. | ||
// You probably never want to use this in production | ||
var crypto = require('crypto'); | ||
@@ -13,2 +14,5 @@ var simpleRetry = function(worker, func, queue, job, args, options){ | ||
self.options = options; | ||
if (! self.options.retryInterval) { | ||
self.options.retryInterval = [60, 300, 600, 1800, 3600] | ||
} | ||
@@ -21,4 +25,6 @@ if(self.worker.queueObject){ | ||
self.sleep = 1000; | ||
if(self.options.sleep){ self.sleep = self.options.sleep; } | ||
if (self.args) { | ||
jobHash = crypto.createHash('md5').update(JSON.stringify(self.args)).digest('hex'); | ||
self.retryKey = self.queueObject.connection.key('retrytimes', jobHash); | ||
} | ||
}; | ||
@@ -30,20 +36,59 @@ | ||
simpleRetry.prototype.updateRetryTimes = function(callback) { | ||
var self = this; | ||
self.queueObject.connection.redis.incr(self.retryKey, (function(err, result) { | ||
if (err) { return callback(err); } | ||
self.queueObject.connection.redis.expire(self.retryKey, self.options.retryInterval[self.options.retryInterval.length - 1] * 2, function(err) { | ||
if (err) { | ||
callback(err); | ||
} else if (result > self.options.retryInterval.length) { | ||
err.message = '(Resque Retry Max Attempts Reached) -> ' + err.message; | ||
callback(err); | ||
} else { | ||
callback(null, result); | ||
} | ||
}); | ||
})); | ||
}; | ||
simpleRetry.prototype.deleteRetryTimes = function(callback) { | ||
this.queueObject.connection.redis.del(this.retryKey, callback); | ||
}; | ||
simpleRetry.prototype.after_perform = function(callback){ | ||
// console.log("** after_perform") | ||
var self = this; | ||
if (self.worker.error) { | ||
self.updateRetryTimes(function(err, retryTimes) { | ||
if (err) { | ||
if (err.message.indexOf('(Resque Retry Max Attempts Reached) -> ') == 0) { | ||
self.deleteRetryTimes(function(err) { | ||
return callback(err, true); | ||
}); | ||
} else { | ||
return callback(err); | ||
} | ||
} | ||
if(self.worker.error){ | ||
if(self.options.errorCollector){ | ||
self.options.errorCollector.push( self.worker.error ); | ||
} | ||
self.worker.error = null; | ||
self.queueObject.enqueueIn(self.sleep, self.queue, self.func, self.args, function(err){ | ||
callback(err, true); | ||
var delay = self.options.retryInterval[retryTimes - 1]; | ||
self.queueObject.enqueueIn(delay * 1000, self.queue, self.func, self.args, function(err) { | ||
if (err) { return callback(err); } | ||
self.worker.emit('reEnqueue', self.queue, self.job, { | ||
times: retryTimes, | ||
delay: delay, | ||
err: self.worker.error | ||
}); | ||
self.worker.error = null; | ||
return callback(err, true); | ||
}); | ||
}); | ||
}else{ | ||
callback(null, true); | ||
} | ||
} else { | ||
self.deleteRetryTimes(function(err) { | ||
return callback(err, true); | ||
}); | ||
}; | ||
}; | ||
exports.simpleRetry = simpleRetry; | ||
exports.simpleRetry = simpleRetry; |
@@ -195,5 +195,5 @@ var os = require("os"); | ||
// Note: if an input is a string or a number, you CANNOT freeze it saddly. | ||
for(var i in combinedInputs){ | ||
if(typeof combinedInputs[i] === 'object'){ | ||
Object.freeze(combinedInputs[i]); | ||
for(var i in combinedInputs){ | ||
if((typeof combinedInputs[i] === 'object') && (combinedInputs[i] !== null)){ | ||
Object.freeze(combinedInputs[i]); | ||
} | ||
@@ -200,0 +200,0 @@ } |
@@ -6,3 +6,3 @@ { | ||
"license": "Apache-2.0", | ||
"version": "1.1.3", | ||
"version": "1.2.0", | ||
"homepage": "http://github.com/taskrabbit/node-resque", | ||
@@ -9,0 +9,0 @@ "repository": { |
@@ -278,3 +278,4 @@ # node-resque | ||
var scheduler = new NR.scheduler({connection: connectionDetails}, function(){ | ||
var scheduler = new NR.scheduler({connection: connectionDetails}); | ||
scheduler.connect(function(){ | ||
scheduler.start(); | ||
@@ -281,0 +282,0 @@ }); |
@@ -6,3 +6,2 @@ var specHelper = require(__dirname + "/../_specHelper.js").specHelper; | ||
var errorCollector = []; | ||
var jobs = { | ||
@@ -12,4 +11,3 @@ "brokenJob": { | ||
pluginOptions: { simpleRetry: { | ||
sleep: 100, | ||
errorCollector: errorCollector, | ||
retryInterval: [100] | ||
},}, | ||
@@ -36,3 +34,2 @@ perform: function(a,b,callback){ | ||
describe('simpleRetry',function(){ | ||
it('bad job should not crash with simpleRetry', function(done){ | ||
@@ -42,6 +39,6 @@ queue.enqueue(specHelper.queue, "brokenJob", [1,2], function(){ | ||
len.should.equal(1); | ||
var worker = new specHelper.NR.worker({ | ||
connection: specHelper.connectionDetails, | ||
timeout: specHelper.timeout, | ||
connection: specHelper.connectionDetails, | ||
timeout: specHelper.timeout, | ||
queues: specHelper.queue | ||
@@ -51,8 +48,9 @@ }, jobs); | ||
worker.connect(function(){ | ||
worker.on('success', function(q, job, result){ | ||
errorCollector.length.should.equal(1); | ||
String(errorCollector[0]).should.equal('Error: BROKEN'); | ||
worker.end(); | ||
done(); | ||
specHelper.queue.should.equal(q); | ||
queue.scheduledAt(specHelper.queue, "brokenJob", [1,2], function(err, timestamps){ | ||
timestamps.length.should.be.equal(1); | ||
worker.end(); | ||
done(); | ||
}); | ||
}); | ||
@@ -71,2 +69,2 @@ | ||
}); | ||
}); | ||
}); |
510961
3584
429