New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

node-resque

Package Overview
Dependencies
Maintainers
3
Versions
181
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

node-resque - npm Package Compare versions

Comparing version 1.1.3 to 1.2.0

73

lib/plugins/simpleRetry.js
// If the job fails, sleep, and re-enqueue it.
// You probably never want to use this in production
var crypto = require('crypto');

@@ -13,2 +14,5 @@ var simpleRetry = function(worker, func, queue, job, args, options){

self.options = options;
if (! self.options.retryInterval) {
self.options.retryInterval = [60, 300, 600, 1800, 3600]
}

@@ -21,4 +25,6 @@ if(self.worker.queueObject){

self.sleep = 1000;
if(self.options.sleep){ self.sleep = self.options.sleep; }
if (self.args) {
jobHash = crypto.createHash('md5').update(JSON.stringify(self.args)).digest('hex');
self.retryKey = self.queueObject.connection.key('retrytimes', jobHash);
}
};

@@ -30,20 +36,59 @@

simpleRetry.prototype.updateRetryTimes = function(callback) {
var self = this;
self.queueObject.connection.redis.incr(self.retryKey, (function(err, result) {
if (err) { return callback(err); }
self.queueObject.connection.redis.expire(self.retryKey, self.options.retryInterval[self.options.retryInterval.length - 1] * 2, function(err) {
if (err) {
callback(err);
} else if (result > self.options.retryInterval.length) {
err.message = '(Resque Retry Max Attempts Reached) -> ' + err.message;
callback(err);
} else {
callback(null, result);
}
});
}));
};
simpleRetry.prototype.deleteRetryTimes = function(callback) {
this.queueObject.connection.redis.del(this.retryKey, callback);
};
simpleRetry.prototype.after_perform = function(callback){
// console.log("** after_perform")
var self = this;
if (self.worker.error) {
self.updateRetryTimes(function(err, retryTimes) {
if (err) {
if (err.message.indexOf('(Resque Retry Max Attempts Reached) -> ') == 0) {
self.deleteRetryTimes(function(err) {
return callback(err, true);
});
} else {
return callback(err);
}
}
if(self.worker.error){
if(self.options.errorCollector){
self.options.errorCollector.push( self.worker.error );
}
self.worker.error = null;
self.queueObject.enqueueIn(self.sleep, self.queue, self.func, self.args, function(err){
callback(err, true);
var delay = self.options.retryInterval[retryTimes - 1];
self.queueObject.enqueueIn(delay * 1000, self.queue, self.func, self.args, function(err) {
if (err) { return callback(err); }
self.worker.emit('reEnqueue', self.queue, self.job, {
times: retryTimes,
delay: delay,
err: self.worker.error
});
self.worker.error = null;
return callback(err, true);
});
});
}else{
callback(null, true);
}
} else {
self.deleteRetryTimes(function(err) {
return callback(err, true);
});
};
};
exports.simpleRetry = simpleRetry;
exports.simpleRetry = simpleRetry;

6

lib/worker.js

@@ -195,5 +195,5 @@ var os = require("os");

// Note: if an input is a string or a number, you CANNOT freeze it saddly.
for(var i in combinedInputs){
if(typeof combinedInputs[i] === 'object'){
Object.freeze(combinedInputs[i]);
for(var i in combinedInputs){
if((typeof combinedInputs[i] === 'object') && (combinedInputs[i] !== null)){
Object.freeze(combinedInputs[i]);
}

@@ -200,0 +200,0 @@ }

@@ -6,3 +6,3 @@ {

"license": "Apache-2.0",
"version": "1.1.3",
"version": "1.2.0",
"homepage": "http://github.com/taskrabbit/node-resque",

@@ -9,0 +9,0 @@ "repository": {

@@ -278,3 +278,4 @@ # node-resque

var scheduler = new NR.scheduler({connection: connectionDetails}, function(){
var scheduler = new NR.scheduler({connection: connectionDetails});
scheduler.connect(function(){
scheduler.start();

@@ -281,0 +282,0 @@ });

@@ -6,3 +6,2 @@ var specHelper = require(__dirname + "/../_specHelper.js").specHelper;

var errorCollector = [];
var jobs = {

@@ -12,4 +11,3 @@ "brokenJob": {

pluginOptions: { simpleRetry: {
sleep: 100,
errorCollector: errorCollector,
retryInterval: [100]
},},

@@ -36,3 +34,2 @@ perform: function(a,b,callback){

describe('simpleRetry',function(){
it('bad job should not crash with simpleRetry', function(done){

@@ -42,6 +39,6 @@ queue.enqueue(specHelper.queue, "brokenJob", [1,2], function(){

len.should.equal(1);
var worker = new specHelper.NR.worker({
connection: specHelper.connectionDetails,
timeout: specHelper.timeout,
connection: specHelper.connectionDetails,
timeout: specHelper.timeout,
queues: specHelper.queue

@@ -51,8 +48,9 @@ }, jobs);

worker.connect(function(){
worker.on('success', function(q, job, result){
errorCollector.length.should.equal(1);
String(errorCollector[0]).should.equal('Error: BROKEN');
worker.end();
done();
specHelper.queue.should.equal(q);
queue.scheduledAt(specHelper.queue, "brokenJob", [1,2], function(err, timestamps){
timestamps.length.should.be.equal(1);
worker.end();
done();
});
});

@@ -71,2 +69,2 @@

});
});
});
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc