node-resque
Advanced tools
Comparing version 0.4.0 to 0.4.1
@@ -90,5 +90,9 @@ var os = require("os"); | ||
}); | ||
}else if(self.working === true){ | ||
var err = new Error('refusing to get new job, already working'); | ||
self.emit('error', self.queue, null, err); | ||
}else{ | ||
self.working = true; | ||
self.connection.redis.lpop(self.connection.key('queue', self.queue), function(err, resp) { | ||
if (!err && resp) { | ||
if(!err && resp){ | ||
var currentJob = JSON.parse(resp.toString()); | ||
@@ -100,4 +104,5 @@ if(self.options.looping){ | ||
} | ||
} else { | ||
if (nQueue === self.queues.length - 1) { | ||
}else{ | ||
self.working = false; | ||
if(nQueue === self.queues.length - 1){ | ||
process.nextTick(function() { | ||
@@ -110,4 +115,4 @@ if(self.options.looping){ | ||
}); | ||
} else { | ||
process.nextTick(function() { | ||
}else{ | ||
process.nextTick(function(){ | ||
self.poll(nQueue + 1); | ||
@@ -135,6 +140,11 @@ }); | ||
if(cb != null) { | ||
self.working = true; | ||
self.job = job; | ||
var returnCounter = 0; // a state counter to prevent multiple returns from poor jobs or plugins | ||
var callbackError = new Error('refusing to continue with job, multiple callbacks detected'); | ||
self.runPlugins('before_perform', job["class"], self.queue, self.jobs[job["class"]], job.args, function(err, toRun){ | ||
if(toRun == false){ | ||
returnCounter++; | ||
if(returnCounter !== 1){ | ||
self.emit('error', self.queue, job, callbackError); | ||
} | ||
else if(toRun == false){ | ||
self.completeJob(null, null, callback); | ||
@@ -148,7 +158,17 @@ }else{ | ||
} | ||
cb.apply(self, [].slice.call(args).concat([function(err, result) { | ||
self.runPlugins('after_perform', job["class"], self.queue, self.jobs[job["class"]], job.args, function(e, toRun){ | ||
if(err == null && e != null){ err = e; } | ||
self.completeJob(err, result, callback); | ||
}); | ||
cb.apply(self, [].slice.call(args).concat([function(err, result){ | ||
returnCounter++; | ||
if(returnCounter !== 2){ | ||
self.emit('error', self.queue, job, callbackError); | ||
}else{ | ||
self.runPlugins('after_perform', job["class"], self.queue, self.jobs[job["class"]], job.args, function(e, toRun){ | ||
if(err == null && e != null){ err = e; } | ||
returnCounter++; | ||
if(returnCounter !== 3){ | ||
self.emit('error', self.queue, job, callbackError); | ||
}else{ | ||
self.completeJob(err, result, callback); | ||
} | ||
}); | ||
} | ||
}])); | ||
@@ -155,0 +175,0 @@ } |
@@ -5,3 +5,3 @@ { | ||
"description": "an opinionated implementation of resque in node", | ||
"version": "0.4.0", | ||
"version": "0.4.1", | ||
"homepage": "http://github.com/taskrabbit/node-resque", | ||
@@ -8,0 +8,0 @@ "repository": { |
@@ -21,2 +21,9 @@ var specHelper = require(__dirname + "/../_specHelper.js").specHelper; | ||
} | ||
}, | ||
"doubleCaller":{ | ||
perform: function(callback){ | ||
callback(null, 'a'); | ||
setTimeout(function(){ callback(null, 'b'); }, 500); | ||
setTimeout(function(){ callback(null, 'c'); }, 1000); | ||
} | ||
} | ||
@@ -147,5 +154,34 @@ }; | ||
it('will not double-work with a baddly defined job', function(done){ | ||
var callbackCounts = 0; | ||
var expected = 3; | ||
var errorCounter = 0; | ||
var successCounter = 0; | ||
var errorListener = worker.on('error', function(q, job, err){ | ||
String(err).should.equal('Error: refusing to continue with job, multiple callbacks detected'); | ||
callbackCounts++; | ||
errorCounter++; | ||
if(callbackCounts === expected){ complete(); } | ||
}); | ||
var successListener = worker.on('success', function(q, job, result){ | ||
result.should.equal('a'); | ||
successCounter++; | ||
callbackCounts++; | ||
if(callbackCounts === expected){ complete(); } | ||
}); | ||
var complete = function(){ | ||
errorCounter.should.equal(2); | ||
successCounter.should.equal(1); | ||
done(); | ||
} | ||
queue.enqueue(specHelper.queue, "doubleCaller", []); | ||
worker.start(); | ||
}); | ||
}); | ||
}); |
79435
2057